// yt - A fully featured command line YouTube client // // Copyright (C) 2025 Benedikt Peetz // SPDX-License-Identifier: GPL-3.0-or-later // // This file is part of Yt. // // You should have received a copy of the License along with this program. // If not, see . use std::{ io::{Write, stderr}, sync::atomic::{AtomicUsize, Ordering}, }; use anyhow::{Context, Result}; use blake3::Hash; use futures::{StreamExt, future::join_all, stream}; use log::{Level, debug, error, log_enabled}; use serde_json::json; use tokio_util::task::LocalPoolHandle; use yt_dlp::{InfoJson, YoutubeDLOptions, json_cast, json_get, process_ie_result}; use crate::{ ansi_escape_codes::{clear_whole_line, move_to_col}, app::App, storage::subscriptions::Subscription, }; use super::process_subscription; pub(super) struct Updater { max_backlog: usize, hashes: Vec, pool: LocalPoolHandle, } static REACHED_NUMBER: AtomicUsize = const { AtomicUsize::new(1) }; impl Updater { pub(super) fn new(max_backlog: usize, hashes: Vec) -> Self { // TODO(@bpeetz): The number should not be hardcoded. <2025-06-14> let pool = LocalPoolHandle::new(16); Self { max_backlog, hashes, pool, } } pub(super) async fn update( self, app: &App, subscriptions: Vec, total_number: Option, current_progress: Option, ) -> Result<()> { let total_number = total_number.unwrap_or(subscriptions.len()); if let Some(current_progress) = current_progress { REACHED_NUMBER.store(current_progress, Ordering::Relaxed); } let mut stream = stream::iter(subscriptions) .map(|sub| self.get_new_entries(sub, total_number)) .buffer_unordered(16 * 4); while let Some(output) = stream.next().await { let mut entries = output?; if let Some(next) = entries.next() { let (sub, entry) = next; process_subscription(app, sub, entry).await?; join_all(entries.map(|(sub, entry)| process_subscription(app, sub, entry))) .await .into_iter() .collect::>()?; } } Ok(()) } async fn get_new_entries( &self, sub: Subscription, total_number: usize, ) -> Result> { let max_backlog = self.max_backlog; let hashes = self.hashes.clone(); let yt_dlp = YoutubeDLOptions::new() .set("playliststart", 1) .set("playlistend", max_backlog) .set("noplaylist", false) .set( "extractor_args", json! {{"youtubetab": {"approximate_date": [""]}}}, ) // TODO: This also removes unlisted and other stuff. Find a good way to remove the // members-only videos from the feed. <2025-04-17> .set("match-filter", "availability=public") .build()?; self.pool .spawn_pinned(move || { async move { if !log_enabled!(Level::Debug) { clear_whole_line(); move_to_col(1); eprint!( "({}/{total_number}) Checking playlist {}...", REACHED_NUMBER.fetch_add(1, Ordering::Relaxed), sub.name ); move_to_col(1); stderr().flush()?; } let info = yt_dlp .extract_info(&sub.url, false, false) .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?; let empty = vec![]; let entries = info .get("entries") .map_or(&empty, |val| json_cast!(val, as_array)); let valid_entries: Vec<(Subscription, InfoJson)> = entries .iter() .take(max_backlog) .filter_map(|entry| -> Option<(Subscription, InfoJson)> { let id = json_get!(entry, "id", as_str); let extractor_hash = blake3::hash(id.as_bytes()); if hashes.contains(&extractor_hash) { debug!( "Skipping entry, as it is already present: '{extractor_hash}'", ); None } else { Some((sub.clone(), json_cast!(entry, as_object).to_owned())) } }) .collect(); Ok(valid_entries .into_iter() .map(|(sub, entry)| { let inner_yt_dlp = YoutubeDLOptions::new() .set("noplaylist", true) .build() .expect("Worked before, should work now"); match inner_yt_dlp.process_ie_result(entry, false) { Ok(output) => Ok((sub, output)), Err(err) => Err(err), } }) // Don't fail the whole update, if one of the entries fails to fetch. .filter_map(|base| match base { Ok(ok) => Some(ok), Err(err) => { let process_ie_result::Error::Python(err) = &err; if err.contains( "Join this channel to get access to members-only content ", ) { // Hide this error } else { // Show the error, but don't fail. let error = err .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ") .unwrap_or(err); error!("{error}"); } None } })) } }) .await? } }