use std::{ fs::{self, canonicalize}, io::{stderr, stdout, Read}, mem, os::unix::fs::symlink, path::PathBuf, process::Command, sync::mpsc::{self, Receiver, Sender}, thread::{self, JoinHandle}, }; use anyhow::{bail, Context, Result}; use log::{debug, error, warn}; use url::Url; use crate::constants::{status_path, CONCURRENT, DOWNLOAD_DIR, MPV_FLAGS, YT_DLP_FLAGS}; #[derive(Debug)] pub struct Downloadable { pub url: Url, pub id: Option<u32>, } impl std::fmt::Display for Downloadable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { write!( f, "{}|{}", self.url.as_str().replace('|', ";"), self.id.unwrap_or(0), ) } } pub struct Downloader { sent: usize, download_thread: JoinHandle<Result<()>>, orx: Receiver<(PathBuf, Option<u32>)>, itx: Option<Sender<Downloadable>>, playspec: Vec<Downloadable>, } impl Downloader { pub fn new(mut playspec: Vec<Downloadable>) -> anyhow::Result<Downloader> { let (itx, irx): (Sender<Downloadable>, Receiver<Downloadable>) = mpsc::channel(); let (otx, orx) = mpsc::channel(); let jh = thread::spawn(move || -> Result<()> { while let Ok(pt) = irx.recv() { debug!("Got '{}' to be downloaded", pt); let path = download_url(&pt.url) .with_context(|| format!("Failed to download url: '{}'", &pt.url))?; otx.send((path, pt.id)).expect("Should not be dropped"); } debug!("Finished Downloading everything"); Ok(()) }); playspec.reverse(); let mut output = Downloader { sent: 0, download_thread: jh, orx, itx: Some(itx), playspec, }; if output.playspec.len() <= CONCURRENT as usize { output.add(output.playspec.len() as u32)?; } else { output.add(CONCURRENT)?; } Ok(output) } pub fn add(&mut self, number_to_add: u32) -> Result<()> { debug!("Adding {} to be downloaded concurrently", number_to_add); for _ in 0..number_to_add { let pt = self.playspec.pop().expect("This call should be guarded"); self.itx.as_ref().expect("Should still be valid").send(pt)?; self.sent += 1; } Ok(()) } /// Return the next video already downloaded, will block until the download is complete pub fn next(&mut self) -> Option<(PathBuf, Option<u32>)> { debug!("Requesting next output"); match self.orx.recv() { Ok(ok) => { debug!("Output downloaded to: {}", ok.0.display()); if !self.playspec.is_empty() { self.add(1).ok()?; } else { debug!( "Done sending videos to be downloaded, downoladed: {} videos", self.sent ); let itx = mem::take(&mut self.itx); drop(itx) } debug!("Returning: {}|{}", ok.0.display(), ok.1.unwrap_or(0)); Some(ok) } Err(err) => { debug!("Received error while listening: {}", err); None } } } pub fn drop(self) -> anyhow::Result<()> { // Check that we really downloaded everything assert_eq!(self.playspec.len(), 0); match self.download_thread.join() { Ok(ok) => ok, Err(err) => panic!("Failed to join downloader thread: '{:#?}'", err), } } pub fn consume(mut self) -> anyhow::Result<()> { while let Some((path, id)) = self.next() { debug!("Next path to play is: '{}'", path.display()); let mut info_json = canonicalize(&path).context("Failed to canoncialize path")?; info_json.set_extension("info.json"); if status_path()?.is_symlink() { fs::remove_file(status_path()?).context("Failed to delete old status file")?; } else if !status_path()?.exists() { debug!( "The status path at '{}' does not exists", status_path()?.display() ); } else { bail!( "The status path ('{}') is not a symlink but exists!", status_path()?.display() ); } symlink(info_json, status_path()?).context("Failed to symlink")?; let mut mpv = Command::new("mpv"); mpv.stdout(stdout()); mpv.stderr(stderr()); mpv.args(MPV_FLAGS); // TODO: Set the title to the name of the video, not the path <2024-02-09> // mpv.arg(format!("--title=")) mpv.arg(&path); let status = mpv.status().context("Failed to run mpv")?; if status.success() { fs::remove_file(&path)?; if let Some(id) = id { println!("\x1b[32;1mMarking {} as watched!\x1b[0m", id); let mut ytcc = std::process::Command::new("ytcc"); ytcc.stdout(stdout()); ytcc.stderr(stderr()); ytcc.args(["mark"]); ytcc.arg(id.to_string()); let status = ytcc.status().context("Failed to run ytcc")?; if let Some(code) = status.code() { if code != 0 { bail!("Ytcc failed with status: {}", code); } } } debug!("mpv exited with: '{}'", status); } else { warn!("mpv exited with: '{}'", status); } } self.drop()?; Ok(()) } } fn download_url(url: &Url) -> Result<PathBuf> { let output_file = tempfile::NamedTempFile::new().context("Failed to create tempfile")?; output_file .as_file() .set_len(0) .context("Failed to truncate temp-file")?; if !Into::<PathBuf>::into(DOWNLOAD_DIR).exists() { fs::create_dir_all(DOWNLOAD_DIR) .with_context(|| format!("Failed to create download dir at: {}", DOWNLOAD_DIR))? } let mut yt_dlp = Command::new("yt-dlp"); yt_dlp.current_dir(DOWNLOAD_DIR); yt_dlp.stdout(stdout()); yt_dlp.stderr(stderr()); yt_dlp.args(YT_DLP_FLAGS); yt_dlp.args([ "--output", "%(channel)s/%(title)s.%(ext)s", url.as_str(), "--print-to-file", "after_move:filepath", ]); yt_dlp.arg(output_file.path().as_os_str()); let status = yt_dlp.status().context("Failed to run yt-dlp")?; if !status.success() { error!("yt-dlp execution failed with error: '{}'", status); } let mut path = String::new(); output_file .as_file() .read_to_string(&mut path) .context("Failed to read output file temp file")?; let path = path.trim(); Ok(path.into()) }