aboutsummaryrefslogtreecommitdiffstats
path: root/yt_dlp/src/lib.rs
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2024-08-21 10:49:23 +0200
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2024-08-21 11:28:43 +0200
commit1debeb77f7986de1b659dcfdc442de6415e1d9f5 (patch)
tree4df3e7c3f6a2d1ec116e4088c5ace7f143a8b05f /yt_dlp/src/lib.rs
downloadyt-1debeb77f7986de1b659dcfdc442de6415e1d9f5.zip
chore: Initial Commit
This repository was migrated out of my nixos-config.
Diffstat (limited to 'yt_dlp/src/lib.rs')
-rw-r--r--yt_dlp/src/lib.rs410
1 files changed, 410 insertions, 0 deletions
diff --git a/yt_dlp/src/lib.rs b/yt_dlp/src/lib.rs
new file mode 100644
index 0000000..5bb02c1
--- /dev/null
+++ b/yt_dlp/src/lib.rs
@@ -0,0 +1,410 @@
+// yt - A fully featured command line YouTube client
+//
+// Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de>
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+// This file is part of Yt.
+//
+// You should have received a copy of the License along with this program.
+// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
+
+// use std::{fs::File, io::Write};
+
+use std::{path::PathBuf, sync::Once};
+
+use crate::{duration::Duration, logging::setup_logging, wrapper::info_json::InfoJson};
+
+use log::info;
+use pyo3::types::{PyString, PyTuple, PyTupleMethods};
+use pyo3::{
+ pyfunction,
+ types::{PyAnyMethods, PyDict, PyDictMethods, PyList, PyListMethods, PyModule},
+ wrap_pyfunction_bound, Bound, PyAny, PyResult, Python,
+};
+use serde::Serialize;
+use serde_json::{Map, Value};
+use url::Url;
+
+pub mod duration;
+pub mod logging;
+pub mod wrapper;
+
+/// Synchronisation helper, to ensure that we don't set up the logger multiple times.
+///
+/// `add_logger_and_sig_handler` runs its one-time initialisation (resetting the SIGINT handler
+/// and calling `logging.basicConfig`) through `SYNC_OBJ.call_once`.
+static SYNC_OBJ: Once = Once::new();
+
+/// Add a logger to the yt-dlp options.
+///
+/// The function calls [`setup_logging`] for the name `yt_dlp` and then stores the result of
+/// Python's `logging.getLogger("yt_dlp")` under the `"logger"` key of `opts`.
+/// If you have a logger set (i.e. for rust), then this will log to rust.
+///
+/// On the first call only (guarded by [`SYNC_OBJ`]) it additionally:
+/// - resets Python's `SIGINT` handler to `signal.SIG_DFL`, and
+/// - calls `logging.basicConfig(level=0)`.
+///
+/// # Errors
+/// Returns the Python error if [`setup_logging`], importing `logging`, fetching the logger or
+/// inserting the logger into `opts` fails.
+///
+/// # Panics
+/// Panics if the one-time initialisation (signal handler reset or `basicConfig`) fails.
+pub fn add_logger_and_sig_handler<'a>(
+    opts: Bound<'a, PyDict>,
+    py: Python,
+) -> PyResult<Bound<'a, PyDict>> {
+    setup_logging(py, "yt_dlp")?;
+
+    let logging = PyModule::import_bound(py, "logging")?;
+    let ytdl_logger = logging.call_method1("getLogger", ("yt_dlp",))?;
+
+    // Ensure that all events are logged by setting the log level to NOTSET (we filter on rust's side)
+    // Also use this static, to ensure that we don't configure the logger every time
+    SYNC_OBJ.call_once(|| {
+        // Disable the SIGINT (Ctrl+C) handler, python installs.
+        // This allows the user to actually stop the application with Ctrl+C.
+        // This is here because it can only be run in the main thread and this was here already.
+        py.run_bound(
+            r#"
+import signal
+signal.signal(signal.SIGINT, signal.SIG_DFL)
+ "#,
+            None,
+            None,
+        )
+        .expect("This code should always work");
+
+        let config_opts = PyDict::new_bound(py);
+        config_opts
+            .set_item("level", 0)
+            .expect("Setting this item should always work");
+
+        logging
+            .call_method("basicConfig", (), Some(&config_opts))
+            .expect("This method exists");
+    });
+
+    // Propagate a failure to the caller instead of panicking: this function already
+    // returns a `PyResult`.
+    opts.set_item("logger", ytdl_logger)?;
+
+    Ok(opts)
+}
+
+/// Progress hook for yt-dlp: prints a coloured, single-line status report to stdout.
+///
+/// `input` is the progress dictionary handed over by yt-dlp. It is serialised with
+/// [`json_dumps`] and parsed again with `serde_json`, so the fields can be read as plain JSON
+/// values. The fields read are `status`, `elapsed`, `eta`, `speed`, `downloaded_bytes`,
+/// `total_bytes`, `total_bytes_estimate` and, inside `info_dict`, `ext`, `name` and `title`.
+///
+/// Behaviour per `status`:
+/// - `"downloading"`: moves the cursor one line up, clears that line and prints the progress.
+/// - `"finished"`: prints a "Finished downloading" line.
+///
+/// # Errors
+/// Returns the Python error if [`json_dumps`] fails.
+///
+/// # Panics
+/// Panics if
+/// - the serialised dictionary is not a JSON object,
+/// - a required field (`status`, `downloaded_bytes`, `info_dict`, `ext`, or `name` for
+///   subtitles) is missing or has an unexpected type,
+/// - `status` is `"error"` or any value other than `"downloading"` / `"finished"`.
+#[pyfunction]
+pub fn progress_hook<'a>(py: Python, input: Bound<'_, PyDict>) -> PyResult<()> {
+    // Round-trip through JSON to get plain `serde_json` values out of the Python dict.
+    let input: serde_json::Map<String, Value> =
+        serde_json::from_str(&json_dumps(py, input.into_any())?)
+            .expect("Python should always produce valid json");
+
+    // `get!` reads a required field (optionally nested one level deep) and panics if the
+    // field is missing or not of the requested type.
+    macro_rules! get {
+        (@interrogate $item:ident, $type_fun:ident, $get_fun:ident, $name:expr) => {{
+            let a = $item.get($name).expect(concat!(
+                "The field '",
+                stringify!($name),
+                "' should exist."
+            ));
+
+            if a.$type_fun() {
+                a.$get_fun().expect(
+                    "The should have been checked in the if guard, so unpacking here is fine",
+                )
+            } else {
+                panic!(
+                    "Value {} => \n{}\n is not of type: {}",
+                    $name,
+                    a,
+                    stringify!($type_fun)
+                );
+            }
+        }};
+
+        ($type_fun:ident, $get_fun:ident, $name1:expr, $name2:expr) => {{
+            let a = get! {@interrogate input, is_object, as_object, $name1};
+            let b = get! {@interrogate a, $type_fun, $get_fun, $name2};
+            b
+        }};
+
+        ($type_fun:ident, $get_fun:ident, $name:expr) => {{
+            get! {@interrogate input, $type_fun, $get_fun, $name}
+        }};
+    }
+
+    // `default_get!` reads an optional field and yields the given default if the field is
+    // missing or not of the requested type. The outer object of a nested lookup is still
+    // required.
+    macro_rules! default_get {
+        (@interrogate $item:ident, $default:expr, $get_fun:ident, $name:expr) => {{
+            let a = if let Some(field) = $item.get($name) {
+                field.$get_fun().unwrap_or($default)
+            } else {
+                $default
+            };
+            a
+        }};
+
+        ($get_fun:ident, $default:expr, $name1:expr, $name2:expr) => {{
+            let a = get! {@interrogate input, is_object, as_object, $name1};
+            let b = default_get! {@interrogate a, $default, $get_fun, $name2};
+            b
+        }};
+
+        ($get_fun:ident, $default:expr, $name:expr) => {{
+            default_get! {@interrogate input, $default, $get_fun, $name}
+        }};
+    }
+
+    // Wrap a value in the ANSI SGR colour sequence `$color` and reset afterwards.
+    macro_rules! c {
+        ($color:expr, $format:expr) => {
+            format!("\x1b[{}m{}\x1b[0m", $color, $format)
+        };
+    }
+
+    /// Format a byte count with a decimal unit; the value is truncated, not rounded.
+    fn format_bytes(bytes: u64) -> String {
+        if bytes >= 1_000_000 {
+            format!("{} MB", bytes / 1_000_000)
+        } else if bytes >= 1_000 {
+            format!("{} KB", bytes / 1_000)
+        } else {
+            format!("{} B", bytes)
+        }
+    }
+
+    /// Format a speed in bytes per second, using the same unit thresholds as `format_bytes`.
+    fn format_speed(speed: f64) -> String {
+        if speed >= 1_000_000.0 {
+            format!("{:.02} MB/s", speed / 1_000_000.0)
+        } else if speed >= 1_000.0 {
+            format!("{:.02} KB/s", speed / 1_000.0)
+        } else {
+            format!("{:.02} B/s", speed)
+        }
+    }
+
+    let get_title = |add_extension: bool| -> String {
+        match get! {is_string, as_str, "info_dict", "ext"} {
+            "vtt" => {
+                format!(
+                    "Subtitles ({})",
+                    get! {is_string, as_str, "info_dict", "name"}
+                )
+            }
+            // Every other extension is displayed the same way. Panicking on an unknown
+            // container format would abort the hook for a purely cosmetic reason.
+            title_extension => {
+                let title = default_get! { as_str, "<No title>", "info_dict", "title"};
+
+                if add_extension {
+                    format!("{} ({})", title, title_extension)
+                } else {
+                    title.to_owned()
+                }
+            }
+        }
+    };
+
+    match get! {is_string, as_str, "status"} {
+        "downloading" => {
+            let elapsed = default_get! {as_f64, 0.0f64, "elapsed"};
+            let eta = default_get! {as_f64, 0.0, "eta"};
+            let speed = default_get! {as_f64, 0.0, "speed"};
+
+            let downloaded_bytes = get! {is_u64, as_u64, "downloaded_bytes"};
+
+            // `total_bytes` can be absent; use the estimate in that case, so the percentage
+            // is not reported as 100% for the whole download.
+            let total_bytes = match default_get! {as_u64, 0, "total_bytes"} {
+                0 => {
+                    let estimate = default_get! {as_f64, 0.0, "total_bytes_estimate"};
+                    // The estimate is read as a float; the cast saturates (NaN and negative
+                    // values become 0).
+                    estimate as u64
+                }
+                known => known,
+            };
+
+            let percent: f64 = {
+                if total_bytes == 0 {
+                    // Neither the size nor an estimate is available.
+                    100.0
+                } else {
+                    (downloaded_bytes as f64 / total_bytes as f64) * 100.0
+                }
+            };
+
+            print!("\x1b[1F"); // Move one line up, to allow the `println` after it to print a newline
+            print!("\x1b[2K"); // Clear whole line.
+            print!("\x1b[1G"); // Move cursor to column 1.
+
+            println!(
+                "'{}' [{}/{} at {}] -> [{}/{} {}]",
+                c!("34;1", get_title(true)),
+                c!("33;1", Duration::from(Some(elapsed))),
+                c!("33;1", Duration::from(Some(eta))),
+                c!("32;1", format_speed(speed)),
+                c!("31;1", format_bytes(downloaded_bytes)),
+                c!("31;1", format_bytes(total_bytes)),
+                c!("36;1", format!("{:.02}%", percent))
+            );
+        }
+        "finished" => {
+            println!("Finished downloading: '{}'", c!("34;1", get_title(false)))
+        }
+        "error" => {
+            panic!("Error whilst downloading: {}", get_title(true))
+        }
+        other => panic!("{} is not a valid state!", other),
+    };
+
+    Ok(())
+}
+
+/// Register [`progress_hook`] in the `"progress_hooks"` entry of the yt-dlp options.
+///
+/// If `opts` already has a `"progress_hooks"` entry, the hook is appended to that list;
+/// otherwise a new one-element list is stored under that key.
+///
+/// # Errors
+/// Returns the Python error if the existing entry is not a list, or if wrapping the hook or
+/// updating `opts` fails.
+pub fn add_hooks<'a>(opts: Bound<'a, PyDict>, py: Python) -> PyResult<Bound<'a, PyDict>> {
+    match opts.get_item("progress_hooks")? {
+        Some(existing) => {
+            let hook_list = existing.downcast::<PyList>()?;
+            hook_list.append(wrap_pyfunction_bound!(progress_hook, py)?)?;
+
+            opts.set_item("progress_hooks", hook_list)?;
+        }
+        None => {
+            // No hooks are set yet
+            let fresh_list = PyList::new_bound(py, &[wrap_pyfunction_bound!(progress_hook, py)?]);
+
+            opts.set_item("progress_hooks", fresh_list)?;
+        }
+    }
+
+    Ok(opts)
+}
+
+/// Call yt-dlp's `extract_info` for `url` and parse the result into an [`InfoJson`].
+///
+/// The wrapped Python method has the signature
+/// `extract_info(self, url, download=True, ie_key=None, extra_info=None, process=True, force_generic_extractor=False)`
+/// and extracts and returns the information dictionary of the URL.
+///
+/// Arguments of the Python method:
+/// @param url          URL to extract
+///
+/// Keyword arguments of the Python method:
+/// @param download     Whether to download videos
+/// @param process      Whether to resolve all unresolved references (URLs, playlist items).
+///                     Must be True for download to work
+/// @param ie_key       Use only the extractor with this key
+///
+/// @param extra_info   Dictionary containing the extra values to add to the info (For internal use only)
+/// @param force_generic_extractor  Force using the generic extractor (Deprecated; use ie_key='Generic')
+///
+/// This wrapper only passes `url`, `download` and `process`; `yt_dlp_opts` is converted to a
+/// Python dict and used to construct the `YoutubeDL` instance.
+///
+/// Note: the body contains no `.await`; all work runs inside `Python::with_gil`.
+///
+/// # Errors
+/// Returns the Python error raised while building the instance, calling `extract_info` or
+/// serialising the result.
+///
+/// # Panics
+/// Panics if the serialised result cannot be deserialised into an [`InfoJson`].
+pub async fn extract_info(
+    yt_dlp_opts: &Map<String, Value>,
+    url: &Url,
+    download: bool,
+    process: bool,
+) -> PyResult<InfoJson> {
+    Python::with_gil(|py| {
+        let py_opts = json_map_to_py_dict(yt_dlp_opts, py)?;
+        let youtube_dl = get_yt_dlp(py, py_opts)?;
+
+        let keyword_args = PyDict::new_bound(py);
+        keyword_args.set_item("download", download)?;
+        keyword_args.set_item("process", process)?;
+
+        let info =
+            youtube_dl.call_method("extract_info", (url.as_str(),), Some(&keyword_args))?;
+
+        // Remove the `<generator at 0xsome_hex>`, by setting it to null
+        if !process {
+            info.set_item("entries", ())?;
+        }
+
+        let info_string = json_dumps(py, info)?;
+
+        let parsed: InfoJson = serde_json::from_str(&info_string)
+            .expect("Python should be able to produce correct json");
+
+        Ok(parsed)
+    })
+}
+
+/// Call `yt_dlp.utils.unsmuggle_url` on `smug_url` and return the first element of the
+/// resulting tuple as a [`Url`].
+///
+/// # Errors
+/// Returns the Python error if the call fails, the result is not a tuple, or its first
+/// element is not a string.
+///
+/// # Panics
+/// Panics if the returned string cannot be parsed as a [`Url`].
+pub fn unsmuggle_url(smug_url: Url) -> PyResult<Url> {
+    Python::with_gil(|py| {
+        let utils = get_yt_dlp_utils(py)?;
+
+        let unsmuggled = utils.call_method1("unsmuggle_url", (smug_url.as_str(),))?;
+        let parts = unsmuggled.downcast::<PyTuple>()?;
+        let first = parts.get_item(0)?;
+
+        let plain_url = first
+            .downcast::<PyString>()?
+            .to_string()
+            .parse::<Url>()
+            .expect("Python should be able to return a valid url");
+
+        Ok(plain_url)
+    })
+}
+
+/// Download every URL in `urls`, one after the other, using `download_options`.
+///
+/// Returns the file paths reported by yt-dlp, in the order of `urls`. For each URL the
+/// top-level `filename` of the info JSON is used if present; otherwise the `filename` of the
+/// first entry in `requested_downloads` is used.
+///
+/// # Errors
+/// Stops at, and returns, the first error produced by [`extract_info`].
+///
+/// # Panics
+/// Panics if neither a top-level `filename` nor a first `requested_downloads` entry exists.
+pub async fn download(
+    urls: &[Url],
+    download_options: &Map<String, Value>,
+) -> PyResult<Vec<PathBuf>> {
+    let mut out_paths = Vec::with_capacity(urls.len());
+
+    for url in urls {
+        info!("Started downloading url: '{}'", url);
+        let info_json = extract_info(download_options, url, true, true).await?;
+
+        let downloaded_path = match info_json.filename {
+            // Try to work around yt-dlp type weirdness
+            Some(filename) => filename,
+            None => {
+                let requested = info_json.requested_downloads.expect("This must exist");
+                requested[0].filename.to_owned()
+            }
+        };
+
+        out_paths.push(downloaded_path);
+        info!("Finished downloading url: '{}'", url);
+    }
+
+    Ok(out_paths)
+}
+
+/// Convert a `serde_json` map into a Python dict.
+///
+/// The map is serialised to a JSON string, which is then parsed by [`json_loads`].
+///
+/// # Panics
+/// Panics if the map cannot be serialised to a string.
+fn json_map_to_py_dict<'a>(
+    map: &Map<String, Value>,
+    py: Python<'a>,
+) -> PyResult<Bound<'a, PyDict>> {
+    let serialized = serde_json::to_string(map).expect("This must always work");
+
+    json_loads(py, serialized)
+}
+
+/// Serialise a Python object to a JSON string.
+///
+/// Equivalent to the Python expression `json.dumps(yt_dlp.YoutubeDL.sanitize_info(input))`.
+///
+/// `sanitize_info` is a static method of `YoutubeDL`, so it is called on the class. This
+/// avoids constructing (and configuring) a new `YoutubeDL` instance on every call; this
+/// function runs for every progress update.
+///
+/// # Errors
+/// Returns the Python error if an import, `sanitize_info`, `json.dumps` or the conversion of
+/// the result to a Rust `String` fails.
+fn json_dumps(py: Python, input: Bound<PyAny>) -> PyResult<String> {
+    let yt_dlp = PyModule::import_bound(py, "yt_dlp")?;
+    let youtube_dl_class = yt_dlp.getattr("YoutubeDL")?;
+    let sanitized_result = youtube_dl_class.call_method1("sanitize_info", (input,))?;
+
+    let json = PyModule::import_bound(py, "json")?;
+    let output = json.call_method1("dumps", (sanitized_result,))?;
+
+    output.extract::<String>()
+}
+
+/// Serialise `input` to a JSON string and parse that string into a Python dict via
+/// [`json_loads`].
+///
+/// # Panics
+/// Panics if `input` cannot be serialised to JSON.
+fn json_loads_str<T>(py: Python, input: T) -> PyResult<Bound<PyDict>>
+where
+    T: Serialize,
+{
+    let serialized = serde_json::to_string(&input).expect("Correct json must be pased");
+
+    json_loads(py, serialized)
+}
+
+/// Parse a JSON string with Python's `json.loads` and return the result as a Python dict.
+///
+/// # Errors
+/// Returns the Python error if importing `json` or parsing `input` fails, or the downcast
+/// error if the top-level JSON value is not an object (i.e. the result is not a `dict`).
+fn json_loads(py: Python, input: String) -> PyResult<Bound<PyDict>> {
+    // json.loads(input)
+
+    let json = PyModule::import_bound(py, "json")?;
+    let loads = json.getattr("loads")?;
+
+    let output = loads.call1((input,))?;
+
+    // Valid JSON is not necessarily an object, so report a mismatch as an error instead of
+    // panicking.
+    Ok(output.downcast_into::<PyDict>()?)
+}
+
+/// Import the Python module `yt_dlp` and return its `utils` attribute.
+///
+/// # Errors
+/// Returns the Python error if the import or the attribute lookup fails.
+fn get_yt_dlp_utils<'a>(py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
+    let module = PyModule::import_bound(py, "yt_dlp")?;
+
+    module.getattr("utils")
+}
+/// Construct a `yt_dlp.YoutubeDL` instance from `opts`.
+///
+/// Before the instance is created, the logger ([`add_logger_and_sig_handler`]) and the
+/// progress hook ([`add_hooks`]) are added to `opts`.
+///
+/// # Errors
+/// Returns the Python error if preparing the options, importing `yt_dlp` or calling
+/// `YoutubeDL` fails.
+fn get_yt_dlp<'a>(py: Python<'a>, opts: Bound<'a, PyDict>) -> PyResult<Bound<'a, PyAny>> {
+    // Unconditionally set a logger
+    let with_logger = add_logger_and_sig_handler(opts, py)?;
+    let prepared_opts = add_hooks(with_logger, py)?;
+
+    let module = PyModule::import_bound(py, "yt_dlp")?;
+
+    module.call_method1("YoutubeDL", (prepared_opts,))
+}