// yt - A fully featured command line YouTube client // // Copyright (C) 2025 Benedikt Peetz // SPDX-License-Identifier: GPL-3.0-or-later // // This file is part of Yt. // // You should have received a copy of the License along with this program. // If not, see . use std::io::{Write, stderr}; use anyhow::{Context, Result}; use blake3::Hash; use futures::{ StreamExt, TryStreamExt, stream::{self}, }; use log::{Level, debug, error, log_enabled}; use serde_json::json; use yt_dlp::{InfoJson, YoutubeDLOptions, json_cast, json_get}; use crate::{ ansi_escape_codes::{clear_whole_line, move_to_col}, app::App, storage::subscriptions::Subscription, }; use super::process_subscription; pub(super) struct Updater<'a> { max_backlog: usize, hashes: &'a [Hash], } impl<'a> Updater<'a> { pub(super) fn new(max_backlog: usize, hashes: &'a [Hash]) -> Self { Self { max_backlog, hashes, } } pub(super) async fn update( &mut self, app: &App, subscriptions: &[&Subscription], ) -> Result<()> { let mut stream = stream::iter(subscriptions) .map(|sub| self.get_new_entries(sub)) .buffer_unordered(100); while let Some(output) = stream.next().await { let mut entries = output?; if entries.is_empty() { continue; } let (sub, entry) = entries.remove(0); process_subscription(app, sub, entry).await?; let entry_stream: Result<()> = stream::iter(entries) .map(|(sub, entry)| process_subscription(app, sub, entry)) .buffer_unordered(100) .try_collect() .await; entry_stream?; } Ok(()) } async fn get_new_entries( &self, sub: &'a Subscription, ) -> Result> { let yt_dlp = YoutubeDLOptions::new() .set("playliststart", 1) .set("playlistend", self.max_backlog) .set("noplaylist", false) .set( "extractor_args", json! {{"youtubetab": {"approximate_date": [""]}}}, ) // TODO: This also removes unlisted and other stuff. Find a good way to remove the // members-only videos from the feed. <2025-04-17> .set("match-filter", "availability=public") .build()?; if !log_enabled!(Level::Debug) { clear_whole_line(); move_to_col(1); eprint!("Checking playlist {}...", sub.name); move_to_col(1); stderr().flush()?; } let info = yt_dlp .extract_info(&sub.url, false, false) .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?; let empty = vec![]; let entries = info .get("entries") .map_or(&empty, |val| json_cast!(val, as_array)); let valid_entries: Vec<(&Subscription, InfoJson)> = entries .iter() .take(self.max_backlog) .filter_map(|entry| -> Option<(&Subscription, InfoJson)> { let id = json_get!(entry, "id", as_str); let extractor_hash = blake3::hash(id.as_bytes()); if self.hashes.contains(&extractor_hash) { debug!("Skipping entry, as it is already present: '{extractor_hash}'",); None } else { Some((sub, json_cast!(entry, as_object).to_owned())) } }) .collect(); let processed_entries: Vec<(&Subscription, InfoJson)> = stream::iter(valid_entries) .map( async |(sub, entry)| match yt_dlp.process_ie_result(entry, false) { Ok(output) => Ok((sub, output)), Err(err) => Err(err), }, ) .buffer_unordered(100) .collect::>() .await .into_iter() // Don't fail the whole update, if one of the entries fails to fetch. .filter_map(|base| match base { Ok(ok) => Some(ok), Err(err) => { // TODO(@bpeetz): Add this <2025-06-13> // if let YtDlpError::PythonError { error, kind } = &err { // if kind.as_str() == "" // && error.to_string().as_str().contains( // "Join this channel to get access to members-only content ", // ) // { // // Hide this error // } else { // let error_string = error.to_string(); // let error = error_string // .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ") // .expect("This prefix should exists"); // error!("{error}"); // } // return None; // } // TODO(@bpeetz): Ideally, we _would_ actually exit on unexpected errors, but // this is fine for now. <2025-06-13> // Some(Err(err).context("Failed to process new entries.")) error!("While processing entry: {err}"); None } }) .collect(); Ok(processed_entries) } }