diff options
Diffstat (limited to 'crates/atuin-daemon/src/components')
| -rw-r--r-- | crates/atuin-daemon/src/components/history.rs | 327 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/mod.rs | 25 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/search.rs | 413 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/semantic.rs | 900 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/sync.rs | 279 |
5 files changed, 0 insertions, 1944 deletions
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs deleted file mode 100644 index c82c8f94..00000000 --- a/crates/atuin-daemon/src/components/history.rs +++ /dev/null @@ -1,327 +0,0 @@ -//! History component. -//! -//! Handles command history lifecycle (start/end) and provides the History gRPC service. - -use std::{pin::Pin, sync::Arc}; - -use atuin_client::{ - database::Database, - history::{History, HistoryId, store::HistoryStore}, - settings::Settings, -}; -use dashmap::DashMap; -use eyre::Result; -use time::OffsetDateTime; -use tokio_stream::Stream; -use tonic::{Request, Response, Status}; -use tracing::{Level, instrument}; - -use crate::{ - daemon::{Component, DaemonHandle}, - events::DaemonEvent, - history::{ - EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply, - ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, - TailHistoryReply, TailHistoryRequest, - history_server::{History as HistorySvc, HistoryServer}, - }, -}; - -const DAEMON_PROTOCOL_VERSION: u32 = 1; - -/// History component - manages command history lifecycle. -/// -/// This component: -/// - Tracks currently running commands (stored in memory) -/// - Saves completed commands to the database and record store -/// - Emits history events for other components (e.g., search indexing) -/// - Provides the History gRPC service -pub struct HistoryComponent { - inner: Arc<HistoryComponentInner>, -} - -struct HistoryComponentInner { - /// Commands currently running (not yet completed). - running: DashMap<HistoryId, History>, - - /// Handle to the daemon (set during start). - handle: tokio::sync::RwLock<Option<DaemonHandle>>, - - /// History store for pushing records (set during start). - history_store: tokio::sync::RwLock<Option<HistoryStore>>, -} - -impl HistoryComponent { - /// Create a new history component. - pub fn new() -> Self { - Self { - inner: Arc::new(HistoryComponentInner { - running: DashMap::new(), - handle: tokio::sync::RwLock::new(None), - history_store: tokio::sync::RwLock::new(None), - }), - } - } - - /// Get the gRPC service for this component. - /// - /// This returns a tonic service that can be added to a gRPC server. - pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> { - HistoryServer::new(HistoryGrpcService { - inner: self.inner.clone(), - }) - } -} - -impl Default for HistoryComponent { - fn default() -> Self { - Self::new() - } -} - -#[tonic::async_trait] -impl Component for HistoryComponent { - fn name(&self) -> &'static str { - "history" - } - - async fn start(&mut self, handle: DaemonHandle) -> Result<()> { - // Create the history store - let host_id = Settings::host_id().await?; - let history_store = - HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key()); - - *self.inner.history_store.write().await = Some(history_store); - *self.inner.handle.write().await = Some(handle); - - tracing::info!("history component started"); - Ok(()) - } - - async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> { - // History component produces events but doesn't need to react to them - Ok(()) - } - - async fn stop(&mut self) -> Result<()> { - tracing::info!("history component stopped"); - Ok(()) - } -} - -/// The gRPC service implementation. -/// -/// This is a thin wrapper that delegates to the component's shared state. -pub struct HistoryGrpcService { - inner: Arc<HistoryComponentInner>, -} - -fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply { - TailHistoryReply { - kind: kind as i32, - history: Some(HistoryEntry { - timestamp: history.timestamp.unix_timestamp_nanos() as u64, - id: history.id.0, - command: history.command, - cwd: history.cwd, - session: history.session, - hostname: history.hostname, - author: history.author, - intent: history.intent.unwrap_or_default(), - exit: history.exit, - duration: history.duration, - }), - } -} - -#[tonic::async_trait] -impl HistorySvc for HistoryGrpcService { - type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>; - - #[instrument(skip_all, level = Level::INFO)] - async fn start_history( - &self, - request: Request<StartHistoryRequest>, - ) -> Result<Response<StartHistoryReply>, Status> { - let req = request.into_inner(); - - let timestamp = - OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { - Status::invalid_argument( - "failed to parse timestamp as unix time (expected nanos since epoch)", - ) - })?; - - let h: History = History::daemon() - .timestamp(timestamp) - .command(req.command) - .cwd(req.cwd) - .session(req.session) - .hostname(req.hostname) - .author(req.author) - .intent(req.intent) - .build() - .into(); - - // Emit the event - if let Some(handle) = self.inner.handle.read().await.as_ref() { - handle.emit(DaemonEvent::HistoryStarted(h.clone())); - } - - let id = h.id.clone(); - tracing::info!(id = id.to_string(), "start history"); - self.inner.running.insert(id.clone(), h); - - let reply = StartHistoryReply { - id: id.to_string(), - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn end_history( - &self, - request: Request<EndHistoryRequest>, - ) -> Result<Response<EndHistoryReply>, Status> { - let req = request.into_inner(); - let id = HistoryId(req.id); - - if let Some((_, mut history)) = self.inner.running.remove(&id) { - history.exit = req.exit; - history.duration = match req.duration { - 0 => i64::try_from( - (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), - ) - .expect("failed to convert calculated duration to i64"), - value => i64::try_from(value).expect("failed to get i64 duration"), - }; - - // Get the handle and store to save the history - let handle_guard = self.inner.handle.read().await; - let handle = handle_guard - .as_ref() - .ok_or_else(|| Status::internal("component not initialized"))?; - - let store_guard = self.inner.history_store.read().await; - let history_store = store_guard - .as_ref() - .ok_or_else(|| Status::internal("component not initialized"))?; - - // Save to database - handle - .history_db() - .save(&history) - .await - .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; - - tracing::info!( - id = id.0.to_string(), - duration = history.duration, - "end history" - ); - - // Push to record store - let (record_id, idx) = history_store - .push(history.clone()) - .await - .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?; - - // Emit the event - handle.emit(DaemonEvent::HistoryEnded(history)); - - let reply = EndHistoryReply { - id: record_id.0.to_string(), - idx, - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - return Ok(Response::new(reply)); - } - - Err(Status::not_found(format!( - "could not find history with id: {id}" - ))) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn tail_history( - &self, - _request: Request<TailHistoryRequest>, - ) -> Result<Response<Self::TailHistoryStream>, Status> { - let handle_guard = self.inner.handle.read().await; - let handle = handle_guard - .as_ref() - .cloned() - .ok_or_else(|| Status::internal("component not initialized"))?; - - let mut rx = handle.subscribe(); - let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128); - - tokio::spawn(async move { - loop { - let event = match rx.recv().await { - Ok(event) => event, - Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { - let _ = tx - .send(Err(Status::resource_exhausted(format!( - "tail stream lagged behind and dropped {skipped} events" - )))) - .await; - break; - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - }; - - let reply = match event { - DaemonEvent::HistoryStarted(history) => { - Some(history_to_tail_reply(HistoryEventKind::Started, history)) - } - DaemonEvent::HistoryEnded(history) => { - Some(history_to_tail_reply(HistoryEventKind::Ended, history)) - } - _ => None, - }; - - if let Some(reply) = reply - && tx.send(Ok(reply)).await.is_err() - { - break; - } - } - }); - - let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx); - Ok(Response::new(Box::pin(stream))) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn status( - &self, - _request: Request<StatusRequest>, - ) -> Result<Response<StatusReply>, Status> { - let reply = StatusReply { - healthy: true, - version: env!("CARGO_PKG_VERSION").to_string(), - pid: std::process::id(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn shutdown( - &self, - _request: Request<ShutdownRequest>, - ) -> Result<Response<ShutdownReply>, Status> { - // Use the daemon handle to request shutdown - if let Some(handle) = self.inner.handle.read().await.as_ref() { - handle.shutdown(); - } - Ok(Response::new(ShutdownReply { accepted: true })) - } -} diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs deleted file mode 100644 index 447e31df..00000000 --- a/crates/atuin-daemon/src/components/mod.rs +++ /dev/null @@ -1,25 +0,0 @@ -//! Daemon components. -//! -//! Components are the building blocks of the daemon. Each component handles -//! a specific domain and can: -//! -//! - Expose gRPC services -//! - React to events -//! - Spawn background tasks -//! -//! Available components: -//! -//! - [`history::HistoryComponent`]: Command history lifecycle management -//! - [`search::SearchComponent`]: Fuzzy search over history -//! - [`semantic::SemanticComponent`]: In-memory semantic command captures -//! - [`sync::SyncComponent`]: Cloud sync - -pub mod history; -pub mod search; -pub mod semantic; -pub mod sync; - -pub use history::HistoryComponent; -pub use search::SearchComponent; -pub use semantic::SemanticComponent; -pub use sync::SyncComponent; diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs deleted file mode 100644 index 9fc87fae..00000000 --- a/crates/atuin-daemon/src/components/search.rs +++ /dev/null @@ -1,413 +0,0 @@ -//! Search component. -//! -//! Provides fuzzy search over command history using the Nucleo search library -//! with frecency-based ranking and dynamic filtering. - -use std::{pin::Pin, sync::Arc}; - -use atuin_client::database::Database; -use eyre::Result; -use tokio::sync::RwLock; -use tokio_stream::Stream; -use tonic::{Request, Response, Status, Streaming}; -use tracing::{Level, debug, info, instrument, span, trace}; -use uuid::Uuid; - -use crate::{ - daemon::{Component, DaemonHandle}, - events::DaemonEvent, - search::{ - FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse, - search_server::{Search as SearchSvc, SearchServer}, - }, -}; - -const PAGE_SIZE: usize = 5000; -const RESULTS_LIMIT: u32 = 200; -/// How often to rebuild the frecency map (in seconds). -const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60; - -/// Search component - provides fuzzy search over command history. -/// -/// This component: -/// - Maintains a deduplicated search index with frecency ranking -/// - Loads history from the database on startup -/// - Updates the index when history events occur -/// - Provides the Search gRPC service -pub struct SearchComponent { - index: Arc<RwLock<SearchIndex>>, - handle: tokio::sync::RwLock<Option<DaemonHandle>>, - loader_handle: Option<tokio::task::JoinHandle<()>>, - frecency_handle: Option<tokio::task::JoinHandle<()>>, -} - -impl SearchComponent { - /// Create a new search component. - pub fn new() -> Self { - Self { - index: Arc::new(RwLock::new(SearchIndex::new())), - handle: tokio::sync::RwLock::new(None), - loader_handle: None, - frecency_handle: None, - } - } - - /// Get the gRPC service for this component. - pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> { - SearchServer::new(SearchGrpcService { - index: self.index.clone(), - }) - } - - /// Rebuild the entire search index from the database. - async fn rebuild_index(&self) -> Result<()> { - let handle_guard = self.handle.read().await; - let handle = handle_guard - .as_ref() - .ok_or_else(|| eyre::eyre!("component not initialized"))?; - - info!("Rebuilding search index from database"); - - // Create a new index - let new_index = SearchIndex::new(); - - // Load all history into the new index - let db = handle.history_db().clone(); - let mut pager = db.all_paged(PAGE_SIZE, false, true); - loop { - match pager.next().await { - Ok(Some(histories)) => { - info!( - "Loading {} history entries into search index", - histories.len() - ); - new_index.add_histories(&histories); - } - Ok(None) => break, - Err(e) => { - tracing::error!("Failed to load history during rebuild: {}", e); - break; - } - } - } - - info!( - "Search index rebuild complete; {} unique commands", - new_index.command_count() - ); - - // Replace the old index with the new one - *self.index.write().await = new_index; - Ok(()) - } -} - -impl Default for SearchComponent { - fn default() -> Self { - Self::new() - } -} - -#[tonic::async_trait] -impl Component for SearchComponent { - fn name(&self) -> &'static str { - "search" - } - - async fn start(&mut self, handle: DaemonHandle) -> Result<()> { - *self.handle.write().await = Some(handle.clone()); - - // Spawn background task to load history into index - let index = self.index.clone(); - let db = handle.history_db().clone(); - let handle_for_loader = handle.clone(); - - self.loader_handle = Some(tokio::spawn(async move { - info!( - "Loading history into search index; page size = {}", - PAGE_SIZE - ); - let mut pager = db.all_paged(PAGE_SIZE, false, true); - loop { - match pager.next().await { - Ok(Some(histories)) => { - info!( - "Loading {} history entries into search index", - histories.len() - ); - index.read().await.add_histories(&histories); - } - Ok(None) => { - info!( - "Initial history load complete; {} unique commands indexed", - index.read().await.command_count() - ); - // Build initial frecency map with current settings - let settings = handle_for_loader.settings().await; - index.read().await.rebuild_frecency(&settings.search).await; - info!("Initial frecency map built"); - break; - } - Err(e) => { - tracing::error!("Failed to load history: {}", e); - break; - } - } - } - })); - - // Spawn background task to periodically refresh frecency - let index_for_frecency = self.index.clone(); - let handle_for_frecency = handle.clone(); - self.frecency_handle = Some(tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs( - FRECENCY_REFRESH_INTERVAL_SECS, - )); - loop { - interval.tick().await; - trace!("Refreshing frecency map"); - let settings = handle_for_frecency.settings().await; - index_for_frecency - .read() - .await - .rebuild_frecency(&settings.search) - .await; - } - })); - - tracing::info!("search component started"); - Ok(()) - } - - async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { - match event { - DaemonEvent::RecordsAdded(records) => { - debug!( - count = records.len(), - "Processing added records for search index" - ); - - let handle_guard = self.handle.read().await; - if let Some(handle) = handle_guard.as_ref() { - let histories: Vec<_> = handle - .history_db() - .query_history( - format!( - "select * from history where id in ({})", - records - .iter() - .map(|record| record.0.to_string()) - .collect::<Vec<_>>() - .join(",") - ) - .as_str(), - ) - .await - .unwrap_or_default(); - - span!(Level::TRACE, "inject_records", count = histories.len()) - .in_scope(async || { - self.index.read().await.add_histories(&histories); - }) - .await; - } - } - DaemonEvent::HistoryStarted(history) => { - debug!(id = %history.id, command = %history.command, "History started (no index action)"); - } - DaemonEvent::HistoryEnded(history) => { - span!(Level::TRACE, "inject_history_ended") - .in_scope(async || { - self.index.read().await.add_history(history); - }) - .await; - } - DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => { - info!("History store pruned or rebuilt, rebuilding search index"); - if let Err(e) = self.rebuild_index().await { - tracing::error!("Failed to rebuild search index: {}", e); - } - } - DaemonEvent::HistoryDeleted { ids } => { - info!( - count = ids.len(), - "History deleted, rebuilding search index" - ); - // For now, just rebuild the entire index. A more efficient implementation - // would remove specific items from the index. - if let Err(e) = self.rebuild_index().await { - tracing::error!("Failed to rebuild search index: {}", e); - } - } - DaemonEvent::SettingsReloaded => { - info!("Settings reloaded, rebuilding frecency map with new multipliers"); - let handle_guard = self.handle.read().await; - if let Some(handle) = handle_guard.as_ref() { - let settings = handle.settings().await; - self.index - .read() - .await - .rebuild_frecency(&settings.search) - .await; - } - } - // Events we don't care about - DaemonEvent::SyncCompleted { .. } - | DaemonEvent::SyncFailed { .. } - | DaemonEvent::ForceSync - | DaemonEvent::ShutdownRequested => {} - } - Ok(()) - } - - async fn stop(&mut self) -> Result<()> { - if let Some(handle) = self.loader_handle.take() { - handle.abort(); - } - if let Some(handle) = self.frecency_handle.take() { - handle.abort(); - } - tracing::info!("search component stopped"); - Ok(()) - } -} - -/// The gRPC service implementation. -pub struct SearchGrpcService { - index: Arc<RwLock<SearchIndex>>, -} - -#[tonic::async_trait] -impl SearchSvc for SearchGrpcService { - type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>; - - #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")] - async fn search( - &self, - request: Request<Streaming<SearchRequest>>, - ) -> Result<Response<Self::SearchStream>, Status> { - let mut in_stream = request.into_inner(); - let index = self.index.clone(); - - // Create output channel - let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128); - - // Spawn task to handle incoming requests and send responses - tokio::spawn(async move { - while let Some(req) = in_stream.message().await.transpose() { - match req { - Ok(search_req) => { - let query = search_req.query; - let query_id = search_req.query_id; - let filter_mode: FilterMode = search_req - .filter_mode - .try_into() - .unwrap_or(FilterMode::Global); - let proto_context = search_req.context; - - debug!( - "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}", - query, - query_id, - filter_mode.as_str_name(), - proto_context - ); - - // Convert proto FilterMode + context to IndexFilterMode - let index_filter = convert_filter_mode(filter_mode, &proto_context); - - // Build QueryContext from proto context - let query_context = proto_context - .map(|ctx| QueryContext { - cwd: Some(with_trailing_slash(&ctx.cwd)), - git_root: ctx.git_root.map(|s| with_trailing_slash(&s)), - hostname: Some(ctx.hostname), - session_id: Some(ctx.session_id), - }) - .unwrap_or_default(); - - // Perform the search - let history_ids = - span!(Level::TRACE, "daemon_search_query", %query, query_id) - .in_scope(|| async { - let index = index.read().await; - index - .search(&query, index_filter, &query_context, RESULTS_LIMIT) - .await - }) - .await; - - // Convert history IDs to bytes - let ids: Vec<Vec<u8>> = history_ids - .iter() - .filter_map(|id| { - Uuid::parse_str(id) - .ok() - .map(|uuid| uuid.as_bytes().to_vec()) - }) - .collect(); - - if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() { - break; // Client disconnected - } - } - Err(e) => { - let _ = tx.send(Err(e)).await; - break; - } - } - } - }); - - // Convert receiver to stream - let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx); - Ok(Response::new(Box::pin(out_stream))) - } -} - -/// Convert proto FilterMode and context to IndexFilterMode. -fn convert_filter_mode( - mode: FilterMode, - context: &Option<crate::search::SearchContext>, -) -> IndexFilterMode { - match (mode, context) { - (FilterMode::Global, _) => IndexFilterMode::Global, - (FilterMode::Directory, Some(ctx)) => { - IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) - } - (FilterMode::Workspace, Some(ctx)) => { - if let Some(ref git_root) = ctx.git_root { - IndexFilterMode::Workspace(with_trailing_slash(git_root)) - } else { - // Fall back to directory if no git root - IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) - } - } - (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()), - (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()), - (FilterMode::SessionPreload, Some(ctx)) => { - // SessionPreload is similar to Session - filter by session - IndexFilterMode::Session(ctx.session_id.clone()) - } - // If no context provided, fall back to global - _ => IndexFilterMode::Global, - } -} - -#[cfg(windows)] -pub fn with_trailing_slash(s: &str) -> String { - if s.ends_with('\\') { - s.to_string() - } else { - format!("{}\\", s) - } -} - -#[cfg(not(windows))] -pub fn with_trailing_slash(s: &str) -> String { - if s.ends_with('/') { - s.to_string() - } else { - format!("{}/", s) - } -} diff --git a/crates/atuin-daemon/src/components/semantic.rs b/crates/atuin-daemon/src/components/semantic.rs deleted file mode 100644 index dff38fd3..00000000 --- a/crates/atuin-daemon/src/components/semantic.rs +++ /dev/null @@ -1,900 +0,0 @@ -//! Semantic command capture component. -//! -//! This is a prototype in-memory store for completed command captures emitted -//! by atuin-pty-proxy. It keeps recent captures per Atuin session and indexes -//! them by history ID for AI tool lookup. - -use std::collections::{HashMap, VecDeque}; -use std::fmt::{Display, Formatter}; -use std::sync::Arc; - -use atuin_client::history::{History, HistoryId}; -use eyre::Result; -use tokio::sync::Mutex; -use tonic::{Request, Response, Status, Streaming}; -use tracing::{Level, instrument}; - -use crate::{ - daemon::{Component, DaemonHandle}, - events::DaemonEvent, - semantic::{ - CommandCapture, CommandOutputReply, CommandOutputRequest, OutputLine, RecordCommandsReply, - semantic_server::{Semantic as SemanticSvc, SemanticServer}, - }, -}; - -const MAX_SESSIONS: usize = 20; -const MAX_COMMANDS_PER_SESSION: usize = 128; -const MAX_BYTES_PER_SESSION: usize = 32 * 1024 * 1024; -const MAX_PENDING_HISTORIES: usize = 128; - -/// Stores completed command captures and associates them with history events. -pub struct SemanticComponent { - inner: Arc<SemanticComponentInner>, -} - -struct SemanticComponentInner { - state: Mutex<SemanticState>, -} - -#[derive(Default)] -struct SemanticState { - sessions: HashMap<SessionId, SessionCaptures>, - session_lru: VecDeque<SessionId>, - history_index: HashMap<HistoryId, CaptureRef>, - pending_histories: VecDeque<History>, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct SessionId(String); - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -struct CaptureId(u64); - -#[derive(Debug, Clone, PartialEq, Eq)] -struct CaptureRef { - session_id: SessionId, - capture_id: CaptureId, -} - -#[derive(Default)] -struct SessionCaptures { - next_id: u64, - records: VecDeque<StoredCapture>, - output_bytes: usize, -} - -struct StoredCapture { - id: CaptureId, - history_id: HistoryId, - output_bytes: usize, - record: SemanticCommandRecord, -} - -struct EvictedCapture { - history_id: HistoryId, - capture_id: CaptureId, -} - -#[derive(Debug, Clone)] -struct SemanticCommandRecord { - capture: CommandCapture, - history: Option<History>, -} - -impl SemanticComponent { - pub fn new() -> Self { - Self { - inner: Arc::new(SemanticComponentInner { - state: Mutex::new(SemanticState::default()), - }), - } - } - - pub fn grpc_service(&self) -> SemanticServer<SemanticGrpcService> { - SemanticServer::new(SemanticGrpcService { - inner: self.inner.clone(), - }) - } -} - -impl Default for SemanticComponent { - fn default() -> Self { - Self::new() - } -} - -#[tonic::async_trait] -impl Component for SemanticComponent { - fn name(&self) -> &'static str { - "semantic" - } - - async fn start(&mut self, _handle: DaemonHandle) -> Result<()> { - tracing::info!("semantic component started"); - Ok(()) - } - - async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { - if let DaemonEvent::HistoryEnded(history) = event { - self.inner.record_history(history.clone()).await; - } - - Ok(()) - } - - async fn stop(&mut self) -> Result<()> { - let state = self.inner.state.lock().await; - tracing::info!( - sessions = state.sessions.len(), - records = state.record_count(), - indexed_histories = state.history_index.len(), - pending_histories = state.pending_histories.len(), - "semantic component stopped" - ); - Ok(()) - } -} - -impl SemanticComponentInner { - async fn record_capture(&self, capture: CommandCapture) -> bool { - let mut state = self.state.lock().await; - state.record_capture(capture) - } - - async fn record_history(&self, history: History) { - let mut state = self.state.lock().await; - state.record_history(history); - } - - async fn command_output(&self, request: &CommandOutputRequest) -> CommandOutputReply { - let mut state = self.state.lock().await; - state.command_output(request) - } -} - -impl SemanticState { - fn record_capture(&mut self, mut capture: CommandCapture) -> bool { - let Some(history_id) = history_id_from_str(capture.history_id.as_deref()) else { - tracing::debug!( - command_bytes = capture.command.len(), - prompt_bytes = capture.prompt.len(), - output_bytes = capture.output.len(), - output_truncated = capture.output_truncated, - "dropping semantic command capture without history id" - ); - return false; - }; - - let history = take_pending_history(&mut self.pending_histories, &history_id); - let Some(session_id) = capture - .session_id - .as_deref() - .and_then(|session_id| SessionId::try_from(session_id).ok()) - .or_else(|| { - history - .as_ref() - .and_then(|history| SessionId::try_from(history.session.as_str()).ok()) - }) - else { - tracing::debug!( - history_id = %history_id, - command_bytes = capture.command.len(), - prompt_bytes = capture.prompt.len(), - output_bytes = capture.output.len(), - output_truncated = capture.output_truncated, - "dropping semantic command capture without session id" - ); - return false; - }; - - capture.history_id = Some(history_id.to_string()); - capture.session_id = Some(session_id.to_string()); - if capture.output_observed_bytes == 0 { - capture.output_observed_bytes = capture.output.len() as u64; - } - - let record = SemanticCommandRecord { capture, history }; - log_record(&record, "recorded semantic command capture"); - self.push_record(session_id, history_id, record); - true - } - - fn record_history(&mut self, history: History) { - let history_id = history.id.clone(); - - if let Some(capture_ref) = self.history_index.get(&history_id).cloned() { - if let Some(stored) = self.stored_capture_mut(&capture_ref) { - stored.record.history = Some(history); - log_record( - &stored.record, - "associated semantic command capture with history", - ); - return; - } - - self.history_index.remove(&history_id); - } - - tracing::debug!( - id = %history.id, - command_bytes = history.command.len(), - "history ended before semantic capture arrived" - ); - push_pending_history(&mut self.pending_histories, history); - } - - fn command_output(&mut self, request: &CommandOutputRequest) -> CommandOutputReply { - let Some(history_id) = history_id_from_str(Some(&request.history_id)) else { - return command_output_not_found(); - }; - let Some(capture_ref) = self.history_index.get(&history_id).cloned() else { - return command_output_not_found(); - }; - - let Some(reply) = self.command_output_for_ref(&capture_ref, &request.ranges) else { - self.history_index.remove(&history_id); - return command_output_not_found(); - }; - - self.touch_session(&capture_ref.session_id); - reply - } - - fn command_output_for_ref( - &self, - capture_ref: &CaptureRef, - ranges: &[crate::semantic::OutputRange], - ) -> Option<CommandOutputReply> { - let stored = self - .sessions - .get(&capture_ref.session_id)? - .stored_capture(capture_ref.capture_id)?; - let output = &stored.record.capture.output; - let output_observed_bytes = stored - .record - .capture - .output_observed_bytes - .max(output.len() as u64); - - Some(CommandOutputReply { - found: true, - output: String::new(), - total_bytes: output.len() as u64, - total_lines: output.lines().count() as u64, - lines: select_output_ranges(output, ranges), - output_truncated: stored.record.capture.output_truncated, - output_observed_bytes, - }) - } - - fn push_record( - &mut self, - session_id: SessionId, - history_id: HistoryId, - record: SemanticCommandRecord, - ) { - self.touch_session(&session_id); - - let (capture_id, evicted) = { - let session = self.sessions.entry(session_id.clone()).or_default(); - session.push(history_id.clone(), record) - }; - - let capture_ref = CaptureRef { - session_id: session_id.clone(), - capture_id, - }; - self.history_index.insert(history_id, capture_ref); - - for evicted in evicted { - self.remove_history_index_if_matches( - &session_id, - &evicted.history_id, - evicted.capture_id, - ); - } - - self.expire_lru_sessions(); - } - - fn touch_session(&mut self, session_id: &SessionId) { - if let Some(index) = self.session_lru.iter().position(|id| id == session_id) { - self.session_lru.remove(index); - } - self.session_lru.push_back(session_id.clone()); - } - - fn expire_lru_sessions(&mut self) { - while self.session_lru.len() > MAX_SESSIONS { - let Some(session_id) = self.session_lru.pop_front() else { - break; - }; - let Some(session) = self.sessions.remove(&session_id) else { - continue; - }; - - for stored in session.records { - self.remove_history_index_if_matches(&session_id, &stored.history_id, stored.id); - } - } - } - - fn remove_history_index_if_matches( - &mut self, - session_id: &SessionId, - history_id: &HistoryId, - capture_id: CaptureId, - ) { - if self - .history_index - .get(history_id) - .is_some_and(|capture_ref| { - &capture_ref.session_id == session_id && capture_ref.capture_id == capture_id - }) - { - self.history_index.remove(history_id); - } - } - - fn stored_capture_mut(&mut self, capture_ref: &CaptureRef) -> Option<&mut StoredCapture> { - self.sessions - .get_mut(&capture_ref.session_id)? - .stored_capture_mut(capture_ref.capture_id) - } - - fn record_count(&self) -> usize { - self.sessions - .values() - .map(|session| session.records.len()) - .sum() - } -} - -impl SessionCaptures { - fn push( - &mut self, - history_id: HistoryId, - record: SemanticCommandRecord, - ) -> (CaptureId, Vec<EvictedCapture>) { - self.push_with_limits( - history_id, - record, - MAX_COMMANDS_PER_SESSION, - MAX_BYTES_PER_SESSION, - ) - } - - fn push_with_limits( - &mut self, - history_id: HistoryId, - record: SemanticCommandRecord, - max_commands: usize, - max_output_bytes: usize, - ) -> (CaptureId, Vec<EvictedCapture>) { - let capture_id = CaptureId(self.next_id); - self.next_id = self.next_id.saturating_add(1); - let output_bytes = record.capture.output.len(); - self.output_bytes = self.output_bytes.saturating_add(output_bytes); - self.records.push_back(StoredCapture { - id: capture_id, - history_id, - output_bytes, - record, - }); - - ( - capture_id, - self.evict_to_limits(max_commands, max_output_bytes), - ) - } - - fn evict_to_limits( - &mut self, - max_commands: usize, - max_output_bytes: usize, - ) -> Vec<EvictedCapture> { - let mut evicted = Vec::new(); - while self.records.len() > max_commands || self.output_bytes > max_output_bytes { - let Some(record) = self.records.pop_front() else { - break; - }; - self.output_bytes = self.output_bytes.saturating_sub(record.output_bytes); - evicted.push(EvictedCapture { - history_id: record.history_id, - capture_id: record.id, - }); - } - evicted - } - - fn stored_capture(&self, capture_id: CaptureId) -> Option<&StoredCapture> { - self.records.iter().find(|record| record.id == capture_id) - } - - fn stored_capture_mut(&mut self, capture_id: CaptureId) -> Option<&mut StoredCapture> { - self.records - .iter_mut() - .find(|record| record.id == capture_id) - } -} - -impl TryFrom<&str> for SessionId { - type Error = (); - - fn try_from(value: &str) -> std::result::Result<Self, Self::Error> { - let value = value.trim(); - if value.is_empty() { - return Err(()); - } - - Ok(Self(value.to_string())) - } -} - -impl TryFrom<String> for SessionId { - type Error = (); - - fn try_from(value: String) -> std::result::Result<Self, Self::Error> { - Self::try_from(value.as_str()) - } -} - -impl AsRef<str> for SessionId { - fn as_ref(&self) -> &str { - &self.0 - } -} - -impl Display for SessionId { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.0) - } -} - -pub struct SemanticGrpcService { - inner: Arc<SemanticComponentInner>, -} - -#[tonic::async_trait] -impl SemanticSvc for SemanticGrpcService { - #[instrument(skip_all, level = Level::INFO)] - async fn record_commands( - &self, - request: Request<Streaming<CommandCapture>>, - ) -> Result<Response<RecordCommandsReply>, Status> { - let mut stream = request.into_inner(); - let mut accepted = 0_u64; - - while let Some(capture) = stream.message().await? { - if self.inner.record_capture(capture).await { - accepted += 1; - } - } - - Ok(Response::new(RecordCommandsReply { accepted })) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn command_output( - &self, - request: Request<CommandOutputRequest>, - ) -> Result<Response<CommandOutputReply>, Status> { - let request = request.into_inner(); - if request.history_id.trim().is_empty() { - return Err(Status::invalid_argument("history_id is required")); - } - - Ok(Response::new(self.inner.command_output(&request).await)) - } -} - -fn history_id_from_str(value: Option<&str>) -> Option<HistoryId> { - let value = value?.trim(); - (!value.is_empty()).then(|| HistoryId(value.to_string())) -} - -fn take_pending_history( - histories: &mut VecDeque<History>, - history_id: &HistoryId, -) -> Option<History> { - let index = histories - .iter() - .position(|history| &history.id == history_id)?; - histories.remove(index) -} - -fn push_pending_history(histories: &mut VecDeque<History>, history: History) { - if let Some(index) = histories - .iter() - .position(|pending| pending.id == history.id) - { - histories.remove(index); - } - - histories.push_back(history); - trim_front(histories, MAX_PENDING_HISTORIES); -} - -fn trim_front<T>(records: &mut VecDeque<T>, max_len: usize) { - while records.len() > max_len { - records.pop_front(); - } -} - -fn command_output_not_found() -> CommandOutputReply { - CommandOutputReply { - found: false, - output: String::new(), - total_bytes: 0, - total_lines: 0, - lines: Vec::new(), - output_truncated: false, - output_observed_bytes: 0, - } -} - -fn select_output_ranges(output: &str, ranges: &[crate::semantic::OutputRange]) -> Vec<OutputLine> { - let lines: Vec<&str> = output.lines().collect(); - if lines.is_empty() { - return Vec::new(); - } - - let ranges = if ranges.is_empty() { - vec![crate::semantic::OutputRange { start: 0, end: 999 }] - } else { - ranges.to_vec() - }; - - let mut ranges = ranges - .into_iter() - .filter_map(|range| normalize_line_range(range.start, range.end, lines.len())) - .collect::<Vec<_>>(); - ranges.sort_unstable_by_key(|(start, _)| *start); - - let mut merged: Vec<(usize, usize)> = Vec::new(); - for (start, end) in ranges { - match merged.last_mut() { - Some((_, merged_end)) if start <= merged_end.saturating_add(1) => { - *merged_end = (*merged_end).max(end); - } - _ => merged.push((start, end)), - } - } - - merged - .into_iter() - .flat_map(|(start, end)| { - lines[start..=end] - .iter() - .enumerate() - .map(move |(offset, line)| OutputLine { - line_number: (start + offset + 1) as u64, - content: (*line).to_string(), - }) - }) - .collect() -} - -fn normalize_line_range(start: i64, end: i64, line_count: usize) -> Option<(usize, usize)> { - let line_count = i64::try_from(line_count).ok()?; - let start = if start < 0 { line_count + start } else { start }; - let end = if end < 0 { line_count + end } else { end }; - - if end < 0 || start >= line_count { - return None; - } - - let start = start.max(0); - let end = end.min(line_count - 1); - - (start <= end).then_some((start as usize, end as usize)) -} - -fn log_record(record: &SemanticCommandRecord, message: &'static str) { - let history_id = record.capture.history_id.as_deref().unwrap_or("<missing>"); - let associated_history_id = record - .history - .as_ref() - .map(|history| history.id.to_string()); - let exit = record.history.as_ref().map(|history| history.exit); - let duration = record.history.as_ref().map(|history| history.duration); - let author = record - .history - .as_ref() - .map(|history| history.author.as_str()); - let session_id = record.capture.session_id.as_deref(); - - tracing::debug!( - history_id = %history_id, - associated_history_id = ?associated_history_id, - session_id = ?session_id, - command_bytes = record.capture.command.len(), - prompt_bytes = record.capture.prompt.len(), - output_bytes = record.capture.output.len(), - output_truncated = record.capture.output_truncated, - output_observed_bytes = record.capture.output_observed_bytes, - capture_exit_code = ?record.capture.exit_code, - history_exit = ?exit, - duration = ?duration, - author = ?author, - "{message}" - ); -} - -#[cfg(test)] -mod tests { - use super::*; - use time::OffsetDateTime; - - fn history(id: &str, session: &str, command: &str) -> History { - History { - id: HistoryId(id.to_string()), - timestamp: OffsetDateTime::UNIX_EPOCH, - duration: 0, - exit: 0, - command: command.to_string(), - cwd: String::new(), - session: session.to_string(), - hostname: String::new(), - author: String::new(), - intent: None, - deleted_at: None, - } - } - - fn capture(history_id: Option<&str>, session_id: Option<&str>, output: &str) -> CommandCapture { - CommandCapture { - prompt: String::new(), - command: String::new(), - output: output.to_string(), - exit_code: None, - history_id: history_id.map(str::to_string), - session_id: session_id.map(str::to_string), - output_truncated: false, - output_observed_bytes: output.len() as u64, - } - } - - fn command_output(state: &mut SemanticState, history_id: &str) -> CommandOutputReply { - state.command_output(&CommandOutputRequest { - history_id: history_id.to_string(), - ranges: Vec::new(), - }) - } - - fn output_line(line_number: u64, content: &str) -> OutputLine { - OutputLine { - line_number, - content: content.to_string(), - } - } - - #[test] - fn drops_capture_without_history_id() { - let mut state = SemanticState::default(); - - assert!(!state.record_capture(capture(None, Some("session-1"), "output"))); - assert!(!command_output(&mut state, "id-1").found); - assert_eq!(state.record_count(), 0); - } - - #[test] - fn stores_capture_by_session_and_history_id() { - let mut state = SemanticState::default(); - - assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output"))); - - let reply = command_output(&mut state, "id-1"); - assert!(reply.found); - assert_eq!(reply.total_bytes, 6); - assert_eq!(reply.output_observed_bytes, 6); - assert_eq!(reply.lines, vec![output_line(1, "output")]); - } - - #[test] - fn uses_pending_history_session_when_capture_session_is_missing() { - let mut state = SemanticState::default(); - - state.record_history(history("id-1", "session-from-history", "cargo test")); - assert!(state.record_capture(capture(Some("id-1"), None, "output"))); - - assert!( - state - .sessions - .contains_key(&SessionId("session-from-history".to_string())) - ); - assert!(command_output(&mut state, "id-1").found); - } - - #[test] - fn associates_history_by_id_after_capture_arrives() { - let mut state = SemanticState::default(); - - assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output"))); - state.record_history(history("id-1", "session-1", "different command")); - - let capture_ref = state - .history_index - .get(&HistoryId("id-1".to_string())) - .unwrap(); - let stored = state - .sessions - .get(&capture_ref.session_id) - .unwrap() - .stored_capture(capture_ref.capture_id) - .unwrap(); - assert!(stored.record.history.is_some()); - } - - #[test] - fn evicts_oldest_command_when_session_ring_is_full() { - let mut state = SemanticState::default(); - - for index in 0..=MAX_COMMANDS_PER_SESSION { - assert!(state.record_capture(capture( - Some(&format!("id-{index}")), - Some("session-1"), - "output", - ))); - } - - assert!(!command_output(&mut state, "id-0").found); - assert!(command_output(&mut state, &format!("id-{MAX_COMMANDS_PER_SESSION}")).found); - assert_eq!(state.record_count(), MAX_COMMANDS_PER_SESSION); - } - - #[test] - fn evicts_oldest_session_after_lru_limit() { - let mut state = SemanticState::default(); - - for index in 0..MAX_SESSIONS { - assert!(state.record_capture(capture( - Some(&format!("id-{index}")), - Some(&format!("session-{index}")), - "output", - ))); - } - assert!(command_output(&mut state, "id-0").found); - - assert!(state.record_capture(capture(Some("new-id"), Some("new-session"), "output",))); - - assert!(command_output(&mut state, "id-0").found); - assert!(!command_output(&mut state, "id-1").found); - assert!(command_output(&mut state, "new-id").found); - assert_eq!(state.sessions.len(), MAX_SESSIONS); - } - - #[test] - fn evicts_by_session_byte_limit() { - let mut session = SessionCaptures::default(); - let first_output = "x".repeat(10); - let second_output = "y"; - let (_, evicted_first) = session.push_with_limits( - HistoryId("first".to_string()), - SemanticCommandRecord { - capture: capture(Some("first"), Some("session-1"), &first_output), - history: None, - }, - MAX_COMMANDS_PER_SESSION, - 10, - ); - assert!(evicted_first.is_empty()); - - let (_, evicted_second) = session.push_with_limits( - HistoryId("second".to_string()), - SemanticCommandRecord { - capture: capture(Some("second"), Some("session-1"), second_output), - history: None, - }, - MAX_COMMANDS_PER_SESSION, - 10, - ); - - assert_eq!(evicted_second.len(), 1); - assert_eq!(evicted_second[0].history_id, HistoryId("first".to_string())); - assert_eq!(session.records.len(), 1); - assert_eq!(session.output_bytes, 1); - } - - #[test] - fn command_output_reports_truncation_metadata() { - let mut state = SemanticState::default(); - let mut capture = capture(Some("id-1"), Some("session-1"), "partial"); - capture.output_truncated = true; - capture.output_observed_bytes = 1024; - - assert!(state.record_capture(capture)); - - let reply = command_output(&mut state, "id-1"); - assert!(reply.output_truncated); - assert_eq!(reply.total_bytes, 7); - assert_eq!(reply.output_observed_bytes, 1024); - } - - #[test] - fn output_ranges_are_line_based_inclusive_and_support_negative_offsets() { - let output = "zero\none\ntwo\nthree\nfour"; - let ranges = vec![ - crate::semantic::OutputRange { start: 1, end: 2 }, - crate::semantic::OutputRange { start: -2, end: -1 }, - ]; - - assert_eq!( - select_output_ranges(output, &ranges), - vec![ - output_line(2, "one"), - output_line(3, "two"), - output_line(4, "three"), - output_line(5, "four"), - ] - ); - } - - #[test] - fn output_ranges_merge_overlaps_and_adjacent_ranges() { - let output = (0..100) - .map(|n| format!("line {n}")) - .collect::<Vec<_>>() - .join("\n"); - let ranges = vec![ - crate::semantic::OutputRange { start: 0, end: 100 }, - crate::semantic::OutputRange { - start: -100, - end: -1, - }, - ]; - - let selected = select_output_ranges(&output, &ranges); - - assert_eq!(selected.len(), 100); - assert_eq!(selected.first(), Some(&output_line(1, "line 0"))); - assert_eq!(selected.last(), Some(&output_line(100, "line 99"))); - } - - #[test] - fn output_ranges_can_leave_gaps_for_client_formatting() { - let output = "zero\none\ntwo\nthree\nfour"; - let ranges = vec![ - crate::semantic::OutputRange { start: 0, end: 1 }, - crate::semantic::OutputRange { start: 4, end: 4 }, - ]; - - assert_eq!( - select_output_ranges(output, &ranges), - vec![ - output_line(1, "zero"), - output_line(2, "one"), - output_line(5, "four"), - ] - ); - } - - #[test] - fn empty_output_ranges_default_to_first_thousand_lines() { - let output = (0..1001) - .map(|n| format!("line {n}")) - .collect::<Vec<_>>() - .join("\n"); - - let selected = select_output_ranges(&output, &[]); - - assert_eq!(selected.len(), 1000); - assert_eq!(selected.first(), Some(&output_line(1, "line 0"))); - assert_eq!(selected.last(), Some(&output_line(1000, "line 999"))); - } - - #[test] - fn output_ranges_skip_ranges_fully_outside_output() { - let output = "zero\none\ntwo"; - let ranges = vec![ - crate::semantic::OutputRange { start: 10, end: 20 }, - crate::semantic::OutputRange { - start: -20, - end: -10, - }, - ]; - - assert_eq!(select_output_ranges(output, &ranges), Vec::new()); - } -} diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs deleted file mode 100644 index 6e486250..00000000 --- a/crates/atuin-daemon/src/components/sync.rs +++ /dev/null @@ -1,279 +0,0 @@ -//! Sync component. -//! -//! Handles periodic synchronization with the Atuin cloud server. - -use std::time::Duration; - -use eyre::Result; -use rand::Rng; -use tokio::sync::mpsc; -use tokio::time::{self, MissedTickBehavior}; - -use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings}; - -use crate::{ - daemon::{Component, DaemonHandle}, - events::DaemonEvent, -}; - -/// Commands that can be sent to the sync task. -enum SyncCommand { - /// Trigger an immediate sync. - ForceSync, - /// Stop the sync loop. - Stop, -} - -/// Sync state - tracks whether we're in normal operation or retrying after failure. -#[derive(Clone, Copy, PartialEq, Eq)] -enum SyncState { - /// Normal operation. Periodic syncs only run if auto_sync is enabled. - Idle, - /// Retrying after a sync failure. Retries continue regardless of auto_sync - /// until the sync succeeds. - Retrying, -} - -/// Sync component - handles periodic cloud synchronization. -/// -/// This component: -/// - Runs a background sync loop on a configurable interval -/// - Implements exponential backoff on sync failures -/// - Responds to ForceSync events for immediate sync -/// - Emits SyncCompleted/SyncFailed events -pub struct SyncComponent { - task_handle: Option<tokio::task::JoinHandle<()>>, - command_tx: Option<mpsc::Sender<SyncCommand>>, -} - -impl SyncComponent { - /// Create a new sync component. - pub fn new() -> Self { - Self { - task_handle: None, - command_tx: None, - } - } -} - -impl Default for SyncComponent { - fn default() -> Self { - Self::new() - } -} - -#[tonic::async_trait] -impl Component for SyncComponent { - fn name(&self) -> &'static str { - "sync" - } - - async fn start(&mut self, handle: DaemonHandle) -> Result<()> { - let (cmd_tx, cmd_rx) = mpsc::channel(16); - self.command_tx = Some(cmd_tx); - - // Spawn the sync loop with its own copy of the handle - self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx))); - - tracing::info!("sync component started"); - Ok(()) - } - - async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { - if let DaemonEvent::ForceSync = event { - tracing::info!("force sync requested"); - if let Some(tx) = &self.command_tx { - let _ = tx.send(SyncCommand::ForceSync).await; - } - } - Ok(()) - } - - async fn stop(&mut self) -> Result<()> { - if let Some(tx) = &self.command_tx { - let _ = tx.send(SyncCommand::Stop).await; - } - if let Some(handle) = self.task_handle.take() { - // Give the task a moment to shut down gracefully - let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; - } - tracing::info!("sync component stopped"); - Ok(()) - } -} - -/// The main sync loop. -/// -/// This runs in a spawned task and handles periodic sync as well as -/// force sync requests. -async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) { - tracing::info!("sync loop starting"); - - // Clone settings since we need them across await points - let settings = handle.settings().await.clone(); - let host_id = match Settings::host_id().await { - Ok(id) => id, - Err(e) => { - tracing::error!("failed to get host id, sync disabled: {e}"); - return; - } - }; - - // Create the stores we need - let encryption_key = *handle.encryption_key(); - let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key); - - // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) - let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); - - let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); - - // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, - // we may end up running a lot of syncs in a hot loop. - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - let mut sync_state = SyncState::Idle; - - loop { - tokio::select! { - _ = ticker.tick() => { - let settings = handle.settings().await; - - // Skip periodic ticks if auto_sync is disabled AND we're not retrying - // a previous failure. Retries must continue regardless of auto_sync. - if !settings.auto_sync && sync_state == SyncState::Idle { - tracing::debug!("auto_sync disabled, skipping periodic sync tick"); - continue; - } - - sync_state = do_sync_tick( - &handle, - &history_store, - &mut ticker, - max_interval, - &settings, - ).await; - } - cmd = cmd_rx.recv() => { - match cmd { - Some(SyncCommand::ForceSync) => { - tracing::info!("executing force sync"); - let settings = handle.settings().await; - sync_state = do_sync_tick( - &handle, - &history_store, - &mut ticker, - max_interval, - &settings, - ).await; - } - Some(SyncCommand::Stop) | None => { - tracing::info!("sync loop stopping"); - break; - } - } - } - } - } -} - -/// Execute a single sync tick. -/// -/// Returns the new sync state: `Idle` on success, `Retrying` on failure. -async fn do_sync_tick( - handle: &DaemonHandle, - history_store: &HistoryStore, - ticker: &mut time::Interval, - max_interval: f64, - settings: &Settings, -) -> SyncState { - tracing::info!("sync tick"); - - // Check if logged in - let logged_in = match settings.logged_in().await { - Ok(v) => v, - Err(e) => { - tracing::warn!("failed to check login status, skipping sync tick: {e}"); - return SyncState::Idle; - } - }; - - if !logged_in { - tracing::debug!("not logged in, skipping sync tick"); - return SyncState::Idle; - } - - // Perform the sync - let res = sync::sync(settings, handle.store(), handle.encryption_key()).await; - - match res { - Err(e) => { - tracing::error!("sync tick failed with {e}"); - - // Emit failure event - handle.emit(DaemonEvent::SyncFailed { - error: e.to_string(), - }); - - // Exponential backoff - let mut rng = rand::thread_rng(); - let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); - - if new_interval > max_interval { - new_interval = max_interval; - } - - *ticker = time::interval_at( - tokio::time::Instant::now() + Duration::from_secs(new_interval as u64), - time::Duration::from_secs(new_interval as u64), - ); - ticker.reset_after(time::Duration::from_secs(new_interval as u64)); - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - tracing::error!("backing off, next sync tick in {new_interval}"); - - SyncState::Retrying - } - Ok((uploaded_count, downloaded_records)) => { - tracing::info!( - uploaded = uploaded_count, - downloaded = downloaded_records.len(), - "sync complete" - ); - - // Build history from downloaded records - if let Err(e) = history_store - .incremental_build(handle.history_db(), &downloaded_records) - .await - { - tracing::error!("failed to build history from downloaded records: {e}"); - } - - // Emit the records added event (for search indexing) - handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone())); - - // Emit sync completed event - handle.emit(DaemonEvent::SyncCompleted { - uploaded: uploaded_count as usize, - downloaded: downloaded_records.len(), - }); - - // Reset backoff on success - if ticker.period().as_secs() != settings.daemon.sync_frequency { - *ticker = time::interval_at( - tokio::time::Instant::now() - + Duration::from_secs(settings.daemon.sync_frequency), - time::Duration::from_secs(settings.daemon.sync_frequency), - ); - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - } - - // Store sync time - if let Err(e) = Settings::save_sync_time().await { - tracing::error!("failed to save sync time: {e}"); - } - - SyncState::Idle - } - } -} |
