about summary refs log tree commit diff stats
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--crates/fmt/Cargo.toml2
-rw-r--r--crates/libmpv2/libmpv2-sys/Cargo.toml2
-rw-r--r--crates/yt/Cargo.toml1
-rw-r--r--crates/yt/src/cli.rs33
-rw-r--r--crates/yt/src/main.rs63
-rw-r--r--crates/yt/src/select/mod.rs225
-rw-r--r--crates/yt/src/select/selection_file/duration.rs173
-rw-r--r--crates/yt/src/update/mod.rs55
-rw-r--r--crates/yt/src/update/updater.rs224
-rw-r--r--crates/yt/src/version/mod.rs24
-rw-r--r--crates/yt/src/watch/playlist_handler/client_messages/mod.rs1
-rw-r--r--crates/yt_dlp/Cargo.toml2
-rw-r--r--crates/yt_dlp/src/lib.rs155
13 files changed, 643 insertions, 317 deletions
diff --git a/crates/fmt/Cargo.toml b/crates/fmt/Cargo.toml
index 7f82a09..f3cf4ad 100644
--- a/crates/fmt/Cargo.toml
+++ b/crates/fmt/Cargo.toml
@@ -24,7 +24,7 @@ publish = false
 path = "src/fmt.rs"
 
 [dependencies]
-unicode-width = "0.2.0"
+unicode-width = "0.2.1"
 
 [lints]
 workspace = true
diff --git a/crates/libmpv2/libmpv2-sys/Cargo.toml b/crates/libmpv2/libmpv2-sys/Cargo.toml
index b0514b8..96141d3 100644
--- a/crates/libmpv2/libmpv2-sys/Cargo.toml
+++ b/crates/libmpv2/libmpv2-sys/Cargo.toml
@@ -23,4 +23,4 @@ rust-version.workspace = true
 publish = false
 
 [build-dependencies]
-bindgen = { version = "0.71.1" }
+bindgen = { version = "0.72.0" }
diff --git a/crates/yt/Cargo.toml b/crates/yt/Cargo.toml
index 17d4016..6803e68 100644
--- a/crates/yt/Cargo.toml
+++ b/crates/yt/Cargo.toml
@@ -50,6 +50,7 @@ yt_dlp.workspace = true
 termsize.workspace = true
 uu_fmt.workspace = true
 notify = { version = "8.0.0", default-features = false }
+tokio-util = { version = "0.7.15", features = ["rt"] }
 
 [[bin]]
 name = "yt"
diff --git a/crates/yt/src/cli.rs b/crates/yt/src/cli.rs
index de7a5b8..634e422 100644
--- a/crates/yt/src/cli.rs
+++ b/crates/yt/src/cli.rs
@@ -123,13 +123,35 @@ pub enum Command {
 
     /// Update the video database
     Update {
+        /// The maximal number of videos to fetch for each subscription.
         #[arg(short, long)]
-        /// The number of videos to updating
         max_backlog: Option<usize>,
 
-        #[arg(short, long)]
-        /// The subscriptions to update (can be given multiple times)
+        /// How many subs were already checked.
+        ///
+        /// Only used in the progress display in combination with `--grouped`.
+        #[arg(short, long, hide = true)]
+        current_progress: Option<usize>,
+
+        /// How many subs are to be checked.
+        ///
+        /// Only used in the progress display in combination with `--grouped`.
+        #[arg(short, long, hide = true)]
+        total_number: Option<usize>,
+
+        /// The subscriptions to update
         subscriptions: Vec<String>,
+
+        /// Perform the updates in blocks.
+        ///
+        /// This works around the memory leaks in the default update invocation.
+        #[arg(
+            short,
+            long,
+            conflicts_with = "total_number",
+            conflicts_with = "current_progress"
+        )]
+        grouped: bool,
     },
 
     /// Manipulate subscription
@@ -282,6 +304,10 @@ pub enum SelectCommand {
         #[arg(long, short)]
         done: bool,
 
+        /// Generate a directory, where each file contains only one subscription.
+        #[arg(long, short, conflicts_with = "use_last_selection")]
+        split: bool,
+
         /// Use the last selection file (useful if you've spend time on it and want to get it again)
         #[arg(long, short, conflicts_with = "done")]
         use_last_selection: bool,
@@ -345,6 +371,7 @@ impl Default for SelectCommand {
         Self::File {
             done: false,
             use_last_selection: false,
+            split: false,
         }
     }
 }
diff --git a/crates/yt/src/main.rs b/crates/yt/src/main.rs
index 39f52f4..930d269 100644
--- a/crates/yt/src/main.rs
+++ b/crates/yt/src/main.rs
@@ -13,7 +13,7 @@
 // to print it anyways.
 #![allow(clippy::missing_errors_doc)]
 
-use std::sync::Arc;
+use std::{env::current_exe, sync::Arc};
 
 use anyhow::{Context, Result, bail};
 use app::App;
@@ -115,12 +115,20 @@ async fn main() -> Result<()> {
                 SelectCommand::File {
                     done,
                     use_last_selection,
-                } => Box::pin(select::select(&app, done, use_last_selection)).await?,
+                    split,
+                } => {
+                    if split {
+                        assert!(!use_last_selection);
+                        Box::pin(select::select_split(&app, done)).await?
+                    } else {
+                        Box::pin(select::select_file(&app, done, use_last_selection)).await?
+                    }
+                }
                 _ => Box::pin(handle_select_cmd(&app, cmd, None)).await?,
             }
         }
         Command::Sedowa {} => {
-            Box::pin(select::select(&app, false, false)).await?;
+            Box::pin(select::select_file(&app, false, false)).await?;
 
             let arc_app = Arc::new(app);
             dowa(arc_app).await?;
@@ -153,6 +161,9 @@ async fn main() -> Result<()> {
         Command::Update {
             max_backlog,
             subscriptions,
+            grouped,
+            current_progress,
+            total_number,
         } => {
             let all_subs = subscriptions::get(&app).await?;
 
@@ -167,7 +178,49 @@ async fn main() -> Result<()> {
 
             let max_backlog = max_backlog.unwrap_or(app.config.update.max_backlog);
 
-            update::update(&app, max_backlog, subscriptions).await?;
+            if grouped {
+                const CHUNK_SIZE: usize = 50;
+
+                assert!(current_progress.is_none() && total_number.is_none());
+
+                let subs = {
+                    if subscriptions.is_empty() {
+                        all_subs.0.into_iter().map(|sub| sub.0).collect()
+                    } else {
+                        subscriptions
+                    }
+                };
+
+                let total_number = subs.len();
+                let mut current_progress = 0;
+                for chunk in subs.chunks(CHUNK_SIZE) {
+                    info!(
+                        "$ yt update {}",
+                        chunk
+                            .iter()
+                            .map(|sub_name| format!("{sub_name:#?}"))
+                            .collect::<Vec<_>>()
+                            .join(" ")
+                    );
+
+                    let status = std::process::Command::new(
+                        current_exe().context("Failed to get the current exe to re-execute")?,
+                    )
+                    .arg("update")
+                    .args(["--current-progress", current_progress.to_string().as_str()])
+                    .args(["--total-number", total_number.to_string().as_str()])
+                    .args(chunk)
+                    .status()?;
+
+                    if !status.success() {
+                        bail!("grouped yt update: Child process failed.");
+                    }
+
+                    current_progress += CHUNK_SIZE;
+                }
+            } else {
+                update::update(&app, max_backlog, subscriptions, total_number, current_progress).await?;
+            }
         }
         Command::Subscriptions { cmd } => match cmd {
             SubscriptionCommand::Add { name, url } => {
@@ -228,7 +281,7 @@ async fn main() -> Result<()> {
 
 async fn dowa(arc_app: Arc<App>) -> Result<()> {
     let max_cache_size = arc_app.config.download.max_cache_size;
-    info!("Max cache size: '{}'", max_cache_size);
+    info!("Max cache size: '{max_cache_size}'");
 
     let arc_app_clone = Arc::clone(&arc_app);
     let download: JoinHandle<()> = tokio::spawn(async move {
diff --git a/crates/yt/src/select/mod.rs b/crates/yt/src/select/mod.rs
index 8db9ae3..668ab02 100644
--- a/crates/yt/src/select/mod.rs
+++ b/crates/yt/src/select/mod.rs
@@ -10,9 +10,12 @@
 // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
 
 use std::{
+    collections::HashMap,
     env::{self},
-    fs,
+    fs::{self, File},
     io::{BufRead, BufReader, BufWriter, Write},
+    iter,
+    path::Path,
     string::String,
 };
 
@@ -28,6 +31,7 @@ use anyhow::{Context, Result, bail};
 use clap::Parser;
 use cmds::handle_select_cmd;
 use futures::{TryStreamExt, stream::FuturesOrdered};
+use log::info;
 use selection_file::process_line;
 use tempfile::Builder;
 use tokio::process::Command;
@@ -35,11 +39,92 @@ use tokio::process::Command;
 pub mod cmds;
 pub mod selection_file;
 
-async fn to_select_file_display_owned(video: Video, app: &App) -> Result<String> {
-    video.to_select_file_display(app).await
+pub async fn select_split(app: &App, done: bool) -> Result<()> {
+    let temp_dir = Builder::new()
+        .prefix("yt_video_select-")
+        .rand_bytes(6)
+        .tempdir()
+        .context("Failed to get tempdir")?;
+
+    let matching_videos = get_videos(app, done).await?;
+
+    let mut no_author = vec![];
+    let mut author_map = HashMap::new();
+    for video in matching_videos {
+        if let Some(sub) = &video.parent_subscription_name {
+            if author_map.contains_key(sub) {
+                let vec: &mut Vec<_> = author_map
+                    .get_mut(sub)
+                    .unreachable("This key is set, we checked in the if above");
+
+                vec.push(video);
+            } else {
+                author_map.insert(sub.to_owned(), vec![video]);
+            }
+        } else {
+            no_author.push(video);
+        }
+    }
+
+    let author_map = {
+        let mut temp_vec: Vec<_> = author_map.into_iter().collect();
+
+        // PERFORMANCE: The clone here should not be needed.  <2025-06-15>
+        temp_vec.sort_by_key(|(name, _)| name.to_owned());
+
+        temp_vec
+    };
+
+    for (index, (name, videos)) in author_map
+        .into_iter()
+        .chain(iter::once((
+            "<No parent subscription>".to_owned(),
+            no_author,
+        )))
+        .enumerate()
+    {
+        let mut file_path = temp_dir.path().join(format!("{index:02}_{name}"));
+        file_path.set_extension("yts");
+
+        let tmp_file = File::create(&file_path)
+            .with_context(|| format!("Failed to create file at: {}", file_path.display()))?;
+
+        write_videos_to_file(app, &tmp_file, &videos)
+            .await
+            .with_context(|| format!("Failed to populate file at: {}", file_path.display()))?;
+    }
+
+    open_editor_at(temp_dir.path()).await?;
+
+    let mut paths = vec![];
+    for maybe_entry in temp_dir
+        .path()
+        .read_dir()
+        .context("Failed to open temp dir for reading")?
+    {
+        let entry = maybe_entry.context("Failed to read entry in temp dir")?;
+
+        if !entry.file_type()?.is_file() {
+            bail!("Found non-file entry: {}", entry.path().display());
+        }
+
+        paths.push(entry.path());
+    }
+
+    paths.sort();
+
+    let mut processed = 0;
+    for path in paths {
+        let read_file = File::open(path)?;
+        processed = process_file(app, &read_file, processed).await?;
+    }
+
+    info!("Processed {processed} records.");
+    temp_dir.close().context("Failed to close the temp dir")?;
+    Ok(())
 }
 
-pub async fn select(app: &App, done: bool, use_last_selection: bool) -> Result<()> {
+pub async fn select_file(app: &App, done: bool, use_last_selection: bool) -> Result<()> {
     let temp_file = Builder::new()
         .prefix("yt_video_select-")
         .suffix(".yts")
@@ -50,66 +135,75 @@ pub async fn select(app: &App, done: bool, use_last_selection: bool) -> Result<(
     if use_last_selection {
         fs::copy(&app.config.paths.last_selection_path, &temp_file)?;
     } else {
-        let matching_videos = if done {
-            get::videos(app, VideoStatusMarker::ALL).await?
-        } else {
-            get::videos(
-                app,
-                &[
-                    VideoStatusMarker::Pick,
-                    //
-                    VideoStatusMarker::Watch,
-                    VideoStatusMarker::Cached,
-                ],
-            )
-            .await?
-        };
-
-        // Warmup the cache for the display rendering of the videos.
-        // Otherwise the futures would all try to warm it up at the same time.
-        if let Some(vid) = matching_videos.first() {
-            drop(vid.to_line_display(app).await?);
-        }
+        let matching_videos = get_videos(app, done).await?;
 
-        let mut edit_file = BufWriter::new(&temp_file);
-
-        matching_videos
-            .into_iter()
-            .map(|vid| to_select_file_display_owned(vid, app))
-            .collect::<FuturesOrdered<_>>()
-            .try_collect::<Vec<String>>()
-            .await?
-            .into_iter()
-            .try_for_each(|line| -> Result<()> {
-                edit_file
-                    .write_all(line.as_bytes())
-                    .context("Failed to write to `edit_file`")?;
-
-                Ok(())
-            })?;
-
-        edit_file.write_all(HELP_STR.as_bytes())?;
-        edit_file.flush().context("Failed to flush edit file")?;
-    };
-
-    {
-        let editor = env::var("EDITOR").unwrap_or("nvim".to_owned());
-
-        let mut nvim = Command::new(editor);
-        nvim.arg(temp_file.path());
-        let status = nvim.status().await.context("Falied to run nvim")?;
-        if !status.success() {
-            bail!("nvim exited with error status: {}", status)
-        }
+        write_videos_to_file(app, temp_file.as_file(), &matching_videos).await?;
     }
 
+    open_editor_at(temp_file.path()).await?;
+
     let read_file = temp_file.reopen()?;
     fs::copy(temp_file.path(), &app.config.paths.last_selection_path)
         .context("Failed to persist selection file")?;
 
-    let reader = BufReader::new(&read_file);
+    let processed = process_file(app, &read_file, 0).await?;
+    info!("Processed {processed} records.");
+
+    Ok(())
+}
+
+async fn get_videos(app: &App, include_done: bool) -> Result<Vec<Video>> {
+    if include_done {
+        get::videos(app, VideoStatusMarker::ALL).await
+    } else {
+        get::videos(
+            app,
+            &[
+                VideoStatusMarker::Pick,
+                //
+                VideoStatusMarker::Watch,
+                VideoStatusMarker::Cached,
+            ],
+        )
+        .await
+    }
+}
+
+async fn write_videos_to_file(app: &App, file: &File, videos: &[Video]) -> Result<()> {
+    // Warm-up the cache for the display rendering of the videos.
+    // Otherwise the futures would all try to warm it up at the same time.
+    if let Some(vid) = videos.first() {
+        drop(vid.to_line_display(app).await?);
+    }
+
+    let mut edit_file = BufWriter::new(file);
+
+    videos
+        .iter()
+        .map(|vid| vid.to_select_file_display(app))
+        .collect::<FuturesOrdered<_>>()
+        .try_collect::<Vec<String>>()
+        .await?
+        .into_iter()
+        .try_for_each(|line| -> Result<()> {
+            edit_file
+                .write_all(line.as_bytes())
+                .context("Failed to write to `edit_file`")?;
+
+            Ok(())
+        })?;
+
+    edit_file.write_all(HELP_STR.as_bytes())?;
+    edit_file.flush().context("Failed to flush edit file")?;
+
+    Ok(())
+}
+
+async fn process_file(app: &App, file: &File, processed: i64) -> Result<i64> {
+    let reader = BufReader::new(file);
+
+    let mut line_number = -processed;
 
-    let mut line_number = 0;
     for line in reader.lines() {
         let line = line.context("Failed to read a line")?;
 
@@ -149,7 +243,24 @@ pub async fn select(app: &App, done: bool, use_last_selection: bool) -> Result<(
         }
     }
 
-    Ok(())
+    Ok(line_number * -1)
+}
+
+async fn open_editor_at(path: &Path) -> Result<()> {
+    let editor = env::var("EDITOR").unwrap_or("nvim".to_owned());
+
+    let mut nvim = Command::new(&editor);
+    nvim.arg(path);
+    let status = nvim
+        .status()
+        .await
+        .with_context(|| format!("Failed to run editor: {editor}"))?;
+
+    if status.success() {
+        Ok(())
+    } else {
+        bail!("Editor ({editor}) exited with error status: {}", status)
+    }
 }
 
 // // FIXME: There should be no reason why we need to re-run yt, just to get the help string. But I've
diff --git a/crates/yt/src/select/selection_file/duration.rs b/crates/yt/src/select/selection_file/duration.rs
index 77c4fc5..668a0b8 100644
--- a/crates/yt/src/select/selection_file/duration.rs
+++ b/crates/yt/src/select/selection_file/duration.rs
@@ -12,7 +12,7 @@
 use std::str::FromStr;
 use std::time::Duration;
 
-use anyhow::{Context, Result};
+use anyhow::{Result, bail};
 
 const SECOND: u64 = 1;
 const MINUTE: u64 = 60 * SECOND;
@@ -73,52 +73,109 @@ impl FromStr for MaybeDuration {
     type Err = anyhow::Error;
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
-        fn parse_num(str: &str, suffix: char) -> Result<u64> {
-            str.strip_suffix(suffix)
-                .with_context(|| format!("Failed to strip suffix '{suffix}' of number: '{str}'"))?
-                .parse::<u64>()
-                .with_context(|| format!("Failed to parse '{suffix}'"))
+        #[derive(Debug, Clone, Copy)]
+        enum Token {
+            Number(u64),
+            UnitConstant((char, u64)),
+        }
+
+        struct Tokenizer<'a> {
+            input: &'a str,
+        }
+
+        impl Tokenizer<'_> {
+            fn next(&mut self) -> Result<Option<Token>> {
+                loop {
+                    if let Some(next) = self.peek() {
+                        match next {
+                            '0'..='9' => {
+                                let mut number = self.expect_num();
+                                while matches!(self.peek(), Some('0'..='9')) {
+                                    number *= 10;
+                                    number += self.expect_num();
+                                }
+                                break Ok(Some(Token::Number(number)));
+                            }
+                            's' => {
+                                self.chomp();
+                                break Ok(Some(Token::UnitConstant(('s', SECOND))));
+                            }
+                            'm' => {
+                                self.chomp();
+                                break Ok(Some(Token::UnitConstant(('m', MINUTE))));
+                            }
+                            'h' => {
+                                self.chomp();
+                                break Ok(Some(Token::UnitConstant(('h', HOUR))));
+                            }
+                            'd' => {
+                                self.chomp();
+                                break Ok(Some(Token::UnitConstant(('d', DAY))));
+                            }
+                            ' ' => {
+                                // Simply ignore white space
+                                self.chomp();
+                            }
+                            other => bail!("Unknown unit: {other:#?}"),
+                        }
+                    } else {
+                        break Ok(None);
+                    }
+                }
+            }
+
+            fn chomp(&mut self) {
+                self.input = &self.input[1..];
+            }
+
+            fn peek(&self) -> Option<char> {
+                self.input.chars().next()
+            }
+
+            fn expect_num(&mut self) -> u64 {
+                let next = self.peek().expect("Should be some at this point");
+                self.chomp();
+                assert!(next.is_ascii_digit());
+                (next as u64) - ('0' as u64)
+            }
         }
 
         if s == "[No duration]" {
             return Ok(Self { time: None });
         }
 
-        let buf: Vec<_> = s.split(' ').collect();
-
-        let days;
-        let hours;
-        let minutes;
-        let seconds;
-
-        assert_eq!(buf.len(), 2, "Other lengths should not happen");
-
-        if buf[0].ends_with('d') {
-            days = parse_num(buf[0], 'd')?;
-            hours = parse_num(buf[1], 'h')?;
-            minutes = parse_num(buf[2], 'm')?;
-            seconds = 0;
-        } else if buf[0].ends_with('h') {
-            days = 0;
-            hours = parse_num(buf[0], 'h')?;
-            minutes = parse_num(buf[1], 'm')?;
-            seconds = 0;
-        } else if buf[0].ends_with('m') {
-            days = 0;
-            hours = 0;
-            minutes = parse_num(buf[0], 'm')?;
-            seconds = parse_num(buf[1], 's')?;
-        } else {
-            unreachable!(
-                "The first part always ends with 'h' or 'm', but was: {:#?}",
-                buf
-            )
+        let mut tokenizer = Tokenizer { input: s };
+
+        let mut value = 0;
+        let mut current_val = None;
+        while let Some(token) = tokenizer.next()? {
+            match token {
+                Token::Number(number) => {
+                    if let Some(current_val) = current_val {
+                        bail!("Failed to find unit for number: {current_val}");
+                    }
+
+                    {
+                        current_val = Some(number);
+                    }
+                }
+                Token::UnitConstant((name, unit)) => {
+                    if let Some(cval) = current_val {
+                        value += cval * unit;
+                        current_val = None;
+                    } else {
+                        bail!("Found unit without number: {name:#?}");
+                    }
+                }
+            }
+        }
+
+        if let Some(current_val) = current_val {
+            bail!("Duration ended without unit, number was: {current_val}");
         }
 
         Ok(Self {
-            time: Some(Duration::from_secs(
-                days * DAY + hours * HOUR + minutes * MINUTE + seconds * SECOND,
-            )),
+            time: Some(Duration::from_secs(value)),
         })
     }
 }
@@ -156,30 +213,34 @@ mod test {
 
     use super::MaybeDuration;
 
-    #[test]
-    fn test_display_duration_1h() {
-        let dur = MaybeDuration::from_secs(HOUR);
-        assert_eq!("1h 0m".to_owned(), dur.to_string());
+    fn mk_roundtrip(input: MaybeDuration, expected: &str) {
+        let output = MaybeDuration::from_str(expected).unwrap();
+
+        assert_eq!(input.to_string(), output.to_string());
+        assert_eq!(input.to_string(), expected);
+        assert_eq!(
+            MaybeDuration::from_str(input.to_string().as_str()).unwrap(),
+            output
+        );
     }
+
     #[test]
-    fn test_display_duration_30min() {
-        let dur = MaybeDuration::from_secs(MINUTE * 30);
-        assert_eq!("30m 0s".to_owned(), dur.to_string());
+    fn test_roundtrip_duration_1h() {
+        mk_roundtrip(MaybeDuration::from_secs(HOUR), "1h 0m");
     }
     #[test]
-    fn test_display_duration_1d() {
-        let dur = MaybeDuration::from_secs(DAY + MINUTE * 30 + HOUR * 2);
-        assert_eq!("1d 2h 30m".to_owned(), dur.to_string());
+    fn test_roundtrip_duration_30min() {
+        mk_roundtrip(MaybeDuration::from_secs(MINUTE * 30), "30m 0s");
     }
-
     #[test]
-    fn test_display_duration_roundtrip() {
-        let dur = MaybeDuration::zero();
-        let dur_str = dur.to_string();
-
-        assert_eq!(
-            MaybeDuration::zero(),
-            MaybeDuration::from_str(&dur_str).unwrap()
+    fn test_roundtrip_duration_1d() {
+        mk_roundtrip(
+            MaybeDuration::from_secs(DAY + MINUTE * 30 + HOUR * 2),
+            "1d 2h 30m",
         );
     }
+    #[test]
+    fn test_roundtrip_duration_none() {
+        mk_roundtrip(MaybeDuration::from_maybe_secs_f64(None), "[No duration]");
+    }
 }
diff --git a/crates/yt/src/update/mod.rs b/crates/yt/src/update/mod.rs
index f0b1e2c..d866882 100644
--- a/crates/yt/src/update/mod.rs
+++ b/crates/yt/src/update/mod.rs
@@ -13,7 +13,7 @@ use std::{str::FromStr, time::Duration};
 
 use anyhow::{Context, Ok, Result};
 use chrono::{DateTime, Utc};
-use log::{info, warn};
+use log::warn;
 use url::Url;
 use yt_dlp::{InfoJson, json_cast, json_get};
 
@@ -36,26 +36,18 @@ pub async fn update(
     app: &App,
     max_backlog: usize,
     subscription_names_to_update: Vec<String>,
+    total_number: Option<usize>,
+    current_progress: Option<usize>,
 ) -> Result<()> {
     let subscriptions = subscriptions::get(app).await?;
 
-    let urls: Vec<_> = if subscription_names_to_update.is_empty() {
-        subscriptions.0.values().collect()
+    let subs: Vec<Subscription> = if subscription_names_to_update.is_empty() {
+        subscriptions.0.into_values().collect()
     } else {
         subscriptions
             .0
-            .values()
-            .filter(|sub| {
-                if subscription_names_to_update.contains(&sub.name) {
-                    true
-                } else {
-                    info!(
-                        "Not updating subscription '{}' as it was not specified",
-                        sub.name
-                    );
-                    false
-                }
-            })
+            .into_values()
+            .filter(|sub| subscription_names_to_update.contains(&sub.name))
             .collect()
     };
 
@@ -63,10 +55,10 @@ pub async fn update(
     // should not contain duplicates.
     let hashes = get_all_hashes(app).await?;
 
-    {
-        let mut updater = Updater::new(max_backlog, &hashes);
-        updater.update(app, &urls).await?;
-    }
+    let updater = Updater::new(max_backlog, hashes);
+    updater
+        .update(app, subs, total_number, current_progress)
+        .await?;
 
     Ok(())
 }
@@ -144,7 +136,14 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Res
 
     let url = {
         let smug_url: Url = json_get!(entry, "webpage_url", as_str).parse()?;
-        // unsmuggle_url(&smug_url)?
+        // TODO(@bpeetz): We should probably add this? <2025-06-14>
+        // if '#__youtubedl_smuggle' not in smug_url:
+        //     return smug_url, default
+        // url, _, sdata = smug_url.rpartition('#')
+        // jsond = urllib.parse.parse_qs(sdata)['__youtubedl_smuggle'][0]
+        // data = json.loads(jsond)
+        // return url, data
+
         smug_url
     };
 
@@ -152,13 +151,15 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Res
 
     let subscription_name = if let Some(sub) = sub {
         Some(sub.name.clone())
-    } else if let Some(uploader) = entry.get("uploader") {
-        if entry.get("webpage_url_domain")
-            == Some(&serde_json::Value::String("youtube.com".to_owned()))
+    } else if let Some(uploader) = entry.get("uploader").map(|val| json_cast!(val, as_str)) {
+        if entry
+            .get("webpage_url_domain")
+            .map(|val| json_cast!(val, as_str))
+            == Some("youtube.com")
         {
             Some(format!("{uploader} - Videos"))
         } else {
-            Some(json_cast!(uploader, as_str).to_owned())
+            Some(uploader.to_owned())
         }
     } else {
         None
@@ -185,9 +186,9 @@ pub fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) -> Res
     Ok(video)
 }
 
-async fn process_subscription(app: &App, sub: &Subscription, entry: InfoJson) -> Result<()> {
-    let video =
-        video_entry_to_video(&entry, Some(sub)).context("Failed to parse search entry as Video")?;
+async fn process_subscription(app: &App, sub: Subscription, entry: InfoJson) -> Result<()> {
+    let video = video_entry_to_video(&entry, Some(&sub))
+        .context("Failed to parse search entry as Video")?;
 
     add_video(app, video.clone())
         .await
diff --git a/crates/yt/src/update/updater.rs b/crates/yt/src/update/updater.rs
index 8da654b..04bcaa1 100644
--- a/crates/yt/src/update/updater.rs
+++ b/crates/yt/src/update/updater.rs
@@ -8,17 +8,18 @@
 // You should have received a copy of the License along with this program.
 // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
 
-use std::io::{Write, stderr};
+use std::{
+    io::{Write, stderr},
+    sync::atomic::{AtomicUsize, Ordering},
+};
 
 use anyhow::{Context, Result};
 use blake3::Hash;
-use futures::{
-    StreamExt, TryStreamExt,
-    stream::{self},
-};
+use futures::{StreamExt, future::join_all, stream};
 use log::{Level, debug, error, log_enabled};
 use serde_json::json;
-use yt_dlp::{InfoJson, YoutubeDLOptions, json_cast, json_get};
+use tokio_util::task::LocalPoolHandle;
+use yt_dlp::{InfoJson, YoutubeDLOptions, json_cast, json_get, process_ie_result};
 
 use crate::{
     ansi_escape_codes::{clear_whole_line, move_to_col},
@@ -28,44 +29,55 @@ use crate::{
 
 use super::process_subscription;
 
-pub(super) struct Updater<'a> {
+pub(super) struct Updater {
     max_backlog: usize,
-    hashes: &'a [Hash],
+    hashes: Vec<Hash>,
+    pool: LocalPoolHandle,
 }
 
-impl<'a> Updater<'a> {
-    pub(super) fn new(max_backlog: usize, hashes: &'a [Hash]) -> Self {
+static REACHED_NUMBER: AtomicUsize = const { AtomicUsize::new(1) };
+
+impl Updater {
+    pub(super) fn new(max_backlog: usize, hashes: Vec<Hash>) -> Self {
+        // TODO(@bpeetz): The number should not be hardcoded. <2025-06-14>
+        let pool = LocalPoolHandle::new(16);
+
         Self {
             max_backlog,
             hashes,
+            pool,
         }
     }
 
     pub(super) async fn update(
-        &mut self,
+        self,
         app: &App,
-        subscriptions: &[&Subscription],
+        subscriptions: Vec<Subscription>,
+        total_number: Option<usize>,
+        current_progress: Option<usize>,
     ) -> Result<()> {
+        let total_number = total_number.unwrap_or(subscriptions.len());
+
+        if let Some(current_progress) = current_progress {
+            REACHED_NUMBER.store(current_progress, Ordering::Relaxed);
+        }
+
         let mut stream = stream::iter(subscriptions)
-            .map(|sub| self.get_new_entries(sub))
-            .buffer_unordered(100);
+            .map(|sub| self.get_new_entries(sub, total_number))
+            .buffer_unordered(16 * 4);
 
         while let Some(output) = stream.next().await {
             let mut entries = output?;
 
-            if entries.is_empty() {
-                continue;
-            }
-
-            let (sub, entry) = entries.remove(0);
-            process_subscription(app, sub, entry).await?;
+            if let Some(next) = entries.next() {
+                let (sub, entry) = next;
+                process_subscription(app, sub, entry).await?;
 
-            let entry_stream: Result<()> = stream::iter(entries)
-                .map(|(sub, entry)| process_subscription(app, sub, entry))
-                .buffer_unordered(100)
-                .try_collect()
-                .await;
-            entry_stream?;
+                join_all(entries.map(|(sub, entry)| process_subscription(app, sub, entry)))
+                    .await
+                    .into_iter()
+                    .collect::<Result<(), _>>()?;
+            }
         }
 
         Ok(())
@@ -73,11 +85,15 @@ impl<'a> Updater<'a> {
 
     async fn get_new_entries(
         &self,
-        sub: &'a Subscription,
-    ) -> Result<Vec<(&'a Subscription, InfoJson)>> {
+        sub: Subscription,
+        total_number: usize,
+    ) -> Result<impl Iterator<Item = (Subscription, InfoJson)>> {
+        let max_backlog = self.max_backlog;
+        let hashes = self.hashes.clone();
+
         let yt_dlp = YoutubeDLOptions::new()
             .set("playliststart", 1)
-            .set("playlistend", self.max_backlog)
+            .set("playlistend", max_backlog)
             .set("noplaylist", false)
             .set(
                 "extractor_args",
@@ -88,80 +104,84 @@ impl<'a> Updater<'a> {
             .set("match-filter", "availability=public")
             .build()?;
 
-        if !log_enabled!(Level::Debug) {
-            clear_whole_line();
-            move_to_col(1);
-            eprint!("Checking playlist {}...", sub.name);
-            move_to_col(1);
-            stderr().flush()?;
-        }
-
-        let info = yt_dlp
-            .extract_info(&sub.url, false, false)
-            .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?;
-
-        let empty = vec![];
-        let entries = info
-            .get("entries")
-            .map_or(&empty, |val| json_cast!(val, as_array));
-
-        let valid_entries: Vec<(&Subscription, InfoJson)> = entries
-            .iter()
-            .take(self.max_backlog)
-            .filter_map(|entry| -> Option<(&Subscription, InfoJson)> {
-                let id = json_get!(entry, "id", as_str);
-                let extractor_hash = blake3::hash(id.as_bytes());
-                if self.hashes.contains(&extractor_hash) {
-                    debug!("Skipping entry, as it is already present: '{extractor_hash}'",);
-                    None
-                } else {
-                    Some((sub, json_cast!(entry, as_object).to_owned()))
+        self.pool
+            .spawn_pinned(move || {
+                async move {
+                    if !log_enabled!(Level::Debug) {
+                        clear_whole_line();
+                        move_to_col(1);
+                        eprint!(
+                            "({}/{total_number}) Checking playlist {}...",
+                            REACHED_NUMBER.fetch_add(1, Ordering::Relaxed),
+                            sub.name
+                        );
+                        move_to_col(1);
+                        stderr().flush()?;
+                    }
+
+                    let info = yt_dlp
+                        .extract_info(&sub.url, false, false)
+                        .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?;
+
+                    let empty = vec![];
+                    let entries = info
+                        .get("entries")
+                        .map_or(&empty, |val| json_cast!(val, as_array));
+
+                    let valid_entries: Vec<(Subscription, InfoJson)> = entries
+                        .iter()
+                        .take(max_backlog)
+                        .filter_map(|entry| -> Option<(Subscription, InfoJson)> {
+                            let id = json_get!(entry, "id", as_str);
+                            let extractor_hash = blake3::hash(id.as_bytes());
+
+                            if hashes.contains(&extractor_hash) {
+                                debug!(
+                                    "Skipping entry, as it is already present: '{extractor_hash}'",
+                                );
+                                None
+                            } else {
+                                Some((sub.clone(), json_cast!(entry, as_object).to_owned()))
+                            }
+                        })
+                        .collect();
+
+                    Ok(valid_entries
+                        .into_iter()
+                        .map(|(sub, entry)| {
+                            let inner_yt_dlp = YoutubeDLOptions::new()
+                                .set("noplaylist", true)
+                                .build()
+                                .expect("Worked before, should work now");
+
+                            match inner_yt_dlp.process_ie_result(entry, false) {
+                                Ok(output) => Ok((sub, output)),
+                                Err(err) => Err(err),
+                            }
+                        })
+                        // Don't fail the whole update, if one of the entries fails to fetch.
+                        .filter_map(|base| match base {
+                            Ok(ok) => Some(ok),
+                            Err(err) => {
+                                let process_ie_result::Error::Python(err) = &err;
+
+                                if err.contains(
+                                    "Join this channel to get access to members-only content ",
+                                ) {
+                                    // Hide this error
+                                } else {
+                                    // Show the error, but don't fail.
+                                    let error = err
+                                        .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ")
+                                        .unwrap_or(err);
+                                    error!("{error}");
+                                }
+
+                                None
+                            }
+                        }))
                 }
             })
-            .collect();
-
-        let processed_entries: Vec<(&Subscription, InfoJson)> = stream::iter(valid_entries)
-            .map(
-                async |(sub, entry)| match yt_dlp.process_ie_result(entry, false) {
-                    Ok(output) => Ok((sub, output)),
-                    Err(err) => Err(err),
-                },
-            )
-            .buffer_unordered(100)
-            .collect::<Vec<_>>()
-            .await
-            .into_iter()
-            // Don't fail the whole update, if one of the entries fails to fetch.
-            .filter_map(|base| match base {
-                Ok(ok) => Some(ok),
-                Err(err) => {
-                    // TODO(@bpeetz): Add this <2025-06-13>
-                    // if let YtDlpError::PythonError { error, kind } = &err {
-                    //     if kind.as_str() == "<class 'yt_dlp.utils.DownloadError'>"
-                    //         && error.to_string().as_str().contains(
-                    //             "Join this channel to get access to members-only content ",
-                    //         )
-                    //     {
-                    //         // Hide this error
-                    //     } else {
-                    //         let error_string = error.to_string();
-                    //         let error = error_string
-                    //             .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ")
-                    //             .expect("This prefix should exists");
-                    //         error!("{error}");
-                    //     }
-                    //     return None;
-                    // }
-
-                    // TODO(@bpeetz): Ideally, we _would_ actually exit on unexpected errors, but
-                    // this is fine for now.  <2025-06-13>
-                    // Some(Err(err).context("Failed to process new entries."))
-                    error!("While processing entry: {err}");
-                    None
-                }
-            })
-            .collect();
-
-        Ok(processed_entries)
+            .await?
     }
 }
diff --git a/crates/yt/src/version/mod.rs b/crates/yt/src/version/mod.rs
index 05d85e0..9a91f3b 100644
--- a/crates/yt/src/version/mod.rs
+++ b/crates/yt/src/version/mod.rs
@@ -8,26 +8,12 @@
 // You should have received a copy of the License along with this program.
 // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
 
-use std::process::Command;
-
 use anyhow::{Context, Result};
 use sqlx::{SqlitePool, sqlite::SqliteConnectOptions};
+use yt_dlp::YoutubeDLOptions;
 
 use crate::{config::Config, storage::migrate::get_version_db};
 
-fn get_cmd_version(cmd: &str) -> Result<String> {
-    let out = String::from_utf8(
-        Command::new(cmd)
-            .arg("--version")
-            .output()
-            .with_context(|| format!("Failed to run `{cmd} --version`"))?
-            .stdout,
-    )
-    .context("Failed to interpret output as utf8")?;
-
-    Ok(out.trim().to_owned())
-}
-
 pub async fn show(config: &Config) -> Result<()> {
     let db_version = {
         let options = SqliteConnectOptions::new()
@@ -44,16 +30,16 @@ pub async fn show(config: &Config) -> Result<()> {
             .context("Failed to determine database version")?
     };
 
-    // TODO(@bpeetz): Use `pyo3`'s build in mechanism instead of executing the python CLI <2025-02-21>
-    let python_version = get_cmd_version("python")?;
-    let yt_dlp_version = get_cmd_version("yt-dlp")?;
+    let yt_dlp_version = {
+        let yt_dlp = YoutubeDLOptions::new().build()?;
+        yt_dlp.version()
+    };
 
     println!(
         "{}: {}
 
 db version: {db_version}
 
-python: {python_version}
 yt-dlp: {yt_dlp_version}",
         env!("CARGO_PKG_NAME"),
         env!("CARGO_PKG_VERSION"),
diff --git a/crates/yt/src/watch/playlist_handler/client_messages/mod.rs b/crates/yt/src/watch/playlist_handler/client_messages/mod.rs
index 6f7a59e..c05ca87 100644
--- a/crates/yt/src/watch/playlist_handler/client_messages/mod.rs
+++ b/crates/yt/src/watch/playlist_handler/client_messages/mod.rs
@@ -19,6 +19,7 @@ use tokio::process::Command;
 use super::mpv_message;
 
 async fn run_self_in_external_command(app: &App, args: &[&str]) -> Result<()> {
+    // TODO(@bpeetz): Can we trust this value? <2025-06-15>
     let binary =
         env::current_exe().context("Failed to determine the current executable to re-execute")?;
 
diff --git a/crates/yt_dlp/Cargo.toml b/crates/yt_dlp/Cargo.toml
index ddd5f9b..90f2e10 100644
--- a/crates/yt_dlp/Cargo.toml
+++ b/crates/yt_dlp/Cargo.toml
@@ -10,7 +10,7 @@
 
 [package]
 name = "yt_dlp"
-description = "A rust fii wrapper library for the python yt_dlp library"
+description = "A rust ffi wrapper library for the python yt_dlp library"
 keywords = []
 categories = []
 version.workspace = true
diff --git a/crates/yt_dlp/src/lib.rs b/crates/yt_dlp/src/lib.rs
index 34b8a5d..dd42fc6 100644
--- a/crates/yt_dlp/src/lib.rs
+++ b/crates/yt_dlp/src/lib.rs
@@ -1,19 +1,18 @@
 //! The `yt_dlp` interface is completely contained in the [`YoutubeDL`] structure.
 
-use std::io::Write;
-use std::mem;
-use std::{env, fs::File, path::PathBuf};
+use std::{self, env, mem, path::PathBuf};
 
 use indexmap::IndexMap;
 use log::{Level, debug, error, info, log_enabled};
 use logging::setup_logging;
-use rustpython::vm::builtins::PyList;
 use rustpython::{
     InterpreterConfig,
     vm::{
-        self, Interpreter, PyObjectRef, PyRef, VirtualMachine,
-        builtins::{PyBaseException, PyDict, PyStr},
+        self, AsObject, Interpreter, PyObjectRef, PyPayload, PyRef, VirtualMachine,
+        builtins::{PyBaseException, PyBaseExceptionRef, PyDict, PyList, PyStr},
         function::{FuncArgs, KwArgs, PosArgs},
+        py_io::Write,
+        suggestion::offer_suggestions,
     },
 };
 use url::Url;
@@ -177,12 +176,16 @@ impl YoutubeDL {
 
             Ok::<_, PyRef<PyBaseException>>((yt_dlp_module, youtube_dl_class))
         }) {
-            Ok(ok) => ok,
+            Ok(ok) => Ok(ok),
             Err(err) => {
-                interpreter.finalize(Some(err));
-                return Err(build::Error::Python);
+                // TODO(@bpeetz): Do we want to run `interpreter.finalize` here? <2025-06-14>
+                // interpreter.finalize(Some(err));
+                interpreter.enter(|vm| {
+                    let buffer = process_exception(vm, &err);
+                    Err(build::Error::Python(buffer))
+                })
             }
-        };
+        }?;
 
         Ok(Self {
             interpreter,
@@ -313,30 +316,13 @@ impl YoutubeDL {
 
             let result_json = json_dumps(result, vm);
 
-            if let Ok(confirm) = env::var("YT_STORE_INFO_JSON") {
-                if confirm == "yes" {
-                    let mut file = File::create("output.info.json").unwrap();
-                    write!(
-                        file,
-                        "{}",
-                        serde_json::to_string_pretty(&serde_json::Value::Object(
-                            result_json.clone()
-                        ))
-                        .expect("Valid json")
-                    )
-                    .unwrap();
-                }
-            }
-
             Ok::<_, PyRef<PyBaseException>>(result_json)
         }) {
             Ok(ok) => Ok(ok),
-            Err(err) => {
-                self.interpreter.enter(|vm| {
-                    vm.print_exception(err);
-                });
-                Err(extract_info::Error::Python)
-            }
+            Err(err) => self.interpreter.enter(|vm| {
+                let buffer = process_exception(vm, &err);
+                Err(extract_info::Error::Python(buffer))
+            }),
         }
     }
 
@@ -387,30 +373,28 @@ impl YoutubeDL {
             Ok::<_, PyRef<PyBaseException>>(result_json)
         }) {
             Ok(ok) => Ok(ok),
-            Err(err) => {
-                self.interpreter.enter(|vm| {
-                    vm.print_exception(err);
-                });
-                Err(process_ie_result::Error::Python)
-            }
+            Err(err) => self.interpreter.enter(|vm| {
+                let buffer = process_exception(vm, &err);
+                Err(process_ie_result::Error::Python(buffer))
+            }),
         }
     }
 }
 
 #[allow(missing_docs)]
 pub mod process_ie_result {
-    #[derive(Debug, thiserror::Error, Clone, Copy)]
+    #[derive(Debug, thiserror::Error)]
     pub enum Error {
-        #[error("Python threw an exception")]
-        Python,
+        #[error("Python threw an exception: {0}")]
+        Python(String),
     }
 }
 #[allow(missing_docs)]
 pub mod extract_info {
-    #[derive(Debug, thiserror::Error, Clone, Copy)]
+    #[derive(Debug, thiserror::Error)]
     pub enum Error {
-        #[error("Python threw an exception")]
-        Python,
+        #[error("Python threw an exception: {0}")]
+        Python(String),
     }
 }
 
@@ -488,8 +472,8 @@ impl YoutubeDLOptions {
 pub mod build {
     #[derive(Debug, thiserror::Error)]
     pub enum Error {
-        #[error("Python threw an exception")]
-        Python,
+        #[error("Python threw an exception: {0}")]
+        Python(String),
 
         #[error("Io error: {0}")]
         Io(#[from] std::io::Error),
@@ -539,3 +523,84 @@ pub fn json_dumps(
         _ => unreachable!("These should not be json.dumps output"),
     }
 }
+
+// Inlined and changed from `vm.write_exception_inner`
+fn write_exception<W: Write>(
+    vm: &VirtualMachine,
+    output: &mut W,
+    exc: &PyBaseExceptionRef,
+) -> Result<(), W::Error> {
+    let varargs = exc.args();
+    let args_repr = {
+        match varargs.len() {
+            0 => vec![],
+            1 => {
+                let args0_repr = if true {
+                    varargs[0]
+                        .str(vm)
+                        .unwrap_or_else(|_| PyStr::from("<element str() failed>").into_ref(&vm.ctx))
+                } else {
+                    varargs[0].repr(vm).unwrap_or_else(|_| {
+                        PyStr::from("<element repr() failed>").into_ref(&vm.ctx)
+                    })
+                };
+                vec![args0_repr]
+            }
+            _ => varargs
+                .iter()
+                .map(|vararg| {
+                    vararg.repr(vm).unwrap_or_else(|_| {
+                        PyStr::from("<element repr() failed>").into_ref(&vm.ctx)
+                    })
+                })
+                .collect(),
+        }
+    };
+
+    let exc_class = exc.class();
+
+    if exc_class.fast_issubclass(vm.ctx.exceptions.syntax_error) {
+        unreachable!(
+            "A syntax error should never be raised, \
+                                as yt_dlp should not have them and neither our embedded code"
+        );
+    }
+
+    let exc_name = exc_class.name();
+    match args_repr.len() {
+        0 => write!(output, "{exc_name}"),
+        1 => write!(output, "{}: {}", exc_name, args_repr[0]),
+        _ => write!(
+            output,
+            "{}: ({})",
+            exc_name,
+            args_repr
+                .iter()
+                .map(|val| val.as_str())
+                .collect::<Vec<_>>()
+                .join(", "),
+        ),
+    }?;
+
+    match offer_suggestions(exc, vm) {
+        Some(suggestions) => {
+            write!(output, ". Did you mean: '{suggestions}'?")
+        }
+        None => Ok(()),
+    }
+}
+
+fn process_exception(vm: &VirtualMachine, err: &PyBaseExceptionRef) -> String {
+    let mut buffer = String::new();
+    write_exception(vm, &mut buffer, err)
+        .expect("We are writing into an *in-memory* string, it will always work");
+
+    if log_enabled!(Level::Debug) {
+        let mut output = String::new();
+        vm.write_exception(&mut output, err)
+            .expect("We are writing into an *in-memory* string, it will always work");
+        debug!("Python threw an exception: {output}");
+    }
+
+    buffer
+}