1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
// yt - A fully featured command line YouTube client
//
// Copyright (C) 2025 Benedikt Peetz <benedikt.peetz@b-peetz.de>
// SPDX-License-Identifier: GPL-3.0-or-later
//
// This file is part of Yt.
//
// You should have received a copy of the License along with this program.
// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
use std::fmt::Display;
use log::{Level, debug, log_enabled};
use rustpython::vm::{
AsObject, PyPayload, PyRef, VirtualMachine,
builtins::{PyBaseException, PyBaseExceptionRef, PyStr},
py_io::Write,
suggestion::offer_suggestions,
};
#[derive(thiserror::Error, Debug)]
pub struct PythonError(pub String);
impl Display for PythonError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Python threw an exception: {}", self.0)
}
}
impl PythonError {
pub(super) fn from_exception(vm: &VirtualMachine, exc: &PyRef<PyBaseException>) -> Self {
let buffer = process_exception(vm, exc);
Self(buffer)
}
}
pub(super) fn process_exception(vm: &VirtualMachine, err: &PyBaseExceptionRef) -> String {
let mut buffer = String::new();
write_exception(vm, &mut buffer, err)
.expect("We are writing into an *in-memory* string, it will always work");
if log_enabled!(Level::Debug) {
let mut output = String::new();
vm.write_exception(&mut output, err)
.expect("We are writing into an *in-memory* string, it will always work");
debug!("Python threw an exception: {output}");
}
buffer
}
// Inlined and changed from `vm.write_exception_inner`
fn write_exception<W: Write>(
vm: &VirtualMachine,
output: &mut W,
exc: &PyBaseExceptionRef,
) -> Result<(), W::Error> {
let varargs = exc.args();
let args_repr = {
match varargs.len() {
0 => vec![],
1 => {
let args0_repr = if true {
varargs[0]
.str(vm)
.unwrap_or_else(|_| PyStr::from("<element str() failed>").into_ref(&vm.ctx))
} else {
varargs[0].repr(vm).unwrap_or_else(|_| {
PyStr::from("<element repr() failed>").into_ref(&vm.ctx)
})
};
vec![args0_repr]
}
_ => varargs
.iter()
.map(|vararg| {
vararg.repr(vm).unwrap_or_else(|_| {
PyStr::from("<element repr() failed>").into_ref(&vm.ctx)
})
})
.collect(),
}
};
let exc_class = exc.class();
if exc_class.fast_issubclass(vm.ctx.exceptions.syntax_error) {
unreachable!(
"A syntax error should never be raised, \
as yt_dlp should not have them and neither our embedded code"
);
}
let exc_name = exc_class.name();
match args_repr.len() {
0 => write!(output, "{exc_name}"),
1 => write!(output, "{}: {}", exc_name, args_repr[0]),
_ => write!(
output,
"{}: ({})",
exc_name,
args_repr
.iter()
.map(|val| val.as_str())
.collect::<Vec<_>>()
.join(", "),
),
}?;
match offer_suggestions(exc, vm) {
Some(suggestions) => {
write!(output, ". Did you mean: '{suggestions}'?")
}
None => Ok(()),
}
}
|