aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yt/src/main.rs2
-rw-r--r--yt/src/update/mod.rs151
-rw-r--r--yt/src/update/updater.rs162
3 files changed, 207 insertions, 108 deletions
diff --git a/yt/src/main.rs b/yt/src/main.rs
index 4378346..5b5eda4 100644
--- a/yt/src/main.rs
+++ b/yt/src/main.rs
@@ -173,7 +173,7 @@ async fn main() -> Result<()> {
let max_backlog = max_backlog.unwrap_or(app.config.update.max_backlog);
- update::update(&app, max_backlog, subscriptions, verbosity).await?;
+ update::update(&app, max_backlog, subscriptions).await?;
}
Command::Subscriptions { cmd } => match cmd {
SubscriptionCommand::Add { name, url } => {
diff --git a/yt/src/update/mod.rs b/yt/src/update/mod.rs
index 3cdc61b..730e7c0 100644
--- a/yt/src/update/mod.rs
+++ b/yt/src/update/mod.rs
@@ -8,116 +8,63 @@
// You should have received a copy of the License along with this program.
// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
-use std::{collections::HashMap, process::Stdio, str::FromStr, string::ToString};
+use std::str::FromStr;
use anyhow::{Context, Ok, Result};
use chrono::{DateTime, Utc};
-use log::{error, info, warn};
-use tokio::{
- io::{AsyncBufReadExt, BufReader},
- process::Command,
-};
+use log::{info, warn};
use url::Url;
use yt_dlp::{unsmuggle_url, wrapper::info_json::InfoJson};
use crate::{
app::App,
storage::{
- subscriptions::{get, Subscription},
+ subscriptions::{self, Subscription},
video_database::{
extractor_hash::ExtractorHash, getters::get_all_hashes, setters::add_video, Video,
VideoStatus,
},
},
- unreachable::Unreachable,
videos::display::format_video::FormatVideo,
};
+mod updater;
+use updater::Updater;
+
pub async fn update(
app: &App,
- max_backlog: u32,
- subs_to_update: Vec<String>,
- verbosity: u8,
+ max_backlog: usize,
+ subscription_names_to_update: Vec<String>,
) -> Result<()> {
- let subscriptions = get(app).await?;
- let mut back_subs: HashMap<Url, Subscription> = HashMap::new();
- let logging = verbosity > 0;
- let log_level = match verbosity {
- // 0 => 50, // logging.CRITICAL
- 0 => 40, // logging.ERROR
- 1 => 30, // logging.WARNING
- 2 => 20, // logging.INFO
- 3.. => 10, // logging.DEBUG
- };
- info!("Passing log_level {} to the update script", log_level);
+ let subscriptions = subscriptions::get(app).await?;
- let mut urls: Vec<String> = vec![];
- for (name, sub) in subscriptions.0 {
- if subs_to_update.contains(&name) || subs_to_update.is_empty() {
- urls.push(sub.url.to_string());
- back_subs.insert(sub.url.clone(), sub);
- } else {
- info!(
- "Not updating subscription '{}' as it was not specified",
- name
- );
- }
- }
+ let urls: Vec<_> = if subscription_names_to_update.is_empty() {
+ subscriptions.0.values().collect()
+ } else {
+ subscriptions
+ .0
+ .values()
+ .filter(|sub| {
+ if subscription_names_to_update.contains(&sub.name) {
+ true
+ } else {
+ info!(
+ "Not updating subscription '{}' as it was not specified",
+ sub.name
+ );
+ false
+ }
+ })
+ .collect()
+ };
// We can get away with not having to re-fetch the hashes every time, as the returned video
// should not contain duplicates.
let hashes = get_all_hashes(app).await?;
- let mut child = Command::new("raw_update.py")
- .arg(max_backlog.to_string())
- .arg(urls.len().to_string())
- .arg(log_level.to_string())
- .args(&urls)
- .args(hashes.iter().map(ToString::to_string).collect::<Vec<_>>())
- .stdout(Stdio::piped())
- .stderr(if logging {
- Stdio::inherit()
- } else {
- Stdio::null()
- })
- .stdin(Stdio::null())
- .spawn()
- .context("Failed to call python3 update_raw")?;
-
- let mut out = BufReader::new(
- child
- .stdout
- .take()
- .unreachable("Should be able to take child stdout"),
- )
- .lines();
-
- while let Some(line) = out.next_line().await? {
- // use tokio::{fs::File, io::AsyncWriteExt};
- // let mut output = File::create("output.json").await?;
- // output.write(line.as_bytes()).await?;
- // output.flush().await?;
- // output.sync_all().await?;
- // drop(output);
-
- let output_json: HashMap<Url, InfoJson> = serde_json::from_str(&line)
- .unreachable("The json is generated by our own script. It should be valid");
-
- for (url, value) in output_json {
- let sub = back_subs.get(&url).unreachable("This was stored before");
- process_subscription(app, sub, value, &hashes)
- .await
- .with_context(|| format!("Failed to process subscription: '{}'", sub.name))?;
- }
- }
-
- let out = child.wait().await?;
- if !out.success() {
- error!(
- "The update_raw.py invokation failed (exit code: {}).",
- out.code()
- .map_or("<No exit code>".to_owned(), |f| f.to_string())
- );
+ {
+ let mut updater = Updater::new(max_backlog, &hashes);
+ updater.update(app, &urls).await?;
}
Ok(())
@@ -231,31 +178,21 @@ pub fn video_entry_to_video(entry: InfoJson, sub: Option<&Subscription>) -> Resu
Ok(video)
}
-async fn process_subscription(
- app: &App,
- sub: &Subscription,
- entry: InfoJson,
- hashes: &[blake3::Hash],
-) -> Result<()> {
+async fn process_subscription(app: &App, sub: &Subscription, entry: InfoJson) -> Result<()> {
let video =
video_entry_to_video(entry, Some(sub)).context("Failed to parse search entry as Video")?;
- if hashes.contains(video.extractor_hash.hash()) {
- // We already stored the video information
- unreachable!("The python update script should have never provided us a duplicated video");
- } else {
- add_video(app, video.clone())
+ add_video(app, video.clone())
+ .await
+ .with_context(|| format!("Failed to add video to database: '{}'", video.title))?;
+ println!(
+ "{}",
+ (&video
+ .to_formatted_video(app)
.await
- .with_context(|| format!("Failed to add video to database: '{}'", video.title))?;
- println!(
- "{}",
- (&video
- .to_formatted_video(app)
- .await
- .with_context(|| format!("Failed to format video: '{}'", video.title))?
- .colorize(app))
- .to_line_display()
- );
- Ok(())
- }
+ .with_context(|| format!("Failed to format video: '{}'", video.title))?
+ .colorize(app))
+ .to_line_display()
+ );
+ Ok(())
}
diff --git a/yt/src/update/updater.rs b/yt/src/update/updater.rs
new file mode 100644
index 0000000..d54def8
--- /dev/null
+++ b/yt/src/update/updater.rs
@@ -0,0 +1,162 @@
+use std::io::{stderr, Write};
+
+use anyhow::{Context, Result};
+use blake3::Hash;
+use futures::{
+ stream::{self},
+ StreamExt, TryStreamExt,
+};
+use log::{debug, error, info, log_enabled, Level};
+use owo_colors::OwoColorize;
+use serde_json::json;
+use yt_dlp::{error::YtDlpError, process_ie_result, wrapper::info_json::InfoJson};
+
+use crate::{app::App, storage::subscriptions::Subscription};
+
+use super::process_subscription;
+
+/// Looks up new playlist entries for subscriptions and hands them to
+/// [`process_subscription`].
+pub(super) struct Updater<'a> {
+    /// Upper bound on the number of playlist entries inspected per subscription.
+    max_backlog: usize,
+
+    /// Hashes of the entries that are skipped because they are already present.
+    hashes: &'a [Hash],
+}
+
+impl<'a> Updater<'a> {
+    /// Creates an updater that inspects at most `max_backlog` entries per subscription and
+    /// skips every entry whose id hash is contained in `hashes`.
+    pub(super) fn new(max_backlog: usize, hashes: &'a [Hash]) -> Self {
+        Self {
+            max_backlog,
+            hashes,
+        }
+    }
+
+    /// Checks every subscription in `subscriptions` for new entries and passes each new entry
+    /// to [`process_subscription`].
+    ///
+    /// Up to 100 subscriptions are checked concurrently, and up to 100 entries of one
+    /// subscription are processed concurrently.
+    ///
+    /// # Errors
+    /// Returns the first error produced while fetching the entries of a subscription or while
+    /// processing one of them.
+    pub(super) async fn update(
+        &mut self,
+        app: &App,
+        subscriptions: &[&Subscription],
+    ) -> Result<()> {
+        let mut stream = stream::iter(subscriptions)
+            .map(|sub| self.get_new_entries(sub))
+            .buffer_unordered(100);
+
+        while let Some(output) = stream.next().await {
+            let mut entries = output?.into_iter();
+
+            // The first entry is processed on its own, before the remaining ones are processed
+            // concurrently.
+            // NOTE(review): presumably deliberate -- confirm before merging the two steps.
+            if let Some((sub, entry)) = entries.next() {
+                process_subscription(app, sub, entry).await?;
+            }
+
+            stream::iter(entries)
+                .map(|(sub, entry)| process_subscription(app, sub, entry))
+                .buffer_unordered(100)
+                .try_collect::<()>()
+                .await?;
+        }
+
+        Ok(())
+    }
+
+    /// Fetches the playlist of `sub` and returns its fully processed entries that are not
+    /// present yet.
+    ///
+    /// Entries without an id and entries whose id hash is contained in `self.hashes` are
+    /// skipped. An entry that fails with a Python error is dropped on its own, so that one
+    /// unavailable video does not discard the other new entries of the subscription. The
+    /// failure is logged, unless it is the "members-only content" download error.
+    ///
+    /// # Errors
+    /// Fails if stderr cannot be flushed, if the playlist cannot be extracted, or if
+    /// processing an entry fails with anything other than a Python error.
+    async fn get_new_entries(
+        &self,
+        sub: &'a Subscription,
+    ) -> Result<Vec<(&'a Subscription, InfoJson)>> {
+        // ANSI ESCAPE CODES Wrappers {{{
+        // see: https://en.wikipedia.org/wiki/ANSI_escape_code#Control_Sequence_Introducer_commands
+        const CSI: &str = "\x1b[";
+        fn clear_whole_line() {
+            eprint!("{CSI}2K");
+        }
+        fn move_to_col(x: usize) {
+            eprint!("{CSI}{x}G");
+        }
+        // }}}
+
+        let json = json! {
+            {
+                "playliststart": 1,
+                "playlistend": self.max_backlog,
+                "noplaylist": false,
+                "extractor_args": {"youtubetab": {"approximate_date": [""]}},
+            }
+        };
+        let yt_dlp_opts = json.as_object().expect("This is hardcoded");
+
+        // The single-line progress display is only drawn when debug logging is disabled.
+        if !log_enabled!(Level::Debug) {
+            clear_whole_line();
+            move_to_col(1);
+            eprint!("Checking playlist {}...", sub.name);
+            move_to_col(1);
+            stderr().flush()?;
+        }
+
+        let info = yt_dlp::extract_info(yt_dlp_opts, &sub.url, false, false)
+            .await
+            .with_context(|| format!("Failed to get playlist '{}'.", sub.name))?;
+
+        let valid_entries: Vec<(&Subscription, InfoJson)> = info
+            .entries
+            .unwrap_or_default()
+            .into_iter()
+            .take(self.max_backlog)
+            .filter_map(|entry| {
+                let extractor_hash = match entry.id.as_ref() {
+                    Some(id) => blake3::hash(id.as_bytes()),
+                    None => {
+                        // Without an id there is nothing to compare against the known hashes.
+                        error!(
+                            "Skipping an entry of playlist '{}', as it has no id",
+                            sub.name
+                        );
+                        return None;
+                    }
+                };
+
+                if self.hashes.contains(&extractor_hash) {
+                    debug!(
+                        "Skipping entry, as it is already present: '{}'",
+                        extractor_hash
+                    );
+                    None
+                } else {
+                    Some((sub, entry))
+                }
+            })
+            .collect();
+
+        // Every entry is resolved to its own `Result`; short-circuiting on the first failure
+        // would throw away the entries that were processed successfully.
+        let results: Vec<Result<(&Subscription, InfoJson), YtDlpError>> =
+            stream::iter(valid_entries)
+                .map(|(sub, entry)| async move {
+                    process_ie_result(yt_dlp_opts, entry, false)
+                        .await
+                        .map(|output| (sub, output))
+                })
+                .buffer_unordered(100)
+                .collect()
+                .await;
+
+        let mut processed_entries = Vec::with_capacity(results.len());
+        for result in results {
+            let err = match result {
+                Ok(processed) => {
+                    processed_entries.push(processed);
+                    continue;
+                }
+                Err(err) => err,
+            };
+
+            if let YtDlpError::PythonError { error, kind } = &err {
+                let error_string = error.to_string();
+                let is_members_only = kind.as_str() == "<class 'yt_dlp.utils.DownloadError'>"
+                    && error_string
+                        .contains("Join this channel to get access to members-only content ");
+
+                if !is_members_only {
+                    // Fall back to the whole message if the coloured prefix is missing.
+                    let message = error_string
+                        .strip_prefix("DownloadError: \u{1b}[0;31mERROR:\u{1b}[0m ")
+                        .unwrap_or(&error_string);
+                    error!("{message}");
+                }
+            } else {
+                return Err(err).context("Failed to process new entries.");
+            }
+        }
+
+        Ok(processed_entries)
+    }
+}