about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2025-07-15 06:57:31 +0200
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2025-07-15 06:58:44 +0200
commit507c9611232e7b820789ec776159c703acd499ab (patch)
tree8c8d480d87ad064b1e6c028c6ce17f6d6b98906e
parentrefactor(crates/yt): Allow `missing_panic_docs` and use expect (diff)
downloadyt-507c9611232e7b820789ec776159c703acd499ab.zip
fix(crates/yt/downloader): Correctly treat the download as blocking
This change _might_ also allow aborting the current download, but I'm
not yet sure.
-rw-r--r--crates/yt/src/download/mod.rs126
-rw-r--r--crates/yt_dlp/src/lib.rs38
2 files changed, 108 insertions, 56 deletions
diff --git a/crates/yt/src/download/mod.rs b/crates/yt/src/download/mod.rs
index 13e1bb3..3eb046a 100644
--- a/crates/yt/src/download/mod.rs
+++ b/crates/yt/src/download/mod.rs
@@ -33,26 +33,64 @@ pub(crate) mod progress_hook;
 
 #[derive(Debug)]
 #[allow(clippy::module_name_repetitions)]
-pub struct CurrentDownload {
-    task_handle: JoinHandle<Result<()>>,
+pub(crate) struct CurrentDownload {
+    task_handle: JoinHandle<Result<(PathBuf, Video)>>,
+    yt_dlp: Arc<YoutubeDL>,
     extractor_hash: ExtractorHash,
 }
 
 impl CurrentDownload {
-    fn new_from_video(app: Arc<App>, video: Video) -> Self {
+    fn new_from_video(app: &App, video: Video) -> Result<Self> {
         let extractor_hash = video.extractor_hash;
 
-        let task_handle = tokio::spawn(async move {
-            Downloader::actually_cache_video(&app, &video)
-                .await
-                .with_context(|| format!("Failed to cache video: '{}'", video.title))?;
-            Ok(())
+        debug!("Download started: {}", &video.title);
+        let yt_dlp = Arc::new(download_opts(app, video.subtitle_langs.as_ref())?);
+
+        let local_yt_dlp = Arc::clone(&yt_dlp);
+
+        let task_handle = tokio::task::spawn_blocking(move || {
+            let mut result = local_yt_dlp
+                .download(&[video.url.clone()])
+                .with_context(|| format!("Failed to download video: '{}'", video.title))?;
+
+            assert_eq!(result.len(), 1);
+            Ok((result.remove(0), video))
         });
 
-        Self {
+        Ok(Self {
             task_handle,
+            yt_dlp,
             extractor_hash,
-        }
+        })
+    }
+
+    fn abort(self) -> Result<()> {
+        debug!("Cancelling download.");
+        self.yt_dlp.close()?;
+
+        Ok(())
+    }
+
+    fn is_finished(&self) -> bool {
+        self.task_handle.is_finished()
+    }
+
+    async fn finalize(self, app: &App) -> Result<()> {
+        let (result, mut video) = self.task_handle.await??;
+
+        let mut ops = Operations::new("Downloader: Set download path");
+        video.set_download_path(&result, &mut ops);
+        ops.commit(app)
+            .await
+            .with_context(|| format!("Failed to committ download of video: '{}'", video.title))?;
+
+        info!(
+            "Video '{}' was downlaoded to path: {}",
+            video.title,
+            result.display()
+        );
+
+        Ok(())
     }
 }
 
@@ -92,7 +130,9 @@ impl Downloader {
         }
     }
 
-    /// Check if enough cache is available. Will wait for 10s if it's not.
+    /// Check if enough cache is available.
+    ///
+    /// Will wait for the next cache deletion if not.
     async fn is_enough_cache_available(
         &mut self,
         app: &App,
@@ -107,7 +147,7 @@ impl Downloader {
             }
         }
         let cache_allocation = Self::get_current_cache_allocation(app).await?;
-        let video_size = self.get_approx_video_size(app, next_video)?;
+        let video_size = self.get_approx_video_size(next_video)?;
 
         if video_size >= max_cache_size {
             error!(
@@ -143,10 +183,7 @@ impl Downloader {
                     // Only print the warning if the display string has actually changed.
                     // Otherwise, we might confuse the user
                     if cca.to_string() != cache_allocation.to_string() {
-                        warn!(
-                            "Current cache size has changed, it's now: '{}'",
-                            cache_allocation
-                        );
+                        warn!("Current cache size has changed, it's now: '{cache_allocation}'");
                     }
                     debug!(
                         "Cache size has changed: {} -> {}",
@@ -184,18 +221,20 @@ impl Downloader {
                 CacheSizeCheck::Fits => (),
                 CacheSizeCheck::TooLarge => continue,
                 CacheSizeCheck::ExceedsMaxCacheSize => bail!("Giving up."),
-            };
+            }
 
             if self.current_download.is_some() {
-                let current_download = self.current_download.take().unreachable("It is `Some`.");
+                let current_download = self.current_download.take().expect("It is `Some`.");
 
-                if current_download.task_handle.is_finished() {
-                    current_download.task_handle.await??;
+                if current_download.is_finished() {
+                    // The download is done, finalize it and leave it removed.
+                    current_download.finalize(&app).await?;
                     continue;
                 }
 
                 if next_video.extractor_hash == current_download.extractor_hash {
-                    // Reset the taken value
+                    // We still want to download the same video.
+                    // reset the taken value
                     self.current_download = Some(current_download);
                 } else {
                     info!(
@@ -208,11 +247,11 @@ impl Downloader {
                     );
 
                     // Replace the currently downloading video
-                    // FIXME(@bpeetz): This does not work (probably because of the python part.) <2025-02-21>
-                    current_download.task_handle.abort();
+                    current_download
+                        .abort()
+                        .context("Failed to abort last download")?;
 
-                    let new_current_download =
-                        CurrentDownload::new_from_video(Arc::clone(&app), next_video);
+                    let new_current_download = CurrentDownload::new_from_video(&app, next_video)?;
 
                     self.current_download = Some(new_current_download);
                 }
@@ -221,13 +260,16 @@ impl Downloader {
                     "No video is being downloaded right now, setting it to '{}'",
                     next_video.title
                 );
-                let new_current_download =
-                    CurrentDownload::new_from_video(Arc::clone(&app), next_video);
+                let new_current_download = CurrentDownload::new_from_video(&app, next_video)?;
                 self.current_download = Some(new_current_download);
             }
 
-            // TODO(@bpeetz): Why do we sleep here? <2025-02-21>
-            time::sleep(Duration::from_secs(1)).await;
+            // We have to continuously check, if the current download is done.
+            // As such we simply wait or recheck on the next write to the db.
+            select! {
+                () = time::sleep(Duration::from_secs(1)) => (),
+                Ok(()) = wait_for_db_write(&app) => (),
+            }
         }
 
         info!("Finished downloading!");
@@ -289,7 +331,7 @@ impl Downloader {
         dir_size(read_dir_result).await
     }
 
-    fn get_approx_video_size(&mut self, app: &App, video: &Video) -> Result<u64> {
+    fn get_approx_video_size(&mut self, video: &Video) -> Result<u64> {
         if let Some(value) = self.video_size_cache.get(&video.extractor_hash) {
             Ok(*value)
         } else {
@@ -344,28 +386,4 @@ impl Downloader {
             Ok(size)
         }
     }
-
-    async fn actually_cache_video(app: &App, video: &Video) -> Result<()> {
-        debug!("Download started: {}", &video.title);
-
-        let addional_opts = get_video_yt_dlp_opts(app, &video.extractor_hash).await?;
-        let yt_dlp = download_opts(app, &addional_opts)?;
-
-        let result = yt_dlp
-            .download(&[video.url.clone()])
-            .with_context(|| format!("Failed to download video: '{}'", video.title))?;
-
-        assert_eq!(result.len(), 1);
-        let result = &result[0];
-
-        set_video_cache_path(app, &video.extractor_hash, Some(result)).await?;
-
-        info!(
-            "Video '{}' was downlaoded to path: {}",
-            video.title,
-            result.display()
-        );
-
-        Ok(())
-    }
 }
diff --git a/crates/yt_dlp/src/lib.rs b/crates/yt_dlp/src/lib.rs
index d0cfbdd..dc602db 100644
--- a/crates/yt_dlp/src/lib.rs
+++ b/crates/yt_dlp/src/lib.rs
@@ -12,7 +12,7 @@
 
 use std::path::PathBuf;
 
-use log::info;
+use log::{debug, info};
 use pyo3::{
     Bound, Py, PyAny, Python, intern,
     types::{PyAnyMethods, PyDict, PyIterator, PyList},
@@ -273,12 +273,36 @@ impl YoutubeDL {
         })
     }
 
+    /// Close this [`YoutubeDL`] instance, and stop all currently running downloads.
+    ///
+    /// # Errors
+    /// If python operations fail.
+    pub fn close(&self) -> Result<(), close::Error> {
+        Python::with_gil(|py| {
+            debug!("Closing YoutubeDL.");
+
+            let inner = self
+                .inner
+                .bind(py)
+                .getattr(intern!(py, "close"))
+                .wrap_exc(py)?;
+
+            inner.call0().wrap_exc(py)?;
+
+            Ok(())
+        })
+    }
+
     fn prepare_info_json<'py>(
         &self,
         info: &Bound<'py, PyDict>,
         py: Python<'py>,
     ) -> Result<InfoJson, prepare::Error> {
-        let sanitize = self.inner.bind(py).getattr(intern!(py, "sanitize_info")).wrap_exc(py)?;
+        let sanitize = self
+            .inner
+            .bind(py)
+            .getattr(intern!(py, "sanitize_info"))
+            .wrap_exc(py)?;
 
         let value = sanitize.call((info,), None).wrap_exc(py)?;
 
@@ -289,6 +313,16 @@ impl YoutubeDL {
 }
 
 #[allow(missing_docs)]
+pub mod close {
+    use crate::python_error::PythonError;
+
+    #[derive(Debug, thiserror::Error)]
+    pub enum Error {
+        #[error(transparent)]
+        Python(#[from] PythonError),
+    }
+}
+#[allow(missing_docs)]
 pub mod process_ie_result {
     use crate::{prepare, python_error::PythonError};