aboutsummaryrefslogtreecommitdiffstats
path: root/crates/turtle/src/atuin_daemon
diff options
context:
space:
mode:
Diffstat (limited to 'crates/turtle/src/atuin_daemon')
-rw-r--r--crates/turtle/src/atuin_daemon/client.rs57
-rw-r--r--crates/turtle/src/atuin_daemon/components/history.rs21
-rw-r--r--crates/turtle/src/atuin_daemon/components/search.rs13
-rw-r--r--crates/turtle/src/atuin_daemon/components/semantic.rs41
-rw-r--r--crates/turtle/src/atuin_daemon/components/sync.rs37
-rw-r--r--crates/turtle/src/atuin_daemon/control/mod.rs80
-rw-r--r--crates/turtle/src/atuin_daemon/control/service.rs71
-rw-r--r--crates/turtle/src/atuin_daemon/events.rs3
-rw-r--r--crates/turtle/src/atuin_daemon/generated.rs38
-rw-r--r--crates/turtle/src/atuin_daemon/history/mod.rs6
-rw-r--r--crates/turtle/src/atuin_daemon/mod.rs13
-rw-r--r--crates/turtle/src/atuin_daemon/search/index.rs678
-rw-r--r--crates/turtle/src/atuin_daemon/search/mod.rs681
-rw-r--r--crates/turtle/src/atuin_daemon/semantic/mod.rs3
-rw-r--r--crates/turtle/src/atuin_daemon/server.rs38
15 files changed, 909 insertions, 871 deletions
diff --git a/crates/turtle/src/atuin_daemon/client.rs b/crates/turtle/src/atuin_daemon/client.rs
index 325b21b8..4ec1a60b 100644
--- a/crates/turtle/src/atuin_daemon/client.rs
+++ b/crates/turtle/src/atuin_daemon/client.rs
@@ -1,5 +1,3 @@
-use crate::atuin_client::database::Context;
-use crate::atuin_client::settings::{FilterMode, Settings};
use eyre::{Context as EyreContext, Result};
use tonic::Code;
use tonic::transport::{Channel, Endpoint, Uri};
@@ -9,27 +7,38 @@ use hyper_util::rt::TokioIo;
#[cfg(unix)]
use tokio::net::UnixStream;
-
-use crate::atuin_client::history::History;
use tracing::{Level, instrument, span};
-use crate::atuin_daemon::control::HistoryRebuiltEvent;
-use crate::atuin_daemon::control::{
- ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest,
- SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient,
-};
-use crate::atuin_daemon::events::DaemonEvent;
-use crate::atuin_daemon::history::{
- EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest,
- StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest,
- history_client::HistoryClient as HistoryServiceClient,
-};
-use crate::atuin_daemon::search::{
- FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse,
- search_client::SearchClient as SearchServiceClient,
-};
-use crate::atuin_daemon::semantic::{
- CommandCapture, RecordCommandsReply, semantic_client::SemanticClient as SemanticServiceClient,
+use crate::atuin_daemon::generated;
+use crate::{
+ atuin_client::{
+ database::Context,
+ history::History,
+ settings::{FilterMode, Settings},
+ },
+ atuin_daemon::{
+ events::DaemonEvent,
+ generated::{
+ control::{
+ ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, HistoryRebuiltEvent,
+ SendEventRequest, SettingsReloadedEvent, ShutdownEvent,
+ control_client::ControlClient as ControlServiceClient,
+ },
+ history::{
+ EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply,
+ StartHistoryRequest, StatusReply, StatusRequest, TailHistoryReply,
+ TailHistoryRequest, history_client::HistoryClient as HistoryServiceClient,
+ },
+ search::{
+ FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest,
+ SearchResponse, search_client::SearchClient as SearchServiceClient,
+ },
+ semantic::{
+ CommandCapture, RecordCommandsReply,
+ semantic_client::SemanticClient as SemanticServiceClient,
+ },
+ },
+ },
};
pub(crate) struct HistoryClient {
@@ -311,10 +320,8 @@ impl ControlClient {
}
/// Convert a daemon event to its proto representation.
-fn daemon_event_to_proto(
- event: DaemonEvent,
-) -> crate::atuin_daemon::control::send_event_request::Event {
- use crate::atuin_daemon::control::send_event_request::Event;
+fn daemon_event_to_proto(event: DaemonEvent) -> generated::control::send_event_request::Event {
+ use generated::control::send_event_request::Event;
match event {
DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}),
diff --git a/crates/turtle/src/atuin_daemon/components/history.rs b/crates/turtle/src/atuin_daemon/components/history.rs
index b71543c1..b4f91b06 100644
--- a/crates/turtle/src/atuin_daemon/components/history.rs
+++ b/crates/turtle/src/atuin_daemon/components/history.rs
@@ -18,7 +18,7 @@ use tracing::{Level, instrument};
use crate::atuin_daemon::{
daemon::{Component, DaemonHandle},
events::DaemonEvent,
- history::{
+ generated::history::{
EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
TailHistoryReply, TailHistoryRequest,
@@ -144,8 +144,8 @@ impl HistorySvc for HistoryGrpcService {
) -> Result<Response<StartHistoryReply>, Status> {
let req = request.into_inner();
- let timestamp =
- OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
+ let timestamp = OffsetDateTime::from_unix_timestamp_nanos(i128::from(req.timestamp))
+ .map_err(|_| {
Status::invalid_argument(
"failed to parse timestamp as unix time (expected nanos since epoch)",
)
@@ -181,6 +181,7 @@ impl HistorySvc for HistoryGrpcService {
}
#[instrument(skip_all, level = Level::INFO)]
+ #[expect(clippy::significant_drop_tightening, reason = "Would be a logic-bug")]
async fn end_history(
&self,
request: Request<EndHistoryRequest>,
@@ -216,11 +217,7 @@ impl HistorySvc for HistoryGrpcService {
.await
.map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
- tracing::info!(
- id = id.0.to_string(),
- duration = history.duration,
- "end history"
- );
+ tracing::info!(id = id.0, duration = history.duration, "end history");
// Push to record store
let (record_id, idx) = history_store
@@ -247,6 +244,7 @@ impl HistorySvc for HistoryGrpcService {
}
#[instrument(skip_all, level = Level::INFO)]
+ #[expect(clippy::significant_drop_tightening, reason = "Would be a logic-bug")]
async fn tail_history(
&self,
_request: Request<TailHistoryRequest>,
@@ -265,11 +263,12 @@ impl HistorySvc for HistoryGrpcService {
let event = match rx.recv().await {
Ok(event) => event,
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
- let _ = tx
- .send(Err(Status::resource_exhausted(format!(
+ drop(
+ tx.send(Err(Status::resource_exhausted(format!(
"tail stream lagged behind and dropped {skipped} events"
))))
- .await;
+ .await,
+ );
break;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
diff --git a/crates/turtle/src/atuin_daemon/components/search.rs b/crates/turtle/src/atuin_daemon/components/search.rs
index 832d05d8..39df758b 100644
--- a/crates/turtle/src/atuin_daemon/components/search.rs
+++ b/crates/turtle/src/atuin_daemon/components/search.rs
@@ -15,10 +15,11 @@ use uuid::Uuid;
use crate::atuin_daemon::{
daemon::{Component, DaemonHandle},
events::DaemonEvent,
- search::{
- FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse,
+ generated::search::{
+ self, FilterMode, SearchRequest, SearchResponse,
search_server::{Search as SearchSvc, SearchServer},
},
+ search::{IndexFilterMode, QueryContext, SearchIndex},
};
const PAGE_SIZE: usize = 5000;
@@ -35,7 +36,7 @@ const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60;
/// - Provides the Search gRPC service
pub(crate) struct SearchComponent {
index: Arc<RwLock<SearchIndex>>,
- handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+ handle: RwLock<Option<DaemonHandle>>,
loader_handle: Option<tokio::task::JoinHandle<()>>,
frecency_handle: Option<tokio::task::JoinHandle<()>>,
}
@@ -45,7 +46,7 @@ impl SearchComponent {
pub(crate) fn new() -> Self {
Self {
index: Arc::new(RwLock::new(SearchIndex::new())),
- handle: tokio::sync::RwLock::new(None),
+ handle: RwLock::new(None),
loader_handle: None,
frecency_handle: None,
}
@@ -351,7 +352,7 @@ impl SearchSvc for SearchGrpcService {
}
}
Err(e) => {
- let _ = tx.send(Err(e)).await;
+ drop(tx.send(Err(e)).await);
break;
}
}
@@ -367,7 +368,7 @@ impl SearchSvc for SearchGrpcService {
/// Convert proto FilterMode and context to IndexFilterMode.
fn convert_filter_mode(
mode: FilterMode,
- context: &Option<crate::atuin_daemon::search::SearchContext>,
+ context: &Option<search::SearchContext>,
) -> IndexFilterMode {
match (mode, context) {
(FilterMode::Global, _) => IndexFilterMode::Global,
diff --git a/crates/turtle/src/atuin_daemon/components/semantic.rs b/crates/turtle/src/atuin_daemon/components/semantic.rs
index 052c2d73..69ffc134 100644
--- a/crates/turtle/src/atuin_daemon/components/semantic.rs
+++ b/crates/turtle/src/atuin_daemon/components/semantic.rs
@@ -9,6 +9,7 @@ use std::fmt::{Display, Formatter};
use std::sync::Arc;
use crate::atuin_client::history::{History, HistoryId};
+use crate::atuin_daemon::generated::semantic;
use eyre::Result;
use tokio::sync::Mutex;
use tonic::{Request, Response, Status, Streaming};
@@ -17,7 +18,7 @@ use tracing::{Level, instrument};
use crate::atuin_daemon::{
daemon::{Component, DaemonHandle},
events::DaemonEvent,
- semantic::{
+ generated::semantic::{
CommandCapture, CommandOutputReply, CommandOutputRequest, OutputLine, RecordCommandsReply,
semantic_server::{Semantic as SemanticSvc, SemanticServer},
},
@@ -244,7 +245,7 @@ impl SemanticState {
fn command_output_for_ref(
&self,
capture_ref: &CaptureRef,
- ranges: &[crate::atuin_daemon::semantic::OutputRange],
+ ranges: &[semantic::OutputRange],
) -> Option<CommandOutputReply> {
let stored = self
.sessions
@@ -534,17 +535,14 @@ fn command_output_not_found() -> CommandOutputReply {
}
}
-fn select_output_ranges(
- output: &str,
- ranges: &[crate::atuin_daemon::semantic::OutputRange],
-) -> Vec<OutputLine> {
+fn select_output_ranges(output: &str, ranges: &[semantic::OutputRange]) -> Vec<OutputLine> {
let lines: Vec<&str> = output.lines().collect();
if lines.is_empty() {
return Vec::new();
}
let ranges = if ranges.is_empty() {
- vec![crate::atuin_daemon::semantic::OutputRange { start: 0, end: 999 }]
+ vec![semantic::OutputRange { start: 0, end: 999 }]
} else {
ranges.to_vec()
};
@@ -627,9 +625,20 @@ fn log_record(record: &SemanticCommandRecord, message: &'static str) {
#[cfg(test)]
mod tests {
- use super::*;
use time::OffsetDateTime;
+ use crate::{
+ atuin_client::history::{History, HistoryId},
+ atuin_daemon::{
+ components::semantic::{
+ MAX_COMMANDS_PER_SESSION, MAX_SESSIONS, SemanticCommandRecord, SemanticState,
+ SessionCaptures, SessionId, select_output_ranges,
+ },
+ generated::semantic::{self, CommandOutputReply, CommandOutputRequest, OutputLine},
+ },
+ atuin_pty_proxy::CommandCapture,
+ };
+
fn history(id: &str, session: &str, command: &str) -> History {
History {
id: HistoryId(id.to_string()),
@@ -819,8 +828,8 @@ mod tests {
fn output_ranges_are_line_based_inclusive_and_support_negative_offsets() {
let output = "zero\none\ntwo\nthree\nfour";
let ranges = vec![
- crate::atuin_daemon::semantic::OutputRange { start: 1, end: 2 },
- crate::atuin_daemon::semantic::OutputRange { start: -2, end: -1 },
+ semantic::OutputRange { start: 1, end: 2 },
+ semantic::OutputRange { start: -2, end: -1 },
];
assert_eq!(
@@ -841,8 +850,8 @@ mod tests {
.collect::<Vec<_>>()
.join("\n");
let ranges = vec![
- crate::atuin_daemon::semantic::OutputRange { start: 0, end: 100 },
- crate::atuin_daemon::semantic::OutputRange {
+ semantic::OutputRange { start: 0, end: 100 },
+ semantic::OutputRange {
start: -100,
end: -1,
},
@@ -859,8 +868,8 @@ mod tests {
fn output_ranges_can_leave_gaps_for_client_formatting() {
let output = "zero\none\ntwo\nthree\nfour";
let ranges = vec![
- crate::atuin_daemon::semantic::OutputRange { start: 0, end: 1 },
- crate::atuin_daemon::semantic::OutputRange { start: 4, end: 4 },
+ semantic::OutputRange { start: 0, end: 1 },
+ semantic::OutputRange { start: 4, end: 4 },
];
assert_eq!(
@@ -891,8 +900,8 @@ mod tests {
fn output_ranges_skip_ranges_fully_outside_output() {
let output = "zero\none\ntwo";
let ranges = vec![
- crate::atuin_daemon::semantic::OutputRange { start: 10, end: 20 },
- crate::atuin_daemon::semantic::OutputRange {
+ semantic::OutputRange { start: 10, end: 20 },
+ semantic::OutputRange {
start: -20,
end: -10,
},
diff --git a/crates/turtle/src/atuin_daemon/components/sync.rs b/crates/turtle/src/atuin_daemon/components/sync.rs
index fbfbbd67..8b5b621d 100644
--- a/crates/turtle/src/atuin_daemon/components/sync.rs
+++ b/crates/turtle/src/atuin_daemon/components/sync.rs
@@ -27,9 +27,9 @@ enum SyncCommand {
/// Sync state - tracks whether we're in normal operation or retrying after failure.
#[derive(Clone, Copy, PartialEq, Eq)]
enum SyncState {
- /// Normal operation. Periodic syncs only run if auto_sync is enabled.
+ /// Normal operation. Periodic syncs only run if `auto_sync` is enabled.
Idle,
- /// Retrying after a sync failure. Retries continue regardless of auto_sync
+ /// Retrying after a sync failure. Retries continue regardless of `auto_sync`
/// until the sync succeeds.
Retrying,
}
@@ -39,7 +39,7 @@ enum SyncState {
/// This component:
/// - Runs a background sync loop on a configurable interval
/// - Implements exponential backoff on sync failures
-/// - Responds to ForceSync events for immediate sync
+/// - Responds to [`DaemonEvent::ForceSync`] events for immediate sync
/// - Emits SyncCompleted/SyncFailed events
pub(crate) struct SyncComponent {
task_handle: Option<tokio::task::JoinHandle<()>>,
@@ -80,22 +80,28 @@ impl Component for SyncComponent {
}
async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
- if let DaemonEvent::ForceSync = event {
- tracing::info!("force sync requested");
- if let Some(tx) = &self.command_tx {
- let _ = tx.send(SyncCommand::ForceSync).await;
+ match event {
+ DaemonEvent::ForceSync => {
+ tracing::info!("force sync requested");
+ if let Some(tx) = &self.command_tx {
+ drop(tx.send(SyncCommand::ForceSync).await);
+ }
+ }
+ DaemonEvent::SyncFailed { error } => {
+ tracing::error!(?error, "Sync failed.");
}
+ _ => (),
}
Ok(())
}
async fn stop(&mut self) -> Result<()> {
if let Some(tx) = &self.command_tx {
- let _ = tx.send(SyncCommand::Stop).await;
+ drop(tx.send(SyncCommand::Stop).await);
}
if let Some(handle) = self.task_handle.take() {
// Give the task a moment to shut down gracefully
- let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
+ drop(time::timeout(Duration::from_secs(5), handle).await);
}
tracing::info!("sync component stopped");
Ok(())
@@ -126,7 +132,7 @@ async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>
// Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
- let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
+ let mut ticker = time::interval(Duration::from_secs(settings.daemon.sync_frequency));
// IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
// we may end up running a lot of syncs in a hot loop.
@@ -224,10 +230,10 @@ async fn do_sync_tick(
}
*ticker = time::interval_at(
- tokio::time::Instant::now() + Duration::from_secs(new_interval as u64),
- time::Duration::from_secs(new_interval as u64),
+ time::Instant::now() + Duration::from_secs(new_interval as u64),
+ Duration::from_secs(new_interval as u64),
);
- ticker.reset_after(time::Duration::from_secs(new_interval as u64));
+ ticker.reset_after(Duration::from_secs(new_interval as u64));
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
tracing::error!("backing off, next sync tick in {new_interval}");
@@ -261,9 +267,8 @@ async fn do_sync_tick(
// Reset backoff on success
if ticker.period().as_secs() != settings.daemon.sync_frequency {
*ticker = time::interval_at(
- tokio::time::Instant::now()
- + Duration::from_secs(settings.daemon.sync_frequency),
- time::Duration::from_secs(settings.daemon.sync_frequency),
+ time::Instant::now() + Duration::from_secs(settings.daemon.sync_frequency),
+ Duration::from_secs(settings.daemon.sync_frequency),
);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
}
diff --git a/crates/turtle/src/atuin_daemon/control/mod.rs b/crates/turtle/src/atuin_daemon/control/mod.rs
index 23068519..7015db5b 100644
--- a/crates/turtle/src/atuin_daemon/control/mod.rs
+++ b/crates/turtle/src/atuin_daemon/control/mod.rs
@@ -1,12 +1,76 @@
-//! Control module for external event injection.
+//! Control service implementation.
//!
-//! This module provides the gRPC service that allows external processes
-//! (like CLI commands) to inject events into the daemon's event bus.
+//! This gRPC service allows external processes (like CLI commands) to inject
+//! events into the daemon's event bus.
-mod service;
+use tonic::{Request, Response, Status};
+use tracing::{Level, info, instrument};
-// Include the generated proto code
-tonic::include_proto!("control");
+use crate::{
+ atuin_client::history::HistoryId,
+ atuin_daemon::{
+ daemon::DaemonHandle,
+ events::DaemonEvent,
+ generated::control::{
+ SendEventRequest, SendEventResponse,
+ control_server::{Control, ControlServer},
+ send_event_request::Event,
+ },
+ },
+};
-// Re-export the service
-pub(crate) use service::ControlService;
+/// The Control gRPC service.
+///
+/// This service is used by external processes to inject events into the daemon.
+/// It's not a component - it's part of the daemon's core infrastructure.
+pub(crate) struct ControlService {
+ handle: DaemonHandle,
+}
+
+impl ControlService {
+ /// Create a new control service with the given daemon handle.
+ pub(crate) fn new(handle: DaemonHandle) -> Self {
+ Self { handle }
+ }
+
+ /// Get a tonic server for this service.
+ pub(crate) fn into_server(self) -> ControlServer<Self> {
+ ControlServer::new(self)
+ }
+}
+
+#[tonic::async_trait]
+impl Control for ControlService {
+ #[instrument(skip_all, level = Level::INFO, name = "control_send_event")]
+ async fn send_event(
+ &self,
+ request: Request<SendEventRequest>,
+ ) -> Result<Response<SendEventResponse>, Status> {
+ let req = request.into_inner();
+
+ let event = req
+ .event
+ .ok_or_else(|| Status::invalid_argument("event is required"))?;
+
+ let daemon_event = proto_event_to_daemon_event(event)?;
+
+ info!(?daemon_event, "received control event");
+ self.handle.emit(daemon_event);
+
+ Ok(Response::new(SendEventResponse {}))
+ }
+}
+
+/// Convert a proto event to a daemon event.
+fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> {
+ match event {
+ Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned),
+ Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt),
+ Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted {
+ ids: e.ids.into_iter().map(HistoryId).collect(),
+ }),
+ Event::ForceSync(_) => Ok(DaemonEvent::ForceSync),
+ Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded),
+ Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested),
+ }
+}
diff --git a/crates/turtle/src/atuin_daemon/control/service.rs b/crates/turtle/src/atuin_daemon/control/service.rs
deleted file mode 100644
index 8061a3c2..00000000
--- a/crates/turtle/src/atuin_daemon/control/service.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-//! Control service implementation.
-//!
-//! This gRPC service allows external processes (like CLI commands) to inject
-//! events into the daemon's event bus.
-
-use crate::atuin_client::history::HistoryId;
-use tonic::{Request, Response, Status};
-use tracing::{Level, info, instrument};
-
-use super::{
- SendEventRequest, SendEventResponse,
- control_server::{Control, ControlServer},
- send_event_request::Event,
-};
-use crate::atuin_daemon::{daemon::DaemonHandle, events::DaemonEvent};
-
-/// The Control gRPC service.
-///
-/// This service is used by external processes to inject events into the daemon.
-/// It's not a component - it's part of the daemon's core infrastructure.
-pub(crate) struct ControlService {
- handle: DaemonHandle,
-}
-
-impl ControlService {
- /// Create a new control service with the given daemon handle.
- pub(crate) fn new(handle: DaemonHandle) -> Self {
- Self { handle }
- }
-
- /// Get a tonic server for this service.
- pub(crate) fn into_server(self) -> ControlServer<Self> {
- ControlServer::new(self)
- }
-}
-
-#[tonic::async_trait]
-impl Control for ControlService {
- #[instrument(skip_all, level = Level::INFO, name = "control_send_event")]
- async fn send_event(
- &self,
- request: Request<SendEventRequest>,
- ) -> Result<Response<SendEventResponse>, Status> {
- let req = request.into_inner();
-
- let event = req
- .event
- .ok_or_else(|| Status::invalid_argument("event is required"))?;
-
- let daemon_event = proto_event_to_daemon_event(event)?;
-
- info!(?daemon_event, "received control event");
- self.handle.emit(daemon_event);
-
- Ok(Response::new(SendEventResponse {}))
- }
-}
-
-/// Convert a proto event to a daemon event.
-fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> {
- match event {
- Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned),
- Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt),
- Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted {
- ids: e.ids.into_iter().map(HistoryId).collect(),
- }),
- Event::ForceSync(_) => Ok(DaemonEvent::ForceSync),
- Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded),
- Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested),
- }
-}
diff --git a/crates/turtle/src/atuin_daemon/events.rs b/crates/turtle/src/atuin_daemon/events.rs
index 09369512..d379277d 100644
--- a/crates/turtle/src/atuin_daemon/events.rs
+++ b/crates/turtle/src/atuin_daemon/events.rs
@@ -32,8 +32,11 @@ pub(crate) enum DaemonEvent {
/// Sync completed successfully.
SyncCompleted {
/// Number of records uploaded.
+ #[expect(unused)]
uploaded: usize,
+
/// Number of records downloaded.
+ #[expect(unused)]
downloaded: usize,
},
diff --git a/crates/turtle/src/atuin_daemon/generated.rs b/crates/turtle/src/atuin_daemon/generated.rs
new file mode 100644
index 00000000..e43f7523
--- /dev/null
+++ b/crates/turtle/src/atuin_daemon/generated.rs
@@ -0,0 +1,38 @@
+#![allow(
+ unreachable_pub,
+ unused_qualifications,
+ reason = "All of these lints are triggered by the generated code"
+)]
+
+/// Semantic command capture gRPC service types.
+pub(crate) mod semantic {
+ tonic::include_proto!("semantic");
+}
+
+/// Search module for the daemon gRPC search service.
+///
+/// This module provides fuzzy search over command history using Nucleo.
+pub(crate) mod search {
+ // Include the generated proto code
+ tonic::include_proto!("search");
+}
+
+/// History module for the daemon gRPC history service.
+///
+/// This module contains the proto-generated types for the history gRPC service.
+pub(crate) mod history {
+ // Include the generated proto code
+ tonic::include_proto!("history");
+}
+
+/// Control module for external event injection.
+///
+/// This module provides the gRPC service that allows external processes
+/// (like CLI commands) to inject events into the daemon's event bus.
+pub(crate) mod control {
+ // Include the generated proto code
+ tonic::include_proto!("control");
+
+ // Re-export the service
+ pub(crate) use crate::atuin_daemon::control::ControlService;
+}
diff --git a/crates/turtle/src/atuin_daemon/history/mod.rs b/crates/turtle/src/atuin_daemon/history/mod.rs
deleted file mode 100644
index b71853df..00000000
--- a/crates/turtle/src/atuin_daemon/history/mod.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-//! History module for the daemon gRPC history service.
-//!
-//! This module contains the proto-generated types for the history gRPC service.
-
-// Include the generated proto code
-tonic::include_proto!("history");
diff --git a/crates/turtle/src/atuin_daemon/mod.rs b/crates/turtle/src/atuin_daemon/mod.rs
index b161b0cc..5f0f489e 100644
--- a/crates/turtle/src/atuin_daemon/mod.rs
+++ b/crates/turtle/src/atuin_daemon/mod.rs
@@ -8,11 +8,11 @@ pub(crate) mod components;
pub(crate) mod control;
pub(crate) mod daemon;
pub(crate) mod events;
-pub(crate) mod history;
pub(crate) mod search;
-pub(crate) mod semantic;
pub(crate) mod server;
+pub(crate) mod generated;
+
// Re-export core daemon types for convenience
pub(crate) use daemon::Daemon;
pub(crate) use events::DaemonEvent;
@@ -91,21 +91,18 @@ pub(crate) async fn boot(
signal_handle.shutdown();
});
- // Start the gRPC server in the background
server::run_grpc_server(
- settings,
+ &settings,
history_service,
search_service,
semantic_service,
control_service.into_server(),
handle,
- )
- .await?;
+ )?;
- // Run the daemon event loop
daemon.run_event_loop().await?;
- // Stop all components on shutdown
+ // Once the event loop has exited, shut down all components.
daemon.stop_components().await;
tracing::info!("daemon shut down complete");
diff --git a/crates/turtle/src/atuin_daemon/search/index.rs b/crates/turtle/src/atuin_daemon/search/index.rs
deleted file mode 100644
index 197a8c1b..00000000
--- a/crates/turtle/src/atuin_daemon/search/index.rs
+++ /dev/null
@@ -1,678 +0,0 @@
-//! Search index with frecency-based ranking.
-//!
-//! This module provides a deduplicated search index where each unique command
-//! is stored once, with metadata about all its invocations. This enables:
-//!
-//! - Efficient fuzzy matching (fewer items to match)
-//! - Frecency-based ranking (frequency + recency)
-//! - Dynamic filtering by directory, host, session, etc.
-
-use std::{
- collections::{HashMap, HashSet},
- sync::Arc,
-};
-
-use crate::atuin_client::settings::Search;
-use crate::{
- atuin_client::history::History, atuin_daemon::components::search::with_trailing_slash,
-};
-use atuin_nucleo::{Injector, Nucleo, pattern};
-use dashmap::DashMap;
-use lasso::{Spur, ThreadedRodeo};
-use time::OffsetDateTime;
-use tokio::sync::RwLock;
-use tracing::{Level, instrument};
-use uuid::Uuid;
-
-/// Parse a UUID string into a 16-byte array.
-/// Returns None if the string is not a valid UUID.
-fn parse_uuid_bytes(s: &str) -> Option<[u8; 16]> {
- Uuid::parse_str(s).ok().map(|u| *u.as_bytes())
-}
-
-/// Format a 16-byte array as a UUID string.
-fn format_uuid_bytes(bytes: &[u8; 16]) -> String {
- Uuid::from_bytes(*bytes).to_string()
-}
-
-/// Pre-computed frecency data for O(1) lookup.
-#[derive(Debug, Clone, Default)]
-pub(crate) struct FrecencyData {
- /// Total number of times this command was used.
- pub(crate) count: u32,
- /// Most recent usage timestamp (unix seconds).
- pub(crate) last_used: i64,
-}
-
-impl FrecencyData {
- /// Record a new usage of this command.
- pub(crate) fn record_use(&mut self, timestamp: i64) {
- self.count += 1;
- if timestamp > self.last_used {
- self.last_used = timestamp;
- }
- }
-
- /// Compute frecency score based on count and recency.
- ///
- /// Uses a decay function where more recent commands score higher.
- /// The formula balances frequency (how often) with recency (how recent).
- ///
- /// Multipliers allow tuning the relative weights:
- /// - `recency_mul`: Multiplier for recency score (default: 1.0)
- /// - `frequency_mul`: Multiplier for frequency score (default: 1.0)
- ///
- /// A multiplier of 0.0 disables that component, 1.0 is unchanged, 2.0 doubles weight.
- /// Values like 0.5 reduce weight by half, 1.5 increases by 50%, etc.
- #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")]
- pub(crate) fn compute(&self, now: i64, recency_mul: f64, frequency_mul: f64) -> u32 {
- if self.count == 0 {
- return 0;
- }
-
- // Time-based decay: score decreases as time passes
- let age_seconds = (now - self.last_used).max(0) as u64;
- let age_hours = age_seconds / 3600;
-
- // Decay factor: recent commands get higher scores
- // - Last hour: multiplier ~1.0
- // - Last day: multiplier ~0.5
- // - Last week: multiplier ~0.1
- // - Older: multiplier approaches 0
- let recency_score: f64 = match age_hours {
- 0 => 100.0,
- 1..=6 => 90.0,
- 7..=24 => 70.0,
- 25..=72 => 50.0,
- 73..=168 => 30.0,
- 169..=720 => 15.0,
- _ => 5.0,
- };
-
- // Frequency boost: more uses = higher score (with diminishing returns)
- let frequency_score = (f64::from(self.count).ln() * 20.0).min(100.0);
-
- // Apply multipliers and combine scores, then round to u32
- ((recency_score * recency_mul) + (frequency_score * frequency_mul)).round() as u32
- }
-}
-
-/// Data for a unique command.
-pub(crate) struct CommandData {
- /// History ID of the most recent invocation (16-byte UUID).
- most_recent_id: [u8; 16],
- /// Timestamp of the most recent invocation.
- most_recent_timestamp: i64,
- /// Pre-computed global frecency.
- pub(crate) global_frecency: FrecencyData,
-
- // Pre-computed indexes for O(1) filter lookups
- // Using HashSet instead of DashSet since CommandData lives inside DashMap (already synchronized)
- /// All directories where this command has been run (interned keys).
- directories: HashSet<Spur>,
- /// All hostnames where this command has been run (interned keys).
- hosts: HashSet<Spur>,
- /// All sessions where this command has been run (as 16-byte UUIDs).
- sessions: HashSet<[u8; 16]>,
-}
-
-impl CommandData {
- /// Create a new [`CommandData`] from a history entry.
- /// Returns None if the history entry has invalid UUIDs.
- pub(crate) fn new(history: &History, interner: &ThreadedRodeo) -> Option<Self> {
- let history_id = parse_uuid_bytes(&history.id.0)?;
- let session = parse_uuid_bytes(&history.session)?;
- let timestamp = history.timestamp.unix_timestamp();
-
- let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd));
- let host_key = interner.get_or_intern(&history.hostname);
-
- let mut directories = HashSet::new();
- directories.insert(dir_key);
-
- let mut hosts = HashSet::new();
- hosts.insert(host_key);
-
- let mut sessions = HashSet::new();
- sessions.insert(session);
-
- let mut global_frecency = FrecencyData::default();
- global_frecency.record_use(timestamp);
-
- Some(Self {
- most_recent_id: history_id,
- most_recent_timestamp: timestamp,
- global_frecency,
- directories,
- hosts,
- sessions,
- })
- }
-
- /// Add an invocation from a history entry.
- /// Returns false if the history entry has invalid UUIDs.
- pub(crate) fn add_invocation(&mut self, history: &History, interner: &ThreadedRodeo) -> bool {
- let Some(history_id) = parse_uuid_bytes(&history.id.0) else {
- return false;
- };
- let Some(session) = parse_uuid_bytes(&history.session) else {
- return false;
- };
-
- let timestamp = history.timestamp.unix_timestamp();
-
- // Update global frecency
- self.global_frecency.record_use(timestamp);
-
- // Update pre-computed indexes for O(1) filter lookups
- let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd));
- self.directories.insert(dir_key);
- self.hosts.insert(interner.get_or_intern(&history.hostname));
- self.sessions.insert(session);
-
- // Update most recent if this invocation is newer
- if timestamp > self.most_recent_timestamp {
- self.most_recent_id = history_id;
- self.most_recent_timestamp = timestamp;
- }
-
- true
- }
-
- /// Get the most recent history ID for this command.
- pub(crate) fn most_recent_id(&self) -> String {
- format_uuid_bytes(&self.most_recent_id)
- }
-
- /// Check if any invocation matches a directory filter (exact match).
- /// O(1) lookup using pre-computed index.
- pub(crate) fn has_invocation_in_dir(&self, dir: &str, interner: &ThreadedRodeo) -> bool {
- interner
- .get(dir)
- .is_some_and(|spur| self.directories.contains(&spur))
- }
-
- /// Check if any invocation matches a directory prefix (workspace/git root).
- /// O(n) where n = number of unique directories for this command.
- pub(crate) fn has_invocation_in_workspace(
- &self,
- prefix: &str,
- interner: &ThreadedRodeo,
- ) -> bool {
- self.directories
- .iter()
- .any(|&spur| interner.resolve(&spur).starts_with(prefix))
- }
-
- /// Check if any invocation matches a hostname.
- /// O(1) lookup using pre-computed index.
- pub(crate) fn has_invocation_on_host(&self, hostname: &str, interner: &ThreadedRodeo) -> bool {
- interner
- .get(hostname)
- .is_some_and(|spur| self.hosts.contains(&spur))
- }
-
- /// Check if any invocation matches a session.
- /// O(1) lookup using pre-computed index.
- pub(crate) fn has_invocation_in_session(&self, session: &str) -> bool {
- parse_uuid_bytes(session).is_some_and(|bytes| self.sessions.contains(&bytes))
- }
-}
-
-/// Filter mode for search queries.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub(crate) enum IndexFilterMode {
- /// No filtering - search all commands.
- Global,
- /// Filter to commands run in a specific directory.
- Directory(String),
- /// Filter to commands run in a workspace (directory prefix).
- Workspace(String),
- /// Filter to commands run on a specific host.
- Host(String),
- /// Filter to commands run in a specific session.
- Session(String),
-}
-
-/// Context for search queries.
-#[derive(Debug, Clone, Default)]
-pub(crate) struct QueryContext {
- #[expect(dead_code)]
- pub(crate) cwd: Option<String>,
- #[expect(dead_code)]
- pub(crate) git_root: Option<String>,
- #[expect(dead_code)]
- pub(crate) hostname: Option<String>,
- #[expect(dead_code)]
- pub(crate) session_id: Option<String>,
-}
-
-/// Shareable frecency map: command -> frecency score.
-/// Wrapped in Arc for zero-copy sharing with scorer callbacks.
-type FrecencyMap = Arc<HashMap<Arc<str>, u32>>;
-
-/// A deduplicated search index with frecency-based ranking.
-///
-/// Commands are stored by their text, with metadata about all invocations.
-/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback.
-///
-/// Global frecency is precomputed by a background task and used for scoring.
-/// If frecency data is not available, search still works but without frecency ranking;
-/// although this should never happen due to precomputing the frecency map.
-pub(crate) struct SearchIndex {
- /// Map from command text to command data.
- /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer.
- /// Keys are Arc<str> to enable zero-copy sharing with frecency_map.
- commands: Arc<DashMap<Arc<str>, CommandData>>,
- /// Nucleo fuzzy matcher - items are command strings.
- nucleo: RwLock<Nucleo<String>>,
- /// Injector for adding new commands to Nucleo.
- injector: Injector<String>,
- /// Precomputed global frecency map. Updated by background task.
- frecency_map: RwLock<Option<FrecencyMap>>,
- /// String interner for deduplicating cwd, hostname, and directory paths.
- interner: Arc<ThreadedRodeo>,
-}
-
-impl SearchIndex {
- /// Create a new empty search index.
- pub(crate) fn new() -> Self {
- let nucleo_config = atuin_nucleo::Config::DEFAULT;
- // Single column for command text
- let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1);
- let injector = nucleo.injector();
-
- Self {
- commands: Arc::new(DashMap::new()),
- nucleo: RwLock::new(nucleo),
- injector,
- frecency_map: RwLock::new(None),
- interner: Arc::new(ThreadedRodeo::new()),
- }
- }
-
- /// Add a history entry to the index.
- ///
- /// If the command already exists, updates its invocation data.
- /// If it's a new command, adds it to both the map and Nucleo.
- pub(crate) fn add_history(&self, history: &History) {
- let command = history.command.as_str();
-
- // DashMap with Arc<str> keys can be looked up with &str via Borrow trait
- if let Some(mut entry) = self.commands.get_mut(command) {
- // Existing command - just update invocations
- entry.add_invocation(history, &self.interner);
- } else {
- // New command - create Arc<str> once and share it
- let Some(data) = CommandData::new(history, &self.interner) else {
- return; // Invalid UUIDs, skip this entry
- };
- let command_arc: Arc<str> = command.into();
- self.commands.insert(Arc::clone(&command_arc), data);
- // Nucleo still needs String (unavoidable copy for fuzzy matching)
- self.injector.push(command_arc.to_string(), |cmd, cols| {
- cols[0] = cmd.clone().into();
- });
- }
- // Note: frecency_map is rebuilt by background task, not invalidated here
- }
-
- /// Add multiple history entries to the index.
- pub(crate) fn add_histories(&self, histories: &[History]) {
- for history in histories {
- self.add_history(history);
- }
- }
-
- /// Get the number of unique commands in the index.
- pub(crate) fn command_count(&self) -> usize {
- self.commands.len()
- }
-
- /// Search for commands matching a query.
- ///
- /// Returns a list of history IDs (most recent invocation per command).
- /// Uses precomputed global frecency for scoring if available.
- #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))]
- #[expect(
- clippy::significant_drop_tightening,
- reason = "The nucleo early drop is a false-positive"
- )]
- pub(crate) async fn search(
- &self,
- query: &str,
- filter_mode: IndexFilterMode,
- // TODO(@bpeetz): Use the query context here <2026-06-12>
- #[expect(unused)] context: &QueryContext,
- limit: u32,
- ) -> Vec<String> {
- let mut nucleo = self.nucleo.write().await;
-
- // Get precomputed frecency map (may be None if not yet computed)
- let frecency_map = self.frecency_map.read().await.clone();
-
- // Build filter based on mode
- let filter = self.build_filter(&filter_mode);
- nucleo.set_filter(filter);
-
- // Build scorer from precomputed frecency (or None if not available)
- let scorer = Self::build_scorer(frecency_map);
- nucleo.set_scorer(scorer);
-
- // Update pattern
- nucleo.pattern.reparse(
- 0,
- query,
- pattern::CaseMatching::Smart,
- pattern::Normalization::Smart,
- false,
- );
-
- tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| {
- // Tick until complete
- while nucleo.tick(10).running {}
- });
-
- // Collect results
- let snapshot = nucleo.snapshot();
- let matched_count = snapshot.matched_item_count().min(limit);
-
- tracing::span!(Level::TRACE, "index_search_results").in_scope(|| {
- snapshot
- .matched_items(..matched_count)
- .filter_map(|item| {
- let cmd = item.data;
- // DashMap<Arc<str>, _>::get accepts &str via Borrow trait
- self.commands
- .get(cmd.as_str())
- .map(|data| data.most_recent_id())
- })
- .collect()
- })
- }
-
- /// Rebuild the global frecency map.
- ///
- /// This should be called by a background task periodically.
- /// The map is used for scoring search results.
- ///
- /// Uses multipliers from search settings:
- /// - `recency_score_multiplier`: Weight for recency component
- /// - `frequency_score_multiplier`: Weight for frequency component
- /// - `frecency_score_multiplier`: Overall multiplier for final score
- #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")]
- pub(crate) async fn rebuild_frecency(&self, search_settings: &Search) {
- let now = OffsetDateTime::now_utc().unix_timestamp();
- let mut frecency_map: HashMap<Arc<str>, u32> = HashMap::new();
-
- // Clamp multipliers to non-negative values to prevent broken frecency ranking
- // (negative values would produce unexpected results when cast to u32)
- let recency_mul = search_settings.recency_score_multiplier.max(0.0);
- let frequency_mul = search_settings.frequency_score_multiplier.max(0.0);
- let frecency_mul = search_settings.frecency_score_multiplier.max(0.0);
-
- for entry in self.commands.iter() {
- let frecency = entry
- .global_frecency
- .compute(now, recency_mul, frequency_mul);
- // Apply overall frecency multiplier and round to u32
- let frecency = (frecency as f64 * frecency_mul).round() as u32;
- // Arc::clone is cheap - just increments reference count
- frecency_map.insert(Arc::clone(entry.key()), frecency);
- }
-
- *self.frecency_map.write().await = Some(Arc::new(frecency_map));
- }
-
- /// Build filter predicate for the given mode.
- fn build_filter(&self, mode: &IndexFilterMode) -> Option<atuin_nucleo::Filter<String>> {
- // For Global mode, no filter needed
- if matches!(mode, IndexFilterMode::Global) {
- return None;
- }
-
- // Pre-compute which commands pass the filter
- // Use HashSet<String> for the short-lived filter (simpler than Arc lookup)
- let passing_commands: Arc<HashSet<String>> = {
- let mut set = HashSet::new();
- for entry in self.commands.iter() {
- let passes = match mode {
- IndexFilterMode::Global => unreachable!(),
- IndexFilterMode::Directory(dir) => {
- entry.has_invocation_in_dir(dir, &self.interner)
- }
- IndexFilterMode::Workspace(prefix) => {
- entry.has_invocation_in_workspace(prefix, &self.interner)
- }
- IndexFilterMode::Host(hostname) => {
- entry.has_invocation_on_host(hostname, &self.interner)
- }
- IndexFilterMode::Session(session) => entry.has_invocation_in_session(session),
- };
- if passes {
- // Convert Arc<str> to String for filter lookup
- set.insert(entry.key().to_string());
- }
- }
- Arc::new(set)
- };
-
- Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd)))
- }
-
- /// Build scorer from precomputed frecency map.
- ///
- /// Returns None if frecency map is not available (search still works, just without frecency ranking).
- fn build_scorer(frecency_map: Option<FrecencyMap>) -> Option<atuin_nucleo::Scorer<String>> {
- let map = frecency_map?;
- Some(Arc::new(move |cmd: &String, fuzzy_score: u32| {
- // HashMap<Arc<str>, _>::get accepts &str via Borrow trait
- let frecency = map.get(cmd.as_str()).copied().unwrap_or(0);
- fuzzy_score + frecency
- }))
- }
-}
-
-impl Default for SearchIndex {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use time::macros::datetime;
-
- #[test]
- fn frecency_data_compute() {
- let now = 1_000_000i64;
-
- // Recent command (with default multipliers of 1.0)
- let recent = FrecencyData {
- count: 5,
- last_used: now - 60, // 1 minute ago
- };
- assert!(recent.compute(now, 1.0, 1.0) > 100); // High score
-
- // Old command
- let old = FrecencyData {
- count: 5,
- last_used: now - 86400 * 30, // 30 days ago
- };
- assert!(old.compute(now, 1.0, 1.0) < recent.compute(now, 1.0, 1.0));
-
- // Frequently used old command
- let frequent_old = FrecencyData {
- count: 100,
- last_used: now - 86400 * 7, // 1 week ago
- };
- // Should still have decent score due to frequency
- assert!(frequent_old.compute(now, 1.0, 1.0) > 50);
- }
-
- #[test]
- fn frecency_data_compute_with_multipliers() {
- let now = 1_000_000_i64;
-
- let data = FrecencyData {
- count: 5,
- last_used: now - 60, // 1 minute ago (recency_score = 100)
- };
-
- // Default multipliers (1.0, 1.0)
- let default_score = data.compute(now, 1.0, 1.0);
-
- // Double recency weight
- let double_recency = data.compute(now, 2.0, 1.0);
- assert!(double_recency > default_score);
-
- // Double frequency weight
- let double_frequency = data.compute(now, 1.0, 2.0);
- assert!(double_frequency > default_score);
-
- // Zero out recency (only frequency counts)
- let no_recency = data.compute(now, 0.0, 1.0);
- assert!(no_recency < default_score);
-
- // Zero out frequency (only recency counts)
- let no_frequency = data.compute(now, 1.0, 0.0);
- assert!(no_frequency < default_score);
-
- // Zero both (should be zero)
- let no_score = data.compute(now, 0.0, 0.0);
- assert_eq!(no_score, 0);
-
- // Fractional multipliers
- let half_recency = data.compute(now, 0.5, 1.0);
- assert!(half_recency < default_score);
- assert!(half_recency > no_recency);
-
- // 1.5x multiplier
- let boost_recency = data.compute(now, 1.5, 1.0);
- assert!(boost_recency > default_score);
- assert!(boost_recency < double_recency);
- }
-
- #[test]
- fn command_data_add_invocation() {
- let interner = ThreadedRodeo::new();
-
- let (dir1, dir2) = if cfg!(windows) {
- ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
- } else {
- ("/home/user/project", "/home/user/other")
- };
-
- let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC));
- let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC));
-
- let mut data = CommandData::new(&history1, &interner).unwrap();
- assert_eq!(data.global_frecency.count, 1);
- let id1 = data.most_recent_id();
-
- data.add_invocation(&history2, &interner);
- assert_eq!(data.global_frecency.count, 2);
-
- // Most recent ID should update to history2 (newer timestamp)
- let id2 = data.most_recent_id();
- assert_ne!(id1, id2);
- }
-
- #[test]
- fn command_data_filters() {
- let interner = ThreadedRodeo::new();
-
- let (dir1, dir2) = if cfg!(windows) {
- ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
- } else {
- ("/home/user/project", "/home/user/other")
- };
-
- let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC));
- let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC));
-
- let mut data = CommandData::new(&h1, &interner).unwrap();
- data.add_invocation(&h2, &interner);
-
- let (check1, check2, check3) = if cfg!(windows) {
- (
- with_trailing_slash("C:\\Users\\User\\project"),
- with_trailing_slash("C:\\Users\\User\\other"),
- with_trailing_slash("C:\\Users\\User\\missing"),
- )
- } else {
- (
- with_trailing_slash("/home/user/project"),
- with_trailing_slash("/home/user/other"),
- with_trailing_slash("/home/user/missing"),
- )
- };
-
- assert!(data.has_invocation_in_dir(&check1, &interner));
- assert!(data.has_invocation_in_dir(&check2, &interner));
- assert!(!data.has_invocation_in_dir(&check3, &interner));
-
- let (check1, check2, check3) = if cfg!(windows) {
- (
- with_trailing_slash("C:\\Users\\User"),
- with_trailing_slash("C:\\Users"),
- with_trailing_slash("C:\\Users\\User\\var"),
- )
- } else {
- (
- with_trailing_slash("/home/user"),
- with_trailing_slash("/home"),
- with_trailing_slash("/var"),
- )
- };
-
- assert!(data.has_invocation_in_workspace(&check1, &interner));
- assert!(data.has_invocation_in_workspace(&check2, &interner));
- assert!(!data.has_invocation_in_workspace(&check3, &interner));
- }
-
- #[tokio::test]
- async fn search_index_add_and_search() {
- let index = SearchIndex::new();
-
- let h1 = make_history(
- "git status",
- "/home/user/project",
- datetime!(2024-01-01 10:00 UTC),
- );
- let h2 = make_history(
- "git commit -m 'test'",
- "/home/user/project",
- datetime!(2024-01-01 10:05 UTC),
- );
- let h3 = make_history(
- "ls -la",
- "/home/user/other",
- datetime!(2024-01-01 10:10 UTC),
- );
-
- index.add_history(&h1);
- index.add_history(&h2);
- index.add_history(&h3);
-
- assert_eq!(index.command_count(), 3);
-
- // Search for "git" - should match 2 commands
- let results = index
- .search("git", IndexFilterMode::Global, &QueryContext::default(), 10)
- .await;
- assert_eq!(results.len(), 2);
-
- // Search with directory filter
- let results = index
- .search(
- "",
- IndexFilterMode::Directory(with_trailing_slash("/home/user/project")),
- &QueryContext::default(),
- 10,
- )
- .await;
- assert_eq!(results.len(), 2); // git status and git commit
- }
-}
diff --git a/crates/turtle/src/atuin_daemon/search/mod.rs b/crates/turtle/src/atuin_daemon/search/mod.rs
index 51b6c6cc..60d923cd 100644
--- a/crates/turtle/src/atuin_daemon/search/mod.rs
+++ b/crates/turtle/src/atuin_daemon/search/mod.rs
@@ -1,11 +1,678 @@
-//! Search module for the daemon gRPC search service.
+//! Search index with frecency-based ranking.
//!
-//! This module provides fuzzy search over command history using Nucleo.
+//! This module provides a deduplicated search index where each unique command
+//! is stored once, with metadata about all its invocations. This enables:
+//!
+//! - Efficient fuzzy matching (fewer items to match)
+//! - Frecency-based ranking (frequency + recency)
+//! - Dynamic filtering by directory, host, session, etc.
+
+use std::{
+ collections::{HashMap, HashSet},
+ sync::Arc,
+};
+
+use crate::atuin_client::settings::Search;
+use crate::{
+ atuin_client::history::History, atuin_daemon::components::search::with_trailing_slash,
+};
+use atuin_nucleo::{Injector, Nucleo, pattern};
+use dashmap::DashMap;
+use lasso::{Spur, ThreadedRodeo};
+use time::OffsetDateTime;
+use tokio::sync::RwLock;
+use tracing::{Level, instrument};
+use uuid::Uuid;
+
+/// Parse a UUID string into a 16-byte array.
+/// Returns None if the string is not a valid UUID.
+fn parse_uuid_bytes(s: &str) -> Option<[u8; 16]> {
+ Uuid::parse_str(s).ok().map(|u| *u.as_bytes())
+}
+
+/// Format a 16-byte array as a UUID string.
+fn format_uuid_bytes(bytes: &[u8; 16]) -> String {
+ Uuid::from_bytes(*bytes).to_string()
+}
+
+/// Pre-computed frecency data for O(1) lookup.
+#[derive(Debug, Clone, Default)]
+pub(crate) struct FrecencyData {
+ /// Total number of times this command was used.
+ pub(crate) count: u32,
+ /// Most recent usage timestamp (unix seconds).
+ pub(crate) last_used: i64,
+}
+
+impl FrecencyData {
+ /// Record a new usage of this command.
+ pub(crate) fn record_use(&mut self, timestamp: i64) {
+ self.count += 1;
+ if timestamp > self.last_used {
+ self.last_used = timestamp;
+ }
+ }
+
+ /// Compute frecency score based on count and recency.
+ ///
+ /// Uses a stepped decay function where more recent commands score higher.
+ /// The formula balances frequency (how often) with recency (how recent).
+ ///
+ /// Multipliers allow tuning the relative weights:
+ /// - `recency_mul`: Multiplier for recency score (default: 1.0)
+ /// - `frequency_mul`: Multiplier for frequency score (default: 1.0)
+ ///
+ /// A multiplier of 0.0 disables that component, 1.0 is unchanged, 2.0 doubles weight.
+ /// Values like 0.5 reduce weight by half, 1.5 increases by 50%, etc.
+ #[instrument(level = Level::TRACE, name = "index_frecency_compute")]
+ pub(crate) fn compute(&self, now: i64, recency_mul: f64, frequency_mul: f64) -> u32 {
+ if self.count == 0 {
+ return 0;
+ }
+
+ // Time-based decay: score decreases as time passes (future timestamps count as age 0)
+ let age_seconds = (now - self.last_used).max(0) as u64;
+ let age_hours = age_seconds / 3600;
+
+ // Stepped recency score: recent commands get higher scores
+ // - Last hour: 100
+ // - Up to a day: 90 (1-6h) or 70 (7-24h)
+ // - Up to a week: 50 (25-72h) or 30 (73-168h)
+ // - Up to 30 days: 15; anything older: 5 (never reaches 0)
+ let recency_score: f64 = match age_hours {
+ 0 => 100.0,
+ 1..=6 => 90.0,
+ 7..=24 => 70.0,
+ 25..=72 => 50.0,
+ 73..=168 => 30.0,
+ 169..=720 => 15.0,
+ _ => 5.0,
+ };
+
+ // Frequency boost: logarithmic in the use count, capped at 100 (a single use contributes 0)
+ let frequency_score = (f64::from(self.count).ln() * 20.0).min(100.0);
+
+ // Apply multipliers and combine scores, then round to u32
+ ((recency_score * recency_mul) + (frequency_score * frequency_mul)).round() as u32
+ }
+}
+
+/// Data for a unique command.
+pub(crate) struct CommandData {
+ /// History ID of the most recent invocation (16-byte UUID).
+ most_recent_id: [u8; 16],
+ /// Timestamp of the most recent invocation.
+ most_recent_timestamp: i64,
+ /// Pre-computed global frecency.
+ pub(crate) global_frecency: FrecencyData,
+
+ // Pre-computed indexes for O(1) filter lookups
+ // Using HashSet instead of DashSet since CommandData lives inside DashMap (already synchronized)
+ /// All directories where this command has been run (interned keys).
+ directories: HashSet<Spur>,
+ /// All hostnames where this command has been run (interned keys).
+ hosts: HashSet<Spur>,
+ /// All sessions where this command has been run (as 16-byte UUIDs).
+ sessions: HashSet<[u8; 16]>,
+}
+
+impl CommandData {
+ /// Create a new [`CommandData`] from a history entry.
+ /// Returns None if the history entry has invalid UUIDs.
+ pub(crate) fn new(history: &History, interner: &ThreadedRodeo) -> Option<Self> {
+ let history_id = parse_uuid_bytes(&history.id.0)?;
+ let session = parse_uuid_bytes(&history.session)?;
+ let timestamp = history.timestamp.unix_timestamp();
+
+ let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd));
+ let host_key = interner.get_or_intern(&history.hostname);
+
+ let mut directories = HashSet::new();
+ directories.insert(dir_key);
+
+ let mut hosts = HashSet::new();
+ hosts.insert(host_key);
+
+ let mut sessions = HashSet::new();
+ sessions.insert(session);
+
+ let mut global_frecency = FrecencyData::default();
+ global_frecency.record_use(timestamp);
+
+ Some(Self {
+ most_recent_id: history_id,
+ most_recent_timestamp: timestamp,
+ global_frecency,
+ directories,
+ hosts,
+ sessions,
+ })
+ }
+
+ /// Add an invocation from a history entry.
+ /// Returns false if the history entry has invalid UUIDs.
+ pub(crate) fn add_invocation(&mut self, history: &History, interner: &ThreadedRodeo) -> bool {
+ let Some(history_id) = parse_uuid_bytes(&history.id.0) else {
+ return false;
+ };
+ let Some(session) = parse_uuid_bytes(&history.session) else {
+ return false;
+ };
+
+ let timestamp = history.timestamp.unix_timestamp();
+
+ // Update global frecency
+ self.global_frecency.record_use(timestamp);
+
+ // Update pre-computed indexes for O(1) filter lookups
+ let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd));
+ self.directories.insert(dir_key);
+ self.hosts.insert(interner.get_or_intern(&history.hostname));
+ self.sessions.insert(session);
+
+ // Update most recent if this invocation is newer
+ if timestamp > self.most_recent_timestamp {
+ self.most_recent_id = history_id;
+ self.most_recent_timestamp = timestamp;
+ }
+
+ true
+ }
+
+ /// Get the most recent history ID for this command.
+ pub(crate) fn most_recent_id(&self) -> String {
+ format_uuid_bytes(&self.most_recent_id)
+ }
+
+ /// Check if any invocation matches a directory filter (exact match).
+ /// O(1) lookup using pre-computed index.
+ pub(crate) fn has_invocation_in_dir(&self, dir: &str, interner: &ThreadedRodeo) -> bool {
+ interner
+ .get(dir)
+ .is_some_and(|spur| self.directories.contains(&spur))
+ }
+
+ /// Check if any invocation matches a directory prefix (workspace/git root).
+ /// O(n) where n = number of unique directories for this command.
+ pub(crate) fn has_invocation_in_workspace(
+ &self,
+ prefix: &str,
+ interner: &ThreadedRodeo,
+ ) -> bool {
+ self.directories
+ .iter()
+ .any(|&spur| interner.resolve(&spur).starts_with(prefix))
+ }
+
+ /// Check if any invocation matches a hostname.
+ /// O(1) lookup using pre-computed index.
+ pub(crate) fn has_invocation_on_host(&self, hostname: &str, interner: &ThreadedRodeo) -> bool {
+ interner
+ .get(hostname)
+ .is_some_and(|spur| self.hosts.contains(&spur))
+ }
+
+ /// Check if any invocation matches a session.
+ /// O(1) lookup using pre-computed index.
+ pub(crate) fn has_invocation_in_session(&self, session: &str) -> bool {
+ parse_uuid_bytes(session).is_some_and(|bytes| self.sessions.contains(&bytes))
+ }
+}
+
+/// Filter mode for search queries.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(crate) enum IndexFilterMode {
+ /// No filtering - search all commands.
+ Global,
+ /// Filter to commands run in a specific directory.
+ Directory(String),
+ /// Filter to commands run in a workspace (directory prefix).
+ Workspace(String),
+ /// Filter to commands run on a specific host.
+ Host(String),
+ /// Filter to commands run in a specific session.
+ Session(String),
+}
+
+/// Context for search queries.
+#[derive(Debug, Clone, Default)]
+pub(crate) struct QueryContext {
+ #[expect(dead_code)]
+ pub(crate) cwd: Option<String>,
+ #[expect(dead_code)]
+ pub(crate) git_root: Option<String>,
+ #[expect(dead_code)]
+ pub(crate) hostname: Option<String>,
+ #[expect(dead_code)]
+ pub(crate) session_id: Option<String>,
+}
+
+/// Shareable frecency map: command -> frecency score.
+/// Wrapped in Arc for zero-copy sharing with scorer callbacks.
+type FrecencyMap = Arc<HashMap<Arc<str>, u32>>;
+
+/// A deduplicated search index with frecency-based ranking.
+///
+/// Commands are stored by their text, with metadata about all invocations.
+/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback.
+///
+/// Global frecency is precomputed by a background task and used for scoring.
+/// If frecency data is not available, search still works but without frecency ranking;
+/// although this should never happen due to precomputing the frecency map.
+pub(crate) struct SearchIndex {
+ /// Map from command text to command data.
+ /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer.
+ /// Keys are Arc<str> to enable zero-copy sharing with frecency_map.
+ commands: Arc<DashMap<Arc<str>, CommandData>>,
+ /// Nucleo fuzzy matcher - items are command strings.
+ nucleo: RwLock<Nucleo<String>>,
+ /// Injector for adding new commands to Nucleo.
+ injector: Injector<String>,
+ /// Precomputed global frecency map. Updated by background task.
+ frecency_map: RwLock<Option<FrecencyMap>>,
+ /// String interner for deduplicating cwd, hostname, and directory paths.
+ interner: Arc<ThreadedRodeo>,
+}
+
+impl SearchIndex {
+ /// Create a new empty search index.
+ pub(crate) fn new() -> Self {
+ let nucleo_config = atuin_nucleo::Config::DEFAULT;
+ // Single column for command text
+ let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1);
+ let injector = nucleo.injector();
+
+ Self {
+ commands: Arc::new(DashMap::new()),
+ nucleo: RwLock::new(nucleo),
+ injector,
+ frecency_map: RwLock::new(None),
+ interner: Arc::new(ThreadedRodeo::new()),
+ }
+ }
+
+ /// Add a history entry to the index.
+ ///
+ /// If the command already exists, updates its invocation data.
+ /// If it's a new command, adds it to both the map and Nucleo.
+ pub(crate) fn add_history(&self, history: &History) {
+ let command = history.command.as_str();
+
+ // DashMap with Arc<str> keys can be looked up with &str via Borrow trait
+ if let Some(mut entry) = self.commands.get_mut(command) {
+ // Existing command - just update invocations
+ entry.add_invocation(history, &self.interner);
+ } else {
+ // New command - create Arc<str> once and share it
+ let Some(data) = CommandData::new(history, &self.interner) else {
+ return; // Invalid UUIDs, skip this entry
+ };
+ let command_arc: Arc<str> = command.into();
+ self.commands.insert(Arc::clone(&command_arc), data);
+ // Nucleo still needs String (unavoidable copy for fuzzy matching)
+ self.injector.push(command_arc.to_string(), |cmd, cols| {
+ cols[0] = cmd.clone().into();
+ });
+ }
+ // Note: frecency_map is rebuilt by background task, not invalidated here
+ }
+
+ /// Add multiple history entries to the index.
+ pub(crate) fn add_histories(&self, histories: &[History]) {
+ for history in histories {
+ self.add_history(history);
+ }
+ }
+
+ /// Get the number of unique commands in the index.
+ pub(crate) fn command_count(&self) -> usize {
+ self.commands.len()
+ }
+
+ /// Search for commands matching a query.
+ ///
+ /// Returns a list of history IDs (most recent invocation per command).
+ /// Uses precomputed global frecency for scoring if available.
+ #[instrument(skip_all, level = Level::TRACE, name = "index_search", fields(query = %query))]
+ #[expect(
+ clippy::significant_drop_tightening,
+ reason = "The nucleo early drop is a false-positive"
+ )]
+ pub(crate) async fn search(
+ &self,
+ query: &str,
+ filter_mode: IndexFilterMode,
+ // TODO(@bpeetz): Use the query context here <2026-06-12>
+ #[expect(unused)] context: &QueryContext,
+ limit: u32,
+ ) -> Vec<String> {
+ let mut nucleo = self.nucleo.write().await;
+
+ // Get precomputed frecency map (may be None if not yet computed)
+ let frecency_map = self.frecency_map.read().await.clone();
+
+ // Build filter based on mode
+ let filter = self.build_filter(&filter_mode);
+ nucleo.set_filter(filter);
+
+ // Build scorer from precomputed frecency (or None if not available)
+ let scorer = Self::build_scorer(frecency_map);
+ nucleo.set_scorer(scorer);
+
+ // Update pattern
+ nucleo.pattern.reparse(
+ 0,
+ query,
+ pattern::CaseMatching::Smart,
+ pattern::Normalization::Smart,
+ false,
+ );
+
+ tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| {
+ // Tick until complete
+ while nucleo.tick(10).running {}
+ });
+
+ // Collect results
+ let snapshot = nucleo.snapshot();
+ let matched_count = snapshot.matched_item_count().min(limit);
+
+ tracing::span!(Level::TRACE, "index_search_results").in_scope(|| {
+ snapshot
+ .matched_items(..matched_count)
+ .filter_map(|item| {
+ let cmd = item.data;
+ // DashMap<Arc<str>, _>::get accepts &str via Borrow trait
+ self.commands
+ .get(cmd.as_str())
+ .map(|data| data.most_recent_id())
+ })
+ .collect()
+ })
+ }
+
+ /// Rebuild the global frecency map.
+ ///
+ /// This should be called by a background task periodically.
+ /// The map is used for scoring search results.
+ ///
+ /// Uses multipliers from search settings:
+ /// - `recency_score_multiplier`: Weight for recency component
+ /// - `frequency_score_multiplier`: Weight for frequency component
+ /// - `frecency_score_multiplier`: Overall multiplier for final score
+ #[instrument(skip_all, level = Level::DEBUG, name = "rebuild_frecency")]
+ pub(crate) async fn rebuild_frecency(&self, search_settings: &Search) {
+ let now = OffsetDateTime::now_utc().unix_timestamp();
+ let mut frecency_map: HashMap<Arc<str>, u32> = HashMap::with_capacity(self.commands.len());
+
+ // Clamp multipliers to non-negative values to prevent broken frecency ranking
+ // (negative values would produce unexpected results when cast to u32)
+ let recency_mul = search_settings.recency_score_multiplier.max(0.0);
+ let frequency_mul = search_settings.frequency_score_multiplier.max(0.0);
+ let frecency_mul = search_settings.frecency_score_multiplier.max(0.0);
+
+ for entry in self.commands.iter() {
+ let frecency = entry
+ .global_frecency
+ .compute(now, recency_mul, frequency_mul);
+ // Apply overall frecency multiplier and round to u32 (the cast saturates at u32::MAX)
+ let frecency = (f64::from(frecency) * frecency_mul).round() as u32;
+ // Arc::clone is cheap - just increments reference count
+ frecency_map.insert(Arc::clone(entry.key()), frecency);
+ }
+
+ *self.frecency_map.write().await = Some(Arc::new(frecency_map));
+ }
+
+ /// Build filter predicate for the given mode.
+ fn build_filter(&self, mode: &IndexFilterMode) -> Option<atuin_nucleo::Filter<String>> {
+ // For Global mode, no filter needed
+ if matches!(mode, IndexFilterMode::Global) {
+ return None;
+ }
+
+ // Pre-compute which commands pass the filter
+ // Use HashSet<String> for the short-lived filter (simpler than Arc lookup)
+ let passing_commands: Arc<HashSet<String>> = {
+ let mut set = HashSet::new();
+ for entry in self.commands.iter() {
+ let passes = match mode {
+ IndexFilterMode::Global => unreachable!(),
+ IndexFilterMode::Directory(dir) => {
+ entry.has_invocation_in_dir(dir, &self.interner)
+ }
+ IndexFilterMode::Workspace(prefix) => {
+ entry.has_invocation_in_workspace(prefix, &self.interner)
+ }
+ IndexFilterMode::Host(hostname) => {
+ entry.has_invocation_on_host(hostname, &self.interner)
+ }
+ IndexFilterMode::Session(session) => entry.has_invocation_in_session(session),
+ };
+ if passes {
+ // Convert Arc<str> to String for filter lookup
+ set.insert(entry.key().to_string());
+ }
+ }
+ Arc::new(set)
+ };
+
+ Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd)))
+ }
+
+ /// Build scorer from precomputed frecency map; the frecency bonus is added with saturation.
+ ///
+ /// Returns None if frecency map is not available (search still works, just without frecency ranking).
+ fn build_scorer(frecency_map: Option<FrecencyMap>) -> Option<atuin_nucleo::Scorer<String>> {
+ let map = frecency_map?;
+ Some(Arc::new(move |cmd: &String, fuzzy_score: u32| {
+ // HashMap<Arc<str>, _>::get accepts &str via Borrow trait; unknown commands get no bonus
+ let frecency = map.get(cmd.as_str()).copied().unwrap_or(0);
+ fuzzy_score.saturating_add(frecency)
+ }))
+ }
+}
+
+impl Default for SearchIndex {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use time::macros::datetime;
+
+    /// Scores three usage profiles with neutral multipliers (1.0, 1.0) and checks
+    /// how recency and frequency influence the result.
+    #[test]
+    fn frecency_data_compute() {
+        const DAY_SECS: i64 = 86400;
+        let now = 1_000_000i64;
+
+        // Used five times, last time one minute ago.
+        let used_a_minute_ago = FrecencyData {
+            count: 5,
+            last_used: now - 60,
+        };
+        let minute_score = used_a_minute_ago.compute(now, 1.0, 1.0);
+        assert!(minute_score > 100);
+
+        // Same usage count, but last used 30 days ago: must score lower.
+        let used_a_month_ago = FrecencyData {
+            count: 5,
+            last_used: now - DAY_SECS * 30,
+        };
+        let month_score = used_a_month_ago.compute(now, 1.0, 1.0);
+        assert!(month_score < used_a_minute_ago.compute(now, 1.0, 1.0));
+
+        // Used a hundred times, last time a week ago: frequency alone should
+        // keep the score above 50.
+        let heavy_week_old = FrecencyData {
+            count: 100,
+            last_used: now - DAY_SECS * 7,
+        };
+        let heavy_score = heavy_week_old.compute(now, 1.0, 1.0);
+        assert!(heavy_score > 50);
+    }
+
+    /// Checks that the recency and frequency multipliers scale the score of one
+    /// fixed sample in the expected direction.
+    #[test]
+    fn frecency_data_compute_with_multipliers() {
+        let now = 1_000_000_i64;
+
+        // Used five times, most recently one minute before `now`.
+        let sample = FrecencyData {
+            count: 5,
+            last_used: now - 60,
+        };
+        // Score `sample` under the given (recency, frequency) multipliers.
+        let score = |recency, frequency| sample.compute(now, recency, frequency);
+
+        // Baseline: both multipliers at 1.0.
+        let baseline = score(1.0, 1.0);
+
+        // Doubling either weight raises the score.
+        let recency_x2 = score(2.0, 1.0);
+        assert!(recency_x2 > baseline);
+
+        let frequency_x2 = score(1.0, 2.0);
+        assert!(frequency_x2 > baseline);
+
+        // Zeroing either weight lowers it.
+        let recency_off = score(0.0, 1.0);
+        assert!(recency_off < baseline);
+
+        let frequency_off = score(1.0, 0.0);
+        assert!(frequency_off < baseline);
+
+        // Zeroing both weights leaves nothing.
+        let both_off = score(0.0, 0.0);
+        assert_eq!(both_off, 0);
+
+        // A 0.5 recency weight lands strictly between "off" and the baseline.
+        let recency_half = score(0.5, 1.0);
+        assert!(recency_half < baseline);
+        assert!(recency_half > recency_off);
+
+        // A 1.5 recency weight lands strictly between the baseline and 2.0.
+        let recency_x1_5 = score(1.5, 1.0);
+        assert!(recency_x1_5 > baseline);
+        assert!(recency_x1_5 < recency_x2);
+    }
+
+    /// Adding a second, newer invocation of the same command bumps the global
+    /// count and changes the most-recent history id.
+    #[test]
+    fn command_data_add_invocation() {
+        let interner = ThreadedRodeo::new();
+
+        // Platform-appropriate working directories for the two invocations.
+        let (project_dir, other_dir) = if cfg!(windows) {
+            ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
+        } else {
+            ("/home/user/project", "/home/user/other")
+        };
+
+        let earlier = make_history("git status", project_dir, datetime!(2024-01-01 10:00 UTC));
+        let later = make_history("git status", other_dir, datetime!(2024-01-01 12:00 UTC));
+
+        let mut entry = CommandData::new(&earlier, &interner).unwrap();
+        assert_eq!(entry.global_frecency.count, 1);
+        let id_before = entry.most_recent_id();
+
+        entry.add_invocation(&later, &interner);
+        assert_eq!(entry.global_frecency.count, 2);
+
+        // `later` carries the newer timestamp, so the most recent id must change.
+        let id_after = entry.most_recent_id();
+        assert_ne!(id_before, id_after);
+    }
+
+    /// Exercises the directory and workspace filters on a command that was run
+    /// in two different directories.
+    #[test]
+    fn command_data_filters() {
+        let interner = ThreadedRodeo::new();
+        let on_windows = cfg!(windows);
+
+        let (project_dir, other_dir) = if on_windows {
+            ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
+        } else {
+            ("/home/user/project", "/home/user/other")
+        };
+
+        let first = make_history("git status", project_dir, datetime!(2024-01-01 10:00 UTC));
+        let second = make_history("git status", other_dir, datetime!(2024-01-01 12:00 UTC));
+
+        let mut entry = CommandData::new(&first, &interner).unwrap();
+        entry.add_invocation(&second, &interner);
+
+        // Directory filter: both directories the command ran in match, a third
+        // directory does not.
+        let (seen_a, seen_b, unseen) = if on_windows {
+            (
+                "C:\\Users\\User\\project",
+                "C:\\Users\\User\\other",
+                "C:\\Users\\User\\missing",
+            )
+        } else {
+            (
+                "/home/user/project",
+                "/home/user/other",
+                "/home/user/missing",
+            )
+        };
+        let seen_a = with_trailing_slash(seen_a);
+        let seen_b = with_trailing_slash(seen_b);
+        let unseen = with_trailing_slash(unseen);
+
+        assert!(entry.has_invocation_in_dir(&seen_a, &interner));
+        assert!(entry.has_invocation_in_dir(&seen_b, &interner));
+        assert!(!entry.has_invocation_in_dir(&unseen, &interner));
+
+        // Workspace filter: ancestor prefixes of the two directories match, an
+        // unrelated prefix does not.
+        let (parent, grandparent, unrelated) = if on_windows {
+            ("C:\\Users\\User", "C:\\Users", "C:\\Users\\User\\var")
+        } else {
+            ("/home/user", "/home", "/var")
+        };
+        let parent = with_trailing_slash(parent);
+        let grandparent = with_trailing_slash(grandparent);
+        let unrelated = with_trailing_slash(unrelated);
+
+        assert!(entry.has_invocation_in_workspace(&parent, &interner));
+        assert!(entry.has_invocation_in_workspace(&grandparent, &interner));
+        assert!(!entry.has_invocation_in_workspace(&unrelated, &interner));
+    }
+
+ /// Adds three history entries with distinct commands to a fresh index, then
+ /// checks the command count, a global search for "git", and an empty-query
+ /// search restricted to one directory.
+ // NOTE(review): the paths here are Unix-style only, whereas the
+ // `command_data_*` tests branch on `cfg!(windows)` — confirm this test is
+ // expected to pass on Windows.
+ #[tokio::test]
+ async fn search_index_add_and_search() {
+ let index = SearchIndex::new();
+
+ // Two commands run in /home/user/project ...
+ let h1 = make_history(
+ "git status",
+ "/home/user/project",
+ datetime!(2024-01-01 10:00 UTC),
+ );
+ let h2 = make_history(
+ "git commit -m 'test'",
+ "/home/user/project",
+ datetime!(2024-01-01 10:05 UTC),
+ );
+ // ... and one command run in /home/user/other.
+ let h3 = make_history(
+ "ls -la",
+ "/home/user/other",
+ datetime!(2024-01-01 10:10 UTC),
+ );
+
+ index.add_history(&h1);
+ index.add_history(&h2);
+ index.add_history(&h3);
-mod index;
+ // Three distinct command strings were added.
+ assert_eq!(index.command_count(), 3);
-// Include the generated proto code
-tonic::include_proto!("search");
+ // Search for "git" - should match 2 commands
+ let results = index
+ .search("git", IndexFilterMode::Global, &QueryContext::default(), 10)
+ .await;
+ assert_eq!(results.len(), 2);
-// Re-export the service and index
-pub(crate) use index::{IndexFilterMode, QueryContext, SearchIndex};
+ // Search with directory filter
+ // The query is empty, so only the directory filter narrows the results.
+ let results = index
+ .search(
+ "",
+ IndexFilterMode::Directory(with_trailing_slash("/home/user/project")),
+ &QueryContext::default(),
+ 10,
+ )
+ .await;
+ assert_eq!(results.len(), 2); // git status and git commit
+ }
+}
diff --git a/crates/turtle/src/atuin_daemon/semantic/mod.rs b/crates/turtle/src/atuin_daemon/semantic/mod.rs
deleted file mode 100644
index c3511676..00000000
--- a/crates/turtle/src/atuin_daemon/semantic/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-//! Semantic command capture gRPC service types.
-
-tonic::include_proto!("semantic");
diff --git a/crates/turtle/src/atuin_daemon/server.rs b/crates/turtle/src/atuin_daemon/server.rs
index 65272d2a..36954cca 100644
--- a/crates/turtle/src/atuin_daemon/server.rs
+++ b/crates/turtle/src/atuin_daemon/server.rs
@@ -1,23 +1,28 @@
use eyre::Result;
-use crate::atuin_daemon::components::history::HistoryGrpcService;
-use crate::atuin_daemon::components::search::SearchGrpcService;
-use crate::atuin_daemon::components::semantic::SemanticGrpcService;
-use crate::atuin_daemon::control::{ControlService, control_server::ControlServer};
-use crate::atuin_daemon::daemon::DaemonHandle;
-use crate::atuin_daemon::history::history_server::HistoryServer;
-use crate::atuin_daemon::search::search_server::SearchServer;
-use crate::atuin_daemon::semantic::semantic_server::SemanticServer;
-
-use crate::atuin_client::settings::Settings;
+use crate::{
+ atuin_client::settings::Settings,
+ atuin_daemon::{
+ components::{
+ history::HistoryGrpcService, search::SearchGrpcService, semantic::SemanticGrpcService,
+ },
+ daemon::DaemonHandle,
+ generated::{
+ control::{ControlService, control_server::ControlServer},
+ history::history_server::HistoryServer,
+ search::search_server::SearchServer,
+ semantic::semantic_server::SemanticServer,
+ },
+ },
+};
/// Run the gRPC server with the given services.
///
/// This starts the gRPC server in the background and returns immediately.
-/// The server will shut down when a ShutdownRequested event is received.
+/// The server will shut down when a [`ShutdownRequested`](crate::atuin_daemon::DaemonEvent::ShutdownRequested) event is received.
#[cfg(unix)]
-pub(crate) async fn run_grpc_server(
- settings: Settings,
+pub(crate) fn run_grpc_server(
+ settings: &Settings,
history_service: HistoryServer<HistoryGrpcService>,
search_service: SearchServer<SearchGrpcService>,
semantic_service: SemanticServer<SemanticGrpcService>,
@@ -75,15 +80,16 @@ pub(crate) async fn run_grpc_server(
// Create shutdown signal from daemon handle
let shutdown_signal = async move {
let mut rx = handle.subscribe();
+
loop {
use crate::atuin_daemon::DaemonEvent;
match rx.recv().await {
- Ok(DaemonEvent::ShutdownRequested) => break,
- Ok(_) => continue,
- Err(_) => break, // Channel closed
+ Err(_) | Ok(DaemonEvent::ShutdownRequested) => break,
+ Ok(_) => (),
}
}
+
if cleanup {
eprintln!("Removing socket...");
if let Err(e) = std::fs::remove_file(&socket_path)