about summary refs log blame commit diff stats
path: root/sys/nixpkgs/pkgs/ytc/src/downloader.rs
blob: f1bbd16ac0559273c66be55f868b8640713973ce (plain) (tree)
1
2
          
       




















































































































                                                                                           


                                                                                            











                                               
                                                                  











                                                                      
use std::{
    fs,
    io::{stderr, stdout, Read},
    mem,
    path::PathBuf,
    process::Command,
    sync::mpsc::{self, Receiver, Sender},
    thread::{self, JoinHandle},
};

use anyhow::{bail, Context, Result};
use log::debug;

use crate::PlayThing;

const YT_DLP_FLAGS: [&str; 12] = [
    "--format",
    "bestvideo[height<=?1080]+bestaudio/best",
    "--embed-chapters",
    "--progress",
    "--write-comments",
    "--extractor-args",
    "youtube:max_comments=150,all,100;comment_sort=top",
    "--write-info-json",
    "--sponsorblock-mark",
    "default",
    "--sponsorblock-remove",
    "sponsor",
];

const CONCURRENT: u32 = 5;

const DOWNLOAD_DIR: &str = "/tmp/ytcc";

pub struct Downloader {
    sent: usize,
    download_thread: JoinHandle<Result<()>>,
    orx: Receiver<(PathBuf, Option<u32>)>,
    itx: Option<Sender<PlayThing>>,
    playspec: Vec<PlayThing>,
}

impl Downloader {
    pub fn new(mut playspec: Vec<PlayThing>) -> anyhow::Result<Downloader> {
        let (itx, irx): (Sender<PlayThing>, Receiver<PlayThing>) = mpsc::channel();
        let (otx, orx) = mpsc::channel();
        let jh = thread::spawn(move || -> Result<()> {
            while let Some(pt) = irx.recv().ok() {
                debug!("Got '{}|{}' to be downloaded", pt.url, pt.id.unwrap_or(0));
                let path = download_url(&pt.url)
                    .with_context(|| format!("Failed to download url: '{}'", &pt.url))?;
                otx.send((path, pt.id)).expect("Should not be dropped");
            }
            debug!("Finished Downloading everything");
            Ok(())
        });

        playspec.reverse();
        let mut output = Downloader {
            sent: 0,
            download_thread: jh,
            orx,
            itx: Some(itx),
            playspec,
        };
        if output.playspec.len() <= CONCURRENT as usize {
            output.add(output.playspec.len() as u32)?;
        } else {
            output.add(CONCURRENT)?;
        }
        Ok(output)
    }

    pub fn add(&mut self, number_to_add: u32) -> Result<()> {
        debug!("Adding {} to be downloaded concurrently", number_to_add);
        for _ in 0..number_to_add {
            let pt = self.playspec.pop().context("No more playthings to pop")?;
            self.itx.as_ref().expect("Should still be valid").send(pt)?;
        }
        Ok(())
    }

    /// Return the next video already downloaded, will block until the download is complete
    pub fn next(&mut self) -> Option<(PathBuf, Option<u32>)> {
        debug!("Requesting next output");
        match self.orx.recv() {
            Ok(ok) => {
                debug!("Output downloaded to: {}", ok.0.display());
                self.sent += 1;
                if self.sent < self.playspec.len() {
                    debug!("Will add 1");
                    self.add(1).ok()?;
                } else {
                    debug!("Will drop sender");
                    let itx = mem::take(&mut self.itx);
                    drop(itx)
                }
                debug!("Returning: {:#?}", ok);
                Some(ok)
            }
            Err(err) => {
                debug!("Recieved error while listening: {}", err);
                None
            }
        }
    }
    pub fn drop(self) -> anyhow::Result<()> {
        match self.download_thread.join() {
            Ok(ok) => ok,
            Err(err) => panic!("Can't join thread: '{:#?}'", err),
        }
    }
}

fn download_url(url: &str) -> Result<PathBuf> {
    let output_file = tempfile::NamedTempFile::new().context("Failed to create tempfile")?;
    output_file
        .as_file()
        .set_len(0)
        .context("Failed to truncate temp-file")?;
    if !Into::<PathBuf>::into(DOWNLOAD_DIR).exists() {
        fs::create_dir_all(DOWNLOAD_DIR)
            .with_context(|| format!("Failed to create download dir at: {}", DOWNLOAD_DIR))?
    }
    let mut yt_dlp = Command::new("yt-dlp");
    yt_dlp.current_dir(DOWNLOAD_DIR);
    yt_dlp.stdout(stdout());
    yt_dlp.stderr(stderr());
    yt_dlp.args(YT_DLP_FLAGS);
    yt_dlp.args([
        "--output",
        "%(channel)s/%(title)s.%(ext)s",
        url,
        "--print-to-file",
        "after_move:filepath",
    ]);
    yt_dlp.arg(output_file.path().as_os_str());
    let status = yt_dlp.status().context("Failed to run yt-dlp")?;
    if let Some(code) = status.code() {
        if code != 0 {
            bail!("yt_dlp execution failed with error: '{}'", status);
        }
    }
    let mut path = String::new();
    output_file
        .as_file()
        .read_to_string(&mut path)
        .context("Failed to read output file temp file")?;
    let path = path.trim();
    Ok(path.into())
}