diff options
Diffstat (limited to 'crates/turtle/src/atuin_client')
| -rw-r--r-- | crates/turtle/src/atuin_client/api_client.rs | 116 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_client/meta.rs | 156 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_client/mod.rs | 2 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_client/settings.rs | 151 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_client/sync.rs | 214 |
5 files changed, 19 insertions, 620 deletions
diff --git a/crates/turtle/src/atuin_client/api_client.rs b/crates/turtle/src/atuin_client/api_client.rs index f3f2428a..46995c9a 100644 --- a/crates/turtle/src/atuin_client/api_client.rs +++ b/crates/turtle/src/atuin_client/api_client.rs @@ -16,41 +16,26 @@ use crate::atuin_common::{ }; use crate::atuin_common::{ api::{ - AddHistoryRequest, ChangePasswordRequest, CountResponse, DeleteHistoryRequest, - ErrorResponse, LoginRequest, LoginResponse, MeResponse, RegisterResponse, StatusResponse, - SyncHistoryResponse, + ChangePasswordRequest, ErrorResponse, LoginRequest, LoginResponse, MeResponse, + RegisterResponse, }, record::RecordStatus, }; use semver::Version; -use time::OffsetDateTime; -use time::format_description::well_known::Rfc3339; - -use crate::atuin_client::{history::History, sync::hash_str, utils::get_host_user}; static APP_USER_AGENT: &str = concat!("atuin/", env!("CARGO_PKG_VERSION"),); /// Authentication token for sync API requests. /// -/// The sync API supports two authentication methods: -/// - `Bearer`: Hub API tokens (for users authenticated via Atuin Hub) -/// - `Token`: Legacy CLI session tokens (for users registered via CLI or self-hosted) -/// -/// When both are available, Hub tokens are preferred as they provide unified -/// authentication across CLI and Hub features. +/// Used with `Token <token>` header. #[derive(Debug, Clone)] -pub(crate) enum AuthToken { - /// Legacy CLI session token, used with "Token {token}" header - Token(String), -} +pub(crate) struct AuthToken(pub(crate) String); impl AuthToken { /// Format the token as an Authorization header value fn to_header_value(&self) -> String { - match self { - AuthToken::Token(token) => format!("Token {token}"), - } + format!("Token {}", self.0) } } @@ -62,7 +47,7 @@ pub(crate) struct Client<'a> { fn make_url(address: &str, path: &str) -> Result<String> { // `join()` expects a trailing `/` in order to join paths // e.g. it treats `http://host:port/subdir` as a file called `subdir` - let address = if address.ends_with("/") { + let address = if address.ends_with('/') { address } else { &format!("{address}/") @@ -223,42 +208,6 @@ impl<'a> Client<'a> { }) } - pub(crate) async fn count(&self) -> Result<i64> { - let url = make_url(self.sync_addr, "/sync/count")?; - let url = Url::parse(url.as_str())?; - - let resp = self.client.get(url).send().await?; - let resp = handle_resp_error(resp).await?; - - if !ensure_version(&resp)? { - bail!("could not sync due to version mismatch"); - } - - if resp.status() != StatusCode::OK { - bail!("failed to get count (are you logged in?)"); - } - - let count = resp.json::<CountResponse>().await?; - - Ok(count.count) - } - - pub(crate) async fn status(&self) -> Result<StatusResponse> { - let url = make_url(self.sync_addr, "/sync/status")?; - let url = Url::parse(url.as_str())?; - - let resp = self.client.get(url).send().await?; - let resp = handle_resp_error(resp).await?; - - if !ensure_version(&resp)? { - bail!("could not sync due to version mismatch"); - } - - let status = resp.json::<StatusResponse>().await?; - - Ok(status) - } - pub(crate) async fn me(&self) -> Result<MeResponse> { let url = make_url(self.sync_addr, "/api/v0/me")?; let url = Url::parse(url.as_str())?; @@ -271,59 +220,6 @@ impl<'a> Client<'a> { Ok(status) } - pub(crate) async fn get_history( - &self, - sync_ts: OffsetDateTime, - history_ts: OffsetDateTime, - host: Option<String>, - ) -> Result<SyncHistoryResponse> { - let host = host.unwrap_or_else(|| hash_str(&get_host_user())); - - let url = make_url( - self.sync_addr, - &format!( - "/sync/history?sync_ts={}&history_ts={}&host={}", - urlencoding::encode(sync_ts.format(&Rfc3339)?.as_str()), - urlencoding::encode(history_ts.format(&Rfc3339)?.as_str()), - host, - ), - )?; - - let resp = self.client.get(url).send().await?; - let resp = handle_resp_error(resp).await?; - - let history = resp.json::<SyncHistoryResponse>().await?; - Ok(history) - } - - pub(crate) async fn post_history(&self, history: &[AddHistoryRequest]) -> Result<()> { - let url = make_url(self.sync_addr, "/history")?; - let url = Url::parse(url.as_str())?; - - let resp = self.client.post(url).json(history).send().await?; - handle_resp_error(resp).await?; - - Ok(()) - } - - pub(crate) async fn delete_history(&self, h: History) -> Result<()> { - let url = make_url(self.sync_addr, "/history")?; - let url = Url::parse(url.as_str())?; - - let resp = self - .client - .delete(url) - .json(&DeleteHistoryRequest { - client_id: h.id.to_string(), - }) - .send() - .await?; - - handle_resp_error(resp).await?; - - Ok(()) - } - pub(crate) async fn delete_store(&self) -> Result<()> { let url = make_url(self.sync_addr, "/api/v0/store")?; let url = Url::parse(url.as_str())?; diff --git a/crates/turtle/src/atuin_client/meta.rs b/crates/turtle/src/atuin_client/meta.rs index 502d7421..92902c08 100644 --- a/crates/turtle/src/atuin_client/meta.rs +++ b/crates/turtle/src/atuin_client/meta.rs @@ -7,22 +7,12 @@ use eyre::{Result, eyre}; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqlitePoolOptions}; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use tokio::sync::OnceCell; -use tracing::{debug, warn}; +use tracing::debug; use uuid::Uuid; -// Filenames for the legacy plain-text files that we migrate from. -const LEGACY_HOST_ID_FILENAME: &str = "host_id"; -const LEGACY_LAST_SYNC_FILENAME: &str = "last_sync_time"; -const LEGACY_LAST_VERSION_CHECK_FILENAME: &str = "last_version_check_time"; -const LEGACY_LATEST_VERSION_FILENAME: &str = "latest_version"; -const LEGACY_SESSION_FILENAME: &str = "session"; - const KEY_HOST_ID: &str = "host_id"; const KEY_LAST_SYNC: &str = "last_sync_time"; -const KEY_LAST_VERSION_CHECK: &str = "last_version_check_time"; -const KEY_LATEST_VERSION: &str = "latest_version"; const KEY_SESSION: &str = "session"; -const KEY_FILES_MIGRATED: &str = "files_migrated"; pub(crate) struct MetaStore { pool: SqlitePool, @@ -77,10 +67,6 @@ impl MetaStore { cached_host_id: OnceCell::const_new(), }; - if !is_memory { - store.migrate_files().await?; - } - Ok(store) } @@ -97,8 +83,12 @@ impl MetaStore { pub(crate) async fn set(&self, key: &str, value: &str) -> Result<()> { sqlx::query( - "INSERT INTO meta (key, value, updated_at) VALUES (?1, ?2, strftime('%s', 'now')) - ON CONFLICT(key) DO UPDATE SET value = ?2, updated_at = strftime('%s', 'now')", + " + INSERT INTO meta (key, value, updated_at) + VALUES (?1, ?2, strftime('%s', 'now')) + ON CONFLICT(key) DO UPDATE + SET value = ?2, updated_at = strftime('%s', 'now') + ", ) .bind(key) .bind(value) @@ -153,29 +143,6 @@ impl MetaStore { .await } - pub(crate) async fn last_version_check(&self) -> Result<OffsetDateTime> { - match self.get(KEY_LAST_VERSION_CHECK).await? { - Some(v) => Ok(OffsetDateTime::parse(v.as_str(), &Rfc3339)?), - None => Ok(OffsetDateTime::UNIX_EPOCH), - } - } - - pub(crate) async fn save_version_check_time(&self) -> Result<()> { - self.set( - KEY_LAST_VERSION_CHECK, - OffsetDateTime::now_utc().format(&Rfc3339)?.as_str(), - ) - .await - } - - pub(crate) async fn latest_version(&self) -> Result<Option<String>> { - self.get(KEY_LATEST_VERSION).await - } - - pub(crate) async fn save_latest_version(&self, version: &str) -> Result<()> { - self.set(KEY_LATEST_VERSION, version).await - } - pub(crate) async fn session_token(&self) -> Result<Option<String>> { self.get(KEY_SESSION).await } @@ -191,90 +158,6 @@ impl MetaStore { pub(crate) async fn logged_in(&self) -> Result<bool> { Ok(self.session_token().await?.is_some()) } - - // File migration: on first open, migrate old plain-text files into the database. - // Old files are left in place for safe downgrades. - - async fn migrate_files(&self) -> Result<()> { - if self.get(KEY_FILES_MIGRATED).await?.is_some() { - return Ok(()); - } - - let data_dir = crate::atuin_client::settings::Settings::effective_data_dir(); - - // host_id — validate as UUID - let host_id_path = data_dir.join(LEGACY_HOST_ID_FILENAME); - if host_id_path.exists() - && let Ok(value) = fs_err::read_to_string(&host_id_path) - { - let value = value.trim(); - if !value.is_empty() { - if Uuid::from_str(value).is_ok() { - self.set(KEY_HOST_ID, value).await?; - } else { - warn!("skipping migration of host_id: invalid UUID {value:?}"); - } - } - } - - // last_sync_time — validate as RFC3339 - let sync_path = data_dir.join(LEGACY_LAST_SYNC_FILENAME); - if sync_path.exists() - && let Ok(value) = fs_err::read_to_string(&sync_path) - { - let value = value.trim(); - if !value.is_empty() { - if OffsetDateTime::parse(value, &Rfc3339).is_ok() { - self.set(KEY_LAST_SYNC, value).await?; - } else { - warn!("skipping migration of last_sync_time: invalid RFC3339 {value:?}"); - } - } - } - - // last_version_check_time — validate as RFC3339 - let version_check_path = data_dir.join(LEGACY_LAST_VERSION_CHECK_FILENAME); - if version_check_path.exists() - && let Ok(value) = fs_err::read_to_string(&version_check_path) - { - let value = value.trim(); - if !value.is_empty() { - if OffsetDateTime::parse(value, &Rfc3339).is_ok() { - self.set(KEY_LAST_VERSION_CHECK, value).await?; - } else { - warn!( - "skipping migration of last_version_check_time: invalid RFC3339 {value:?}" - ); - } - } - } - - // latest_version — no strict validation, just non-empty - let latest_version_path = data_dir.join(LEGACY_LATEST_VERSION_FILENAME); - if latest_version_path.exists() - && let Ok(value) = fs_err::read_to_string(&latest_version_path) - { - let value = value.trim(); - if !value.is_empty() { - self.set(KEY_LATEST_VERSION, value).await?; - } - } - - // session token — no strict validation, just non-empty - let session_path = data_dir.join(LEGACY_SESSION_FILENAME); - if session_path.exists() - && let Ok(value) = fs_err::read_to_string(&session_path) - { - let value = value.trim(); - if !value.is_empty() { - self.set(KEY_SESSION, value).await?; - } - } - - self.set(KEY_FILES_MIGRATED, "true").await?; - - Ok(()) - } } #[cfg(test)] @@ -324,18 +207,6 @@ mod tests { } #[tokio::test] - async fn test_version_check_time() { - let store = new_test_store().await; - - let t = store.last_version_check().await.unwrap(); - assert_eq!(t, OffsetDateTime::UNIX_EPOCH); - - store.save_version_check_time().await.unwrap(); - let t = store.last_version_check().await.unwrap(); - assert!(t > OffsetDateTime::UNIX_EPOCH); - } - - #[tokio::test] async fn test_session_crud() { let store = new_test_store().await; @@ -352,17 +223,4 @@ mod tests { store.delete_session().await.unwrap(); assert!(!store.logged_in().await.unwrap()); } - - #[tokio::test] - async fn test_latest_version() { - let store = new_test_store().await; - - assert_eq!(store.latest_version().await.unwrap(), None); - - store.save_latest_version("1.2.3").await.unwrap(); - assert_eq!( - store.latest_version().await.unwrap(), - Some("1.2.3".to_string()) - ); - } } diff --git a/crates/turtle/src/atuin_client/mod.rs b/crates/turtle/src/atuin_client/mod.rs index 0ac294cf..ff376c0c 100644 --- a/crates/turtle/src/atuin_client/mod.rs +++ b/crates/turtle/src/atuin_client/mod.rs @@ -6,8 +6,6 @@ pub(crate) mod auth; pub(crate) mod login; #[cfg(feature = "sync")] pub(crate) mod register; -#[cfg(feature = "sync")] -pub(crate) mod sync; pub(crate) mod database; pub(crate) mod distro; diff --git a/crates/turtle/src/atuin_client/settings.rs b/crates/turtle/src/atuin_client/settings.rs index 046aad1a..5ee7cb77 100644 --- a/crates/turtle/src/atuin_client/settings.rs +++ b/crates/turtle/src/atuin_client/settings.rs @@ -15,8 +15,6 @@ use serde::{Deserialize, Serialize}; use serde_with::DeserializeFromStr; use time::{OffsetDateTime, UtcOffset, format_description::FormatItem, macros::format_description}; -pub(crate) const HISTORY_PAGE_SIZE: i64 = 100; - static DATA_DIR: OnceLock<PathBuf> = OnceLock::new(); static META_CONFIG: OnceLock<(String, f64)> = OnceLock::new(); static META_STORE: OnceCell<crate::atuin_client::meta::MetaStore> = OnceCell::const_new(); @@ -24,9 +22,6 @@ static META_STORE: OnceCell<crate::atuin_client::meta::MetaStore> = OnceCell::co pub(crate) mod meta; pub(crate) mod watcher; -/// Default sync address for Atuin's hosted service -pub(crate) const DEFAULT_SYNC_ADDRESS: &str = "https://api.atuin.sh"; - #[derive(Clone, Debug, Deserialize, Copy, ValueEnum, PartialEq, Serialize)] pub(crate) enum SearchMode { #[serde(rename = "prefix")] @@ -335,23 +330,6 @@ impl Default for Stats { } } -/// Sync protocol type for authentication. -/// -/// This setting is primarily for development/testing. When not explicitly set, -/// the protocol is inferred from the sync_address: -/// - Default sync address (api.atuin.sh) → Hub protocol -/// - Custom sync address → Legacy protocol -/// -/// Set explicitly to "hub" to use Hub authentication with a custom sync_address -/// (useful for local development against a Hub instance). -#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize, Default)] -#[serde(rename_all = "lowercase")] -pub(crate) enum SyncProtocol { - /// Use legacy CLI authentication (Token from CLI register/login) - #[default] - Legacy, -} - /// Resolved authentication state for sync operations. /// /// Determined at runtime by examining which tokens are available and what @@ -376,13 +354,14 @@ impl SyncAuth { pub(crate) fn into_auth_token(self) -> Result<crate::atuin_client::api_client::AuthToken> { use crate::atuin_client::api_client::AuthToken; match self { - SyncAuth::Legacy { token } => Ok(AuthToken::Token(token)), + SyncAuth::Legacy { token } => Ok(AuthToken(token)), SyncAuth::NotLoggedIn { reason } => Err(eyre!(reason)), } } } #[derive(Clone, Debug, Deserialize, Default, Serialize)] +#[expect(clippy::struct_excessive_bools)] pub(crate) struct Keys { pub(crate) scroll_exits: bool, pub(crate) exit_past_line_start: bool, @@ -527,18 +506,6 @@ pub(crate) struct Search { pub(crate) frecency_score_multiplier: f64, } -#[derive(Clone, Debug, Deserialize, Serialize)] -pub(crate) struct Tmux { - /// Enable using atuin with tmux popup (tmux >= 3.2) - pub(crate) enabled: bool, - - /// Width of the tmux popup (percentage) - pub(crate) width: String, - - /// Height of the tmux popup (percentage) - pub(crate) height: String, -} - /// Log level for file logging. Maps to tracing's LevelFilter. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] @@ -605,63 +572,6 @@ pub(crate) struct Logs { /// Daemon log settings #[serde(default)] pub(crate) daemon: LogConfig, - - /// AI log settings - #[serde(default)] - pub(crate) ai: LogConfig, -} - -#[derive(Default, Clone, Debug, Deserialize, Serialize)] -pub(crate) struct Ai { - /// Whether or not the AI features are enabled. - pub(crate) enabled: Option<bool>, - - /// The address of the Atuin AI endpoint. Used for AI features like command generation. - /// Only necessary for custom AI endpoints. - pub(crate) endpoint: Option<String>, - - /// The API token for the Atuin AI endpoint. Used for AI features like command generation. - /// Only necessary for custom AI endpoints. - pub(crate) api_token: Option<String>, - - /// Path to the AI sessions database. - pub(crate) db_path: String, - - /// The maximum time in minutes that an AI session can be automatically resumed. - pub(crate) session_continue_minutes: i64, - - /// Deprecated: use opening.send_cwd instead. Kept for backwards compatibility. - #[serde(default)] - pub(crate) send_cwd: Option<bool>, - - /// Configuration for what context is sent in the opening AI request. - #[serde(default)] - pub(crate) opening: AiOpening, - - /// Tool capability flags. - #[serde(default)] - pub(crate) capabilities: AiCapabilities, -} - -#[derive(Default, Clone, Debug, Deserialize, Serialize)] -pub(crate) struct AiCapabilities { - /// Whether the AI can request to search Atuin history. `None` = unset (defaults to enabled, and the ai will ask for permission). - pub(crate) enable_history_search: Option<bool>, - /// Whether the AI can request to view the stored output, if any, for Atuin history entries. `None` = unset (defaults to enabled, and the ai will ask for permission). - pub(crate) enable_history_output: Option<bool>, - /// Whether the AI can request to read and write files. `None` = unset (defaults to enabled, and the ai will ask for permission). - pub(crate) enable_file_tools: Option<bool>, - /// Whether the AI can request to execute bash commands. `None` = unset (defaults to enabled, and the ai will ask for permission). - pub(crate) enable_command_execution: Option<bool>, -} - -#[derive(Default, Clone, Debug, Deserialize, Serialize)] -pub(crate) struct AiOpening { - /// Whether or not to send the current working directory to the AI endpoint. - pub(crate) send_cwd: Option<bool>, - - /// Whether or not to send the last command as context in the opening AI request. - pub(crate) send_last_command: Option<bool>, } impl Default for Preview { @@ -711,10 +621,6 @@ impl Default for Logs { file: "daemon.log".to_string(), ..Default::default() }, - ai: LogConfig { - file: "ai.log".to_string(), - ..Default::default() - }, } } } @@ -740,12 +646,6 @@ impl Logs { self.daemon.enabled.unwrap_or(self.enabled) } - /// Returns whether AI logging is enabled. - /// Uses AI-specific setting if set, otherwise falls back to global. - pub(crate) fn ai_enabled(&self) -> bool { - self.ai.enabled.unwrap_or(self.enabled) - } - /// Returns the log level for search logging. /// Uses search-specific setting if set, otherwise falls back to global. pub(crate) fn search_level(&self) -> LogLevel { @@ -758,12 +658,6 @@ impl Logs { self.daemon.level.unwrap_or(self.level) } - /// Returns the log level for AI logging. - /// Uses AI-specific setting if set, otherwise falls back to global. - pub(crate) fn ai_level(&self) -> LogLevel { - self.ai.level.unwrap_or(self.level) - } - /// Returns the retention days for search logging. /// Uses search-specific setting if set, otherwise falls back to global. pub(crate) fn search_retention(&self) -> u64 { @@ -776,12 +670,6 @@ impl Logs { self.daemon.retention.unwrap_or(self.retention) } - /// Returns the retention days for AI logging. - /// Uses AI-specific setting if set, otherwise falls back to global. - pub(crate) fn ai_retention(&self) -> u64 { - self.ai.retention.unwrap_or(self.retention) - } - /// Returns the full path for the search log file. pub(crate) fn search_path(&self) -> PathBuf { let path = PathBuf::from(&self.search.file); @@ -793,12 +681,6 @@ impl Logs { let path = PathBuf::from(&self.daemon.file); PathBuf::from(&self.dir).join(path) } - - /// Returns the full path for the AI log file. - pub(crate) fn ai_path(&self) -> PathBuf { - let path = PathBuf::from(&self.ai.file); - PathBuf::from(&self.dir).join(path) - } } impl Default for Search { @@ -820,16 +702,6 @@ impl Default for Search { } } -impl Default for Tmux { - fn default() -> Self { - Self { - enabled: false, - width: "80%".to_string(), - height: "60%".to_string(), - } - } -} - // The preview height strategy also takes max_preview_height into account. #[derive(Clone, Debug, Deserialize, Copy, PartialEq, Eq, ValueEnum, Serialize)] pub(crate) enum PreviewStrategy { @@ -1031,6 +903,7 @@ impl Default for Ui { } #[derive(Clone, Debug, Deserialize, Serialize)] +#[expect(clippy::struct_excessive_bools)] pub(crate) struct Settings { pub(crate) data_dir: Option<String>, pub(crate) dialect: Dialect, @@ -1041,9 +914,6 @@ pub(crate) struct Settings { /// The sync address for atuin. pub(crate) sync_address: String, - #[serde(default)] - pub(crate) sync_protocol: SyncProtocol, - pub(crate) sync_frequency: String, pub(crate) db_path: String, pub(crate) record_store_path: String, @@ -1117,9 +987,6 @@ pub(crate) struct Settings { pub(crate) ui: Ui, #[serde(default)] - pub(crate) tmux: Tmux, - - #[serde(default)] pub(crate) logs: Logs, #[serde(default)] @@ -1170,14 +1037,6 @@ impl Settings { Self::meta_store().await?.save_sync_time().await } - pub(crate) async fn last_version_check() -> Result<OffsetDateTime> { - Self::meta_store().await?.last_version_check().await - } - - pub(crate) async fn save_version_check_time() -> Result<()> { - Self::meta_store().await?.save_version_check_time().await - } - pub(crate) async fn should_sync(&self) -> Result<bool> { if !self.auto_sync || !Self::meta_store().await?.logged_in().await? { return Ok(false); @@ -1238,7 +1097,9 @@ impl Settings { /// `AuthToken`. Callers that need to distinguish between auth states /// (e.g. to show different UI) should call `resolve_sync_auth` directly. #[cfg(feature = "sync")] - pub(crate) async fn sync_auth_token(&self) -> Result<crate::atuin_client::api_client::AuthToken> { + pub(crate) async fn sync_auth_token( + &self, + ) -> Result<crate::atuin_client::api_client::AuthToken> { self.resolve_sync_auth().await.into_auth_token() } diff --git a/crates/turtle/src/atuin_client/sync.rs b/crates/turtle/src/atuin_client/sync.rs deleted file mode 100644 index 46efdab9..00000000 --- a/crates/turtle/src/atuin_client/sync.rs +++ /dev/null @@ -1,214 +0,0 @@ -use std::collections::HashSet; -use std::iter::FromIterator; - -use eyre::Result; -use tracing::{debug, info}; - -use crate::atuin_common::api::AddHistoryRequest; -use crypto_secretbox::Key; -use time::OffsetDateTime; - -use crate::atuin_client::{ - api_client, - database::Database, - encryption::{decrypt, encrypt, load_key}, - settings::Settings, -}; - -pub(crate) fn hash_str(string: &str) -> String { - use sha2::{Digest, Sha256}; - let mut hasher = Sha256::new(); - hasher.update(string.as_bytes()); - hex::encode(hasher.finalize()) -} - -// Currently sync is kinda naive, and basically just pages backwards through -// history. This means newly added stuff shows up properly! We also just use -// the total count in each database to indicate whether a sync is needed. -// I think this could be massively improved! If we had a way of easily -// indicating count per time period (hour, day, week, year, etc) then we can -// easily pinpoint where we are missing data and what needs downloading. Start -// with year, then find the week, then the day, then the hour, then download it -// all! The current naive approach will do for now. - -// Check if remote has things we don't, and if so, download them. -// Returns (num downloaded, total local) -async fn sync_download( - key: &Key, - force: bool, - client: &api_client::Client<'_>, - db: &impl Database, -) -> Result<(i64, i64)> { - debug!("starting sync download"); - - let remote_status = client.status().await?; - let remote_count = remote_status.count; - - // useful to ensure we don't even save something that hasn't yet been synced + deleted - let remote_deleted = - HashSet::<&str>::from_iter(remote_status.deleted.iter().map(String::as_str)); - - let initial_local = db.history_count(true).await?; - let mut local_count = initial_local; - - let mut last_sync = if force { - OffsetDateTime::UNIX_EPOCH - } else { - Settings::last_sync().await? - }; - - let mut last_timestamp = OffsetDateTime::UNIX_EPOCH; - - let host = if force { Some(String::from("")) } else { None }; - - while remote_count > local_count { - let page = client - .get_history(last_sync, last_timestamp, host.clone()) - .await?; - - let history: Vec<_> = page - .history - .iter() - // TODO: handle deletion earlier in this chain - .map(|h| serde_json::from_str(h).expect("invalid base64")) - .map(|h| decrypt(h, key).expect("failed to decrypt history! check your key")) - .map(|mut h| { - if remote_deleted.contains(h.id.0.as_str()) { - h.deleted_at = Some(time::OffsetDateTime::now_utc()); - h.command = String::from(""); - } - - h - }) - .collect(); - - db.save_bulk(&history).await?; - - local_count = db.history_count(true).await?; - let remote_page_size = std::cmp::max(remote_status.page_size, 0) as usize; - - if history.len() < remote_page_size { - break; - } - - let page_last = history - .last() - .expect("could not get last element of page") - .timestamp; - - // in the case of a small sync frequency, it's possible for history to - // be "lost" between syncs. In this case we need to rewind the sync - // timestamps - if page_last == last_timestamp { - last_timestamp = OffsetDateTime::UNIX_EPOCH; - last_sync -= time::Duration::hours(1); - } else { - last_timestamp = page_last; - } - } - - for i in remote_status.deleted { - // we will update the stored history to have this data - // pretty much everything can be nullified - match db.load(i.as_str()).await? { - Some(h) => { - db.delete(h).await?; - } - _ => { - info!( - "could not delete history with id {}, not found locally", - i.as_str() - ); - } - } - } - - Ok((local_count - initial_local, local_count)) -} - -// Check if we have things remote doesn't, and if so, upload them -async fn sync_upload( - key: &Key, - _force: bool, - client: &api_client::Client<'_>, - db: &impl Database, -) -> Result<()> { - debug!("starting sync upload"); - - let remote_status = client.status().await?; - let remote_deleted: HashSet<String> = HashSet::from_iter(remote_status.deleted.clone()); - - let initial_remote_count = client.count().await?; - let mut remote_count = initial_remote_count; - - let local_count = db.history_count(true).await?; - - debug!("remote has {remote_count}, we have {local_count}"); - - // first just try the most recent set - let mut cursor = OffsetDateTime::now_utc(); - - while local_count > remote_count { - let last = db.before(cursor, remote_status.page_size).await?; - let mut buffer = Vec::new(); - - if last.is_empty() { - break; - } - - for i in last { - let data = encrypt(&i, key)?; - let data = serde_json::to_string(&data)?; - - let add_hist = AddHistoryRequest { - id: i.id.to_string(), - timestamp: i.timestamp, - data, - hostname: hash_str(&i.hostname), - }; - - buffer.push(add_hist); - } - - // anything left over outside of the 100 block size - client.post_history(&buffer).await?; - cursor = buffer.last().unwrap().timestamp; - remote_count = client.count().await?; - - debug!("upload cursor: {cursor:?}"); - } - - let deleted = db.deleted().await?; - - for i in deleted { - if remote_deleted.contains(&i.id.to_string()) { - continue; - } - - info!("deleting {} on remote", i.id); - client.delete_history(i).await?; - } - - Ok(()) -} - -pub(crate) async fn sync(settings: &Settings, force: bool, db: &impl Database) -> Result<()> { - let client = api_client::Client::new( - &settings.sync_address, - settings.sync_auth_token().await?, - settings.network_connect_timeout, - settings.network_timeout, - )?; - - Settings::save_sync_time().await?; - - let key = load_key(settings)?; // encryption key - - sync_upload(&key, force, &client, db).await?; - - let download = sync_download(&key, force, &client, db).await?; - - debug!("sync downloaded {}", download.0); - - Ok(()) -} |
