diff options
author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2025-07-15 06:52:33 +0200 |
---|---|---|
committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2025-07-15 06:52:33 +0200 |
commit | 95ccd01d26c6664c9917332d4f19c949dfb905cd (patch) | |
tree | e402199d82b440f7a420f0386ec34f989ab8f7c8 /crates | |
parent | fix(crates/yt/storage/migrate): Merge the `videos` and `video_options` tables (diff) | |
download | yt-95ccd01d26c6664c9917332d4f19c949dfb905cd.zip |
refactor(crates/yt): Use the new storage layer
Diffstat (limited to 'crates')
-rw-r--r-- | crates/yt/src/cache/mod.rs | 96 | ||||
-rw-r--r-- | crates/yt/src/cli.rs | 16 | ||||
-rw-r--r-- | crates/yt/src/comments/description.rs | 26 | ||||
-rw-r--r-- | crates/yt/src/comments/mod.rs | 27 | ||||
-rw-r--r-- | crates/yt/src/download/download_options.rs | 4 | ||||
-rw-r--r-- | crates/yt/src/download/mod.rs | 36 | ||||
-rw-r--r-- | crates/yt/src/main.rs | 18 | ||||
-rw-r--r-- | crates/yt/src/select/cmds/add.rs | 24 | ||||
-rw-r--r-- | crates/yt/src/select/cmds/mod.rs | 91 | ||||
-rw-r--r-- | crates/yt/src/select/mod.rs | 53 | ||||
-rw-r--r-- | crates/yt/src/status/mod.rs | 6 | ||||
-rw-r--r-- | crates/yt/src/subscribe/mod.rs | 146 | ||||
-rw-r--r-- | crates/yt/src/update/mod.rs | 43 | ||||
-rw-r--r-- | crates/yt/src/update/updater.rs | 24 | ||||
-rw-r--r-- | crates/yt/src/version/mod.rs | 2 | ||||
-rw-r--r-- | crates/yt/src/watch/mod.rs | 36 | ||||
-rw-r--r-- | crates/yt/src/watch/playlist.rs | 12 | ||||
-rw-r--r-- | crates/yt/src/watch/playlist_handler/mod.rs | 141 |
18 files changed, 410 insertions, 391 deletions
diff --git a/crates/yt/src/cache/mod.rs b/crates/yt/src/cache/mod.rs index 589c6ba..44a7e72 100644 --- a/crates/yt/src/cache/mod.rs +++ b/crates/yt/src/cache/mod.rs @@ -9,96 +9,56 @@ // You should have received a copy of the License along with this program. // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. -use anyhow::{Context, Result}; -use log::{debug, info}; -use tokio::fs; +use anyhow::Result; +use log::info; use crate::{ app::App, - storage::video_database::{ - Video, VideoStatus, VideoStatusMarker, downloader::set_video_cache_path, get, + storage::db::{ + insert::{Operations, video::Operation}, + video::{Video, VideoStatus, VideoStatusMarker}, }, }; -async fn invalidate_video(app: &App, video: &Video, hard: bool) -> Result<()> { - info!("Invalidating cache of video: '{}'", video.title); +fn invalidate_video(video: &mut Video, ops: &mut Operations<Operation>) { + info!("Deleting downloaded path of video: '{}'", video.title); - if hard { - if let VideoStatus::Cached { - cache_path: path, .. - } = &video.status - { - info!("Removing cached video at: '{}'", path.display()); - if let Err(err) = fs::remove_file(path).await.map_err(|err| err.kind()) { - match err { - std::io::ErrorKind::NotFound => { - // The path is already gone - debug!( - "Not actually removing path: '{}'. It is already gone.", - path.display() - ); - } - err => Err(std::io::Error::from(err)).with_context(|| { - format!( - "Failed to delete video ('{}') cache path: '{}'.", - video.title, - path.display() - ) - })?, - } - } - } - } + assert_eq!(video.status.as_marker(), VideoStatusMarker::Cached); - set_video_cache_path(app, &video.extractor_hash, None).await?; - - Ok(()) + video.remove_download_path(ops); } -pub(crate) async fn invalidate(app: &App, hard: bool) -> Result<()> { - let all_cached_things = get::videos(app, &[VideoStatusMarker::Cached]).await?; +pub(crate) async fn invalidate(app: &App) -> Result<()> { + let mut all_cached_things = Video::in_states(app, &[VideoStatusMarker::Cached]).await?; info!("Got videos to invalidate: '{}'", all_cached_things.len()); - for video in all_cached_things { - invalidate_video(app, &video, hard).await?; + let mut ops = Operations::new("Cache: Invalidate cache entries"); + + for video in &mut all_cached_things { + invalidate_video(video, &mut ops); } + ops.commit(app).await?; + Ok(()) } - let domain = if all { - VideoStatusMarker::ALL.as_slice() - } else { - &[VideoStatusMarker::Watch, VideoStatusMarker::Cached] - }; /// Remove the cache paths from the db, that no longer exist on the file system. -pub(crate) async fn maintain(app: &App, all: bool) -> Result<()> { - - let cached_videos = get::videos(app, domain).await?; - - let mut found_focused = 0; - for vid in cached_videos { - if let VideoStatus::Cached { - cache_path: path, - is_focused, - } = &vid.status - { - info!("Checking if path ('{}') exists", path.display()); - if !path.exists() { - invalidate_video(app, &vid, false).await?; - } - - if *is_focused { - found_focused += 1; +pub(crate) async fn maintain(app: &App) -> Result<()> { + let mut cached_videos = Video::in_states(app, &[VideoStatusMarker::Cached]).await?; + + let mut ops = Operations::new("DbMaintain: init"); + for vid in &mut cached_videos { + if let VideoStatus::Cached { cache_path, .. } = &vid.status { + if !cache_path.exists() { + invalidate_video(vid, &mut ops); } + } else { + unreachable!("We only asked for cached videos.") } } - - assert!( - found_focused <= 1, - "Only one video can be focused at a time" - ); + ops.commit(app).await?; Ok(()) } diff --git a/crates/yt/src/cli.rs b/crates/yt/src/cli.rs index e30f5f7..8892efd 100644 --- a/crates/yt/src/cli.rs +++ b/crates/yt/src/cli.rs @@ -29,7 +29,7 @@ use crate::{ config::Config, select::selection_file::duration::MaybeDuration, shared::bytes::Bytes, - storage::{subscriptions, video_database::extractor_hash::LazyExtractorHash}, + storage::db::{extractor_hash::LazyExtractorHash, subscription::Subscriptions}, }; #[derive(Parser, Debug)] @@ -443,22 +443,14 @@ impl Default for SelectCommand { #[derive(Subcommand, Clone, Copy, Debug)] pub(crate) enum CacheCommand { /// Invalidate all cache entries - Invalidate { - /// Also delete the cache path - #[arg(short = 'f', long)] - hard: bool, - }, + Invalidate {}, /// Perform basic maintenance operations on the database. /// This helps recovering from invalid db states after a crash (or force exit via <CTRL-C>). /// /// 1. Check every path for validity (removing all invalid cache entries) #[command(verbatim_doc_comment)] - Maintain { - /// Check every video (otherwise only the videos to be watched are checked) - #[arg(short, long)] - all: bool, - }, + Maintain {}, } fn complete_subscription(current: &OsStr) -> Vec<CompletionCandidate> { @@ -481,7 +473,7 @@ fn complete_subscription(current: &OsStr) -> Vec<CompletionCandidate> { return output; }; - let Ok(all) = rt.block_on(subscriptions::get(&app)) else { + let Ok(all) = rt.block_on(Subscriptions::get(&app)) else { return output; }; diff --git a/crates/yt/src/comments/description.rs b/crates/yt/src/comments/description.rs index 9f87441..2065970 100644 --- a/crates/yt/src/comments/description.rs +++ b/crates/yt/src/comments/description.rs @@ -9,12 +9,7 @@ // You should have received a copy of the License along with this program. // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. -use crate::{ - App, - comments::output::display_fmt_and_less, - storage::video_database::{Video, get}, - unreachable::Unreachable, -}; +use crate::{App, comments::output::display_fmt_and_less, storage::db::video::Video}; use anyhow::{Result, bail}; use yt_dlp::json_cast; @@ -26,17 +21,16 @@ pub(crate) async fn description(app: &App) -> Result<()> { Ok(()) } -pub async fn get(app: &App) -> Result<String> { - let currently_playing_video: Video = - if let Some(video) = get::currently_focused_video(app).await? { - video - } else { - bail!("Could not find a currently playing video!"); - }; +pub(crate) async fn get(app: &App) -> Result<String> { + let currently_playing_video: Video = if let Some(video) = Video::currently_focused(app).await? { + video + } else { + bail!("Could not find a currently playing video!"); + }; - let info_json = get::video_info_json(¤tly_playing_video)?.unreachable( - "A currently *playing* must be cached. And thus the info.json should be available", - ); + let info_json = ¤tly_playing_video + .get_info_json()? + .expect("A currently *playing* must be cached. And thus the info.json should be available"); Ok(info_json .get("description") diff --git a/crates/yt/src/comments/mod.rs b/crates/yt/src/comments/mod.rs index b856cad..178d8f0 100644 --- a/crates/yt/src/comments/mod.rs +++ b/crates/yt/src/comments/mod.rs @@ -17,29 +17,24 @@ use output::display_fmt_and_less; use regex::Regex; use yt_dlp::json_cast; -use crate::{ - app::App, - storage::video_database::{Video, get}, - unreachable::Unreachable, -}; +use crate::{app::App, storage::db::video::Video}; mod comment; mod display; -pub mod output; +pub(crate) mod output; -pub mod description; -pub use description::*; +pub(crate) mod description; +pub(crate) use description::*; #[allow(clippy::too_many_lines)] -pub async fn get(app: &App) -> Result<Comments> { - let currently_playing_video: Video = - if let Some(video) = get::currently_focused_video(app).await? { - video - } else { - bail!("Could not find a currently playing video!"); - }; +pub(crate) async fn get(app: &App) -> Result<Comments> { + let currently_playing_video: Video = if let Some(video) = Video::currently_focused(app).await? { + video + } else { + bail!("Could not find a currently playing video!"); + }; - let info_json = get::video_info_json(¤tly_playing_video)?.unreachable( + let info_json = ¤tly_playing_video.get_info_json()?.expect( "A currently *playing* video must be cached. And thus the info.json should be available", ); diff --git a/crates/yt/src/download/download_options.rs b/crates/yt/src/download/download_options.rs index 2eac4c3..15fed7e 100644 --- a/crates/yt/src/download/download_options.rs +++ b/crates/yt/src/download/download_options.rs @@ -109,8 +109,8 @@ pub(crate) fn download_opts( .set( "subtitleslangs", Value::Array( - additional_opts - .subtitle_langs + subtitle_langs + .map_or("", String::as_str) .split(',') .map(|val| Value::String(val.to_owned())) .collect::<Vec<_>>(), diff --git a/crates/yt/src/download/mod.rs b/crates/yt/src/download/mod.rs index 2e7315d..991d409 100644 --- a/crates/yt/src/download/mod.rs +++ b/crates/yt/src/download/mod.rs @@ -9,17 +9,15 @@ // You should have received a copy of the License along with this program. // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. -use std::{collections::HashMap, io, str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashMap, io, path::PathBuf, str::FromStr, sync::Arc, time::Duration}; use crate::{ app::App, download::download_options::download_opts, - storage::video_database::{ - Video, YtDlpOptions, - downloader::{get_next_uncached_video, set_video_cache_path}, - extractor_hash::ExtractorHash, - get::get_video_yt_dlp_opts, - notify::wait_for_cache_reduction, + shared::bytes::Bytes, + storage::{ + db::{extractor_hash::ExtractorHash, insert::Operations, video::Video}, + notify::{wait_for_cache_reduction, wait_for_db_write}, }, unreachable::Unreachable, }; @@ -28,12 +26,12 @@ use anyhow::{Context, Result, bail}; use bytes::Bytes; use futures::{FutureExt, future::BoxFuture}; use log::{debug, error, info, warn}; -use tokio::{fs, task::JoinHandle, time}; -use yt_dlp::{json_cast, json_get}; +use tokio::{fs, select, task::JoinHandle, time}; +use yt_dlp::{YoutubeDL, json_cast, json_get, options::YoutubeDLOptions}; #[allow(clippy::module_name_repetitions)] -pub mod download_options; -pub mod progress_hook; +pub(crate) mod download_options; +pub(crate) mod progress_hook; #[derive(Debug)] #[allow(clippy::module_name_repetitions)] @@ -297,11 +295,17 @@ impl Downloader { if let Some(value) = self.video_size_cache.get(&video.extractor_hash) { Ok(*value) } else { - // the subtitle file size should be negligible - let add_opts = YtDlpOptions { - subtitle_langs: String::new(), - }; - let yt_dlp = download_opts(app, &add_opts)?; + let yt_dlp = { + YoutubeDLOptions::new() + .set("prefer_free_formats", true) + .set("format", "bestvideo[height<=?1080]+bestaudio/best") + .set("fragment_retries", 10) + .set("retries", 10) + .set("getcomments", false) + .set("ignoreerrors", false) + .build() + .context("Failed to instanciate get approx size yt_dlp") + }?; let result = yt_dlp .extract_info(&video.url, false, true) diff --git a/crates/yt/src/main.rs b/crates/yt/src/main.rs index 52b8e07..46ab037 100644 --- a/crates/yt/src/main.rs +++ b/crates/yt/src/main.rs @@ -23,7 +23,6 @@ use cli::{CacheCommand, SelectCommand, SubscriptionCommand, VideosCommand}; use config::Config; use log::{error, info}; use select::cmds::handle_select_cmd; -use storage::video_database::get::video_by_hash; use tokio::{ fs::File, io::{BufReader, stdin}, @@ -33,7 +32,7 @@ use tokio::{ use crate::{ cli::Command, shared::bytes::Bytes, - storage::subscription, + storage::db::{insert::Operations, subscription::Subscriptions}, }; pub(crate) mod ansi_escape_codes; @@ -110,9 +109,9 @@ async fn main() -> Result<()> { max_cache_size.unwrap_or(app.config.download.max_cache_size.as_u64()); info!("Max cache size: '{}'", Bytes::new(max_cache_size)); - maintain(&app, false).await?; + maintain(&app).await?; if force { - invalidate(&app, true).await?; + invalidate(&app).await?; } download::Downloader::new() @@ -173,7 +172,7 @@ async fn main() -> Result<()> { current_progress, total_number, } => { - let all_subs = subscriptions::get(&app).await?; + let all_subs = Subscriptions::get(&app).await?; for sub in &subscriptions { if !all_subs.0.contains_key(sub) { @@ -243,6 +242,7 @@ async fn main() -> Result<()> { subscribe::subscribe(&app, name, url) .await .context("Failed to add a subscription")?; + ops.commit(&app).await?; } SubscriptionCommand::Remove { name } => { subscribe::unsubscribe(&app, name) @@ -250,14 +250,14 @@ async fn main() -> Result<()> { .context("Failed to remove a subscription")?; } SubscriptionCommand::List {} => { - let all_subs = subscriptions::get(&app).await?; + let all_subs = Subscriptions::get(&app).await?; for (key, val) in all_subs.0 { println!("{}: '{}'", key, val.url); } } SubscriptionCommand::Export {} => { - let all_subs = subscriptions::get(&app).await?; + let all_subs = Subscriptions::get(&app).await?; for val in all_subs.0.values() { println!("{}", val.url); } @@ -280,8 +280,8 @@ async fn main() -> Result<()> { Command::Config {} => status::config(&app)?, Command::Database { command } => match command { - CacheCommand::Invalidate { hard } => invalidate(&app, hard).await?, - CacheCommand::Maintain { all } => maintain(&app, all).await?, + CacheCommand::Invalidate {} => invalidate(&app).await?, + CacheCommand::Maintain {} => maintain(&app).await?, }, Command::Comments {} => { diff --git a/crates/yt/src/select/cmds/add.rs b/crates/yt/src/select/cmds/add.rs index f793117..43c9f75 100644 --- a/crates/yt/src/select/cmds/add.rs +++ b/crates/yt/src/select/cmds/add.rs @@ -11,9 +11,7 @@ use crate::{ app::App, download::download_options::download_opts, - storage::video_database::{ - self, extractor_hash::ExtractorHash, get::get_all_hashes, set::add_video, - }, + storage::db::{extractor_hash::ExtractorHash, insert::Operations}, update::video_entry_to_video, }; @@ -45,15 +43,15 @@ pub(crate) async fn add( async fn add_entry(app: &App, entry: InfoJson) -> Result<()> { // We have to re-fetch all hashes every time, because a user could try to add the same // URL twice (for whatever reason.) - let hashes = get_all_hashes(app) + let hashes = ExtractorHash::get_all(app) .await .context("Failed to fetch all video hashes")?; - let extractor_hash = blake3::hash(json_get!(entry, "id", as_str).as_bytes()); + let extractor_hash = ExtractorHash::from_info_json(&entry); if hashes.contains(&extractor_hash) { error!( "Video '{}'{} is already in the database. Skipped adding it", - ExtractorHash::from_hash(extractor_hash) + extractor_hash .into_short_hash(app) .await .with_context(|| format!( @@ -70,20 +68,16 @@ pub(crate) async fn add( return Ok(()); } - let video = video_entry_to_video(&entry, None)?; - add_video(app, video.clone()).await?; + let mut ops = Operations::new("SelectAdd: Video entry to video"); + let video = video_entry_to_video(&entry, None)?.add(&mut ops)?; + ops.commit(app).await?; - println!("{}", &video.to_line_display(app).await?); + println!("{}", &video.to_line_display(app, None).await?); Ok(()) } - let yt_dlp = download_opts( - app, - &video_database::YtDlpOptions { - subtitle_langs: String::new(), - }, - )?; + let yt_dlp = download_opts(app, None)?; let entry = yt_dlp .extract_info(&url, false, true) diff --git a/crates/yt/src/select/cmds/mod.rs b/crates/yt/src/select/cmds/mod.rs index 9da795a..332010a 100644 --- a/crates/yt/src/select/cmds/mod.rs +++ b/crates/yt/src/select/cmds/mod.rs @@ -12,10 +12,9 @@ use crate::{ app::App, cli::{SelectCommand, SharedSelectionCommandArgs}, - storage::video_database::{ - Priority, VideoOptions, VideoStatus, - get::video_by_hash, - set::{set_video_options, video_status}, + storage::db::{ + insert::{Operations, video::Operation}, + video::{Priority, Video, VideoStatus}, }, }; @@ -23,49 +22,33 @@ use anyhow::{Context, Result, bail}; mod add; -pub async fn handle_select_cmd( +pub(crate) async fn handle_select_cmd( app: &App, cmd: SelectCommand, line_number: Option<i64>, + ops: &mut Operations<Operation>, ) -> Result<()> { - match cmd { - SelectCommand::Pick { shared } => { - handle_status_change(app, shared, line_number, VideoStatus::Pick).await?; - } - SelectCommand::Drop { shared } => { - handle_status_change(app, shared, line_number, VideoStatus::Drop).await?; - } - SelectCommand::Watched { shared } => { - handle_status_change(app, shared, line_number, VideoStatus::Watched).await?; - } - SelectCommand::Add { urls, start, stop } => { - Box::pin(add::add(app, urls, start, stop)).await?; - } + let status = match cmd { + SelectCommand::Pick { shared } => Some((VideoStatus::Pick, shared)), + SelectCommand::Drop { shared } => Some((VideoStatus::Drop, shared)), + SelectCommand::Watched { shared } => Some((VideoStatus::Watched, shared)), SelectCommand::Watch { shared } => { - let hash = shared.hash.clone().realize(app).await?; - - let video = video_by_hash(app, &hash).await?; - if let VideoStatus::Cached { cache_path, is_focused, - } = video.status + } = &video.status { - handle_status_change( - app, - shared, - line_number, + Some(( VideoStatus::Cached { - cache_path, - is_focused, + cache_path: cache_path.to_owned(), + is_focused: *is_focused, }, - ) - .await?; + shared, + )) } else { - handle_status_change(app, shared, line_number, VideoStatus::Watch).await?; + Some((VideoStatus::Watch, shared)) } } - SelectCommand::Url { shared } => { let Some(url) = shared.url else { bail!("You need to provide a url to `select url ..`") @@ -75,31 +58,51 @@ pub async fn handle_select_cmd( firefox.args(["-P", "timesinks.youtube"]); firefox.arg(url.as_str()); let _handle = firefox.spawn().context("Failed to run firefox")?; + None } - SelectCommand::File { .. } | SelectCommand::Split { .. } => { - unreachable!("This should have been filtered out") + SelectCommand::File { .. } | SelectCommand::Split { .. } | SelectCommand::Add { .. } => { + unreachable!("These should have been filtered out") } + }; + + if let Some((status, shared)) = status { + handle_status_change( + app, + video, + shared, + line_number, + status, + line_number.is_none(), + ops, + ) + .await?; } + Ok(()) } async fn handle_status_change( app: &App, + video: &mut Video, shared: SharedSelectionCommandArgs, line_number: Option<i64>, new_status: VideoStatus, + ops: &mut Operations<Operation>, ) -> Result<()> { - let hash = shared.hash.realize(app).await?; - let video_options = VideoOptions::new( - shared - .subtitle_langs - .unwrap_or(app.config.select.subtitle_langs.clone()), - shared.speed.unwrap_or(app.config.select.playback_speed), - ); let priority = compute_priority(line_number, shared.priority); - video_status(app, &hash, new_status, priority).await?; - set_video_options(app, &hash, &video_options).await?; + video.set_status(new_status, ops); + if let Some(priority) = priority { + video.set_priority(priority, ops); + } + + if let Some(subtitle_langs) = shared.subtitle_langs { + video.set_subtitle_langs(subtitle_langs, ops); + } + if let Some(playback_speed) = shared.playback_speed { + video.set_playback_speed(playback_speed, ops); + } + Ok(()) } diff --git a/crates/yt/src/select/mod.rs b/crates/yt/src/select/mod.rs index 543013e..4e526b6 100644 --- a/crates/yt/src/select/mod.rs +++ b/crates/yt/src/select/mod.rs @@ -13,18 +13,21 @@ use std::{ collections::HashMap, env::{self}, fs::{self, File, OpenOptions}, - io::{BufRead, BufReader, BufWriter, Read, Seek, Write}, + io::{BufRead, BufReader, BufWriter, Read, Write}, iter, + os::fd::{AsFd, AsRawFd}, path::Path, string::String, }; use crate::{ app::App, - cli::{CliArgs, SelectSplitSortKey, SelectSplitSortMode}, + cli::{CliArgs, SelectCommand, SelectSplitSortKey, SelectSplitSortMode}, constants::HELP_STR, - storage::video_database::{Video, VideoStatusMarker, get}, - unreachable::Unreachable, + storage::db::{ + insert::Operations, + video::{Video, VideoStatusMarker}, + }, }; use anyhow::{Context, Result, bail}; @@ -188,9 +191,9 @@ pub(crate) async fn select_file(app: &App, done: bool, use_last_selection: bool) async fn get_videos(app: &App, include_done: bool) -> Result<Vec<Video>> { if include_done { - get::videos(app, VideoStatusMarker::ALL).await + Video::in_states(app, VideoStatusMarker::ALL).await } else { - get::videos( + Video::in_states( app, &[ VideoStatusMarker::Pick, @@ -234,10 +237,11 @@ async fn write_videos_to_file(app: &App, file: &File, videos: &[Video]) -> Resul } async fn process_file(app: &App, file: &File) -> Result<i64> { - let reader = BufReader::new(file); - let mut line_number = 0; + let mut ops = Operations::new("Select: process file"); + + let reader = BufReader::new(file); for line in reader.lines() { let line = line.context("Failed to read a line")?; @@ -265,18 +269,35 @@ async fn process_file(app: &App, file: &File) -> Result<i64> { unreachable!("This is checked in the `filter_line` function") }; - Box::pin(handle_select_cmd( - app, - cmd.unreachable( - "This value should always be some \ + match cmd.expect( + "This value should always be some \ here, as it would otherwise thrown an error above.", - ), - Some(line_number), - )) - .await?; + ) { + SelectCommand::File { .. } | SelectCommand::Split { .. } => { + bail!("You cannot use `select file` or `select split` recursively.") + } + SelectCommand::Add { urls, start, stop } => { + Box::pin(cmds::add::add(app, urls, start, stop)).await?; + } + other => { + let shared = other + .clone() + .into_shared() + .expect("The ones without shared should have been filtered out."); + + let hash = shared.hash.realize(app).await?; + let mut video = hash + .get_with_app(app) + .await + .expect("The hash was already realized, it should therefore exist"); + + handle_select_cmd(app, other, &mut video, Some(line_number), &mut ops).await?; + } + } } } + ops.commit(app).await?; Ok(-line_number) } diff --git a/crates/yt/src/status/mod.rs b/crates/yt/src/status/mod.rs index f629324..f9528e2 100644 --- a/crates/yt/src/status/mod.rs +++ b/crates/yt/src/status/mod.rs @@ -40,8 +40,8 @@ macro_rules! get { }; } -pub(crate) async fn show(app: &App) -> Result<()> { - let all_videos = get::videos(app, VideoStatusMarker::ALL).await?; +pub(crate) async fn show(app: &App, format: Option<String>) -> Result<()> { + let all_videos = Video::in_states(app, VideoStatusMarker::ALL).await?; // lengths let picked_videos_len = get!(all_videos, Pick); @@ -54,7 +54,7 @@ pub(crate) async fn show(app: &App) -> Result<()> { let drop_videos_len = get!(all_videos, Drop); let dropped_videos_len = get!(all_videos, Dropped); - let subscriptions = subscriptions::get(app).await?; + let subscriptions = Subscriptions::get(app).await?; let subscriptions_len = subscriptions.0.len(); let watchtime_status = { diff --git a/crates/yt/src/subscribe/mod.rs b/crates/yt/src/subscribe/mod.rs index 89ea9f7..93c1390 100644 --- a/crates/yt/src/subscribe/mod.rs +++ b/crates/yt/src/subscribe/mod.rs @@ -19,21 +19,24 @@ use yt_dlp::{json_cast, json_get, options::YoutubeDLOptions}; use crate::{ app::App, - storage::subscriptions::{ - Subscription, add_subscription, check_url, get, remove_all, remove_subscription, + storage::db::{ + insert::{Operations, subscription::Operation}, + subscription::{Subscription, Subscriptions, check_url}, }, - unreachable::Unreachable, }; pub(crate) async fn unsubscribe(app: &App, name: String) -> Result<()> { - let present_subscriptions = get(app).await?; + let mut present_subscriptions = Subscriptions::get(app).await?; - if let Some(subscription) = present_subscriptions.0.get(&name) { - remove_subscription(app, subscription).await?; + let mut ops = Operations::new("Subscribe: unsubscribe"); + if let Some(subscription) = present_subscriptions.0.remove(&name) { + subscription.remove(&mut ops); } else { bail!("Couldn't find subscription: '{}'", &name); } + ops.commit(app).await?; + Ok(()) } @@ -42,9 +45,14 @@ pub(crate) async fn import<W: AsyncBufRead + AsyncBufReadExt + Unpin>( reader: W, force: bool, ) -> Result<()> { + let mut ops = Operations::new("SubscribeImport: init"); + + let all = Subscriptions::get(app).await?; if force { - remove_all(app).await?; + all.remove(&mut ops); } + ops.commit(app).await?; + let mut ops = Operations::new("SubscribeImport: after all subs remove"); let mut lines = reader.lines(); while let Some(line) = lines.next_line().await? { @@ -62,6 +70,7 @@ pub(crate) async fn import<W: AsyncBufRead + AsyncBufReadExt + Unpin>( ), } } + ops.commit(app).await?; Ok(()) } @@ -70,6 +79,7 @@ pub(crate) async fn subscribe( app: &App, name: Option<String>, url: Url, + ops: &mut Operations<Operation>, ) -> Result<()> { if !(url.as_str().ends_with("videos") || url.as_str().ends_with("streams") @@ -80,79 +90,71 @@ pub(crate) async fn subscribe( && url.as_str().contains("youtube.com") { warn!( - "Your youtbe url does not seem like it actually tracks a channels playlist (videos, streams, shorts). Adding subscriptions for each of them..." + "Your youtube url does not seem like it actually tracks a channels playlist \ + (videos, streams, shorts). Adding subscriptions for each of them..." ); let url = Url::parse(&(url.as_str().to_owned() + "/")) - .unreachable("This was an url, it should stay one"); - - if let Some(name) = name { - let out: Result<()> = async move { - actual_subscribe( - app, - Some(name.clone() + " {Videos}"), - url.join("videos/") - .unreachable("The url should allow being joined onto"), - ) - .await - .with_context(|| { - format!("Failed to subscribe to '{}'", name.clone() + " {Videos}") - })?; - - actual_subscribe( - app, - Some(name.clone() + " {Streams}"), - url.join("streams/").unreachable("See above."), - ) - .await - .with_context(|| { - format!("Failed to subscribe to '{}'", name.clone() + " {Streams}") - })?; - - actual_subscribe( - app, - Some(name.clone() + " {Shorts}"), - url.join("shorts/").unreachable("See above."), - ) - .await - .with_context(|| format!("Failed to subscribe to '{}'", name + " {Shorts}"))?; - - Ok(()) - } - .boxed() - .await; - - out?; + .expect("This was an url, it should stay one"); + + let (videos, streams, shorts) = if let Some(name) = name { + ( + Some(name.clone() + " {Videos}"), + Some(name.clone() + " {Streams}"), + Some(name.clone() + " {Shorts}"), + ) } else { - let _ = actual_subscribe(app, None, url.join("videos/").unreachable("See above.")) - .await - .map_err(|err| { - error!("Failed to subscribe to the '{}' variant: {err}", "{Videos}"); - }); - - let _ = actual_subscribe(app, None, url.join("streams/").unreachable("See above.")) - .await - .map_err(|err| { - error!( - "Failed to subscribe to the '{}' variant: {err}", - "{Streams}" - ); - }); - - let _ = actual_subscribe(app, None, url.join("shorts/").unreachable("See above.")) - .await - .map_err(|err| { - error!("Failed to subscribe to the '{}' variant: {err}", "{Shorts}"); - }); - } + (None, None, None) + }; + + let _ = actual_subscribe( + app, + videos, + url.join("videos/").expect("See above."), + ops, + ) + .await + .map_err(|err| { + error!("Failed to subscribe to the '{}' variant: {err}", "{Videos}"); + }); + + let _ = actual_subscribe( + app, + streams, + url.join("streams/").expect("See above."), + ops, + ) + .await + .map_err(|err| { + error!( + "Failed to subscribe to the '{}' variant: {err}", + "{Streams}" + ); + }); + + let _ = actual_subscribe( + app, + shorts, + url.join("shorts/").expect("See above."), + ops, + ) + .await + .map_err(|err| { + error!("Failed to subscribe to the '{}' variant: {err}", "{Shorts}"); + }); } else { - actual_subscribe(app, name, url).await?; + actual_subscribe(app, name, url, ops).await?; } Ok(()) } -async fn actual_subscribe(app: &App, name: Option<String>, url: Url) -> Result<()> { +async fn actual_subscribe( + app: &App, + name: Option<String>, + url: Url, + ops: &mut Operations<Operation>, +) -> Result<()> { if !check_url(url.clone()).await? { bail!("The url ('{}') does not represent a playlist!", &url) } @@ -176,7 +178,7 @@ async fn actual_subscribe(app: &App, name: Option<String>, url: Url) -> Result<( } }; - let present_subscriptions = get(app).await?; + let present_subscriptions = Subscriptions::get(app).await?; if let Some(subs) = present_subscriptions.0.get(&name) { bail!( @@ -190,7 +192,7 @@ async fn actual_subscribe(app: &App, name: Option<String>, url: Url) -> Result<( let sub = Subscription { name, url }; - add_subscription(app, &sub).await?; + sub.add(ops); Ok(()) } diff --git a/crates/yt/src/update/mod.rs b/crates/yt/src/update/mod.rs index fbe23da..11e70de 100644 --- a/crates/yt/src/update/mod.rs +++ b/crates/yt/src/update/mod.rs @@ -20,12 +20,11 @@ use yt_dlp::{info_json::InfoJson, json_cast, json_get}; use crate::{ app::App, select::selection_file::duration::MaybeDuration, - storage::{ - subscriptions::{self, Subscription}, - video_database::{ - Priority, TimeStamp, Video, VideoStatus, extractor_hash::ExtractorHash, - get::get_all_hashes, set::add_video, - }, + storage::db::{ + extractor_hash::ExtractorHash, + insert::Operations, + subscription::{Subscription, Subscriptions}, + video::{Priority, TimeStamp, Video, VideoStatus}, }, }; @@ -39,7 +38,7 @@ pub(crate) async fn update( total_number: Option<usize>, current_progress: Option<usize>, ) -> Result<()> { - let subscriptions = subscriptions::get(app).await?; + let subscriptions = Subscriptions::get(app).await?; let subs: Vec<Subscription> = if subscription_names_to_update.is_empty() { subscriptions.0.into_values().collect() @@ -53,12 +52,10 @@ pub(crate) async fn update( // We can get away with not having to re-fetch the hashes every time, as the returned video // should not contain duplicates. - let hashes = get_all_hashes(app).await?; + let hashes = ExtractorHash::get_all(app).await?; - let updater = Updater::new(max_backlog, hashes); - updater - .update(app, subs, total_number, current_progress) - .await?; + let updater = Updater::new(max_backlog, app.config.update.pool_size, hashes); + updater.update(app, subs).await?; Ok(()) } @@ -147,7 +144,7 @@ pub(crate) fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) smug_url }; - let extractor_hash = blake3::hash(json_get!(entry, "id", as_str).as_bytes()); + let extractor_hash = ExtractorHash::from_info_json(entry); let subscription_name = if let Some(sub) = sub { Some(sub.name.clone()) @@ -172,7 +169,7 @@ pub(crate) fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) duration: MaybeDuration::from_maybe_secs_f64( entry.get("duration").map(|val| json_cast!(val, as_f64)), ), - extractor_hash: ExtractorHash::from_hash(extractor_hash), + extractor_hash, last_status_change: TimeStamp::from_now(), parent_subscription_name: subscription_name, priority: Priority::default(), @@ -182,17 +179,29 @@ pub(crate) fn video_entry_to_video(entry: &InfoJson, sub: Option<&Subscription>) title: json_get!(entry, "title", as_str).to_owned(), url, watch_progress: Duration::default(), + playback_speed: None, + subtitle_langs: None, }; Ok(video) } async fn process_subscription(app: &App, sub: Subscription, entry: InfoJson) -> Result<()> { + let mut ops = Operations::new("Update: process subscription"); let video = video_entry_to_video(&entry, Some(&sub)) .context("Failed to parse search entry as Video")?; - add_video(app, video.clone()) - .await - .with_context(|| format!("Failed to add video to database: '{}'", video.title))?; + let title = video.title.clone(); + let url = video.url.clone(); + let video = video.add(&mut ops).with_context(|| { + format!("Failed to add video to database: '{title}' (with url: '{url}')") + })?; + + ops.commit(app).await.with_context(|| { + format!( + "Failed to add video to database: '{}' (with url: '{}')", + video.title, video.url + ) + })?; println!( "{}", &video diff --git a/crates/yt/src/update/updater.rs b/crates/yt/src/update/updater.rs index 54dea97..3e3ceeb 100644 --- a/crates/yt/src/update/updater.rs +++ b/crates/yt/src/update/updater.rs @@ -135,12 +135,13 @@ impl Updater { .iter() .take(max_backlog) .filter_map(|entry| -> Option<(Subscription, InfoJson)> { - let id = json_get!(entry, "id", as_str); - let extractor_hash = blake3::hash(id.as_bytes()); + let extractor_hash = + ExtractorHash::from_info_json(json_cast!(entry, as_object)); if hashes.contains(&extractor_hash) { debug!( - "Skipping entry, as it is already present: '{extractor_hash}'", + "Skipping entry, as it is \ + already present: '{extractor_hash}'", ); None } else { @@ -168,12 +169,17 @@ impl Updater { Err(err) => { match err { process_ie_result::Error::Python(PythonError(err)) => { - if err.contains( "Join this channel to get access to members-only content ",) { + if err.contains( + "Join this channel to get access \ + to members-only content ", + ) { // Hide this error } else { // Show the error, but don't fail. let error = err - .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ") + .strip_prefix( + "DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ", + ) .unwrap_or(&err); error!("While fetching {:#?}: {error}", sub.name); } @@ -181,9 +187,13 @@ impl Updater { None } process_ie_result::Error::InfoJsonPrepare(error) => { - error!("While fetching {:#?}: Failed to prepare info json: {error}", sub.name); + error!( + "While fetching {:#?}: Failed to prepare \ + info json: {error}", + sub.name + ); None - }, + } } } })) diff --git a/crates/yt/src/version/mod.rs b/crates/yt/src/version/mod.rs index a3aa7ff..b12eadd 100644 --- a/crates/yt/src/version/mod.rs +++ b/crates/yt/src/version/mod.rs @@ -32,7 +32,7 @@ pub(crate) async fn show(config: &Config) -> Result<()> { let (yt_dlp, python) = { let yt_dlp = YoutubeDLOptions::new().build()?; - yt_dlp.version() + yt_dlp.version()? }; let python = python.replace('\n', " "); diff --git a/crates/yt/src/watch/mod.rs b/crates/yt/src/watch/mod.rs index b4c68a0..1936d48 100644 --- a/crates/yt/src/watch/mod.rs +++ b/crates/yt/src/watch/mod.rs @@ -108,19 +108,34 @@ pub(crate) async fn watch(app: Arc<App>) -> Result<()> { let progress_handle = task::spawn(async move { loop { if local_should_break.load(Ordering::Relaxed) { + trace!("WatchProgressThread: Stopping, as we received exit signal."); break; } - if get::currently_focused_video(&local_app).await?.is_some() { - save_watch_progress(&local_app, &local_mpv).await?; + let mut playlist = Playlist::create(&local_app).await?; + + if let Some(index) = playlist.current_index() { + trace!("WatchProgressThread: Saving watch progress for current video"); + + let mut ops = Operations::new("WatchProgressThread: save watch progress thread"); + playlist.save_watch_progress(&local_mpv, index, &mut ops); + ops.commit(&local_app).await?; + } else { + trace!( + "WatchProgressThread: Tried to save current watch progress, but no video active." + ); } - sleep(Duration::from_secs(30)).await; + time::sleep(local_app.config.watch.watch_progress_save_intervall).await; } Ok::<(), anyhow::Error>(()) }); + // Set up the initial playlist. + let playlist = Playlist::create(&app).await?; + playlist.resync_with_mpv(&app, &mpv)?; + let mut have_warned = (false, 0); 'watchloop: loop { 'waitloop: while let Ok(value) = playlist_handler::status(&app).await { @@ -132,21 +147,24 @@ pub(crate) async fn watch(app: Arc<App>) -> Result<()> { // try again next time. if have_warned.0 { if have_warned.1 != marked_watch { - warn!("Now {} videos are marked as to be watched.", marked_watch); + warn!("Now {marked_watch} videos are marked as to be watched."); have_warned.1 = marked_watch; } } else { warn!( - "There is nothing to watch yet, but still {} videos marked as to be watched. \ - Will idle, until they become available", - marked_watch + "There is nothing to watch yet, but still {marked_watch} videos marked as to be watched. \ + Will idle, until they become available" ); have_warned = (true, marked_watch); } wait_for_db_write(&app).await?; + + // Add the new videos, if they are there. + let playlist = Playlist::create(&app).await?; + playlist.resync_with_mpv(&app, &mpv)?; } Status::Available { newly_available } => { - debug!("Check and found {newly_available} videos!"); + debug!("Checked for currently available videos and found {newly_available}!"); have_warned.0 = false; // Something just became available! @@ -166,7 +184,7 @@ pub(crate) async fn watch(app: Arc<App>) -> Result<()> { break; } } - Err(e) => debug!("Mpv Event errored: {}", e), + Err(e) => debug!("Mpv Event errored: {e}"), } } } diff --git a/crates/yt/src/watch/playlist.rs b/crates/yt/src/watch/playlist.rs index f456bfd..0bd8f2e 100644 --- a/crates/yt/src/watch/playlist.rs +++ b/crates/yt/src/watch/playlist.rs @@ -8,12 +8,18 @@ // You should have received a copy of the License along with this program. // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. -use std::path::Path; +use std::{fmt::Write, path::Path}; use crate::{ ansi_escape_codes::{cursor_up, erase_in_display_from_cursor}, app::App, - storage::video_database::{Video, VideoStatus, get, notify::wait_for_db_write}, + storage::{ + db::{ + playlist::Playlist, + video::{Video, VideoStatus}, + }, + notify::wait_for_db_write, + }, }; use anyhow::Result; @@ -37,7 +43,7 @@ fn cache_values(video: &Video) -> (&Path, bool) { pub(crate) async fn playlist(app: &App, watch: bool) -> Result<()> { let mut previous_output_length = 0; loop { - let playlist = get::playlist(app).await?.to_videos(); + let playlist = Playlist::create(app).await?.videos; let output = playlist .into_iter() diff --git a/crates/yt/src/watch/playlist_handler/mod.rs b/crates/yt/src/watch/playlist_handler/mod.rs index 8cf50c9..71f277c 100644 --- a/crates/yt/src/watch/playlist_handler/mod.rs +++ b/crates/yt/src/watch/playlist_handler/mod.rs @@ -8,15 +8,14 @@ // You should have received a copy of the License along with this program. // If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. -use std::{cmp::Ordering, time::Duration}; +use std::time::Duration; use crate::{ app::App, - storage::video_database::{ - VideoStatus, VideoStatusMarker, - extractor_hash::ExtractorHash, - get::{self, Playlist, PlaylistIndex}, - set, + storage::db::{ + insert::{Operations, playlist::VideoTransition}, + playlist::{Playlist, PlaylistIndex}, + video::{Video, VideoStatusMarker}, }, }; @@ -184,11 +183,13 @@ pub(super) async fn reload_mpv_playlist( } /// Return the status of the playback queue -pub async fn status(app: &App) -> Result<Status> { - let playlist = get::playlist(app).await?; +pub(crate) async fn status(app: &App) -> Result<Status> { + let playlist = Playlist::create(app).await?; let playlist_len = playlist.len(); - let marked_watch_num = get::videos(app, &[VideoStatusMarker::Watch]).await?.len(); + let marked_watch_num = Video::in_states(app, &[VideoStatusMarker::Watch]) + .await? + .len(); if playlist_len == 0 && marked_watch_num == 0 { Ok(Status::NoMoreAvailable) @@ -214,12 +215,24 @@ pub async fn status(app: &App) -> Result<Status> { /// Only if internal assertions fail. #[allow(clippy::too_many_lines)] pub(crate) async fn handle_mpv_event(app: &App, mpv: &Mpv, event: &Event<'_>) -> Result<bool> { - match event { + let mut ops = Operations::new("PlaylistHandler: handle event"); + + // Construct the playlist lazily. + // This avoids unneeded db lookups. + // (We use the moved `call_once` as guard for this) + let call_once = String::new(); + let playlist = move || { + drop(call_once); + Playlist::create(app) + }; + + let should_stop_event_handling = match event { Event::EndFile(r) => match r.reason { EndFileReason::Eof => { info!("Mpv reached the end of the current video. Marking it watched."); - mark_video_watched(app, mpv).await?; - reload_mpv_playlist(app, mpv, None, None).await?; + playlist().await?.resync_with_mpv(app, mpv)?; + + false } EndFileReason::Stop => { // This reason is incredibly ambiguous. It _both_ means actually pausing a @@ -227,57 +240,61 @@ pub(crate) async fn handle_mpv_event(app: &App, mpv: &Mpv, event: &Event<'_>) -> // Oh, and it's also called, when a video is removed from the playlist (at // least via "playlist-remove current") info!("Paused video (or went to next playlist entry); Doing nothing"); + + false } EndFileReason::Quit => { info!("Mpv quit. Exiting playback"); - save_watch_progress(app, mpv).await?; + playlist().await?.save_current_watch_progress(mpv, &mut ops); - return Ok(true); + true } EndFileReason::Error => { unreachable!("This should have been raised as a separate error") } EndFileReason::Redirect => { // TODO: We probably need to handle this somehow <2025-02-17> + false } }, Event::StartFile(_) => { + let mut playlist = playlist().await?; + let mpv_pos = usize::try_from(mpv.get_property::<i64>("playlist-pos")?) .expect("The value is strictly positive"); - let next_video = { - let yt_pos = get::current_playlist_index(app).await?.map(usize::from); + let yt_pos = playlist.current_index().map(usize::from); - if (Some(mpv_pos) != yt_pos) || yt_pos.is_none() { - let playlist = get::playlist(app).await?; - let video = playlist - .get(PlaylistIndex::from(mpv_pos)) - .expect("The mpv pos should not be out of bounds"); + if (Some(mpv_pos) != yt_pos) || yt_pos.is_none() { + debug!( + "StartFileHandler: mpv pos {mpv_pos} and our pos {yt_pos:?} do not align. Reloading.." + ); - set::focused( - app, - &video.extractor_hash, - get::currently_focused_video(app) - .await? - .as_ref() - .map(|v| &v.extractor_hash), - ) - .await?; - - video.extractor_hash - } else { - get::currently_focused_video(app) - .await? - .expect("We have a focused video") - .extractor_hash + if let Some((_, vid)) = playlist.get_focused_mut() { + vid.set_focused(false, &mut ops); + ops.commit(app) + .await + .context("Failed to commit video unfocusing")?; + + ops = Operations::new("PlaylistHandler: after set-focused"); } - }; - apply_video_options(app, mpv, &next_video).await?; + let video = playlist + .get_mut(PlaylistIndex::from(mpv_pos)) + .expect("The mpv pos should not be out of bounds"); + + video.set_focused(true, &mut ops); + + playlist.resync_with_mpv(app, mpv)?; + } + + false } Event::Seek => { - save_watch_progress(app, mpv).await?; + playlist().await?.save_current_watch_progress(mpv, &mut ops); + + false } Event::ClientMessage(a) => { debug!("Got Client Message event: '{}'", a.join(" ")); @@ -298,45 +315,39 @@ pub(crate) async fn handle_mpv_event(app: &App, mpv: &Mpv, event: &Event<'_>) -> } &["yt-mark-picked"] => { - let current_video = get::currently_focused_video(app) - .await? - .expect("This should exist at this point"); - let current_index = get::current_playlist_index(app) - .await? - .expect("This should exist, as we can mark this video picked"); - - save_watch_progress(app, mpv).await?; - - set::video_status( + playlist().await?.mark_current_done( app, - ¤t_video.extractor_hash, - VideoStatus::Pick, - Some(current_video.priority), - ) - .await?; + mpv, + VideoTransition::Picked, + &mut ops, + )?; - reload_mpv_playlist(app, mpv, None, Some(current_index)).await?; mpv_message(mpv, "Marked the video as picked", Duration::from_secs(3))?; } &["yt-mark-watched"] => { - let current_index = get::current_playlist_index(app) - .await? - .expect("This should exist, as we can mark this video picked"); - mark_video_watched(app, mpv).await?; + playlist().await?.mark_current_done( + app, + mpv, + VideoTransition::Watched, + &mut ops, + )?; - reload_mpv_playlist(app, mpv, None, Some(current_index)).await?; mpv_message(mpv, "Marked the video watched", Duration::from_secs(3))?; } &["yt-check-new-videos"] => { - reload_mpv_playlist(app, mpv, None, None).await?; + playlist().await?.resync_with_mpv(app, mpv)?; } other => { debug!("Unknown message: {}", other.join(" ")); } } + + false } - _ => {} - } + _ => false, + }; + + ops.commit(app).await?; - Ok(false) + Ok(should_stop_event_handling) } |