diff options
Diffstat (limited to 'crates/atuin-daemon/src/components')
| -rw-r--r-- | crates/atuin-daemon/src/components/history.rs | 252 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/mod.rs | 22 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/search.rs | 394 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/sync.rs | 257 |
4 files changed, 925 insertions, 0 deletions
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs new file mode 100644 index 00000000..23d48c5e --- /dev/null +++ b/crates/atuin-daemon/src/components/history.rs @@ -0,0 +1,252 @@ +//! History component. +//! +//! Handles command history lifecycle (start/end) and provides the History gRPC service. + +use std::sync::Arc; + +use atuin_client::{ + database::Database, + history::{History, HistoryId, store::HistoryStore}, + settings::Settings, +}; +use dashmap::DashMap; +use eyre::Result; +use time::OffsetDateTime; +use tonic::{Request, Response, Status}; +use tracing::{Level, instrument}; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + history::{ + EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply, + StartHistoryRequest, StatusReply, StatusRequest, + history_server::{History as HistorySvc, HistoryServer}, + }, +}; + +const DAEMON_PROTOCOL_VERSION: u32 = 1; + +/// History component - manages command history lifecycle. +/// +/// This component: +/// - Tracks currently running commands (stored in memory) +/// - Saves completed commands to the database and record store +/// - Emits history events for other components (e.g., search indexing) +/// - Provides the History gRPC service +pub struct HistoryComponent { + inner: Arc<HistoryComponentInner>, +} + +struct HistoryComponentInner { + /// Commands currently running (not yet completed). + running: DashMap<HistoryId, History>, + + /// Handle to the daemon (set during start). + handle: tokio::sync::RwLock<Option<DaemonHandle>>, + + /// History store for pushing records (set during start). + history_store: tokio::sync::RwLock<Option<HistoryStore>>, +} + +impl HistoryComponent { + /// Create a new history component. + pub fn new() -> Self { + Self { + inner: Arc::new(HistoryComponentInner { + running: DashMap::new(), + handle: tokio::sync::RwLock::new(None), + history_store: tokio::sync::RwLock::new(None), + }), + } + } + + /// Get the gRPC service for this component. + /// + /// This returns a tonic service that can be added to a gRPC server. + pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> { + HistoryServer::new(HistoryGrpcService { + inner: self.inner.clone(), + }) + } +} + +impl Default for HistoryComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for HistoryComponent { + fn name(&self) -> &'static str { + "history" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + // Create the history store + let host_id = Settings::host_id().await?; + let history_store = + HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key()); + + *self.inner.history_store.write().await = Some(history_store); + *self.inner.handle.write().await = Some(handle); + + tracing::info!("history component started"); + Ok(()) + } + + async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> { + // History component produces events but doesn't need to react to them + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + tracing::info!("history component stopped"); + Ok(()) + } +} + +/// The gRPC service implementation. +/// +/// This is a thin wrapper that delegates to the component's shared state. +pub struct HistoryGrpcService { + inner: Arc<HistoryComponentInner>, +} + +#[tonic::async_trait] +impl HistorySvc for HistoryGrpcService { + #[instrument(skip_all, level = Level::INFO)] + async fn start_history( + &self, + request: Request<StartHistoryRequest>, + ) -> Result<Response<StartHistoryReply>, Status> { + let req = request.into_inner(); + + let timestamp = + OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { + Status::invalid_argument( + "failed to parse timestamp as unix time (expected nanos since epoch)", + ) + })?; + + let h: History = History::daemon() + .timestamp(timestamp) + .command(req.command) + .cwd(req.cwd) + .session(req.session) + .hostname(req.hostname) + .build() + .into(); + + // Emit the event + if let Some(handle) = self.inner.handle.read().await.as_ref() { + handle.emit(DaemonEvent::HistoryStarted(h.clone())); + } + + let id = h.id.clone(); + tracing::info!(id = id.to_string(), "start history"); + self.inner.running.insert(id.clone(), h); + + let reply = StartHistoryReply { + id: id.to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + Ok(Response::new(reply)) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn end_history( + &self, + request: Request<EndHistoryRequest>, + ) -> Result<Response<EndHistoryReply>, Status> { + let req = request.into_inner(); + let id = HistoryId(req.id); + + if let Some((_, mut history)) = self.inner.running.remove(&id) { + history.exit = req.exit; + history.duration = match req.duration { + 0 => i64::try_from( + (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), + ) + .expect("failed to convert calculated duration to i64"), + value => i64::try_from(value).expect("failed to get i64 duration"), + }; + + // Get the handle and store to save the history + let handle_guard = self.inner.handle.read().await; + let handle = handle_guard + .as_ref() + .ok_or_else(|| Status::internal("component not initialized"))?; + + let store_guard = self.inner.history_store.read().await; + let history_store = store_guard + .as_ref() + .ok_or_else(|| Status::internal("component not initialized"))?; + + // Save to database + handle + .history_db() + .save(&history) + .await + .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; + + tracing::info!( + id = id.0.to_string(), + duration = history.duration, + "end history" + ); + + // Push to record store + let (record_id, idx) = history_store + .push(history.clone()) + .await + .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?; + + // Emit the event + handle.emit(DaemonEvent::HistoryEnded(history)); + + let reply = EndHistoryReply { + id: record_id.0.to_string(), + idx, + version: env!("CARGO_PKG_VERSION").to_string(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + return Ok(Response::new(reply)); + } + + Err(Status::not_found(format!( + "could not find history with id: {id}" + ))) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn status( + &self, + _request: Request<StatusRequest>, + ) -> Result<Response<StatusReply>, Status> { + let reply = StatusReply { + healthy: true, + version: env!("CARGO_PKG_VERSION").to_string(), + pid: std::process::id(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + Ok(Response::new(reply)) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn shutdown( + &self, + _request: Request<ShutdownRequest>, + ) -> Result<Response<ShutdownReply>, Status> { + // Use the daemon handle to request shutdown + if let Some(handle) = self.inner.handle.read().await.as_ref() { + handle.shutdown(); + } + Ok(Response::new(ShutdownReply { accepted: true })) + } +} diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs new file mode 100644 index 00000000..5950d5d5 --- /dev/null +++ b/crates/atuin-daemon/src/components/mod.rs @@ -0,0 +1,22 @@ +//! Daemon components. +//! +//! Components are the building blocks of the daemon. Each component handles +//! a specific domain and can: +//! +//! - Expose gRPC services +//! - React to events +//! - Spawn background tasks +//! +//! Available components: +//! +//! - [`history::HistoryComponent`]: Command history lifecycle management +//! - [`search::SearchComponent`]: Fuzzy search over history +//! - [`sync::SyncComponent`]: Cloud sync + +pub mod history; +pub mod search; +pub mod sync; + +pub use history::HistoryComponent; +pub use search::SearchComponent; +pub use sync::SyncComponent; diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs new file mode 100644 index 00000000..7fb59dea --- /dev/null +++ b/crates/atuin-daemon/src/components/search.rs @@ -0,0 +1,394 @@ +//! Search component. +//! +//! Provides fuzzy search over command history using the Nucleo search library +//! with frecency-based ranking and dynamic filtering. + +use std::{pin::Pin, sync::Arc}; + +use atuin_client::database::Database; +use eyre::Result; +use tokio::sync::RwLock; +use tokio_stream::Stream; +use tonic::{Request, Response, Status, Streaming}; +use tracing::{Level, debug, info, instrument, span, trace}; +use uuid::Uuid; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + search::{ + FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse, + search_server::{Search as SearchSvc, SearchServer}, + }, +}; + +const PAGE_SIZE: usize = 5000; +const RESULTS_LIMIT: u32 = 200; +/// How often to rebuild the frecency map (in seconds). +const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60; + +/// Search component - provides fuzzy search over command history. +/// +/// This component: +/// - Maintains a deduplicated search index with frecency ranking +/// - Loads history from the database on startup +/// - Updates the index when history events occur +/// - Provides the Search gRPC service +pub struct SearchComponent { + index: Arc<RwLock<SearchIndex>>, + handle: tokio::sync::RwLock<Option<DaemonHandle>>, + loader_handle: Option<tokio::task::JoinHandle<()>>, + frecency_handle: Option<tokio::task::JoinHandle<()>>, +} + +impl SearchComponent { + /// Create a new search component. + pub fn new() -> Self { + Self { + index: Arc::new(RwLock::new(SearchIndex::new())), + handle: tokio::sync::RwLock::new(None), + loader_handle: None, + frecency_handle: None, + } + } + + /// Get the gRPC service for this component. + pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> { + SearchServer::new(SearchGrpcService { + index: self.index.clone(), + }) + } + + /// Rebuild the entire search index from the database. + async fn rebuild_index(&self) -> Result<()> { + let handle_guard = self.handle.read().await; + let handle = handle_guard + .as_ref() + .ok_or_else(|| eyre::eyre!("component not initialized"))?; + + info!("Rebuilding search index from database"); + + // Create a new index + let new_index = SearchIndex::new(); + + // Load all history into the new index + let db = handle.history_db().clone(); + let mut pager = db.all_paged(PAGE_SIZE, false, true); + loop { + match pager.next().await { + Ok(Some(histories)) => { + info!( + "Loading {} history entries into search index", + histories.len() + ); + new_index.add_histories(&histories); + } + Ok(None) => break, + Err(e) => { + tracing::error!("Failed to load history during rebuild: {}", e); + break; + } + } + } + + info!( + "Search index rebuild complete; {} unique commands", + new_index.command_count() + ); + + // Replace the old index with the new one + *self.index.write().await = new_index; + Ok(()) + } +} + +impl Default for SearchComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SearchComponent { + fn name(&self) -> &'static str { + "search" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + *self.handle.write().await = Some(handle.clone()); + + // Spawn background task to load history into index + let index = self.index.clone(); + let db = handle.history_db().clone(); + + self.loader_handle = Some(tokio::spawn(async move { + info!( + "Loading history into search index; page size = {}", + PAGE_SIZE + ); + let mut pager = db.all_paged(PAGE_SIZE, false, true); + loop { + match pager.next().await { + Ok(Some(histories)) => { + info!( + "Loading {} history entries into search index", + histories.len() + ); + index.read().await.add_histories(&histories); + } + Ok(None) => { + info!( + "Initial history load complete; {} unique commands indexed", + index.read().await.command_count() + ); + // Build initial frecency map + index.read().await.rebuild_frecency().await; + info!("Initial frecency map built"); + break; + } + Err(e) => { + tracing::error!("Failed to load history: {}", e); + break; + } + } + } + })); + + // Spawn background task to periodically refresh frecency + let index_for_frecency = self.index.clone(); + self.frecency_handle = Some(tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs( + FRECENCY_REFRESH_INTERVAL_SECS, + )); + loop { + interval.tick().await; + trace!("Refreshing frecency map"); + index_for_frecency.read().await.rebuild_frecency().await; + } + })); + + tracing::info!("search component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + match event { + DaemonEvent::RecordsAdded(records) => { + debug!( + count = records.len(), + "Processing added records for search index" + ); + + let handle_guard = self.handle.read().await; + if let Some(handle) = handle_guard.as_ref() { + let histories: Vec<_> = handle + .history_db() + .query_history( + format!( + "select * from history where id in ({})", + records + .iter() + .map(|record| record.0.to_string()) + .collect::<Vec<_>>() + .join(",") + ) + .as_str(), + ) + .await + .unwrap_or_default(); + + span!(Level::TRACE, "inject_records", count = histories.len()) + .in_scope(async || { + self.index.read().await.add_histories(&histories); + }) + .await; + } + } + DaemonEvent::HistoryStarted(history) => { + debug!(id = %history.id, command = %history.command, "History started (no index action)"); + } + DaemonEvent::HistoryEnded(history) => { + span!(Level::TRACE, "inject_history_ended") + .in_scope(async || { + self.index.read().await.add_history(history); + }) + .await; + } + DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => { + info!("History store pruned or rebuilt, rebuilding search index"); + if let Err(e) = self.rebuild_index().await { + tracing::error!("Failed to rebuild search index: {}", e); + } + } + DaemonEvent::HistoryDeleted { ids } => { + info!( + count = ids.len(), + "History deleted, rebuilding search index" + ); + // For now, just rebuild the entire index. A more efficient implementation + // would remove specific items from the index. + if let Err(e) = self.rebuild_index().await { + tracing::error!("Failed to rebuild search index: {}", e); + } + } + // Events we don't care about + DaemonEvent::SyncCompleted { .. } + | DaemonEvent::SyncFailed { .. } + | DaemonEvent::ForceSync + | DaemonEvent::SettingsReloaded + | DaemonEvent::ShutdownRequested => {} + } + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + if let Some(handle) = self.loader_handle.take() { + handle.abort(); + } + if let Some(handle) = self.frecency_handle.take() { + handle.abort(); + } + tracing::info!("search component stopped"); + Ok(()) + } +} + +/// The gRPC service implementation. +pub struct SearchGrpcService { + index: Arc<RwLock<SearchIndex>>, +} + +#[tonic::async_trait] +impl SearchSvc for SearchGrpcService { + type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>; + + #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")] + async fn search( + &self, + request: Request<Streaming<SearchRequest>>, + ) -> Result<Response<Self::SearchStream>, Status> { + let mut in_stream = request.into_inner(); + let index = self.index.clone(); + + // Create output channel + let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128); + + // Spawn task to handle incoming requests and send responses + tokio::spawn(async move { + while let Some(req) = in_stream.message().await.transpose() { + match req { + Ok(search_req) => { + let query = search_req.query; + let query_id = search_req.query_id; + let filter_mode: FilterMode = search_req + .filter_mode + .try_into() + .unwrap_or(FilterMode::Global); + let proto_context = search_req.context; + + debug!( + "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}", + query, + query_id, + filter_mode.as_str_name(), + proto_context + ); + + // Convert proto FilterMode + context to IndexFilterMode + let index_filter = convert_filter_mode(filter_mode, &proto_context); + + // Build QueryContext from proto context + let query_context = proto_context + .map(|ctx| QueryContext { + cwd: Some(with_trailing_slash(&ctx.cwd)), + git_root: ctx.git_root.map(|s| with_trailing_slash(&s)), + hostname: Some(ctx.hostname), + session_id: Some(ctx.session_id), + }) + .unwrap_or_default(); + + // Perform the search + let history_ids = + span!(Level::TRACE, "daemon_search_query", %query, query_id) + .in_scope(|| async { + let index = index.read().await; + index + .search(&query, index_filter, &query_context, RESULTS_LIMIT) + .await + }) + .await; + + // Convert history IDs to bytes + let ids: Vec<Vec<u8>> = history_ids + .iter() + .filter_map(|id| { + Uuid::parse_str(id) + .ok() + .map(|uuid| uuid.as_bytes().to_vec()) + }) + .collect(); + + if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() { + break; // Client disconnected + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + break; + } + } + } + }); + + // Convert receiver to stream + let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Response::new(Box::pin(out_stream))) + } +} + +/// Convert proto FilterMode and context to IndexFilterMode. +fn convert_filter_mode( + mode: FilterMode, + context: &Option<crate::search::SearchContext>, +) -> IndexFilterMode { + match (mode, context) { + (FilterMode::Global, _) => IndexFilterMode::Global, + (FilterMode::Directory, Some(ctx)) => { + IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) + } + (FilterMode::Workspace, Some(ctx)) => { + if let Some(ref git_root) = ctx.git_root { + IndexFilterMode::Workspace(with_trailing_slash(git_root)) + } else { + // Fall back to directory if no git root + IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) + } + } + (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()), + (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()), + (FilterMode::SessionPreload, Some(ctx)) => { + // SessionPreload is similar to Session - filter by session + IndexFilterMode::Session(ctx.session_id.clone()) + } + // If no context provided, fall back to global + _ => IndexFilterMode::Global, + } +} + +#[cfg(windows)] +pub fn with_trailing_slash(s: &str) -> String { + if s.ends_with('\\') { + s.to_string() + } else { + format!("{}\\", s) + } +} + +#[cfg(not(windows))] +pub fn with_trailing_slash(s: &str) -> String { + if s.ends_with('/') { + s.to_string() + } else { + format!("{}/", s) + } +} diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs new file mode 100644 index 00000000..6217706a --- /dev/null +++ b/crates/atuin-daemon/src/components/sync.rs @@ -0,0 +1,257 @@ +//! Sync component. +//! +//! Handles periodic synchronization with the Atuin cloud server. + +use eyre::Result; +use rand::Rng; +use tokio::sync::mpsc; +use tokio::time::{self, MissedTickBehavior}; + +use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings}; +use atuin_dotfiles::store::{AliasStore, var::VarStore}; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, +}; + +/// Commands that can be sent to the sync task. +enum SyncCommand { + /// Trigger an immediate sync. + ForceSync, + /// Stop the sync loop. + Stop, +} + +/// Sync component - handles periodic cloud synchronization. +/// +/// This component: +/// - Runs a background sync loop on a configurable interval +/// - Implements exponential backoff on sync failures +/// - Responds to ForceSync events for immediate sync +/// - Emits SyncCompleted/SyncFailed events +pub struct SyncComponent { + task_handle: Option<tokio::task::JoinHandle<()>>, + command_tx: Option<mpsc::Sender<SyncCommand>>, +} + +impl SyncComponent { + /// Create a new sync component. + pub fn new() -> Self { + Self { + task_handle: None, + command_tx: None, + } + } +} + +impl Default for SyncComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SyncComponent { + fn name(&self) -> &'static str { + "sync" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + let (cmd_tx, cmd_rx) = mpsc::channel(16); + self.command_tx = Some(cmd_tx); + + // Spawn the sync loop with its own copy of the handle + self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx))); + + tracing::info!("sync component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + if let DaemonEvent::ForceSync = event { + tracing::info!("force sync requested"); + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::ForceSync).await; + } + } + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::Stop).await; + } + if let Some(handle) = self.task_handle.take() { + // Give the task a moment to shut down gracefully + let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; + } + tracing::info!("sync component stopped"); + Ok(()) + } +} + +/// The main sync loop. +/// +/// This runs in a spawned task and handles periodic sync as well as +/// force sync requests. +async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) { + tracing::info!("sync loop starting"); + + // Clone settings since we need them across await points + let settings = handle.settings().await.clone(); + let host_id = match Settings::host_id().await { + Ok(id) => id, + Err(e) => { + tracing::error!("failed to get host id, sync disabled: {e}"); + return; + } + }; + + // Create the stores we need + let encryption_key = *handle.encryption_key(); + let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key); + let alias_store = AliasStore::new(handle.store().clone(), host_id, encryption_key); + let var_store = VarStore::new(handle.store().clone(), host_id, encryption_key); + + // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) + let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); + + let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + + // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, + // we may end up running a lot of syncs in a hot loop. + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = ticker.tick() => { + do_sync_tick( + &handle, + &history_store, + &alias_store, + &var_store, + &mut ticker, + max_interval, + ).await; + } + cmd = cmd_rx.recv() => { + match cmd { + Some(SyncCommand::ForceSync) => { + tracing::info!("executing force sync"); + do_sync_tick( + &handle, + &history_store, + &alias_store, + &var_store, + &mut ticker, + max_interval, + ).await; + } + Some(SyncCommand::Stop) | None => { + tracing::info!("sync loop stopping"); + break; + } + } + } + } + } +} + +/// Execute a single sync tick. +async fn do_sync_tick( + handle: &DaemonHandle, + history_store: &HistoryStore, + alias_store: &AliasStore, + var_store: &VarStore, + ticker: &mut time::Interval, + max_interval: f64, +) { + // Clone settings since we need them across await points + let settings = handle.settings().await.clone(); + + tracing::info!("sync tick"); + + // Check if logged in + let logged_in = match settings.logged_in().await { + Ok(v) => v, + Err(e) => { + tracing::warn!("failed to check login status, skipping sync tick: {e}"); + return; + } + }; + + if !logged_in { + tracing::debug!("not logged in, skipping sync tick"); + return; + } + + // Perform the sync + let res = sync::sync(&settings, handle.store()).await; + + match res { + Err(e) => { + tracing::error!("sync tick failed with {e}"); + + // Emit failure event + handle.emit(DaemonEvent::SyncFailed { + error: e.to_string(), + }); + + // Exponential backoff + let mut rng = rand::thread_rng(); + let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); + + if new_interval > max_interval { + new_interval = max_interval; + } + + *ticker = time::interval(time::Duration::from_secs(new_interval as u64)); + ticker.reset_after(time::Duration::from_secs(new_interval as u64)); + + tracing::error!("backing off, next sync tick in {new_interval}"); + } + Ok((uploaded_count, downloaded_records)) => { + tracing::info!( + uploaded = uploaded_count, + downloaded = downloaded_records.len(), + "sync complete" + ); + + // Build history from downloaded records + if let Err(e) = history_store + .incremental_build(handle.history_db(), &downloaded_records) + .await + { + tracing::error!("failed to build history from downloaded records: {e}"); + } + + // Emit the records added event (for search indexing) + handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone())); + + // Emit sync completed event + handle.emit(DaemonEvent::SyncCompleted { + uploaded: uploaded_count as usize, + downloaded: downloaded_records.len(), + }); + + // Rebuild alias and var stores + if let Err(e) = alias_store.build().await { + tracing::error!("failed to rebuild alias store: {e}"); + } + if let Err(e) = var_store.build().await { + tracing::error!("failed to rebuild var store: {e}"); + } + + // Reset backoff on success + if ticker.period().as_secs() != settings.daemon.sync_frequency { + *ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + } + + // Store sync time + if let Err(e) = Settings::save_sync_time().await { + tracing::error!("failed to save sync time: {e}"); + } + } + } +} |
