aboutsummaryrefslogtreecommitdiffstats
path: root/crates/turtle/src/atuin_daemon/components/sync.rs
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
commit5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch)
treec64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/turtle/src/atuin_daemon/components/sync.rs
parentchore: Somewhat simplify sync code (diff)
downloadatuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show dead code correctly.
Diffstat (limited to 'crates/turtle/src/atuin_daemon/components/sync.rs')
-rw-r--r--crates/turtle/src/atuin_daemon/components/sync.rs279
1 files changed, 279 insertions, 0 deletions
diff --git a/crates/turtle/src/atuin_daemon/components/sync.rs b/crates/turtle/src/atuin_daemon/components/sync.rs
new file mode 100644
index 00000000..c76fb71b
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/sync.rs
@@ -0,0 +1,279 @@
+//! Sync component.
+//!
+//! Handles periodic synchronization with the Atuin cloud server.
+
+use std::time::Duration;
+
+use eyre::Result;
+use rand::Rng;
+use tokio::sync::mpsc;
+use tokio::time::{self, MissedTickBehavior};
+
+use crate::atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
+
+use crate::atuin_daemon::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+};
+
+/// Commands that can be sent to the sync task.
+enum SyncCommand {
+ /// Trigger an immediate sync.
+ ForceSync,
+ /// Stop the sync loop.
+ Stop,
+}
+
+/// Sync state - tracks whether we're in normal operation or retrying after failure.
+#[derive(Clone, Copy, PartialEq, Eq)]
+enum SyncState {
+ /// Normal operation. Periodic syncs only run if auto_sync is enabled.
+ Idle,
+ /// Retrying after a sync failure. Retries continue regardless of auto_sync
+ /// until the sync succeeds.
+ Retrying,
+}
+
+/// Sync component - handles periodic cloud synchronization.
+///
+/// This component:
+/// - Runs a background sync loop on a configurable interval
+/// - Implements exponential backoff on sync failures
+/// - Responds to ForceSync events for immediate sync
+/// - Emits SyncCompleted/SyncFailed events
+pub struct SyncComponent {
+ task_handle: Option<tokio::task::JoinHandle<()>>,
+ command_tx: Option<mpsc::Sender<SyncCommand>>,
+}
+
+impl SyncComponent {
+ /// Create a new sync component.
+ pub fn new() -> Self {
+ Self {
+ task_handle: None,
+ command_tx: None,
+ }
+ }
+}
+
+impl Default for SyncComponent {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for SyncComponent {
+ fn name(&self) -> &'static str {
+ "sync"
+ }
+
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+ let (cmd_tx, cmd_rx) = mpsc::channel(16);
+ self.command_tx = Some(cmd_tx);
+
+ // Spawn the sync loop with its own copy of the handle
+ self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
+
+ tracing::info!("sync component started");
+ Ok(())
+ }
+
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+ if let DaemonEvent::ForceSync = event {
+ tracing::info!("force sync requested");
+ if let Some(tx) = &self.command_tx {
+ let _ = tx.send(SyncCommand::ForceSync).await;
+ }
+ }
+ Ok(())
+ }
+
+ async fn stop(&mut self) -> Result<()> {
+ if let Some(tx) = &self.command_tx {
+ let _ = tx.send(SyncCommand::Stop).await;
+ }
+ if let Some(handle) = self.task_handle.take() {
+ // Give the task a moment to shut down gracefully
+ let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
+ }
+ tracing::info!("sync component stopped");
+ Ok(())
+ }
+}
+
+/// The main sync loop.
+///
+/// This runs in a spawned task and handles periodic sync as well as
+/// force sync requests.
+async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
+ tracing::info!("sync loop starting");
+
+ // Clone settings since we need them across await points
+ let settings = handle.settings().await.clone();
+ let host_id = match Settings::host_id().await {
+ Ok(id) => id,
+ Err(e) => {
+ tracing::error!("failed to get host id, sync disabled: {e}");
+ return;
+ }
+ };
+
+ // Create the stores we need
+ let encryption_key = *handle.encryption_key();
+ let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
+
+ // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
+ let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
+
+ let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
+
+ // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
+ // we may end up running a lot of syncs in a hot loop.
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+ let mut sync_state = SyncState::Idle;
+
+ loop {
+ tokio::select! {
+ _ = ticker.tick() => {
+ let settings = handle.settings().await;
+
+ // Skip periodic ticks if auto_sync is disabled AND we're not retrying
+ // a previous failure. Retries must continue regardless of auto_sync.
+ if !settings.auto_sync && sync_state == SyncState::Idle {
+ tracing::debug!("auto_sync disabled, skipping periodic sync tick");
+ continue;
+ }
+
+ sync_state = do_sync_tick(
+ &handle,
+ &history_store,
+ &mut ticker,
+ max_interval,
+ &settings,
+ ).await;
+ }
+ cmd = cmd_rx.recv() => {
+ match cmd {
+ Some(SyncCommand::ForceSync) => {
+ tracing::info!("executing force sync");
+ let settings = handle.settings().await;
+ sync_state = do_sync_tick(
+ &handle,
+ &history_store,
+ &mut ticker,
+ max_interval,
+ &settings,
+ ).await;
+ }
+ Some(SyncCommand::Stop) | None => {
+ tracing::info!("sync loop stopping");
+ break;
+ }
+ }
+ }
+ }
+ }
+}
+
+/// Execute a single sync tick.
+///
+/// Returns the new sync state: `Idle` on success, `Retrying` on failure.
+async fn do_sync_tick(
+ handle: &DaemonHandle,
+ history_store: &HistoryStore,
+ ticker: &mut time::Interval,
+ max_interval: f64,
+ settings: &Settings,
+) -> SyncState {
+ tracing::info!("sync tick");
+
+ // Check if logged in
+ let logged_in = match settings.logged_in().await {
+ Ok(v) => v,
+ Err(e) => {
+ tracing::warn!("failed to check login status, skipping sync tick: {e}");
+ return SyncState::Idle;
+ }
+ };
+
+ if !logged_in {
+ tracing::debug!("not logged in, skipping sync tick");
+ return SyncState::Idle;
+ }
+
+ // Perform the sync
+ let res = sync::sync(settings, handle.store(), handle.encryption_key()).await;
+
+ match res {
+ Err(e) => {
+ tracing::error!("sync tick failed with {e}");
+
+ // Emit failure event
+ handle.emit(DaemonEvent::SyncFailed {
+ error: e.to_string(),
+ });
+
+ // Exponential backoff
+ let mut rng = rand::thread_rng();
+ let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
+
+ if new_interval > max_interval {
+ new_interval = max_interval;
+ }
+
+ *ticker = time::interval_at(
+ tokio::time::Instant::now() + Duration::from_secs(new_interval as u64),
+ time::Duration::from_secs(new_interval as u64),
+ );
+ ticker.reset_after(time::Duration::from_secs(new_interval as u64));
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+ tracing::error!("backing off, next sync tick in {new_interval}");
+
+ SyncState::Retrying
+ }
+ Ok((uploaded_count, downloaded_records)) => {
+ tracing::info!(
+ uploaded = uploaded_count,
+ downloaded = downloaded_records.len(),
+ "sync complete"
+ );
+
+ // Build history from downloaded records
+ if let Err(e) = history_store
+ .incremental_build(handle.history_db(), &downloaded_records)
+ .await
+ {
+ tracing::error!("failed to build history from downloaded records: {e}");
+ }
+
+ // Emit the records added event (for search indexing)
+ handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone()));
+
+ // Emit sync completed event
+ handle.emit(DaemonEvent::SyncCompleted {
+ uploaded: uploaded_count as usize,
+ downloaded: downloaded_records.len(),
+ });
+
+ // Reset backoff on success
+ if ticker.period().as_secs() != settings.daemon.sync_frequency {
+ *ticker = time::interval_at(
+ tokio::time::Instant::now()
+ + Duration::from_secs(settings.daemon.sync_frequency),
+ time::Duration::from_secs(settings.daemon.sync_frequency),
+ );
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+ }
+
+ // Store sync time
+ if let Err(e) = Settings::save_sync_time().await {
+ tracing::error!("failed to save sync time: {e}");
+ }
+
+ SyncState::Idle
+ }
+ }
+}