From 5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 Mon Sep 17 00:00:00 2001 From: Benedikt Peetz Date: Thu, 11 Jun 2026 00:54:30 +0200 Subject: chore: Move everything into one big crate That helps remove duplicated code and rustc/cargo will now also show dead code correctly. --- crates/atuin-daemon/src/components/sync.rs | 279 ----------------------------- 1 file changed, 279 deletions(-) delete mode 100644 crates/atuin-daemon/src/components/sync.rs (limited to 'crates/atuin-daemon/src/components/sync.rs') diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs deleted file mode 100644 index 6e486250..00000000 --- a/crates/atuin-daemon/src/components/sync.rs +++ /dev/null @@ -1,279 +0,0 @@ -//! Sync component. -//! -//! Handles periodic synchronization with the Atuin cloud server. - -use std::time::Duration; - -use eyre::Result; -use rand::Rng; -use tokio::sync::mpsc; -use tokio::time::{self, MissedTickBehavior}; - -use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings}; - -use crate::{ - daemon::{Component, DaemonHandle}, - events::DaemonEvent, -}; - -/// Commands that can be sent to the sync task. -enum SyncCommand { - /// Trigger an immediate sync. - ForceSync, - /// Stop the sync loop. - Stop, -} - -/// Sync state - tracks whether we're in normal operation or retrying after failure. -#[derive(Clone, Copy, PartialEq, Eq)] -enum SyncState { - /// Normal operation. Periodic syncs only run if auto_sync is enabled. - Idle, - /// Retrying after a sync failure. Retries continue regardless of auto_sync - /// until the sync succeeds. - Retrying, -} - -/// Sync component - handles periodic cloud synchronization. -/// -/// This component: -/// - Runs a background sync loop on a configurable interval -/// - Implements exponential backoff on sync failures -/// - Responds to ForceSync events for immediate sync -/// - Emits SyncCompleted/SyncFailed events -pub struct SyncComponent { - task_handle: Option>, - command_tx: Option>, -} - -impl SyncComponent { - /// Create a new sync component. - pub fn new() -> Self { - Self { - task_handle: None, - command_tx: None, - } - } -} - -impl Default for SyncComponent { - fn default() -> Self { - Self::new() - } -} - -#[tonic::async_trait] -impl Component for SyncComponent { - fn name(&self) -> &'static str { - "sync" - } - - async fn start(&mut self, handle: DaemonHandle) -> Result<()> { - let (cmd_tx, cmd_rx) = mpsc::channel(16); - self.command_tx = Some(cmd_tx); - - // Spawn the sync loop with its own copy of the handle - self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx))); - - tracing::info!("sync component started"); - Ok(()) - } - - async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { - if let DaemonEvent::ForceSync = event { - tracing::info!("force sync requested"); - if let Some(tx) = &self.command_tx { - let _ = tx.send(SyncCommand::ForceSync).await; - } - } - Ok(()) - } - - async fn stop(&mut self) -> Result<()> { - if let Some(tx) = &self.command_tx { - let _ = tx.send(SyncCommand::Stop).await; - } - if let Some(handle) = self.task_handle.take() { - // Give the task a moment to shut down gracefully - let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; - } - tracing::info!("sync component stopped"); - Ok(()) - } -} - -/// The main sync loop. -/// -/// This runs in a spawned task and handles periodic sync as well as -/// force sync requests. -async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver) { - tracing::info!("sync loop starting"); - - // Clone settings since we need them across await points - let settings = handle.settings().await.clone(); - let host_id = match Settings::host_id().await { - Ok(id) => id, - Err(e) => { - tracing::error!("failed to get host id, sync disabled: {e}"); - return; - } - }; - - // Create the stores we need - let encryption_key = *handle.encryption_key(); - let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key); - - // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) - let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); - - let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); - - // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, - // we may end up running a lot of syncs in a hot loop. - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - let mut sync_state = SyncState::Idle; - - loop { - tokio::select! { - _ = ticker.tick() => { - let settings = handle.settings().await; - - // Skip periodic ticks if auto_sync is disabled AND we're not retrying - // a previous failure. Retries must continue regardless of auto_sync. - if !settings.auto_sync && sync_state == SyncState::Idle { - tracing::debug!("auto_sync disabled, skipping periodic sync tick"); - continue; - } - - sync_state = do_sync_tick( - &handle, - &history_store, - &mut ticker, - max_interval, - &settings, - ).await; - } - cmd = cmd_rx.recv() => { - match cmd { - Some(SyncCommand::ForceSync) => { - tracing::info!("executing force sync"); - let settings = handle.settings().await; - sync_state = do_sync_tick( - &handle, - &history_store, - &mut ticker, - max_interval, - &settings, - ).await; - } - Some(SyncCommand::Stop) | None => { - tracing::info!("sync loop stopping"); - break; - } - } - } - } - } -} - -/// Execute a single sync tick. -/// -/// Returns the new sync state: `Idle` on success, `Retrying` on failure. -async fn do_sync_tick( - handle: &DaemonHandle, - history_store: &HistoryStore, - ticker: &mut time::Interval, - max_interval: f64, - settings: &Settings, -) -> SyncState { - tracing::info!("sync tick"); - - // Check if logged in - let logged_in = match settings.logged_in().await { - Ok(v) => v, - Err(e) => { - tracing::warn!("failed to check login status, skipping sync tick: {e}"); - return SyncState::Idle; - } - }; - - if !logged_in { - tracing::debug!("not logged in, skipping sync tick"); - return SyncState::Idle; - } - - // Perform the sync - let res = sync::sync(settings, handle.store(), handle.encryption_key()).await; - - match res { - Err(e) => { - tracing::error!("sync tick failed with {e}"); - - // Emit failure event - handle.emit(DaemonEvent::SyncFailed { - error: e.to_string(), - }); - - // Exponential backoff - let mut rng = rand::thread_rng(); - let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); - - if new_interval > max_interval { - new_interval = max_interval; - } - - *ticker = time::interval_at( - tokio::time::Instant::now() + Duration::from_secs(new_interval as u64), - time::Duration::from_secs(new_interval as u64), - ); - ticker.reset_after(time::Duration::from_secs(new_interval as u64)); - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - tracing::error!("backing off, next sync tick in {new_interval}"); - - SyncState::Retrying - } - Ok((uploaded_count, downloaded_records)) => { - tracing::info!( - uploaded = uploaded_count, - downloaded = downloaded_records.len(), - "sync complete" - ); - - // Build history from downloaded records - if let Err(e) = history_store - .incremental_build(handle.history_db(), &downloaded_records) - .await - { - tracing::error!("failed to build history from downloaded records: {e}"); - } - - // Emit the records added event (for search indexing) - handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone())); - - // Emit sync completed event - handle.emit(DaemonEvent::SyncCompleted { - uploaded: uploaded_count as usize, - downloaded: downloaded_records.len(), - }); - - // Reset backoff on success - if ticker.period().as_secs() != settings.daemon.sync_frequency { - *ticker = time::interval_at( - tokio::time::Instant::now() - + Duration::from_secs(settings.daemon.sync_frequency), - time::Duration::from_secs(settings.daemon.sync_frequency), - ); - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - } - - // Store sync time - if let Err(e) = Settings::save_sync_time().await { - tracing::error!("failed to save sync time: {e}"); - } - - SyncState::Idle - } - } -} -- cgit v1.3.1