diff options
Diffstat (limited to 'pkgs/by-name/yt')
-rw-r--r-- | pkgs/by-name/yt/yt/Cargo.toml | 2 | ||||
-rw-r--r-- | pkgs/by-name/yt/yt/flake.nix | 7 | ||||
-rwxr-xr-x | pkgs/by-name/yt/yt/python_update/raw_update.py | 150 | ||||
-rw-r--r-- | pkgs/by-name/yt/yt/src/cli.rs | 71 | ||||
-rw-r--r-- | pkgs/by-name/yt/yt/src/main.rs | 74 | ||||
-rw-r--r-- | pkgs/by-name/yt/yt/src/status/mod.rs | 2 | ||||
-rw-r--r-- | pkgs/by-name/yt/yt/src/storage/subscriptions.rs | 147 | ||||
-rw-r--r-- | pkgs/by-name/yt/yt/src/storage/video_database/schema.sql | 8 | ||||
-rw-r--r-- | pkgs/by-name/yt/yt/src/subscribe/mod.rs | 65 | ||||
-rw-r--r-- | pkgs/by-name/yt/yt/src/update/mod.rs | 339 | ||||
-rw-r--r-- | pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs | 28 |
11 files changed, 554 insertions, 339 deletions
diff --git a/pkgs/by-name/yt/yt/Cargo.toml b/pkgs/by-name/yt/yt/Cargo.toml index d703d0cb..ca2c3c8d 100644 --- a/pkgs/by-name/yt/yt/Cargo.toml +++ b/pkgs/by-name/yt/yt/Cargo.toml @@ -19,7 +19,7 @@ serde_json = "1.0.125" sqlx = { version = "0.7.4", features = ["runtime-tokio", "sqlite"] } stderrlog = "0.6.0" tempfile = "3.12.0" -tokio = { version = "1.39.3", features = ["rt-multi-thread", "macros", "process", "time"] } +tokio = { version = "1.39.3", features = ["rt-multi-thread", "macros", "process", "time", "io-std"] } url = { version = "2.5.2", features = ["serde"] } xdg = "2.5.2" yt_dlp = { path = "./yt_dlp/" } diff --git a/pkgs/by-name/yt/yt/flake.nix b/pkgs/by-name/yt/yt/flake.nix index 5ced0fbd..7733d7c7 100644 --- a/pkgs/by-name/yt/yt/flake.nix +++ b/pkgs/by-name/yt/yt/flake.nix @@ -14,7 +14,10 @@ }: (flake-utils.lib.eachDefaultSystem (system: let pkgs = nixpkgs.legacyPackages."${system}"; - yt-dlp = pkgs.python3.withPackages (ps: [ps.yt-dlp]); + python = pkgs.python3.withPackages (ps: + with ps; [ + yt-dlp + ]); buildInputs = with pkgs; [ mpv-unwrapped.dev @@ -49,7 +52,7 @@ sqlx-cli sqlite-interactive - yt-dlp + python jq cargo-edit diff --git a/pkgs/by-name/yt/yt/python_update/raw_update.py b/pkgs/by-name/yt/yt/python_update/raw_update.py new file mode 100755 index 00000000..3388bc7c --- /dev/null +++ b/pkgs/by-name/yt/yt/python_update/raw_update.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python + +# This has been take from the `ytcc` updater code (at `8893bc98428cb78d458a9cf3ded03f519d86a46b`). +# Source URL: https://github.com/woefe/ytcc/commit/8893bc98428cb78d458a9cf3ded03f519d86a46b + +import asyncio +import itertools +import json +import logging +import sys +from dataclasses import dataclass +from functools import partial +from typing import Any, Iterable, Optional, Tuple, TypeVar + +import yt_dlp + + +@dataclass(frozen=True) +class Playlist: + name: str + url: str + reverse: bool + + +@dataclass(frozen=True) +class Video: + url: str + title: str + description: str + publish_date: float + watch_date: Optional[float] + duration: float + thumbnail_url: Optional[str] + extractor_hash: str + + @property + def watched(self) -> bool: + return self.watch_date is not None + + +logger = logging.getLogger("yt") +logging.basicConfig(encoding="utf-8", level=logging.DEBUG) + +_ytdl_logger = logging.getLogger("yt_dlp") +_ytdl_logger.propagate = False +_ytdl_logger.addHandler(logging.NullHandler()) +YTDL_COMMON_OPTS = {"logger": _ytdl_logger} + +T = TypeVar("T") + + +def take(amount: int, iterable: Iterable[T]) -> Iterable[T]: + """Take the first elements of an iterable. + + If the given iterable has less elements than the given amount, the returned iterable has the + same amount of elements as the given iterable. Otherwise the returned iterable has `amount` + elements. + + :param amount: The number of elements to take + :param iterable: The iterable to take elements from + :return: The first elements of the given iterable + """ + for _, elem in zip(range(amount), iterable): + yield elem + + +class Fetcher: + def __init__(self, max_backlog): + self.max_items = max_backlog + self.ydl_opts = { + **YTDL_COMMON_OPTS, + "playliststart": 1, + "playlistend": max_backlog, + "noplaylist": False, + "extractor_args": {"youtubetab": {"approximate_date": [""]}}, + } + + async def get_unprocessed_entries(self, url: str) -> Iterable[Tuple[str, Any]]: + result = [] + with yt_dlp.YoutubeDL(self.ydl_opts) as ydl: + logger.info("Checking playlist '%s'...", url) + try: + loop = asyncio.get_event_loop() + info = await loop.run_in_executor( + None, + partial(ydl.extract_info, url, download=False, process=False), + ) + except yt_dlp.DownloadError as download_error: + logger.error( + "Failed to get playlist '%s'. Error was: '%s'", + url, + download_error, + ) + else: + entries = info.get("entries", []) + for entry in take(self.max_items, entries): + result.append((url, entry)) + return result + + def _process_ie(self, entry): + with yt_dlp.YoutubeDL(self.ydl_opts) as ydl: + processed = ydl.process_ie_result(entry, False) + + # walk through the ie_result dictionary to force evaluation of lazily loaded resources + repr(processed) + + return processed + + async def process_entry(self, url: str, entry: Any) -> Optional[Any]: + try: + loop = asyncio.get_event_loop() + processed = await loop.run_in_executor(None, self._process_ie, entry) + except yt_dlp.DownloadError as download_error: + logger.error( + "Failed to get a video of playlist '%s'. Error was: '%s'", + url, + download_error, + ) + return None + else: + print(json.dumps({url: processed})) + + +class Updater: + def __init__(self, max_backlog=20): + self.max_items = max_backlog + self.fetcher = Fetcher(max_backlog) + + async def update_url(self, url: str): + print(f"Updating {url}...", file=sys.stderr) + new_entries = await self.fetcher.get_unprocessed_entries(url) + + await asyncio.gather( + *itertools.starmap(self.fetcher.process_entry, new_entries) + ) + + async def do_update(self, urls: Iterable[str]): + await asyncio.gather(*map(self.update_url, urls)) + + def update(self, urls: Iterable[str]): + asyncio.run(self.do_update(urls)) + + +def update(max_backlog: int): + u = Updater(max_backlog=max_backlog) + u.update(sys.argv[2:]) + + +max_backlog = int(sys.argv[1]) +update(max_backlog) diff --git a/pkgs/by-name/yt/yt/src/cli.rs b/pkgs/by-name/yt/yt/src/cli.rs index 45590d56..23d400f8 100644 --- a/pkgs/by-name/yt/yt/src/cli.rs +++ b/pkgs/by-name/yt/yt/src/cli.rs @@ -65,22 +65,6 @@ pub enum Command { cmd: Option<SelectCommand>, }, - /// Subscribe to an URL - Subscribe { - #[arg(short, long)] - /// The human readable name of the subscription - name: Option<String>, - - /// The URL to listen to - url: Url, - }, - - /// Unsubscribe from an URL - Unsubscribe { - /// The human readable name of the subscription - name: String, - }, - /// Update the video database Update { #[arg(short, long, default_value = "20")] @@ -96,18 +80,12 @@ pub enum Command { concurrent_processes: usize, }, - /// Update one subscription (this is here to parallelize the normal `update`) - #[command(hide = true)] - UpdateOnce { - /// The name of the subscription to update - sub_name: String, - - /// The number of videos to stop updating - max_backlog: u32, + /// Manipulate subscription + #[command(visible_alias = "subs")] + Subscriptions { + #[command(subcommand)] + cmd: SubscriptionCommand, }, - - /// List all subscriptions - Subscriptions {}, } impl Default for Command { @@ -118,6 +96,42 @@ impl Default for Command { } } +#[derive(Subcommand, Clone, Debug)] +pub enum SubscriptionCommand { + /// Subscribe to an URL + Add { + #[arg(short, long)] + /// The human readable name of the subscription + name: Option<String>, + + /// The URL to listen to + url: Url, + }, + + /// Unsubscribe from an URL + Remove { + /// The human readable name of the subscription + name: String, + }, + + /// Import a bunch of URLs as subscriptions. + Import { + /// The file containing the URLs. Will use Stdin otherwise. + file: Option<PathBuf>, + + /// Remove any previous subscriptions + #[arg(short, long)] + force: bool + }, + + /// List all subscriptions + List { + /// Only show the URLs + #[arg(short, long)] + url: bool, + }, +} + #[derive(Clone, Debug, Args)] #[command(infer_subcommands = true)] /// Mark the video given by the hash to be watched @@ -192,6 +206,9 @@ impl Default for SelectCommand { pub enum CheckCommand { /// Check if the given info.json is deserializable InfoJson { path: PathBuf }, + + /// Check if the given update info.json is deserializable + UpdateInfoJson { path: PathBuf }, } #[derive(Subcommand, Clone, Copy, Debug)] diff --git a/pkgs/by-name/yt/yt/src/main.rs b/pkgs/by-name/yt/yt/src/main.rs index 302fba1e..ab16cfa7 100644 --- a/pkgs/by-name/yt/yt/src/main.rs +++ b/pkgs/by-name/yt/yt/src/main.rs @@ -1,11 +1,16 @@ -use std::fs; +use std::{collections::HashMap, fs}; use anyhow::{bail, Context, Result}; use app::App; use cache::invalidate; use clap::Parser; -use cli::{CacheCommand, CheckCommand, SelectCommand}; +use cli::{CacheCommand, CheckCommand, SelectCommand, SubscriptionCommand}; use select::cmds::handle_select_cmd; +use tokio::{ + fs::File, + io::{stdin, BufReader}, +}; +use url::Url; use yt_dlp::wrapper::info_json::InfoJson; use crate::{cli::Command, storage::subscriptions::get_subscriptions}; @@ -56,22 +61,12 @@ async fn main() -> Result<()> { _ => handle_select_cmd(&app, cmd, None).await?, } } - Command::Subscribe { name, url } => { - subscribe::subscribe(name, url) - .await - .context("Failed to add a subscription")?; - } - Command::Unsubscribe { name } => { - subscribe::unsubscribe(name) - .await - .context("Failed to remove a subscription")?; - } Command::Update { max_backlog, subscriptions, concurrent_processes, } => { - let all_subs = get_subscriptions()?; + let all_subs = get_subscriptions(&app).await?; for sub in &subscriptions { if let None = all_subs.0.get(sub) { @@ -82,21 +77,43 @@ async fn main() -> Result<()> { } } - update::update(max_backlog, subscriptions, concurrent_processes).await?; - } - Command::UpdateOnce { - sub_name, - max_backlog, - } => { - update::update_once(&app, sub_name, max_backlog).await?; + update::update(&app, max_backlog, subscriptions, concurrent_processes).await?; } - Command::Subscriptions {} => { - let all_subs = get_subscriptions()?; - for (key, val) in all_subs.0 { - println!("{}: '{}'", key, val.url); + Command::Subscriptions { cmd } => match cmd { + SubscriptionCommand::Add { name, url } => { + subscribe::subscribe(&app, name, url) + .await + .context("Failed to add a subscription")?; } - } + SubscriptionCommand::Remove { name } => { + subscribe::unsubscribe(&app, name) + .await + .context("Failed to remove a subscription")?; + } + SubscriptionCommand::List { url } => { + let all_subs = get_subscriptions(&app).await?; + + if url { + for val in all_subs.0.values() { + println!("{}", val.url); + } + } else { + for (key, val) in all_subs.0 { + println!("{}: '{}'", key, val.url); + } + } + } + SubscriptionCommand::Import { file, force } => { + if let Some(file) = file { + let f = File::open(file).await?; + + subscribe::import(&app, BufReader::new(f), force).await? + } else { + subscribe::import(&app, BufReader::new(stdin()), force).await? + }; + } + }, Command::Watch {} => watch::watch(&app).await?, @@ -115,6 +132,13 @@ async fn main() -> Result<()> { let _: InfoJson = serde_json::from_str(&string).context("Failed to deserialize value")?; } + CheckCommand::UpdateInfoJson { path } => { + let string = fs::read_to_string(&path) + .with_context(|| format!("Failed to read '{}' to string!", path.display()))?; + + let _: HashMap<Url, InfoJson> = + serde_json::from_str(&string).context("Failed to deserialize value")?; + } }, Command::Comments {} => { comments::comments(&app).await?; diff --git a/pkgs/by-name/yt/yt/src/status/mod.rs b/pkgs/by-name/yt/yt/src/status/mod.rs index 43632048..b0ce2bab 100644 --- a/pkgs/by-name/yt/yt/src/status/mod.rs +++ b/pkgs/by-name/yt/yt/src/status/mod.rs @@ -60,7 +60,7 @@ pub async fn show(app: &App) -> Result<()> { let drop_videos_changing = (get!(@changing all_videos, Drop)).len(); let dropped_videos_changing = (get!(@changing all_videos, Dropped)).len(); - let subscriptions = get_subscriptions()?; + let subscriptions = get_subscriptions(&app).await?; let subscriptions_len = subscriptions.0.len(); println!( "\ diff --git a/pkgs/by-name/yt/yt/src/storage/subscriptions.rs b/pkgs/by-name/yt/yt/src/storage/subscriptions.rs index 13ec493b..0c262e1a 100644 --- a/pkgs/by-name/yt/yt/src/storage/subscriptions.rs +++ b/pkgs/by-name/yt/yt/src/storage/subscriptions.rs @@ -1,20 +1,17 @@ //! Handle subscriptions -use std::{ - collections::HashMap, - fs::{self, File}, -}; - -use anyhow::{bail, Context, Result}; -use log::{debug, warn}; -use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +use anyhow::Result; +use log::debug; use serde_json::{json, Value}; +use sqlx::query; use url::Url; use yt_dlp::wrapper::info_json::InfoType; -use crate::constants; +use crate::app::App; -#[derive(Deserialize, Serialize, Clone, Debug)] +#[derive(Clone, Debug)] pub struct Subscription { /// The human readable name of this subscription pub name: String, @@ -48,78 +45,86 @@ pub async fn check_url(url: &Url) -> Result<bool> { Ok(info._type == Some(InfoType::Playlist)) } -#[derive(Deserialize, Serialize, Default)] +#[derive(Default)] pub struct Subscriptions(pub(crate) HashMap<String, Subscription>); -/// Get a list of subscriptions -pub fn get_subscriptions() -> Result<Subscriptions> { - let subscriptions_path = constants::subscriptions()?; - - if subscriptions_path.exists() { - let subscriptions_file = File::open(&subscriptions_path).with_context(|| { - format!( - "Failed to open the subscription file at: '{}'", - subscriptions_path.display() - ) - })?; - - let subscriptions = serde_json::from_reader(subscriptions_file) - .context("Failed to deserialize the subscriptions file.")?; - - Ok(subscriptions) - } else { - warn!("No subscriptions file found!"); - return Ok(Subscriptions::default()); - } -} - -pub fn write_subscriptions(subs: Subscriptions) -> Result<()> { - let subscriptions_path = constants::subscriptions()?; - - fs::create_dir_all(subscriptions_path.parent().expect("Should have a parent"))?; - - let subscriptions_file = File::create(&subscriptions_path).with_context(|| { - format!( - "Failed to create the subscription file at: '{}'", - subscriptions_path.display() - ) - })?; - - serde_json::to_writer(subscriptions_file, &subs) - .context("Failed to serialize and write the subscriptions file.")?; +pub async fn remove_all_subscriptions(app: &App) -> Result<()> { + query!( + " + DELETE FROM subscriptions; + ", + ) + .execute(&app.database) + .await?; Ok(()) } -pub async fn remove_subscription(sub: &Subscription) -> Result<()> { - let mut present_subscriptions = get_subscriptions()?; - - let string = format!("Unsubscribed from '{}' at '{}'", sub.name, sub.url); - - if let None = present_subscriptions.0.remove(&sub.name) { - bail!( - "The subscription '{}' was not in your subscription list!", - sub.name - ); - } - - write_subscriptions(present_subscriptions)?; +/// Get a list of subscriptions +pub async fn get_subscriptions(app: &App) -> Result<Subscriptions> { + let raw_subs = query!( + " + SELECT * + FROM subscriptions; + " + ) + .fetch_all(&app.database) + .await?; + + let subscriptions: HashMap<String, Subscription> = raw_subs + .into_iter() + .map(|sub| { + ( + sub.name.clone(), + Subscription::new( + sub.name, + Url::parse(&sub.url).expect("This should be valid"), + ), + ) + }) + .collect(); - println!("{}", string); + Ok(Subscriptions(subscriptions)) +} +pub async fn add_subscription(app: &App, sub: &Subscription) -> Result<()> { + let url = sub.url.to_string(); + + query!( + " + INSERT INTO subscriptions ( + name, + url + ) VALUES (?, ?); + ", + sub.name, + url + ) + .execute(&app.database) + .await?; + + println!("Subscribed to '{}' at '{}'", sub.name, sub.url); Ok(()) } -pub async fn add_subscription(sub: Subscription) -> Result<()> { - let mut present_subscriptions = get_subscriptions()?; - - let string = format!("Subscribed to {} at {}", sub.name, sub.url); - - present_subscriptions.0.insert(sub.name.clone(), sub); - - write_subscriptions(present_subscriptions)?; - - println!("{}", string); +pub async fn remove_subscription(app: &App, sub: &Subscription) -> Result<()> { + let output = query!( + " + DELETE FROM subscriptions + WHERE name = ? + ", + sub.name, + ) + .execute(&app.database) + .await?; + + assert_eq!( + output.rows_affected(), + 1, + "The remove subscriptino query did effect more (or less) than one row. This is a bug." + ); + + println!("Unsubscribed from '{}' at '{}'", sub.name, sub.url); Ok(()) } diff --git a/pkgs/by-name/yt/yt/src/storage/video_database/schema.sql b/pkgs/by-name/yt/yt/src/storage/video_database/schema.sql index 2e9e18af..74378ff8 100644 --- a/pkgs/by-name/yt/yt/src/storage/video_database/schema.sql +++ b/pkgs/by-name/yt/yt/src/storage/video_database/schema.sql @@ -37,4 +37,10 @@ CREATE TABLE IF NOT EXISTS video_options ( subtitle_langs TEXT NOT NULL, playback_speed REAL NOT NULL, FOREIGN KEY(extractor_hash) REFERENCES videos (extractor_hash) -) +); + +-- Store subscriptions +CREATE TABLE IF NOT EXISTS subscriptions ( + name TEXT UNIQUE NOT NULL PRIMARY KEY, + url TEXT NOT NULL +); diff --git a/pkgs/by-name/yt/yt/src/subscribe/mod.rs b/pkgs/by-name/yt/yt/src/subscribe/mod.rs index 88be038a..3311dc42 100644 --- a/pkgs/by-name/yt/yt/src/subscribe/mod.rs +++ b/pkgs/by-name/yt/yt/src/subscribe/mod.rs @@ -1,19 +1,26 @@ +use std::str::FromStr; + use anyhow::{bail, Context, Result}; use futures::FutureExt; use log::warn; use serde_json::{json, Value}; +use tokio::io::{AsyncBufRead, AsyncBufReadExt}; use url::Url; use yt_dlp::wrapper::info_json::InfoType; -use crate::storage::subscriptions::{ - add_subscription, check_url, get_subscriptions, remove_subscription, Subscription, +use crate::{ + app::App, + storage::subscriptions::{ + add_subscription, check_url, get_subscriptions, remove_all_subscriptions, + remove_subscription, Subscription, + }, }; -pub async fn unsubscribe(name: String) -> Result<()> { - let present_subscriptions = get_subscriptions()?; +pub async fn unsubscribe(app: &App, name: String) -> Result<()> { + let present_subscriptions = get_subscriptions(&app).await?; if let Some(subscription) = present_subscriptions.0.get(&name) { - remove_subscription(subscription).await?; + remove_subscription(&app, subscription).await?; } else { bail!("Couldn't find subscription: '{}'", &name); } @@ -21,7 +28,36 @@ pub async fn unsubscribe(name: String) -> Result<()> { Ok(()) } -pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> { +pub async fn import<W: AsyncBufRead + AsyncBufReadExt + Unpin>( + app: &App, + reader: W, + force: bool, +) -> Result<()> { + if force { + remove_all_subscriptions(&app).await?; + } + + let mut lines = reader.lines(); + while let Some(line) = lines.next_line().await? { + let url = + Url::from_str(&line).with_context(|| format!("Failed to parse '{}' as url", line))?; + match subscribe(app, None, url) + .await + .with_context(|| format!("Failed to subscribe to: '{}'", line)) + { + Ok(_) => (), + Err(err) => eprintln!( + "Error while subscribing to '{}': '{}'", + line, + err.source().expect("Should have a source").to_string() + ), + } + } + + Ok(()) +} + +pub async fn subscribe(app: &App, name: Option<String>, url: Url) -> Result<()> { if !(url.as_str().ends_with("videos") || url.as_str().ends_with("streams") || url.as_str().ends_with("shorts")) @@ -35,6 +71,7 @@ pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> { if let Some(name) = name { let out: Result<()> = async move { actual_subscribe( + &app, Some(name.clone() + " {Videos}"), url.join("videos/").expect("Works"), ) @@ -44,6 +81,7 @@ pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> { })?; actual_subscribe( + &app, Some(name.clone() + " {Streams}"), url.join("streams/").expect("Works"), ) @@ -53,6 +91,7 @@ pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> { })?; actual_subscribe( + &app, Some(name.clone() + " {Shorts}"), url.join("shorts/").expect("Works"), ) @@ -66,26 +105,26 @@ pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> { out? } else { - actual_subscribe(None, url.join("videos/").expect("Works")) + actual_subscribe(&app, None, url.join("videos/").expect("Works")) .await .with_context(|| format!("Failed to subscribe to the '{}' variant", "{Videos}"))?; - actual_subscribe(None, url.join("streams/").expect("Works")) + actual_subscribe(&app, None, url.join("streams/").expect("Works")) .await .with_context(|| format!("Failed to subscribe to the '{}' variant", "{Streams}"))?; - actual_subscribe(None, url.join("shorts/").expect("Works")) + actual_subscribe(&app, None, url.join("shorts/").expect("Works")) .await .with_context(|| format!("Failed to subscribe to the '{}' variant", "{Shorts}"))?; } } else { - actual_subscribe(name, url).await?; + actual_subscribe(&app, name, url).await?; } Ok(()) } -async fn actual_subscribe(name: Option<String>, url: Url) -> Result<()> { +async fn actual_subscribe(app: &App, name: Option<String>, url: Url) -> Result<()> { let name = if let Some(name) = name { if !check_url(&url).await? { bail!("The url ('{}') does not represent a playlist!", &url) @@ -112,7 +151,7 @@ async fn actual_subscribe(name: Option<String>, url: Url) -> Result<()> { } }; - let present_subscriptions = get_subscriptions()?; + let present_subscriptions = get_subscriptions(&app).await?; if let Some(subs) = present_subscriptions.0.get(&name) { bail!( @@ -126,7 +165,7 @@ async fn actual_subscribe(name: Option<String>, url: Url) -> Result<()> { let sub = Subscription { name, url }; - add_subscription(sub).await?; + add_subscription(&app, &sub).await?; Ok(()) } diff --git a/pkgs/by-name/yt/yt/src/update/mod.rs b/pkgs/by-name/yt/yt/src/update/mod.rs index 2cbae9fd..60399a0e 100644 --- a/pkgs/by-name/yt/yt/src/update/mod.rs +++ b/pkgs/by-name/yt/yt/src/update/mod.rs @@ -1,125 +1,98 @@ -use std::{env::current_exe, str::FromStr, sync::Arc}; +use std::{collections::HashMap, process::Stdio, str::FromStr}; -use anyhow::{bail, Context, Ok, Result}; +use anyhow::{Context, Ok, Result}; use chrono::{DateTime, Utc}; -use log::{debug, error, info, warn}; -use serde_json::{json, Value}; -use tokio::{process::Command, sync::Semaphore, task::JoinSet}; -use yt_dlp::unsmuggle_url; +use log::{error, info, warn}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::Command, +}; +use url::Url; +use yt_dlp::{unsmuggle_url, wrapper::info_json::InfoJson}; use crate::{ app::App, storage::{ subscriptions::{get_subscriptions, Subscription}, video_database::{ - extractor_hash::ExtractorHash, getters::get_video_hashes, setters::add_video, Video, + extractor_hash::ExtractorHash, getters::get_all_hashes, setters::add_video, Video, VideoStatus, }, }, }; pub async fn update( + app: &App, max_backlog: u32, subs_to_update: Vec<String>, - concurrent_processes: usize, + _concurrent_processes: usize, ) -> Result<()> { - let subscriptions = get_subscriptions()?; - - let mut join_set = JoinSet::new(); - let permits = Arc::new(Semaphore::const_new(concurrent_processes)); - - for key in subscriptions.0.into_keys() { - if subs_to_update.contains(&key) || subs_to_update.is_empty() { - let new_permits = Arc::clone(&permits); - - join_set.spawn(async move { - let _permit = new_permits - .acquire() - .await - .expect("The semaphore should not be closed"); - - debug!( - "Started downloading: `yt 'update-once' '{}' '{}'`", - &key, - max_backlog.to_string() - ); - - let exe_name = current_exe().context("Failed to get the current executable")?; - let mut child = Command::new(exe_name) - // TODO: Add currying of the verbosity flags <2024-07-28> - // .arg("-vvvv") - .arg("update-once") - .arg(&key) - .arg(max_backlog.to_string()) - .spawn() - .context("Failed to call yt update-once")?; - - let output = child.wait().await; - - Ok((output, key)) - }); + let subscriptions = get_subscriptions(&app).await?; + let mut back_subs: HashMap<Url, Subscription> = HashMap::new(); + + let mut urls: Vec<String> = vec![]; + for (name, sub) in subscriptions.0 { + if subs_to_update.contains(&name) || subs_to_update.is_empty() { + urls.push(sub.url.to_string()); + back_subs.insert(sub.url.clone(), sub); } else { info!( "Not updating subscription '{}' as it was not specified", - key + name ); } } - while let Some(res) = join_set.join_next().await { - let (output, key) = res??; - - debug!("{} finished its update.", &key); - match output { - std::result::Result::Ok(status) => { - if !status.success() { - // This should already be printed as error, so we only print it here as debug. - debug!("A yt update-once invokation failed for subscription: '{key}'") - } - } - Err(err) => { - error!("Failed to update subscription ('{key}') because of io error: '{err}'") - } + let mut child = Command::new("./python_update/raw_update.py") + .arg(max_backlog.to_string()) + .args(&urls) + .stdout(Stdio::piped()) + .stderr(Stdio::null()) + .stdin(Stdio::null()) + .spawn() + .context("Failed to call python3 update_raw")?; + + let mut out = BufReader::new( + child + .stdout + .take() + .expect("Should be able to take child stdout"), + ) + .lines(); + + let hashes = get_all_hashes(app).await?; + + while let Some(line) = out.next_line().await? { + // use tokio::{fs::File, io::AsyncWriteExt}; + // let mut output = File::create("output.json").await?; + // output.write(line.as_bytes()).await?; + // output.flush().await?; + // output.sync_all().await?; + // drop(output); + + let output_json: HashMap<Url, InfoJson> = + serde_json::from_str(&line).expect("This should be valid json"); + + for (url, value) in output_json { + let sub = back_subs.get(&url).expect("This was stored before"); + process_subscription(app, sub, value, &hashes).await? } } - Ok(()) -} - -pub async fn update_once(app: &App, sub_name: String, max_backlog: u32) -> Result<()> { - let subs = get_subscriptions()?; - - if let Some(sub) = subs.0.get(&sub_name) { - if let Err(err) = update_subscription(app, sub, max_backlog).await { - error!("Failed to update subscription ('{sub_name}'): {err}") - } - } else { - bail!( - "Your subscription name '{}' is not a subscription (This is probably a bug)!", - sub_name - ); + let out = child.wait().await?; + if out.success() { + error!("A yt update-once invokation failed for all subscriptions.") } Ok(()) } -async fn update_subscription(app: &App, sub: &Subscription, max_backlog: u32) -> Result<()> { - println!("Updating subscription '{}'...", sub.name); - - let yt_opts = match json!( { - "playliststart": 1, - "playlistend": max_backlog, - // "noplaylist": false, - "extractor_args": { - "youtubetab": { - "approximate_date": [""] - } - } - }) { - Value::Object(map) => map, - _ => unreachable!("This is hardcoded"), - }; - +async fn process_subscription( + app: &App, + sub: &Subscription, + entry: InfoJson, + hashes: &Vec<blake3::Hash>, +) -> Result<()> { macro_rules! unwrap_option { ($option:expr) => { match $option { @@ -133,108 +106,92 @@ async fn update_subscription(app: &App, sub: &Subscription, max_backlog: u32) -> }; } - let videos = yt_dlp::extract_info(&yt_opts, &sub.url, false, true) - .await - .with_context(|| { - format!( - "Failed to get info for subscription '{}': '{}'", - sub.name, sub.url - ) - })?; - - let hashes = get_video_hashes(app, &sub).await?; - - let real_videos = unwrap_option!(videos.entries) - .iter() - .map(|entry| -> Result<Option<Video>> { - let publish_date = if let Some(date) = &entry.upload_date { - let year: u32 = date - .chars() - .take(4) - .collect::<String>() - .parse() - .expect("Should work."); - let month: u32 = date - .chars() - .skip(4) - .take(2) - .collect::<String>() - .parse() - .expect("Should work"); - let day: u32 = date - .chars() - .skip(6) - .take(2) - .collect::<String>() - .parse() - .expect("Should work"); - - let date_string = format!("{year:04}-{month:02}-{day:02}T00:00:00Z"); - Some( - DateTime::<Utc>::from_str(&date_string) - .expect("This should always work") - .timestamp(), - ) - } else { - warn!( - "The video '{}' lacks it's upload date!", - unwrap_option!(&entry.title) - ); - None - }; - - let thumbnail_url = match (&entry.thumbnails, &entry.thumbnail) { - (None, None) => None, - (None, Some(thumbnail)) => Some(thumbnail.to_owned()), - - // TODO: The algorithm is not exactly the best <2024-05-28> - (Some(thumbnails), None) => Some( - thumbnails - .get(0) - .expect("At least one should exist") - .url - .clone(), - ), - (Some(_), Some(thumnail)) => Some(thumnail.to_owned()), - }; - - let url = { - let smug_url: url::Url = unwrap_option!(entry.webpage_url.clone()); - unsmuggle_url(smug_url)? - }; - - let extractor_hash = blake3::hash(url.as_str().as_bytes()); - - if hashes.contains(&extractor_hash) { - // We already stored the video information - Ok(None) - } else { - let video = Video { - cache_path: None, - description: entry.description.clone(), - duration: entry.duration, - extractor_hash: ExtractorHash::from_hash(extractor_hash), - last_status_change: Utc::now().timestamp(), - parent_subscription_name: Some(sub.name.clone()), - priority: 0, - publish_date, - status: VideoStatus::Pick, - status_change: false, - thumbnail_url, - title: unwrap_option!(entry.title.clone()), - url, - }; - - println!("{}", video.to_color_display()); - Ok(Some(video)) - } - }) - .collect::<Result<Vec<Option<Video>>>>()?; + let publish_date = if let Some(date) = &entry.upload_date { + let year: u32 = date + .chars() + .take(4) + .collect::<String>() + .parse() + .expect("Should work."); + let month: u32 = date + .chars() + .skip(4) + .take(2) + .collect::<String>() + .parse() + .expect("Should work"); + let day: u32 = date + .chars() + .skip(6) + .take(2) + .collect::<String>() + .parse() + .expect("Should work"); + + let date_string = format!("{year:04}-{month:02}-{day:02}T00:00:00Z"); + Some( + DateTime::<Utc>::from_str(&date_string) + .expect("This should always work") + .timestamp(), + ) + } else { + warn!( + "The video '{}' lacks it's upload date!", + unwrap_option!(&entry.title) + ); + None + }; - for video in real_videos { - if let Some(video) = video { - add_video(app, video).await?; - } + let thumbnail_url = match (&entry.thumbnails, &entry.thumbnail) { + (None, None) => None, + (None, Some(thumbnail)) => Some(thumbnail.to_owned()), + + // TODO: The algorithm is not exactly the best <2024-05-28> + (Some(thumbnails), None) => Some( + thumbnails + .get(0) + .expect("At least one should exist") + .url + .clone(), + ), + (Some(_), Some(thumnail)) => Some(thumnail.to_owned()), + }; + + let url = { + let smug_url: url::Url = unwrap_option!(entry.webpage_url.clone()); + unsmuggle_url(smug_url)? + }; + + let extractor_hash = blake3::hash(url.as_str().as_bytes()); + + if hashes.contains(&extractor_hash) { + // We already stored the video information + println!( + "(Ignoring duplicated video from: '{}' -> '{}')", + sub.name, + unwrap_option!(entry.title) + ); + return Ok(()); + } else { + let video = Video { + cache_path: None, + description: entry.description.clone(), + duration: entry.duration, + extractor_hash: ExtractorHash::from_hash(extractor_hash), + last_status_change: Utc::now().timestamp(), + parent_subscription_name: Some(sub.name.clone()), + priority: 0, + publish_date, + status: VideoStatus::Pick, + status_change: false, + thumbnail_url, + title: unwrap_option!(entry.title.clone()), + url, + }; + + println!("{}", video.to_color_display()); + add_video(app, video).await?; } + Ok(()) } diff --git a/pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs b/pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs index 9b50aa93..46da6eee 100644 --- a/pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs +++ b/pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs @@ -41,6 +41,7 @@ pub struct InfoJson { pub chapters: Option<Vec<Chapter>>, pub comment_count: Option<u32>, pub comments: Option<Vec<Comment>>, + pub concurrent_view_count: Option<u32>, pub description: Option<String>, pub display_id: Option<String>, pub downloader_options: Option<DownloaderOptions>, @@ -48,6 +49,8 @@ pub struct InfoJson { pub duration_string: Option<String>, pub dynamic_range: Option<String>, pub entries: Option<Vec<InfoJson>>, + pub episode: Option<String>, + pub episode_number: Option<u32>, pub epoch: Option<u32>, pub ext: Option<String>, pub extractor: Option<Extractor>, @@ -99,6 +102,9 @@ pub struct InfoJson { pub requested_formats: Option<Vec<Format>>, pub requested_subtitles: Option<HashMap<String, Subtitle>>, pub resolution: Option<String>, + pub season: Option<String>, + pub season_number: Option<u32>, + pub series: Option<String>, pub source_preference: Option<i32>, pub sponsorblock_chapters: Option<Vec<SponsorblockChapter>>, pub stretched_ratio: Option<Todo>, @@ -283,6 +289,9 @@ pub enum Extractor { #[serde(alias = "generic")] Generic, + #[serde(alias = "SVTSeries")] + SVTSeries, + #[serde(alias = "youtube")] YouTube, @@ -296,6 +305,9 @@ pub enum ExtractorKey { #[serde(alias = "Generic")] Generic, + #[serde(alias = "SVTSeries")] + SVTSeries, + #[serde(alias = "Youtube")] YouTube, @@ -425,7 +437,7 @@ pub struct ThumbNail { #[serde(deny_unknown_fields)] pub struct Format { pub __needs_testing: Option<bool>, - pub __working: Option<Todo>, + pub __working: Option<bool>, pub abr: Option<f64>, pub acodec: Option<String>, pub aspect_ratio: Option<f64>, @@ -450,8 +462,10 @@ pub struct Format { pub has_drm: Option<bool>, pub height: Option<u32>, pub http_headers: Option<HttpHeader>, + pub is_dash_periods: Option<bool>, pub language: Option<String>, pub language_preference: Option<i32>, + pub manifest_stream_number: Option<u32>, pub manifest_url: Option<Url>, pub preference: Option<i32>, pub protocol: Option<String>, @@ -477,20 +491,20 @@ pub struct DownloaderOptions { #[serde(deny_unknown_fields)] pub struct HttpHeader { #[serde(alias = "User-Agent")] - pub user_agent: String, + pub user_agent: Option<String>, #[serde(alias = "Accept")] - pub accept: String, + pub accept: Option<String>, #[serde(alias = "Accept-Language")] - pub accept_language: String, + pub accept_language: Option<String>, #[serde(alias = "Sec-Fetch-Mode")] - pub sec_fetch_mode: String, + pub sec_fetch_mode: Option<String>, } #[derive(Debug, Deserialize, Serialize, PartialEq, PartialOrd)] #[serde(deny_unknown_fields)] pub struct Fragment { - pub url: String, - pub duration: f64, + pub url: Option<Url>, + pub duration: Option<f64>, pub path: Option<PathBuf>, } |