diff options
Diffstat (limited to '')
-rw-r--r-- | crates/fmt/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/libmpv2/libmpv2-sys/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/yt/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/yt/src/cli.rs | 33 | ||||
-rw-r--r-- | crates/yt/src/main.rs | 63 | ||||
-rw-r--r-- | crates/yt/src/select/mod.rs | 225 | ||||
-rw-r--r-- | crates/yt/src/select/selection_file/duration.rs | 173 | ||||
-rw-r--r-- | crates/yt/src/update/mod.rs | 55 | ||||
-rw-r--r-- | crates/yt/src/update/updater.rs | 224 | ||||
-rw-r--r-- | crates/yt/src/version/mod.rs | 24 | ||||
-rw-r--r-- | crates/yt/src/watch/playlist_handler/client_messages/mod.rs | 1 | ||||
-rw-r--r-- | crates/yt_dlp/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/yt_dlp/src/lib.rs | 155 |
13 files changed, 643 insertions, 317 deletions
diff --git a/crates/fmt/Cargo.toml b/crates/fmt/Cargo.toml index 7f82a09..f3cf4ad 100644 --- a/crates/fmt/Cargo.toml +++ b/crates/fmt/Cargo.toml @@ -24,7 +24,7 @@ publish = false path = "src/fmt.rs" [dependencies] -unicode-width = "0.2.0" +unicode-width = "0.2.1" [lints] workspace = true diff --git a/crates/libmpv2/libmpv2-sys/Cargo.toml b/crates/libmpv2/libmpv2-sys/Cargo.toml index b0514b8..96141d3 100644 --- a/crates/libmpv2/libmpv2-sys/Cargo.toml +++ b/crates/libmpv2/libmpv2-sys/Cargo.toml @@ -23,4 +23,4 @@ rust-version.workspace = true publish = false [build-dependencies] -bindgen = { version = "0.71.1" } +bindgen = { version = "0.72.0" } diff --git a/crates/yt/Cargo.toml b/crates/yt/Cargo.toml index 17d4016..6803e68 100644 --- a/crates/yt/Cargo.toml +++ b/crates/yt/Cargo.toml @@ -50,6 +50,7 @@ yt_dlp.workspace = true termsize.workspace = true uu_fmt.workspace = true notify = { version = "8.0.0", default-features = false } +tokio-util = { version = "0.7.15", features = ["rt"] } [[bin]] name = "yt" diff --git a/crates/yt/src/cli.rs b/crates/yt/src/cli.rs index de7a5b8..634e422 100644 --- a/crates/yt/src/cli.rs +++ b/crates/yt/src/cli.rs @@ -123,13 +123,35 @@ pub enum Command { /// Update the video database Update { + /// The maximal number of videos to fetch for each subscription. #[arg(short, long)] - /// The number of videos to updating max_backlog: Option<usize>, - #[arg(short, long)] - /// The subscriptions to update (can be given multiple times) + /// How many subs were already checked. + /// + /// Only used in the progress display in combination with `--grouped`. + #[arg(short, long, hide = true)] + current_progress: Option<usize>, + + /// How many subs are to be checked. + /// + /// Only used in the progress display in combination with `--grouped`. + #[arg(short, long, hide = true)] + total_number: Option<usize>, + + /// The subscriptions to update subscriptions: Vec<String>, + + /// Perform the updates in blocks. + /// + /// This works around the memory leaks in the default update invocation. + #[arg( + short, + long, + conflicts_with = "total_number", + conflicts_with = "current_progress" + )] + grouped: bool, }, /// Manipulate subscription @@ -282,6 +304,10 @@ pub enum SelectCommand { #[arg(long, short)] done: bool, + /// Generate a directory, where each file contains only one subscription. + #[arg(long, short, conflicts_with = "use_last_selection")] + split: bool, + /// Use the last selection file (useful if you've spend time on it and want to get it again) #[arg(long, short, conflicts_with = "done")] use_last_selection: bool, @@ -345,6 +371,7 @@ impl Default for SelectCommand { Self::File { done: false, use_last_selection: false, + split: false, } } } diff --git a/crates/yt/src/main.rs b/crates/yt/src/main.rs index 39f52f4..930d269 100644 --- a/crates/yt/src/main.rs +++ b/crates/yt/src/main.rs @@ -13,7 +13,7 @@ // to print it anyways. #![allow(clippy::missing_errors_doc)] -use std::sync::Arc; +use std::{env::current_exe, sync::Arc}; use anyhow::{Context, Result, bail}; use app::App; @@ -115,12 +115,20 @@ async fn main() -> Result<()> { SelectCommand::File { done, use_last_selection, - } => Box::pin(select::select(&app, done, use_last_selection)).await?, + split, + } => { + if split { + assert!(!use_last_selection); + Box::pin(select::select_split(&app, done)).await? + } else { + Box::pin(select::select_file(&app, done, use_last_selection)).await? + } + } _ => Box::pin(handle_select_cmd(&app, cmd, None)).await?, } } Command::Sedowa {} => { - Box::pin(select::select(&app, false, false)).await?; + Box::pin(select::select_file(&app, false, false)).await?; let arc_app = Arc::new(app); dowa(arc_app).await?; @@ -153,6 +161,9 @@ async fn main() -> Result<()> { Command::Update { max_backlog, subscriptions, + grouped, + current_progress, + total_number, } => { let all_subs = subscriptions::get(&app).await?; @@ -167,7 +178,49 @@ async fn main() -> Result<()> { let max_backlog = max_backlog.unwrap_or(app.config.update.max_backlog); - update::update(&app, max_backlog, subscriptions).await?; + if grouped { + const CHUNK_SIZE: usize = 50; + + assert!(current_progress.is_none() && total_number.is_none()); + + let subs = { + if subscriptions.is_empty() { + all_subs.0.into_iter().map(|sub| sub.0).collect() + } else { + subscriptions + } + }; + + let total_number = subs.len(); + let mut current_progress = 0; + for chunk in subs.chunks(CHUNK_SIZE) { + info!( + "$ yt update {}", + chunk + .iter() + .map(|sub_name| format!("{sub_name:#?}")) + .collect::<Vec<_>>() + .join(" ") + ); + + let status = std::process::Command::new( + current_exe().context("Failed to get the current exe to re-execute")?, + ) + .arg("update") + .args(["--current-progress", current_progress.to_string().as_str()]) + .args(["--total-number", total_number.to_string().as_str()]) + .args(chunk) + .status()?; + + if !status.success() { + bail!("grouped yt update: Child process failed."); + } + + current_progress += CHUNK_SIZE; + } + } else { + update::update(&app, max_backlog, subscriptions, total_number, current_progress).await?; + } } Command::Subscriptions { cmd } => match cmd { SubscriptionCommand::Add { name, url } => { @@ -228,7 +281,7 @@ async fn main() -> Result<()> { async fn dowa(arc_app: Arc<App>) -> Result<()> { let max_cache_size = arc_app.config.download.max_cache_size; - info!("Max cache size: '{}'", max_cache_size); + info!("Max cache size: '{max_cache_size}'"); let arc_app_clone = Arc::clone(&arc_app); let download: JoinHandle<()> = tokio::spawn(async move { diff --git a/crates/yt/src/select/mod.rs b/crates/yt/src/select/mod.rs index 8db9ae3..668ab02 100644 --- a/crates/yt/src/select/mod.rs +++ b/crates/yt/src/select/mod.rs @@ -10,9 +10,12 @@ // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. use std::{ + collections::HashMap, env::{self}, - fs, + fs::{self, File}, io::{BufRead, BufReader, BufWriter, Write}, + iter, + path::Path, string::String, }; @@ -28,6 +31,7 @@ use anyhow::{Context, Result, bail}; use clap::Parser; use cmds::handle_select_cmd; use futures::{TryStreamExt, stream::FuturesOrdered}; +use log::info; use selection_file::process_line; use tempfile::Builder; use tokio::process::Command; @@ -35,11 +39,92 @@ use tokio::process::Command; pub mod cmds; pub mod selection_file; -async fn to_select_file_display_owned(video: Video, app: &App) -> Result<String> { - video.to_select_file_display(app).await +pub async fn select_split(app: &App, done: bool) -> Result<()> { + let temp_dir = Builder::new() + .prefix("yt_video_select-") + .rand_bytes(6) + .tempdir() + .context("Failed to get tempdir")?; + + let matching_videos = get_videos(app, done).await?; + + let mut no_author = vec![]; + let mut author_map = HashMap::new(); + for video in matching_videos { + if let Some(sub) = &video.parent_subscription_name { + if author_map.contains_key(sub) { + let vec: &mut Vec<_> = author_map + .get_mut(sub) + .unreachable("This key is set, we checked in the if above"); + + vec.push(video); + } else { + author_map.insert(sub.to_owned(), vec![video]); + } + } else { + no_author.push(video); + } + } + + let author_map = { + let mut temp_vec: Vec<_> = author_map.into_iter().collect(); + + // PERFORMANCE: The clone here should not be neeed. <2025-06-15> + temp_vec.sort_by_key(|(name, _)| name.to_owned()); + + temp_vec + }; + + for (index, (name, videos)) in author_map + .into_iter() + .chain(iter::once(( + "<No parent subscription>".to_owned(), + no_author, + ))) + .enumerate() + { + let mut file_path = temp_dir.path().join(format!("{index:02}_{name}")); + file_path.set_extension("yts"); + + let tmp_file = File::create(&file_path) + .with_context(|| format!("Falied to create file at: {}", file_path.display()))?; + + write_videos_to_file(app, &tmp_file, &videos) + .await + .with_context(|| format!("Falied to populate file at: {}", file_path.display()))?; + } + + open_editor_at(temp_dir.path()).await?; + + let mut paths = vec![]; + for maybe_entry in temp_dir + .path() + .read_dir() + .context("Failed to open temp dir for reading")? + { + let entry = maybe_entry.context("Failed to read entry in temp dir")?; + + if !entry.file_type()?.is_file() { + bail!("Found non-file entry: {}", entry.path().display()); + } + + paths.push(entry.path()); + } + + paths.sort(); + + let mut processed = 0; + for path in paths { + let read_file = File::open(path)?; + processed = process_file(app, &read_file, processed).await?; + } + + info!("Processed {processed} records."); + temp_dir.close().context("Failed to close the temp dir")?; + Ok(()) } -pub async fn select(app: &App, done: bool, use_last_selection: bool) -> Result<()> { +pub async fn select_file(app: &App, done: bool, use_last_selection: bool) -> Result<()> { let temp_file = Builder::new() .prefix("yt_video_select-") .suffix(".yts") @@ -50,66 +135,75 @@ pub async fn select(app: &App, done: bool, use_last_selection: bool) -> Result<( if use_last_selection { fs::copy(&app.config.paths.last_selection_path, &temp_file)?; } else { - let matching_videos = if done { - get::videos(app, VideoStatusMarker::ALL).await? - } else { - get::videos( - app, - &[ - VideoStatusMarker::Pick, - // - VideoStatusMarker::Watch, - VideoStatusMarker::Cached, - ], - ) - .await? - }; - - // Warmup the cache for the display rendering of the videos. - // Otherwise the futures would all try to warm it up at the same time. - if let Some(vid) = matching_videos.first() { - drop(vid.to_line_display(app).await?); - } + let matching_videos = get_videos(app, done).await?; - let mut edit_file = BufWriter::new(&temp_file); - - matching_videos - .into_iter() - .map(|vid| to_select_file_display_owned(vid, app)) - .collect::<FuturesOrdered<_>>() - .try_collect::<Vec<String>>() - .await? - .into_iter() - .try_for_each(|line| -> Result<()> { - edit_file - .write_all(line.as_bytes()) - .context("Failed to write to `edit_file`")?; - - Ok(()) - })?; - - edit_file.write_all(HELP_STR.as_bytes())?; - edit_file.flush().context("Failed to flush edit file")?; - }; - - { - let editor = env::var("EDITOR").unwrap_or("nvim".to_owned()); - - let mut nvim = Command::new(editor); - nvim.arg(temp_file.path()); - let status = nvim.status().await.context("Falied to run nvim")?; - if !status.success() { - bail!("nvim exited with error status: {}", status) - } + write_videos_to_file(app, temp_file.as_file(), &matching_videos).await?; } + open_editor_at(temp_file.path()).await?; + let read_file = temp_file.reopen()?; fs::copy(temp_file.path(), &app.config.paths.last_selection_path) .context("Failed to persist selection file")?; - let reader = BufReader::new(&read_file); + let processed = process_file(app, &read_file, 0).await?; + info!("Processed {processed} records."); + + Ok(()) +} + +async fn get_videos(app: &App, include_done: bool) -> Result<Vec<Video>> { + if include_done { + get::videos(app, VideoStatusMarker::ALL).await + } else { + get::videos( + app, + &[ + VideoStatusMarker::Pick, + // + VideoStatusMarker::Watch, + VideoStatusMarker::Cached, + ], + ) + .await + } +} + +async fn write_videos_to_file(app: &App, file: &File, videos: &[Video]) -> Result<()> { + // Warm-up the cache for the display rendering of the videos. + // Otherwise the futures would all try to warm it up at the same time. + if let Some(vid) = videos.first() { + drop(vid.to_line_display(app).await?); + } + + let mut edit_file = BufWriter::new(file); + + videos + .iter() + .map(|vid| vid.to_select_file_display(app)) + .collect::<FuturesOrdered<_>>() + .try_collect::<Vec<String>>() + .await? + .into_iter() + .try_for_each(|line| -> Result<()> { + edit_file + .write_all(line.as_bytes()) + .context("Failed to write to `edit_file`")?; + + Ok(()) + })?; + + edit_file.write_all(HELP_STR.as_bytes())?; + edit_file.flush().context("Failed to flush edit file")?; + + Ok(()) +} + +async fn process_file(app: &App, file: &File, processed: i64) -> Result<i64> { + let reader = BufReader::new(file); + + let mut line_number = -processed; - let mut line_number = 0; for line in reader.lines() { let line = line.context("Failed to read a line")?; @@ -149,7 +243,24 @@ pub async fn select(app: &App, done: bool, use_last_selection: bool) -> Result<( } } - Ok(()) + Ok(line_number * -1) +} + +async fn open_editor_at(path: &Path) -> Result<()> { + let editor = env::var("EDITOR").unwrap_or("nvim".to_owned()); + + let mut nvim = Command::new(&editor); + nvim.arg(path); + let status = nvim + .status() + .await + .with_context(|| format!("Falied to run editor: {editor}"))?; + + if status.success() { + Ok(()) + } else { + bail!("Editor ({editor}) exited with error status: {}", status) + } } // // FIXME: There should be no reason why we need to re-run yt, just to get the help string. But I've diff --git a/crates/yt/src/select/selection_file/duration.rs b/crates/yt/src/select/selection_file/duration.rs index 77c4fc5..668a0b8 100644 --- a/crates/yt/src/select/selection_file/duration.rs +++ b/crates/yt/src/select/selection_file/duration.rs @@ -12,7 +12,7 @@ use std::str::FromStr; use std::time::Duration; -use anyhow::{Context, Result}; +use anyhow::{Result, bail}; const SECOND: u64 = 1; const MINUTE: u64 = 60 * SECOND; @@ -73,52 +73,109 @@ impl FromStr for MaybeDuration { type Err = anyhow::Error; fn from_str(s: &str) -> Result<Self, Self::Err> { - fn parse_num(str: &str, suffix: char) -> Result<u64> { - str.strip_suffix(suffix) - .with_context(|| format!("Failed to strip suffix '{suffix}' of number: '{str}'"))? - .parse::<u64>() - .with_context(|| format!("Failed to parse '{suffix}'")) + #[derive(Debug, Clone, Copy)] + enum Token { + Number(u64), + UnitConstant((char, u64)), + } + + struct Tokenizer<'a> { + input: &'a str, + } + + impl Tokenizer<'_> { + fn next(&mut self) -> Result<Option<Token>> { + loop { + if let Some(next) = self.peek() { + match next { + '0'..='9' => { + let mut number = self.expect_num(); + while matches!(self.peek(), Some('0'..='9')) { + number *= 10; + number += self.expect_num(); + } + break Ok(Some(Token::Number(number))); + } + 's' => { + self.chomp(); + break Ok(Some(Token::UnitConstant(('s', SECOND)))); + } + 'm' => { + self.chomp(); + break Ok(Some(Token::UnitConstant(('m', MINUTE)))); + } + 'h' => { + self.chomp(); + break Ok(Some(Token::UnitConstant(('h', HOUR)))); + } + 'd' => { + self.chomp(); + break Ok(Some(Token::UnitConstant(('d', DAY)))); + } + ' ' => { + // Simply ignore white space + self.chomp(); + } + other => bail!("Unknown unit: {other:#?}"), + } + } else { + break Ok(None); + } + } + } + + fn chomp(&mut self) { + self.input = &self.input[1..]; + } + + fn peek(&self) -> Option<char> { + self.input.chars().next() + } + + fn expect_num(&mut self) -> u64 { + let next = self.peek().expect("Should be some at this point"); + self.chomp(); + assert!(next.is_ascii_digit()); + (next as u64) - ('0' as u64) + } } if s == "[No duration]" { return Ok(Self { time: None }); } - let buf: Vec<_> = s.split(' ').collect(); - - let days; - let hours; - let minutes; - let seconds; - - assert_eq!(buf.len(), 2, "Other lengths should not happen"); - - if buf[0].ends_with('d') { - days = parse_num(buf[0], 'd')?; - hours = parse_num(buf[1], 'h')?; - minutes = parse_num(buf[2], 'm')?; - seconds = 0; - } else if buf[0].ends_with('h') { - days = 0; - hours = parse_num(buf[0], 'h')?; - minutes = parse_num(buf[1], 'm')?; - seconds = 0; - } else if buf[0].ends_with('m') { - days = 0; - hours = 0; - minutes = parse_num(buf[0], 'm')?; - seconds = parse_num(buf[1], 's')?; - } else { - unreachable!( - "The first part always ends with 'h' or 'm', but was: {:#?}", - buf - ) + let mut tokenizer = Tokenizer { input: s }; + + let mut value = 0; + let mut current_val = None; + while let Some(token) = tokenizer.next()? { + match token { + Token::Number(number) => { + if let Some(current_val) = current_val { + bail!("Failed to find unit for number: {current_val}"); + } + + { + current_val = Some(number); + } + } + Token::UnitConstant((name, unit)) => { + if let Some(cval) = current_val { + value += cval * unit; + current_val = None; + } else { + bail!("Found unit without number: {name:#?}"); + } + } + } + } + + if let Some(current_val) = current_val { + bail!("Duration endet without unit, number was: {current_val}"); } Ok(Self { - time: Some(Duration::from_secs( - days * DAY + hours * HOUR + minutes * MINUTE + seconds * SECOND, - )), + time: Some(Duration::from_secs(value)), }) } } @@ -156,30 +213,34 @@ mod test { use super::MaybeDuration; - #[test] - fn test_display_duration_1h() { - let dur = MaybeDuration::from_secs(HOUR); - assert_eq!("1h 0m".to_owned(), dur.to_string()); + fn mk_roundtrip(input: MaybeDuration, expected: &str) { + let output = MaybeDuration::from_str(expected).unwrap(); + + assert_eq!(input.to_string(), output.to_string()); + assert_eq!(input.to_string(), expected); + assert_eq!( + MaybeDuration::from_str(input.to_string().as_str()).unwrap(), + output + ); } + #[test] - fn test_display_duration_30min() { - let dur = MaybeDuration::from_secs(MINUTE * 30); - assert_eq!("30m 0s".to_owned(), dur.to_string()); + fn test_roundtrip_duration_1h() { + mk_roundtrip(MaybeDuration::from_secs(HOUR), "1h 0m"); } #[test] - fn test_display_duration_1d() { - let dur = MaybeDuration::from_secs(DAY + MINUTE * 30 + HOUR * 2); - assert_eq!("1d 2h 30m".to_owned(), dur.to_string()); + fn test_roundtrip_duration_30min() { + mk_roundtrip(MaybeDuration::from_secs(MINUTE * 30), "30m 0s"); } - #[test] - fn test_display_duration_roundtrip() { - let dur = MaybeDuration::zero(); - let dur_str = dur.to_string(); - - assert_eq!( - MaybeDuration::zero(), - MaybeDuration::from_str(&dur_str).unwrap() + fn test_roundtrip_duration_1d() { + mk_roundtrip( + MaybeDuration::from_secs(DAY + MINUTE * 30 + HOUR * 2), + "1d 2h 30m", ); } + #[test] + fn test_roundtrip_duration_none() { + mk_roundtrip(MaybeDuration::from_maybe_secs_f64(None), "[No duration]"); + } } diff --git a/crates/yt/src/update/mod.rs b/crates/yt/src/update/mod.rs index f0b1e2c..d866882 100644 --- a/crates/yt/src/update/mod.rs +++ b/crates/yt/src/update/mod.rs @@ -13,7 +13,7 @@ use std::{str::FromStr, time::Duration}; use anyhow::{Context, Ok, Result}; use chrono::{DateTime, Utc}; -use log::{info, warn}; +use log::warn; use url::Url; use yt_dlp::{InfoJson, json_cast, json_get}; @@ -36,26 +36,18 @@ pub async fn update( app: &App, max_backlog: usize, subscription_names_to_update: Vec<String>, + total_number: Option<usize>, + current_progress: Option<usize>, ) -> Result<()> { let subscriptions = subscriptions::get(app).await?; - let urls: Vec<_> = if subscription_names_to_update.is_empty() { - subscriptions.0.values().collect() + let subs: Vec<Subscription> = if subscription_names_to_update.is_empty() { + subscriptions.0.into_values().collect() } else { subscriptions .0 - .values() - .filter(|sub| { - if subscription_names_to_update.contains(&sub.name) { - true - } else { - info!( - "Not updating subscription '{}' as it was not specified", - sub.name - ); - false - } - }) + .into_values() + .filter(|sub| subscription_names_to_update.contains(&sub.name)) .collect() }; @@ -63,10 +55,10 @@ pub async fn update( // should not contain duplicates. let hashes = get_all_hashes(app).await?; - { - let mut updater = Updater::new(max_backlog, &hashes); - updater.update(app, &urls).await?; - } + let updater = Updater::new(max_backlog, hashes); + updater + .update(app, subs, total_number, current_progress) + .await?; Ok(()) } @@ -144,7 +136,14 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Res let url = { let smug_url: Url = json_get!(entry, "webpage_url", as_str).parse()?; - // unsmuggle_url(&smug_url)? + // TODO(@bpeetz): We should probably add this? <2025-06-14> + // if '#__youtubedl_smuggle' not in smug_url: + // return smug_url, default + // url, _, sdata = smug_url.rpartition('#') + // jsond = urllib.parse.parse_qs(sdata)['__youtubedl_smuggle'][0] + // data = json.loads(jsond) + // return url, data + smug_url }; @@ -152,13 +151,15 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Res let subscription_name = if let Some(sub) = sub { Some(sub.name.clone()) - } else if let Some(uploader) = entry.get("uploader") { - if entry.get("webpage_url_domain") - == Some(&serde_json::Value::String("youtube.com".to_owned())) + } else if let Some(uploader) = entry.get("uploader").map(|val| json_cast!(val, as_str)) { + if entry + .get("webpage_url_domain") + .map(|val| json_cast!(val, as_str)) + == Some("youtube.com") { Some(format!("{uploader} - Videos")) } else { - Some(json_cast!(uploader, as_str).to_owned()) + Some(uploader.to_owned()) } } else { None @@ -185,9 +186,9 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Res Ok(video) } -async fn process_subscription(app: &App, sub: &Subscription, entry: InfoJson) -> Result<()> { - let video = - video_entry_to_video(&entry, Some(sub)).context("Failed to parse search entry as Video")?; +async fn process_subscription(app: &App, sub: Subscription, entry: InfoJson) -> Result<()> { + let video = video_entry_to_video(&entry, Some(&sub)) + .context("Failed to parse search entry as Video")?; add_video(app, video.clone()) .await diff --git a/crates/yt/src/update/updater.rs b/crates/yt/src/update/updater.rs index 8da654b..04bcaa1 100644 --- a/crates/yt/src/update/updater.rs +++ b/crates/yt/src/update/updater.rs @@ -8,17 +8,18 @@ // You should have received a copy of the License along with this program. // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. -use std::io::{Write, stderr}; +use std::{ + io::{Write, stderr}, + sync::atomic::{AtomicUsize, Ordering}, +}; use anyhow::{Context, Result}; use blake3::Hash; -use futures::{ - StreamExt, TryStreamExt, - stream::{self}, -}; +use futures::{StreamExt, future::join_all, stream}; use log::{Level, debug, error, log_enabled}; use serde_json::json; -use yt_dlp::{InfoJson, YoutubeDLOptions, json_cast, json_get}; +use tokio_util::task::LocalPoolHandle; +use yt_dlp::{InfoJson, YoutubeDLOptions, json_cast, json_get, process_ie_result}; use crate::{ ansi_escape_codes::{clear_whole_line, move_to_col}, @@ -28,44 +29,55 @@ use crate::{ use super::process_subscription; -pub(super) struct Updater<'a> { +pub(super) struct Updater { max_backlog: usize, - hashes: &'a [Hash], + hashes: Vec<Hash>, + pool: LocalPoolHandle, } -impl<'a> Updater<'a> { - pub(super) fn new(max_backlog: usize, hashes: &'a [Hash]) -> Self { +static REACHED_NUMBER: AtomicUsize = const { AtomicUsize::new(1) }; + +impl Updater { + pub(super) fn new(max_backlog: usize, hashes: Vec<Hash>) -> Self { + // TODO(@bpeetz): The number should not be hardcoded. <2025-06-14> + let pool = LocalPoolHandle::new(16); + Self { max_backlog, hashes, + pool, } } pub(super) async fn update( - &mut self, + self, app: &App, - subscriptions: &[&Subscription], + subscriptions: Vec<Subscription>, + total_number: Option<usize>, + current_progress: Option<usize>, ) -> Result<()> { + let total_number = total_number.unwrap_or(subscriptions.len()); + + if let Some(current_progress) = current_progress { + REACHED_NUMBER.store(current_progress, Ordering::Relaxed); + } + let mut stream = stream::iter(subscriptions) - .map(|sub| self.get_new_entries(sub)) - .buffer_unordered(100); + .map(|sub| self.get_new_entries(sub, total_number)) + .buffer_unordered(16 * 4); while let Some(output) = stream.next().await { let mut entries = output?; - if entries.is_empty() { - continue; - } - - let (sub, entry) = entries.remove(0); - process_subscription(app, sub, entry).await?; + if let Some(next) = entries.next() { + let (sub, entry) = next; + process_subscription(app, sub, entry).await?; - let entry_stream: Result<()> = stream::iter(entries) - .map(|(sub, entry)| process_subscription(app, sub, entry)) - .buffer_unordered(100) - .try_collect() - .await; - entry_stream?; + join_all(entries.map(|(sub, entry)| process_subscription(app, sub, entry))) + .await + .into_iter() + .collect::<Result<(), _>>()?; + } } Ok(()) @@ -73,11 +85,15 @@ impl<'a> Updater<'a> { async fn get_new_entries( &self, - sub: &'a Subscription, - ) -> Result<Vec<(&'a Subscription, InfoJson)>> { + sub: Subscription, + total_number: usize, + ) -> Result<impl Iterator<Item = (Subscription, InfoJson)>> { + let max_backlog = self.max_backlog; + let hashes = self.hashes.clone(); + let yt_dlp = YoutubeDLOptions::new() .set("playliststart", 1) - .set("playlistend", self.max_backlog) + .set("playlistend", max_backlog) .set("noplaylist", false) .set( "extractor_args", @@ -88,80 +104,84 @@ impl<'a> Updater<'a> { .set("match-filter", "availability=public") .build()?; - if !log_enabled!(Level::Debug) { - clear_whole_line(); - move_to_col(1); - eprint!("Checking playlist {}...", sub.name); - move_to_col(1); - stderr().flush()?; - } - - let info = yt_dlp - .extract_info(&sub.url, false, false) - .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?; - - let empty = vec![]; - let entries = info - .get("entries") - .map_or(&empty, |val| json_cast!(val, as_array)); - - let valid_entries: Vec<(&Subscription, InfoJson)> = entries - .iter() - .take(self.max_backlog) - .filter_map(|entry| -> Option<(&Subscription, InfoJson)> { - let id = json_get!(entry, "id", as_str); - let extractor_hash = blake3::hash(id.as_bytes()); - if self.hashes.contains(&extractor_hash) { - debug!("Skipping entry, as it is already present: '{extractor_hash}'",); - None - } else { - Some((sub, json_cast!(entry, as_object).to_owned())) + self.pool + .spawn_pinned(move || { + async move { + if !log_enabled!(Level::Debug) { + clear_whole_line(); + move_to_col(1); + eprint!( + "({}/{total_number}) Checking playlist {}...", + REACHED_NUMBER.fetch_add(1, Ordering::Relaxed), + sub.name + ); + move_to_col(1); + stderr().flush()?; + } + + let info = yt_dlp + .extract_info(&sub.url, false, false) + .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?; + + let empty = vec![]; + let entries = info + .get("entries") + .map_or(&empty, |val| json_cast!(val, as_array)); + + let valid_entries: Vec<(Subscription, InfoJson)> = entries + .iter() + .take(max_backlog) + .filter_map(|entry| -> Option<(Subscription, InfoJson)> { + let id = json_get!(entry, "id", as_str); + let extractor_hash = blake3::hash(id.as_bytes()); + + if hashes.contains(&extractor_hash) { + debug!( + "Skipping entry, as it is already present: '{extractor_hash}'", + ); + None + } else { + Some((sub.clone(), json_cast!(entry, as_object).to_owned())) + } + }) + .collect(); + + Ok(valid_entries + .into_iter() + .map(|(sub, entry)| { + let inner_yt_dlp = YoutubeDLOptions::new() + .set("noplaylist", true) + .build() + .expect("Worked before, should work now"); + + match inner_yt_dlp.process_ie_result(entry, false) { + Ok(output) => Ok((sub, output)), + Err(err) => Err(err), + } + }) + // Don't fail the whole update, if one of the entries fails to fetch. + .filter_map(|base| match base { + Ok(ok) => Some(ok), + Err(err) => { + let process_ie_result::Error::Python(err) = &err; + + if err.contains( + "Join this channel to get access to members-only content ", + ) { + // Hide this error + } else { + // Show the error, but don't fail. + let error = err + .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ") + .unwrap_or(err); + error!("{error}"); + } + + None + } + })) } }) - .collect(); - - let processed_entries: Vec<(&Subscription, InfoJson)> = stream::iter(valid_entries) - .map( - async |(sub, entry)| match yt_dlp.process_ie_result(entry, false) { - Ok(output) => Ok((sub, output)), - Err(err) => Err(err), - }, - ) - .buffer_unordered(100) - .collect::<Vec<_>>() - .await - .into_iter() - // Don't fail the whole update, if one of the entries fails to fetch. - .filter_map(|base| match base { - Ok(ok) => Some(ok), - Err(err) => { - // TODO(@bpeetz): Add this <2025-06-13> - // if let YtDlpError::PythonError { error, kind } = &err { - // if kind.as_str() == "<class 'yt_dlp.utils.DownloadError'>" - // && error.to_string().as_str().contains( - // "Join this channel to get access to members-only content ", - // ) - // { - // // Hide this error - // } else { - // let error_string = error.to_string(); - // let error = error_string - // .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ") - // .expect("This prefix should exists"); - // error!("{error}"); - // } - // return None; - // } - - // TODO(@bpeetz): Ideally, we _would_ actually exit on unexpected errors, but - // this is fine for now. <2025-06-13> - // Some(Err(err).context("Failed to process new entries.")) - error!("While processing entry: {err}"); - None - } - }) - .collect(); - - Ok(processed_entries) + .await? } } diff --git a/crates/yt/src/version/mod.rs b/crates/yt/src/version/mod.rs index 05d85e0..9a91f3b 100644 --- a/crates/yt/src/version/mod.rs +++ b/crates/yt/src/version/mod.rs @@ -8,26 +8,12 @@ // You should have received a copy of the License along with this program. // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. -use std::process::Command; - use anyhow::{Context, Result}; use sqlx::{SqlitePool, sqlite::SqliteConnectOptions}; +use yt_dlp::YoutubeDLOptions; use crate::{config::Config, storage::migrate::get_version_db}; -fn get_cmd_version(cmd: &str) -> Result<String> { - let out = String::from_utf8( - Command::new(cmd) - .arg("--version") - .output() - .with_context(|| format!("Failed to run `{cmd} --version`"))? - .stdout, - ) - .context("Failed to interpret output as utf8")?; - - Ok(out.trim().to_owned()) -} - pub async fn show(config: &Config) -> Result<()> { let db_version = { let options = SqliteConnectOptions::new() @@ -44,16 +30,16 @@ pub async fn show(config: &Config) -> Result<()> { .context("Failed to determine database version")? }; - // TODO(@bpeetz): Use `pyo3`'s build in mechanism instead of executing the python CLI <2025-02-21> - let python_version = get_cmd_version("python")?; - let yt_dlp_version = get_cmd_version("yt-dlp")?; + let yt_dlp_version = { + let yt_dlp = YoutubeDLOptions::new().build()?; + yt_dlp.version() + }; println!( "{}: {} db version: {db_version} -python: {python_version} yt-dlp: {yt_dlp_version}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"), diff --git a/crates/yt/src/watch/playlist_handler/client_messages/mod.rs b/crates/yt/src/watch/playlist_handler/client_messages/mod.rs index 6f7a59e..c05ca87 100644 --- a/crates/yt/src/watch/playlist_handler/client_messages/mod.rs +++ b/crates/yt/src/watch/playlist_handler/client_messages/mod.rs @@ -19,6 +19,7 @@ use tokio::process::Command; use super::mpv_message; async fn run_self_in_external_command(app: &App, args: &[&str]) -> Result<()> { + // TODO(@bpeetz): Can we trust this value? <2025-06-15> let binary = env::current_exe().context("Failed to determine the current executable to re-execute")?; diff --git a/crates/yt_dlp/Cargo.toml b/crates/yt_dlp/Cargo.toml index ddd5f9b..90f2e10 100644 --- a/crates/yt_dlp/Cargo.toml +++ b/crates/yt_dlp/Cargo.toml @@ -10,7 +10,7 @@ [package] name = "yt_dlp" -description = "A rust fii wrapper library for the python yt_dlp library" +description = "A rust ffi wrapper library for the python yt_dlp library" keywords = [] categories = [] version.workspace = true diff --git a/crates/yt_dlp/src/lib.rs b/crates/yt_dlp/src/lib.rs index 34b8a5d..dd42fc6 100644 --- a/crates/yt_dlp/src/lib.rs +++ b/crates/yt_dlp/src/lib.rs @@ -1,19 +1,18 @@ //! The `yt_dlp` interface is completely contained in the [`YoutubeDL`] structure. -use std::io::Write; -use std::mem; -use std::{env, fs::File, path::PathBuf}; +use std::{self, env, mem, path::PathBuf}; use indexmap::IndexMap; use log::{Level, debug, error, info, log_enabled}; use logging::setup_logging; -use rustpython::vm::builtins::PyList; use rustpython::{ InterpreterConfig, vm::{ - self, Interpreter, PyObjectRef, PyRef, VirtualMachine, - builtins::{PyBaseException, PyDict, PyStr}, + self, AsObject, Interpreter, PyObjectRef, PyPayload, PyRef, VirtualMachine, + builtins::{PyBaseException, PyBaseExceptionRef, PyDict, PyList, PyStr}, function::{FuncArgs, KwArgs, PosArgs}, + py_io::Write, + suggestion::offer_suggestions, }, }; use url::Url; @@ -177,12 +176,16 @@ impl YoutubeDL { Ok::<_, PyRef<PyBaseException>>((yt_dlp_module, youtube_dl_class)) }) { - Ok(ok) => ok, + Ok(ok) => Ok(ok), Err(err) => { - interpreter.finalize(Some(err)); - return Err(build::Error::Python); + // TODO(@bpeetz): Do we want to run `interpreter.finalize` here? <2025-06-14> + // interpreter.finalize(Some(err)); + interpreter.enter(|vm| { + let buffer = process_exception(vm, &err); + Err(build::Error::Python(buffer)) + }) } - }; + }?; Ok(Self { interpreter, @@ -313,30 +316,13 @@ impl YoutubeDL { let result_json = json_dumps(result, vm); - if let Ok(confirm) = env::var("YT_STORE_INFO_JSON") { - if confirm == "yes" { - let mut file = File::create("output.info.json").unwrap(); - write!( - file, - "{}", - serde_json::to_string_pretty(&serde_json::Value::Object( - result_json.clone() - )) - .expect("Valid json") - ) - .unwrap(); - } - } - Ok::<_, PyRef<PyBaseException>>(result_json) }) { Ok(ok) => Ok(ok), - Err(err) => { - self.interpreter.enter(|vm| { - vm.print_exception(err); - }); - Err(extract_info::Error::Python) - } + Err(err) => self.interpreter.enter(|vm| { + let buffer = process_exception(vm, &err); + Err(extract_info::Error::Python(buffer)) + }), } } @@ -387,30 +373,28 @@ impl YoutubeDL { Ok::<_, PyRef<PyBaseException>>(result_json) }) { Ok(ok) => Ok(ok), - Err(err) => { - self.interpreter.enter(|vm| { - vm.print_exception(err); - }); - Err(process_ie_result::Error::Python) - } + Err(err) => self.interpreter.enter(|vm| { + let buffer = process_exception(vm, &err); + Err(process_ie_result::Error::Python(buffer)) + }), } } } #[allow(missing_docs)] pub mod process_ie_result { - #[derive(Debug, thiserror::Error, Clone, Copy)] + #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("Python threw an exception")] - Python, + #[error("Python threw an exception: {0}")] + Python(String), } } #[allow(missing_docs)] pub mod extract_info { - #[derive(Debug, thiserror::Error, Clone, Copy)] + #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("Python threw an exception")] - Python, + #[error("Python threw an exception: {0}")] + Python(String), } } @@ -488,8 +472,8 @@ impl YoutubeDLOptions { pub mod build { #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("Python threw an exception")] - Python, + #[error("Python threw an exception: {0}")] + Python(String), #[error("Io error: {0}")] Io(#[from] std::io::Error), @@ -539,3 +523,84 @@ pub fn json_dumps( _ => unreachable!("These should not be json.dumps output"), } } + +// Inlined and changed from `vm.write_exception_inner` +fn write_exception<W: Write>( + vm: &VirtualMachine, + output: &mut W, + exc: &PyBaseExceptionRef, +) -> Result<(), W::Error> { + let varargs = exc.args(); + let args_repr = { + match varargs.len() { + 0 => vec![], + 1 => { + let args0_repr = if true { + varargs[0] + .str(vm) + .unwrap_or_else(|_| PyStr::from("<element str() failed>").into_ref(&vm.ctx)) + } else { + varargs[0].repr(vm).unwrap_or_else(|_| { + PyStr::from("<element repr() failed>").into_ref(&vm.ctx) + }) + }; + vec![args0_repr] + } + _ => varargs + .iter() + .map(|vararg| { + vararg.repr(vm).unwrap_or_else(|_| { + PyStr::from("<element repr() failed>").into_ref(&vm.ctx) + }) + }) + .collect(), + } + }; + + let exc_class = exc.class(); + + if exc_class.fast_issubclass(vm.ctx.exceptions.syntax_error) { + unreachable!( + "A syntax error should never be raised, \ + as yt_dlp should not have them and neither our embedded code" + ); + } + + let exc_name = exc_class.name(); + match args_repr.len() { + 0 => write!(output, "{exc_name}"), + 1 => write!(output, "{}: {}", exc_name, args_repr[0]), + _ => write!( + output, + "{}: ({})", + exc_name, + args_repr + .iter() + .map(|val| val.as_str()) + .collect::<Vec<_>>() + .join(", "), + ), + }?; + + match offer_suggestions(exc, vm) { + Some(suggestions) => { + write!(output, ". Did you mean: '{suggestions}'?") + } + None => Ok(()), + } +} + +fn process_exception(vm: &VirtualMachine, err: &PyBaseExceptionRef) -> String { + let mut buffer = String::new(); + write_exception(vm, &mut buffer, err) + .expect("We are writing into an *in-memory* string, it will always work"); + + if log_enabled!(Level::Debug) { + let mut output = String::new(); + vm.write_exception(&mut output, err) + .expect("We are writing into an *in-memory* string, it will always work"); + debug!("Python threw an exception: {output}"); + } + + buffer +} |