about summary refs log tree commit diff stats
path: root/yt/src
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2025-02-21 22:31:31 +0100
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2025-02-21 22:31:31 +0100
commitdc8539e3707c1a281b3aef9c7a6e8f929845d965 (patch)
treeb48576bb037b9035df45f6bd35dae899718d2c2b /yt/src
parentfix(yt/status): Show the current database version (diff)
downloadyt-dc8539e3707c1a281b3aef9c7a6e8f929845d965.zip
fix(yt/storage/notify): Switch from a polling based system to inotify
Diffstat (limited to '')
-rw-r--r--yt/src/download/mod.rs9
-rw-r--r--yt/src/storage/video_database/mod.rs1
-rw-r--r--yt/src/storage/video_database/notify.rs67
3 files changed, 73 insertions, 4 deletions
diff --git a/yt/src/download/mod.rs b/yt/src/download/mod.rs
index 7cfa0b0..317f636 100644
--- a/yt/src/download/mod.rs
+++ b/yt/src/download/mod.rs
@@ -18,6 +18,7 @@ use crate::{
         downloader::{get_next_uncached_video, set_video_cache_path},
         extractor_hash::ExtractorHash,
         getters::get_video_yt_dlp_opts,
+        notify::wait_for_cache_reduction,
     },
     unreachable::Unreachable,
 };
@@ -160,12 +161,10 @@ impl Downloader {
                     "The `printed_warning` should be false in this case, \
                     and thus should have already set the `cached_cache_allocation`."
                 );
-                // info!("Current cache size allocation: '{}'", cache_allocation);
-                // self.cached_cache_allocation = Some(cache_allocation);
             }
 
             // Wait and hope, that a large video is deleted from the cache.
-            time::sleep(Duration::from_secs(10)).await;
+            wait_for_cache_reduction(app).await?;
             Ok(CacheSizeCheck::TooLarge)
         } else {
             self.printed_warning = false;
@@ -210,6 +209,7 @@ impl Downloader {
                     );
 
                     // Replace the currently downloading video
+                    // FIXME(@bpeetz): This does not work (probably because of the python part.) <2025-02-21>
                     current_download.task_handle.abort();
 
                     let new_current_download =
@@ -227,7 +227,8 @@ impl Downloader {
                 self.current_download = Some(new_current_download);
             }
 
-            time::sleep(Duration::new(1, 0)).await;
+            // TODO(@bpeetz): Why do we sleep here? <2025-02-21>
+            time::sleep(Duration::from_secs(1)).await;
         }
 
         info!("Finished downloading!");
diff --git a/yt/src/storage/video_database/mod.rs b/yt/src/storage/video_database/mod.rs
index afcd179..22628b5 100644
--- a/yt/src/storage/video_database/mod.rs
+++ b/yt/src/storage/video_database/mod.rs
@@ -18,6 +18,7 @@ pub mod downloader;
 pub mod extractor_hash;
 pub mod getters;
 pub mod setters;
+pub mod notify;
 
 #[derive(Debug, Clone)]
 pub struct Video {
diff --git a/yt/src/storage/video_database/notify.rs b/yt/src/storage/video_database/notify.rs
new file mode 100644
index 0000000..56599ba
--- /dev/null
+++ b/yt/src/storage/video_database/notify.rs
@@ -0,0 +1,67 @@
+use std::{
+    path::{Path, PathBuf},
+    sync::mpsc,
+    thread::sleep,
+    time::Duration,
+};
+
+use crate::app::App;
+
+use anyhow::{Context, Result};
+use notify::{
+    Event, EventKind, RecursiveMode, Watcher,
+    event::{DataChange, ModifyKind},
+};
+use tokio::task;
+
+/// This functions registers a watcher for the database and only returns once a write was
+/// registered for the database.
+pub async fn wait_for_db_write(app: &App) -> Result<()> {
+    let db_path: PathBuf = app.config.paths.database_path.clone();
+    task::spawn_blocking(move || wait_for_db_write_sync(&db_path)).await?
+}
+
+fn wait_for_db_write_sync(db_path: &Path) -> Result<()> {
+    let (tx, rx) = mpsc::channel::<notify::Result<Event>>();
+
+    let mut watcher = notify::recommended_watcher(tx)?;
+
+    watcher.watch(db_path, RecursiveMode::NonRecursive)?;
+
+    for res in rx {
+        let event = res.context("Failed to wait for db write")?;
+
+        if let EventKind::Modify(ModifyKind::Data(DataChange::Any)) = event.kind {
+            // Buffer some of the `Modify` event burst.
+            sleep(Duration::from_millis(10));
+
+            return Ok(());
+        }
+    }
+
+    Ok(())
+}
+
+/// This functions registers a watcher for the cache path and returns once a file was removed
+pub async fn wait_for_cache_reduction(app: &App) -> Result<()> {
+    let download_directory: PathBuf = app.config.paths.download_dir.clone();
+    task::spawn_blocking(move || wait_for_cache_reduction_sync(&download_directory)).await?
+}
+
+fn wait_for_cache_reduction_sync(download_directory: &Path) -> Result<()> {
+    let (tx, rx) = mpsc::channel::<notify::Result<Event>>();
+
+    let mut watcher = notify::recommended_watcher(tx)?;
+
+    watcher.watch(download_directory, RecursiveMode::Recursive)?;
+
+    for res in rx {
+        let event = res.context("Failed to wait for cache size reduction")?;
+
+        if let EventKind::Remove(_) = event.kind {
+            return Ok(());
+        }
+    }
+
+    Ok(())
+}