diff options
| author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-06-11 00:54:30 +0200 |
|---|---|---|
| committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-06-11 00:54:30 +0200 |
| commit | 5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch) | |
| tree | c64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/atuin-daemon/src | |
| parent | chore: Somewhat simplify sync code (diff) | |
| download | atuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip | |
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show
dead code correctly.
Diffstat (limited to 'crates/atuin-daemon/src')
| -rw-r--r-- | crates/atuin-daemon/src/client.rs | 518 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/history.rs | 327 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/mod.rs | 25 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/search.rs | 413 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/semantic.rs | 900 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/sync.rs | 279 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/control/mod.rs | 12 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/control/service.rs | 71 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/daemon.rs | 458 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/events.rs | 74 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/history/mod.rs | 6 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/lib.rs | 136 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/search/index.rs | 683 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/search/mod.rs | 11 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/semantic/mod.rs | 3 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/server.rs | 170 |
16 files changed, 0 insertions, 4086 deletions
diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs deleted file mode 100644 index c18e0e46..00000000 --- a/crates/atuin-daemon/src/client.rs +++ /dev/null @@ -1,518 +0,0 @@ -use atuin_client::database::Context; -use atuin_client::settings::{FilterMode, Settings}; -use eyre::{Context as EyreContext, Result}; -#[cfg(windows)] -use tokio::net::TcpStream; -use tonic::Code; -use tonic::transport::{Channel, Endpoint, Uri}; -use tower::service_fn; - -use hyper_util::rt::TokioIo; - -#[cfg(unix)] -use tokio::net::UnixStream; - -use atuin_client::history::History; -use tracing::{Level, instrument, span}; - -use crate::control::HistoryRebuiltEvent; -use crate::control::{ - ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest, - SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient, -}; -use crate::events::DaemonEvent; -use crate::history::{ - EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest, - StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest, - history_client::HistoryClient as HistoryServiceClient, -}; -use crate::search::{ - FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse, - search_client::SearchClient as SearchServiceClient, -}; -use crate::semantic::{ - CommandCapture, CommandOutputReply, CommandOutputRequest, OutputRange, RecordCommandsReply, - semantic_client::SemanticClient as SemanticServiceClient, -}; - -pub struct HistoryClient { - client: HistoryServiceClient<Channel>, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum DaemonClientErrorKind { - Connect, - Unavailable, - Unimplemented, - Other, -} - -#[must_use] -pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind { - for cause in error.chain() { - if cause.downcast_ref::<tonic::transport::Error>().is_some() { - return DaemonClientErrorKind::Connect; - } - - if let Some(status) = cause.downcast_ref::<tonic::Status>() { - return match status.code() { - Code::Unavailable => DaemonClientErrorKind::Unavailable, - Code::Unimplemented => DaemonClientErrorKind::Unimplemented, - _ => DaemonClientErrorKind::Other, - }; - } - } - - DaemonClientErrorKind::Other -} - -// Wrap the grpc client -impl HistoryClient { - #[cfg(unix)] - pub async fn new(path: String) -> Result<Self> { - use eyre::Context; - - let log_path = path.clone(); - let channel = Endpoint::try_from("http://atuin_local_daemon:0")? - .connect_with_connector(service_fn(move |_: Uri| { - let path = path.clone(); - - async move { - Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) - } - })) - .await - .wrap_err_with(|| { - format!( - "failed to connect to local atuin daemon at {}. Is it running?", - &log_path - ) - })?; - - let client = HistoryServiceClient::new(channel); - - Ok(HistoryClient { client }) - } - - #[cfg(not(unix))] - pub async fn new(port: u64) -> Result<Self> { - let channel = Endpoint::try_from("http://atuin_local_daemon:0")? - .connect_with_connector(service_fn(move |_: Uri| { - let url = format!("127.0.0.1:{port}"); - - async move { - Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) - } - })) - .await - .wrap_err_with(|| { - format!( - "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" - ) - })?; - - let client = HistoryServiceClient::new(channel); - - Ok(HistoryClient { client }) - } - - pub async fn start_history(&mut self, h: History) -> Result<StartHistoryReply> { - let req = StartHistoryRequest { - command: h.command, - cwd: h.cwd, - hostname: h.hostname, - session: h.session, - timestamp: h.timestamp.unix_timestamp_nanos() as u64, - author: h.author, - intent: h.intent.unwrap_or_default(), - }; - - Ok(self.client.start_history(req).await?.into_inner()) - } - - pub async fn end_history( - &mut self, - id: String, - duration: u64, - exit: i64, - ) -> Result<EndHistoryReply> { - let req = EndHistoryRequest { id, duration, exit }; - - Ok(self.client.end_history(req).await?.into_inner()) - } - - pub async fn status(&mut self) -> Result<StatusReply> { - Ok(self.client.status(StatusRequest {}).await?.into_inner()) - } - - pub async fn tail_history(&mut self) -> Result<tonic::Streaming<TailHistoryReply>> { - Ok(self - .client - .tail_history(TailHistoryRequest {}) - .await? - .into_inner()) - } - - pub async fn shutdown(&mut self) -> Result<bool> { - let resp = self.client.shutdown(ShutdownRequest {}).await?.into_inner(); - Ok(resp.accepted) - } -} - -pub struct SearchClient { - client: SearchServiceClient<Channel>, -} - -impl SearchClient { - #[cfg(unix)] - pub async fn new(path: String) -> Result<Self> { - let log_path = path.clone(); - let channel = Endpoint::try_from("http://atuin_local_daemon:0")? - .connect_with_connector(service_fn(move |_: Uri| { - let path = path.clone(); - - async move { - Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) - } - })) - .await - .wrap_err_with(|| { - format!( - "failed to connect to local atuin daemon at {}. Is it running?", - &log_path - ) - })?; - - let client = SearchServiceClient::new(channel); - - Ok(SearchClient { client }) - } - - #[cfg(not(unix))] - pub async fn new(port: u64) -> Result<Self> { - let channel = Endpoint::try_from("http://atuin_local_daemon:0")? - .connect_with_connector(service_fn(move |_: Uri| { - let url = format!("127.0.0.1:{port}"); - - async move { - Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) - } - })) - .await - .wrap_err_with(|| { - format!( - "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" - ) - })?; - - let client = SearchServiceClient::new(channel); - - Ok(SearchClient { client }) - } - - #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))] - pub async fn search( - &mut self, - query: String, - query_id: u64, - filter_mode: FilterMode, - context: Option<Context>, - ) -> Result<tonic::Streaming<SearchResponse>> { - let request = SearchRequest { - query, - query_id, - filter_mode: RpcFilterMode::from(filter_mode).into(), - context: context.map(RpcSearchContext::from), - }; - let request_stream = tokio_stream::once(request); - let response = span!(Level::TRACE, "daemon_client_search.request") - .in_scope(async || self.client.search(request_stream).await) - .await?; - - Ok(response.into_inner()) - } -} - -impl From<FilterMode> for RpcFilterMode { - fn from(filter_mode: FilterMode) -> Self { - match filter_mode { - FilterMode::Global => RpcFilterMode::Global, - FilterMode::Host => RpcFilterMode::Host, - FilterMode::Session => RpcFilterMode::Session, - FilterMode::Directory => RpcFilterMode::Directory, - FilterMode::Workspace => RpcFilterMode::Workspace, - FilterMode::SessionPreload => RpcFilterMode::SessionPreload, - } - } -} - -impl From<Context> for RpcSearchContext { - fn from(context: Context) -> Self { - RpcSearchContext { - session_id: context.session, - cwd: context.cwd, - hostname: context.hostname, - host_id: context.host_id, - git_root: context - .git_root - .map(|path| path.to_string_lossy().to_string()), - } - } -} - -pub struct SemanticClient { - client: SemanticServiceClient<Channel>, -} - -impl SemanticClient { - #[cfg(unix)] - pub async fn new(path: String) -> Result<Self> { - let log_path = path.clone(); - let channel = Endpoint::try_from("http://atuin_local_daemon:0")? - .connect_with_connector(service_fn(move |_: Uri| { - let path = path.clone(); - - async move { - Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) - } - })) - .await - .wrap_err_with(|| { - format!( - "failed to connect to local atuin daemon at {}. Is it running?", - &log_path - ) - })?; - - let client = SemanticServiceClient::new(channel); - - Ok(SemanticClient { client }) - } - - #[cfg(not(unix))] - pub async fn new(port: u64) -> Result<Self> { - let channel = Endpoint::try_from("http://atuin_local_daemon:0")? - .connect_with_connector(service_fn(move |_: Uri| { - let url = format!("127.0.0.1:{port}"); - - async move { - Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) - } - })) - .await - .wrap_err_with(|| { - format!( - "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" - ) - })?; - - let client = SemanticServiceClient::new(channel); - - Ok(SemanticClient { client }) - } - - #[cfg(unix)] - pub async fn from_settings(settings: &Settings) -> Result<Self> { - Self::new(settings.daemon.socket_path.clone()).await - } - - #[cfg(not(unix))] - pub async fn from_settings(settings: &Settings) -> Result<Self> { - Self::new(settings.daemon.tcp_port).await - } - - pub async fn record_commands( - &mut self, - captures: Vec<CommandCapture>, - ) -> Result<RecordCommandsReply> { - let stream = tokio_stream::iter(captures); - Ok(self.client.record_commands(stream).await?.into_inner()) - } - - pub async fn command_output( - &mut self, - history_id: String, - ranges: Vec<(i64, i64)>, - ) -> Result<CommandOutputReply> { - let request = CommandOutputRequest { - history_id, - ranges: ranges - .into_iter() - .map(|(start, end)| OutputRange { start, end }) - .collect(), - }; - - Ok(self.client.command_output(request).await?.into_inner()) - } -} - -// ============================================================================ -// Control Client -// ============================================================================ - -/// Client for the Control gRPC service. -/// -/// Used to inject events into a running daemon from external processes. -pub struct ControlClient { - client: ControlServiceClient<Channel>, -} - -impl ControlClient { - /// Connect to the daemon's control service. - #[cfg(unix)] - pub async fn new(path: String) -> Result<Self> { - let log_path = path.clone(); - let channel = Endpoint::try_from("http://atuin_local_daemon:0")? - .connect_with_connector(service_fn(move |_: Uri| { - let path = path.clone(); - - async move { - Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) - } - })) - .await - .wrap_err_with(|| { - format!( - "failed to connect to local atuin daemon at {}. Is it running?", - &log_path - ) - })?; - - let client = ControlServiceClient::new(channel); - - Ok(ControlClient { client }) - } - - /// Connect to the daemon's control service. - #[cfg(not(unix))] - pub async fn new(port: u64) -> Result<Self> { - let channel = Endpoint::try_from("http://atuin_local_daemon:0")? - .connect_with_connector(service_fn(move |_: Uri| { - let url = format!("127.0.0.1:{port}"); - - async move { - Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) - } - })) - .await - .wrap_err_with(|| { - format!( - "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" - ) - })?; - - let client = ControlServiceClient::new(channel); - - Ok(ControlClient { client }) - } - - /// Connect using settings. - #[cfg(unix)] - pub async fn from_settings(settings: &Settings) -> Result<Self> { - Self::new(settings.daemon.socket_path.clone()).await - } - - /// Connect using settings. - #[cfg(not(unix))] - pub async fn from_settings(settings: &Settings) -> Result<Self> { - Self::new(settings.daemon.tcp_port).await - } - - /// Send an event to the daemon. - pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> { - let proto_event = daemon_event_to_proto(event); - let request = SendEventRequest { - event: Some(proto_event), - }; - self.client.send_event(request).await?; - Ok(()) - } -} - -/// Convert a daemon event to its proto representation. -fn daemon_event_to_proto(event: DaemonEvent) -> crate::control::send_event_request::Event { - use crate::control::send_event_request::Event; - - match event { - DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}), - DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}), - DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent { - ids: ids.into_iter().map(|id| id.0).collect(), - }), - DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}), - DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}), - DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}), - // These events are internal and not sent via the control service - DaemonEvent::HistoryStarted(_) - | DaemonEvent::HistoryEnded(_) - | DaemonEvent::RecordsAdded(_) - | DaemonEvent::SyncCompleted { .. } - | DaemonEvent::SyncFailed { .. } => { - // Use shutdown as a fallback, though this shouldn't happen - tracing::warn!("attempted to send internal event via control service"); - Event::Shutdown(ShutdownEvent {}) - } - } -} - -// ============================================================================ -// Convenience Functions -// ============================================================================ - -/// Emit an event to the daemon. -/// -/// This is a fire-and-forget helper for sending events to the daemon from -/// external processes like CLI commands. If the daemon isn't running, this -/// will silently succeed (returns Ok). -/// -/// # Example -/// -/// ```ignore -/// // After pruning history -/// emit_event(DaemonEvent::HistoryPruned).await?; -/// -/// // After deleting specific history items -/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] }).await?; -/// -/// // Request immediate sync -/// emit_event(DaemonEvent::ForceSync).await?; -/// ``` -pub async fn emit_event(event: DaemonEvent) -> Result<()> { - emit_event_with_settings(event, None).await -} - -/// Emit an event to the daemon with explicit settings. -/// -/// If settings are not provided, they will be loaded from the default location. -/// If the daemon isn't running, this will silently succeed. -pub async fn emit_event_with_settings( - event: DaemonEvent, - settings: Option<&Settings>, -) -> Result<()> { - // Load settings if not provided - let owned_settings; - let settings = match settings { - Some(s) => s, - None => { - owned_settings = Settings::new()?; - &owned_settings - } - }; - - // Try to connect - if daemon isn't running, that's fine - let mut client = match ControlClient::from_settings(settings).await { - Ok(c) => c, - Err(e) => { - tracing::debug!(?e, "daemon not running, skipping event emission"); - return Ok(()); - } - }; - - // Send the event - if let Err(e) = client.send_event(event).await { - tracing::debug!(?e, "failed to send event to daemon"); - // Don't fail - this is fire-and-forget - } - - Ok(()) -} diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs deleted file mode 100644 index c82c8f94..00000000 --- a/crates/atuin-daemon/src/components/history.rs +++ /dev/null @@ -1,327 +0,0 @@ -//! History component. -//! -//! Handles command history lifecycle (start/end) and provides the History gRPC service. - -use std::{pin::Pin, sync::Arc}; - -use atuin_client::{ - database::Database, - history::{History, HistoryId, store::HistoryStore}, - settings::Settings, -}; -use dashmap::DashMap; -use eyre::Result; -use time::OffsetDateTime; -use tokio_stream::Stream; -use tonic::{Request, Response, Status}; -use tracing::{Level, instrument}; - -use crate::{ - daemon::{Component, DaemonHandle}, - events::DaemonEvent, - history::{ - EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply, - ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, - TailHistoryReply, TailHistoryRequest, - history_server::{History as HistorySvc, HistoryServer}, - }, -}; - -const DAEMON_PROTOCOL_VERSION: u32 = 1; - -/// History component - manages command history lifecycle. -/// -/// This component: -/// - Tracks currently running commands (stored in memory) -/// - Saves completed commands to the database and record store -/// - Emits history events for other components (e.g., search indexing) -/// - Provides the History gRPC service -pub struct HistoryComponent { - inner: Arc<HistoryComponentInner>, -} - -struct HistoryComponentInner { - /// Commands currently running (not yet completed). - running: DashMap<HistoryId, History>, - - /// Handle to the daemon (set during start). - handle: tokio::sync::RwLock<Option<DaemonHandle>>, - - /// History store for pushing records (set during start). - history_store: tokio::sync::RwLock<Option<HistoryStore>>, -} - -impl HistoryComponent { - /// Create a new history component. - pub fn new() -> Self { - Self { - inner: Arc::new(HistoryComponentInner { - running: DashMap::new(), - handle: tokio::sync::RwLock::new(None), - history_store: tokio::sync::RwLock::new(None), - }), - } - } - - /// Get the gRPC service for this component. - /// - /// This returns a tonic service that can be added to a gRPC server. - pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> { - HistoryServer::new(HistoryGrpcService { - inner: self.inner.clone(), - }) - } -} - -impl Default for HistoryComponent { - fn default() -> Self { - Self::new() - } -} - -#[tonic::async_trait] -impl Component for HistoryComponent { - fn name(&self) -> &'static str { - "history" - } - - async fn start(&mut self, handle: DaemonHandle) -> Result<()> { - // Create the history store - let host_id = Settings::host_id().await?; - let history_store = - HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key()); - - *self.inner.history_store.write().await = Some(history_store); - *self.inner.handle.write().await = Some(handle); - - tracing::info!("history component started"); - Ok(()) - } - - async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> { - // History component produces events but doesn't need to react to them - Ok(()) - } - - async fn stop(&mut self) -> Result<()> { - tracing::info!("history component stopped"); - Ok(()) - } -} - -/// The gRPC service implementation. -/// -/// This is a thin wrapper that delegates to the component's shared state. -pub struct HistoryGrpcService { - inner: Arc<HistoryComponentInner>, -} - -fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply { - TailHistoryReply { - kind: kind as i32, - history: Some(HistoryEntry { - timestamp: history.timestamp.unix_timestamp_nanos() as u64, - id: history.id.0, - command: history.command, - cwd: history.cwd, - session: history.session, - hostname: history.hostname, - author: history.author, - intent: history.intent.unwrap_or_default(), - exit: history.exit, - duration: history.duration, - }), - } -} - -#[tonic::async_trait] -impl HistorySvc for HistoryGrpcService { - type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>; - - #[instrument(skip_all, level = Level::INFO)] - async fn start_history( - &self, - request: Request<StartHistoryRequest>, - ) -> Result<Response<StartHistoryReply>, Status> { - let req = request.into_inner(); - - let timestamp = - OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { - Status::invalid_argument( - "failed to parse timestamp as unix time (expected nanos since epoch)", - ) - })?; - - let h: History = History::daemon() - .timestamp(timestamp) - .command(req.command) - .cwd(req.cwd) - .session(req.session) - .hostname(req.hostname) - .author(req.author) - .intent(req.intent) - .build() - .into(); - - // Emit the event - if let Some(handle) = self.inner.handle.read().await.as_ref() { - handle.emit(DaemonEvent::HistoryStarted(h.clone())); - } - - let id = h.id.clone(); - tracing::info!(id = id.to_string(), "start history"); - self.inner.running.insert(id.clone(), h); - - let reply = StartHistoryReply { - id: id.to_string(), - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn end_history( - &self, - request: Request<EndHistoryRequest>, - ) -> Result<Response<EndHistoryReply>, Status> { - let req = request.into_inner(); - let id = HistoryId(req.id); - - if let Some((_, mut history)) = self.inner.running.remove(&id) { - history.exit = req.exit; - history.duration = match req.duration { - 0 => i64::try_from( - (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), - ) - .expect("failed to convert calculated duration to i64"), - value => i64::try_from(value).expect("failed to get i64 duration"), - }; - - // Get the handle and store to save the history - let handle_guard = self.inner.handle.read().await; - let handle = handle_guard - .as_ref() - .ok_or_else(|| Status::internal("component not initialized"))?; - - let store_guard = self.inner.history_store.read().await; - let history_store = store_guard - .as_ref() - .ok_or_else(|| Status::internal("component not initialized"))?; - - // Save to database - handle - .history_db() - .save(&history) - .await - .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; - - tracing::info!( - id = id.0.to_string(), - duration = history.duration, - "end history" - ); - - // Push to record store - let (record_id, idx) = history_store - .push(history.clone()) - .await - .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?; - - // Emit the event - handle.emit(DaemonEvent::HistoryEnded(history)); - - let reply = EndHistoryReply { - id: record_id.0.to_string(), - idx, - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - return Ok(Response::new(reply)); - } - - Err(Status::not_found(format!( - "could not find history with id: {id}" - ))) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn tail_history( - &self, - _request: Request<TailHistoryRequest>, - ) -> Result<Response<Self::TailHistoryStream>, Status> { - let handle_guard = self.inner.handle.read().await; - let handle = handle_guard - .as_ref() - .cloned() - .ok_or_else(|| Status::internal("component not initialized"))?; - - let mut rx = handle.subscribe(); - let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128); - - tokio::spawn(async move { - loop { - let event = match rx.recv().await { - Ok(event) => event, - Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { - let _ = tx - .send(Err(Status::resource_exhausted(format!( - "tail stream lagged behind and dropped {skipped} events" - )))) - .await; - break; - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - }; - - let reply = match event { - DaemonEvent::HistoryStarted(history) => { - Some(history_to_tail_reply(HistoryEventKind::Started, history)) - } - DaemonEvent::HistoryEnded(history) => { - Some(history_to_tail_reply(HistoryEventKind::Ended, history)) - } - _ => None, - }; - - if let Some(reply) = reply - && tx.send(Ok(reply)).await.is_err() - { - break; - } - } - }); - - let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx); - Ok(Response::new(Box::pin(stream))) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn status( - &self, - _request: Request<StatusRequest>, - ) -> Result<Response<StatusReply>, Status> { - let reply = StatusReply { - healthy: true, - version: env!("CARGO_PKG_VERSION").to_string(), - pid: std::process::id(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn shutdown( - &self, - _request: Request<ShutdownRequest>, - ) -> Result<Response<ShutdownReply>, Status> { - // Use the daemon handle to request shutdown - if let Some(handle) = self.inner.handle.read().await.as_ref() { - handle.shutdown(); - } - Ok(Response::new(ShutdownReply { accepted: true })) - } -} diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs deleted file mode 100644 index 447e31df..00000000 --- a/crates/atuin-daemon/src/components/mod.rs +++ /dev/null @@ -1,25 +0,0 @@ -//! Daemon components. -//! -//! Components are the building blocks of the daemon. Each component handles -//! a specific domain and can: -//! -//! - Expose gRPC services -//! - React to events -//! - Spawn background tasks -//! -//! Available components: -//! -//! - [`history::HistoryComponent`]: Command history lifecycle management -//! - [`search::SearchComponent`]: Fuzzy search over history -//! - [`semantic::SemanticComponent`]: In-memory semantic command captures -//! - [`sync::SyncComponent`]: Cloud sync - -pub mod history; -pub mod search; -pub mod semantic; -pub mod sync; - -pub use history::HistoryComponent; -pub use search::SearchComponent; -pub use semantic::SemanticComponent; -pub use sync::SyncComponent; diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs deleted file mode 100644 index 9fc87fae..00000000 --- a/crates/atuin-daemon/src/components/search.rs +++ /dev/null @@ -1,413 +0,0 @@ -//! Search component. -//! -//! Provides fuzzy search over command history using the Nucleo search library -//! with frecency-based ranking and dynamic filtering. - -use std::{pin::Pin, sync::Arc}; - -use atuin_client::database::Database; -use eyre::Result; -use tokio::sync::RwLock; -use tokio_stream::Stream; -use tonic::{Request, Response, Status, Streaming}; -use tracing::{Level, debug, info, instrument, span, trace}; -use uuid::Uuid; - -use crate::{ - daemon::{Component, DaemonHandle}, - events::DaemonEvent, - search::{ - FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse, - search_server::{Search as SearchSvc, SearchServer}, - }, -}; - -const PAGE_SIZE: usize = 5000; -const RESULTS_LIMIT: u32 = 200; -/// How often to rebuild the frecency map (in seconds). -const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60; - -/// Search component - provides fuzzy search over command history. -/// -/// This component: -/// - Maintains a deduplicated search index with frecency ranking -/// - Loads history from the database on startup -/// - Updates the index when history events occur -/// - Provides the Search gRPC service -pub struct SearchComponent { - index: Arc<RwLock<SearchIndex>>, - handle: tokio::sync::RwLock<Option<DaemonHandle>>, - loader_handle: Option<tokio::task::JoinHandle<()>>, - frecency_handle: Option<tokio::task::JoinHandle<()>>, -} - -impl SearchComponent { - /// Create a new search component. - pub fn new() -> Self { - Self { - index: Arc::new(RwLock::new(SearchIndex::new())), - handle: tokio::sync::RwLock::new(None), - loader_handle: None, - frecency_handle: None, - } - } - - /// Get the gRPC service for this component. - pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> { - SearchServer::new(SearchGrpcService { - index: self.index.clone(), - }) - } - - /// Rebuild the entire search index from the database. - async fn rebuild_index(&self) -> Result<()> { - let handle_guard = self.handle.read().await; - let handle = handle_guard - .as_ref() - .ok_or_else(|| eyre::eyre!("component not initialized"))?; - - info!("Rebuilding search index from database"); - - // Create a new index - let new_index = SearchIndex::new(); - - // Load all history into the new index - let db = handle.history_db().clone(); - let mut pager = db.all_paged(PAGE_SIZE, false, true); - loop { - match pager.next().await { - Ok(Some(histories)) => { - info!( - "Loading {} history entries into search index", - histories.len() - ); - new_index.add_histories(&histories); - } - Ok(None) => break, - Err(e) => { - tracing::error!("Failed to load history during rebuild: {}", e); - break; - } - } - } - - info!( - "Search index rebuild complete; {} unique commands", - new_index.command_count() - ); - - // Replace the old index with the new one - *self.index.write().await = new_index; - Ok(()) - } -} - -impl Default for SearchComponent { - fn default() -> Self { - Self::new() - } -} - -#[tonic::async_trait] -impl Component for SearchComponent { - fn name(&self) -> &'static str { - "search" - } - - async fn start(&mut self, handle: DaemonHandle) -> Result<()> { - *self.handle.write().await = Some(handle.clone()); - - // Spawn background task to load history into index - let index = self.index.clone(); - let db = handle.history_db().clone(); - let handle_for_loader = handle.clone(); - - self.loader_handle = Some(tokio::spawn(async move { - info!( - "Loading history into search index; page size = {}", - PAGE_SIZE - ); - let mut pager = db.all_paged(PAGE_SIZE, false, true); - loop { - match pager.next().await { - Ok(Some(histories)) => { - info!( - "Loading {} history entries into search index", - histories.len() - ); - index.read().await.add_histories(&histories); - } - Ok(None) => { - info!( - "Initial history load complete; {} unique commands indexed", - index.read().await.command_count() - ); - // Build initial frecency map with current settings - let settings = handle_for_loader.settings().await; - index.read().await.rebuild_frecency(&settings.search).await; - info!("Initial frecency map built"); - break; - } - Err(e) => { - tracing::error!("Failed to load history: {}", e); - break; - } - } - } - })); - - // Spawn background task to periodically refresh frecency - let index_for_frecency = self.index.clone(); - let handle_for_frecency = handle.clone(); - self.frecency_handle = Some(tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs( - FRECENCY_REFRESH_INTERVAL_SECS, - )); - loop { - interval.tick().await; - trace!("Refreshing frecency map"); - let settings = handle_for_frecency.settings().await; - index_for_frecency - .read() - .await - .rebuild_frecency(&settings.search) - .await; - } - })); - - tracing::info!("search component started"); - Ok(()) - } - - async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { - match event { - DaemonEvent::RecordsAdded(records) => { - debug!( - count = records.len(), - "Processing added records for search index" - ); - - let handle_guard = self.handle.read().await; - if let Some(handle) = handle_guard.as_ref() { - let histories: Vec<_> = handle - .history_db() - .query_history( - format!( - "select * from history where id in ({})", - records - .iter() - .map(|record| record.0.to_string()) - .collect::<Vec<_>>() - .join(",") - ) - .as_str(), - ) - .await - .unwrap_or_default(); - - span!(Level::TRACE, "inject_records", count = histories.len()) - .in_scope(async || { - self.index.read().await.add_histories(&histories); - }) - .await; - } - } - DaemonEvent::HistoryStarted(history) => { - debug!(id = %history.id, command = %history.command, "History started (no index action)"); - } - DaemonEvent::HistoryEnded(history) => { - span!(Level::TRACE, "inject_history_ended") - .in_scope(async || { - self.index.read().await.add_history(history); - }) - .await; - } - DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => { - info!("History store pruned or rebuilt, rebuilding search index"); - if let Err(e) = self.rebuild_index().await { - tracing::error!("Failed to rebuild search index: {}", e); - } - } - DaemonEvent::HistoryDeleted { ids } => { - info!( - count = ids.len(), - "History deleted, rebuilding search index" - ); - // For now, just rebuild the entire index. A more efficient implementation - // would remove specific items from the index. - if let Err(e) = self.rebuild_index().await { - tracing::error!("Failed to rebuild search index: {}", e); - } - } - DaemonEvent::SettingsReloaded => { - info!("Settings reloaded, rebuilding frecency map with new multipliers"); - let handle_guard = self.handle.read().await; - if let Some(handle) = handle_guard.as_ref() { - let settings = handle.settings().await; - self.index - .read() - .await - .rebuild_frecency(&settings.search) - .await; - } - } - // Events we don't care about - DaemonEvent::SyncCompleted { .. } - | DaemonEvent::SyncFailed { .. } - | DaemonEvent::ForceSync - | DaemonEvent::ShutdownRequested => {} - } - Ok(()) - } - - async fn stop(&mut self) -> Result<()> { - if let Some(handle) = self.loader_handle.take() { - handle.abort(); - } - if let Some(handle) = self.frecency_handle.take() { - handle.abort(); - } - tracing::info!("search component stopped"); - Ok(()) - } -} - -/// The gRPC service implementation. -pub struct SearchGrpcService { - index: Arc<RwLock<SearchIndex>>, -} - -#[tonic::async_trait] -impl SearchSvc for SearchGrpcService { - type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>; - - #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")] - async fn search( - &self, - request: Request<Streaming<SearchRequest>>, - ) -> Result<Response<Self::SearchStream>, Status> { - let mut in_stream = request.into_inner(); - let index = self.index.clone(); - - // Create output channel - let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128); - - // Spawn task to handle incoming requests and send responses - tokio::spawn(async move { - while let Some(req) = in_stream.message().await.transpose() { - match req { - Ok(search_req) => { - let query = search_req.query; - let query_id = search_req.query_id; - let filter_mode: FilterMode = search_req - .filter_mode - .try_into() - .unwrap_or(FilterMode::Global); - let proto_context = search_req.context; - - debug!( - "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}", - query, - query_id, - filter_mode.as_str_name(), - proto_context - ); - - // Convert proto FilterMode + context to IndexFilterMode - let index_filter = convert_filter_mode(filter_mode, &proto_context); - - // Build QueryContext from proto context - let query_context = proto_context - .map(|ctx| QueryContext { - cwd: Some(with_trailing_slash(&ctx.cwd)), - git_root: ctx.git_root.map(|s| with_trailing_slash(&s)), - hostname: Some(ctx.hostname), - session_id: Some(ctx.session_id), - }) - .unwrap_or_default(); - - // Perform the search - let history_ids = - span!(Level::TRACE, "daemon_search_query", %query, query_id) - .in_scope(|| async { - let index = index.read().await; - index - .search(&query, index_filter, &query_context, RESULTS_LIMIT) - .await - }) - .await; - - // Convert history IDs to bytes - let ids: Vec<Vec<u8>> = history_ids - .iter() - .filter_map(|id| { - Uuid::parse_str(id) - .ok() - .map(|uuid| uuid.as_bytes().to_vec()) - }) - .collect(); - - if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() { - break; // Client disconnected - } - } - Err(e) => { - let _ = tx.send(Err(e)).await; - break; - } - } - } - }); - - // Convert receiver to stream - let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx); - Ok(Response::new(Box::pin(out_stream))) - } -} - -/// Convert proto FilterMode and context to IndexFilterMode. -fn convert_filter_mode( - mode: FilterMode, - context: &Option<crate::search::SearchContext>, -) -> IndexFilterMode { - match (mode, context) { - (FilterMode::Global, _) => IndexFilterMode::Global, - (FilterMode::Directory, Some(ctx)) => { - IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) - } - (FilterMode::Workspace, Some(ctx)) => { - if let Some(ref git_root) = ctx.git_root { - IndexFilterMode::Workspace(with_trailing_slash(git_root)) - } else { - // Fall back to directory if no git root - IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) - } - } - (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()), - (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()), - (FilterMode::SessionPreload, Some(ctx)) => { - // SessionPreload is similar to Session - filter by session - IndexFilterMode::Session(ctx.session_id.clone()) - } - // If no context provided, fall back to global - _ => IndexFilterMode::Global, - } -} - -#[cfg(windows)] -pub fn with_trailing_slash(s: &str) -> String { - if s.ends_with('\\') { - s.to_string() - } else { - format!("{}\\", s) - } -} - -#[cfg(not(windows))] -pub fn with_trailing_slash(s: &str) -> String { - if s.ends_with('/') { - s.to_string() - } else { - format!("{}/", s) - } -} diff --git a/crates/atuin-daemon/src/components/semantic.rs b/crates/atuin-daemon/src/components/semantic.rs deleted file mode 100644 index dff38fd3..00000000 --- a/crates/atuin-daemon/src/components/semantic.rs +++ /dev/null @@ -1,900 +0,0 @@ -//! Semantic command capture component. -//! -//! This is a prototype in-memory store for completed command captures emitted -//! by atuin-pty-proxy. It keeps recent captures per Atuin session and indexes -//! them by history ID for AI tool lookup. - -use std::collections::{HashMap, VecDeque}; -use std::fmt::{Display, Formatter}; -use std::sync::Arc; - -use atuin_client::history::{History, HistoryId}; -use eyre::Result; -use tokio::sync::Mutex; -use tonic::{Request, Response, Status, Streaming}; -use tracing::{Level, instrument}; - -use crate::{ - daemon::{Component, DaemonHandle}, - events::DaemonEvent, - semantic::{ - CommandCapture, CommandOutputReply, CommandOutputRequest, OutputLine, RecordCommandsReply, - semantic_server::{Semantic as SemanticSvc, SemanticServer}, - }, -}; - -const MAX_SESSIONS: usize = 20; -const MAX_COMMANDS_PER_SESSION: usize = 128; -const MAX_BYTES_PER_SESSION: usize = 32 * 1024 * 1024; -const MAX_PENDING_HISTORIES: usize = 128; - -/// Stores completed command captures and associates them with history events. -pub struct SemanticComponent { - inner: Arc<SemanticComponentInner>, -} - -struct SemanticComponentInner { - state: Mutex<SemanticState>, -} - -#[derive(Default)] -struct SemanticState { - sessions: HashMap<SessionId, SessionCaptures>, - session_lru: VecDeque<SessionId>, - history_index: HashMap<HistoryId, CaptureRef>, - pending_histories: VecDeque<History>, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct SessionId(String); - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -struct CaptureId(u64); - -#[derive(Debug, Clone, PartialEq, Eq)] -struct CaptureRef { - session_id: SessionId, - capture_id: CaptureId, -} - -#[derive(Default)] -struct SessionCaptures { - next_id: u64, - records: VecDeque<StoredCapture>, - output_bytes: usize, -} - -struct StoredCapture { - id: CaptureId, - history_id: HistoryId, - output_bytes: usize, - record: SemanticCommandRecord, -} - -struct EvictedCapture { - history_id: HistoryId, - capture_id: CaptureId, -} - -#[derive(Debug, Clone)] -struct SemanticCommandRecord { - capture: CommandCapture, - history: Option<History>, -} - -impl SemanticComponent { - pub fn new() -> Self { - Self { - inner: Arc::new(SemanticComponentInner { - state: Mutex::new(SemanticState::default()), - }), - } - } - - pub fn grpc_service(&self) -> SemanticServer<SemanticGrpcService> { - SemanticServer::new(SemanticGrpcService { - inner: self.inner.clone(), - }) - } -} - -impl Default for SemanticComponent { - fn default() -> Self { - Self::new() - } -} - -#[tonic::async_trait] -impl Component for SemanticComponent { - fn name(&self) -> &'static str { - "semantic" - } - - async fn start(&mut self, _handle: DaemonHandle) -> Result<()> { - tracing::info!("semantic component started"); - Ok(()) - } - - async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { - if let DaemonEvent::HistoryEnded(history) = event { - self.inner.record_history(history.clone()).await; - } - - Ok(()) - } - - async fn stop(&mut self) -> Result<()> { - let state = self.inner.state.lock().await; - tracing::info!( - sessions = state.sessions.len(), - records = state.record_count(), - indexed_histories = state.history_index.len(), - pending_histories = state.pending_histories.len(), - "semantic component stopped" - ); - Ok(()) - } -} - -impl SemanticComponentInner { - async fn record_capture(&self, capture: CommandCapture) -> bool { - let mut state = self.state.lock().await; - state.record_capture(capture) - } - - async fn record_history(&self, history: History) { - let mut state = self.state.lock().await; - state.record_history(history); - } - - async fn command_output(&self, request: &CommandOutputRequest) -> CommandOutputReply { - let mut state = self.state.lock().await; - state.command_output(request) - } -} - -impl SemanticState { - fn record_capture(&mut self, mut capture: CommandCapture) -> bool { - let Some(history_id) = history_id_from_str(capture.history_id.as_deref()) else { - tracing::debug!( - command_bytes = capture.command.len(), - prompt_bytes = capture.prompt.len(), - output_bytes = capture.output.len(), - output_truncated = capture.output_truncated, - "dropping semantic command capture without history id" - ); - return false; - }; - - let history = take_pending_history(&mut self.pending_histories, &history_id); - let Some(session_id) = capture - .session_id - .as_deref() - .and_then(|session_id| SessionId::try_from(session_id).ok()) - .or_else(|| { - history - .as_ref() - .and_then(|history| SessionId::try_from(history.session.as_str()).ok()) - }) - else { - tracing::debug!( - history_id = %history_id, - command_bytes = capture.command.len(), - prompt_bytes = capture.prompt.len(), - output_bytes = capture.output.len(), - output_truncated = capture.output_truncated, - "dropping semantic command capture without session id" - ); - return false; - }; - - capture.history_id = Some(history_id.to_string()); - capture.session_id = Some(session_id.to_string()); - if capture.output_observed_bytes == 0 { - capture.output_observed_bytes = capture.output.len() as u64; - } - - let record = SemanticCommandRecord { capture, history }; - log_record(&record, "recorded semantic command capture"); - self.push_record(session_id, history_id, record); - true - } - - fn record_history(&mut self, history: History) { - let history_id = history.id.clone(); - - if let Some(capture_ref) = self.history_index.get(&history_id).cloned() { - if let Some(stored) = self.stored_capture_mut(&capture_ref) { - stored.record.history = Some(history); - log_record( - &stored.record, - "associated semantic command capture with history", - ); - return; - } - - self.history_index.remove(&history_id); - } - - tracing::debug!( - id = %history.id, - command_bytes = history.command.len(), - "history ended before semantic capture arrived" - ); - push_pending_history(&mut self.pending_histories, history); - } - - fn command_output(&mut self, request: &CommandOutputRequest) -> CommandOutputReply { - let Some(history_id) = history_id_from_str(Some(&request.history_id)) else { - return command_output_not_found(); - }; - let Some(capture_ref) = self.history_index.get(&history_id).cloned() else { - return command_output_not_found(); - }; - - let Some(reply) = self.command_output_for_ref(&capture_ref, &request.ranges) else { - self.history_index.remove(&history_id); - return command_output_not_found(); - }; - - self.touch_session(&capture_ref.session_id); - reply - } - - fn command_output_for_ref( - &self, - capture_ref: &CaptureRef, - ranges: &[crate::semantic::OutputRange], - ) -> Option<CommandOutputReply> { - let stored = self - .sessions - .get(&capture_ref.session_id)? - .stored_capture(capture_ref.capture_id)?; - let output = &stored.record.capture.output; - let output_observed_bytes = stored - .record - .capture - .output_observed_bytes - .max(output.len() as u64); - - Some(CommandOutputReply { - found: true, - output: String::new(), - total_bytes: output.len() as u64, - total_lines: output.lines().count() as u64, - lines: select_output_ranges(output, ranges), - output_truncated: stored.record.capture.output_truncated, - output_observed_bytes, - }) - } - - fn push_record( - &mut self, - session_id: SessionId, - history_id: HistoryId, - record: SemanticCommandRecord, - ) { - self.touch_session(&session_id); - - let (capture_id, evicted) = { - let session = self.sessions.entry(session_id.clone()).or_default(); - session.push(history_id.clone(), record) - }; - - let capture_ref = CaptureRef { - session_id: session_id.clone(), - capture_id, - }; - self.history_index.insert(history_id, capture_ref); - - for evicted in evicted { - self.remove_history_index_if_matches( - &session_id, - &evicted.history_id, - evicted.capture_id, - ); - } - - self.expire_lru_sessions(); - } - - fn touch_session(&mut self, session_id: &SessionId) { - if let Some(index) = self.session_lru.iter().position(|id| id == session_id) { - self.session_lru.remove(index); - } - self.session_lru.push_back(session_id.clone()); - } - - fn expire_lru_sessions(&mut self) { - while self.session_lru.len() > MAX_SESSIONS { - let Some(session_id) = self.session_lru.pop_front() else { - break; - }; - let Some(session) = self.sessions.remove(&session_id) else { - continue; - }; - - for stored in session.records { - self.remove_history_index_if_matches(&session_id, &stored.history_id, stored.id); - } - } - } - - fn remove_history_index_if_matches( - &mut self, - session_id: &SessionId, - history_id: &HistoryId, - capture_id: CaptureId, - ) { - if self - .history_index - .get(history_id) - .is_some_and(|capture_ref| { - &capture_ref.session_id == session_id && capture_ref.capture_id == capture_id - }) - { - self.history_index.remove(history_id); - } - } - - fn stored_capture_mut(&mut self, capture_ref: &CaptureRef) -> Option<&mut StoredCapture> { - self.sessions - .get_mut(&capture_ref.session_id)? - .stored_capture_mut(capture_ref.capture_id) - } - - fn record_count(&self) -> usize { - self.sessions - .values() - .map(|session| session.records.len()) - .sum() - } -} - -impl SessionCaptures { - fn push( - &mut self, - history_id: HistoryId, - record: SemanticCommandRecord, - ) -> (CaptureId, Vec<EvictedCapture>) { - self.push_with_limits( - history_id, - record, - MAX_COMMANDS_PER_SESSION, - MAX_BYTES_PER_SESSION, - ) - } - - fn push_with_limits( - &mut self, - history_id: HistoryId, - record: SemanticCommandRecord, - max_commands: usize, - max_output_bytes: usize, - ) -> (CaptureId, Vec<EvictedCapture>) { - let capture_id = CaptureId(self.next_id); - self.next_id = self.next_id.saturating_add(1); - let output_bytes = record.capture.output.len(); - self.output_bytes = self.output_bytes.saturating_add(output_bytes); - self.records.push_back(StoredCapture { - id: capture_id, - history_id, - output_bytes, - record, - }); - - ( - capture_id, - self.evict_to_limits(max_commands, max_output_bytes), - ) - } - - fn evict_to_limits( - &mut self, - max_commands: usize, - max_output_bytes: usize, - ) -> Vec<EvictedCapture> { - let mut evicted = Vec::new(); - while self.records.len() > max_commands || self.output_bytes > max_output_bytes { - let Some(record) = self.records.pop_front() else { - break; - }; - self.output_bytes = self.output_bytes.saturating_sub(record.output_bytes); - evicted.push(EvictedCapture { - history_id: record.history_id, - capture_id: record.id, - }); - } - evicted - } - - fn stored_capture(&self, capture_id: CaptureId) -> Option<&StoredCapture> { - self.records.iter().find(|record| record.id == capture_id) - } - - fn stored_capture_mut(&mut self, capture_id: CaptureId) -> Option<&mut StoredCapture> { - self.records - .iter_mut() - .find(|record| record.id == capture_id) - } -} - -impl TryFrom<&str> for SessionId { - type Error = (); - - fn try_from(value: &str) -> std::result::Result<Self, Self::Error> { - let value = value.trim(); - if value.is_empty() { - return Err(()); - } - - Ok(Self(value.to_string())) - } -} - -impl TryFrom<String> for SessionId { - type Error = (); - - fn try_from(value: String) -> std::result::Result<Self, Self::Error> { - Self::try_from(value.as_str()) - } -} - -impl AsRef<str> for SessionId { - fn as_ref(&self) -> &str { - &self.0 - } -} - -impl Display for SessionId { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.0) - } -} - -pub struct SemanticGrpcService { - inner: Arc<SemanticComponentInner>, -} - -#[tonic::async_trait] -impl SemanticSvc for SemanticGrpcService { - #[instrument(skip_all, level = Level::INFO)] - async fn record_commands( - &self, - request: Request<Streaming<CommandCapture>>, - ) -> Result<Response<RecordCommandsReply>, Status> { - let mut stream = request.into_inner(); - let mut accepted = 0_u64; - - while let Some(capture) = stream.message().await? { - if self.inner.record_capture(capture).await { - accepted += 1; - } - } - - Ok(Response::new(RecordCommandsReply { accepted })) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn command_output( - &self, - request: Request<CommandOutputRequest>, - ) -> Result<Response<CommandOutputReply>, Status> { - let request = request.into_inner(); - if request.history_id.trim().is_empty() { - return Err(Status::invalid_argument("history_id is required")); - } - - Ok(Response::new(self.inner.command_output(&request).await)) - } -} - -fn history_id_from_str(value: Option<&str>) -> Option<HistoryId> { - let value = value?.trim(); - (!value.is_empty()).then(|| HistoryId(value.to_string())) -} - -fn take_pending_history( - histories: &mut VecDeque<History>, - history_id: &HistoryId, -) -> Option<History> { - let index = histories - .iter() - .position(|history| &history.id == history_id)?; - histories.remove(index) -} - -fn push_pending_history(histories: &mut VecDeque<History>, history: History) { - if let Some(index) = histories - .iter() - .position(|pending| pending.id == history.id) - { - histories.remove(index); - } - - histories.push_back(history); - trim_front(histories, MAX_PENDING_HISTORIES); -} - -fn trim_front<T>(records: &mut VecDeque<T>, max_len: usize) { - while records.len() > max_len { - records.pop_front(); - } -} - -fn command_output_not_found() -> CommandOutputReply { - CommandOutputReply { - found: false, - output: String::new(), - total_bytes: 0, - total_lines: 0, - lines: Vec::new(), - output_truncated: false, - output_observed_bytes: 0, - } -} - -fn select_output_ranges(output: &str, ranges: &[crate::semantic::OutputRange]) -> Vec<OutputLine> { - let lines: Vec<&str> = output.lines().collect(); - if lines.is_empty() { - return Vec::new(); - } - - let ranges = if ranges.is_empty() { - vec![crate::semantic::OutputRange { start: 0, end: 999 }] - } else { - ranges.to_vec() - }; - - let mut ranges = ranges - .into_iter() - .filter_map(|range| normalize_line_range(range.start, range.end, lines.len())) - .collect::<Vec<_>>(); - ranges.sort_unstable_by_key(|(start, _)| *start); - - let mut merged: Vec<(usize, usize)> = Vec::new(); - for (start, end) in ranges { - match merged.last_mut() { - Some((_, merged_end)) if start <= merged_end.saturating_add(1) => { - *merged_end = (*merged_end).max(end); - } - _ => merged.push((start, end)), - } - } - - merged - .into_iter() - .flat_map(|(start, end)| { - lines[start..=end] - .iter() - .enumerate() - .map(move |(offset, line)| OutputLine { - line_number: (start + offset + 1) as u64, - content: (*line).to_string(), - }) - }) - .collect() -} - -fn normalize_line_range(start: i64, end: i64, line_count: usize) -> Option<(usize, usize)> { - let line_count = i64::try_from(line_count).ok()?; - let start = if start < 0 { line_count + start } else { start }; - let end = if end < 0 { line_count + end } else { end }; - - if end < 0 || start >= line_count { - return None; - } - - let start = start.max(0); - let end = end.min(line_count - 1); - - (start <= end).then_some((start as usize, end as usize)) -} - -fn log_record(record: &SemanticCommandRecord, message: &'static str) { - let history_id = record.capture.history_id.as_deref().unwrap_or("<missing>"); - let associated_history_id = record - .history - .as_ref() - .map(|history| history.id.to_string()); - let exit = record.history.as_ref().map(|history| history.exit); - let duration = record.history.as_ref().map(|history| history.duration); - let author = record - .history - .as_ref() - .map(|history| history.author.as_str()); - let session_id = record.capture.session_id.as_deref(); - - tracing::debug!( - history_id = %history_id, - associated_history_id = ?associated_history_id, - session_id = ?session_id, - command_bytes = record.capture.command.len(), - prompt_bytes = record.capture.prompt.len(), - output_bytes = record.capture.output.len(), - output_truncated = record.capture.output_truncated, - output_observed_bytes = record.capture.output_observed_bytes, - capture_exit_code = ?record.capture.exit_code, - history_exit = ?exit, - duration = ?duration, - author = ?author, - "{message}" - ); -} - -#[cfg(test)] -mod tests { - use super::*; - use time::OffsetDateTime; - - fn history(id: &str, session: &str, command: &str) -> History { - History { - id: HistoryId(id.to_string()), - timestamp: OffsetDateTime::UNIX_EPOCH, - duration: 0, - exit: 0, - command: command.to_string(), - cwd: String::new(), - session: session.to_string(), - hostname: String::new(), - author: String::new(), - intent: None, - deleted_at: None, - } - } - - fn capture(history_id: Option<&str>, session_id: Option<&str>, output: &str) -> CommandCapture { - CommandCapture { - prompt: String::new(), - command: String::new(), - output: output.to_string(), - exit_code: None, - history_id: history_id.map(str::to_string), - session_id: session_id.map(str::to_string), - output_truncated: false, - output_observed_bytes: output.len() as u64, - } - } - - fn command_output(state: &mut SemanticState, history_id: &str) -> CommandOutputReply { - state.command_output(&CommandOutputRequest { - history_id: history_id.to_string(), - ranges: Vec::new(), - }) - } - - fn output_line(line_number: u64, content: &str) -> OutputLine { - OutputLine { - line_number, - content: content.to_string(), - } - } - - #[test] - fn drops_capture_without_history_id() { - let mut state = SemanticState::default(); - - assert!(!state.record_capture(capture(None, Some("session-1"), "output"))); - assert!(!command_output(&mut state, "id-1").found); - assert_eq!(state.record_count(), 0); - } - - #[test] - fn stores_capture_by_session_and_history_id() { - let mut state = SemanticState::default(); - - assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output"))); - - let reply = command_output(&mut state, "id-1"); - assert!(reply.found); - assert_eq!(reply.total_bytes, 6); - assert_eq!(reply.output_observed_bytes, 6); - assert_eq!(reply.lines, vec![output_line(1, "output")]); - } - - #[test] - fn uses_pending_history_session_when_capture_session_is_missing() { - let mut state = SemanticState::default(); - - state.record_history(history("id-1", "session-from-history", "cargo test")); - assert!(state.record_capture(capture(Some("id-1"), None, "output"))); - - assert!( - state - .sessions - .contains_key(&SessionId("session-from-history".to_string())) - ); - assert!(command_output(&mut state, "id-1").found); - } - - #[test] - fn associates_history_by_id_after_capture_arrives() { - let mut state = SemanticState::default(); - - assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output"))); - state.record_history(history("id-1", "session-1", "different command")); - - let capture_ref = state - .history_index - .get(&HistoryId("id-1".to_string())) - .unwrap(); - let stored = state - .sessions - .get(&capture_ref.session_id) - .unwrap() - .stored_capture(capture_ref.capture_id) - .unwrap(); - assert!(stored.record.history.is_some()); - } - - #[test] - fn evicts_oldest_command_when_session_ring_is_full() { - let mut state = SemanticState::default(); - - for index in 0..=MAX_COMMANDS_PER_SESSION { - assert!(state.record_capture(capture( - Some(&format!("id-{index}")), - Some("session-1"), - "output", - ))); - } - - assert!(!command_output(&mut state, "id-0").found); - assert!(command_output(&mut state, &format!("id-{MAX_COMMANDS_PER_SESSION}")).found); - assert_eq!(state.record_count(), MAX_COMMANDS_PER_SESSION); - } - - #[test] - fn evicts_oldest_session_after_lru_limit() { - let mut state = SemanticState::default(); - - for index in 0..MAX_SESSIONS { - assert!(state.record_capture(capture( - Some(&format!("id-{index}")), - Some(&format!("session-{index}")), - "output", - ))); - } - assert!(command_output(&mut state, "id-0").found); - - assert!(state.record_capture(capture(Some("new-id"), Some("new-session"), "output",))); - - assert!(command_output(&mut state, "id-0").found); - assert!(!command_output(&mut state, "id-1").found); - assert!(command_output(&mut state, "new-id").found); - assert_eq!(state.sessions.len(), MAX_SESSIONS); - } - - #[test] - fn evicts_by_session_byte_limit() { - let mut session = SessionCaptures::default(); - let first_output = "x".repeat(10); - let second_output = "y"; - let (_, evicted_first) = session.push_with_limits( - HistoryId("first".to_string()), - SemanticCommandRecord { - capture: capture(Some("first"), Some("session-1"), &first_output), - history: None, - }, - MAX_COMMANDS_PER_SESSION, - 10, - ); - assert!(evicted_first.is_empty()); - - let (_, evicted_second) = session.push_with_limits( - HistoryId("second".to_string()), - SemanticCommandRecord { - capture: capture(Some("second"), Some("session-1"), second_output), - history: None, - }, - MAX_COMMANDS_PER_SESSION, - 10, - ); - - assert_eq!(evicted_second.len(), 1); - assert_eq!(evicted_second[0].history_id, HistoryId("first".to_string())); - assert_eq!(session.records.len(), 1); - assert_eq!(session.output_bytes, 1); - } - - #[test] - fn command_output_reports_truncation_metadata() { - let mut state = SemanticState::default(); - let mut capture = capture(Some("id-1"), Some("session-1"), "partial"); - capture.output_truncated = true; - capture.output_observed_bytes = 1024; - - assert!(state.record_capture(capture)); - - let reply = command_output(&mut state, "id-1"); - assert!(reply.output_truncated); - assert_eq!(reply.total_bytes, 7); - assert_eq!(reply.output_observed_bytes, 1024); - } - - #[test] - fn output_ranges_are_line_based_inclusive_and_support_negative_offsets() { - let output = "zero\none\ntwo\nthree\nfour"; - let ranges = vec![ - crate::semantic::OutputRange { start: 1, end: 2 }, - crate::semantic::OutputRange { start: -2, end: -1 }, - ]; - - assert_eq!( - select_output_ranges(output, &ranges), - vec![ - output_line(2, "one"), - output_line(3, "two"), - output_line(4, "three"), - output_line(5, "four"), - ] - ); - } - - #[test] - fn output_ranges_merge_overlaps_and_adjacent_ranges() { - let output = (0..100) - .map(|n| format!("line {n}")) - .collect::<Vec<_>>() - .join("\n"); - let ranges = vec![ - crate::semantic::OutputRange { start: 0, end: 100 }, - crate::semantic::OutputRange { - start: -100, - end: -1, - }, - ]; - - let selected = select_output_ranges(&output, &ranges); - - assert_eq!(selected.len(), 100); - assert_eq!(selected.first(), Some(&output_line(1, "line 0"))); - assert_eq!(selected.last(), Some(&output_line(100, "line 99"))); - } - - #[test] - fn output_ranges_can_leave_gaps_for_client_formatting() { - let output = "zero\none\ntwo\nthree\nfour"; - let ranges = vec![ - crate::semantic::OutputRange { start: 0, end: 1 }, - crate::semantic::OutputRange { start: 4, end: 4 }, - ]; - - assert_eq!( - select_output_ranges(output, &ranges), - vec![ - output_line(1, "zero"), - output_line(2, "one"), - output_line(5, "four"), - ] - ); - } - - #[test] - fn empty_output_ranges_default_to_first_thousand_lines() { - let output = (0..1001) - .map(|n| format!("line {n}")) - .collect::<Vec<_>>() - .join("\n"); - - let selected = select_output_ranges(&output, &[]); - - assert_eq!(selected.len(), 1000); - assert_eq!(selected.first(), Some(&output_line(1, "line 0"))); - assert_eq!(selected.last(), Some(&output_line(1000, "line 999"))); - } - - #[test] - fn output_ranges_skip_ranges_fully_outside_output() { - let output = "zero\none\ntwo"; - let ranges = vec![ - crate::semantic::OutputRange { start: 10, end: 20 }, - crate::semantic::OutputRange { - start: -20, - end: -10, - }, - ]; - - assert_eq!(select_output_ranges(output, &ranges), Vec::new()); - } -} diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs deleted file mode 100644 index 6e486250..00000000 --- a/crates/atuin-daemon/src/components/sync.rs +++ /dev/null @@ -1,279 +0,0 @@ -//! Sync component. -//! -//! Handles periodic synchronization with the Atuin cloud server. - -use std::time::Duration; - -use eyre::Result; -use rand::Rng; -use tokio::sync::mpsc; -use tokio::time::{self, MissedTickBehavior}; - -use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings}; - -use crate::{ - daemon::{Component, DaemonHandle}, - events::DaemonEvent, -}; - -/// Commands that can be sent to the sync task. -enum SyncCommand { - /// Trigger an immediate sync. - ForceSync, - /// Stop the sync loop. - Stop, -} - -/// Sync state - tracks whether we're in normal operation or retrying after failure. -#[derive(Clone, Copy, PartialEq, Eq)] -enum SyncState { - /// Normal operation. Periodic syncs only run if auto_sync is enabled. - Idle, - /// Retrying after a sync failure. Retries continue regardless of auto_sync - /// until the sync succeeds. - Retrying, -} - -/// Sync component - handles periodic cloud synchronization. -/// -/// This component: -/// - Runs a background sync loop on a configurable interval -/// - Implements exponential backoff on sync failures -/// - Responds to ForceSync events for immediate sync -/// - Emits SyncCompleted/SyncFailed events -pub struct SyncComponent { - task_handle: Option<tokio::task::JoinHandle<()>>, - command_tx: Option<mpsc::Sender<SyncCommand>>, -} - -impl SyncComponent { - /// Create a new sync component. - pub fn new() -> Self { - Self { - task_handle: None, - command_tx: None, - } - } -} - -impl Default for SyncComponent { - fn default() -> Self { - Self::new() - } -} - -#[tonic::async_trait] -impl Component for SyncComponent { - fn name(&self) -> &'static str { - "sync" - } - - async fn start(&mut self, handle: DaemonHandle) -> Result<()> { - let (cmd_tx, cmd_rx) = mpsc::channel(16); - self.command_tx = Some(cmd_tx); - - // Spawn the sync loop with its own copy of the handle - self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx))); - - tracing::info!("sync component started"); - Ok(()) - } - - async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { - if let DaemonEvent::ForceSync = event { - tracing::info!("force sync requested"); - if let Some(tx) = &self.command_tx { - let _ = tx.send(SyncCommand::ForceSync).await; - } - } - Ok(()) - } - - async fn stop(&mut self) -> Result<()> { - if let Some(tx) = &self.command_tx { - let _ = tx.send(SyncCommand::Stop).await; - } - if let Some(handle) = self.task_handle.take() { - // Give the task a moment to shut down gracefully - let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; - } - tracing::info!("sync component stopped"); - Ok(()) - } -} - -/// The main sync loop. -/// -/// This runs in a spawned task and handles periodic sync as well as -/// force sync requests. -async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) { - tracing::info!("sync loop starting"); - - // Clone settings since we need them across await points - let settings = handle.settings().await.clone(); - let host_id = match Settings::host_id().await { - Ok(id) => id, - Err(e) => { - tracing::error!("failed to get host id, sync disabled: {e}"); - return; - } - }; - - // Create the stores we need - let encryption_key = *handle.encryption_key(); - let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key); - - // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) - let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); - - let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); - - // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, - // we may end up running a lot of syncs in a hot loop. - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - let mut sync_state = SyncState::Idle; - - loop { - tokio::select! { - _ = ticker.tick() => { - let settings = handle.settings().await; - - // Skip periodic ticks if auto_sync is disabled AND we're not retrying - // a previous failure. Retries must continue regardless of auto_sync. - if !settings.auto_sync && sync_state == SyncState::Idle { - tracing::debug!("auto_sync disabled, skipping periodic sync tick"); - continue; - } - - sync_state = do_sync_tick( - &handle, - &history_store, - &mut ticker, - max_interval, - &settings, - ).await; - } - cmd = cmd_rx.recv() => { - match cmd { - Some(SyncCommand::ForceSync) => { - tracing::info!("executing force sync"); - let settings = handle.settings().await; - sync_state = do_sync_tick( - &handle, - &history_store, - &mut ticker, - max_interval, - &settings, - ).await; - } - Some(SyncCommand::Stop) | None => { - tracing::info!("sync loop stopping"); - break; - } - } - } - } - } -} - -/// Execute a single sync tick. -/// -/// Returns the new sync state: `Idle` on success, `Retrying` on failure. -async fn do_sync_tick( - handle: &DaemonHandle, - history_store: &HistoryStore, - ticker: &mut time::Interval, - max_interval: f64, - settings: &Settings, -) -> SyncState { - tracing::info!("sync tick"); - - // Check if logged in - let logged_in = match settings.logged_in().await { - Ok(v) => v, - Err(e) => { - tracing::warn!("failed to check login status, skipping sync tick: {e}"); - return SyncState::Idle; - } - }; - - if !logged_in { - tracing::debug!("not logged in, skipping sync tick"); - return SyncState::Idle; - } - - // Perform the sync - let res = sync::sync(settings, handle.store(), handle.encryption_key()).await; - - match res { - Err(e) => { - tracing::error!("sync tick failed with {e}"); - - // Emit failure event - handle.emit(DaemonEvent::SyncFailed { - error: e.to_string(), - }); - - // Exponential backoff - let mut rng = rand::thread_rng(); - let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); - - if new_interval > max_interval { - new_interval = max_interval; - } - - *ticker = time::interval_at( - tokio::time::Instant::now() + Duration::from_secs(new_interval as u64), - time::Duration::from_secs(new_interval as u64), - ); - ticker.reset_after(time::Duration::from_secs(new_interval as u64)); - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - tracing::error!("backing off, next sync tick in {new_interval}"); - - SyncState::Retrying - } - Ok((uploaded_count, downloaded_records)) => { - tracing::info!( - uploaded = uploaded_count, - downloaded = downloaded_records.len(), - "sync complete" - ); - - // Build history from downloaded records - if let Err(e) = history_store - .incremental_build(handle.history_db(), &downloaded_records) - .await - { - tracing::error!("failed to build history from downloaded records: {e}"); - } - - // Emit the records added event (for search indexing) - handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone())); - - // Emit sync completed event - handle.emit(DaemonEvent::SyncCompleted { - uploaded: uploaded_count as usize, - downloaded: downloaded_records.len(), - }); - - // Reset backoff on success - if ticker.period().as_secs() != settings.daemon.sync_frequency { - *ticker = time::interval_at( - tokio::time::Instant::now() - + Duration::from_secs(settings.daemon.sync_frequency), - time::Duration::from_secs(settings.daemon.sync_frequency), - ); - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - } - - // Store sync time - if let Err(e) = Settings::save_sync_time().await { - tracing::error!("failed to save sync time: {e}"); - } - - SyncState::Idle - } - } -} diff --git a/crates/atuin-daemon/src/control/mod.rs b/crates/atuin-daemon/src/control/mod.rs deleted file mode 100644 index afb29c57..00000000 --- a/crates/atuin-daemon/src/control/mod.rs +++ /dev/null @@ -1,12 +0,0 @@ -//! Control module for external event injection. -//! -//! This module provides the gRPC service that allows external processes -//! (like CLI commands) to inject events into the daemon's event bus. - -mod service; - -// Include the generated proto code -tonic::include_proto!("control"); - -// Re-export the service -pub use service::ControlService; diff --git a/crates/atuin-daemon/src/control/service.rs b/crates/atuin-daemon/src/control/service.rs deleted file mode 100644 index 2e7403ce..00000000 --- a/crates/atuin-daemon/src/control/service.rs +++ /dev/null @@ -1,71 +0,0 @@ -//! Control service implementation. -//! -//! This gRPC service allows external processes (like CLI commands) to inject -//! events into the daemon's event bus. - -use atuin_client::history::HistoryId; -use tonic::{Request, Response, Status}; -use tracing::{Level, info, instrument}; - -use super::{ - SendEventRequest, SendEventResponse, - control_server::{Control, ControlServer}, - send_event_request::Event, -}; -use crate::{daemon::DaemonHandle, events::DaemonEvent}; - -/// The Control gRPC service. -/// -/// This service is used by external processes to inject events into the daemon. -/// It's not a component - it's part of the daemon's core infrastructure. -pub struct ControlService { - handle: DaemonHandle, -} - -impl ControlService { - /// Create a new control service with the given daemon handle. - pub fn new(handle: DaemonHandle) -> Self { - Self { handle } - } - - /// Get a tonic server for this service. - pub fn into_server(self) -> ControlServer<Self> { - ControlServer::new(self) - } -} - -#[tonic::async_trait] -impl Control for ControlService { - #[instrument(skip_all, level = Level::INFO, name = "control_send_event")] - async fn send_event( - &self, - request: Request<SendEventRequest>, - ) -> Result<Response<SendEventResponse>, Status> { - let req = request.into_inner(); - - let event = req - .event - .ok_or_else(|| Status::invalid_argument("event is required"))?; - - let daemon_event = proto_event_to_daemon_event(event)?; - - info!(?daemon_event, "received control event"); - self.handle.emit(daemon_event); - - Ok(Response::new(SendEventResponse {})) - } -} - -/// Convert a proto event to a daemon event. -fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> { - match event { - Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned), - Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt), - Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted { - ids: e.ids.into_iter().map(HistoryId).collect(), - }), - Event::ForceSync(_) => Ok(DaemonEvent::ForceSync), - Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded), - Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested), - } -} diff --git a/crates/atuin-daemon/src/daemon.rs b/crates/atuin-daemon/src/daemon.rs deleted file mode 100644 index 625ca205..00000000 --- a/crates/atuin-daemon/src/daemon.rs +++ /dev/null @@ -1,458 +0,0 @@ -//! Core daemon infrastructure. -//! -//! This module provides the foundational types for building the atuin daemon: -//! -//! - [`DaemonState`]: Shared state owned by the daemon -//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state -//! - [`Component`]: A trait for implementing daemon components -//! - [`Daemon`]: The main daemon orchestrator -//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon - -use std::sync::Arc; - -use atuin_client::{ - database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore, - settings::Settings, -}; -use eyre::{Context, Result}; -use tokio::sync::{RwLock, broadcast}; - -use crate::events::DaemonEvent; - -// ============================================================================ -// DaemonState -// ============================================================================ - -/// Shared state owned by the daemon. -/// -/// This contains all the resources that components and services need access to. -/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`]. -pub struct DaemonState { - // Event bus - event_tx: broadcast::Sender<DaemonEvent>, - - // Configuration (mutable - can be reloaded) - settings: RwLock<Settings>, - - // Encryption key (immutable - derived at startup) - encryption_key: [u8; 32], - - // Database handles - history_db: HistoryDatabase, - store: SqliteStore, -} - -// ============================================================================ -// DaemonHandle -// ============================================================================ - -/// A lightweight handle to the daemon's shared state. -/// -/// This is the primary way for components, gRPC services, and spawned tasks to -/// interact with the daemon. It provides access to: -/// -/// - Event emission and subscription -/// - Configuration (settings, encryption key) -/// - Database handles -/// -/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed -/// around to any code that needs daemon access. -/// -/// # Example -/// -/// ```ignore -/// // Emit an event -/// handle.emit(DaemonEvent::HistoryPruned); -/// -/// // Access settings -/// let settings = handle.settings().await; -/// let sync_freq = settings.daemon.sync_frequency; -/// -/// // Access database -/// let history = handle.history_db().load(id).await?; -/// ``` -#[derive(Clone)] -pub struct DaemonHandle { - state: Arc<DaemonState>, -} - -impl DaemonHandle { - // ---- Events ---- - - /// Emit an event to the daemon's event bus. - /// - /// This is fire-and-forget - if no receivers are listening (which shouldn't - /// happen in normal operation), the event is dropped silently. - pub fn emit(&self, event: DaemonEvent) { - if let Err(e) = self.state.event_tx.send(event) { - tracing::warn!("failed to emit event (no receivers?): {e}"); - } - } - - /// Subscribe to the event bus. - /// - /// Returns a receiver that will receive all events emitted after this call. - /// Useful for components that need to listen for events outside of the - /// normal `handle_event` callback flow. - pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> { - self.state.event_tx.subscribe() - } - - /// Request graceful shutdown of the daemon. - pub fn shutdown(&self) { - self.emit(DaemonEvent::ShutdownRequested); - } - - // ---- Configuration ---- - - /// Get the current settings. - /// - /// This acquires a read lock on the settings. For most use cases, clone - /// the settings if you need to hold onto them. - pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> { - self.state.settings.read().await - } - - /// Reload settings from disk and emit a SettingsReloaded event. - /// - /// Components listening for `SettingsReloaded` can then re-read settings - /// via `handle.settings()` to pick up the changes. - pub async fn reload_settings(&self) -> Result<()> { - let new_settings = Settings::new()?; - self.apply_settings(new_settings).await; - Ok(()) - } - - /// Apply already-loaded settings and emit a SettingsReloaded event. - /// - /// Use this when settings have already been loaded (e.g., from a file watcher) - /// to avoid parsing the config file twice. - pub async fn apply_settings(&self, settings: Settings) { - *self.state.settings.write().await = settings; - self.emit(DaemonEvent::SettingsReloaded); - tracing::info!("settings applied"); - } - - /// Get the encryption key. - pub fn encryption_key(&self) -> &[u8; 32] { - &self.state.encryption_key - } - - // ---- Database ---- - - /// Get a reference to the history database. - pub fn history_db(&self) -> &HistoryDatabase { - &self.state.history_db - } - - /// Get a reference to the record store. - pub fn store(&self) -> &SqliteStore { - &self.state.store - } -} - -impl std::fmt::Debug for DaemonHandle { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DaemonHandle").finish_non_exhaustive() - } -} - -// ============================================================================ -// Component Trait -// ============================================================================ - -/// A daemon component that handles a specific domain. -/// -/// Components are the building blocks of the daemon. Each component: -/// -/// - Has a unique name for logging and debugging -/// - Can optionally expose gRPC services -/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources -/// - Handles events from the event bus -/// - Performs cleanup on shutdown -/// -/// # Lifecycle -/// -/// 1. **Construction**: Component is created (usually via `new()`) -/// 2. **Start**: `start()` is called with a [`DaemonHandle`] -/// 3. **Running**: `handle_event()` is called for each event on the bus -/// 4. **Shutdown**: `stop()` is called for cleanup -/// -/// # Example -/// -/// ```ignore -/// pub struct MyComponent { -/// handle: Option<DaemonHandle>, -/// } -/// -/// #[async_trait] -/// impl Component for MyComponent { -/// fn name(&self) -> &'static str { "my-component" } -/// -/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> { -/// self.handle = Some(handle); -/// Ok(()) -/// } -/// -/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { -/// match event { -/// DaemonEvent::SomeEvent => { -/// // Handle the event -/// if let Some(handle) = &self.handle { -/// handle.emit(DaemonEvent::ResponseEvent); -/// } -/// } -/// _ => {} -/// } -/// Ok(()) -/// } -/// -/// async fn stop(&mut self) -> Result<()> { -/// Ok(()) -/// } -/// } -/// ``` -#[tonic::async_trait] -pub trait Component: Send + Sync { - /// Human-readable name for logging and debugging. - fn name(&self) -> &'static str; - - /// Called once at startup. - /// - /// Store the handle if you need to emit events or access daemon resources - /// later. The handle is cheaply cloneable, so feel free to clone it for - /// spawned tasks. - async fn start(&mut self, handle: DaemonHandle) -> Result<()>; - - /// Handle an incoming event. - /// - /// Called for every event on the bus. To emit new events in response, - /// use the handle stored during `start()`. Events emitted here will be - /// processed in subsequent event loop iterations. - async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>; - - /// Called on graceful shutdown. - /// - /// Use this to clean up resources, abort spawned tasks, etc. - async fn stop(&mut self) -> Result<()>; -} - -// ============================================================================ -// Daemon -// ============================================================================ - -/// The main daemon orchestrator. -/// -/// The daemon manages components, runs the event loop, and coordinates startup -/// and shutdown. It is constructed via [`DaemonBuilder`]. -/// -/// # Event Loop -/// -/// The daemon runs a simple event loop: -/// -/// 1. Wait for an event on the bus -/// 2. Dispatch the event to all components (in registration order) -/// 3. Components may emit new events in response -/// 4. Repeat until `ShutdownRequested` is received -/// -/// Events emitted during handling are queued and processed in subsequent -/// iterations, ensuring the loop eventually drains. -pub struct Daemon { - components: Vec<Box<dyn Component>>, - handle: DaemonHandle, -} - -impl Daemon { - /// Create a new daemon builder. - pub fn builder(settings: Settings) -> DaemonBuilder { - DaemonBuilder::new(settings) - } - - /// Get a clone of the daemon handle. - /// - /// The handle can be used to emit events, access settings, etc. - pub fn handle(&self) -> DaemonHandle { - self.handle.clone() - } - - /// Start all components. - /// - /// This must be called before `run_event_loop()`. It initializes all - /// registered components with the daemon handle. - pub async fn start_components(&mut self) -> Result<()> { - for component in &mut self.components { - tracing::info!(component = component.name(), "starting component"); - component - .start(self.handle.clone()) - .await - .with_context(|| format!("failed to start component: {}", component.name()))?; - } - Ok(()) - } - - /// Run the daemon event loop. - /// - /// This processes events until a ShutdownRequested event is received. - /// Components must be started first via `start_components()`. - pub async fn run_event_loop(&mut self) -> Result<()> { - let mut event_rx = self.handle.subscribe(); - loop { - match event_rx.recv().await { - Ok(DaemonEvent::ShutdownRequested) => { - tracing::info!("shutdown requested, stopping daemon"); - break; - } - Ok(event) => { - tracing::debug!(?event, "processing event"); - self.dispatch_event(&event).await; - } - Err(broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!( - skipped = n, - "event receiver lagged, some events were dropped" - ); - } - Err(broadcast::error::RecvError::Closed) => { - tracing::info!("event bus closed, stopping daemon"); - break; - } - } - } - Ok(()) - } - - /// Stop all components. - /// - /// This performs graceful shutdown of all components. - pub async fn stop_components(&mut self) { - for component in &mut self.components { - tracing::info!(component = component.name(), "stopping component"); - if let Err(e) = component.stop().await { - tracing::error!( - component = component.name(), - error = ?e, - "error stopping component" - ); - } - } - tracing::info!("all components stopped"); - } - - /// Run the daemon. - /// - /// This is a convenience method that starts components, runs the event loop, - /// and handles shutdown. It does not return until the daemon is shut down. - pub async fn run(mut self) -> Result<()> { - self.start_components().await?; - self.run_event_loop().await?; - self.stop_components().await; - tracing::info!("daemon stopped"); - Ok(()) - } - - async fn dispatch_event(&mut self, event: &DaemonEvent) { - for component in &mut self.components { - if let Err(e) = component.handle_event(event).await { - tracing::error!( - component = component.name(), - error = ?e, - "error handling event" - ); - } - } - } -} - -// ============================================================================ -// DaemonBuilder -// ============================================================================ - -/// Builder for constructing a [`Daemon`]. -/// -/// # Example -/// -/// ```ignore -/// let daemon = Daemon::builder(settings) -/// .store(store) -/// .history_db(history_db) -/// .component(HistoryComponent::new()) -/// .component(SearchComponent::new()) -/// .component(SyncComponent::new()) -/// .build() -/// .await?; -/// -/// daemon.run().await?; -/// ``` -pub struct DaemonBuilder { - settings: Settings, - store: Option<SqliteStore>, - history_db: Option<HistoryDatabase>, - components: Vec<Box<dyn Component>>, -} - -impl DaemonBuilder { - /// Create a new daemon builder with the given settings. - pub fn new(settings: Settings) -> Self { - Self { - settings, - store: None, - history_db: None, - components: Vec::new(), - } - } - - /// Set the record store. - pub fn store(mut self, store: SqliteStore) -> Self { - self.store = Some(store); - self - } - - /// Set the history database. - pub fn history_db(mut self, db: HistoryDatabase) -> Self { - self.history_db = Some(db); - self - } - - /// Register a component. - /// - /// Components are started in registration order and stopped in reverse order. - pub fn component(mut self, component: impl Component + 'static) -> Self { - self.components.push(Box::new(component)); - self - } - - /// Build the daemon. - /// - /// This loads the encryption key and creates the daemon state. - pub async fn build(self) -> Result<Daemon> { - let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?; - let history_db = self - .history_db - .ok_or_else(|| eyre::eyre!("history_db is required"))?; - - // Load encryption key - let encryption_key: [u8; 32] = encryption::load_key(&self.settings) - .context("could not load encryption key")? - .into(); - - // Create the event bus - let (event_tx, _) = broadcast::channel(64); - - // Create the shared state - let state = Arc::new(DaemonState { - event_tx, - settings: RwLock::new(self.settings), - encryption_key, - history_db, - store, - }); - - // Create the handle (just a reference to the state) - let handle = DaemonHandle { state }; - - Ok(Daemon { - components: self.components, - handle, - }) - } -} diff --git a/crates/atuin-daemon/src/events.rs b/crates/atuin-daemon/src/events.rs deleted file mode 100644 index 4e6c6ff3..00000000 --- a/crates/atuin-daemon/src/events.rs +++ /dev/null @@ -1,74 +0,0 @@ -//! Daemon events. -//! -//! Events are the primary communication mechanism within the daemon. -//! Components emit events to notify others of state changes, and handle -//! events to react to changes elsewhere in the system. -//! -//! External processes (like CLI commands) can also inject events via the -//! Control gRPC service. - -use atuin_client::history::{History, HistoryId}; -use atuin_common::record::RecordId; - -/// Events that flow through the daemon's event bus. -/// -/// Events are broadcast to all components. Each component decides which -/// events it cares about in its `handle_event` implementation. -#[derive(Debug, Clone)] -pub enum DaemonEvent { - // ---- History lifecycle ---- - /// A command has started running. - HistoryStarted(History), - - /// A command has finished running. - HistoryEnded(History), - - // ---- Sync ---- - /// Records were synced from the server. - /// - /// The search component uses this to update its index with new history. - RecordsAdded(Vec<RecordId>), - - /// Sync completed successfully. - SyncCompleted { - /// Number of records uploaded. - uploaded: usize, - /// Number of records downloaded. - downloaded: usize, - }, - - /// Sync failed. - SyncFailed { - /// Error message describing what went wrong. - error: String, - }, - - /// Request an immediate sync (external trigger). - ForceSync, - - // ---- External commands ---- - /// History was pruned - search index needs a full rebuild. - /// - /// Emitted when the user runs `atuin history prune` or similar. - HistoryPruned, - - /// History was rebuilt - search index needs a full rebuild. - /// - /// Emitted when the user runs `atuin store rebuild history` or similar. - HistoryRebuilt, - - /// Specific history items were deleted. - /// - /// The search component should remove these from its index. - HistoryDeleted { - /// IDs of the deleted history entries. - ids: Vec<HistoryId>, - }, - - /// Settings have changed, components should reload if needed. - SettingsReloaded, - - // ---- Lifecycle ---- - /// Request graceful shutdown of the daemon. - ShutdownRequested, -} diff --git a/crates/atuin-daemon/src/history/mod.rs b/crates/atuin-daemon/src/history/mod.rs deleted file mode 100644 index b71853df..00000000 --- a/crates/atuin-daemon/src/history/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! History module for the daemon gRPC history service. -//! -//! This module contains the proto-generated types for the history gRPC service. - -// Include the generated proto code -tonic::include_proto!("history"); diff --git a/crates/atuin-daemon/src/lib.rs b/crates/atuin-daemon/src/lib.rs deleted file mode 100644 index 27d3932b..00000000 --- a/crates/atuin-daemon/src/lib.rs +++ /dev/null @@ -1,136 +0,0 @@ -use atuin_client::database::Sqlite as HistoryDatabase; -use atuin_client::record::sqlite_store::SqliteStore; -use atuin_client::settings::{Settings, watcher::global_settings_watcher}; -use eyre::Result; - -pub mod client; -pub mod components; -pub mod control; -pub mod daemon; -pub mod events; -pub mod history; -pub mod search; -pub mod semantic; -pub mod server; - -// Re-export core daemon types for convenience -pub use daemon::{Component, Daemon, DaemonBuilder, DaemonHandle}; -pub use events::DaemonEvent; - -// Re-export components -pub use components::{HistoryComponent, SearchComponent, SemanticComponent, SyncComponent}; - -// Re-export client helpers -pub use client::{ControlClient, SemanticClient, emit_event, emit_event_with_settings}; - -/// Boot the daemon using the new component-based architecture. -/// -/// This creates a daemon with the standard components (history, search, sync), -/// starts the gRPC server with their services, and runs the event loop. -pub async fn boot( - settings: Settings, - store: SqliteStore, - history_db: HistoryDatabase, -) -> Result<()> { - // Create the components - let history_component = HistoryComponent::new(); - let search_component = SearchComponent::new(); - let semantic_component = SemanticComponent::new(); - let sync_component = SyncComponent::new(); - - // Get the gRPC services before moving components into the daemon - // (The services share state with the components via Arc) - let history_service = history_component.grpc_service(); - let search_service = search_component.grpc_service(); - let semantic_service = semantic_component.grpc_service(); - - // Build the daemon - let mut daemon = Daemon::builder(settings.clone()) - .store(store) - .history_db(history_db) - .component(history_component) - .component(search_component) - .component(semantic_component) - .component(sync_component) - .build() - .await?; - - // Get a handle for the control service and gRPC server shutdown - let handle = daemon.handle(); - - // Create the control service - let control_service = control::ControlService::new(handle.clone()); - - // Start all components first (so gRPC services can work) - daemon.start_components().await?; - - // Spawn config file watcher to reload settings on changes - if let Ok(watcher) = global_settings_watcher() { - let mut settings_rx = watcher.subscribe(); - let watcher_handle = handle.clone(); - tokio::spawn(async move { - tracing::info!("config file watcher started"); - while settings_rx.changed().await.is_ok() { - // Use the already-loaded settings from the watcher - // (avoids parsing the config file twice) - let new_settings = (*settings_rx.borrow()).clone(); - watcher_handle.apply_settings((*new_settings).clone()).await; - } - tracing::debug!("config file watcher stopped"); - }); - } else { - tracing::warn!( - "failed to start config file watcher; settings changes will require daemon restart" - ); - } - - // Spawn signal handler to emit ShutdownRequested on Ctrl+C/SIGTERM - let signal_handle = handle.clone(); - tokio::spawn(async move { - shutdown_signal().await; - tracing::info!("received shutdown signal"); - signal_handle.shutdown(); - }); - - // Start the gRPC server in the background - server::run_grpc_server( - settings, - history_service, - search_service, - semantic_service, - control_service.into_server(), - handle, - ) - .await?; - - // Run the daemon event loop - daemon.run_event_loop().await?; - - // Stop all components on shutdown - daemon.stop_components().await; - - tracing::info!("daemon shut down complete"); - Ok(()) -} - -/// Wait for a shutdown signal (Ctrl+C or SIGTERM). -#[cfg(unix)] -async fn shutdown_signal() { - let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .expect("failed to register sigterm handler"); - let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) - .expect("failed to register sigint handler"); - - tokio::select! { - _ = term.recv() => {}, - _ = int.recv() => {}, - } -} - -/// Wait for a shutdown signal (Ctrl+C). -#[cfg(not(unix))] -async fn shutdown_signal() { - tokio::signal::ctrl_c() - .await - .expect("failed to listen for ctrl+c"); -} diff --git a/crates/atuin-daemon/src/search/index.rs b/crates/atuin-daemon/src/search/index.rs deleted file mode 100644 index bb155979..00000000 --- a/crates/atuin-daemon/src/search/index.rs +++ /dev/null @@ -1,683 +0,0 @@ -//! Search index with frecency-based ranking. -//! -//! This module provides a deduplicated search index where each unique command -//! is stored once, with metadata about all its invocations. This enables: -//! -//! - Efficient fuzzy matching (fewer items to match) -//! - Frecency-based ranking (frequency + recency) -//! - Dynamic filtering by directory, host, session, etc. - -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; - -use atuin_client::history::{History, is_known_agent}; -use atuin_client::settings::Search; -use atuin_nucleo::{Injector, Nucleo, pattern}; -use dashmap::DashMap; -use lasso::{Spur, ThreadedRodeo}; -use time::OffsetDateTime; -use tokio::sync::RwLock; -use tracing::{Level, instrument}; -use uuid::Uuid; - -use crate::components::search::with_trailing_slash; - -/// Parse a UUID string into a 16-byte array. -/// Returns None if the string is not a valid UUID. -fn parse_uuid_bytes(s: &str) -> Option<[u8; 16]> { - Uuid::parse_str(s).ok().map(|u| *u.as_bytes()) -} - -/// Format a 16-byte array as a UUID string. -fn format_uuid_bytes(bytes: &[u8; 16]) -> String { - Uuid::from_bytes(*bytes).to_string() -} - -/// Pre-computed frecency data for O(1) lookup. -#[derive(Debug, Clone, Default)] -pub struct FrecencyData { - /// Total number of times this command was used. - pub count: u32, - /// Most recent usage timestamp (unix seconds). - pub last_used: i64, -} - -impl FrecencyData { - /// Record a new usage of this command. - pub fn record_use(&mut self, timestamp: i64) { - self.count += 1; - if timestamp > self.last_used { - self.last_used = timestamp; - } - } - - /// Compute frecency score based on count and recency. - /// - /// Uses a decay function where more recent commands score higher. - /// The formula balances frequency (how often) with recency (how recent). - /// - /// Multipliers allow tuning the relative weights: - /// - `recency_mul`: Multiplier for recency score (default: 1.0) - /// - `frequency_mul`: Multiplier for frequency score (default: 1.0) - /// - /// A multiplier of 0.0 disables that component, 1.0 is unchanged, 2.0 doubles weight. - /// Values like 0.5 reduce weight by half, 1.5 increases by 50%, etc. - #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")] - pub fn compute(&self, now: i64, recency_mul: f64, frequency_mul: f64) -> u32 { - if self.count == 0 { - return 0; - } - - // Time-based decay: score decreases as time passes - let age_seconds = (now - self.last_used).max(0) as u64; - let age_hours = age_seconds / 3600; - - // Decay factor: recent commands get higher scores - // - Last hour: multiplier ~1.0 - // - Last day: multiplier ~0.5 - // - Last week: multiplier ~0.1 - // - Older: multiplier approaches 0 - let recency_score: f64 = match age_hours { - 0 => 100.0, - 1..=6 => 90.0, - 7..=24 => 70.0, - 25..=72 => 50.0, - 73..=168 => 30.0, - 169..=720 => 15.0, - _ => 5.0, - }; - - // Frequency boost: more uses = higher score (with diminishing returns) - let frequency_score = ((self.count as f64).ln() * 20.0).min(100.0); - - // Apply multipliers and combine scores, then round to u32 - ((recency_score * recency_mul) + (frequency_score * frequency_mul)).round() as u32 - } -} - -/// Data for a unique command. -pub struct CommandData { - /// History ID of the most recent invocation (16-byte UUID). - most_recent_id: [u8; 16], - /// Timestamp of the most recent invocation. - most_recent_timestamp: i64, - /// Pre-computed global frecency. - pub global_frecency: FrecencyData, - - // Pre-computed indexes for O(1) filter lookups - // Using HashSet instead of DashSet since CommandData lives inside DashMap (already synchronized) - /// All directories where this command has been run (interned keys). - directories: HashSet<Spur>, - /// All hostnames where this command has been run (interned keys). - hosts: HashSet<Spur>, - /// All sessions where this command has been run (as 16-byte UUIDs). - sessions: HashSet<[u8; 16]>, -} - -impl CommandData { - /// Create a new CommandData from a history entry. - /// Returns None if the history entry has invalid UUIDs. - pub fn new(history: &History, interner: &ThreadedRodeo) -> Option<Self> { - let history_id = parse_uuid_bytes(&history.id.0)?; - let session = parse_uuid_bytes(&history.session)?; - let timestamp = history.timestamp.unix_timestamp(); - - let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd)); - let host_key = interner.get_or_intern(&history.hostname); - - let mut directories = HashSet::new(); - directories.insert(dir_key); - - let mut hosts = HashSet::new(); - hosts.insert(host_key); - - let mut sessions = HashSet::new(); - sessions.insert(session); - - let mut global_frecency = FrecencyData::default(); - global_frecency.record_use(timestamp); - - Some(Self { - most_recent_id: history_id, - most_recent_timestamp: timestamp, - global_frecency, - directories, - hosts, - sessions, - }) - } - - /// Add an invocation from a history entry. - /// Returns false if the history entry has invalid UUIDs. - pub fn add_invocation(&mut self, history: &History, interner: &ThreadedRodeo) -> bool { - let Some(history_id) = parse_uuid_bytes(&history.id.0) else { - return false; - }; - let Some(session) = parse_uuid_bytes(&history.session) else { - return false; - }; - - let timestamp = history.timestamp.unix_timestamp(); - - // Update global frecency - self.global_frecency.record_use(timestamp); - - // Update pre-computed indexes for O(1) filter lookups - let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd)); - self.directories.insert(dir_key); - self.hosts.insert(interner.get_or_intern(&history.hostname)); - self.sessions.insert(session); - - // Update most recent if this invocation is newer - if timestamp > self.most_recent_timestamp { - self.most_recent_id = history_id; - self.most_recent_timestamp = timestamp; - } - - true - } - - /// Get the most recent history ID for this command. - pub fn most_recent_id(&self) -> String { - format_uuid_bytes(&self.most_recent_id) - } - - /// Check if any invocation matches a directory filter (exact match). - /// O(1) lookup using pre-computed index. - pub fn has_invocation_in_dir(&self, dir: &str, interner: &ThreadedRodeo) -> bool { - interner - .get(dir) - .is_some_and(|spur| self.directories.contains(&spur)) - } - - /// Check if any invocation matches a directory prefix (workspace/git root). - /// O(n) where n = number of unique directories for this command. - pub fn has_invocation_in_workspace(&self, prefix: &str, interner: &ThreadedRodeo) -> bool { - self.directories - .iter() - .any(|&spur| interner.resolve(&spur).starts_with(prefix)) - } - - /// Check if any invocation matches a hostname. - /// O(1) lookup using pre-computed index. - pub fn has_invocation_on_host(&self, hostname: &str, interner: &ThreadedRodeo) -> bool { - interner - .get(hostname) - .is_some_and(|spur| self.hosts.contains(&spur)) - } - - /// Check if any invocation matches a session. - /// O(1) lookup using pre-computed index. - pub fn has_invocation_in_session(&self, session: &str) -> bool { - parse_uuid_bytes(session).is_some_and(|bytes| self.sessions.contains(&bytes)) - } -} - -/// Filter mode for search queries. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum IndexFilterMode { - /// No filtering - search all commands. - Global, - /// Filter to commands run in a specific directory. - Directory(String), - /// Filter to commands run in a workspace (directory prefix). - Workspace(String), - /// Filter to commands run on a specific host. - Host(String), - /// Filter to commands run in a specific session. - Session(String), -} - -/// Context for search queries. -#[derive(Debug, Clone, Default)] -pub struct QueryContext { - pub cwd: Option<String>, - pub git_root: Option<String>, - pub hostname: Option<String>, - pub session_id: Option<String>, -} - -/// Shareable frecency map: command -> frecency score. -/// Wrapped in Arc for zero-copy sharing with scorer callbacks. -type FrecencyMap = Arc<HashMap<Arc<str>, u32>>; - -/// A deduplicated search index with frecency-based ranking. -/// -/// Commands are stored by their text, with metadata about all invocations. -/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback. -/// -/// Global frecency is precomputed by a background task and used for scoring. -/// If frecency data is not available, search still works but without frecency ranking; -/// although this should never happen due to precomputing the frecency map. -pub struct SearchIndex { - /// Map from command text to command data. - /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer. - /// Keys are Arc<str> to enable zero-copy sharing with frecency_map. - commands: Arc<DashMap<Arc<str>, CommandData>>, - /// Nucleo fuzzy matcher - items are command strings. - nucleo: RwLock<Nucleo<String>>, - /// Injector for adding new commands to Nucleo. - injector: Injector<String>, - /// Precomputed global frecency map. Updated by background task. - frecency_map: RwLock<Option<FrecencyMap>>, - /// String interner for deduplicating cwd, hostname, and directory paths. - interner: Arc<ThreadedRodeo>, -} - -impl SearchIndex { - /// Create a new empty search index. - pub fn new() -> Self { - let nucleo_config = atuin_nucleo::Config::DEFAULT; - // Single column for command text - let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1); - let injector = nucleo.injector(); - - Self { - commands: Arc::new(DashMap::new()), - nucleo: RwLock::new(nucleo), - injector, - frecency_map: RwLock::new(None), - interner: Arc::new(ThreadedRodeo::new()), - } - } - - /// Add a history entry to the index. - /// - /// If the command already exists, updates its invocation data. - /// If it's a new command, adds it to both the map and Nucleo. - pub fn add_history(&self, history: &History) { - if is_known_agent(&history.author) { - return; - } - - let command = history.command.as_str(); - - // DashMap with Arc<str> keys can be looked up with &str via Borrow trait - if let Some(mut entry) = self.commands.get_mut(command) { - // Existing command - just update invocations - entry.add_invocation(history, &self.interner); - } else { - // New command - create Arc<str> once and share it - let Some(data) = CommandData::new(history, &self.interner) else { - return; // Invalid UUIDs, skip this entry - }; - let command_arc: Arc<str> = command.into(); - self.commands.insert(Arc::clone(&command_arc), data); - // Nucleo still needs String (unavoidable copy for fuzzy matching) - self.injector.push(command_arc.to_string(), |cmd, cols| { - cols[0] = cmd.clone().into(); - }); - } - // Note: frecency_map is rebuilt by background task, not invalidated here - } - - /// Add multiple history entries to the index. - pub fn add_histories(&self, histories: &[History]) { - for history in histories { - self.add_history(history); - } - } - - /// Get the number of unique commands in the index. - pub fn command_count(&self) -> usize { - self.commands.len() - } - - /// Get the number of items in Nucleo (should match command_count). - pub async fn nucleo_item_count(&self) -> u32 { - self.nucleo.read().await.snapshot().item_count() - } - - /// Search for commands matching a query. - /// - /// Returns a list of history IDs (most recent invocation per command). - /// Uses precomputed global frecency for scoring if available. - #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))] - pub async fn search( - &self, - query: &str, - filter_mode: IndexFilterMode, - _context: &QueryContext, - limit: u32, - ) -> Vec<String> { - let mut nucleo = self.nucleo.write().await; - - // Get precomputed frecency map (may be None if not yet computed) - let frecency_map = self.frecency_map.read().await.clone(); - - // Build filter based on mode - let filter = self.build_filter(&filter_mode); - nucleo.set_filter(filter); - - // Build scorer from precomputed frecency (or None if not available) - let scorer = Self::build_scorer(frecency_map); - nucleo.set_scorer(scorer); - - // Update pattern - nucleo.pattern.reparse( - 0, - query, - pattern::CaseMatching::Smart, - pattern::Normalization::Smart, - false, - ); - - tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| { - // Tick until complete - while nucleo.tick(10).running {} - }); - - // Collect results - let snapshot = nucleo.snapshot(); - let matched_count = snapshot.matched_item_count().min(limit); - - tracing::span!(Level::TRACE, "index_search_results").in_scope(|| { - snapshot - .matched_items(..matched_count) - .filter_map(|item| { - let cmd = item.data; - // DashMap<Arc<str>, _>::get accepts &str via Borrow trait - self.commands - .get(cmd.as_str()) - .map(|data| data.most_recent_id()) - }) - .collect() - }) - } - - /// Rebuild the global frecency map. - /// - /// This should be called by a background task periodically. - /// The map is used for scoring search results. - /// - /// Uses multipliers from search settings: - /// - `recency_score_multiplier`: Weight for recency component - /// - `frequency_score_multiplier`: Weight for frequency component - /// - `frecency_score_multiplier`: Overall multiplier for final score - #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")] - pub async fn rebuild_frecency(&self, search_settings: &Search) { - let now = OffsetDateTime::now_utc().unix_timestamp(); - let mut frecency_map: HashMap<Arc<str>, u32> = HashMap::new(); - - // Clamp multipliers to non-negative values to prevent broken frecency ranking - // (negative values would produce unexpected results when cast to u32) - let recency_mul = search_settings.recency_score_multiplier.max(0.0); - let frequency_mul = search_settings.frequency_score_multiplier.max(0.0); - let frecency_mul = search_settings.frecency_score_multiplier.max(0.0); - - for entry in self.commands.iter() { - let frecency = entry - .global_frecency - .compute(now, recency_mul, frequency_mul); - // Apply overall frecency multiplier and round to u32 - let frecency = (frecency as f64 * frecency_mul).round() as u32; - // Arc::clone is cheap - just increments reference count - frecency_map.insert(Arc::clone(entry.key()), frecency); - } - - *self.frecency_map.write().await = Some(Arc::new(frecency_map)); - } - - /// Build filter predicate for the given mode. - fn build_filter(&self, mode: &IndexFilterMode) -> Option<atuin_nucleo::Filter<String>> { - // For Global mode, no filter needed - if matches!(mode, IndexFilterMode::Global) { - return None; - } - - // Pre-compute which commands pass the filter - // Use HashSet<String> for the short-lived filter (simpler than Arc lookup) - let passing_commands: Arc<HashSet<String>> = { - let mut set = HashSet::new(); - for entry in self.commands.iter() { - let passes = match mode { - IndexFilterMode::Global => unreachable!(), - IndexFilterMode::Directory(dir) => { - entry.has_invocation_in_dir(dir, &self.interner) - } - IndexFilterMode::Workspace(prefix) => { - entry.has_invocation_in_workspace(prefix, &self.interner) - } - IndexFilterMode::Host(hostname) => { - entry.has_invocation_on_host(hostname, &self.interner) - } - IndexFilterMode::Session(session) => entry.has_invocation_in_session(session), - }; - if passes { - // Convert Arc<str> to String for filter lookup - set.insert(entry.key().to_string()); - } - } - Arc::new(set) - }; - - Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd))) - } - - /// Build scorer from precomputed frecency map. - /// - /// Returns None if frecency map is not available (search still works, just without frecency ranking). - fn build_scorer(frecency_map: Option<FrecencyMap>) -> Option<atuin_nucleo::Scorer<String>> { - let map = frecency_map?; - Some(Arc::new(move |cmd: &String, fuzzy_score: u32| { - // HashMap<Arc<str>, _>::get accepts &str via Borrow trait - let frecency = map.get(cmd.as_str()).copied().unwrap_or(0); - fuzzy_score + frecency - })) - } -} - -impl Default for SearchIndex { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use time::macros::datetime; - - fn make_history(command: &str, cwd: &str, timestamp: OffsetDateTime) -> History { - History::import() - .timestamp(timestamp) - .command(command) - .cwd(cwd) - .build() - .into() - } - - #[test] - fn frecency_data_compute() { - let now = 1000000i64; - - // Recent command (with default multipliers of 1.0) - let recent = FrecencyData { - count: 5, - last_used: now - 60, // 1 minute ago - }; - assert!(recent.compute(now, 1.0, 1.0) > 100); // High score - - // Old command - let old = FrecencyData { - count: 5, - last_used: now - 86400 * 30, // 30 days ago - }; - assert!(old.compute(now, 1.0, 1.0) < recent.compute(now, 1.0, 1.0)); - - // Frequently used old command - let frequent_old = FrecencyData { - count: 100, - last_used: now - 86400 * 7, // 1 week ago - }; - // Should still have decent score due to frequency - assert!(frequent_old.compute(now, 1.0, 1.0) > 50); - } - - #[test] - fn frecency_data_compute_with_multipliers() { - let now = 1000000i64; - - let data = FrecencyData { - count: 5, - last_used: now - 60, // 1 minute ago (recency_score = 100) - }; - - // Default multipliers (1.0, 1.0) - let default_score = data.compute(now, 1.0, 1.0); - - // Double recency weight - let double_recency = data.compute(now, 2.0, 1.0); - assert!(double_recency > default_score); - - // Double frequency weight - let double_frequency = data.compute(now, 1.0, 2.0); - assert!(double_frequency > default_score); - - // Zero out recency (only frequency counts) - let no_recency = data.compute(now, 0.0, 1.0); - assert!(no_recency < default_score); - - // Zero out frequency (only recency counts) - let no_frequency = data.compute(now, 1.0, 0.0); - assert!(no_frequency < default_score); - - // Zero both (should be zero) - let no_score = data.compute(now, 0.0, 0.0); - assert_eq!(no_score, 0); - - // Fractional multipliers - let half_recency = data.compute(now, 0.5, 1.0); - assert!(half_recency < default_score); - assert!(half_recency > no_recency); - - // 1.5x multiplier - let boost_recency = data.compute(now, 1.5, 1.0); - assert!(boost_recency > default_score); - assert!(boost_recency < double_recency); - } - - #[test] - fn command_data_add_invocation() { - let interner = ThreadedRodeo::new(); - - let (dir1, dir2) = if cfg!(windows) { - ("C:\\Users\\User\\project", "C:\\Users\\User\\other") - } else { - ("/home/user/project", "/home/user/other") - }; - - let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); - let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); - - let mut data = CommandData::new(&history1, &interner).unwrap(); - assert_eq!(data.global_frecency.count, 1); - let id1 = data.most_recent_id(); - - data.add_invocation(&history2, &interner); - assert_eq!(data.global_frecency.count, 2); - - // Most recent ID should update to history2 (newer timestamp) - let id2 = data.most_recent_id(); - assert_ne!(id1, id2); - } - - #[test] - fn command_data_filters() { - let interner = ThreadedRodeo::new(); - - let (dir1, dir2) = if cfg!(windows) { - ("C:\\Users\\User\\project", "C:\\Users\\User\\other") - } else { - ("/home/user/project", "/home/user/other") - }; - - let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); - let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); - - let mut data = CommandData::new(&h1, &interner).unwrap(); - data.add_invocation(&h2, &interner); - - let (check1, check2, check3) = if cfg!(windows) { - ( - with_trailing_slash("C:\\Users\\User\\project"), - with_trailing_slash("C:\\Users\\User\\other"), - with_trailing_slash("C:\\Users\\User\\missing"), - ) - } else { - ( - with_trailing_slash("/home/user/project"), - with_trailing_slash("/home/user/other"), - with_trailing_slash("/home/user/missing"), - ) - }; - - assert!(data.has_invocation_in_dir(&check1, &interner)); - assert!(data.has_invocation_in_dir(&check2, &interner)); - assert!(!data.has_invocation_in_dir(&check3, &interner)); - - let (check1, check2, check3) = if cfg!(windows) { - ( - with_trailing_slash("C:\\Users\\User"), - with_trailing_slash("C:\\Users"), - with_trailing_slash("C:\\Users\\User\\var"), - ) - } else { - ( - with_trailing_slash("/home/user"), - with_trailing_slash("/home"), - with_trailing_slash("/var"), - ) - }; - - assert!(data.has_invocation_in_workspace(&check1, &interner)); - assert!(data.has_invocation_in_workspace(&check2, &interner)); - assert!(!data.has_invocation_in_workspace(&check3, &interner)); - } - - #[tokio::test] - async fn search_index_add_and_search() { - let index = SearchIndex::new(); - - let h1 = make_history( - "git status", - "/home/user/project", - datetime!(2024-01-01 10:00 UTC), - ); - let h2 = make_history( - "git commit -m 'test'", - "/home/user/project", - datetime!(2024-01-01 10:05 UTC), - ); - let h3 = make_history( - "ls -la", - "/home/user/other", - datetime!(2024-01-01 10:10 UTC), - ); - - index.add_history(&h1); - index.add_history(&h2); - index.add_history(&h3); - - assert_eq!(index.command_count(), 3); - - // Search for "git" - should match 2 commands - let results = index - .search("git", IndexFilterMode::Global, &QueryContext::default(), 10) - .await; - assert_eq!(results.len(), 2); - - // Search with directory filter - let results = index - .search( - "", - IndexFilterMode::Directory(with_trailing_slash("/home/user/project")), - &QueryContext::default(), - 10, - ) - .await; - assert_eq!(results.len(), 2); // git status and git commit - } -} diff --git a/crates/atuin-daemon/src/search/mod.rs b/crates/atuin-daemon/src/search/mod.rs deleted file mode 100644 index 4d261956..00000000 --- a/crates/atuin-daemon/src/search/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -//! Search module for the daemon gRPC search service. -//! -//! This module provides fuzzy search over command history using Nucleo. - -mod index; - -// Include the generated proto code -tonic::include_proto!("search"); - -// Re-export the service and index -pub use index::{IndexFilterMode, QueryContext, SearchIndex}; diff --git a/crates/atuin-daemon/src/semantic/mod.rs b/crates/atuin-daemon/src/semantic/mod.rs deleted file mode 100644 index c3511676..00000000 --- a/crates/atuin-daemon/src/semantic/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! Semantic command capture gRPC service types. - -tonic::include_proto!("semantic"); diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs deleted file mode 100644 index b823cff2..00000000 --- a/crates/atuin-daemon/src/server.rs +++ /dev/null @@ -1,170 +0,0 @@ -use eyre::Result; - -use crate::components::history::HistoryGrpcService; -use crate::components::search::SearchGrpcService; -use crate::components::semantic::SemanticGrpcService; -use crate::control::{ControlService, control_server::ControlServer}; -use crate::daemon::DaemonHandle; -use crate::history::history_server::HistoryServer; -use crate::search::search_server::SearchServer; -use crate::semantic::semantic_server::SemanticServer; - -use atuin_client::settings::Settings; - -/// Run the gRPC server with the given services. -/// -/// This starts the gRPC server in the background and returns immediately. -/// The server will shut down when a ShutdownRequested event is received. -#[cfg(unix)] -pub async fn run_grpc_server( - settings: Settings, - history_service: HistoryServer<HistoryGrpcService>, - search_service: SearchServer<SearchGrpcService>, - semantic_service: SemanticServer<SemanticGrpcService>, - control_service: ControlServer<ControlService>, - handle: DaemonHandle, -) -> Result<()> { - use tokio::net::UnixListener; - use tokio_stream::wrappers::UnixListenerStream; - - let socket_path = settings.daemon.socket_path.clone(); - - let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket { - #[cfg(target_os = "linux")] - { - use eyre::{OptionExt, WrapErr}; - use std::os::unix::net::SocketAddr; - use std::path::PathBuf; - tracing::info!("getting systemd socket"); - let listener = listenfd::ListenFd::from_env() - .take_unix_listener(0)? - .ok_or_eyre("missing systemd socket")?; - listener.set_nonblocking(true)?; - let actual_path: Result<PathBuf, eyre::Report> = listener - .local_addr() - .context("getting systemd socket's path") - .and_then(|addr: SocketAddr| { - addr.as_pathname() - .ok_or_eyre("systemd socket missing path") - .map(|path: &std::path::Path| path.to_owned()) - }); - match actual_path { - Ok(actual_path) => { - tracing::info!("listening on systemd socket: {actual_path:?}"); - if actual_path != std::path::Path::new(&socket_path) { - tracing::warn!( - "systemd socket is not at configured client path: {socket_path:?}" - ); - } - } - Err(err) => { - tracing::warn!( - "could not detect systemd socket path, ensure that it's at the configured path: {socket_path:?}, error: {err:?}" - ); - } - } - (UnixListener::from_std(listener)?, false) - } - #[cfg(not(target_os = "linux"))] - unreachable!() - } else { - tracing::info!("listening on unix socket {socket_path:?}"); - (UnixListener::bind(socket_path.clone())?, true) - }; - - let uds_stream = UnixListenerStream::new(uds); - - // Create shutdown signal from daemon handle - let shutdown_signal = async move { - let mut rx = handle.subscribe(); - loop { - use crate::DaemonEvent; - - match rx.recv().await { - Ok(DaemonEvent::ShutdownRequested) => break, - Ok(_) => continue, - Err(_) => break, // Channel closed - } - } - if cleanup { - eprintln!("Removing socket..."); - if let Err(e) = std::fs::remove_file(&socket_path) - && e.kind() != std::io::ErrorKind::NotFound - { - eprintln!("failed to remove socket: {e}"); - } - } - eprintln!("Shutting down gRPC server..."); - }; - - // Spawn the server in the background - tokio::spawn(async move { - use tonic::transport::Server; - - if let Err(e) = Server::builder() - .add_service(history_service) - .add_service(search_service) - .add_service(semantic_service) - .add_service(control_service) - .serve_with_incoming_shutdown(uds_stream, shutdown_signal) - .await - { - tracing::error!("gRPC server error: {e}"); - } - }); - - Ok(()) -} - -/// Run the gRPC server with the given services (Windows/TCP version). -#[cfg(not(unix))] -pub async fn run_grpc_server( - settings: Settings, - history_service: HistoryServer<HistoryGrpcService>, - search_service: SearchServer<SearchGrpcService>, - semantic_service: SemanticServer<SemanticGrpcService>, - control_service: ControlServer<ControlService>, - handle: DaemonHandle, -) -> Result<()> { - use tokio::net::TcpListener; - use tokio_stream::wrappers::TcpListenerStream; - use tonic::transport::Server; - - let port = settings.daemon.tcp_port; - let url = format!("127.0.0.1:{port}"); - let tcp = TcpListener::bind(&url).await?; - let tcp_stream = TcpListenerStream::new(tcp); - - tracing::info!("listening on tcp port {:?}", port); - - // Create shutdown signal from daemon handle - let shutdown_signal = async move { - use crate::DaemonEvent; - - let mut rx = handle.subscribe(); - loop { - match rx.recv().await { - Ok(DaemonEvent::ShutdownRequested) => break, - Ok(_) => continue, - Err(_) => break, // Channel closed - } - } - eprintln!("Shutting down gRPC server..."); - }; - - // Spawn the server in the background - tokio::spawn(async move { - if let Err(e) = Server::builder() - .add_service(history_service) - .add_service(search_service) - .add_service(semantic_service) - .add_service(control_service) - .serve_with_incoming_shutdown(tcp_stream, shutdown_signal) - .await - { - tracing::error!("gRPC server error: {e}"); - } - }); - - Ok(()) -} |
