From af1c601b4d27b295caed66365cf7aee83758d82a Mon Sep 17 00:00:00 2001 From: swiftyy-mage Date: Sat, 7 Mar 2020 22:27:08 +0000 Subject: [PATCH 1/8] Update query.py --- pytube/query.py | 485 +++++++++++++++++++++++++++++++----------------- 1 file changed, 317 insertions(+), 168 deletions(-) diff --git a/pytube/query.py b/pytube/query.py index 1e7819dd1..12b0998b7 100644 --- a/pytube/query.py +++ b/pytube/query.py @@ -20,6 +20,322 @@ def __init__(self, fmt_streams): self.fmt_streams = fmt_streams self.itag_index = {int(s.itag): s for s in fmt_streams} + def order_by(self, attribute_name: str) -> "StreamQuery": + """Apply a sort order. Filters out stream the do not have the attribute. + + :param str attribute_name: + The name of the attribute to sort by. + """ + has_attribute = [ + s for s in self.fmt_streams if getattr(s, attribute_name) is not None + ] + # Check that the attributes have string values. + if has_attribute and isinstance(getattr(has_attribute[0], attribute_name), str): + # Try to return a StreamQuery sorted by the integer representations + # of the values. + try: + return StreamQuery( + sorted( + has_attribute, + key=lambda s: int( + "".join(filter(str.isdigit, getattr(s, attribute_name))) + ), # type: ignore # noqa: E501 + ) + ) + except ValueError: + pass + + return StreamQuery( + sorted(has_attribute, key=lambda s: getattr(s, attribute_name)) + ) + + def desc(self) -> "StreamQuery": + """Sort streams in descending order. + + :rtype: :class:`StreamQuery ` + + """ + return StreamQuery(self.fmt_streams[::-1]) + + def asc(self) -> "StreamQuery": + """Sort streams in ascending order. + + :rtype: :class:`StreamQuery ` + + """ + return self + + def get_by_itag(self, itag: int) -> Optional[Stream]: + """Get the corresponding :class:`Stream ` for a given itag. + + :param int itag: + YouTube format identifier code. + :rtype: :class:`Stream ` or None + :returns: + The :class:`Stream ` matching the given itag or None if + not found. + + """ + return self.itag_index.get(int(itag)) + + def get_by_resolution(self, resolution: int) -> Optional[Stream]: + """Get the corresponding :class:`Stream ` for a given resolution. + + Stream must be a progressive mp4. + + :param str resolution: + Video resolution i.e. "720p", "480p", "360p", "240p", "144p". + :rtype: :class:`Stream ` or None + :returns: + The :class:`Stream ` matching the given itag or None if + not found. + + """ + return self.is_progressive().subtype("mp4").res(resolution).first() + + def get_lowest_resolution(self) -> Optional[Stream]: + """Get lowest resolution stream that is a progressive mp4. + + :rtype: :class:`Stream ` or None + :returns: + The :class:`Stream ` matching the given itag or None if + not found. + + """ + return self.is_progressive().subtype("mp4").order_by("resolution").first() + + def get_highest_resolution(self) -> Optional[Stream]: + """Get highest resolution stream that is a progressive video. + + :rtype: :class:`Stream ` or None + :returns: + The :class:`Stream ` matching the given itag or None if + not found. + + """ + return self.is_progressive().order_by("resolution").last() + + def get_best_audio(self, subtype: str = "mp4") -> Optional[Stream]: + """Get highest bitrate audio stream for given codec (defaults to mp4). + + :param str subtype: + Audio subtype, defaults to mp4. + :rtype: :class:`Stream ` or None + :returns: + The :class:`Stream ` matching the given itag or None if + not found. + """ + return self.audio_only().subtype(subtype).order_by("abr").last() + + def otf(self, is_otf: bool = False) -> "StreamQuery": + """Filter stream by OTF, useful if some streams have 404 URLs. + + :param bool is_otf: Set to False to retrieve only non-OTF streams + :rtype: :class:`StreamQuery ` + :returns: A StreamQuery object with otf filtered streams. + """ + return StreamQuery( + list(filter(lambda s: s.is_otf == is_otf), self.fmt_streams) + ) + + def audio_only(self, audio_only_bool: bool = True) -> Optional["StreamQuery"]: + """Return only streams which are audio only if audio_only_bool is + True, return only streams which are not audio only if audio_only_bool + is False. + + :param bool audio_only_bool: + Audio only streams, defaults to True. + :rtype: :class:`StreamQuery ` + :returns: A StreamQuery object such that either every Stream is + audio only, or every stream is not audio only. + """ + condition = ( + lambda s: (s.includes_audio_track and not s.includes_video_track) + is audio_only_bool + ) + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def video_only(self, video_only_bool: bool = True) -> Optional["StreamQuery"]: + """Return only streams which are audio only if video_only_bool is + True, return only streams which are not audio only if video_only_bool + is False. + :param bool video_only_bool: + Video only streams, defaults to True. + :rtype: :class:`StreamQuery ` + :returns: A StreamQuery object such that either every Stream is + video only, or every stream is not video only. + """ + condition = ( + lambda s: (s.includes_video_track and not s.includes_audio_track) + is video_only_bool + ) + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def abr(self, average_bitrate: int) -> Optional["StreamQuery"]: + """Return only streams of the given average bitrate. + :param str average_bitrate: + The audio average bitrate. + :rtype: :class:`StreamQuery ` or None + :returns: A StreamQuery object such that every Stream has the + bitrate defined by the user, or None if none are available. + """ + condition = lambda s: s.abr == f"{str(average_bitrate)}kbps" + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def res(self, resolution: int) -> Optional["StreamQuery"]: + """Return only streams of the given resolution. + :param str resolution: + The video resolution. + :rtype: :class:`StreamQuery ` or None + :returns: A StreamQuery object such that every Stream is of the + resolution defined by the user, or None if none are available. + """ + condition = lambda s: s.resolution == f"{str(resolution)}p" + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def fps(self, framerate: int) -> Optional["StreamQuery"]: + """Return only streams of the given framerate. + :param str framerate: + The frames per second. + :rtype: :class:`StreamQuery ` or None + :returns: A StreamQuery object such that every Stream is of the + resolution defined by the user, or None if none are available. + """ + condition = lambda s: s.fps == framerate + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def primary_type(self, type: str) -> Optional["StreamQuery"]: + """Return only streams of the given type. + :param str type: + "type" part of the ``mime_type`` (e.g.: audio, video). + :rtype: :class:`StreamQuery ` + :returns: A StreamQuery object such that every Stream is of the + type defined by the user. + """ + condition = lambda s: s.type == type + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def mime_type(self, mime_type: str) -> Optional["StreamQuery"]: + """Return only streams of the given mime_type. + :param str mime_type: + Two-part identifier for file formats and format contents + composed of a "type", a "subtype". + :rtype: :class:`StreamQuery ` + :returns: A StreamQuery object such that every Stream is of the + mime_type defined by the user. + """ + condition = lambda s: s.mime_type == mime_type + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def subtype(self, subtype: str) -> Optional["StreamQuery"]: + """Return only streams of the given subtype . + :param str subtype: + "subtype" part of the ``mime_type`` (e.g.: audio, video). + :rtype: :class:`StreamQuery ` + :returns: A StreamQuery object such that every Stream is of the + subtype defined by the user. + """ + condition = lambda s: s.subtype == subtype + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def video_codec(self, vid_codec: str) -> Optional["StreamQuery"]: + """Return only streams with the given video codec. + :param str vid_codec: Video compression format. + :rtype: :class:`StreamQuery ` or None + :returns: A StreamQuery object such that every stream is encoded + using the given video codec, or None if none are available. + """ + condition = lambda s: s.video_codec == vid_codec + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def audio_codec(self, aud_codec: str) -> Optional["StreamQuery"]: + """Return only streams with the given audio codec. + :param str aud_codec: Audio compression format. + :rtype: :class:`StreamQuery ` or None + :returns: A StreamQuery object such that every stream is encoded + using the given audio codec, or None if none are available. + """ + condition = lambda s: s.audio_codec == aud_codec + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def is_progressive(self) -> Optional["StreamQuery"]: + """Return only progressive streams. + :rtype: :class:`StreamQuery ` + :returns: A StreamQuery object such that every Stream is progressive. + """ + condition = lambda s: s.is_progressive + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def is_adaptive(self) -> Optional["StreamQuery"]: + """Return only adaptive/DASH streams. + :rtype: :class:`StreamQuery ` + :returns: A StreamQuery object such that every Stream is adaptive. + """ + condition = lambda s: s.is_adaptive + return StreamQuery(list(filter(condition, self.fmt_streams))) + + def custom_filters(self, filters: List[Callable]) -> Optional["StreamQuery"]: + """Return only those streams such that when applied to all functions + in the given list, the functions all return True. + :rtype: :class:`StreamQuery ` or None + :returns: A StreamQuery object such for every stream, every function in the + given list returns True when the stream is applied to it, or None if none + are available. + """ + fmt_streams = self.fmt_streams + for func in filters: + fmt_streams = list(filter(func, fmt_streams)) + return StreamQuery(fmt_streams) + + def first(self) -> Optional[Stream]: + """Get the first :class:`Stream ` in the results. + + :rtype: :class:`Stream ` or None + :returns: + the first result of this query or None if the result doesn't + contain any streams. + + """ + try: + return self.fmt_streams[0] + except IndexError: + return None + + def last(self): + """Get the last :class:`Stream ` in the results. + + :rtype: :class:`Stream ` or None + :returns: + Return the last result of this query or None if the result + doesn't contain any streams. + + """ + try: + return self.fmt_streams[-1] + except IndexError: + pass + + @deprecated("Get the size of this list directly using len()") + def count(self, value: Optional[str] = None) -> int: # pragma: no cover + """Get the count of items in the list. + + :rtype: int + """ + if value: + return self.fmt_streams.count(value) + + return len(self) + + @deprecated("This object can be treated as a list, all() is useless") + def all(self) -> List[Stream]: # pragma: no cover + """Get all the results represented by this query as a list. + + :rtype: list + + """ + return self.fmt_streams + + @deprecated("Replaced by new individual methods") def filter( self, fps=None, @@ -172,180 +488,13 @@ def filter( return self._filter(filters) + @deprecated("Replaced by new individual methods") def _filter(self, filters: List[Callable]) -> "StreamQuery": fmt_streams = self.fmt_streams for filter_lambda in filters: fmt_streams = filter(filter_lambda, fmt_streams) return StreamQuery(list(fmt_streams)) - def order_by(self, attribute_name: str) -> "StreamQuery": - """Apply a sort order. Filters out stream the do not have the attribute. - - :param str attribute_name: - The name of the attribute to sort by. - """ - has_attribute = [ - s for s in self.fmt_streams if getattr(s, attribute_name) is not None - ] - # Check that the attributes have string values. - if has_attribute and isinstance(getattr(has_attribute[0], attribute_name), str): - # Try to return a StreamQuery sorted by the integer representations - # of the values. - try: - return StreamQuery( - sorted( - has_attribute, - key=lambda s: int( - "".join(filter(str.isdigit, getattr(s, attribute_name))) - ), # type: ignore # noqa: E501 - ) - ) - except ValueError: - pass - - return StreamQuery( - sorted(has_attribute, key=lambda s: getattr(s, attribute_name)) - ) - - def desc(self) -> "StreamQuery": - """Sort streams in descending order. - - :rtype: :class:`StreamQuery ` - - """ - return StreamQuery(self.fmt_streams[::-1]) - - def asc(self) -> "StreamQuery": - """Sort streams in ascending order. - - :rtype: :class:`StreamQuery ` - - """ - return self - - def get_by_itag(self, itag: int) -> Optional[Stream]: - """Get the corresponding :class:`Stream ` for a given itag. - - :param int itag: - YouTube format identifier code. - :rtype: :class:`Stream ` or None - :returns: - The :class:`Stream ` matching the given itag or None if - not found. - - """ - return self.itag_index.get(int(itag)) - - def get_by_resolution(self, resolution: str) -> Optional[Stream]: - """Get the corresponding :class:`Stream ` for a given resolution. - - Stream must be a progressive mp4. - - :param str resolution: - Video resolution i.e. "720p", "480p", "360p", "240p", "144p" - :rtype: :class:`Stream ` or None - :returns: - The :class:`Stream ` matching the given itag or None if - not found. - - """ - return self.filter( - progressive=True, subtype="mp4", resolution=resolution - ).first() - - def get_lowest_resolution(self) -> Optional[Stream]: - """Get lowest resolution stream that is a progressive mp4. - - :rtype: :class:`Stream ` or None - :returns: - The :class:`Stream ` matching the given itag or None if - not found. - - """ - return ( - self.filter(progressive=True, subtype="mp4").order_by("resolution").first() - ) - - def get_highest_resolution(self) -> Optional[Stream]: - """Get highest resolution stream that is a progressive video. - - :rtype: :class:`Stream ` or None - :returns: - The :class:`Stream ` matching the given itag or None if - not found. - - """ - return self.filter(progressive=True).order_by("resolution").last() - - def get_audio_only(self, subtype: str = "mp4") -> Optional[Stream]: - """Get highest bitrate audio stream for given codec (defaults to mp4) - - :param str subtype: - Audio subtype, defaults to mp4 - :rtype: :class:`Stream ` or None - :returns: - The :class:`Stream ` matching the given itag or None if - not found. - """ - return self.filter(only_audio=True, subtype=subtype).order_by("abr").last() - - def otf(self, is_otf: bool = False) -> "StreamQuery": - """Filter stream by OTF, useful if some streams have 404 URLs - - :param bool is_otf: Set to False to retrieve only non-OTF streams - :rtype: :class:`StreamQuery ` - :returns: A StreamQuery object with otf filtered streams - """ - return self._filter([lambda s: s.is_otf == is_otf]) - - def first(self) -> Optional[Stream]: - """Get the first :class:`Stream ` in the results. - - :rtype: :class:`Stream ` or None - :returns: - the first result of this query or None if the result doesn't - contain any streams. - - """ - try: - return self.fmt_streams[0] - except IndexError: - return None - - def last(self): - """Get the last :class:`Stream ` in the results. - - :rtype: :class:`Stream ` or None - :returns: - Return the last result of this query or None if the result - doesn't contain any streams. - - """ - try: - return self.fmt_streams[-1] - except IndexError: - pass - - @deprecated("Get the size of this list directly using len()") - def count(self, value: Optional[str] = None) -> int: # pragma: no cover - """Get the count of items in the list. - - :rtype: int - """ - if value: - return self.fmt_streams.count(value) - - return len(self) - - @deprecated("This object can be treated as a list, all() is useless") - def all(self) -> List[Stream]: # pragma: no cover - """Get all the results represented by this query as a list. - - :rtype: list - - """ - return self.fmt_streams - def __getitem__(self, i: Union[slice, int]): return self.fmt_streams[i] From 2dd37256de9918bb0344ab7c0ecde5414162b55d Mon Sep 17 00:00:00 2001 From: swiftyy-mage Date: Sat, 7 Mar 2020 23:24:20 +0000 Subject: [PATCH 2/8] Update query.py --- pytube/query.py | 92 +++++++++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 34 deletions(-) diff --git a/pytube/query.py b/pytube/query.py index 12b0998b7..de634735c 100644 --- a/pytube/query.py +++ b/pytube/query.py @@ -134,9 +134,7 @@ def otf(self, is_otf: bool = False) -> "StreamQuery": :rtype: :class:`StreamQuery ` :returns: A StreamQuery object with otf filtered streams. """ - return StreamQuery( - list(filter(lambda s: s.is_otf == is_otf), self.fmt_streams) - ) + return StreamQuery(list(filter(lambda s: s.is_otf == is_otf), self.fmt_streams)) def audio_only(self, audio_only_bool: bool = True) -> Optional["StreamQuery"]: """Return only streams which are audio only if audio_only_bool is @@ -149,11 +147,19 @@ def audio_only(self, audio_only_bool: bool = True) -> Optional["StreamQuery"]: :returns: A StreamQuery object such that either every Stream is audio only, or every stream is not audio only. """ - condition = ( - lambda s: (s.includes_audio_track and not s.includes_video_track) - is audio_only_bool + return StreamQuery( + list( + filter( + ( + lambda s: ( + s.includes_audio_track and not s.includes_video_track + ) + is audio_only_bool + ), + self.fmt_streams, + ) + ) ) - return StreamQuery(list(filter(condition, self.fmt_streams))) def video_only(self, video_only_bool: bool = True) -> Optional["StreamQuery"]: """Return only streams which are audio only if video_only_bool is @@ -165,11 +171,19 @@ def video_only(self, video_only_bool: bool = True) -> Optional["StreamQuery"]: :returns: A StreamQuery object such that either every Stream is video only, or every stream is not video only. """ - condition = ( - lambda s: (s.includes_video_track and not s.includes_audio_track) - is video_only_bool + return StreamQuery( + list( + filter( + ( + lambda s: ( + s.includes_video_track and not s.includes_audio_track + ) + is video_only_bool + ), + self.fmt_streams, + ) + ) ) - return StreamQuery(list(filter(condition, self.fmt_streams))) def abr(self, average_bitrate: int) -> Optional["StreamQuery"]: """Return only streams of the given average bitrate. @@ -179,8 +193,13 @@ def abr(self, average_bitrate: int) -> Optional["StreamQuery"]: :returns: A StreamQuery object such that every Stream has the bitrate defined by the user, or None if none are available. """ - condition = lambda s: s.abr == f"{str(average_bitrate)}kbps" - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery( + list( + filter( + lambda s: s.abr == f"{str(average_bitrate)}kbps", self.fmt_streams + ) + ) + ) def res(self, resolution: int) -> Optional["StreamQuery"]: """Return only streams of the given resolution. @@ -190,8 +209,13 @@ def res(self, resolution: int) -> Optional["StreamQuery"]: :returns: A StreamQuery object such that every Stream is of the resolution defined by the user, or None if none are available. """ - condition = lambda s: s.resolution == f"{str(resolution)}p" - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery( + list( + filter( + lambda s: s.resolution == f"{str(resolution)}p", self.fmt_streams + ) + ) + ) def fps(self, framerate: int) -> Optional["StreamQuery"]: """Return only streams of the given framerate. @@ -201,19 +225,17 @@ def fps(self, framerate: int) -> Optional["StreamQuery"]: :returns: A StreamQuery object such that every Stream is of the resolution defined by the user, or None if none are available. """ - condition = lambda s: s.fps == framerate - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery(list(filter(lambda s: s.fps == framerate, self.fmt_streams))) def primary_type(self, type: str) -> Optional["StreamQuery"]: """Return only streams of the given type. :param str type: "type" part of the ``mime_type`` (e.g.: audio, video). :rtype: :class:`StreamQuery ` - :returns: A StreamQuery object such that every Stream is of the + :returns: A StreamQuery object such that every Stream is of the type defined by the user. """ - condition = lambda s: s.type == type - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery(list(filter(lambda s: s.type == type, self.fmt_streams))) def mime_type(self, mime_type: str) -> Optional["StreamQuery"]: """Return only streams of the given mime_type. @@ -221,22 +243,24 @@ def mime_type(self, mime_type: str) -> Optional["StreamQuery"]: Two-part identifier for file formats and format contents composed of a "type", a "subtype". :rtype: :class:`StreamQuery ` - :returns: A StreamQuery object such that every Stream is of the + :returns: A StreamQuery object such that every Stream is of the mime_type defined by the user. """ - condition = lambda s: s.mime_type == mime_type - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery( + list(filter(lambda s: s.mime_type == mime_type, self.fmt_streams)) + ) def subtype(self, subtype: str) -> Optional["StreamQuery"]: """Return only streams of the given subtype . :param str subtype: "subtype" part of the ``mime_type`` (e.g.: audio, video). :rtype: :class:`StreamQuery ` - :returns: A StreamQuery object such that every Stream is of the + :returns: A StreamQuery object such that every Stream is of the subtype defined by the user. """ - condition = lambda s: s.subtype == subtype - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery( + list(filter(lambda s: s.subtype == subtype, self.fmt_streams)) + ) def video_codec(self, vid_codec: str) -> Optional["StreamQuery"]: """Return only streams with the given video codec. @@ -245,8 +269,9 @@ def video_codec(self, vid_codec: str) -> Optional["StreamQuery"]: :returns: A StreamQuery object such that every stream is encoded using the given video codec, or None if none are available. """ - condition = lambda s: s.video_codec == vid_codec - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery( + list(filter(lambda s: s.video_codec == vid_codec, self.fmt_streams)) + ) def audio_codec(self, aud_codec: str) -> Optional["StreamQuery"]: """Return only streams with the given audio codec. @@ -255,24 +280,23 @@ def audio_codec(self, aud_codec: str) -> Optional["StreamQuery"]: :returns: A StreamQuery object such that every stream is encoded using the given audio codec, or None if none are available. """ - condition = lambda s: s.audio_codec == aud_codec - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery( + list(filter(lambda s: s.audio_codec == aud_codec, self.fmt_streams)) + ) def is_progressive(self) -> Optional["StreamQuery"]: """Return only progressive streams. :rtype: :class:`StreamQuery ` :returns: A StreamQuery object such that every Stream is progressive. """ - condition = lambda s: s.is_progressive - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery(list(filter(lambda s: s.is_progressive, self.fmt_streams))) def is_adaptive(self) -> Optional["StreamQuery"]: """Return only adaptive/DASH streams. :rtype: :class:`StreamQuery ` :returns: A StreamQuery object such that every Stream is adaptive. """ - condition = lambda s: s.is_adaptive - return StreamQuery(list(filter(condition, self.fmt_streams))) + return StreamQuery(list(filter(lambda s: s.is_adaptive, self.fmt_streams))) def custom_filters(self, filters: List[Callable]) -> Optional["StreamQuery"]: """Return only those streams such that when applied to all functions From c8d4d8f6880671733cca76507dbffc564092a607 Mon Sep 17 00:00:00 2001 From: swiftyy-mage Date: Sat, 7 Mar 2020 23:24:55 +0000 Subject: [PATCH 3/8] Update cli.py --- pytube/cli.py | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/pytube/cli.py b/pytube/cli.py index 05c272577..3c1ca6ca4 100755 --- a/pytube/cli.py +++ b/pytube/cli.py @@ -81,7 +81,7 @@ def _parse_args( "--itag", type=int, help="The itag for the desired stream", ) parser.add_argument( - "-r", "--resolution", type=str, help="The resolution for the desired stream", + "-r", "--resolution", type=int, help="The resolution for the desired stream", ) parser.add_argument( "-l", @@ -271,34 +271,28 @@ def ffmpeg_process( if resolution == "best": highest_quality_stream = ( - youtube.streams.filter(progressive=False).order_by("resolution").last() + youtube.streams.is_adaptive().order_by("resolution").last() ) mp4_stream = ( - youtube.streams.filter(progressive=False, subtype="mp4") - .order_by("resolution") - .last() + youtube.streams.is_adaptive().subtype("mp4").order_by("resolution").last() ) if highest_quality_stream.resolution == mp4_stream.resolution: video_stream = mp4_stream else: video_stream = highest_quality_stream else: - video_stream = youtube.streams.filter( - progressive=False, resolution=resolution, subtype="mp4" - ).first() + video_stream = ( + youtube.streams.is_adaptive().res(resolution).subtype("mp4").first() + ) if not video_stream: - video_stream = youtube.streams.filter( - progressive=False, resolution=resolution - ).first() + video_stream = youtube.streams.is_adaptive().res(resolution).first() if video_stream is None: print(f"Could not find a stream with resolution: {resolution}") print("Try one of these:") display_streams(youtube) sys.exit() - audio_stream = youtube.streams.get_audio_only(video_stream.subtype) - if not audio_stream: - audio_stream = youtube.streams.filter(only_audio=True).order_by("abr").last() + audio_stream = youtube.streams.get_best_audio(video_stream.subtype) if not audio_stream: print("Could not find an audio only stream") sys.exit() @@ -370,13 +364,13 @@ def download_by_itag(youtube: YouTube, itag: int, target: Optional[str] = None) def download_by_resolution( - youtube: YouTube, resolution: str, target: Optional[str] = None + youtube: YouTube, resolution: int, target: Optional[str] = None ) -> None: """Start downloading a YouTube video. :param YouTube youtube: A valid YouTube object. - :param str resolution: + :param int resolution: YouTube video resolution. :param str target: Target directory for download From 51f33ecc1cd7852c2096fa0c4b88e3103d1e0fa4 Mon Sep 17 00:00:00 2001 From: swiftyy-mage Date: Sun, 8 Mar 2020 21:54:00 +0000 Subject: [PATCH 4/8] Resolution accepts int param --- pytube/contrib/playlist.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pytube/contrib/playlist.py b/pytube/contrib/playlist.py index ccb8a794b..a27894997 100644 --- a/pytube/contrib/playlist.py +++ b/pytube/contrib/playlist.py @@ -189,7 +189,7 @@ def download_all( download_path: Optional[str] = None, prefix_number: bool = True, reverse_numbering: bool = False, - resolution: str = "720p", + resolution: int = 720, ) -> None: # pragma: no cover """Download all the videos in the the playlist. @@ -207,8 +207,8 @@ def download_all( playlists are ordered newest -> oldest. :type reverse_numbering: bool :param resolution: - Video resolution i.e. "720p", "480p", "360p", "240p", "144p" - :type resolution: str + Video resolution i.e. 720, 480, 360, 240, 144 + :type resolution: int """ logger.debug("total videos found: %d", len(self.video_urls)) logger.debug("starting download") From 207e5e629b55d872919bde9a3cf9a76ca7404093 Mon Sep 17 00:00:00 2001 From: swiftyy-mage Date: Sun, 8 Mar 2020 21:54:42 +0000 Subject: [PATCH 5/8] Resolution is an int --- pytube/cli.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pytube/cli.py b/pytube/cli.py index 3c1ca6ca4..8e9e3ba62 100755 --- a/pytube/cli.py +++ b/pytube/cli.py @@ -281,11 +281,10 @@ def ffmpeg_process( else: video_stream = highest_quality_stream else: - video_stream = ( - youtube.streams.is_adaptive().res(resolution).subtype("mp4").first() - ) + int_res = int(resolution) + video_stream = youtube.streams.is_adaptive().res(int_res).subtype("mp4").first() if not video_stream: - video_stream = youtube.streams.is_adaptive().res(resolution).first() + video_stream = youtube.streams.is_adaptive().res(int_res).first() if video_stream is None: print(f"Could not find a stream with resolution: {resolution}") print("Try one of these:") From 5f4bea06d07d85f7ffac8f4bf9efc64ce8a425df Mon Sep 17 00:00:00 2001 From: swiftyy-mage Date: Sun, 8 Mar 2020 21:55:37 +0000 Subject: [PATCH 6/8] Resolution is an int --- pytube/query.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pytube/query.py b/pytube/query.py index de634735c..cebfc36f5 100644 --- a/pytube/query.py +++ b/pytube/query.py @@ -134,9 +134,9 @@ def otf(self, is_otf: bool = False) -> "StreamQuery": :rtype: :class:`StreamQuery ` :returns: A StreamQuery object with otf filtered streams. """ - return StreamQuery(list(filter(lambda s: s.is_otf == is_otf), self.fmt_streams)) + return StreamQuery(list(filter(lambda s: s.is_otf == is_otf, self.fmt_streams))) - def audio_only(self, audio_only_bool: bool = True) -> Optional["StreamQuery"]: + def audio_only(self, audio_only_bool: bool = True) -> "StreamQuery": """Return only streams which are audio only if audio_only_bool is True, return only streams which are not audio only if audio_only_bool is False. @@ -161,7 +161,7 @@ def audio_only(self, audio_only_bool: bool = True) -> Optional["StreamQuery"]: ) ) - def video_only(self, video_only_bool: bool = True) -> Optional["StreamQuery"]: + def video_only(self, video_only_bool: bool = True) -> "StreamQuery": """Return only streams which are audio only if video_only_bool is True, return only streams which are not audio only if video_only_bool is False. @@ -185,7 +185,7 @@ def video_only(self, video_only_bool: bool = True) -> Optional["StreamQuery"]: ) ) - def abr(self, average_bitrate: int) -> Optional["StreamQuery"]: + def abr(self, average_bitrate: int) -> "StreamQuery": """Return only streams of the given average bitrate. :param str average_bitrate: The audio average bitrate. @@ -201,7 +201,7 @@ def abr(self, average_bitrate: int) -> Optional["StreamQuery"]: ) ) - def res(self, resolution: int) -> Optional["StreamQuery"]: + def res(self, resolution: int) -> "StreamQuery": """Return only streams of the given resolution. :param str resolution: The video resolution. @@ -217,7 +217,7 @@ def res(self, resolution: int) -> Optional["StreamQuery"]: ) ) - def fps(self, framerate: int) -> Optional["StreamQuery"]: + def fps(self, framerate: int) -> "StreamQuery": """Return only streams of the given framerate. :param str framerate: The frames per second. @@ -227,7 +227,7 @@ def fps(self, framerate: int) -> Optional["StreamQuery"]: """ return StreamQuery(list(filter(lambda s: s.fps == framerate, self.fmt_streams))) - def primary_type(self, type: str) -> Optional["StreamQuery"]: + def primary_type(self, type: str) -> "StreamQuery": """Return only streams of the given type. :param str type: "type" part of the ``mime_type`` (e.g.: audio, video). @@ -237,7 +237,7 @@ def primary_type(self, type: str) -> Optional["StreamQuery"]: """ return StreamQuery(list(filter(lambda s: s.type == type, self.fmt_streams))) - def mime_type(self, mime_type: str) -> Optional["StreamQuery"]: + def mime_type(self, mime_type: str) -> "StreamQuery": """Return only streams of the given mime_type. :param str mime_type: Two-part identifier for file formats and format contents @@ -250,7 +250,7 @@ def mime_type(self, mime_type: str) -> Optional["StreamQuery"]: list(filter(lambda s: s.mime_type == mime_type, self.fmt_streams)) ) - def subtype(self, subtype: str) -> Optional["StreamQuery"]: + def subtype(self, subtype: str) -> "StreamQuery": """Return only streams of the given subtype . :param str subtype: "subtype" part of the ``mime_type`` (e.g.: audio, video). @@ -262,7 +262,7 @@ def subtype(self, subtype: str) -> Optional["StreamQuery"]: list(filter(lambda s: s.subtype == subtype, self.fmt_streams)) ) - def video_codec(self, vid_codec: str) -> Optional["StreamQuery"]: + def video_codec(self, vid_codec: str) -> "StreamQuery": """Return only streams with the given video codec. :param str vid_codec: Video compression format. :rtype: :class:`StreamQuery ` or None @@ -273,7 +273,7 @@ def video_codec(self, vid_codec: str) -> Optional["StreamQuery"]: list(filter(lambda s: s.video_codec == vid_codec, self.fmt_streams)) ) - def audio_codec(self, aud_codec: str) -> Optional["StreamQuery"]: + def audio_codec(self, aud_codec: str) -> "StreamQuery": """Return only streams with the given audio codec. :param str aud_codec: Audio compression format. :rtype: :class:`StreamQuery ` or None @@ -284,21 +284,21 @@ def audio_codec(self, aud_codec: str) -> Optional["StreamQuery"]: list(filter(lambda s: s.audio_codec == aud_codec, self.fmt_streams)) ) - def is_progressive(self) -> Optional["StreamQuery"]: + def is_progressive(self) -> "StreamQuery": """Return only progressive streams. :rtype: :class:`StreamQuery ` :returns: A StreamQuery object such that every Stream is progressive. """ return StreamQuery(list(filter(lambda s: s.is_progressive, self.fmt_streams))) - def is_adaptive(self) -> Optional["StreamQuery"]: + def is_adaptive(self) -> "StreamQuery": """Return only adaptive/DASH streams. :rtype: :class:`StreamQuery ` :returns: A StreamQuery object such that every Stream is adaptive. """ return StreamQuery(list(filter(lambda s: s.is_adaptive, self.fmt_streams))) - def custom_filters(self, filters: List[Callable]) -> Optional["StreamQuery"]: + def custom_filters(self, filters: List[Callable]) -> "StreamQuery": """Return only those streams such that when applied to all functions in the given list, the functions all return True. :rtype: :class:`StreamQuery ` or None From 58428c52a0f9dc5c3352b558e8576bf5c1beb11b Mon Sep 17 00:00:00 2001 From: swiftyy-mage Date: Mon, 9 Mar 2020 00:22:20 +0000 Subject: [PATCH 7/8] Update test_query.py --- tests/test_query.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_query.py b/tests/test_query.py index dc4cec836..f81e89d25 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -123,7 +123,7 @@ def test_get_by_non_existent_itag(cipher_signature): def test_get_by_resolution(cipher_signature): - assert cipher_signature.streams.get_by_resolution("360p").itag == 18 + assert cipher_signature.streams.get_by_resolution(360).itag == 18 def test_get_lowest_resolution(cipher_signature): @@ -141,11 +141,11 @@ def test_filter_is_dash(cipher_signature): def test_get_audio_only(cipher_signature): - assert cipher_signature.streams.get_audio_only().itag == 140 + assert cipher_signature.streams.get_best_audio().itag == 140 def test_get_audio_only_with_subtype(cipher_signature): - assert cipher_signature.streams.get_audio_only(subtype="webm").itag == 251 + assert cipher_signature.streams.get_best_audio(subtype="webm").itag == 251 def test_sequence(cipher_signature): From 9f0874312ffd299943f2c62577e6c7fd5c9e3b7f Mon Sep 17 00:00:00 2001 From: swiftyy-mage Date: Mon, 9 Mar 2020 00:22:38 +0000 Subject: [PATCH 8/8] Update test_cli.py --- tests/test_cli.py | 661 +++++++++++----------------------------------- 1 file changed, 155 insertions(+), 506 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index bdcf0758c..f81e89d25 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,524 +1,173 @@ # -*- coding: utf-8 -*- -import argparse -from unittest import mock -from unittest.mock import MagicMock, patch - +"""Unit tests for the :class:`StreamQuery ` class.""" import pytest -from pytube import cli, StreamQuery, Caption, CaptionQuery -from pytube.exceptions import PytubeError - -parse_args = cli._parse_args - - -@mock.patch("pytube.cli._parse_args") -def test_main_invalid_url(_parse_args): - parser = argparse.ArgumentParser() - args = parse_args(parser, ["crikey",],) - _parse_args.return_value = args - with pytest.raises(SystemExit): - cli.main() - - -@mock.patch("pytube.cli.display_streams") -@mock.patch("pytube.cli.YouTube") -def test_download_when_itag_not_found(youtube, display_streams): - # Given - youtube.streams = mock.Mock() - youtube.streams.get_by_itag.return_value = None - # When - with pytest.raises(SystemExit): - cli.download_by_itag(youtube, 123) - # Then - youtube.streams.get_by_itag.assert_called_with(123) - display_streams.assert_called_with(youtube) - - -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.Stream") -def test_download_when_itag_is_found(youtube, stream): - stream.itag = 123 - stream.exists_at_path.return_value = False - youtube.streams = StreamQuery([stream]) - with patch.object( - youtube.streams, "get_by_itag", wraps=youtube.streams.get_by_itag - ) as wrapped_itag: - cli.download_by_itag(youtube, 123) - wrapped_itag.assert_called_with(123) - youtube.register_on_progress_callback.assert_called_with(cli.on_progress) - stream.download.assert_called() - - -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.Stream") -def test_display_stream(youtube, stream): - # Given - stream.itag = 123 - stream.__repr__ = MagicMock(return_value="") - youtube.streams = StreamQuery([stream]) - # When - cli.display_streams(youtube) - # Then - stream.__repr__.assert_called() - - -@mock.patch("pytube.cli._print_available_captions") -@mock.patch("pytube.cli.YouTube") -def test_download_caption_with_none(youtube, print_available): - # Given - caption = Caption( - {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"} - ) - youtube.captions = CaptionQuery([caption]) - # When - cli.download_caption(youtube, None) - # Then - print_available.assert_called_with(youtube.captions) - - -@mock.patch("pytube.cli.YouTube") -def test_download_caption_with_language_found(youtube): - youtube.title = "video title" - caption = Caption( - {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"} - ) - caption.download = MagicMock(return_value="file_path") - youtube.captions = CaptionQuery([caption]) - cli.download_caption(youtube, "en") - caption.download.assert_called_with(title="video title", output_path=None) - - -@mock.patch("pytube.cli._print_available_captions") -@mock.patch("pytube.cli.YouTube") -def test_download_caption_with_lang_not_found(youtube, print_available): - # Given - caption = Caption( - {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"} - ) - youtube.captions = CaptionQuery([caption]) - # When - cli.download_caption(youtube, "blah") - # Then - print_available.assert_called_with(youtube.captions) +@pytest.mark.parametrize( + ("test_input", "expected"), + [ + ({"progressive": True}, [18]), + ({"resolution": "720p"}, [136, 247]), + ({"res": "720p"}, [136, 247]), + ({"fps": 30, "resolution": "480p"}, [135, 244]), + ({"mime_type": "audio/mp4"}, [140]), + ({"type": "audio"}, [140, 249, 250, 251]), + ({"subtype": "3gpp"}, []), + ({"abr": "128kbps"}, [140]), + ({"bitrate": "128kbps"}, [140]), + ({"audio_codec": "opus"}, [249, 250, 251]), + ({"video_codec": "vp9"}, [248, 247, 244, 243, 242, 278]), + ({"only_audio": True}, [140, 249, 250, 251]), + ({"only_video": True, "video_codec": "avc1.4d4015"}, [133]), + ({"adaptive": True, "resolution": "1080p"}, [137, 248]), + ({"custom_filter_functions": [lambda s: s.itag == 18]}, [18]), + ], +) +def test_filters(test_input, expected, cipher_signature): + """Ensure filters produce the expected results.""" + result = [s.itag for s in cipher_signature.streams.filter(**test_input)] + assert result == expected + + +@pytest.mark.parametrize("test_input", ["first", "last"]) +def test_empty(test_input, cipher_signature): + """Ensure :meth:`~pytube.StreamQuery.last` and + :meth:`~pytube.StreamQuery.first` return None if the resultset is + empty. + """ + query = cipher_signature.streams.filter(video_codec="vp20") + fn = getattr(query, test_input) + assert fn() is None + + +def test_get_last(cipher_signature): + """Ensure :meth:`~pytube.StreamQuery.last` returns the expected + :class:`Stream `. + """ + assert cipher_signature.streams[-1].itag == 251 + + +def test_get_first(cipher_signature): + """Ensure :meth:`~pytube.StreamQuery.first` returns the expected + :class:`Stream `. + """ + assert cipher_signature.streams[0].itag == 18 + + +def test_order_by(cipher_signature): + """Ensure :meth:`~pytube.StreamQuery.order_by` sorts the list of + :class:`Stream ` instances in the expected order. + """ + itags = [ + s.itag for s in cipher_signature.streams.filter(type="audio").order_by("itag") + ] + assert itags == [140, 249, 250, 251] -def test_print_available_captions(capsys): - # Given - caption1 = Caption( - {"url": "url1", "name": {"simpleText": "name1"}, "languageCode": "en"} - ) - caption2 = Caption( - {"url": "url2", "name": {"simpleText": "name2"}, "languageCode": "fr"} - ) - query = CaptionQuery([caption1, caption2]) - # When - cli._print_available_captions(query) - # Then - captured = capsys.readouterr() - assert captured.out == "Available caption codes are: en, fr\n" - - -def test_display_progress_bar(capsys): - cli.display_progress_bar(bytes_received=25, filesize=100, scale=0.55) - out, _ = capsys.readouterr() - assert "25.0%" in out - - -@mock.patch("pytube.Stream") -def test_on_progress(stream): - stream.filesize = 10 - cli.display_progress_bar = MagicMock() - cli.on_progress(stream, "", 7) - cli.display_progress_bar.assert_called_once_with(3, 10) - - -def test_parse_args_falsey(): - parser = argparse.ArgumentParser() - args = cli._parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0"]) - assert args.url == "http://youtube.com/watch?v=9bZkp7q19f0" - assert args.build_playback_report is False - assert args.itag is None - assert args.list is False - assert args.verbosity == 0 - - -def test_parse_args_truthy(): - parser = argparse.ArgumentParser() - args = cli._parse_args( - parser, - [ - "http://youtube.com/watch?v=9bZkp7q19f0", - "--build-playback-report", - "-c", - "en", - "-l", - "--itag=10", - "-vvv", - ], - ) - assert args.url == "http://youtube.com/watch?v=9bZkp7q19f0" - assert args.build_playback_report is True - assert args.itag == 10 - assert args.list is True - assert args.verbosity == 3 - - -@mock.patch("pytube.cli.setup_logger", return_value=None) -def test_main_logging_setup(setup_logger): - # Given - parser = argparse.ArgumentParser() - args = parse_args(parser, ["http://fakeurl", "-v"]) - cli._parse_args = MagicMock(return_value=args) - # When - with pytest.raises(SystemExit): - cli.main() - # Then - setup_logger.assert_called_with(40) - - -@mock.patch("pytube.cli.YouTube", return_value=None) -def test_main_download_by_itag(youtube): - parser = argparse.ArgumentParser() - args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "--itag=10"]) - cli._parse_args = MagicMock(return_value=args) - cli.download_by_itag = MagicMock() - cli.main() - youtube.assert_called() - cli.download_by_itag.assert_called() - - -@mock.patch("pytube.cli.YouTube", return_value=None) -def test_main_build_playback_report(youtube): - parser = argparse.ArgumentParser() - args = parse_args( - parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "--build-playback-report"] - ) - cli._parse_args = MagicMock(return_value=args) - cli.build_playback_report = MagicMock() - cli.main() - youtube.assert_called() - cli.build_playback_report.assert_called() - - -@mock.patch("pytube.cli.YouTube", return_value=None) -def test_main_display_streams(youtube): - parser = argparse.ArgumentParser() - args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-l"]) - cli._parse_args = MagicMock(return_value=args) - cli.display_streams = MagicMock() - cli.main() - youtube.assert_called() - cli.display_streams.assert_called() - - -@mock.patch("pytube.cli.YouTube", return_value=None) -def test_main_download_caption(youtube): - parser = argparse.ArgumentParser() - args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-c"]) - cli._parse_args = MagicMock(return_value=args) - cli.download_caption = MagicMock() - cli.main() - youtube.assert_called() - cli.download_caption.assert_called() - - -@mock.patch("pytube.cli.YouTube", return_value=None) -@mock.patch("pytube.cli.download_by_resolution") -def test_download_by_resolution_flag(youtube, download_by_resolution): - parser = argparse.ArgumentParser() - args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-r", "320p"]) - cli._parse_args = MagicMock(return_value=args) - cli.main() - youtube.assert_called() - download_by_resolution.assert_called() - - -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.cli.Playlist") -@mock.patch("pytube.cli._perform_args_on_youtube") -def test_download_with_playlist(perform_args_on_youtube, playlist, youtube): - # Given - cli.safe_filename = MagicMock(return_value="safe_title") - parser = argparse.ArgumentParser() - args = parse_args(parser, ["https://www.youtube.com/playlist?list=PLyn"]) - cli._parse_args = MagicMock(return_value=args) - videos = [youtube] - playlist_instance = playlist.return_value - playlist_instance.videos = videos - # When - cli.main() - # Then - playlist.assert_called() - perform_args_on_youtube.assert_called_with(youtube, args) - - -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.cli.Playlist") -@mock.patch("pytube.cli._perform_args_on_youtube") -def test_download_with_playlist_video_error( - perform_args_on_youtube, playlist, youtube, capsys -): - # Given - cli.safe_filename = MagicMock(return_value="safe_title") - parser = argparse.ArgumentParser() - args = parse_args(parser, ["https://www.youtube.com/playlist?list=PLyn"]) - cli._parse_args = MagicMock(return_value=args) - videos = [youtube] - playlist_instance = playlist.return_value - playlist_instance.videos = videos - perform_args_on_youtube.side_effect = PytubeError() - # When - cli.main() - # Then - playlist.assert_called() - captured = capsys.readouterr() - assert "There was an error with video" in captured.out - - -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.StreamQuery") -@mock.patch("pytube.Stream") -@mock.patch("pytube.cli._download") -def test_download_by_resolution(download, stream, stream_query, youtube): - # Given - stream_query.get_by_resolution.return_value = stream - youtube.streams = stream_query - # When - cli.download_by_resolution(youtube=youtube, resolution="320p", target="test_target") - # Then - download.assert_called_with(stream, target="test_target") - - -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.StreamQuery") -@mock.patch("pytube.cli._download") -def test_download_by_resolution_not_exists(download, stream_query, youtube): - stream_query.get_by_resolution.return_value = None - youtube.streams = stream_query - with pytest.raises(SystemExit): - cli.download_by_resolution( - youtube=youtube, resolution="DOESNT EXIST", target="test_target" - ) - download.assert_not_called() - - -@mock.patch("pytube.Stream") -def test_download_stream_file_exists(stream, capsys): - # Given - stream.exists_at_path.return_value = True - # When - cli._download(stream=stream) - # Then - captured = capsys.readouterr() - assert "Already downloaded at" in captured.out - stream.download.assert_not_called() - - -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.cli.ffmpeg_process") -def test_perform_args_should_ffmpeg_process(ffmpeg_process, youtube): - # Given - parser = argparse.ArgumentParser() - args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-f", "best"]) - cli._parse_args = MagicMock(return_value=args) - # When - cli._perform_args_on_youtube(youtube, args) - # Then - ffmpeg_process.assert_called_with(youtube=youtube, resolution="best", target=None) - - -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.cli._ffmpeg_downloader") -def test_ffmpeg_process_best_should_download(_ffmpeg_downloader, youtube): - # Given - target = "/target" - streams = MagicMock() - youtube.streams = streams - video_stream = MagicMock() - streams.filter.return_value.order_by.return_value.last.return_value = video_stream - audio_stream = MagicMock() - streams.get_audio_only.return_value = audio_stream - # When - cli.ffmpeg_process(youtube, "best", target) - # Then - _ffmpeg_downloader.assert_called_with( - audio_stream=audio_stream, video_stream=video_stream, target=target - ) +def test_order_by_descending(cipher_signature): + """Ensure :meth:`~pytube.StreamQuery.desc` sorts the list of + :class:`Stream ` instances in the reverse order. + """ + # numerical values + itags = [ + s.itag + for s in cipher_signature.streams.filter(type="audio").order_by("itag").desc() + ] + assert itags == [251, 250, 249, 140] -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.cli._ffmpeg_downloader") -def test_ffmpeg_process_res_should_download(_ffmpeg_downloader, youtube): - # Given - target = "/target" - streams = MagicMock() - youtube.streams = streams - video_stream = MagicMock() - streams.filter.return_value.first.return_value = video_stream - audio_stream = MagicMock() - streams.get_audio_only.return_value = audio_stream - # When - cli.ffmpeg_process(youtube, "XYZp", target) - # Then - _ffmpeg_downloader.assert_called_with( - audio_stream=audio_stream, video_stream=video_stream, target=target - ) +def test_order_by_non_numerical(cipher_signature): + mime_types = [ + s.mime_type + for s in cipher_signature.streams.filter(res="360p") + .order_by("mime_type") + .desc() + ] + assert mime_types == ["video/webm", "video/mp4", "video/mp4"] -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.cli._ffmpeg_downloader") -def test_ffmpeg_process_res_none_should_not_download(_ffmpeg_downloader, youtube): - # Given - target = "/target" - streams = MagicMock() - youtube.streams = streams - streams.filter.return_value.first.return_value = None - audio_stream = MagicMock() - streams.get_audio_only.return_value = audio_stream - # When - with pytest.raises(SystemExit): - cli.ffmpeg_process(youtube, "XYZp", target) - # Then - _ffmpeg_downloader.assert_not_called() - - -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.cli._ffmpeg_downloader") -def test_ffmpeg_process_audio_none_should_fallback_download( - _ffmpeg_downloader, youtube -): - # Given - target = "/target" - streams = MagicMock() - youtube.streams = streams - stream = MagicMock() - streams.filter.return_value.order_by.return_value.last.return_value = stream - streams.get_audio_only.return_value = None - # When - cli.ffmpeg_process(youtube, "best", target) - # Then - _ffmpeg_downloader.assert_called_with( - audio_stream=stream, video_stream=stream, target=target - ) + +def test_order_by_ascending(cipher_signature): + """Ensure :meth:`~pytube.StreamQuery.desc` sorts the list of + :class:`Stream ` instances in ascending order. + """ + # numerical values + itags = [ + s.itag + for s in cipher_signature.streams.filter(type="audio").order_by("itag").asc() + ] + assert itags == [140, 249, 250, 251] -@mock.patch("pytube.cli.YouTube") -@mock.patch("pytube.cli._ffmpeg_downloader") -def test_ffmpeg_process_audio_fallback_none_should_exit(_ffmpeg_downloader, youtube): - # Given - target = "/target" - streams = MagicMock() - youtube.streams = streams - stream = MagicMock() - streams.filter.return_value.order_by.return_value.last.side_effect = [ - stream, - stream, - None, +def test_order_by_non_numerical_ascending(cipher_signature): + mime_types = [ + s.mime_type + for s in cipher_signature.streams.filter(res="360p").order_by("mime_type").asc() ] - streams.get_audio_only.return_value = None - # When - with pytest.raises(SystemExit): - cli.ffmpeg_process(youtube, "best", target) - # Then - _ffmpeg_downloader.assert_not_called() - - -@mock.patch("pytube.cli.os.unlink", return_value=None) -@mock.patch("pytube.cli.subprocess.run", return_value=None) -@mock.patch("pytube.cli._download", return_value=None) -@mock.patch("pytube.cli._unique_name", return_value=None) -def test_ffmpeg_downloader(unique_name, download, run, unlink): - # Given - target = "target" - audio_stream = MagicMock() - video_stream = MagicMock() - video_stream.id = "video_id" - audio_stream.subtype = "audio_subtype" - video_stream.subtype = "video_subtype" - unique_name.side_effect = ["video_name", "audio_name"] - - # When - cli._ffmpeg_downloader( - audio_stream=audio_stream, video_stream=video_stream, target=target - ) - # Then - download.assert_called() - run.assert_called_with( - [ - "ffmpeg", - "-i", - "target/video_name.video_subtype", - "-i", - "target/audio_name.audio_subtype", - "-codec", - "copy", - "target/safe_title.video_subtype", - ] - ) - unlink.assert_called() - - -@mock.patch("pytube.cli.download_audio") -@mock.patch("pytube.cli.YouTube.__init__", return_value=None) -def test_download_audio_args(youtube, download_audio): - # Given - parser = argparse.ArgumentParser() - args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-a", "mp4"]) - cli._parse_args = MagicMock(return_value=args) - # When - cli.main() - # Then - youtube.assert_called() - download_audio.assert_called() - - -@mock.patch("pytube.cli._download") -@mock.patch("pytube.cli.YouTube") -def test_download_audio(youtube, download): - # Given - youtube_instance = youtube.return_value - audio_stream = MagicMock() - youtube_instance.streams.filter.return_value.order_by.return_value.last.return_value = ( - audio_stream - ) - # When - cli.download_audio(youtube_instance, "filetype", "target") - # Then - download.assert_called_with(audio_stream, target="target") - - -@mock.patch("pytube.cli._download") -@mock.patch("pytube.cli.YouTube") -def test_download_audio_none(youtube, download): - # Given - youtube_instance = youtube.return_value - youtube_instance.streams.filter.return_value.order_by.return_value.last.return_value = ( - None - ) - # When - with pytest.raises(SystemExit): - cli.download_audio(youtube_instance, "filetype", "target") - # Then - download.assert_not_called() + assert mime_types == ["video/mp4", "video/mp4", "video/webm"] -@mock.patch("pytube.cli.YouTube.__init__", return_value=None) -def test_perform_args_on_youtube(youtube): - parser = argparse.ArgumentParser() - args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0"]) - cli._parse_args = MagicMock(return_value=args) - cli._perform_args_on_youtube = MagicMock() - cli.main() - youtube.assert_called() - cli._perform_args_on_youtube.assert_called() +def test_order_by_with_none_values(cipher_signature): + abrs = [s.abr for s in cipher_signature.streams.order_by("abr").asc()] + assert abrs == ["50kbps", "70kbps", "96kbps", "128kbps", "160kbps"] -@mock.patch("pytube.cli.os.path.exists", return_value=False) -def test_unique_name(path_exists): - assert cli._unique_name("base", "subtype", "video", "target") == "base_video_0" +def test_get_by_itag(cipher_signature): + """Ensure :meth:`~pytube.StreamQuery.get_by_itag` returns the expected + :class:`Stream `. + """ + assert cipher_signature.streams.get_by_itag(18).itag == 18 -@mock.patch("pytube.cli.os.path.exists") -def test_unique_name_counter(path_exists): - path_exists.side_effect = [True, False] - assert cli._unique_name("base", "subtype", "video", "target") == "base_video_1" +def test_get_by_non_existent_itag(cipher_signature): + assert not cipher_signature.streams.get_by_itag(22983) + + +def test_get_by_resolution(cipher_signature): + assert cipher_signature.streams.get_by_resolution(360).itag == 18 + + +def test_get_lowest_resolution(cipher_signature): + assert cipher_signature.streams.get_lowest_resolution().itag == 18 + + +def test_get_highest_resolution(cipher_signature): + assert cipher_signature.streams.get_highest_resolution().itag == 18 + + +def test_filter_is_dash(cipher_signature): + streams = cipher_signature.streams.filter(is_dash=False) + itags = [s.itag for s in streams] + assert itags == [18, 398, 397, 396, 395, 394] + + +def test_get_audio_only(cipher_signature): + assert cipher_signature.streams.get_best_audio().itag == 140 + + +def test_get_audio_only_with_subtype(cipher_signature): + assert cipher_signature.streams.get_best_audio(subtype="webm").itag == 251 + + +def test_sequence(cipher_signature): + assert len(cipher_signature.streams) == 22 + assert cipher_signature.streams[0] is not None + + +def test_otf(cipher_signature): + non_otf = cipher_signature.streams.otf() + assert len(non_otf) == 22 + + otf = cipher_signature.streams.otf(True) + assert len(otf) == 0 + + +def test_repr(cipher_signature): + assert repr( + cipher_signature.streams.filter( + progressive=True, subtype="mp4", resolution="360p" + ) + ) == ( + '[]' + )