aboutsummaryrefslogtreecommitdiffstats
path: root/crates/turtle/src/atuin_daemon
diff options
context:
space:
mode:
author: Benedikt Peetz <benedikt.peetz@b-peetz.de> 2026-06-11 00:54:30 +0200
committer: Benedikt Peetz <benedikt.peetz@b-peetz.de> 2026-06-11 00:54:30 +0200
commit: 5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch)
tree: c64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/turtle/src/atuin_daemon
parent: chore: Somewhat simplify sync code (diff)
download: atuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show dead code correctly.
Diffstat (limited to 'crates/turtle/src/atuin_daemon')
-rw-r--r-- crates/turtle/src/atuin_daemon/client.rs 418
-rw-r--r-- crates/turtle/src/atuin_daemon/components/history.rs 327
-rw-r--r-- crates/turtle/src/atuin_daemon/components/mod.rs 25
-rw-r--r-- crates/turtle/src/atuin_daemon/components/search.rs 413
-rw-r--r-- crates/turtle/src/atuin_daemon/components/semantic.rs 903
-rw-r--r-- crates/turtle/src/atuin_daemon/components/sync.rs 279
-rw-r--r-- crates/turtle/src/atuin_daemon/control/mod.rs 12
-rw-r--r-- crates/turtle/src/atuin_daemon/control/service.rs 71
-rw-r--r-- crates/turtle/src/atuin_daemon/daemon.rs 458
-rw-r--r-- crates/turtle/src/atuin_daemon/events.rs 74
-rw-r--r-- crates/turtle/src/atuin_daemon/history/mod.rs 6
-rw-r--r-- crates/turtle/src/atuin_daemon/mod.rs 128
-rw-r--r-- crates/turtle/src/atuin_daemon/search/index.rs 684
-rw-r--r-- crates/turtle/src/atuin_daemon/search/mod.rs 11
-rw-r--r-- crates/turtle/src/atuin_daemon/semantic/mod.rs 3
-rw-r--r-- crates/turtle/src/atuin_daemon/server.rs 115
16 files changed, 3927 insertions, 0 deletions
diff --git a/crates/turtle/src/atuin_daemon/client.rs b/crates/turtle/src/atuin_daemon/client.rs
new file mode 100644
index 00000000..45ef19e9
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/client.rs
@@ -0,0 +1,418 @@
+use crate::atuin_client::database::Context;
+use crate::atuin_client::settings::{FilterMode, Settings};
+use eyre::{Context as EyreContext, Result};
+use tonic::Code;
+use tonic::transport::{Channel, Endpoint, Uri};
+use tower::service_fn;
+
+use hyper_util::rt::TokioIo;
+
+#[cfg(unix)]
+use tokio::net::UnixStream;
+
+use crate::atuin_client::history::History;
+use tracing::{Level, instrument, span};
+
+use crate::atuin_daemon::control::HistoryRebuiltEvent;
+use crate::atuin_daemon::control::{
+ ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest,
+ SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient,
+};
+use crate::atuin_daemon::events::DaemonEvent;
+use crate::atuin_daemon::history::{
+ EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest,
+ StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest,
+ history_client::HistoryClient as HistoryServiceClient,
+};
+use crate::atuin_daemon::search::{
+ FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse,
+ search_client::SearchClient as SearchServiceClient,
+};
+use crate::atuin_daemon::semantic::{
+ CommandCapture, CommandOutputReply, CommandOutputRequest, OutputRange, RecordCommandsReply,
+ semantic_client::SemanticClient as SemanticServiceClient,
+};
+
+/// Thin wrapper around the generated History gRPC client.
+pub struct HistoryClient {
+ // Generated tonic client bound to the channel opened in `new`.
+ client: HistoryServiceClient<Channel>,
+}
+
+/// Coarse classification of a daemon client failure, as returned by `classify_error`.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum DaemonClientErrorKind {
+ /// A `tonic::transport::Error` was found in the error chain.
+ Connect,
+ /// A `tonic::Status` with code `Unavailable` was found in the error chain.
+ Unavailable,
+ /// A `tonic::Status` with code `Unimplemented` was found in the error chain.
+ Unimplemented,
+ /// Any other status code, or no recognised cause in the chain at all.
+ Other,
+}
+
+#[must_use]
+pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind {
+ for cause in error.chain() {
+ if cause.downcast_ref::<tonic::transport::Error>().is_some() {
+ return DaemonClientErrorKind::Connect;
+ }
+
+ if let Some(status) = cause.downcast_ref::<tonic::Status>() {
+ return match status.code() {
+ Code::Unavailable => DaemonClientErrorKind::Unavailable,
+ Code::Unimplemented => DaemonClientErrorKind::Unimplemented,
+ _ => DaemonClientErrorKind::Other,
+ };
+ }
+ }
+
+ DaemonClientErrorKind::Other
+}
+
+// Wrap the grpc client
+impl HistoryClient {
+    /// Connect to the daemon's History service through the Unix socket at `path`.
+    ///
+    /// The endpoint URI is only a placeholder: the custom connector ignores it
+    /// and dials the socket instead.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the channel cannot be established; the message
+    /// includes the socket path.
+    #[cfg(unix)]
+    pub async fn new(path: String) -> Result<Self> {
+        let log_path = path.clone();
+        let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+            .connect_with_connector(service_fn(move |_: Uri| {
+                // The closure has to stay callable, so every invocation moves
+                // its own copy of the path into the returned future.
+                let path = path.clone();
+
+                async move { Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?)) }
+            }))
+            .await
+            .wrap_err_with(|| {
+                format!("failed to connect to local atuin daemon at {log_path}. Is it running?")
+            })?;
+
+        let client = HistoryServiceClient::new(channel);
+
+        Ok(HistoryClient { client })
+    }
+
+    /// Tell the daemon that the command described by `h` has started.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the timestamp cannot be expressed as unsigned
+    /// nanoseconds since the Unix epoch (for example, a pre-1970 timestamp),
+    /// or if the RPC fails.
+    pub async fn start_history(&mut self, h: History) -> Result<StartHistoryReply> {
+        // The request field is a `u64`; a plain `as` cast would silently wrap
+        // an out-of-range value into a bogus timestamp.
+        let timestamp = u64::try_from(h.timestamp.unix_timestamp_nanos()).wrap_err(
+            "history timestamp cannot be represented as unsigned nanoseconds since the Unix epoch",
+        )?;
+
+        let req = StartHistoryRequest {
+            command: h.command,
+            cwd: h.cwd,
+            hostname: h.hostname,
+            session: h.session,
+            timestamp,
+            author: h.author,
+            intent: h.intent.unwrap_or_default(),
+        };
+
+        Ok(self.client.start_history(req).await?.into_inner())
+    }
+
+    /// Tell the daemon that the command with the given `id` has finished.
+    ///
+    /// `duration` and `exit` are forwarded unchanged in the request.
+    pub async fn end_history(
+        &mut self,
+        id: String,
+        duration: u64,
+        exit: i64,
+    ) -> Result<EndHistoryReply> {
+        let req = EndHistoryRequest { id, duration, exit };
+
+        Ok(self.client.end_history(req).await?.into_inner())
+    }
+
+    /// Query the daemon's status.
+    pub async fn status(&mut self) -> Result<StatusReply> {
+        Ok(self.client.status(StatusRequest {}).await?.into_inner())
+    }
+
+    /// Open a server stream of history events.
+    pub async fn tail_history(&mut self) -> Result<tonic::Streaming<TailHistoryReply>> {
+        Ok(self
+            .client
+            .tail_history(TailHistoryRequest {})
+            .await?
+            .into_inner())
+    }
+
+    /// Ask the daemon to shut down; returns the `accepted` flag from the reply.
+    pub async fn shutdown(&mut self) -> Result<bool> {
+        let resp = self.client.shutdown(ShutdownRequest {}).await?.into_inner();
+        Ok(resp.accepted)
+    }
+}
+
+/// Thin wrapper around the generated Search gRPC client.
+pub struct SearchClient {
+ // Generated tonic client bound to the channel opened in `new`.
+ client: SearchServiceClient<Channel>,
+}
+
+impl SearchClient {
+    /// Connect to the daemon's Search service through the Unix socket at `path`.
+    ///
+    /// The endpoint URI is only a placeholder: the custom connector ignores it
+    /// and dials the socket instead.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the channel cannot be established; the message
+    /// includes the socket path.
+    #[cfg(unix)]
+    pub async fn new(path: String) -> Result<Self> {
+        let log_path = path.clone();
+        let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+            .connect_with_connector(service_fn(move |_: Uri| {
+                // The closure has to stay callable, so every invocation moves
+                // its own copy of the path into the returned future.
+                let path = path.clone();
+
+                async move { Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?)) }
+            }))
+            .await
+            .wrap_err_with(|| {
+                format!("failed to connect to local atuin daemon at {log_path}. Is it running?")
+            })?;
+
+        let client = SearchServiceClient::new(channel);
+
+        Ok(SearchClient { client })
+    }
+
+    /// Send a single search request and return the daemon's response stream.
+    ///
+    /// The request stream contains exactly one message built from the
+    /// arguments; `context` is converted to its RPC form when present.
+    #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))]
+    pub async fn search(
+        &mut self,
+        query: String,
+        query_id: u64,
+        filter_mode: FilterMode,
+        context: Option<Context>,
+    ) -> Result<tonic::Streaming<SearchResponse>> {
+        use tracing::Instrument as _;
+
+        let request = SearchRequest {
+            query,
+            query_id,
+            filter_mode: RpcFilterMode::from(filter_mode).into(),
+            context: context.map(RpcSearchContext::from),
+        };
+        let request_stream = tokio_stream::once(request);
+
+        // `Span::in_scope` only covers the synchronous creation of a future,
+        // not its polling; attaching the span with `instrument` makes it
+        // cover the whole awaited request.
+        let response = self
+            .client
+            .search(request_stream)
+            .instrument(span!(Level::TRACE, "daemon_client_search.request"))
+            .await?;
+
+        Ok(response.into_inner())
+    }
+}
+
+/// Maps each client-side filter mode onto the RPC variant of the same name.
+impl From<FilterMode> for RpcFilterMode {
+    fn from(filter_mode: FilterMode) -> Self {
+        use FilterMode as Mode;
+
+        match filter_mode {
+            Mode::Global => Self::Global,
+            Mode::Host => Self::Host,
+            Mode::Session => Self::Session,
+            Mode::Directory => Self::Directory,
+            Mode::Workspace => Self::Workspace,
+            Mode::SessionPreload => Self::SessionPreload,
+        }
+    }
+}
+
+/// Converts the client search context into its RPC form.
+///
+/// `git_root` is turned into a string lossily, so invalid UTF-8 in the path
+/// is replaced rather than rejected.
+impl From<Context> for RpcSearchContext {
+    fn from(context: Context) -> Self {
+        let git_root = context
+            .git_root
+            .map(|root| root.to_string_lossy().into_owned());
+
+        Self {
+            session_id: context.session,
+            cwd: context.cwd,
+            hostname: context.hostname,
+            host_id: context.host_id,
+            git_root,
+        }
+    }
+}
+
+/// Thin wrapper around the generated Semantic gRPC client.
+pub struct SemanticClient {
+ // Generated tonic client bound to the channel opened in `new`.
+ client: SemanticServiceClient<Channel>,
+}
+
+impl SemanticClient {
+    /// Open a channel to the Semantic service through the Unix socket at `path`.
+    ///
+    /// The endpoint URI is a placeholder; the connector dials the socket.
+    #[cfg(unix)]
+    pub async fn new(path: String) -> Result<Self> {
+        let socket_for_error = path.clone();
+        let connector = service_fn(move |_: Uri| {
+            let socket = path.clone();
+
+            async move {
+                let stream = UnixStream::connect(socket).await?;
+                Ok::<_, std::io::Error>(TokioIo::new(stream))
+            }
+        });
+
+        let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+            .connect_with_connector(connector)
+            .await
+            .wrap_err_with(|| {
+                format!(
+                    "failed to connect to local atuin daemon at {}. Is it running?",
+                    &socket_for_error
+                )
+            })?;
+
+        Ok(SemanticClient {
+            client: SemanticServiceClient::new(channel),
+        })
+    }
+
+    /// Connect using the daemon socket path from `settings`.
+    #[cfg(unix)]
+    pub async fn from_settings(settings: &Settings) -> Result<Self> {
+        let socket_path = settings.daemon.socket_path.clone();
+        Self::new(socket_path).await
+    }
+
+    /// Stream all `captures` to the daemon and return its reply.
+    pub async fn record_commands(
+        &mut self,
+        captures: Vec<CommandCapture>,
+    ) -> Result<RecordCommandsReply> {
+        let reply = self
+            .client
+            .record_commands(tokio_stream::iter(captures))
+            .await?;
+        Ok(reply.into_inner())
+    }
+
+    /// Request output for `history_id`, restricted to the `(start, end)` pairs in `ranges`.
+    pub async fn command_output(
+        &mut self,
+        history_id: String,
+        ranges: Vec<(i64, i64)>,
+    ) -> Result<CommandOutputReply> {
+        let mut rpc_ranges = Vec::with_capacity(ranges.len());
+        for (start, end) in ranges {
+            rpc_ranges.push(OutputRange { start, end });
+        }
+
+        let request = CommandOutputRequest {
+            history_id,
+            ranges: rpc_ranges,
+        };
+
+        let reply = self.client.command_output(request).await?;
+        Ok(reply.into_inner())
+    }
+}
+
+// ============================================================================
+// Control Client
+// ============================================================================
+
+/// Client for the Control gRPC service.
+///
+/// Used to inject events into a running daemon from external processes.
+pub struct ControlClient {
+    client: ControlServiceClient<Channel>,
+}
+
+impl ControlClient {
+    /// Connect to the daemon's control service.
+    ///
+    /// The endpoint URI is only a placeholder: the custom connector ignores it
+    /// and dials the Unix socket at `path` instead.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the channel cannot be established; the message
+    /// includes the socket path.
+    #[cfg(unix)]
+    pub async fn new(path: String) -> Result<Self> {
+        let log_path = path.clone();
+        let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+            .connect_with_connector(service_fn(move |_: Uri| {
+                // The closure has to stay callable, so every invocation moves
+                // its own copy of the path into the returned future.
+                let path = path.clone();
+
+                async move { Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?)) }
+            }))
+            .await
+            .wrap_err_with(|| {
+                format!("failed to connect to local atuin daemon at {log_path}. Is it running?")
+            })?;
+
+        let client = ControlServiceClient::new(channel);
+
+        Ok(ControlClient { client })
+    }
+
+    /// Connect using settings.
+    #[cfg(unix)]
+    pub async fn from_settings(settings: &Settings) -> Result<Self> {
+        Self::new(settings.daemon.socket_path.clone()).await
+    }
+
+    /// Send an event to the daemon.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `event` is a daemon-internal event that has no
+    /// control-service representation, or if the RPC fails.
+    pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> {
+        // Internal events must never reach the wire: substituting another
+        // event (such as shutdown) would make the daemon act on something the
+        // caller never asked for.
+        let Some(proto_event) = daemon_event_to_proto(event) else {
+            return Err(eyre::eyre!(
+                "internal daemon events cannot be sent via the control service"
+            ));
+        };
+
+        let request = SendEventRequest {
+            event: Some(proto_event),
+        };
+        self.client.send_event(request).await?;
+        Ok(())
+    }
+}
+
+/// Convert a daemon event to its proto representation.
+///
+/// Returns `None` (after logging a warning) for events that are internal to
+/// the daemon and are not part of the control protocol.
+fn daemon_event_to_proto(
+    event: DaemonEvent,
+) -> Option<crate::atuin_daemon::control::send_event_request::Event> {
+    use crate::atuin_daemon::control::send_event_request::Event;
+
+    let proto = match event {
+        DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}),
+        DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}),
+        DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent {
+            ids: ids.into_iter().map(|id| id.0).collect(),
+        }),
+        DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}),
+        DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}),
+        DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}),
+        // These events are internal and not sent via the control service
+        DaemonEvent::HistoryStarted(_)
+        | DaemonEvent::HistoryEnded(_)
+        | DaemonEvent::RecordsAdded(_)
+        | DaemonEvent::SyncCompleted { .. }
+        | DaemonEvent::SyncFailed { .. } => {
+            tracing::warn!("attempted to send internal event via control service");
+            return None;
+        }
+    };
+
+    Some(proto)
+}
+
+// ============================================================================
+// Convenience Functions
+// ============================================================================
+
+/// Emit an event to the daemon.
+///
+/// This is a fire-and-forget helper for sending events to the daemon from
+/// external processes like CLI commands. If the daemon isn't running, this
+/// will silently succeed (returns Ok).
+///
+/// Settings are loaded from the default location; this is a shorthand for
+/// `emit_event_with_settings(event, None)`.
+///
+/// # Errors
+///
+/// Connection and send failures are swallowed; an error is only returned
+/// when loading the settings fails.
+///
+/// # Example
+///
+/// ```ignore
+/// // After pruning history
+/// emit_event(DaemonEvent::HistoryPruned).await?;
+///
+/// // After deleting specific history items
+/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] }).await?;
+///
+/// // Request immediate sync
+/// emit_event(DaemonEvent::ForceSync).await?;
+/// ```
+pub async fn emit_event(event: DaemonEvent) -> Result<()> {
+ emit_event_with_settings(event, None).await
+}
+
+/// Emit an event to the daemon, optionally reusing already-loaded settings.
+///
+/// When `settings` is `None` they are loaded from the default location, and a
+/// failure to load them is the only error this function returns. Failing to
+/// connect to the daemon or to deliver the event is logged at debug level and
+/// otherwise ignored.
+pub async fn emit_event_with_settings(
+    event: DaemonEvent,
+    settings: Option<&Settings>,
+) -> Result<()> {
+    // Storage for settings loaded here, so a reference can outlive the branch.
+    let loaded;
+    let settings = if let Some(provided) = settings {
+        provided
+    } else {
+        loaded = Settings::new()?;
+        &loaded
+    };
+
+    match ControlClient::from_settings(settings).await {
+        // No daemon to talk to: nothing to do.
+        Err(e) => {
+            tracing::debug!(?e, "daemon not running, skipping event emission");
+        }
+        Ok(mut client) => {
+            // Fire-and-forget: a failed send is only logged.
+            if let Err(e) = client.send_event(event).await {
+                tracing::debug!(?e, "failed to send event to daemon");
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/crates/turtle/src/atuin_daemon/components/history.rs b/crates/turtle/src/atuin_daemon/components/history.rs
new file mode 100644
index 00000000..95d34b69
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/history.rs
@@ -0,0 +1,327 @@
+//! History component.
+//!
+//! Handles command history lifecycle (start/end) and provides the History gRPC service.
+
+use std::{pin::Pin, sync::Arc};
+
+use crate::atuin_client::{
+ database::Database,
+ history::{History, HistoryId, store::HistoryStore},
+ settings::Settings,
+};
+use dashmap::DashMap;
+use eyre::Result;
+use time::OffsetDateTime;
+use tokio_stream::Stream;
+use tonic::{Request, Response, Status};
+use tracing::{Level, instrument};
+
+use crate::atuin_daemon::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ history::{
+ EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
+ ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
+ TailHistoryReply, TailHistoryRequest,
+ history_server::{History as HistorySvc, HistoryServer},
+ },
+};
+
+const DAEMON_PROTOCOL_VERSION: u32 = 1;
+
+/// History component - manages command history lifecycle.
+///
+/// This component:
+/// - Tracks currently running commands (stored in memory)
+/// - Saves completed commands to the database and record store
+/// - Emits history events for other components (e.g., search indexing)
+/// - Provides the History gRPC service
+pub struct HistoryComponent {
+ // State shared with the gRPC service created by `grpc_service`.
+ inner: Arc<HistoryComponentInner>,
+}
+
+/// State shared between the component and its gRPC service.
+///
+/// `handle` and `history_store` start out as `None` and are filled in by the
+/// component's `start` method.
+struct HistoryComponentInner {
+ /// Commands currently running (not yet completed).
+ running: DashMap<HistoryId, History>,
+
+ /// Handle to the daemon (set during start).
+ handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+
+ /// History store for pushing records (set during start).
+ history_store: tokio::sync::RwLock<Option<HistoryStore>>,
+}
+
+impl HistoryComponent {
+    /// Build a component with no running commands, no daemon handle and no
+    /// history store; the latter two are supplied later by `start`.
+    pub fn new() -> Self {
+        let inner = HistoryComponentInner {
+            running: DashMap::new(),
+            handle: tokio::sync::RwLock::new(None),
+            history_store: tokio::sync::RwLock::new(None),
+        };
+
+        Self {
+            inner: Arc::new(inner),
+        }
+    }
+
+    /// Create the tonic History service backed by this component's state.
+    ///
+    /// The returned service shares the component's inner state and can be
+    /// added to a gRPC server.
+    pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
+        let service = HistoryGrpcService {
+            inner: Arc::clone(&self.inner),
+        };
+
+        HistoryServer::new(service)
+    }
+}
+
+impl Default for HistoryComponent {
+ /// Equivalent to `HistoryComponent::new`.
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for HistoryComponent {
+    /// Name under which this component identifies itself.
+    fn name(&self) -> &'static str {
+        "history"
+    }
+
+    /// Build the history store from the daemon handle and remember both.
+    ///
+    /// Fails if the host id cannot be obtained from the settings.
+    async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+        let host_id = Settings::host_id().await?;
+        let store = HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key());
+
+        let shared = &self.inner;
+        *shared.history_store.write().await = Some(store);
+        *shared.handle.write().await = Some(handle);
+
+        tracing::info!("history component started");
+        Ok(())
+    }
+
+    /// This component only produces events; incoming ones are ignored.
+    async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
+        Ok(())
+    }
+
+    /// Nothing to tear down; only logs that the component stopped.
+    async fn stop(&mut self) -> Result<()> {
+        tracing::info!("history component stopped");
+        Ok(())
+    }
+}
+
+/// The gRPC service implementation.
+///
+/// This is a thin wrapper that delegates to the component's shared state.
+pub struct HistoryGrpcService {
+ // Same `Arc` as the owning `HistoryComponent` holds.
+ inner: Arc<HistoryComponentInner>,
+}
+
+/// Build the tail-stream message for `history`, tagged with the event `kind`.
+///
+/// A missing `intent` is sent as the default (empty) value.
+fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
+    let entry = HistoryEntry {
+        timestamp: history.timestamp.unix_timestamp_nanos() as u64,
+        id: history.id.0,
+        command: history.command,
+        cwd: history.cwd,
+        session: history.session,
+        hostname: history.hostname,
+        author: history.author,
+        intent: history.intent.unwrap_or_default(),
+        exit: history.exit,
+        duration: history.duration,
+    };
+
+    TailHistoryReply {
+        kind: kind as i32,
+        history: Some(entry),
+    }
+}
+
+#[tonic::async_trait]
+impl HistorySvc for HistoryGrpcService {
+ type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
+
+    /// Register a newly started command.
+    ///
+    /// Builds a history entry from the request, emits `HistoryStarted` when a
+    /// daemon handle is available, and keeps the entry in the running map
+    /// until `end_history` is called for its id. Replies with the new id.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn start_history(
+        &self,
+        request: Request<StartHistoryRequest>,
+    ) -> Result<Response<StartHistoryReply>, Status> {
+        let StartHistoryRequest {
+            command,
+            cwd,
+            hostname,
+            session,
+            timestamp,
+            author,
+            intent,
+        } = request.into_inner();
+
+        let Ok(timestamp) = OffsetDateTime::from_unix_timestamp_nanos(i128::from(timestamp)) else {
+            return Err(Status::invalid_argument(
+                "failed to parse timestamp as unix time (expected nanos since epoch)",
+            ));
+        };
+
+        let entry: History = History::daemon()
+            .timestamp(timestamp)
+            .command(command)
+            .cwd(cwd)
+            .session(session)
+            .hostname(hostname)
+            .author(author)
+            .intent(intent)
+            .build()
+            .into();
+
+        // Subscribers are told about the command before it is tracked.
+        if let Some(handle) = self.inner.handle.read().await.as_ref() {
+            handle.emit(DaemonEvent::HistoryStarted(entry.clone()));
+        }
+
+        let id = entry.id.clone();
+        tracing::info!(id = id.to_string(), "start history");
+
+        let reply = StartHistoryReply {
+            id: id.to_string(),
+            version: env!("CARGO_PKG_VERSION").to_string(),
+            protocol: DAEMON_PROTOCOL_VERSION,
+        };
+        self.inner.running.insert(id, entry);
+
+        Ok(Response::new(reply))
+    }
+
+    /// Complete a running command and persist it.
+    ///
+    /// A `duration` of `0` means "not measured by the client"; the daemon
+    /// then uses the time elapsed since the command's start timestamp. The
+    /// finished entry is saved to the database, pushed to the record store
+    /// and announced with `HistoryEnded`.
+    ///
+    /// # Errors
+    ///
+    /// - `invalid_argument` if `duration` does not fit into an `i64`
+    /// - `internal` if the component has not been started, or if saving or
+    ///   pushing the entry fails
+    /// - `not_found` if no running command has the given id
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn end_history(
+        &self,
+        request: Request<EndHistoryRequest>,
+    ) -> Result<Response<EndHistoryReply>, Status> {
+        let req = request.into_inner();
+        let id = HistoryId(req.id);
+
+        // Validate client-supplied data and component state *before* taking
+        // the entry out of `running`, so a rejected request neither panics
+        // the handler nor loses the in-flight command.
+        let reported_duration = match req.duration {
+            0 => None,
+            value => Some(i64::try_from(value).map_err(|_| {
+                Status::invalid_argument("duration does not fit into a signed 64-bit integer")
+            })?),
+        };
+
+        // Get the handle and store to save the history
+        let handle_guard = self.inner.handle.read().await;
+        let handle = handle_guard
+            .as_ref()
+            .ok_or_else(|| Status::internal("component not initialized"))?;
+
+        let store_guard = self.inner.history_store.read().await;
+        let history_store = store_guard
+            .as_ref()
+            .ok_or_else(|| Status::internal("component not initialized"))?;
+
+        let Some((_, mut history)) = self.inner.running.remove(&id) else {
+            return Err(Status::not_found(format!(
+                "could not find history with id: {id}"
+            )));
+        };
+
+        let duration = match reported_duration {
+            Some(value) => value,
+            None => {
+                let elapsed = (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds();
+                // Saturate instead of panicking on an absurd clock difference.
+                i64::try_from(elapsed).unwrap_or(if elapsed.is_negative() {
+                    i64::MIN
+                } else {
+                    i64::MAX
+                })
+            }
+        };
+
+        history.exit = req.exit;
+        history.duration = duration;
+
+        // Save to database
+        handle
+            .history_db()
+            .save(&history)
+            .await
+            .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
+
+        tracing::info!(
+            id = id.0.to_string(),
+            duration = history.duration,
+            "end history"
+        );
+
+        // Push to record store
+        let (record_id, idx) = history_store
+            .push(history.clone())
+            .await
+            .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
+
+        // Emit the event
+        handle.emit(DaemonEvent::HistoryEnded(history));
+
+        let reply = EndHistoryReply {
+            id: record_id.0.to_string(),
+            idx,
+            version: env!("CARGO_PKG_VERSION").to_string(),
+            protocol: DAEMON_PROTOCOL_VERSION,
+        };
+
+        Ok(Response::new(reply))
+    }
+
+    /// Stream history start/end events to the client.
+    ///
+    /// A background task forwards `HistoryStarted` and `HistoryEnded` events
+    /// from the daemon's broadcast channel into the response stream; all
+    /// other events are skipped. The task ends when the client goes away,
+    /// when the broadcast channel closes, or — after reporting a
+    /// `resource_exhausted` status — when the subscriber lagged behind.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn tail_history(
+        &self,
+        _request: Request<TailHistoryRequest>,
+    ) -> Result<Response<Self::TailHistoryStream>, Status> {
+        use tokio::sync::broadcast::error::RecvError;
+
+        let handle_guard = self.inner.handle.read().await;
+        let Some(handle) = handle_guard.as_ref().cloned() else {
+            return Err(Status::internal("component not initialized"));
+        };
+
+        let mut events = handle.subscribe();
+        let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
+
+        tokio::spawn(async move {
+            loop {
+                let (kind, history) = match events.recv().await {
+                    Ok(DaemonEvent::HistoryStarted(history)) => {
+                        (HistoryEventKind::Started, history)
+                    }
+                    Ok(DaemonEvent::HistoryEnded(history)) => (HistoryEventKind::Ended, history),
+                    // Not a history lifecycle event: nothing to forward.
+                    Ok(_) => continue,
+                    Err(RecvError::Lagged(skipped)) => {
+                        let _ = tx
+                            .send(Err(Status::resource_exhausted(format!(
+                                "tail stream lagged behind and dropped {skipped} events"
+                            ))))
+                            .await;
+                        break;
+                    }
+                    Err(RecvError::Closed) => break,
+                };
+
+                // A failed send means the receiving side has been dropped.
+                if tx
+                    .send(Ok(history_to_tail_reply(kind, history)))
+                    .await
+                    .is_err()
+                {
+                    break;
+                }
+            }
+        });
+
+        let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
+        Ok(Response::new(Box::pin(stream)))
+    }
+
+    /// Report that the daemon is healthy, together with its crate version,
+    /// process id and protocol version.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn status(
+        &self,
+        _request: Request<StatusRequest>,
+    ) -> Result<Response<StatusReply>, Status> {
+        let version = env!("CARGO_PKG_VERSION").to_string();
+        let pid = std::process::id();
+
+        Ok(Response::new(StatusReply {
+            healthy: true,
+            version,
+            pid,
+            protocol: DAEMON_PROTOCOL_VERSION,
+        }))
+    }
+
+    /// Ask the daemon to shut down.
+    ///
+    /// The reply's `accepted` flag is `true` only when a daemon handle was
+    /// available and the shutdown request was actually forwarded to it; if
+    /// the component has not been started yet, nothing happens and the
+    /// client is told so.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn shutdown(
+        &self,
+        _request: Request<ShutdownRequest>,
+    ) -> Result<Response<ShutdownReply>, Status> {
+        // Use the daemon handle to request shutdown
+        let accepted = match self.inner.handle.read().await.as_ref() {
+            Some(handle) => {
+                handle.shutdown();
+                true
+            }
+            None => false,
+        };
+
+        Ok(Response::new(ShutdownReply { accepted }))
+    }
+}
diff --git a/crates/turtle/src/atuin_daemon/components/mod.rs b/crates/turtle/src/atuin_daemon/components/mod.rs
new file mode 100644
index 00000000..447e31df
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/mod.rs
@@ -0,0 +1,25 @@
+//! Daemon components.
+//!
+//! Components are the building blocks of the daemon. Each component handles
+//! a specific domain and can:
+//!
+//! - Expose gRPC services
+//! - React to events
+//! - Spawn background tasks
+//!
+//! Available components:
+//!
+//! - [`history::HistoryComponent`]: Command history lifecycle management
+//! - [`search::SearchComponent`]: Fuzzy search over history
+//! - [`semantic::SemanticComponent`]: In-memory semantic command captures
+//! - [`sync::SyncComponent`]: Cloud sync
+
+pub mod history;
+pub mod search;
+pub mod semantic;
+pub mod sync;
+
+pub use history::HistoryComponent;
+pub use search::SearchComponent;
+pub use semantic::SemanticComponent;
+pub use sync::SyncComponent;
diff --git a/crates/turtle/src/atuin_daemon/components/search.rs b/crates/turtle/src/atuin_daemon/components/search.rs
new file mode 100644
index 00000000..85191cff
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/search.rs
@@ -0,0 +1,413 @@
+//! Search component.
+//!
+//! Provides fuzzy search over command history using the Nucleo search library
+//! with frecency-based ranking and dynamic filtering.
+
+use std::{pin::Pin, sync::Arc};
+
+use crate::atuin_client::database::Database;
+use eyre::Result;
+use tokio::sync::RwLock;
+use tokio_stream::Stream;
+use tonic::{Request, Response, Status, Streaming};
+use tracing::{Level, debug, info, instrument, span, trace};
+use uuid::Uuid;
+
+use crate::atuin_daemon::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ search::{
+ FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse,
+ search_server::{Search as SearchSvc, SearchServer},
+ },
+};
+
+const PAGE_SIZE: usize = 5000;
+const RESULTS_LIMIT: u32 = 200;
+/// How often to rebuild the frecency map (in seconds).
+const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60;
+
+/// Search component - provides fuzzy search over command history.
+///
+/// This component:
+/// - Maintains a deduplicated search index with frecency ranking
+/// - Loads history from the database on startup
+/// - Updates the index when history events occur
+/// - Provides the Search gRPC service
+pub struct SearchComponent {
+ /// Search index; the same `Arc` is handed to the gRPC service and the background tasks.
+ index: Arc<RwLock<SearchIndex>>,
+ /// Daemon handle; `None` until `start` stores it.
+ handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+ /// Task performing the initial history load; aborted in `stop`.
+ loader_handle: Option<tokio::task::JoinHandle<()>>,
+ /// Task that periodically rebuilds the frecency map; aborted in `stop`.
+ frecency_handle: Option<tokio::task::JoinHandle<()>>,
+}
+
+impl SearchComponent {
+    /// Create a new search component.
+    ///
+    /// The index starts empty and no background tasks run until `start`.
+    pub fn new() -> Self {
+        Self {
+            index: Arc::new(RwLock::new(SearchIndex::new())),
+            handle: tokio::sync::RwLock::new(None),
+            loader_handle: None,
+            frecency_handle: None,
+        }
+    }
+
+    /// Get the gRPC service for this component.
+    ///
+    /// The service shares this component's index.
+    pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> {
+        SearchServer::new(SearchGrpcService {
+            index: Arc::clone(&self.index),
+        })
+    }
+
+    /// Rebuild the entire search index from the database.
+    ///
+    /// The replacement index is built on the side and only swapped in once
+    /// every page has been loaded and its frecency map has been built, so
+    /// searches keep using the previous index until then.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the component has not been started or if loading
+    /// history fails. In both cases the current index is left untouched,
+    /// rather than being replaced by a partially populated one.
+    async fn rebuild_index(&self) -> Result<()> {
+        let handle_guard = self.handle.read().await;
+        let handle = handle_guard
+            .as_ref()
+            .ok_or_else(|| eyre::eyre!("component not initialized"))?;
+
+        info!("Rebuilding search index from database");
+
+        // Create a new index
+        let new_index = SearchIndex::new();
+
+        // Load all history into the new index
+        let db = handle.history_db().clone();
+        let mut pager = db.all_paged(PAGE_SIZE, false, true);
+        while let Some(histories) = pager
+            .next()
+            .await
+            .map_err(|e| eyre::eyre!("failed to load history during rebuild: {e}"))?
+        {
+            info!(
+                "Loading {} history entries into search index",
+                histories.len()
+            );
+            new_index.add_histories(&histories);
+        }
+
+        // Mirror the initial loader: a fresh index has no frecency map yet,
+        // so build one now instead of waiting for the next periodic refresh.
+        let settings = handle.settings().await;
+        new_index.rebuild_frecency(&settings.search).await;
+
+        info!(
+            "Search index rebuild complete; {} unique commands",
+            new_index.command_count()
+        );
+
+        // Replace the old index with the new one
+        *self.index.write().await = new_index;
+        Ok(())
+    }
+}
+
+impl Default for SearchComponent {
+ /// Equivalent to `SearchComponent::new`.
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for SearchComponent {
+ /// Name under which this component identifies itself.
+ fn name(&self) -> &'static str {
+ "search"
+ }
+
+ /// Store the daemon handle and spawn the component's two background tasks.
+ ///
+ /// - The loader task pages through the history database, feeds every page
+ ///   into the index and, once all pages are read, builds the initial
+ ///   frecency map.
+ /// - The frecency task rebuilds the frecency map every
+ ///   `FRECENCY_REFRESH_INTERVAL_SECS` seconds using the current settings.
+ ///
+ /// Both join handles are kept so that `stop` can abort the tasks.
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+ *self.handle.write().await = Some(handle.clone());
+
+ // Spawn background task to load history into index
+ let index = self.index.clone();
+ let db = handle.history_db().clone();
+ let handle_for_loader = handle.clone();
+
+ self.loader_handle = Some(tokio::spawn(async move {
+ info!(
+ "Loading history into search index; page size = {}",
+ PAGE_SIZE
+ );
+ let mut pager = db.all_paged(PAGE_SIZE, false, true);
+ loop {
+ match pager.next().await {
+ Ok(Some(histories)) => {
+ info!(
+ "Loading {} history entries into search index",
+ histories.len()
+ );
+ // Only a read lock is taken here; `add_histories` is called through a shared reference.
+ index.read().await.add_histories(&histories);
+ }
+ Ok(None) => {
+ info!(
+ "Initial history load complete; {} unique commands indexed",
+ index.read().await.command_count()
+ );
+ // Build initial frecency map with current settings
+ let settings = handle_for_loader.settings().await;
+ index.read().await.rebuild_frecency(&settings.search).await;
+ info!("Initial frecency map built");
+ break;
+ }
+ // A load error ends the task early; this task then never builds the initial frecency map.
+ Err(e) => {
+ tracing::error!("Failed to load history: {}", e);
+ break;
+ }
+ }
+ }
+ }));
+
+ // Spawn background task to periodically refresh frecency
+ let index_for_frecency = self.index.clone();
+ let handle_for_frecency = handle.clone();
+ self.frecency_handle = Some(tokio::spawn(async move {
+ let mut interval = tokio::time::interval(std::time::Duration::from_secs(
+ FRECENCY_REFRESH_INTERVAL_SECS,
+ ));
+ // NOTE(review): tokio's `interval` completes its first tick immediately, so this
+ // presumably also runs one refresh right at startup — confirm.
+ loop {
+ interval.tick().await;
+ trace!("Refreshing frecency map");
+ let settings = handle_for_frecency.settings().await;
+ index_for_frecency
+ .read()
+ .await
+ .rebuild_frecency(&settings.search)
+ .await;
+ }
+ }));
+
+ tracing::info!("search component started");
+ Ok(())
+ }
+
+    /// Keep the search index in sync with daemon events.
+    ///
+    /// - `RecordsAdded`: loads the referenced history rows from the database
+    ///   and adds them to the index.
+    /// - `HistoryEnded`: adds the finished command to the index.
+    /// - `HistoryPruned` / `HistoryRebuilt` / `HistoryDeleted`: rebuilds the
+    ///   whole index from the database.
+    /// - `SettingsReloaded`: rebuilds the frecency map with the new settings.
+    /// - Everything else is ignored.
+    ///
+    /// Failures are logged and never propagated, so this always returns `Ok`.
+    async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+        // `Span::in_scope` only covers the synchronous creation of a future;
+        // `instrument` keeps the span entered while the future is polled.
+        use tracing::Instrument as _;
+
+        match event {
+            DaemonEvent::RecordsAdded(records) => {
+                debug!(
+                    count = records.len(),
+                    "Processing added records for search index"
+                );
+
+                // Nothing to look up; skip the database round trip.
+                if records.is_empty() {
+                    return Ok(());
+                }
+
+                let handle_guard = self.handle.read().await;
+                if let Some(handle) = handle_guard.as_ref() {
+                    // The ids are strings, so each one has to be a quoted SQL
+                    // literal (with embedded quotes doubled); a bare id is
+                    // not valid inside the `IN (...)` list.
+                    let id_list = records
+                        .iter()
+                        .map(|record| format!("'{}'", record.0.to_string().replace('\'', "''")))
+                        .collect::<Vec<_>>()
+                        .join(",");
+                    let query = format!("select * from history where id in ({id_list})");
+
+                    let histories: Vec<_> = match handle.history_db().query_history(&query).await {
+                        Ok(histories) => histories,
+                        Err(e) => {
+                            // Report the failure instead of silently
+                            // indexing nothing.
+                            tracing::error!(
+                                "Failed to load added records for search index: {}",
+                                e
+                            );
+                            return Ok(());
+                        }
+                    };
+
+                    async {
+                        self.index.read().await.add_histories(&histories);
+                    }
+                    .instrument(span!(
+                        Level::TRACE,
+                        "inject_records",
+                        count = histories.len()
+                    ))
+                    .await;
+                }
+            }
+            DaemonEvent::HistoryStarted(history) => {
+                debug!(id = %history.id, command = %history.command, "History started (no index action)");
+            }
+            DaemonEvent::HistoryEnded(history) => {
+                async {
+                    self.index.read().await.add_history(history);
+                }
+                .instrument(span!(Level::TRACE, "inject_history_ended"))
+                .await;
+            }
+            DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => {
+                info!("History store pruned or rebuilt, rebuilding search index");
+                if let Err(e) = self.rebuild_index().await {
+                    tracing::error!("Failed to rebuild search index: {}", e);
+                }
+            }
+            DaemonEvent::HistoryDeleted { ids } => {
+                info!(
+                    count = ids.len(),
+                    "History deleted, rebuilding search index"
+                );
+                // For now, just rebuild the entire index. A more efficient implementation
+                // would remove specific items from the index.
+                if let Err(e) = self.rebuild_index().await {
+                    tracing::error!("Failed to rebuild search index: {}", e);
+                }
+            }
+            DaemonEvent::SettingsReloaded => {
+                info!("Settings reloaded, rebuilding frecency map with new multipliers");
+                let handle_guard = self.handle.read().await;
+                if let Some(handle) = handle_guard.as_ref() {
+                    let settings = handle.settings().await;
+                    self.index
+                        .read()
+                        .await
+                        .rebuild_frecency(&settings.search)
+                        .await;
+                }
+            }
+            // Events we don't care about
+            DaemonEvent::SyncCompleted { .. }
+            | DaemonEvent::SyncFailed { .. }
+            | DaemonEvent::ForceSync
+            | DaemonEvent::ShutdownRequested => {}
+        }
+        Ok(())
+    }
+
+    /// Abort the loader and frecency tasks, if they were started.
+    async fn stop(&mut self) -> Result<()> {
+        let tasks = [self.loader_handle.take(), self.frecency_handle.take()];
+        for task in tasks.into_iter().flatten() {
+            task.abort();
+        }
+
+        tracing::info!("search component stopped");
+        Ok(())
+    }
+}
+
+/// The gRPC service implementation.
+///
+/// Answers search requests from the index it shares with `SearchComponent`.
+pub struct SearchGrpcService {
+ // Same `Arc` as the owning `SearchComponent` holds.
+ index: Arc<RwLock<SearchIndex>>,
+}
+
+#[tonic::async_trait]
+impl SearchSvc for SearchGrpcService {
+ type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>;
+
+    /// Bidirectional search RPC.
+    ///
+    /// Every request received on the input stream is answered with one
+    /// response carrying the same `query_id` and at most `RESULTS_LIMIT`
+    /// matching history ids, encoded as raw UUID bytes. The work happens in a
+    /// spawned task that ends when the input stream finishes or fails, or
+    /// when the client stops reading responses.
+    #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")]
+    async fn search(
+        &self,
+        request: Request<Streaming<SearchRequest>>,
+    ) -> Result<Response<Self::SearchStream>, Status> {
+        // `Span::in_scope` only covers the synchronous creation of a future;
+        // `instrument` keeps the span entered while the future is polled.
+        use tracing::Instrument as _;
+
+        let mut in_stream = request.into_inner();
+        let index = self.index.clone();
+
+        // Create output channel
+        let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128);
+
+        // Spawn task to handle incoming requests and send responses
+        tokio::spawn(async move {
+            while let Some(req) = in_stream.message().await.transpose() {
+                match req {
+                    Ok(search_req) => {
+                        let query = search_req.query;
+                        let query_id = search_req.query_id;
+                        // Unknown filter mode values fall back to a global search.
+                        let filter_mode: FilterMode = search_req
+                            .filter_mode
+                            .try_into()
+                            .unwrap_or(FilterMode::Global);
+                        let proto_context = search_req.context;
+
+                        debug!(
+                            "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}",
+                            query,
+                            query_id,
+                            filter_mode.as_str_name(),
+                            proto_context
+                        );
+
+                        // Convert proto FilterMode + context to IndexFilterMode
+                        let index_filter = convert_filter_mode(filter_mode, &proto_context);
+
+                        // Build QueryContext from proto context
+                        let query_context = proto_context
+                            .map(|ctx| QueryContext {
+                                cwd: Some(with_trailing_slash(&ctx.cwd)),
+                                git_root: ctx.git_root.map(|s| with_trailing_slash(&s)),
+                                hostname: Some(ctx.hostname),
+                                session_id: Some(ctx.session_id),
+                            })
+                            .unwrap_or_default();
+
+                        // Perform the search
+                        let history_ids = async {
+                            let index = index.read().await;
+                            index
+                                .search(&query, index_filter, &query_context, RESULTS_LIMIT)
+                                .await
+                        }
+                        .instrument(span!(Level::TRACE, "daemon_search_query", %query, query_id))
+                        .await;
+
+                        // Convert history IDs to bytes; ids that are not
+                        // valid UUIDs are left out of the response.
+                        let ids: Vec<Vec<u8>> = history_ids
+                            .iter()
+                            .filter_map(|id| {
+                                Uuid::parse_str(id)
+                                    .ok()
+                                    .map(|uuid| uuid.as_bytes().to_vec())
+                            })
+                            .collect();
+
+                        if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() {
+                            break; // Client disconnected
+                        }
+                    }
+                    Err(e) => {
+                        let _ = tx.send(Err(e)).await;
+                        break;
+                    }
+                }
+            }
+        });
+
+        // Convert receiver to stream
+        let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
+        Ok(Response::new(Box::pin(out_stream)))
+    }
+}
+
+/// Translate a proto `FilterMode` plus optional context into an `IndexFilterMode`.
+///
+/// Without a context every mode degrades to a global search. `Workspace`
+/// falls back to a directory filter on `cwd` when the context has no git
+/// root, and `SessionPreload` is treated like `Session`.
+fn convert_filter_mode(
+    mode: FilterMode,
+    context: &Option<crate::atuin_daemon::search::SearchContext>,
+) -> IndexFilterMode {
+    // No context means there is nothing to filter by.
+    let Some(ctx) = context else {
+        return IndexFilterMode::Global;
+    };
+
+    match mode {
+        FilterMode::Directory => IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)),
+        FilterMode::Workspace => match ctx.git_root.as_deref() {
+            Some(root) => IndexFilterMode::Workspace(with_trailing_slash(root)),
+            // Fall back to directory if no git root
+            None => IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)),
+        },
+        FilterMode::Host => IndexFilterMode::Host(ctx.hostname.clone()),
+        // SessionPreload is similar to Session - filter by session
+        FilterMode::Session | FilterMode::SessionPreload => {
+            IndexFilterMode::Session(ctx.session_id.clone())
+        }
+        // Global, and anything not handled above
+        _ => IndexFilterMode::Global,
+    }
+}
+
+/// Return `s` with exactly one platform path separator appended, unless it
+/// already ends with one.
+///
+/// The separator is `\` on Windows and `/` everywhere else.
+pub fn with_trailing_slash(s: &str) -> String {
+    let separator = std::path::MAIN_SEPARATOR;
+
+    let mut out = String::with_capacity(s.len() + 1);
+    out.push_str(s);
+    if !s.ends_with(separator) {
+        out.push(separator);
+    }
+    out
+}
diff --git a/crates/turtle/src/atuin_daemon/components/semantic.rs b/crates/turtle/src/atuin_daemon/components/semantic.rs
new file mode 100644
index 00000000..a42fd5cb
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/semantic.rs
@@ -0,0 +1,903 @@
+//! Semantic command capture component.
+//!
+//! This is a prototype in-memory store for completed command captures emitted
+//! by atuin-pty-proxy. It keeps recent captures per Atuin session and indexes
+//! them by history ID for AI tool lookup.
+
+use std::collections::{HashMap, VecDeque};
+use std::fmt::{Display, Formatter};
+use std::sync::Arc;
+
+use crate::atuin_client::history::{History, HistoryId};
+use eyre::Result;
+use tokio::sync::Mutex;
+use tonic::{Request, Response, Status, Streaming};
+use tracing::{Level, instrument};
+
+use crate::atuin_daemon::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ semantic::{
+ CommandCapture, CommandOutputReply, CommandOutputRequest, OutputLine, RecordCommandsReply,
+ semantic_server::{Semantic as SemanticSvc, SemanticServer},
+ },
+};
+
/// Maximum number of sessions tracked at once; the least recently used
/// sessions are expired once this is exceeded.
const MAX_SESSIONS: usize = 20;
/// Maximum number of captures retained per session before the oldest are evicted.
const MAX_COMMANDS_PER_SESSION: usize = 128;
/// Maximum total captured output, in bytes, retained per session (32 MiB).
const MAX_BYTES_PER_SESSION: usize = 32 * 1024 * 1024;
/// Maximum number of ended history entries buffered while their capture has
/// not arrived yet.
const MAX_PENDING_HISTORIES: usize = 128;
+
/// Stores completed command captures and associates them with history events.
///
/// All state lives behind a shared pointer so the gRPC service created by
/// `grpc_service` operates on the same store as the component itself.
pub struct SemanticComponent {
    /// State shared between the component and its gRPC service.
    inner: Arc<SemanticComponentInner>,
}
+
/// Shared, lock-protected portion of the semantic component.
struct SemanticComponentInner {
    /// The capture store; every access goes through this async mutex.
    state: Mutex<SemanticState>,
}
+
/// In-memory capture store: per-session captures plus the indexes needed to
/// find them by history id.
#[derive(Default)]
struct SemanticState {
    /// Captures grouped by the session they belong to.
    sessions: HashMap<SessionId, SessionCaptures>,
    /// Session ids ordered from least recently to most recently touched;
    /// sessions are expired from the front.
    session_lru: VecDeque<SessionId>,
    /// Location of the most recently recorded capture for each history id.
    history_index: HashMap<HistoryId, CaptureRef>,
    /// History entries that ended before their capture arrived, oldest first.
    pending_histories: VecDeque<History>,
}
+
/// Identifier of an Atuin session.
///
/// The `TryFrom` conversions trim the input and reject empty values.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SessionId(String);
+
/// Sequence number of a capture within its session, handed out by
/// `SessionCaptures` when the capture is stored.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct CaptureId(u64);
+
/// Address of a stored capture: the session it lives in plus its id there.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CaptureRef {
    /// Session holding the capture.
    session_id: SessionId,
    /// Id of the capture inside that session.
    capture_id: CaptureId,
}
+
/// Bounded queue of captures belonging to a single session.
#[derive(Default)]
struct SessionCaptures {
    /// Id that will be assigned to the next stored capture.
    next_id: u64,
    /// Stored captures, oldest at the front.
    records: VecDeque<StoredCapture>,
    /// Sum of `output_bytes` over all entries in `records`.
    output_bytes: usize,
}
+
/// A capture as kept inside a session, together with its bookkeeping data.
struct StoredCapture {
    /// Id of this capture within its session.
    id: CaptureId,
    /// History entry the capture was recorded for.
    history_id: HistoryId,
    /// Length in bytes of the captured output, cached for size accounting.
    output_bytes: usize,
    /// The capture itself and, once known, its history entry.
    record: SemanticCommandRecord,
}
+
/// Identifies a capture that was evicted from a session so the caller can
/// clean up the history index.
struct EvictedCapture {
    /// History id the evicted capture was recorded for.
    history_id: HistoryId,
    /// Id the evicted capture had within its session.
    capture_id: CaptureId,
}
+
/// A command capture paired with the history entry it belongs to.
#[derive(Debug, Clone)]
struct SemanticCommandRecord {
    /// The capture as received, with normalized history and session ids.
    capture: CommandCapture,
    /// The matching history entry, if it has been seen yet.
    history: Option<History>,
}
+
+impl SemanticComponent {
+ pub fn new() -> Self {
+ Self {
+ inner: Arc::new(SemanticComponentInner {
+ state: Mutex::new(SemanticState::default()),
+ }),
+ }
+ }
+
+ pub fn grpc_service(&self) -> SemanticServer<SemanticGrpcService> {
+ SemanticServer::new(SemanticGrpcService {
+ inner: self.inner.clone(),
+ })
+ }
+}
+
+impl Default for SemanticComponent {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for SemanticComponent {
+ fn name(&self) -> &'static str {
+ "semantic"
+ }
+
+ async fn start(&mut self, _handle: DaemonHandle) -> Result<()> {
+ tracing::info!("semantic component started");
+ Ok(())
+ }
+
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+ if let DaemonEvent::HistoryEnded(history) = event {
+ self.inner.record_history(history.clone()).await;
+ }
+
+ Ok(())
+ }
+
+ async fn stop(&mut self) -> Result<()> {
+ let state = self.inner.state.lock().await;
+ tracing::info!(
+ sessions = state.sessions.len(),
+ records = state.record_count(),
+ indexed_histories = state.history_index.len(),
+ pending_histories = state.pending_histories.len(),
+ "semantic component stopped"
+ );
+ Ok(())
+ }
+}
+
+impl SemanticComponentInner {
+ async fn record_capture(&self, capture: CommandCapture) -> bool {
+ let mut state = self.state.lock().await;
+ state.record_capture(capture)
+ }
+
+ async fn record_history(&self, history: History) {
+ let mut state = self.state.lock().await;
+ state.record_history(history);
+ }
+
+ async fn command_output(&self, request: &CommandOutputRequest) -> CommandOutputReply {
+ let mut state = self.state.lock().await;
+ state.command_output(request)
+ }
+}
+
+impl SemanticState {
+ fn record_capture(&mut self, mut capture: CommandCapture) -> bool {
+ let Some(history_id) = history_id_from_str(capture.history_id.as_deref()) else {
+ tracing::debug!(
+ command_bytes = capture.command.len(),
+ prompt_bytes = capture.prompt.len(),
+ output_bytes = capture.output.len(),
+ output_truncated = capture.output_truncated,
+ "dropping semantic command capture without history id"
+ );
+ return false;
+ };
+
+ let history = take_pending_history(&mut self.pending_histories, &history_id);
+ let Some(session_id) = capture
+ .session_id
+ .as_deref()
+ .and_then(|session_id| SessionId::try_from(session_id).ok())
+ .or_else(|| {
+ history
+ .as_ref()
+ .and_then(|history| SessionId::try_from(history.session.as_str()).ok())
+ })
+ else {
+ tracing::debug!(
+ history_id = %history_id,
+ command_bytes = capture.command.len(),
+ prompt_bytes = capture.prompt.len(),
+ output_bytes = capture.output.len(),
+ output_truncated = capture.output_truncated,
+ "dropping semantic command capture without session id"
+ );
+ return false;
+ };
+
+ capture.history_id = Some(history_id.to_string());
+ capture.session_id = Some(session_id.to_string());
+ if capture.output_observed_bytes == 0 {
+ capture.output_observed_bytes = capture.output.len() as u64;
+ }
+
+ let record = SemanticCommandRecord { capture, history };
+ log_record(&record, "recorded semantic command capture");
+ self.push_record(session_id, history_id, record);
+ true
+ }
+
+ fn record_history(&mut self, history: History) {
+ let history_id = history.id.clone();
+
+ if let Some(capture_ref) = self.history_index.get(&history_id).cloned() {
+ if let Some(stored) = self.stored_capture_mut(&capture_ref) {
+ stored.record.history = Some(history);
+ log_record(
+ &stored.record,
+ "associated semantic command capture with history",
+ );
+ return;
+ }
+
+ self.history_index.remove(&history_id);
+ }
+
+ tracing::debug!(
+ id = %history.id,
+ command_bytes = history.command.len(),
+ "history ended before semantic capture arrived"
+ );
+ push_pending_history(&mut self.pending_histories, history);
+ }
+
+ fn command_output(&mut self, request: &CommandOutputRequest) -> CommandOutputReply {
+ let Some(history_id) = history_id_from_str(Some(&request.history_id)) else {
+ return command_output_not_found();
+ };
+ let Some(capture_ref) = self.history_index.get(&history_id).cloned() else {
+ return command_output_not_found();
+ };
+
+ let Some(reply) = self.command_output_for_ref(&capture_ref, &request.ranges) else {
+ self.history_index.remove(&history_id);
+ return command_output_not_found();
+ };
+
+ self.touch_session(&capture_ref.session_id);
+ reply
+ }
+
+ fn command_output_for_ref(
+ &self,
+ capture_ref: &CaptureRef,
+ ranges: &[crate::atuin_daemon::semantic::OutputRange],
+ ) -> Option<CommandOutputReply> {
+ let stored = self
+ .sessions
+ .get(&capture_ref.session_id)?
+ .stored_capture(capture_ref.capture_id)?;
+ let output = &stored.record.capture.output;
+ let output_observed_bytes = stored
+ .record
+ .capture
+ .output_observed_bytes
+ .max(output.len() as u64);
+
+ Some(CommandOutputReply {
+ found: true,
+ output: String::new(),
+ total_bytes: output.len() as u64,
+ total_lines: output.lines().count() as u64,
+ lines: select_output_ranges(output, ranges),
+ output_truncated: stored.record.capture.output_truncated,
+ output_observed_bytes,
+ })
+ }
+
+ fn push_record(
+ &mut self,
+ session_id: SessionId,
+ history_id: HistoryId,
+ record: SemanticCommandRecord,
+ ) {
+ self.touch_session(&session_id);
+
+ let (capture_id, evicted) = {
+ let session = self.sessions.entry(session_id.clone()).or_default();
+ session.push(history_id.clone(), record)
+ };
+
+ let capture_ref = CaptureRef {
+ session_id: session_id.clone(),
+ capture_id,
+ };
+ self.history_index.insert(history_id, capture_ref);
+
+ for evicted in evicted {
+ self.remove_history_index_if_matches(
+ &session_id,
+ &evicted.history_id,
+ evicted.capture_id,
+ );
+ }
+
+ self.expire_lru_sessions();
+ }
+
+ fn touch_session(&mut self, session_id: &SessionId) {
+ if let Some(index) = self.session_lru.iter().position(|id| id == session_id) {
+ self.session_lru.remove(index);
+ }
+ self.session_lru.push_back(session_id.clone());
+ }
+
+ fn expire_lru_sessions(&mut self) {
+ while self.session_lru.len() > MAX_SESSIONS {
+ let Some(session_id) = self.session_lru.pop_front() else {
+ break;
+ };
+ let Some(session) = self.sessions.remove(&session_id) else {
+ continue;
+ };
+
+ for stored in session.records {
+ self.remove_history_index_if_matches(&session_id, &stored.history_id, stored.id);
+ }
+ }
+ }
+
+ fn remove_history_index_if_matches(
+ &mut self,
+ session_id: &SessionId,
+ history_id: &HistoryId,
+ capture_id: CaptureId,
+ ) {
+ if self
+ .history_index
+ .get(history_id)
+ .is_some_and(|capture_ref| {
+ &capture_ref.session_id == session_id && capture_ref.capture_id == capture_id
+ })
+ {
+ self.history_index.remove(history_id);
+ }
+ }
+
+ fn stored_capture_mut(&mut self, capture_ref: &CaptureRef) -> Option<&mut StoredCapture> {
+ self.sessions
+ .get_mut(&capture_ref.session_id)?
+ .stored_capture_mut(capture_ref.capture_id)
+ }
+
+ fn record_count(&self) -> usize {
+ self.sessions
+ .values()
+ .map(|session| session.records.len())
+ .sum()
+ }
+}
+
+impl SessionCaptures {
+ fn push(
+ &mut self,
+ history_id: HistoryId,
+ record: SemanticCommandRecord,
+ ) -> (CaptureId, Vec<EvictedCapture>) {
+ self.push_with_limits(
+ history_id,
+ record,
+ MAX_COMMANDS_PER_SESSION,
+ MAX_BYTES_PER_SESSION,
+ )
+ }
+
+ fn push_with_limits(
+ &mut self,
+ history_id: HistoryId,
+ record: SemanticCommandRecord,
+ max_commands: usize,
+ max_output_bytes: usize,
+ ) -> (CaptureId, Vec<EvictedCapture>) {
+ let capture_id = CaptureId(self.next_id);
+ self.next_id = self.next_id.saturating_add(1);
+ let output_bytes = record.capture.output.len();
+ self.output_bytes = self.output_bytes.saturating_add(output_bytes);
+ self.records.push_back(StoredCapture {
+ id: capture_id,
+ history_id,
+ output_bytes,
+ record,
+ });
+
+ (
+ capture_id,
+ self.evict_to_limits(max_commands, max_output_bytes),
+ )
+ }
+
+ fn evict_to_limits(
+ &mut self,
+ max_commands: usize,
+ max_output_bytes: usize,
+ ) -> Vec<EvictedCapture> {
+ let mut evicted = Vec::new();
+ while self.records.len() > max_commands || self.output_bytes > max_output_bytes {
+ let Some(record) = self.records.pop_front() else {
+ break;
+ };
+ self.output_bytes = self.output_bytes.saturating_sub(record.output_bytes);
+ evicted.push(EvictedCapture {
+ history_id: record.history_id,
+ capture_id: record.id,
+ });
+ }
+ evicted
+ }
+
+ fn stored_capture(&self, capture_id: CaptureId) -> Option<&StoredCapture> {
+ self.records.iter().find(|record| record.id == capture_id)
+ }
+
+ fn stored_capture_mut(&mut self, capture_id: CaptureId) -> Option<&mut StoredCapture> {
+ self.records
+ .iter_mut()
+ .find(|record| record.id == capture_id)
+ }
+}
+
+impl TryFrom<&str> for SessionId {
+ type Error = ();
+
+ fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
+ let value = value.trim();
+ if value.is_empty() {
+ return Err(());
+ }
+
+ Ok(Self(value.to_string()))
+ }
+}
+
+impl TryFrom<String> for SessionId {
+ type Error = ();
+
+ fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
+ Self::try_from(value.as_str())
+ }
+}
+
+impl AsRef<str> for SessionId {
+ fn as_ref(&self) -> &str {
+ &self.0
+ }
+}
+
+impl Display for SessionId {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str(&self.0)
+ }
+}
+
/// gRPC front end for the semantic capture store.
///
/// Created by `SemanticComponent::grpc_service`; shares the component's state.
pub struct SemanticGrpcService {
    /// The capture store shared with the owning component.
    inner: Arc<SemanticComponentInner>,
}
+
+#[tonic::async_trait]
+impl SemanticSvc for SemanticGrpcService {
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn record_commands(
+ &self,
+ request: Request<Streaming<CommandCapture>>,
+ ) -> Result<Response<RecordCommandsReply>, Status> {
+ let mut stream = request.into_inner();
+ let mut accepted = 0_u64;
+
+ while let Some(capture) = stream.message().await? {
+ if self.inner.record_capture(capture).await {
+ accepted += 1;
+ }
+ }
+
+ Ok(Response::new(RecordCommandsReply { accepted }))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn command_output(
+ &self,
+ request: Request<CommandOutputRequest>,
+ ) -> Result<Response<CommandOutputReply>, Status> {
+ let request = request.into_inner();
+ if request.history_id.trim().is_empty() {
+ return Err(Status::invalid_argument("history_id is required"));
+ }
+
+ Ok(Response::new(self.inner.command_output(&request).await))
+ }
+}
+
+fn history_id_from_str(value: Option<&str>) -> Option<HistoryId> {
+ let value = value?.trim();
+ (!value.is_empty()).then(|| HistoryId(value.to_string()))
+}
+
+fn take_pending_history(
+ histories: &mut VecDeque<History>,
+ history_id: &HistoryId,
+) -> Option<History> {
+ let index = histories
+ .iter()
+ .position(|history| &history.id == history_id)?;
+ histories.remove(index)
+}
+
+fn push_pending_history(histories: &mut VecDeque<History>, history: History) {
+ if let Some(index) = histories
+ .iter()
+ .position(|pending| pending.id == history.id)
+ {
+ histories.remove(index);
+ }
+
+ histories.push_back(history);
+ trim_front(histories, MAX_PENDING_HISTORIES);
+}
+
/// Shrink `records` to at most `max_len` elements by discarding from the front.
fn trim_front<T>(records: &mut VecDeque<T>, max_len: usize) {
    let excess = records.len().saturating_sub(max_len);
    // Dropping the drain removes (and drops) the drained elements, front first.
    drop(records.drain(..excess));
}
+
+fn command_output_not_found() -> CommandOutputReply {
+ CommandOutputReply {
+ found: false,
+ output: String::new(),
+ total_bytes: 0,
+ total_lines: 0,
+ lines: Vec::new(),
+ output_truncated: false,
+ output_observed_bytes: 0,
+ }
+}
+
+fn select_output_ranges(
+ output: &str,
+ ranges: &[crate::atuin_daemon::semantic::OutputRange],
+) -> Vec<OutputLine> {
+ let lines: Vec<&str> = output.lines().collect();
+ if lines.is_empty() {
+ return Vec::new();
+ }
+
+ let ranges = if ranges.is_empty() {
+ vec![crate::atuin_daemon::semantic::OutputRange { start: 0, end: 999 }]
+ } else {
+ ranges.to_vec()
+ };
+
+ let mut ranges = ranges
+ .into_iter()
+ .filter_map(|range| normalize_line_range(range.start, range.end, lines.len()))
+ .collect::<Vec<_>>();
+ ranges.sort_unstable_by_key(|(start, _)| *start);
+
+ let mut merged: Vec<(usize, usize)> = Vec::new();
+ for (start, end) in ranges {
+ match merged.last_mut() {
+ Some((_, merged_end)) if start <= merged_end.saturating_add(1) => {
+ *merged_end = (*merged_end).max(end);
+ }
+ _ => merged.push((start, end)),
+ }
+ }
+
+ merged
+ .into_iter()
+ .flat_map(|(start, end)| {
+ lines[start..=end]
+ .iter()
+ .enumerate()
+ .map(move |(offset, line)| OutputLine {
+ line_number: (start + offset + 1) as u64,
+ content: (*line).to_string(),
+ })
+ })
+ .collect()
+}
+
/// Resolve a possibly negative, possibly out-of-bounds inclusive line range
/// against an output of `line_count` lines.
///
/// Negative bounds are offsets from the end (`-1` is the last line). The
/// result is clamped to `0..line_count`. `None` is returned when the range
/// lies entirely outside the output or is empty after clamping.
fn normalize_line_range(start: i64, end: i64, line_count: usize) -> Option<(usize, usize)> {
    let total = i64::try_from(line_count).ok()?;

    // Negative offsets count back from the end of the output.
    let resolve = |offset: i64| if offset < 0 { total + offset } else { offset };
    let (first, last) = (resolve(start), resolve(end));

    // Entirely before the first line, or entirely past the last one.
    if last < 0 || first >= total {
        return None;
    }

    let first = first.max(0);
    let last = last.min(total - 1);
    if first > last {
        return None;
    }

    Some((first as usize, last as usize))
}
+
+fn log_record(record: &SemanticCommandRecord, message: &'static str) {
+ let history_id = record.capture.history_id.as_deref().unwrap_or("<missing>");
+ let associated_history_id = record
+ .history
+ .as_ref()
+ .map(|history| history.id.to_string());
+ let exit = record.history.as_ref().map(|history| history.exit);
+ let duration = record.history.as_ref().map(|history| history.duration);
+ let author = record
+ .history
+ .as_ref()
+ .map(|history| history.author.as_str());
+ let session_id = record.capture.session_id.as_deref();
+
+ tracing::debug!(
+ history_id = %history_id,
+ associated_history_id = ?associated_history_id,
+ session_id = ?session_id,
+ command_bytes = record.capture.command.len(),
+ prompt_bytes = record.capture.prompt.len(),
+ output_bytes = record.capture.output.len(),
+ output_truncated = record.capture.output_truncated,
+ output_observed_bytes = record.capture.output_observed_bytes,
+ capture_exit_code = ?record.capture.exit_code,
+ history_exit = ?exit,
+ duration = ?duration,
+ author = ?author,
+ "{message}"
+ );
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::atuin_daemon::semantic::OutputRange;
    use time::OffsetDateTime;

    /// History entry with only id, session and command filled in.
    fn history(id: &str, session: &str, command: &str) -> History {
        History {
            id: HistoryId(id.to_owned()),
            timestamp: OffsetDateTime::UNIX_EPOCH,
            duration: 0,
            exit: 0,
            command: command.to_owned(),
            cwd: String::new(),
            session: session.to_owned(),
            hostname: String::new(),
            author: String::new(),
            intent: None,
            deleted_at: None,
        }
    }

    /// Untruncated capture carrying just ids and output.
    fn capture(history_id: Option<&str>, session_id: Option<&str>, output: &str) -> CommandCapture {
        CommandCapture {
            prompt: String::new(),
            command: String::new(),
            output: output.to_owned(),
            exit_code: None,
            history_id: history_id.map(String::from),
            session_id: session_id.map(String::from),
            output_truncated: false,
            output_observed_bytes: output.len() as u64,
        }
    }

    /// Record without an associated history entry, for direct session tests.
    fn record(history_id: &str, output: &str) -> SemanticCommandRecord {
        SemanticCommandRecord {
            capture: capture(Some(history_id), Some("session-1"), output),
            history: None,
        }
    }

    /// Look up output for `history_id` without explicit ranges.
    fn command_output(state: &mut SemanticState, history_id: &str) -> CommandOutputReply {
        let request = CommandOutputRequest {
            history_id: history_id.to_owned(),
            ranges: Vec::new(),
        };
        state.command_output(&request)
    }

    fn output_line(line_number: u64, content: &str) -> OutputLine {
        OutputLine {
            line_number,
            content: content.to_owned(),
        }
    }

    fn range(start: i64, end: i64) -> OutputRange {
        OutputRange { start, end }
    }

    /// `count` lines of the form "line N", numbered from zero.
    fn numbered_lines(count: usize) -> String {
        (0..count)
            .map(|n| format!("line {n}"))
            .collect::<Vec<_>>()
            .join("\n")
    }

    #[test]
    fn drops_capture_without_history_id() {
        let mut state = SemanticState::default();

        let accepted = state.record_capture(capture(None, Some("session-1"), "output"));

        assert!(!accepted);
        assert!(!command_output(&mut state, "id-1").found);
        assert_eq!(state.record_count(), 0);
    }

    #[test]
    fn stores_capture_by_session_and_history_id() {
        let mut state = SemanticState::default();

        assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output")));

        let reply = command_output(&mut state, "id-1");
        assert!(reply.found);
        assert_eq!(reply.total_bytes, 6);
        assert_eq!(reply.output_observed_bytes, 6);
        assert_eq!(reply.lines, vec![output_line(1, "output")]);
    }

    #[test]
    fn uses_pending_history_session_when_capture_session_is_missing() {
        let mut state = SemanticState::default();

        state.record_history(history("id-1", "session-from-history", "cargo test"));
        assert!(state.record_capture(capture(Some("id-1"), None, "output")));

        let expected_session = SessionId("session-from-history".to_string());
        assert!(state.sessions.contains_key(&expected_session));
        assert!(command_output(&mut state, "id-1").found);
    }

    #[test]
    fn associates_history_by_id_after_capture_arrives() {
        let mut state = SemanticState::default();

        assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output")));
        state.record_history(history("id-1", "session-1", "different command"));

        let capture_ref = state
            .history_index
            .get(&HistoryId("id-1".to_string()))
            .unwrap();
        let session = state.sessions.get(&capture_ref.session_id).unwrap();
        let stored = session.stored_capture(capture_ref.capture_id).unwrap();
        assert!(stored.record.history.is_some());
    }

    #[test]
    fn evicts_oldest_command_when_session_ring_is_full() {
        let mut state = SemanticState::default();

        for index in 0..=MAX_COMMANDS_PER_SESSION {
            let id = format!("id-{index}");
            assert!(state.record_capture(capture(Some(&id), Some("session-1"), "output")));
        }

        let newest = format!("id-{MAX_COMMANDS_PER_SESSION}");
        assert!(!command_output(&mut state, "id-0").found);
        assert!(command_output(&mut state, &newest).found);
        assert_eq!(state.record_count(), MAX_COMMANDS_PER_SESSION);
    }

    #[test]
    fn evicts_oldest_session_after_lru_limit() {
        let mut state = SemanticState::default();

        for index in 0..MAX_SESSIONS {
            let id = format!("id-{index}");
            let session = format!("session-{index}");
            assert!(state.record_capture(capture(Some(&id), Some(&session), "output")));
        }
        // Reading from the first session makes it the most recently used one.
        assert!(command_output(&mut state, "id-0").found);

        assert!(state.record_capture(capture(Some("new-id"), Some("new-session"), "output")));

        assert!(command_output(&mut state, "id-0").found);
        assert!(!command_output(&mut state, "id-1").found);
        assert!(command_output(&mut state, "new-id").found);
        assert_eq!(state.sessions.len(), MAX_SESSIONS);
    }

    #[test]
    fn evicts_by_session_byte_limit() {
        let mut session = SessionCaptures::default();
        let first_output = "x".repeat(10);
        let second_output = "y";

        let (_, evicted_first) = session.push_with_limits(
            HistoryId("first".to_string()),
            record("first", &first_output),
            MAX_COMMANDS_PER_SESSION,
            10,
        );
        assert!(evicted_first.is_empty());

        let (_, evicted_second) = session.push_with_limits(
            HistoryId("second".to_string()),
            record("second", second_output),
            MAX_COMMANDS_PER_SESSION,
            10,
        );

        assert_eq!(evicted_second.len(), 1);
        assert_eq!(evicted_second[0].history_id, HistoryId("first".to_string()));
        assert_eq!(session.records.len(), 1);
        assert_eq!(session.output_bytes, 1);
    }

    #[test]
    fn command_output_reports_truncation_metadata() {
        let mut state = SemanticState::default();
        let truncated = CommandCapture {
            output_truncated: true,
            output_observed_bytes: 1024,
            ..capture(Some("id-1"), Some("session-1"), "partial")
        };

        assert!(state.record_capture(truncated));

        let reply = command_output(&mut state, "id-1");
        assert!(reply.output_truncated);
        assert_eq!(reply.total_bytes, 7);
        assert_eq!(reply.output_observed_bytes, 1024);
    }

    #[test]
    fn output_ranges_are_line_based_inclusive_and_support_negative_offsets() {
        let output = "zero\none\ntwo\nthree\nfour";
        let ranges = vec![range(1, 2), range(-2, -1)];

        let expected = vec![
            output_line(2, "one"),
            output_line(3, "two"),
            output_line(4, "three"),
            output_line(5, "four"),
        ];
        assert_eq!(select_output_ranges(output, &ranges), expected);
    }

    #[test]
    fn output_ranges_merge_overlaps_and_adjacent_ranges() {
        let output = numbered_lines(100);
        let ranges = vec![range(0, 100), range(-100, -1)];

        let selected = select_output_ranges(&output, &ranges);

        assert_eq!(selected.len(), 100);
        assert_eq!(selected.first(), Some(&output_line(1, "line 0")));
        assert_eq!(selected.last(), Some(&output_line(100, "line 99")));
    }

    #[test]
    fn output_ranges_can_leave_gaps_for_client_formatting() {
        let output = "zero\none\ntwo\nthree\nfour";
        let ranges = vec![range(0, 1), range(4, 4)];

        let expected = vec![
            output_line(1, "zero"),
            output_line(2, "one"),
            output_line(5, "four"),
        ];
        assert_eq!(select_output_ranges(output, &ranges), expected);
    }

    #[test]
    fn empty_output_ranges_default_to_first_thousand_lines() {
        let output = numbered_lines(1001);

        let selected = select_output_ranges(&output, &[]);

        assert_eq!(selected.len(), 1000);
        assert_eq!(selected.first(), Some(&output_line(1, "line 0")));
        assert_eq!(selected.last(), Some(&output_line(1000, "line 999")));
    }

    #[test]
    fn output_ranges_skip_ranges_fully_outside_output() {
        let output = "zero\none\ntwo";
        let ranges = vec![range(10, 20), range(-20, -10)];

        assert_eq!(select_output_ranges(output, &ranges), Vec::new());
    }
}
diff --git a/crates/turtle/src/atuin_daemon/components/sync.rs b/crates/turtle/src/atuin_daemon/components/sync.rs
new file mode 100644
index 00000000..c76fb71b
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/components/sync.rs
@@ -0,0 +1,279 @@
+//! Sync component.
+//!
+//! Handles periodic synchronization with the Atuin cloud server.
+
+use std::time::Duration;
+
+use eyre::Result;
+use rand::Rng;
+use tokio::sync::mpsc;
+use tokio::time::{self, MissedTickBehavior};
+
+use crate::atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
+
+use crate::atuin_daemon::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+};
+
/// Commands that can be sent to the sync task over its command channel.
enum SyncCommand {
    /// Trigger an immediate sync, independent of the periodic ticker.
    ForceSync,
    /// Stop the sync loop so the task can finish.
    Stop,
}
+
/// Sync state - tracks whether we're in normal operation or retrying after failure.
///
/// The state is the result of the most recent sync attempt and decides whether
/// periodic ticks run while `auto_sync` is disabled.
#[derive(Clone, Copy, PartialEq, Eq)]
enum SyncState {
    /// Normal operation. Periodic syncs only run if auto_sync is enabled.
    Idle,
    /// Retrying after a sync failure. Retries continue regardless of auto_sync
    /// until the sync succeeds.
    Retrying,
}
+
/// Sync component - handles periodic cloud synchronization.
///
/// This component:
/// - Runs a background sync loop on a configurable interval
/// - Implements exponential backoff on sync failures
/// - Responds to ForceSync events for immediate sync
/// - Emits SyncCompleted/SyncFailed events
pub struct SyncComponent {
    /// Handle of the spawned sync loop; `None` until `start` has run and again
    /// after `stop` has taken it.
    task_handle: Option<tokio::task::JoinHandle<()>>,
    /// Sender side of the command channel to the sync loop; `None` until
    /// `start` has run.
    command_tx: Option<mpsc::Sender<SyncCommand>>,
}
+
+impl SyncComponent {
+ /// Create a new sync component.
+ pub fn new() -> Self {
+ Self {
+ task_handle: None,
+ command_tx: None,
+ }
+ }
+}
+
+impl Default for SyncComponent {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for SyncComponent {
+ fn name(&self) -> &'static str {
+ "sync"
+ }
+
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+ let (cmd_tx, cmd_rx) = mpsc::channel(16);
+ self.command_tx = Some(cmd_tx);
+
+ // Spawn the sync loop with its own copy of the handle
+ self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
+
+ tracing::info!("sync component started");
+ Ok(())
+ }
+
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+ if let DaemonEvent::ForceSync = event {
+ tracing::info!("force sync requested");
+ if let Some(tx) = &self.command_tx {
+ let _ = tx.send(SyncCommand::ForceSync).await;
+ }
+ }
+ Ok(())
+ }
+
+ async fn stop(&mut self) -> Result<()> {
+ if let Some(tx) = &self.command_tx {
+ let _ = tx.send(SyncCommand::Stop).await;
+ }
+ if let Some(handle) = self.task_handle.take() {
+ // Give the task a moment to shut down gracefully
+ let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
+ }
+ tracing::info!("sync component stopped");
+ Ok(())
+ }
+}
+
+/// The main sync loop.
+///
+/// This runs in a spawned task and handles periodic sync as well as
+/// force sync requests.
+async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
+ tracing::info!("sync loop starting");
+
+ // Clone settings since we need them across await points
+ let settings = handle.settings().await.clone();
+ let host_id = match Settings::host_id().await {
+ Ok(id) => id,
+ Err(e) => {
+ tracing::error!("failed to get host id, sync disabled: {e}");
+ return;
+ }
+ };
+
+ // Create the stores we need
+ let encryption_key = *handle.encryption_key();
+ let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
+
+ // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
+ let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
+
+ let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
+
+ // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
+ // we may end up running a lot of syncs in a hot loop.
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+ let mut sync_state = SyncState::Idle;
+
+ loop {
+ tokio::select! {
+ _ = ticker.tick() => {
+ let settings = handle.settings().await;
+
+ // Skip periodic ticks if auto_sync is disabled AND we're not retrying
+ // a previous failure. Retries must continue regardless of auto_sync.
+ if !settings.auto_sync && sync_state == SyncState::Idle {
+ tracing::debug!("auto_sync disabled, skipping periodic sync tick");
+ continue;
+ }
+
+ sync_state = do_sync_tick(
+ &handle,
+ &history_store,
+ &mut ticker,
+ max_interval,
+ &settings,
+ ).await;
+ }
+ cmd = cmd_rx.recv() => {
+ match cmd {
+ Some(SyncCommand::ForceSync) => {
+ tracing::info!("executing force sync");
+ let settings = handle.settings().await;
+ sync_state = do_sync_tick(
+ &handle,
+ &history_store,
+ &mut ticker,
+ max_interval,
+ &settings,
+ ).await;
+ }
+ Some(SyncCommand::Stop) | None => {
+ tracing::info!("sync loop stopping");
+ break;
+ }
+ }
+ }
+ }
+ }
+}
+
+/// Execute a single sync tick.
+///
+/// Returns the new sync state: `Idle` on success, `Retrying` on failure.
+async fn do_sync_tick(
+ handle: &DaemonHandle,
+ history_store: &HistoryStore,
+ ticker: &mut time::Interval,
+ max_interval: f64,
+ settings: &Settings,
+) -> SyncState {
+ tracing::info!("sync tick");
+
+ // Check if logged in
+ let logged_in = match settings.logged_in().await {
+ Ok(v) => v,
+ Err(e) => {
+ tracing::warn!("failed to check login status, skipping sync tick: {e}");
+ return SyncState::Idle;
+ }
+ };
+
+ if !logged_in {
+ tracing::debug!("not logged in, skipping sync tick");
+ return SyncState::Idle;
+ }
+
+ // Perform the sync
+ let res = sync::sync(settings, handle.store(), handle.encryption_key()).await;
+
+ match res {
+ Err(e) => {
+ tracing::error!("sync tick failed with {e}");
+
+ // Emit failure event
+ handle.emit(DaemonEvent::SyncFailed {
+ error: e.to_string(),
+ });
+
+ // Exponential backoff
+ let mut rng = rand::thread_rng();
+ let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
+
+ if new_interval > max_interval {
+ new_interval = max_interval;
+ }
+
+ *ticker = time::interval_at(
+ tokio::time::Instant::now() + Duration::from_secs(new_interval as u64),
+ time::Duration::from_secs(new_interval as u64),
+ );
+ ticker.reset_after(time::Duration::from_secs(new_interval as u64));
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+ tracing::error!("backing off, next sync tick in {new_interval}");
+
+ SyncState::Retrying
+ }
+ Ok((uploaded_count, downloaded_records)) => {
+ tracing::info!(
+ uploaded = uploaded_count,
+ downloaded = downloaded_records.len(),
+ "sync complete"
+ );
+
+ // Build history from downloaded records
+ if let Err(e) = history_store
+ .incremental_build(handle.history_db(), &downloaded_records)
+ .await
+ {
+ tracing::error!("failed to build history from downloaded records: {e}");
+ }
+
+ // Emit the records added event (for search indexing)
+ handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone()));
+
+ // Emit sync completed event
+ handle.emit(DaemonEvent::SyncCompleted {
+ uploaded: uploaded_count as usize,
+ downloaded: downloaded_records.len(),
+ });
+
+ // Reset backoff on success
+ if ticker.period().as_secs() != settings.daemon.sync_frequency {
+ *ticker = time::interval_at(
+ tokio::time::Instant::now()
+ + Duration::from_secs(settings.daemon.sync_frequency),
+ time::Duration::from_secs(settings.daemon.sync_frequency),
+ );
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+ }
+
+ // Store sync time
+ if let Err(e) = Settings::save_sync_time().await {
+ tracing::error!("failed to save sync time: {e}");
+ }
+
+ SyncState::Idle
+ }
+ }
+}
diff --git a/crates/turtle/src/atuin_daemon/control/mod.rs b/crates/turtle/src/atuin_daemon/control/mod.rs
new file mode 100644
index 00000000..afb29c57
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/control/mod.rs
@@ -0,0 +1,12 @@
+//! Control module for external event injection.
+//!
+//! This module provides the gRPC service that allows external processes
+//! (like CLI commands) to inject events into the daemon's event bus.
+
+mod service;
+
+// Include the generated proto code
+tonic::include_proto!("control");
+
+// Re-export the service
+pub use service::ControlService;
diff --git a/crates/turtle/src/atuin_daemon/control/service.rs b/crates/turtle/src/atuin_daemon/control/service.rs
new file mode 100644
index 00000000..cb2ff74e
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/control/service.rs
@@ -0,0 +1,71 @@
+//! Control service implementation.
+//!
+//! This gRPC service allows external processes (like CLI commands) to inject
+//! events into the daemon's event bus.
+
+use crate::atuin_client::history::HistoryId;
+use tonic::{Request, Response, Status};
+use tracing::{Level, info, instrument};
+
+use super::{
+ SendEventRequest, SendEventResponse,
+ control_server::{Control, ControlServer},
+ send_event_request::Event,
+};
+use crate::atuin_daemon::{daemon::DaemonHandle, events::DaemonEvent};
+
+/// The Control gRPC service.
+///
+/// External processes call this service to inject events into the daemon's event bus.
+/// It is not a daemon component; it is part of the daemon's core infrastructure.
+pub struct ControlService {
+ handle: DaemonHandle, // used to emit the converted events onto the event bus
+}
+
+impl ControlService {
+    /// Build a control service that publishes incoming events through `handle`.
+    pub fn new(handle: DaemonHandle) -> Self {
+        ControlService { handle }
+    }
+
+    /// Consume the service and wrap it in the generated tonic [`ControlServer`].
+    pub fn into_server(self) -> ControlServer<Self> {
+        let service = self;
+        ControlServer::new(service)
+    }
+}
+
+#[tonic::async_trait]
+impl Control for ControlService {
+    /// Accept an event from an external process and publish it on the daemon's event bus.
+    ///
+    /// Responds with `invalid_argument` when the request carries no event;
+    /// otherwise the event is converted, logged, emitted, and an empty
+    /// response is returned.
+    #[instrument(skip_all, level = Level::INFO, name = "control_send_event")]
+    async fn send_event(
+        &self,
+        request: Request<SendEventRequest>,
+    ) -> Result<Response<SendEventResponse>, Status> {
+        // The event field is optional on the wire; reject requests without one.
+        let Some(event) = request.into_inner().event else {
+            return Err(Status::invalid_argument("event is required"));
+        };
+
+        let daemon_event = proto_event_to_daemon_event(event)?;
+        info!(?daemon_event, "received control event");
+        self.handle.emit(daemon_event);
+
+        let reply = SendEventResponse {};
+        Ok(Response::new(reply))
+    }
+}
+
+/// Map a protobuf control event onto the matching [`DaemonEvent`].
+///
+/// Every variant currently converts successfully; the `Result` return type is
+/// kept so callers can propagate a `Status` with `?`.
+fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> {
+    let converted = match event {
+        Event::HistoryPruned(_) => DaemonEvent::HistoryPruned,
+        Event::HistoryRebuilt(_) => DaemonEvent::HistoryRebuilt,
+        Event::HistoryDeleted(deleted) => {
+            // Wrap each raw ID from the request in the `HistoryId` newtype.
+            let ids: Vec<HistoryId> = deleted.ids.into_iter().map(HistoryId).collect();
+            DaemonEvent::HistoryDeleted { ids }
+        }
+        Event::ForceSync(_) => DaemonEvent::ForceSync,
+        Event::SettingsReloaded(_) => DaemonEvent::SettingsReloaded,
+        Event::Shutdown(_) => DaemonEvent::ShutdownRequested,
+    };
+    Ok(converted)
+}
diff --git a/crates/turtle/src/atuin_daemon/daemon.rs b/crates/turtle/src/atuin_daemon/daemon.rs
new file mode 100644
index 00000000..77c0d8a5
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/daemon.rs
@@ -0,0 +1,458 @@
+//! Core daemon infrastructure.
+//!
+//! This module provides the foundational types for building the atuin daemon:
+//!
+//! - [`DaemonState`]: Shared state owned by the daemon
+//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state
+//! - [`Component`]: A trait for implementing daemon components
+//! - [`Daemon`]: The main daemon orchestrator
+//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon
+
+use std::sync::Arc;
+
+use crate::atuin_client::{
+ database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore,
+ settings::Settings,
+};
+use eyre::{Context, Result};
+use tokio::sync::{RwLock, broadcast};
+
+use crate::atuin_daemon::events::DaemonEvent;
+
+// ============================================================================
+// DaemonState
+// ============================================================================
+
+/// Shared state owned by the daemon.
+///
+/// This contains all the resources that components and services need access to.
+/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`].
+pub struct DaemonState {
+ // Sending half of the broadcast channel used as the event bus; receivers come from `subscribe()`
+ event_tx: broadcast::Sender<DaemonEvent>,
+
+ // Current configuration; replaced as a whole under the write lock when settings are applied
+ settings: RwLock<Settings>,
+
+ // Encryption key loaded once in `DaemonBuilder::build`; only ever handed out by shared reference
+ encryption_key: [u8; 32],
+
+ // Database handles: the history database and the record store
+ history_db: HistoryDatabase,
+ store: SqliteStore,
+}
+
+// ============================================================================
+// DaemonHandle
+// ============================================================================
+
+/// A lightweight handle to the daemon's shared state.
+///
+/// This is the primary way for components, gRPC services, and spawned tasks to
+/// interact with the daemon. It provides access to:
+///
+/// - Event emission and subscription
+/// - Configuration (settings, encryption key)
+/// - Database handles
+///
+/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed
+/// around to any code that needs daemon access.
+///
+/// # Example
+///
+/// ```ignore
+/// // Emit an event
+/// handle.emit(DaemonEvent::HistoryPruned);
+///
+/// // Access settings
+/// let settings = handle.settings().await;
+/// let sync_freq = settings.daemon.sync_frequency;
+///
+/// // Access database
+/// let history = handle.history_db().load(id).await?;
+/// ```
+#[derive(Clone)]
+pub struct DaemonHandle {
+ state: Arc<DaemonState>, // shared daemon state; cloning the handle only clones this Arc
+}
+
+impl DaemonHandle {
+    // ---- Events ----
+
+    /// Publish `event` on the daemon's event bus.
+    ///
+    /// Emission is fire-and-forget: the caller never sees an error. If the
+    /// send fails (for example because no receiver is subscribed, which
+    /// shouldn't happen in normal operation), the event is discarded and a
+    /// warning is logged.
+    pub fn emit(&self, event: DaemonEvent) {
+        let Err(e) = self.state.event_tx.send(event) else {
+            return;
+        };
+        tracing::warn!("failed to emit event (no receivers?): {e}");
+    }
+
+    /// Create a new receiver for the event bus.
+    ///
+    /// The receiver only sees events emitted after this call. Useful for code
+    /// that needs to observe events outside of the `handle_event` callback
+    /// flow.
+    pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
+        let sender = &self.state.event_tx;
+        sender.subscribe()
+    }
+
+    /// Ask the daemon to shut down gracefully by emitting `ShutdownRequested`.
+    pub fn shutdown(&self) {
+        self.emit(DaemonEvent::ShutdownRequested)
+    }
+
+    // ---- Configuration ----
+
+    /// Borrow the current settings.
+    ///
+    /// The returned guard holds a read lock on the settings for as long as it
+    /// is alive; clone the settings if you need to keep them around.
+    pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> {
+        let lock = &self.state.settings;
+        lock.read().await
+    }
+
+    /// Load settings via `Settings::new()` and apply them.
+    ///
+    /// On success a `SettingsReloaded` event is emitted (see
+    /// [`apply_settings`](Self::apply_settings)). If loading fails, the error
+    /// is returned and the current settings stay in place.
+    pub async fn reload_settings(&self) -> Result<()> {
+        self.apply_settings(Settings::new()?).await;
+        Ok(())
+    }
+
+    /// Replace the current settings with `settings` and emit `SettingsReloaded`.
+    ///
+    /// Use this when the settings have already been loaded (e.g. by a file
+    /// watcher) so the config file is not parsed a second time.
+    pub async fn apply_settings(&self, settings: Settings) {
+        {
+            // Scope the write guard so it is released before the event goes out.
+            let mut current = self.state.settings.write().await;
+            *current = settings;
+        }
+        self.emit(DaemonEvent::SettingsReloaded);
+        tracing::info!("settings applied");
+    }
+
+    /// Borrow the encryption key.
+    pub fn encryption_key(&self) -> &[u8; 32] {
+        let state: &DaemonState = &self.state;
+        &state.encryption_key
+    }
+
+    // ---- Database ----
+
+    /// Borrow the history database handle.
+    pub fn history_db(&self) -> &HistoryDatabase {
+        let state: &DaemonState = &self.state;
+        &state.history_db
+    }
+
+    /// Borrow the record store handle.
+    pub fn store(&self) -> &SqliteStore {
+        let state: &DaemonState = &self.state;
+        &state.store
+    }
+}
+
+impl std::fmt::Debug for DaemonHandle {
+    /// Formats as an opaque `DaemonHandle { .. }`; the shared state is not printed.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut builder = f.debug_struct("DaemonHandle");
+        builder.finish_non_exhaustive()
+    }
+}
+
+// ============================================================================
+// Component Trait
+// ============================================================================
+
+/// A daemon component that handles a specific domain.
+///
+/// Components are the building blocks of the daemon. Each component:
+///
+/// - Has a unique name for logging and debugging
+/// - Can optionally expose gRPC services
+/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources
+/// - Handles events from the event bus
+/// - Performs cleanup on shutdown
+///
+/// # Lifecycle
+///
+/// 1. **Construction**: Component is created (usually via `new()`)
+/// 2. **Start**: `start()` is called with a [`DaemonHandle`]
+/// 3. **Running**: `handle_event()` is called for each event on the bus
+/// 4. **Shutdown**: `stop()` is called for cleanup
+///
+/// # Example
+///
+/// ```ignore
+/// pub struct MyComponent {
+/// handle: Option<DaemonHandle>,
+/// }
+///
+/// #[async_trait]
+/// impl Component for MyComponent {
+/// fn name(&self) -> &'static str { "my-component" }
+///
+/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+/// self.handle = Some(handle);
+/// Ok(())
+/// }
+///
+/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+/// match event {
+/// DaemonEvent::SomeEvent => {
+/// // Handle the event
+/// if let Some(handle) = &self.handle {
+/// handle.emit(DaemonEvent::ResponseEvent);
+/// }
+/// }
+/// _ => {}
+/// }
+/// Ok(())
+/// }
+///
+/// async fn stop(&mut self) -> Result<()> {
+/// Ok(())
+/// }
+/// }
+/// ```
+#[tonic::async_trait]
+pub trait Component: Send + Sync {
+ /// Human-readable name; the daemon uses it as the `component` field in its log output.
+ fn name(&self) -> &'static str;
+
+ /// Called once at startup, in registration order.
+ ///
+ /// Store the handle if you need to emit events or access daemon resources
+ /// later. The handle is cheaply cloneable, so feel free to clone it for
+ /// spawned tasks. Returning an error aborts startup of the remaining components.
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()>;
+
+ /// Handle an incoming event.
+ ///
+ /// Called for each event the daemon's event loop dispatches; `ShutdownRequested` ends the loop
+ /// and is not dispatched. To emit new events in response, use the handle stored during `start()`.
+ /// A returned error is only logged by the daemon; the event still reaches the remaining components.
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>;
+
+ /// Called on graceful shutdown.
+ ///
+ /// Use this to clean up resources, abort spawned tasks, etc. A returned error is only logged.
+ async fn stop(&mut self) -> Result<()>;
+}
+
+// ============================================================================
+// Daemon
+// ============================================================================
+
+/// The main daemon orchestrator.
+///
+/// The daemon manages components, runs the event loop, and coordinates startup
+/// and shutdown. It is constructed via [`DaemonBuilder`].
+///
+/// # Event Loop
+///
+/// The daemon runs a simple event loop:
+///
+/// 1. Wait for an event on the bus
+/// 2. Dispatch the event to all components (in registration order)
+/// 3. Components may emit new events in response
+/// 4. Repeat until `ShutdownRequested` is received
+///
+/// Events emitted during handling are queued and processed in subsequent
+/// iterations, ensuring the loop eventually drains.
+pub struct Daemon {
+ components: Vec<Box<dyn Component>>, // kept in registration order
+ handle: DaemonHandle, // cloned for each component at start; also used to subscribe the event loop
+}
+
+impl Daemon {
+    /// Create a [`DaemonBuilder`] seeded with `settings`.
+    pub fn builder(settings: Settings) -> DaemonBuilder {
+        DaemonBuilder::new(settings)
+    }
+
+    /// Get a clone of the daemon handle.
+    ///
+    /// The handle can be used to emit events, access settings, etc.
+    pub fn handle(&self) -> DaemonHandle {
+        self.handle.clone()
+    }
+
+    /// Start all components in registration order.
+    ///
+    /// This must be called before `run_event_loop()`. Each component receives
+    /// its own clone of the daemon handle.
+    ///
+    /// # Errors
+    ///
+    /// Returns the first start-up error, annotated with the component's name.
+    /// Components registered after the failing one are not started.
+    pub async fn start_components(&mut self) -> Result<()> {
+        for component in &mut self.components {
+            let name = component.name();
+            tracing::info!(component = name, "starting component");
+            component
+                .start(self.handle.clone())
+                .await
+                .with_context(|| format!("failed to start component: {name}"))?;
+        }
+        Ok(())
+    }
+
+    /// Run the daemon event loop.
+    ///
+    /// Events are received from the bus and dispatched to every component
+    /// until either a `ShutdownRequested` event arrives or the bus is closed.
+    /// `ShutdownRequested` itself is not dispatched to components. If the
+    /// receiver falls behind and events are dropped, a warning is logged and
+    /// the loop continues.
+    ///
+    /// Components must be started first via `start_components()`.
+    pub async fn run_event_loop(&mut self) -> Result<()> {
+        let mut event_rx = self.handle.subscribe();
+        loop {
+            let event = match event_rx.recv().await {
+                Ok(event) => event,
+                Err(broadcast::error::RecvError::Lagged(n)) => {
+                    tracing::warn!(
+                        skipped = n,
+                        "event receiver lagged, some events were dropped"
+                    );
+                    continue;
+                }
+                Err(broadcast::error::RecvError::Closed) => {
+                    tracing::info!("event bus closed, stopping daemon");
+                    break;
+                }
+            };
+
+            if matches!(event, DaemonEvent::ShutdownRequested) {
+                tracing::info!("shutdown requested, stopping daemon");
+                break;
+            }
+
+            tracing::debug!(?event, "processing event");
+            self.dispatch_event(&event).await;
+        }
+        Ok(())
+    }
+
+    /// Stop all components, in reverse registration order.
+    ///
+    /// Components are torn down in the opposite order to which they were
+    /// started, as documented on [`DaemonBuilder::component`], so a component
+    /// registered later is stopped before the ones registered ahead of it.
+    /// A failing `stop()` is logged and does not prevent the remaining
+    /// components from being stopped.
+    pub async fn stop_components(&mut self) {
+        for component in self.components.iter_mut().rev() {
+            let name = component.name();
+            tracing::info!(component = name, "stopping component");
+            if let Err(e) = component.stop().await {
+                tracing::error!(
+                    component = name,
+                    error = ?e,
+                    "error stopping component"
+                );
+            }
+        }
+        tracing::info!("all components stopped");
+    }
+
+    /// Run the daemon.
+    ///
+    /// Convenience method that starts the components, runs the event loop,
+    /// and stops the components again. It does not return until the daemon
+    /// has shut down or start-up has failed.
+    pub async fn run(mut self) -> Result<()> {
+        self.start_components().await?;
+        self.run_event_loop().await?;
+        self.stop_components().await;
+        tracing::info!("daemon stopped");
+        Ok(())
+    }
+
+    /// Deliver `event` to every component in registration order.
+    ///
+    /// Handler errors are logged and do not stop delivery to later components.
+    async fn dispatch_event(&mut self, event: &DaemonEvent) {
+        for component in &mut self.components {
+            let Err(e) = component.handle_event(event).await else {
+                continue;
+            };
+            tracing::error!(
+                component = component.name(),
+                error = ?e,
+                "error handling event"
+            );
+        }
+    }
+}
+
+// ============================================================================
+// DaemonBuilder
+// ============================================================================
+
+/// Builder for constructing a [`Daemon`].
+///
+/// # Example
+///
+/// ```ignore
+/// let daemon = Daemon::builder(settings)
+/// .store(store)
+/// .history_db(history_db)
+/// .component(HistoryComponent::new())
+/// .component(SearchComponent::new())
+/// .component(SyncComponent::new())
+/// .build()
+/// .await?;
+///
+/// daemon.run().await?;
+/// ```
+pub struct DaemonBuilder {
+ settings: Settings, // moved into the daemon state by build()
+ store: Option<SqliteStore>, // required: build() fails while this is None
+ history_db: Option<HistoryDatabase>, // required: build() fails while this is None
+ components: Vec<Box<dyn Component>>, // in registration order
+}
+
+impl DaemonBuilder {
+    /// Start a builder from `settings`, with no store, no history database
+    /// and no components registered yet.
+    pub fn new(settings: Settings) -> Self {
+        let components = Vec::new();
+        Self {
+            settings,
+            store: None,
+            history_db: None,
+            components,
+        }
+    }
+
+    /// Set the record store (required before `build()`).
+    pub fn store(self, store: SqliteStore) -> Self {
+        Self {
+            store: Some(store),
+            ..self
+        }
+    }
+
+    /// Set the history database (required before `build()`).
+    pub fn history_db(self, db: HistoryDatabase) -> Self {
+        Self {
+            history_db: Some(db),
+            ..self
+        }
+    }
+
+    /// Register a component.
+    ///
+    /// Components are started, and receive events, in registration order.
+    pub fn component(mut self, component: impl Component + 'static) -> Self {
+        let boxed: Box<dyn Component> = Box::new(component);
+        self.components.push(boxed);
+        self
+    }
+
+    /// Build the daemon.
+    ///
+    /// Loads the encryption key, creates the event bus and assembles the
+    /// shared daemon state.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the store or the history database was never set, or if the
+    /// encryption key cannot be loaded.
+    pub async fn build(self) -> Result<Daemon> {
+        let DaemonBuilder {
+            settings,
+            store,
+            history_db,
+            components,
+        } = self;
+
+        let Some(store) = store else {
+            return Err(eyre::eyre!("store is required"));
+        };
+        let Some(history_db) = history_db else {
+            return Err(eyre::eyre!("history_db is required"));
+        };
+
+        // Load encryption key
+        let encryption_key: [u8; 32] = encryption::load_key(&settings)
+            .context("could not load encryption key")?
+            .into();
+
+        // Create the event bus; the initial receiver is dropped right away and
+        // real receivers are created later through `subscribe()`.
+        let (event_tx, _) = broadcast::channel(64);
+
+        // The handle is just a reference-counted pointer to the shared state.
+        let handle = DaemonHandle {
+            state: Arc::new(DaemonState {
+                event_tx,
+                settings: RwLock::new(settings),
+                encryption_key,
+                history_db,
+                store,
+            }),
+        };
+
+        Ok(Daemon { components, handle })
+    }
+}
diff --git a/crates/turtle/src/atuin_daemon/events.rs b/crates/turtle/src/atuin_daemon/events.rs
new file mode 100644
index 00000000..9a398925
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/events.rs
@@ -0,0 +1,74 @@
+//! Daemon events.
+//!
+//! Events are the primary communication mechanism within the daemon.
+//! Components emit events to notify others of state changes, and handle
+//! events to react to changes elsewhere in the system.
+//!
+//! External processes (like CLI commands) can also inject events via the
+//! Control gRPC service.
+
+use crate::atuin_client::history::{History, HistoryId};
+use crate::atuin_common::record::RecordId;
+
+/// Events that flow through the daemon's event bus.
+///
+/// Events are broadcast to all components. Each component decides which
+/// events it cares about in its `handle_event` implementation.
+#[derive(Debug, Clone)]
+pub enum DaemonEvent {
+ // ---- History lifecycle ----
+ /// A command has started running.
+ HistoryStarted(History),
+
+ /// A command has finished running.
+ HistoryEnded(History),
+
+ // ---- Sync ----
+ /// Records were downloaded during a sync; carries the IDs of those records.
+ ///
+ /// The search component uses this to update its index with new history.
+ RecordsAdded(Vec<RecordId>),
+
+ /// Sync completed successfully.
+ SyncCompleted {
+ /// Number of records uploaded.
+ uploaded: usize,
+ /// Number of records downloaded.
+ downloaded: usize,
+ },
+
+ /// Sync failed.
+ SyncFailed {
+ /// Error message describing what went wrong.
+ error: String,
+ },
+
+ /// Request an immediate sync (external trigger).
+ ForceSync,
+
+ // ---- External commands ----
+ /// History was pruned - search index needs a full rebuild.
+ ///
+ /// Emitted when the user runs `atuin history prune` or similar.
+ HistoryPruned,
+
+ /// History was rebuilt - search index needs a full rebuild.
+ ///
+ /// Emitted when the user runs `atuin store rebuild history` or similar.
+ HistoryRebuilt,
+
+ /// Specific history items were deleted.
+ ///
+ /// The search component should remove these from its index.
+ HistoryDeleted {
+ /// IDs of the deleted history entries.
+ ids: Vec<HistoryId>,
+ },
+
+ /// Settings have changed; components can re-read them through the daemon handle if needed.
+ SettingsReloaded,
+
+ // ---- Lifecycle ----
+ /// Request graceful shutdown of the daemon; the daemon's event loop exits when it receives this.
+ ShutdownRequested,
+}
diff --git a/crates/turtle/src/atuin_daemon/history/mod.rs b/crates/turtle/src/atuin_daemon/history/mod.rs
new file mode 100644
index 00000000..b71853df
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/history/mod.rs
@@ -0,0 +1,6 @@
+//! History module for the daemon gRPC history service.
+//!
+//! This module contains the proto-generated types for the history gRPC service.
+
+// Include the generated proto code
+tonic::include_proto!("history");
diff --git a/crates/turtle/src/atuin_daemon/mod.rs b/crates/turtle/src/atuin_daemon/mod.rs
new file mode 100644
index 00000000..b05eb95c
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/mod.rs
@@ -0,0 +1,128 @@
+use crate::atuin_client::database::Sqlite as HistoryDatabase;
+use crate::atuin_client::record::sqlite_store::SqliteStore;
+use crate::atuin_client::settings::{Settings, watcher::global_settings_watcher};
+use eyre::Result;
+
+pub mod client;
+pub mod components;
+pub mod control;
+pub mod daemon;
+pub mod events;
+pub mod history;
+pub mod search;
+pub mod semantic;
+pub mod server;
+
+// Re-export core daemon types for convenience
+pub use daemon::{Component, Daemon, DaemonBuilder, DaemonHandle};
+pub use events::DaemonEvent;
+
+// Re-export components
+pub use components::{HistoryComponent, SearchComponent, SemanticComponent, SyncComponent};
+
+// Re-export client helpers
+pub use client::{ControlClient, SemanticClient, emit_event, emit_event_with_settings};
+
+/// Boot the daemon using the new component-based architecture.
+///
+/// This creates a daemon with the standard components (history, search, semantic, sync),
+/// starts the gRPC server with their services, runs the event loop, and stops the components on shutdown.
+pub async fn boot(
+ settings: Settings,
+ store: SqliteStore,
+ history_db: HistoryDatabase,
+) -> Result<()> {
+ // Create the components
+ let history_component = HistoryComponent::new();
+ let search_component = SearchComponent::new();
+ let semantic_component = SemanticComponent::new();
+ let sync_component = SyncComponent::new(); // no gRPC service is taken from this component below
+
+ // Get the gRPC services before moving components into the daemon
+ // (The services share state with the components via Arc)
+ let history_service = history_component.grpc_service();
+ let search_service = search_component.grpc_service();
+ let semantic_service = semantic_component.grpc_service();
+
+ // Build the daemon; the components are registered (and later started) in this order
+ let mut daemon = Daemon::builder(settings.clone())
+ .store(store)
+ .history_db(history_db)
+ .component(history_component)
+ .component(search_component)
+ .component(semantic_component)
+ .component(sync_component)
+ .build()
+ .await?;
+
+ // Get a handle for the control service, the watcher and signal tasks, and the gRPC server
+ let handle = daemon.handle();
+
+ // Create the control service
+ let control_service = control::ControlService::new(handle.clone());
+
+ // Start all components first (so gRPC services can work)
+ daemon.start_components().await?;
+
+ // Spawn config file watcher to reload settings on changes
+ if let Ok(watcher) = global_settings_watcher() {
+ let mut settings_rx = watcher.subscribe();
+ let watcher_handle = handle.clone();
+ tokio::spawn(async move {
+ tracing::info!("config file watcher started");
+ while settings_rx.changed().await.is_ok() {
+ // Use the already-loaded settings from the watcher
+ // (avoids parsing the config file twice)
+ let new_settings = (*settings_rx.borrow()).clone(); // copy out so the borrow() guard is released before awaiting
+ watcher_handle.apply_settings((*new_settings).clone()).await;
+ }
+ tracing::debug!("config file watcher stopped");
+ });
+ } else { // NOTE(review): the watcher's error value is discarded; only the generic warning below is logged
+ tracing::warn!(
+ "failed to start config file watcher; settings changes will require daemon restart"
+ );
+ }
+
+ // Spawn signal handler to emit ShutdownRequested on Ctrl+C/SIGTERM
+ let signal_handle = handle.clone();
+ tokio::spawn(async move {
+ shutdown_signal().await;
+ tracing::info!("received shutdown signal");
+ signal_handle.shutdown();
+ });
+
+ // Start the gRPC server; this call has to return before the event loop below can run (presumably it spawns the server task - confirm in server.rs)
+ server::run_grpc_server(
+ settings,
+ history_service,
+ search_service,
+ semantic_service,
+ control_service.into_server(),
+ handle,
+ )
+ .await?;
+
+ // Run the daemon event loop until shutdown is requested or the event bus closes
+ daemon.run_event_loop().await?;
+
+ // Stop all components on shutdown
+ daemon.stop_components().await;
+
+ tracing::info!("daemon shut down complete");
+ Ok(())
+}
+
+/// Resolve once the process receives SIGTERM or SIGINT (Ctrl+C).
+///
+/// # Panics
+///
+/// Panics if either signal handler cannot be registered.
+#[cfg(unix)]
+async fn shutdown_signal() {
+    use tokio::signal::unix::{SignalKind, signal};
+
+    let mut sigterm =
+        signal(SignalKind::terminate()).expect("failed to register sigterm handler");
+    let mut sigint =
+        signal(SignalKind::interrupt()).expect("failed to register sigint handler");
+
+    // Whichever signal arrives first completes the wait.
+    tokio::select! {
+        _ = sigterm.recv() => {},
+        _ = sigint.recv() => {},
+    }
+}
diff --git a/crates/turtle/src/atuin_daemon/search/index.rs b/crates/turtle/src/atuin_daemon/search/index.rs
new file mode 100644
index 00000000..df627e1b
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/search/index.rs
@@ -0,0 +1,684 @@
+//! Search index with frecency-based ranking.
+//!
+//! This module provides a deduplicated search index where each unique command
+//! is stored once, with metadata about all its invocations. This enables:
+//!
+//! - Efficient fuzzy matching (fewer items to match)
+//! - Frecency-based ranking (frequency + recency)
+//! - Dynamic filtering by directory, host, session, etc.
+
+use std::{
+ collections::{HashMap, HashSet},
+ sync::Arc,
+};
+
+use crate::atuin_client::settings::Search;
+use crate::{
+ atuin_client::history::{History, is_known_agent},
+ atuin_daemon::components::search::with_trailing_slash,
+};
+use atuin_nucleo::{Injector, Nucleo, pattern};
+use dashmap::DashMap;
+use lasso::{Spur, ThreadedRodeo};
+use time::OffsetDateTime;
+use tokio::sync::RwLock;
+use tracing::{Level, instrument};
+use uuid::Uuid;
+
+/// Parse a UUID string into its 16 raw bytes.
+/// Yields `None` when the string is not a valid UUID.
+fn parse_uuid_bytes(s: &str) -> Option<[u8; 16]> {
+    match Uuid::parse_str(s) {
+        Ok(uuid) => Some(*uuid.as_bytes()),
+        Err(_) => None,
+    }
+}
+
+/// Render 16 raw bytes as a UUID string.
+fn format_uuid_bytes(bytes: &[u8; 16]) -> String {
+    let uuid = Uuid::from_bytes(*bytes);
+    uuid.to_string()
+}
+
+/// Pre-computed frecency inputs (use count and last-use time) for one command.
+#[derive(Debug, Clone, Default)]
+pub struct FrecencyData {
+ /// Total number of times this command was used.
+ pub count: u32,
+ /// Most recent usage timestamp (unix seconds); stays at its default of 0 until a later timestamp is recorded.
+ pub last_used: i64,
+}
+
+impl FrecencyData {
+    /// Record one more use of this command at `timestamp` (unix seconds).
+    ///
+    /// The use count always increases; `last_used` only moves forward.
+    pub fn record_use(&mut self, timestamp: i64) {
+        self.count += 1;
+        self.last_used = self.last_used.max(timestamp);
+    }
+
+    /// Compute the frecency score from use count and recency.
+    ///
+    /// The score is `recency_score * recency_mul + frequency_score * frequency_mul`,
+    /// rounded and converted to `u32`. A command that was never used scores 0.
+    ///
+    /// - `now`: current time in unix seconds; an age below zero is treated as zero.
+    /// - `recency_mul`: weight of the recency score.
+    /// - `frequency_mul`: weight of the frequency score.
+    ///
+    /// A multiplier of 0.0 disables that component, 1.0 leaves it unchanged and
+    /// 2.0 doubles its weight.
+    #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")]
+    pub fn compute(&self, now: i64, recency_mul: f64, frequency_mul: f64) -> u32 {
+        // (inclusive upper bound of the age bucket in whole hours, recency score)
+        // - under an hour: 100
+        // - up to 6 hours: 90
+        // - up to a day: 70
+        // - up to 3 days: 50
+        // - up to a week: 30
+        // - up to 30 days: 15
+        const RECENCY_BUCKETS: [(u64, f64); 6] = [
+            (0, 100.0),
+            (6, 90.0),
+            (24, 70.0),
+            (72, 50.0),
+            (168, 30.0),
+            (720, 15.0),
+        ];
+        // Score for anything older than the last bucket.
+        const STALE_RECENCY_SCORE: f64 = 5.0;
+
+        if self.count == 0 {
+            return 0;
+        }
+
+        // Age in whole hours; timestamps in the future count as "just now".
+        let age_hours = ((now - self.last_used).max(0) as u64) / 3600;
+
+        let recency_score: f64 = RECENCY_BUCKETS
+            .iter()
+            .find(|(upper_hours, _)| age_hours <= *upper_hours)
+            .map_or(STALE_RECENCY_SCORE, |(_, score)| *score);
+
+        // Logarithmic frequency boost with diminishing returns, capped at 100.
+        let frequency_score = (f64::from(self.count).ln() * 20.0).min(100.0);
+
+        let weighted = (recency_score * recency_mul) + (frequency_score * frequency_mul);
+        weighted.round() as u32
+    }
+}
+
+/// Aggregated data for one unique command, covering all of its recorded invocations.
+pub struct CommandData {
+ /// History ID of the most recent invocation (16-byte UUID).
+ most_recent_id: [u8; 16],
+ /// Timestamp of the most recent invocation (unix seconds).
+ most_recent_timestamp: i64,
+ /// Pre-computed global frecency.
+ pub global_frecency: FrecencyData,
+
+ // Pre-computed indexes for O(1) filter lookups
+ // Using HashSet instead of DashSet since CommandData lives inside DashMap (already synchronized)
+ /// All directories where this command has been run (interned keys of `with_trailing_slash(cwd)`).
+ directories: HashSet<Spur>,
+ /// All hostnames where this command has been run (interned keys).
+ hosts: HashSet<Spur>,
+ /// All sessions where this command has been run (as 16-byte UUIDs).
+ sessions: HashSet<[u8; 16]>,
+}
+
+impl CommandData {
+    /// Build the data for a command from its first history entry.
+    ///
+    /// Returns `None` if the entry's history ID or session is not a valid
+    /// UUID; nothing is interned in that case.
+    pub fn new(history: &History, interner: &ThreadedRodeo) -> Option<Self> {
+        // Validate both UUIDs before touching the interner.
+        let most_recent_id = parse_uuid_bytes(&history.id.0)?;
+        let session = parse_uuid_bytes(&history.session)?;
+        let most_recent_timestamp = history.timestamp.unix_timestamp();
+
+        let directories =
+            HashSet::from([interner.get_or_intern(with_trailing_slash(&history.cwd))]);
+        let hosts = HashSet::from([interner.get_or_intern(&history.hostname)]);
+        let sessions = HashSet::from([session]);
+
+        let mut global_frecency = FrecencyData::default();
+        global_frecency.record_use(most_recent_timestamp);
+
+        Some(Self {
+            most_recent_id,
+            most_recent_timestamp,
+            global_frecency,
+            directories,
+            hosts,
+            sessions,
+        })
+    }
+
+    /// Fold another invocation of this command into the data.
+    ///
+    /// Returns `false`, leaving the data untouched, if the entry's history ID
+    /// or session is not a valid UUID; returns `true` otherwise.
+    pub fn add_invocation(&mut self, history: &History, interner: &ThreadedRodeo) -> bool {
+        let (Some(history_id), Some(session)) = (
+            parse_uuid_bytes(&history.id.0),
+            parse_uuid_bytes(&history.session),
+        ) else {
+            return false;
+        };
+        let timestamp = history.timestamp.unix_timestamp();
+
+        // Global frecency counts every invocation.
+        self.global_frecency.record_use(timestamp);
+
+        // Keep the filter indexes in sync.
+        self.directories
+            .insert(interner.get_or_intern(with_trailing_slash(&history.cwd)));
+        self.hosts.insert(interner.get_or_intern(&history.hostname));
+        self.sessions.insert(session);
+
+        // Only a strictly newer invocation replaces the "most recent" entry.
+        let is_newer = timestamp > self.most_recent_timestamp;
+        if is_newer {
+            self.most_recent_timestamp = timestamp;
+            self.most_recent_id = history_id;
+        }
+
+        true
+    }
+
+    /// History ID of the most recent invocation, formatted as a UUID string.
+    pub fn most_recent_id(&self) -> String {
+        let id = &self.most_recent_id;
+        format_uuid_bytes(id)
+    }
+
+    /// Whether this command was run in exactly the directory `dir`.
+    ///
+    /// `dir` is looked up in the interner as given, so it only matches if it
+    /// is spelled the same way the directory was stored.
+    pub fn has_invocation_in_dir(&self, dir: &str, interner: &ThreadedRodeo) -> bool {
+        match interner.get(dir) {
+            Some(spur) => self.directories.contains(&spur),
+            None => false,
+        }
+    }
+
+    /// Whether this command was run in any directory starting with `prefix`
+    /// (workspace/git root). Checks every directory recorded for the command.
+    pub fn has_invocation_in_workspace(&self, prefix: &str, interner: &ThreadedRodeo) -> bool {
+        for spur in &self.directories {
+            if interner.resolve(spur).starts_with(prefix) {
+                return true;
+            }
+        }
+        false
+    }
+
+    /// Whether this command was run on the host `hostname`.
+    pub fn has_invocation_on_host(&self, hostname: &str, interner: &ThreadedRodeo) -> bool {
+        match interner.get(hostname) {
+            Some(spur) => self.hosts.contains(&spur),
+            None => false,
+        }
+    }
+
+    /// Whether this command was run in the session `session`.
+    /// A string that is not a valid UUID never matches.
+    pub fn has_invocation_in_session(&self, session: &str) -> bool {
+        match parse_uuid_bytes(session) {
+            Some(bytes) => self.sessions.contains(&bytes),
+            None => false,
+        }
+    }
+}
+
+/// Filter mode for search queries; each filtering variant carries the value to filter by.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum IndexFilterMode {
+ /// No filtering - search all commands.
+ Global,
+ /// Filter to commands run in a specific directory.
+ Directory(String),
+ /// Filter to commands run in a workspace (directory prefix; presumably a git root - confirm against callers).
+ Workspace(String),
+ /// Filter to commands run on a specific host.
+ Host(String),
+ /// Filter to commands run in a specific session.
+ Session(String),
+}
+
+/// Context for search queries; every field is optional and defaults to `None`.
+#[derive(Debug, Clone, Default)]
+pub struct QueryContext {
+ pub cwd: Option<String>, // presumably the caller's working directory - confirm against callers
+ pub git_root: Option<String>, // presumably the enclosing git root, if any - confirm against callers
+ pub hostname: Option<String>, // presumably the caller's hostname - confirm against callers
+ pub session_id: Option<String>, // presumably the caller's session ID - confirm against callers
+}
+
+/// Shareable frecency map: command -> frecency score.
+/// Wrapped in Arc for zero-copy sharing with scorer callbacks.
+type FrecencyMap = Arc<HashMap<Arc<str>, u32>>;
+
+/// A deduplicated search index with frecency-based ranking.
+///
+/// Commands are stored by their text, with metadata about all invocations.
+/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback.
+///
+/// Global frecency is precomputed by a background task and used for scoring.
+/// If frecency data is not available, search still works but without frecency ranking;
+/// although this should never happen due to precomputing the frecency map.
+pub struct SearchIndex {
+ /// Map from command text to command data.
+ /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer.
+ /// Keys are Arc<str> to enable zero-copy sharing with frecency_map.
+ commands: Arc<DashMap<Arc<str>, CommandData>>,
+ /// Nucleo fuzzy matcher - items are command strings.
+ nucleo: RwLock<Nucleo<String>>,
+ /// Injector for adding new commands to Nucleo (obtained from `nucleo` in `new`).
+ injector: Injector<String>,
+ /// Precomputed global frecency map; `None` until it has been computed. Updated by background task.
+ frecency_map: RwLock<Option<FrecencyMap>>,
+ /// String interner for deduplicating cwd, hostname, and directory paths.
+ interner: Arc<ThreadedRodeo>,
+}
+
+impl SearchIndex {
+    /// Construct an index that contains no commands.
+    ///
+    /// The Nucleo matcher is configured with a single match column (the command
+    /// text) and a no-op notify callback. The frecency map starts out as `None`
+    /// and stays that way until `rebuild_frecency` is called.
+    pub fn new() -> Self {
+        let matcher = Nucleo::<String>::new(
+            atuin_nucleo::Config::DEFAULT,
+            Arc::new(|| {}),
+            None,
+            1, // single column: the command text
+        );
+
+        Self {
+            // Take the injector before the matcher is moved behind the lock.
+            injector: matcher.injector(),
+            nucleo: RwLock::new(matcher),
+            commands: Arc::new(DashMap::new()),
+            frecency_map: RwLock::new(None),
+            interner: Arc::new(ThreadedRodeo::new()),
+        }
+    }
+
+    /// Add a history entry to the index.
+    ///
+    /// Entries whose author is a known agent are ignored.
+    ///
+    /// If the command already exists, its invocation data is updated. If it is a
+    /// new command, it is added to both the map and Nucleo; entries for which
+    /// `CommandData::new` returns `None` (invalid UUIDs) are skipped.
+    ///
+    /// The insert path goes through the map's entry API so that two callers
+    /// racing on the same new command cannot overwrite each other's data or
+    /// push the command into Nucleo twice.
+    pub fn add_history(&self, history: &History) {
+        if is_known_agent(&history.author) {
+            return;
+        }
+
+        let command = history.command.as_str();
+
+        // Fast path: known command. DashMap with Arc<str> keys can be looked up
+        // with &str via the Borrow trait, so no allocation is needed here.
+        if let Some(mut existing) = self.commands.get_mut(command) {
+            existing.add_invocation(history, &self.interner);
+            return;
+        }
+
+        let Some(data) = CommandData::new(history, &self.interner) else {
+            return; // Invalid UUIDs, skip this entry
+        };
+
+        // Slow path: decide "insert or update" atomically under the shard lock.
+        let command_arc: Arc<str> = command.into();
+        match self.commands.entry(command_arc) {
+            dashmap::mapref::entry::Entry::Occupied(mut occupied) => {
+                // Another caller inserted the command since the lookup above.
+                occupied.get_mut().add_invocation(history, &self.interner);
+            }
+            dashmap::mapref::entry::Entry::Vacant(vacant) => {
+                vacant.insert(data);
+                // Nucleo still needs String (unavoidable copy for fuzzy matching)
+                self.injector.push(command.to_owned(), |cmd, cols| {
+                    cols[0] = cmd.clone().into();
+                });
+            }
+        }
+        // Note: frecency_map is rebuilt by background task, not invalidated here
+    }
+
+    /// Index every entry of `histories`, in slice order, via `add_history`.
+    pub fn add_histories(&self, histories: &[History]) {
+        histories.iter().for_each(|entry| self.add_history(entry));
+    }
+
+ /// Get the number of unique commands in the index.
+ ///
+ /// This is the size of the `commands` map: one entry per distinct command
+ /// text, not one per recorded invocation.
+ pub fn command_count(&self) -> usize {
+ self.commands.len()
+ }
+
+    /// Number of items currently visible in Nucleo's snapshot.
+    ///
+    /// Expected to equal `command_count`, since each new command is pushed to
+    /// Nucleo exactly when it is inserted into the map.
+    pub async fn nucleo_item_count(&self) -> u32 {
+        let matcher = self.nucleo.read().await;
+        matcher.snapshot().item_count()
+    }
+
+    /// Search for commands matching a query.
+    ///
+    /// Returns a list of history IDs (most recent invocation per command), at
+    /// most `limit` entries, in the order produced by the matcher.
+    /// Uses precomputed global frecency for scoring if available.
+    ///
+    /// `filter_mode` restricts the candidate commands (see `build_filter`).
+    /// `_context` is accepted for interface compatibility but is not read.
+    ///
+    /// The filter and scorer are prepared *before* the exclusive matcher lock is
+    /// taken, so the lock is not held while iterating the whole command map or
+    /// while waiting on the frecency lock.
+    #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))]
+    pub async fn search(
+        &self,
+        query: &str,
+        filter_mode: IndexFilterMode,
+        _context: &QueryContext,
+        limit: u32,
+    ) -> Vec<String> {
+        // Get precomputed frecency map (may be None if not yet computed)
+        let frecency_map = self.frecency_map.read().await.clone();
+
+        // Build filter and scorer up front; neither needs the matcher.
+        let filter = self.build_filter(&filter_mode);
+        let scorer = Self::build_scorer(frecency_map);
+
+        // The matcher is reconfigured per query, hence the write lock.
+        let mut nucleo = self.nucleo.write().await;
+        nucleo.set_filter(filter);
+        nucleo.set_scorer(scorer);
+
+        // Update pattern
+        nucleo.pattern.reparse(
+            0,
+            query,
+            pattern::CaseMatching::Smart,
+            pattern::Normalization::Smart,
+            false,
+        );
+
+        // NOTE(review): this loop runs synchronously on the async task while the
+        // write lock is held - confirm that is acceptable for large indexes.
+        tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| {
+            // Tick until complete
+            while nucleo.tick(10).running {}
+        });
+
+        // Collect results
+        let snapshot = nucleo.snapshot();
+        let matched_count = snapshot.matched_item_count().min(limit);
+
+        tracing::span!(Level::TRACE, "index_search_results").in_scope(|| {
+            snapshot
+                .matched_items(..matched_count)
+                .filter_map(|item| {
+                    let cmd = item.data;
+                    // DashMap<Arc<str>, _>::get accepts &str via Borrow trait
+                    self.commands
+                        .get(cmd.as_str())
+                        .map(|data| data.most_recent_id())
+                })
+                .collect()
+        })
+    }
+
+    /// Recompute the global frecency map from the current command set.
+    ///
+    /// Intended to be called periodically by a background task; the resulting
+    /// map is what `search` uses to score results.
+    ///
+    /// Multipliers are taken from the search settings:
+    /// - `recency_score_multiplier`: weight of the recency component
+    /// - `frequency_score_multiplier`: weight of the frequency component
+    /// - `frecency_score_multiplier`: overall multiplier applied to the final score
+    ///
+    /// Each multiplier is clamped to be non-negative before use.
+    #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")]
+    pub async fn rebuild_frecency(&self, search_settings: &Search) {
+        let now = OffsetDateTime::now_utc().unix_timestamp();
+
+        // Negative weights would give unexpected results once cast to u32,
+        // so clamp them at zero.
+        let recency_weight = search_settings.recency_score_multiplier.max(0.0);
+        let frequency_weight = search_settings.frequency_score_multiplier.max(0.0);
+        let overall_weight = search_settings.frecency_score_multiplier.max(0.0);
+
+        let scores: HashMap<Arc<str>, u32> = self
+            .commands
+            .iter()
+            .map(|entry| {
+                let raw = entry
+                    .global_frecency
+                    .compute(now, recency_weight, frequency_weight);
+                // Scale by the overall multiplier and round to u32
+                let scaled = (raw as f64 * overall_weight).round() as u32;
+                // Cloning the Arc key only bumps a reference count
+                (Arc::clone(entry.key()), scaled)
+            })
+            .collect();
+
+        *self.frecency_map.write().await = Some(Arc::new(scores));
+    }
+
+    /// Build the Nucleo filter predicate for `mode`.
+    ///
+    /// `Global` needs no filter and yields `None`. For every other mode the set
+    /// of passing command strings is computed once, up front, by scanning the
+    /// command map; the returned predicate is then a plain set lookup.
+    fn build_filter(&self, mode: &IndexFilterMode) -> Option<atuin_nucleo::Filter<String>> {
+        if *mode == IndexFilterMode::Global {
+            return None;
+        }
+
+        let interner = &self.interner;
+
+        // Owned Strings keep the short-lived filter simple (no Arc<str> lookup).
+        let allowed: HashSet<String> = self
+            .commands
+            .iter()
+            .filter(|entry| match mode {
+                IndexFilterMode::Global => unreachable!(),
+                IndexFilterMode::Directory(dir) => entry.has_invocation_in_dir(dir, interner),
+                IndexFilterMode::Workspace(prefix) => {
+                    entry.has_invocation_in_workspace(prefix, interner)
+                }
+                IndexFilterMode::Host(hostname) => {
+                    entry.has_invocation_on_host(hostname, interner)
+                }
+                IndexFilterMode::Session(session) => entry.has_invocation_in_session(session),
+            })
+            .map(|entry| entry.key().to_string())
+            .collect();
+        let allowed = Arc::new(allowed);
+
+        Some(Arc::new(move |cmd: &String| allowed.contains(cmd)))
+    }
+
+    /// Build scorer from precomputed frecency map.
+    ///
+    /// Returns None if frecency map is not available (search still works, just without frecency ranking).
+    ///
+    /// The scorer adds a command's frecency (0 for commands missing from the
+    /// map) to Nucleo's fuzzy score. The addition saturates at `u32::MAX`:
+    /// frecency values can be arbitrarily large with big multipliers, and a
+    /// plain `+` would panic in debug builds or wrap to a tiny score in release.
+    fn build_scorer(frecency_map: Option<FrecencyMap>) -> Option<atuin_nucleo::Scorer<String>> {
+        let map = frecency_map?;
+        Some(Arc::new(move |cmd: &String, fuzzy_score: u32| {
+            // HashMap<Arc<str>, _>::get accepts &str via Borrow trait
+            let frecency = map.get(cmd.as_str()).copied().unwrap_or(0);
+            fuzzy_score.saturating_add(frecency)
+        }))
+    }
+}
+
+impl Default for SearchIndex {
+    /// The default index is the empty index built by `SearchIndex::new`.
+    fn default() -> Self {
+        SearchIndex::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use time::macros::datetime;
+
+    /// Test helper: build a `History` from an import record with the given
+    /// command, working directory and timestamp.
+    fn make_history(command: &str, cwd: &str, timestamp: OffsetDateTime) -> History {
+        let imported = History::import()
+            .timestamp(timestamp)
+            .command(command)
+            .cwd(cwd)
+            .build();
+        imported.into()
+    }
+
+    #[test]
+    fn frecency_data_compute() {
+        let now = 1_000_000i64;
+
+        // Used one minute ago, default multipliers of 1.0: expect a high score.
+        let recent_score = FrecencyData {
+            count: 5,
+            last_used: now - 60,
+        }
+        .compute(now, 1.0, 1.0);
+        assert!(recent_score > 100);
+
+        // Same usage count, but last used 30 days ago: must rank below the recent one.
+        let old_score = FrecencyData {
+            count: 5,
+            last_used: now - 86400 * 30,
+        }
+        .compute(now, 1.0, 1.0);
+        assert!(old_score < recent_score);
+
+        // Heavily used command, last used a week ago: frequency keeps the score decent.
+        let frequent_old_score = FrecencyData {
+            count: 100,
+            last_used: now - 86400 * 7,
+        }
+        .compute(now, 1.0, 1.0);
+        assert!(frequent_old_score > 50);
+    }
+
+    #[test]
+    fn frecency_data_compute_with_multipliers() {
+        let now = 1_000_000_i64;
+
+        // Command used five times, most recently one minute ago.
+        let data = FrecencyData {
+            count: 5,
+            last_used: now - 60,
+        };
+        let score = |recency_mul, frequency_mul| data.compute(now, recency_mul, frequency_mul);
+
+        // Baseline: both multipliers at 1.0
+        let default_score = score(1.0, 1.0);
+
+        // Raising either weight raises the score
+        let double_recency = score(2.0, 1.0);
+        assert!(double_recency > default_score);
+        let double_frequency = score(1.0, 2.0);
+        assert!(double_frequency > default_score);
+
+        // Dropping either component lowers the score
+        let no_recency = score(0.0, 1.0);
+        assert!(no_recency < default_score);
+        let no_frequency = score(1.0, 0.0);
+        assert!(no_frequency < default_score);
+
+        // Dropping both components leaves nothing
+        let no_score = score(0.0, 0.0);
+        assert_eq!(no_score, 0);
+
+        // Fractional multipliers land strictly between their neighbours
+        let half_recency = score(0.5, 1.0);
+        assert!(half_recency < default_score);
+        assert!(half_recency > no_recency);
+
+        let boost_recency = score(1.5, 1.0);
+        assert!(boost_recency > default_score);
+        assert!(boost_recency < double_recency);
+    }
+
+    #[test]
+    fn command_data_add_invocation() {
+        let interner = ThreadedRodeo::new();
+
+        let [older_dir, newer_dir] = if cfg!(windows) {
+            ["C:\\Users\\User\\project", "C:\\Users\\User\\other"]
+        } else {
+            ["/home/user/project", "/home/user/other"]
+        };
+
+        let older = make_history("git status", older_dir, datetime!(2024-01-01 10:00 UTC));
+        let newer = make_history("git status", newer_dir, datetime!(2024-01-01 12:00 UTC));
+
+        // A freshly created entry counts exactly one invocation.
+        let mut data = CommandData::new(&older, &interner).unwrap();
+        assert_eq!(data.global_frecency.count, 1);
+        let id_before = data.most_recent_id();
+
+        data.add_invocation(&newer, &interner);
+        assert_eq!(data.global_frecency.count, 2);
+
+        // The newer timestamp must take over as the most recent invocation.
+        let id_after = data.most_recent_id();
+        assert_ne!(id_before, id_after);
+    }
+
+    #[test]
+    fn command_data_filters() {
+        let interner = ThreadedRodeo::new();
+        let on_windows = cfg!(windows);
+
+        let [first_dir, second_dir] = if on_windows {
+            ["C:\\Users\\User\\project", "C:\\Users\\User\\other"]
+        } else {
+            ["/home/user/project", "/home/user/other"]
+        };
+
+        let first = make_history("git status", first_dir, datetime!(2024-01-01 10:00 UTC));
+        let second = make_history("git status", second_dir, datetime!(2024-01-01 12:00 UTC));
+
+        let mut data = CommandData::new(&first, &interner).unwrap();
+        data.add_invocation(&second, &interner);
+
+        // Exact-directory filter: both recorded directories hit, an unrelated one misses.
+        let dir_probes = if on_windows {
+            [
+                "C:\\Users\\User\\project",
+                "C:\\Users\\User\\other",
+                "C:\\Users\\User\\missing",
+            ]
+        } else {
+            [
+                "/home/user/project",
+                "/home/user/other",
+                "/home/user/missing",
+            ]
+        };
+        let [dir_hit_a, dir_hit_b, dir_miss] = dir_probes.map(|path| with_trailing_slash(path));
+
+        assert!(data.has_invocation_in_dir(&dir_hit_a, &interner));
+        assert!(data.has_invocation_in_dir(&dir_hit_b, &interner));
+        assert!(!data.has_invocation_in_dir(&dir_miss, &interner));
+
+        // Workspace filter: any ancestor directory hits, an unrelated tree misses.
+        let workspace_probes = if on_windows {
+            ["C:\\Users\\User", "C:\\Users", "C:\\Users\\User\\var"]
+        } else {
+            ["/home/user", "/home", "/var"]
+        };
+        let [ws_hit_a, ws_hit_b, ws_miss] = workspace_probes.map(|path| with_trailing_slash(path));
+
+        assert!(data.has_invocation_in_workspace(&ws_hit_a, &interner));
+        assert!(data.has_invocation_in_workspace(&ws_hit_b, &interner));
+        assert!(!data.has_invocation_in_workspace(&ws_miss, &interner));
+    }
+
+    #[tokio::test]
+    async fn search_index_add_and_search() {
+        let index = SearchIndex::new();
+
+        let entries = [
+            (
+                "git status",
+                "/home/user/project",
+                datetime!(2024-01-01 10:00 UTC),
+            ),
+            (
+                "git commit -m 'test'",
+                "/home/user/project",
+                datetime!(2024-01-01 10:05 UTC),
+            ),
+            (
+                "ls -la",
+                "/home/user/other",
+                datetime!(2024-01-01 10:10 UTC),
+            ),
+        ];
+        for (command, cwd, timestamp) in entries {
+            index.add_history(&make_history(command, cwd, timestamp));
+        }
+
+        assert_eq!(index.command_count(), 3);
+
+        // Unfiltered query "git": both git commands match
+        let context = QueryContext::default();
+        let global_hits = index
+            .search("git", IndexFilterMode::Global, &context, 10)
+            .await;
+        assert_eq!(global_hits.len(), 2);
+
+        // Empty query restricted to the project directory: git status and git commit
+        let project_dir = with_trailing_slash("/home/user/project");
+        let directory_hits = index
+            .search("", IndexFilterMode::Directory(project_dir), &context, 10)
+            .await;
+        assert_eq!(directory_hits.len(), 2);
+    }
+}
diff --git a/crates/turtle/src/atuin_daemon/search/mod.rs b/crates/turtle/src/atuin_daemon/search/mod.rs
new file mode 100644
index 00000000..4d261956
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/search/mod.rs
@@ -0,0 +1,11 @@
+//! Search module for the daemon gRPC search service.
+//!
+//! This module provides fuzzy search over command history using Nucleo.
+
+mod index;
+
+// Include the generated proto code
+tonic::include_proto!("search");
+
+// Re-export the search index types
+pub use index::{IndexFilterMode, QueryContext, SearchIndex};
diff --git a/crates/turtle/src/atuin_daemon/semantic/mod.rs b/crates/turtle/src/atuin_daemon/semantic/mod.rs
new file mode 100644
index 00000000..c3511676
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/semantic/mod.rs
@@ -0,0 +1,3 @@
+//! Semantic command capture gRPC service types.
+
+tonic::include_proto!("semantic");
diff --git a/crates/turtle/src/atuin_daemon/server.rs b/crates/turtle/src/atuin_daemon/server.rs
new file mode 100644
index 00000000..23b04342
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/server.rs
@@ -0,0 +1,115 @@
+use eyre::Result;
+
+use crate::atuin_daemon::components::history::HistoryGrpcService;
+use crate::atuin_daemon::components::search::SearchGrpcService;
+use crate::atuin_daemon::components::semantic::SemanticGrpcService;
+use crate::atuin_daemon::control::{ControlService, control_server::ControlServer};
+use crate::atuin_daemon::daemon::DaemonHandle;
+use crate::atuin_daemon::history::history_server::HistoryServer;
+use crate::atuin_daemon::search::search_server::SearchServer;
+use crate::atuin_daemon::semantic::semantic_server::SemanticServer;
+
+use crate::atuin_client::settings::Settings;
+
+/// Run the gRPC server with the given services.
+///
+/// This starts the gRPC server in the background and returns immediately.
+/// The server will shut down when a ShutdownRequested event is received
+/// (or when the event channel reports an error).
+///
+/// The listening socket is either inherited from systemd (Linux only, when
+/// `settings.daemon.systemd_socket` is set) or bound at
+/// `settings.daemon.socket_path`. A socket bound here is removed again on
+/// shutdown; an inherited one is left alone.
+///
+/// # Errors
+///
+/// Returns an error if the systemd socket is missing or cannot be adopted, or
+/// if binding the Unix socket fails. Errors that occur while serving happen on
+/// the background task and are only logged.
+#[cfg(unix)]
+pub async fn run_grpc_server(
+    settings: Settings,
+    history_service: HistoryServer<HistoryGrpcService>,
+    search_service: SearchServer<SearchGrpcService>,
+    semantic_service: SemanticServer<SemanticGrpcService>,
+    control_service: ControlServer<ControlService>,
+    handle: DaemonHandle,
+) -> Result<()> {
+    use tokio::net::UnixListener;
+    use tokio_stream::wrappers::UnixListenerStream;
+
+    let socket_path = settings.daemon.socket_path.clone();
+
+    // `cleanup` records whether this function created the socket file and is
+    // therefore responsible for removing it on shutdown.
+    let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket {
+        #[cfg(target_os = "linux")]
+        {
+            use eyre::{OptionExt, WrapErr};
+            use std::os::unix::net::SocketAddr;
+            use std::path::PathBuf;
+            tracing::info!("getting systemd socket");
+            let listener = listenfd::ListenFd::from_env()
+                .take_unix_listener(0)?
+                .ok_or_eyre("missing systemd socket")?;
+            listener.set_nonblocking(true)?;
+            // Best effort: the path is only used for diagnostics below.
+            let actual_path: Result<PathBuf, eyre::Report> = listener
+                .local_addr()
+                .context("getting systemd socket's path")
+                .and_then(|addr: SocketAddr| {
+                    addr.as_pathname()
+                        .ok_or_eyre("systemd socket missing path")
+                        .map(|path: &std::path::Path| path.to_owned())
+                });
+            match actual_path {
+                Ok(actual_path) => {
+                    tracing::info!("listening on systemd socket: {actual_path:?}");
+                    if actual_path != std::path::Path::new(&socket_path) {
+                        tracing::warn!(
+                            "systemd socket is not at configured client path: {socket_path:?}"
+                        );
+                    }
+                }
+                Err(err) => {
+                    tracing::warn!(
+                        "could not detect systemd socket path, ensure that it's at the configured path: {socket_path:?}, error: {err:?}"
+                    );
+                }
+            }
+            (UnixListener::from_std(listener)?, false)
+        }
+        // On other Unix targets the block above is compiled out. Without this
+        // arm the branch would evaluate to `()` and fail to type-check against
+        // the `else` branch. The condition is constant-false there.
+        #[cfg(not(target_os = "linux"))]
+        unreachable!("systemd socket activation is only available on Linux")
+    } else {
+        tracing::info!("listening on unix socket {socket_path:?}");
+        (UnixListener::bind(socket_path.clone())?, true)
+    };
+
+    let uds_stream = UnixListenerStream::new(uds);
+
+    // Create shutdown signal from daemon handle
+    let shutdown_signal = async move {
+        let mut rx = handle.subscribe();
+        loop {
+            use crate::atuin_daemon::DaemonEvent;
+
+            match rx.recv().await {
+                Ok(DaemonEvent::ShutdownRequested) => break,
+                Ok(_) => continue,
+                Err(_) => break, // Channel closed
+            }
+        }
+        if cleanup {
+            eprintln!("Removing socket...");
+            // A socket file that is already gone is not worth reporting.
+            if let Err(e) = std::fs::remove_file(&socket_path)
+                && e.kind() != std::io::ErrorKind::NotFound
+            {
+                eprintln!("failed to remove socket: {e}");
+            }
+        }
+        eprintln!("Shutting down gRPC server...");
+    };
+
+    // Spawn the server in the background
+    tokio::spawn(async move {
+        use tonic::transport::Server;
+
+        if let Err(e) = Server::builder()
+            .add_service(history_service)
+            .add_service(search_service)
+            .add_service(semantic_service)
+            .add_service(control_service)
+            .serve_with_incoming_shutdown(uds_stream, shutdown_signal)
+            .await
+        {
+            tracing::error!("gRPC server error: {e}");
+        }
+    });
+
+    Ok(())
+}