aboutsummaryrefslogtreecommitdiffstats
path: root/python_update
diff options
context:
space:
mode:
Diffstat (limited to 'python_update')
-rwxr-xr-xpython_update/raw_update.py160
1 files changed, 160 insertions, 0 deletions
diff --git a/python_update/raw_update.py b/python_update/raw_update.py
new file mode 100755
index 0000000..82be0a1
--- /dev/null
+++ b/python_update/raw_update.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+
+# yt - A fully featured command line YouTube client
+#
+# Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de>
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Yt.
+#
+# You should have received a copy of the License along with this program.
+# If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
+
+# This has been taken from the `ytcc` updater code (at `8893bc98428cb78d458a9cf3ded03f519d86a46b`).
+# Source URL: https://github.com/woefe/ytcc/commit/8893bc98428cb78d458a9cf3ded03f519d86a46b
+
+import asyncio
+import itertools
+import json
+import logging
+import sys
+from dataclasses import dataclass
+from functools import partial
+from typing import Any, Iterable, Optional, Tuple, TypeVar
+
+import yt_dlp
+
+
@dataclass(frozen=True)
class Playlist:
    """Immutable description of a single playlist.

    Instances are frozen, so they are hashable and cannot be modified after
    construction.
    """

    # Human-readable name of the playlist.
    name: str
    # Address the playlist is fetched from.
    url: str
    # NOTE(review): presumably requests reversed ordering of the entries;
    # nothing in this file reads the flag, so confirm against the caller.
    reverse: bool
+
+
@dataclass(frozen=True)
class Video:
    """Immutable record holding the metadata of a single video."""

    url: str
    title: str
    description: str
    # NOTE(review): presumably a UNIX timestamp; the unit is not established here.
    publish_date: float
    # ``None`` as long as the video has not been watched (see ``watched``).
    watch_date: Optional[float]
    # NOTE(review): presumably in seconds; confirm against the producer.
    duration: float
    thumbnail_url: Optional[str]
    extractor_hash: str

    @property
    def watched(self) -> bool:
        """Tell whether a watch date has been recorded for this video."""
        if self.watch_date is None:
            return False
        return True
+
+
# Configure the root logger once, at import time; everything down to DEBUG is
# let through.
logging.basicConfig(level=logging.DEBUG, encoding="utf-8")
logger = logging.getLogger("yt")

# Dedicated logger handed to yt-dlp via its options: records sent to it are
# swallowed by the NullHandler and never bubble up to the root logger.
_ytdl_logger = logging.getLogger("yt_dlp")
_ytdl_logger.addHandler(logging.NullHandler())
_ytdl_logger.propagate = False

# Options shared by every YoutubeDL instance created in this file.
YTDL_COMMON_OPTS = {"logger": _ytdl_logger}
+
T = TypeVar("T")


def take(amount: int, iterable: Iterable[T]) -> Iterable[T]:
    """Take the first elements of an iterable.

    If the given iterable has fewer elements than the given amount, the returned iterable has the
    same amount of elements as the given iterable. Otherwise the returned iterable has `amount`
    elements. A zero or negative `amount` yields nothing.

    The result is lazy: no element is pulled from `iterable` before the result is iterated, and
    never more than `amount` elements are consumed from it.

    :param amount: The number of elements to take
    :param iterable: The iterable to take elements from
    :return: The first elements of the given iterable
    """
    # `islice` rejects a negative stop value, so clamp it to keep the
    # "negative amount means nothing" behaviour.
    yield from itertools.islice(iterable, max(amount, 0))
+
+
class Fetcher:
    """Fetch playlist entries through yt-dlp and print each resolved entry as JSON.

    The blocking yt-dlp calls are handed to the event loop's default executor,
    so the coroutines of this class can be awaited concurrently.
    """

    def __init__(self, max_backlog):
        """Remember the backlog limit and build the yt-dlp options.

        :param max_backlog: Maximum number of entries considered per playlist. It is passed to
            yt-dlp as ``playlistend`` and also caps the entries returned by
            :meth:`get_unprocessed_entries`.
        """
        self.max_items = max_backlog
        self.ydl_opts = {
            **YTDL_COMMON_OPTS,
            "playliststart": 1,
            "playlistend": max_backlog,
            "noplaylist": False,
            # NOTE(review): extractor-specific switch; see the yt-dlp
            # documentation for the meaning of `approximate_date`.
            "extractor_args": {"youtubetab": {"approximate_date": [""]}},
        }

    async def get_unprocessed_entries(self, url: str) -> Iterable[Tuple[str, Any]]:
        """Collect the raw (not yet processed) entries of a playlist.

        :param url: The playlist URL to query
        :return: A list of ``(url, entry)`` pairs with at most ``max_items`` elements. The list
            is empty if yt-dlp reported a download error (which is logged) or returned no
            entries.
        """
        result = []
        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
            logger.info("Checking playlist '%s'...", url)
            try:
                # Inside a coroutine a loop is always running, so ask for it directly.
                loop = asyncio.get_running_loop()
                info = await loop.run_in_executor(
                    None,
                    partial(ydl.extract_info, url, download=False, process=False),
                )
            except yt_dlp.DownloadError as download_error:
                logger.error(
                    "Failed to get playlist '%s'. Error was: '%s'",
                    url,
                    download_error,
                )
            else:
                # Neither a missing info dict nor a missing/empty "entries"
                # value may crash the update; both simply mean "nothing to do".
                entries = (info or {}).get("entries") or []
                # Consume the entries while the YoutubeDL instance is still open.
                result.extend((url, entry) for entry in take(self.max_items, entries))
        return result

    def _process_ie(self, entry):
        """Resolve a raw entry into its full info dictionary (blocking).

        :param entry: A raw entry as returned by :meth:`get_unprocessed_entries`
        :return: The processed result of ``YoutubeDL.process_ie_result``
        """
        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
            processed = ydl.process_ie_result(entry, False)

            # walk through the ie_result dictionary to force evaluation of lazily loaded resources
            repr(processed)

            return processed

    async def process_entry(self, url: str, entry: Any) -> None:
        """Resolve one entry and print it to stdout as a single JSON object.

        The printed object maps the playlist URL to the processed entry. A download error is
        logged and the entry is skipped.

        :param url: The playlist URL the entry belongs to
        :param entry: The raw entry to resolve
        """
        try:
            loop = asyncio.get_running_loop()
            processed = await loop.run_in_executor(None, self._process_ie, entry)
        except yt_dlp.DownloadError as download_error:
            logger.error(
                "Failed to get a video of playlist '%s'. Error was: '%s'",
                url,
                download_error,
            )
            return None

        print(json.dumps({url: processed}))
        return None
+
+
class Updater:
    """Drive a :class:`Fetcher` over a set of playlist URLs."""

    def __init__(self, max_backlog=20):
        """Create the updater together with its fetcher.

        :param max_backlog: Maximum number of entries considered per playlist
        """
        self.max_items = max_backlog
        self.fetcher = Fetcher(max_backlog)

    async def update_url(self, url: str):
        """Fetch the entries of one playlist and process all of them concurrently.

        A progress line is written to stderr before the playlist is queried.

        :param url: The playlist URL to update
        """
        print(f"Updating {url}...", file=sys.stderr)
        new_entries = await self.fetcher.get_unprocessed_entries(url)

        pending = [
            self.fetcher.process_entry(playlist_url, entry)
            for playlist_url, entry in new_entries
        ]
        await asyncio.gather(*pending)

    async def do_update(self, urls: Iterable[str]):
        """Update all given playlists concurrently.

        :param urls: The playlist URLs to update
        """
        jobs = [self.update_url(url) for url in urls]
        await asyncio.gather(*jobs)

    def update(self, urls: Iterable[str]):
        """Synchronous entry point: run :meth:`do_update` on a fresh event loop.

        :param urls: The playlist URLs to update
        """
        asyncio.run(self.do_update(urls))
+
+
def update(max_backlog: int):
    """Update every playlist URL given on the command line.

    The URLs are read from ``sys.argv[2:]``.

    :param max_backlog: Maximum number of entries considered per playlist
    """
    u = Updater(max_backlog=max_backlog)
    u.update(sys.argv[2:])


# Only act when executed as a script, so importing this module has no side
# effects beyond its definitions.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        # Fail with a readable message instead of an IndexError traceback.
        sys.exit(f"usage: {sys.argv[0]} MAX_BACKLOG [URL ...]")

    max_backlog = int(sys.argv[1])
    update(max_backlog)