diff options
| author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-06-11 16:10:29 +0200 |
|---|---|---|
| committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-06-11 16:10:29 +0200 |
| commit | 97f207b771b94c5285faae4810d6eeda1b78926b (patch) | |
| tree | 4482544233c30e0e9a62be6afcfe92c8e01b0a50 /crates/turtle/src/atuin_client/sync.rs | |
| parent | chore: Remove all `pub`s (diff) | |
| download | atuin-97f207b771b94c5285faae4810d6eeda1b78926b.zip | |
chore(server): Simplify the database support
Diffstat (limited to 'crates/turtle/src/atuin_client/sync.rs')
| -rw-r--r-- | crates/turtle/src/atuin_client/sync.rs | 214 |
1 files changed, 0 insertions, 214 deletions
diff --git a/crates/turtle/src/atuin_client/sync.rs b/crates/turtle/src/atuin_client/sync.rs deleted file mode 100644 index 46efdab9..00000000 --- a/crates/turtle/src/atuin_client/sync.rs +++ /dev/null @@ -1,214 +0,0 @@ -use std::collections::HashSet; -use std::iter::FromIterator; - -use eyre::Result; -use tracing::{debug, info}; - -use crate::atuin_common::api::AddHistoryRequest; -use crypto_secretbox::Key; -use time::OffsetDateTime; - -use crate::atuin_client::{ - api_client, - database::Database, - encryption::{decrypt, encrypt, load_key}, - settings::Settings, -}; - -pub(crate) fn hash_str(string: &str) -> String { - use sha2::{Digest, Sha256}; - let mut hasher = Sha256::new(); - hasher.update(string.as_bytes()); - hex::encode(hasher.finalize()) -} - -// Currently sync is kinda naive, and basically just pages backwards through -// history. This means newly added stuff shows up properly! We also just use -// the total count in each database to indicate whether a sync is needed. -// I think this could be massively improved! If we had a way of easily -// indicating count per time period (hour, day, week, year, etc) then we can -// easily pinpoint where we are missing data and what needs downloading. Start -// with year, then find the week, then the day, then the hour, then download it -// all! The current naive approach will do for now. - -// Check if remote has things we don't, and if so, download them. -// Returns (num downloaded, total local) -async fn sync_download( - key: &Key, - force: bool, - client: &api_client::Client<'_>, - db: &impl Database, -) -> Result<(i64, i64)> { - debug!("starting sync download"); - - let remote_status = client.status().await?; - let remote_count = remote_status.count; - - // useful to ensure we don't even save something that hasn't yet been synced + deleted - let remote_deleted = - HashSet::<&str>::from_iter(remote_status.deleted.iter().map(String::as_str)); - - let initial_local = db.history_count(true).await?; - let mut local_count = initial_local; - - let mut last_sync = if force { - OffsetDateTime::UNIX_EPOCH - } else { - Settings::last_sync().await? - }; - - let mut last_timestamp = OffsetDateTime::UNIX_EPOCH; - - let host = if force { Some(String::from("")) } else { None }; - - while remote_count > local_count { - let page = client - .get_history(last_sync, last_timestamp, host.clone()) - .await?; - - let history: Vec<_> = page - .history - .iter() - // TODO: handle deletion earlier in this chain - .map(|h| serde_json::from_str(h).expect("invalid base64")) - .map(|h| decrypt(h, key).expect("failed to decrypt history! check your key")) - .map(|mut h| { - if remote_deleted.contains(h.id.0.as_str()) { - h.deleted_at = Some(time::OffsetDateTime::now_utc()); - h.command = String::from(""); - } - - h - }) - .collect(); - - db.save_bulk(&history).await?; - - local_count = db.history_count(true).await?; - let remote_page_size = std::cmp::max(remote_status.page_size, 0) as usize; - - if history.len() < remote_page_size { - break; - } - - let page_last = history - .last() - .expect("could not get last element of page") - .timestamp; - - // in the case of a small sync frequency, it's possible for history to - // be "lost" between syncs. In this case we need to rewind the sync - // timestamps - if page_last == last_timestamp { - last_timestamp = OffsetDateTime::UNIX_EPOCH; - last_sync -= time::Duration::hours(1); - } else { - last_timestamp = page_last; - } - } - - for i in remote_status.deleted { - // we will update the stored history to have this data - // pretty much everything can be nullified - match db.load(i.as_str()).await? { - Some(h) => { - db.delete(h).await?; - } - _ => { - info!( - "could not delete history with id {}, not found locally", - i.as_str() - ); - } - } - } - - Ok((local_count - initial_local, local_count)) -} - -// Check if we have things remote doesn't, and if so, upload them -async fn sync_upload( - key: &Key, - _force: bool, - client: &api_client::Client<'_>, - db: &impl Database, -) -> Result<()> { - debug!("starting sync upload"); - - let remote_status = client.status().await?; - let remote_deleted: HashSet<String> = HashSet::from_iter(remote_status.deleted.clone()); - - let initial_remote_count = client.count().await?; - let mut remote_count = initial_remote_count; - - let local_count = db.history_count(true).await?; - - debug!("remote has {remote_count}, we have {local_count}"); - - // first just try the most recent set - let mut cursor = OffsetDateTime::now_utc(); - - while local_count > remote_count { - let last = db.before(cursor, remote_status.page_size).await?; - let mut buffer = Vec::new(); - - if last.is_empty() { - break; - } - - for i in last { - let data = encrypt(&i, key)?; - let data = serde_json::to_string(&data)?; - - let add_hist = AddHistoryRequest { - id: i.id.to_string(), - timestamp: i.timestamp, - data, - hostname: hash_str(&i.hostname), - }; - - buffer.push(add_hist); - } - - // anything left over outside of the 100 block size - client.post_history(&buffer).await?; - cursor = buffer.last().unwrap().timestamp; - remote_count = client.count().await?; - - debug!("upload cursor: {cursor:?}"); - } - - let deleted = db.deleted().await?; - - for i in deleted { - if remote_deleted.contains(&i.id.to_string()) { - continue; - } - - info!("deleting {} on remote", i.id); - client.delete_history(i).await?; - } - - Ok(()) -} - -pub(crate) async fn sync(settings: &Settings, force: bool, db: &impl Database) -> Result<()> { - let client = api_client::Client::new( - &settings.sync_address, - settings.sync_auth_token().await?, - settings.network_connect_timeout, - settings.network_timeout, - )?; - - Settings::save_sync_time().await?; - - let key = load_key(settings)?; // encryption key - - sync_upload(&key, force, &client, db).await?; - - let download = sync_download(&key, force, &client, db).await?; - - debug!("sync downloaded {}", download.0); - - Ok(()) -} |
