authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2025-02-22 11:40:22 +0100
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2025-02-22 11:40:22 +0100
commit4a008ef549f595af18f7cf2d0e9940d2627ae8c4 (patch)
parentfeat(yt/storage/migrate): Add version two (diff)
feat(yt/watch/playlist_handler): Rewrite to use new db layout
The previous iteration actually worked with the mpv playlist, which
proved very bug prone as mpv does not store our costume video, but the
loaded cache path. Besides this, the old handler stored everything in
memory, resulting in a full playback reset on restart.

This new version uses the db to store the playlist and the playback
progress and simply synchronizes this playlist with mpv.
2 files changed, 417 insertions, 0 deletions
diff --git a/yt/src/watch/playlist_handler/client_messages/mod.rs b/yt/src/watch/playlist_handler/client_messages/mod.rs
new file mode 100644
index 0000000..4c6d9d5
--- /dev/null
+++ b/yt/src/watch/playlist_handler/client_messages/mod.rs
@@ -0,0 +1,88 @@
+use std::{env, time::Duration};
+use crate::{app::App, comments};
+use anyhow::{Context, Result, bail};
+use libmpv2::Mpv;
+use tokio::process::Command;
+use super::mpv_message;
+async fn run_self_in_external_command(app: &App, args: &[&str]) -> Result<()> {
+    let binary =
+        env::current_exe().context("Failed to determine the current executable to re-execute")?;
+    let status = Command::new("riverctl")
+        .args(["focus-output", "next"])
+        .status()
+        .await?;
+    if !status.success() {
+        bail!("focusing the next output failed!");
+    }
+    let arguments = [
+        &[
+            "--title",
+            "floating please",
+            "--command",
+            binary
+                .to_str()
+                .context("Failed to turn the executable path to a utf8-string")?,
+            "--db-path",
+            app.config
+                .paths
+                .database_path
+                .to_str()
+                .context("Failed to parse the database_path as a utf8-string")?,
+        ],
+        args,
+    ]
+    .concat();
+    let status = Command::new("alacritty").args(arguments).status().await?;
+    if !status.success() {
+        bail!("Falied to start `yt comments`");
+    }
+    let status = Command::new("riverctl")
+        .args(["focus-output", "next"])
+        .status()
+        .await?;
+    if !status.success() {
+        bail!("focusing the next output failed!");
+    }
+    Ok(())
+pub(super) async fn handle_yt_description_external(app: &App) -> Result<()> {
+    run_self_in_external_command(app, &["description"]).await?;
+    Ok(())
+pub(super) async fn handle_yt_description_local(app: &App, mpv: &Mpv) -> Result<()> {
+    let description: String = comments::description::get(app)
+        .await?
+        .chars()
+        .take(app.config.watch.local_displays_length)
+        .collect();
+    mpv_message(mpv, &description, Duration::from_secs(6))?;
+    Ok(())
+pub(super) async fn handle_yt_comments_external(app: &App) -> Result<()> {
+    run_self_in_external_command(app, &["comments"]).await?;
+    Ok(())
+pub(super) async fn handle_yt_comments_local(app: &App, mpv: &Mpv) -> Result<()> {
+    let comments: String = comments::get(app)
+        .await?
+        .render(false)
+        .chars()
+        .take(app.config.watch.local_displays_length)
+        .collect();
+    mpv_message(mpv, &comments, Duration::from_secs(6))?;
+    Ok(())
diff --git a/yt/src/watch/playlist_handler/mod.rs b/yt/src/watch/playlist_handler/mod.rs
new file mode 100644
index 0000000..e6ceece
--- /dev/null
+++ b/yt/src/watch/playlist_handler/mod.rs
@@ -0,0 +1,329 @@
+use std::{cmp::Ordering, time::Duration};
+use crate::{
+    app::App,
+    storage::video_database::{
+        VideoStatus, VideoStatusMarker,
+        extractor_hash::ExtractorHash,
+        get::{self, Playlist, PlaylistIndex},
+        set,
+    },
+use anyhow::{Context, Result};
+use libmpv2::{EndFileReason, Mpv, events::Event};
+use log::{debug, info};
+mod client_messages;
+#[derive(Debug, Clone, Copy)]
+pub enum Status {
+    /// There are no videos cached and no more marked to be watched.
+    /// Waiting is pointless.
+    NoMoreAvailable,
+    /// There are no videos cached, but some (> 0) are marked to be watched.
+    /// So we should wait for them to become available.
+    NoCached { marked_watch: usize },
+    /// There are videos cached and ready to be inserted into the playback queue.
+    Available { newly_available: usize },
+fn mpv_message(mpv: &Mpv, message: &str, time: Duration) -> Result<()> {
+    mpv.command("show-text", &[
+        message,
+        time.as_millis().to_string().as_str(),
+    ])?;
+    Ok(())
+async fn apply_video_options(app: &App, mpv: &Mpv, video: &ExtractorHash) -> Result<()> {
+    let options = get::video_mpv_opts(app, video).await?;
+    let video = get::video_by_hash(app, video).await?;
+    mpv.set_property("speed", options.playback_speed)?;
+    // We already start at 0, so setting it twice adds a uncomfortable skip sound.
+    if video.watch_progress.as_secs() != 0 {
+        mpv.set_property(
+            "time-pos",
+            i64::try_from(video.watch_progress.as_secs()).expect("This should not overflow"),
+        )?;
+    }
+    Ok(())
+async fn mark_video_watched(app: &App, mpv: &Mpv) -> Result<()> {
+    let current_video = get::currently_focused_video(app)
+        .await?
+        .expect("This should be some at this point");
+    debug!(
+        "playlist handler will mark video '{}' watched.",
+        current_video.title
+    );
+    save_watch_progress(app, mpv).await?;
+    set::video_watched(app, &current_video.extractor_hash).await?;
+    Ok(())
+/// Saves the `watch_progress` of the currently focused video.
+pub(super) async fn save_watch_progress(app: &App, mpv: &Mpv) -> Result<()> {
+    let current_video = get::currently_focused_video(app)
+        .await?
+        .expect("This should be some at this point");
+    let watch_progress = u32::try_from(
+        mpv.get_property::<i64>("time-pos")
+            .context("Failed to get the watchprogress of the currently playling video")?,
+    )
+    .expect("This conversion should never fail as the `time-pos` property is positive");
+    debug!(
+        "Setting the watch progress for the current_video '{}' to {watch_progress}s",
+        current_video.title_fmt_no_color()
+    );
+    set::video_watch_progress(app, &current_video.extractor_hash, watch_progress).await
+/// Sync the mpv playlist with the internal playlist.
+/// This takes an `maybe_playlist` argument, if you have already fetched the playlist and want to
+/// add that.
+pub(super) async fn reload_mpv_playlist(
+    app: &App,
+    mpv: &Mpv,
+    maybe_playlist: Option<Playlist>,
+    maybe_index: Option<PlaylistIndex>,
+) -> Result<()> {
+    fn get_playlist_count(mpv: &Mpv) -> Result<usize> {
+        mpv.get_property::<i64>("playlist/count")
+            .context("Failed to get mpv playlist len")
+            .map(|count| {
+                usize::try_from(count).expect("The playlist_count should always be positive")
+            })
+    }
+    if get_playlist_count(mpv)? != 0 {
+        // We could also use `loadlist`, but that would require use to start a unix socket or even
+        // write all the video paths to a file beforehand
+        mpv.command("playlist-clear", &[])?;
+        mpv.command("playlist-remove", &["current"])?;
+    }
+    assert_eq!(
+        get_playlist_count(mpv)?,
+        0,
+        "The playlist should be empty at this point."
+    );
+    let playlist = if let Some(p) = maybe_playlist {
+        p
+    } else {
+        get::playlist(app).await?
+    };
+    debug!("Will add {} videos to playlist.", playlist.len());
+    playlist.into_iter().try_for_each(|cache_path| {
+        mpv.command("loadfile", &[
+            cache_path.to_str().with_context(|| {
+                format!(
+                    "Failed to parse the video cache path ('{}') as valid utf8",
+                    cache_path.display()
+                )
+            })?,
+            "append-play",
+        ])?;
+        Ok::<(), anyhow::Error>(())
+    })?;
+    let index = if let Some(index) = maybe_index {
+        let index = usize::from(index);
+        let playlist_length = get_playlist_count(mpv)?;
+        match index.cmp(&playlist_length) {
+            Ordering::Greater => {
+                unreachable!(
+                    "The index '{index}' execeeds the playlist length '{playlist_length}'."
+                );
+            }
+            Ordering::Less => index,
+            Ordering::Equal => {
+                // The index is pointing to the end of the playlist. We could either go the second
+                // to last entry (i.e., one entry back) or wrap around to the start.
+                // We wrap around:
+                0
+            }
+        }
+    } else {
+        get::current_playlist_index(app)
+            .await?
+            .map_or(0, usize::from)
+    };
+    mpv.set_property("playlist-pos", index.to_string().as_str())?;
+    Ok(())
+/// Return the status of the playback queue
+pub async fn status(app: &App) -> Result<Status> {
+    let playlist = get::playlist(app).await?;
+    let playlist_len = playlist.len();
+    let marked_watch_num = get::videos(app, &[VideoStatusMarker::Watch]).await?.len();
+    if playlist_len == 0 && marked_watch_num == 0 {
+        Ok(Status::NoMoreAvailable)
+    } else if playlist_len == 0 && marked_watch_num != 0 {
+        Ok(Status::NoCached {
+            marked_watch: marked_watch_num,
+        })
+    } else if playlist_len != 0 {
+        Ok(Status::Available {
+            newly_available: playlist_len,
+        })
+    } else {
+        unreachable!(
+            "The playlist length is {playlist_len}, but the number of marked watch videos is {marked_watch_num}! This is a bug."
+        );
+    }
+/// # Returns
+/// This will return [`true`], if the event handling should be stopped
+/// # Panics
+/// Only if internal assertions fail.
+pub async fn handle_mpv_event(app: &App, mpv: &Mpv, event: &Event<'_>) -> Result<bool> {
+    match event {
+        Event::EndFile(r) => match r.reason {
+            EndFileReason::Eof => {
+                info!("Mpv reached the end of the current video. Marking it watched.");
+                mark_video_watched(app, mpv).await?;
+                reload_mpv_playlist(app, mpv, None, None).await?;
+            }
+            EndFileReason::Stop => {
+                // This reason is incredibly ambiguous. It _both_ means actually pausing a
+                // video and going to the next one in the playlist.
+                // Oh, and it's also called, when a video is removed from the playlist (at
+                // least via "playlist-remove current")
+                info!("Paused video (or went to next playlist entry); Doing nothing");
+            }
+            EndFileReason::Quit => {
+                info!("Mpv quit. Exiting playback");
+                save_watch_progress(app, mpv).await?;
+                return Ok(true);
+            }
+            EndFileReason::Error => {
+                unreachable!("This should have been raised as a separate error")
+            }
+            EndFileReason::Redirect => {
+                // TODO: We probably need to handle this somehow <2025-02-17>
+            }
+        },
+        Event::StartFile(_) => {
+            let mpv_pos = usize::try_from(mpv.get_property::<i64>("playlist-pos")?)
+                .expect("The value is strictly positive");
+            let next_video = {
+                let yt_pos = get::current_playlist_index(app).await?.map(usize::from);
+                if (Some(mpv_pos) != yt_pos) || yt_pos.is_none() {
+                    let playlist = get::playlist(app).await?;
+                    let video = playlist
+                        .get(PlaylistIndex::from(mpv_pos))
+                        .expect("The mpv pos should not be out of bounds");
+                    set::focused(
+                        app,
+                        &video.extractor_hash,
+                        get::currently_focused_video(app)
+                            .await?
+                            .as_ref()
+                            .map(|v| &v.extractor_hash),
+                    )
+                    .await?;
+                    video.extractor_hash
+                } else {
+                    get::currently_focused_video(app)
+                        .await?
+                        .expect("We have a focused video")
+                        .extractor_hash
+                }
+            };
+            apply_video_options(app, mpv, &next_video).await?;
+        }
+        Event::Seek => {
+            save_watch_progress(app, mpv).await?;
+        }
+        Event::ClientMessage(a) => {
+            debug!("Got Client Message event: '{}'", a.join(" "));
+            match a.as_slice() {
+                &["yt-comments-external"] => {
+                    client_messages::handle_yt_comments_external(app).await?;
+                }
+                &["yt-comments-local"] => {
+                    client_messages::handle_yt_comments_local(app, mpv).await?;
+                }
+                &["yt-description-external"] => {
+                    client_messages::handle_yt_description_external(app).await?;
+                }
+                &["yt-description-local"] => {
+                    client_messages::handle_yt_description_local(app, mpv).await?;
+                }
+                &["yt-mark-picked"] => {
+                    let current_video = get::currently_focused_video(app)
+                        .await?
+                        .expect("This should exist at this point");
+                    let current_index = get::current_playlist_index(app)
+                        .await?
+                        .expect("This should exist, as we can mark this video picked");
+                    save_watch_progress(app, mpv).await?;
+                    set::video_status(
+                        app,
+                        &current_video.extractor_hash,
+                        VideoStatus::Pick,
+                        Some(current_video.priority),
+                    )
+                    .await?;
+                    reload_mpv_playlist(app, mpv, None, Some(current_index)).await?;
+                    mpv_message(mpv, "Marked the video as picked", Duration::from_secs(3))?;
+                }
+                &["yt-mark-watched"] => {
+                    let current_index = get::current_playlist_index(app)
+                        .await?
+                        .expect("This should exist, as we can mark this video picked");
+                    mark_video_watched(app, mpv).await?;
+                    reload_mpv_playlist(app, mpv, None, Some(current_index)).await?;
+                    mpv_message(mpv, "Marked the video watched", Duration::from_secs(3))?;
+                }
+                &["yt-check-new-videos"] => {
+                    reload_mpv_playlist(app, mpv, None, None).await?;
+                }
+                other => {
+                    debug!("Unknown message: {}", other.join(" "));
+                }
+            }
+        }
+        _ => {}
+    }
+    Ok(false)