about summary refs log tree commit diff stats
path: root/sys/nixpkgs/pkgs/ytc/src/downloader.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--sys/nixpkgs/pkgs/ytc/src/downloader.rs146
1 files changed, 146 insertions, 0 deletions
diff --git a/sys/nixpkgs/pkgs/ytc/src/downloader.rs b/sys/nixpkgs/pkgs/ytc/src/downloader.rs
new file mode 100644
index 00000000..9509278c
--- /dev/null
+++ b/sys/nixpkgs/pkgs/ytc/src/downloader.rs
@@ -0,0 +1,146 @@
+use std::{
+    io::{stderr, stdout, Read},
+    mem,
+    path::PathBuf,
+    process::Command,
+    sync::mpsc::{self, Receiver, Sender},
+    thread::{self, JoinHandle},
+};
+
+use anyhow::{bail, Context, Result};
+use log::debug;
+
+use crate::PlayThing;
+
+const YT_DLP_FLAGS: [&str; 12] = [
+    "--format",
+    "bestvideo[height<=?1080]+bestaudio/best",
+    "--embed-chapters",
+    "--progress",
+    "--write-comments",
+    "--extractor-args",
+    "youtube:max_comments=150,all,100;comment_sort=top",
+    "--write-info-json",
+    "--sponsorblock-mark",
+    "default",
+    "--sponsorblock-remove",
+    "sponsor",
+];
+
+const CONCURRENT: u32 = 5;
+
+const DOWNLOAD_DIR: &str = "/tmp/ytcc";
+
+pub struct Downloader {
+    sent: usize,
+    download_thread: JoinHandle<Result<()>>,
+    orx: Receiver<(PathBuf, Option<u32>)>,
+    itx: Option<Sender<PlayThing>>,
+    playspec: Vec<PlayThing>,
+}
+
+impl Downloader {
+    pub fn new(mut playspec: Vec<PlayThing>) -> anyhow::Result<Downloader> {
+        let (itx, irx): (Sender<PlayThing>, Receiver<PlayThing>) = mpsc::channel();
+        let (otx, orx) = mpsc::channel();
+        let jh = thread::spawn(move || -> Result<()> {
+            while let Some(pt) = irx.recv().ok() {
+                debug!("Got '{}|{}' to be downloaded", pt.url, pt.id.unwrap_or(0));
+                let path = download_url(&pt.url)
+                    .with_context(|| format!("Failed to download url: '{}'", &pt.url))?;
+                otx.send((path, pt.id)).expect("Should not be dropped");
+            }
+            debug!("Finished Downloading everything");
+            Ok(())
+        });
+
+        playspec.reverse();
+        let mut output = Downloader {
+            sent: 0,
+            download_thread: jh,
+            orx,
+            itx: Some(itx),
+            playspec,
+        };
+        if output.playspec.len() <= CONCURRENT as usize {
+            output.add(output.playspec.len() as u32)?;
+        } else {
+            output.add(CONCURRENT)?;
+        }
+        Ok(output)
+    }
+
+    pub fn add(&mut self, number_to_add: u32) -> Result<()> {
+        debug!("Adding {} to be downloaded concurrently", number_to_add);
+        for _ in 0..number_to_add {
+            let pt = self.playspec.pop().context("No more playthings to pop")?;
+            self.itx.as_ref().expect("Should still be valid").send(pt)?;
+        }
+        Ok(())
+    }
+
+    /// Return the next video already downloaded, will block until the download is complete
+    pub fn next(&mut self) -> Option<(PathBuf, Option<u32>)> {
+        debug!("Requesting next output");
+        match self.orx.recv() {
+            Ok(ok) => {
+                debug!("Output downloaded to: {}", ok.0.display());
+                self.sent += 1;
+                if self.sent < self.playspec.len() {
+                    debug!("Will add 1");
+                    self.add(1).ok()?;
+                } else {
+                    debug!("Will drop sender");
+                    let itx = mem::take(&mut self.itx);
+                    drop(itx)
+                }
+                debug!("Returning: {:#?}", ok);
+                Some(ok)
+            }
+            Err(err) => {
+                debug!("Recieved error while listening: {}", err);
+                None
+            }
+        }
+    }
+    pub fn drop(self) -> anyhow::Result<()> {
+        match self.download_thread.join() {
+            Ok(ok) => ok,
+            Err(err) => panic!("Can't join thread: '{:#?}'", err),
+        }
+    }
+}
+
+fn download_url(url: &str) -> Result<PathBuf> {
+    let output_file = tempfile::NamedTempFile::new().context("Failed to create tempfile")?;
+    output_file
+        .as_file()
+        .set_len(0)
+        .context("Failed to truncate temp-file")?;
+    let mut yt_dlp = Command::new("yt-dlp");
+    yt_dlp.current_dir(DOWNLOAD_DIR);
+    yt_dlp.stdout(stdout());
+    yt_dlp.stderr(stderr());
+    yt_dlp.args(YT_DLP_FLAGS);
+    yt_dlp.args([
+        "--output",
+        "%(channel)s/%(title)s.%(ext)s",
+        url,
+        "--print-to-file",
+        "after_move:filepath",
+    ]);
+    yt_dlp.arg(output_file.path().as_os_str());
+    let status = yt_dlp.status().context("Failed to run yt_dlp")?;
+    if let Some(code) = status.code() {
+        if code != 0 {
+            bail!("yt_dlp execution failed with error: '{}'", status);
+        }
+    }
+    let mut path = String::new();
+    output_file
+        .as_file()
+        .read_to_string(&mut path)
+        .context("Failed to read output file temp file")?;
+    let path = path.trim();
+    Ok(path.into())
+}