author     Benedikt Peetz <benedikt.peetz@b-peetz.de>    2025-06-13 20:54:49 +0200
committer  Benedikt Peetz <benedikt.peetz@b-peetz.de>    2025-06-13 20:56:40 +0200
commit     69145b4deed4fe512239a9f88e6af69d3b8c0309 (patch)
tree       7643edd350c1cec75f91e0982ffd3c840f8661fb /yt/src
parent     build(treewide): Update (diff)
download   yt-69145b4deed4fe512239a9f88e6af69d3b8c0309.zip
feat({yt_dlp,yt}): Migrate from pyo3 to rustpython
This allows us to avoid CPython's GIL and lets us fully leverage async/concurrent code to speed up Python operations. I have also taken the opportunity to change the `InfoJson` struct to an untyped JSON value, as that is what it actually is.
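In practice, every typed field access becomes an untyped JSON lookup through the yt_dlp crate's `json_cast!`/`json_get!` helper macros. A minimal sketch of the pattern as the diff below uses it (both helper functions are illustrative, not part of the commit):

    use yt_dlp::{InfoJson, json_cast, json_get};

    // Illustrative helpers only; they mirror call sites in the diff below.
    fn title(entry: &InfoJson) -> String {
        // Required field: extracted and cast in one step.
        json_get!(entry, "title", as_str).to_owned()
    }

    fn description(entry: &InfoJson) -> String {
        // Optional field: fall back to a placeholder when absent.
        entry
            .get("description")
            .map(|val| json_cast!(val, as_str))
            .unwrap_or("<No description>")
            .to_owned()
    }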
Diffstat (limited to 'yt/src')
-rw-r--r--  yt/src/cli.rs                              12
-rw-r--r--  yt/src/comments/comment.rs                 89
-rw-r--r--  yt/src/comments/description.rs              8
-rw-r--r--  yt/src/comments/mod.rs                     26
-rw-r--r--  yt/src/download/download_options.rs       197
-rw-r--r--  yt/src/download/mod.rs                     27
-rw-r--r--  yt/src/download/progress_hook.rs          190
-rw-r--r--  yt/src/main.rs                             20
-rw-r--r--  yt/src/select/cmds/add.rs                 104
-rw-r--r--  yt/src/storage/subscriptions.rs            25
-rw-r--r--  yt/src/storage/video_database/get/mod.rs    2
-rw-r--r--  yt/src/subscribe/mod.rs                    27
-rw-r--r--  yt/src/update/mod.rs                       69
-rw-r--r--  yt/src/update/updater.rs                  117
14 files changed, 587 insertions, 326 deletions
diff --git a/yt/src/cli.rs b/yt/src/cli.rs
index 037f45c..e7ee4c2 100644
--- a/yt/src/cli.rs
+++ b/yt/src/cli.rs
@@ -103,12 +103,6 @@ pub enum Command {
     /// Show, the configuration options in effect
     Config {},
 
-    /// Perform various tests
-    Check {
-        #[command(subcommand)]
-        command: CheckCommand,
-    },
-
     /// Display the comments of the currently playing video
     Comments {},
     /// Display the description of the currently playing video
@@ -355,12 +349,6 @@ impl Default for SelectCommand {
     }
 }
 
-#[derive(Subcommand, Clone, Debug)]
-pub enum CheckCommand {
-    /// Check if the given `*.info.json` file is deserializable.
-    InfoJson { path: PathBuf },
-}
-
 #[derive(Subcommand, Clone, Copy, Debug)]
 pub enum CacheCommand {
     /// Invalidate all cache entries
diff --git a/yt/src/comments/comment.rs b/yt/src/comments/comment.rs
index 6b8cf73..5bc939c 100644
--- a/yt/src/comments/comment.rs
+++ b/yt/src/comments/comment.rs
@@ -9,7 +9,94 @@
 // You should have received a copy of the License along with this program.
 // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
 
-use yt_dlp::wrapper::info_json::Comment;
+use serde::{Deserialize, Deserializer, Serialize};
+use url::Url;
+
+#[derive(Debug, Deserialize, Serialize, Clone, Eq, PartialEq, PartialOrd, Ord)]
+#[serde(from = "String")]
+#[serde(deny_unknown_fields)]
+pub enum Parent {
+    Root,
+    Id(String),
+}
+
+impl Parent {
+    #[must_use]
+    pub fn id(&self) -> Option<&str> {
+        if let Self::Id(id) = self {
+            Some(id)
+        } else {
+            None
+        }
+    }
+}
+
+impl From<String> for Parent {
+    fn from(value: String) -> Self {
+        if value == "root" {
+            Self::Root
+        } else {
+            Self::Id(value)
+        }
+    }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone, Eq, PartialEq, PartialOrd, Ord)]
+#[serde(from = "String")]
+#[serde(deny_unknown_fields)]
+pub struct Id {
+    pub id: String,
+}
+impl From<String> for Id {
+    fn from(value: String) -> Self {
+        Self {
+            // Take the last element if the string is split with dots, otherwise take the full id
+            id: value.split('.').last().unwrap_or(&value).to_owned(),
+        }
+    }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone, Eq, PartialEq, PartialOrd, Ord)]
+#[allow(clippy::struct_excessive_bools)]
+pub struct Comment {
+    pub id: Id,
+    pub text: String,
+    #[serde(default = "zero")]
+    pub like_count: u32,
+    pub is_pinned: bool,
+    pub author_id: String,
+    #[serde(default = "unknown")]
+    pub author: String,
+    pub author_is_verified: bool,
+    pub author_thumbnail: Url,
+    pub parent: Parent,
+    #[serde(deserialize_with = "edited_from_time_text", alias = "_time_text")]
+    pub edited: bool,
+    // Can't also be deserialized, as it's already used in 'edited'
+    // _time_text: String,
+    pub timestamp: i64,
+    pub author_url: Option<Url>,
+    pub author_is_uploader: bool,
+    pub is_favorited: bool,
+}
+
+fn unknown() -> String {
+    "<Unknown>".to_string()
+}
+fn zero() -> u32 {
+    0
+}
+fn edited_from_time_text<'de, D>(d: D) -> Result<bool, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let s = String::deserialize(d)?;
+    if s.contains(" (edited)") {
+        Ok(true)
+    } else {
+        Ok(false)
+    }
+}
 
 #[derive(Debug, Clone)]
 #[allow(clippy::module_name_repetitions)]
diff --git a/yt/src/comments/description.rs b/yt/src/comments/description.rs
index d22a40f..e8cb29d 100644
--- a/yt/src/comments/description.rs
+++ b/yt/src/comments/description.rs
@@ -17,7 +17,7 @@ use crate::{
 };
 
 use anyhow::{Result, bail};
-use yt_dlp::wrapper::info_json::InfoJson;
+use yt_dlp::{InfoJson, json_cast};
 
 pub async fn description(app: &App) -> Result<()> {
     let description = get(app).await?;
@@ -39,6 +39,8 @@ pub async fn get(app: &App) -> Result<String> {
     );
 
     Ok(info_json
-        .description
-        .unwrap_or("<No description>".to_owned()))
+        .get("description")
+        .map(|val| json_cast!(val, as_str))
+        .unwrap_or("<No description>")
+        .to_owned())
 }
diff --git a/yt/src/comments/mod.rs b/yt/src/comments/mod.rs
index daecf8d..876146d 100644
--- a/yt/src/comments/mod.rs
+++ b/yt/src/comments/mod.rs
@@ -11,11 +11,11 @@
 
 use std::mem;
 
-use anyhow::{Context, Result, bail};
-use comment::{CommentExt, Comments};
+use anyhow::{Result, bail};
+use comment::{Comment, CommentExt, Comments, Parent};
 use output::display_fmt_and_less;
 use regex::Regex;
-use yt_dlp::wrapper::info_json::{Comment, InfoJson, Parent};
+use yt_dlp::{InfoJson, json_cast};
 
 use crate::{
     app::App,
@@ -39,23 +39,25 @@ pub async fn get(app: &App) -> Result<Comments> {
         bail!("Could not find a currently playing video!");
     };
 
-    let mut info_json: InfoJson = get::video_info_json(&currently_playing_video)?.unreachable(
-        "A currently *playing* must be cached. And thus the info.json should be available",
+    let info_json: InfoJson = get::video_info_json(&currently_playing_video)?.unreachable(
+        "A currently *playing* video must be cached. And thus the info.json should be available",
     );
 
-    let base_comments = mem::take(&mut info_json.comments).with_context(|| {
-        format!(
+    let base_comments = if let Some(comments) = info_json.get("comments") {
+        json_cast!(comments, as_array)
+    } else {
+        bail!(
             "The video ('{}') does not have comments!",
             info_json
-                .title
-                .as_ref()
-                .unwrap_or(&("<No Title>".to_owned()))
+                .get("title")
+                .map(|val| json_cast!(val, as_str))
+                .unwrap_or("<No Title>")
         )
-    })?;
-    drop(info_json);
+    };
 
     let mut comments = Comments::new();
     for c in base_comments {
+        let c: Comment = serde_json::from_value(c.to_owned())?;
         if let Parent::Id(id) = &c.parent {
             comments.insert(&(id.clone()), CommentExt::from(c));
         } else {
diff --git a/yt/src/download/download_options.rs b/yt/src/download/download_options.rs
index 8f5a609..03c20ba 100644
--- a/yt/src/download/download_options.rs
+++ b/yt/src/download/download_options.rs
@@ -9,105 +9,110 @@
 // You should have received a copy of the License along with this program.
 // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
 
+use anyhow::Context;
 use serde_json::{Value, json};
+use yt_dlp::{YoutubeDL, YoutubeDLOptions};
 
 use crate::{app::App, storage::video_database::YtDlpOptions};
 
-#[must_use]
-pub fn download_opts(app: &App, additional_opts: &YtDlpOptions) -> serde_json::Map<String, Value> {
-    match json!({
-        "extract_flat": "in_playlist",
-        "extractor_args": {
-            "youtube": {
-                "comment_sort": [
-                    "top"
-                ],
-                "max_comments": [
-                    "150",
-                    "all",
-                    "100"
-                ]
-            }
-        },
+use super::progress_hook::wrapped_progress_hook;
 
-        "prefer_free_formats": true,
-        "ffmpeg_location": env!("FFMPEG_LOCATION"),
-        "format": "bestvideo[height<=?1080]+bestaudio/best",
-        "fragment_retries": 10,
-        "getcomments": true,
-        "ignoreerrors": false,
-        "retries": 10,
-
-        "writeinfojson": true,
-        // NOTE: This results in a constant warning message. <2025-01-04>
-        // "writeannotations": true,
-        "writesubtitles": true,
-        "writeautomaticsub": true,
-
-        "outtmpl": {
-            "default": app.config.paths.download_dir.join("%(channel)s/%(title)s.%(ext)s"),
-            "chapter": "%(title)s - %(section_number)03d %(section_title)s [%(id)s].%(ext)s"
-        },
-        "compat_opts": {},
-        "forceprint": {},
-        "print_to_file": {},
-        "windowsfilenames": false,
-        "restrictfilenames": false,
-        "trim_file_names": false,
-        "postprocessors": [
-            {
-                "api": "https://sponsor.ajay.app",
-                "categories": [
-                    "interaction",
-                    "intro",
-                    "music_offtopic",
-                    "sponsor",
-                    "outro",
-                    "poi_highlight",
-                    "preview",
-                    "selfpromo",
-                    "filler",
-                    "chapter"
-                ],
-                "key": "SponsorBlock",
-                "when": "after_filter"
-            },
-            {
-                "force_keyframes": false,
-                "key": "ModifyChapters",
-                "remove_chapters_patterns": [],
-                "remove_ranges": [],
-                "remove_sponsor_segments": [
-                    "sponsor"
-                ],
-                "sponsorblock_chapter_title": "[SponsorBlock]: %(category_names)l"
-            },
-            {
-                "add_chapters": true,
-                "add_infojson": null,
-                "add_metadata": false,
-                "key": "FFmpegMetadata"
-            },
-            {
-                "key": "FFmpegConcat",
-                "only_multi_video": true,
-                "when": "playlist"
-            }
-        ]
-    }) {
-        Value::Object(mut obj) => {
-            obj.insert(
-                "subtitleslangs".to_owned(),
-                Value::Array(
-                    additional_opts
-                        .subtitle_langs
-                        .split(',')
-                        .map(|val| Value::String(val.to_owned()))
-                        .collect::<Vec<_>>(),
-                ),
-            );
-            obj
-        }
-        _ => unreachable!("This is an object"),
-    }
+pub fn download_opts(app: &App, additional_opts: &YtDlpOptions) -> anyhow::Result<YoutubeDL> {
+    YoutubeDLOptions::new()
+        .with_progress_hook(wrapped_progress_hook)
+        .set("extract_flat", "in_playlist")
+        .set(
+            "extractor_args",
+            json! {
+                {
+                    "youtube": {
+                        "comment_sort": [ "top" ],
+                        "max_comments": [ "150", "all", "100" ]
+                    }
+                }
+            },
+        )
+        //.set("cookiesfrombrowser", json! {("firefox", "me.google", None::<String>, "youtube_dlp")})
+        .set("prefer_free_formats", true)
+        .set("ffmpeg_location", env!("FFMPEG_LOCATION"))
+        .set("format", "bestvideo[height<=?1080]+bestaudio/best")
+        .set("fragment_retries", 10)
+        .set("getcomments", true)
+        .set("ignoreerrors", false)
+        .set("retries", 10)
+        .set("writeinfojson", true)
+        // NOTE: This results in a constant warning message. <2025-01-04>
+        //.set("writeannotations", true)
+        .set("writesubtitles", true)
+        .set("writeautomaticsub", true)
+        .set(
+            "outtmpl",
+            json! {
+                {
+                    "default": app.config.paths.download_dir.join("%(channel)s/%(title)s.%(ext)s"),
+                    "chapter": "%(title)s - %(section_number)03d %(section_title)s [%(id)s].%(ext)s"
+                }
+            },
+        )
+        .set("compat_opts", json! {{}})
+        .set("forceprint", json! {{}})
+        .set("print_to_file", json! {{}})
+        .set("windowsfilenames", false)
+        .set("restrictfilenames", false)
+        .set("trim_file_names", false)
+        .set(
+            "postprocessors",
+            json! {
+                [
+                    {
+                        "api": "https://sponsor.ajay.app",
+                        "categories": [
+                            "interaction",
+                            "intro",
+                            "music_offtopic",
+                            "sponsor",
+                            "outro",
+                            "poi_highlight",
+                            "preview",
+                            "selfpromo",
+                            "filler",
+                            "chapter"
+                        ],
+                        "key": "SponsorBlock",
+                        "when": "after_filter"
+                    },
+                    {
+                        "force_keyframes": false,
+                        "key": "ModifyChapters",
+                        "remove_chapters_patterns": [],
+                        "remove_ranges": [],
+                        "remove_sponsor_segments": [ "sponsor" ],
+                        "sponsorblock_chapter_title": "[SponsorBlock]: %(category_names)l"
+                    },
+                    {
+                        "add_chapters": true,
+                        "add_infojson": null,
+                        "add_metadata": false,
+                        "key": "FFmpegMetadata"
+                    },
+                    {
+                        "key": "FFmpegConcat",
+                        "only_multi_video": true,
+                        "when": "playlist"
+                    }
+                ]
+            },
+        )
+        .set(
+            "subtitleslangs",
+            Value::Array(
+                additional_opts
+                    .subtitle_langs
+                    .split(',')
+                    .map(|val| Value::String(val.to_owned()))
+                    .collect::<Vec<_>>(),
+            ),
+        )
+        .build()
+        .context("Failed to instanciate download yt_dlp")
 }
diff --git a/yt/src/download/mod.rs b/yt/src/download/mod.rs
index 871e869..110bf55 100644
--- a/yt/src/download/mod.rs
+++ b/yt/src/download/mod.rs
@@ -29,9 +29,11 @@ use bytes::Bytes;
 use futures::{FutureExt, future::BoxFuture};
 use log::{debug, error, info, warn};
 use tokio::{fs, task::JoinHandle, time};
+use yt_dlp::{json_cast, json_get};
 
 #[allow(clippy::module_name_repetitions)]
 pub mod download_options;
+pub mod progress_hook;
 
 #[derive(Debug)]
 #[allow(clippy::module_name_repetitions)]
@@ -299,24 +301,25 @@ impl Downloader {
             let add_opts = YtDlpOptions {
                 subtitle_langs: String::new(),
             };
-            let opts = &download_opts(app, &add_opts);
+            let yt_dlp = download_opts(app, &add_opts)?;
 
-            let result =
-                yt_dlp::extract_info(opts, &video.url, false, true).with_context(|| {
+            let result = yt_dlp
+                .extract_info(&video.url, false, true)
+                .with_context(|| {
                     format!("Failed to extract video information: '{}'", video.title)
                 })?;
 
-            let size = if let Some(val) = result.filesize {
-                val
-            } else if let Some(val) = result.filesize_approx {
-                val
-            } else if result.duration.is_some() && result.tbr.is_some() {
+            let size = if let Some(val) = result.get("filesize") {
+                json_cast!(val, as_u64)
+            } else if let Some(val) = result.get("filesize_approx") {
+                json_cast!(val, as_u64)
+            } else if result.get("duration").is_some() && result.get("tbr").is_some() {
                 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
-                let duration = result.duration.expect("Is some").ceil() as u64;
+                let duration = json_get!(result, "duration", as_f64).ceil() as u64;
 
                 // TODO: yt_dlp gets this from the format
                 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
-                let tbr = result.tbr.expect("Is Some").ceil() as u64;
+                let tbr = json_get!(result, "tbr", as_f64).ceil() as u64;
 
                 duration * tbr * (1000 / 8)
             } else {
@@ -341,8 +344,10 @@ impl Downloader {
         debug!("Download started: {}", &video.title);
 
         let addional_opts = get_video_yt_dlp_opts(app, &video.extractor_hash).await?;
+        let yt_dlp = download_opts(app, &addional_opts)?;
 
-        let result = yt_dlp::download(&[video.url.clone()], &download_opts(app, &addional_opts))
+        let result = yt_dlp
+            .download(&[video.url.to_owned()])
             .with_context(|| format!("Failed to download video: '{}'", video.title))?;
 
         assert_eq!(result.len(), 1);
diff --git a/yt/src/download/progress_hook.rs b/yt/src/download/progress_hook.rs
new file mode 100644
index 0000000..65156e7
--- /dev/null
+++ b/yt/src/download/progress_hook.rs
@@ -0,0 +1,190 @@
+use std::{
+    io::{Write, stderr},
+    process,
+};
+
+use bytes::Bytes;
+use log::{Level, log_enabled};
+use yt_dlp::mk_python_function;
+
+use crate::select::selection_file::duration::MaybeDuration;
+
+// #[allow(clippy::too_many_lines)]
+// #[allow(clippy::missing_panics_doc)]
+// #[allow(clippy::items_after_statements)]
+// #[allow(
+//     clippy::cast_possible_truncation,
+//     clippy::cast_sign_loss,
+//     clippy::cast_precision_loss
+// )]
+pub fn progress_hook(
+    input: serde_json::Map<String, serde_json::Value>,
+) -> Result<(), std::io::Error> {
+    // Only add the handler, if the log-level is higher than Debug (this avoids covering debug
+    // messages).
+    if log_enabled!(Level::Debug) {
+        return Ok(());
+    }
+
+    // ANSI ESCAPE CODES Wrappers {{{
+    // see: https://en.wikipedia.org/wiki/ANSI_escape_code#Control_Sequence_Introducer_commands
+    const CSI: &str = "\x1b[";
+    fn clear_whole_line() {
+        eprint!("{CSI}2K");
+    }
+    fn move_to_col(x: usize) {
+        eprint!("{CSI}{x}G");
+    }
+    // }}}
+
+    macro_rules! get {
+        (@interrogate $item:ident, $type_fun:ident, $get_fun:ident, $name:expr) => {{
+            let a = $item.get($name).expect(concat!(
+                "The field '",
+                stringify!($name),
+                "' should exist."
+            ));
+
+            if a.$type_fun() {
+                a.$get_fun().expect(
+                    "The should have been checked in the if guard, so unpacking here is fine",
+                )
+            } else {
+                panic!(
+                    "Value {} => \n{}\n is not of type: {}",
+                    $name,
+                    a,
+                    stringify!($type_fun)
+                );
+            }
+        }};
+
+        ($type_fun:ident, $get_fun:ident, $name1:expr, $name2:expr) => {{
+            let a = get! {@interrogate input, is_object, as_object, $name1};
+            let b = get! {@interrogate a, $type_fun, $get_fun, $name2};
+            b
+        }};
+
+        ($type_fun:ident, $get_fun:ident, $name:expr) => {{
+            get! {@interrogate input, $type_fun, $get_fun, $name}
+        }};
+    }
+
+    macro_rules! default_get {
+        (@interrogate $item:ident, $default:expr, $get_fun:ident, $name:expr) => {{
+            let a = if let Some(field) = $item.get($name) {
+                field.$get_fun().unwrap_or($default)
+            } else {
+                $default
+            };
+            a
+        }};
+
+        ($get_fun:ident, $default:expr, $name1:expr, $name2:expr) => {{
+            let a = get! {@interrogate input, is_object, as_object, $name1};
+            let b = default_get! {@interrogate a, $default, $get_fun, $name2};
+            b
+        }};
+
+        ($get_fun:ident, $default:expr, $name:expr) => {{
+            default_get! {@interrogate input, $default, $get_fun, $name}
+        }};
+    }
+
+    macro_rules! c {
+        ($color:expr, $format:expr) => {
+            format!("\x1b[{}m{}\x1b[0m", $color, $format)
+        };
+    }
+
+    fn format_bytes(bytes: u64) -> String {
+        let bytes = Bytes::new(bytes);
+        bytes.to_string()
+    }
+
+    fn format_speed(speed: f64) -> String {
+        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
+        let bytes = Bytes::new(speed.floor() as u64);
+        format!("{bytes}/s")
+    }
+
+    let get_title = || -> String {
+        match get! {is_string, as_str, "info_dict", "ext"} {
+            "vtt" => {
+                format!(
+                    "Subtitles ({})",
+                    default_get! {as_str, "<No Subtitle Language>", "info_dict", "name"}
+                )
+            }
+            "webm" | "mp4" | "mp3" | "m4a" => {
+                default_get! { as_str, "<No title>", "info_dict", "title"}.to_owned()
+            }
+            other => panic!("The extension '{other}' is not yet implemented"),
+        }
+    };
+
+    match get! {is_string, as_str, "status"} {
+        "downloading" => {
+            let elapsed = default_get! {as_f64, 0.0f64, "elapsed"};
+            let eta = default_get! {as_f64, 0.0, "eta"};
+            let speed = default_get! {as_f64, 0.0, "speed"};
+
+            let downloaded_bytes = get! {is_u64, as_u64, "downloaded_bytes"};
+            let (total_bytes, bytes_is_estimate): (u64, &'static str) = {
+                let total_bytes = default_get!(as_u64, 0, "total_bytes");
+                if total_bytes == 0 {
+                    let maybe_estimate = default_get!(as_u64, 0, "total_bytes_estimate");
+
+                    if maybe_estimate == 0 {
+                        // The download speed should be in bytes per second and the eta in seconds.
+                        // Thus multiplying them gets us the raw bytes (which were estimated by `yt_dlp`, from their `info.json`)
+                        let bytes_still_needed = (speed * eta).ceil() as u64;
+
+                        (downloaded_bytes + bytes_still_needed, "~")
+                    } else {
+                        (maybe_estimate, "~")
+                    }
+                } else {
+                    (total_bytes, "")
+                }
+            };
+            let percent: f64 = {
+                if total_bytes == 0 {
+                    100.0
+                } else {
+                    (downloaded_bytes as f64 / total_bytes as f64) * 100.0
+                }
+            };
+
+            clear_whole_line();
+            move_to_col(1);
+
+            eprint!(
+                "'{}' [{}/{} at {}] -> [{} of {}{} {}] ",
+                c!("34;1", get_title()),
+                c!("33;1", MaybeDuration::from_secs_f64(elapsed)),
+                c!("33;1", MaybeDuration::from_secs_f64(eta)),
+                c!("32;1", format_speed(speed)),
+                c!("31;1", format_bytes(downloaded_bytes)),
+                c!("31;1", bytes_is_estimate),
+                c!("31;1", format_bytes(total_bytes)),
+                c!("36;1", format!("{:.02}%", percent))
+            );
+            stderr().flush()?;
+        }
+        "finished" => {
+            eprintln!("-> Finished downloading.");
+        }
+        "error" => {
+            // TODO: This should probably return an Err. But I'm not so sure where the error would
+            // bubble up to (i.e., who would catch it) <2025-01-21>
+            eprintln!("-> Error while downloading: {}", get_title());
+            process::exit(1);
+        }
+        other => unreachable!("'{other}' should not be a valid state!"),
+    };
+
+    Ok(())
+}
+
+mk_python_function!(progress_hook, wrapped_progress_hook);
diff --git a/yt/src/main.rs b/yt/src/main.rs
index ffb3e14..413dc5e 100644
--- a/yt/src/main.rs
+++ b/yt/src/main.rs
@@ -13,16 +13,16 @@
 // to print it anyways.
 #![allow(clippy::missing_errors_doc)]
 
-use std::{fs, sync::Arc};
+use std::sync::Arc;
 
 use anyhow::{Context, Result, bail};
 use app::App;
 use bytes::Bytes;
 use cache::{invalidate, maintain};
 use clap::Parser;
-use cli::{CacheCommand, CheckCommand, SelectCommand, SubscriptionCommand, VideosCommand};
+use cli::{CacheCommand, SelectCommand, SubscriptionCommand, VideosCommand};
 use config::Config;
-use log::info;
+use log::{error, info};
 use select::cmds::handle_select_cmd;
 use storage::video_database::get::video_by_hash;
 use tokio::{
@@ -30,7 +30,6 @@ use tokio::{
     io::{BufReader, stdin},
     task::JoinHandle,
 };
-use yt_dlp::wrapper::info_json::InfoJson;
 
 use crate::{cli::Command, storage::subscriptions};
 
@@ -200,7 +199,7 @@ async fn main() -> Result<()> {
                     subscribe::import(&app, BufReader::new(f), force).await?;
                 } else {
                     subscribe::import(&app, BufReader::new(stdin()), force).await?;
-                };
+                }
             }
         },
 
@@ -215,17 +214,6 @@ async fn main() -> Result<()> {
             CacheCommand::Maintain { all } => maintain(&app, all).await?,
         },
 
-        Command::Check { command } => match command {
-            CheckCommand::InfoJson { path } => {
-                let string = fs::read_to_string(&path)
-                    .with_context(|| format!("Failed to read '{}' to string!", path.display()))?;
-
-                drop(
-                    serde_json::from_str::<InfoJson>(&string)
-                        .context("Failed to deserialize value")?,
-                );
-            }
-        },
         Command::Comments {} => {
             comments::comments(&app).await?;
         }
diff --git a/yt/src/select/cmds/add.rs b/yt/src/select/cmds/add.rs
index 8b183f0..387b3a1 100644
--- a/yt/src/select/cmds/add.rs
+++ b/yt/src/select/cmds/add.rs
@@ -14,15 +14,13 @@ use crate::{
     storage::video_database::{
         self, extractor_hash::ExtractorHash, get::get_all_hashes, set::add_video,
     },
-    unreachable::Unreachable,
     update::video_entry_to_video,
 };
 
 use anyhow::{Context, Result, bail};
 use log::{error, warn};
-use serde_json::{Map, Value};
 use url::Url;
-use yt_dlp::wrapper::info_json::InfoType;
+use yt_dlp::{InfoJson, YoutubeDL, json_cast, json_get};
 
 #[allow(clippy::too_many_lines)]
 pub(super) async fn add(
@@ -32,16 +30,11 @@ pub(super) async fn add(
     stop: Option<usize>,
 ) -> Result<()> {
     for url in urls {
-        async fn process_and_add(
-            app: &App,
-            entry: yt_dlp::wrapper::info_json::InfoJson,
-            opts: &Map<String, Value>,
-        ) -> Result<()> {
-            let url = entry
-                .url
-                .unreachable("`yt_dlp` should guarantee that this is Some at this point");
-
-            let entry = yt_dlp::extract_info(opts, &url, false, true)
+        async fn process_and_add(app: &App, entry: InfoJson, yt_dlp: &YoutubeDL) -> Result<()> {
+            let url = json_get!(entry, "url", as_str).parse()?;
+
+            let entry = yt_dlp
+                .extract_info(&url, false, true)
                 .with_context(|| format!("Failed to fetch entry for url: '{url}'"))?;
 
             add_entry(app, entry).await?;
@@ -49,19 +42,13 @@
             Ok(())
         }
 
-        async fn add_entry(app: &App, entry: yt_dlp::wrapper::info_json::InfoJson) -> Result<()> {
+        async fn add_entry(app: &App, entry: InfoJson) -> Result<()> {
             // We have to re-fetch all hashes every time, because a user could try to add the same
             // URL twice (for whatever reason.)
             let hashes = get_all_hashes(app)
                 .await
                 .context("Failed to fetch all video hashes")?;
-            let extractor_hash = blake3::hash(
-                entry
-                    .id
-                    .as_ref()
-                    .expect("This should be some at this point")
-                    .as_bytes(),
-            );
+            let extractor_hash = blake3::hash(json_get!(entry, "id", as_str).as_bytes());
             if hashes.contains(&extractor_hash) {
                 error!(
                     "Video '{}'{} is already in the database. Skipped adding it",
@@ -71,17 +58,17 @@
                         .with_context(|| format!(
                             "Failed to format hash of video '{}' as short hash",
                             entry
-                                .url
-                                .map_or("<Unknown video Url>".to_owned(), |url| url.to_string())
+                                .get("url")
+                                .map_or("<Unknown video Url>".to_owned(), ToString::to_string)
                         ))?,
                     entry
-                        .title
+                        .get("title")
                        .map_or(String::new(), |title| format!(" ('{title}')"))
                );

                return Ok(());
            }

-        let video = video_entry_to_video(entry, None)?;
+        let video = video_entry_to_video(&entry, None)?;
         add_video(app, video.clone()).await?;
 
         println!("{}", &video.to_line_display(app).await?);
@@ -89,18 +76,19 @@
             Ok(())
         }
 
-        let opts = download_opts(
+        let yt_dlp = download_opts(
             app,
             &video_database::YtDlpOptions {
                 subtitle_langs: String::new(),
             },
-        );
+        )?;
 
-        let entry = yt_dlp::extract_info(&opts, &url, false, true)
+        let entry = yt_dlp
+            .extract_info(&url, false, true)
             .with_context(|| format!("Failed to fetch entry for url: '{url}'"))?;
 
-        match entry._type {
-            Some(InfoType::Video) => {
+        match entry.get("_type").map(|val| json_cast!(val, as_str)) {
+            Some("Video") => {
                 add_entry(app, entry).await?;
                 if start.is_some() || stop.is_some() {
                     warn!(
@@ -108,13 +96,14 @@
                     );
                 }
             }
-            Some(InfoType::Playlist) => {
-                if let Some(entries) = entry.entries {
+            Some("Playlist") => {
+                if let Some(entries) = entry.get("entries") {
+                    let entries = json_cast!(entries, as_array);
                     let start = start.unwrap_or(0);
                     let stop = stop.unwrap_or(entries.len() - 1);
-                    let mut respected_entries: Vec<_> = take_vector(entries, start, stop)
-                        .with_context(|| {
+                    let respected_entries =
+                        take_vector(entries, start, stop).with_context(|| {
                             format!(
                                 "Failed to take entries starting at: {start} and ending with {stop}"
                             )
                        })?;
@@ -124,11 +113,23 @@
                         warn!("No entries found, after applying your start/stop limits.");
                     } else {
                         // Pre-warm the cache
-                        process_and_add(app, respected_entries.remove(0), &opts).await?;
+                        process_and_add(
+                            app,
+                            json_cast!(respected_entries[0], as_object).to_owned(),
+                            &yt_dlp,
+                        )
+                        .await?;
+
+                        let respected_entries = &respected_entries[1..];
 
                         let futures: Vec<_> = respected_entries
-                            .into_iter()
-                            .map(|entry| process_and_add(app, entry, &opts))
+                            .iter()
+                            .map(|entry| {
+                                process_and_add(
+                                    app,
+                                    json_cast!(entry, as_object).to_owned(),
+                                    &yt_dlp,
+                                )
+                            })
                             .collect();
 
                         for fut in futures {
@@ -149,7 +150,7 @@
     Ok(())
 }
 
-fn take_vector<T>(vector: Vec<T>, start: usize, stop: usize) -> Result<Vec<T>> {
+fn take_vector<T>(vector: &[T], start: usize, stop: usize) -> Result<&[T]> {
     let length = vector.len();
 
     if stop >= length {
@@ -158,26 +159,7 @@ fn take_vector<T>(vector: Vec<T>, start: usize, stop: usize) -> Result<Vec<T>> {
         );
     }
 
-    let end_skip = {
-        let base = length
-            .checked_sub(stop)
-            .unreachable("The check above should have caught this case.");
-
-        base.checked_sub(1)
-            .unreachable("The check above should have caught this case.")
-    };
-
-    // NOTE: We're using this instead of the `vector[start..=stop]` notation, because I wanted to
-    // avoid the needed allocation to turn the slice into a vector. <2025-01-04>
-
-    // TODO: This function could also just return a slice, but oh well.. <2025-01-04>
-    Ok(vector
-        .into_iter()
-        .skip(start)
-        .rev()
-        .skip(end_skip)
-        .rev()
-        .collect())
+    Ok(&vector[start..=stop])
 }
 
 #[cfg(test)]
@@ -188,7 +170,7 @@ mod test {
     fn test_vector_take() {
         let vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
 
-        let new_vec = take_vector(vec, 2, 8).unwrap();
+        let new_vec = take_vector(&vec, 2, 8).unwrap();
 
         assert_eq!(new_vec, vec![2, 3, 4, 5, 6, 7, 8]);
     }
@@ -197,13 +179,13 @@ mod test {
     fn test_vector_take_overflow() {
         let vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
 
-        assert!(take_vector(vec, 0, 12).is_err());
+        assert!(take_vector(&vec, 0, 12).is_err());
     }
 
     #[test]
     fn test_vector_take_equal() {
         let vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
 
-        assert!(take_vector(vec, 0, 11).is_err());
+        assert!(take_vector(&vec, 0, 11).is_err());
     }
 }
diff --git a/yt/src/storage/subscriptions.rs b/yt/src/storage/subscriptions.rs
index 37b57fc..6c0d08a 100644
--- a/yt/src/storage/subscriptions.rs
+++ b/yt/src/storage/subscriptions.rs
@@ -15,10 +15,9 @@ use std::collections::HashMap;
 
 use anyhow::Result;
 use log::debug;
-use serde_json::{Value, json};
 use sqlx::query;
 use url::Url;
-use yt_dlp::wrapper::info_json::InfoType;
+use yt_dlp::YoutubeDLOptions;
 
 use crate::{app::App, unreachable::Unreachable};
 
@@ -39,21 +38,19 @@ impl Subscription {
 }
 
 /// Check whether an URL could be used as a subscription URL
-pub async fn check_url(url: &Url) -> Result<bool> {
-    let Value::Object(yt_opts) = json!( {
-        "playliststart": 1,
-        "playlistend": 10,
-        "noplaylist": false,
-        "extract_flat": "in_playlist",
-    }) else {
-        unreachable!("This is hardcoded");
-    };
-
-    let info = yt_dlp::extract_info(&yt_opts, url, false, false)?;
+pub async fn check_url(url: Url) -> Result<bool> {
+    let yt_dlp = YoutubeDLOptions::new()
+        .set("playliststart", 1)
+        .set("playlistend", 10)
+        .set("noplaylist", false)
+        .set("extract_flat", "in_playlist")
+        .build()?;
+
+    let info = yt_dlp.extract_info(&url, false, false)?;
 
     debug!("{:#?}", info);
 
-    Ok(info._type == Some(InfoType::Playlist))
+    Ok(info.get("_type") == Some(&serde_json::Value::String("Playlist".to_owned())))
 }
 
 #[derive(Default, Debug)]
diff --git a/yt/src/storage/video_database/get/mod.rs b/yt/src/storage/video_database/get/mod.rs
index a1871e2..0456cd3 100644
--- a/yt/src/storage/video_database/get/mod.rs
+++ b/yt/src/storage/video_database/get/mod.rs
@@ -18,7 +18,7 @@ use anyhow::{Context, Result, bail};
 use blake3::Hash;
 use log::{debug, trace};
 use sqlx::query;
-use yt_dlp::wrapper::info_json::InfoJson;
+use yt_dlp::InfoJson;
 
 use crate::{
     app::App,
diff --git a/yt/src/subscribe/mod.rs b/yt/src/subscribe/mod.rs
index d77e2bc..e6a5f51 100644
--- a/yt/src/subscribe/mod.rs
+++ b/yt/src/subscribe/mod.rs
@@ -14,10 +14,9 @@ use std::str::FromStr;
 
 use anyhow::{Context, Result, bail};
 use futures::FutureExt;
 use log::warn;
-use serde_json::{Value, json};
 use tokio::io::{AsyncBufRead, AsyncBufReadExt};
 use url::Url;
-use yt_dlp::wrapper::info_json::InfoType;
+use yt_dlp::{YoutubeDLOptions, json_get};
 
 use crate::{
     app::App,
@@ -149,19 +148,17 @@ async fn actual_subscribe(app: &App, name: Option<String>, url: Url) -> Result<(
     let name = if let Some(name) = name {
         name
     } else {
-        let Value::Object(yt_opts) = json!( {
-            "playliststart": 1,
-            "playlistend": 10,
-            "noplaylist": false,
-            "extract_flat": "in_playlist",
-        }) else {
-            unreachable!("This is hardcoded")
-        };
-
-        let info = yt_dlp::extract_info(&yt_opts, &url, false, false)?;
-
-        if info._type == Some(InfoType::Playlist) {
-            info.title.expect("This should be some for a playlist")
+        let yt_dlp = YoutubeDLOptions::new()
+            .set("playliststart", 1)
+            .set("playlistend", 10)
+            .set("noplaylist", false)
+            .set("extract_flat", "in_playlist")
+            .build()?;
+
+        let info = yt_dlp.extract_info(&url, false, false)?;
+
+        if info.get("_type") == Some(&serde_json::Value::String("Playlist".to_owned())) {
+            json_get!(info, "title", as_str).to_owned()
         } else {
             bail!("The url ('{}') does not represent a playlist!", &url)
         }
diff --git a/yt/src/update/mod.rs b/yt/src/update/mod.rs
index 7efe0da..f0b1e2c 100644
--- a/yt/src/update/mod.rs
+++ b/yt/src/update/mod.rs
@@ -15,7 +15,7 @@ use anyhow::{Context, Ok, Result};
 use chrono::{DateTime, Utc};
 use log::{info, warn};
 use url::Url;
-use yt_dlp::{unsmuggle_url, wrapper::info_json::InfoJson};
+use yt_dlp::{InfoJson, json_cast, json_get};
 
 use crate::{
     app::App,
@@ -72,19 +72,7 @@ pub async fn update(
 }
 
 #[allow(clippy::too_many_lines)]
-pub fn video_entry_to_video(entry: InfoJson, sub: Option<&Subscription>) -> Result<Video> {
-    macro_rules! unwrap_option {
-        ($option:expr) => {
-            match $option {
-                Some(x) => x,
-                None => anyhow::bail!(concat!(
-                    "Expected a value, but '",
-                    stringify!($option),
-                    "' is None!"
-                )),
-            }
-        };
-    }
+pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Result<Video> {
     fn fmt_context(date: &str, extended: Option<&str>) -> String {
         let f = format!(
             "Failed to parse the `upload_date` of the entry ('{date}'). \
@@ -97,7 +85,9 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Resu
         }
     }
 
-    let publish_date = if let Some(date) = &entry.upload_date {
+    let publish_date = if let Some(date) = &entry.get("upload_date") {
+        let date = json_cast!(date, as_str);
+
         let year: u32 = date
             .chars()
             .take(4)
@@ -113,7 +103,7 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Resu
         .with_context(|| fmt_context(date, None))?;
     let day: u32 = date
         .chars()
-        .skip(6)
+        .skip(4 + 2)
         .take(2)
         .collect::<String>()
         .parse()
@@ -128,42 +118,59 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Resu
     } else {
         warn!(
             "The video '{}' lacks it's upload date!",
-            unwrap_option!(&entry.title)
+            json_get!(entry, "title", as_str)
         );
         None
     };
 
-    let thumbnail_url = match (&entry.thumbnails, &entry.thumbnail) {
+    let thumbnail_url = match (&entry.get("thumbnails"), &entry.get("thumbnail")) {
         (None, None) => None,
-        (None, Some(thumbnail)) => Some(thumbnail.to_owned()),
+        (None, Some(thumbnail)) => Some(Url::from_str(json_cast!(thumbnail, as_str))?),
         // TODO: The algorithm is not exactly the best <2024-05-28>
-        (Some(thumbnails), None) => thumbnails.first().map(|thumbnail| thumbnail.url.clone()),
-        (Some(_), Some(thumnail)) => Some(thumnail.to_owned()),
+        (Some(thumbnails), None) => {
+            if let Some(thumbnail) = json_cast!(thumbnails, as_array).first() {
+                Some(Url::from_str(json_get!(
+                    json_cast!(thumbnail, as_object),
+                    "url",
+                    as_str
+                ))?)
+            } else {
+                None
+            }
+        }
+        (Some(_), Some(thumnail)) => Some(Url::from_str(json_cast!(thumnail, as_str))?),
     };
 
     let url = {
-        let smug_url: Url = unwrap_option!(entry.webpage_url.clone());
-        unsmuggle_url(&smug_url)?
+        let smug_url: Url = json_get!(entry, "webpage_url", as_str).parse()?;
+        // unsmuggle_url(&smug_url)?
+        smug_url
     };
 
-    let extractor_hash = blake3::hash(unwrap_option!(entry.id).as_bytes());
+    let extractor_hash = blake3::hash(json_get!(entry, "id", as_str).as_bytes());
 
     let subscription_name = if let Some(sub) = sub {
         Some(sub.name.clone())
-    } else if let Some(uploader) = entry.uploader {
-        if entry.webpage_url_domain == Some("youtube.com".to_owned()) {
+    } else if let Some(uploader) = entry.get("uploader") {
+        if entry.get("webpage_url_domain")
+            == Some(&serde_json::Value::String("youtube.com".to_owned()))
+        {
             Some(format!("{uploader} - Videos"))
         } else {
-            Some(uploader.clone())
+            Some(json_cast!(uploader, as_str).to_owned())
         }
     } else {
         None
     };
 
     let video = Video {
-        description: entry.description.clone(),
-        duration: MaybeDuration::from_maybe_secs_f64(entry.duration),
+        description: entry
+            .get("description")
+            .map(|val| json_cast!(val, as_str).to_owned()),
+        duration: MaybeDuration::from_maybe_secs_f64(
+            entry.get("duration").map(|val| json_cast!(val, as_f64)),
+        ),
         extractor_hash: ExtractorHash::from_hash(extractor_hash),
         last_status_change: TimeStamp::from_now(),
         parent_subscription_name: subscription_name,
@@ -171,7 +178,7 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Resu
         publish_date: publish_date.map(TimeStamp::from_secs),
         status: VideoStatus::Pick,
         thumbnail_url,
-        title: unwrap_option!(entry.title.clone()),
+        title: json_get!(entry, "title", as_str).to_owned(),
         url,
         watch_progress: Duration::default(),
     };
@@ -180,7 +187,7 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Resu
 
 async fn process_subscription(app: &App, sub: &Subscription, entry: InfoJson) -> Result<()> {
     let video =
-        video_entry_to_video(entry, Some(sub)).context("Failed to parse search entry as Video")?;
+        video_entry_to_video(&entry, Some(sub)).context("Failed to parse search entry as Video")?;
 
     add_video(app, video.clone())
         .await
diff --git a/yt/src/update/updater.rs b/yt/src/update/updater.rs
index fe96da3..900fba7 100644
--- a/yt/src/update/updater.rs
+++ b/yt/src/update/updater.rs
@@ -18,7 +18,7 @@ use futures::{
 };
 use log::{Level, debug, error, log_enabled};
 use serde_json::json;
-use yt_dlp::{error::YtDlpError, process_ie_result, wrapper::info_json::InfoJson};
+use yt_dlp::{InfoJson, YoutubeDLOptions, json_cast, json_get};
 
 use crate::{app::App, storage::subscriptions::Subscription};
 
@@ -71,6 +71,7 @@ impl<'a> Updater<'a> {
         &self,
         sub: &'a Subscription,
     ) -> Result<Vec<(&'a Subscription, InfoJson)>> {
+        // TODO(@bpeetz): Deduplicate with the progress_hook. <2025-06-13>
         // ANSI ESCAPE CODES Wrappers {{{
         // see: https://en.wikipedia.org/wiki/ANSI_escape_code#Control_Sequence_Introducer_commands
         const CSI: &str = "\x1b[";
@@ -88,15 +89,18 @@ impl<'a> Updater<'a> {
         //     }
         // }}}
 
-        let json = json! {
-            {
-                "playliststart": 1,
-                "playlistend": self.max_backlog,
-                "noplaylist": false,
-                "extractor_args": {"youtubetab": {"approximate_date": [""]}},
-            }
-        };
-        let yt_dlp_opts = json.as_object().expect("This is hardcoded");
+        let yt_dlp = YoutubeDLOptions::new()
+            .set("playliststart", 1)
+            .set("playlistend", self.max_backlog)
+            .set("noplaylist", false)
+            .set(
+                "extractor_args",
+                json! {{"youtubetab": {"approximate_date": [""]}}},
+            )
+            // TODO: This also removes unlisted and other stuff. Find a good way to remove the
+            // members-only videos from the feed. <2025-04-17>
+            .set("match-filter", "availability=public")
+            .build()?;
 
         if !log_enabled!(Level::Debug) {
             clear_whole_line();
@@ -106,64 +110,71 @@ impl<'a> Updater<'a> {
             stderr().flush()?;
         }
 
-        let info = yt_dlp::extract_info(yt_dlp_opts, &sub.url, false, false)
+        let info = yt_dlp
+            .extract_info(&sub.url, false, false)
             .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?;
 
-        let entries = info.entries.unwrap_or(vec![]);
+        let empty = vec![];
+        let entries = info
+            .get("entries")
+            .map_or(&empty, |val| json_cast!(val, as_array));
+
         let valid_entries: Vec<(&Subscription, InfoJson)> = entries
-            .into_iter()
+            .iter()
             .take(self.max_backlog)
             .filter_map(|entry| -> Option<(&Subscription, InfoJson)> {
-                let id = entry.id.as_ref().expect("Should exist?");
+                let id = json_get!(entry, "id", as_str);
                 let extractor_hash = blake3::hash(id.as_bytes());
                 if self.hashes.contains(&extractor_hash) {
-                    debug!(
-                        "Skipping entry, as it is already present: '{}'",
-                        extractor_hash
-                    );
+                    debug!("Skipping entry, as it is already present: '{extractor_hash}'",);
                     None
                 } else {
-                    Some((sub, entry))
+                    Some((sub, json_cast!(entry, as_object).to_owned()))
                 }
             })
             .collect();
 
-        let processed_entries = {
-            let base: Result<Vec<(&Subscription, InfoJson)>, YtDlpError> =
-                stream::iter(valid_entries)
-                    .map(|(sub, entry)| async move {
-                        match process_ie_result(yt_dlp_opts, entry, false) {
-                            Ok(output) => Ok((sub, output)),
-                            Err(err) => Err(err),
-                        }
-                    })
-                    .buffer_unordered(100)
-                    .try_collect()
-                    .await;
-            match base {
-                Ok(ok) => ok,
+        let processed_entries: Vec<(&Subscription, InfoJson)> = stream::iter(valid_entries)
+            .map(
+                async |(sub, entry)| match yt_dlp.process_ie_result(entry, false) {
+                    Ok(output) => Ok((sub, output)),
+                    Err(err) => Err(err),
+                },
+            )
+            .buffer_unordered(100)
+            .collect::<Vec<_>>()
+            .await
+            .into_iter()
+            // Don't fail the whole update, if one of the entries fails to fetch.
+            .filter_map(|base| match base {
+                Ok(ok) => Some(ok),
                 Err(err) => {
-                    if let YtDlpError::PythonError { error, kind } = &err {
-                        if kind.as_str() == "<class 'yt_dlp.utils.DownloadError'>"
-                            && error.to_string().as_str().contains(
-                                "Join this channel to get access to members-only content ",
-                            )
-                        {
-                            vec![]
-                        } else {
-                            let error_string = error.to_string();
-                            let error = error_string
-                                .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ")
-                                .expect("This prefix should exists");
-                            error!("{error}");
-                            vec![]
-                        }
-                    } else {
-                        Err(err).context("Failed to process new entries.")?
-                    }
+                    // TODO(@bpeetz): Add this <2025-06-13>
+                    // if let YtDlpError::PythonError { error, kind } = &err {
+                    //     if kind.as_str() == "<class 'yt_dlp.utils.DownloadError'>"
+                    //         && error.to_string().as_str().contains(
+                    //             "Join this channel to get access to members-only content ",
+                    //         )
+                    //     {
+                    //         // Hide this error
+                    //     } else {
+                    //         let error_string = error.to_string();
+                    //         let error = error_string
+                    //             .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ")
+                    //             .expect("This prefix should exists");
+                    //         error!("{error}");
+                    //     }
+                    //     return None;
+                    // }
+
+                    // TODO(@bpeetz): Ideally, we _would_ actually exit on unexpected errors, but
+                    // this is fine for now. <2025-06-13>
+                    // Some(Err(err).context("Failed to process new entries."))
+                    error!("While processing entry: {err}");
+                    None
                 }
-            }
-        };
+            })
+            .collect();
 
         Ok(processed_entries)
     }
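Taken together, the new call flow is: build one `YoutubeDL` instance via the `YoutubeDLOptions` builder, then call `extract_info`/`download`/`process_ie_result` on that instance instead of passing an options map to free functions. A condensed sketch under the signatures visible above (it mirrors `check_url` from yt/src/storage/subscriptions.rs and is not a verbatim excerpt):

    use anyhow::Result;
    use url::Url;
    use yt_dlp::{InfoJson, YoutubeDLOptions};

    // Sketch only: probe whether a URL points at a playlist.
    fn probe_playlist(url: &Url) -> Result<bool> {
        let yt_dlp = YoutubeDLOptions::new()
            .set("playliststart", 1)
            .set("playlistend", 10)
            .set("noplaylist", false)
            .set("extract_flat", "in_playlist")
            .build()?;

        // download = false, process = false: fetch metadata only.
        let info: InfoJson = yt_dlp.extract_info(url, false, false)?;

        Ok(info.get("_type") == Some(&serde_json::Value::String("Playlist".to_owned())))
    }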