author     Benedikt Peetz <benedikt.peetz@b-peetz.de>   2025-02-16 09:54:24 +0100
committer  Benedikt Peetz <benedikt.peetz@b-peetz.de>   2025-02-16 09:54:24 +0100
commit     1d7bc17e62a64ec213e43530b050bc41f978c610 (patch)
tree       2bf8c1804d6271c8939c6c5d4d50ec42682565dd
parent     chore(yt): Change the type of `max_backlog` to `usize` (diff)
download   yt-1d7bc17e62a64ec213e43530b050bc41f978c610.zip
feat(yt/update): Port the Python updater to Rust
This has the massive upside that we no longer need to communicate via
JSON, and can thus filter errors and use the built-in Rust logger.
The big downside of this approach is that the port regresses the update
speed by a factor of 3 (i.e., updating my set of subscriptions
previously took 1 min; it now takes 3 min).
-rw-r--r--  yt/src/main.rs             2
-rw-r--r--  yt/src/update/mod.rs     151
-rw-r--r--  yt/src/update/updater.rs 162
3 files changed, 207 insertions, 108 deletions
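
The core of the port (see updater.rs below) replaces the JSON pipe to
raw_update.py with an in-process fan-out over a buffered stream. Here is a
minimal, self-contained sketch of that pattern, assuming the futures, tokio,
and anyhow crates (all of which the diff already uses); `fetch` is a
hypothetical stand-in for `Updater::get_new_entries`, and the buffer size of
100 matches the one used in the diff:

use futures::{stream, StreamExt, TryStreamExt};

// Hypothetical stand-in for `Updater::get_new_entries`: fetch one feed.
async fn fetch(url: &str) -> anyhow::Result<String> {
    Ok(format!("entries for {url}"))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let urls = ["https://example.com/feed-a", "https://example.com/feed-b"];

    // Poll up to 100 subscriptions concurrently; results are yielded in
    // completion order, not submission order.
    let results: Vec<String> = stream::iter(urls)
        .map(fetch)
        .buffer_unordered(100)
        .try_collect()
        .await?;

    for entries in &results {
        println!("{entries}");
    }
    Ok(())
}

`buffer_unordered(100)` bounds how many extractions run at once without
forcing an ordering on them, which is the same trade-off the new updater
makes for both subscriptions and their entries.
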
diff --git a/yt/src/main.rs b/yt/src/main.rs
index 4378346..5b5eda4 100644
--- a/yt/src/main.rs
+++ b/yt/src/main.rs
@@ -173,7 +173,7 @@ async fn main() -> Result<()> {
 
             let max_backlog = max_backlog.unwrap_or(app.config.update.max_backlog);
 
-            update::update(&app, max_backlog, subscriptions, verbosity).await?;
+            update::update(&app, max_backlog, subscriptions).await?;
         }
         Command::Subscriptions { cmd } => match cmd {
             SubscriptionCommand::Add { name, url } => {
diff --git a/yt/src/update/mod.rs b/yt/src/update/mod.rs
index 3cdc61b..730e7c0 100644
--- a/yt/src/update/mod.rs
+++ b/yt/src/update/mod.rs
@@ -8,116 +8,63 @@
 // You should have received a copy of the License along with this program.
 // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
 
-use std::{collections::HashMap, process::Stdio, str::FromStr, string::ToString};
+use std::str::FromStr;
 
 use anyhow::{Context, Ok, Result};
 use chrono::{DateTime, Utc};
-use log::{error, info, warn};
-use tokio::{
-    io::{AsyncBufReadExt, BufReader},
-    process::Command,
-};
+use log::{info, warn};
 use url::Url;
 use yt_dlp::{unsmuggle_url, wrapper::info_json::InfoJson};
 
 use crate::{
     app::App,
     storage::{
-        subscriptions::{get, Subscription},
+        subscriptions::{self, Subscription},
         video_database::{
             extractor_hash::ExtractorHash, getters::get_all_hashes, setters::add_video, Video,
             VideoStatus,
         },
     },
-    unreachable::Unreachable,
     videos::display::format_video::FormatVideo,
 };
 
+mod updater;
+use updater::Updater;
+
 pub async fn update(
     app: &App,
-    max_backlog: u32,
-    subs_to_update: Vec<String>,
-    verbosity: u8,
+    max_backlog: usize,
+    subscription_names_to_update: Vec<String>,
 ) -> Result<()> {
-    let subscriptions = get(app).await?;
-    let mut back_subs: HashMap<Url, Subscription> = HashMap::new();
-    let logging = verbosity > 0;
-    let log_level = match verbosity {
-        // 0 => 50,   // logging.CRITICAL
-        0 => 40,   // logging.ERROR
-        1 => 30,   // logging.WARNING
-        2 => 20,   // logging.INFO
-        3.. => 10, // logging.DEBUG
-    };
-    info!("Passing log_level {} to the update script", log_level);
+    let subscriptions = subscriptions::get(app).await?;
 
-    let mut urls: Vec<String> = vec![];
-    for (name, sub) in subscriptions.0 {
-        if subs_to_update.contains(&name) || subs_to_update.is_empty() {
-            urls.push(sub.url.to_string());
-            back_subs.insert(sub.url.clone(), sub);
-        } else {
-            info!(
-                "Not updating subscription '{}' as it was not specified",
-                name
-            );
-        }
-    }
+    let urls: Vec<_> = if subscription_names_to_update.is_empty() {
+        subscriptions.0.values().collect()
+    } else {
+        subscriptions
+            .0
+            .values()
+            .filter(|sub| {
+                if subscription_names_to_update.contains(&sub.name) {
+                    true
+                } else {
+                    info!(
+                        "Not updating subscription '{}' as it was not specified",
+                        sub.name
+                    );
+                    false
+                }
+            })
+            .collect()
+    };
 
     // We can get away with not having to re-fetch the hashes every time, as the returned video
     // should not contain duplicates.
     let hashes = get_all_hashes(app).await?;
 
-    let mut child = Command::new("raw_update.py")
-        .arg(max_backlog.to_string())
-        .arg(urls.len().to_string())
-        .arg(log_level.to_string())
-        .args(&urls)
-        .args(hashes.iter().map(ToString::to_string).collect::<Vec<_>>())
-        .stdout(Stdio::piped())
-        .stderr(if logging {
-            Stdio::inherit()
-        } else {
-            Stdio::null()
-        })
-        .stdin(Stdio::null())
-        .spawn()
-        .context("Failed to call python3 update_raw")?;
-
-    let mut out = BufReader::new(
-        child
-            .stdout
-            .take()
-            .unreachable("Should be able to take child stdout"),
-    )
-    .lines();
-
-    while let Some(line) = out.next_line().await? {
-        // use tokio::{fs::File, io::AsyncWriteExt};
-        // let mut output = File::create("output.json").await?;
-        // output.write(line.as_bytes()).await?;
-        // output.flush().await?;
-        // output.sync_all().await?;
-        // drop(output);
-
-        let output_json: HashMap<Url, InfoJson> = serde_json::from_str(&line)
-            .unreachable("The json is generated by our own script. It should be valid");
-
-        for (url, value) in output_json {
-            let sub = back_subs.get(&url).unreachable("This was stored before");
-            process_subscription(app, sub, value, &hashes)
-                .await
-                .with_context(|| format!("Failed to process subscription: '{}'", sub.name))?;
-        }
-    }
-
-    let out = child.wait().await?;
-    if !out.success() {
-        error!(
-            "The update_raw.py invokation failed (exit code: {}).",
-            out.code()
-                .map_or("<No exit code>".to_owned(), |f| f.to_string())
-        );
+    {
+        let mut updater = Updater::new(max_backlog, &hashes);
+        updater.update(app, &urls).await?;
     }
 
     Ok(())
@@ -231,31 +178,21 @@ pub fn video_entry_to_video(entry: InfoJson, sub: Option<&Subscription>) -> Resu
     Ok(video)
 }
 
-async fn process_subscription(
-    app: &App,
-    sub: &Subscription,
-    entry: InfoJson,
-    hashes: &[blake3::Hash],
-) -> Result<()> {
+async fn process_subscription(app: &App, sub: &Subscription, entry: InfoJson) -> Result<()> {
     let video =
         video_entry_to_video(entry, Some(sub)).context("Failed to parse search entry as Video")?;
 
-    if hashes.contains(video.extractor_hash.hash()) {
-        // We already stored the video information
-        unreachable!("The python update script should have never provided us a duplicated video");
-    } else {
-        add_video(app, video.clone())
+    add_video(app, video.clone())
+        .await
+        .with_context(|| format!("Failed to add video to database: '{}'", video.title))?;
+    println!(
+        "{}",
+        (&video
+            .to_formatted_video(app)
             .await
-            .with_context(|| format!("Failed to add video to database: '{}'", video.title))?;
-        println!(
-            "{}",
-            (&video
-                .to_formatted_video(app)
-                .await
-                .with_context(|| format!("Failed to format video: '{}'", video.title))?
-                .colorize(app))
-                .to_line_display()
-        );
-        Ok(())
-    }
+            .with_context(|| format!("Failed to format video: '{}'", video.title))?
+            .colorize(app))
+            .to_line_display()
+    );
+    Ok(())
 }
diff --git a/yt/src/update/updater.rs b/yt/src/update/updater.rs
new file mode 100644
index 0000000..d54def8
--- /dev/null
+++ b/yt/src/update/updater.rs
@@ -0,0 +1,162 @@
+use std::io::{stderr, Write};
+
+use anyhow::{Context, Result};
+use blake3::Hash;
+use futures::{
+    stream::{self},
+    StreamExt, TryStreamExt,
+};
+use log::{debug, error, info, log_enabled, Level};
+use owo_colors::OwoColorize;
+use serde_json::json;
+use yt_dlp::{error::YtDlpError, process_ie_result, wrapper::info_json::InfoJson};
+
+use crate::{app::App, storage::subscriptions::Subscription};
+
+use super::process_subscription;
+
+pub(super) struct Updater<'a> {
+    max_backlog: usize,
+    hashes: &'a [Hash],
+}
+
+impl<'a> Updater<'a> {
+    pub(super) fn new(max_backlog: usize, hashes: &'a [Hash]) -> Self {
+        Self {
+            max_backlog,
+            hashes,
+        }
+    }
+
+    pub(super) async fn update(
+        &mut self,
+        app: &App,
+        subscriptions: &[&Subscription],
+    ) -> Result<()> {
+        let mut stream = stream::iter(subscriptions)
+            .map(|sub| self.get_new_entries(sub))
+            .buffer_unordered(100);
+
+        while let Some(output) = stream.next().await {
+            let mut entries = output?;
+
+            if entries.is_empty() {
+                continue;
+            }
+
+            let (sub, entry) = entries.remove(0);
+            process_subscription(app, sub, entry).await?;
+
+            let entry_stream: Result<()> = stream::iter(entries)
+                .map(|(sub, entry)| process_subscription(app, sub, entry))
+                .buffer_unordered(100)
+                .try_collect()
+                .await;
+            entry_stream?;
+        }
+
+        Ok(())
+    }
+
+    async fn get_new_entries(
+        &self,
+        sub: &'a Subscription,
+    ) -> Result<Vec<(&'a Subscription, InfoJson)>> {
+        // ANSI ESCAPE CODES Wrappers {{{
+        // see: https://en.wikipedia.org/wiki/ANSI_escape_code#Control_Sequence_Introducer_commands
+        const CSI: &str = "\x1b[";
+        fn clear_whole_line() {
+            eprint!("{CSI}2K");
+        }
+        fn move_to_col(x: usize) {
+            eprint!("{CSI}{x}G");
+        }
+        // fn hide_cursor() {
+        //     eprint!("{CSI}?25l");
+        // }
+        // fn show_cursor() {
+        //     eprint!("{CSI}?25h");
+        // }
+        // }}}
+
+        let json = json! {
+            {
+                "playliststart": 1,
+                "playlistend": self.max_backlog,
+                "noplaylist": false,
+                "extractor_args": {"youtubetab": {"approximate_date": [""]}},
+            }
+        };
+        let yt_dlp_opts = json.as_object().expect("This is hardcoded");
+
+        if !log_enabled!(Level::Debug) {
+            clear_whole_line();
+            move_to_col(1);
+            eprint!("Checking playlist {}...", sub.name);
+            move_to_col(1);
+            stderr().flush()?;
+        }
+
+        let info = yt_dlp::extract_info(yt_dlp_opts, &sub.url, false, false)
+            .await
+            .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?;
+
+        let entries = info.entries.unwrap_or(vec![]);
+        let valid_entries: Vec<(&Subscription, InfoJson)> = entries
+            .into_iter()
+            .take(self.max_backlog)
+            .filter_map(|entry| -> Option<(&Subscription, InfoJson)> {
+                let id = entry.id.as_ref().expect("Should exist?");
+                let extractor_hash = blake3::hash(id.as_bytes());
+                if self.hashes.contains(&extractor_hash) {
+                    debug!(
+                        "Skipping entry, as it is already present: '{}'",
+                        extractor_hash
+                    );
+                    None
+                } else {
+                    Some((sub, entry))
+                }
+            })
+            .collect();
+
+        let processed_entries = {
+            let base: Result<Vec<(&Subscription, InfoJson)>, YtDlpError> =
+                stream::iter(valid_entries)
+                    .map(|(sub, entry)| async move {
+                        match process_ie_result(yt_dlp_opts, entry, false).await {
+                            Ok(output) => Ok((sub, output)),
+                            Err(err) => Err(err),
+                        }
+                    })
+                    .buffer_unordered(100)
+                    .try_collect()
+                    .await;
+            match base {
+                Ok(ok) => ok,
+                Err(err) => {
+                    if let YtDlpError::PythonError { error, kind } = &err {
+                        if kind.as_str() == "<class 'yt_dlp.utils.DownloadError'>"
+                            && error.to_string().as_str().contains(
+                                "Join this channel to get access to members-only content ",
+                            )
+                        {
+                            vec![]
+                        } else {
+                            let error_string = error.to_string();
+                            let error = error_string
+                                .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ")
+                                .expect("This prefix should exist");
+                            error!("{error}");
+                            vec![]
+                        }
+                    } else {
+                        Err(err).context("Failed to process new entries.")?
+                    }
+                }
+            }
+        };
+
+        Ok(processed_entries)
+    }
+}
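
One of the commit message's claimed upsides, filtering errors, shows up at the
end of `get_new_entries`: a structured `YtDlpError` can be matched on directly
instead of scraping subprocess output for error text. A minimal sketch of that
shape, where `FetchError` is a hypothetical stand-in for
`yt_dlp::error::YtDlpError`:

// `FetchError` is a hypothetical stand-in for `yt_dlp::error::YtDlpError`.
#[derive(Debug)]
#[allow(dead_code)]
enum FetchError {
    PythonError { error: String, kind: String },
    Other(String),
}

// Downgrade a known-benign failure class to "no new entries",
// mirroring the members-only handling in `get_new_entries`.
fn filter_benign(result: Result<Vec<String>, FetchError>) -> Result<Vec<String>, FetchError> {
    match result {
        Err(FetchError::PythonError { error, kind })
            if kind == "<class 'yt_dlp.utils.DownloadError'>"
                && error.contains("members-only") =>
        {
            eprintln!("skipping: {error}");
            Ok(Vec::new())
        }
        other => other,
    }
}

fn main() {
    let err = FetchError::PythonError {
        error: "Join this channel to get access to members-only content".into(),
        kind: "<class 'yt_dlp.utils.DownloadError'>".into(),
    };
    assert!(filter_benign(Err(err)).unwrap().is_empty());
}
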