author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2025-02-16 09:57:23 +0100 |
committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2025-02-16 09:57:23 +0100 |
commit | 6a137c6ca968654810ccfddd90908a227287387f (patch) | |
tree | d729b258ecc9fb8a58e108f5c534b3850d5e1011 /python_update/raw_update.py | |
parent | feat(yt/update): Port the Python updater to rust (diff) | |
download | yt-6a137c6ca968654810ccfddd90908a227287387f.zip |
refactor(treewide): Remove all references of the now obsolete update_raw.py
Diffstat (limited to '')
-rwxr-xr-x | python_update/raw_update.py | 179 |
1 file changed, 0 insertions, 179 deletions
diff --git a/python_update/raw_update.py b/python_update/raw_update.py
deleted file mode 100755
index 28a2bac..0000000
--- a/python_update/raw_update.py
+++ /dev/null
@@ -1,179 +0,0 @@
-#!/usr/bin/env python
-
-# yt - A fully featured command line YouTube client
-#
-# Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de>
-# SPDX-License-Identifier: GPL-3.0-or-later
-#
-# This file is part of Yt.
-#
-# You should have received a copy of the License along with this program.
-# If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
-
-# This has been take from the `ytcc` updater code (at `8893bc98428cb78d458a9cf3ded03f519d86a46b`).
-# Source URL: https://github.com/woefe/ytcc/commit/8893bc98428cb78d458a9cf3ded03f519d86a46b
-
-from blake3 import blake3
-from dataclasses import dataclass
-from functools import partial
-from typing import Any, Iterable, Optional, Tuple, TypeVar
-import asyncio
-import itertools
-import json
-import logging
-import sys
-
-import yt_dlp
-
-
-@dataclass(frozen=True)
-class Playlist:
-    name: str
-    url: str
-    reverse: bool
-
-
-@dataclass(frozen=True)
-class Video:
-    url: str
-    title: str
-    description: str
-    publish_date: float
-    watch_date: Optional[float]
-    duration: float
-    thumbnail_url: Optional[str]
-    extractor_hash: str
-
-    @property
-    def watched(self) -> bool:
-        return self.watch_date is not None
-
-
-logger = logging.getLogger("yt")
-logging.basicConfig(encoding="utf-8", level=int(sys.argv[3]))
-
-_ytdl_logger = logging.getLogger("yt_dlp")
-_ytdl_logger.propagate = False
-_ytdl_logger.addHandler(logging.NullHandler())
-YTDL_COMMON_OPTS = {"logger": _ytdl_logger}
-
-T = TypeVar("T")
-
-
-def take(amount: int, iterable: Iterable[T]) -> Iterable[T]:
-    """Take the first elements of an iterable.
-
-    If the given iterable has less elements than the given amount, the returned iterable has the
-    same amount of elements as the given iterable. Otherwise the returned iterable has `amount`
-    elements.
-
-    :param amount: The number of elements to take
-    :param iterable: The iterable to take elements from
-    :return: The first elements of the given iterable
-    """
-    for _, elem in zip(range(amount), iterable):
-        yield elem
-
-
-class Fetcher:
-    def __init__(self, max_backlog):
-        self.max_items = max_backlog
-        self.ydl_opts = {
-            **YTDL_COMMON_OPTS,
-            "playliststart": 1,
-            "playlistend": max_backlog,
-            "noplaylist": False,
-            "extractor_args": {"youtubetab": {"approximate_date": [""]}},
-        }
-
-    async def get_unprocessed_entries(
-        self, url: str, hashes: Iterable[str]
-    ) -> Iterable[Tuple[str, str, Any]]:
-        result = []
-        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
-            logger.info("Checking playlist '%s'...", url)
-            try:
-                loop = asyncio.get_event_loop()
-                info = await loop.run_in_executor(
-                    None,
-                    partial(ydl.extract_info, url, download=False, process=False),
-                )
-            except yt_dlp.DownloadError as download_error:
-                logger.error(
-                    "Failed to get playlist '%s'. Error was: '%s'",
-                    url,
-                    download_error,
-                )
-            else:
-                entries = info.get("entries", [])
-                for entry in take(self.max_items, entries):
-                    logger.debug(json.dumps(entry))
-                    id = str.encode(yt_dlp.utils.unsmuggle_url(entry["id"])[0])
-                    ehash = blake3(id).hexdigest()
-                    if ehash not in hashes:
-                        result.append((url, entry))
-        return result
-
-    def _process_ie(self, entry):
-        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
-            processed = ydl.process_ie_result(entry, False)
-
-        return {
-            "description": processed.get("description"),
-            "duration": processed.get("duration"),
-            "upload_date": processed.get("upload_date"),
-            "thumbnails": processed.get("thumbnails"),
-            "thumbnail": processed.get("thumbnail"),
-            "title": processed.get("title"),
-            "webpage_url": processed.get("webpage_url"),
-            "id": processed.get("id"),
-        }
-
-    async def process_entry(self, url: str, entry: Any) -> Optional[Any]:
-        try:
-            loop = asyncio.get_event_loop()
-            processed = await loop.run_in_executor(None, self._process_ie, entry)
-        except yt_dlp.DownloadError as download_error:
-            logger.error(
-                "Failed to get a video of playlist '%s'. Error was: '%s'",
-                url,
-                download_error,
-            )
-            return None
-        else:
-            print(json.dumps({url: processed}))
-
-
-class Updater:
-    def __init__(self, max_backlog=20):
-        self.max_items = max_backlog
-        self.fetcher = Fetcher(max_backlog)
-        self.hashes = None
-
-    async def update_url(self, url: str):
-        logger.info(f"Updating {url}...")
-        new_entries = await self.fetcher.get_unprocessed_entries(url, self.hashes)
-
-        await asyncio.gather(
-            *itertools.starmap(self.fetcher.process_entry, new_entries)
-        )
-
-    async def do_update(self, urls: Iterable[str]):
-        await asyncio.gather(*map(self.update_url, urls))
-
-    def update(self, urls: Iterable[str], hashes: Iterable[str]):
-        self.hashes = hashes
-        asyncio.run(self.do_update(urls))
-
-
-def update():
-    max_backlog = int(sys.argv[1])
-    subscriptions_number = int(sys.argv[2])
-    u = Updater(max_backlog=max_backlog)
-    u.update(
-        sys.argv[4 : (4 + subscriptions_number)], sys.argv[(4 + subscriptions_number) :]
-    )
-
-
-logger.debug(sys.argv)
-update()
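
For reference, the removed script's command-line contract can be read off its update() entry point and the module-level logging.basicConfig call: argv[1] is the per-playlist backlog, argv[2] the number of subscription URLs, argv[3] a numeric log level, followed by the playlist URLs and then the BLAKE3 hashes of already-known entries; each new entry is printed to stdout as one JSON object per line, keyed by its playlist URL. The driver below is a minimal hypothetical sketch of such an invocation (the script no longer exists after this commit, and the example URL is made up); it is not the Rust caller referenced in the parent commit.

# Hypothetical driver for the deleted python_update/raw_update.py, with the
# argument order inferred from its update() function. Not part of the repository.
import json
import logging
import subprocess

max_backlog = 20
urls = ["https://www.youtube.com/@ExampleChannel/videos"]  # assumed example URL
known_hashes = []  # BLAKE3 hex digests of entry IDs that were already fetched

argv = [
    "python",
    "python_update/raw_update.py",
    str(max_backlog),      # argv[1]: how many entries to inspect per playlist
    str(len(urls)),        # argv[2]: number of subscription URLs that follow
    str(logging.INFO),     # argv[3]: numeric log level for logging.basicConfig
    *urls,                 # argv[4 : 4 + n]: playlist URLs
    *known_hashes,         # argv[4 + n :]: hashes whose entries are skipped
]

# The script emits one JSON object per new entry on stdout.
proc = subprocess.run(argv, capture_output=True, text=True, check=True)
for line in proc.stdout.splitlines():
    print(json.loads(line))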