-rw-r--r--  yt/src/main.rs           |   2 +-
-rw-r--r--  yt/src/update/mod.rs     | 151 ++++++-------------
-rw-r--r--  yt/src/update/updater.rs | 162 ++++++++++++++++++
3 files changed, 207 insertions(+), 108 deletions(-)
diff --git a/yt/src/main.rs b/yt/src/main.rs
index 4378346..5b5eda4 100644
--- a/yt/src/main.rs
+++ b/yt/src/main.rs
@@ -173,7 +173,7 @@ async fn main() -> Result<()> {
             let max_backlog = max_backlog.unwrap_or(app.config.update.max_backlog);
-            update::update(&app, max_backlog, subscriptions, verbosity).await?;
+            update::update(&app, max_backlog, subscriptions).await?;
         }
         Command::Subscriptions { cmd } => match cmd {
             SubscriptionCommand::Add { name, url } => {
diff --git a/yt/src/update/mod.rs b/yt/src/update/mod.rs
index 3cdc61b..730e7c0 100644
--- a/yt/src/update/mod.rs
+++ b/yt/src/update/mod.rs
@@ -8,116 +8,63 @@
 // You should have received a copy of the License along with this program.
 // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
 
-use std::{collections::HashMap, process::Stdio, str::FromStr, string::ToString};
+use std::str::FromStr;
 
 use anyhow::{Context, Ok, Result};
 use chrono::{DateTime, Utc};
-use log::{error, info, warn};
-use tokio::{
-    io::{AsyncBufReadExt, BufReader},
-    process::Command,
-};
+use log::{info, warn};
 use url::Url;
 use yt_dlp::{unsmuggle_url, wrapper::info_json::InfoJson};
 
 use crate::{
     app::App,
     storage::{
-        subscriptions::{get, Subscription},
+        subscriptions::{self, Subscription},
         video_database::{
             extractor_hash::ExtractorHash, getters::get_all_hashes, setters::add_video, Video,
             VideoStatus,
         },
     },
-    unreachable::Unreachable,
     videos::display::format_video::FormatVideo,
 };
 
+mod updater;
+use updater::Updater;
+
 pub async fn update(
     app: &App,
-    max_backlog: u32,
-    subs_to_update: Vec<String>,
-    verbosity: u8,
+    max_backlog: usize,
+    subscription_names_to_update: Vec<String>,
 ) -> Result<()> {
-    let subscriptions = get(app).await?;
-    let mut back_subs: HashMap<Url, Subscription> = HashMap::new();
-    let logging = verbosity > 0;
-    let log_level = match verbosity {
-        // 0 => 50, // logging.CRITICAL
-        0 => 40,   // logging.ERROR
-        1 => 30,   // logging.WARNING
-        2 => 20,   // logging.INFO
-        3.. => 10, // logging.DEBUG
-    };
-    info!("Passing log_level {} to the update script", log_level);
+    let subscriptions = subscriptions::get(app).await?;
 
-    let mut urls: Vec<String> = vec![];
-    for (name, sub) in subscriptions.0 {
-        if subs_to_update.contains(&name) || subs_to_update.is_empty() {
-            urls.push(sub.url.to_string());
-            back_subs.insert(sub.url.clone(), sub);
-        } else {
-            info!(
-                "Not updating subscription '{}' as it was not specified",
-                name
-            );
-        }
-    }
+    let urls: Vec<_> = if subscription_names_to_update.is_empty() {
+        subscriptions.0.values().collect()
+    } else {
+        subscriptions
+            .0
+            .values()
+            .filter(|sub| {
+                if subscription_names_to_update.contains(&sub.name) {
+                    true
+                } else {
+                    info!(
+                        "Not updating subscription '{}' as it was not specified",
+                        sub.name
+                    );
+                    false
+                }
+            })
+            .collect()
+    };
 
     // We can get away with not having to re-fetch the hashes every time, as the returned video
     // should not contain duplicates.
     let hashes = get_all_hashes(app).await?;
 
-    let mut child = Command::new("raw_update.py")
-        .arg(max_backlog.to_string())
-        .arg(urls.len().to_string())
-        .arg(log_level.to_string())
-        .args(&urls)
-        .args(hashes.iter().map(ToString::to_string).collect::<Vec<_>>())
-        .stdout(Stdio::piped())
-        .stderr(if logging {
-            Stdio::inherit()
-        } else {
-            Stdio::null()
-        })
-        .stdin(Stdio::null())
-        .spawn()
-        .context("Failed to call python3 update_raw")?;
-
-    let mut out = BufReader::new(
-        child
-            .stdout
-            .take()
-            .unreachable("Should be able to take child stdout"),
-    )
-    .lines();
-
-    while let Some(line) = out.next_line().await? {
-        // use tokio::{fs::File, io::AsyncWriteExt};
-        // let mut output = File::create("output.json").await?;
-        // output.write(line.as_bytes()).await?;
-        // output.flush().await?;
-        // output.sync_all().await?;
-        // drop(output);
-
-        let output_json: HashMap<Url, InfoJson> = serde_json::from_str(&line)
-            .unreachable("The json is generated by our own script. It should be valid");
-
-        for (url, value) in output_json {
-            let sub = back_subs.get(&url).unreachable("This was stored before");
-            process_subscription(app, sub, value, &hashes)
-                .await
-                .with_context(|| format!("Failed to process subscription: '{}'", sub.name))?;
-        }
-    }
-
-    let out = child.wait().await?;
-    if !out.success() {
-        error!(
-            "The update_raw.py invokation failed (exit code: {}).",
-            out.code()
-                .map_or("<No exit code>".to_owned(), |f| f.to_string())
-        );
+    {
+        let mut updater = Updater::new(max_backlog, &hashes);
+        updater.update(app, &urls).await?;
     }
 
     Ok(())
@@ -231,31 +178,21 @@ pub fn video_entry_to_video(entry: InfoJson, sub: Option<&Subscription>) -> Resu
     Ok(video)
 }
 
-async fn process_subscription(
-    app: &App,
-    sub: &Subscription,
-    entry: InfoJson,
-    hashes: &[blake3::Hash],
-) -> Result<()> {
+async fn process_subscription(app: &App, sub: &Subscription, entry: InfoJson) -> Result<()> {
     let video =
         video_entry_to_video(entry, Some(sub)).context("Failed to parse search entry as Video")?;
 
-    if hashes.contains(video.extractor_hash.hash()) {
-        // We already stored the video information
-        unreachable!("The python update script should have never provided us a duplicated video");
-    } else {
-        add_video(app, video.clone())
-            .await
-            .with_context(|| format!("Failed to add video to database: '{}'", video.title))?;
-        println!(
-            "{}",
-            (&video
-                .to_formatted_video(app)
-                .await
-                .with_context(|| format!("Failed to format video: '{}'", video.title))?
-                .colorize(app))
-            .to_line_display()
-        );
-        Ok(())
-    }
+    add_video(app, video.clone())
+        .await
+        .with_context(|| format!("Failed to add video to database: '{}'", video.title))?;
+    println!(
+        "{}",
+        (&video
+            .to_formatted_video(app)
+            .await
+            .with_context(|| format!("Failed to format video: '{}'", video.title))?
+            .colorize(app))
+        .to_line_display()
+    );
+    Ok(())
 }
diff --git a/yt/src/update/updater.rs b/yt/src/update/updater.rs
new file mode 100644
index 0000000..d54def8
--- /dev/null
+++ b/yt/src/update/updater.rs
@@ -0,0 +1,162 @@
+use std::io::{stderr, Write};
+
+use anyhow::{Context, Result};
+use blake3::Hash;
+use futures::{
+    stream::{self},
+    StreamExt, TryStreamExt,
+};
+use log::{debug, error, info, log_enabled, Level};
+use owo_colors::OwoColorize;
+use serde_json::json;
+use yt_dlp::{error::YtDlpError, process_ie_result, wrapper::info_json::InfoJson};
+
+use crate::{app::App, storage::subscriptions::Subscription};
+
+use super::process_subscription;
+
+pub(super) struct Updater<'a> {
+    max_backlog: usize,
+    hashes: &'a [Hash],
+}
+
+impl<'a> Updater<'a> {
+    pub(super) fn new(max_backlog: usize, hashes: &'a [Hash]) -> Self {
+        Self {
+            max_backlog,
+            hashes,
+        }
+    }
+
+    pub(super) async fn update(
+        &mut self,
+        app: &App,
+        subscriptions: &[&Subscription],
+    ) -> Result<()> {
+        let mut stream = stream::iter(subscriptions)
+            .map(|sub| self.get_new_entries(sub))
+            .buffer_unordered(100);
+
+        while let Some(output) = stream.next().await {
+            let mut entries = output?;
+
+            if entries.is_empty() {
+                continue;
+            }
+
+            let (sub, entry) = entries.remove(0);
+            process_subscription(app, sub, entry).await?;
+
+            let entry_stream: Result<()> = stream::iter(entries)
+                .map(|(sub, entry)| process_subscription(app, sub, entry))
+                .buffer_unordered(100)
+                .try_collect()
+                .await;
+            entry_stream?;
+        }
+
+        Ok(())
+    }
+
+    async fn get_new_entries(
+        &self,
+        sub: &'a Subscription,
+    ) -> Result<Vec<(&'a Subscription, InfoJson)>> {
+        // ANSI ESCAPE CODES Wrappers {{{
+        // see: https://en.wikipedia.org/wiki/ANSI_escape_code#Control_Sequence_Introducer_commands
+        const CSI: &str = "\x1b[";
+        fn clear_whole_line() {
+            eprint!("{CSI}2K");
+        }
+        fn move_to_col(x: usize) {
+            eprint!("{CSI}{x}G");
+        }
+        // fn hide_cursor() {
+        //     eprint!("{CSI}?25l");
+        // }
+        // fn show_cursor() {
+        //     eprint!("{CSI}?25h");
+        // }
+        // }}}
+
+        let json = json! {
+            {
+                "playliststart": 1,
+                "playlistend": self.max_backlog,
+                "noplaylist": false,
+                "extractor_args": {"youtubetab": {"approximate_date": [""]}},
+            }
+        };
+        let yt_dlp_opts = json.as_object().expect("This is hardcoded");
+
+        if !log_enabled!(Level::Debug) {
+            clear_whole_line();
+            move_to_col(1);
+            eprint!("Checking playlist {}...", sub.name);
+            move_to_col(1);
+            stderr().flush()?;
+        }
+
+        let info = yt_dlp::extract_info(yt_dlp_opts, &sub.url, false, false)
+            .await
+            .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?;
+
+        let entries = info.entries.unwrap_or(vec![]);
+        let valid_entries: Vec<(&Subscription, InfoJson)> = entries
+            .into_iter()
+            .take(self.max_backlog)
+            .filter_map(|entry| -> Option<(&Subscription, InfoJson)> {
+                let id = entry.id.as_ref().expect("Should exist?");
+                let extractor_hash = blake3::hash(id.as_bytes());
+                if self.hashes.contains(&extractor_hash) {
+                    debug!(
+                        "Skipping entry, as it is already present: '{}'",
+                        extractor_hash
+                    );
+                    None
+                } else {
+                    Some((sub, entry))
+                }
+            })
+            .collect();
+
+        let processed_entries = {
+            let base: Result<Vec<(&Subscription, InfoJson)>, YtDlpError> =
+                stream::iter(valid_entries)
+                    .map(|(sub, entry)| async move {
+                        match process_ie_result(yt_dlp_opts, entry, false).await {
+                            Ok(output) => Ok((sub, output)),
+                            Err(err) => Err(err),
+                        }
+                    })
+                    .buffer_unordered(100)
+                    .try_collect()
+                    .await;
+            match base {
+                Ok(ok) => ok,
+                Err(err) => {
+                    if let YtDlpError::PythonError { error, kind } = &err {
+                        if kind.as_str() == "<class 'yt_dlp.utils.DownloadError'>"
+                            && error.to_string().as_str().contains(
+                                "Join this channel to get access to members-only content ",
+                            )
+                        {
+                            vec![]
+                        } else {
+                            let error_string = error.to_string();
+                            let error = error_string
+                                .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ")
+                                .expect("This prefix should exists");
+                            error!("{error}");
+                            vec![]
+                        }
+                    } else {
+                        Err(err).context("Failed to process new entries.")?
+                    }
+                }
+            }
+        };
+
+        Ok(processed_entries)
+    }
+}
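Editor's note (not part of the commit): the new updater relies in three places on the same futures fan-out pattern, stream::iter(...).map(...).buffer_unordered(100). Below is a minimal, self-contained sketch of that pattern under stated assumptions: it uses the futures and tokio crates, and fetch_one is a hypothetical stand-in for the real per-subscription yt_dlp call.

// Sketch only: `fetch_one` is hypothetical, not an API from this repository.
use futures::{stream, StreamExt};

// Stand-in for a per-subscription network call.
async fn fetch_one(name: &'static str) -> String {
    format!("updated {name}")
}

#[tokio::main]
async fn main() {
    let subs = ["channel-a", "channel-b", "channel-c"];

    let results: Vec<String> = stream::iter(subs)
        .map(fetch_one)        // one future per subscription
        .buffer_unordered(100) // poll at most 100 of them concurrently
        .collect()             // gather results in completion order
        .await;

    println!("{results:?}");
}

Because buffer_unordered yields results as they complete rather than in input order, the commit's Updater::update pairs each InfoJson with its &Subscription instead of relying on ordering, which is why get_new_entries returns (sub, entry) tuples.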