diff options
author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2025-07-15 06:57:31 +0200 |
---|---|---|
committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2025-07-15 06:58:44 +0200 |
commit | 507c9611232e7b820789ec776159c703acd499ab (patch) | |
tree | 8c8d480d87ad064b1e6c028c6ce17f6d6b98906e | |
parent | refactor(crates/yt): Allow `missing_panic_docs` and use expect (diff) | |
download | yt-507c9611232e7b820789ec776159c703acd499ab.zip |
fix(crates/yt/downloader): Correctly treat the download as blocking
This change _might_ also allow aborting the current download, but I'm not yet sure.
-rw-r--r-- | crates/yt/src/download/mod.rs | 126 | ||||
-rw-r--r-- | crates/yt_dlp/src/lib.rs | 38 |
2 files changed, 108 insertions, 56 deletions
diff --git a/crates/yt/src/download/mod.rs b/crates/yt/src/download/mod.rs index 13e1bb3..3eb046a 100644 --- a/crates/yt/src/download/mod.rs +++ b/crates/yt/src/download/mod.rs @@ -33,26 +33,64 @@ pub(crate) mod progress_hook; #[derive(Debug)] #[allow(clippy::module_name_repetitions)] -pub struct CurrentDownload { - task_handle: JoinHandle<Result<()>>, +pub(crate) struct CurrentDownload { + task_handle: JoinHandle<Result<(PathBuf, Video)>>, + yt_dlp: Arc<YoutubeDL>, extractor_hash: ExtractorHash, } impl CurrentDownload { - fn new_from_video(app: Arc<App>, video: Video) -> Self { + fn new_from_video(app: &App, video: Video) -> Result<Self> { let extractor_hash = video.extractor_hash; - let task_handle = tokio::spawn(async move { - Downloader::actually_cache_video(&app, &video) - .await - .with_context(|| format!("Failed to cache video: '{}'", video.title))?; - Ok(()) + debug!("Download started: {}", &video.title); + let yt_dlp = Arc::new(download_opts(app, video.subtitle_langs.as_ref())?); + + let local_yt_dlp = Arc::clone(&yt_dlp); + + let task_handle = tokio::task::spawn_blocking(move || { + let mut result = local_yt_dlp + .download(&[video.url.clone()]) + .with_context(|| format!("Failed to download video: '{}'", video.title))?; + + assert_eq!(result.len(), 1); + Ok((result.remove(0), video)) }); - Self { + Ok(Self { task_handle, + yt_dlp, extractor_hash, - } + }) + } + + fn abort(self) -> Result<()> { + debug!("Cancelling download."); + self.yt_dlp.close()?; + + Ok(()) + } + + fn is_finished(&self) -> bool { + self.task_handle.is_finished() + } + + async fn finalize(self, app: &App) -> Result<()> { + let (result, mut video) = self.task_handle.await??; + + let mut ops = Operations::new("Downloader: Set download path"); + video.set_download_path(&result, &mut ops); + ops.commit(app) + .await + .with_context(|| format!("Failed to committ download of video: '{}'", video.title))?; + + info!( + "Video '{}' was downlaoded to path: {}", + video.title, + result.display() + ); + + Ok(()) } } @@ -92,7 +130,9 @@ impl Downloader { } } - /// Check if enough cache is available. Will wait for 10s if it's not. + /// Check if enough cache is available. + /// + /// Will wait for the next cache deletion if not. async fn is_enough_cache_available( &mut self, app: &App, @@ -107,7 +147,7 @@ impl Downloader { } } let cache_allocation = Self::get_current_cache_allocation(app).await?; - let video_size = self.get_approx_video_size(app, next_video)?; + let video_size = self.get_approx_video_size(next_video)?; if video_size >= max_cache_size { error!( @@ -143,10 +183,7 @@ impl Downloader { // Only print the warning if the display string has actually changed. // Otherwise, we might confuse the user if cca.to_string() != cache_allocation.to_string() { - warn!( - "Current cache size has changed, it's now: '{}'", - cache_allocation - ); + warn!("Current cache size has changed, it's now: '{cache_allocation}'"); } debug!( "Cache size has changed: {} -> {}", @@ -184,18 +221,20 @@ impl Downloader { CacheSizeCheck::Fits => (), CacheSizeCheck::TooLarge => continue, CacheSizeCheck::ExceedsMaxCacheSize => bail!("Giving up."), - }; + } if self.current_download.is_some() { - let current_download = self.current_download.take().unreachable("It is `Some`."); + let current_download = self.current_download.take().expect("It is `Some`."); - if current_download.task_handle.is_finished() { - current_download.task_handle.await??; + if current_download.is_finished() { + // The download is done, finalize it and leave it removed. + current_download.finalize(&app).await?; continue; } if next_video.extractor_hash == current_download.extractor_hash { - // Reset the taken value + // We still want to download the same video. + // reset the taken value self.current_download = Some(current_download); } else { info!( @@ -208,11 +247,11 @@ impl Downloader { ); // Replace the currently downloading video - // FIXME(@bpeetz): This does not work (probably because of the python part.) <2025-02-21> - current_download.task_handle.abort(); + current_download + .abort() + .context("Failed to abort last download")?; - let new_current_download = - CurrentDownload::new_from_video(Arc::clone(&app), next_video); + let new_current_download = CurrentDownload::new_from_video(&app, next_video)?; self.current_download = Some(new_current_download); } @@ -221,13 +260,16 @@ impl Downloader { "No video is being downloaded right now, setting it to '{}'", next_video.title ); - let new_current_download = - CurrentDownload::new_from_video(Arc::clone(&app), next_video); + let new_current_download = CurrentDownload::new_from_video(&app, next_video)?; self.current_download = Some(new_current_download); } - // TODO(@bpeetz): Why do we sleep here? <2025-02-21> - time::sleep(Duration::from_secs(1)).await; + // We have to continuously check, if the current download is done. + // As such we simply wait or recheck on the next write to the db. + select! { + () = time::sleep(Duration::from_secs(1)) => (), + Ok(()) = wait_for_db_write(&app) => (), + } } info!("Finished downloading!"); @@ -289,7 +331,7 @@ impl Downloader { dir_size(read_dir_result).await } - fn get_approx_video_size(&mut self, app: &App, video: &Video) -> Result<u64> { + fn get_approx_video_size(&mut self, video: &Video) -> Result<u64> { if let Some(value) = self.video_size_cache.get(&video.extractor_hash) { Ok(*value) } else { @@ -344,28 +386,4 @@ impl Downloader { Ok(size) } } - - async fn actually_cache_video(app: &App, video: &Video) -> Result<()> { - debug!("Download started: {}", &video.title); - - let addional_opts = get_video_yt_dlp_opts(app, &video.extractor_hash).await?; - let yt_dlp = download_opts(app, &addional_opts)?; - - let result = yt_dlp - .download(&[video.url.clone()]) - .with_context(|| format!("Failed to download video: '{}'", video.title))?; - - assert_eq!(result.len(), 1); - let result = &result[0]; - - set_video_cache_path(app, &video.extractor_hash, Some(result)).await?; - - info!( - "Video '{}' was downlaoded to path: {}", - video.title, - result.display() - ); - - Ok(()) - } } diff --git a/crates/yt_dlp/src/lib.rs b/crates/yt_dlp/src/lib.rs index d0cfbdd..dc602db 100644 --- a/crates/yt_dlp/src/lib.rs +++ b/crates/yt_dlp/src/lib.rs @@ -12,7 +12,7 @@ use std::path::PathBuf; -use log::info; +use log::{debug, info}; use pyo3::{ Bound, Py, PyAny, Python, intern, types::{PyAnyMethods, PyDict, PyIterator, PyList}, @@ -273,12 +273,36 @@ impl YoutubeDL { }) } + /// Close this [`YoutubeDL`] instance, and stop all currently running downloads. + /// + /// # Errors + /// If python operations fail. + pub fn close(&self) -> Result<(), close::Error> { + Python::with_gil(|py| { + debug!("Closing YoutubeDL."); + + let inner = self + .inner + .bind(py) + .getattr(intern!(py, "close")) + .wrap_exc(py)?; + + inner.call0().wrap_exc(py)?; + + Ok(()) + }) + } + fn prepare_info_json<'py>( &self, info: &Bound<'py, PyDict>, py: Python<'py>, ) -> Result<InfoJson, prepare::Error> { - let sanitize = self.inner.bind(py).getattr(intern!(py, "sanitize_info")).wrap_exc(py)?; + let sanitize = self + .inner + .bind(py) + .getattr(intern!(py, "sanitize_info")) + .wrap_exc(py)?; let value = sanitize.call((info,), None).wrap_exc(py)?; @@ -289,6 +313,16 @@ impl YoutubeDL { } #[allow(missing_docs)] +pub mod close { + use crate::python_error::PythonError; + + #[derive(Debug, thiserror::Error)] + pub enum Error { + #[error(transparent)] + Python(#[from] PythonError), + } +} +#[allow(missing_docs)] pub mod process_ie_result { use crate::{prepare, python_error::PythonError}; |