// yt - A fully featured command line YouTube client // // Copyright (C) 2024 Benedikt Peetz // SPDX-License-Identifier: GPL-3.0-or-later // // This file is part of Yt. // // You should have received a copy of the License along with this program. // If not, see . // This file is taken from: https://github.com/dylanbstorey/pyo3-pylogger/blob/d89e0d6820ebc4f067647e3b74af59dbc4941dd5/src/lib.rs // It is licensed under the Apache 2.0 License, copyright up to 2024 by Dylan Storey // It was modified by Benedikt Peetz 2024, 2025 use log::{Level, MetadataBuilder, Record, logger}; use rustpython::vm::{ PyObjectRef, PyRef, PyResult, VirtualMachine, builtins::{PyInt, PyList, PyStr}, convert::ToPyObject, function::FuncArgs, }; /// Consume a Python `logging.LogRecord` and emit a Rust `Log` instead. fn host_log(mut input: FuncArgs, vm: &VirtualMachine) -> PyResult<()> { let record = input.args.remove(0); let rust_target = { let base: PyRef = input.args.remove(0).downcast().expect("Should be a string"); base.as_str().to_owned() }; let level = { let level: PyRef = record .get_attr("levelno", vm)? .downcast() .expect("Should always be an int"); level.as_u32_mask() }; let message = { let get_message = record.get_attr("getMessage", vm)?; let message: PyRef = get_message .call((), vm)? .downcast() .expect("Downcasting works"); message.as_str().to_owned() }; let pathname = { let pathname: PyRef = record .get_attr("pathname", vm)? .downcast() .expect("Is a string"); pathname.as_str().to_owned() }; let lineno = { let lineno: PyRef = record .get_attr("lineno", vm)? .downcast() .expect("Is a number"); lineno.as_u32_mask() }; let logger_name = { let name: PyRef = record .get_attr("name", vm)? .downcast() .expect("Should be a string"); name.as_str().to_owned() }; let full_target: Option = if logger_name.trim().is_empty() || logger_name == "root" { None } else { // Libraries (ex: tracing_subscriber::filter::Directive) expect rust-style targets like foo::bar, // and may not deal well with "." as a module separator: let logger_name = logger_name.replace('.', "::"); Some(format!("{rust_target}::{logger_name}")) }; let target = full_target.as_deref().unwrap_or(&rust_target); // error let error_metadata = if level >= 40 { MetadataBuilder::new() .target(target) .level(Level::Error) .build() } else if level >= 30 { MetadataBuilder::new() .target(target) .level(Level::Warn) .build() } else if level >= 20 { MetadataBuilder::new() .target(target) .level(Level::Info) .build() } else if level >= 10 { MetadataBuilder::new() .target(target) .level(Level::Debug) .build() } else { MetadataBuilder::new() .target(target) .level(Level::Trace) .build() }; logger().log( &Record::builder() .metadata(error_metadata) .args(format_args!("{}", &message)) .line(Some(lineno)) .file(None) .module_path(Some(&pathname)) .build(), ); Ok(()) } /// Registers the `host_log` function in rust as the event handler for Python's logging logger /// This function needs to be called from within a pyo3 context as early as possible to ensure logging messages /// arrive to the rust consumer. /// /// # Panics /// Only if internal assertions fail. #[allow(clippy::module_name_repetitions)] pub(super) fn setup_logging(vm: &VirtualMachine, target: &str) -> PyResult { let logging = vm.import("logging", 0)?; let scope = vm.new_scope_with_builtins(); for (key, value) in logging.dict().expect("Should be a dict") { let key: PyRef = key.downcast().expect("Is a string"); scope.globals.set_item(key.as_str(), value, vm)?; } scope .globals .set_item("host_log", vm.new_function("host_log", host_log).into(), vm)?; let local_scope = scope.clone(); vm.run_code_string( local_scope, format!( r#" class HostHandler(Handler): def __init__(self, level=0): super().__init__(level=level) def emit(self, record): host_log(record,"{target}") oldBasicConfig = basicConfig def basicConfig(*pargs, **kwargs): if "handlers" not in kwargs: kwargs["handlers"] = [HostHandler()] return oldBasicConfig(*pargs, **kwargs) "# ) .as_str(), "".to_owned(), )?; let all: PyRef = logging .get_attr("__all__", vm)? .downcast() .expect("Is a list"); all.borrow_vec_mut().push(vm.new_pyobj("HostHandler")); // { // let logging_dict = logging.dict().expect("Exists"); // // for (key, val) in scope.globals { // let key: PyRef = key.downcast().expect("Is a string"); // // if !logging_dict.contains_key(key.as_str(), vm) { // logging_dict.set_item(key.as_str(), val, vm)?; // } // } // // for (key, val) in scope.locals { // let key: PyRef = key.downcast().expect("Is a string"); // // if !logging_dict.contains_key(key.as_str(), vm) { // logging_dict.set_item(key.as_str(), val, vm)?; // } // } // } Ok(scope.globals.to_pyobject(vm)) }