// yt - A fully featured command line YouTube client
//
// Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de>
// SPDX-License-Identifier: GPL-3.0-or-later
//
// This file is part of Yt.
//
// You should have received a copy of the License along with this program.
// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use crate::{
app::App,
download::download_options::download_opts,
storage::video_database::{
downloader::{get_next_uncached_video, set_video_cache_path},
extractor_hash::ExtractorHash,
getters::get_video_yt_dlp_opts,
Video, YtDlpOptions,
},
};
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use log::{debug, error, info, warn};
use tokio::{fs, task::JoinHandle, time};
mod download_options;
/// Tracks one in-flight download: the spawned tokio task plus the hash of the
/// video it is caching, so the task can be identified and aborted later.
#[derive(Debug)]
pub struct CurrentDownload {
    task_handle: JoinHandle<Result<()>>,
    extractor_hash: ExtractorHash,
}

impl CurrentDownload {
    /// Spawn a background task that caches `video` and return a tracker for it.
    ///
    /// The task owns its own `Arc<App>` clone and the `Video`; any failure is
    /// wrapped with the video title for context.
    fn new_from_video(app: Arc<App>, video: Video) -> Self {
        let hash = video.extractor_hash.clone();

        let handle = tokio::spawn(async move {
            Downloader::actually_cache_video(&app, &video)
                .await
                .with_context(|| format!("Failed to cache video: '{}'", video.title))
        });

        Self {
            task_handle: handle,
            extractor_hash: hash,
        }
    }
}
/// Outcome of checking whether the next video fits into the download cache.
enum CacheSizeCheck {
    /// The video can be downloaded
    Fits,
    /// The video and the current cache size together would exceed the size
    TooLarge,
    /// The video would not even fit into the empty cache
    ExceedsMaxCacheSize,
}
/// Drives the download loop: keeps at most one video downloading at a time
/// while respecting a maximum cache size.
pub struct Downloader {
    /// The download currently in flight, if any.
    current_download: Option<CurrentDownload>,
    /// Memoized approximate sizes per video, so each video's size is only
    /// extracted once.
    video_size_cache: HashMap<ExtractorHash, u64>,
    /// Whether the "cache too full" warning was already printed (avoids log spam
    /// while waiting for the cache to shrink).
    printed_warning: bool,
    /// Last observed cache allocation; used to only log the value when it changes.
    cached_cache_allocation: Option<u64>,
}
impl Default for Downloader {
    /// Equivalent to [`Downloader::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl Downloader {
    /// Create a new [`Downloader`] with no running download and empty caches.
    pub fn new() -> Self {
        Self {
            current_download: None,
            video_size_cache: HashMap::new(),
            printed_warning: false,
            cached_cache_allocation: None,
        }
    }

    /// Check if enough cache is available. Will wait for 10s if it's not.
    ///
    /// Returns:
    /// - [`CacheSizeCheck::Fits`] when the video fits into the remaining cache
    ///   (or is already the one being downloaded),
    /// - [`CacheSizeCheck::TooLarge`] when it does not fit right now (after
    ///   sleeping 10s, hoping the cache shrinks),
    /// - [`CacheSizeCheck::ExceedsMaxCacheSize`] when it could never fit.
    async fn is_enough_cache_available(
        &mut self,
        app: &App,
        max_cache_size: u64,
        next_video: &Video,
    ) -> Result<CacheSizeCheck> {
        if let Some(cdownload) = &self.current_download {
            if cdownload.extractor_hash == next_video.extractor_hash {
                // If the video is already being downloaded it will always fit. Otherwise the
                // download would not have been started.
                return Ok(CacheSizeCheck::Fits);
            }
        }

        let cache_allocation = Self::get_current_cache_allocation(app).await?;
        let video_size = self.get_approx_video_size(app, next_video).await?;

        // A video bigger than the whole cache can never be downloaded; give up early.
        if video_size >= max_cache_size {
            error!(
                "The video '{}' ({}) exceeds the maximum cache size ({})! \
                Please set a bigger maximum (`--max-cache-size`) or skip it.",
                next_video.title,
                Bytes::new(video_size),
                Bytes::new(max_cache_size)
            );
            return Ok(CacheSizeCheck::ExceedsMaxCacheSize);
        }

        if cache_allocation + video_size >= max_cache_size {
            if !self.printed_warning {
                warn!(
                    "Can't download video: '{}' ({}) as it's too large for the cache ({} of {} allocated). \
                Waiting for cache size reduction..",
                    next_video.title, Bytes::new(video_size), Bytes::new(cache_allocation), Bytes::new(max_cache_size)
                );
                self.printed_warning = true;
            }

            // Only log the allocation when it changed since the last check, to
            // keep the waiting loop from flooding the log.
            if let Some(cca) = self.cached_cache_allocation {
                if cca != cache_allocation {
                    warn!(
                        "Current cache size has changed, it's now: '{}'",
                        Bytes::new(cache_allocation)
                    );
                    self.cached_cache_allocation = Some(cache_allocation);
                }
            } else {
                info!(
                    "Current cache size allocation: '{}'",
                    Bytes::new(cache_allocation)
                );
                self.cached_cache_allocation = Some(cache_allocation);
            }

            // Wait and hope, that a large video is deleted from the cache.
            time::sleep(Duration::from_secs(10)).await;
            Ok(CacheSizeCheck::TooLarge)
        } else {
            self.printed_warning = false;
            Ok(CacheSizeCheck::Fits)
        }
    }

    /// The entry point to the Downloader.
    /// This Downloader will periodically check if the database has changed, and then also
    /// change which videos it downloads.
    /// This will run, until the database doesn't contain any watchable videos
    pub async fn consume(&mut self, app: Arc<App>, max_cache_size: u64) -> Result<()> {
        while let Some(next_video) = get_next_uncached_video(&app).await? {
            match self
                .is_enough_cache_available(&app, max_cache_size, &next_video)
                .await?
            {
                CacheSizeCheck::Fits => (),
                CacheSizeCheck::TooLarge => continue,
                CacheSizeCheck::ExceedsMaxCacheSize => bail!("Giving up."),
            };

            if let Some(current_download) = self.current_download.take() {
                if current_download.task_handle.is_finished() {
                    // Propagate both join errors and download errors.
                    current_download.task_handle.await??;
                    continue;
                }

                if next_video.extractor_hash != current_download.extractor_hash {
                    info!(
                        "Noticed, that the next video is not the video being downloaded, replacing it ('{}' vs. '{}')!",
                        next_video.extractor_hash.into_short_hash(&app).await?, current_download.extractor_hash.into_short_hash(&app).await?
                    );

                    // Replace the currently downloading video
                    current_download.task_handle.abort();

                    let new_current_download =
                        CurrentDownload::new_from_video(Arc::clone(&app), next_video);

                    self.current_download = Some(new_current_download);
                } else {
                    // Reset the taken value
                    self.current_download = Some(current_download);
                }
            } else {
                info!(
                    "No video is being downloaded right now, setting it to '{}'",
                    next_video.title
                );
                let new_current_download =
                    CurrentDownload::new_from_video(Arc::clone(&app), next_video);

                self.current_download = Some(new_current_download);
            }

            // Poll the database/download state once a second.
            time::sleep(Duration::from_secs(1)).await;
        }

        info!("Finished downloading!");
        Ok(())
    }

    /// Compute the total size in bytes of the download directory, recursing
    /// into subdirectories.
    pub async fn get_current_cache_allocation(app: &App) -> Result<u64> {
        // Async recursion needs an explicitly boxed future.
        fn dir_size(mut dir: fs::ReadDir) -> BoxFuture<'static, Result<u64>> {
            async move {
                let mut acc = 0;
                while let Some(entry) = dir.next_entry().await? {
                    let size = match entry.metadata().await? {
                        data if data.is_dir() => {
                            let path = entry.path();
                            let read_dir = fs::read_dir(path).await?;

                            dir_size(read_dir).await?
                        }
                        data => data.len(),
                    };

                    acc += size;
                }
                Ok(acc)
            }
            .boxed()
        }

        dir_size(fs::read_dir(&app.config.paths.download_dir).await?).await
    }

    /// Return an approximate size of `video` in bytes, memoized per extractor hash.
    ///
    /// Falls back (in order) to: the reported file size, the approximate file
    /// size, duration x total bitrate, and finally a hardcoded 250 MiB default.
    async fn get_approx_video_size(&mut self, app: &App, video: &Video) -> Result<u64> {
        if let Some(value) = self.video_size_cache.get(&video.extractor_hash) {
            Ok(*value)
        } else {
            // the subtitle file size should be negligible
            let add_opts = YtDlpOptions {
                subtitle_langs: "".to_owned(),
            };
            let opts = &download_opts(app, add_opts);

            let result = yt_dlp::extract_info(opts, &video.url, false, true)
                .await
                .with_context(|| {
                    format!("Failed to extract video information: '{}'", video.title)
                })?;

            let size = if let Some(val) = result.filesize {
                val
            } else if let Some(val) = result.filesize_approx {
                val
            } else if let (Some(duration), Some(tbr)) = (result.duration, result.tbr) {
                let duration = duration.ceil() as u64;

                // TODO: yt_dlp gets this from the format
                // tbr is in kbit/s; multiply by 1000/8 to convert to bytes/s.
                let tbr = tbr.ceil() as u64;

                duration * tbr * (1000 / 8)
            } else {
                let hardcoded_default = Bytes::from_str("250 MiB").expect("This is hardcoded");
                error!(
                    "Failed to find a filesize for video: '{}' (Using hardcoded value of {})",
                    video.title, hardcoded_default
                );
                hardcoded_default.as_u64()
            };

            // A cache hit would have returned above, so this must be a fresh entry.
            assert_eq!(
                self.video_size_cache
                    .insert(video.extractor_hash.clone(), size),
                None
            );

            Ok(size)
        }
    }

    /// Download `video`, store its cache path in the database, and log the result.
    async fn actually_cache_video(app: &App, video: &Video) -> Result<()> {
        debug!("Download started: {}", &video.title);

        let additional_opts = get_video_yt_dlp_opts(app, &video.extractor_hash).await?;

        let result = yt_dlp::download(&[video.url.clone()], &download_opts(app, additional_opts))
            .await
            .with_context(|| format!("Failed to download video: '{}'", video.title))?;

        // We passed exactly one URL, so exactly one path must come back.
        assert_eq!(result.len(), 1);
        let result = &result[0];

        set_video_cache_path(app, &video.extractor_hash, Some(result)).await?;

        info!(
            "Video '{}' was downloaded to path: {}",
            video.title,
            result.display()
        );

        Ok(())
    }
}