diff options
Diffstat (limited to 'crates/turtle/src/atuin_daemon/components/sync.rs')
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/sync.rs | 279 |
1 files changed, 279 insertions, 0 deletions
diff --git a/crates/turtle/src/atuin_daemon/components/sync.rs b/crates/turtle/src/atuin_daemon/components/sync.rs new file mode 100644 index 00000000..c76fb71b --- /dev/null +++ b/crates/turtle/src/atuin_daemon/components/sync.rs @@ -0,0 +1,279 @@ +//! Sync component. +//! +//! Handles periodic synchronization with the Atuin cloud server. + +use std::time::Duration; + +use eyre::Result; +use rand::Rng; +use tokio::sync::mpsc; +use tokio::time::{self, MissedTickBehavior}; + +use crate::atuin_client::{history::store::HistoryStore, record::sync, settings::Settings}; + +use crate::atuin_daemon::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, +}; + +/// Commands that can be sent to the sync task. +enum SyncCommand { + /// Trigger an immediate sync. + ForceSync, + /// Stop the sync loop. + Stop, +} + +/// Sync state - tracks whether we're in normal operation or retrying after failure. +#[derive(Clone, Copy, PartialEq, Eq)] +enum SyncState { + /// Normal operation. Periodic syncs only run if auto_sync is enabled. + Idle, + /// Retrying after a sync failure. Retries continue regardless of auto_sync + /// until the sync succeeds. + Retrying, +} + +/// Sync component - handles periodic cloud synchronization. +/// +/// This component: +/// - Runs a background sync loop on a configurable interval +/// - Implements exponential backoff on sync failures +/// - Responds to ForceSync events for immediate sync +/// - Emits SyncCompleted/SyncFailed events +pub struct SyncComponent { + task_handle: Option<tokio::task::JoinHandle<()>>, + command_tx: Option<mpsc::Sender<SyncCommand>>, +} + +impl SyncComponent { + /// Create a new sync component. + pub fn new() -> Self { + Self { + task_handle: None, + command_tx: None, + } + } +} + +impl Default for SyncComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SyncComponent { + fn name(&self) -> &'static str { + "sync" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + let (cmd_tx, cmd_rx) = mpsc::channel(16); + self.command_tx = Some(cmd_tx); + + // Spawn the sync loop with its own copy of the handle + self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx))); + + tracing::info!("sync component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + if let DaemonEvent::ForceSync = event { + tracing::info!("force sync requested"); + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::ForceSync).await; + } + } + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::Stop).await; + } + if let Some(handle) = self.task_handle.take() { + // Give the task a moment to shut down gracefully + let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; + } + tracing::info!("sync component stopped"); + Ok(()) + } +} + +/// The main sync loop. +/// +/// This runs in a spawned task and handles periodic sync as well as +/// force sync requests. +async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) { + tracing::info!("sync loop starting"); + + // Clone settings since we need them across await points + let settings = handle.settings().await.clone(); + let host_id = match Settings::host_id().await { + Ok(id) => id, + Err(e) => { + tracing::error!("failed to get host id, sync disabled: {e}"); + return; + } + }; + + // Create the stores we need + let encryption_key = *handle.encryption_key(); + let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key); + + // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) + let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); + + let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + + // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, + // we may end up running a lot of syncs in a hot loop. + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut sync_state = SyncState::Idle; + + loop { + tokio::select! { + _ = ticker.tick() => { + let settings = handle.settings().await; + + // Skip periodic ticks if auto_sync is disabled AND we're not retrying + // a previous failure. Retries must continue regardless of auto_sync. + if !settings.auto_sync && sync_state == SyncState::Idle { + tracing::debug!("auto_sync disabled, skipping periodic sync tick"); + continue; + } + + sync_state = do_sync_tick( + &handle, + &history_store, + &mut ticker, + max_interval, + &settings, + ).await; + } + cmd = cmd_rx.recv() => { + match cmd { + Some(SyncCommand::ForceSync) => { + tracing::info!("executing force sync"); + let settings = handle.settings().await; + sync_state = do_sync_tick( + &handle, + &history_store, + &mut ticker, + max_interval, + &settings, + ).await; + } + Some(SyncCommand::Stop) | None => { + tracing::info!("sync loop stopping"); + break; + } + } + } + } + } +} + +/// Execute a single sync tick. +/// +/// Returns the new sync state: `Idle` on success, `Retrying` on failure. +async fn do_sync_tick( + handle: &DaemonHandle, + history_store: &HistoryStore, + ticker: &mut time::Interval, + max_interval: f64, + settings: &Settings, +) -> SyncState { + tracing::info!("sync tick"); + + // Check if logged in + let logged_in = match settings.logged_in().await { + Ok(v) => v, + Err(e) => { + tracing::warn!("failed to check login status, skipping sync tick: {e}"); + return SyncState::Idle; + } + }; + + if !logged_in { + tracing::debug!("not logged in, skipping sync tick"); + return SyncState::Idle; + } + + // Perform the sync + let res = sync::sync(settings, handle.store(), handle.encryption_key()).await; + + match res { + Err(e) => { + tracing::error!("sync tick failed with {e}"); + + // Emit failure event + handle.emit(DaemonEvent::SyncFailed { + error: e.to_string(), + }); + + // Exponential backoff + let mut rng = rand::thread_rng(); + let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); + + if new_interval > max_interval { + new_interval = max_interval; + } + + *ticker = time::interval_at( + tokio::time::Instant::now() + Duration::from_secs(new_interval as u64), + time::Duration::from_secs(new_interval as u64), + ); + ticker.reset_after(time::Duration::from_secs(new_interval as u64)); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + tracing::error!("backing off, next sync tick in {new_interval}"); + + SyncState::Retrying + } + Ok((uploaded_count, downloaded_records)) => { + tracing::info!( + uploaded = uploaded_count, + downloaded = downloaded_records.len(), + "sync complete" + ); + + // Build history from downloaded records + if let Err(e) = history_store + .incremental_build(handle.history_db(), &downloaded_records) + .await + { + tracing::error!("failed to build history from downloaded records: {e}"); + } + + // Emit the records added event (for search indexing) + handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone())); + + // Emit sync completed event + handle.emit(DaemonEvent::SyncCompleted { + uploaded: uploaded_count as usize, + downloaded: downloaded_records.len(), + }); + + // Reset backoff on success + if ticker.period().as_secs() != settings.daemon.sync_frequency { + *ticker = time::interval_at( + tokio::time::Instant::now() + + Duration::from_secs(settings.daemon.sync_frequency), + time::Duration::from_secs(settings.daemon.sync_frequency), + ); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + } + + // Store sync time + if let Err(e) = Settings::save_sync_time().await { + tracing::error!("failed to save sync time: {e}"); + } + + SyncState::Idle + } + } +} |
