// yt - A fully featured command line YouTube client // // Copyright (C) 2025 Benedikt Peetz // SPDX-License-Identifier: GPL-3.0-or-later // // This file is part of Yt. // // You should have received a copy of the License along with this program. // If not, see . use curl::easy::Easy; use log::{error, info, trace, warn}; use pyo3::{ Bound, PyAny, PyErr, PyResult, Python, exceptions, intern, pyfunction, types::{PyAnyMethods, PyDict, PyModule}, wrap_pyfunction, }; use serde::{Deserialize, Serialize}; use crate::{ pydict_cast, pydict_get, python_error::{IntoPythonError, PythonError}, }; /// # Errors /// - If the underlying function returns an error. /// - If python operations fail. pub fn process(py: Python<'_>) -> PyResult> { #[pyfunction] fn actual_processor(info_json: Bound<'_, PyDict>) -> PyResult> { let output = match unwrapped_process(info_json) { Ok(ok) => ok, Err(err) => { return Err(PyErr::new::(err.to_string())); } }; Ok(output) } let module = PyModule::new(py, "rust_post_processors")?; let scope = PyDict::new(py); scope.set_item( intern!(py, "actual_processor"), wrap_pyfunction!(actual_processor, module)?, )?; py.run( c" import yt_dlp class DeArrow(yt_dlp.postprocessor.PostProcessor): def run(self, info): info = actual_processor(info) return [], info inst = DeArrow() ", Some(&scope), None, )?; Ok(scope.get_item(intern!(py, "inst"))?.downcast_into()?) } /// # Errors /// If the API access fails. pub fn unwrapped_process(info: Bound<'_, PyDict>) -> Result, Error> { if pydict_get!(info, "extractor_key", String).as_str() != "Youtube" { return Ok(info); } let mut retry_num = 3; let mut output: DeArrowApi = { loop { let output_bytes = { let mut dst = Vec::new(); let mut easy = Easy::new(); easy.url( format!( "https://sponsor.ajay.app/api/branding?videoID={}", pydict_get!(info, "id", String) ) .as_str(), )?; let mut transfer = easy.transfer(); transfer.write_function(|data| { dst.extend_from_slice(data); Ok(data.len()) })?; transfer.perform()?; drop(transfer); dst }; match serde_json::from_slice(&output_bytes) { Ok(ok) => break ok, Err(err) => { if retry_num > 0 { trace!( "DeArrow: Api access failed, trying again ({retry_num} retries left)" ); retry_num -= 1; } else { let err: serde_json::Error = err; return Err(err.into()); } } } } }; // We pop the titles, so we need this vector reversed. output.titles.reverse(); let title_len = output.titles.len(); let mut iterator = output.titles.clone(); let selected = loop { let Some(title) = iterator.pop() else { break false; }; if (title.locked || title.votes < 1) && title_len > 1 { info!( "DeArrow: Skipping title {:#?}, as it is not good enough", title.value ); // Skip titles that are not “good” enough. continue; } update_title(&info, &title.value).wrap_exc(info.py())?; break true; }; if !selected && title_len != 0 { // No title was selected, even though we had some titles. // Just pick the first one in this case. update_title(&info, &output.titles[0].value).wrap_exc(info.py())?; } Ok(info) } #[derive(thiserror::Error, Debug)] pub enum Error { #[error(transparent)] Python(#[from] PythonError), #[error("Failed to access the DeArrow api: {0}")] Get(#[from] curl::Error), #[error("Failed to deserialize a api json return object: {0}")] Deserialize(#[from] serde_json::Error), } fn update_title(info: &Bound<'_, PyDict>, new_title: &str) -> PyResult<()> { let py = info.py(); assert!(!info.contains(intern!(py, "original_title"))?); if let Ok(old_title) = info.get_item(intern!(py, "title")) { warn!( "DeArrow: Updating title from {:#?} to {:#?}", pydict_cast!(old_title, &str), new_title ); info.set_item(intern!(py, "original_title"), old_title) .expect("We checked, it is a new key"); } else { warn!("DeArrow: Setting title to {new_title:#?}"); } let cleaned_title = { // NOTE(@bpeetz): DeArrow uses `>` as a “Don't format the next word” mark. // They should be removed, if one does not use a auto-formatter. <2025-06-16> new_title.replace('>', "") }; info.set_item(intern!(py, "title"), cleaned_title) .expect("This should work?"); Ok(()) } #[derive(Serialize, Deserialize)] /// See: struct DeArrowApi { titles: Vec, thumbnails: Vec<Thumbnail>, #[serde(alias = "randomTime")] random_time: Option<f64>, #[serde(alias = "videoDuration")] video_duration: Option<f64>, #[serde(alias = "casualVotes")] casual_votes: Vec<CasualVote>, } #[derive(Serialize, Deserialize)] struct CasualVote { id: String, count: u32, title: String, } #[derive(Serialize, Deserialize, Clone)] struct Title { /// Note: Titles will sometimes contain > before a word. /// This tells the auto-formatter to not format a word. /// If you have no auto-formatter, you can ignore this and replace it with an empty string #[serde(alias = "title")] value: String, original: bool, votes: u64, locked: bool, #[serde(alias = "UUID")] uuid: String, /// only present if requested #[serde(alias = "userID")] user_id: Option<String>, } #[derive(Serialize, Deserialize)] struct Thumbnail { // null if original is true timestamp: Option<f64>, original: bool, votes: u64, locked: bool, #[serde(alias = "UUID")] uuid: String, /// only present if requested #[serde(alias = "userID")] user_id: Option<String>, }