use std::{
fs,
io::{stderr, stdout, Read},
mem,
path::PathBuf,
process::Command,
sync::mpsc::{self, Receiver, Sender},
thread::{self, JoinHandle},
};
use anyhow::{bail, Context, Result};
use log::debug;
use crate::PlayThing;
const YT_DLP_FLAGS: [&str; 12] = [
"--format",
"bestvideo[height<=?1080]+bestaudio/best",
"--embed-chapters",
"--progress",
"--write-comments",
"--extractor-args",
"youtube:max_comments=150,all,100;comment_sort=top",
"--write-info-json",
"--sponsorblock-mark",
"default",
"--sponsorblock-remove",
"sponsor",
];
const CONCURRENT: u32 = 5;
const DOWNLOAD_DIR: &str = "/tmp/ytcc";
pub struct Downloader {
sent: usize,
download_thread: JoinHandle<Result<()>>,
orx: Receiver<(PathBuf, Option<u32>)>,
itx: Option<Sender<PlayThing>>,
playspec: Vec<PlayThing>,
}
impl Downloader {
pub fn new(mut playspec: Vec<PlayThing>) -> anyhow::Result<Downloader> {
let (itx, irx): (Sender<PlayThing>, Receiver<PlayThing>) = mpsc::channel();
let (otx, orx) = mpsc::channel();
let jh = thread::spawn(move || -> Result<()> {
while let Some(pt) = irx.recv().ok() {
debug!("Got '{}|{}' to be downloaded", pt.url, pt.id.unwrap_or(0));
let path = download_url(&pt.url)
.with_context(|| format!("Failed to download url: '{}'", &pt.url))?;
otx.send((path, pt.id)).expect("Should not be dropped");
}
debug!("Finished Downloading everything");
Ok(())
});
playspec.reverse();
let mut output = Downloader {
sent: 0,
download_thread: jh,
orx,
itx: Some(itx),
playspec,
};
if output.playspec.len() <= CONCURRENT as usize {
output.add(output.playspec.len() as u32)?;
} else {
output.add(CONCURRENT)?;
}
Ok(output)
}
pub fn add(&mut self, number_to_add: u32) -> Result<()> {
debug!("Adding {} to be downloaded concurrently", number_to_add);
for _ in 0..number_to_add {
let pt = self.playspec.pop().context("No more playthings to pop")?;
self.itx.as_ref().expect("Should still be valid").send(pt)?;
}
Ok(())
}
/// Return the next video already downloaded, will block until the download is complete
pub fn next(&mut self) -> Option<(PathBuf, Option<u32>)> {
debug!("Requesting next output");
match self.orx.recv() {
Ok(ok) => {
debug!("Output downloaded to: {}", ok.0.display());
self.sent += 1;
if self.sent < self.playspec.len() {
debug!("Will add 1");
self.add(1).ok()?;
} else {
debug!("Will drop sender");
let itx = mem::take(&mut self.itx);
drop(itx)
}
debug!("Returning: {:#?}", ok);
Some(ok)
}
Err(err) => {
debug!("Recieved error while listening: {}", err);
None
}
}
}
pub fn drop(self) -> anyhow::Result<()> {
match self.download_thread.join() {
Ok(ok) => ok,
Err(err) => panic!("Can't join thread: '{:#?}'", err),
}
}
}
fn download_url(url: &str) -> Result<PathBuf> {
let output_file = tempfile::NamedTempFile::new().context("Failed to create tempfile")?;
output_file
.as_file()
.set_len(0)
.context("Failed to truncate temp-file")?;
if !Into::<PathBuf>::into(DOWNLOAD_DIR).exists() {
fs::create_dir_all(DOWNLOAD_DIR)
.with_context(|| format!("Failed to create download dir at: {}", DOWNLOAD_DIR))?
}
let mut yt_dlp = Command::new("yt-dlp");
yt_dlp.current_dir(DOWNLOAD_DIR);
yt_dlp.stdout(stdout());
yt_dlp.stderr(stderr());
yt_dlp.args(YT_DLP_FLAGS);
yt_dlp.args([
"--output",
"%(channel)s/%(title)s.%(ext)s",
url,
"--print-to-file",
"after_move:filepath",
]);
yt_dlp.arg(output_file.path().as_os_str());
let status = yt_dlp.status().context("Failed to run yt-dlp")?;
if let Some(code) = status.code() {
if code != 0 {
bail!("yt_dlp execution failed with error: '{}'", status);
}
}
let mut path = String::new();
output_file
.as_file()
.read_to_string(&mut path)
.context("Failed to read output file temp file")?;
let path = path.trim();
Ok(path.into())
}