diff options
author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2025-07-18 18:01:29 +0200 |
---|---|---|
committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2025-07-18 18:01:29 +0200 |
commit | c4524db090d2d31af8bc3e7ec64c1ea9f5ec72aa (patch) | |
tree | f80cefb7b315155e4cca0bb4b78e5e6cd5418ab5 /crates/yt/src/commands | |
parent | test(crates/yt): Add basic integration tests (diff) | |
download | yt-c4524db090d2d31af8bc3e7ec64c1ea9f5ec72aa.zip |
feat(crates/yt): Separate all commands from their implementation code
This also comes with a re-worked and tested implementation of the comments rendering code.
Diffstat (limited to 'crates/yt/src/commands')
35 files changed, 3327 insertions, 0 deletions
diff --git a/crates/yt/src/commands/comments/implm/mod.rs b/crates/yt/src/commands/comments/implm/mod.rs new file mode 100644 index 0000000..1c02718 --- /dev/null +++ b/crates/yt/src/commands/comments/implm/mod.rs @@ -0,0 +1,15 @@ +use crate::{ + app::App, commands::comments::CommentsCommand, output::display_less, storage::db::video::Video, +}; + +use anyhow::Result; + +impl CommentsCommand { + pub(crate) async fn implm(self, app: &App) -> Result<()> { + let comments = Video::get_current_comments(app).await?; + + display_less(comments.render(app.config.global.display_colors))?; + + Ok(()) + } +} diff --git a/crates/yt/src/commands/comments/mod.rs b/crates/yt/src/commands/comments/mod.rs new file mode 100644 index 0000000..d87c75d --- /dev/null +++ b/crates/yt/src/commands/comments/mod.rs @@ -0,0 +1,6 @@ +use clap::Parser; + +mod implm; + +#[derive(Parser, Debug)] +pub(crate) struct CommentsCommand {} diff --git a/crates/yt/src/commands/config/implm.rs b/crates/yt/src/commands/config/implm.rs new file mode 100644 index 0000000..409ef43 --- /dev/null +++ b/crates/yt/src/commands/config/implm.rs @@ -0,0 +1,13 @@ +use crate::{app::App, commands::config::ConfigCommand}; + +use anyhow::Result; + +impl ConfigCommand { + pub(crate) fn implm(self, app: &App) -> Result<()> { + let config_str = toml::to_string(&app.config)?; + + print!("{config_str}"); + + Ok(()) + } +} diff --git a/crates/yt/src/commands/config/mod.rs b/crates/yt/src/commands/config/mod.rs new file mode 100644 index 0000000..9ec289b --- /dev/null +++ b/crates/yt/src/commands/config/mod.rs @@ -0,0 +1,6 @@ +use clap::Parser; + +mod implm; + +#[derive(Parser, Debug)] +pub(crate) struct ConfigCommand {} diff --git a/crates/yt/src/commands/description/implm.rs b/crates/yt/src/commands/description/implm.rs new file mode 100644 index 0000000..7c39b1c --- /dev/null +++ b/crates/yt/src/commands/description/implm.rs @@ -0,0 +1,16 @@ +use crate::{ + app::App, commands::description::DescriptionCommand, output::display_fmt_and_less, + storage::db::video::Video, +}; + +use anyhow::Result; + +impl DescriptionCommand { + pub(crate) async fn implm(self, app: &App) -> Result<()> { + let description = Video::get_current_description(app).await?; + + display_fmt_and_less(&description)?; + + Ok(()) + } +} diff --git a/crates/yt/src/commands/description/mod.rs b/crates/yt/src/commands/description/mod.rs new file mode 100644 index 0000000..b5b2a10 --- /dev/null +++ b/crates/yt/src/commands/description/mod.rs @@ -0,0 +1,6 @@ +use clap::Parser; + +mod implm; + +#[derive(Parser, Debug)] +pub(crate) struct DescriptionCommand {} diff --git a/crates/yt/src/commands/download/implm/download/download_options.rs b/crates/yt/src/commands/download/implm/download/download_options.rs new file mode 100644 index 0000000..15fed7e --- /dev/null +++ b/crates/yt/src/commands/download/implm/download/download_options.rs @@ -0,0 +1,121 @@ +// yt - A fully featured command line YouTube client +// +// Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// Copyright (C) 2025 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// SPDX-License-Identifier: GPL-3.0-or-later +// +// This file is part of Yt. +// +// You should have received a copy of the License along with this program. +// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. + +use anyhow::Context; +use serde_json::{Value, json}; +use yt_dlp::{YoutubeDL, options::YoutubeDLOptions}; + +use crate::app::App; + +use super::progress_hook::wrapped_progress_hook; + +pub(crate) fn download_opts( + app: &App, + subtitle_langs: Option<&String>, +) -> anyhow::Result<YoutubeDL> { + YoutubeDLOptions::new() + .with_progress_hook(wrapped_progress_hook) + .set("extract_flat", "in_playlist") + .set( + "extractor_args", + json! { + { + "youtube": { + "comment_sort": [ "top" ], + "max_comments": [ "150", "all", "100" ] + } + } + }, + ) + //.set("cookiesfrombrowser", json! {("firefox", "me.google", None::<String>, "youtube_dlp")}) + .set("prefer_free_formats", true) + .set("ffmpeg_location", env!("FFMPEG_LOCATION")) + .set("format", "bestvideo[height<=?1080]+bestaudio/best") + .set("fragment_retries", 10) + .set("getcomments", true) + .set("ignoreerrors", false) + .set("retries", 10) + .set("writeinfojson", true) + // NOTE: This results in a constant warning message. <2025-01-04> + //.set("writeannotations", true) + .set("writesubtitles", true) + .set("writeautomaticsub", true) + .set( + "outtmpl", + json! { + { + "default": app.config.paths.download_dir.join("%(channel)s/%(title)s.%(ext)s"), + "chapter": "%(title)s - %(section_number)03d %(section_title)s [%(id)s].%(ext)s" + } + }, + ) + .set("compat_opts", json! {{}}) + .set("forceprint", json! {{}}) + .set("print_to_file", json! {{}}) + .set("windowsfilenames", false) + .set("restrictfilenames", false) + .set("trim_file_names", false) + .set( + "postprocessors", + json! { + [ + { + "api": "https://sponsor.ajay.app", + "categories": [ + "interaction", + "intro", + "music_offtopic", + "sponsor", + "outro", + "poi_highlight", + "preview", + "selfpromo", + "filler", + "chapter" + ], + "key": "SponsorBlock", + "when": "after_filter" + }, + { + "force_keyframes": false, + "key": "ModifyChapters", + "remove_chapters_patterns": [], + "remove_ranges": [], + "remove_sponsor_segments": [ "sponsor" ], + "sponsorblock_chapter_title": "[SponsorBlock]: %(category_names)l" + }, + { + "add_chapters": true, + "add_infojson": null, + "add_metadata": false, + "key": "FFmpegMetadata" + }, + { + "key": "FFmpegConcat", + "only_multi_video": true, + "when": "playlist" + } + ] + }, + ) + .set( + "subtitleslangs", + Value::Array( + subtitle_langs + .map_or("", String::as_str) + .split(',') + .map(|val| Value::String(val.to_owned())) + .collect::<Vec<_>>(), + ), + ) + .build() + .context("Failed to instanciate download yt_dlp") +} diff --git a/crates/yt/src/commands/download/implm/download/mod.rs b/crates/yt/src/commands/download/implm/download/mod.rs new file mode 100644 index 0000000..f0d5f67 --- /dev/null +++ b/crates/yt/src/commands/download/implm/download/mod.rs @@ -0,0 +1,293 @@ +// yt - A fully featured command line YouTube client +// +// Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// Copyright (C) 2025 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// SPDX-License-Identifier: GPL-3.0-or-later +// +// This file is part of Yt. +// +// You should have received a copy of the License along with this program. +// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. + +use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; + +use crate::{ + app::App, + commands::download::implm::download::download_options::download_opts, + shared::bytes::Bytes, + storage::{ + db::{extractor_hash::ExtractorHash, insert::Operations, video::Video}, + notify::{wait_for_cache_reduction, wait_for_db_write}, + }, + yt_dlp::get_current_cache_allocation, +}; + +use anyhow::{Context, Result, bail}; +use log::{debug, error, info, warn}; +use tokio::{select, task::JoinHandle, time}; +use yt_dlp::YoutubeDL; + +#[allow(clippy::module_name_repetitions)] +pub(crate) mod download_options; +pub(crate) mod progress_hook; + +#[derive(Debug)] +#[allow(clippy::module_name_repetitions)] +pub(crate) struct CurrentDownload { + task_handle: JoinHandle<Result<(PathBuf, Video)>>, + yt_dlp: Arc<YoutubeDL>, + extractor_hash: ExtractorHash, +} + +impl CurrentDownload { + fn new_from_video(app: &App, video: Video) -> Result<Self> { + let extractor_hash = video.extractor_hash; + + debug!("Download started: {}", &video.title); + let yt_dlp = Arc::new(download_opts(app, video.subtitle_langs.as_ref())?); + + let local_yt_dlp = Arc::clone(&yt_dlp); + + let task_handle = tokio::task::spawn_blocking(move || { + let mut result = local_yt_dlp + .download(&[video.url.clone()]) + .with_context(|| format!("Failed to download video: '{}'", video.title))?; + + assert_eq!(result.len(), 1); + Ok((result.remove(0), video)) + }); + + Ok(Self { + task_handle, + yt_dlp, + extractor_hash, + }) + } + + fn abort(self) -> Result<()> { + debug!("Cancelling download."); + self.yt_dlp.close()?; + + Ok(()) + } + + fn is_finished(&self) -> bool { + self.task_handle.is_finished() + } + + async fn finalize(self, app: &App) -> Result<()> { + let (result, mut video) = self.task_handle.await??; + + let mut ops = Operations::new("Downloader: Set download path"); + video.set_download_path(&result, &mut ops); + ops.commit(app) + .await + .with_context(|| format!("Failed to committ download of video: '{}'", video.title))?; + + info!( + "Video '{}' was downlaoded to path: {}", + video.title, + result.display() + ); + + Ok(()) + } +} + +enum CacheSizeCheck { + /// The video can be downloaded + Fits, + + /// The video and the current cache size together would exceed the size + TooLarge, + + /// The video would not even fit into the empty cache + ExceedsMaxCacheSize, +} + +#[derive(Debug)] +pub(crate) struct Downloader { + current_download: Option<CurrentDownload>, + video_size_cache: HashMap<ExtractorHash, u64>, + printed_warning: bool, + cached_cache_allocation: Option<Bytes>, +} + +impl Default for Downloader { + fn default() -> Self { + Self::new() + } +} + +impl Downloader { + #[must_use] + pub(crate) fn new() -> Self { + Self { + current_download: None, + video_size_cache: HashMap::new(), + printed_warning: false, + cached_cache_allocation: None, + } + } + + /// Check if enough cache is available. + /// + /// Will wait for the next cache deletion if not. + async fn is_enough_cache_available( + &mut self, + app: &App, + max_cache_size: u64, + next_video: &Video, + ) -> Result<CacheSizeCheck> { + if let Some(cdownload) = &self.current_download { + if cdownload.extractor_hash == next_video.extractor_hash { + // If the video is already being downloaded it will always fit. Otherwise the + // download would not have been started. + return Ok(CacheSizeCheck::Fits); + } + } + let cache_allocation = get_current_cache_allocation(app).await?; + let video_size = self.get_approx_video_size(next_video)?; + + if video_size >= max_cache_size { + error!( + "The video '{}' ({}) exceeds the maximum cache size ({})! \ + Please set a bigger maximum (`--max-cache-size`) or skip it.", + next_video.title, + Bytes::new(video_size), + Bytes::new(max_cache_size) + ); + + return Ok(CacheSizeCheck::ExceedsMaxCacheSize); + } + + if cache_allocation.as_u64() + video_size >= max_cache_size { + if !self.printed_warning { + warn!( + "Can't download video: '{}' ({}) as it's too large for the cache ({} of {} allocated). \ + Waiting for cache size reduction..", + next_video.title, + Bytes::new(video_size), + &cache_allocation, + Bytes::new(max_cache_size) + ); + self.printed_warning = true; + + // Update this value immediately. + // This avoids printing the "Current cache size has changed .." warning below. + self.cached_cache_allocation = Some(cache_allocation); + } + + if let Some(cca) = self.cached_cache_allocation { + if cca != cache_allocation { + // Only print the warning if the display string has actually changed. + // Otherwise, we might confuse the user + if cca.to_string() != cache_allocation.to_string() { + warn!("Current cache size has changed, it's now: '{cache_allocation}'"); + } + debug!( + "Cache size has changed: {} -> {}", + cca.as_u64(), + cache_allocation.as_u64() + ); + self.cached_cache_allocation = Some(cache_allocation); + } + } else { + unreachable!( + "The `printed_warning` should be false in this case, \ + and thus should have already set the `cached_cache_allocation`." + ); + } + + // Wait and hope, that a large video is deleted from the cache. + wait_for_cache_reduction(app).await?; + Ok(CacheSizeCheck::TooLarge) + } else { + self.printed_warning = false; + Ok(CacheSizeCheck::Fits) + } + } + + /// The entry point to the Downloader. + /// This Downloader will periodically check if the database has changed, and then also + /// change which videos it downloads. + /// This will run, until the database doesn't contain any watchable videos + pub(crate) async fn consume(&mut self, app: Arc<App>, max_cache_size: u64) -> Result<()> { + while let Some(next_video) = Video::next_to_download(&app).await? { + match self + .is_enough_cache_available(&app, max_cache_size, &next_video) + .await? + { + CacheSizeCheck::Fits => (), + CacheSizeCheck::TooLarge => continue, + CacheSizeCheck::ExceedsMaxCacheSize => bail!("Giving up."), + } + + if self.current_download.is_some() { + let current_download = self.current_download.take().expect("It is `Some`."); + + if current_download.is_finished() { + // The download is done, finalize it and leave it removed. + current_download.finalize(&app).await?; + continue; + } + + if next_video.extractor_hash == current_download.extractor_hash { + // We still want to download the same video. + // reset the taken value + self.current_download = Some(current_download); + } else { + info!( + "Noticed, that the next video is not the video being downloaded, replacing it ('{}' vs. '{}')!", + next_video.extractor_hash.as_short_hash(&app).await?, + current_download + .extractor_hash + .as_short_hash(&app) + .await? + ); + + // Replace the currently downloading video + current_download + .abort() + .context("Failed to abort last download")?; + + let new_current_download = CurrentDownload::new_from_video(&app, next_video)?; + + self.current_download = Some(new_current_download); + } + } else { + info!( + "No video is being downloaded right now, setting it to '{}'", + next_video.title + ); + let new_current_download = CurrentDownload::new_from_video(&app, next_video)?; + self.current_download = Some(new_current_download); + } + + // We have to continuously check, if the current download is done. + // As such we simply wait or recheck on the next write to the db. + select! { + () = time::sleep(Duration::from_secs(1)) => (), + Ok(()) = wait_for_db_write(&app) => (), + } + } + + info!("Finished downloading!"); + Ok(()) + } + + fn get_approx_video_size(&mut self, video: &Video) -> Result<u64> { + if let Some(value) = self.video_size_cache.get(&video.extractor_hash) { + Ok(*value) + } else { + let size = video.get_approx_size()?; + + assert_eq!( + self.video_size_cache.insert(video.extractor_hash, size), + None + ); + + Ok(size) + } + } +} diff --git a/crates/yt/src/commands/download/implm/download/progress_hook.rs b/crates/yt/src/commands/download/implm/download/progress_hook.rs new file mode 100644 index 0000000..19fe122 --- /dev/null +++ b/crates/yt/src/commands/download/implm/download/progress_hook.rs @@ -0,0 +1,175 @@ +// yt - A fully featured command line YouTube client +// +// Copyright (C) 2025 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// SPDX-License-Identifier: GPL-3.0-or-later +// +// This file is part of Yt. +// +// You should have received a copy of the License along with this program. +// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. + +use std::{ + io::{Write, stderr}, + process, + sync::atomic::Ordering, +}; + +use colors::{Colorize, IntoCanvas}; +use log::{Level, log_enabled}; +use yt_dlp::{json_cast, json_get, wrap_progress_hook}; + +use crate::{ + ansi_escape_codes::{clear_whole_line, move_to_col}, + config::SHOULD_DISPLAY_COLOR, + select::duration::MaybeDuration, + shared::bytes::Bytes, +}; + +macro_rules! json_get_default { + ($value:expr, $name:literal, $convert:ident, $default:expr) => { + $value.get($name).map_or($default, |v| { + if v == &serde_json::Value::Null { + $default + } else { + json_cast!(@log_key $name, v, $convert) + } + }) + }; +} + +fn format_bytes(bytes: u64) -> String { + let bytes = Bytes::new(bytes); + bytes.to_string() +} + +fn format_speed(speed: f64) -> String { + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let bytes = Bytes::new(speed.floor() as u64); + format!("{bytes}/s") +} + +/// # Panics +/// If expectations fail. +#[allow(clippy::needless_pass_by_value)] +pub(crate) fn progress_hook( + input: serde_json::Map<String, serde_json::Value>, +) -> Result<(), std::io::Error> { + // Only add the handler, if the log-level is higher than Debug (this avoids covering debug + // messages). + if log_enabled!(Level::Debug) { + return Ok(()); + } + + let info_dict = json_get!(input, "info_dict", as_object); + + let get_title = || -> String { + match json_get!(info_dict, "ext", as_str) { + "vtt" => { + format!( + "Subtitles ({})", + json_get_default!(info_dict, "name", as_str, "<No Subtitle Language>") + ) + } + "webm" | "mp4" | "mp3" | "m4a" => { + json_get_default!(info_dict, "title", as_str, "<No title>").to_owned() + } + other => panic!("The extension '{other}' is not yet implemented"), + } + }; + + match json_get!(input, "status", as_str) { + "downloading" => { + let elapsed = json_get_default!(input, "elapsed", as_f64, 0.0); + let eta = json_get_default!(input, "eta", as_f64, 0.0); + let speed = json_get_default!(input, "speed", as_f64, 0.0); + + let downloaded_bytes = json_get!(input, "downloaded_bytes", as_u64); + let (total_bytes, bytes_is_estimate): (u64, &'static str) = { + let total_bytes = json_get_default!(input, "total_bytes", as_u64, 0); + + if total_bytes == 0 { + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let maybe_estimate = + json_get_default!(input, "total_bytes_estimate", as_f64, 0.0) as u64; + + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + if maybe_estimate == 0 { + // The download speed should be in bytes + // per second and the eta in seconds. + // Thus multiplying them gets us the raw bytes + // (which were estimated by `yt_dlp`, from their `info.json`) + let bytes_still_needed = (speed * eta).ceil() as u64; + + (downloaded_bytes + bytes_still_needed, "~") + } else { + (maybe_estimate, "~") + } + } else { + (total_bytes, "") + } + }; + + let percent: f64 = { + if total_bytes == 0 { + 100.0 + } else { + #[allow( + clippy::cast_possible_truncation, + clippy::cast_sign_loss, + clippy::cast_precision_loss + )] + { + (downloaded_bytes as f64 / total_bytes as f64) * 100.0 + } + } + }; + + clear_whole_line(); + move_to_col(1); + + let should_use_color = SHOULD_DISPLAY_COLOR.load(Ordering::Relaxed); + + eprint!( + "{} [{}/{} at {}] -> [{} of {}{} {}] ", + get_title().bold().blue().render(should_use_color), + MaybeDuration::from_secs_f64(elapsed) + .bold() + .yellow() + .render(should_use_color), + MaybeDuration::from_secs_f64(eta) + .bold() + .yellow() + .render(should_use_color), + format_speed(speed).bold().green().render(should_use_color), + format_bytes(downloaded_bytes) + .bold() + .red() + .render(should_use_color), + bytes_is_estimate.bold().red().render(should_use_color), + format_bytes(total_bytes) + .bold() + .red() + .render(should_use_color), + format!("{percent:.02}%") + .bold() + .cyan() + .render(should_use_color), + ); + stderr().flush()?; + } + "finished" => { + eprintln!("-> Finished downloading."); + } + "error" => { + // TODO: This should probably return an Err. But I'm not so sure where the error would + // bubble up to (i.e., who would catch it) <2025-01-21> + eprintln!("-> Error while downloading: {}", get_title()); + process::exit(1); + } + other => unreachable!("'{other}' should not be a valid state!"), + } + + Ok(()) +} + +wrap_progress_hook!(progress_hook, wrapped_progress_hook); diff --git a/crates/yt/src/commands/download/implm/mod.rs b/crates/yt/src/commands/download/implm/mod.rs new file mode 100644 index 0000000..e8867cf --- /dev/null +++ b/crates/yt/src/commands/download/implm/mod.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use crate::{ + app::App, + commands::download::DownloadCommand, + shared::bytes::Bytes, + storage::db::{ + insert::{Operations, maintenance::clear_stale_downloaded_paths}, + video::{Video, VideoStatusMarker}, + }, +}; + +use anyhow::Result; +use log::info; + +mod download; + +impl DownloadCommand { + pub(crate) async fn implm(self, app: Arc<App>) -> Result<()> { + let DownloadCommand { + force, + max_cache_size, + } = self; + + let max_cache_size = max_cache_size.unwrap_or(app.config.download.max_cache_size.as_u64()); + info!("Max cache size: '{}'", Bytes::new(max_cache_size)); + + clear_stale_downloaded_paths(&app).await?; + if force { + let mut all = Video::in_states(&app, &[VideoStatusMarker::Cached]).await?; + + let mut ops = Operations::new("Download: Clear old download paths due to `--force`"); + for a in &mut all { + a.remove_download_path(&mut ops); + } + ops.commit(&app).await?; + } + + download::Downloader::new() + .consume(app, max_cache_size) + .await?; + + Ok(()) + } +} diff --git a/crates/yt/src/commands/download/mod.rs b/crates/yt/src/commands/download/mod.rs new file mode 100644 index 0000000..48c6ee4 --- /dev/null +++ b/crates/yt/src/commands/download/mod.rs @@ -0,0 +1,24 @@ +use anyhow::Context; +use clap::Parser; + +use crate::shared::bytes::Bytes; + +mod implm; + +#[derive(Parser, Debug)] +pub(crate) struct DownloadCommand { + /// Forcefully re-download all cached videos (i.e. delete all already downloaded paths, then download). + #[arg(short, long)] + force: bool, + + /// The maximum size the download dir should have. + #[arg(short, long, value_parser = byte_parser)] + max_cache_size: Option<u64>, +} + +fn byte_parser(input: &str) -> Result<u64, anyhow::Error> { + Ok(input + .parse::<Bytes>() + .with_context(|| format!("Failed to parse '{input}' as bytes!"))? + .as_u64()) +} diff --git a/crates/yt/src/commands/mod.rs b/crates/yt/src/commands/mod.rs new file mode 100644 index 0000000..a6aa2af --- /dev/null +++ b/crates/yt/src/commands/mod.rs @@ -0,0 +1,153 @@ +use std::{ffi::OsStr, thread}; + +use clap::Subcommand; +use clap_complete::CompletionCandidate; +use tokio::runtime::Runtime; + +use crate::{ + app::App, + commands::{ + comments::CommentsCommand, config::ConfigCommand, database::DatabaseCommand, + description::DescriptionCommand, download::DownloadCommand, playlist::PlaylistCommand, + select::SelectCommand, status::StatusCommand, subscriptions::SubscriptionCommand, + update::UpdateCommand, videos::VideosCommand, watch::WatchCommand, + }, + config::Config, + storage::db::subscription::Subscriptions, +}; + +pub(crate) mod implm; + +mod comments; +mod config; +mod database; +mod description; +mod download; +mod playlist; +mod select; +mod status; +mod subscriptions; +mod update; +mod videos; +mod watch; + +#[derive(Subcommand, Debug)] +pub(crate) enum Command { + /// Display the comments of the currently playing video. + Comments { + #[command(flatten)] + cmd: CommentsCommand, + }, + + /// Show, the configuration options in effect. + Config { + #[command(flatten)] + cmd: ConfigCommand, + }, + + /// Interact with the video database. + #[command(visible_alias = "db")] + Database { + #[command(subcommand)] + cmd: DatabaseCommand, + }, + + /// Display the description of the currently playing video + Description { + #[command(flatten)] + cmd: DescriptionCommand, + }, + + /// Download and cache URLs + Download { + #[command(flatten)] + cmd: DownloadCommand, + }, + + /// Visualize the current playlist + Playlist { + #[command(flatten)] + cmd: PlaylistCommand, + }, + + /// Change the state of videos in the database (the default) + Select { + #[command(subcommand)] + cmd: Option<SelectCommand>, + }, + + /// Show, which videos have been selected to be watched (and their cache status) + Status { + #[command(flatten)] + cmd: StatusCommand, + }, + + /// Manipulate subscription + #[command(visible_alias = "subs")] + Subscriptions { + #[command(subcommand)] + cmd: SubscriptionCommand, + }, + + /// Update the video database + Update { + #[command(flatten)] + cmd: UpdateCommand, + }, + + /// Work with single videos + Videos { + #[command(subcommand)] + cmd: VideosCommand, + }, + + /// Watch the already cached (and selected) videos + Watch { + #[command(flatten)] + cmd: WatchCommand, + }, +} + +impl Default for Command { + fn default() -> Self { + Self::Select { + cmd: Some(SelectCommand::default()), + } + } +} + +fn complete_subscription(current: &OsStr) -> Vec<CompletionCandidate> { + let mut output = vec![]; + + let Some(current_prog) = current.to_str().map(ToOwned::to_owned) else { + return output; + }; + + let Ok(config) = Config::from_config_file(None, None, None) else { + return output; + }; + + let handle = thread::spawn(move || { + let Ok(rt) = Runtime::new() else { + return output; + }; + + let Ok(app) = rt.block_on(App::new(config, false)) else { + return output; + }; + + let Ok(all) = rt.block_on(Subscriptions::get(&app)) else { + return output; + }; + + for sub in all.0.into_keys() { + if sub.starts_with(¤t_prog) { + output.push(CompletionCandidate::new(sub)); + } + } + + output + }); + + handle.join().unwrap_or_default() +} diff --git a/crates/yt/src/commands/playlist/implm.rs b/crates/yt/src/commands/playlist/implm.rs new file mode 100644 index 0000000..98a8e64 --- /dev/null +++ b/crates/yt/src/commands/playlist/implm.rs @@ -0,0 +1,105 @@ +use std::{fmt::Write, path::Path}; + +use crate::{ + ansi_escape_codes, + app::App, + commands::playlist::PlaylistCommand, + storage::{ + db::{ + playlist::Playlist, + video::{Video, VideoStatus}, + }, + notify::wait_for_db_write, + }, + videos::RenderWithApp, +}; + +use anyhow::Result; +use futures::{TryStreamExt, stream::FuturesOrdered}; + +impl PlaylistCommand { + pub(crate) async fn implm(self, app: &App) -> Result<()> { + let PlaylistCommand { watch } = self; + + let mut previous_output_length = 0; + loop { + let playlist = Playlist::create(app).await?.videos; + + let output = playlist + .into_iter() + .map(|video| async move { + let mut output = String::new(); + + let (_, is_focused) = cache_values(&video); + + if is_focused { + output.push_str("🔻 "); + } else { + output.push_str(" "); + } + + output.push_str(&video.title_fmt().to_string(app)); + + output.push_str(" ("); + output.push_str(&video.parent_subscription_name_fmt().to_string(app)); + output.push(')'); + + output.push_str(" ["); + output.push_str(&video.duration_fmt().to_string(app)); + + if is_focused { + output.push_str(" ("); + if let Some(duration) = video.duration.as_secs() { + let watch_progress: f64 = f64::from( + u32::try_from(video.watch_progress.as_secs()).expect("No overflow"), + ); + let duration = f64::from(u32::try_from(duration).expect("No overflow")); + + write!(output, "{:0.0}%", (watch_progress / duration) * 100.0)?; + } else { + write!(output, "{}", video.watch_progress_fmt().to_string(app))?; + } + + output.push(')'); + } + output.push(']'); + + output.push('\n'); + + Ok::<String, anyhow::Error>(output) + }) + .collect::<FuturesOrdered<_>>() + .try_collect::<String>() + .await?; + + // Delete the previous output + ansi_escape_codes::cursor_up(previous_output_length); + ansi_escape_codes::erase_from_cursor_to_bottom(); + + previous_output_length = output.chars().filter(|ch| *ch == '\n').count(); + + print!("{output}"); + + if !watch { + break; + } + + wait_for_db_write(app).await?; + } + + Ok(()) + } +} + +/// Extract the values of the [`VideoStatus::Cached`] value from a Video. +fn cache_values(video: &Video) -> (&Path, bool) { + if let VideoStatus::Cached { + cache_path, + is_focused, + } = &video.status + { + (cache_path, *is_focused) + } else { + unreachable!("All of these videos should be cached"); + } +} diff --git a/crates/yt/src/commands/playlist/mod.rs b/crates/yt/src/commands/playlist/mod.rs new file mode 100644 index 0000000..8290b3e --- /dev/null +++ b/crates/yt/src/commands/playlist/mod.rs @@ -0,0 +1,10 @@ +use clap::Parser; + +mod implm; + +#[derive(Parser, Debug)] +pub(crate) struct PlaylistCommand { + /// Linger and display changes + #[arg(short, long)] + watch: bool, +} diff --git a/crates/yt/src/commands/select/implm/fs_generators/help.str b/crates/yt/src/commands/select/implm/fs_generators/help.str new file mode 100644 index 0000000..e3cc347 --- /dev/null +++ b/crates/yt/src/commands/select/implm/fs_generators/help.str @@ -0,0 +1,12 @@ +# Commands: +# w, watch [-p,-s,-l] Mark the video given by the hash to be watched +# wd, watched [-p,-s,-l] Mark the video given by the hash as already watched +# d, drop [-p,-s,-l] Mark the video given by the hash to be dropped +# u, url [-p,-s,-l] Open the video URL in Firefox's `timesinks.youtube` profile +# p, pick [-p,-s,-l] Reset the videos status to 'Pick' +# a, add URL Add a video, defined by the URL +# +# See `yt select <cmd_name> --help` for more help. +# +# These lines can be re-ordered; they are executed from top to bottom. +# vim: filetype=yts conceallevel=2 concealcursor=nc colorcolumn= nowrap diff --git a/crates/yt/src/commands/select/implm/fs_generators/help.str.license b/crates/yt/src/commands/select/implm/fs_generators/help.str.license new file mode 100644 index 0000000..a0e196c --- /dev/null +++ b/crates/yt/src/commands/select/implm/fs_generators/help.str.license @@ -0,0 +1,10 @@ +yt - A fully featured command line YouTube client + +Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de> +Copyright (C) 2025 Benedikt Peetz <benedikt.peetz@b-peetz.de> +SPDX-License-Identifier: GPL-3.0-or-later + +This file is part of Yt. + +You should have received a copy of the License along with this program. +If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. diff --git a/crates/yt/src/commands/select/implm/fs_generators/mod.rs b/crates/yt/src/commands/select/implm/fs_generators/mod.rs new file mode 100644 index 0000000..8ccda3c --- /dev/null +++ b/crates/yt/src/commands/select/implm/fs_generators/mod.rs @@ -0,0 +1,345 @@ +use std::{ + collections::HashMap, + env, + fs::{self, File, OpenOptions}, + io::{BufRead, BufReader, BufWriter, Read, Write}, + iter, + os::fd::{AsFd, AsRawFd}, + path::Path, +}; + +use crate::{ + app::App, + cli::CliArgs, + commands::{ + Command, + select::{ + SelectCommand, SelectSplitSortKey, SelectSplitSortMode, + implm::standalone::{self, handle_select_cmd}, + }, + }, + storage::db::{ + extractor_hash::ExtractorHash, + insert::Operations, + video::{Video, VideoStatusMarker}, + }, +}; + +use anyhow::{Context, Result, bail}; +use clap::Parser; +use futures::{TryStreamExt, stream::FuturesOrdered}; +use log::info; +use shlex::Shlex; +use tokio::process; + +const HELP_STR: &str = include_str!("./help.str"); + +pub(crate) async fn select_split( + app: &App, + done: bool, + sort_key: SelectSplitSortKey, + sort_mode: SelectSplitSortMode, +) -> Result<()> { + let temp_dir = tempfile::Builder::new() + .prefix("yt_video_select-") + .rand_bytes(6) + .tempdir() + .context("Failed to get tempdir")?; + + let matching_videos = get_videos(app, done).await?; + + let mut no_author = vec![]; + let mut author_map = HashMap::new(); + for video in matching_videos { + if let Some(sub) = &video.parent_subscription_name { + if author_map.contains_key(sub) { + let vec: &mut Vec<_> = author_map + .get_mut(sub) + .expect("This key is set, we checked in the if above"); + + vec.push(video); + } else { + author_map.insert(sub.to_owned(), vec![video]); + } + } else { + no_author.push(video); + } + } + + let author_map = { + let mut temp_vec: Vec<_> = author_map.into_iter().collect(); + + match sort_key { + SelectSplitSortKey::Publisher => { + // PERFORMANCE: The clone here should not be neeed. <2025-06-15> + temp_vec.sort_by_key(|(name, _): &(String, Vec<Video>)| name.to_owned()); + } + SelectSplitSortKey::Videos => { + temp_vec.sort_by_key(|(_, videos): &(String, Vec<Video>)| videos.len()); + } + } + + match sort_mode { + SelectSplitSortMode::Asc => { + // Std's default mode is ascending. + } + SelectSplitSortMode::Desc => { + temp_vec.reverse(); + } + } + + temp_vec + }; + + for (index, (name, videos)) in author_map + .into_iter() + .chain(iter::once(( + "<No parent subscription>".to_owned(), + no_author, + ))) + .enumerate() + { + let mut file_path = temp_dir.path().join(format!("{index:02}_{name}")); + file_path.set_extension("yts"); + + let tmp_file = File::create(&file_path) + .with_context(|| format!("Falied to create file at: {}", file_path.display()))?; + + write_videos_to_file(app, &tmp_file, &videos) + .await + .with_context(|| format!("Falied to populate file at: {}", file_path.display()))?; + } + + open_editor_at(temp_dir.path()).await?; + + let mut paths = vec![]; + for maybe_entry in temp_dir + .path() + .read_dir() + .context("Failed to open temp dir for reading")? + { + let entry = maybe_entry.context("Failed to read entry in temp dir")?; + + if !entry.file_type()?.is_file() { + bail!("Found non-file entry: {}", entry.path().display()); + } + + paths.push(entry.path()); + } + + paths.sort(); + + let mut persistent_file = OpenOptions::new() + .read(false) + .write(true) + .create(true) + .truncate(true) + .open(&app.config.paths.last_selection_path) + .context("Failed to open persistent selection file")?; + + for path in paths { + let mut read_file = File::open(path)?; + + let mut buffer = vec![]; + read_file.read_to_end(&mut buffer)?; + persistent_file.write_all(&buffer)?; + } + + persistent_file.flush()?; + let persistent_file = OpenOptions::new() + .read(true) + .open(format!( + "/proc/self/fd/{}", + persistent_file.as_fd().as_raw_fd() + )) + .context("Failed to re-open persistent file")?; + + let processed = process_file(app, &persistent_file).await?; + + info!("Processed {processed} records."); + temp_dir.close().context("Failed to close the temp dir")?; + Ok(()) +} + +pub(crate) async fn select_file(app: &App, done: bool, use_last_selection: bool) -> Result<()> { + let temp_file = tempfile::Builder::new() + .prefix("yt_video_select-") + .suffix(".yts") + .rand_bytes(6) + .tempfile() + .context("Failed to get tempfile")?; + + if use_last_selection { + fs::copy(&app.config.paths.last_selection_path, &temp_file)?; + } else { + let matching_videos = get_videos(app, done).await?; + + write_videos_to_file(app, temp_file.as_file(), &matching_videos).await?; + } + + open_editor_at(temp_file.path()).await?; + + let read_file = OpenOptions::new().read(true).open(temp_file.path())?; + fs::copy(temp_file.path(), &app.config.paths.last_selection_path) + .context("Failed to persist selection file")?; + + let processed = process_file(app, &read_file).await?; + info!("Processed {processed} records."); + + Ok(()) +} + +async fn get_videos(app: &App, include_done: bool) -> Result<Vec<Video>> { + if include_done { + Video::in_states(app, VideoStatusMarker::ALL).await + } else { + Video::in_states( + app, + &[ + VideoStatusMarker::Pick, + // + VideoStatusMarker::Watch, + VideoStatusMarker::Cached, + ], + ) + .await + } +} + +async fn write_videos_to_file(app: &App, file: &File, videos: &[Video]) -> Result<()> { + // Warm-up the cache for the display rendering of the videos. + // Otherwise the futures would all try to warm it up at the same time. + if let Some(vid) = videos.first() { + drop(vid.to_line_display(app, None).await?); + } + + let mut edit_file = BufWriter::new(file); + + videos + .iter() + .map(|vid| vid.to_select_file_display(app)) + .collect::<FuturesOrdered<_>>() + .try_collect::<Vec<String>>() + .await? + .into_iter() + .try_for_each(|line| -> Result<()> { + edit_file + .write_all(line.as_bytes()) + .context("Failed to write to `edit_file`")?; + + Ok(()) + })?; + + edit_file.write_all(HELP_STR.as_bytes())?; + edit_file.flush().context("Failed to flush edit file")?; + + Ok(()) +} + +async fn process_file(app: &App, file: &File) -> Result<i64> { + let mut line_number = 0; + + let mut ops = Operations::new("Select: process file"); + + // Fetch all the hashes once, instead of every time we need to process a line. + let all_hashes = ExtractorHash::get_all(app).await?; + + let reader = BufReader::new(file); + for line in reader.lines() { + let line = line.context("Failed to read a line")?; + + if let Some(line) = process_line(&line)? { + line_number -= 1; + + // debug!( + // "Parsed command: `{}`", + // line.iter() + // .map(|val| format!("\"{}\"", val)) + // .collect::<Vec<String>>() + // .join(" ") + // ); + + let arg_line = ["yt", "select"] + .into_iter() + .chain(line.iter().map(String::as_str)); + + let args = CliArgs::parse_from(arg_line); + + let Command::Select { cmd } = args + .command + .expect("This will be some, as we constructed it above.") + else { + unreachable!("This is checked in the `filter_line` function") + }; + + match cmd.expect( + "This value should always be some \ + here, as it would otherwise thrown an error above.", + ) { + SelectCommand::File { .. } | SelectCommand::Split { .. } => { + bail!("You cannot use `select file` or `select split` recursively.") + } + SelectCommand::Add { urls, start, stop } => { + Box::pin(standalone::add::add(app, urls, start, stop)).await?; + } + other => { + let shared = other + .clone() + .into_shared() + .expect("The ones without shared should have been filtered out."); + + let hash = shared.hash.realize(app, Some(&all_hashes)).await?; + let mut video = hash + .get_with_app(app) + .await + .expect("The hash was already realized, it should therefore exist"); + + handle_select_cmd(app, other, &mut video, Some(line_number), &mut ops).await?; + } + } + } + } + + ops.commit(app).await?; + Ok(-line_number) +} + +async fn open_editor_at(path: &Path) -> Result<()> { + let editor = env::var("EDITOR").unwrap_or("nvim".to_owned()); + + let mut nvim = process::Command::new(&editor); + nvim.arg(path); + let status = nvim + .status() + .await + .with_context(|| format!("Falied to run editor: {editor}"))?; + + if status.success() { + Ok(()) + } else { + bail!("Editor ({editor}) exited with error status: {}", status) + } +} + +fn process_line(line: &str) -> Result<Option<Vec<String>>> { + // Filter out comments and empty lines + if line.starts_with('#') || line.trim().is_empty() { + Ok(None) + } else { + let split: Vec<_> = { + let mut shl = Shlex::new(line); + let res = shl.by_ref().collect(); + + if shl.had_error { + bail!("Failed to parse line '{line}'") + } + + assert_eq!(shl.line_no, 1, "A unexpected newline appeared"); + res + }; + + assert!(!split.is_empty()); + + Ok(Some(split)) + } +} diff --git a/crates/yt/src/commands/select/implm/mod.rs b/crates/yt/src/commands/select/implm/mod.rs new file mode 100644 index 0000000..755076c --- /dev/null +++ b/crates/yt/src/commands/select/implm/mod.rs @@ -0,0 +1,42 @@ +use crate::{app::App, commands::select::SelectCommand, storage::db::insert::Operations}; + +use anyhow::Result; + +mod fs_generators; +mod standalone; + +impl SelectCommand { + pub(crate) async fn implm(self, app: &App) -> Result<()> { + match self { + SelectCommand::File { + done, + use_last_selection, + } => Box::pin(fs_generators::select_file(&app, done, use_last_selection)).await?, + SelectCommand::Split { + done, + sort_key, + sort_mode, + } => Box::pin(fs_generators::select_split(&app, done, sort_key, sort_mode)).await?, + SelectCommand::Add { urls, start, stop } => { + Box::pin(standalone::add::add(&app, urls, start, stop)).await?; + } + other => { + let shared = other + .clone() + .into_shared() + .expect("The ones without shared should have been filtered out."); + let hash = shared.hash.realize(&app, None).await?; + let mut video = hash + .get_with_app(&app) + .await + .expect("The hash was already realized, it should therefore exist"); + + let mut ops = Operations::new("Main: handle select cmd"); + standalone::handle_select_cmd(&app, other, &mut video, None, &mut ops).await?; + ops.commit(&app).await?; + } + } + + Ok(()) + } +} diff --git a/crates/yt/src/commands/select/implm/standalone/add.rs b/crates/yt/src/commands/select/implm/standalone/add.rs new file mode 100644 index 0000000..ec32039 --- /dev/null +++ b/crates/yt/src/commands/select/implm/standalone/add.rs @@ -0,0 +1,186 @@ +// yt - A fully featured command line YouTube client +// +// Copyright (C) 2025 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// SPDX-License-Identifier: GPL-3.0-or-later +// +// This file is part of Yt. +// +// You should have received a copy of the License along with this program. +// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. + +use crate::{ + app::App, + storage::db::{extractor_hash::ExtractorHash, insert::Operations, video::Video}, + yt_dlp::yt_dlp_opts_updating, +}; + +use anyhow::{Context, Result, bail}; +use log::{error, warn}; +use url::Url; +use yt_dlp::{YoutubeDL, info_json::InfoJson, json_cast, json_get}; + +#[allow(clippy::too_many_lines)] +pub(crate) async fn add( + app: &App, + urls: Vec<Url>, + start: Option<usize>, + stop: Option<usize>, +) -> Result<()> { + for url in urls { + async fn process_and_add(app: &App, entry: InfoJson, yt_dlp: &YoutubeDL) -> Result<()> { + let url = json_get!(entry, "url", as_str).parse()?; + + let entry = yt_dlp + .extract_info(&url, false, true) + .with_context(|| format!("Failed to fetch entry for url: '{url}'"))?; + + add_entry(app, entry).await?; + + Ok(()) + } + + async fn add_entry(app: &App, entry: InfoJson) -> Result<()> { + // We have to re-fetch all hashes every time, because a user could try to add the same + // URL twice (for whatever reason.) + let hashes = ExtractorHash::get_all(app) + .await + .context("Failed to fetch all video hashes")?; + + let extractor_hash = ExtractorHash::from_info_json(&entry); + if hashes.contains(&extractor_hash) { + error!( + "Video '{}'{} is already in the database. Skipped adding it", + extractor_hash + .as_short_hash(app) + .await + .with_context(|| format!( + "Failed to format hash of video '{}' as short hash", + entry + .get("url") + .map_or("<Unknown video Url>".to_owned(), ToString::to_string) + ))?, + entry.get("title").map_or(String::new(), |title| format!( + " (\"{}\")", + json_cast!(title, as_str) + )) + ); + return Ok(()); + } + + let mut ops = Operations::new("SelectAdd: Video entry to video"); + let video = Video::from_info_json(&entry, None)?.add(&mut ops)?; + ops.commit(app).await?; + + println!("{}", &video.to_line_display(app, None).await?); + + Ok(()) + } + + let yt_dlp = yt_dlp_opts_updating(start.unwrap_or(0) + stop.unwrap_or(0))?; + + let entry = yt_dlp + .extract_info(&url, false, true) + .with_context(|| format!("Failed to fetch entry for url: '{url}'"))?; + + match entry.get("_type").map(|val| json_cast!(val, as_str)) { + Some("video") => { + add_entry(app, entry).await?; + if start.is_some() || stop.is_some() { + warn!( + "You added `start` and/or `stop` markers for a single *video*! These will be ignored." + ); + } + } + Some("playlist") => { + if let Some(entries) = entry.get("entries") { + let entries = json_cast!(entries, as_array); + let start = start.unwrap_or(0); + let stop = stop.unwrap_or(entries.len() - 1); + + let respected_entries = + take_vector(entries, start, stop).with_context(|| { + format!( + "Failed to take entries starting at: {start} and ending with {stop}" + ) + })?; + + if respected_entries.is_empty() { + warn!("No entries found, after applying your start/stop limits."); + } else { + // Pre-warm the cache + process_and_add( + app, + json_cast!(respected_entries[0], as_object).to_owned(), + &yt_dlp, + ) + .await?; + let respected_entries = &respected_entries[1..]; + + let futures: Vec<_> = respected_entries + .iter() + .map(|entry| { + process_and_add( + app, + json_cast!(entry, as_object).to_owned(), + &yt_dlp, + ) + }) + .collect(); + + for fut in futures { + fut.await?; + } + } + } else { + bail!("Your playlist does not seem to have any entries!") + } + } + other => bail!( + "Your URL should point to a video or a playlist, but points to a '{:#?}'", + other + ), + } + } + + Ok(()) +} + +fn take_vector<T>(vector: &[T], start: usize, stop: usize) -> Result<&[T]> { + let length = vector.len(); + + if stop >= length { + bail!( + "Your stop marker ({stop}) exceeds the possible entries ({length})! Remember that it is zero indexed." + ); + } + + Ok(&vector[start..=stop]) +} + +#[cfg(test)] +mod test { + use super::take_vector; + + #[test] + fn test_vector_take() { + let vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + + let new_vec = take_vector(&vec, 2, 8).unwrap(); + + assert_eq!(new_vec, vec![2, 3, 4, 5, 6, 7, 8]); + } + + #[test] + fn test_vector_take_overflow() { + let vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + + assert!(take_vector(&vec, 0, 12).is_err()); + } + + #[test] + fn test_vector_take_equal() { + let vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + + assert!(take_vector(&vec, 0, 11).is_err()); + } +} diff --git a/crates/yt/src/commands/select/implm/standalone/mod.rs b/crates/yt/src/commands/select/implm/standalone/mod.rs new file mode 100644 index 0000000..dd6de45 --- /dev/null +++ b/crates/yt/src/commands/select/implm/standalone/mod.rs @@ -0,0 +1,122 @@ +use std::io::{Write, stderr}; + +use crate::{ + ansi_escape_codes, + app::App, + commands::select::{SelectCommand, SharedSelectionCommandArgs}, + storage::db::{ + insert::{Operations, video::Operation}, + video::{Priority, Video, VideoStatus}, + }, +}; + +use anyhow::{Context, Result, bail}; + +pub(super) mod add; + +pub(crate) async fn handle_select_cmd( + app: &App, + cmd: SelectCommand, + video: &mut Video, + line_number: Option<i64>, + ops: &mut Operations<Operation>, +) -> Result<()> { + let status = match cmd { + SelectCommand::Pick { shared } => Some((VideoStatus::Pick, shared)), + SelectCommand::Drop { shared } => Some((VideoStatus::Drop, shared)), + SelectCommand::Watched { shared } => Some((VideoStatus::Watched, shared)), + SelectCommand::Watch { shared } => { + if let VideoStatus::Cached { + cache_path, + is_focused, + } = &video.status + { + Some(( + VideoStatus::Cached { + cache_path: cache_path.to_owned(), + is_focused: *is_focused, + }, + shared, + )) + } else { + Some((VideoStatus::Watch, shared)) + } + } + SelectCommand::Url { shared } => { + let Some(url) = shared.url else { + bail!("You need to provide a url to `select url ..`") + }; + + let mut firefox = std::process::Command::new("firefox"); + firefox.args(["-P", "timesinks.youtube"]); + firefox.arg(url.as_str()); + let _handle = firefox.spawn().context("Failed to run firefox")?; + None + } + SelectCommand::File { .. } | SelectCommand::Split { .. } | SelectCommand::Add { .. } => { + unreachable!("These should have been filtered out") + } + }; + + if let Some((status, shared)) = status { + handle_status_change( + app, + video, + shared, + line_number, + status, + line_number.is_none(), + ops, + ) + .await?; + } + + Ok(()) +} + +async fn handle_status_change( + app: &App, + video: &mut Video, + shared: SharedSelectionCommandArgs, + line_number: Option<i64>, + new_status: VideoStatus, + is_single: bool, + ops: &mut Operations<Operation>, +) -> Result<()> { + let priority = compute_priority(line_number, shared.priority); + + video.set_status(new_status, ops); + if let Some(priority) = priority { + video.set_priority(priority, ops); + } + + if let Some(subtitle_langs) = shared.subtitle_langs { + video.set_subtitle_langs(subtitle_langs, ops); + } + if let Some(playback_speed) = shared.playback_speed { + video.set_playback_speed(playback_speed, ops); + } + + if !is_single { + ansi_escape_codes::clear_whole_line(); + ansi_escape_codes::move_to_col(1); + } + + eprint!("{}", &video.to_line_display(app, None).await?); + + if is_single { + eprintln!(); + } else { + stderr().flush()?; + } + + Ok(()) +} + +fn compute_priority(line_number: Option<i64>, priority: Option<i64>) -> Option<Priority> { + if let Some(pri) = priority { + Some(Priority::from(pri)) + } else { + line_number.map(Priority::from) + } +} diff --git a/crates/yt/src/commands/select/mod.rs b/crates/yt/src/commands/select/mod.rs new file mode 100644 index 0000000..1b06206 --- /dev/null +++ b/crates/yt/src/commands/select/mod.rs @@ -0,0 +1,219 @@ +use std::{ + fmt::{self, Display, Formatter}, + str::FromStr, +}; + +use chrono::NaiveDate; +use clap::{Args, Subcommand, ValueEnum}; +use url::Url; + +use crate::{select::duration::MaybeDuration, storage::db::extractor_hash::LazyExtractorHash}; + +mod implm; + +#[derive(Subcommand, Clone, Debug)] +// NOTE: Keep this in sync with the [`constants::HELP_STR`] constant. <2024-08-20> +// NOTE: Also keep this in sync with the `tree-sitter-yts/grammar.js`. <2024-11-04> +pub(crate) enum SelectCommand { + /// Open a `git rebase` like file to select the videos to watch (the default) + File { + /// Include done (watched, dropped) videos + #[arg(long, short)] + done: bool, + + /// Use the last selection file (useful if you've spend time on it and want to get it again) + #[arg(long, short, conflicts_with = "done")] + use_last_selection: bool, + }, + + /// Generate a directory, where each file contains only one subscription. + Split { + /// Include done (watched, dropped) videos + #[arg(long, short)] + done: bool, + + /// Which key to use for sorting. + #[arg(default_value_t)] + sort_key: SelectSplitSortKey, + + /// Which mode to use for sorting. + #[arg(default_value_t)] + sort_mode: SelectSplitSortMode, + }, + + /// Add a video to the database + /// + /// This optionally supports to add a playlist. + /// When a playlist is added, the `start` and `stop` arguments can be used to select which + /// playlist entries to include. + #[command(visible_alias = "a")] + Add { + urls: Vec<Url>, + + /// Start adding playlist entries at this playlist index (zero based and inclusive) + #[arg(short = 's', long)] + start: Option<usize>, + + /// Stop adding playlist entries at this playlist index (zero based and inclusive) + #[arg(short = 'e', long)] + stop: Option<usize>, + }, + + /// Mark the video given by the hash to be watched + #[command(visible_alias = "w")] + Watch { + #[command(flatten)] + shared: SharedSelectionCommandArgs, + }, + + /// Mark the video given by the hash to be dropped + #[command(visible_alias = "d")] + Drop { + #[command(flatten)] + shared: SharedSelectionCommandArgs, + }, + + /// Mark the video given by the hash as already watched + #[command(visible_alias = "wd")] + Watched { + #[command(flatten)] + shared: SharedSelectionCommandArgs, + }, + + /// Open the video URL in Firefox's `timesinks.youtube` profile + #[command(visible_alias = "u")] + Url { + #[command(flatten)] + shared: SharedSelectionCommandArgs, + }, + + /// Reset the videos status to 'Pick' + #[command(visible_alias = "p")] + Pick { + #[command(flatten)] + shared: SharedSelectionCommandArgs, + }, +} +impl Default for SelectCommand { + fn default() -> Self { + Self::File { + done: false, + use_last_selection: false, + } + } +} + +#[derive(Clone, Debug, Args)] +#[command(infer_subcommands = true)] +/// Mark the video given by the hash to be watched +pub(crate) struct SharedSelectionCommandArgs { + /// The ordering priority (higher means more at the top) + #[arg(short, long)] + pub(crate) priority: Option<i64>, + + /// The subtitles to download (e.g. 'en,de,sv') + #[arg(short = 'l', long)] + pub(crate) subtitle_langs: Option<String>, + + /// The speed to set mpv to + #[arg(short = 's', long)] + pub(crate) playback_speed: Option<f64>, + + /// The short extractor hash + pub(crate) hash: LazyExtractorHash, + + pub(crate) title: Option<String>, + + pub(crate) date: Option<OptionalNaiveDate>, + + pub(crate) publisher: Option<OptionalPublisher>, + + pub(crate) duration: Option<MaybeDuration>, + + pub(crate) url: Option<Url>, +} + +impl SelectCommand { + pub(crate) fn into_shared(self) -> Option<SharedSelectionCommandArgs> { + match self { + SelectCommand::File { .. } + | SelectCommand::Split { .. } + | SelectCommand::Add { .. } => None, + SelectCommand::Watch { shared } + | SelectCommand::Drop { shared } + | SelectCommand::Watched { shared } + | SelectCommand::Url { shared } + | SelectCommand::Pick { shared } => Some(shared), + } + } +} + +#[derive(Clone, Debug, Copy)] +pub(crate) struct OptionalNaiveDate { + pub(crate) date: Option<NaiveDate>, +} +impl FromStr for OptionalNaiveDate { + type Err = anyhow::Error; + fn from_str(v: &str) -> Result<Self, Self::Err> { + if v == "[No release date]" { + Ok(Self { date: None }) + } else { + Ok(Self { + date: Some(NaiveDate::from_str(v)?), + }) + } + } +} +#[derive(Clone, Debug)] +pub(crate) struct OptionalPublisher { + pub(crate) publisher: Option<String>, +} +impl FromStr for OptionalPublisher { + type Err = anyhow::Error; + fn from_str(v: &str) -> Result<Self, Self::Err> { + if v == "[No author]" { + Ok(Self { publisher: None }) + } else { + Ok(Self { + publisher: Some(v.to_owned()), + }) + } + } +} + +#[derive(Default, ValueEnum, Clone, Copy, Debug)] +pub(crate) enum SelectSplitSortKey { + /// Sort by the name of the publisher. + #[default] + Publisher, + + /// Sort by the number of unselected videos per publisher. + Videos, +} +impl Display for SelectSplitSortKey { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + SelectSplitSortKey::Publisher => f.write_str("publisher"), + SelectSplitSortKey::Videos => f.write_str("videos"), + } + } +} + +#[derive(Default, ValueEnum, Clone, Copy, Debug)] +pub(crate) enum SelectSplitSortMode { + /// Sort in ascending order (small -> big) + #[default] + Asc, + + /// Sort in descending order (big -> small) + Desc, +} + +impl Display for SelectSplitSortMode { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + SelectSplitSortMode::Asc => f.write_str("asc"), + SelectSplitSortMode::Desc => f.write_str("desc"), + } + } +} diff --git a/crates/yt/src/commands/status/implm.rs b/crates/yt/src/commands/status/implm.rs new file mode 100644 index 0000000..fc046c9 --- /dev/null +++ b/crates/yt/src/commands/status/implm.rs @@ -0,0 +1,147 @@ +use std::time::Duration; + +use crate::{ + app::App, + commands::status::StatusCommand, + select::duration::MaybeDuration, + shared::bytes::Bytes, + storage::db::{ + subscription::Subscriptions, + video::{Video, VideoStatusMarker}, + }, + yt_dlp::get_current_cache_allocation, +}; + +use anyhow::{Context, Result}; + +macro_rules! get { + ($videos:expr, $status:ident) => { + $videos + .iter() + .filter(|vid| vid.status.as_marker() == VideoStatusMarker::$status) + .count() + }; + + (@collect $videos:expr, $status:ident) => { + $videos + .iter() + .filter(|vid| vid.status.as_marker() == VideoStatusMarker::$status) + .collect() + }; +} + +impl StatusCommand { + pub(crate) async fn implm(self, app: &App) -> Result<()> { + let StatusCommand { format } = self; + + let all_videos = Video::in_states(app, VideoStatusMarker::ALL).await?; + + // lengths + let picked_videos_len = get!(all_videos, Pick); + + let watch_videos_len = get!(all_videos, Watch); + let cached_videos_len = get!(all_videos, Cached); + let watched_videos_len = get!(all_videos, Watched); + let watched_videos: Vec<_> = get!(@collect all_videos, Watched); + + let drop_videos_len = get!(all_videos, Drop); + let dropped_videos_len = get!(all_videos, Dropped); + + let subscriptions = Subscriptions::get(app).await?; + let subscriptions_len = subscriptions.0.len(); + + let watchtime_status = { + let total_watch_time_raw = watched_videos + .iter() + .fold(Duration::default(), |acc, vid| acc + vid.watch_progress); + + // Most things are watched at a speed of s (which is defined in the config file). + // Thus + // y = x * s -> y / s = x + let total_watch_time = Duration::from_secs_f64( + (total_watch_time_raw.as_secs_f64()) / app.config.select.playback_speed, + ); + + let speed = app.config.select.playback_speed; + + // Do not print the adjusted time, if the user has keep the speed level at 1. + #[allow(clippy::float_cmp)] + if speed == 1.0 { + format!( + "Total Watchtime: {}\n", + MaybeDuration::from_std(total_watch_time_raw) + ) + } else { + format!( + "Total Watchtime: {} (at {speed} speed: {})\n", + MaybeDuration::from_std(total_watch_time_raw), + MaybeDuration::from_std(total_watch_time), + ) + } + }; + + let watch_rate: f64 = { + fn to_f64(input: usize) -> f64 { + f64::from(u32::try_from(input).expect("This should never exceed u32::MAX")) + } + + let count = + to_f64(watched_videos_len) / (to_f64(drop_videos_len) + to_f64(dropped_videos_len)); + count * 100.0 + }; + + let cache_usage: Bytes = get_current_cache_allocation(app) + .await + .context("Failed to get current cache allocation")?; + + if let Some(fmt) = format { + let output = fmt + .replace( + "{picked_videos_len}", + picked_videos_len.to_string().as_str(), + ) + .replace("{watch_videos_len}", watch_videos_len.to_string().as_str()) + .replace( + "{cached_videos_len}", + cached_videos_len.to_string().as_str(), + ) + .replace( + "{watched_videos_len}", + watched_videos_len.to_string().as_str(), + ) + .replace("{watch_rate}", watch_rate.to_string().as_str()) + .replace("{drop_videos_len}", drop_videos_len.to_string().as_str()) + .replace( + "{dropped_videos_len}", + dropped_videos_len.to_string().as_str(), + ) + .replace("{watchtime_status}", watchtime_status.to_string().as_str()) + .replace( + "{subscriptions_len}", + subscriptions_len.to_string().as_str(), + ) + .replace("{cache_usage}", cache_usage.to_string().as_str()); + + print!("{output}"); + } else { + println!( + "\ +Picked Videos: {picked_videos_len} + +Watch Videos: {watch_videos_len} +Cached Videos: {cached_videos_len} +Watched Videos: {watched_videos_len} (watch rate: {watch_rate:.2} %) + +Drop Videos: {drop_videos_len} +Dropped Videos: {dropped_videos_len} + +{watchtime_status} + + Subscriptions: {subscriptions_len} + Cache usage: {cache_usage}" + ); + } + + Ok(()) + } +} diff --git a/crates/yt/src/commands/status/mod.rs b/crates/yt/src/commands/status/mod.rs new file mode 100644 index 0000000..dc6e865 --- /dev/null +++ b/crates/yt/src/commands/status/mod.rs @@ -0,0 +1,10 @@ +use clap::Parser; + +mod implm; + +#[derive(Parser, Debug)] +pub(crate) struct StatusCommand { + /// Which format to use + #[arg(short, long)] + format: Option<String>, +} diff --git a/crates/yt/src/commands/subscriptions/implm.rs b/crates/yt/src/commands/subscriptions/implm.rs new file mode 100644 index 0000000..3051522 --- /dev/null +++ b/crates/yt/src/commands/subscriptions/implm.rs @@ -0,0 +1,243 @@ +use std::str::FromStr; + +use crate::{ + app::App, + commands::subscriptions::SubscriptionCommand, + storage::db::{ + insert::{Operations, subscription::Operation}, + subscription::{Subscription, Subscriptions, check_url}, + }, +}; + +use anyhow::{Context, Result, bail}; +use log::{error, warn}; +use tokio::{ + fs::File, + io::{AsyncBufRead, AsyncBufReadExt, BufReader, stdin}, +}; +use url::Url; +use yt_dlp::{json_cast, json_get, options::YoutubeDLOptions}; + +impl SubscriptionCommand { + pub(crate) async fn implm(self, app: &App) -> Result<()> { + match self { + SubscriptionCommand::Add { + name, + url, + no_check, + } => { + let mut ops = Operations::new("main: subscribe"); + subscribe(&app, name, url, no_check, &mut ops) + .await + .context("Failed to add a subscription")?; + ops.commit(&app).await?; + } + SubscriptionCommand::Remove { name } => { + let mut present_subscriptions = Subscriptions::get(app).await?; + + let mut ops = Operations::new("Subscribe: unsubscribe"); + if let Some(subscription) = present_subscriptions.0.remove(&name) { + subscription.remove(&mut ops); + } else { + bail!("Couldn't find subscription: '{}'", &name); + } + ops.commit(app) + .await + .with_context(|| format!("Failed to unsubscribe from {name:?}"))?; + } + SubscriptionCommand::List {} => { + let all_subs = Subscriptions::get(&app).await?; + + for (key, val) in all_subs.0 { + println!("{}: '{}'", key, val.url); + } + } + SubscriptionCommand::Export {} => { + let all_subs = Subscriptions::get(&app).await?; + for val in all_subs.0.values() { + println!("{}", val.url); + } + } + SubscriptionCommand::Import { + file, + force, + no_check, + } => { + if let Some(file) = file { + let f = File::open(file).await?; + + import(&app, BufReader::new(f), force, no_check).await?; + } else { + import(&app, BufReader::new(stdin()), force, no_check).await?; + } + } + } + + Ok(()) + } +} + +async fn import<W: AsyncBufRead + AsyncBufReadExt + Unpin>( + app: &App, + reader: W, + force: bool, + no_check: bool, +) -> Result<()> { + let mut ops = Operations::new("SubscribeImport: init"); + + let all = Subscriptions::get(app).await?; + if force { + all.remove(&mut ops); + } + ops.commit(app).await?; + let mut ops = Operations::new("SubscribeImport: after all subs remove"); + + let mut lines = reader.lines(); + while let Some(line) = lines.next_line().await? { + let url = + Url::from_str(&line).with_context(|| format!("Failed to parse '{line}' as url"))?; + + match subscribe(app, None, url, no_check, &mut ops) + .await + .with_context(|| format!("Failed to subscribe to: '{line}'")) + { + Ok(()) => (), + Err(err) => eprintln!( + "Error while subscribing to '{}': '{}'", + line, + err.source().expect("Should have a source") + ), + } + } + ops.commit(app).await?; + + Ok(()) +} + +async fn subscribe( + app: &App, + name: Option<String>, + url: Url, + no_check: bool, + ops: &mut Operations<Operation>, +) -> Result<()> { + if !(url.as_str().ends_with("videos") + || url.as_str().ends_with("streams") + || url.as_str().ends_with("shorts") + || url.as_str().ends_with("videos/") + || url.as_str().ends_with("streams/") + || url.as_str().ends_with("shorts/")) + && url.as_str().contains("youtube.com") + { + warn!( + "Your youtube url does not seem like it actually tracks a channels playlist \ + (videos, streams, shorts). Adding subscriptions for each of them..." + ); + + let url = Url::parse(&(url.as_str().to_owned() + "/")) + .expect("This was an url, it should stay one"); + + let (videos, streams, shorts) = if let Some(name) = name { + ( + Some(name.clone() + " {Videos}"), + Some(name.clone() + " {Streams}"), + Some(name.clone() + " {Shorts}"), + ) + } else { + (None, None, None) + }; + + let _ = actual_subscribe( + app, + videos, + url.join("videos/").expect("See above."), + no_check, + ops, + ) + .await + .map_err(|err| { + error!("Failed to subscribe to the '{}' variant: {err}", "{Videos}"); + }); + + let _ = actual_subscribe( + app, + streams, + url.join("streams/").expect("See above."), + no_check, + ops, + ) + .await + .map_err(|err| { + error!( + "Failed to subscribe to the '{}' variant: {err}", + "{Streams}" + ); + }); + + let _ = actual_subscribe( + app, + shorts, + url.join("shorts/").expect("See above."), + no_check, + ops, + ) + .await + .map_err(|err| { + error!("Failed to subscribe to the '{}' variant: {err}", "{Shorts}"); + }); + } else { + actual_subscribe(app, name, url, no_check, ops).await?; + } + + Ok(()) +} + +async fn actual_subscribe( + app: &App, + name: Option<String>, + url: Url, + no_check: bool, + ops: &mut Operations<Operation>, +) -> Result<()> { + if !no_check && !check_url(url.clone()).await? { + bail!("The url ('{}') does not represent a playlist!", &url) + } + + let name = if let Some(name) = name { + name + } else { + let yt_dlp = YoutubeDLOptions::new() + .set("playliststart", 1) + .set("playlistend", 10) + .set("noplaylist", false) + .set("extract_flat", "in_playlist") + .build()?; + + let info = yt_dlp.extract_info(&url, false, false)?; + + if info.get("_type").map(|v| json_cast!(v, as_str)) == Some("playlist") { + json_get!(info, "title", as_str).to_owned() + } else { + bail!("The url ('{}') does not represent a playlist!", &url) + } + }; + + let present_subscriptions = Subscriptions::get(app).await?; + + if let Some(subs) = present_subscriptions.0.get(&name) { + bail!( + "The subscription '{}' could not be added, \ + as another one with the same name ('{}') already exists. \ + It points to the Url: '{}'", + name, + name, + subs.url + ); + } + + let sub = Subscription { name, url }; + + sub.add(ops); + + Ok(()) +} diff --git a/crates/yt/src/commands/subscriptions/mod.rs b/crates/yt/src/commands/subscriptions/mod.rs new file mode 100644 index 0000000..530f5f5 --- /dev/null +++ b/crates/yt/src/commands/subscriptions/mod.rs @@ -0,0 +1,52 @@ +use std::path::PathBuf; + +use clap::Subcommand; +use clap_complete::ArgValueCompleter; +use url::Url; + +use crate::commands::complete_subscription; + +mod implm; + +#[derive(Subcommand, Clone, Debug)] +pub(crate) enum SubscriptionCommand { + /// Subscribe to an URL + Add { + #[arg(short, long)] + /// The human readable name of the subscription + name: Option<String>, + + /// The URL to listen to + url: Url, + + /// Don't check, whether the URL actually points to something yt understands. + #[arg(long, default_value_t = false)] + no_check: bool, + }, + + /// Unsubscribe from an URL + Remove { + /// The human readable name of the subscription + #[arg(add = ArgValueCompleter::new(complete_subscription))] + name: String, + }, + + /// Import a bunch of URLs as subscriptions. + Import { + /// The file containing the URLs. Will use Stdin otherwise. + file: Option<PathBuf>, + + /// Remove any previous subscriptions + #[arg(short, long)] + force: bool, + + /// Don't check, whether the URLs actually point to something yt understands. + #[arg(long, default_value_t = false)] + no_check: bool, + }, + /// Write all subscriptions in an format understood by `import` + Export {}, + + /// List all subscriptions + List {}, +} diff --git a/crates/yt/src/commands/update/implm/mod.rs b/crates/yt/src/commands/update/implm/mod.rs new file mode 100644 index 0000000..bb9323e --- /dev/null +++ b/crates/yt/src/commands/update/implm/mod.rs @@ -0,0 +1,52 @@ +use crate::{ + app::App, + commands::update::{UpdateCommand, implm::updater::Updater}, + storage::db::{ + extractor_hash::ExtractorHash, + subscription::{Subscription, Subscriptions}, + }, +}; + +use anyhow::{Result, bail}; + +mod updater; + +impl UpdateCommand { + pub(crate) async fn implm(self, app: &App) -> Result<()> { + let UpdateCommand { + max_backlog, + subscriptions: subscription_names_to_update, + } = self; + + let mut all_subs = Subscriptions::get(&app).await?; + + let max_backlog = max_backlog.unwrap_or(app.config.update.max_backlog); + + let subs: Vec<Subscription> = if subscription_names_to_update.is_empty() { + all_subs.0.into_values().collect() + } else { + subscription_names_to_update + .into_iter() + .map(|sub| { + if let Some(val) = all_subs.0.remove(&sub) { + Ok(val) + } else { + bail!( + "Your specified subscription to update '{}' is not a subscription!", + sub + ) + } + }) + .collect::<Result<_>>()? + }; + + // We can get away with not having to re-fetch the hashes every time, as the returned video + // should not contain duplicates. + let hashes = ExtractorHash::get_all(app).await?; + + let updater = Updater::new(max_backlog, app.config.update.pool_size, hashes); + updater.update(app, subs).await?; + + Ok(()) + } +} diff --git a/crates/yt/src/commands/update/implm/updater.rs b/crates/yt/src/commands/update/implm/updater.rs new file mode 100644 index 0000000..5969d54 --- /dev/null +++ b/crates/yt/src/commands/update/implm/updater.rs @@ -0,0 +1,197 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; + +use anyhow::{Context, Result}; +use futures::{StreamExt, future::join_all, stream}; +use log::{Level, debug, error, log_enabled}; +use tokio::io::{AsyncWriteExt, stderr}; +use tokio_util::task::LocalPoolHandle; +use yt_dlp::{ + info_json::InfoJson, json_cast, options::YoutubeDLOptions, process_ie_result, + python_error::PythonError, +}; + +use crate::{ + ansi_escape_codes, + app::App, + storage::db::{ + extractor_hash::ExtractorHash, insert::Operations, subscription::Subscription, video::Video, + }, + yt_dlp::yt_dlp_opts_updating, +}; + +pub(super) struct Updater { + max_backlog: usize, + hashes: Vec<ExtractorHash>, + pool: LocalPoolHandle, +} + +static REACHED_NUMBER: AtomicUsize = const { AtomicUsize::new(1) }; + +impl Updater { + pub(super) fn new(max_backlog: usize, max_threads: usize, hashes: Vec<ExtractorHash>) -> Self { + let pool = LocalPoolHandle::new(max_threads); + + Self { + max_backlog, + hashes, + pool, + } + } + + pub(super) async fn update(self, app: &App, subscriptions: Vec<Subscription>) -> Result<()> { + let total_number = subscriptions.len(); + + let mut stream = stream::iter(subscriptions) + .map(|sub| self.get_new_entries(sub, total_number)) + .buffer_unordered(app.config.update.futures); + + while let Some(output) = stream.next().await { + let mut entries = output?; + + if let Some(next) = entries.next() { + let (sub, entry) = next; + process_subscription(app, sub, entry).await?; + + join_all(entries.map(|(sub, entry)| process_subscription(app, sub, entry))) + .await + .into_iter() + .collect::<Result<(), _>>()?; + } + } + + Ok(()) + } + + #[allow(clippy::too_many_lines)] + async fn get_new_entries( + &self, + sub: Subscription, + total_number: usize, + ) -> Result<impl Iterator<Item = (Subscription, InfoJson)>> { + let max_backlog = self.max_backlog; + let hashes = self.hashes.clone(); + + let yt_dlp = yt_dlp_opts_updating(max_backlog)?; + + self.pool + .spawn_pinned(move || { + async move { + if !log_enabled!(Level::Debug) { + ansi_escape_codes::clear_whole_line(); + ansi_escape_codes::move_to_col(1); + eprint!( + "({}/{total_number}) Checking playlist {}...", + REACHED_NUMBER.fetch_add(1, Ordering::Relaxed), + sub.name + ); + ansi_escape_codes::move_to_col(1); + stderr().flush().await?; + } + + let info = yt_dlp + .extract_info(&sub.url, false, false) + .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?; + + let empty = vec![]; + let entries = info + .get("entries") + .map_or(&empty, |val| json_cast!(val, as_array)); + + let valid_entries: Vec<(Subscription, InfoJson)> = entries + .iter() + .take(max_backlog) + .filter_map(|entry| -> Option<(Subscription, InfoJson)> { + let extractor_hash = + ExtractorHash::from_info_json(json_cast!(entry, as_object)); + + if hashes.contains(&extractor_hash) { + debug!( + "Skipping entry, as it is \ + already present: '{extractor_hash}'", + ); + None + } else { + Some((sub.clone(), json_cast!(entry, as_object).to_owned())) + } + }) + .collect(); + + Ok(valid_entries + .into_iter() + .map(|(sub, entry)| { + let inner_yt_dlp = YoutubeDLOptions::new() + .set("noplaylist", true) + .build() + .expect("Worked before, should work now"); + + match inner_yt_dlp.process_ie_result(entry, false) { + Ok(output) => Ok((sub, output)), + Err(err) => Err(err), + } + }) + // Don't fail the whole update, if one of the entries fails to fetch. + .filter_map(move |base| match base { + Ok(ok) => Some(ok), + Err(err) => { + match err { + process_ie_result::Error::Python(PythonError(err)) => { + if err.contains( + "Join this channel to get access \ + to members-only content ", + ) { + // Hide this error + } else { + // Show the error, but don't fail. + let error = err + .strip_prefix( + "DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ", + ) + .unwrap_or(&err); + error!("While fetching {:#?}: {error}", sub.name); + } + + None + } + process_ie_result::Error::InfoJsonPrepare(error) => { + error!( + "While fetching {:#?}: Failed to prepare \ + info json: {error}", + sub.name + ); + None + } + } + } + })) + } + }) + .await? + } +} + +async fn process_subscription(app: &App, sub: Subscription, entry: InfoJson) -> Result<()> { + let mut ops = Operations::new("Update: process subscription"); + let video = Video::from_info_json(&entry, Some(&sub)) + .context("Failed to parse search entry as Video")?; + + let title = video.title.clone(); + let url = video.url.clone(); + let video = video.add(&mut ops).with_context(|| { + format!("Failed to add video to database: '{title}' (with url: '{url}')") + })?; + + ops.commit(app).await.with_context(|| { + format!( + "Failed to add video to database: '{}' (with url: '{}')", + video.title, video.url + ) + })?; + println!( + "{}", + &video + .to_line_display(app, None) + .await + .with_context(|| format!("Failed to format video: '{}'", video.title))? + ); + Ok(()) +} diff --git a/crates/yt/src/commands/update/mod.rs b/crates/yt/src/commands/update/mod.rs new file mode 100644 index 0000000..6f1c865 --- /dev/null +++ b/crates/yt/src/commands/update/mod.rs @@ -0,0 +1,17 @@ +use clap::Parser; +use clap_complete::ArgValueCompleter; + +use crate::commands::complete_subscription; + +mod implm; + +#[derive(Parser, Debug)] +pub(crate) struct UpdateCommand { + /// The maximal number of videos to fetch for each subscription. + #[arg(short, long)] + max_backlog: Option<usize>, + + /// The subscriptions to update + #[arg(add = ArgValueCompleter::new(complete_subscription))] + subscriptions: Vec<String>, +} diff --git a/crates/yt/src/commands/videos/implm.rs b/crates/yt/src/commands/videos/implm.rs new file mode 100644 index 0000000..7d13ceb --- /dev/null +++ b/crates/yt/src/commands/videos/implm.rs @@ -0,0 +1,63 @@ +use crate::{ + app::App, + commands::videos::VideosCommand, + storage::db::video::{Video, VideoStatusMarker}, +}; + +use anyhow::{Context, Result}; +use futures::{TryStreamExt, stream::FuturesUnordered}; + +impl VideosCommand { + pub(crate) async fn implm(self, app: &App) -> Result<()> { + match self { + VideosCommand::List { + search_query, + limit, + format, + } => { + let all_videos = Video::in_states(app, VideoStatusMarker::ALL).await?; + + // turn one video to a color display, to pre-warm the hash shrinking cache + if let Some(val) = all_videos.first() { + val.to_line_display(app, format.clone()).await?; + } + + let limit = limit.unwrap_or(all_videos.len()); + + let all_video_strings: Vec<String> = all_videos + .into_iter() + .take(limit) + .map(|vid| to_line_display_owned(vid, app, format.clone())) + .collect::<FuturesUnordered<_>>() + .try_collect::<Vec<String>>() + .await?; + + if let Some(query) = search_query { + all_video_strings + .into_iter() + .filter(|video| video.to_lowercase().contains(&query.to_lowercase())) + .for_each(|video| println!("{video}")); + } else { + println!("{}", all_video_strings.join("\n")); + } + } + VideosCommand::Info { hash, format } => { + let video = hash.realize(&app, None).await?.get_with_app(&app).await?; + + print!( + "{}", + &video + .to_info_display(&app, format) + .await + .context("Failed to format video")? + ); + } + } + + Ok(()) + } +} + +async fn to_line_display_owned(video: Video, app: &App, format: Option<String>) -> Result<String> { + video.to_line_display(app, format).await +} diff --git a/crates/yt/src/commands/videos/mod.rs b/crates/yt/src/commands/videos/mod.rs new file mode 100644 index 0000000..93a11a1 --- /dev/null +++ b/crates/yt/src/commands/videos/mod.rs @@ -0,0 +1,36 @@ +use clap::{ArgAction, Subcommand}; + +use crate::storage::db::extractor_hash::LazyExtractorHash; + +mod implm; + +#[derive(Subcommand, Clone, Debug)] +pub(crate) enum VideosCommand { + /// List the videos in the database + #[command(visible_alias = "ls")] + List { + /// An optional search query to limit the results + #[arg(action = ArgAction::Append)] + search_query: Option<String>, + + /// The format string to use. + // TODO(@bpeetz): Encode the default format, as the default string here. <2025-07-04> + #[arg(short, long)] + format: Option<String>, + + /// The number of videos to show + #[arg(short, long)] + limit: Option<usize>, + }, + + /// Get detailed information about a video + Info { + /// The short hash of the video + hash: LazyExtractorHash, + + /// The format string to use. + // TODO(@bpeetz): Encode the default format, as the default string here. <2025-07-04> + #[arg(short, long)] + format: Option<String>, + }, +} diff --git a/crates/yt/src/commands/watch/implm/mod.rs b/crates/yt/src/commands/watch/implm/mod.rs new file mode 100644 index 0000000..338f80a --- /dev/null +++ b/crates/yt/src/commands/watch/implm/mod.rs @@ -0,0 +1,20 @@ +use std::sync::Arc; + +use crate::{app::App, commands::watch::WatchCommand}; + +use anyhow::Result; + +mod watch; + +impl WatchCommand { + pub(crate) async fn implm(self, app: Arc<App>) -> Result<()> { + let WatchCommand { + provide_ipc_socket, + headless, + } = self; + + watch::watch(app, provide_ipc_socket, headless).await?; + + Ok(()) + } +} diff --git a/crates/yt/src/commands/watch/implm/watch/mod.rs b/crates/yt/src/commands/watch/implm/watch/mod.rs new file mode 100644 index 0000000..1436d8d --- /dev/null +++ b/crates/yt/src/commands/watch/implm/watch/mod.rs @@ -0,0 +1,235 @@ +// yt - A fully featured command line YouTube client +// +// Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// Copyright (C) 2025 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// SPDX-License-Identifier: GPL-3.0-or-later +// +// This file is part of Yt. +// +// You should have received a copy of the License along with this program. +// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. + +use std::{ + fs, + path::PathBuf, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, +}; + +use anyhow::{Context, Result}; +use libmpv2::{Mpv, events::EventContext}; +use log::{debug, info, trace, warn}; +use tokio::{task, time}; + +use self::playlist_handler::Status; +use crate::{ + app::App, + storage::{ + db::{insert::{maintenance::clear_stale_downloaded_paths, Operations}, playlist::Playlist}, + notify::wait_for_db_write, + }, +}; + +pub(crate) mod playlist_handler; + +fn init_mpv(app: &App, ipc_socket: Option<PathBuf>, headless: bool) -> Result<(Mpv, EventContext)> { + // set some default values, to make things easier (these can be overridden by the config file, + // which we load later) + let mpv = Mpv::with_initializer(|mpv| { + if let Some(socket) = ipc_socket { + mpv.set_property( + "input-ipc-server", + socket + .to_str() + .expect("This path comes from us, it should never contain not-utf8"), + )?; + } + + if headless { + // Do not provide video output. + mpv.set_property("vid", "no")?; + } else { + // Enable default key bindings, so the user can actually interact with + // the player (and e.g. close the window). + mpv.set_property("input-default-bindings", "yes")?; + mpv.set_property("input-vo-keyboard", "yes")?; + + // Show the on screen controller. + mpv.set_property("osc", "yes")?; + + // Don't automatically advance to the next video (or exit the player) + mpv.set_option("keep-open", "always")?; + + // Always display an window, even for non-video playback. + // As mpv does not have cli access, no window means no control and no user feedback. + mpv.set_option("force-window", "yes")?; + } + + Ok(()) + }) + .context("Failed to initialize mpv")?; + + let config_path = &app.config.paths.mpv_config_path; + if config_path.try_exists()? { + info!("Found mpv.conf at '{}'!", config_path.display()); + mpv.command( + "load-config-file", + &[config_path + .to_str() + .context("Failed to parse the config path is utf8-stringt")?], + )?; + } else { + warn!( + "Did not find a mpv.conf file at '{}'", + config_path.display() + ); + } + + let input_path = &app.config.paths.mpv_input_path; + if input_path.try_exists()? { + info!("Found mpv.input.conf at '{}'!", input_path.display()); + mpv.command( + "load-input-conf", + &[input_path + .to_str() + .context("Failed to parse the input path as utf8 string")?], + )?; + } else { + warn!( + "Did not find a mpv.input.conf file at '{}'", + input_path.display() + ); + } + + let ev_ctx = EventContext::new(mpv.ctx); + ev_ctx.disable_deprecated_events()?; + + Ok((mpv, ev_ctx)) +} + +pub(crate) async fn watch(app: Arc<App>, provide_ipc_socket: bool, headless: bool) -> Result<()> { + clear_stale_downloaded_paths(&app).await?; + + let ipc_socket = if provide_ipc_socket { + Some(app.config.paths.mpv_ipc_socket_path.clone()) + } else { + None + }; + + let (mpv, mut ev_ctx) = + init_mpv(&app, ipc_socket, headless).context("Failed to initialize mpv instance")?; + let mpv = Arc::new(mpv); + + // We now _know_ that the socket is set-up and ready. + if provide_ipc_socket { + println!("{}", app.config.paths.mpv_ipc_socket_path.display()); + } + + let should_break = Arc::new(AtomicBool::new(false)); + + let local_app = Arc::clone(&app); + let local_mpv = Arc::clone(&mpv); + let local_should_break = Arc::clone(&should_break); + let progress_handle = task::spawn(async move { + loop { + if local_should_break.load(Ordering::Relaxed) { + trace!("WatchProgressThread: Stopping, as we received exit signal."); + break; + } + + let mut playlist = Playlist::create(&local_app).await?; + + if let Some(index) = playlist.current_index() { + trace!("WatchProgressThread: Saving watch progress for current video"); + + let mut ops = Operations::new("WatchProgressThread: save watch progress thread"); + playlist.save_watch_progress(&local_mpv, index, &mut ops); + ops.commit(&local_app).await?; + } else { + trace!( + "WatchProgressThread: Tried to save current watch progress, but no video active." + ); + } + + time::sleep(local_app.config.watch.watch_progress_save_intervall).await; + } + + Ok::<(), anyhow::Error>(()) + }); + + // Set up the initial playlist. + let playlist = Playlist::create(&app).await?; + playlist.resync_with_mpv(&app, &mpv)?; + + let mut have_warned = (false, 0); + 'watchloop: loop { + 'waitloop: while let Ok(value) = playlist_handler::status(&app).await { + match value { + Status::NoMoreAvailable => { + break 'watchloop; + } + Status::NoCached { marked_watch } => { + // try again next time. + if have_warned.0 { + if have_warned.1 != marked_watch { + warn!("Now {marked_watch} videos are marked as to be watched."); + have_warned.1 = marked_watch; + } + } else { + warn!( + "There is nothing to watch yet, but still {marked_watch} videos marked as to be watched. \ + Will idle, until they become available" + ); + have_warned = (true, marked_watch); + } + wait_for_db_write(&app).await?; + + // Add the new videos, if they are there. + let playlist = Playlist::create(&app).await?; + playlist.resync_with_mpv(&app, &mpv)?; + } + Status::Available { newly_available } => { + debug!("Checked for currently available videos and found {newly_available}!"); + have_warned.0 = false; + + // Something just became available! + break 'waitloop; + } + } + } + + // TODO(@bpeetz): Is the following assumption correct? <2025-07-10> + // We wait until forever for the next event, because we really don't need to do anything + // else. + if let Some(ev) = ev_ctx.wait_event(f64::MAX) { + match ev { + Ok(event) => { + trace!("Mpv event triggered: {event:#?}"); + if playlist_handler::handle_mpv_event(&app, &mpv, &event) + .await + .with_context(|| format!("Failed to handle mpv event: '{event:#?}'"))? + { + break; + } + } + Err(e) => debug!("Mpv Event errored: {e}"), + } + } + } + + should_break.store(true, Ordering::Relaxed); + progress_handle.await??; + + if provide_ipc_socket { + fs::remove_file(&app.config.paths.mpv_ipc_socket_path).with_context(|| { + format!( + "Failed to clean-up the mpv ipc socket at {}", + app.config.paths.mpv_ipc_socket_path.display() + ) + })?; + } + + Ok(()) +} diff --git a/crates/yt/src/commands/watch/implm/watch/playlist_handler/client_messages.rs b/crates/yt/src/commands/watch/implm/watch/playlist_handler/client_messages.rs new file mode 100644 index 0000000..6c8ebbe --- /dev/null +++ b/crates/yt/src/commands/watch/implm/watch/playlist_handler/client_messages.rs @@ -0,0 +1,99 @@ +// yt - A fully featured command line YouTube client +// +// Copyright (C) 2025 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// SPDX-License-Identifier: GPL-3.0-or-later +// +// This file is part of Yt. +// +// You should have received a copy of the License along with this program. +// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. + +use std::{env, time::Duration}; + +use crate::{app::App, storage::db::video::Video}; + +use anyhow::{Context, Result, bail}; +use libmpv2::Mpv; +use tokio::process::Command; + +use super::mpv_message; + +async fn run_self_in_external_command(app: &App, args: &[&str]) -> Result<()> { + // TODO(@bpeetz): Can we trust this value? <2025-06-15> + let binary = + env::current_exe().context("Failed to determine the current executable to re-execute")?; + + let status = Command::new("riverctl") + .args(["focus-output", "next"]) + .status() + .await?; + if !status.success() { + bail!("focusing the next output failed!"); + } + + let arguments = [ + &[ + "--title", + "floating please", + "--command", + binary + .to_str() + .context("Failed to turn the executable path to a utf8-string")?, + "--db-path", + app.config + .paths + .database_path + .to_str() + .context("Failed to parse the database_path as a utf8-string")?, + ], + args, + ] + .concat(); + + let status = Command::new("alacritty").args(arguments).status().await?; + if !status.success() { + bail!("Falied to start `yt comments`"); + } + + let status = Command::new("riverctl") + .args(["focus-output", "next"]) + .status() + .await?; + + if !status.success() { + bail!("focusing the next output failed!"); + } + + Ok(()) +} + +pub(super) async fn handle_yt_description_external(app: &App) -> Result<()> { + run_self_in_external_command(app, &["description"]).await?; + Ok(()) +} +pub(super) async fn handle_yt_description_local(app: &App, mpv: &Mpv) -> Result<()> { + let description: String = Video::get_current_description(app) + .await? + .chars() + .take(app.config.watch.local_displays_length) + .collect(); + + mpv_message(mpv, &description, Duration::from_secs(6))?; + Ok(()) +} + +pub(super) async fn handle_yt_comments_external(app: &App) -> Result<()> { + run_self_in_external_command(app, &["comments"]).await?; + Ok(()) +} +pub(super) async fn handle_yt_comments_local(app: &App, mpv: &Mpv) -> Result<()> { + let comments: String = Video::get_current_comments(app) + .await? + .render(false) + .chars() + .take(app.config.watch.local_displays_length) + .collect(); + + mpv_message(mpv, &comments, Duration::from_secs(6))?; + Ok(()) +} diff --git a/crates/yt/src/commands/watch/implm/watch/playlist_handler/mod.rs b/crates/yt/src/commands/watch/implm/watch/playlist_handler/mod.rs new file mode 100644 index 0000000..443fd26 --- /dev/null +++ b/crates/yt/src/commands/watch/implm/watch/playlist_handler/mod.rs @@ -0,0 +1,218 @@ +// yt - A fully featured command line YouTube client +// +// Copyright (C) 2025 Benedikt Peetz <benedikt.peetz@b-peetz.de> +// SPDX-License-Identifier: GPL-3.0-or-later +// +// This file is part of Yt. +// +// You should have received a copy of the License along with this program. +// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. + +use std::time::Duration; + +use crate::{ + app::App, + storage::db::{ + insert::{Operations, playlist::VideoTransition}, + playlist::{Playlist, PlaylistIndex}, + video::{Video, VideoStatusMarker}, + }, +}; + +use anyhow::{Context, Result}; +use libmpv2::{EndFileReason, Mpv, events::Event}; +use log::{debug, info}; + +mod client_messages; + +#[derive(Debug, Clone, Copy)] +pub(crate) enum Status { + /// There are no videos cached and no more marked to be watched. + /// Waiting is pointless. + NoMoreAvailable, + + /// There are no videos cached, but some (> 0) are marked to be watched. + /// So we should wait for them to become available. + NoCached { marked_watch: usize }, + + /// There are videos cached and ready to be inserted into the playback queue. + Available { newly_available: usize }, +} + +fn mpv_message(mpv: &Mpv, message: &str, time: Duration) -> Result<()> { + mpv.command( + "show-text", + &[message, time.as_millis().to_string().as_str()], + )?; + Ok(()) +} + +/// Return the status of the playback queue +pub(crate) async fn status(app: &App) -> Result<Status> { + let playlist = Playlist::create(app).await?; + + let playlist_len = playlist.len(); + let marked_watch_num = Video::in_states(app, &[VideoStatusMarker::Watch]) + .await? + .len(); + + if playlist_len == 0 && marked_watch_num == 0 { + Ok(Status::NoMoreAvailable) + } else if playlist_len == 0 && marked_watch_num != 0 { + Ok(Status::NoCached { + marked_watch: marked_watch_num, + }) + } else if playlist_len != 0 { + Ok(Status::Available { + newly_available: playlist_len, + }) + } else { + unreachable!( + "The playlist length is {playlist_len}, but the number of marked watch videos is {marked_watch_num}! This is a bug." + ); + } +} + +/// # Returns +/// This will return [`true`], if the event handling should be stopped +/// +/// # Panics +/// Only if internal assertions fail. +#[allow(clippy::too_many_lines)] +pub(crate) async fn handle_mpv_event(app: &App, mpv: &Mpv, event: &Event<'_>) -> Result<bool> { + let mut ops = Operations::new("PlaylistHandler: handle event"); + + // Construct the playlist lazily. + // This avoids unneeded db lookups. + // (We use the moved `call_once` as guard for this) + let call_once = String::new(); + let playlist = move || { + drop(call_once); + Playlist::create(app) + }; + + let should_stop_event_handling = match event { + Event::EndFile(r) => match r.reason { + EndFileReason::Eof => { + info!("Mpv reached the end of the current video. Marking it watched."); + playlist().await?.resync_with_mpv(app, mpv)?; + + false + } + EndFileReason::Stop => { + // This reason is incredibly ambiguous. It _both_ means actually pausing a + // video and going to the next one in the playlist. + // Oh, and it's also called, when a video is removed from the playlist (at + // least via "playlist-remove current") + info!("Paused video (or went to next playlist entry); Doing nothing"); + + false + } + EndFileReason::Quit => { + info!("Mpv quit. Exiting playback"); + + playlist().await?.save_current_watch_progress(mpv, &mut ops); + + true + } + EndFileReason::Error => { + unreachable!("This should have been raised as a separate error") + } + EndFileReason::Redirect => { + // TODO: We probably need to handle this somehow <2025-02-17> + false + } + }, + Event::StartFile(_) => { + let mut playlist = playlist().await?; + + let mpv_pos = usize::try_from(mpv.get_property::<i64>("playlist-pos")?) + .expect("The value is strictly positive"); + + let yt_pos = playlist.current_index().map(usize::from); + + if (Some(mpv_pos) != yt_pos) || yt_pos.is_none() { + debug!( + "StartFileHandler: mpv pos {mpv_pos} and our pos {yt_pos:?} do not align. Reloading.." + ); + + if let Some((_, vid)) = playlist.get_focused_mut() { + vid.set_focused(false, &mut ops); + ops.commit(app) + .await + .context("Failed to commit video unfocusing")?; + + ops = Operations::new("PlaylistHandler: after set-focused"); + } + + let video = playlist + .get_mut(PlaylistIndex::from(mpv_pos)) + .expect("The mpv pos should not be out of bounds"); + + video.set_focused(true, &mut ops); + + playlist.resync_with_mpv(app, mpv)?; + } + + false + } + Event::Seek => { + playlist().await?.save_current_watch_progress(mpv, &mut ops); + + false + } + Event::ClientMessage(a) => { + debug!("Got Client Message event: '{}'", a.join(" ")); + + match a.as_slice() { + &["yt-comments-external"] => { + client_messages::handle_yt_comments_external(app).await?; + } + &["yt-comments-local"] => { + client_messages::handle_yt_comments_local(app, mpv).await?; + } + + &["yt-description-external"] => { + client_messages::handle_yt_description_external(app).await?; + } + &["yt-description-local"] => { + client_messages::handle_yt_description_local(app, mpv).await?; + } + + &["yt-mark-picked"] => { + playlist().await?.mark_current_done( + app, + mpv, + VideoTransition::Picked, + &mut ops, + )?; + + mpv_message(mpv, "Marked the video as picked", Duration::from_secs(3))?; + } + &["yt-mark-watched"] => { + playlist().await?.mark_current_done( + app, + mpv, + VideoTransition::Watched, + &mut ops, + )?; + + mpv_message(mpv, "Marked the video watched", Duration::from_secs(3))?; + } + &["yt-check-new-videos"] => { + playlist().await?.resync_with_mpv(app, mpv)?; + } + other => { + debug!("Unknown message: {}", other.join(" ")); + } + } + + false + } + _ => false, + }; + + ops.commit(app).await?; + + Ok(should_stop_event_handling) +} diff --git a/crates/yt/src/commands/watch/mod.rs b/crates/yt/src/commands/watch/mod.rs new file mode 100644 index 0000000..8bae5c9 --- /dev/null +++ b/crates/yt/src/commands/watch/mod.rs @@ -0,0 +1,14 @@ +use clap::Parser; + +mod implm; + +#[derive(Parser, Debug)] +pub(crate) struct WatchCommand { + /// Print the path to an ipc socket for mpv control to stdout at startup. + #[arg(long)] + provide_ipc_socket: bool, + + /// Don't start an mpv window at all. + #[arg(long)] + headless: bool, +} |