diff options
| author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-06-11 00:54:30 +0200 |
|---|---|---|
| committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-06-11 00:54:30 +0200 |
| commit | 5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch) | |
| tree | c64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/turtle/src/atuin_daemon | |
| parent | chore: Somewhat simplify sync code (diff) | |
| download | atuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip | |
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show
dead code correctly.
Diffstat (limited to 'crates/turtle/src/atuin_daemon')
| -rw-r--r-- | crates/turtle/src/atuin_daemon/client.rs | 418 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/history.rs | 327 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/mod.rs | 25 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/search.rs | 413 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/semantic.rs | 903 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/sync.rs | 279 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/control/mod.rs | 12 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/control/service.rs | 71 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/daemon.rs | 458 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/events.rs | 74 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/history/mod.rs | 6 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/mod.rs | 128 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/search/index.rs | 684 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/search/mod.rs | 11 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/semantic/mod.rs | 3 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/server.rs | 115 |
16 files changed, 3927 insertions, 0 deletions
diff --git a/crates/turtle/src/atuin_daemon/client.rs b/crates/turtle/src/atuin_daemon/client.rs new file mode 100644 index 00000000..45ef19e9 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/client.rs @@ -0,0 +1,418 @@ +use crate::atuin_client::database::Context; +use crate::atuin_client::settings::{FilterMode, Settings}; +use eyre::{Context as EyreContext, Result}; +use tonic::Code; +use tonic::transport::{Channel, Endpoint, Uri}; +use tower::service_fn; + +use hyper_util::rt::TokioIo; + +#[cfg(unix)] +use tokio::net::UnixStream; + +use crate::atuin_client::history::History; +use tracing::{Level, instrument, span}; + +use crate::atuin_daemon::control::HistoryRebuiltEvent; +use crate::atuin_daemon::control::{ + ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest, + SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient, +}; +use crate::atuin_daemon::events::DaemonEvent; +use crate::atuin_daemon::history::{ + EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest, + StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest, + history_client::HistoryClient as HistoryServiceClient, +}; +use crate::atuin_daemon::search::{ + FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse, + search_client::SearchClient as SearchServiceClient, +}; +use crate::atuin_daemon::semantic::{ + CommandCapture, CommandOutputReply, CommandOutputRequest, OutputRange, RecordCommandsReply, + semantic_client::SemanticClient as SemanticServiceClient, +}; + +pub struct HistoryClient { + client: HistoryServiceClient<Channel>, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum DaemonClientErrorKind { + Connect, + Unavailable, + Unimplemented, + Other, +} + +#[must_use] +pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind { + for cause in error.chain() { + if cause.downcast_ref::<tonic::transport::Error>().is_some() { + return DaemonClientErrorKind::Connect; + } + + if let Some(status) = cause.downcast_ref::<tonic::Status>() { + return match status.code() { + Code::Unavailable => DaemonClientErrorKind::Unavailable, + Code::Unimplemented => DaemonClientErrorKind::Unimplemented, + _ => DaemonClientErrorKind::Other, + }; + } + } + + DaemonClientErrorKind::Other +} + +// Wrap the grpc client +impl HistoryClient { + #[cfg(unix)] + pub async fn new(path: String) -> Result<Self> { + use eyre::Context; + + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = HistoryServiceClient::new(channel); + + Ok(HistoryClient { client }) + } + + pub async fn start_history(&mut self, h: History) -> Result<StartHistoryReply> { + let req = StartHistoryRequest { + command: h.command, + cwd: h.cwd, + hostname: h.hostname, + session: h.session, + timestamp: h.timestamp.unix_timestamp_nanos() as u64, + author: h.author, + intent: h.intent.unwrap_or_default(), + }; + + Ok(self.client.start_history(req).await?.into_inner()) + } + + pub async fn end_history( + &mut self, + id: String, + duration: u64, + exit: i64, + ) -> Result<EndHistoryReply> { + let req = EndHistoryRequest { id, duration, exit }; + + Ok(self.client.end_history(req).await?.into_inner()) + } + + pub async fn status(&mut self) -> Result<StatusReply> { + Ok(self.client.status(StatusRequest {}).await?.into_inner()) + } + + pub async fn tail_history(&mut self) -> Result<tonic::Streaming<TailHistoryReply>> { + Ok(self + .client + .tail_history(TailHistoryRequest {}) + .await? + .into_inner()) + } + + pub async fn shutdown(&mut self) -> Result<bool> { + let resp = self.client.shutdown(ShutdownRequest {}).await?.into_inner(); + Ok(resp.accepted) + } +} + +pub struct SearchClient { + client: SearchServiceClient<Channel>, +} + +impl SearchClient { + #[cfg(unix)] + pub async fn new(path: String) -> Result<Self> { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = SearchServiceClient::new(channel); + + Ok(SearchClient { client }) + } + + #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))] + pub async fn search( + &mut self, + query: String, + query_id: u64, + filter_mode: FilterMode, + context: Option<Context>, + ) -> Result<tonic::Streaming<SearchResponse>> { + let request = SearchRequest { + query, + query_id, + filter_mode: RpcFilterMode::from(filter_mode).into(), + context: context.map(RpcSearchContext::from), + }; + let request_stream = tokio_stream::once(request); + let response = span!(Level::TRACE, "daemon_client_search.request") + .in_scope(async || self.client.search(request_stream).await) + .await?; + + Ok(response.into_inner()) + } +} + +impl From<FilterMode> for RpcFilterMode { + fn from(filter_mode: FilterMode) -> Self { + match filter_mode { + FilterMode::Global => RpcFilterMode::Global, + FilterMode::Host => RpcFilterMode::Host, + FilterMode::Session => RpcFilterMode::Session, + FilterMode::Directory => RpcFilterMode::Directory, + FilterMode::Workspace => RpcFilterMode::Workspace, + FilterMode::SessionPreload => RpcFilterMode::SessionPreload, + } + } +} + +impl From<Context> for RpcSearchContext { + fn from(context: Context) -> Self { + RpcSearchContext { + session_id: context.session, + cwd: context.cwd, + hostname: context.hostname, + host_id: context.host_id, + git_root: context + .git_root + .map(|path| path.to_string_lossy().to_string()), + } + } +} + +pub struct SemanticClient { + client: SemanticServiceClient<Channel>, +} + +impl SemanticClient { + #[cfg(unix)] + pub async fn new(path: String) -> Result<Self> { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = SemanticServiceClient::new(channel); + + Ok(SemanticClient { client }) + } + + #[cfg(unix)] + pub async fn from_settings(settings: &Settings) -> Result<Self> { + Self::new(settings.daemon.socket_path.clone()).await + } + + pub async fn record_commands( + &mut self, + captures: Vec<CommandCapture>, + ) -> Result<RecordCommandsReply> { + let stream = tokio_stream::iter(captures); + Ok(self.client.record_commands(stream).await?.into_inner()) + } + + pub async fn command_output( + &mut self, + history_id: String, + ranges: Vec<(i64, i64)>, + ) -> Result<CommandOutputReply> { + let request = CommandOutputRequest { + history_id, + ranges: ranges + .into_iter() + .map(|(start, end)| OutputRange { start, end }) + .collect(), + }; + + Ok(self.client.command_output(request).await?.into_inner()) + } +} + +// ============================================================================ +// Control Client +// ============================================================================ + +/// Client for the Control gRPC service. +/// +/// Used to inject events into a running daemon from external processes. +pub struct ControlClient { + client: ControlServiceClient<Channel>, +} + +impl ControlClient { + /// Connect to the daemon's control service. + #[cfg(unix)] + pub async fn new(path: String) -> Result<Self> { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = ControlServiceClient::new(channel); + + Ok(ControlClient { client }) + } + + /// Connect using settings. + #[cfg(unix)] + pub async fn from_settings(settings: &Settings) -> Result<Self> { + Self::new(settings.daemon.socket_path.clone()).await + } + + /// Send an event to the daemon. + pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> { + let proto_event = daemon_event_to_proto(event); + let request = SendEventRequest { + event: Some(proto_event), + }; + self.client.send_event(request).await?; + Ok(()) + } +} + +/// Convert a daemon event to its proto representation. +fn daemon_event_to_proto( + event: DaemonEvent, +) -> crate::atuin_daemon::control::send_event_request::Event { + use crate::atuin_daemon::control::send_event_request::Event; + + match event { + DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}), + DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}), + DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent { + ids: ids.into_iter().map(|id| id.0).collect(), + }), + DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}), + DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}), + DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}), + // These events are internal and not sent via the control service + DaemonEvent::HistoryStarted(_) + | DaemonEvent::HistoryEnded(_) + | DaemonEvent::RecordsAdded(_) + | DaemonEvent::SyncCompleted { .. } + | DaemonEvent::SyncFailed { .. } => { + // Use shutdown as a fallback, though this shouldn't happen + tracing::warn!("attempted to send internal event via control service"); + Event::Shutdown(ShutdownEvent {}) + } + } +} + +// ============================================================================ +// Convenience Functions +// ============================================================================ + +/// Emit an event to the daemon. +/// +/// This is a fire-and-forget helper for sending events to the daemon from +/// external processes like CLI commands. If the daemon isn't running, this +/// will silently succeed (returns Ok). +/// +/// # Example +/// +/// ```ignore +/// // After pruning history +/// emit_event(DaemonEvent::HistoryPruned).await?; +/// +/// // After deleting specific history items +/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] }).await?; +/// +/// // Request immediate sync +/// emit_event(DaemonEvent::ForceSync).await?; +/// ``` +pub async fn emit_event(event: DaemonEvent) -> Result<()> { + emit_event_with_settings(event, None).await +} + +/// Emit an event to the daemon with explicit settings. +/// +/// If settings are not provided, they will be loaded from the default location. +/// If the daemon isn't running, this will silently succeed. +pub async fn emit_event_with_settings( + event: DaemonEvent, + settings: Option<&Settings>, +) -> Result<()> { + // Load settings if not provided + let owned_settings; + let settings = match settings { + Some(s) => s, + None => { + owned_settings = Settings::new()?; + &owned_settings + } + }; + + // Try to connect - if daemon isn't running, that's fine + let mut client = match ControlClient::from_settings(settings).await { + Ok(c) => c, + Err(e) => { + tracing::debug!(?e, "daemon not running, skipping event emission"); + return Ok(()); + } + }; + + // Send the event + if let Err(e) = client.send_event(event).await { + tracing::debug!(?e, "failed to send event to daemon"); + // Don't fail - this is fire-and-forget + } + + Ok(()) +} diff --git a/crates/turtle/src/atuin_daemon/components/history.rs b/crates/turtle/src/atuin_daemon/components/history.rs new file mode 100644 index 00000000..95d34b69 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/components/history.rs @@ -0,0 +1,327 @@ +//! History component. +//! +//! Handles command history lifecycle (start/end) and provides the History gRPC service. + +use std::{pin::Pin, sync::Arc}; + +use crate::atuin_client::{ + database::Database, + history::{History, HistoryId, store::HistoryStore}, + settings::Settings, +}; +use dashmap::DashMap; +use eyre::Result; +use time::OffsetDateTime; +use tokio_stream::Stream; +use tonic::{Request, Response, Status}; +use tracing::{Level, instrument}; + +use crate::atuin_daemon::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + history::{ + EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply, + ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, + TailHistoryReply, TailHistoryRequest, + history_server::{History as HistorySvc, HistoryServer}, + }, +}; + +const DAEMON_PROTOCOL_VERSION: u32 = 1; + +/// History component - manages command history lifecycle. +/// +/// This component: +/// - Tracks currently running commands (stored in memory) +/// - Saves completed commands to the database and record store +/// - Emits history events for other components (e.g., search indexing) +/// - Provides the History gRPC service +pub struct HistoryComponent { + inner: Arc<HistoryComponentInner>, +} + +struct HistoryComponentInner { + /// Commands currently running (not yet completed). + running: DashMap<HistoryId, History>, + + /// Handle to the daemon (set during start). + handle: tokio::sync::RwLock<Option<DaemonHandle>>, + + /// History store for pushing records (set during start). + history_store: tokio::sync::RwLock<Option<HistoryStore>>, +} + +impl HistoryComponent { + /// Create a new history component. + pub fn new() -> Self { + Self { + inner: Arc::new(HistoryComponentInner { + running: DashMap::new(), + handle: tokio::sync::RwLock::new(None), + history_store: tokio::sync::RwLock::new(None), + }), + } + } + + /// Get the gRPC service for this component. + /// + /// This returns a tonic service that can be added to a gRPC server. + pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> { + HistoryServer::new(HistoryGrpcService { + inner: self.inner.clone(), + }) + } +} + +impl Default for HistoryComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for HistoryComponent { + fn name(&self) -> &'static str { + "history" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + // Create the history store + let host_id = Settings::host_id().await?; + let history_store = + HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key()); + + *self.inner.history_store.write().await = Some(history_store); + *self.inner.handle.write().await = Some(handle); + + tracing::info!("history component started"); + Ok(()) + } + + async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> { + // History component produces events but doesn't need to react to them + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + tracing::info!("history component stopped"); + Ok(()) + } +} + +/// The gRPC service implementation. +/// +/// This is a thin wrapper that delegates to the component's shared state. +pub struct HistoryGrpcService { + inner: Arc<HistoryComponentInner>, +} + +fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply { + TailHistoryReply { + kind: kind as i32, + history: Some(HistoryEntry { + timestamp: history.timestamp.unix_timestamp_nanos() as u64, + id: history.id.0, + command: history.command, + cwd: history.cwd, + session: history.session, + hostname: history.hostname, + author: history.author, + intent: history.intent.unwrap_or_default(), + exit: history.exit, + duration: history.duration, + }), + } +} + +#[tonic::async_trait] +impl HistorySvc for HistoryGrpcService { + type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>; + + #[instrument(skip_all, level = Level::INFO)] + async fn start_history( + &self, + request: Request<StartHistoryRequest>, + ) -> Result<Response<StartHistoryReply>, Status> { + let req = request.into_inner(); + + let timestamp = + OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { + Status::invalid_argument( + "failed to parse timestamp as unix time (expected nanos since epoch)", + ) + })?; + + let h: History = History::daemon() + .timestamp(timestamp) + .command(req.command) + .cwd(req.cwd) + .session(req.session) + .hostname(req.hostname) + .author(req.author) + .intent(req.intent) + .build() + .into(); + + // Emit the event + if let Some(handle) = self.inner.handle.read().await.as_ref() { + handle.emit(DaemonEvent::HistoryStarted(h.clone())); + } + + let id = h.id.clone(); + tracing::info!(id = id.to_string(), "start history"); + self.inner.running.insert(id.clone(), h); + + let reply = StartHistoryReply { + id: id.to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + Ok(Response::new(reply)) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn end_history( + &self, + request: Request<EndHistoryRequest>, + ) -> Result<Response<EndHistoryReply>, Status> { + let req = request.into_inner(); + let id = HistoryId(req.id); + + if let Some((_, mut history)) = self.inner.running.remove(&id) { + history.exit = req.exit; + history.duration = match req.duration { + 0 => i64::try_from( + (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), + ) + .expect("failed to convert calculated duration to i64"), + value => i64::try_from(value).expect("failed to get i64 duration"), + }; + + // Get the handle and store to save the history + let handle_guard = self.inner.handle.read().await; + let handle = handle_guard + .as_ref() + .ok_or_else(|| Status::internal("component not initialized"))?; + + let store_guard = self.inner.history_store.read().await; + let history_store = store_guard + .as_ref() + .ok_or_else(|| Status::internal("component not initialized"))?; + + // Save to database + handle + .history_db() + .save(&history) + .await + .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; + + tracing::info!( + id = id.0.to_string(), + duration = history.duration, + "end history" + ); + + // Push to record store + let (record_id, idx) = history_store + .push(history.clone()) + .await + .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?; + + // Emit the event + handle.emit(DaemonEvent::HistoryEnded(history)); + + let reply = EndHistoryReply { + id: record_id.0.to_string(), + idx, + version: env!("CARGO_PKG_VERSION").to_string(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + return Ok(Response::new(reply)); + } + + Err(Status::not_found(format!( + "could not find history with id: {id}" + ))) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn tail_history( + &self, + _request: Request<TailHistoryRequest>, + ) -> Result<Response<Self::TailHistoryStream>, Status> { + let handle_guard = self.inner.handle.read().await; + let handle = handle_guard + .as_ref() + .cloned() + .ok_or_else(|| Status::internal("component not initialized"))?; + + let mut rx = handle.subscribe(); + let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128); + + tokio::spawn(async move { + loop { + let event = match rx.recv().await { + Ok(event) => event, + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + let _ = tx + .send(Err(Status::resource_exhausted(format!( + "tail stream lagged behind and dropped {skipped} events" + )))) + .await; + break; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + }; + + let reply = match event { + DaemonEvent::HistoryStarted(history) => { + Some(history_to_tail_reply(HistoryEventKind::Started, history)) + } + DaemonEvent::HistoryEnded(history) => { + Some(history_to_tail_reply(HistoryEventKind::Ended, history)) + } + _ => None, + }; + + if let Some(reply) = reply + && tx.send(Ok(reply)).await.is_err() + { + break; + } + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx); + Ok(Response::new(Box::pin(stream))) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn status( + &self, + _request: Request<StatusRequest>, + ) -> Result<Response<StatusReply>, Status> { + let reply = StatusReply { + healthy: true, + version: env!("CARGO_PKG_VERSION").to_string(), + pid: std::process::id(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + Ok(Response::new(reply)) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn shutdown( + &self, + _request: Request<ShutdownRequest>, + ) -> Result<Response<ShutdownReply>, Status> { + // Use the daemon handle to request shutdown + if let Some(handle) = self.inner.handle.read().await.as_ref() { + handle.shutdown(); + } + Ok(Response::new(ShutdownReply { accepted: true })) + } +} diff --git a/crates/turtle/src/atuin_daemon/components/mod.rs b/crates/turtle/src/atuin_daemon/components/mod.rs new file mode 100644 index 00000000..447e31df --- /dev/null +++ b/crates/turtle/src/atuin_daemon/components/mod.rs @@ -0,0 +1,25 @@ +//! Daemon components. +//! +//! Components are the building blocks of the daemon. Each component handles +//! a specific domain and can: +//! +//! - Expose gRPC services +//! - React to events +//! - Spawn background tasks +//! +//! Available components: +//! +//! - [`history::HistoryComponent`]: Command history lifecycle management +//! - [`search::SearchComponent`]: Fuzzy search over history +//! - [`semantic::SemanticComponent`]: In-memory semantic command captures +//! - [`sync::SyncComponent`]: Cloud sync + +pub mod history; +pub mod search; +pub mod semantic; +pub mod sync; + +pub use history::HistoryComponent; +pub use search::SearchComponent; +pub use semantic::SemanticComponent; +pub use sync::SyncComponent; diff --git a/crates/turtle/src/atuin_daemon/components/search.rs b/crates/turtle/src/atuin_daemon/components/search.rs new file mode 100644 index 00000000..85191cff --- /dev/null +++ b/crates/turtle/src/atuin_daemon/components/search.rs @@ -0,0 +1,413 @@ +//! Search component. +//! +//! Provides fuzzy search over command history using the Nucleo search library +//! with frecency-based ranking and dynamic filtering. + +use std::{pin::Pin, sync::Arc}; + +use crate::atuin_client::database::Database; +use eyre::Result; +use tokio::sync::RwLock; +use tokio_stream::Stream; +use tonic::{Request, Response, Status, Streaming}; +use tracing::{Level, debug, info, instrument, span, trace}; +use uuid::Uuid; + +use crate::atuin_daemon::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + search::{ + FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse, + search_server::{Search as SearchSvc, SearchServer}, + }, +}; + +const PAGE_SIZE: usize = 5000; +const RESULTS_LIMIT: u32 = 200; +/// How often to rebuild the frecency map (in seconds). +const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60; + +/// Search component - provides fuzzy search over command history. +/// +/// This component: +/// - Maintains a deduplicated search index with frecency ranking +/// - Loads history from the database on startup +/// - Updates the index when history events occur +/// - Provides the Search gRPC service +pub struct SearchComponent { + index: Arc<RwLock<SearchIndex>>, + handle: tokio::sync::RwLock<Option<DaemonHandle>>, + loader_handle: Option<tokio::task::JoinHandle<()>>, + frecency_handle: Option<tokio::task::JoinHandle<()>>, +} + +impl SearchComponent { + /// Create a new search component. + pub fn new() -> Self { + Self { + index: Arc::new(RwLock::new(SearchIndex::new())), + handle: tokio::sync::RwLock::new(None), + loader_handle: None, + frecency_handle: None, + } + } + + /// Get the gRPC service for this component. + pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> { + SearchServer::new(SearchGrpcService { + index: self.index.clone(), + }) + } + + /// Rebuild the entire search index from the database. + async fn rebuild_index(&self) -> Result<()> { + let handle_guard = self.handle.read().await; + let handle = handle_guard + .as_ref() + .ok_or_else(|| eyre::eyre!("component not initialized"))?; + + info!("Rebuilding search index from database"); + + // Create a new index + let new_index = SearchIndex::new(); + + // Load all history into the new index + let db = handle.history_db().clone(); + let mut pager = db.all_paged(PAGE_SIZE, false, true); + loop { + match pager.next().await { + Ok(Some(histories)) => { + info!( + "Loading {} history entries into search index", + histories.len() + ); + new_index.add_histories(&histories); + } + Ok(None) => break, + Err(e) => { + tracing::error!("Failed to load history during rebuild: {}", e); + break; + } + } + } + + info!( + "Search index rebuild complete; {} unique commands", + new_index.command_count() + ); + + // Replace the old index with the new one + *self.index.write().await = new_index; + Ok(()) + } +} + +impl Default for SearchComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SearchComponent { + fn name(&self) -> &'static str { + "search" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + *self.handle.write().await = Some(handle.clone()); + + // Spawn background task to load history into index + let index = self.index.clone(); + let db = handle.history_db().clone(); + let handle_for_loader = handle.clone(); + + self.loader_handle = Some(tokio::spawn(async move { + info!( + "Loading history into search index; page size = {}", + PAGE_SIZE + ); + let mut pager = db.all_paged(PAGE_SIZE, false, true); + loop { + match pager.next().await { + Ok(Some(histories)) => { + info!( + "Loading {} history entries into search index", + histories.len() + ); + index.read().await.add_histories(&histories); + } + Ok(None) => { + info!( + "Initial history load complete; {} unique commands indexed", + index.read().await.command_count() + ); + // Build initial frecency map with current settings + let settings = handle_for_loader.settings().await; + index.read().await.rebuild_frecency(&settings.search).await; + info!("Initial frecency map built"); + break; + } + Err(e) => { + tracing::error!("Failed to load history: {}", e); + break; + } + } + } + })); + + // Spawn background task to periodically refresh frecency + let index_for_frecency = self.index.clone(); + let handle_for_frecency = handle.clone(); + self.frecency_handle = Some(tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs( + FRECENCY_REFRESH_INTERVAL_SECS, + )); + loop { + interval.tick().await; + trace!("Refreshing frecency map"); + let settings = handle_for_frecency.settings().await; + index_for_frecency + .read() + .await + .rebuild_frecency(&settings.search) + .await; + } + })); + + tracing::info!("search component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + match event { + DaemonEvent::RecordsAdded(records) => { + debug!( + count = records.len(), + "Processing added records for search index" + ); + + let handle_guard = self.handle.read().await; + if let Some(handle) = handle_guard.as_ref() { + let histories: Vec<_> = handle + .history_db() + .query_history( + format!( + "select * from history where id in ({})", + records + .iter() + .map(|record| record.0.to_string()) + .collect::<Vec<_>>() + .join(",") + ) + .as_str(), + ) + .await + .unwrap_or_default(); + + span!(Level::TRACE, "inject_records", count = histories.len()) + .in_scope(async || { + self.index.read().await.add_histories(&histories); + }) + .await; + } + } + DaemonEvent::HistoryStarted(history) => { + debug!(id = %history.id, command = %history.command, "History started (no index action)"); + } + DaemonEvent::HistoryEnded(history) => { + span!(Level::TRACE, "inject_history_ended") + .in_scope(async || { + self.index.read().await.add_history(history); + }) + .await; + } + DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => { + info!("History store pruned or rebuilt, rebuilding search index"); + if let Err(e) = self.rebuild_index().await { + tracing::error!("Failed to rebuild search index: {}", e); + } + } + DaemonEvent::HistoryDeleted { ids } => { + info!( + count = ids.len(), + "History deleted, rebuilding search index" + ); + // For now, just rebuild the entire index. A more efficient implementation + // would remove specific items from the index. + if let Err(e) = self.rebuild_index().await { + tracing::error!("Failed to rebuild search index: {}", e); + } + } + DaemonEvent::SettingsReloaded => { + info!("Settings reloaded, rebuilding frecency map with new multipliers"); + let handle_guard = self.handle.read().await; + if let Some(handle) = handle_guard.as_ref() { + let settings = handle.settings().await; + self.index + .read() + .await + .rebuild_frecency(&settings.search) + .await; + } + } + // Events we don't care about + DaemonEvent::SyncCompleted { .. } + | DaemonEvent::SyncFailed { .. } + | DaemonEvent::ForceSync + | DaemonEvent::ShutdownRequested => {} + } + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + if let Some(handle) = self.loader_handle.take() { + handle.abort(); + } + if let Some(handle) = self.frecency_handle.take() { + handle.abort(); + } + tracing::info!("search component stopped"); + Ok(()) + } +} + +/// The gRPC service implementation. +pub struct SearchGrpcService { + index: Arc<RwLock<SearchIndex>>, +} + +#[tonic::async_trait] +impl SearchSvc for SearchGrpcService { + type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>; + + #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")] + async fn search( + &self, + request: Request<Streaming<SearchRequest>>, + ) -> Result<Response<Self::SearchStream>, Status> { + let mut in_stream = request.into_inner(); + let index = self.index.clone(); + + // Create output channel + let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128); + + // Spawn task to handle incoming requests and send responses + tokio::spawn(async move { + while let Some(req) = in_stream.message().await.transpose() { + match req { + Ok(search_req) => { + let query = search_req.query; + let query_id = search_req.query_id; + let filter_mode: FilterMode = search_req + .filter_mode + .try_into() + .unwrap_or(FilterMode::Global); + let proto_context = search_req.context; + + debug!( + "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}", + query, + query_id, + filter_mode.as_str_name(), + proto_context + ); + + // Convert proto FilterMode + context to IndexFilterMode + let index_filter = convert_filter_mode(filter_mode, &proto_context); + + // Build QueryContext from proto context + let query_context = proto_context + .map(|ctx| QueryContext { + cwd: Some(with_trailing_slash(&ctx.cwd)), + git_root: ctx.git_root.map(|s| with_trailing_slash(&s)), + hostname: Some(ctx.hostname), + session_id: Some(ctx.session_id), + }) + .unwrap_or_default(); + + // Perform the search + let history_ids = + span!(Level::TRACE, "daemon_search_query", %query, query_id) + .in_scope(|| async { + let index = index.read().await; + index + .search(&query, index_filter, &query_context, RESULTS_LIMIT) + .await + }) + .await; + + // Convert history IDs to bytes + let ids: Vec<Vec<u8>> = history_ids + .iter() + .filter_map(|id| { + Uuid::parse_str(id) + .ok() + .map(|uuid| uuid.as_bytes().to_vec()) + }) + .collect(); + + if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() { + break; // Client disconnected + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + break; + } + } + } + }); + + // Convert receiver to stream + let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Response::new(Box::pin(out_stream))) + } +} + +/// Convert proto FilterMode and context to IndexFilterMode. +fn convert_filter_mode( + mode: FilterMode, + context: &Option<crate::atuin_daemon::search::SearchContext>, +) -> IndexFilterMode { + match (mode, context) { + (FilterMode::Global, _) => IndexFilterMode::Global, + (FilterMode::Directory, Some(ctx)) => { + IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) + } + (FilterMode::Workspace, Some(ctx)) => { + if let Some(ref git_root) = ctx.git_root { + IndexFilterMode::Workspace(with_trailing_slash(git_root)) + } else { + // Fall back to directory if no git root + IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) + } + } + (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()), + (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()), + (FilterMode::SessionPreload, Some(ctx)) => { + // SessionPreload is similar to Session - filter by session + IndexFilterMode::Session(ctx.session_id.clone()) + } + // If no context provided, fall back to global + _ => IndexFilterMode::Global, + } +} + +#[cfg(windows)] +pub fn with_trailing_slash(s: &str) -> String { + if s.ends_with('\\') { + s.to_string() + } else { + format!("{}\\", s) + } +} + +#[cfg(not(windows))] +pub fn with_trailing_slash(s: &str) -> String { + if s.ends_with('/') { + s.to_string() + } else { + format!("{}/", s) + } +} diff --git a/crates/turtle/src/atuin_daemon/components/semantic.rs b/crates/turtle/src/atuin_daemon/components/semantic.rs new file mode 100644 index 00000000..a42fd5cb --- /dev/null +++ b/crates/turtle/src/atuin_daemon/components/semantic.rs @@ -0,0 +1,903 @@ +//! Semantic command capture component. +//! +//! This is a prototype in-memory store for completed command captures emitted +//! by atuin-pty-proxy. It keeps recent captures per Atuin session and indexes +//! them by history ID for AI tool lookup. + +use std::collections::{HashMap, VecDeque}; +use std::fmt::{Display, Formatter}; +use std::sync::Arc; + +use crate::atuin_client::history::{History, HistoryId}; +use eyre::Result; +use tokio::sync::Mutex; +use tonic::{Request, Response, Status, Streaming}; +use tracing::{Level, instrument}; + +use crate::atuin_daemon::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + semantic::{ + CommandCapture, CommandOutputReply, CommandOutputRequest, OutputLine, RecordCommandsReply, + semantic_server::{Semantic as SemanticSvc, SemanticServer}, + }, +}; + +const MAX_SESSIONS: usize = 20; +const MAX_COMMANDS_PER_SESSION: usize = 128; +const MAX_BYTES_PER_SESSION: usize = 32 * 1024 * 1024; +const MAX_PENDING_HISTORIES: usize = 128; + +/// Stores completed command captures and associates them with history events. +pub struct SemanticComponent { + inner: Arc<SemanticComponentInner>, +} + +struct SemanticComponentInner { + state: Mutex<SemanticState>, +} + +#[derive(Default)] +struct SemanticState { + sessions: HashMap<SessionId, SessionCaptures>, + session_lru: VecDeque<SessionId>, + history_index: HashMap<HistoryId, CaptureRef>, + pending_histories: VecDeque<History>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct SessionId(String); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +struct CaptureId(u64); + +#[derive(Debug, Clone, PartialEq, Eq)] +struct CaptureRef { + session_id: SessionId, + capture_id: CaptureId, +} + +#[derive(Default)] +struct SessionCaptures { + next_id: u64, + records: VecDeque<StoredCapture>, + output_bytes: usize, +} + +struct StoredCapture { + id: CaptureId, + history_id: HistoryId, + output_bytes: usize, + record: SemanticCommandRecord, +} + +struct EvictedCapture { + history_id: HistoryId, + capture_id: CaptureId, +} + +#[derive(Debug, Clone)] +struct SemanticCommandRecord { + capture: CommandCapture, + history: Option<History>, +} + +impl SemanticComponent { + pub fn new() -> Self { + Self { + inner: Arc::new(SemanticComponentInner { + state: Mutex::new(SemanticState::default()), + }), + } + } + + pub fn grpc_service(&self) -> SemanticServer<SemanticGrpcService> { + SemanticServer::new(SemanticGrpcService { + inner: self.inner.clone(), + }) + } +} + +impl Default for SemanticComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SemanticComponent { + fn name(&self) -> &'static str { + "semantic" + } + + async fn start(&mut self, _handle: DaemonHandle) -> Result<()> { + tracing::info!("semantic component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + if let DaemonEvent::HistoryEnded(history) = event { + self.inner.record_history(history.clone()).await; + } + + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + let state = self.inner.state.lock().await; + tracing::info!( + sessions = state.sessions.len(), + records = state.record_count(), + indexed_histories = state.history_index.len(), + pending_histories = state.pending_histories.len(), + "semantic component stopped" + ); + Ok(()) + } +} + +impl SemanticComponentInner { + async fn record_capture(&self, capture: CommandCapture) -> bool { + let mut state = self.state.lock().await; + state.record_capture(capture) + } + + async fn record_history(&self, history: History) { + let mut state = self.state.lock().await; + state.record_history(history); + } + + async fn command_output(&self, request: &CommandOutputRequest) -> CommandOutputReply { + let mut state = self.state.lock().await; + state.command_output(request) + } +} + +impl SemanticState { + fn record_capture(&mut self, mut capture: CommandCapture) -> bool { + let Some(history_id) = history_id_from_str(capture.history_id.as_deref()) else { + tracing::debug!( + command_bytes = capture.command.len(), + prompt_bytes = capture.prompt.len(), + output_bytes = capture.output.len(), + output_truncated = capture.output_truncated, + "dropping semantic command capture without history id" + ); + return false; + }; + + let history = take_pending_history(&mut self.pending_histories, &history_id); + let Some(session_id) = capture + .session_id + .as_deref() + .and_then(|session_id| SessionId::try_from(session_id).ok()) + .or_else(|| { + history + .as_ref() + .and_then(|history| SessionId::try_from(history.session.as_str()).ok()) + }) + else { + tracing::debug!( + history_id = %history_id, + command_bytes = capture.command.len(), + prompt_bytes = capture.prompt.len(), + output_bytes = capture.output.len(), + output_truncated = capture.output_truncated, + "dropping semantic command capture without session id" + ); + return false; + }; + + capture.history_id = Some(history_id.to_string()); + capture.session_id = Some(session_id.to_string()); + if capture.output_observed_bytes == 0 { + capture.output_observed_bytes = capture.output.len() as u64; + } + + let record = SemanticCommandRecord { capture, history }; + log_record(&record, "recorded semantic command capture"); + self.push_record(session_id, history_id, record); + true + } + + fn record_history(&mut self, history: History) { + let history_id = history.id.clone(); + + if let Some(capture_ref) = self.history_index.get(&history_id).cloned() { + if let Some(stored) = self.stored_capture_mut(&capture_ref) { + stored.record.history = Some(history); + log_record( + &stored.record, + "associated semantic command capture with history", + ); + return; + } + + self.history_index.remove(&history_id); + } + + tracing::debug!( + id = %history.id, + command_bytes = history.command.len(), + "history ended before semantic capture arrived" + ); + push_pending_history(&mut self.pending_histories, history); + } + + fn command_output(&mut self, request: &CommandOutputRequest) -> CommandOutputReply { + let Some(history_id) = history_id_from_str(Some(&request.history_id)) else { + return command_output_not_found(); + }; + let Some(capture_ref) = self.history_index.get(&history_id).cloned() else { + return command_output_not_found(); + }; + + let Some(reply) = self.command_output_for_ref(&capture_ref, &request.ranges) else { + self.history_index.remove(&history_id); + return command_output_not_found(); + }; + + self.touch_session(&capture_ref.session_id); + reply + } + + fn command_output_for_ref( + &self, + capture_ref: &CaptureRef, + ranges: &[crate::atuin_daemon::semantic::OutputRange], + ) -> Option<CommandOutputReply> { + let stored = self + .sessions + .get(&capture_ref.session_id)? + .stored_capture(capture_ref.capture_id)?; + let output = &stored.record.capture.output; + let output_observed_bytes = stored + .record + .capture + .output_observed_bytes + .max(output.len() as u64); + + Some(CommandOutputReply { + found: true, + output: String::new(), + total_bytes: output.len() as u64, + total_lines: output.lines().count() as u64, + lines: select_output_ranges(output, ranges), + output_truncated: stored.record.capture.output_truncated, + output_observed_bytes, + }) + } + + fn push_record( + &mut self, + session_id: SessionId, + history_id: HistoryId, + record: SemanticCommandRecord, + ) { + self.touch_session(&session_id); + + let (capture_id, evicted) = { + let session = self.sessions.entry(session_id.clone()).or_default(); + session.push(history_id.clone(), record) + }; + + let capture_ref = CaptureRef { + session_id: session_id.clone(), + capture_id, + }; + self.history_index.insert(history_id, capture_ref); + + for evicted in evicted { + self.remove_history_index_if_matches( + &session_id, + &evicted.history_id, + evicted.capture_id, + ); + } + + self.expire_lru_sessions(); + } + + fn touch_session(&mut self, session_id: &SessionId) { + if let Some(index) = self.session_lru.iter().position(|id| id == session_id) { + self.session_lru.remove(index); + } + self.session_lru.push_back(session_id.clone()); + } + + fn expire_lru_sessions(&mut self) { + while self.session_lru.len() > MAX_SESSIONS { + let Some(session_id) = self.session_lru.pop_front() else { + break; + }; + let Some(session) = self.sessions.remove(&session_id) else { + continue; + }; + + for stored in session.records { + self.remove_history_index_if_matches(&session_id, &stored.history_id, stored.id); + } + } + } + + fn remove_history_index_if_matches( + &mut self, + session_id: &SessionId, + history_id: &HistoryId, + capture_id: CaptureId, + ) { + if self + .history_index + .get(history_id) + .is_some_and(|capture_ref| { + &capture_ref.session_id == session_id && capture_ref.capture_id == capture_id + }) + { + self.history_index.remove(history_id); + } + } + + fn stored_capture_mut(&mut self, capture_ref: &CaptureRef) -> Option<&mut StoredCapture> { + self.sessions + .get_mut(&capture_ref.session_id)? + .stored_capture_mut(capture_ref.capture_id) + } + + fn record_count(&self) -> usize { + self.sessions + .values() + .map(|session| session.records.len()) + .sum() + } +} + +impl SessionCaptures { + fn push( + &mut self, + history_id: HistoryId, + record: SemanticCommandRecord, + ) -> (CaptureId, Vec<EvictedCapture>) { + self.push_with_limits( + history_id, + record, + MAX_COMMANDS_PER_SESSION, + MAX_BYTES_PER_SESSION, + ) + } + + fn push_with_limits( + &mut self, + history_id: HistoryId, + record: SemanticCommandRecord, + max_commands: usize, + max_output_bytes: usize, + ) -> (CaptureId, Vec<EvictedCapture>) { + let capture_id = CaptureId(self.next_id); + self.next_id = self.next_id.saturating_add(1); + let output_bytes = record.capture.output.len(); + self.output_bytes = self.output_bytes.saturating_add(output_bytes); + self.records.push_back(StoredCapture { + id: capture_id, + history_id, + output_bytes, + record, + }); + + ( + capture_id, + self.evict_to_limits(max_commands, max_output_bytes), + ) + } + + fn evict_to_limits( + &mut self, + max_commands: usize, + max_output_bytes: usize, + ) -> Vec<EvictedCapture> { + let mut evicted = Vec::new(); + while self.records.len() > max_commands || self.output_bytes > max_output_bytes { + let Some(record) = self.records.pop_front() else { + break; + }; + self.output_bytes = self.output_bytes.saturating_sub(record.output_bytes); + evicted.push(EvictedCapture { + history_id: record.history_id, + capture_id: record.id, + }); + } + evicted + } + + fn stored_capture(&self, capture_id: CaptureId) -> Option<&StoredCapture> { + self.records.iter().find(|record| record.id == capture_id) + } + + fn stored_capture_mut(&mut self, capture_id: CaptureId) -> Option<&mut StoredCapture> { + self.records + .iter_mut() + .find(|record| record.id == capture_id) + } +} + +impl TryFrom<&str> for SessionId { + type Error = (); + + fn try_from(value: &str) -> std::result::Result<Self, Self::Error> { + let value = value.trim(); + if value.is_empty() { + return Err(()); + } + + Ok(Self(value.to_string())) + } +} + +impl TryFrom<String> for SessionId { + type Error = (); + + fn try_from(value: String) -> std::result::Result<Self, Self::Error> { + Self::try_from(value.as_str()) + } +} + +impl AsRef<str> for SessionId { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl Display for SessionId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +pub struct SemanticGrpcService { + inner: Arc<SemanticComponentInner>, +} + +#[tonic::async_trait] +impl SemanticSvc for SemanticGrpcService { + #[instrument(skip_all, level = Level::INFO)] + async fn record_commands( + &self, + request: Request<Streaming<CommandCapture>>, + ) -> Result<Response<RecordCommandsReply>, Status> { + let mut stream = request.into_inner(); + let mut accepted = 0_u64; + + while let Some(capture) = stream.message().await? { + if self.inner.record_capture(capture).await { + accepted += 1; + } + } + + Ok(Response::new(RecordCommandsReply { accepted })) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn command_output( + &self, + request: Request<CommandOutputRequest>, + ) -> Result<Response<CommandOutputReply>, Status> { + let request = request.into_inner(); + if request.history_id.trim().is_empty() { + return Err(Status::invalid_argument("history_id is required")); + } + + Ok(Response::new(self.inner.command_output(&request).await)) + } +} + +fn history_id_from_str(value: Option<&str>) -> Option<HistoryId> { + let value = value?.trim(); + (!value.is_empty()).then(|| HistoryId(value.to_string())) +} + +fn take_pending_history( + histories: &mut VecDeque<History>, + history_id: &HistoryId, +) -> Option<History> { + let index = histories + .iter() + .position(|history| &history.id == history_id)?; + histories.remove(index) +} + +fn push_pending_history(histories: &mut VecDeque<History>, history: History) { + if let Some(index) = histories + .iter() + .position(|pending| pending.id == history.id) + { + histories.remove(index); + } + + histories.push_back(history); + trim_front(histories, MAX_PENDING_HISTORIES); +} + +fn trim_front<T>(records: &mut VecDeque<T>, max_len: usize) { + while records.len() > max_len { + records.pop_front(); + } +} + +fn command_output_not_found() -> CommandOutputReply { + CommandOutputReply { + found: false, + output: String::new(), + total_bytes: 0, + total_lines: 0, + lines: Vec::new(), + output_truncated: false, + output_observed_bytes: 0, + } +} + +fn select_output_ranges( + output: &str, + ranges: &[crate::atuin_daemon::semantic::OutputRange], +) -> Vec<OutputLine> { + let lines: Vec<&str> = output.lines().collect(); + if lines.is_empty() { + return Vec::new(); + } + + let ranges = if ranges.is_empty() { + vec![crate::atuin_daemon::semantic::OutputRange { start: 0, end: 999 }] + } else { + ranges.to_vec() + }; + + let mut ranges = ranges + .into_iter() + .filter_map(|range| normalize_line_range(range.start, range.end, lines.len())) + .collect::<Vec<_>>(); + ranges.sort_unstable_by_key(|(start, _)| *start); + + let mut merged: Vec<(usize, usize)> = Vec::new(); + for (start, end) in ranges { + match merged.last_mut() { + Some((_, merged_end)) if start <= merged_end.saturating_add(1) => { + *merged_end = (*merged_end).max(end); + } + _ => merged.push((start, end)), + } + } + + merged + .into_iter() + .flat_map(|(start, end)| { + lines[start..=end] + .iter() + .enumerate() + .map(move |(offset, line)| OutputLine { + line_number: (start + offset + 1) as u64, + content: (*line).to_string(), + }) + }) + .collect() +} + +fn normalize_line_range(start: i64, end: i64, line_count: usize) -> Option<(usize, usize)> { + let line_count = i64::try_from(line_count).ok()?; + let start = if start < 0 { line_count + start } else { start }; + let end = if end < 0 { line_count + end } else { end }; + + if end < 0 || start >= line_count { + return None; + } + + let start = start.max(0); + let end = end.min(line_count - 1); + + (start <= end).then_some((start as usize, end as usize)) +} + +fn log_record(record: &SemanticCommandRecord, message: &'static str) { + let history_id = record.capture.history_id.as_deref().unwrap_or("<missing>"); + let associated_history_id = record + .history + .as_ref() + .map(|history| history.id.to_string()); + let exit = record.history.as_ref().map(|history| history.exit); + let duration = record.history.as_ref().map(|history| history.duration); + let author = record + .history + .as_ref() + .map(|history| history.author.as_str()); + let session_id = record.capture.session_id.as_deref(); + + tracing::debug!( + history_id = %history_id, + associated_history_id = ?associated_history_id, + session_id = ?session_id, + command_bytes = record.capture.command.len(), + prompt_bytes = record.capture.prompt.len(), + output_bytes = record.capture.output.len(), + output_truncated = record.capture.output_truncated, + output_observed_bytes = record.capture.output_observed_bytes, + capture_exit_code = ?record.capture.exit_code, + history_exit = ?exit, + duration = ?duration, + author = ?author, + "{message}" + ); +} + +#[cfg(test)] +mod tests { + use super::*; + use time::OffsetDateTime; + + fn history(id: &str, session: &str, command: &str) -> History { + History { + id: HistoryId(id.to_string()), + timestamp: OffsetDateTime::UNIX_EPOCH, + duration: 0, + exit: 0, + command: command.to_string(), + cwd: String::new(), + session: session.to_string(), + hostname: String::new(), + author: String::new(), + intent: None, + deleted_at: None, + } + } + + fn capture(history_id: Option<&str>, session_id: Option<&str>, output: &str) -> CommandCapture { + CommandCapture { + prompt: String::new(), + command: String::new(), + output: output.to_string(), + exit_code: None, + history_id: history_id.map(str::to_string), + session_id: session_id.map(str::to_string), + output_truncated: false, + output_observed_bytes: output.len() as u64, + } + } + + fn command_output(state: &mut SemanticState, history_id: &str) -> CommandOutputReply { + state.command_output(&CommandOutputRequest { + history_id: history_id.to_string(), + ranges: Vec::new(), + }) + } + + fn output_line(line_number: u64, content: &str) -> OutputLine { + OutputLine { + line_number, + content: content.to_string(), + } + } + + #[test] + fn drops_capture_without_history_id() { + let mut state = SemanticState::default(); + + assert!(!state.record_capture(capture(None, Some("session-1"), "output"))); + assert!(!command_output(&mut state, "id-1").found); + assert_eq!(state.record_count(), 0); + } + + #[test] + fn stores_capture_by_session_and_history_id() { + let mut state = SemanticState::default(); + + assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output"))); + + let reply = command_output(&mut state, "id-1"); + assert!(reply.found); + assert_eq!(reply.total_bytes, 6); + assert_eq!(reply.output_observed_bytes, 6); + assert_eq!(reply.lines, vec![output_line(1, "output")]); + } + + #[test] + fn uses_pending_history_session_when_capture_session_is_missing() { + let mut state = SemanticState::default(); + + state.record_history(history("id-1", "session-from-history", "cargo test")); + assert!(state.record_capture(capture(Some("id-1"), None, "output"))); + + assert!( + state + .sessions + .contains_key(&SessionId("session-from-history".to_string())) + ); + assert!(command_output(&mut state, "id-1").found); + } + + #[test] + fn associates_history_by_id_after_capture_arrives() { + let mut state = SemanticState::default(); + + assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output"))); + state.record_history(history("id-1", "session-1", "different command")); + + let capture_ref = state + .history_index + .get(&HistoryId("id-1".to_string())) + .unwrap(); + let stored = state + .sessions + .get(&capture_ref.session_id) + .unwrap() + .stored_capture(capture_ref.capture_id) + .unwrap(); + assert!(stored.record.history.is_some()); + } + + #[test] + fn evicts_oldest_command_when_session_ring_is_full() { + let mut state = SemanticState::default(); + + for index in 0..=MAX_COMMANDS_PER_SESSION { + assert!(state.record_capture(capture( + Some(&format!("id-{index}")), + Some("session-1"), + "output", + ))); + } + + assert!(!command_output(&mut state, "id-0").found); + assert!(command_output(&mut state, &format!("id-{MAX_COMMANDS_PER_SESSION}")).found); + assert_eq!(state.record_count(), MAX_COMMANDS_PER_SESSION); + } + + #[test] + fn evicts_oldest_session_after_lru_limit() { + let mut state = SemanticState::default(); + + for index in 0..MAX_SESSIONS { + assert!(state.record_capture(capture( + Some(&format!("id-{index}")), + Some(&format!("session-{index}")), + "output", + ))); + } + assert!(command_output(&mut state, "id-0").found); + + assert!(state.record_capture(capture(Some("new-id"), Some("new-session"), "output",))); + + assert!(command_output(&mut state, "id-0").found); + assert!(!command_output(&mut state, "id-1").found); + assert!(command_output(&mut state, "new-id").found); + assert_eq!(state.sessions.len(), MAX_SESSIONS); + } + + #[test] + fn evicts_by_session_byte_limit() { + let mut session = SessionCaptures::default(); + let first_output = "x".repeat(10); + let second_output = "y"; + let (_, evicted_first) = session.push_with_limits( + HistoryId("first".to_string()), + SemanticCommandRecord { + capture: capture(Some("first"), Some("session-1"), &first_output), + history: None, + }, + MAX_COMMANDS_PER_SESSION, + 10, + ); + assert!(evicted_first.is_empty()); + + let (_, evicted_second) = session.push_with_limits( + HistoryId("second".to_string()), + SemanticCommandRecord { + capture: capture(Some("second"), Some("session-1"), second_output), + history: None, + }, + MAX_COMMANDS_PER_SESSION, + 10, + ); + + assert_eq!(evicted_second.len(), 1); + assert_eq!(evicted_second[0].history_id, HistoryId("first".to_string())); + assert_eq!(session.records.len(), 1); + assert_eq!(session.output_bytes, 1); + } + + #[test] + fn command_output_reports_truncation_metadata() { + let mut state = SemanticState::default(); + let mut capture = capture(Some("id-1"), Some("session-1"), "partial"); + capture.output_truncated = true; + capture.output_observed_bytes = 1024; + + assert!(state.record_capture(capture)); + + let reply = command_output(&mut state, "id-1"); + assert!(reply.output_truncated); + assert_eq!(reply.total_bytes, 7); + assert_eq!(reply.output_observed_bytes, 1024); + } + + #[test] + fn output_ranges_are_line_based_inclusive_and_support_negative_offsets() { + let output = "zero\none\ntwo\nthree\nfour"; + let ranges = vec![ + crate::atuin_daemon::semantic::OutputRange { start: 1, end: 2 }, + crate::atuin_daemon::semantic::OutputRange { start: -2, end: -1 }, + ]; + + assert_eq!( + select_output_ranges(output, &ranges), + vec![ + output_line(2, "one"), + output_line(3, "two"), + output_line(4, "three"), + output_line(5, "four"), + ] + ); + } + + #[test] + fn output_ranges_merge_overlaps_and_adjacent_ranges() { + let output = (0..100) + .map(|n| format!("line {n}")) + .collect::<Vec<_>>() + .join("\n"); + let ranges = vec![ + crate::atuin_daemon::semantic::OutputRange { start: 0, end: 100 }, + crate::atuin_daemon::semantic::OutputRange { + start: -100, + end: -1, + }, + ]; + + let selected = select_output_ranges(&output, &ranges); + + assert_eq!(selected.len(), 100); + assert_eq!(selected.first(), Some(&output_line(1, "line 0"))); + assert_eq!(selected.last(), Some(&output_line(100, "line 99"))); + } + + #[test] + fn output_ranges_can_leave_gaps_for_client_formatting() { + let output = "zero\none\ntwo\nthree\nfour"; + let ranges = vec![ + crate::atuin_daemon::semantic::OutputRange { start: 0, end: 1 }, + crate::atuin_daemon::semantic::OutputRange { start: 4, end: 4 }, + ]; + + assert_eq!( + select_output_ranges(output, &ranges), + vec![ + output_line(1, "zero"), + output_line(2, "one"), + output_line(5, "four"), + ] + ); + } + + #[test] + fn empty_output_ranges_default_to_first_thousand_lines() { + let output = (0..1001) + .map(|n| format!("line {n}")) + .collect::<Vec<_>>() + .join("\n"); + + let selected = select_output_ranges(&output, &[]); + + assert_eq!(selected.len(), 1000); + assert_eq!(selected.first(), Some(&output_line(1, "line 0"))); + assert_eq!(selected.last(), Some(&output_line(1000, "line 999"))); + } + + #[test] + fn output_ranges_skip_ranges_fully_outside_output() { + let output = "zero\none\ntwo"; + let ranges = vec![ + crate::atuin_daemon::semantic::OutputRange { start: 10, end: 20 }, + crate::atuin_daemon::semantic::OutputRange { + start: -20, + end: -10, + }, + ]; + + assert_eq!(select_output_ranges(output, &ranges), Vec::new()); + } +} diff --git a/crates/turtle/src/atuin_daemon/components/sync.rs b/crates/turtle/src/atuin_daemon/components/sync.rs new file mode 100644 index 00000000..c76fb71b --- /dev/null +++ b/crates/turtle/src/atuin_daemon/components/sync.rs @@ -0,0 +1,279 @@ +//! Sync component. +//! +//! Handles periodic synchronization with the Atuin cloud server. + +use std::time::Duration; + +use eyre::Result; +use rand::Rng; +use tokio::sync::mpsc; +use tokio::time::{self, MissedTickBehavior}; + +use crate::atuin_client::{history::store::HistoryStore, record::sync, settings::Settings}; + +use crate::atuin_daemon::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, +}; + +/// Commands that can be sent to the sync task. +enum SyncCommand { + /// Trigger an immediate sync. + ForceSync, + /// Stop the sync loop. + Stop, +} + +/// Sync state - tracks whether we're in normal operation or retrying after failure. +#[derive(Clone, Copy, PartialEq, Eq)] +enum SyncState { + /// Normal operation. Periodic syncs only run if auto_sync is enabled. + Idle, + /// Retrying after a sync failure. Retries continue regardless of auto_sync + /// until the sync succeeds. + Retrying, +} + +/// Sync component - handles periodic cloud synchronization. +/// +/// This component: +/// - Runs a background sync loop on a configurable interval +/// - Implements exponential backoff on sync failures +/// - Responds to ForceSync events for immediate sync +/// - Emits SyncCompleted/SyncFailed events +pub struct SyncComponent { + task_handle: Option<tokio::task::JoinHandle<()>>, + command_tx: Option<mpsc::Sender<SyncCommand>>, +} + +impl SyncComponent { + /// Create a new sync component. + pub fn new() -> Self { + Self { + task_handle: None, + command_tx: None, + } + } +} + +impl Default for SyncComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SyncComponent { + fn name(&self) -> &'static str { + "sync" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + let (cmd_tx, cmd_rx) = mpsc::channel(16); + self.command_tx = Some(cmd_tx); + + // Spawn the sync loop with its own copy of the handle + self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx))); + + tracing::info!("sync component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + if let DaemonEvent::ForceSync = event { + tracing::info!("force sync requested"); + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::ForceSync).await; + } + } + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::Stop).await; + } + if let Some(handle) = self.task_handle.take() { + // Give the task a moment to shut down gracefully + let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; + } + tracing::info!("sync component stopped"); + Ok(()) + } +} + +/// The main sync loop. +/// +/// This runs in a spawned task and handles periodic sync as well as +/// force sync requests. +async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) { + tracing::info!("sync loop starting"); + + // Clone settings since we need them across await points + let settings = handle.settings().await.clone(); + let host_id = match Settings::host_id().await { + Ok(id) => id, + Err(e) => { + tracing::error!("failed to get host id, sync disabled: {e}"); + return; + } + }; + + // Create the stores we need + let encryption_key = *handle.encryption_key(); + let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key); + + // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) + let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); + + let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + + // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, + // we may end up running a lot of syncs in a hot loop. + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut sync_state = SyncState::Idle; + + loop { + tokio::select! { + _ = ticker.tick() => { + let settings = handle.settings().await; + + // Skip periodic ticks if auto_sync is disabled AND we're not retrying + // a previous failure. Retries must continue regardless of auto_sync. + if !settings.auto_sync && sync_state == SyncState::Idle { + tracing::debug!("auto_sync disabled, skipping periodic sync tick"); + continue; + } + + sync_state = do_sync_tick( + &handle, + &history_store, + &mut ticker, + max_interval, + &settings, + ).await; + } + cmd = cmd_rx.recv() => { + match cmd { + Some(SyncCommand::ForceSync) => { + tracing::info!("executing force sync"); + let settings = handle.settings().await; + sync_state = do_sync_tick( + &handle, + &history_store, + &mut ticker, + max_interval, + &settings, + ).await; + } + Some(SyncCommand::Stop) | None => { + tracing::info!("sync loop stopping"); + break; + } + } + } + } + } +} + +/// Execute a single sync tick. +/// +/// Returns the new sync state: `Idle` on success, `Retrying` on failure. +async fn do_sync_tick( + handle: &DaemonHandle, + history_store: &HistoryStore, + ticker: &mut time::Interval, + max_interval: f64, + settings: &Settings, +) -> SyncState { + tracing::info!("sync tick"); + + // Check if logged in + let logged_in = match settings.logged_in().await { + Ok(v) => v, + Err(e) => { + tracing::warn!("failed to check login status, skipping sync tick: {e}"); + return SyncState::Idle; + } + }; + + if !logged_in { + tracing::debug!("not logged in, skipping sync tick"); + return SyncState::Idle; + } + + // Perform the sync + let res = sync::sync(settings, handle.store(), handle.encryption_key()).await; + + match res { + Err(e) => { + tracing::error!("sync tick failed with {e}"); + + // Emit failure event + handle.emit(DaemonEvent::SyncFailed { + error: e.to_string(), + }); + + // Exponential backoff + let mut rng = rand::thread_rng(); + let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); + + if new_interval > max_interval { + new_interval = max_interval; + } + + *ticker = time::interval_at( + tokio::time::Instant::now() + Duration::from_secs(new_interval as u64), + time::Duration::from_secs(new_interval as u64), + ); + ticker.reset_after(time::Duration::from_secs(new_interval as u64)); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + tracing::error!("backing off, next sync tick in {new_interval}"); + + SyncState::Retrying + } + Ok((uploaded_count, downloaded_records)) => { + tracing::info!( + uploaded = uploaded_count, + downloaded = downloaded_records.len(), + "sync complete" + ); + + // Build history from downloaded records + if let Err(e) = history_store + .incremental_build(handle.history_db(), &downloaded_records) + .await + { + tracing::error!("failed to build history from downloaded records: {e}"); + } + + // Emit the records added event (for search indexing) + handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone())); + + // Emit sync completed event + handle.emit(DaemonEvent::SyncCompleted { + uploaded: uploaded_count as usize, + downloaded: downloaded_records.len(), + }); + + // Reset backoff on success + if ticker.period().as_secs() != settings.daemon.sync_frequency { + *ticker = time::interval_at( + tokio::time::Instant::now() + + Duration::from_secs(settings.daemon.sync_frequency), + time::Duration::from_secs(settings.daemon.sync_frequency), + ); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + } + + // Store sync time + if let Err(e) = Settings::save_sync_time().await { + tracing::error!("failed to save sync time: {e}"); + } + + SyncState::Idle + } + } +} diff --git a/crates/turtle/src/atuin_daemon/control/mod.rs b/crates/turtle/src/atuin_daemon/control/mod.rs new file mode 100644 index 00000000..afb29c57 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/control/mod.rs @@ -0,0 +1,12 @@ +//! Control module for external event injection. +//! +//! This module provides the gRPC service that allows external processes +//! (like CLI commands) to inject events into the daemon's event bus. + +mod service; + +// Include the generated proto code +tonic::include_proto!("control"); + +// Re-export the service +pub use service::ControlService; diff --git a/crates/turtle/src/atuin_daemon/control/service.rs b/crates/turtle/src/atuin_daemon/control/service.rs new file mode 100644 index 00000000..cb2ff74e --- /dev/null +++ b/crates/turtle/src/atuin_daemon/control/service.rs @@ -0,0 +1,71 @@ +//! Control service implementation. +//! +//! This gRPC service allows external processes (like CLI commands) to inject +//! events into the daemon's event bus. + +use crate::atuin_client::history::HistoryId; +use tonic::{Request, Response, Status}; +use tracing::{Level, info, instrument}; + +use super::{ + SendEventRequest, SendEventResponse, + control_server::{Control, ControlServer}, + send_event_request::Event, +}; +use crate::atuin_daemon::{daemon::DaemonHandle, events::DaemonEvent}; + +/// The Control gRPC service. +/// +/// This service is used by external processes to inject events into the daemon. +/// It's not a component - it's part of the daemon's core infrastructure. +pub struct ControlService { + handle: DaemonHandle, +} + +impl ControlService { + /// Create a new control service with the given daemon handle. + pub fn new(handle: DaemonHandle) -> Self { + Self { handle } + } + + /// Get a tonic server for this service. + pub fn into_server(self) -> ControlServer<Self> { + ControlServer::new(self) + } +} + +#[tonic::async_trait] +impl Control for ControlService { + #[instrument(skip_all, level = Level::INFO, name = "control_send_event")] + async fn send_event( + &self, + request: Request<SendEventRequest>, + ) -> Result<Response<SendEventResponse>, Status> { + let req = request.into_inner(); + + let event = req + .event + .ok_or_else(|| Status::invalid_argument("event is required"))?; + + let daemon_event = proto_event_to_daemon_event(event)?; + + info!(?daemon_event, "received control event"); + self.handle.emit(daemon_event); + + Ok(Response::new(SendEventResponse {})) + } +} + +/// Convert a proto event to a daemon event. +fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> { + match event { + Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned), + Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt), + Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted { + ids: e.ids.into_iter().map(HistoryId).collect(), + }), + Event::ForceSync(_) => Ok(DaemonEvent::ForceSync), + Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded), + Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested), + } +} diff --git a/crates/turtle/src/atuin_daemon/daemon.rs b/crates/turtle/src/atuin_daemon/daemon.rs new file mode 100644 index 00000000..77c0d8a5 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/daemon.rs @@ -0,0 +1,458 @@ +//! Core daemon infrastructure. +//! +//! This module provides the foundational types for building the atuin daemon: +//! +//! - [`DaemonState`]: Shared state owned by the daemon +//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state +//! - [`Component`]: A trait for implementing daemon components +//! - [`Daemon`]: The main daemon orchestrator +//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon + +use std::sync::Arc; + +use crate::atuin_client::{ + database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore, + settings::Settings, +}; +use eyre::{Context, Result}; +use tokio::sync::{RwLock, broadcast}; + +use crate::atuin_daemon::events::DaemonEvent; + +// ============================================================================ +// DaemonState +// ============================================================================ + +/// Shared state owned by the daemon. +/// +/// This contains all the resources that components and services need access to. +/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`]. +pub struct DaemonState { + // Event bus + event_tx: broadcast::Sender<DaemonEvent>, + + // Configuration (mutable - can be reloaded) + settings: RwLock<Settings>, + + // Encryption key (immutable - derived at startup) + encryption_key: [u8; 32], + + // Database handles + history_db: HistoryDatabase, + store: SqliteStore, +} + +// ============================================================================ +// DaemonHandle +// ============================================================================ + +/// A lightweight handle to the daemon's shared state. +/// +/// This is the primary way for components, gRPC services, and spawned tasks to +/// interact with the daemon. It provides access to: +/// +/// - Event emission and subscription +/// - Configuration (settings, encryption key) +/// - Database handles +/// +/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed +/// around to any code that needs daemon access. +/// +/// # Example +/// +/// ```ignore +/// // Emit an event +/// handle.emit(DaemonEvent::HistoryPruned); +/// +/// // Access settings +/// let settings = handle.settings().await; +/// let sync_freq = settings.daemon.sync_frequency; +/// +/// // Access database +/// let history = handle.history_db().load(id).await?; +/// ``` +#[derive(Clone)] +pub struct DaemonHandle { + state: Arc<DaemonState>, +} + +impl DaemonHandle { + // ---- Events ---- + + /// Emit an event to the daemon's event bus. + /// + /// This is fire-and-forget - if no receivers are listening (which shouldn't + /// happen in normal operation), the event is dropped silently. + pub fn emit(&self, event: DaemonEvent) { + if let Err(e) = self.state.event_tx.send(event) { + tracing::warn!("failed to emit event (no receivers?): {e}"); + } + } + + /// Subscribe to the event bus. + /// + /// Returns a receiver that will receive all events emitted after this call. + /// Useful for components that need to listen for events outside of the + /// normal `handle_event` callback flow. + pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> { + self.state.event_tx.subscribe() + } + + /// Request graceful shutdown of the daemon. + pub fn shutdown(&self) { + self.emit(DaemonEvent::ShutdownRequested); + } + + // ---- Configuration ---- + + /// Get the current settings. + /// + /// This acquires a read lock on the settings. For most use cases, clone + /// the settings if you need to hold onto them. + pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> { + self.state.settings.read().await + } + + /// Reload settings from disk and emit a SettingsReloaded event. + /// + /// Components listening for `SettingsReloaded` can then re-read settings + /// via `handle.settings()` to pick up the changes. + pub async fn reload_settings(&self) -> Result<()> { + let new_settings = Settings::new()?; + self.apply_settings(new_settings).await; + Ok(()) + } + + /// Apply already-loaded settings and emit a SettingsReloaded event. + /// + /// Use this when settings have already been loaded (e.g., from a file watcher) + /// to avoid parsing the config file twice. + pub async fn apply_settings(&self, settings: Settings) { + *self.state.settings.write().await = settings; + self.emit(DaemonEvent::SettingsReloaded); + tracing::info!("settings applied"); + } + + /// Get the encryption key. + pub fn encryption_key(&self) -> &[u8; 32] { + &self.state.encryption_key + } + + // ---- Database ---- + + /// Get a reference to the history database. + pub fn history_db(&self) -> &HistoryDatabase { + &self.state.history_db + } + + /// Get a reference to the record store. + pub fn store(&self) -> &SqliteStore { + &self.state.store + } +} + +impl std::fmt::Debug for DaemonHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DaemonHandle").finish_non_exhaustive() + } +} + +// ============================================================================ +// Component Trait +// ============================================================================ + +/// A daemon component that handles a specific domain. +/// +/// Components are the building blocks of the daemon. Each component: +/// +/// - Has a unique name for logging and debugging +/// - Can optionally expose gRPC services +/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources +/// - Handles events from the event bus +/// - Performs cleanup on shutdown +/// +/// # Lifecycle +/// +/// 1. **Construction**: Component is created (usually via `new()`) +/// 2. **Start**: `start()` is called with a [`DaemonHandle`] +/// 3. **Running**: `handle_event()` is called for each event on the bus +/// 4. **Shutdown**: `stop()` is called for cleanup +/// +/// # Example +/// +/// ```ignore +/// pub struct MyComponent { +/// handle: Option<DaemonHandle>, +/// } +/// +/// #[async_trait] +/// impl Component for MyComponent { +/// fn name(&self) -> &'static str { "my-component" } +/// +/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> { +/// self.handle = Some(handle); +/// Ok(()) +/// } +/// +/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { +/// match event { +/// DaemonEvent::SomeEvent => { +/// // Handle the event +/// if let Some(handle) = &self.handle { +/// handle.emit(DaemonEvent::ResponseEvent); +/// } +/// } +/// _ => {} +/// } +/// Ok(()) +/// } +/// +/// async fn stop(&mut self) -> Result<()> { +/// Ok(()) +/// } +/// } +/// ``` +#[tonic::async_trait] +pub trait Component: Send + Sync { + /// Human-readable name for logging and debugging. + fn name(&self) -> &'static str; + + /// Called once at startup. + /// + /// Store the handle if you need to emit events or access daemon resources + /// later. The handle is cheaply cloneable, so feel free to clone it for + /// spawned tasks. + async fn start(&mut self, handle: DaemonHandle) -> Result<()>; + + /// Handle an incoming event. + /// + /// Called for every event on the bus. To emit new events in response, + /// use the handle stored during `start()`. Events emitted here will be + /// processed in subsequent event loop iterations. + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>; + + /// Called on graceful shutdown. + /// + /// Use this to clean up resources, abort spawned tasks, etc. + async fn stop(&mut self) -> Result<()>; +} + +// ============================================================================ +// Daemon +// ============================================================================ + +/// The main daemon orchestrator. +/// +/// The daemon manages components, runs the event loop, and coordinates startup +/// and shutdown. It is constructed via [`DaemonBuilder`]. +/// +/// # Event Loop +/// +/// The daemon runs a simple event loop: +/// +/// 1. Wait for an event on the bus +/// 2. Dispatch the event to all components (in registration order) +/// 3. Components may emit new events in response +/// 4. Repeat until `ShutdownRequested` is received +/// +/// Events emitted during handling are queued and processed in subsequent +/// iterations, ensuring the loop eventually drains. +pub struct Daemon { + components: Vec<Box<dyn Component>>, + handle: DaemonHandle, +} + +impl Daemon { + /// Create a new daemon builder. + pub fn builder(settings: Settings) -> DaemonBuilder { + DaemonBuilder::new(settings) + } + + /// Get a clone of the daemon handle. + /// + /// The handle can be used to emit events, access settings, etc. + pub fn handle(&self) -> DaemonHandle { + self.handle.clone() + } + + /// Start all components. + /// + /// This must be called before `run_event_loop()`. It initializes all + /// registered components with the daemon handle. + pub async fn start_components(&mut self) -> Result<()> { + for component in &mut self.components { + tracing::info!(component = component.name(), "starting component"); + component + .start(self.handle.clone()) + .await + .with_context(|| format!("failed to start component: {}", component.name()))?; + } + Ok(()) + } + + /// Run the daemon event loop. + /// + /// This processes events until a ShutdownRequested event is received. + /// Components must be started first via `start_components()`. + pub async fn run_event_loop(&mut self) -> Result<()> { + let mut event_rx = self.handle.subscribe(); + loop { + match event_rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) => { + tracing::info!("shutdown requested, stopping daemon"); + break; + } + Ok(event) => { + tracing::debug!(?event, "processing event"); + self.dispatch_event(&event).await; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!( + skipped = n, + "event receiver lagged, some events were dropped" + ); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::info!("event bus closed, stopping daemon"); + break; + } + } + } + Ok(()) + } + + /// Stop all components. + /// + /// This performs graceful shutdown of all components. + pub async fn stop_components(&mut self) { + for component in &mut self.components { + tracing::info!(component = component.name(), "stopping component"); + if let Err(e) = component.stop().await { + tracing::error!( + component = component.name(), + error = ?e, + "error stopping component" + ); + } + } + tracing::info!("all components stopped"); + } + + /// Run the daemon. + /// + /// This is a convenience method that starts components, runs the event loop, + /// and handles shutdown. It does not return until the daemon is shut down. + pub async fn run(mut self) -> Result<()> { + self.start_components().await?; + self.run_event_loop().await?; + self.stop_components().await; + tracing::info!("daemon stopped"); + Ok(()) + } + + async fn dispatch_event(&mut self, event: &DaemonEvent) { + for component in &mut self.components { + if let Err(e) = component.handle_event(event).await { + tracing::error!( + component = component.name(), + error = ?e, + "error handling event" + ); + } + } + } +} + +// ============================================================================ +// DaemonBuilder +// ============================================================================ + +/// Builder for constructing a [`Daemon`]. +/// +/// # Example +/// +/// ```ignore +/// let daemon = Daemon::builder(settings) +/// .store(store) +/// .history_db(history_db) +/// .component(HistoryComponent::new()) +/// .component(SearchComponent::new()) +/// .component(SyncComponent::new()) +/// .build() +/// .await?; +/// +/// daemon.run().await?; +/// ``` +pub struct DaemonBuilder { + settings: Settings, + store: Option<SqliteStore>, + history_db: Option<HistoryDatabase>, + components: Vec<Box<dyn Component>>, +} + +impl DaemonBuilder { + /// Create a new daemon builder with the given settings. + pub fn new(settings: Settings) -> Self { + Self { + settings, + store: None, + history_db: None, + components: Vec::new(), + } + } + + /// Set the record store. + pub fn store(mut self, store: SqliteStore) -> Self { + self.store = Some(store); + self + } + + /// Set the history database. + pub fn history_db(mut self, db: HistoryDatabase) -> Self { + self.history_db = Some(db); + self + } + + /// Register a component. + /// + /// Components are started in registration order and stopped in reverse order. + pub fn component(mut self, component: impl Component + 'static) -> Self { + self.components.push(Box::new(component)); + self + } + + /// Build the daemon. + /// + /// This loads the encryption key and creates the daemon state. + pub async fn build(self) -> Result<Daemon> { + let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?; + let history_db = self + .history_db + .ok_or_else(|| eyre::eyre!("history_db is required"))?; + + // Load encryption key + let encryption_key: [u8; 32] = encryption::load_key(&self.settings) + .context("could not load encryption key")? + .into(); + + // Create the event bus + let (event_tx, _) = broadcast::channel(64); + + // Create the shared state + let state = Arc::new(DaemonState { + event_tx, + settings: RwLock::new(self.settings), + encryption_key, + history_db, + store, + }); + + // Create the handle (just a reference to the state) + let handle = DaemonHandle { state }; + + Ok(Daemon { + components: self.components, + handle, + }) + } +} diff --git a/crates/turtle/src/atuin_daemon/events.rs b/crates/turtle/src/atuin_daemon/events.rs new file mode 100644 index 00000000..9a398925 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/events.rs @@ -0,0 +1,74 @@ +//! Daemon events. +//! +//! Events are the primary communication mechanism within the daemon. +//! Components emit events to notify others of state changes, and handle +//! events to react to changes elsewhere in the system. +//! +//! External processes (like CLI commands) can also inject events via the +//! Control gRPC service. + +use crate::atuin_client::history::{History, HistoryId}; +use crate::atuin_common::record::RecordId; + +/// Events that flow through the daemon's event bus. +/// +/// Events are broadcast to all components. Each component decides which +/// events it cares about in its `handle_event` implementation. +#[derive(Debug, Clone)] +pub enum DaemonEvent { + // ---- History lifecycle ---- + /// A command has started running. + HistoryStarted(History), + + /// A command has finished running. + HistoryEnded(History), + + // ---- Sync ---- + /// Records were synced from the server. + /// + /// The search component uses this to update its index with new history. + RecordsAdded(Vec<RecordId>), + + /// Sync completed successfully. + SyncCompleted { + /// Number of records uploaded. + uploaded: usize, + /// Number of records downloaded. + downloaded: usize, + }, + + /// Sync failed. + SyncFailed { + /// Error message describing what went wrong. + error: String, + }, + + /// Request an immediate sync (external trigger). + ForceSync, + + // ---- External commands ---- + /// History was pruned - search index needs a full rebuild. + /// + /// Emitted when the user runs `atuin history prune` or similar. + HistoryPruned, + + /// History was rebuilt - search index needs a full rebuild. + /// + /// Emitted when the user runs `atuin store rebuild history` or similar. + HistoryRebuilt, + + /// Specific history items were deleted. + /// + /// The search component should remove these from its index. + HistoryDeleted { + /// IDs of the deleted history entries. + ids: Vec<HistoryId>, + }, + + /// Settings have changed, components should reload if needed. + SettingsReloaded, + + // ---- Lifecycle ---- + /// Request graceful shutdown of the daemon. + ShutdownRequested, +} diff --git a/crates/turtle/src/atuin_daemon/history/mod.rs b/crates/turtle/src/atuin_daemon/history/mod.rs new file mode 100644 index 00000000..b71853df --- /dev/null +++ b/crates/turtle/src/atuin_daemon/history/mod.rs @@ -0,0 +1,6 @@ +//! History module for the daemon gRPC history service. +//! +//! This module contains the proto-generated types for the history gRPC service. + +// Include the generated proto code +tonic::include_proto!("history"); diff --git a/crates/turtle/src/atuin_daemon/mod.rs b/crates/turtle/src/atuin_daemon/mod.rs new file mode 100644 index 00000000..b05eb95c --- /dev/null +++ b/crates/turtle/src/atuin_daemon/mod.rs @@ -0,0 +1,128 @@ +use crate::atuin_client::database::Sqlite as HistoryDatabase; +use crate::atuin_client::record::sqlite_store::SqliteStore; +use crate::atuin_client::settings::{Settings, watcher::global_settings_watcher}; +use eyre::Result; + +pub mod client; +pub mod components; +pub mod control; +pub mod daemon; +pub mod events; +pub mod history; +pub mod search; +pub mod semantic; +pub mod server; + +// Re-export core daemon types for convenience +pub use daemon::{Component, Daemon, DaemonBuilder, DaemonHandle}; +pub use events::DaemonEvent; + +// Re-export components +pub use components::{HistoryComponent, SearchComponent, SemanticComponent, SyncComponent}; + +// Re-export client helpers +pub use client::{ControlClient, SemanticClient, emit_event, emit_event_with_settings}; + +/// Boot the daemon using the new component-based architecture. +/// +/// This creates a daemon with the standard components (history, search, sync), +/// starts the gRPC server with their services, and runs the event loop. +pub async fn boot( + settings: Settings, + store: SqliteStore, + history_db: HistoryDatabase, +) -> Result<()> { + // Create the components + let history_component = HistoryComponent::new(); + let search_component = SearchComponent::new(); + let semantic_component = SemanticComponent::new(); + let sync_component = SyncComponent::new(); + + // Get the gRPC services before moving components into the daemon + // (The services share state with the components via Arc) + let history_service = history_component.grpc_service(); + let search_service = search_component.grpc_service(); + let semantic_service = semantic_component.grpc_service(); + + // Build the daemon + let mut daemon = Daemon::builder(settings.clone()) + .store(store) + .history_db(history_db) + .component(history_component) + .component(search_component) + .component(semantic_component) + .component(sync_component) + .build() + .await?; + + // Get a handle for the control service and gRPC server shutdown + let handle = daemon.handle(); + + // Create the control service + let control_service = control::ControlService::new(handle.clone()); + + // Start all components first (so gRPC services can work) + daemon.start_components().await?; + + // Spawn config file watcher to reload settings on changes + if let Ok(watcher) = global_settings_watcher() { + let mut settings_rx = watcher.subscribe(); + let watcher_handle = handle.clone(); + tokio::spawn(async move { + tracing::info!("config file watcher started"); + while settings_rx.changed().await.is_ok() { + // Use the already-loaded settings from the watcher + // (avoids parsing the config file twice) + let new_settings = (*settings_rx.borrow()).clone(); + watcher_handle.apply_settings((*new_settings).clone()).await; + } + tracing::debug!("config file watcher stopped"); + }); + } else { + tracing::warn!( + "failed to start config file watcher; settings changes will require daemon restart" + ); + } + + // Spawn signal handler to emit ShutdownRequested on Ctrl+C/SIGTERM + let signal_handle = handle.clone(); + tokio::spawn(async move { + shutdown_signal().await; + tracing::info!("received shutdown signal"); + signal_handle.shutdown(); + }); + + // Start the gRPC server in the background + server::run_grpc_server( + settings, + history_service, + search_service, + semantic_service, + control_service.into_server(), + handle, + ) + .await?; + + // Run the daemon event loop + daemon.run_event_loop().await?; + + // Stop all components on shutdown + daemon.stop_components().await; + + tracing::info!("daemon shut down complete"); + Ok(()) +} + +/// Wait for a shutdown signal (Ctrl+C or SIGTERM). +#[cfg(unix)] +async fn shutdown_signal() { + let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to register sigterm handler"); + let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) + .expect("failed to register sigint handler"); + + tokio::select! { + _ = term.recv() => {}, + _ = int.recv() => {}, + } +} diff --git a/crates/turtle/src/atuin_daemon/search/index.rs b/crates/turtle/src/atuin_daemon/search/index.rs new file mode 100644 index 00000000..df627e1b --- /dev/null +++ b/crates/turtle/src/atuin_daemon/search/index.rs @@ -0,0 +1,684 @@ +//! Search index with frecency-based ranking. +//! +//! This module provides a deduplicated search index where each unique command +//! is stored once, with metadata about all its invocations. This enables: +//! +//! - Efficient fuzzy matching (fewer items to match) +//! - Frecency-based ranking (frequency + recency) +//! - Dynamic filtering by directory, host, session, etc. + +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use crate::atuin_client::settings::Search; +use crate::{ + atuin_client::history::{History, is_known_agent}, + atuin_daemon::components::search::with_trailing_slash, +}; +use atuin_nucleo::{Injector, Nucleo, pattern}; +use dashmap::DashMap; +use lasso::{Spur, ThreadedRodeo}; +use time::OffsetDateTime; +use tokio::sync::RwLock; +use tracing::{Level, instrument}; +use uuid::Uuid; + +/// Parse a UUID string into a 16-byte array. +/// Returns None if the string is not a valid UUID. +fn parse_uuid_bytes(s: &str) -> Option<[u8; 16]> { + Uuid::parse_str(s).ok().map(|u| *u.as_bytes()) +} + +/// Format a 16-byte array as a UUID string. +fn format_uuid_bytes(bytes: &[u8; 16]) -> String { + Uuid::from_bytes(*bytes).to_string() +} + +/// Pre-computed frecency data for O(1) lookup. +#[derive(Debug, Clone, Default)] +pub struct FrecencyData { + /// Total number of times this command was used. + pub count: u32, + /// Most recent usage timestamp (unix seconds). + pub last_used: i64, +} + +impl FrecencyData { + /// Record a new usage of this command. + pub fn record_use(&mut self, timestamp: i64) { + self.count += 1; + if timestamp > self.last_used { + self.last_used = timestamp; + } + } + + /// Compute frecency score based on count and recency. + /// + /// Uses a decay function where more recent commands score higher. + /// The formula balances frequency (how often) with recency (how recent). + /// + /// Multipliers allow tuning the relative weights: + /// - `recency_mul`: Multiplier for recency score (default: 1.0) + /// - `frequency_mul`: Multiplier for frequency score (default: 1.0) + /// + /// A multiplier of 0.0 disables that component, 1.0 is unchanged, 2.0 doubles weight. + /// Values like 0.5 reduce weight by half, 1.5 increases by 50%, etc. + #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")] + pub fn compute(&self, now: i64, recency_mul: f64, frequency_mul: f64) -> u32 { + if self.count == 0 { + return 0; + } + + // Time-based decay: score decreases as time passes + let age_seconds = (now - self.last_used).max(0) as u64; + let age_hours = age_seconds / 3600; + + // Decay factor: recent commands get higher scores + // - Last hour: multiplier ~1.0 + // - Last day: multiplier ~0.5 + // - Last week: multiplier ~0.1 + // - Older: multiplier approaches 0 + let recency_score: f64 = match age_hours { + 0 => 100.0, + 1..=6 => 90.0, + 7..=24 => 70.0, + 25..=72 => 50.0, + 73..=168 => 30.0, + 169..=720 => 15.0, + _ => 5.0, + }; + + // Frequency boost: more uses = higher score (with diminishing returns) + let frequency_score = ((self.count as f64).ln() * 20.0).min(100.0); + + // Apply multipliers and combine scores, then round to u32 + ((recency_score * recency_mul) + (frequency_score * frequency_mul)).round() as u32 + } +} + +/// Data for a unique command. +pub struct CommandData { + /// History ID of the most recent invocation (16-byte UUID). + most_recent_id: [u8; 16], + /// Timestamp of the most recent invocation. + most_recent_timestamp: i64, + /// Pre-computed global frecency. + pub global_frecency: FrecencyData, + + // Pre-computed indexes for O(1) filter lookups + // Using HashSet instead of DashSet since CommandData lives inside DashMap (already synchronized) + /// All directories where this command has been run (interned keys). + directories: HashSet<Spur>, + /// All hostnames where this command has been run (interned keys). + hosts: HashSet<Spur>, + /// All sessions where this command has been run (as 16-byte UUIDs). + sessions: HashSet<[u8; 16]>, +} + +impl CommandData { + /// Create a new CommandData from a history entry. + /// Returns None if the history entry has invalid UUIDs. + pub fn new(history: &History, interner: &ThreadedRodeo) -> Option<Self> { + let history_id = parse_uuid_bytes(&history.id.0)?; + let session = parse_uuid_bytes(&history.session)?; + let timestamp = history.timestamp.unix_timestamp(); + + let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd)); + let host_key = interner.get_or_intern(&history.hostname); + + let mut directories = HashSet::new(); + directories.insert(dir_key); + + let mut hosts = HashSet::new(); + hosts.insert(host_key); + + let mut sessions = HashSet::new(); + sessions.insert(session); + + let mut global_frecency = FrecencyData::default(); + global_frecency.record_use(timestamp); + + Some(Self { + most_recent_id: history_id, + most_recent_timestamp: timestamp, + global_frecency, + directories, + hosts, + sessions, + }) + } + + /// Add an invocation from a history entry. + /// Returns false if the history entry has invalid UUIDs. + pub fn add_invocation(&mut self, history: &History, interner: &ThreadedRodeo) -> bool { + let Some(history_id) = parse_uuid_bytes(&history.id.0) else { + return false; + }; + let Some(session) = parse_uuid_bytes(&history.session) else { + return false; + }; + + let timestamp = history.timestamp.unix_timestamp(); + + // Update global frecency + self.global_frecency.record_use(timestamp); + + // Update pre-computed indexes for O(1) filter lookups + let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd)); + self.directories.insert(dir_key); + self.hosts.insert(interner.get_or_intern(&history.hostname)); + self.sessions.insert(session); + + // Update most recent if this invocation is newer + if timestamp > self.most_recent_timestamp { + self.most_recent_id = history_id; + self.most_recent_timestamp = timestamp; + } + + true + } + + /// Get the most recent history ID for this command. + pub fn most_recent_id(&self) -> String { + format_uuid_bytes(&self.most_recent_id) + } + + /// Check if any invocation matches a directory filter (exact match). + /// O(1) lookup using pre-computed index. + pub fn has_invocation_in_dir(&self, dir: &str, interner: &ThreadedRodeo) -> bool { + interner + .get(dir) + .is_some_and(|spur| self.directories.contains(&spur)) + } + + /// Check if any invocation matches a directory prefix (workspace/git root). + /// O(n) where n = number of unique directories for this command. + pub fn has_invocation_in_workspace(&self, prefix: &str, interner: &ThreadedRodeo) -> bool { + self.directories + .iter() + .any(|&spur| interner.resolve(&spur).starts_with(prefix)) + } + + /// Check if any invocation matches a hostname. + /// O(1) lookup using pre-computed index. + pub fn has_invocation_on_host(&self, hostname: &str, interner: &ThreadedRodeo) -> bool { + interner + .get(hostname) + .is_some_and(|spur| self.hosts.contains(&spur)) + } + + /// Check if any invocation matches a session. + /// O(1) lookup using pre-computed index. + pub fn has_invocation_in_session(&self, session: &str) -> bool { + parse_uuid_bytes(session).is_some_and(|bytes| self.sessions.contains(&bytes)) + } +} + +/// Filter mode for search queries. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum IndexFilterMode { + /// No filtering - search all commands. + Global, + /// Filter to commands run in a specific directory. + Directory(String), + /// Filter to commands run in a workspace (directory prefix). + Workspace(String), + /// Filter to commands run on a specific host. + Host(String), + /// Filter to commands run in a specific session. + Session(String), +} + +/// Context for search queries. +#[derive(Debug, Clone, Default)] +pub struct QueryContext { + pub cwd: Option<String>, + pub git_root: Option<String>, + pub hostname: Option<String>, + pub session_id: Option<String>, +} + +/// Shareable frecency map: command -> frecency score. +/// Wrapped in Arc for zero-copy sharing with scorer callbacks. +type FrecencyMap = Arc<HashMap<Arc<str>, u32>>; + +/// A deduplicated search index with frecency-based ranking. +/// +/// Commands are stored by their text, with metadata about all invocations. +/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback. +/// +/// Global frecency is precomputed by a background task and used for scoring. +/// If frecency data is not available, search still works but without frecency ranking; +/// although this should never happen due to precomputing the frecency map. +pub struct SearchIndex { + /// Map from command text to command data. + /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer. + /// Keys are Arc<str> to enable zero-copy sharing with frecency_map. + commands: Arc<DashMap<Arc<str>, CommandData>>, + /// Nucleo fuzzy matcher - items are command strings. + nucleo: RwLock<Nucleo<String>>, + /// Injector for adding new commands to Nucleo. + injector: Injector<String>, + /// Precomputed global frecency map. Updated by background task. + frecency_map: RwLock<Option<FrecencyMap>>, + /// String interner for deduplicating cwd, hostname, and directory paths. + interner: Arc<ThreadedRodeo>, +} + +impl SearchIndex { + /// Create a new empty search index. + pub fn new() -> Self { + let nucleo_config = atuin_nucleo::Config::DEFAULT; + // Single column for command text + let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1); + let injector = nucleo.injector(); + + Self { + commands: Arc::new(DashMap::new()), + nucleo: RwLock::new(nucleo), + injector, + frecency_map: RwLock::new(None), + interner: Arc::new(ThreadedRodeo::new()), + } + } + + /// Add a history entry to the index. + /// + /// If the command already exists, updates its invocation data. + /// If it's a new command, adds it to both the map and Nucleo. + pub fn add_history(&self, history: &History) { + if is_known_agent(&history.author) { + return; + } + + let command = history.command.as_str(); + + // DashMap with Arc<str> keys can be looked up with &str via Borrow trait + if let Some(mut entry) = self.commands.get_mut(command) { + // Existing command - just update invocations + entry.add_invocation(history, &self.interner); + } else { + // New command - create Arc<str> once and share it + let Some(data) = CommandData::new(history, &self.interner) else { + return; // Invalid UUIDs, skip this entry + }; + let command_arc: Arc<str> = command.into(); + self.commands.insert(Arc::clone(&command_arc), data); + // Nucleo still needs String (unavoidable copy for fuzzy matching) + self.injector.push(command_arc.to_string(), |cmd, cols| { + cols[0] = cmd.clone().into(); + }); + } + // Note: frecency_map is rebuilt by background task, not invalidated here + } + + /// Add multiple history entries to the index. + pub fn add_histories(&self, histories: &[History]) { + for history in histories { + self.add_history(history); + } + } + + /// Get the number of unique commands in the index. + pub fn command_count(&self) -> usize { + self.commands.len() + } + + /// Get the number of items in Nucleo (should match command_count). + pub async fn nucleo_item_count(&self) -> u32 { + self.nucleo.read().await.snapshot().item_count() + } + + /// Search for commands matching a query. + /// + /// Returns a list of history IDs (most recent invocation per command). + /// Uses precomputed global frecency for scoring if available. + #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))] + pub async fn search( + &self, + query: &str, + filter_mode: IndexFilterMode, + _context: &QueryContext, + limit: u32, + ) -> Vec<String> { + let mut nucleo = self.nucleo.write().await; + + // Get precomputed frecency map (may be None if not yet computed) + let frecency_map = self.frecency_map.read().await.clone(); + + // Build filter based on mode + let filter = self.build_filter(&filter_mode); + nucleo.set_filter(filter); + + // Build scorer from precomputed frecency (or None if not available) + let scorer = Self::build_scorer(frecency_map); + nucleo.set_scorer(scorer); + + // Update pattern + nucleo.pattern.reparse( + 0, + query, + pattern::CaseMatching::Smart, + pattern::Normalization::Smart, + false, + ); + + tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| { + // Tick until complete + while nucleo.tick(10).running {} + }); + + // Collect results + let snapshot = nucleo.snapshot(); + let matched_count = snapshot.matched_item_count().min(limit); + + tracing::span!(Level::TRACE, "index_search_results").in_scope(|| { + snapshot + .matched_items(..matched_count) + .filter_map(|item| { + let cmd = item.data; + // DashMap<Arc<str>, _>::get accepts &str via Borrow trait + self.commands + .get(cmd.as_str()) + .map(|data| data.most_recent_id()) + }) + .collect() + }) + } + + /// Rebuild the global frecency map. + /// + /// This should be called by a background task periodically. + /// The map is used for scoring search results. + /// + /// Uses multipliers from search settings: + /// - `recency_score_multiplier`: Weight for recency component + /// - `frequency_score_multiplier`: Weight for frequency component + /// - `frecency_score_multiplier`: Overall multiplier for final score + #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")] + pub async fn rebuild_frecency(&self, search_settings: &Search) { + let now = OffsetDateTime::now_utc().unix_timestamp(); + let mut frecency_map: HashMap<Arc<str>, u32> = HashMap::new(); + + // Clamp multipliers to non-negative values to prevent broken frecency ranking + // (negative values would produce unexpected results when cast to u32) + let recency_mul = search_settings.recency_score_multiplier.max(0.0); + let frequency_mul = search_settings.frequency_score_multiplier.max(0.0); + let frecency_mul = search_settings.frecency_score_multiplier.max(0.0); + + for entry in self.commands.iter() { + let frecency = entry + .global_frecency + .compute(now, recency_mul, frequency_mul); + // Apply overall frecency multiplier and round to u32 + let frecency = (frecency as f64 * frecency_mul).round() as u32; + // Arc::clone is cheap - just increments reference count + frecency_map.insert(Arc::clone(entry.key()), frecency); + } + + *self.frecency_map.write().await = Some(Arc::new(frecency_map)); + } + + /// Build filter predicate for the given mode. + fn build_filter(&self, mode: &IndexFilterMode) -> Option<atuin_nucleo::Filter<String>> { + // For Global mode, no filter needed + if matches!(mode, IndexFilterMode::Global) { + return None; + } + + // Pre-compute which commands pass the filter + // Use HashSet<String> for the short-lived filter (simpler than Arc lookup) + let passing_commands: Arc<HashSet<String>> = { + let mut set = HashSet::new(); + for entry in self.commands.iter() { + let passes = match mode { + IndexFilterMode::Global => unreachable!(), + IndexFilterMode::Directory(dir) => { + entry.has_invocation_in_dir(dir, &self.interner) + } + IndexFilterMode::Workspace(prefix) => { + entry.has_invocation_in_workspace(prefix, &self.interner) + } + IndexFilterMode::Host(hostname) => { + entry.has_invocation_on_host(hostname, &self.interner) + } + IndexFilterMode::Session(session) => entry.has_invocation_in_session(session), + }; + if passes { + // Convert Arc<str> to String for filter lookup + set.insert(entry.key().to_string()); + } + } + Arc::new(set) + }; + + Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd))) + } + + /// Build scorer from precomputed frecency map. + /// + /// Returns None if frecency map is not available (search still works, just without frecency ranking). + fn build_scorer(frecency_map: Option<FrecencyMap>) -> Option<atuin_nucleo::Scorer<String>> { + let map = frecency_map?; + Some(Arc::new(move |cmd: &String, fuzzy_score: u32| { + // HashMap<Arc<str>, _>::get accepts &str via Borrow trait + let frecency = map.get(cmd.as_str()).copied().unwrap_or(0); + fuzzy_score + frecency + })) + } +} + +impl Default for SearchIndex { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use time::macros::datetime; + + fn make_history(command: &str, cwd: &str, timestamp: OffsetDateTime) -> History { + History::import() + .timestamp(timestamp) + .command(command) + .cwd(cwd) + .build() + .into() + } + + #[test] + fn frecency_data_compute() { + let now = 1_000_000i64; + + // Recent command (with default multipliers of 1.0) + let recent = FrecencyData { + count: 5, + last_used: now - 60, // 1 minute ago + }; + assert!(recent.compute(now, 1.0, 1.0) > 100); // High score + + // Old command + let old = FrecencyData { + count: 5, + last_used: now - 86400 * 30, // 30 days ago + }; + assert!(old.compute(now, 1.0, 1.0) < recent.compute(now, 1.0, 1.0)); + + // Frequently used old command + let frequent_old = FrecencyData { + count: 100, + last_used: now - 86400 * 7, // 1 week ago + }; + // Should still have decent score due to frequency + assert!(frequent_old.compute(now, 1.0, 1.0) > 50); + } + + #[test] + fn frecency_data_compute_with_multipliers() { + let now = 1_000_000_i64; + + let data = FrecencyData { + count: 5, + last_used: now - 60, // 1 minute ago (recency_score = 100) + }; + + // Default multipliers (1.0, 1.0) + let default_score = data.compute(now, 1.0, 1.0); + + // Double recency weight + let double_recency = data.compute(now, 2.0, 1.0); + assert!(double_recency > default_score); + + // Double frequency weight + let double_frequency = data.compute(now, 1.0, 2.0); + assert!(double_frequency > default_score); + + // Zero out recency (only frequency counts) + let no_recency = data.compute(now, 0.0, 1.0); + assert!(no_recency < default_score); + + // Zero out frequency (only recency counts) + let no_frequency = data.compute(now, 1.0, 0.0); + assert!(no_frequency < default_score); + + // Zero both (should be zero) + let no_score = data.compute(now, 0.0, 0.0); + assert_eq!(no_score, 0); + + // Fractional multipliers + let half_recency = data.compute(now, 0.5, 1.0); + assert!(half_recency < default_score); + assert!(half_recency > no_recency); + + // 1.5x multiplier + let boost_recency = data.compute(now, 1.5, 1.0); + assert!(boost_recency > default_score); + assert!(boost_recency < double_recency); + } + + #[test] + fn command_data_add_invocation() { + let interner = ThreadedRodeo::new(); + + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&history1, &interner).unwrap(); + assert_eq!(data.global_frecency.count, 1); + let id1 = data.most_recent_id(); + + data.add_invocation(&history2, &interner); + assert_eq!(data.global_frecency.count, 2); + + // Most recent ID should update to history2 (newer timestamp) + let id2 = data.most_recent_id(); + assert_ne!(id1, id2); + } + + #[test] + fn command_data_filters() { + let interner = ThreadedRodeo::new(); + + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&h1, &interner).unwrap(); + data.add_invocation(&h2, &interner); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User\\project"), + with_trailing_slash("C:\\Users\\User\\other"), + with_trailing_slash("C:\\Users\\User\\missing"), + ) + } else { + ( + with_trailing_slash("/home/user/project"), + with_trailing_slash("/home/user/other"), + with_trailing_slash("/home/user/missing"), + ) + }; + + assert!(data.has_invocation_in_dir(&check1, &interner)); + assert!(data.has_invocation_in_dir(&check2, &interner)); + assert!(!data.has_invocation_in_dir(&check3, &interner)); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User"), + with_trailing_slash("C:\\Users"), + with_trailing_slash("C:\\Users\\User\\var"), + ) + } else { + ( + with_trailing_slash("/home/user"), + with_trailing_slash("/home"), + with_trailing_slash("/var"), + ) + }; + + assert!(data.has_invocation_in_workspace(&check1, &interner)); + assert!(data.has_invocation_in_workspace(&check2, &interner)); + assert!(!data.has_invocation_in_workspace(&check3, &interner)); + } + + #[tokio::test] + async fn search_index_add_and_search() { + let index = SearchIndex::new(); + + let h1 = make_history( + "git status", + "/home/user/project", + datetime!(2024-01-01 10:00 UTC), + ); + let h2 = make_history( + "git commit -m 'test'", + "/home/user/project", + datetime!(2024-01-01 10:05 UTC), + ); + let h3 = make_history( + "ls -la", + "/home/user/other", + datetime!(2024-01-01 10:10 UTC), + ); + + index.add_history(&h1); + index.add_history(&h2); + index.add_history(&h3); + + assert_eq!(index.command_count(), 3); + + // Search for "git" - should match 2 commands + let results = index + .search("git", IndexFilterMode::Global, &QueryContext::default(), 10) + .await; + assert_eq!(results.len(), 2); + + // Search with directory filter + let results = index + .search( + "", + IndexFilterMode::Directory(with_trailing_slash("/home/user/project")), + &QueryContext::default(), + 10, + ) + .await; + assert_eq!(results.len(), 2); // git status and git commit + } +} diff --git a/crates/turtle/src/atuin_daemon/search/mod.rs b/crates/turtle/src/atuin_daemon/search/mod.rs new file mode 100644 index 00000000..4d261956 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/search/mod.rs @@ -0,0 +1,11 @@ +//! Search module for the daemon gRPC search service. +//! +//! This module provides fuzzy search over command history using Nucleo. + +mod index; + +// Include the generated proto code +tonic::include_proto!("search"); + +// Re-export the service and index +pub use index::{IndexFilterMode, QueryContext, SearchIndex}; diff --git a/crates/turtle/src/atuin_daemon/semantic/mod.rs b/crates/turtle/src/atuin_daemon/semantic/mod.rs new file mode 100644 index 00000000..c3511676 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/semantic/mod.rs @@ -0,0 +1,3 @@ +//! Semantic command capture gRPC service types. + +tonic::include_proto!("semantic"); diff --git a/crates/turtle/src/atuin_daemon/server.rs b/crates/turtle/src/atuin_daemon/server.rs new file mode 100644 index 00000000..23b04342 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/server.rs @@ -0,0 +1,115 @@ +use eyre::Result; + +use crate::atuin_daemon::components::history::HistoryGrpcService; +use crate::atuin_daemon::components::search::SearchGrpcService; +use crate::atuin_daemon::components::semantic::SemanticGrpcService; +use crate::atuin_daemon::control::{ControlService, control_server::ControlServer}; +use crate::atuin_daemon::daemon::DaemonHandle; +use crate::atuin_daemon::history::history_server::HistoryServer; +use crate::atuin_daemon::search::search_server::SearchServer; +use crate::atuin_daemon::semantic::semantic_server::SemanticServer; + +use crate::atuin_client::settings::Settings; + +/// Run the gRPC server with the given services. +/// +/// This starts the gRPC server in the background and returns immediately. +/// The server will shut down when a ShutdownRequested event is received. +#[cfg(unix)] +pub async fn run_grpc_server( + settings: Settings, + history_service: HistoryServer<HistoryGrpcService>, + search_service: SearchServer<SearchGrpcService>, + semantic_service: SemanticServer<SemanticGrpcService>, + control_service: ControlServer<ControlService>, + handle: DaemonHandle, +) -> Result<()> { + use tokio::net::UnixListener; + use tokio_stream::wrappers::UnixListenerStream; + + let socket_path = settings.daemon.socket_path.clone(); + + let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket { + #[cfg(target_os = "linux")] + { + use eyre::{OptionExt, WrapErr}; + use std::os::unix::net::SocketAddr; + use std::path::PathBuf; + tracing::info!("getting systemd socket"); + let listener = listenfd::ListenFd::from_env() + .take_unix_listener(0)? + .ok_or_eyre("missing systemd socket")?; + listener.set_nonblocking(true)?; + let actual_path: Result<PathBuf, eyre::Report> = listener + .local_addr() + .context("getting systemd socket's path") + .and_then(|addr: SocketAddr| { + addr.as_pathname() + .ok_or_eyre("systemd socket missing path") + .map(|path: &std::path::Path| path.to_owned()) + }); + match actual_path { + Ok(actual_path) => { + tracing::info!("listening on systemd socket: {actual_path:?}"); + if actual_path != std::path::Path::new(&socket_path) { + tracing::warn!( + "systemd socket is not at configured client path: {socket_path:?}" + ); + } + } + Err(err) => { + tracing::warn!( + "could not detect systemd socket path, ensure that it's at the configured path: {socket_path:?}, error: {err:?}" + ); + } + } + (UnixListener::from_std(listener)?, false) + } + } else { + tracing::info!("listening on unix socket {socket_path:?}"); + (UnixListener::bind(socket_path.clone())?, true) + }; + + let uds_stream = UnixListenerStream::new(uds); + + // Create shutdown signal from daemon handle + let shutdown_signal = async move { + let mut rx = handle.subscribe(); + loop { + use crate::atuin_daemon::DaemonEvent; + + match rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) => break, + Ok(_) => continue, + Err(_) => break, // Channel closed + } + } + if cleanup { + eprintln!("Removing socket..."); + if let Err(e) = std::fs::remove_file(&socket_path) + && e.kind() != std::io::ErrorKind::NotFound + { + eprintln!("failed to remove socket: {e}"); + } + } + eprintln!("Shutting down gRPC server..."); + }; + + // Spawn the server in the background + tokio::spawn(async move { + use tonic::transport::Server; + + if let Err(e) = Server::builder() + .add_service(history_service) + .add_service(search_service) + .add_service(semantic_service) + .add_service(control_service) + .serve_with_incoming_shutdown(uds_stream, shutdown_signal) + .await + { + tracing::error!("gRPC server error: {e}"); + } + }); + + Ok(()) +} |
