aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pkgs/by-name/yt/yt/Cargo.toml2
-rw-r--r--pkgs/by-name/yt/yt/flake.nix7
-rwxr-xr-xpkgs/by-name/yt/yt/python_update/raw_update.py150
-rw-r--r--pkgs/by-name/yt/yt/src/cli.rs71
-rw-r--r--pkgs/by-name/yt/yt/src/main.rs74
-rw-r--r--pkgs/by-name/yt/yt/src/status/mod.rs2
-rw-r--r--pkgs/by-name/yt/yt/src/storage/subscriptions.rs135
-rw-r--r--pkgs/by-name/yt/yt/src/storage/video_database/schema.sql8
-rw-r--r--pkgs/by-name/yt/yt/src/subscribe/mod.rs65
-rw-r--r--pkgs/by-name/yt/yt/src/update/mod.rs321
-rw-r--r--pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs28
11 files changed, 539 insertions, 324 deletions
diff --git a/pkgs/by-name/yt/yt/Cargo.toml b/pkgs/by-name/yt/yt/Cargo.toml
index d703d0cb..ca2c3c8d 100644
--- a/pkgs/by-name/yt/yt/Cargo.toml
+++ b/pkgs/by-name/yt/yt/Cargo.toml
@@ -19,7 +19,7 @@ serde_json = "1.0.125"
sqlx = { version = "0.7.4", features = ["runtime-tokio", "sqlite"] }
stderrlog = "0.6.0"
tempfile = "3.12.0"
-tokio = { version = "1.39.3", features = ["rt-multi-thread", "macros", "process", "time"] }
+tokio = { version = "1.39.3", features = ["rt-multi-thread", "macros", "process", "time", "io-std"] }
url = { version = "2.5.2", features = ["serde"] }
xdg = "2.5.2"
yt_dlp = { path = "./yt_dlp/" }
diff --git a/pkgs/by-name/yt/yt/flake.nix b/pkgs/by-name/yt/yt/flake.nix
index 5ced0fbd..7733d7c7 100644
--- a/pkgs/by-name/yt/yt/flake.nix
+++ b/pkgs/by-name/yt/yt/flake.nix
@@ -14,7 +14,10 @@
}: (flake-utils.lib.eachDefaultSystem (system: let
pkgs = nixpkgs.legacyPackages."${system}";
- yt-dlp = pkgs.python3.withPackages (ps: [ps.yt-dlp]);
+ python = pkgs.python3.withPackages (ps:
+ with ps; [
+ yt-dlp
+ ]);
buildInputs = with pkgs; [
mpv-unwrapped.dev
@@ -49,7 +52,7 @@
sqlx-cli
sqlite-interactive
- yt-dlp
+ python
jq
cargo-edit
diff --git a/pkgs/by-name/yt/yt/python_update/raw_update.py b/pkgs/by-name/yt/yt/python_update/raw_update.py
new file mode 100755
index 00000000..3388bc7c
--- /dev/null
+++ b/pkgs/by-name/yt/yt/python_update/raw_update.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python
+
+# This has been take from the `ytcc` updater code (at `8893bc98428cb78d458a9cf3ded03f519d86a46b`).
+# Source URL: https://github.com/woefe/ytcc/commit/8893bc98428cb78d458a9cf3ded03f519d86a46b
+
+import asyncio
+import itertools
+import json
+import logging
+import sys
+from dataclasses import dataclass
+from functools import partial
+from typing import Any, Iterable, Optional, Tuple, TypeVar
+
+import yt_dlp
+
+
+@dataclass(frozen=True)
+class Playlist:
+ name: str
+ url: str
+ reverse: bool
+
+
+@dataclass(frozen=True)
+class Video:
+ url: str
+ title: str
+ description: str
+ publish_date: float
+ watch_date: Optional[float]
+ duration: float
+ thumbnail_url: Optional[str]
+ extractor_hash: str
+
+ @property
+ def watched(self) -> bool:
+ return self.watch_date is not None
+
+
+logger = logging.getLogger("yt")
+logging.basicConfig(encoding="utf-8", level=logging.DEBUG)
+
+_ytdl_logger = logging.getLogger("yt_dlp")
+_ytdl_logger.propagate = False
+_ytdl_logger.addHandler(logging.NullHandler())
+YTDL_COMMON_OPTS = {"logger": _ytdl_logger}
+
+T = TypeVar("T")
+
+
+def take(amount: int, iterable: Iterable[T]) -> Iterable[T]:
+ """Take the first elements of an iterable.
+
+ If the given iterable has less elements than the given amount, the returned iterable has the
+ same amount of elements as the given iterable. Otherwise the returned iterable has `amount`
+ elements.
+
+ :param amount: The number of elements to take
+ :param iterable: The iterable to take elements from
+ :return: The first elements of the given iterable
+ """
+ for _, elem in zip(range(amount), iterable):
+ yield elem
+
+
+class Fetcher:
+ def __init__(self, max_backlog):
+ self.max_items = max_backlog
+ self.ydl_opts = {
+ **YTDL_COMMON_OPTS,
+ "playliststart": 1,
+ "playlistend": max_backlog,
+ "noplaylist": False,
+ "extractor_args": {"youtubetab": {"approximate_date": [""]}},
+ }
+
+ async def get_unprocessed_entries(self, url: str) -> Iterable[Tuple[str, Any]]:
+ result = []
+ with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
+ logger.info("Checking playlist '%s'...", url)
+ try:
+ loop = asyncio.get_event_loop()
+ info = await loop.run_in_executor(
+ None,
+ partial(ydl.extract_info, url, download=False, process=False),
+ )
+ except yt_dlp.DownloadError as download_error:
+ logger.error(
+ "Failed to get playlist '%s'. Error was: '%s'",
+ url,
+ download_error,
+ )
+ else:
+ entries = info.get("entries", [])
+ for entry in take(self.max_items, entries):
+ result.append((url, entry))
+ return result
+
+ def _process_ie(self, entry):
+ with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
+ processed = ydl.process_ie_result(entry, False)
+
+ # walk through the ie_result dictionary to force evaluation of lazily loaded resources
+ repr(processed)
+
+ return processed
+
+ async def process_entry(self, url: str, entry: Any) -> Optional[Any]:
+ try:
+ loop = asyncio.get_event_loop()
+ processed = await loop.run_in_executor(None, self._process_ie, entry)
+ except yt_dlp.DownloadError as download_error:
+ logger.error(
+ "Failed to get a video of playlist '%s'. Error was: '%s'",
+ url,
+ download_error,
+ )
+ return None
+ else:
+ print(json.dumps({url: processed}))
+
+
+class Updater:
+ def __init__(self, max_backlog=20):
+ self.max_items = max_backlog
+ self.fetcher = Fetcher(max_backlog)
+
+ async def update_url(self, url: str):
+ print(f"Updating {url}...", file=sys.stderr)
+ new_entries = await self.fetcher.get_unprocessed_entries(url)
+
+ await asyncio.gather(
+ *itertools.starmap(self.fetcher.process_entry, new_entries)
+ )
+
+ async def do_update(self, urls: Iterable[str]):
+ await asyncio.gather(*map(self.update_url, urls))
+
+ def update(self, urls: Iterable[str]):
+ asyncio.run(self.do_update(urls))
+
+
+def update(max_backlog: int):
+ u = Updater(max_backlog=max_backlog)
+ u.update(sys.argv[2:])
+
+
+max_backlog = int(sys.argv[1])
+update(max_backlog)
diff --git a/pkgs/by-name/yt/yt/src/cli.rs b/pkgs/by-name/yt/yt/src/cli.rs
index 45590d56..23d400f8 100644
--- a/pkgs/by-name/yt/yt/src/cli.rs
+++ b/pkgs/by-name/yt/yt/src/cli.rs
@@ -65,22 +65,6 @@ pub enum Command {
cmd: Option<SelectCommand>,
},
- /// Subscribe to an URL
- Subscribe {
- #[arg(short, long)]
- /// The human readable name of the subscription
- name: Option<String>,
-
- /// The URL to listen to
- url: Url,
- },
-
- /// Unsubscribe from an URL
- Unsubscribe {
- /// The human readable name of the subscription
- name: String,
- },
-
/// Update the video database
Update {
#[arg(short, long, default_value = "20")]
@@ -96,18 +80,12 @@ pub enum Command {
concurrent_processes: usize,
},
- /// Update one subscription (this is here to parallelize the normal `update`)
- #[command(hide = true)]
- UpdateOnce {
- /// The name of the subscription to update
- sub_name: String,
-
- /// The number of videos to stop updating
- max_backlog: u32,
+ /// Manipulate subscription
+ #[command(visible_alias = "subs")]
+ Subscriptions {
+ #[command(subcommand)]
+ cmd: SubscriptionCommand,
},
-
- /// List all subscriptions
- Subscriptions {},
}
impl Default for Command {
@@ -118,6 +96,42 @@ impl Default for Command {
}
}
+#[derive(Subcommand, Clone, Debug)]
+pub enum SubscriptionCommand {
+ /// Subscribe to an URL
+ Add {
+ #[arg(short, long)]
+ /// The human readable name of the subscription
+ name: Option<String>,
+
+ /// The URL to listen to
+ url: Url,
+ },
+
+ /// Unsubscribe from an URL
+ Remove {
+ /// The human readable name of the subscription
+ name: String,
+ },
+
+ /// Import a bunch of URLs as subscriptions.
+ Import {
+ /// The file containing the URLs. Will use Stdin otherwise.
+ file: Option<PathBuf>,
+
+ /// Remove any previous subscriptions
+ #[arg(short, long)]
+ force: bool
+ },
+
+ /// List all subscriptions
+ List {
+ /// Only show the URLs
+ #[arg(short, long)]
+ url: bool,
+ },
+}
+
#[derive(Clone, Debug, Args)]
#[command(infer_subcommands = true)]
/// Mark the video given by the hash to be watched
@@ -192,6 +206,9 @@ impl Default for SelectCommand {
pub enum CheckCommand {
/// Check if the given info.json is deserializable
InfoJson { path: PathBuf },
+
+ /// Check if the given update info.json is deserializable
+ UpdateInfoJson { path: PathBuf },
}
#[derive(Subcommand, Clone, Copy, Debug)]
diff --git a/pkgs/by-name/yt/yt/src/main.rs b/pkgs/by-name/yt/yt/src/main.rs
index 302fba1e..ab16cfa7 100644
--- a/pkgs/by-name/yt/yt/src/main.rs
+++ b/pkgs/by-name/yt/yt/src/main.rs
@@ -1,11 +1,16 @@
-use std::fs;
+use std::{collections::HashMap, fs};
use anyhow::{bail, Context, Result};
use app::App;
use cache::invalidate;
use clap::Parser;
-use cli::{CacheCommand, CheckCommand, SelectCommand};
+use cli::{CacheCommand, CheckCommand, SelectCommand, SubscriptionCommand};
use select::cmds::handle_select_cmd;
+use tokio::{
+ fs::File,
+ io::{stdin, BufReader},
+};
+use url::Url;
use yt_dlp::wrapper::info_json::InfoJson;
use crate::{cli::Command, storage::subscriptions::get_subscriptions};
@@ -56,22 +61,12 @@ async fn main() -> Result<()> {
_ => handle_select_cmd(&app, cmd, None).await?,
}
}
- Command::Subscribe { name, url } => {
- subscribe::subscribe(name, url)
- .await
- .context("Failed to add a subscription")?;
- }
- Command::Unsubscribe { name } => {
- subscribe::unsubscribe(name)
- .await
- .context("Failed to remove a subscription")?;
- }
Command::Update {
max_backlog,
subscriptions,
concurrent_processes,
} => {
- let all_subs = get_subscriptions()?;
+ let all_subs = get_subscriptions(&app).await?;
for sub in &subscriptions {
if let None = all_subs.0.get(sub) {
@@ -82,21 +77,43 @@ async fn main() -> Result<()> {
}
}
- update::update(max_backlog, subscriptions, concurrent_processes).await?;
- }
- Command::UpdateOnce {
- sub_name,
- max_backlog,
- } => {
- update::update_once(&app, sub_name, max_backlog).await?;
+ update::update(&app, max_backlog, subscriptions, concurrent_processes).await?;
}
- Command::Subscriptions {} => {
- let all_subs = get_subscriptions()?;
- for (key, val) in all_subs.0 {
- println!("{}: '{}'", key, val.url);
+ Command::Subscriptions { cmd } => match cmd {
+ SubscriptionCommand::Add { name, url } => {
+ subscribe::subscribe(&app, name, url)
+ .await
+ .context("Failed to add a subscription")?;
}
- }
+ SubscriptionCommand::Remove { name } => {
+ subscribe::unsubscribe(&app, name)
+ .await
+ .context("Failed to remove a subscription")?;
+ }
+ SubscriptionCommand::List { url } => {
+ let all_subs = get_subscriptions(&app).await?;
+
+ if url {
+ for val in all_subs.0.values() {
+ println!("{}", val.url);
+ }
+ } else {
+ for (key, val) in all_subs.0 {
+ println!("{}: '{}'", key, val.url);
+ }
+ }
+ }
+ SubscriptionCommand::Import { file, force } => {
+ if let Some(file) = file {
+ let f = File::open(file).await?;
+
+ subscribe::import(&app, BufReader::new(f), force).await?
+ } else {
+ subscribe::import(&app, BufReader::new(stdin()), force).await?
+ };
+ }
+ },
Command::Watch {} => watch::watch(&app).await?,
@@ -115,6 +132,13 @@ async fn main() -> Result<()> {
let _: InfoJson =
serde_json::from_str(&string).context("Failed to deserialize value")?;
}
+ CheckCommand::UpdateInfoJson { path } => {
+ let string = fs::read_to_string(&path)
+ .with_context(|| format!("Failed to read '{}' to string!", path.display()))?;
+
+ let _: HashMap<Url, InfoJson> =
+ serde_json::from_str(&string).context("Failed to deserialize value")?;
+ }
},
Command::Comments {} => {
comments::comments(&app).await?;
diff --git a/pkgs/by-name/yt/yt/src/status/mod.rs b/pkgs/by-name/yt/yt/src/status/mod.rs
index 43632048..b0ce2bab 100644
--- a/pkgs/by-name/yt/yt/src/status/mod.rs
+++ b/pkgs/by-name/yt/yt/src/status/mod.rs
@@ -60,7 +60,7 @@ pub async fn show(app: &App) -> Result<()> {
let drop_videos_changing = (get!(@changing all_videos, Drop)).len();
let dropped_videos_changing = (get!(@changing all_videos, Dropped)).len();
- let subscriptions = get_subscriptions()?;
+ let subscriptions = get_subscriptions(&app).await?;
let subscriptions_len = subscriptions.0.len();
println!(
"\
diff --git a/pkgs/by-name/yt/yt/src/storage/subscriptions.rs b/pkgs/by-name/yt/yt/src/storage/subscriptions.rs
index 13ec493b..0c262e1a 100644
--- a/pkgs/by-name/yt/yt/src/storage/subscriptions.rs
+++ b/pkgs/by-name/yt/yt/src/storage/subscriptions.rs
@@ -1,20 +1,17 @@
//! Handle subscriptions
-use std::{
- collections::HashMap,
- fs::{self, File},
-};
+use std::collections::HashMap;
-use anyhow::{bail, Context, Result};
-use log::{debug, warn};
-use serde::{Deserialize, Serialize};
+use anyhow::Result;
+use log::debug;
use serde_json::{json, Value};
+use sqlx::query;
use url::Url;
use yt_dlp::wrapper::info_json::InfoType;
-use crate::constants;
+use crate::app::App;
-#[derive(Deserialize, Serialize, Clone, Debug)]
+#[derive(Clone, Debug)]
pub struct Subscription {
/// The human readable name of this subscription
pub name: String,
@@ -48,78 +45,86 @@ pub async fn check_url(url: &Url) -> Result<bool> {
Ok(info._type == Some(InfoType::Playlist))
}
-#[derive(Deserialize, Serialize, Default)]
+#[derive(Default)]
pub struct Subscriptions(pub(crate) HashMap<String, Subscription>);
-/// Get a list of subscriptions
-pub fn get_subscriptions() -> Result<Subscriptions> {
- let subscriptions_path = constants::subscriptions()?;
-
- if subscriptions_path.exists() {
- let subscriptions_file = File::open(&subscriptions_path).with_context(|| {
- format!(
- "Failed to open the subscription file at: '{}'",
- subscriptions_path.display()
- )
- })?;
-
- let subscriptions = serde_json::from_reader(subscriptions_file)
- .context("Failed to deserialize the subscriptions file.")?;
-
- Ok(subscriptions)
- } else {
- warn!("No subscriptions file found!");
- return Ok(Subscriptions::default());
- }
-}
-
-pub fn write_subscriptions(subs: Subscriptions) -> Result<()> {
- let subscriptions_path = constants::subscriptions()?;
-
- fs::create_dir_all(subscriptions_path.parent().expect("Should have a parent"))?;
-
- let subscriptions_file = File::create(&subscriptions_path).with_context(|| {
- format!(
- "Failed to create the subscription file at: '{}'",
- subscriptions_path.display()
- )
- })?;
-
- serde_json::to_writer(subscriptions_file, &subs)
- .context("Failed to serialize and write the subscriptions file.")?;
+pub async fn remove_all_subscriptions(app: &App) -> Result<()> {
+ query!(
+ "
+ DELETE FROM subscriptions;
+ ",
+ )
+ .execute(&app.database)
+ .await?;
Ok(())
}
-pub async fn remove_subscription(sub: &Subscription) -> Result<()> {
- let mut present_subscriptions = get_subscriptions()?;
+/// Get a list of subscriptions
+pub async fn get_subscriptions(app: &App) -> Result<Subscriptions> {
+ let raw_subs = query!(
+ "
+ SELECT *
+ FROM subscriptions;
+ "
+ )
+ .fetch_all(&app.database)
+ .await?;
- let string = format!("Unsubscribed from '{}' at '{}'", sub.name, sub.url);
+ let subscriptions: HashMap<String, Subscription> = raw_subs
+ .into_iter()
+ .map(|sub| {
+ (
+ sub.name.clone(),
+ Subscription::new(
+ sub.name,
+ Url::parse(&sub.url).expect("This should be valid"),
+ ),
+ )
+ })
+ .collect();
- if let None = present_subscriptions.0.remove(&sub.name) {
- bail!(
- "The subscription '{}' was not in your subscription list!",
- sub.name
- );
- }
+ Ok(Subscriptions(subscriptions))
+}
- write_subscriptions(present_subscriptions)?;
+pub async fn add_subscription(app: &App, sub: &Subscription) -> Result<()> {
+ let url = sub.url.to_string();
- println!("{}", string);
+ query!(
+ "
+ INSERT INTO subscriptions (
+ name,
+ url
+ ) VALUES (?, ?);
+ ",
+ sub.name,
+ url
+ )
+ .execute(&app.database)
+ .await?;
+ println!("Subscribed to '{}' at '{}'", sub.name, sub.url);
Ok(())
}
-pub async fn add_subscription(sub: Subscription) -> Result<()> {
- let mut present_subscriptions = get_subscriptions()?;
-
- let string = format!("Subscribed to {} at {}", sub.name, sub.url);
-
- present_subscriptions.0.insert(sub.name.clone(), sub);
+pub async fn remove_subscription(app: &App, sub: &Subscription) -> Result<()> {
+ let output = query!(
+ "
+ DELETE FROM subscriptions
+ WHERE name = ?
+ ",
+ sub.name,
+ )
+ .execute(&app.database)
+ .await?;
- write_subscriptions(present_subscriptions)?;
+ assert_eq!(
+ output.rows_affected(),
+ 1,
+ "The remove subscriptino query did effect more (or less) than one row. This is a bug."
+ );
- println!("{}", string);
+ println!("Unsubscribed from '{}' at '{}'", sub.name, sub.url);
Ok(())
}
diff --git a/pkgs/by-name/yt/yt/src/storage/video_database/schema.sql b/pkgs/by-name/yt/yt/src/storage/video_database/schema.sql
index 2e9e18af..74378ff8 100644
--- a/pkgs/by-name/yt/yt/src/storage/video_database/schema.sql
+++ b/pkgs/by-name/yt/yt/src/storage/video_database/schema.sql
@@ -37,4 +37,10 @@ CREATE TABLE IF NOT EXISTS video_options (
subtitle_langs TEXT NOT NULL,
playback_speed REAL NOT NULL,
FOREIGN KEY(extractor_hash) REFERENCES videos (extractor_hash)
-)
+);
+
+-- Store subscriptions
+CREATE TABLE IF NOT EXISTS subscriptions (
+ name TEXT UNIQUE NOT NULL PRIMARY KEY,
+ url TEXT NOT NULL
+);
diff --git a/pkgs/by-name/yt/yt/src/subscribe/mod.rs b/pkgs/by-name/yt/yt/src/subscribe/mod.rs
index 88be038a..3311dc42 100644
--- a/pkgs/by-name/yt/yt/src/subscribe/mod.rs
+++ b/pkgs/by-name/yt/yt/src/subscribe/mod.rs
@@ -1,19 +1,26 @@
+use std::str::FromStr;
+
use anyhow::{bail, Context, Result};
use futures::FutureExt;
use log::warn;
use serde_json::{json, Value};
+use tokio::io::{AsyncBufRead, AsyncBufReadExt};
use url::Url;
use yt_dlp::wrapper::info_json::InfoType;
-use crate::storage::subscriptions::{
- add_subscription, check_url, get_subscriptions, remove_subscription, Subscription,
+use crate::{
+ app::App,
+ storage::subscriptions::{
+ add_subscription, check_url, get_subscriptions, remove_all_subscriptions,
+ remove_subscription, Subscription,
+ },
};
-pub async fn unsubscribe(name: String) -> Result<()> {
- let present_subscriptions = get_subscriptions()?;
+pub async fn unsubscribe(app: &App, name: String) -> Result<()> {
+ let present_subscriptions = get_subscriptions(&app).await?;
if let Some(subscription) = present_subscriptions.0.get(&name) {
- remove_subscription(subscription).await?;
+ remove_subscription(&app, subscription).await?;
} else {
bail!("Couldn't find subscription: '{}'", &name);
}
@@ -21,7 +28,36 @@ pub async fn unsubscribe(name: String) -> Result<()> {
Ok(())
}
-pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> {
+pub async fn import<W: AsyncBufRead + AsyncBufReadExt + Unpin>(
+ app: &App,
+ reader: W,
+ force: bool,
+) -> Result<()> {
+ if force {
+ remove_all_subscriptions(&app).await?;
+ }
+
+ let mut lines = reader.lines();
+ while let Some(line) = lines.next_line().await? {
+ let url =
+ Url::from_str(&line).with_context(|| format!("Failed to parse '{}' as url", line))?;
+ match subscribe(app, None, url)
+ .await
+ .with_context(|| format!("Failed to subscribe to: '{}'", line))
+ {
+ Ok(_) => (),
+ Err(err) => eprintln!(
+ "Error while subscribing to '{}': '{}'",
+ line,
+ err.source().expect("Should have a source").to_string()
+ ),
+ }
+ }
+
+ Ok(())
+}
+
+pub async fn subscribe(app: &App, name: Option<String>, url: Url) -> Result<()> {
if !(url.as_str().ends_with("videos")
|| url.as_str().ends_with("streams")
|| url.as_str().ends_with("shorts"))
@@ -35,6 +71,7 @@ pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> {
if let Some(name) = name {
let out: Result<()> = async move {
actual_subscribe(
+ &app,
Some(name.clone() + " {Videos}"),
url.join("videos/").expect("Works"),
)
@@ -44,6 +81,7 @@ pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> {
})?;
actual_subscribe(
+ &app,
Some(name.clone() + " {Streams}"),
url.join("streams/").expect("Works"),
)
@@ -53,6 +91,7 @@ pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> {
})?;
actual_subscribe(
+ &app,
Some(name.clone() + " {Shorts}"),
url.join("shorts/").expect("Works"),
)
@@ -66,26 +105,26 @@ pub async fn subscribe(name: Option<String>, url: Url) -> Result<()> {
out?
} else {
- actual_subscribe(None, url.join("videos/").expect("Works"))
+ actual_subscribe(&app, None, url.join("videos/").expect("Works"))
.await
.with_context(|| format!("Failed to subscribe to the '{}' variant", "{Videos}"))?;
- actual_subscribe(None, url.join("streams/").expect("Works"))
+ actual_subscribe(&app, None, url.join("streams/").expect("Works"))
.await
.with_context(|| format!("Failed to subscribe to the '{}' variant", "{Streams}"))?;
- actual_subscribe(None, url.join("shorts/").expect("Works"))
+ actual_subscribe(&app, None, url.join("shorts/").expect("Works"))
.await
.with_context(|| format!("Failed to subscribe to the '{}' variant", "{Shorts}"))?;
}
} else {
- actual_subscribe(name, url).await?;
+ actual_subscribe(&app, name, url).await?;
}
Ok(())
}
-async fn actual_subscribe(name: Option<String>, url: Url) -> Result<()> {
+async fn actual_subscribe(app: &App, name: Option<String>, url: Url) -> Result<()> {
let name = if let Some(name) = name {
if !check_url(&url).await? {
bail!("The url ('{}') does not represent a playlist!", &url)
@@ -112,7 +151,7 @@ async fn actual_subscribe(name: Option<String>, url: Url) -> Result<()> {
}
};
- let present_subscriptions = get_subscriptions()?;
+ let present_subscriptions = get_subscriptions(&app).await?;
if let Some(subs) = present_subscriptions.0.get(&name) {
bail!(
@@ -126,7 +165,7 @@ async fn actual_subscribe(name: Option<String>, url: Url) -> Result<()> {
let sub = Subscription { name, url };
- add_subscription(sub).await?;
+ add_subscription(&app, &sub).await?;
Ok(())
}
diff --git a/pkgs/by-name/yt/yt/src/update/mod.rs b/pkgs/by-name/yt/yt/src/update/mod.rs
index 2cbae9fd..60399a0e 100644
--- a/pkgs/by-name/yt/yt/src/update/mod.rs
+++ b/pkgs/by-name/yt/yt/src/update/mod.rs
@@ -1,125 +1,98 @@
-use std::{env::current_exe, str::FromStr, sync::Arc};
+use std::{collections::HashMap, process::Stdio, str::FromStr};
-use anyhow::{bail, Context, Ok, Result};
+use anyhow::{Context, Ok, Result};
use chrono::{DateTime, Utc};
-use log::{debug, error, info, warn};
-use serde_json::{json, Value};
-use tokio::{process::Command, sync::Semaphore, task::JoinSet};
-use yt_dlp::unsmuggle_url;
+use log::{error, info, warn};
+use tokio::{
+ io::{AsyncBufReadExt, BufReader},
+ process::Command,
+};
+use url::Url;
+use yt_dlp::{unsmuggle_url, wrapper::info_json::InfoJson};
use crate::{
app::App,
storage::{
subscriptions::{get_subscriptions, Subscription},
video_database::{
- extractor_hash::ExtractorHash, getters::get_video_hashes, setters::add_video, Video,
+ extractor_hash::ExtractorHash, getters::get_all_hashes, setters::add_video, Video,
VideoStatus,
},
},
};
pub async fn update(
+ app: &App,
max_backlog: u32,
subs_to_update: Vec<String>,
- concurrent_processes: usize,
+ _concurrent_processes: usize,
) -> Result<()> {
- let subscriptions = get_subscriptions()?;
-
- let mut join_set = JoinSet::new();
- let permits = Arc::new(Semaphore::const_new(concurrent_processes));
-
- for key in subscriptions.0.into_keys() {
- if subs_to_update.contains(&key) || subs_to_update.is_empty() {
- let new_permits = Arc::clone(&permits);
-
- join_set.spawn(async move {
- let _permit = new_permits
- .acquire()
- .await
- .expect("The semaphore should not be closed");
-
- debug!(
- "Started downloading: `yt 'update-once' '{}' '{}'`",
- &key,
- max_backlog.to_string()
- );
+ let subscriptions = get_subscriptions(&app).await?;
+ let mut back_subs: HashMap<Url, Subscription> = HashMap::new();
- let exe_name = current_exe().context("Failed to get the current executable")?;
- let mut child = Command::new(exe_name)
- // TODO: Add currying of the verbosity flags <2024-07-28>
- // .arg("-vvvv")
- .arg("update-once")
- .arg(&key)
- .arg(max_backlog.to_string())
- .spawn()
- .context("Failed to call yt update-once")?;
-
- let output = child.wait().await;
-
- Ok((output, key))
- });
+ let mut urls: Vec<String> = vec![];
+ for (name, sub) in subscriptions.0 {
+ if subs_to_update.contains(&name) || subs_to_update.is_empty() {
+ urls.push(sub.url.to_string());
+ back_subs.insert(sub.url.clone(), sub);
} else {
info!(
"Not updating subscription '{}' as it was not specified",
- key
+ name
);
}
}
- while let Some(res) = join_set.join_next().await {
- let (output, key) = res??;
+ let mut child = Command::new("./python_update/raw_update.py")
+ .arg(max_backlog.to_string())
+ .args(&urls)
+ .stdout(Stdio::piped())
+ .stderr(Stdio::null())
+ .stdin(Stdio::null())
+ .spawn()
+ .context("Failed to call python3 update_raw")?;
- debug!("{} finished its update.", &key);
- match output {
- std::result::Result::Ok(status) => {
- if !status.success() {
- // This should already be printed as error, so we only print it here as debug.
- debug!("A yt update-once invokation failed for subscription: '{key}'")
- }
- }
- Err(err) => {
- error!("Failed to update subscription ('{key}') because of io error: '{err}'")
- }
- }
- }
+ let mut out = BufReader::new(
+ child
+ .stdout
+ .take()
+ .expect("Should be able to take child stdout"),
+ )
+ .lines();
- Ok(())
-}
+ let hashes = get_all_hashes(app).await?;
-pub async fn update_once(app: &App, sub_name: String, max_backlog: u32) -> Result<()> {
- let subs = get_subscriptions()?;
+ while let Some(line) = out.next_line().await? {
+ // use tokio::{fs::File, io::AsyncWriteExt};
+ // let mut output = File::create("output.json").await?;
+ // output.write(line.as_bytes()).await?;
+ // output.flush().await?;
+ // output.sync_all().await?;
+ // drop(output);
- if let Some(sub) = subs.0.get(&sub_name) {
- if let Err(err) = update_subscription(app, sub, max_backlog).await {
- error!("Failed to update subscription ('{sub_name}'): {err}")
+ let output_json: HashMap<Url, InfoJson> =
+ serde_json::from_str(&line).expect("This should be valid json");
+
+ for (url, value) in output_json {
+ let sub = back_subs.get(&url).expect("This was stored before");
+ process_subscription(app, sub, value, &hashes).await?
}
- } else {
- bail!(
- "Your subscription name '{}' is not a subscription (This is probably a bug)!",
- sub_name
- );
+ }
+
+ let out = child.wait().await?;
+ if out.success() {
+ error!("A yt update-once invokation failed for all subscriptions.")
}
Ok(())
}
-async fn update_subscription(app: &App, sub: &Subscription, max_backlog: u32) -> Result<()> {
- println!("Updating subscription '{}'...", sub.name);
-
- let yt_opts = match json!( {
- "playliststart": 1,
- "playlistend": max_backlog,
- // "noplaylist": false,
- "extractor_args": {
- "youtubetab": {
- "approximate_date": [""]
- }
- }
- }) {
- Value::Object(map) => map,
- _ => unreachable!("This is hardcoded"),
- };
-
+async fn process_subscription(
+ app: &App,
+ sub: &Subscription,
+ entry: InfoJson,
+ hashes: &Vec<blake3::Hash>,
+) -> Result<()> {
macro_rules! unwrap_option {
($option:expr) => {
match $option {
@@ -133,108 +106,92 @@ async fn update_subscription(app: &App, sub: &Subscription, max_backlog: u32) ->
};
}
- let videos = yt_dlp::extract_info(&yt_opts, &sub.url, false, true)
- .await
- .with_context(|| {
- format!(
- "Failed to get info for subscription '{}': '{}'",
- sub.name, sub.url
- )
- })?;
+ let publish_date = if let Some(date) = &entry.upload_date {
+ let year: u32 = date
+ .chars()
+ .take(4)
+ .collect::<String>()
+ .parse()
+ .expect("Should work.");
+ let month: u32 = date
+ .chars()
+ .skip(4)
+ .take(2)
+ .collect::<String>()
+ .parse()
+ .expect("Should work");
+ let day: u32 = date
+ .chars()
+ .skip(6)
+ .take(2)
+ .collect::<String>()
+ .parse()
+ .expect("Should work");
- let hashes = get_video_hashes(app, &sub).await?;
-
- let real_videos = unwrap_option!(videos.entries)
- .iter()
- .map(|entry| -> Result<Option<Video>> {
- let publish_date = if let Some(date) = &entry.upload_date {
- let year: u32 = date
- .chars()
- .take(4)
- .collect::<String>()
- .parse()
- .expect("Should work.");
- let month: u32 = date
- .chars()
- .skip(4)
- .take(2)
- .collect::<String>()
- .parse()
- .expect("Should work");
- let day: u32 = date
- .chars()
- .skip(6)
- .take(2)
- .collect::<String>()
- .parse()
- .expect("Should work");
-
- let date_string = format!("{year:04}-{month:02}-{day:02}T00:00:00Z");
- Some(
- DateTime::<Utc>::from_str(&date_string)
- .expect("This should always work")
- .timestamp(),
- )
- } else {
- warn!(
- "The video '{}' lacks it's upload date!",
- unwrap_option!(&entry.title)
- );
- None
- };
-
- let thumbnail_url = match (&entry.thumbnails, &entry.thumbnail) {
- (None, None) => None,
- (None, Some(thumbnail)) => Some(thumbnail.to_owned()),
+ let date_string = format!("{year:04}-{month:02}-{day:02}T00:00:00Z");
+ Some(
+ DateTime::<Utc>::from_str(&date_string)
+ .expect("This should always work")
+ .timestamp(),
+ )
+ } else {
+ warn!(
+ "The video '{}' lacks it's upload date!",
+ unwrap_option!(&entry.title)
+ );
+ None
+ };
- // TODO: The algorithm is not exactly the best <2024-05-28>
- (Some(thumbnails), None) => Some(
- thumbnails
- .get(0)
- .expect("At least one should exist")
- .url
- .clone(),
- ),
- (Some(_), Some(thumnail)) => Some(thumnail.to_owned()),
- };
+ let thumbnail_url = match (&entry.thumbnails, &entry.thumbnail) {
+ (None, None) => None,
+ (None, Some(thumbnail)) => Some(thumbnail.to_owned()),
- let url = {
- let smug_url: url::Url = unwrap_option!(entry.webpage_url.clone());
- unsmuggle_url(smug_url)?
- };
+ // TODO: The algorithm is not exactly the best <2024-05-28>
+ (Some(thumbnails), None) => Some(
+ thumbnails
+ .get(0)
+ .expect("At least one should exist")
+ .url
+ .clone(),
+ ),
+ (Some(_), Some(thumnail)) => Some(thumnail.to_owned()),
+ };
- let extractor_hash = blake3::hash(url.as_str().as_bytes());
+ let url = {
+ let smug_url: url::Url = unwrap_option!(entry.webpage_url.clone());
+ unsmuggle_url(smug_url)?
+ };
- if hashes.contains(&extractor_hash) {
- // We already stored the video information
- Ok(None)
- } else {
- let video = Video {
- cache_path: None,
- description: entry.description.clone(),
- duration: entry.duration,
- extractor_hash: ExtractorHash::from_hash(extractor_hash),
- last_status_change: Utc::now().timestamp(),
- parent_subscription_name: Some(sub.name.clone()),
- priority: 0,
- publish_date,
- status: VideoStatus::Pick,
- status_change: false,
- thumbnail_url,
- title: unwrap_option!(entry.title.clone()),
- url,
- };
+ let extractor_hash = blake3::hash(url.as_str().as_bytes());
- println!("{}", video.to_color_display());
- Ok(Some(video))
- }
- })
- .collect::<Result<Vec<Option<Video>>>>()?;
+ if hashes.contains(&extractor_hash) {
+ // We already stored the video information
+ println!(
+ "(Ignoring duplicated video from: '{}' -> '{}')",
+ sub.name,
+ unwrap_option!(entry.title)
+ );
+ return Ok(());
+ } else {
+ let video = Video {
+ cache_path: None,
+ description: entry.description.clone(),
+ duration: entry.duration,
+ extractor_hash: ExtractorHash::from_hash(extractor_hash),
+ last_status_change: Utc::now().timestamp(),
+ parent_subscription_name: Some(sub.name.clone()),
+ priority: 0,
+ publish_date,
+ status: VideoStatus::Pick,
+ status_change: false,
+ thumbnail_url,
+ title: unwrap_option!(entry.title.clone()),
+ url,
+ };
- for video in real_videos {
- if let Some(video) = video {
- add_video(app, video).await?;
- }
+ println!("{}", video.to_color_display());
+ add_video(app, video).await?;
}
+
Ok(())
}
diff --git a/pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs b/pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs
index 9b50aa93..46da6eee 100644
--- a/pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs
+++ b/pkgs/by-name/yt/yt/yt_dlp/src/wrapper/info_json.rs
@@ -41,6 +41,7 @@ pub struct InfoJson {
pub chapters: Option<Vec<Chapter>>,
pub comment_count: Option<u32>,
pub comments: Option<Vec<Comment>>,
+ pub concurrent_view_count: Option<u32>,
pub description: Option<String>,
pub display_id: Option<String>,
pub downloader_options: Option<DownloaderOptions>,
@@ -48,6 +49,8 @@ pub struct InfoJson {
pub duration_string: Option<String>,
pub dynamic_range: Option<String>,
pub entries: Option<Vec<InfoJson>>,
+ pub episode: Option<String>,
+ pub episode_number: Option<u32>,
pub epoch: Option<u32>,
pub ext: Option<String>,
pub extractor: Option<Extractor>,
@@ -99,6 +102,9 @@ pub struct InfoJson {
pub requested_formats: Option<Vec<Format>>,
pub requested_subtitles: Option<HashMap<String, Subtitle>>,
pub resolution: Option<String>,
+ pub season: Option<String>,
+ pub season_number: Option<u32>,
+ pub series: Option<String>,
pub source_preference: Option<i32>,
pub sponsorblock_chapters: Option<Vec<SponsorblockChapter>>,
pub stretched_ratio: Option<Todo>,
@@ -283,6 +289,9 @@ pub enum Extractor {
#[serde(alias = "generic")]
Generic,
+ #[serde(alias = "SVTSeries")]
+ SVTSeries,
+
#[serde(alias = "youtube")]
YouTube,
@@ -296,6 +305,9 @@ pub enum ExtractorKey {
#[serde(alias = "Generic")]
Generic,
+ #[serde(alias = "SVTSeries")]
+ SVTSeries,
+
#[serde(alias = "Youtube")]
YouTube,
@@ -425,7 +437,7 @@ pub struct ThumbNail {
#[serde(deny_unknown_fields)]
pub struct Format {
pub __needs_testing: Option<bool>,
- pub __working: Option<Todo>,
+ pub __working: Option<bool>,
pub abr: Option<f64>,
pub acodec: Option<String>,
pub aspect_ratio: Option<f64>,
@@ -450,8 +462,10 @@ pub struct Format {
pub has_drm: Option<bool>,
pub height: Option<u32>,
pub http_headers: Option<HttpHeader>,
+ pub is_dash_periods: Option<bool>,
pub language: Option<String>,
pub language_preference: Option<i32>,
+ pub manifest_stream_number: Option<u32>,
pub manifest_url: Option<Url>,
pub preference: Option<i32>,
pub protocol: Option<String>,
@@ -477,20 +491,20 @@ pub struct DownloaderOptions {
#[serde(deny_unknown_fields)]
pub struct HttpHeader {
#[serde(alias = "User-Agent")]
- pub user_agent: String,
+ pub user_agent: Option<String>,
#[serde(alias = "Accept")]
- pub accept: String,
+ pub accept: Option<String>,
#[serde(alias = "Accept-Language")]
- pub accept_language: String,
+ pub accept_language: Option<String>,
#[serde(alias = "Sec-Fetch-Mode")]
- pub sec_fetch_mode: String,
+ pub sec_fetch_mode: Option<String>,
}
#[derive(Debug, Deserialize, Serialize, PartialEq, PartialOrd)]
#[serde(deny_unknown_fields)]
pub struct Fragment {
- pub url: String,
- pub duration: f64,
+ pub url: Option<Url>,
+ pub duration: Option<f64>,
pub path: Option<PathBuf>,
}