Revert "Add formatters module"

Reverting again; apparently I misunderstood how to revert. Trying again.
This reverts commit dca4021dd7.
This commit is contained in:
Chris Howell 2020-08-21 12:21:12 -07:00
parent d75ad8c402
commit 1e9b2c7727
7 changed files with 50 additions and 335 deletions

View File

@ -24,10 +24,10 @@ def get_test_suite():
setuptools.setup(
name="youtube_transcript_api",
version="0.3.0",
version="0.3.1",
author="Jonas Depoix",
author_email="jonas.depoix@web.de",
description="This is an python API which allows you to get the transcripts/subtitles for a given YouTube video. It also works for automatically generated subtitles and it does not require a headless browser, like other selenium based solutions do!",
description="This is an python API which allows you to get the transcripts/subtitles for a given YouTube video. It also works for automatically generated subtitles, supports translating subtitles and it does not require a headless browser, like other selenium based solutions do!",
long_description=get_long_description(),
long_description_content_type="text/markdown",
keywords="youtube-api subtitles youtube transcripts transcript subtitle youtube-subtitles youtube-transcripts cli",

View File

@ -12,8 +12,6 @@ from ._errors import (
CookiePathInvalid,
CookiesInvalid
)
from .formatters import formats
class YouTubeTranscriptApi():
@classmethod
@ -72,8 +70,7 @@ class YouTubeTranscriptApi():
return TranscriptListFetcher(http_client).fetch(video_id)
@classmethod
def get_transcripts(cls, video_ids, languages=('en',),
continue_after_error=False, proxies=None, cookies=None, format=None):
def get_transcripts(cls, video_ids, languages=('en',), continue_after_error=False, proxies=None, cookies=None):
"""
Retrieves the transcripts for a list of videos.
@ -99,8 +96,7 @@ class YouTubeTranscriptApi():
for video_id in video_ids:
try:
data[video_id] = cls.get_transcript(video_id, languages,
proxies, cookies, format=format)
data[video_id] = cls.get_transcript(video_id, languages, proxies, cookies)
except Exception as exception:
if not continue_after_error:
raise exception
@ -110,8 +106,7 @@ class YouTubeTranscriptApi():
return data, unretrievable_videos
@classmethod
def get_transcript(cls, video_id, languages=('en',), proxies=None,
cookies=None, format=None):
def get_transcript(cls, video_id, languages=('en',), proxies=None, cookies=None):
"""
Retrieves the transcript for a single video. This is just a shortcut for calling::
@ -130,11 +125,8 @@ class YouTubeTranscriptApi():
:return: a list of dictionaries containing the 'text', 'start' and 'duration' keys
:rtype [{'text': str, 'start': float, 'end': float}]:
"""
Formatter = formats.get_formatter(format)
transcript = cls.list_transcripts(
video_id,proxies, cookies).find_transcript(languages).fetch()
return Formatter.format(transcript)
return cls.list_transcripts(video_id, proxies, cookies).find_transcript(languages).fetch()
@classmethod
def _load_cookies(cls, cookies, video_id):
cookie_jar = {}

View File

@ -1,9 +1,10 @@
import json
import pprint
import argparse
from ._api import YouTubeTranscriptApi
from .formatters import formats
class YouTubeTranscriptCli():
@ -25,24 +26,19 @@ class YouTubeTranscriptCli():
transcripts = []
exceptions = []
Formatter = formats.get_formatter(parsed_args.format)
for video_id in parsed_args.video_ids:
try:
transcript = self._fetch_transcript(
parsed_args, proxies, cookies, video_id)
transcripts.append(Formatter.format(transcript))
transcripts.append(self._fetch_transcript(parsed_args, proxies, cookies, video_id))
except Exception as exception:
exceptions.append(exception)
return ''.join(
return '\n\n'.join(
[str(exception) for exception in exceptions]
+ ([Formatter.combine(transcripts)] if transcripts else [])
+ ([json.dumps(transcripts) if parsed_args.json else pprint.pformat(transcripts)] if transcripts else [])
)
def _fetch_transcript(self, parsed_args, proxies, cookies, video_id):
transcript_list = YouTubeTranscriptApi.list_transcripts(
video_id, proxies=proxies, cookies=cookies)
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id, proxies=proxies, cookies=cookies)
if parsed_args.list_transcripts:
return str(transcript_list)
@ -102,9 +98,11 @@ class YouTubeTranscriptCli():
help='If this flag is set transcripts which have been manually created will not be retrieved.',
)
parser.add_argument(
'--format',
default=None,
help="Use this flag to set which parser format to use, default is 'json'",
'--json',
action='store_const',
const=True,
default=False,
help='If this flag is set the output will be JSON formatted.',
)
parser.add_argument(
'--translate',

View File

@ -1,174 +0,0 @@
from collections import defaultdict
import json
import re
def parse_timecode(time):
    """Converts a `time` into a formatted transcript timecode.

    The value is rounded to the nearest millisecond first and only then
    split into hours, minutes, seconds and milliseconds, so minutes and
    seconds always stay below 60 and milliseconds below 1000.

    :param time: a time in seconds; anything accepted by `float()`.
    :type time: float
    :return: a string formatted as a timecode, 'HH:MM:SS,MS'
    :rtype str
    :raises ValueError: if `time` is a string that is not a valid float.

    :example:
    >>> parse_timecode(6.93)
    '00:00:06,930'
    >>> parse_timecode(3661.5)
    '01:01:01,500'
    """
    # Work in whole milliseconds so that rounding carries over into the
    # seconds instead of producing a four digit millisecond field.
    total_ms = int(round(float(time) * 1000))
    total_secs, ms = divmod(total_ms, 1000)
    total_mins, secs = divmod(total_secs, 60)
    # Minutes wrap at 60; whatever is left over belongs to the hours.
    hours, mins = divmod(total_mins, 60)
    return "{hours:02d}:{mins:02d}:{secs:02d},{ms:03d}".format(
        hours=hours, mins=mins, secs=secs, ms=ms)
class TranscriptFormatter(object):
    """Base class for transcript formatters.

    Concrete formatters derive from this class and provide their own
    `format`; they may also override `combine` or `DELIMITER` to change
    how several formatted transcripts are merged.
    """
    # Matches a '<', any run of characters other than '>', then a '>'.
    HTML_TAG_REGEX = re.compile(r'<[^>]*>', re.IGNORECASE)
    # Separator that the default `combine` puts between transcripts.
    DELIMITER = ''

    @classmethod
    def combine(cls, transcripts):
        """Merge several transcripts into a single string.

        Each item is converted with `str()` and the results are joined
        on `cls.DELIMITER`. Subclasses may override this.

        :param transcripts: the already formatted transcripts
        :type transcripts: list
        :return: all transcripts joined on `cls.DELIMITER`
        :rtype: str
        """
        as_strings = [str(item) for item in transcripts]
        return cls.DELIMITER.join(as_strings)

    @classmethod
    def format(cls, transcript_data):
        """Format one transcript; must be implemented by subclasses.

        :param transcript_data: the transcript to format
        :raises NotImplementedError: always, with a message naming the
            class whose `format` is missing.
        """
        message = '{}.format'.format(cls.__name__)
        raise NotImplementedError(message)
class JSONTranscriptFormatter(TranscriptFormatter):
    """Formatter that emits transcripts as JSON."""
    DELIMITER = ','

    @classmethod
    def combine(cls, transcripts):
        """Serialize all collected transcripts into one JSON string.

        :param transcripts: the transcripts returned by `format`
        :return: the JSON encoded transcripts
        :rtype: str
        """
        serialized = json.dumps(transcripts)
        return serialized

    @classmethod
    def format(cls, transcript_data):
        """Return `transcript_data` unchanged.

        The JSON encoding itself is done by `combine`.
        """
        return transcript_data
class TextTranscriptFormatter(TranscriptFormatter):
    """Formatter that emits plain text.

    Only the 'text' value of every transcript entry is kept, one entry
    per line, without any timecodes.
    """
    DELIMITER = '\n\n'

    @classmethod
    def format(cls, transcript_data):
        """Join the 'text' of every entry with newlines.

        :param transcript_data: entries that each provide a 'text' key
        :return: the texts, one per line, followed by a final newline
        :rtype: str
        """
        texts = [entry['text'] for entry in transcript_data]
        body = '\n'.join(texts)
        return '{}\n'.format(body)
class SRTTranscriptFormatter(TranscriptFormatter):
    """Formatter that emits a simple .srt style output."""
    DELIMITER = '\n\n'

    @classmethod
    def format(cls, transcript_data):
        """Render every transcript entry as a numbered cue.

        A cue consists of its 1-based number, a
        'start --> end' timecode line and the entry's text. Cues are
        separated by an empty line and the output ends with a newline.

        :param transcript_data: entries providing 'start', 'text' and
            optionally 'duration' (treated as 0.0 when missing)
        :return: the rendered cues
        :rtype: str
        """
        cues = []
        for index, entry in enumerate(transcript_data, start=1):
            begin = float(entry.get('start'))
            length = float(entry.get('duration', '0.0'))
            cues.append(
                "{frame}\n{start_time} --> {end_time}\n{text}".format(
                    frame=index,
                    start_time=parse_timecode(begin),
                    end_time=parse_timecode(begin + length),
                    text=entry.get('text')
                )
            )
        return '{}\n'.format('\n\n'.join(cues))
class TranscriptFormatterFactory(object):
    """A Transcript Class Factory

    Keeps a registry that maps names to formatter classes. Custom
    formatter classes can be registered as long as they inherit from the
    TranscriptFormatter base class. Looking up a name that has not been
    registered yields JSONTranscriptFormatter.
    """
    def __init__(self):
        # A plain dict on purpose: a defaultdict with the formatter class as
        # its default factory would *instantiate* the class on a missing
        # name and store that instance in the registry under that name.
        self._formatters = {}

    def add_formatter(self, name, formatter_class):
        """Allows for creating additional transcript formatters.

        :param name: a name given to the `formatter_class`
        :type name: str
        :param formatter_class: a subclass of TranscriptFormatter
        :type formatter_class: class
        :raises TypeError: if `formatter_class` is not a subclass of
            TranscriptFormatter.
        :rtype None
        """
        if not issubclass(formatter_class, TranscriptFormatter):
            raise TypeError(
                '{0} must be a subclass of TranscriptFormatter'.format(
                    formatter_class)
            )
        self._formatters[name] = formatter_class

    def add_formatters(self, formatters_dict):
        """Allow creation of multiple transcript formatters at a time.

        :param formatters_dict: key(s) are the string name to be given
        to the formatter class, value for each key should be a subclass
        of TranscriptFormatter.
        :type formatters_dict: dict
        :raises TypeError: if any value is not a subclass of
            TranscriptFormatter.
        :rtype None
        """
        for name, formatter_class in formatters_dict.items():
            self.add_formatter(name, formatter_class)

    def get_formatter(self, name):
        """Retrieve a formatter class by its assigned name.

        :param name: the string name given to the formatter class.
        :type name: str
        :return: the registered subclass of `TranscriptFormatter`, or
            JSONTranscriptFormatter when `name` is not registered. The
            registry is left untouched by the lookup.
        """
        return self._formatters.get(name, JSONTranscriptFormatter)
# Module-level factory with the built-in formatters registered by name.
formats = TranscriptFormatterFactory()
formats.add_formatter('json', JSONTranscriptFormatter)
formats.add_formatter('srt', SRTTranscriptFormatter)
formats.add_formatter('text', TextTranscriptFormatter)

View File

@ -1,6 +1,6 @@
from unittest import TestCase
from mock import patch
import json
import os
import requests
@ -21,10 +21,7 @@ from youtube_transcript_api import (
def load_asset(filename):
filepath = '{dirname}/assets/{filename}'.format(
dirname=os.path.dirname(__file__), filename=filename)
with open(filepath, 'r', encoding='utf-8') as file:
with open('{dirname}/assets/{filename}'.format(dirname=os.path.dirname(__file__), filename=filename)) as file:
return file.read()
@ -161,7 +158,7 @@ class TestYouTubeTranscriptApi(TestCase):
def test_get_transcript__with_proxy(self):
proxies = {'http': '', 'https:': ''}
transcript = YouTubeTranscriptApi.get_transcript(
'GJLlxj_dtq8', proxies=proxies, format=None
'GJLlxj_dtq8', proxies=proxies
)
self.assertEqual(
transcript,
@ -194,8 +191,8 @@ class TestYouTubeTranscriptApi(TestCase):
YouTubeTranscriptApi.get_transcripts([video_id_1, video_id_2], languages=languages)
mock_get_transcript.assert_any_call(video_id_1, languages, None, None, format=None)
mock_get_transcript.assert_any_call(video_id_2, languages, None, None, format=None)
mock_get_transcript.assert_any_call(video_id_1, languages, None, None)
mock_get_transcript.assert_any_call(video_id_2, languages, None, None)
self.assertEqual(mock_get_transcript.call_count, 2)
@patch('youtube_transcript_api.YouTubeTranscriptApi.get_transcript', side_effect=Exception('Error'))
@ -210,20 +207,20 @@ class TestYouTubeTranscriptApi(TestCase):
YouTubeTranscriptApi.get_transcripts(['video_id_1', 'video_id_2'], continue_after_error=True)
mock_get_transcript.assert_any_call(video_id_1, ('en',), None, None, format=None)
mock_get_transcript.assert_any_call(video_id_2, ('en',), None, None, format=None)
mock_get_transcript.assert_any_call(video_id_1, ('en',), None, None)
mock_get_transcript.assert_any_call(video_id_2, ('en',), None, None)
@patch('youtube_transcript_api.YouTubeTranscriptApi.get_transcript')
def test_get_transcripts__with_cookies(self, mock_get_transcript):
cookies = '/example_cookies.txt'
YouTubeTranscriptApi.get_transcripts(['GJLlxj_dtq8'], cookies=cookies)
mock_get_transcript.assert_any_call('GJLlxj_dtq8', ('en',), None, cookies, format=None)
mock_get_transcript.assert_any_call('GJLlxj_dtq8', ('en',), None, cookies)
@patch('youtube_transcript_api.YouTubeTranscriptApi.get_transcript')
def test_get_transcripts__with_proxies(self, mock_get_transcript):
proxies = {'http': '', 'https:': ''}
YouTubeTranscriptApi.get_transcripts(['GJLlxj_dtq8'], proxies=proxies)
mock_get_transcript.assert_any_call('GJLlxj_dtq8', ('en',), proxies, None, format=None)
mock_get_transcript.assert_any_call('GJLlxj_dtq8', ('en',), proxies, None)
def test_load_cookies(self):
dirname, filename = os.path.split(os.path.abspath(__file__))

View File

@ -25,50 +25,50 @@ class TestYouTubeTranscriptCli(TestCase):
YouTubeTranscriptApi.list_transcripts = MagicMock(return_value=self.transcript_list_mock)
def test_argument_parsing(self):
parsed_args = YouTubeTranscriptCli('v1 v2 --format json --languages de en'.split())._parse_args()
parsed_args = YouTubeTranscriptCli('v1 v2 --json --languages de en'.split())._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, 'json')
self.assertEqual(parsed_args.json, True)
self.assertEqual(parsed_args.languages, ['de', 'en'])
self.assertEqual(parsed_args.http_proxy, '')
self.assertEqual(parsed_args.https_proxy, '')
parsed_args = YouTubeTranscriptCli('v1 v2 --languages de en --format json'.split())._parse_args()
parsed_args = YouTubeTranscriptCli('v1 v2 --languages de en --json'.split())._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, 'json')
self.assertEqual(parsed_args.json, True)
self.assertEqual(parsed_args.languages, ['de', 'en'])
self.assertEqual(parsed_args.http_proxy, '')
self.assertEqual(parsed_args.https_proxy, '')
parsed_args = YouTubeTranscriptCli(' --format json v1 v2 --languages de en'.split())._parse_args()
parsed_args = YouTubeTranscriptCli(' --json v1 v2 --languages de en'.split())._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, 'json')
self.assertEqual(parsed_args.json, True)
self.assertEqual(parsed_args.languages, ['de', 'en'])
self.assertEqual(parsed_args.http_proxy, '')
self.assertEqual(parsed_args.https_proxy, '')
parsed_args = YouTubeTranscriptCli(
'v1 v2 --languages de en --format json --http-proxy http://user:pass@domain:port --https-proxy https://user:pass@domain:port'.split()
'v1 v2 --languages de en --json --http-proxy http://user:pass@domain:port --https-proxy https://user:pass@domain:port'.split()
)._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, 'json')
self.assertEqual(parsed_args.json, True)
self.assertEqual(parsed_args.languages, ['de', 'en'])
self.assertEqual(parsed_args.http_proxy, 'http://user:pass@domain:port')
self.assertEqual(parsed_args.https_proxy, 'https://user:pass@domain:port')
parsed_args = YouTubeTranscriptCli(
'v1 v2 --languages de en --format json --http-proxy http://user:pass@domain:port'.split()
'v1 v2 --languages de en --json --http-proxy http://user:pass@domain:port'.split()
)._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, 'json')
self.assertEqual(parsed_args.json, True)
self.assertEqual(parsed_args.languages, ['de', 'en'])
self.assertEqual(parsed_args.http_proxy, 'http://user:pass@domain:port')
self.assertEqual(parsed_args.https_proxy, '')
parsed_args = YouTubeTranscriptCli(
'v1 v2 --languages de en --format json --https-proxy https://user:pass@domain:port'.split()
'v1 v2 --languages de en --json --https-proxy https://user:pass@domain:port'.split()
)._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, 'json')
self.assertEqual(parsed_args.json, True)
self.assertEqual(parsed_args.languages, ['de', 'en'])
self.assertEqual(parsed_args.https_proxy, 'https://user:pass@domain:port')
self.assertEqual(parsed_args.http_proxy, '')
@ -76,28 +76,28 @@ class TestYouTubeTranscriptCli(TestCase):
def test_argument_parsing__only_video_ids(self):
parsed_args = YouTubeTranscriptCli('v1 v2'.split())._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, None)
self.assertEqual(parsed_args.json, False)
self.assertEqual(parsed_args.languages, ['en'])
def test_argument_parsing__fail_without_video_ids(self):
with self.assertRaises(SystemExit):
YouTubeTranscriptCli('--format json'.split())._parse_args()
YouTubeTranscriptCli('--json'.split())._parse_args()
def test_argument_parsing__json(self):
parsed_args = YouTubeTranscriptCli('v1 v2 --format json'.split())._parse_args()
parsed_args = YouTubeTranscriptCli('v1 v2 --json'.split())._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, 'json')
self.assertEqual(parsed_args.json, True)
self.assertEqual(parsed_args.languages, ['en'])
parsed_args = YouTubeTranscriptCli('--format json v1 v2'.split())._parse_args()
parsed_args = YouTubeTranscriptCli('--json v1 v2'.split())._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, 'json')
self.assertEqual(parsed_args.json, True)
self.assertEqual(parsed_args.languages, ['en'])
def test_argument_parsing__languages(self):
parsed_args = YouTubeTranscriptCli('v1 v2 --languages de en'.split())._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, None)
self.assertEqual(parsed_args.json, False)
self.assertEqual(parsed_args.languages, ['de', 'en'])
def test_argument_parsing__proxies(self):
@ -135,13 +135,13 @@ class TestYouTubeTranscriptCli(TestCase):
def test_argument_parsing__translate(self):
parsed_args = YouTubeTranscriptCli('v1 v2 --languages de en --translate cz'.split())._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, None)
self.assertEqual(parsed_args.json, False)
self.assertEqual(parsed_args.languages, ['de', 'en'])
self.assertEqual(parsed_args.translate, 'cz')
parsed_args = YouTubeTranscriptCli('v1 v2 --translate cz --languages de en'.split())._parse_args()
self.assertEqual(parsed_args.video_ids, ['v1', 'v2'])
self.assertEqual(parsed_args.format, None)
self.assertEqual(parsed_args.json, False)
self.assertEqual(parsed_args.languages, ['de', 'en'])
self.assertEqual(parsed_args.translate, 'cz')
@ -204,7 +204,8 @@ class TestYouTubeTranscriptCli(TestCase):
YouTubeTranscriptApi.list_transcripts.assert_any_call('v2', proxies=None, cookies=None)
def test_run__json_output(self):
output = YouTubeTranscriptCli('v1 v2 --languages de en --format json'.split()).run()
output = YouTubeTranscriptCli('v1 v2 --languages de en --json'.split()).run()
# will fail if output is not valid json
json.loads(output)

View File

@ -1,99 +0,0 @@
from unittest import TestCase
from mock import MagicMock
import json
from youtube_transcript_api.formatters import (
JSONTranscriptFormatter,
parse_timecode,
SRTTranscriptFormatter,
TextTranscriptFormatter,
TranscriptFormatter,
TranscriptFormatterFactory
)
class TestTranscriptFormatters(TestCase):
    """Tests for the formatter classes, `parse_timecode` and the factory."""

    @classmethod
    def setUpClass(cls):
        """Create the three-entry transcript fixture shared by the tests."""
        cls.transcript = [
            {'text': 'Hey, this is just a test', 'start': 0.0, 'duration': 1.54},
            {'text': 'this is not the original transcript', 'start': 1.54, 'duration': 4.16},
            {'text': 'just something shorter, I made up for testing', 'start': 5.7, 'duration': 3.239},
        ]

    def test_base_formatter_combine(self):
        """The base `combine` joins the str() of every item without a separator."""
        joined = ''.join(str(entry) for entry in self.transcript)
        self.assertEqual(TranscriptFormatter.combine(self.transcript), joined)

    def test_base_format_not_implemented(self):
        """The base `format` raises NotImplementedError."""
        self.assertRaises(
            NotImplementedError, TranscriptFormatter.format, self.transcript)

    def test_text_formatter_format(self):
        """The text output contains every entry's text on its own line."""
        lines = [entry.get('text') for entry in self.transcript]
        expected = '\n'.join(lines) + '\n'
        self.assertIn(expected, TextTranscriptFormatter.format(self.transcript))

    def test_srt_formatter_format(self):
        """The SRT output contains the timecode range of the first entry."""
        first = self.transcript[0]
        begin = first.get('start')
        length = first.get('duration')
        rendered = SRTTranscriptFormatter.format(self.transcript)
        expected_range = '{start} --> {end}'.format(
            start=parse_timecode(begin),
            end=parse_timecode(begin + length)
        )
        self.assertIn(expected_range, rendered)

    def test_json_formatter_format(self):
        """Whatever the JSON formatter returns can be dumped to a JSON string."""
        formatted = JSONTranscriptFormatter.format(self.transcript)
        self.assertIsInstance(json.dumps(formatted), str)

    def test_invalid_parse_timecode(self):
        """A string that is not a number is rejected with ValueError."""
        self.assertRaises(ValueError, parse_timecode, 'not_float')

    def test_valid_parse_timecode(self):
        """Float seconds are rendered as 'HH:MM:SS,MS'."""
        cases = (
            (0.0, '00:00:00,000'),
            (5.20, '00:00:05,200'),
        )
        for seconds, expected in cases:
            self.assertEqual(parse_timecode(seconds), expected)

    def test_formatter_factory_valid_single_add(self):
        """Adding one formatter leaves exactly that entry in the registry."""
        factory = TranscriptFormatterFactory()
        factory.add_formatter('json', JSONTranscriptFormatter)
        self.assertDictEqual(
            factory._formatters,
            {'json': JSONTranscriptFormatter}
        )

    def test_formatter_factory_invalid_single_add(self):
        """A class that is not a TranscriptFormatter subclass is rejected."""
        factory = TranscriptFormatterFactory()
        self.assertRaises(
            TypeError, factory.add_formatter, 'magic', MagicMock)