-rw-r--r--  yt/src/main.rs            |   2
-rw-r--r--  yt/src/update/mod.rs      | 151
-rw-r--r--  yt/src/update/updater.rs  | 162
3 files changed, 207 insertions(+), 108 deletions(-)
diff --git a/yt/src/main.rs b/yt/src/main.rs
index 4378346..5b5eda4 100644
--- a/yt/src/main.rs
+++ b/yt/src/main.rs
@@ -173,7 +173,7 @@ async fn main() -> Result<()> {
 
             let max_backlog = max_backlog.unwrap_or(app.config.update.max_backlog);
 
-            update::update(&app, max_backlog, subscriptions, verbosity).await?;
+            update::update(&app, max_backlog, subscriptions).await?;
         }
         Command::Subscriptions { cmd } => match cmd {
             SubscriptionCommand::Add { name, url } => {
diff --git a/yt/src/update/mod.rs b/yt/src/update/mod.rs
index 3cdc61b..730e7c0 100644
--- a/yt/src/update/mod.rs
+++ b/yt/src/update/mod.rs
@@ -8,116 +8,63 @@
 // You should have received a copy of the License along with this program.
 // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
 
-use std::{collections::HashMap, process::Stdio, str::FromStr, string::ToString};
+use std::str::FromStr;
 
 use anyhow::{Context, Ok, Result};
 use chrono::{DateTime, Utc};
-use log::{error, info, warn};
-use tokio::{
-    io::{AsyncBufReadExt, BufReader},
-    process::Command,
-};
+use log::{info, warn};
 use url::Url;
 use yt_dlp::{unsmuggle_url, wrapper::info_json::InfoJson};
 
 use crate::{
     app::App,
     storage::{
-        subscriptions::{get, Subscription},
+        subscriptions::{self, Subscription},
         video_database::{
             extractor_hash::ExtractorHash, getters::get_all_hashes, setters::add_video, Video,
             VideoStatus,
         },
     },
-    unreachable::Unreachable,
     videos::display::format_video::FormatVideo,
 };
 
+mod updater;
+use updater::Updater;
+
 pub async fn update(
     app: &App,
-    max_backlog: u32,
-    subs_to_update: Vec<String>,
-    verbosity: u8,
+    max_backlog: usize,
+    subscription_names_to_update: Vec<String>,
 ) -> Result<()> {
-    let subscriptions = get(app).await?;
-    let mut back_subs: HashMap<Url, Subscription> = HashMap::new();
-    let logging = verbosity > 0;
-    let log_level = match verbosity {
-        // 0 => 50,   // logging.CRITICAL
-        0 => 40,   // logging.ERROR
-        1 => 30,   // logging.WARNING
-        2 => 20,   // logging.INFO
-        3.. => 10, // logging.DEBUG
-    };
-    info!("Passing log_level {} to the update script", log_level);
+    let subscriptions = subscriptions::get(app).await?;
 
-    let mut urls: Vec<String> = vec![];
-    for (name, sub) in subscriptions.0 {
-        if subs_to_update.contains(&name) || subs_to_update.is_empty() {
-            urls.push(sub.url.to_string());
-            back_subs.insert(sub.url.clone(), sub);
-        } else {
-            info!(
-                "Not updating subscription '{}' as it was not specified",
-                name
-            );
-        }
-    }
+    let subs: Vec<_> = if subscription_names_to_update.is_empty() {
+        subscriptions.0.values().collect()
+    } else {
+        subscriptions
+            .0
+            .values()
+            .filter(|sub| {
+                if subscription_names_to_update.contains(&sub.name) {
+                    true
+                } else {
+                    info!(
+                        "Not updating subscription '{}' as it was not specified",
+                        sub.name
+                    );
+                    false
+                }
+            })
+            .collect()
+    };
 
     // We can get away with not having to re-fetch the hashes every time, as the
     // returned videos should not contain duplicates.
     let hashes = get_all_hashes(app).await?;
 
-    let mut child = Command::new("raw_update.py")
-        .arg(max_backlog.to_string())
-        .arg(urls.len().to_string())
-        .arg(log_level.to_string())
-        .args(&urls)
-        .args(hashes.iter().map(ToString::to_string).collect::<Vec<_>>())
-        .stdout(Stdio::piped())
-        .stderr(if logging {
-            Stdio::inherit()
-        } else {
-            Stdio::null()
-        })
-        .stdin(Stdio::null())
-        .spawn()
-        .context("Failed to call python3 update_raw")?;
-
-    let mut out = BufReader::new(
-        child
-            .stdout
-            .take()
-            .unreachable("Should be able to take child stdout"),
-    )
-    .lines();
-
-    while let Some(line) = out.next_line().await? {
-        // use tokio::{fs::File, io::AsyncWriteExt};
-        // let mut output = File::create("output.json").await?;
-        // output.write(line.as_bytes()).await?;
-        // output.flush().await?;
-        // output.sync_all().await?;
-        // drop(output);
-
-        let output_json: HashMap<Url, InfoJson> = serde_json::from_str(&line)
-            .unreachable("The json is generated by our own script. It should be valid");
-
-        for (url, value) in output_json {
-            let sub = back_subs.get(&url).unreachable("This was stored before");
-            process_subscription(app, sub, value, &hashes)
-                .await
-                .with_context(|| format!("Failed to process subscription: '{}'", sub.name))?;
-        }
-    }
-
-    let out = child.wait().await?;
-    if !out.success() {
-        error!(
-            "The update_raw.py invokation failed (exit code: {}).",
-            out.code()
-                .map_or("<No exit code>".to_owned(), |f| f.to_string())
-        );
+    {
+        let mut updater = Updater::new(max_backlog, &hashes);
+        updater.update(app, &subs).await?;
     }
 
     Ok(())
@@ -231,31 +178,21 @@ pub fn video_entry_to_video(entry: InfoJson, sub: Option<&Subscription>) -> Resu
     Ok(video)
 }
 
-async fn process_subscription(
-    app: &App,
-    sub: &Subscription,
-    entry: InfoJson,
-    hashes: &[blake3::Hash],
-) -> Result<()> {
+async fn process_subscription(app: &App, sub: &Subscription, entry: InfoJson) -> Result<()> {
     let video =
         video_entry_to_video(entry, Some(sub)).context("Failed to parse search entry as Video")?;
 
-    if hashes.contains(video.extractor_hash.hash()) {
-        // We already stored the video information
-        unreachable!("The python update script should have never provided us a duplicated video");
-    } else {
-        add_video(app, video.clone())
+    add_video(app, video.clone())
+        .await
+        .with_context(|| format!("Failed to add video to database: '{}'", video.title))?;
+    println!(
+        "{}",
+        (&video
+            .to_formatted_video(app)
             .await
-            .with_context(|| format!("Failed to add video to database: '{}'", video.title))?;
-        println!(
-            "{}",
-            (&video
-                .to_formatted_video(app)
-                .await
-                .with_context(|| format!("Failed to format video: '{}'", video.title))?
-                .colorize(app))
-                .to_line_display()
-        );
-        Ok(())
-    }
+            .with_context(|| format!("Failed to format video: '{}'", video.title))?
+            .colorize(app))
+            .to_line_display()
+    );
+    Ok(())
 }
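
The dedup that makes the "no re-fetch" comment in the hunk above safe is a plain
hash-membership check: each entry's extractor id is hashed with blake3 and
compared against the hashes loaded once at the start of the update. A minimal
sketch of that check, assuming the blake3 crate as in the patch; `is_new_entry`
and `known_hashes` are illustrative names, with `known_hashes` standing in for
the result of `get_all_hashes`:

use blake3::Hash;

// True when `id` has not been stored yet; mirrors the
// `blake3::hash(id.as_bytes())` check in `Updater::get_new_entries`.
fn is_new_entry(known_hashes: &[Hash], id: &str) -> bool {
    let extractor_hash = blake3::hash(id.as_bytes());
    !known_hashes.contains(&extractor_hash)
}

fn main() {
    let known = vec![blake3::hash(b"dQw4w9WgXcQ")];
    assert!(!is_new_entry(&known, "dQw4w9WgXcQ")); // already in the database
    assert!(is_new_entry(&known, "a-new-video-id")); // would be processed
}

Because the hash is derived only from the extractor id, a video already in the
database is skipped without ever re-querying storage mid-update.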
diff --git a/yt/src/update/updater.rs b/yt/src/update/updater.rs
new file mode 100644
index 0000000..d54def8
--- /dev/null
+++ b/yt/src/update/updater.rs
@@ -0,0 +1,162 @@
+use std::io::{stderr, Write};
+
+use anyhow::{Context, Result};
+use blake3::Hash;
+use futures::{
+    stream,
+    StreamExt, TryStreamExt,
+};
+use log::{debug, error, info, log_enabled, Level};
+use owo_colors::OwoColorize;
+use serde_json::json;
+use yt_dlp::{error::YtDlpError, process_ie_result, wrapper::info_json::InfoJson};
+
+use crate::{app::App, storage::subscriptions::Subscription};
+
+use super::process_subscription;
+
+pub(super) struct Updater<'a> {
+    max_backlog: usize,
+    hashes: &'a [Hash],
+}
+
+impl<'a> Updater<'a> {
+    pub(super) fn new(max_backlog: usize, hashes: &'a [Hash]) -> Self {
+        Self {
+            max_backlog,
+            hashes,
+        }
+    }
+
+    pub(super) async fn update(
+        &mut self,
+        app: &App,
+        subscriptions: &[&Subscription],
+    ) -> Result<()> {
+        let mut stream = stream::iter(subscriptions)
+            .map(|sub| self.get_new_entries(sub))
+            .buffer_unordered(100);
+
+        while let Some(output) = stream.next().await {
+            let mut entries = output?;
+
+            if entries.is_empty() {
+                continue;
+            }
+
+            let (sub, entry) = entries.remove(0);
+            process_subscription(app, sub, entry).await?;
+
+            let entry_stream: Result<()> = stream::iter(entries)
+                .map(|(sub, entry)| process_subscription(app, sub, entry))
+                .buffer_unordered(100)
+                .try_collect()
+                .await;
+            entry_stream?;
+        }
+
+        Ok(())
+    }
+
+    async fn get_new_entries(
+        &self,
+        sub: &'a Subscription,
+    ) -> Result<Vec<(&'a Subscription, InfoJson)>> {
+        // ANSI ESCAPE CODES Wrappers {{{
+        // see: https://en.wikipedia.org/wiki/ANSI_escape_code#Control_Sequence_Introducer_commands
+        const CSI: &str = "\x1b[";
+        fn clear_whole_line() {
+            eprint!("{CSI}2K");
+        }
+        fn move_to_col(x: usize) {
+            eprint!("{CSI}{x}G");
+        }
+        // fn hide_cursor() {
+        //     eprint!("{CSI}?25l");
+        // }
+        // fn show_cursor() {
+        //     eprint!("{CSI}?25h");
+        // }
+        // }}}
+
+        let json = json! {
+            {
+                "playliststart": 1,
+                "playlistend": self.max_backlog,
+                "noplaylist": false,
+                "extractor_args": {"youtubetab": {"approximate_date": [""]}},
+            }
+        };
+        let yt_dlp_opts = json.as_object().expect("This is hardcoded");
+
+        if !log_enabled!(Level::Debug) {
+            clear_whole_line();
+            move_to_col(1);
+            eprint!("Checking playlist {}...", sub.name);
+            move_to_col(1);
+            stderr().flush()?;
+        }
+
+        let info = yt_dlp::extract_info(yt_dlp_opts, &sub.url, false, false)
+            .await
+            .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?;
+
+        let entries = info.entries.unwrap_or_default();
+        let valid_entries: Vec<(&Subscription, InfoJson)> = entries
+            .into_iter()
+            .take(self.max_backlog)
+            .filter_map(|entry| -> Option<(&Subscription, InfoJson)> {
+                let id = entry.id.as_ref().expect("Every entry should have an id");
+                let extractor_hash = blake3::hash(id.as_bytes());
+                if self.hashes.contains(&extractor_hash) {
+                    debug!(
+                        "Skipping entry, as it is already present: '{}'",
+                        extractor_hash
+                    );
+                    None
+                } else {
+                    Some((sub, entry))
+                }
+            })
+            .collect();
+
+        let processed_entries = {
+            let base: Result<Vec<(&Subscription, InfoJson)>, YtDlpError> =
+                stream::iter(valid_entries)
+                    .map(|(sub, entry)| async move {
+                        match process_ie_result(yt_dlp_opts, entry, false).await {
+                            Ok(output) => Ok((sub, output)),
+                            Err(err) => Err(err),
+                        }
+                    })
+                    .buffer_unordered(100)
+                    .try_collect()
+                    .await;
+            match base {
+                Ok(ok) => ok,
+                Err(err) => {
+                    if let YtDlpError::PythonError { error, kind } = &err {
+                        if kind.as_str() == "<class 'yt_dlp.utils.DownloadError'>"
+                            && error.to_string().as_str().contains(
+                                "Join this channel to get access to members-only content ",
+                            )
+                        {
+                            vec![]
+                        } else {
+                            let error_string = error.to_string();
+                            let error = error_string
+                                .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ")
+                                .expect("This prefix should exist");
+                            error!("{error}");
+                            vec![]
+                        }
+                    } else {
+                        Err(err).context("Failed to process new entries.")?
+                    }
+                }
+            }
+        };
+
+        Ok(processed_entries)
+    }
+}
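
The heart of the new updater is the bounded-concurrency pattern from the
futures crate: turn a collection into a stream of futures and poll up to 100
of them at once with `buffer_unordered`, consuming results in whatever order
they complete. A self-contained sketch of that pattern, assuming tokio and
futures as in the patch; `fetch` and the subscription names are placeholders
for `get_new_entries` and real subscriptions:

use futures::{stream, StreamExt};

// Stand-in for `Updater::get_new_entries`: any async operation works here.
async fn fetch(name: &str) -> Result<String, std::io::Error> {
    Ok(format!("entries for {name}"))
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let subscriptions = ["channel-a", "channel-b", "channel-c"];

    let mut results = stream::iter(subscriptions)
        .map(|name| fetch(name)) // one future per subscription
        .buffer_unordered(100);  // poll at most 100 concurrently

    // Results arrive in completion order, not input order, which is why the
    // patch keeps per-entry work independent of ordering.
    while let Some(output) = results.next().await {
        println!("{}", output?);
    }
    Ok(())
}

The limit of 100 matches the one `Updater::update` and `get_new_entries` use
for subscriptions and entries respectively.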