aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon/src/components
diff options
context:
space:
mode:
Diffstat (limited to 'crates/atuin-daemon/src/components')
-rw-r--r--crates/atuin-daemon/src/components/history.rs327
-rw-r--r--crates/atuin-daemon/src/components/mod.rs25
-rw-r--r--crates/atuin-daemon/src/components/search.rs413
-rw-r--r--crates/atuin-daemon/src/components/semantic.rs900
-rw-r--r--crates/atuin-daemon/src/components/sync.rs279
5 files changed, 0 insertions, 1944 deletions
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs
deleted file mode 100644
index c82c8f94..00000000
--- a/crates/atuin-daemon/src/components/history.rs
+++ /dev/null
@@ -1,327 +0,0 @@
-//! History component.
-//!
-//! Handles command history lifecycle (start/end) and provides the History gRPC service.
-
-use std::{pin::Pin, sync::Arc};
-
-use atuin_client::{
- database::Database,
- history::{History, HistoryId, store::HistoryStore},
- settings::Settings,
-};
-use dashmap::DashMap;
-use eyre::Result;
-use time::OffsetDateTime;
-use tokio_stream::Stream;
-use tonic::{Request, Response, Status};
-use tracing::{Level, instrument};
-
-use crate::{
- daemon::{Component, DaemonHandle},
- events::DaemonEvent,
- history::{
- EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
- ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
- TailHistoryReply, TailHistoryRequest,
- history_server::{History as HistorySvc, HistoryServer},
- },
-};
-
-const DAEMON_PROTOCOL_VERSION: u32 = 1;
-
-/// History component - manages command history lifecycle.
-///
-/// This component:
-/// - Tracks currently running commands (stored in memory)
-/// - Saves completed commands to the database and record store
-/// - Emits history events for other components (e.g., search indexing)
-/// - Provides the History gRPC service
-pub struct HistoryComponent {
- inner: Arc<HistoryComponentInner>,
-}
-
-struct HistoryComponentInner {
- /// Commands currently running (not yet completed).
- running: DashMap<HistoryId, History>,
-
- /// Handle to the daemon (set during start).
- handle: tokio::sync::RwLock<Option<DaemonHandle>>,
-
- /// History store for pushing records (set during start).
- history_store: tokio::sync::RwLock<Option<HistoryStore>>,
-}
-
-impl HistoryComponent {
- /// Create a new history component.
- pub fn new() -> Self {
- Self {
- inner: Arc::new(HistoryComponentInner {
- running: DashMap::new(),
- handle: tokio::sync::RwLock::new(None),
- history_store: tokio::sync::RwLock::new(None),
- }),
- }
- }
-
- /// Get the gRPC service for this component.
- ///
- /// This returns a tonic service that can be added to a gRPC server.
- pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
- HistoryServer::new(HistoryGrpcService {
- inner: self.inner.clone(),
- })
- }
-}
-
-impl Default for HistoryComponent {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[tonic::async_trait]
-impl Component for HistoryComponent {
- fn name(&self) -> &'static str {
- "history"
- }
-
- async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
- // Create the history store
- let host_id = Settings::host_id().await?;
- let history_store =
- HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key());
-
- *self.inner.history_store.write().await = Some(history_store);
- *self.inner.handle.write().await = Some(handle);
-
- tracing::info!("history component started");
- Ok(())
- }
-
- async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
- // History component produces events but doesn't need to react to them
- Ok(())
- }
-
- async fn stop(&mut self) -> Result<()> {
- tracing::info!("history component stopped");
- Ok(())
- }
-}
-
-/// The gRPC service implementation.
-///
-/// This is a thin wrapper that delegates to the component's shared state.
-pub struct HistoryGrpcService {
- inner: Arc<HistoryComponentInner>,
-}
-
-fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
- TailHistoryReply {
- kind: kind as i32,
- history: Some(HistoryEntry {
- timestamp: history.timestamp.unix_timestamp_nanos() as u64,
- id: history.id.0,
- command: history.command,
- cwd: history.cwd,
- session: history.session,
- hostname: history.hostname,
- author: history.author,
- intent: history.intent.unwrap_or_default(),
- exit: history.exit,
- duration: history.duration,
- }),
- }
-}
-
-#[tonic::async_trait]
-impl HistorySvc for HistoryGrpcService {
- type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn start_history(
- &self,
- request: Request<StartHistoryRequest>,
- ) -> Result<Response<StartHistoryReply>, Status> {
- let req = request.into_inner();
-
- let timestamp =
- OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
- Status::invalid_argument(
- "failed to parse timestamp as unix time (expected nanos since epoch)",
- )
- })?;
-
- let h: History = History::daemon()
- .timestamp(timestamp)
- .command(req.command)
- .cwd(req.cwd)
- .session(req.session)
- .hostname(req.hostname)
- .author(req.author)
- .intent(req.intent)
- .build()
- .into();
-
- // Emit the event
- if let Some(handle) = self.inner.handle.read().await.as_ref() {
- handle.emit(DaemonEvent::HistoryStarted(h.clone()));
- }
-
- let id = h.id.clone();
- tracing::info!(id = id.to_string(), "start history");
- self.inner.running.insert(id.clone(), h);
-
- let reply = StartHistoryReply {
- id: id.to_string(),
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn end_history(
- &self,
- request: Request<EndHistoryRequest>,
- ) -> Result<Response<EndHistoryReply>, Status> {
- let req = request.into_inner();
- let id = HistoryId(req.id);
-
- if let Some((_, mut history)) = self.inner.running.remove(&id) {
- history.exit = req.exit;
- history.duration = match req.duration {
- 0 => i64::try_from(
- (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
- )
- .expect("failed to convert calculated duration to i64"),
- value => i64::try_from(value).expect("failed to get i64 duration"),
- };
-
- // Get the handle and store to save the history
- let handle_guard = self.inner.handle.read().await;
- let handle = handle_guard
- .as_ref()
- .ok_or_else(|| Status::internal("component not initialized"))?;
-
- let store_guard = self.inner.history_store.read().await;
- let history_store = store_guard
- .as_ref()
- .ok_or_else(|| Status::internal("component not initialized"))?;
-
- // Save to database
- handle
- .history_db()
- .save(&history)
- .await
- .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
-
- tracing::info!(
- id = id.0.to_string(),
- duration = history.duration,
- "end history"
- );
-
- // Push to record store
- let (record_id, idx) = history_store
- .push(history.clone())
- .await
- .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
-
- // Emit the event
- handle.emit(DaemonEvent::HistoryEnded(history));
-
- let reply = EndHistoryReply {
- id: record_id.0.to_string(),
- idx,
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- return Ok(Response::new(reply));
- }
-
- Err(Status::not_found(format!(
- "could not find history with id: {id}"
- )))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn tail_history(
- &self,
- _request: Request<TailHistoryRequest>,
- ) -> Result<Response<Self::TailHistoryStream>, Status> {
- let handle_guard = self.inner.handle.read().await;
- let handle = handle_guard
- .as_ref()
- .cloned()
- .ok_or_else(|| Status::internal("component not initialized"))?;
-
- let mut rx = handle.subscribe();
- let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
-
- tokio::spawn(async move {
- loop {
- let event = match rx.recv().await {
- Ok(event) => event,
- Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
- let _ = tx
- .send(Err(Status::resource_exhausted(format!(
- "tail stream lagged behind and dropped {skipped} events"
- ))))
- .await;
- break;
- }
- Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
- };
-
- let reply = match event {
- DaemonEvent::HistoryStarted(history) => {
- Some(history_to_tail_reply(HistoryEventKind::Started, history))
- }
- DaemonEvent::HistoryEnded(history) => {
- Some(history_to_tail_reply(HistoryEventKind::Ended, history))
- }
- _ => None,
- };
-
- if let Some(reply) = reply
- && tx.send(Ok(reply)).await.is_err()
- {
- break;
- }
- }
- });
-
- let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
- Ok(Response::new(Box::pin(stream)))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn status(
- &self,
- _request: Request<StatusRequest>,
- ) -> Result<Response<StatusReply>, Status> {
- let reply = StatusReply {
- healthy: true,
- version: env!("CARGO_PKG_VERSION").to_string(),
- pid: std::process::id(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn shutdown(
- &self,
- _request: Request<ShutdownRequest>,
- ) -> Result<Response<ShutdownReply>, Status> {
- // Use the daemon handle to request shutdown
- if let Some(handle) = self.inner.handle.read().await.as_ref() {
- handle.shutdown();
- }
- Ok(Response::new(ShutdownReply { accepted: true }))
- }
-}
diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs
deleted file mode 100644
index 447e31df..00000000
--- a/crates/atuin-daemon/src/components/mod.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-//! Daemon components.
-//!
-//! Components are the building blocks of the daemon. Each component handles
-//! a specific domain and can:
-//!
-//! - Expose gRPC services
-//! - React to events
-//! - Spawn background tasks
-//!
-//! Available components:
-//!
-//! - [`history::HistoryComponent`]: Command history lifecycle management
-//! - [`search::SearchComponent`]: Fuzzy search over history
-//! - [`semantic::SemanticComponent`]: In-memory semantic command captures
-//! - [`sync::SyncComponent`]: Cloud sync
-
-pub mod history;
-pub mod search;
-pub mod semantic;
-pub mod sync;
-
-pub use history::HistoryComponent;
-pub use search::SearchComponent;
-pub use semantic::SemanticComponent;
-pub use sync::SyncComponent;
diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs
deleted file mode 100644
index 9fc87fae..00000000
--- a/crates/atuin-daemon/src/components/search.rs
+++ /dev/null
@@ -1,413 +0,0 @@
-//! Search component.
-//!
-//! Provides fuzzy search over command history using the Nucleo search library
-//! with frecency-based ranking and dynamic filtering.
-
-use std::{pin::Pin, sync::Arc};
-
-use atuin_client::database::Database;
-use eyre::Result;
-use tokio::sync::RwLock;
-use tokio_stream::Stream;
-use tonic::{Request, Response, Status, Streaming};
-use tracing::{Level, debug, info, instrument, span, trace};
-use uuid::Uuid;
-
-use crate::{
- daemon::{Component, DaemonHandle},
- events::DaemonEvent,
- search::{
- FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse,
- search_server::{Search as SearchSvc, SearchServer},
- },
-};
-
-const PAGE_SIZE: usize = 5000;
-const RESULTS_LIMIT: u32 = 200;
-/// How often to rebuild the frecency map (in seconds).
-const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60;
-
-/// Search component - provides fuzzy search over command history.
-///
-/// This component:
-/// - Maintains a deduplicated search index with frecency ranking
-/// - Loads history from the database on startup
-/// - Updates the index when history events occur
-/// - Provides the Search gRPC service
-pub struct SearchComponent {
- index: Arc<RwLock<SearchIndex>>,
- handle: tokio::sync::RwLock<Option<DaemonHandle>>,
- loader_handle: Option<tokio::task::JoinHandle<()>>,
- frecency_handle: Option<tokio::task::JoinHandle<()>>,
-}
-
-impl SearchComponent {
- /// Create a new search component.
- pub fn new() -> Self {
- Self {
- index: Arc::new(RwLock::new(SearchIndex::new())),
- handle: tokio::sync::RwLock::new(None),
- loader_handle: None,
- frecency_handle: None,
- }
- }
-
- /// Get the gRPC service for this component.
- pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> {
- SearchServer::new(SearchGrpcService {
- index: self.index.clone(),
- })
- }
-
- /// Rebuild the entire search index from the database.
- async fn rebuild_index(&self) -> Result<()> {
- let handle_guard = self.handle.read().await;
- let handle = handle_guard
- .as_ref()
- .ok_or_else(|| eyre::eyre!("component not initialized"))?;
-
- info!("Rebuilding search index from database");
-
- // Create a new index
- let new_index = SearchIndex::new();
-
- // Load all history into the new index
- let db = handle.history_db().clone();
- let mut pager = db.all_paged(PAGE_SIZE, false, true);
- loop {
- match pager.next().await {
- Ok(Some(histories)) => {
- info!(
- "Loading {} history entries into search index",
- histories.len()
- );
- new_index.add_histories(&histories);
- }
- Ok(None) => break,
- Err(e) => {
- tracing::error!("Failed to load history during rebuild: {}", e);
- break;
- }
- }
- }
-
- info!(
- "Search index rebuild complete; {} unique commands",
- new_index.command_count()
- );
-
- // Replace the old index with the new one
- *self.index.write().await = new_index;
- Ok(())
- }
-}
-
-impl Default for SearchComponent {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[tonic::async_trait]
-impl Component for SearchComponent {
- fn name(&self) -> &'static str {
- "search"
- }
-
- async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
- *self.handle.write().await = Some(handle.clone());
-
- // Spawn background task to load history into index
- let index = self.index.clone();
- let db = handle.history_db().clone();
- let handle_for_loader = handle.clone();
-
- self.loader_handle = Some(tokio::spawn(async move {
- info!(
- "Loading history into search index; page size = {}",
- PAGE_SIZE
- );
- let mut pager = db.all_paged(PAGE_SIZE, false, true);
- loop {
- match pager.next().await {
- Ok(Some(histories)) => {
- info!(
- "Loading {} history entries into search index",
- histories.len()
- );
- index.read().await.add_histories(&histories);
- }
- Ok(None) => {
- info!(
- "Initial history load complete; {} unique commands indexed",
- index.read().await.command_count()
- );
- // Build initial frecency map with current settings
- let settings = handle_for_loader.settings().await;
- index.read().await.rebuild_frecency(&settings.search).await;
- info!("Initial frecency map built");
- break;
- }
- Err(e) => {
- tracing::error!("Failed to load history: {}", e);
- break;
- }
- }
- }
- }));
-
- // Spawn background task to periodically refresh frecency
- let index_for_frecency = self.index.clone();
- let handle_for_frecency = handle.clone();
- self.frecency_handle = Some(tokio::spawn(async move {
- let mut interval = tokio::time::interval(std::time::Duration::from_secs(
- FRECENCY_REFRESH_INTERVAL_SECS,
- ));
- loop {
- interval.tick().await;
- trace!("Refreshing frecency map");
- let settings = handle_for_frecency.settings().await;
- index_for_frecency
- .read()
- .await
- .rebuild_frecency(&settings.search)
- .await;
- }
- }));
-
- tracing::info!("search component started");
- Ok(())
- }
-
- async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
- match event {
- DaemonEvent::RecordsAdded(records) => {
- debug!(
- count = records.len(),
- "Processing added records for search index"
- );
-
- let handle_guard = self.handle.read().await;
- if let Some(handle) = handle_guard.as_ref() {
- let histories: Vec<_> = handle
- .history_db()
- .query_history(
- format!(
- "select * from history where id in ({})",
- records
- .iter()
- .map(|record| record.0.to_string())
- .collect::<Vec<_>>()
- .join(",")
- )
- .as_str(),
- )
- .await
- .unwrap_or_default();
-
- span!(Level::TRACE, "inject_records", count = histories.len())
- .in_scope(async || {
- self.index.read().await.add_histories(&histories);
- })
- .await;
- }
- }
- DaemonEvent::HistoryStarted(history) => {
- debug!(id = %history.id, command = %history.command, "History started (no index action)");
- }
- DaemonEvent::HistoryEnded(history) => {
- span!(Level::TRACE, "inject_history_ended")
- .in_scope(async || {
- self.index.read().await.add_history(history);
- })
- .await;
- }
- DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => {
- info!("History store pruned or rebuilt, rebuilding search index");
- if let Err(e) = self.rebuild_index().await {
- tracing::error!("Failed to rebuild search index: {}", e);
- }
- }
- DaemonEvent::HistoryDeleted { ids } => {
- info!(
- count = ids.len(),
- "History deleted, rebuilding search index"
- );
- // For now, just rebuild the entire index. A more efficient implementation
- // would remove specific items from the index.
- if let Err(e) = self.rebuild_index().await {
- tracing::error!("Failed to rebuild search index: {}", e);
- }
- }
- DaemonEvent::SettingsReloaded => {
- info!("Settings reloaded, rebuilding frecency map with new multipliers");
- let handle_guard = self.handle.read().await;
- if let Some(handle) = handle_guard.as_ref() {
- let settings = handle.settings().await;
- self.index
- .read()
- .await
- .rebuild_frecency(&settings.search)
- .await;
- }
- }
- // Events we don't care about
- DaemonEvent::SyncCompleted { .. }
- | DaemonEvent::SyncFailed { .. }
- | DaemonEvent::ForceSync
- | DaemonEvent::ShutdownRequested => {}
- }
- Ok(())
- }
-
- async fn stop(&mut self) -> Result<()> {
- if let Some(handle) = self.loader_handle.take() {
- handle.abort();
- }
- if let Some(handle) = self.frecency_handle.take() {
- handle.abort();
- }
- tracing::info!("search component stopped");
- Ok(())
- }
-}
-
-/// The gRPC service implementation.
-pub struct SearchGrpcService {
- index: Arc<RwLock<SearchIndex>>,
-}
-
-#[tonic::async_trait]
-impl SearchSvc for SearchGrpcService {
- type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>;
-
- #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")]
- async fn search(
- &self,
- request: Request<Streaming<SearchRequest>>,
- ) -> Result<Response<Self::SearchStream>, Status> {
- let mut in_stream = request.into_inner();
- let index = self.index.clone();
-
- // Create output channel
- let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128);
-
- // Spawn task to handle incoming requests and send responses
- tokio::spawn(async move {
- while let Some(req) = in_stream.message().await.transpose() {
- match req {
- Ok(search_req) => {
- let query = search_req.query;
- let query_id = search_req.query_id;
- let filter_mode: FilterMode = search_req
- .filter_mode
- .try_into()
- .unwrap_or(FilterMode::Global);
- let proto_context = search_req.context;
-
- debug!(
- "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}",
- query,
- query_id,
- filter_mode.as_str_name(),
- proto_context
- );
-
- // Convert proto FilterMode + context to IndexFilterMode
- let index_filter = convert_filter_mode(filter_mode, &proto_context);
-
- // Build QueryContext from proto context
- let query_context = proto_context
- .map(|ctx| QueryContext {
- cwd: Some(with_trailing_slash(&ctx.cwd)),
- git_root: ctx.git_root.map(|s| with_trailing_slash(&s)),
- hostname: Some(ctx.hostname),
- session_id: Some(ctx.session_id),
- })
- .unwrap_or_default();
-
- // Perform the search
- let history_ids =
- span!(Level::TRACE, "daemon_search_query", %query, query_id)
- .in_scope(|| async {
- let index = index.read().await;
- index
- .search(&query, index_filter, &query_context, RESULTS_LIMIT)
- .await
- })
- .await;
-
- // Convert history IDs to bytes
- let ids: Vec<Vec<u8>> = history_ids
- .iter()
- .filter_map(|id| {
- Uuid::parse_str(id)
- .ok()
- .map(|uuid| uuid.as_bytes().to_vec())
- })
- .collect();
-
- if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() {
- break; // Client disconnected
- }
- }
- Err(e) => {
- let _ = tx.send(Err(e)).await;
- break;
- }
- }
- }
- });
-
- // Convert receiver to stream
- let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
- Ok(Response::new(Box::pin(out_stream)))
- }
-}
-
-/// Convert proto FilterMode and context to IndexFilterMode.
-fn convert_filter_mode(
- mode: FilterMode,
- context: &Option<crate::search::SearchContext>,
-) -> IndexFilterMode {
- match (mode, context) {
- (FilterMode::Global, _) => IndexFilterMode::Global,
- (FilterMode::Directory, Some(ctx)) => {
- IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
- }
- (FilterMode::Workspace, Some(ctx)) => {
- if let Some(ref git_root) = ctx.git_root {
- IndexFilterMode::Workspace(with_trailing_slash(git_root))
- } else {
- // Fall back to directory if no git root
- IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
- }
- }
- (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()),
- (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()),
- (FilterMode::SessionPreload, Some(ctx)) => {
- // SessionPreload is similar to Session - filter by session
- IndexFilterMode::Session(ctx.session_id.clone())
- }
- // If no context provided, fall back to global
- _ => IndexFilterMode::Global,
- }
-}
-
-#[cfg(windows)]
-pub fn with_trailing_slash(s: &str) -> String {
- if s.ends_with('\\') {
- s.to_string()
- } else {
- format!("{}\\", s)
- }
-}
-
-#[cfg(not(windows))]
-pub fn with_trailing_slash(s: &str) -> String {
- if s.ends_with('/') {
- s.to_string()
- } else {
- format!("{}/", s)
- }
-}
diff --git a/crates/atuin-daemon/src/components/semantic.rs b/crates/atuin-daemon/src/components/semantic.rs
deleted file mode 100644
index dff38fd3..00000000
--- a/crates/atuin-daemon/src/components/semantic.rs
+++ /dev/null
@@ -1,900 +0,0 @@
-//! Semantic command capture component.
-//!
-//! This is a prototype in-memory store for completed command captures emitted
-//! by atuin-pty-proxy. It keeps recent captures per Atuin session and indexes
-//! them by history ID for AI tool lookup.
-
-use std::collections::{HashMap, VecDeque};
-use std::fmt::{Display, Formatter};
-use std::sync::Arc;
-
-use atuin_client::history::{History, HistoryId};
-use eyre::Result;
-use tokio::sync::Mutex;
-use tonic::{Request, Response, Status, Streaming};
-use tracing::{Level, instrument};
-
-use crate::{
- daemon::{Component, DaemonHandle},
- events::DaemonEvent,
- semantic::{
- CommandCapture, CommandOutputReply, CommandOutputRequest, OutputLine, RecordCommandsReply,
- semantic_server::{Semantic as SemanticSvc, SemanticServer},
- },
-};
-
-const MAX_SESSIONS: usize = 20;
-const MAX_COMMANDS_PER_SESSION: usize = 128;
-const MAX_BYTES_PER_SESSION: usize = 32 * 1024 * 1024;
-const MAX_PENDING_HISTORIES: usize = 128;
-
-/// Stores completed command captures and associates them with history events.
-pub struct SemanticComponent {
- inner: Arc<SemanticComponentInner>,
-}
-
-struct SemanticComponentInner {
- state: Mutex<SemanticState>,
-}
-
-#[derive(Default)]
-struct SemanticState {
- sessions: HashMap<SessionId, SessionCaptures>,
- session_lru: VecDeque<SessionId>,
- history_index: HashMap<HistoryId, CaptureRef>,
- pending_histories: VecDeque<History>,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-struct SessionId(String);
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-struct CaptureId(u64);
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-struct CaptureRef {
- session_id: SessionId,
- capture_id: CaptureId,
-}
-
-#[derive(Default)]
-struct SessionCaptures {
- next_id: u64,
- records: VecDeque<StoredCapture>,
- output_bytes: usize,
-}
-
-struct StoredCapture {
- id: CaptureId,
- history_id: HistoryId,
- output_bytes: usize,
- record: SemanticCommandRecord,
-}
-
-struct EvictedCapture {
- history_id: HistoryId,
- capture_id: CaptureId,
-}
-
-#[derive(Debug, Clone)]
-struct SemanticCommandRecord {
- capture: CommandCapture,
- history: Option<History>,
-}
-
-impl SemanticComponent {
- pub fn new() -> Self {
- Self {
- inner: Arc::new(SemanticComponentInner {
- state: Mutex::new(SemanticState::default()),
- }),
- }
- }
-
- pub fn grpc_service(&self) -> SemanticServer<SemanticGrpcService> {
- SemanticServer::new(SemanticGrpcService {
- inner: self.inner.clone(),
- })
- }
-}
-
-impl Default for SemanticComponent {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[tonic::async_trait]
-impl Component for SemanticComponent {
- fn name(&self) -> &'static str {
- "semantic"
- }
-
- async fn start(&mut self, _handle: DaemonHandle) -> Result<()> {
- tracing::info!("semantic component started");
- Ok(())
- }
-
- async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
- if let DaemonEvent::HistoryEnded(history) = event {
- self.inner.record_history(history.clone()).await;
- }
-
- Ok(())
- }
-
- async fn stop(&mut self) -> Result<()> {
- let state = self.inner.state.lock().await;
- tracing::info!(
- sessions = state.sessions.len(),
- records = state.record_count(),
- indexed_histories = state.history_index.len(),
- pending_histories = state.pending_histories.len(),
- "semantic component stopped"
- );
- Ok(())
- }
-}
-
-impl SemanticComponentInner {
- async fn record_capture(&self, capture: CommandCapture) -> bool {
- let mut state = self.state.lock().await;
- state.record_capture(capture)
- }
-
- async fn record_history(&self, history: History) {
- let mut state = self.state.lock().await;
- state.record_history(history);
- }
-
- async fn command_output(&self, request: &CommandOutputRequest) -> CommandOutputReply {
- let mut state = self.state.lock().await;
- state.command_output(request)
- }
-}
-
-impl SemanticState {
- fn record_capture(&mut self, mut capture: CommandCapture) -> bool {
- let Some(history_id) = history_id_from_str(capture.history_id.as_deref()) else {
- tracing::debug!(
- command_bytes = capture.command.len(),
- prompt_bytes = capture.prompt.len(),
- output_bytes = capture.output.len(),
- output_truncated = capture.output_truncated,
- "dropping semantic command capture without history id"
- );
- return false;
- };
-
- let history = take_pending_history(&mut self.pending_histories, &history_id);
- let Some(session_id) = capture
- .session_id
- .as_deref()
- .and_then(|session_id| SessionId::try_from(session_id).ok())
- .or_else(|| {
- history
- .as_ref()
- .and_then(|history| SessionId::try_from(history.session.as_str()).ok())
- })
- else {
- tracing::debug!(
- history_id = %history_id,
- command_bytes = capture.command.len(),
- prompt_bytes = capture.prompt.len(),
- output_bytes = capture.output.len(),
- output_truncated = capture.output_truncated,
- "dropping semantic command capture without session id"
- );
- return false;
- };
-
- capture.history_id = Some(history_id.to_string());
- capture.session_id = Some(session_id.to_string());
- if capture.output_observed_bytes == 0 {
- capture.output_observed_bytes = capture.output.len() as u64;
- }
-
- let record = SemanticCommandRecord { capture, history };
- log_record(&record, "recorded semantic command capture");
- self.push_record(session_id, history_id, record);
- true
- }
-
- fn record_history(&mut self, history: History) {
- let history_id = history.id.clone();
-
- if let Some(capture_ref) = self.history_index.get(&history_id).cloned() {
- if let Some(stored) = self.stored_capture_mut(&capture_ref) {
- stored.record.history = Some(history);
- log_record(
- &stored.record,
- "associated semantic command capture with history",
- );
- return;
- }
-
- self.history_index.remove(&history_id);
- }
-
- tracing::debug!(
- id = %history.id,
- command_bytes = history.command.len(),
- "history ended before semantic capture arrived"
- );
- push_pending_history(&mut self.pending_histories, history);
- }
-
- fn command_output(&mut self, request: &CommandOutputRequest) -> CommandOutputReply {
- let Some(history_id) = history_id_from_str(Some(&request.history_id)) else {
- return command_output_not_found();
- };
- let Some(capture_ref) = self.history_index.get(&history_id).cloned() else {
- return command_output_not_found();
- };
-
- let Some(reply) = self.command_output_for_ref(&capture_ref, &request.ranges) else {
- self.history_index.remove(&history_id);
- return command_output_not_found();
- };
-
- self.touch_session(&capture_ref.session_id);
- reply
- }
-
- fn command_output_for_ref(
- &self,
- capture_ref: &CaptureRef,
- ranges: &[crate::semantic::OutputRange],
- ) -> Option<CommandOutputReply> {
- let stored = self
- .sessions
- .get(&capture_ref.session_id)?
- .stored_capture(capture_ref.capture_id)?;
- let output = &stored.record.capture.output;
- let output_observed_bytes = stored
- .record
- .capture
- .output_observed_bytes
- .max(output.len() as u64);
-
- Some(CommandOutputReply {
- found: true,
- output: String::new(),
- total_bytes: output.len() as u64,
- total_lines: output.lines().count() as u64,
- lines: select_output_ranges(output, ranges),
- output_truncated: stored.record.capture.output_truncated,
- output_observed_bytes,
- })
- }
-
- fn push_record(
- &mut self,
- session_id: SessionId,
- history_id: HistoryId,
- record: SemanticCommandRecord,
- ) {
- self.touch_session(&session_id);
-
- let (capture_id, evicted) = {
- let session = self.sessions.entry(session_id.clone()).or_default();
- session.push(history_id.clone(), record)
- };
-
- let capture_ref = CaptureRef {
- session_id: session_id.clone(),
- capture_id,
- };
- self.history_index.insert(history_id, capture_ref);
-
- for evicted in evicted {
- self.remove_history_index_if_matches(
- &session_id,
- &evicted.history_id,
- evicted.capture_id,
- );
- }
-
- self.expire_lru_sessions();
- }
-
- fn touch_session(&mut self, session_id: &SessionId) {
- if let Some(index) = self.session_lru.iter().position(|id| id == session_id) {
- self.session_lru.remove(index);
- }
- self.session_lru.push_back(session_id.clone());
- }
-
- fn expire_lru_sessions(&mut self) {
- while self.session_lru.len() > MAX_SESSIONS {
- let Some(session_id) = self.session_lru.pop_front() else {
- break;
- };
- let Some(session) = self.sessions.remove(&session_id) else {
- continue;
- };
-
- for stored in session.records {
- self.remove_history_index_if_matches(&session_id, &stored.history_id, stored.id);
- }
- }
- }
-
- fn remove_history_index_if_matches(
- &mut self,
- session_id: &SessionId,
- history_id: &HistoryId,
- capture_id: CaptureId,
- ) {
- if self
- .history_index
- .get(history_id)
- .is_some_and(|capture_ref| {
- &capture_ref.session_id == session_id && capture_ref.capture_id == capture_id
- })
- {
- self.history_index.remove(history_id);
- }
- }
-
- fn stored_capture_mut(&mut self, capture_ref: &CaptureRef) -> Option<&mut StoredCapture> {
- self.sessions
- .get_mut(&capture_ref.session_id)?
- .stored_capture_mut(capture_ref.capture_id)
- }
-
- fn record_count(&self) -> usize {
- self.sessions
- .values()
- .map(|session| session.records.len())
- .sum()
- }
-}
-
-impl SessionCaptures {
- fn push(
- &mut self,
- history_id: HistoryId,
- record: SemanticCommandRecord,
- ) -> (CaptureId, Vec<EvictedCapture>) {
- self.push_with_limits(
- history_id,
- record,
- MAX_COMMANDS_PER_SESSION,
- MAX_BYTES_PER_SESSION,
- )
- }
-
- fn push_with_limits(
- &mut self,
- history_id: HistoryId,
- record: SemanticCommandRecord,
- max_commands: usize,
- max_output_bytes: usize,
- ) -> (CaptureId, Vec<EvictedCapture>) {
- let capture_id = CaptureId(self.next_id);
- self.next_id = self.next_id.saturating_add(1);
- let output_bytes = record.capture.output.len();
- self.output_bytes = self.output_bytes.saturating_add(output_bytes);
- self.records.push_back(StoredCapture {
- id: capture_id,
- history_id,
- output_bytes,
- record,
- });
-
- (
- capture_id,
- self.evict_to_limits(max_commands, max_output_bytes),
- )
- }
-
- fn evict_to_limits(
- &mut self,
- max_commands: usize,
- max_output_bytes: usize,
- ) -> Vec<EvictedCapture> {
- let mut evicted = Vec::new();
- while self.records.len() > max_commands || self.output_bytes > max_output_bytes {
- let Some(record) = self.records.pop_front() else {
- break;
- };
- self.output_bytes = self.output_bytes.saturating_sub(record.output_bytes);
- evicted.push(EvictedCapture {
- history_id: record.history_id,
- capture_id: record.id,
- });
- }
- evicted
- }
-
- fn stored_capture(&self, capture_id: CaptureId) -> Option<&StoredCapture> {
- self.records.iter().find(|record| record.id == capture_id)
- }
-
- fn stored_capture_mut(&mut self, capture_id: CaptureId) -> Option<&mut StoredCapture> {
- self.records
- .iter_mut()
- .find(|record| record.id == capture_id)
- }
-}
-
-impl TryFrom<&str> for SessionId {
- type Error = ();
-
- fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
- let value = value.trim();
- if value.is_empty() {
- return Err(());
- }
-
- Ok(Self(value.to_string()))
- }
-}
-
-impl TryFrom<String> for SessionId {
- type Error = ();
-
- fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
- Self::try_from(value.as_str())
- }
-}
-
-impl AsRef<str> for SessionId {
- fn as_ref(&self) -> &str {
- &self.0
- }
-}
-
-impl Display for SessionId {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.write_str(&self.0)
- }
-}
-
-pub struct SemanticGrpcService {
- inner: Arc<SemanticComponentInner>,
-}
-
-#[tonic::async_trait]
-impl SemanticSvc for SemanticGrpcService {
- #[instrument(skip_all, level = Level::INFO)]
- async fn record_commands(
- &self,
- request: Request<Streaming<CommandCapture>>,
- ) -> Result<Response<RecordCommandsReply>, Status> {
- let mut stream = request.into_inner();
- let mut accepted = 0_u64;
-
- while let Some(capture) = stream.message().await? {
- if self.inner.record_capture(capture).await {
- accepted += 1;
- }
- }
-
- Ok(Response::new(RecordCommandsReply { accepted }))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn command_output(
- &self,
- request: Request<CommandOutputRequest>,
- ) -> Result<Response<CommandOutputReply>, Status> {
- let request = request.into_inner();
- if request.history_id.trim().is_empty() {
- return Err(Status::invalid_argument("history_id is required"));
- }
-
- Ok(Response::new(self.inner.command_output(&request).await))
- }
-}
-
-fn history_id_from_str(value: Option<&str>) -> Option<HistoryId> {
- let value = value?.trim();
- (!value.is_empty()).then(|| HistoryId(value.to_string()))
-}
-
-fn take_pending_history(
- histories: &mut VecDeque<History>,
- history_id: &HistoryId,
-) -> Option<History> {
- let index = histories
- .iter()
- .position(|history| &history.id == history_id)?;
- histories.remove(index)
-}
-
-fn push_pending_history(histories: &mut VecDeque<History>, history: History) {
- if let Some(index) = histories
- .iter()
- .position(|pending| pending.id == history.id)
- {
- histories.remove(index);
- }
-
- histories.push_back(history);
- trim_front(histories, MAX_PENDING_HISTORIES);
-}
-
-fn trim_front<T>(records: &mut VecDeque<T>, max_len: usize) {
- while records.len() > max_len {
- records.pop_front();
- }
-}
-
-fn command_output_not_found() -> CommandOutputReply {
- CommandOutputReply {
- found: false,
- output: String::new(),
- total_bytes: 0,
- total_lines: 0,
- lines: Vec::new(),
- output_truncated: false,
- output_observed_bytes: 0,
- }
-}
-
-fn select_output_ranges(output: &str, ranges: &[crate::semantic::OutputRange]) -> Vec<OutputLine> {
- let lines: Vec<&str> = output.lines().collect();
- if lines.is_empty() {
- return Vec::new();
- }
-
- let ranges = if ranges.is_empty() {
- vec![crate::semantic::OutputRange { start: 0, end: 999 }]
- } else {
- ranges.to_vec()
- };
-
- let mut ranges = ranges
- .into_iter()
- .filter_map(|range| normalize_line_range(range.start, range.end, lines.len()))
- .collect::<Vec<_>>();
- ranges.sort_unstable_by_key(|(start, _)| *start);
-
- let mut merged: Vec<(usize, usize)> = Vec::new();
- for (start, end) in ranges {
- match merged.last_mut() {
- Some((_, merged_end)) if start <= merged_end.saturating_add(1) => {
- *merged_end = (*merged_end).max(end);
- }
- _ => merged.push((start, end)),
- }
- }
-
- merged
- .into_iter()
- .flat_map(|(start, end)| {
- lines[start..=end]
- .iter()
- .enumerate()
- .map(move |(offset, line)| OutputLine {
- line_number: (start + offset + 1) as u64,
- content: (*line).to_string(),
- })
- })
- .collect()
-}
-
-fn normalize_line_range(start: i64, end: i64, line_count: usize) -> Option<(usize, usize)> {
- let line_count = i64::try_from(line_count).ok()?;
- let start = if start < 0 { line_count + start } else { start };
- let end = if end < 0 { line_count + end } else { end };
-
- if end < 0 || start >= line_count {
- return None;
- }
-
- let start = start.max(0);
- let end = end.min(line_count - 1);
-
- (start <= end).then_some((start as usize, end as usize))
-}
-
-fn log_record(record: &SemanticCommandRecord, message: &'static str) {
- let history_id = record.capture.history_id.as_deref().unwrap_or("<missing>");
- let associated_history_id = record
- .history
- .as_ref()
- .map(|history| history.id.to_string());
- let exit = record.history.as_ref().map(|history| history.exit);
- let duration = record.history.as_ref().map(|history| history.duration);
- let author = record
- .history
- .as_ref()
- .map(|history| history.author.as_str());
- let session_id = record.capture.session_id.as_deref();
-
- tracing::debug!(
- history_id = %history_id,
- associated_history_id = ?associated_history_id,
- session_id = ?session_id,
- command_bytes = record.capture.command.len(),
- prompt_bytes = record.capture.prompt.len(),
- output_bytes = record.capture.output.len(),
- output_truncated = record.capture.output_truncated,
- output_observed_bytes = record.capture.output_observed_bytes,
- capture_exit_code = ?record.capture.exit_code,
- history_exit = ?exit,
- duration = ?duration,
- author = ?author,
- "{message}"
- );
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use time::OffsetDateTime;
-
- fn history(id: &str, session: &str, command: &str) -> History {
- History {
- id: HistoryId(id.to_string()),
- timestamp: OffsetDateTime::UNIX_EPOCH,
- duration: 0,
- exit: 0,
- command: command.to_string(),
- cwd: String::new(),
- session: session.to_string(),
- hostname: String::new(),
- author: String::new(),
- intent: None,
- deleted_at: None,
- }
- }
-
- fn capture(history_id: Option<&str>, session_id: Option<&str>, output: &str) -> CommandCapture {
- CommandCapture {
- prompt: String::new(),
- command: String::new(),
- output: output.to_string(),
- exit_code: None,
- history_id: history_id.map(str::to_string),
- session_id: session_id.map(str::to_string),
- output_truncated: false,
- output_observed_bytes: output.len() as u64,
- }
- }
-
- fn command_output(state: &mut SemanticState, history_id: &str) -> CommandOutputReply {
- state.command_output(&CommandOutputRequest {
- history_id: history_id.to_string(),
- ranges: Vec::new(),
- })
- }
-
- fn output_line(line_number: u64, content: &str) -> OutputLine {
- OutputLine {
- line_number,
- content: content.to_string(),
- }
- }
-
- #[test]
- fn drops_capture_without_history_id() {
- let mut state = SemanticState::default();
-
- assert!(!state.record_capture(capture(None, Some("session-1"), "output")));
- assert!(!command_output(&mut state, "id-1").found);
- assert_eq!(state.record_count(), 0);
- }
-
- #[test]
- fn stores_capture_by_session_and_history_id() {
- let mut state = SemanticState::default();
-
- assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output")));
-
- let reply = command_output(&mut state, "id-1");
- assert!(reply.found);
- assert_eq!(reply.total_bytes, 6);
- assert_eq!(reply.output_observed_bytes, 6);
- assert_eq!(reply.lines, vec![output_line(1, "output")]);
- }
-
- #[test]
- fn uses_pending_history_session_when_capture_session_is_missing() {
- let mut state = SemanticState::default();
-
- state.record_history(history("id-1", "session-from-history", "cargo test"));
- assert!(state.record_capture(capture(Some("id-1"), None, "output")));
-
- assert!(
- state
- .sessions
- .contains_key(&SessionId("session-from-history".to_string()))
- );
- assert!(command_output(&mut state, "id-1").found);
- }
-
- #[test]
- fn associates_history_by_id_after_capture_arrives() {
- let mut state = SemanticState::default();
-
- assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output")));
- state.record_history(history("id-1", "session-1", "different command"));
-
- let capture_ref = state
- .history_index
- .get(&HistoryId("id-1".to_string()))
- .unwrap();
- let stored = state
- .sessions
- .get(&capture_ref.session_id)
- .unwrap()
- .stored_capture(capture_ref.capture_id)
- .unwrap();
- assert!(stored.record.history.is_some());
- }
-
- #[test]
- fn evicts_oldest_command_when_session_ring_is_full() {
- let mut state = SemanticState::default();
-
- for index in 0..=MAX_COMMANDS_PER_SESSION {
- assert!(state.record_capture(capture(
- Some(&format!("id-{index}")),
- Some("session-1"),
- "output",
- )));
- }
-
- assert!(!command_output(&mut state, "id-0").found);
- assert!(command_output(&mut state, &format!("id-{MAX_COMMANDS_PER_SESSION}")).found);
- assert_eq!(state.record_count(), MAX_COMMANDS_PER_SESSION);
- }
-
- #[test]
- fn evicts_oldest_session_after_lru_limit() {
- let mut state = SemanticState::default();
-
- for index in 0..MAX_SESSIONS {
- assert!(state.record_capture(capture(
- Some(&format!("id-{index}")),
- Some(&format!("session-{index}")),
- "output",
- )));
- }
- assert!(command_output(&mut state, "id-0").found);
-
- assert!(state.record_capture(capture(Some("new-id"), Some("new-session"), "output",)));
-
- assert!(command_output(&mut state, "id-0").found);
- assert!(!command_output(&mut state, "id-1").found);
- assert!(command_output(&mut state, "new-id").found);
- assert_eq!(state.sessions.len(), MAX_SESSIONS);
- }
-
- #[test]
- fn evicts_by_session_byte_limit() {
- let mut session = SessionCaptures::default();
- let first_output = "x".repeat(10);
- let second_output = "y";
- let (_, evicted_first) = session.push_with_limits(
- HistoryId("first".to_string()),
- SemanticCommandRecord {
- capture: capture(Some("first"), Some("session-1"), &first_output),
- history: None,
- },
- MAX_COMMANDS_PER_SESSION,
- 10,
- );
- assert!(evicted_first.is_empty());
-
- let (_, evicted_second) = session.push_with_limits(
- HistoryId("second".to_string()),
- SemanticCommandRecord {
- capture: capture(Some("second"), Some("session-1"), second_output),
- history: None,
- },
- MAX_COMMANDS_PER_SESSION,
- 10,
- );
-
- assert_eq!(evicted_second.len(), 1);
- assert_eq!(evicted_second[0].history_id, HistoryId("first".to_string()));
- assert_eq!(session.records.len(), 1);
- assert_eq!(session.output_bytes, 1);
- }
-
- #[test]
- fn command_output_reports_truncation_metadata() {
- let mut state = SemanticState::default();
- let mut capture = capture(Some("id-1"), Some("session-1"), "partial");
- capture.output_truncated = true;
- capture.output_observed_bytes = 1024;
-
- assert!(state.record_capture(capture));
-
- let reply = command_output(&mut state, "id-1");
- assert!(reply.output_truncated);
- assert_eq!(reply.total_bytes, 7);
- assert_eq!(reply.output_observed_bytes, 1024);
- }
-
- #[test]
- fn output_ranges_are_line_based_inclusive_and_support_negative_offsets() {
- let output = "zero\none\ntwo\nthree\nfour";
- let ranges = vec![
- crate::semantic::OutputRange { start: 1, end: 2 },
- crate::semantic::OutputRange { start: -2, end: -1 },
- ];
-
- assert_eq!(
- select_output_ranges(output, &ranges),
- vec![
- output_line(2, "one"),
- output_line(3, "two"),
- output_line(4, "three"),
- output_line(5, "four"),
- ]
- );
- }
-
- #[test]
- fn output_ranges_merge_overlaps_and_adjacent_ranges() {
- let output = (0..100)
- .map(|n| format!("line {n}"))
- .collect::<Vec<_>>()
- .join("\n");
- let ranges = vec![
- crate::semantic::OutputRange { start: 0, end: 100 },
- crate::semantic::OutputRange {
- start: -100,
- end: -1,
- },
- ];
-
- let selected = select_output_ranges(&output, &ranges);
-
- assert_eq!(selected.len(), 100);
- assert_eq!(selected.first(), Some(&output_line(1, "line 0")));
- assert_eq!(selected.last(), Some(&output_line(100, "line 99")));
- }
-
- #[test]
- fn output_ranges_can_leave_gaps_for_client_formatting() {
- let output = "zero\none\ntwo\nthree\nfour";
- let ranges = vec![
- crate::semantic::OutputRange { start: 0, end: 1 },
- crate::semantic::OutputRange { start: 4, end: 4 },
- ];
-
- assert_eq!(
- select_output_ranges(output, &ranges),
- vec![
- output_line(1, "zero"),
- output_line(2, "one"),
- output_line(5, "four"),
- ]
- );
- }
-
- #[test]
- fn empty_output_ranges_default_to_first_thousand_lines() {
- let output = (0..1001)
- .map(|n| format!("line {n}"))
- .collect::<Vec<_>>()
- .join("\n");
-
- let selected = select_output_ranges(&output, &[]);
-
- assert_eq!(selected.len(), 1000);
- assert_eq!(selected.first(), Some(&output_line(1, "line 0")));
- assert_eq!(selected.last(), Some(&output_line(1000, "line 999")));
- }
-
- #[test]
- fn output_ranges_skip_ranges_fully_outside_output() {
- let output = "zero\none\ntwo";
- let ranges = vec![
- crate::semantic::OutputRange { start: 10, end: 20 },
- crate::semantic::OutputRange {
- start: -20,
- end: -10,
- },
- ];
-
- assert_eq!(select_output_ranges(output, &ranges), Vec::new());
- }
-}
diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs
deleted file mode 100644
index 6e486250..00000000
--- a/crates/atuin-daemon/src/components/sync.rs
+++ /dev/null
@@ -1,279 +0,0 @@
-//! Sync component.
-//!
-//! Handles periodic synchronization with the Atuin cloud server.
-
-use std::time::Duration;
-
-use eyre::Result;
-use rand::Rng;
-use tokio::sync::mpsc;
-use tokio::time::{self, MissedTickBehavior};
-
-use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
-
-use crate::{
- daemon::{Component, DaemonHandle},
- events::DaemonEvent,
-};
-
-/// Commands that can be sent to the sync task.
-enum SyncCommand {
- /// Trigger an immediate sync.
- ForceSync,
- /// Stop the sync loop.
- Stop,
-}
-
-/// Sync state - tracks whether we're in normal operation or retrying after failure.
-#[derive(Clone, Copy, PartialEq, Eq)]
-enum SyncState {
- /// Normal operation. Periodic syncs only run if auto_sync is enabled.
- Idle,
- /// Retrying after a sync failure. Retries continue regardless of auto_sync
- /// until the sync succeeds.
- Retrying,
-}
-
-/// Sync component - handles periodic cloud synchronization.
-///
-/// This component:
-/// - Runs a background sync loop on a configurable interval
-/// - Implements exponential backoff on sync failures
-/// - Responds to ForceSync events for immediate sync
-/// - Emits SyncCompleted/SyncFailed events
-pub struct SyncComponent {
- task_handle: Option<tokio::task::JoinHandle<()>>,
- command_tx: Option<mpsc::Sender<SyncCommand>>,
-}
-
-impl SyncComponent {
- /// Create a new sync component.
- pub fn new() -> Self {
- Self {
- task_handle: None,
- command_tx: None,
- }
- }
-}
-
-impl Default for SyncComponent {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[tonic::async_trait]
-impl Component for SyncComponent {
- fn name(&self) -> &'static str {
- "sync"
- }
-
- async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
- let (cmd_tx, cmd_rx) = mpsc::channel(16);
- self.command_tx = Some(cmd_tx);
-
- // Spawn the sync loop with its own copy of the handle
- self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
-
- tracing::info!("sync component started");
- Ok(())
- }
-
- async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
- if let DaemonEvent::ForceSync = event {
- tracing::info!("force sync requested");
- if let Some(tx) = &self.command_tx {
- let _ = tx.send(SyncCommand::ForceSync).await;
- }
- }
- Ok(())
- }
-
- async fn stop(&mut self) -> Result<()> {
- if let Some(tx) = &self.command_tx {
- let _ = tx.send(SyncCommand::Stop).await;
- }
- if let Some(handle) = self.task_handle.take() {
- // Give the task a moment to shut down gracefully
- let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
- }
- tracing::info!("sync component stopped");
- Ok(())
- }
-}
-
-/// The main sync loop.
-///
-/// This runs in a spawned task and handles periodic sync as well as
-/// force sync requests.
-async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
- tracing::info!("sync loop starting");
-
- // Clone settings since we need them across await points
- let settings = handle.settings().await.clone();
- let host_id = match Settings::host_id().await {
- Ok(id) => id,
- Err(e) => {
- tracing::error!("failed to get host id, sync disabled: {e}");
- return;
- }
- };
-
- // Create the stores we need
- let encryption_key = *handle.encryption_key();
- let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
-
- // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
- let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
-
- let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
-
- // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
- // we may end up running a lot of syncs in a hot loop.
- ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
-
- let mut sync_state = SyncState::Idle;
-
- loop {
- tokio::select! {
- _ = ticker.tick() => {
- let settings = handle.settings().await;
-
- // Skip periodic ticks if auto_sync is disabled AND we're not retrying
- // a previous failure. Retries must continue regardless of auto_sync.
- if !settings.auto_sync && sync_state == SyncState::Idle {
- tracing::debug!("auto_sync disabled, skipping periodic sync tick");
- continue;
- }
-
- sync_state = do_sync_tick(
- &handle,
- &history_store,
- &mut ticker,
- max_interval,
- &settings,
- ).await;
- }
- cmd = cmd_rx.recv() => {
- match cmd {
- Some(SyncCommand::ForceSync) => {
- tracing::info!("executing force sync");
- let settings = handle.settings().await;
- sync_state = do_sync_tick(
- &handle,
- &history_store,
- &mut ticker,
- max_interval,
- &settings,
- ).await;
- }
- Some(SyncCommand::Stop) | None => {
- tracing::info!("sync loop stopping");
- break;
- }
- }
- }
- }
- }
-}
-
-/// Execute a single sync tick.
-///
-/// Returns the new sync state: `Idle` on success, `Retrying` on failure.
-async fn do_sync_tick(
- handle: &DaemonHandle,
- history_store: &HistoryStore,
- ticker: &mut time::Interval,
- max_interval: f64,
- settings: &Settings,
-) -> SyncState {
- tracing::info!("sync tick");
-
- // Check if logged in
- let logged_in = match settings.logged_in().await {
- Ok(v) => v,
- Err(e) => {
- tracing::warn!("failed to check login status, skipping sync tick: {e}");
- return SyncState::Idle;
- }
- };
-
- if !logged_in {
- tracing::debug!("not logged in, skipping sync tick");
- return SyncState::Idle;
- }
-
- // Perform the sync
- let res = sync::sync(settings, handle.store(), handle.encryption_key()).await;
-
- match res {
- Err(e) => {
- tracing::error!("sync tick failed with {e}");
-
- // Emit failure event
- handle.emit(DaemonEvent::SyncFailed {
- error: e.to_string(),
- });
-
- // Exponential backoff
- let mut rng = rand::thread_rng();
- let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
-
- if new_interval > max_interval {
- new_interval = max_interval;
- }
-
- *ticker = time::interval_at(
- tokio::time::Instant::now() + Duration::from_secs(new_interval as u64),
- time::Duration::from_secs(new_interval as u64),
- );
- ticker.reset_after(time::Duration::from_secs(new_interval as u64));
- ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
-
- tracing::error!("backing off, next sync tick in {new_interval}");
-
- SyncState::Retrying
- }
- Ok((uploaded_count, downloaded_records)) => {
- tracing::info!(
- uploaded = uploaded_count,
- downloaded = downloaded_records.len(),
- "sync complete"
- );
-
- // Build history from downloaded records
- if let Err(e) = history_store
- .incremental_build(handle.history_db(), &downloaded_records)
- .await
- {
- tracing::error!("failed to build history from downloaded records: {e}");
- }
-
- // Emit the records added event (for search indexing)
- handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone()));
-
- // Emit sync completed event
- handle.emit(DaemonEvent::SyncCompleted {
- uploaded: uploaded_count as usize,
- downloaded: downloaded_records.len(),
- });
-
- // Reset backoff on success
- if ticker.period().as_secs() != settings.daemon.sync_frequency {
- *ticker = time::interval_at(
- tokio::time::Instant::now()
- + Duration::from_secs(settings.daemon.sync_frequency),
- time::Duration::from_secs(settings.daemon.sync_frequency),
- );
- ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
- }
-
- // Store sync time
- if let Err(e) = Settings::save_sync_time().await {
- tracing::error!("failed to save sync time: {e}");
- }
-
- SyncState::Idle
- }
- }
-}