aboutsummaryrefslogtreecommitdiffstats
path: root/crates/turtle/src/atuin_client/sync.rs
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
commit5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch)
treec64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/turtle/src/atuin_client/sync.rs
parentchore: Somewhat simplify sync code (diff)
downloadatuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show dead code correctly.
Diffstat (limited to 'crates/turtle/src/atuin_client/sync.rs')
-rw-r--r--crates/turtle/src/atuin_client/sync.rs214
1 files changed, 214 insertions, 0 deletions
diff --git a/crates/turtle/src/atuin_client/sync.rs b/crates/turtle/src/atuin_client/sync.rs
new file mode 100644
index 00000000..361b6636
--- /dev/null
+++ b/crates/turtle/src/atuin_client/sync.rs
@@ -0,0 +1,214 @@
+use std::collections::HashSet;
+use std::iter::FromIterator;
+
+use eyre::Result;
+use tracing::{debug, info};
+
+use crate::atuin_common::api::AddHistoryRequest;
+use crypto_secretbox::Key;
+use time::OffsetDateTime;
+
+use crate::atuin_client::{
+ api_client,
+ database::Database,
+ encryption::{decrypt, encrypt, load_key},
+ settings::Settings,
+};
+
+pub fn hash_str(string: &str) -> String {
+ use sha2::{Digest, Sha256};
+ let mut hasher = Sha256::new();
+ hasher.update(string.as_bytes());
+ hex::encode(hasher.finalize())
+}
+
+// Currently sync is kinda naive, and basically just pages backwards through
+// history. This means newly added stuff shows up properly! We also just use
+// the total count in each database to indicate whether a sync is needed.
+// I think this could be massively improved! If we had a way of easily
+// indicating count per time period (hour, day, week, year, etc) then we can
+// easily pinpoint where we are missing data and what needs downloading. Start
+// with year, then find the week, then the day, then the hour, then download it
+// all! The current naive approach will do for now.
+
+// Check if remote has things we don't, and if so, download them.
+// Returns (num downloaded, total local)
+async fn sync_download(
+ key: &Key,
+ force: bool,
+ client: &api_client::Client<'_>,
+ db: &impl Database,
+) -> Result<(i64, i64)> {
+ debug!("starting sync download");
+
+ let remote_status = client.status().await?;
+ let remote_count = remote_status.count;
+
+ // useful to ensure we don't even save something that hasn't yet been synced + deleted
+ let remote_deleted =
+ HashSet::<&str>::from_iter(remote_status.deleted.iter().map(String::as_str));
+
+ let initial_local = db.history_count(true).await?;
+ let mut local_count = initial_local;
+
+ let mut last_sync = if force {
+ OffsetDateTime::UNIX_EPOCH
+ } else {
+ Settings::last_sync().await?
+ };
+
+ let mut last_timestamp = OffsetDateTime::UNIX_EPOCH;
+
+ let host = if force { Some(String::from("")) } else { None };
+
+ while remote_count > local_count {
+ let page = client
+ .get_history(last_sync, last_timestamp, host.clone())
+ .await?;
+
+ let history: Vec<_> = page
+ .history
+ .iter()
+ // TODO: handle deletion earlier in this chain
+ .map(|h| serde_json::from_str(h).expect("invalid base64"))
+ .map(|h| decrypt(h, key).expect("failed to decrypt history! check your key"))
+ .map(|mut h| {
+ if remote_deleted.contains(h.id.0.as_str()) {
+ h.deleted_at = Some(time::OffsetDateTime::now_utc());
+ h.command = String::from("");
+ }
+
+ h
+ })
+ .collect();
+
+ db.save_bulk(&history).await?;
+
+ local_count = db.history_count(true).await?;
+ let remote_page_size = std::cmp::max(remote_status.page_size, 0) as usize;
+
+ if history.len() < remote_page_size {
+ break;
+ }
+
+ let page_last = history
+ .last()
+ .expect("could not get last element of page")
+ .timestamp;
+
+ // in the case of a small sync frequency, it's possible for history to
+ // be "lost" between syncs. In this case we need to rewind the sync
+ // timestamps
+ if page_last == last_timestamp {
+ last_timestamp = OffsetDateTime::UNIX_EPOCH;
+ last_sync -= time::Duration::hours(1);
+ } else {
+ last_timestamp = page_last;
+ }
+ }
+
+ for i in remote_status.deleted {
+ // we will update the stored history to have this data
+ // pretty much everything can be nullified
+ match db.load(i.as_str()).await? {
+ Some(h) => {
+ db.delete(h).await?;
+ }
+ _ => {
+ info!(
+ "could not delete history with id {}, not found locally",
+ i.as_str()
+ );
+ }
+ }
+ }
+
+ Ok((local_count - initial_local, local_count))
+}
+
+// Check if we have things remote doesn't, and if so, upload them
+async fn sync_upload(
+ key: &Key,
+ _force: bool,
+ client: &api_client::Client<'_>,
+ db: &impl Database,
+) -> Result<()> {
+ debug!("starting sync upload");
+
+ let remote_status = client.status().await?;
+ let remote_deleted: HashSet<String> = HashSet::from_iter(remote_status.deleted.clone());
+
+ let initial_remote_count = client.count().await?;
+ let mut remote_count = initial_remote_count;
+
+ let local_count = db.history_count(true).await?;
+
+ debug!("remote has {remote_count}, we have {local_count}");
+
+ // first just try the most recent set
+ let mut cursor = OffsetDateTime::now_utc();
+
+ while local_count > remote_count {
+ let last = db.before(cursor, remote_status.page_size).await?;
+ let mut buffer = Vec::new();
+
+ if last.is_empty() {
+ break;
+ }
+
+ for i in last {
+ let data = encrypt(&i, key)?;
+ let data = serde_json::to_string(&data)?;
+
+ let add_hist = AddHistoryRequest {
+ id: i.id.to_string(),
+ timestamp: i.timestamp,
+ data,
+ hostname: hash_str(&i.hostname),
+ };
+
+ buffer.push(add_hist);
+ }
+
+ // anything left over outside of the 100 block size
+ client.post_history(&buffer).await?;
+ cursor = buffer.last().unwrap().timestamp;
+ remote_count = client.count().await?;
+
+ debug!("upload cursor: {cursor:?}");
+ }
+
+ let deleted = db.deleted().await?;
+
+ for i in deleted {
+ if remote_deleted.contains(&i.id.to_string()) {
+ continue;
+ }
+
+ info!("deleting {} on remote", i.id);
+ client.delete_history(i).await?;
+ }
+
+ Ok(())
+}
+
+pub async fn sync(settings: &Settings, force: bool, db: &impl Database) -> Result<()> {
+ let client = api_client::Client::new(
+ &settings.sync_address,
+ settings.sync_auth_token().await?,
+ settings.network_connect_timeout,
+ settings.network_timeout,
+ )?;
+
+ Settings::save_sync_time().await?;
+
+ let key = load_key(settings)?; // encryption key
+
+ sync_upload(&key, force, &client, db).await?;
+
+ let download = sync_download(&key, force, &client, db).await?;
+
+ debug!("sync downloaded {}", download.0);
+
+ Ok(())
+}