aboutsummaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2025-07-15 06:57:31 +0200
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2025-07-15 06:58:44 +0200
commit507c9611232e7b820789ec776159c703acd499ab (patch)
tree8c8d480d87ad064b1e6c028c6ce17f6d6b98906e /crates
parentrefactor(crates/yt): Allow `missing_panic_docs` and use expect (diff)
downloadyt-507c9611232e7b820789ec776159c703acd499ab.zip
fix(crates/yt/downloader): Correctly treat the download as blocking
This change _might_ also allow aborting the current download, but I'm not yet sure.
Diffstat (limited to '')
-rw-r--r--crates/yt/src/download/mod.rs126
-rw-r--r--crates/yt_dlp/src/lib.rs38
2 files changed, 108 insertions, 56 deletions
diff --git a/crates/yt/src/download/mod.rs b/crates/yt/src/download/mod.rs
index 13e1bb3..3eb046a 100644
--- a/crates/yt/src/download/mod.rs
+++ b/crates/yt/src/download/mod.rs
@@ -33,26 +33,64 @@ pub(crate) mod progress_hook;
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
-pub struct CurrentDownload {
- task_handle: JoinHandle<Result<()>>,
+pub(crate) struct CurrentDownload {
+ task_handle: JoinHandle<Result<(PathBuf, Video)>>,
+ yt_dlp: Arc<YoutubeDL>,
extractor_hash: ExtractorHash,
}
impl CurrentDownload {
- fn new_from_video(app: Arc<App>, video: Video) -> Self {
+ fn new_from_video(app: &App, video: Video) -> Result<Self> {
let extractor_hash = video.extractor_hash;
- let task_handle = tokio::spawn(async move {
- Downloader::actually_cache_video(&app, &video)
- .await
- .with_context(|| format!("Failed to cache video: '{}'", video.title))?;
- Ok(())
+ debug!("Download started: {}", &video.title);
+ let yt_dlp = Arc::new(download_opts(app, video.subtitle_langs.as_ref())?);
+
+ let local_yt_dlp = Arc::clone(&yt_dlp);
+
+ let task_handle = tokio::task::spawn_blocking(move || {
+ let mut result = local_yt_dlp
+ .download(&[video.url.clone()])
+ .with_context(|| format!("Failed to download video: '{}'", video.title))?;
+
+ assert_eq!(result.len(), 1);
+ Ok((result.remove(0), video))
});
- Self {
+ Ok(Self {
task_handle,
+ yt_dlp,
extractor_hash,
- }
+ })
+ }
+
+ fn abort(self) -> Result<()> {
+ debug!("Cancelling download.");
+ self.yt_dlp.close()?;
+
+ Ok(())
+ }
+
+ fn is_finished(&self) -> bool {
+ self.task_handle.is_finished()
+ }
+
+ async fn finalize(self, app: &App) -> Result<()> {
+ let (result, mut video) = self.task_handle.await??;
+
+ let mut ops = Operations::new("Downloader: Set download path");
+ video.set_download_path(&result, &mut ops);
+ ops.commit(app)
+ .await
+ .with_context(|| format!("Failed to committ download of video: '{}'", video.title))?;
+
+ info!(
+ "Video '{}' was downlaoded to path: {}",
+ video.title,
+ result.display()
+ );
+
+ Ok(())
}
}
@@ -92,7 +130,9 @@ impl Downloader {
}
}
- /// Check if enough cache is available. Will wait for 10s if it's not.
+ /// Check if enough cache is available.
+ ///
+ /// Will wait for the next cache deletion if not.
async fn is_enough_cache_available(
&mut self,
app: &App,
@@ -107,7 +147,7 @@ impl Downloader {
}
}
let cache_allocation = Self::get_current_cache_allocation(app).await?;
- let video_size = self.get_approx_video_size(app, next_video)?;
+ let video_size = self.get_approx_video_size(next_video)?;
if video_size >= max_cache_size {
error!(
@@ -143,10 +183,7 @@ impl Downloader {
// Only print the warning if the display string has actually changed.
// Otherwise, we might confuse the user
if cca.to_string() != cache_allocation.to_string() {
- warn!(
- "Current cache size has changed, it's now: '{}'",
- cache_allocation
- );
+ warn!("Current cache size has changed, it's now: '{cache_allocation}'");
}
debug!(
"Cache size has changed: {} -> {}",
@@ -184,18 +221,20 @@ impl Downloader {
CacheSizeCheck::Fits => (),
CacheSizeCheck::TooLarge => continue,
CacheSizeCheck::ExceedsMaxCacheSize => bail!("Giving up."),
- };
+ }
if self.current_download.is_some() {
- let current_download = self.current_download.take().unreachable("It is `Some`.");
+ let current_download = self.current_download.take().expect("It is `Some`.");
- if current_download.task_handle.is_finished() {
- current_download.task_handle.await??;
+ if current_download.is_finished() {
+ // The download is done, finalize it and leave it removed.
+ current_download.finalize(&app).await?;
continue;
}
if next_video.extractor_hash == current_download.extractor_hash {
- // Reset the taken value
+ // We still want to download the same video.
+ // reset the taken value
self.current_download = Some(current_download);
} else {
info!(
@@ -208,11 +247,11 @@ impl Downloader {
);
// Replace the currently downloading video
- // FIXME(@bpeetz): This does not work (probably because of the python part.) <2025-02-21>
- current_download.task_handle.abort();
+ current_download
+ .abort()
+ .context("Failed to abort last download")?;
- let new_current_download =
- CurrentDownload::new_from_video(Arc::clone(&app), next_video);
+ let new_current_download = CurrentDownload::new_from_video(&app, next_video)?;
self.current_download = Some(new_current_download);
}
@@ -221,13 +260,16 @@ impl Downloader {
"No video is being downloaded right now, setting it to '{}'",
next_video.title
);
- let new_current_download =
- CurrentDownload::new_from_video(Arc::clone(&app), next_video);
+ let new_current_download = CurrentDownload::new_from_video(&app, next_video)?;
self.current_download = Some(new_current_download);
}
- // TODO(@bpeetz): Why do we sleep here? <2025-02-21>
- time::sleep(Duration::from_secs(1)).await;
+ // We have to continuously check, if the current download is done.
+ // As such we simply wait or recheck on the next write to the db.
+ select! {
+ () = time::sleep(Duration::from_secs(1)) => (),
+ Ok(()) = wait_for_db_write(&app) => (),
+ }
}
info!("Finished downloading!");
@@ -289,7 +331,7 @@ impl Downloader {
dir_size(read_dir_result).await
}
- fn get_approx_video_size(&mut self, app: &App, video: &Video) -> Result<u64> {
+ fn get_approx_video_size(&mut self, video: &Video) -> Result<u64> {
if let Some(value) = self.video_size_cache.get(&video.extractor_hash) {
Ok(*value)
} else {
@@ -344,28 +386,4 @@ impl Downloader {
Ok(size)
}
}
-
- async fn actually_cache_video(app: &App, video: &Video) -> Result<()> {
- debug!("Download started: {}", &video.title);
-
- let addional_opts = get_video_yt_dlp_opts(app, &video.extractor_hash).await?;
- let yt_dlp = download_opts(app, &addional_opts)?;
-
- let result = yt_dlp
- .download(&[video.url.clone()])
- .with_context(|| format!("Failed to download video: '{}'", video.title))?;
-
- assert_eq!(result.len(), 1);
- let result = &result[0];
-
- set_video_cache_path(app, &video.extractor_hash, Some(result)).await?;
-
- info!(
- "Video '{}' was downlaoded to path: {}",
- video.title,
- result.display()
- );
-
- Ok(())
- }
}
diff --git a/crates/yt_dlp/src/lib.rs b/crates/yt_dlp/src/lib.rs
index d0cfbdd..dc602db 100644
--- a/crates/yt_dlp/src/lib.rs
+++ b/crates/yt_dlp/src/lib.rs
@@ -12,7 +12,7 @@
use std::path::PathBuf;
-use log::info;
+use log::{debug, info};
use pyo3::{
Bound, Py, PyAny, Python, intern,
types::{PyAnyMethods, PyDict, PyIterator, PyList},
@@ -273,12 +273,36 @@ impl YoutubeDL {
})
}
+ /// Close this [`YoutubeDL`] instance, and stop all currently running downloads.
+ ///
+ /// # Errors
+ /// If python operations fail.
+ pub fn close(&self) -> Result<(), close::Error> {
+ Python::with_gil(|py| {
+ debug!("Closing YoutubeDL.");
+
+ let inner = self
+ .inner
+ .bind(py)
+ .getattr(intern!(py, "close"))
+ .wrap_exc(py)?;
+
+ inner.call0().wrap_exc(py)?;
+
+ Ok(())
+ })
+ }
+
fn prepare_info_json<'py>(
&self,
info: &Bound<'py, PyDict>,
py: Python<'py>,
) -> Result<InfoJson, prepare::Error> {
- let sanitize = self.inner.bind(py).getattr(intern!(py, "sanitize_info")).wrap_exc(py)?;
+ let sanitize = self
+ .inner
+ .bind(py)
+ .getattr(intern!(py, "sanitize_info"))
+ .wrap_exc(py)?;
let value = sanitize.call((info,), None).wrap_exc(py)?;
@@ -289,6 +313,16 @@ impl YoutubeDL {
}
#[allow(missing_docs)]
+pub mod close {
+ use crate::python_error::PythonError;
+
+ #[derive(Debug, thiserror::Error)]
+ pub enum Error {
+ #[error(transparent)]
+ Python(#[from] PythonError),
+ }
+}
+#[allow(missing_docs)]
pub mod process_ie_result {
use crate::{prepare, python_error::PythonError};