aboutsummaryrefslogtreecommitdiffstats
path: root/crates/turtle/src/atuin_daemon/components
diff options
context:
space:
mode:
author: Benedikt Peetz <benedikt.peetz@b-peetz.de> 2026-06-11 00:54:30 +0200
committer: Benedikt Peetz <benedikt.peetz@b-peetz.de> 2026-06-11 00:54:30 +0200
commit: 5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch)
tree: c64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/turtle/src/atuin_daemon/components
parent: chore: Somewhat simplify sync code (diff)
download: atuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show dead code correctly.
Diffstat (limited to 'crates/turtle/src/atuin_daemon/components')
-rw-r--r-- crates/turtle/src/atuin_daemon/components/history.rs | 327
-rw-r--r-- crates/turtle/src/atuin_daemon/components/mod.rs | 25
-rw-r--r-- crates/turtle/src/atuin_daemon/components/search.rs | 413
-rw-r--r-- crates/turtle/src/atuin_daemon/components/semantic.rs | 903
-rw-r--r-- crates/turtle/src/atuin_daemon/components/sync.rs | 279
5 files changed, 1947 insertions, 0 deletions
diff --git a/crates/turtle/src/atuin_daemon/components/history.rs b/crates/turtle/src/atuin_daemon/components/history.rs
new file mode 100644
index 00000000..95d34b69
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/history.rs
@@ -0,0 +1,327 @@
+//! History component.
+//!
+//! Handles command history lifecycle (start/end) and provides the History gRPC service.
+
+use std::{pin::Pin, sync::Arc};
+
+use crate::atuin_client::{
+ database::Database,
+ history::{History, HistoryId, store::HistoryStore},
+ settings::Settings,
+};
+use dashmap::DashMap;
+use eyre::Result;
+use time::OffsetDateTime;
+use tokio_stream::Stream;
+use tonic::{Request, Response, Status};
+use tracing::{Level, instrument};
+
+use crate::atuin_daemon::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ history::{
+ EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
+ ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
+ TailHistoryReply, TailHistoryRequest,
+ history_server::{History as HistorySvc, HistoryServer},
+ },
+};
+
+/// Protocol version reported in the `protocol` field of `StartHistoryReply`,
+/// `EndHistoryReply` and `StatusReply`.
+const DAEMON_PROTOCOL_VERSION: u32 = 1;
+
+/// History component - manages command history lifecycle.
+///
+/// This component:
+/// - Tracks currently running commands (stored in memory)
+/// - Saves completed commands to the database and record store
+/// - Emits history events for other components (e.g., search indexing)
+/// - Provides the History gRPC service
+pub struct HistoryComponent {
+ /// State shared with every `HistoryGrpcService` handed out by `grpc_service`.
+ inner: Arc<HistoryComponentInner>,
+}
+
+/// State shared between `HistoryComponent` and its gRPC service.
+struct HistoryComponentInner {
+ /// Commands currently running (not yet completed).
+ running: DashMap<HistoryId, History>,
+
+ /// Handle to the daemon (set during start).
+ handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+
+ /// History store for pushing records (set during start).
+ history_store: tokio::sync::RwLock<Option<HistoryStore>>,
+}
+
+impl HistoryComponent {
+    /// Build a history component with no running commands and with the
+    /// daemon handle and history store still unset.
+    pub fn new() -> Self {
+        let inner = HistoryComponentInner {
+            running: DashMap::new(),
+            handle: tokio::sync::RwLock::new(None),
+            history_store: tokio::sync::RwLock::new(None),
+        };
+
+        Self {
+            inner: Arc::new(inner),
+        }
+    }
+
+    /// Build the tonic `HistoryServer` for this component.
+    ///
+    /// The returned service shares this component's inner state, so it can be
+    /// added to a gRPC server while the component keeps running.
+    pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
+        let service = HistoryGrpcService {
+            inner: Arc::clone(&self.inner),
+        };
+        HistoryServer::new(service)
+    }
+}
+
+impl Default for HistoryComponent {
+    /// Same as [`HistoryComponent::new`].
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[tonic::async_trait]
+impl Component for HistoryComponent {
+    /// Name under which this component is registered.
+    fn name(&self) -> &'static str {
+        "history"
+    }
+
+    /// Create the history store from the daemon's record store, host id and
+    /// encryption key, then publish both the store and the handle to the
+    /// shared state used by the gRPC service.
+    async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+        let host_id = Settings::host_id().await?;
+        let record_store = handle.store().clone();
+        let encryption_key = *handle.encryption_key();
+        let history_store = HistoryStore::new(record_store, host_id, encryption_key);
+
+        {
+            let mut store_slot = self.inner.history_store.write().await;
+            *store_slot = Some(history_store);
+        }
+        {
+            let mut handle_slot = self.inner.handle.write().await;
+            *handle_slot = Some(handle);
+        }
+
+        tracing::info!("history component started");
+        Ok(())
+    }
+
+    /// This component only produces events; incoming ones are ignored.
+    async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
+        Ok(())
+    }
+
+    /// Nothing to tear down; only logs that the component stopped.
+    async fn stop(&mut self) -> Result<()> {
+        tracing::info!("history component stopped");
+        Ok(())
+    }
+}
+
+/// The gRPC service implementation.
+///
+/// This is a thin wrapper that delegates to the component's shared state.
+pub struct HistoryGrpcService {
+ /// Shared state, cloned from the owning `HistoryComponent` in `grpc_service`.
+ inner: Arc<HistoryComponentInner>,
+}
+
+/// Convert a history entry plus its event kind into the wire message sent on
+/// the tail stream.
+///
+/// A missing `intent` becomes its default value. The timestamp is sent as
+/// nanoseconds since the Unix epoch, narrowed to `u64` with an `as` cast.
+fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
+    let timestamp = history.timestamp.unix_timestamp_nanos() as u64;
+
+    let entry = HistoryEntry {
+        timestamp,
+        id: history.id.0,
+        command: history.command,
+        cwd: history.cwd,
+        session: history.session,
+        hostname: history.hostname,
+        author: history.author,
+        intent: history.intent.unwrap_or_default(),
+        exit: history.exit,
+        duration: history.duration,
+    };
+
+    TailHistoryReply {
+        kind: kind as i32,
+        history: Some(entry),
+    }
+}
+
+#[tonic::async_trait]
+impl HistorySvc for HistoryGrpcService {
+ type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
+
+    /// Register a newly started command.
+    ///
+    /// Builds a history entry from the request, emits `HistoryStarted` when a
+    /// daemon handle is available, keeps the entry in the running map and
+    /// returns its id together with the daemon version and protocol.
+    ///
+    /// Fails with `invalid_argument` when the timestamp is not a valid count
+    /// of nanoseconds since the Unix epoch.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn start_history(
+        &self,
+        request: Request<StartHistoryRequest>,
+    ) -> Result<Response<StartHistoryReply>, Status> {
+        let req = request.into_inner();
+
+        let Ok(started_at) = OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128)
+        else {
+            return Err(Status::invalid_argument(
+                "failed to parse timestamp as unix time (expected nanos since epoch)",
+            ));
+        };
+
+        let entry: History = History::daemon()
+            .timestamp(started_at)
+            .command(req.command)
+            .cwd(req.cwd)
+            .session(req.session)
+            .hostname(req.hostname)
+            .author(req.author)
+            .intent(req.intent)
+            .build()
+            .into();
+        let id = entry.id.clone();
+
+        // Announce the start before the entry is parked in the running map.
+        {
+            let handle_guard = self.inner.handle.read().await;
+            if let Some(handle) = handle_guard.as_ref() {
+                handle.emit(DaemonEvent::HistoryStarted(entry.clone()));
+            }
+        }
+
+        tracing::info!(id = id.to_string(), "start history");
+        self.inner.running.insert(id.clone(), entry);
+
+        Ok(Response::new(StartHistoryReply {
+            id: id.to_string(),
+            version: env!("CARGO_PKG_VERSION").to_string(),
+            protocol: DAEMON_PROTOCOL_VERSION,
+        }))
+    }
+
+    /// Finish a running command: record its exit code and duration, persist
+    /// it, and announce it to the rest of the daemon.
+    ///
+    /// A request `duration` of `0` means the client did not measure it; the
+    /// duration is then derived from the stored start timestamp.
+    ///
+    /// # Errors
+    ///
+    /// - `invalid_argument` if the reported duration does not fit in an `i64`
+    /// - `not_found` if no running command has the given id
+    /// - `internal` if the component has not been started, or if writing to
+    ///   the database or the record store fails
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn end_history(
+        &self,
+        request: Request<EndHistoryRequest>,
+    ) -> Result<Response<EndHistoryReply>, Status> {
+        let req = request.into_inner();
+        let id = HistoryId(req.id);
+
+        // The duration comes from the client, so reject an unrepresentable
+        // value instead of panicking inside the request handler. This happens
+        // before any state is touched so a bad request loses nothing.
+        let reported_duration = i64::try_from(req.duration).map_err(|_| {
+            Status::invalid_argument("duration does not fit into a signed 64-bit integer")
+        })?;
+
+        let Some((_, mut history)) = self.inner.running.remove(&id) else {
+            return Err(Status::not_found(format!(
+                "could not find history with id: {id}"
+            )));
+        };
+
+        // Get the handle and store to save the history
+        let handle_guard = self.inner.handle.read().await;
+        let store_guard = self.inner.history_store.read().await;
+        let (Some(handle), Some(history_store)) = (handle_guard.as_ref(), store_guard.as_ref())
+        else {
+            // Put the entry back so it is not silently lost while the
+            // component is still starting up.
+            self.inner.running.insert(id, history);
+            return Err(Status::internal("component not initialized"));
+        };
+
+        history.exit = req.exit;
+        history.duration = if reported_duration == 0 {
+            let elapsed = (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds();
+            // The start timestamp is client supplied as well; a value far in
+            // the future would overflow `i64` nanoseconds, so clamp.
+            i64::try_from(elapsed).unwrap_or(if elapsed < 0 { i64::MIN } else { i64::MAX })
+        } else {
+            reported_duration
+        };
+
+        // Save to database
+        handle
+            .history_db()
+            .save(&history)
+            .await
+            .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
+
+        tracing::info!(
+            id = id.0.to_string(),
+            duration = history.duration,
+            "end history"
+        );
+
+        // Push to record store
+        let (record_id, idx) = history_store
+            .push(history.clone())
+            .await
+            .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
+
+        // Emit the event
+        handle.emit(DaemonEvent::HistoryEnded(history));
+
+        Ok(Response::new(EndHistoryReply {
+            id: record_id.0.to_string(),
+            idx,
+            version: env!("CARGO_PKG_VERSION").to_string(),
+            protocol: DAEMON_PROTOCOL_VERSION,
+        }))
+    }
+
+    /// Stream `HistoryStarted` / `HistoryEnded` events to the client.
+    ///
+    /// A background task forwards matching daemon events into a bounded
+    /// channel that backs the response stream. The task ends when the event
+    /// bus closes, when the client goes away, or - after sending a
+    /// `resource_exhausted` status - when the subscriber lagged and events
+    /// were dropped.
+    ///
+    /// Fails with `internal` if the component has not been started.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn tail_history(
+        &self,
+        _request: Request<TailHistoryRequest>,
+    ) -> Result<Response<Self::TailHistoryStream>, Status> {
+        let handle_guard = self.inner.handle.read().await;
+        let Some(handle) = handle_guard.as_ref().cloned() else {
+            return Err(Status::internal("component not initialized"));
+        };
+
+        let mut events = handle.subscribe();
+        let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
+
+        tokio::spawn(async move {
+            loop {
+                let reply = match events.recv().await {
+                    Ok(DaemonEvent::HistoryStarted(history)) => {
+                        history_to_tail_reply(HistoryEventKind::Started, history)
+                    }
+                    Ok(DaemonEvent::HistoryEnded(history)) => {
+                        history_to_tail_reply(HistoryEventKind::Ended, history)
+                    }
+                    // Every other event is irrelevant for the tail stream.
+                    Ok(_) => continue,
+                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
+                        let _ = tx
+                            .send(Err(Status::resource_exhausted(format!(
+                                "tail stream lagged behind and dropped {skipped} events"
+                            ))))
+                            .await;
+                        break;
+                    }
+                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+                };
+
+                // A failed send means the client dropped the stream.
+                if tx.send(Ok(reply)).await.is_err() {
+                    break;
+                }
+            }
+        });
+
+        let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
+        Ok(Response::new(Box::pin(stream)))
+    }
+
+    /// Report that the daemon is alive, along with its crate version, process
+    /// id and protocol version.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn status(
+        &self,
+        _request: Request<StatusRequest>,
+    ) -> Result<Response<StatusReply>, Status> {
+        let version = env!("CARGO_PKG_VERSION").to_string();
+        let pid = std::process::id();
+
+        Ok(Response::new(StatusReply {
+            healthy: true,
+            version,
+            pid,
+            protocol: DAEMON_PROTOCOL_VERSION,
+        }))
+    }
+
+    /// Ask the daemon to shut down.
+    ///
+    /// `accepted` in the reply is `true` only when a daemon handle was
+    /// available and the shutdown was actually requested; before the
+    /// component has been started nothing can be requested, so the reply
+    /// says so instead of claiming success.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn shutdown(
+        &self,
+        _request: Request<ShutdownRequest>,
+    ) -> Result<Response<ShutdownReply>, Status> {
+        // Use the daemon handle to request shutdown
+        let accepted = match self.inner.handle.read().await.as_ref() {
+            Some(handle) => {
+                handle.shutdown();
+                true
+            }
+            None => {
+                tracing::warn!("shutdown requested before history component was started");
+                false
+            }
+        };
+
+        Ok(Response::new(ShutdownReply { accepted }))
+    }
+}
diff --git a/crates/turtle/src/atuin_daemon/components/mod.rs b/crates/turtle/src/atuin_daemon/components/mod.rs
new file mode 100644
index 00000000..447e31df
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/mod.rs
@@ -0,0 +1,25 @@
+//! Daemon components.
+//!
+//! Components are the building blocks of the daemon. Each component handles
+//! a specific domain and can:
+//!
+//! - Expose gRPC services
+//! - React to events
+//! - Spawn background tasks
+//!
+//! Available components:
+//!
+//! - [`history::HistoryComponent`]: Command history lifecycle management
+//! - [`search::SearchComponent`]: Fuzzy search over history
+//! - [`semantic::SemanticComponent`]: In-memory semantic command captures
+//! - [`sync::SyncComponent`]: Cloud sync
+
+pub mod history;
+pub mod search;
+pub mod semantic;
+pub mod sync;
+
+pub use history::HistoryComponent;
+pub use search::SearchComponent;
+pub use semantic::SemanticComponent;
+pub use sync::SyncComponent;
diff --git a/crates/turtle/src/atuin_daemon/components/search.rs b/crates/turtle/src/atuin_daemon/components/search.rs
new file mode 100644
index 00000000..85191cff
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/search.rs
@@ -0,0 +1,413 @@
+//! Search component.
+//!
+//! Provides fuzzy search over command history using the Nucleo search library
+//! with frecency-based ranking and dynamic filtering.
+
+use std::{pin::Pin, sync::Arc};
+
+use crate::atuin_client::database::Database;
+use eyre::Result;
+use tokio::sync::RwLock;
+use tokio_stream::Stream;
+use tonic::{Request, Response, Status, Streaming};
+use tracing::{Level, debug, info, instrument, span, trace};
+use uuid::Uuid;
+
+use crate::atuin_daemon::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ search::{
+ FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse,
+ search_server::{Search as SearchSvc, SearchServer},
+ },
+};
+
+/// Page size passed to the database pager when loading history into the index.
+const PAGE_SIZE: usize = 5000;
+/// Limit passed to `SearchIndex::search` for every query.
+const RESULTS_LIMIT: u32 = 200;
+/// How often to rebuild the frecency map (in seconds).
+const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60;
+
+/// Search component - provides fuzzy search over command history.
+///
+/// This component:
+/// - Maintains a deduplicated search index with frecency ranking
+/// - Loads history from the database on startup
+/// - Updates the index when history events occur
+/// - Provides the Search gRPC service
+pub struct SearchComponent {
+ /// Search index, shared with the gRPC service and the background tasks.
+ index: Arc<RwLock<SearchIndex>>,
+ /// Handle to the daemon (set during start).
+ handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+ /// Task performing the initial history load (spawned in start, aborted in stop).
+ loader_handle: Option<tokio::task::JoinHandle<()>>,
+ /// Task periodically refreshing the frecency map (spawned in start, aborted in stop).
+ frecency_handle: Option<tokio::task::JoinHandle<()>>,
+}
+
+impl SearchComponent {
+    /// Create a new search component with an empty index and no background
+    /// tasks running.
+    pub fn new() -> Self {
+        Self {
+            index: Arc::new(RwLock::new(SearchIndex::new())),
+            handle: tokio::sync::RwLock::new(None),
+            loader_handle: None,
+            frecency_handle: None,
+        }
+    }
+
+    /// Get the gRPC service for this component.
+    ///
+    /// The service shares the component's search index.
+    pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> {
+        SearchServer::new(SearchGrpcService {
+            index: self.index.clone(),
+        })
+    }
+
+    /// Rebuild the entire search index from the database.
+    ///
+    /// A fresh index is filled page by page and only swapped in once every
+    /// page has loaded, so a database error leaves the current index in
+    /// place instead of replacing it with a partial one.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the component has not been started or if a page of history
+    /// cannot be loaded.
+    async fn rebuild_index(&self) -> Result<()> {
+        let handle_guard = self.handle.read().await;
+        let handle = handle_guard
+            .as_ref()
+            .ok_or_else(|| eyre::eyre!("component not initialized"))?;
+
+        info!("Rebuilding search index from database");
+
+        // Create a new index
+        let new_index = SearchIndex::new();
+
+        // Load all history into the new index
+        let db = handle.history_db().clone();
+        let mut pager = db.all_paged(PAGE_SIZE, false, true);
+        loop {
+            match pager.next().await {
+                Ok(Some(histories)) => {
+                    info!(
+                        "Loading {} history entries into search index",
+                        histories.len()
+                    );
+                    new_index.add_histories(&histories);
+                }
+                Ok(None) => break,
+                Err(e) => {
+                    // Do not publish a truncated index; the caller logs the error.
+                    return Err(eyre::eyre!("failed to load history during rebuild: {e}"));
+                }
+            }
+        }
+
+        // Build the frecency map before publishing, mirroring the initial
+        // load in `start`; otherwise the fresh index would have none until
+        // the next periodic refresh.
+        let settings = handle.settings().await;
+        new_index.rebuild_frecency(&settings.search).await;
+
+        info!(
+            "Search index rebuild complete; {} unique commands",
+            new_index.command_count()
+        );
+
+        // Replace the old index with the new one
+        *self.index.write().await = new_index;
+        Ok(())
+    }
+}
+
+impl Default for SearchComponent {
+    /// Same as [`SearchComponent::new`].
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[tonic::async_trait]
+impl Component for SearchComponent {
+    /// Name under which this component is registered.
+    fn name(&self) -> &'static str {
+        "search"
+    }
+
+    /// Store the daemon handle and spawn the two background tasks:
+    ///
+    /// - a loader that pages the whole history into the index and then builds
+    ///   the initial frecency map (it stops early, without building the map,
+    ///   if a page fails to load)
+    /// - a refresher that rebuilds the frecency map on every tick of a
+    ///   `FRECENCY_REFRESH_INTERVAL_SECS` interval
+    async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+        *self.handle.write().await = Some(handle.clone());
+
+        // Background task: load history into the index.
+        let loader = {
+            let index = Arc::clone(&self.index);
+            let db = handle.history_db().clone();
+            let handle = handle.clone();
+
+            async move {
+                info!(
+                    "Loading history into search index; page size = {}",
+                    PAGE_SIZE
+                );
+                let mut pager = db.all_paged(PAGE_SIZE, false, true);
+                loop {
+                    let page = match pager.next().await {
+                        Ok(page) => page,
+                        Err(e) => {
+                            tracing::error!("Failed to load history: {}", e);
+                            return;
+                        }
+                    };
+                    let Some(histories) = page else {
+                        break;
+                    };
+
+                    info!(
+                        "Loading {} history entries into search index",
+                        histories.len()
+                    );
+                    index.read().await.add_histories(&histories);
+                }
+
+                info!(
+                    "Initial history load complete; {} unique commands indexed",
+                    index.read().await.command_count()
+                );
+                // Build initial frecency map with current settings
+                let settings = handle.settings().await;
+                index.read().await.rebuild_frecency(&settings.search).await;
+                info!("Initial frecency map built");
+            }
+        };
+        self.loader_handle = Some(tokio::spawn(loader));
+
+        // Background task: periodically refresh frecency.
+        let refresher = {
+            let index = Arc::clone(&self.index);
+            let handle = handle.clone();
+
+            async move {
+                let period = std::time::Duration::from_secs(FRECENCY_REFRESH_INTERVAL_SECS);
+                let mut ticker = tokio::time::interval(period);
+                loop {
+                    ticker.tick().await;
+                    trace!("Refreshing frecency map");
+                    let settings = handle.settings().await;
+                    index.read().await.rebuild_frecency(&settings.search).await;
+                }
+            }
+        };
+        self.frecency_handle = Some(tokio::spawn(refresher));
+
+        tracing::info!("search component started");
+        Ok(())
+    }
+
+    /// Keep the search index in sync with daemon events.
+    ///
+    /// - `RecordsAdded`: load the matching history rows and add them
+    /// - `HistoryEnded`: add the finished command
+    /// - `HistoryPruned` / `HistoryRebuilt` / `HistoryDeleted`: rebuild the
+    ///   whole index from the database
+    /// - `SettingsReloaded`: rebuild the frecency map with the new settings
+    ///
+    /// Failures are logged and never propagated; this always returns `Ok`.
+    async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+        match event {
+            DaemonEvent::RecordsAdded(records) => {
+                debug!(
+                    count = records.len(),
+                    "Processing added records for search index"
+                );
+
+                let handle_guard = self.handle.read().await;
+                if let Some(handle) = handle_guard.as_ref()
+                    && !records.is_empty()
+                {
+                    // Ids are text, so each one must be a quoted SQL string
+                    // literal (with embedded quotes doubled); unquoted ids
+                    // make the statement invalid.
+                    let id_list = records
+                        .iter()
+                        .map(|record| format!("'{}'", record.0.to_string().replace('\'', "''")))
+                        .collect::<Vec<_>>()
+                        .join(",");
+                    let query = format!("select * from history where id in ({id_list})");
+
+                    let histories: Vec<_> =
+                        match handle.history_db().query_history(query.as_str()).await {
+                            Ok(histories) => histories,
+                            Err(e) => {
+                                // Report the failure instead of silently
+                                // indexing nothing.
+                                tracing::error!(
+                                    "Failed to load added records for search index: {:?}",
+                                    e
+                                );
+                                Vec::new()
+                            }
+                        };
+
+                    // `Span::in_scope` would only cover creating the future;
+                    // instrumenting the future keeps the span entered while
+                    // it is polled.
+                    let inject_span =
+                        span!(Level::TRACE, "inject_records", count = histories.len());
+                    tracing::Instrument::instrument(
+                        async {
+                            self.index.read().await.add_histories(&histories);
+                        },
+                        inject_span,
+                    )
+                    .await;
+                }
+            }
+            DaemonEvent::HistoryStarted(history) => {
+                debug!(id = %history.id, command = %history.command, "History started (no index action)");
+            }
+            DaemonEvent::HistoryEnded(history) => {
+                tracing::Instrument::instrument(
+                    async {
+                        self.index.read().await.add_history(history);
+                    },
+                    span!(Level::TRACE, "inject_history_ended"),
+                )
+                .await;
+            }
+            DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => {
+                info!("History store pruned or rebuilt, rebuilding search index");
+                if let Err(e) = self.rebuild_index().await {
+                    tracing::error!("Failed to rebuild search index: {}", e);
+                }
+            }
+            DaemonEvent::HistoryDeleted { ids } => {
+                info!(
+                    count = ids.len(),
+                    "History deleted, rebuilding search index"
+                );
+                // For now, just rebuild the entire index. A more efficient implementation
+                // would remove specific items from the index.
+                if let Err(e) = self.rebuild_index().await {
+                    tracing::error!("Failed to rebuild search index: {}", e);
+                }
+            }
+            DaemonEvent::SettingsReloaded => {
+                info!("Settings reloaded, rebuilding frecency map with new multipliers");
+                let handle_guard = self.handle.read().await;
+                if let Some(handle) = handle_guard.as_ref() {
+                    let settings = handle.settings().await;
+                    self.index
+                        .read()
+                        .await
+                        .rebuild_frecency(&settings.search)
+                        .await;
+                }
+            }
+            // Events we don't care about
+            DaemonEvent::SyncCompleted { .. }
+            | DaemonEvent::SyncFailed { .. }
+            | DaemonEvent::ForceSync
+            | DaemonEvent::ShutdownRequested => {}
+        }
+        Ok(())
+    }
+
+    /// Abort the loader and frecency tasks, if they were spawned.
+    async fn stop(&mut self) -> Result<()> {
+        let tasks = [self.loader_handle.take(), self.frecency_handle.take()];
+        for task in tasks.into_iter().flatten() {
+            task.abort();
+        }
+
+        tracing::info!("search component stopped");
+        Ok(())
+    }
+}
+
+/// The gRPC service implementation.
+pub struct SearchGrpcService {
+ /// Search index, cloned from the owning `SearchComponent` in `grpc_service`.
+ index: Arc<RwLock<SearchIndex>>,
+}
+
+#[tonic::async_trait]
+impl SearchSvc for SearchGrpcService {
+ type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>;
+
+    /// Bidirectional search stream: every incoming `SearchRequest` is answered
+    /// with one `SearchResponse` carrying the same `query_id` and the matching
+    /// history ids as raw UUID bytes.
+    ///
+    /// Requests are handled in order by a spawned task. An unknown filter mode
+    /// falls back to `Global`, ids that are not valid UUIDs are skipped, and
+    /// an error on the incoming stream is forwarded to the client before the
+    /// task ends.
+    #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")]
+    async fn search(
+        &self,
+        request: Request<Streaming<SearchRequest>>,
+    ) -> Result<Response<Self::SearchStream>, Status> {
+        let mut in_stream = request.into_inner();
+        let index = self.index.clone();
+
+        // Create output channel
+        let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128);
+
+        // Spawn task to handle incoming requests and send responses
+        tokio::spawn(async move {
+            while let Some(req) = in_stream.message().await.transpose() {
+                let search_req = match req {
+                    Ok(search_req) => search_req,
+                    Err(e) => {
+                        let _ = tx.send(Err(e)).await;
+                        break;
+                    }
+                };
+
+                let query = search_req.query;
+                let query_id = search_req.query_id;
+                let filter_mode: FilterMode = search_req
+                    .filter_mode
+                    .try_into()
+                    .unwrap_or(FilterMode::Global);
+                let proto_context = search_req.context;
+
+                debug!(
+                    "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}",
+                    query,
+                    query_id,
+                    filter_mode.as_str_name(),
+                    proto_context
+                );
+
+                // Convert proto FilterMode + context to IndexFilterMode
+                let index_filter = convert_filter_mode(filter_mode, &proto_context);
+
+                // Build QueryContext from proto context
+                let query_context = proto_context
+                    .map(|ctx| QueryContext {
+                        cwd: Some(with_trailing_slash(&ctx.cwd)),
+                        git_root: ctx.git_root.map(|s| with_trailing_slash(&s)),
+                        hostname: Some(ctx.hostname),
+                        session_id: Some(ctx.session_id),
+                    })
+                    .unwrap_or_default();
+
+                // Perform the search. The future is instrumented rather than
+                // created inside `Span::in_scope`, which would exit the span
+                // before the search is actually polled.
+                let query_span = span!(Level::TRACE, "daemon_search_query", %query, query_id);
+                let history_ids = tracing::Instrument::instrument(
+                    async {
+                        let index = index.read().await;
+                        index
+                            .search(&query, index_filter, &query_context, RESULTS_LIMIT)
+                            .await
+                    },
+                    query_span,
+                )
+                .await;
+
+                // Convert history IDs to bytes
+                let ids: Vec<Vec<u8>> = history_ids
+                    .iter()
+                    .filter_map(|id| {
+                        Uuid::parse_str(id)
+                            .ok()
+                            .map(|uuid| uuid.as_bytes().to_vec())
+                    })
+                    .collect();
+
+                if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() {
+                    break; // Client disconnected
+                }
+            }
+        });
+
+        // Convert receiver to stream
+        let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
+        Ok(Response::new(Box::pin(out_stream)))
+    }
+}
+
+/// Translate the proto `FilterMode` plus optional request context into the
+/// index-level `IndexFilterMode`.
+///
+/// Every mode other than `Global` needs a context; without one the result is
+/// `Global`. `Workspace` uses the git root when the context has one and
+/// otherwise filters by directory. `SessionPreload` is treated like `Session`.
+fn convert_filter_mode(
+    mode: FilterMode,
+    context: &Option<crate::atuin_daemon::search::SearchContext>,
+) -> IndexFilterMode {
+    match (mode, context.as_ref()) {
+        (FilterMode::Directory, Some(ctx)) => {
+            IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
+        }
+        (FilterMode::Workspace, Some(ctx)) => match ctx.git_root.as_deref() {
+            Some(git_root) => IndexFilterMode::Workspace(with_trailing_slash(git_root)),
+            // Fall back to directory if no git root
+            None => IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)),
+        },
+        (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()),
+        (FilterMode::Session | FilterMode::SessionPreload, Some(ctx)) => {
+            IndexFilterMode::Session(ctx.session_id.clone())
+        }
+        // Global mode, or any mode without a context.
+        _ => IndexFilterMode::Global,
+    }
+}
+
+/// Return `s` ending in exactly one appended `\` when it does not already end
+/// with one (Windows build).
+#[cfg(windows)]
+pub fn with_trailing_slash(s: &str) -> String {
+    let mut path = s.to_string();
+    if !path.ends_with('\\') {
+        path.push('\\');
+    }
+    path
+}
+
+/// Return `s` ending in exactly one appended `/` when it does not already end
+/// with one (non-Windows build).
+#[cfg(not(windows))]
+pub fn with_trailing_slash(s: &str) -> String {
+    let mut path = s.to_string();
+    if !path.ends_with('/') {
+        path.push('/');
+    }
+    path
+}
diff --git a/crates/turtle/src/atuin_daemon/components/semantic.rs b/crates/turtle/src/atuin_daemon/components/semantic.rs
new file mode 100644
index 00000000..a42fd5cb
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/semantic.rs
@@ -0,0 +1,903 @@
+//! Semantic command capture component.
+//!
+//! This is a prototype in-memory store for completed command captures emitted
+//! by atuin-pty-proxy. It keeps recent captures per Atuin session and indexes
+//! them by history ID for AI tool lookup.
+
+use std::collections::{HashMap, VecDeque};
+use std::fmt::{Display, Formatter};
+use std::sync::Arc;
+
+use crate::atuin_client::history::{History, HistoryId};
+use eyre::Result;
+use tokio::sync::Mutex;
+use tonic::{Request, Response, Status, Streaming};
+use tracing::{Level, instrument};
+
+use crate::atuin_daemon::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ semantic::{
+ CommandCapture, CommandOutputReply, CommandOutputRequest, OutputLine, RecordCommandsReply,
+ semantic_server::{Semantic as SemanticSvc, SemanticServer},
+ },
+};
+
+/// Maximum number of sessions kept in the LRU before the oldest is expired.
+const MAX_SESSIONS: usize = 20;
+/// Maximum number of captures kept per session before the oldest is evicted.
+const MAX_COMMANDS_PER_SESSION: usize = 128;
+/// Maximum total capture output bytes kept per session before eviction.
+const MAX_BYTES_PER_SESSION: usize = 32 * 1024 * 1024;
+// NOTE(review): presumably caps `pending_histories`; its use is not visible
+// in this part of the file - confirm.
+const MAX_PENDING_HISTORIES: usize = 128;
+
+/// Stores completed command captures and associates them with history events.
+pub struct SemanticComponent {
+ /// State shared with every `SemanticGrpcService` handed out by `grpc_service`.
+ inner: Arc<SemanticComponentInner>,
+}
+
+/// Shared wrapper that serialises access to the semantic state.
+struct SemanticComponentInner {
+ state: Mutex<SemanticState>,
+}
+
+/// All in-memory bookkeeping of the semantic component.
+#[derive(Default)]
+struct SemanticState {
+ /// Captures grouped by session.
+ sessions: HashMap<SessionId, SessionCaptures>,
+ /// Session ids ordered from least to most recently touched.
+ session_lru: VecDeque<SessionId>,
+ /// Lookup from history id to the capture recorded for it.
+ history_index: HashMap<HistoryId, CaptureRef>,
+ /// Histories that ended before a capture for them was recorded.
+ pending_histories: VecDeque<History>,
+}
+
+/// Identifier of a session.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+struct SessionId(String);
+
+/// Per-session identifier of a stored capture.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+struct CaptureId(u64);
+
+/// Location of a stored capture: the session holding it and its id there.
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct CaptureRef {
+ session_id: SessionId,
+ capture_id: CaptureId,
+}
+
+/// Captures of one session, oldest first.
+#[derive(Default)]
+struct SessionCaptures {
+ /// Id handed to the next capture pushed into this session.
+ next_id: u64,
+ records: VecDeque<StoredCapture>,
+ /// Sum of `output_bytes` over all entries in `records`.
+ output_bytes: usize,
+}
+
+/// One capture as stored inside a session.
+struct StoredCapture {
+ id: CaptureId,
+ history_id: HistoryId,
+ /// Length of the capture output when it was stored.
+ output_bytes: usize,
+ record: SemanticCommandRecord,
+}
+
+/// Identifies a capture that was dropped to get back under the limits.
+struct EvictedCapture {
+ history_id: HistoryId,
+ capture_id: CaptureId,
+}
+
+/// A capture together with the history entry associated with it, if any.
+#[derive(Debug, Clone)]
+struct SemanticCommandRecord {
+ capture: CommandCapture,
+ history: Option<History>,
+}
+
+impl SemanticComponent {
+    /// Create a semantic component with empty state.
+    pub fn new() -> Self {
+        let inner = SemanticComponentInner {
+            state: Mutex::new(SemanticState::default()),
+        };
+
+        Self {
+            inner: Arc::new(inner),
+        }
+    }
+
+    /// Build the tonic `SemanticServer` sharing this component's state.
+    pub fn grpc_service(&self) -> SemanticServer<SemanticGrpcService> {
+        let service = SemanticGrpcService {
+            inner: Arc::clone(&self.inner),
+        };
+        SemanticServer::new(service)
+    }
+}
+
+impl Default for SemanticComponent {
+    /// Same as [`SemanticComponent::new`].
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[tonic::async_trait]
+impl Component for SemanticComponent {
+    /// Name under which this component is registered.
+    fn name(&self) -> &'static str {
+        "semantic"
+    }
+
+    /// Nothing to set up; the daemon handle is not kept.
+    async fn start(&mut self, _handle: DaemonHandle) -> Result<()> {
+        tracing::info!("semantic component started");
+        Ok(())
+    }
+
+    /// Record finished history entries; every other event is ignored.
+    async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+        match event {
+            DaemonEvent::HistoryEnded(history) => {
+                self.inner.record_history(history.clone()).await;
+            }
+            _ => {}
+        }
+
+        Ok(())
+    }
+
+    /// Log a summary of what was held in memory at shutdown.
+    async fn stop(&mut self) -> Result<()> {
+        let state = self.inner.state.lock().await;
+        let sessions = state.sessions.len();
+        let records = state.record_count();
+        let indexed_histories = state.history_index.len();
+        let pending_histories = state.pending_histories.len();
+
+        tracing::info!(
+            sessions = sessions,
+            records = records,
+            indexed_histories = indexed_histories,
+            pending_histories = pending_histories,
+            "semantic component stopped"
+        );
+        Ok(())
+    }
+}
+
+impl SemanticComponentInner {
+    /// Lock the state and store `capture`; returns whether it was kept.
+    async fn record_capture(&self, capture: CommandCapture) -> bool {
+        self.state.lock().await.record_capture(capture)
+    }
+
+    /// Lock the state and record a finished history entry.
+    async fn record_history(&self, history: History) {
+        self.state.lock().await.record_history(history);
+    }
+
+    /// Lock the state and answer a command output request.
+    async fn command_output(&self, request: &CommandOutputRequest) -> CommandOutputReply {
+        self.state.lock().await.command_output(request)
+    }
+}
+
+impl SemanticState {
+    /// Store a capture under its session and index it by history id.
+    ///
+    /// The capture is dropped (returning `false`) when it has no usable
+    /// history id, or when no session id can be taken from either the capture
+    /// or a pending history with the same id. On success the capture's
+    /// `history_id` and `session_id` are rewritten to the resolved values and
+    /// a zero `output_observed_bytes` is replaced by the output length.
+    fn record_capture(&mut self, mut capture: CommandCapture) -> bool {
+        let history_id = match history_id_from_str(capture.history_id.as_deref()) {
+            Some(history_id) => history_id,
+            None => {
+                tracing::debug!(
+                    command_bytes = capture.command.len(),
+                    prompt_bytes = capture.prompt.len(),
+                    output_bytes = capture.output.len(),
+                    output_truncated = capture.output_truncated,
+                    "dropping semantic command capture without history id"
+                );
+                return false;
+            }
+        };
+
+        let history = take_pending_history(&mut self.pending_histories, &history_id);
+
+        // Prefer the session named by the capture, then the pending history's.
+        let from_capture = capture
+            .session_id
+            .as_deref()
+            .and_then(|raw| SessionId::try_from(raw).ok());
+        let resolved = match from_capture {
+            Some(session_id) => Some(session_id),
+            None => history
+                .as_ref()
+                .and_then(|pending| SessionId::try_from(pending.session.as_str()).ok()),
+        };
+        let Some(session_id) = resolved else {
+            tracing::debug!(
+                history_id = %history_id,
+                command_bytes = capture.command.len(),
+                prompt_bytes = capture.prompt.len(),
+                output_bytes = capture.output.len(),
+                output_truncated = capture.output_truncated,
+                "dropping semantic command capture without session id"
+            );
+            return false;
+        };
+
+        capture.history_id = Some(history_id.to_string());
+        capture.session_id = Some(session_id.to_string());
+        if capture.output_observed_bytes == 0 {
+            capture.output_observed_bytes = capture.output.len() as u64;
+        }
+
+        let record = SemanticCommandRecord { capture, history };
+        log_record(&record, "recorded semantic command capture");
+        self.push_record(session_id, history_id, record);
+        true
+    }
+
+    /// Attach a finished history entry to its capture, or park it as pending.
+    ///
+    /// When the index points at a capture that is still stored, the history
+    /// is attached to it. A stale index entry is removed, and in every other
+    /// case the history is queued until its capture arrives.
+    fn record_history(&mut self, history: History) {
+        let history_id = history.id.clone();
+
+        if let Some(capture_ref) = self.history_index.get(&history_id).cloned() {
+            match self.stored_capture_mut(&capture_ref) {
+                Some(stored) => {
+                    stored.record.history = Some(history);
+                    log_record(
+                        &stored.record,
+                        "associated semantic command capture with history",
+                    );
+                    return;
+                }
+                None => {
+                    // The capture is gone; drop the dangling index entry.
+                    self.history_index.remove(&history_id);
+                }
+            }
+        }
+
+        tracing::debug!(
+            id = %history.id,
+            command_bytes = history.command.len(),
+            "history ended before semantic capture arrived"
+        );
+        push_pending_history(&mut self.pending_histories, history);
+    }
+
+    /// Answer an output request for the capture indexed under the request's
+    /// history id.
+    ///
+    /// Returns the "not found" reply when the id is unusable, unknown, or
+    /// points at a capture that is no longer stored (the stale index entry is
+    /// removed in that case). A successful lookup marks the session as
+    /// recently used.
+    fn command_output(&mut self, request: &CommandOutputRequest) -> CommandOutputReply {
+        let history_id = match history_id_from_str(Some(&request.history_id)) {
+            Some(history_id) => history_id,
+            None => return command_output_not_found(),
+        };
+        let capture_ref = match self.history_index.get(&history_id).cloned() {
+            Some(capture_ref) => capture_ref,
+            None => return command_output_not_found(),
+        };
+
+        match self.command_output_for_ref(&capture_ref, &request.ranges) {
+            Some(reply) => {
+                self.touch_session(&capture_ref.session_id);
+                reply
+            }
+            None => {
+                self.history_index.remove(&history_id);
+                command_output_not_found()
+            }
+        }
+    }
+
+    /// Build the reply for the capture `capture_ref` points at, or `None` if
+    /// its session or the capture itself is no longer stored.
+    ///
+    /// The reply's `output` field is left empty; the selected lines are
+    /// returned in `lines`. `output_observed_bytes` is never reported smaller
+    /// than the stored output length.
+    fn command_output_for_ref(
+        &self,
+        capture_ref: &CaptureRef,
+        ranges: &[crate::atuin_daemon::semantic::OutputRange],
+    ) -> Option<CommandOutputReply> {
+        let session = self.sessions.get(&capture_ref.session_id)?;
+        let stored = session.stored_capture(capture_ref.capture_id)?;
+
+        let capture = &stored.record.capture;
+        let output = &capture.output;
+        let stored_len = output.len() as u64;
+
+        Some(CommandOutputReply {
+            found: true,
+            output: String::new(),
+            total_bytes: stored_len,
+            total_lines: output.lines().count() as u64,
+            lines: select_output_ranges(output, ranges),
+            output_truncated: capture.output_truncated,
+            output_observed_bytes: capture.output_observed_bytes.max(stored_len),
+        })
+    }
+
+    /// Store `record` in its session, index it by history id and apply the
+    /// per-session and session-count limits.
+    ///
+    /// The index entry is written before evictions are processed, so a
+    /// capture evicted by its own push has its index entry removed again.
+    fn push_record(
+        &mut self,
+        session_id: SessionId,
+        history_id: HistoryId,
+        record: SemanticCommandRecord,
+    ) {
+        self.touch_session(&session_id);
+
+        let (capture_id, evicted) = self
+            .sessions
+            .entry(session_id.clone())
+            .or_default()
+            .push(history_id.clone(), record);
+
+        self.history_index.insert(
+            history_id,
+            CaptureRef {
+                session_id: session_id.clone(),
+                capture_id,
+            },
+        );
+
+        for gone in &evicted {
+            self.remove_history_index_if_matches(&session_id, &gone.history_id, gone.capture_id);
+        }
+
+        self.expire_lru_sessions();
+    }
+
+    /// Mark `session_id` as the most recently used session: drop its first
+    /// existing entry from the LRU queue, if any, and append it at the back.
+    fn touch_session(&mut self, session_id: &SessionId) {
+        let existing = self
+            .session_lru
+            .iter()
+            .position(|queued| queued == session_id);
+        if let Some(position) = existing {
+            self.session_lru.remove(position);
+        }
+
+        self.session_lru.push_back(session_id.clone());
+    }
+
+    /// Drop least recently used sessions until at most `MAX_SESSIONS` remain
+    /// in the LRU queue, removing the index entries of their captures.
+    fn expire_lru_sessions(&mut self) {
+        while self.session_lru.len() > MAX_SESSIONS {
+            let Some(expired_id) = self.session_lru.pop_front() else {
+                break;
+            };
+
+            // A queued id without stored captures needs no index cleanup.
+            if let Some(expired) = self.sessions.remove(&expired_id) {
+                for stored in expired.records {
+                    self.remove_history_index_if_matches(
+                        &expired_id,
+                        &stored.history_id,
+                        stored.id,
+                    );
+                }
+            }
+        }
+    }
+
+    /// Remove the index entry for `history_id`, but only while it still
+    /// points at exactly this session and capture. An entry that has since
+    /// been overwritten by a newer capture is left alone.
+    fn remove_history_index_if_matches(
+        &mut self,
+        session_id: &SessionId,
+        history_id: &HistoryId,
+        capture_id: CaptureId,
+    ) {
+        let points_here = match self.history_index.get(history_id) {
+            Some(indexed) => {
+                &indexed.session_id == session_id && indexed.capture_id == capture_id
+            }
+            None => false,
+        };
+
+        if points_here {
+            self.history_index.remove(history_id);
+        }
+    }
+
+    /// Mutable access to the capture `capture_ref` points at, if both its
+    /// session and the capture are still stored.
+    fn stored_capture_mut(&mut self, capture_ref: &CaptureRef) -> Option<&mut StoredCapture> {
+        let session = self.sessions.get_mut(&capture_ref.session_id)?;
+        session.stored_capture_mut(capture_ref.capture_id)
+    }
+
+    /// Total number of captures stored across all sessions.
+    fn record_count(&self) -> usize {
+        self.sessions
+            .values()
+            .map(|session| session.records.len())
+            .sum::<usize>()
+    }
+}
+
+impl SessionCaptures {
+    /// Store `record` using the module-wide per-session limits.
+    ///
+    /// Returns the id given to the capture and the captures evicted to stay
+    /// within the limits.
+    fn push(
+        &mut self,
+        history_id: HistoryId,
+        record: SemanticCommandRecord,
+    ) -> (CaptureId, Vec<EvictedCapture>) {
+        let max_commands = MAX_COMMANDS_PER_SESSION;
+        let max_output_bytes = MAX_BYTES_PER_SESSION;
+        self.push_with_limits(history_id, record, max_commands, max_output_bytes)
+    }
+
+    /// Store `record` at the back of the queue under a fresh capture id, then
+    /// evict from the front until both limits hold again.
+    fn push_with_limits(
+        &mut self,
+        history_id: HistoryId,
+        record: SemanticCommandRecord,
+        max_commands: usize,
+        max_output_bytes: usize,
+    ) -> (CaptureId, Vec<EvictedCapture>) {
+        let id = CaptureId(self.next_id);
+        self.next_id = self.next_id.saturating_add(1);
+
+        let output_bytes = record.capture.output.len();
+        self.output_bytes = self.output_bytes.saturating_add(output_bytes);
+
+        let stored = StoredCapture {
+            id,
+            history_id,
+            output_bytes,
+            record,
+        };
+        self.records.push_back(stored);
+
+        let evicted = self.evict_to_limits(max_commands, max_output_bytes);
+        (id, evicted)
+    }
+
+    /// Pop the oldest captures while there are more than `max_commands` of
+    /// them or their combined output exceeds `max_output_bytes`.
+    ///
+    /// Returns what was evicted, oldest first.
+    fn evict_to_limits(
+        &mut self,
+        max_commands: usize,
+        max_output_bytes: usize,
+    ) -> Vec<EvictedCapture> {
+        let mut evicted = Vec::new();
+
+        loop {
+            let over_limit =
+                self.records.len() > max_commands || self.output_bytes > max_output_bytes;
+            if !over_limit {
+                break;
+            }
+            let Some(oldest) = self.records.pop_front() else {
+                break;
+            };
+
+            self.output_bytes = self.output_bytes.saturating_sub(oldest.output_bytes);
+            evicted.push(EvictedCapture {
+                history_id: oldest.history_id,
+                capture_id: oldest.id,
+            });
+        }
+
+        evicted
+    }
+
+    /// Find the stored capture with the given id.
+    fn stored_capture(&self, capture_id: CaptureId) -> Option<&StoredCapture> {
+        for stored in &self.records {
+            if stored.id == capture_id {
+                return Some(stored);
+            }
+        }
+        None
+    }
+
+    /// Find the stored capture with the given id for modification.
+    fn stored_capture_mut(&mut self, capture_id: CaptureId) -> Option<&mut StoredCapture> {
+        for stored in &mut self.records {
+            if stored.id == capture_id {
+                return Some(stored);
+            }
+        }
+        None
+    }
+}
+
+impl TryFrom<&str> for SessionId {
+    type Error = ();
+
+    /// Builds a session id from the trimmed input.
+    ///
+    /// Fails with `()` when nothing but whitespace (or nothing at all) was given.
+    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
+        match value.trim() {
+            "" => Err(()),
+            trimmed => Ok(Self(trimmed.to_string())),
+        }
+    }
+}
+
+impl TryFrom<String> for SessionId {
+    type Error = ();
+
+    /// Owned-string convenience wrapper; applies the same trimming and emptiness check
+    /// as the `&str` conversion.
+    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
+        <Self as TryFrom<&str>>::try_from(&value)
+    }
+}
+
+impl AsRef<str> for SessionId {
+    /// Borrows the session id as a plain string slice.
+    fn as_ref(&self) -> &str {
+        self.0.as_str()
+    }
+}
+
+impl Display for SessionId {
+    /// Writes the raw session id; formatter flags such as width are not applied.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.0.as_str())
+    }
+}
+
+/// gRPC front end of the semantic component.
+///
+/// Both RPCs of its `SemanticSvc` implementation delegate to the shared
+/// `SemanticComponentInner`.
+pub struct SemanticGrpcService {
+ // Shared component state that the RPC handlers forward to.
+ inner: Arc<SemanticComponentInner>,
+}
+
+#[tonic::async_trait]
+impl SemanticSvc for SemanticGrpcService {
+    /// Consumes a client stream of command captures.
+    ///
+    /// Replies with the number of captures the component accepted; a stream error
+    /// aborts the call with that error.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn record_commands(
+        &self,
+        request: Request<Streaming<CommandCapture>>,
+    ) -> Result<Response<RecordCommandsReply>, Status> {
+        let mut captures = request.into_inner();
+        let mut accepted = 0_u64;
+
+        loop {
+            let Some(capture) = captures.message().await? else {
+                break;
+            };
+
+            if self.inner.record_capture(capture).await {
+                accepted += 1;
+            }
+        }
+
+        Ok(Response::new(RecordCommandsReply { accepted }))
+    }
+
+    /// Looks up the stored output for a history id.
+    ///
+    /// Rejects requests whose `history_id` is empty or whitespace-only with
+    /// `invalid_argument`.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn command_output(
+        &self,
+        request: Request<CommandOutputRequest>,
+    ) -> Result<Response<CommandOutputReply>, Status> {
+        let request = request.into_inner();
+
+        let missing_history_id = request.history_id.trim().is_empty();
+        if missing_history_id {
+            return Err(Status::invalid_argument("history_id is required"));
+        }
+
+        let reply = self.inner.command_output(&request).await;
+        Ok(Response::new(reply))
+    }
+}
+
+/// Converts an optional raw id into a `HistoryId`.
+///
+/// Returns `None` when the value is absent or consists only of whitespace; otherwise
+/// the trimmed text becomes the id.
+fn history_id_from_str(value: Option<&str>) -> Option<HistoryId> {
+    let trimmed = value?.trim();
+    if trimmed.is_empty() {
+        None
+    } else {
+        Some(HistoryId(trimmed.to_string()))
+    }
+}
+
+/// Removes and returns the first pending history whose id equals `history_id`.
+///
+/// Returns `None` and leaves the queue untouched when no entry matches.
+fn take_pending_history(
+    histories: &mut VecDeque<History>,
+    history_id: &HistoryId,
+) -> Option<History> {
+    let found = histories
+        .iter()
+        .position(|pending| &pending.id == history_id);
+
+    found.and_then(|index| histories.remove(index))
+}
+
+/// Queues `history` as pending, replacing an earlier entry with the same id.
+///
+/// The entry always ends up at the back; afterwards the queue is trimmed from the
+/// front to `MAX_PENDING_HISTORIES` entries.
+fn push_pending_history(histories: &mut VecDeque<History>, history: History) {
+    let duplicate = histories
+        .iter()
+        .position(|pending| pending.id == history.id);
+    if let Some(index) = duplicate {
+        histories.remove(index);
+    }
+
+    histories.push_back(history);
+    trim_front(histories, MAX_PENDING_HISTORIES);
+}
+
+/// Drops elements from the front of `records` until at most `max_len` remain.
+fn trim_front<T>(records: &mut VecDeque<T>, max_len: usize) {
+    let excess = records.len().saturating_sub(max_len);
+    for _ in 0..excess {
+        records.pop_front();
+    }
+}
+
+/// Builds the reply for a lookup that matched no capture: `found` is `false` and every
+/// other field is empty, zero or `false`.
+fn command_output_not_found() -> CommandOutputReply {
+ CommandOutputReply {
+ found: false,
+ output: String::new(),
+ total_bytes: 0,
+ total_lines: 0,
+ lines: Vec::new(),
+ output_truncated: false,
+ output_observed_bytes: 0,
+ }
+}
+
+/// Picks the requested line ranges out of `output`.
+///
+/// Ranges are zero-based and inclusive; negative bounds count from the end (see
+/// `normalize_line_range`). Without any ranges, lines `0..=999` are selected.
+/// Overlapping and directly adjacent ranges are merged, and ranges entirely outside
+/// the output are ignored. The returned lines are in ascending order and carry
+/// one-based line numbers.
+fn select_output_ranges(
+    output: &str,
+    ranges: &[crate::atuin_daemon::semantic::OutputRange],
+) -> Vec<OutputLine> {
+    let all_lines: Vec<&str> = output.lines().collect();
+    let line_count = all_lines.len();
+    if line_count == 0 {
+        return Vec::new();
+    }
+
+    // Fallback selection used when the caller did not ask for specific ranges.
+    let fallback = [crate::atuin_daemon::semantic::OutputRange { start: 0, end: 999 }];
+    let requested = if ranges.is_empty() {
+        &fallback[..]
+    } else {
+        ranges
+    };
+
+    let mut spans: Vec<(usize, usize)> = requested
+        .iter()
+        .filter_map(|range| normalize_line_range(range.start, range.end, line_count))
+        .collect();
+    spans.sort_unstable_by_key(|&(start, _)| start);
+
+    // Spans are sorted by start, so each one either extends the last merged span
+    // (overlap or direct neighbour) or opens a new one.
+    let mut merged: Vec<(usize, usize)> = Vec::new();
+    for (start, end) in spans {
+        if let Some(last) = merged.last_mut() {
+            if start <= last.1.saturating_add(1) {
+                last.1 = last.1.max(end);
+                continue;
+            }
+        }
+        merged.push((start, end));
+    }
+
+    let mut selected = Vec::new();
+    for (start, end) in merged {
+        for index in start..=end {
+            selected.push(OutputLine {
+                line_number: (index + 1) as u64,
+                content: all_lines[index].to_string(),
+            });
+        }
+    }
+    selected
+}
+
+/// Resolves a possibly negative, possibly out-of-bounds inclusive line range against
+/// an output of `line_count` lines.
+///
+/// Negative bounds are offsets from the end (`-1` is the last line). The result is
+/// clamped to `0..line_count`; `None` is returned when the range lies completely
+/// outside the output or is empty after clamping.
+fn normalize_line_range(start: i64, end: i64, line_count: usize) -> Option<(usize, usize)> {
+    let total = i64::try_from(line_count).ok()?;
+    let resolve = |bound: i64| if bound < 0 { total + bound } else { bound };
+
+    let first = resolve(start);
+    let last = resolve(end);
+    if last < 0 || first >= total {
+        return None;
+    }
+
+    let first = first.max(0);
+    let last = last.min(total - 1);
+    if first > last {
+        return None;
+    }
+
+    // Both bounds are within `0..total` here, so the casts cannot truncate.
+    Some((first as usize, last as usize))
+}
+
+/// Emits a debug-level tracing event describing `record`, using `message` as the
+/// event text.
+///
+/// Only sizes and metadata are logged (byte lengths of command, prompt and output,
+/// ids, exit codes, duration, author); the captured text itself is not.
+fn log_record(record: &SemanticCommandRecord, message: &'static str) {
+ // The capture may lack a history id; log a placeholder in that case.
+ let history_id = record.capture.history_id.as_deref().unwrap_or("<missing>");
+ // Everything below is `None` until a history entry is attached to the record.
+ let associated_history_id = record
+ .history
+ .as_ref()
+ .map(|history| history.id.to_string());
+ let exit = record.history.as_ref().map(|history| history.exit);
+ let duration = record.history.as_ref().map(|history| history.duration);
+ let author = record
+ .history
+ .as_ref()
+ .map(|history| history.author.as_str());
+ let session_id = record.capture.session_id.as_deref();
+
+ tracing::debug!(
+ history_id = %history_id,
+ associated_history_id = ?associated_history_id,
+ session_id = ?session_id,
+ command_bytes = record.capture.command.len(),
+ prompt_bytes = record.capture.prompt.len(),
+ output_bytes = record.capture.output.len(),
+ output_truncated = record.capture.output_truncated,
+ output_observed_bytes = record.capture.output_observed_bytes,
+ capture_exit_code = ?record.capture.exit_code,
+ history_exit = ?exit,
+ duration = ?duration,
+ author = ?author,
+ "{message}"
+ );
+}
+
+// Unit tests for the in-memory capture store and the output range selection helpers.
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use time::OffsetDateTime;
+
+ /// Builds a `History` with the given id, session and command; every other field is
+ /// zero, empty or `None`.
+ fn history(id: &str, session: &str, command: &str) -> History {
+ History {
+ id: HistoryId(id.to_string()),
+ timestamp: OffsetDateTime::UNIX_EPOCH,
+ duration: 0,
+ exit: 0,
+ command: command.to_string(),
+ cwd: String::new(),
+ session: session.to_string(),
+ hostname: String::new(),
+ author: String::new(),
+ intent: None,
+ deleted_at: None,
+ }
+ }
+
+ /// Builds a non-truncated `CommandCapture` carrying only `output` and the optional
+ /// ids; `output_observed_bytes` mirrors the output length.
+ fn capture(history_id: Option<&str>, session_id: Option<&str>, output: &str) -> CommandCapture {
+ CommandCapture {
+ prompt: String::new(),
+ command: String::new(),
+ output: output.to_string(),
+ exit_code: None,
+ history_id: history_id.map(str::to_string),
+ session_id: session_id.map(str::to_string),
+ output_truncated: false,
+ output_observed_bytes: output.len() as u64,
+ }
+ }
+
+ /// Looks up `history_id` with an empty list of ranges.
+ fn command_output(state: &mut SemanticState, history_id: &str) -> CommandOutputReply {
+ state.command_output(&CommandOutputRequest {
+ history_id: history_id.to_string(),
+ ranges: Vec::new(),
+ })
+ }
+
+ /// Shorthand for the expected `OutputLine` values in assertions.
+ fn output_line(line_number: u64, content: &str) -> OutputLine {
+ OutputLine {
+ line_number,
+ content: content.to_string(),
+ }
+ }
+
+ #[test]
+ fn drops_capture_without_history_id() {
+ let mut state = SemanticState::default();
+
+ assert!(!state.record_capture(capture(None, Some("session-1"), "output")));
+ assert!(!command_output(&mut state, "id-1").found);
+ assert_eq!(state.record_count(), 0);
+ }
+
+ #[test]
+ fn stores_capture_by_session_and_history_id() {
+ let mut state = SemanticState::default();
+
+ assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output")));
+
+ let reply = command_output(&mut state, "id-1");
+ assert!(reply.found);
+ assert_eq!(reply.total_bytes, 6);
+ assert_eq!(reply.output_observed_bytes, 6);
+ assert_eq!(reply.lines, vec![output_line(1, "output")]);
+ }
+
+ #[test]
+ fn uses_pending_history_session_when_capture_session_is_missing() {
+ let mut state = SemanticState::default();
+
+ // The history arrives first, so the later capture (which has no session id of its
+ // own) is expected to be filed under the session named by the history.
+ state.record_history(history("id-1", "session-from-history", "cargo test"));
+ assert!(state.record_capture(capture(Some("id-1"), None, "output")));
+
+ assert!(
+ state
+ .sessions
+ .contains_key(&SessionId("session-from-history".to_string()))
+ );
+ assert!(command_output(&mut state, "id-1").found);
+ }
+
+ #[test]
+ fn associates_history_by_id_after_capture_arrives() {
+ let mut state = SemanticState::default();
+
+ // Reverse order of the previous test: capture first, history afterwards.
+ assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output")));
+ state.record_history(history("id-1", "session-1", "different command"));
+
+ let capture_ref = state
+ .history_index
+ .get(&HistoryId("id-1".to_string()))
+ .unwrap();
+ let stored = state
+ .sessions
+ .get(&capture_ref.session_id)
+ .unwrap()
+ .stored_capture(capture_ref.capture_id)
+ .unwrap();
+ assert!(stored.record.history.is_some());
+ }
+
+ #[test]
+ fn evicts_oldest_command_when_session_ring_is_full() {
+ let mut state = SemanticState::default();
+
+ // The inclusive range stores one capture more than the ring holds.
+ for index in 0..=MAX_COMMANDS_PER_SESSION {
+ assert!(state.record_capture(capture(
+ Some(&format!("id-{index}")),
+ Some("session-1"),
+ "output",
+ )));
+ }
+
+ assert!(!command_output(&mut state, "id-0").found);
+ assert!(command_output(&mut state, &format!("id-{MAX_COMMANDS_PER_SESSION}")).found);
+ assert_eq!(state.record_count(), MAX_COMMANDS_PER_SESSION);
+ }
+
+ #[test]
+ fn evicts_oldest_session_after_lru_limit() {
+ let mut state = SemanticState::default();
+
+ for index in 0..MAX_SESSIONS {
+ assert!(state.record_capture(capture(
+ Some(&format!("id-{index}")),
+ Some(&format!("session-{index}")),
+ "output",
+ )));
+ }
+ // The assertions below expect this lookup to mark `session-0` as recently used,
+ // which makes `session-1` the eviction candidate.
+ assert!(command_output(&mut state, "id-0").found);
+
+ assert!(state.record_capture(capture(Some("new-id"), Some("new-session"), "output",)));
+
+ assert!(command_output(&mut state, "id-0").found);
+ assert!(!command_output(&mut state, "id-1").found);
+ assert!(command_output(&mut state, "new-id").found);
+ assert_eq!(state.sessions.len(), MAX_SESSIONS);
+ }
+
+ #[test]
+ fn evicts_by_session_byte_limit() {
+ let mut session = SessionCaptures::default();
+ let first_output = "x".repeat(10);
+ let second_output = "y";
+ // Exactly 10 bytes against a 10 byte limit is not over the limit yet.
+ let (_, evicted_first) = session.push_with_limits(
+ HistoryId("first".to_string()),
+ SemanticCommandRecord {
+ capture: capture(Some("first"), Some("session-1"), &first_output),
+ history: None,
+ },
+ MAX_COMMANDS_PER_SESSION,
+ 10,
+ );
+ assert!(evicted_first.is_empty());
+
+ // One more byte pushes the session to 11 bytes, so the oldest capture has to go.
+ let (_, evicted_second) = session.push_with_limits(
+ HistoryId("second".to_string()),
+ SemanticCommandRecord {
+ capture: capture(Some("second"), Some("session-1"), second_output),
+ history: None,
+ },
+ MAX_COMMANDS_PER_SESSION,
+ 10,
+ );
+
+ assert_eq!(evicted_second.len(), 1);
+ assert_eq!(evicted_second[0].history_id, HistoryId("first".to_string()));
+ assert_eq!(session.records.len(), 1);
+ assert_eq!(session.output_bytes, 1);
+ }
+
+ #[test]
+ fn command_output_reports_truncation_metadata() {
+ let mut state = SemanticState::default();
+ let mut capture = capture(Some("id-1"), Some("session-1"), "partial");
+ capture.output_truncated = true;
+ capture.output_observed_bytes = 1024;
+
+ assert!(state.record_capture(capture));
+
+ // `total_bytes` reflects the stored output, `output_observed_bytes` the value
+ // reported by the capture.
+ let reply = command_output(&mut state, "id-1");
+ assert!(reply.output_truncated);
+ assert_eq!(reply.total_bytes, 7);
+ assert_eq!(reply.output_observed_bytes, 1024);
+ }
+
+ #[test]
+ fn output_ranges_are_line_based_inclusive_and_support_negative_offsets() {
+ let output = "zero\none\ntwo\nthree\nfour";
+ // With five lines, `-2..=-1` resolves to indices 3..=4, directly after `1..=2`.
+ let ranges = vec![
+ crate::atuin_daemon::semantic::OutputRange { start: 1, end: 2 },
+ crate::atuin_daemon::semantic::OutputRange { start: -2, end: -1 },
+ ];
+
+ assert_eq!(
+ select_output_ranges(output, &ranges),
+ vec![
+ output_line(2, "one"),
+ output_line(3, "two"),
+ output_line(4, "three"),
+ output_line(5, "four"),
+ ]
+ );
+ }
+
+ #[test]
+ fn output_ranges_merge_overlaps_and_adjacent_ranges() {
+ let output = (0..100)
+ .map(|n| format!("line {n}"))
+ .collect::<Vec<_>>()
+ .join("\n");
+ // Both ranges cover the whole output; `end: 100` is past the last index (99) and
+ // gets clamped.
+ let ranges = vec![
+ crate::atuin_daemon::semantic::OutputRange { start: 0, end: 100 },
+ crate::atuin_daemon::semantic::OutputRange {
+ start: -100,
+ end: -1,
+ },
+ ];
+
+ let selected = select_output_ranges(&output, &ranges);
+
+ assert_eq!(selected.len(), 100);
+ assert_eq!(selected.first(), Some(&output_line(1, "line 0")));
+ assert_eq!(selected.last(), Some(&output_line(100, "line 99")));
+ }
+
+ #[test]
+ fn output_ranges_can_leave_gaps_for_client_formatting() {
+ let output = "zero\none\ntwo\nthree\nfour";
+ let ranges = vec![
+ crate::atuin_daemon::semantic::OutputRange { start: 0, end: 1 },
+ crate::atuin_daemon::semantic::OutputRange { start: 4, end: 4 },
+ ];
+
+ assert_eq!(
+ select_output_ranges(output, &ranges),
+ vec![
+ output_line(1, "zero"),
+ output_line(2, "one"),
+ output_line(5, "four"),
+ ]
+ );
+ }
+
+ #[test]
+ fn empty_output_ranges_default_to_first_thousand_lines() {
+ // 1001 lines, one more than the default selection covers.
+ let output = (0..1001)
+ .map(|n| format!("line {n}"))
+ .collect::<Vec<_>>()
+ .join("\n");
+
+ let selected = select_output_ranges(&output, &[]);
+
+ assert_eq!(selected.len(), 1000);
+ assert_eq!(selected.first(), Some(&output_line(1, "line 0")));
+ assert_eq!(selected.last(), Some(&output_line(1000, "line 999")));
+ }
+
+ #[test]
+ fn output_ranges_skip_ranges_fully_outside_output() {
+ let output = "zero\none\ntwo";
+ // One range starts past the end, the other ends before the first line.
+ let ranges = vec![
+ crate::atuin_daemon::semantic::OutputRange { start: 10, end: 20 },
+ crate::atuin_daemon::semantic::OutputRange {
+ start: -20,
+ end: -10,
+ },
+ ];
+
+ assert_eq!(select_output_ranges(output, &ranges), Vec::new());
+ }
+}
diff --git a/crates/turtle/src/atuin_daemon/components/sync.rs b/crates/turtle/src/atuin_daemon/components/sync.rs
new file mode 100644
index 00000000..c76fb71b
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/sync.rs
@@ -0,0 +1,279 @@
+//! Sync component.
+//!
+//! Handles periodic synchronization with the Atuin cloud server.
+
+use std::time::Duration;
+
+use eyre::Result;
+use rand::Rng;
+use tokio::sync::mpsc;
+use tokio::time::{self, MissedTickBehavior};
+
+use crate::atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
+
+use crate::atuin_daemon::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+};
+
+/// Commands that can be sent to the sync task.
+///
+/// They travel over the mpsc channel created in `SyncComponent::start`.
+enum SyncCommand {
+ /// Trigger an immediate sync.
+ ForceSync,
+ /// Stop the sync loop. A closed channel has the same effect.
+ Stop,
+}
+
+/// Sync state - tracks whether we're in normal operation or retrying after failure.
+///
+/// `do_sync_tick` returns the new state after every attempt.
+#[derive(Clone, Copy, PartialEq, Eq)]
+enum SyncState {
+ /// Normal operation. Periodic syncs only run if auto_sync is enabled.
+ Idle,
+ /// Retrying after a sync failure. Retries continue regardless of auto_sync
+ /// until the sync succeeds.
+ Retrying,
+}
+
+/// Sync component - handles periodic cloud synchronization.
+///
+/// This component:
+/// - Runs a background sync loop on a configurable interval
+/// - Implements exponential backoff on sync failures
+/// - Responds to ForceSync events for immediate sync
+/// - Emits SyncCompleted/SyncFailed events
+pub struct SyncComponent {
+ /// Join handle of the spawned sync loop; set by `start` and taken by `stop`.
+ task_handle: Option<tokio::task::JoinHandle<()>>,
+ /// Sender for `SyncCommand`s to the sync loop; `None` until `start` has run.
+ command_tx: Option<mpsc::Sender<SyncCommand>>,
+}
+
+impl SyncComponent {
+    /// Creates a sync component that is not running yet.
+    ///
+    /// Neither the background task nor its command channel exist until `start` is
+    /// called.
+    pub fn new() -> Self {
+        Self {
+            command_tx: None,
+            task_handle: None,
+        }
+    }
+}
+
+impl Default for SyncComponent {
+ /// Same as [`SyncComponent::new`]: a component that has not been started.
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for SyncComponent {
+    /// Name under which the daemon knows this component.
+    fn name(&self) -> &'static str {
+        "sync"
+    }
+
+    /// Spawns the background sync loop and keeps the handle and command sender so the
+    /// loop can be driven and stopped later.
+    async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+        let (cmd_tx, cmd_rx) = mpsc::channel(16);
+        self.command_tx = Some(cmd_tx);
+
+        // Spawn the sync loop with its own copy of the handle
+        self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
+
+        tracing::info!("sync component started");
+        Ok(())
+    }
+
+    /// Forwards `ForceSync` events to the sync loop; every other event is ignored.
+    async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+        if let DaemonEvent::ForceSync = event {
+            tracing::info!("force sync requested");
+            if let Some(tx) = &self.command_tx {
+                // A send error only means the loop is already gone; nothing to sync then.
+                let _ = tx.send(SyncCommand::ForceSync).await;
+            }
+        }
+        Ok(())
+    }
+
+    /// Asks the sync loop to stop and waits up to five seconds for it to finish.
+    ///
+    /// A task that panicked or does not finish in time is reported with a warning
+    /// instead of being ignored silently. Stopping itself never fails.
+    async fn stop(&mut self) -> Result<()> {
+        if let Some(tx) = &self.command_tx {
+            // A send error only means the loop has already exited.
+            let _ = tx.send(SyncCommand::Stop).await;
+        }
+        if let Some(handle) = self.task_handle.take() {
+            // Give the task a moment to shut down gracefully
+            match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
+                Ok(Ok(())) => {}
+                Ok(Err(e)) => {
+                    tracing::warn!("sync task ended abnormally: {e}");
+                }
+                Err(_) => {
+                    // Dropping the join handle detaches the task; it is not aborted.
+                    tracing::warn!("sync task did not stop within 5 seconds, detaching it");
+                }
+            }
+        }
+        tracing::info!("sync component stopped");
+        Ok(())
+    }
+}
+
+/// Lower bound for the period of the sync ticker.
+///
+/// `tokio::time::interval` and `tokio::time::interval_at` panic when the period is
+/// zero, which a configured `sync_frequency` of `0` would otherwise trigger inside
+/// the sync task.
+const MIN_SYNC_PERIOD: Duration = Duration::from_secs(1);
+
+/// Returns the configured sync period, clamped to at least [`MIN_SYNC_PERIOD`].
+fn base_sync_period(settings: &Settings) -> Duration {
+    Duration::from_secs(settings.daemon.sync_frequency).max(MIN_SYNC_PERIOD)
+}
+
+/// The main sync loop.
+///
+/// This runs in a spawned task and handles periodic sync as well as
+/// force sync requests. It ends when a `Stop` command arrives or the command
+/// channel is closed, and it returns right away when the host id cannot be read.
+async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
+    tracing::info!("sync loop starting");
+
+    // Clone settings since we need them across await points
+    let settings = handle.settings().await.clone();
+    let host_id = match Settings::host_id().await {
+        Ok(id) => id,
+        Err(e) => {
+            tracing::error!("failed to get host id, sync disabled: {e}");
+            return;
+        }
+    };
+
+    // Create the stores we need
+    let encryption_key = *handle.encryption_key();
+    let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
+
+    // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
+    let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
+
+    // Clamped so that a `sync_frequency` of 0 cannot panic the task.
+    let mut ticker = time::interval(base_sync_period(&settings));
+
+    // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
+    // we may end up running a lot of syncs in a hot loop.
+    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+    let mut sync_state = SyncState::Idle;
+
+    loop {
+        tokio::select! {
+            _ = ticker.tick() => {
+                let settings = handle.settings().await;
+
+                // Skip periodic ticks if auto_sync is disabled AND we're not retrying
+                // a previous failure. Retries must continue regardless of auto_sync.
+                if !settings.auto_sync && sync_state == SyncState::Idle {
+                    tracing::debug!("auto_sync disabled, skipping periodic sync tick");
+                    continue;
+                }
+
+                sync_state = do_sync_tick(
+                    &handle,
+                    &history_store,
+                    &mut ticker,
+                    max_interval,
+                    &settings,
+                ).await;
+            }
+            cmd = cmd_rx.recv() => {
+                match cmd {
+                    Some(SyncCommand::ForceSync) => {
+                        tracing::info!("executing force sync");
+                        let settings = handle.settings().await;
+                        sync_state = do_sync_tick(
+                            &handle,
+                            &history_store,
+                            &mut ticker,
+                            max_interval,
+                            &settings,
+                        ).await;
+                    }
+                    // A closed channel means the component is gone; treat it like `Stop`.
+                    Some(SyncCommand::Stop) | None => {
+                        tracing::info!("sync loop stopping");
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Execute a single sync tick.
+///
+/// Skips the sync (returning `Idle`) when the login state cannot be determined or
+/// the user is not logged in. On failure the ticker is replaced by one with a
+/// roughly doubled period, capped at `max_interval` seconds. On success the ticker
+/// is reset to the configured period if it had been backed off.
+///
+/// Returns the new sync state: `Idle` on success, `Retrying` on failure.
+async fn do_sync_tick(
+    handle: &DaemonHandle,
+    history_store: &HistoryStore,
+    ticker: &mut time::Interval,
+    max_interval: f64,
+    settings: &Settings,
+) -> SyncState {
+    tracing::info!("sync tick");
+
+    // Check if logged in
+    let logged_in = match settings.logged_in().await {
+        Ok(v) => v,
+        Err(e) => {
+            tracing::warn!("failed to check login status, skipping sync tick: {e}");
+            return SyncState::Idle;
+        }
+    };
+
+    if !logged_in {
+        tracing::debug!("not logged in, skipping sync tick");
+        return SyncState::Idle;
+    }
+
+    // Perform the sync
+    let res = sync::sync(settings, handle.store(), handle.encryption_key()).await;
+
+    match res {
+        Err(e) => {
+            tracing::error!("sync tick failed with {e}");
+
+            // Emit failure event
+            handle.emit(DaemonEvent::SyncFailed {
+                error: e.to_string(),
+            });
+
+            // Exponential backoff with a little jitter, capped at `max_interval`.
+            let factor = rand::thread_rng().gen_range(2.0..2.2);
+            let new_interval = (ticker.period().as_secs_f64() * factor).min(max_interval);
+
+            // The period is at least one second, so the doubled and truncated value is
+            // never zero. `interval_at` already schedules the first tick one backoff
+            // period from now, so no extra reset is needed.
+            let backoff = Duration::from_secs(new_interval as u64);
+            *ticker = time::interval_at(tokio::time::Instant::now() + backoff, backoff);
+            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+            tracing::error!("backing off, next sync tick in {new_interval}");
+
+            SyncState::Retrying
+        }
+        Ok((uploaded_count, downloaded_records)) => {
+            let downloaded = downloaded_records.len();
+            tracing::info!(
+                uploaded = uploaded_count,
+                downloaded = downloaded,
+                "sync complete"
+            );
+
+            // Build history from downloaded records
+            if let Err(e) = history_store
+                .incremental_build(handle.history_db(), &downloaded_records)
+                .await
+            {
+                tracing::error!("failed to build history from downloaded records: {e}");
+            }
+
+            // Emit the records added event (for search indexing). The records are not
+            // needed afterwards, so they are moved into the event instead of cloned.
+            handle.emit(DaemonEvent::RecordsAdded(downloaded_records));
+
+            // Emit sync completed event
+            handle.emit(DaemonEvent::SyncCompleted {
+                uploaded: uploaded_count as usize,
+                downloaded,
+            });
+
+            // Reset backoff on success
+            let base_period = base_sync_period(settings);
+            if ticker.period() != base_period {
+                *ticker =
+                    time::interval_at(tokio::time::Instant::now() + base_period, base_period);
+                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+            }
+
+            // Store sync time
+            if let Err(e) = Settings::save_sync_time().await {
+                tracing::error!("failed to save sync time: {e}");
+            }
+
+            SyncState::Idle
+        }
+    }
+}