diff options
Diffstat (limited to 'crates/turtle/src/atuin_daemon')
| -rw-r--r-- | crates/turtle/src/atuin_daemon/client.rs | 57 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/history.rs | 21 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/search.rs | 13 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/semantic.rs | 41 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/components/sync.rs | 37 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/control/mod.rs | 80 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/control/service.rs | 71 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/events.rs | 3 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/generated.rs | 38 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/history/mod.rs | 6 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/mod.rs | 13 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/search/index.rs | 678 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/search/mod.rs | 681 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/semantic/mod.rs | 3 | ||||
| -rw-r--r-- | crates/turtle/src/atuin_daemon/server.rs | 38 |
15 files changed, 909 insertions, 871 deletions
diff --git a/crates/turtle/src/atuin_daemon/client.rs b/crates/turtle/src/atuin_daemon/client.rs index 325b21b8..4ec1a60b 100644 --- a/crates/turtle/src/atuin_daemon/client.rs +++ b/crates/turtle/src/atuin_daemon/client.rs @@ -1,5 +1,3 @@ -use crate::atuin_client::database::Context; -use crate::atuin_client::settings::{FilterMode, Settings}; use eyre::{Context as EyreContext, Result}; use tonic::Code; use tonic::transport::{Channel, Endpoint, Uri}; @@ -9,27 +7,38 @@ use hyper_util::rt::TokioIo; #[cfg(unix)] use tokio::net::UnixStream; - -use crate::atuin_client::history::History; use tracing::{Level, instrument, span}; -use crate::atuin_daemon::control::HistoryRebuiltEvent; -use crate::atuin_daemon::control::{ - ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest, - SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient, -}; -use crate::atuin_daemon::events::DaemonEvent; -use crate::atuin_daemon::history::{ - EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest, - StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest, - history_client::HistoryClient as HistoryServiceClient, -}; -use crate::atuin_daemon::search::{ - FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse, - search_client::SearchClient as SearchServiceClient, -}; -use crate::atuin_daemon::semantic::{ - CommandCapture, RecordCommandsReply, semantic_client::SemanticClient as SemanticServiceClient, +use crate::atuin_daemon::generated; +use crate::{ + atuin_client::{ + database::Context, + history::History, + settings::{FilterMode, Settings}, + }, + atuin_daemon::{ + events::DaemonEvent, + generated::{ + control::{ + ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, HistoryRebuiltEvent, + SendEventRequest, SettingsReloadedEvent, ShutdownEvent, + control_client::ControlClient as ControlServiceClient, + }, + history::{ + EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, + StartHistoryRequest, StatusReply, StatusRequest, TailHistoryReply, + TailHistoryRequest, history_client::HistoryClient as HistoryServiceClient, + }, + search::{ + FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, + SearchResponse, search_client::SearchClient as SearchServiceClient, + }, + semantic::{ + CommandCapture, RecordCommandsReply, + semantic_client::SemanticClient as SemanticServiceClient, + }, + }, + }, }; pub(crate) struct HistoryClient { @@ -311,10 +320,8 @@ impl ControlClient { } /// Convert a daemon event to its proto representation. -fn daemon_event_to_proto( - event: DaemonEvent, -) -> crate::atuin_daemon::control::send_event_request::Event { - use crate::atuin_daemon::control::send_event_request::Event; +fn daemon_event_to_proto(event: DaemonEvent) -> generated::control::send_event_request::Event { + use generated::control::send_event_request::Event; match event { DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}), diff --git a/crates/turtle/src/atuin_daemon/components/history.rs b/crates/turtle/src/atuin_daemon/components/history.rs index b71543c1..b4f91b06 100644 --- a/crates/turtle/src/atuin_daemon/components/history.rs +++ b/crates/turtle/src/atuin_daemon/components/history.rs @@ -18,7 +18,7 @@ use tracing::{Level, instrument}; use crate::atuin_daemon::{ daemon::{Component, DaemonHandle}, events::DaemonEvent, - history::{ + generated::history::{ EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply, ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest, @@ -144,8 +144,8 @@ impl HistorySvc for HistoryGrpcService { ) -> Result<Response<StartHistoryReply>, Status> { let req = request.into_inner(); - let timestamp = - OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { + let timestamp = OffsetDateTime::from_unix_timestamp_nanos(i128::from(req.timestamp)) + .map_err(|_| { Status::invalid_argument( "failed to parse timestamp as unix time (expected nanos since epoch)", ) @@ -181,6 +181,7 @@ impl HistorySvc for HistoryGrpcService { } #[instrument(skip_all, level = Level::INFO)] + #[expect(clippy::significant_drop_tightening, reason = "Would be a logic-bug")] async fn end_history( &self, request: Request<EndHistoryRequest>, @@ -216,11 +217,7 @@ impl HistorySvc for HistoryGrpcService { .await .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; - tracing::info!( - id = id.0.to_string(), - duration = history.duration, - "end history" - ); + tracing::info!(id = id.0, duration = history.duration, "end history"); // Push to record store let (record_id, idx) = history_store @@ -247,6 +244,7 @@ impl HistorySvc for HistoryGrpcService { } #[instrument(skip_all, level = Level::INFO)] + #[expect(clippy::significant_drop_tightening, reason = "Would be a logic-bug")] async fn tail_history( &self, _request: Request<TailHistoryRequest>, @@ -265,11 +263,12 @@ impl HistorySvc for HistoryGrpcService { let event = match rx.recv().await { Ok(event) => event, Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { - let _ = tx - .send(Err(Status::resource_exhausted(format!( + drop( + tx.send(Err(Status::resource_exhausted(format!( "tail stream lagged behind and dropped {skipped} events" )))) - .await; + .await, + ); break; } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, diff --git a/crates/turtle/src/atuin_daemon/components/search.rs b/crates/turtle/src/atuin_daemon/components/search.rs index 832d05d8..39df758b 100644 --- a/crates/turtle/src/atuin_daemon/components/search.rs +++ b/crates/turtle/src/atuin_daemon/components/search.rs @@ -15,10 +15,11 @@ use uuid::Uuid; use crate::atuin_daemon::{ daemon::{Component, DaemonHandle}, events::DaemonEvent, - search::{ - FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse, + generated::search::{ + self, FilterMode, SearchRequest, SearchResponse, search_server::{Search as SearchSvc, SearchServer}, }, + search::{IndexFilterMode, QueryContext, SearchIndex}, }; const PAGE_SIZE: usize = 5000; @@ -35,7 +36,7 @@ const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60; /// - Provides the Search gRPC service pub(crate) struct SearchComponent { index: Arc<RwLock<SearchIndex>>, - handle: tokio::sync::RwLock<Option<DaemonHandle>>, + handle: RwLock<Option<DaemonHandle>>, loader_handle: Option<tokio::task::JoinHandle<()>>, frecency_handle: Option<tokio::task::JoinHandle<()>>, } @@ -45,7 +46,7 @@ impl SearchComponent { pub(crate) fn new() -> Self { Self { index: Arc::new(RwLock::new(SearchIndex::new())), - handle: tokio::sync::RwLock::new(None), + handle: RwLock::new(None), loader_handle: None, frecency_handle: None, } @@ -351,7 +352,7 @@ impl SearchSvc for SearchGrpcService { } } Err(e) => { - let _ = tx.send(Err(e)).await; + drop(tx.send(Err(e)).await); break; } } @@ -367,7 +368,7 @@ impl SearchSvc for SearchGrpcService { /// Convert proto FilterMode and context to IndexFilterMode. fn convert_filter_mode( mode: FilterMode, - context: &Option<crate::atuin_daemon::search::SearchContext>, + context: &Option<search::SearchContext>, ) -> IndexFilterMode { match (mode, context) { (FilterMode::Global, _) => IndexFilterMode::Global, diff --git a/crates/turtle/src/atuin_daemon/components/semantic.rs b/crates/turtle/src/atuin_daemon/components/semantic.rs index 052c2d73..69ffc134 100644 --- a/crates/turtle/src/atuin_daemon/components/semantic.rs +++ b/crates/turtle/src/atuin_daemon/components/semantic.rs @@ -9,6 +9,7 @@ use std::fmt::{Display, Formatter}; use std::sync::Arc; use crate::atuin_client::history::{History, HistoryId}; +use crate::atuin_daemon::generated::semantic; use eyre::Result; use tokio::sync::Mutex; use tonic::{Request, Response, Status, Streaming}; @@ -17,7 +18,7 @@ use tracing::{Level, instrument}; use crate::atuin_daemon::{ daemon::{Component, DaemonHandle}, events::DaemonEvent, - semantic::{ + generated::semantic::{ CommandCapture, CommandOutputReply, CommandOutputRequest, OutputLine, RecordCommandsReply, semantic_server::{Semantic as SemanticSvc, SemanticServer}, }, @@ -244,7 +245,7 @@ impl SemanticState { fn command_output_for_ref( &self, capture_ref: &CaptureRef, - ranges: &[crate::atuin_daemon::semantic::OutputRange], + ranges: &[semantic::OutputRange], ) -> Option<CommandOutputReply> { let stored = self .sessions @@ -534,17 +535,14 @@ fn command_output_not_found() -> CommandOutputReply { } } -fn select_output_ranges( - output: &str, - ranges: &[crate::atuin_daemon::semantic::OutputRange], -) -> Vec<OutputLine> { +fn select_output_ranges(output: &str, ranges: &[semantic::OutputRange]) -> Vec<OutputLine> { let lines: Vec<&str> = output.lines().collect(); if lines.is_empty() { return Vec::new(); } let ranges = if ranges.is_empty() { - vec![crate::atuin_daemon::semantic::OutputRange { start: 0, end: 999 }] + vec![semantic::OutputRange { start: 0, end: 999 }] } else { ranges.to_vec() }; @@ -627,9 +625,20 @@ fn log_record(record: &SemanticCommandRecord, message: &'static str) { #[cfg(test)] mod tests { - use super::*; use time::OffsetDateTime; + use crate::{ + atuin_client::history::{History, HistoryId}, + atuin_daemon::{ + components::semantic::{ + MAX_COMMANDS_PER_SESSION, MAX_SESSIONS, SemanticCommandRecord, SemanticState, + SessionCaptures, SessionId, select_output_ranges, + }, + generated::semantic::{self, CommandOutputReply, CommandOutputRequest, OutputLine}, + }, + atuin_pty_proxy::CommandCapture, + }; + fn history(id: &str, session: &str, command: &str) -> History { History { id: HistoryId(id.to_string()), @@ -819,8 +828,8 @@ mod tests { fn output_ranges_are_line_based_inclusive_and_support_negative_offsets() { let output = "zero\none\ntwo\nthree\nfour"; let ranges = vec![ - crate::atuin_daemon::semantic::OutputRange { start: 1, end: 2 }, - crate::atuin_daemon::semantic::OutputRange { start: -2, end: -1 }, + semantic::OutputRange { start: 1, end: 2 }, + semantic::OutputRange { start: -2, end: -1 }, ]; assert_eq!( @@ -841,8 +850,8 @@ mod tests { .collect::<Vec<_>>() .join("\n"); let ranges = vec![ - crate::atuin_daemon::semantic::OutputRange { start: 0, end: 100 }, - crate::atuin_daemon::semantic::OutputRange { + semantic::OutputRange { start: 0, end: 100 }, + semantic::OutputRange { start: -100, end: -1, }, @@ -859,8 +868,8 @@ mod tests { fn output_ranges_can_leave_gaps_for_client_formatting() { let output = "zero\none\ntwo\nthree\nfour"; let ranges = vec![ - crate::atuin_daemon::semantic::OutputRange { start: 0, end: 1 }, - crate::atuin_daemon::semantic::OutputRange { start: 4, end: 4 }, + semantic::OutputRange { start: 0, end: 1 }, + semantic::OutputRange { start: 4, end: 4 }, ]; assert_eq!( @@ -891,8 +900,8 @@ mod tests { fn output_ranges_skip_ranges_fully_outside_output() { let output = "zero\none\ntwo"; let ranges = vec![ - crate::atuin_daemon::semantic::OutputRange { start: 10, end: 20 }, - crate::atuin_daemon::semantic::OutputRange { + semantic::OutputRange { start: 10, end: 20 }, + semantic::OutputRange { start: -20, end: -10, }, diff --git a/crates/turtle/src/atuin_daemon/components/sync.rs b/crates/turtle/src/atuin_daemon/components/sync.rs index fbfbbd67..8b5b621d 100644 --- a/crates/turtle/src/atuin_daemon/components/sync.rs +++ b/crates/turtle/src/atuin_daemon/components/sync.rs @@ -27,9 +27,9 @@ enum SyncCommand { /// Sync state - tracks whether we're in normal operation or retrying after failure. #[derive(Clone, Copy, PartialEq, Eq)] enum SyncState { - /// Normal operation. Periodic syncs only run if auto_sync is enabled. + /// Normal operation. Periodic syncs only run if [`auto_sync`] is enabled. Idle, - /// Retrying after a sync failure. Retries continue regardless of auto_sync + /// Retrying after a sync failure. Retries continue regardless of [`auto_sync`] /// until the sync succeeds. Retrying, } @@ -39,7 +39,7 @@ enum SyncState { /// This component: /// - Runs a background sync loop on a configurable interval /// - Implements exponential backoff on sync failures -/// - Responds to ForceSync events for immediate sync +/// - Responds to [`ForceSync`] events for immediate sync /// - Emits SyncCompleted/SyncFailed events pub(crate) struct SyncComponent { task_handle: Option<tokio::task::JoinHandle<()>>, @@ -80,22 +80,28 @@ impl Component for SyncComponent { } async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { - if let DaemonEvent::ForceSync = event { - tracing::info!("force sync requested"); - if let Some(tx) = &self.command_tx { - let _ = tx.send(SyncCommand::ForceSync).await; + match event { + DaemonEvent::ForceSync => { + tracing::info!("force sync requested"); + if let Some(tx) = &self.command_tx { + drop(tx.send(SyncCommand::ForceSync).await); + } + } + DaemonEvent::SyncFailed { error } => { + tracing::error!(?error, "Sync failed."); } + _ => (), } Ok(()) } async fn stop(&mut self) -> Result<()> { if let Some(tx) = &self.command_tx { - let _ = tx.send(SyncCommand::Stop).await; + drop(tx.send(SyncCommand::Stop).await); } if let Some(handle) = self.task_handle.take() { // Give the task a moment to shut down gracefully - let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; + drop(time::timeout(Duration::from_secs(5), handle).await); } tracing::info!("sync component stopped"); Ok(()) @@ -126,7 +132,7 @@ async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand> // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); - let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + let mut ticker = time::interval(Duration::from_secs(settings.daemon.sync_frequency)); // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, // we may end up running a lot of syncs in a hot loop. @@ -224,10 +230,10 @@ async fn do_sync_tick( } *ticker = time::interval_at( - tokio::time::Instant::now() + Duration::from_secs(new_interval as u64), - time::Duration::from_secs(new_interval as u64), + time::Instant::now() + Duration::from_secs(new_interval as u64), + Duration::from_secs(new_interval as u64), ); - ticker.reset_after(time::Duration::from_secs(new_interval as u64)); + ticker.reset_after(Duration::from_secs(new_interval as u64)); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); tracing::error!("backing off, next sync tick in {new_interval}"); @@ -261,9 +267,8 @@ async fn do_sync_tick( // Reset backoff on success if ticker.period().as_secs() != settings.daemon.sync_frequency { *ticker = time::interval_at( - tokio::time::Instant::now() - + Duration::from_secs(settings.daemon.sync_frequency), - time::Duration::from_secs(settings.daemon.sync_frequency), + time::Instant::now() + Duration::from_secs(settings.daemon.sync_frequency), + Duration::from_secs(settings.daemon.sync_frequency), ); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); } diff --git a/crates/turtle/src/atuin_daemon/control/mod.rs b/crates/turtle/src/atuin_daemon/control/mod.rs index 23068519..7015db5b 100644 --- a/crates/turtle/src/atuin_daemon/control/mod.rs +++ b/crates/turtle/src/atuin_daemon/control/mod.rs @@ -1,12 +1,76 @@ -//! Control module for external event injection. +//! Control service implementation. //! -//! This module provides the gRPC service that allows external processes -//! (like CLI commands) to inject events into the daemon's event bus. +//! This gRPC service allows external processes (like CLI commands) to inject +//! events into the daemon's event bus. -mod service; +use tonic::{Request, Response, Status}; +use tracing::{Level, info, instrument}; -// Include the generated proto code -tonic::include_proto!("control"); +use crate::{ + atuin_client::history::HistoryId, + atuin_daemon::{ + daemon::DaemonHandle, + events::DaemonEvent, + generated::control::{ + SendEventRequest, SendEventResponse, + control_server::{Control, ControlServer}, + send_event_request::Event, + }, + }, +}; -// Re-export the service -pub(crate) use service::ControlService; +/// The Control gRPC service. +/// +/// This service is used by external processes to inject events into the daemon. +/// It's not a component - it's part of the daemon's core infrastructure. +pub(crate) struct ControlService { + handle: DaemonHandle, +} + +impl ControlService { + /// Create a new control service with the given daemon handle. + pub(crate) fn new(handle: DaemonHandle) -> Self { + Self { handle } + } + + /// Get a tonic server for this service. + pub(crate) fn into_server(self) -> ControlServer<Self> { + ControlServer::new(self) + } +} + +#[tonic::async_trait] +impl Control for ControlService { + #[instrument(skip_all, level = Level::INFO, name = "control_send_event")] + async fn send_event( + &self, + request: Request<SendEventRequest>, + ) -> Result<Response<SendEventResponse>, Status> { + let req = request.into_inner(); + + let event = req + .event + .ok_or_else(|| Status::invalid_argument("event is required"))?; + + let daemon_event = proto_event_to_daemon_event(event)?; + + info!(?daemon_event, "received control event"); + self.handle.emit(daemon_event); + + Ok(Response::new(SendEventResponse {})) + } +} + +/// Convert a proto event to a daemon event. +fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> { + match event { + Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned), + Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt), + Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted { + ids: e.ids.into_iter().map(HistoryId).collect(), + }), + Event::ForceSync(_) => Ok(DaemonEvent::ForceSync), + Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded), + Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested), + } +} diff --git a/crates/turtle/src/atuin_daemon/control/service.rs b/crates/turtle/src/atuin_daemon/control/service.rs deleted file mode 100644 index 8061a3c2..00000000 --- a/crates/turtle/src/atuin_daemon/control/service.rs +++ /dev/null @@ -1,71 +0,0 @@ -//! Control service implementation. -//! -//! This gRPC service allows external processes (like CLI commands) to inject -//! events into the daemon's event bus. - -use crate::atuin_client::history::HistoryId; -use tonic::{Request, Response, Status}; -use tracing::{Level, info, instrument}; - -use super::{ - SendEventRequest, SendEventResponse, - control_server::{Control, ControlServer}, - send_event_request::Event, -}; -use crate::atuin_daemon::{daemon::DaemonHandle, events::DaemonEvent}; - -/// The Control gRPC service. -/// -/// This service is used by external processes to inject events into the daemon. -/// It's not a component - it's part of the daemon's core infrastructure. -pub(crate) struct ControlService { - handle: DaemonHandle, -} - -impl ControlService { - /// Create a new control service with the given daemon handle. - pub(crate) fn new(handle: DaemonHandle) -> Self { - Self { handle } - } - - /// Get a tonic server for this service. - pub(crate) fn into_server(self) -> ControlServer<Self> { - ControlServer::new(self) - } -} - -#[tonic::async_trait] -impl Control for ControlService { - #[instrument(skip_all, level = Level::INFO, name = "control_send_event")] - async fn send_event( - &self, - request: Request<SendEventRequest>, - ) -> Result<Response<SendEventResponse>, Status> { - let req = request.into_inner(); - - let event = req - .event - .ok_or_else(|| Status::invalid_argument("event is required"))?; - - let daemon_event = proto_event_to_daemon_event(event)?; - - info!(?daemon_event, "received control event"); - self.handle.emit(daemon_event); - - Ok(Response::new(SendEventResponse {})) - } -} - -/// Convert a proto event to a daemon event. -fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> { - match event { - Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned), - Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt), - Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted { - ids: e.ids.into_iter().map(HistoryId).collect(), - }), - Event::ForceSync(_) => Ok(DaemonEvent::ForceSync), - Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded), - Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested), - } -} diff --git a/crates/turtle/src/atuin_daemon/events.rs b/crates/turtle/src/atuin_daemon/events.rs index 09369512..d379277d 100644 --- a/crates/turtle/src/atuin_daemon/events.rs +++ b/crates/turtle/src/atuin_daemon/events.rs @@ -32,8 +32,11 @@ pub(crate) enum DaemonEvent { /// Sync completed successfully. SyncCompleted { /// Number of records uploaded. + #[expect(unused)] uploaded: usize, + /// Number of records downloaded. + #[expect(unused)] downloaded: usize, }, diff --git a/crates/turtle/src/atuin_daemon/generated.rs b/crates/turtle/src/atuin_daemon/generated.rs new file mode 100644 index 00000000..e43f7523 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/generated.rs @@ -0,0 +1,38 @@ +#![allow( + unreachable_pub, + unused_qualifications, + reason = "All of these lints are triggered by the generated code" +)] + +/// Semantic command capture gRPC service types. +pub(crate) mod semantic { + tonic::include_proto!("semantic"); +} + +/// Search module for the daemon gRPC search service. +/// +/// This module provides fuzzy search over command history using Nucleo. +pub(crate) mod search { + // Include the generated proto code + tonic::include_proto!("search"); +} + +/// History module for the daemon gRPC history service. +/// +/// This module contains the proto-generated types for the history gRPC service. +pub(crate) mod history { + // Include the generated proto code + tonic::include_proto!("history"); +} + +/// Control module for external event injection. +/// +/// This module provides the gRPC service that allows external processes +/// (like CLI commands) to inject events into the daemon's event bus. +pub(crate) mod control { + // Include the generated proto code + tonic::include_proto!("control"); + + // Re-export the service + pub(crate) use crate::atuin_daemon::control::ControlService; +} diff --git a/crates/turtle/src/atuin_daemon/history/mod.rs b/crates/turtle/src/atuin_daemon/history/mod.rs deleted file mode 100644 index b71853df..00000000 --- a/crates/turtle/src/atuin_daemon/history/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! History module for the daemon gRPC history service. -//! -//! This module contains the proto-generated types for the history gRPC service. - -// Include the generated proto code -tonic::include_proto!("history"); diff --git a/crates/turtle/src/atuin_daemon/mod.rs b/crates/turtle/src/atuin_daemon/mod.rs index b161b0cc..5f0f489e 100644 --- a/crates/turtle/src/atuin_daemon/mod.rs +++ b/crates/turtle/src/atuin_daemon/mod.rs @@ -8,11 +8,11 @@ pub(crate) mod components; pub(crate) mod control; pub(crate) mod daemon; pub(crate) mod events; -pub(crate) mod history; pub(crate) mod search; -pub(crate) mod semantic; pub(crate) mod server; +pub(crate) mod generated; + // Re-export core daemon types for convenience pub(crate) use daemon::Daemon; pub(crate) use events::DaemonEvent; @@ -91,21 +91,18 @@ pub(crate) async fn boot( signal_handle.shutdown(); }); - // Start the gRPC server in the background server::run_grpc_server( - settings, + &settings, history_service, search_service, semantic_service, control_service.into_server(), handle, - ) - .await?; + )?; - // Run the daemon event loop daemon.run_event_loop().await?; - // Stop all components on shutdown + // After the event loop exited, we shut-down the components. daemon.stop_components().await; tracing::info!("daemon shut down complete"); diff --git a/crates/turtle/src/atuin_daemon/search/index.rs b/crates/turtle/src/atuin_daemon/search/index.rs deleted file mode 100644 index 197a8c1b..00000000 --- a/crates/turtle/src/atuin_daemon/search/index.rs +++ /dev/null @@ -1,678 +0,0 @@ -//! Search index with frecency-based ranking. -//! -//! This module provides a deduplicated search index where each unique command -//! is stored once, with metadata about all its invocations. This enables: -//! -//! - Efficient fuzzy matching (fewer items to match) -//! - Frecency-based ranking (frequency + recency) -//! - Dynamic filtering by directory, host, session, etc. - -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; - -use crate::atuin_client::settings::Search; -use crate::{ - atuin_client::history::History, atuin_daemon::components::search::with_trailing_slash, -}; -use atuin_nucleo::{Injector, Nucleo, pattern}; -use dashmap::DashMap; -use lasso::{Spur, ThreadedRodeo}; -use time::OffsetDateTime; -use tokio::sync::RwLock; -use tracing::{Level, instrument}; -use uuid::Uuid; - -/// Parse a UUID string into a 16-byte array. -/// Returns None if the string is not a valid UUID. -fn parse_uuid_bytes(s: &str) -> Option<[u8; 16]> { - Uuid::parse_str(s).ok().map(|u| *u.as_bytes()) -} - -/// Format a 16-byte array as a UUID string. -fn format_uuid_bytes(bytes: &[u8; 16]) -> String { - Uuid::from_bytes(*bytes).to_string() -} - -/// Pre-computed frecency data for O(1) lookup. -#[derive(Debug, Clone, Default)] -pub(crate) struct FrecencyData { - /// Total number of times this command was used. - pub(crate) count: u32, - /// Most recent usage timestamp (unix seconds). - pub(crate) last_used: i64, -} - -impl FrecencyData { - /// Record a new usage of this command. - pub(crate) fn record_use(&mut self, timestamp: i64) { - self.count += 1; - if timestamp > self.last_used { - self.last_used = timestamp; - } - } - - /// Compute frecency score based on count and recency. - /// - /// Uses a decay function where more recent commands score higher. - /// The formula balances frequency (how often) with recency (how recent). - /// - /// Multipliers allow tuning the relative weights: - /// - `recency_mul`: Multiplier for recency score (default: 1.0) - /// - `frequency_mul`: Multiplier for frequency score (default: 1.0) - /// - /// A multiplier of 0.0 disables that component, 1.0 is unchanged, 2.0 doubles weight. - /// Values like 0.5 reduce weight by half, 1.5 increases by 50%, etc. - #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")] - pub(crate) fn compute(&self, now: i64, recency_mul: f64, frequency_mul: f64) -> u32 { - if self.count == 0 { - return 0; - } - - // Time-based decay: score decreases as time passes - let age_seconds = (now - self.last_used).max(0) as u64; - let age_hours = age_seconds / 3600; - - // Decay factor: recent commands get higher scores - // - Last hour: multiplier ~1.0 - // - Last day: multiplier ~0.5 - // - Last week: multiplier ~0.1 - // - Older: multiplier approaches 0 - let recency_score: f64 = match age_hours { - 0 => 100.0, - 1..=6 => 90.0, - 7..=24 => 70.0, - 25..=72 => 50.0, - 73..=168 => 30.0, - 169..=720 => 15.0, - _ => 5.0, - }; - - // Frequency boost: more uses = higher score (with diminishing returns) - let frequency_score = (f64::from(self.count).ln() * 20.0).min(100.0); - - // Apply multipliers and combine scores, then round to u32 - ((recency_score * recency_mul) + (frequency_score * frequency_mul)).round() as u32 - } -} - -/// Data for a unique command. -pub(crate) struct CommandData { - /// History ID of the most recent invocation (16-byte UUID). - most_recent_id: [u8; 16], - /// Timestamp of the most recent invocation. - most_recent_timestamp: i64, - /// Pre-computed global frecency. - pub(crate) global_frecency: FrecencyData, - - // Pre-computed indexes for O(1) filter lookups - // Using HashSet instead of DashSet since CommandData lives inside DashMap (already synchronized) - /// All directories where this command has been run (interned keys). - directories: HashSet<Spur>, - /// All hostnames where this command has been run (interned keys). - hosts: HashSet<Spur>, - /// All sessions where this command has been run (as 16-byte UUIDs). - sessions: HashSet<[u8; 16]>, -} - -impl CommandData { - /// Create a new [`CommandData`] from a history entry. - /// Returns None if the history entry has invalid UUIDs. - pub(crate) fn new(history: &History, interner: &ThreadedRodeo) -> Option<Self> { - let history_id = parse_uuid_bytes(&history.id.0)?; - let session = parse_uuid_bytes(&history.session)?; - let timestamp = history.timestamp.unix_timestamp(); - - let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd)); - let host_key = interner.get_or_intern(&history.hostname); - - let mut directories = HashSet::new(); - directories.insert(dir_key); - - let mut hosts = HashSet::new(); - hosts.insert(host_key); - - let mut sessions = HashSet::new(); - sessions.insert(session); - - let mut global_frecency = FrecencyData::default(); - global_frecency.record_use(timestamp); - - Some(Self { - most_recent_id: history_id, - most_recent_timestamp: timestamp, - global_frecency, - directories, - hosts, - sessions, - }) - } - - /// Add an invocation from a history entry. - /// Returns false if the history entry has invalid UUIDs. - pub(crate) fn add_invocation(&mut self, history: &History, interner: &ThreadedRodeo) -> bool { - let Some(history_id) = parse_uuid_bytes(&history.id.0) else { - return false; - }; - let Some(session) = parse_uuid_bytes(&history.session) else { - return false; - }; - - let timestamp = history.timestamp.unix_timestamp(); - - // Update global frecency - self.global_frecency.record_use(timestamp); - - // Update pre-computed indexes for O(1) filter lookups - let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd)); - self.directories.insert(dir_key); - self.hosts.insert(interner.get_or_intern(&history.hostname)); - self.sessions.insert(session); - - // Update most recent if this invocation is newer - if timestamp > self.most_recent_timestamp { - self.most_recent_id = history_id; - self.most_recent_timestamp = timestamp; - } - - true - } - - /// Get the most recent history ID for this command. - pub(crate) fn most_recent_id(&self) -> String { - format_uuid_bytes(&self.most_recent_id) - } - - /// Check if any invocation matches a directory filter (exact match). - /// O(1) lookup using pre-computed index. - pub(crate) fn has_invocation_in_dir(&self, dir: &str, interner: &ThreadedRodeo) -> bool { - interner - .get(dir) - .is_some_and(|spur| self.directories.contains(&spur)) - } - - /// Check if any invocation matches a directory prefix (workspace/git root). - /// O(n) where n = number of unique directories for this command. - pub(crate) fn has_invocation_in_workspace( - &self, - prefix: &str, - interner: &ThreadedRodeo, - ) -> bool { - self.directories - .iter() - .any(|&spur| interner.resolve(&spur).starts_with(prefix)) - } - - /// Check if any invocation matches a hostname. - /// O(1) lookup using pre-computed index. - pub(crate) fn has_invocation_on_host(&self, hostname: &str, interner: &ThreadedRodeo) -> bool { - interner - .get(hostname) - .is_some_and(|spur| self.hosts.contains(&spur)) - } - - /// Check if any invocation matches a session. - /// O(1) lookup using pre-computed index. - pub(crate) fn has_invocation_in_session(&self, session: &str) -> bool { - parse_uuid_bytes(session).is_some_and(|bytes| self.sessions.contains(&bytes)) - } -} - -/// Filter mode for search queries. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum IndexFilterMode { - /// No filtering - search all commands. - Global, - /// Filter to commands run in a specific directory. - Directory(String), - /// Filter to commands run in a workspace (directory prefix). - Workspace(String), - /// Filter to commands run on a specific host. - Host(String), - /// Filter to commands run in a specific session. - Session(String), -} - -/// Context for search queries. -#[derive(Debug, Clone, Default)] -pub(crate) struct QueryContext { - #[expect(dead_code)] - pub(crate) cwd: Option<String>, - #[expect(dead_code)] - pub(crate) git_root: Option<String>, - #[expect(dead_code)] - pub(crate) hostname: Option<String>, - #[expect(dead_code)] - pub(crate) session_id: Option<String>, -} - -/// Shareable frecency map: command -> frecency score. -/// Wrapped in Arc for zero-copy sharing with scorer callbacks. -type FrecencyMap = Arc<HashMap<Arc<str>, u32>>; - -/// A deduplicated search index with frecency-based ranking. -/// -/// Commands are stored by their text, with metadata about all invocations. -/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback. -/// -/// Global frecency is precomputed by a background task and used for scoring. -/// If frecency data is not available, search still works but without frecency ranking; -/// although this should never happen due to precomputing the frecency map. -pub(crate) struct SearchIndex { - /// Map from command text to command data. - /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer. - /// Keys are Arc<str> to enable zero-copy sharing with frecency_map. - commands: Arc<DashMap<Arc<str>, CommandData>>, - /// Nucleo fuzzy matcher - items are command strings. - nucleo: RwLock<Nucleo<String>>, - /// Injector for adding new commands to Nucleo. - injector: Injector<String>, - /// Precomputed global frecency map. Updated by background task. - frecency_map: RwLock<Option<FrecencyMap>>, - /// String interner for deduplicating cwd, hostname, and directory paths. - interner: Arc<ThreadedRodeo>, -} - -impl SearchIndex { - /// Create a new empty search index. - pub(crate) fn new() -> Self { - let nucleo_config = atuin_nucleo::Config::DEFAULT; - // Single column for command text - let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1); - let injector = nucleo.injector(); - - Self { - commands: Arc::new(DashMap::new()), - nucleo: RwLock::new(nucleo), - injector, - frecency_map: RwLock::new(None), - interner: Arc::new(ThreadedRodeo::new()), - } - } - - /// Add a history entry to the index. - /// - /// If the command already exists, updates its invocation data. - /// If it's a new command, adds it to both the map and Nucleo. - pub(crate) fn add_history(&self, history: &History) { - let command = history.command.as_str(); - - // DashMap with Arc<str> keys can be looked up with &str via Borrow trait - if let Some(mut entry) = self.commands.get_mut(command) { - // Existing command - just update invocations - entry.add_invocation(history, &self.interner); - } else { - // New command - create Arc<str> once and share it - let Some(data) = CommandData::new(history, &self.interner) else { - return; // Invalid UUIDs, skip this entry - }; - let command_arc: Arc<str> = command.into(); - self.commands.insert(Arc::clone(&command_arc), data); - // Nucleo still needs String (unavoidable copy for fuzzy matching) - self.injector.push(command_arc.to_string(), |cmd, cols| { - cols[0] = cmd.clone().into(); - }); - } - // Note: frecency_map is rebuilt by background task, not invalidated here - } - - /// Add multiple history entries to the index. - pub(crate) fn add_histories(&self, histories: &[History]) { - for history in histories { - self.add_history(history); - } - } - - /// Get the number of unique commands in the index. - pub(crate) fn command_count(&self) -> usize { - self.commands.len() - } - - /// Search for commands matching a query. - /// - /// Returns a list of history IDs (most recent invocation per command). - /// Uses precomputed global frecency for scoring if available. - #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))] - #[expect( - clippy::significant_drop_tightening, - reason = "The nucleo early drop is a false-positive" - )] - pub(crate) async fn search( - &self, - query: &str, - filter_mode: IndexFilterMode, - // TODO(@bpeetz): Use the query context here <2026-06-12> - #[expect(unused)] context: &QueryContext, - limit: u32, - ) -> Vec<String> { - let mut nucleo = self.nucleo.write().await; - - // Get precomputed frecency map (may be None if not yet computed) - let frecency_map = self.frecency_map.read().await.clone(); - - // Build filter based on mode - let filter = self.build_filter(&filter_mode); - nucleo.set_filter(filter); - - // Build scorer from precomputed frecency (or None if not available) - let scorer = Self::build_scorer(frecency_map); - nucleo.set_scorer(scorer); - - // Update pattern - nucleo.pattern.reparse( - 0, - query, - pattern::CaseMatching::Smart, - pattern::Normalization::Smart, - false, - ); - - tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| { - // Tick until complete - while nucleo.tick(10).running {} - }); - - // Collect results - let snapshot = nucleo.snapshot(); - let matched_count = snapshot.matched_item_count().min(limit); - - tracing::span!(Level::TRACE, "index_search_results").in_scope(|| { - snapshot - .matched_items(..matched_count) - .filter_map(|item| { - let cmd = item.data; - // DashMap<Arc<str>, _>::get accepts &str via Borrow trait - self.commands - .get(cmd.as_str()) - .map(|data| data.most_recent_id()) - }) - .collect() - }) - } - - /// Rebuild the global frecency map. - /// - /// This should be called by a background task periodically. - /// The map is used for scoring search results. - /// - /// Uses multipliers from search settings: - /// - `recency_score_multiplier`: Weight for recency component - /// - `frequency_score_multiplier`: Weight for frequency component - /// - `frecency_score_multiplier`: Overall multiplier for final score - #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")] - pub(crate) async fn rebuild_frecency(&self, search_settings: &Search) { - let now = OffsetDateTime::now_utc().unix_timestamp(); - let mut frecency_map: HashMap<Arc<str>, u32> = HashMap::new(); - - // Clamp multipliers to non-negative values to prevent broken frecency ranking - // (negative values would produce unexpected results when cast to u32) - let recency_mul = search_settings.recency_score_multiplier.max(0.0); - let frequency_mul = search_settings.frequency_score_multiplier.max(0.0); - let frecency_mul = search_settings.frecency_score_multiplier.max(0.0); - - for entry in self.commands.iter() { - let frecency = entry - .global_frecency - .compute(now, recency_mul, frequency_mul); - // Apply overall frecency multiplier and round to u32 - let frecency = (frecency as f64 * frecency_mul).round() as u32; - // Arc::clone is cheap - just increments reference count - frecency_map.insert(Arc::clone(entry.key()), frecency); - } - - *self.frecency_map.write().await = Some(Arc::new(frecency_map)); - } - - /// Build filter predicate for the given mode. - fn build_filter(&self, mode: &IndexFilterMode) -> Option<atuin_nucleo::Filter<String>> { - // For Global mode, no filter needed - if matches!(mode, IndexFilterMode::Global) { - return None; - } - - // Pre-compute which commands pass the filter - // Use HashSet<String> for the short-lived filter (simpler than Arc lookup) - let passing_commands: Arc<HashSet<String>> = { - let mut set = HashSet::new(); - for entry in self.commands.iter() { - let passes = match mode { - IndexFilterMode::Global => unreachable!(), - IndexFilterMode::Directory(dir) => { - entry.has_invocation_in_dir(dir, &self.interner) - } - IndexFilterMode::Workspace(prefix) => { - entry.has_invocation_in_workspace(prefix, &self.interner) - } - IndexFilterMode::Host(hostname) => { - entry.has_invocation_on_host(hostname, &self.interner) - } - IndexFilterMode::Session(session) => entry.has_invocation_in_session(session), - }; - if passes { - // Convert Arc<str> to String for filter lookup - set.insert(entry.key().to_string()); - } - } - Arc::new(set) - }; - - Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd))) - } - - /// Build scorer from precomputed frecency map. - /// - /// Returns None if frecency map is not available (search still works, just without frecency ranking). - fn build_scorer(frecency_map: Option<FrecencyMap>) -> Option<atuin_nucleo::Scorer<String>> { - let map = frecency_map?; - Some(Arc::new(move |cmd: &String, fuzzy_score: u32| { - // HashMap<Arc<str>, _>::get accepts &str via Borrow trait - let frecency = map.get(cmd.as_str()).copied().unwrap_or(0); - fuzzy_score + frecency - })) - } -} - -impl Default for SearchIndex { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use time::macros::datetime; - - #[test] - fn frecency_data_compute() { - let now = 1_000_000i64; - - // Recent command (with default multipliers of 1.0) - let recent = FrecencyData { - count: 5, - last_used: now - 60, // 1 minute ago - }; - assert!(recent.compute(now, 1.0, 1.0) > 100); // High score - - // Old command - let old = FrecencyData { - count: 5, - last_used: now - 86400 * 30, // 30 days ago - }; - assert!(old.compute(now, 1.0, 1.0) < recent.compute(now, 1.0, 1.0)); - - // Frequently used old command - let frequent_old = FrecencyData { - count: 100, - last_used: now - 86400 * 7, // 1 week ago - }; - // Should still have decent score due to frequency - assert!(frequent_old.compute(now, 1.0, 1.0) > 50); - } - - #[test] - fn frecency_data_compute_with_multipliers() { - let now = 1_000_000_i64; - - let data = FrecencyData { - count: 5, - last_used: now - 60, // 1 minute ago (recency_score = 100) - }; - - // Default multipliers (1.0, 1.0) - let default_score = data.compute(now, 1.0, 1.0); - - // Double recency weight - let double_recency = data.compute(now, 2.0, 1.0); - assert!(double_recency > default_score); - - // Double frequency weight - let double_frequency = data.compute(now, 1.0, 2.0); - assert!(double_frequency > default_score); - - // Zero out recency (only frequency counts) - let no_recency = data.compute(now, 0.0, 1.0); - assert!(no_recency < default_score); - - // Zero out frequency (only recency counts) - let no_frequency = data.compute(now, 1.0, 0.0); - assert!(no_frequency < default_score); - - // Zero both (should be zero) - let no_score = data.compute(now, 0.0, 0.0); - assert_eq!(no_score, 0); - - // Fractional multipliers - let half_recency = data.compute(now, 0.5, 1.0); - assert!(half_recency < default_score); - assert!(half_recency > no_recency); - - // 1.5x multiplier - let boost_recency = data.compute(now, 1.5, 1.0); - assert!(boost_recency > default_score); - assert!(boost_recency < double_recency); - } - - #[test] - fn command_data_add_invocation() { - let interner = ThreadedRodeo::new(); - - let (dir1, dir2) = if cfg!(windows) { - ("C:\\Users\\User\\project", "C:\\Users\\User\\other") - } else { - ("/home/user/project", "/home/user/other") - }; - - let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); - let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); - - let mut data = CommandData::new(&history1, &interner).unwrap(); - assert_eq!(data.global_frecency.count, 1); - let id1 = data.most_recent_id(); - - data.add_invocation(&history2, &interner); - assert_eq!(data.global_frecency.count, 2); - - // Most recent ID should update to history2 (newer timestamp) - let id2 = data.most_recent_id(); - assert_ne!(id1, id2); - } - - #[test] - fn command_data_filters() { - let interner = ThreadedRodeo::new(); - - let (dir1, dir2) = if cfg!(windows) { - ("C:\\Users\\User\\project", "C:\\Users\\User\\other") - } else { - ("/home/user/project", "/home/user/other") - }; - - let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); - let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); - - let mut data = CommandData::new(&h1, &interner).unwrap(); - data.add_invocation(&h2, &interner); - - let (check1, check2, check3) = if cfg!(windows) { - ( - with_trailing_slash("C:\\Users\\User\\project"), - with_trailing_slash("C:\\Users\\User\\other"), - with_trailing_slash("C:\\Users\\User\\missing"), - ) - } else { - ( - with_trailing_slash("/home/user/project"), - with_trailing_slash("/home/user/other"), - with_trailing_slash("/home/user/missing"), - ) - }; - - assert!(data.has_invocation_in_dir(&check1, &interner)); - assert!(data.has_invocation_in_dir(&check2, &interner)); - assert!(!data.has_invocation_in_dir(&check3, &interner)); - - let (check1, check2, check3) = if cfg!(windows) { - ( - with_trailing_slash("C:\\Users\\User"), - with_trailing_slash("C:\\Users"), - with_trailing_slash("C:\\Users\\User\\var"), - ) - } else { - ( - with_trailing_slash("/home/user"), - with_trailing_slash("/home"), - with_trailing_slash("/var"), - ) - }; - - assert!(data.has_invocation_in_workspace(&check1, &interner)); - assert!(data.has_invocation_in_workspace(&check2, &interner)); - assert!(!data.has_invocation_in_workspace(&check3, &interner)); - } - - #[tokio::test] - async fn search_index_add_and_search() { - let index = SearchIndex::new(); - - let h1 = make_history( - "git status", - "/home/user/project", - datetime!(2024-01-01 10:00 UTC), - ); - let h2 = make_history( - "git commit -m 'test'", - "/home/user/project", - datetime!(2024-01-01 10:05 UTC), - ); - let h3 = make_history( - "ls -la", - "/home/user/other", - datetime!(2024-01-01 10:10 UTC), - ); - - index.add_history(&h1); - index.add_history(&h2); - index.add_history(&h3); - - assert_eq!(index.command_count(), 3); - - // Search for "git" - should match 2 commands - let results = index - .search("git", IndexFilterMode::Global, &QueryContext::default(), 10) - .await; - assert_eq!(results.len(), 2); - - // Search with directory filter - let results = index - .search( - "", - IndexFilterMode::Directory(with_trailing_slash("/home/user/project")), - &QueryContext::default(), - 10, - ) - .await; - assert_eq!(results.len(), 2); // git status and git commit - } -} diff --git a/crates/turtle/src/atuin_daemon/search/mod.rs b/crates/turtle/src/atuin_daemon/search/mod.rs index 51b6c6cc..60d923cd 100644 --- a/crates/turtle/src/atuin_daemon/search/mod.rs +++ b/crates/turtle/src/atuin_daemon/search/mod.rs @@ -1,11 +1,678 @@ -//! Search module for the daemon gRPC search service. +//! Search index with frecency-based ranking. //! -//! This module provides fuzzy search over command history using Nucleo. +//! This module provides a deduplicated search index where each unique command +//! is stored once, with metadata about all its invocations. This enables: +//! +//! - Efficient fuzzy matching (fewer items to match) +//! - Frecency-based ranking (frequency + recency) +//! - Dynamic filtering by directory, host, session, etc. + +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use crate::atuin_client::settings::Search; +use crate::{ + atuin_client::history::History, atuin_daemon::components::search::with_trailing_slash, +}; +use atuin_nucleo::{Injector, Nucleo, pattern}; +use dashmap::DashMap; +use lasso::{Spur, ThreadedRodeo}; +use time::OffsetDateTime; +use tokio::sync::RwLock; +use tracing::{Level, instrument}; +use uuid::Uuid; + +/// Parse a UUID string into a 16-byte array. +/// Returns None if the string is not a valid UUID. +fn parse_uuid_bytes(s: &str) -> Option<[u8; 16]> { + Uuid::parse_str(s).ok().map(|u| *u.as_bytes()) +} + +/// Format a 16-byte array as a UUID string. +fn format_uuid_bytes(bytes: &[u8; 16]) -> String { + Uuid::from_bytes(*bytes).to_string() +} + +/// Pre-computed frecency data for O(1) lookup. +#[derive(Debug, Clone, Default)] +pub(crate) struct FrecencyData { + /// Total number of times this command was used. + pub(crate) count: u32, + /// Most recent usage timestamp (unix seconds). + pub(crate) last_used: i64, +} + +impl FrecencyData { + /// Record a new usage of this command. + pub(crate) fn record_use(&mut self, timestamp: i64) { + self.count += 1; + if timestamp > self.last_used { + self.last_used = timestamp; + } + } + + /// Compute frecency score based on count and recency. + /// + /// Uses a decay function where more recent commands score higher. + /// The formula balances frequency (how often) with recency (how recent). + /// + /// Multipliers allow tuning the relative weights: + /// - `recency_mul`: Multiplier for recency score (default: 1.0) + /// - `frequency_mul`: Multiplier for frequency score (default: 1.0) + /// + /// A multiplier of 0.0 disables that component, 1.0 is unchanged, 2.0 doubles weight. + /// Values like 0.5 reduce weight by half, 1.5 increases by 50%, etc. + #[instrument(level = Level::TRACE, name = "index_frecency_compute")] + pub(crate) fn compute(&self, now: i64, recency_mul: f64, frequency_mul: f64) -> u32 { + if self.count == 0 { + return 0; + } + + // Time-based decay: score decreases as time passes + let age_seconds = (now - self.last_used).max(0) as u64; + let age_hours = age_seconds / 3600; + + // Decay factor: recent commands get higher scores + // - Last hour: multiplier ~1.0 + // - Last day: multiplier ~0.5 + // - Last week: multiplier ~0.1 + // - Older: multiplier approaches 0 + let recency_score: f64 = match age_hours { + 0 => 100.0, + 1..=6 => 90.0, + 7..=24 => 70.0, + 25..=72 => 50.0, + 73..=168 => 30.0, + 169..=720 => 15.0, + _ => 5.0, + }; + + // Frequency boost: more uses = higher score (with diminishing returns) + let frequency_score = (f64::from(self.count).ln() * 20.0).min(100.0); + + // Apply multipliers and combine scores, then round to u32 + ((recency_score * recency_mul) + (frequency_score * frequency_mul)).round() as u32 + } +} + +/// Data for a unique command. +pub(crate) struct CommandData { + /// History ID of the most recent invocation (16-byte UUID). + most_recent_id: [u8; 16], + /// Timestamp of the most recent invocation. + most_recent_timestamp: i64, + /// Pre-computed global frecency. + pub(crate) global_frecency: FrecencyData, + + // Pre-computed indexes for O(1) filter lookups + // Using HashSet instead of DashSet since CommandData lives inside DashMap (already synchronized) + /// All directories where this command has been run (interned keys). + directories: HashSet<Spur>, + /// All hostnames where this command has been run (interned keys). + hosts: HashSet<Spur>, + /// All sessions where this command has been run (as 16-byte UUIDs). + sessions: HashSet<[u8; 16]>, +} + +impl CommandData { + /// Create a new [`CommandData`] from a history entry. + /// Returns None if the history entry has invalid UUIDs. + pub(crate) fn new(history: &History, interner: &ThreadedRodeo) -> Option<Self> { + let history_id = parse_uuid_bytes(&history.id.0)?; + let session = parse_uuid_bytes(&history.session)?; + let timestamp = history.timestamp.unix_timestamp(); + + let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd)); + let host_key = interner.get_or_intern(&history.hostname); + + let mut directories = HashSet::new(); + directories.insert(dir_key); + + let mut hosts = HashSet::new(); + hosts.insert(host_key); + + let mut sessions = HashSet::new(); + sessions.insert(session); + + let mut global_frecency = FrecencyData::default(); + global_frecency.record_use(timestamp); + + Some(Self { + most_recent_id: history_id, + most_recent_timestamp: timestamp, + global_frecency, + directories, + hosts, + sessions, + }) + } + + /// Add an invocation from a history entry. + /// Returns false if the history entry has invalid UUIDs. + pub(crate) fn add_invocation(&mut self, history: &History, interner: &ThreadedRodeo) -> bool { + let Some(history_id) = parse_uuid_bytes(&history.id.0) else { + return false; + }; + let Some(session) = parse_uuid_bytes(&history.session) else { + return false; + }; + + let timestamp = history.timestamp.unix_timestamp(); + + // Update global frecency + self.global_frecency.record_use(timestamp); + + // Update pre-computed indexes for O(1) filter lookups + let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd)); + self.directories.insert(dir_key); + self.hosts.insert(interner.get_or_intern(&history.hostname)); + self.sessions.insert(session); + + // Update most recent if this invocation is newer + if timestamp > self.most_recent_timestamp { + self.most_recent_id = history_id; + self.most_recent_timestamp = timestamp; + } + + true + } + + /// Get the most recent history ID for this command. + pub(crate) fn most_recent_id(&self) -> String { + format_uuid_bytes(&self.most_recent_id) + } + + /// Check if any invocation matches a directory filter (exact match). + /// O(1) lookup using pre-computed index. + pub(crate) fn has_invocation_in_dir(&self, dir: &str, interner: &ThreadedRodeo) -> bool { + interner + .get(dir) + .is_some_and(|spur| self.directories.contains(&spur)) + } + + /// Check if any invocation matches a directory prefix (workspace/git root). + /// O(n) where n = number of unique directories for this command. + pub(crate) fn has_invocation_in_workspace( + &self, + prefix: &str, + interner: &ThreadedRodeo, + ) -> bool { + self.directories + .iter() + .any(|&spur| interner.resolve(&spur).starts_with(prefix)) + } + + /// Check if any invocation matches a hostname. + /// O(1) lookup using pre-computed index. + pub(crate) fn has_invocation_on_host(&self, hostname: &str, interner: &ThreadedRodeo) -> bool { + interner + .get(hostname) + .is_some_and(|spur| self.hosts.contains(&spur)) + } + + /// Check if any invocation matches a session. + /// O(1) lookup using pre-computed index. + pub(crate) fn has_invocation_in_session(&self, session: &str) -> bool { + parse_uuid_bytes(session).is_some_and(|bytes| self.sessions.contains(&bytes)) + } +} + +/// Filter mode for search queries. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum IndexFilterMode { + /// No filtering - search all commands. + Global, + /// Filter to commands run in a specific directory. + Directory(String), + /// Filter to commands run in a workspace (directory prefix). + Workspace(String), + /// Filter to commands run on a specific host. + Host(String), + /// Filter to commands run in a specific session. + Session(String), +} + +/// Context for search queries. +#[derive(Debug, Clone, Default)] +pub(crate) struct QueryContext { + #[expect(dead_code)] + pub(crate) cwd: Option<String>, + #[expect(dead_code)] + pub(crate) git_root: Option<String>, + #[expect(dead_code)] + pub(crate) hostname: Option<String>, + #[expect(dead_code)] + pub(crate) session_id: Option<String>, +} + +/// Shareable frecency map: command -> frecency score. +/// Wrapped in Arc for zero-copy sharing with scorer callbacks. +type FrecencyMap = Arc<HashMap<Arc<str>, u32>>; + +/// A deduplicated search index with frecency-based ranking. +/// +/// Commands are stored by their text, with metadata about all invocations. +/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback. +/// +/// Global frecency is precomputed by a background task and used for scoring. +/// If frecency data is not available, search still works but without frecency ranking; +/// although this should never happen due to precomputing the frecency map. +pub(crate) struct SearchIndex { + /// Map from command text to command data. + /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer. + /// Keys are Arc<str> to enable zero-copy sharing with frecency_map. + commands: Arc<DashMap<Arc<str>, CommandData>>, + /// Nucleo fuzzy matcher - items are command strings. + nucleo: RwLock<Nucleo<String>>, + /// Injector for adding new commands to Nucleo. + injector: Injector<String>, + /// Precomputed global frecency map. Updated by background task. + frecency_map: RwLock<Option<FrecencyMap>>, + /// String interner for deduplicating cwd, hostname, and directory paths. + interner: Arc<ThreadedRodeo>, +} + +impl SearchIndex { + /// Create a new empty search index. + pub(crate) fn new() -> Self { + let nucleo_config = atuin_nucleo::Config::DEFAULT; + // Single column for command text + let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1); + let injector = nucleo.injector(); + + Self { + commands: Arc::new(DashMap::new()), + nucleo: RwLock::new(nucleo), + injector, + frecency_map: RwLock::new(None), + interner: Arc::new(ThreadedRodeo::new()), + } + } + + /// Add a history entry to the index. + /// + /// If the command already exists, updates its invocation data. + /// If it's a new command, adds it to both the map and Nucleo. + pub(crate) fn add_history(&self, history: &History) { + let command = history.command.as_str(); + + // DashMap with Arc<str> keys can be looked up with &str via Borrow trait + if let Some(mut entry) = self.commands.get_mut(command) { + // Existing command - just update invocations + entry.add_invocation(history, &self.interner); + } else { + // New command - create Arc<str> once and share it + let Some(data) = CommandData::new(history, &self.interner) else { + return; // Invalid UUIDs, skip this entry + }; + let command_arc: Arc<str> = command.into(); + self.commands.insert(Arc::clone(&command_arc), data); + // Nucleo still needs String (unavoidable copy for fuzzy matching) + self.injector.push(command_arc.to_string(), |cmd, cols| { + cols[0] = cmd.clone().into(); + }); + } + // Note: frecency_map is rebuilt by background task, not invalidated here + } + + /// Add multiple history entries to the index. + pub(crate) fn add_histories(&self, histories: &[History]) { + for history in histories { + self.add_history(history); + } + } + + /// Get the number of unique commands in the index. + pub(crate) fn command_count(&self) -> usize { + self.commands.len() + } + + /// Search for commands matching a query. + /// + /// Returns a list of history IDs (most recent invocation per command). + /// Uses precomputed global frecency for scoring if available. + #[instrument(skip_all, level = Level::TRACE, name = "index_search", fields(query = %query))] + #[expect( + clippy::significant_drop_tightening, + reason = "The nucleo early drop is a false-positive" + )] + pub(crate) async fn search( + &self, + query: &str, + filter_mode: IndexFilterMode, + // TODO(@bpeetz): Use the query context here <2026-06-12> + #[expect(unused)] context: &QueryContext, + limit: u32, + ) -> Vec<String> { + let mut nucleo = self.nucleo.write().await; + + // Get precomputed frecency map (may be None if not yet computed) + let frecency_map = self.frecency_map.read().await.clone(); + + // Build filter based on mode + let filter = self.build_filter(&filter_mode); + nucleo.set_filter(filter); + + // Build scorer from precomputed frecency (or None if not available) + let scorer = Self::build_scorer(frecency_map); + nucleo.set_scorer(scorer); + + // Update pattern + nucleo.pattern.reparse( + 0, + query, + pattern::CaseMatching::Smart, + pattern::Normalization::Smart, + false, + ); + + tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| { + // Tick until complete + while nucleo.tick(10).running {} + }); + + // Collect results + let snapshot = nucleo.snapshot(); + let matched_count = snapshot.matched_item_count().min(limit); + + tracing::span!(Level::TRACE, "index_search_results").in_scope(|| { + snapshot + .matched_items(..matched_count) + .filter_map(|item| { + let cmd = item.data; + // DashMap<Arc<str>, _>::get accepts &str via Borrow trait + self.commands + .get(cmd.as_str()) + .map(|data| data.most_recent_id()) + }) + .collect() + }) + } + + /// Rebuild the global frecency map. + /// + /// This should be called by a background task periodically. + /// The map is used for scoring search results. + /// + /// Uses multipliers from search settings: + /// - `recency_score_multiplier`: Weight for recency component + /// - `frequency_score_multiplier`: Weight for frequency component + /// - `frecency_score_multiplier`: Overall multiplier for final score + #[instrument(skip_all, level = Level::DEBUG, name = "rebuild_frecency")] + pub(crate) async fn rebuild_frecency(&self, search_settings: &Search) { + let now = OffsetDateTime::now_utc().unix_timestamp(); + let mut frecency_map: HashMap<Arc<str>, u32> = HashMap::new(); + + // Clamp multipliers to non-negative values to prevent broken frecency ranking + // (negative values would produce unexpected results when cast to u32) + let recency_mul = search_settings.recency_score_multiplier.max(0.0); + let frequency_mul = search_settings.frequency_score_multiplier.max(0.0); + let frecency_mul = search_settings.frecency_score_multiplier.max(0.0); + + for entry in self.commands.iter() { + let frecency = entry + .global_frecency + .compute(now, recency_mul, frequency_mul); + // Apply overall frecency multiplier and round to u32 + let frecency = (frecency as f64 * frecency_mul).round() as u32; + // Arc::clone is cheap - just increments reference count + frecency_map.insert(Arc::clone(entry.key()), frecency); + } + + *self.frecency_map.write().await = Some(Arc::new(frecency_map)); + } + + /// Build filter predicate for the given mode. + fn build_filter(&self, mode: &IndexFilterMode) -> Option<atuin_nucleo::Filter<String>> { + // For Global mode, no filter needed + if matches!(mode, IndexFilterMode::Global) { + return None; + } + + // Pre-compute which commands pass the filter + // Use HashSet<String> for the short-lived filter (simpler than Arc lookup) + let passing_commands: Arc<HashSet<String>> = { + let mut set = HashSet::new(); + for entry in self.commands.iter() { + let passes = match mode { + IndexFilterMode::Global => unreachable!(), + IndexFilterMode::Directory(dir) => { + entry.has_invocation_in_dir(dir, &self.interner) + } + IndexFilterMode::Workspace(prefix) => { + entry.has_invocation_in_workspace(prefix, &self.interner) + } + IndexFilterMode::Host(hostname) => { + entry.has_invocation_on_host(hostname, &self.interner) + } + IndexFilterMode::Session(session) => entry.has_invocation_in_session(session), + }; + if passes { + // Convert Arc<str> to String for filter lookup + set.insert(entry.key().to_string()); + } + } + Arc::new(set) + }; + + Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd))) + } + + /// Build scorer from precomputed frecency map. + /// + /// Returns None if frecency map is not available (search still works, just without frecency ranking). + fn build_scorer(frecency_map: Option<FrecencyMap>) -> Option<atuin_nucleo::Scorer<String>> { + let map = frecency_map?; + Some(Arc::new(move |cmd: &String, fuzzy_score: u32| { + // HashMap<Arc<str>, _>::get accepts &str via Borrow trait + let frecency = map.get(cmd.as_str()).copied().unwrap_or(0); + fuzzy_score + frecency + })) + } +} + +impl Default for SearchIndex { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use time::macros::datetime; + + #[test] + fn frecency_data_compute() { + let now = 1_000_000i64; + + // Recent command (with default multipliers of 1.0) + let recent = FrecencyData { + count: 5, + last_used: now - 60, // 1 minute ago + }; + assert!(recent.compute(now, 1.0, 1.0) > 100); // High score + + // Old command + let old = FrecencyData { + count: 5, + last_used: now - 86400 * 30, // 30 days ago + }; + assert!(old.compute(now, 1.0, 1.0) < recent.compute(now, 1.0, 1.0)); + + // Frequently used old command + let frequent_old = FrecencyData { + count: 100, + last_used: now - 86400 * 7, // 1 week ago + }; + // Should still have decent score due to frequency + assert!(frequent_old.compute(now, 1.0, 1.0) > 50); + } + + #[test] + fn frecency_data_compute_with_multipliers() { + let now = 1_000_000_i64; + + let data = FrecencyData { + count: 5, + last_used: now - 60, // 1 minute ago (recency_score = 100) + }; + + // Default multipliers (1.0, 1.0) + let default_score = data.compute(now, 1.0, 1.0); + + // Double recency weight + let double_recency = data.compute(now, 2.0, 1.0); + assert!(double_recency > default_score); + + // Double frequency weight + let double_frequency = data.compute(now, 1.0, 2.0); + assert!(double_frequency > default_score); + + // Zero out recency (only frequency counts) + let no_recency = data.compute(now, 0.0, 1.0); + assert!(no_recency < default_score); + + // Zero out frequency (only recency counts) + let no_frequency = data.compute(now, 1.0, 0.0); + assert!(no_frequency < default_score); + + // Zero both (should be zero) + let no_score = data.compute(now, 0.0, 0.0); + assert_eq!(no_score, 0); + + // Fractional multipliers + let half_recency = data.compute(now, 0.5, 1.0); + assert!(half_recency < default_score); + assert!(half_recency > no_recency); + + // 1.5x multiplier + let boost_recency = data.compute(now, 1.5, 1.0); + assert!(boost_recency > default_score); + assert!(boost_recency < double_recency); + } + + #[test] + fn command_data_add_invocation() { + let interner = ThreadedRodeo::new(); + + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&history1, &interner).unwrap(); + assert_eq!(data.global_frecency.count, 1); + let id1 = data.most_recent_id(); + + data.add_invocation(&history2, &interner); + assert_eq!(data.global_frecency.count, 2); + + // Most recent ID should update to history2 (newer timestamp) + let id2 = data.most_recent_id(); + assert_ne!(id1, id2); + } + + #[test] + fn command_data_filters() { + let interner = ThreadedRodeo::new(); + + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&h1, &interner).unwrap(); + data.add_invocation(&h2, &interner); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User\\project"), + with_trailing_slash("C:\\Users\\User\\other"), + with_trailing_slash("C:\\Users\\User\\missing"), + ) + } else { + ( + with_trailing_slash("/home/user/project"), + with_trailing_slash("/home/user/other"), + with_trailing_slash("/home/user/missing"), + ) + }; + + assert!(data.has_invocation_in_dir(&check1, &interner)); + assert!(data.has_invocation_in_dir(&check2, &interner)); + assert!(!data.has_invocation_in_dir(&check3, &interner)); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User"), + with_trailing_slash("C:\\Users"), + with_trailing_slash("C:\\Users\\User\\var"), + ) + } else { + ( + with_trailing_slash("/home/user"), + with_trailing_slash("/home"), + with_trailing_slash("/var"), + ) + }; + + assert!(data.has_invocation_in_workspace(&check1, &interner)); + assert!(data.has_invocation_in_workspace(&check2, &interner)); + assert!(!data.has_invocation_in_workspace(&check3, &interner)); + } + + #[tokio::test] + async fn search_index_add_and_search() { + let index = SearchIndex::new(); + + let h1 = make_history( + "git status", + "/home/user/project", + datetime!(2024-01-01 10:00 UTC), + ); + let h2 = make_history( + "git commit -m 'test'", + "/home/user/project", + datetime!(2024-01-01 10:05 UTC), + ); + let h3 = make_history( + "ls -la", + "/home/user/other", + datetime!(2024-01-01 10:10 UTC), + ); + + index.add_history(&h1); + index.add_history(&h2); + index.add_history(&h3); -mod index; + assert_eq!(index.command_count(), 3); -// Include the generated proto code -tonic::include_proto!("search"); + // Search for "git" - should match 2 commands + let results = index + .search("git", IndexFilterMode::Global, &QueryContext::default(), 10) + .await; + assert_eq!(results.len(), 2); -// Re-export the service and index -pub(crate) use index::{IndexFilterMode, QueryContext, SearchIndex}; + // Search with directory filter + let results = index + .search( + "", + IndexFilterMode::Directory(with_trailing_slash("/home/user/project")), + &QueryContext::default(), + 10, + ) + .await; + assert_eq!(results.len(), 2); // git status and git commit + } +} diff --git a/crates/turtle/src/atuin_daemon/semantic/mod.rs b/crates/turtle/src/atuin_daemon/semantic/mod.rs deleted file mode 100644 index c3511676..00000000 --- a/crates/turtle/src/atuin_daemon/semantic/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! Semantic command capture gRPC service types. - -tonic::include_proto!("semantic"); diff --git a/crates/turtle/src/atuin_daemon/server.rs b/crates/turtle/src/atuin_daemon/server.rs index 65272d2a..36954cca 100644 --- a/crates/turtle/src/atuin_daemon/server.rs +++ b/crates/turtle/src/atuin_daemon/server.rs @@ -1,23 +1,28 @@ use eyre::Result; -use crate::atuin_daemon::components::history::HistoryGrpcService; -use crate::atuin_daemon::components::search::SearchGrpcService; -use crate::atuin_daemon::components::semantic::SemanticGrpcService; -use crate::atuin_daemon::control::{ControlService, control_server::ControlServer}; -use crate::atuin_daemon::daemon::DaemonHandle; -use crate::atuin_daemon::history::history_server::HistoryServer; -use crate::atuin_daemon::search::search_server::SearchServer; -use crate::atuin_daemon::semantic::semantic_server::SemanticServer; - -use crate::atuin_client::settings::Settings; +use crate::{ + atuin_client::settings::Settings, + atuin_daemon::{ + components::{ + history::HistoryGrpcService, search::SearchGrpcService, semantic::SemanticGrpcService, + }, + daemon::DaemonHandle, + generated::{ + control::{ControlService, control_server::ControlServer}, + history::history_server::HistoryServer, + search::search_server::SearchServer, + semantic::semantic_server::SemanticServer, + }, + }, +}; /// Run the gRPC server with the given services. /// /// This starts the gRPC server in the background and returns immediately. -/// The server will shut down when a ShutdownRequested event is received. +/// The server will shut down when a [`ShutdownRequested`] event is received. #[cfg(unix)] -pub(crate) async fn run_grpc_server( - settings: Settings, +pub(crate) fn run_grpc_server( + settings: &Settings, history_service: HistoryServer<HistoryGrpcService>, search_service: SearchServer<SearchGrpcService>, semantic_service: SemanticServer<SemanticGrpcService>, @@ -75,15 +80,16 @@ pub(crate) async fn run_grpc_server( // Create shutdown signal from daemon handle let shutdown_signal = async move { let mut rx = handle.subscribe(); + loop { use crate::atuin_daemon::DaemonEvent; match rx.recv().await { - Ok(DaemonEvent::ShutdownRequested) => break, - Ok(_) => continue, - Err(_) => break, // Channel closed + Err(_) | Ok(DaemonEvent::ShutdownRequested) => break, + Ok(_) => (), } } + if cleanup { eprintln!("Removing socket..."); if let Err(e) = std::fs::remove_file(&socket_path) |
