aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
commit5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch)
treec64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/atuin-daemon
parentchore: Somewhat simplify sync code (diff)
downloadatuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show dead code correctly.
Diffstat (limited to 'crates/atuin-daemon')
-rw-r--r--crates/atuin-daemon/Cargo.toml52
-rw-r--r--crates/atuin-daemon/build.rs25
-rw-r--r--crates/atuin-daemon/proto/control.proto62
-rw-r--r--crates/atuin-daemon/proto/history.proto81
-rw-r--r--crates/atuin-daemon/proto/search.proto35
-rw-r--r--crates/atuin-daemon/proto/semantic.proto47
-rw-r--r--crates/atuin-daemon/src/client.rs518
-rw-r--r--crates/atuin-daemon/src/components/history.rs327
-rw-r--r--crates/atuin-daemon/src/components/mod.rs25
-rw-r--r--crates/atuin-daemon/src/components/search.rs413
-rw-r--r--crates/atuin-daemon/src/components/semantic.rs900
-rw-r--r--crates/atuin-daemon/src/components/sync.rs279
-rw-r--r--crates/atuin-daemon/src/control/mod.rs12
-rw-r--r--crates/atuin-daemon/src/control/service.rs71
-rw-r--r--crates/atuin-daemon/src/daemon.rs458
-rw-r--r--crates/atuin-daemon/src/events.rs74
-rw-r--r--crates/atuin-daemon/src/history/mod.rs6
-rw-r--r--crates/atuin-daemon/src/lib.rs136
-rw-r--r--crates/atuin-daemon/src/search/index.rs683
-rw-r--r--crates/atuin-daemon/src/search/mod.rs11
-rw-r--r--crates/atuin-daemon/src/semantic/mod.rs3
-rw-r--r--crates/atuin-daemon/src/server.rs170
-rw-r--r--crates/atuin-daemon/tests/lifecycle.rs222
23 files changed, 0 insertions, 4610 deletions
diff --git a/crates/atuin-daemon/Cargo.toml b/crates/atuin-daemon/Cargo.toml
deleted file mode 100644
index e767d3c9..00000000
--- a/crates/atuin-daemon/Cargo.toml
+++ /dev/null
@@ -1,52 +0,0 @@
-[package]
-name = "atuin-daemon"
-edition = "2024"
-version = { workspace = true }
-description = "The daemon crate for Atuin"
-
-authors.workspace = true
-rust-version.workspace = true
-license.workspace = true
-homepage.workspace = true
-repository.workspace = true
-readme.workspace = true
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-atuin-client = { path = "../atuin-client", version = "18.16.1" }
-atuin-common = { path = "../atuin-common", version = "18.16.1" }
-atuin-history = { path = "../atuin-history", version = "18.16.1" }
-
-time = { workspace = true }
-uuid = { workspace = true }
-tokio = { workspace = true }
-tower = { workspace = true }
-eyre = { workspace = true }
-tracing = { workspace = true }
-tracing-subscriber = { workspace = true }
-
-dashmap = "6.1.0"
-lasso = { version = "0.7", features = ["multi-threaded"] }
-tonic-types = "0.14"
-tonic = "0.14"
-tonic-prost = "0.14"
-prost = "0.14"
-prost-types = "0.14"
-tokio-stream = { version = "0.1.14", features = ["net"] }
-hyper-util = "0.1"
-
-rand.workspace = true
-atuin-nucleo = { workspace = true }
-
-
-[target.'cfg(target_os = "linux")'.dependencies]
-listenfd = "1.0.1"
-
-[dev-dependencies]
-tempfile = { workspace = true }
-
-[build-dependencies]
-protox = "0.9"
-tonic-build = "0.14"
-tonic-prost-build = "0.14"
diff --git a/crates/atuin-daemon/build.rs b/crates/atuin-daemon/build.rs
deleted file mode 100644
index 7808a07b..00000000
--- a/crates/atuin-daemon/build.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use std::{env, fs, path::PathBuf};
-
-use protox::prost::Message;
-
-fn main() -> std::io::Result<()> {
- let proto_paths = [
- "proto/history.proto",
- "proto/search.proto",
- "proto/control.proto",
- "proto/semantic.proto",
- ];
- let proto_include_dirs = ["proto"];
-
- let file_descriptors = protox::compile(proto_paths, proto_include_dirs).unwrap();
-
- let file_descriptor_path = PathBuf::from(env::var_os("OUT_DIR").expect("OUT_DIR not set"))
- .join("file_descriptor_set.bin");
- fs::write(&file_descriptor_path, file_descriptors.encode_to_vec()).unwrap();
-
- tonic_prost_build::configure()
- .build_server(true)
- .file_descriptor_set_path(&file_descriptor_path)
- .skip_protoc_run()
- .compile_protos(&proto_paths, &proto_include_dirs)
-}
diff --git a/crates/atuin-daemon/proto/control.proto b/crates/atuin-daemon/proto/control.proto
deleted file mode 100644
index 06347902..00000000
--- a/crates/atuin-daemon/proto/control.proto
+++ /dev/null
@@ -1,62 +0,0 @@
-syntax = "proto3";
-package control;
-
-// The Control service allows external processes (CLI commands, etc.)
-// to inject events into the running daemon.
-service Control {
- // Send an event to the daemon's event bus
- rpc SendEvent(SendEventRequest) returns (SendEventResponse);
-}
-
-message SendEventRequest {
- oneof event {
- // History was pruned - search index needs full rebuild
- HistoryPrunedEvent history_pruned = 1;
-
- // Specific history items were deleted
- HistoryDeletedEvent history_deleted = 2;
-
- // Request immediate sync
- ForceSyncEvent force_sync = 3;
-
- // Settings have changed, reload if needed
- SettingsReloadedEvent settings_reloaded = 4;
-
- // Request graceful shutdown
- ShutdownEvent shutdown = 5;
-
- // History was rebuilt - search index needs full rebuild
- HistoryRebuiltEvent history_rebuilt = 6;
- }
-}
-
-message SendEventResponse {
- // Empty on success; errors come through gRPC status
-}
-
-// Individual event message types
-
-message HistoryPrunedEvent {
- // No fields needed - just signals that pruning happened
-}
-
-message HistoryRebuiltEvent {
- // No fields needed - just signals that rebuilding happened
-}
-
-message HistoryDeletedEvent {
- // IDs of deleted history items (UUIDs as strings)
- repeated string ids = 1;
-}
-
-message ForceSyncEvent {
- // No fields needed - just triggers sync
-}
-
-message SettingsReloadedEvent {
- // No fields needed - components should re-read settings
-}
-
-message ShutdownEvent {
- // No fields needed - triggers graceful shutdown
-}
diff --git a/crates/atuin-daemon/proto/history.proto b/crates/atuin-daemon/proto/history.proto
deleted file mode 100644
index 59c12471..00000000
--- a/crates/atuin-daemon/proto/history.proto
+++ /dev/null
@@ -1,81 +0,0 @@
-syntax = "proto3";
-package history;
-
-message StartHistoryRequest {
- // If people are still using my software in ~530 years, they can figure out a u128 migration
- uint64 timestamp = 1; // nanosecond unix epoch
- string command = 2;
- string cwd = 3;
- string session = 4;
- string hostname = 5;
- string author = 6;
- string intent = 7;
-}
-
-message EndHistoryRequest {
- string id = 1;
- int64 exit = 2;
- uint64 duration = 3;
-}
-
-message StartHistoryReply {
- string id = 1;
- string version = 2;
- uint32 protocol = 3;
-}
-
-message EndHistoryReply {
- string id = 1;
- uint64 idx = 2;
- string version = 3;
- uint32 protocol = 4;
-}
-
-message StatusRequest {}
-
-message StatusReply {
- bool healthy = 1;
- string version = 2;
- uint32 pid = 3;
- uint32 protocol = 4;
-}
-
-message ShutdownRequest {}
-
-message ShutdownReply {
- bool accepted = 1;
-}
-
-message TailHistoryRequest {}
-
-enum HistoryEventKind {
- HISTORY_EVENT_KIND_UNSPECIFIED = 0;
- HISTORY_EVENT_KIND_STARTED = 1;
- HISTORY_EVENT_KIND_ENDED = 2;
-}
-
-message HistoryEntry {
- uint64 timestamp = 1; // nanosecond unix epoch
- string id = 2;
- string command = 3;
- string cwd = 4;
- string session = 5;
- string hostname = 6;
- string author = 7;
- string intent = 8;
- int64 exit = 9;
- int64 duration = 10;
-}
-
-message TailHistoryReply {
- HistoryEventKind kind = 1;
- HistoryEntry history = 2;
-}
-
-service History {
- rpc StartHistory(StartHistoryRequest) returns (StartHistoryReply);
- rpc EndHistory(EndHistoryRequest) returns (EndHistoryReply);
- rpc TailHistory(TailHistoryRequest) returns (stream TailHistoryReply);
- rpc Status(StatusRequest) returns (StatusReply);
- rpc Shutdown(ShutdownRequest) returns (ShutdownReply);
-}
diff --git a/crates/atuin-daemon/proto/search.proto b/crates/atuin-daemon/proto/search.proto
deleted file mode 100644
index 6b84acbd..00000000
--- a/crates/atuin-daemon/proto/search.proto
+++ /dev/null
@@ -1,35 +0,0 @@
-syntax = "proto3";
-package search;
-
-enum FilterMode {
- GLOBAL = 0;
- HOST = 1;
- SESSION = 2;
- DIRECTORY = 3;
- WORKSPACE = 4;
- SESSION_PRELOAD = 5;
-}
-
-message SearchContext {
- string session_id = 1;
- string cwd = 2;
- string hostname = 3;
- string host_id = 4;
- optional string git_root = 5;
-}
-
-message SearchRequest {
- string query = 1;
- uint64 query_id = 2; // Incrementing ID to match responses to queries
- FilterMode filter_mode = 3;
- SearchContext context = 4;
-}
-
-message SearchResponse {
- uint64 query_id = 1; // Echo back the query ID
- repeated bytes ids = 2;
-}
-
-service Search {
- rpc Search(stream SearchRequest) returns (stream SearchResponse);
-}
diff --git a/crates/atuin-daemon/proto/semantic.proto b/crates/atuin-daemon/proto/semantic.proto
deleted file mode 100644
index 07e550c8..00000000
--- a/crates/atuin-daemon/proto/semantic.proto
+++ /dev/null
@@ -1,47 +0,0 @@
-syntax = "proto3";
-package semantic;
-
-service Semantic {
- rpc RecordCommands(stream CommandCapture) returns (RecordCommandsReply);
- rpc CommandOutput(CommandOutputRequest) returns (CommandOutputReply);
-}
-
-message CommandCapture {
- string prompt = 1;
- string command = 2;
- string output = 3;
- optional int32 exit_code = 4;
- optional string history_id = 5;
- optional string session_id = 6;
- bool output_truncated = 7;
- uint64 output_observed_bytes = 8;
-}
-
-message RecordCommandsReply {
- uint64 accepted = 1;
-}
-
-message CommandOutputRequest {
- string history_id = 1;
- repeated OutputRange ranges = 2;
-}
-
-message OutputRange {
- int64 start = 1;
- int64 end = 2;
-}
-
-message OutputLine {
- uint64 line_number = 1;
- string content = 2;
-}
-
-message CommandOutputReply {
- bool found = 1;
- string output = 2;
- uint64 total_bytes = 3;
- uint64 total_lines = 4;
- repeated OutputLine lines = 5;
- bool output_truncated = 6;
- uint64 output_observed_bytes = 7;
-}
diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs
deleted file mode 100644
index c18e0e46..00000000
--- a/crates/atuin-daemon/src/client.rs
+++ /dev/null
@@ -1,518 +0,0 @@
-use atuin_client::database::Context;
-use atuin_client::settings::{FilterMode, Settings};
-use eyre::{Context as EyreContext, Result};
-#[cfg(windows)]
-use tokio::net::TcpStream;
-use tonic::Code;
-use tonic::transport::{Channel, Endpoint, Uri};
-use tower::service_fn;
-
-use hyper_util::rt::TokioIo;
-
-#[cfg(unix)]
-use tokio::net::UnixStream;
-
-use atuin_client::history::History;
-use tracing::{Level, instrument, span};
-
-use crate::control::HistoryRebuiltEvent;
-use crate::control::{
- ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest,
- SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient,
-};
-use crate::events::DaemonEvent;
-use crate::history::{
- EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest,
- StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest,
- history_client::HistoryClient as HistoryServiceClient,
-};
-use crate::search::{
- FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse,
- search_client::SearchClient as SearchServiceClient,
-};
-use crate::semantic::{
- CommandCapture, CommandOutputReply, CommandOutputRequest, OutputRange, RecordCommandsReply,
- semantic_client::SemanticClient as SemanticServiceClient,
-};
-
-pub struct HistoryClient {
- client: HistoryServiceClient<Channel>,
-}
-
-#[derive(Clone, Copy, Debug, Eq, PartialEq)]
-pub enum DaemonClientErrorKind {
- Connect,
- Unavailable,
- Unimplemented,
- Other,
-}
-
-#[must_use]
-pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind {
- for cause in error.chain() {
- if cause.downcast_ref::<tonic::transport::Error>().is_some() {
- return DaemonClientErrorKind::Connect;
- }
-
- if let Some(status) = cause.downcast_ref::<tonic::Status>() {
- return match status.code() {
- Code::Unavailable => DaemonClientErrorKind::Unavailable,
- Code::Unimplemented => DaemonClientErrorKind::Unimplemented,
- _ => DaemonClientErrorKind::Other,
- };
- }
- }
-
- DaemonClientErrorKind::Other
-}
-
-// Wrap the grpc client
-impl HistoryClient {
- #[cfg(unix)]
- pub async fn new(path: String) -> Result<Self> {
- use eyre::Context;
-
- let log_path = path.clone();
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let path = path.clone();
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at {}. Is it running?",
- &log_path
- )
- })?;
-
- let client = HistoryServiceClient::new(channel);
-
- Ok(HistoryClient { client })
- }
-
- #[cfg(not(unix))]
- pub async fn new(port: u64) -> Result<Self> {
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let url = format!("127.0.0.1:{port}");
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
- )
- })?;
-
- let client = HistoryServiceClient::new(channel);
-
- Ok(HistoryClient { client })
- }
-
- pub async fn start_history(&mut self, h: History) -> Result<StartHistoryReply> {
- let req = StartHistoryRequest {
- command: h.command,
- cwd: h.cwd,
- hostname: h.hostname,
- session: h.session,
- timestamp: h.timestamp.unix_timestamp_nanos() as u64,
- author: h.author,
- intent: h.intent.unwrap_or_default(),
- };
-
- Ok(self.client.start_history(req).await?.into_inner())
- }
-
- pub async fn end_history(
- &mut self,
- id: String,
- duration: u64,
- exit: i64,
- ) -> Result<EndHistoryReply> {
- let req = EndHistoryRequest { id, duration, exit };
-
- Ok(self.client.end_history(req).await?.into_inner())
- }
-
- pub async fn status(&mut self) -> Result<StatusReply> {
- Ok(self.client.status(StatusRequest {}).await?.into_inner())
- }
-
- pub async fn tail_history(&mut self) -> Result<tonic::Streaming<TailHistoryReply>> {
- Ok(self
- .client
- .tail_history(TailHistoryRequest {})
- .await?
- .into_inner())
- }
-
- pub async fn shutdown(&mut self) -> Result<bool> {
- let resp = self.client.shutdown(ShutdownRequest {}).await?.into_inner();
- Ok(resp.accepted)
- }
-}
-
-pub struct SearchClient {
- client: SearchServiceClient<Channel>,
-}
-
-impl SearchClient {
- #[cfg(unix)]
- pub async fn new(path: String) -> Result<Self> {
- let log_path = path.clone();
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let path = path.clone();
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at {}. Is it running?",
- &log_path
- )
- })?;
-
- let client = SearchServiceClient::new(channel);
-
- Ok(SearchClient { client })
- }
-
- #[cfg(not(unix))]
- pub async fn new(port: u64) -> Result<Self> {
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let url = format!("127.0.0.1:{port}");
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
- )
- })?;
-
- let client = SearchServiceClient::new(channel);
-
- Ok(SearchClient { client })
- }
-
- #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))]
- pub async fn search(
- &mut self,
- query: String,
- query_id: u64,
- filter_mode: FilterMode,
- context: Option<Context>,
- ) -> Result<tonic::Streaming<SearchResponse>> {
- let request = SearchRequest {
- query,
- query_id,
- filter_mode: RpcFilterMode::from(filter_mode).into(),
- context: context.map(RpcSearchContext::from),
- };
- let request_stream = tokio_stream::once(request);
- let response = span!(Level::TRACE, "daemon_client_search.request")
- .in_scope(async || self.client.search(request_stream).await)
- .await?;
-
- Ok(response.into_inner())
- }
-}
-
-impl From<FilterMode> for RpcFilterMode {
- fn from(filter_mode: FilterMode) -> Self {
- match filter_mode {
- FilterMode::Global => RpcFilterMode::Global,
- FilterMode::Host => RpcFilterMode::Host,
- FilterMode::Session => RpcFilterMode::Session,
- FilterMode::Directory => RpcFilterMode::Directory,
- FilterMode::Workspace => RpcFilterMode::Workspace,
- FilterMode::SessionPreload => RpcFilterMode::SessionPreload,
- }
- }
-}
-
-impl From<Context> for RpcSearchContext {
- fn from(context: Context) -> Self {
- RpcSearchContext {
- session_id: context.session,
- cwd: context.cwd,
- hostname: context.hostname,
- host_id: context.host_id,
- git_root: context
- .git_root
- .map(|path| path.to_string_lossy().to_string()),
- }
- }
-}
-
-pub struct SemanticClient {
- client: SemanticServiceClient<Channel>,
-}
-
-impl SemanticClient {
- #[cfg(unix)]
- pub async fn new(path: String) -> Result<Self> {
- let log_path = path.clone();
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let path = path.clone();
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at {}. Is it running?",
- &log_path
- )
- })?;
-
- let client = SemanticServiceClient::new(channel);
-
- Ok(SemanticClient { client })
- }
-
- #[cfg(not(unix))]
- pub async fn new(port: u64) -> Result<Self> {
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let url = format!("127.0.0.1:{port}");
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
- )
- })?;
-
- let client = SemanticServiceClient::new(channel);
-
- Ok(SemanticClient { client })
- }
-
- #[cfg(unix)]
- pub async fn from_settings(settings: &Settings) -> Result<Self> {
- Self::new(settings.daemon.socket_path.clone()).await
- }
-
- #[cfg(not(unix))]
- pub async fn from_settings(settings: &Settings) -> Result<Self> {
- Self::new(settings.daemon.tcp_port).await
- }
-
- pub async fn record_commands(
- &mut self,
- captures: Vec<CommandCapture>,
- ) -> Result<RecordCommandsReply> {
- let stream = tokio_stream::iter(captures);
- Ok(self.client.record_commands(stream).await?.into_inner())
- }
-
- pub async fn command_output(
- &mut self,
- history_id: String,
- ranges: Vec<(i64, i64)>,
- ) -> Result<CommandOutputReply> {
- let request = CommandOutputRequest {
- history_id,
- ranges: ranges
- .into_iter()
- .map(|(start, end)| OutputRange { start, end })
- .collect(),
- };
-
- Ok(self.client.command_output(request).await?.into_inner())
- }
-}
-
-// ============================================================================
-// Control Client
-// ============================================================================
-
-/// Client for the Control gRPC service.
-///
-/// Used to inject events into a running daemon from external processes.
-pub struct ControlClient {
- client: ControlServiceClient<Channel>,
-}
-
-impl ControlClient {
- /// Connect to the daemon's control service.
- #[cfg(unix)]
- pub async fn new(path: String) -> Result<Self> {
- let log_path = path.clone();
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let path = path.clone();
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at {}. Is it running?",
- &log_path
- )
- })?;
-
- let client = ControlServiceClient::new(channel);
-
- Ok(ControlClient { client })
- }
-
- /// Connect to the daemon's control service.
- #[cfg(not(unix))]
- pub async fn new(port: u64) -> Result<Self> {
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let url = format!("127.0.0.1:{port}");
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
- )
- })?;
-
- let client = ControlServiceClient::new(channel);
-
- Ok(ControlClient { client })
- }
-
- /// Connect using settings.
- #[cfg(unix)]
- pub async fn from_settings(settings: &Settings) -> Result<Self> {
- Self::new(settings.daemon.socket_path.clone()).await
- }
-
- /// Connect using settings.
- #[cfg(not(unix))]
- pub async fn from_settings(settings: &Settings) -> Result<Self> {
- Self::new(settings.daemon.tcp_port).await
- }
-
- /// Send an event to the daemon.
- pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> {
- let proto_event = daemon_event_to_proto(event);
- let request = SendEventRequest {
- event: Some(proto_event),
- };
- self.client.send_event(request).await?;
- Ok(())
- }
-}
-
-/// Convert a daemon event to its proto representation.
-fn daemon_event_to_proto(event: DaemonEvent) -> crate::control::send_event_request::Event {
- use crate::control::send_event_request::Event;
-
- match event {
- DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}),
- DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}),
- DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent {
- ids: ids.into_iter().map(|id| id.0).collect(),
- }),
- DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}),
- DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}),
- DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}),
- // These events are internal and not sent via the control service
- DaemonEvent::HistoryStarted(_)
- | DaemonEvent::HistoryEnded(_)
- | DaemonEvent::RecordsAdded(_)
- | DaemonEvent::SyncCompleted { .. }
- | DaemonEvent::SyncFailed { .. } => {
- // Use shutdown as a fallback, though this shouldn't happen
- tracing::warn!("attempted to send internal event via control service");
- Event::Shutdown(ShutdownEvent {})
- }
- }
-}
-
-// ============================================================================
-// Convenience Functions
-// ============================================================================
-
-/// Emit an event to the daemon.
-///
-/// This is a fire-and-forget helper for sending events to the daemon from
-/// external processes like CLI commands. If the daemon isn't running, this
-/// will silently succeed (returns Ok).
-///
-/// # Example
-///
-/// ```ignore
-/// // After pruning history
-/// emit_event(DaemonEvent::HistoryPruned).await?;
-///
-/// // After deleting specific history items
-/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] }).await?;
-///
-/// // Request immediate sync
-/// emit_event(DaemonEvent::ForceSync).await?;
-/// ```
-pub async fn emit_event(event: DaemonEvent) -> Result<()> {
- emit_event_with_settings(event, None).await
-}
-
-/// Emit an event to the daemon with explicit settings.
-///
-/// If settings are not provided, they will be loaded from the default location.
-/// If the daemon isn't running, this will silently succeed.
-pub async fn emit_event_with_settings(
- event: DaemonEvent,
- settings: Option<&Settings>,
-) -> Result<()> {
- // Load settings if not provided
- let owned_settings;
- let settings = match settings {
- Some(s) => s,
- None => {
- owned_settings = Settings::new()?;
- &owned_settings
- }
- };
-
- // Try to connect - if daemon isn't running, that's fine
- let mut client = match ControlClient::from_settings(settings).await {
- Ok(c) => c,
- Err(e) => {
- tracing::debug!(?e, "daemon not running, skipping event emission");
- return Ok(());
- }
- };
-
- // Send the event
- if let Err(e) = client.send_event(event).await {
- tracing::debug!(?e, "failed to send event to daemon");
- // Don't fail - this is fire-and-forget
- }
-
- Ok(())
-}
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs
deleted file mode 100644
index c82c8f94..00000000
--- a/crates/atuin-daemon/src/components/history.rs
+++ /dev/null
@@ -1,327 +0,0 @@
-//! History component.
-//!
-//! Handles command history lifecycle (start/end) and provides the History gRPC service.
-
-use std::{pin::Pin, sync::Arc};
-
-use atuin_client::{
- database::Database,
- history::{History, HistoryId, store::HistoryStore},
- settings::Settings,
-};
-use dashmap::DashMap;
-use eyre::Result;
-use time::OffsetDateTime;
-use tokio_stream::Stream;
-use tonic::{Request, Response, Status};
-use tracing::{Level, instrument};
-
-use crate::{
- daemon::{Component, DaemonHandle},
- events::DaemonEvent,
- history::{
- EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
- ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
- TailHistoryReply, TailHistoryRequest,
- history_server::{History as HistorySvc, HistoryServer},
- },
-};
-
-const DAEMON_PROTOCOL_VERSION: u32 = 1;
-
-/// History component - manages command history lifecycle.
-///
-/// This component:
-/// - Tracks currently running commands (stored in memory)
-/// - Saves completed commands to the database and record store
-/// - Emits history events for other components (e.g., search indexing)
-/// - Provides the History gRPC service
-pub struct HistoryComponent {
- inner: Arc<HistoryComponentInner>,
-}
-
-struct HistoryComponentInner {
- /// Commands currently running (not yet completed).
- running: DashMap<HistoryId, History>,
-
- /// Handle to the daemon (set during start).
- handle: tokio::sync::RwLock<Option<DaemonHandle>>,
-
- /// History store for pushing records (set during start).
- history_store: tokio::sync::RwLock<Option<HistoryStore>>,
-}
-
-impl HistoryComponent {
- /// Create a new history component.
- pub fn new() -> Self {
- Self {
- inner: Arc::new(HistoryComponentInner {
- running: DashMap::new(),
- handle: tokio::sync::RwLock::new(None),
- history_store: tokio::sync::RwLock::new(None),
- }),
- }
- }
-
- /// Get the gRPC service for this component.
- ///
- /// This returns a tonic service that can be added to a gRPC server.
- pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
- HistoryServer::new(HistoryGrpcService {
- inner: self.inner.clone(),
- })
- }
-}
-
-impl Default for HistoryComponent {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[tonic::async_trait]
-impl Component for HistoryComponent {
- fn name(&self) -> &'static str {
- "history"
- }
-
- async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
- // Create the history store
- let host_id = Settings::host_id().await?;
- let history_store =
- HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key());
-
- *self.inner.history_store.write().await = Some(history_store);
- *self.inner.handle.write().await = Some(handle);
-
- tracing::info!("history component started");
- Ok(())
- }
-
- async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
- // History component produces events but doesn't need to react to them
- Ok(())
- }
-
- async fn stop(&mut self) -> Result<()> {
- tracing::info!("history component stopped");
- Ok(())
- }
-}
-
-/// The gRPC service implementation.
-///
-/// This is a thin wrapper that delegates to the component's shared state.
-pub struct HistoryGrpcService {
- inner: Arc<HistoryComponentInner>,
-}
-
-fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
- TailHistoryReply {
- kind: kind as i32,
- history: Some(HistoryEntry {
- timestamp: history.timestamp.unix_timestamp_nanos() as u64,
- id: history.id.0,
- command: history.command,
- cwd: history.cwd,
- session: history.session,
- hostname: history.hostname,
- author: history.author,
- intent: history.intent.unwrap_or_default(),
- exit: history.exit,
- duration: history.duration,
- }),
- }
-}
-
-#[tonic::async_trait]
-impl HistorySvc for HistoryGrpcService {
- type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn start_history(
- &self,
- request: Request<StartHistoryRequest>,
- ) -> Result<Response<StartHistoryReply>, Status> {
- let req = request.into_inner();
-
- let timestamp =
- OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
- Status::invalid_argument(
- "failed to parse timestamp as unix time (expected nanos since epoch)",
- )
- })?;
-
- let h: History = History::daemon()
- .timestamp(timestamp)
- .command(req.command)
- .cwd(req.cwd)
- .session(req.session)
- .hostname(req.hostname)
- .author(req.author)
- .intent(req.intent)
- .build()
- .into();
-
- // Emit the event
- if let Some(handle) = self.inner.handle.read().await.as_ref() {
- handle.emit(DaemonEvent::HistoryStarted(h.clone()));
- }
-
- let id = h.id.clone();
- tracing::info!(id = id.to_string(), "start history");
- self.inner.running.insert(id.clone(), h);
-
- let reply = StartHistoryReply {
- id: id.to_string(),
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn end_history(
- &self,
- request: Request<EndHistoryRequest>,
- ) -> Result<Response<EndHistoryReply>, Status> {
- let req = request.into_inner();
- let id = HistoryId(req.id);
-
- if let Some((_, mut history)) = self.inner.running.remove(&id) {
- history.exit = req.exit;
- history.duration = match req.duration {
- 0 => i64::try_from(
- (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
- )
- .expect("failed to convert calculated duration to i64"),
- value => i64::try_from(value).expect("failed to get i64 duration"),
- };
-
- // Get the handle and store to save the history
- let handle_guard = self.inner.handle.read().await;
- let handle = handle_guard
- .as_ref()
- .ok_or_else(|| Status::internal("component not initialized"))?;
-
- let store_guard = self.inner.history_store.read().await;
- let history_store = store_guard
- .as_ref()
- .ok_or_else(|| Status::internal("component not initialized"))?;
-
- // Save to database
- handle
- .history_db()
- .save(&history)
- .await
- .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
-
- tracing::info!(
- id = id.0.to_string(),
- duration = history.duration,
- "end history"
- );
-
- // Push to record store
- let (record_id, idx) = history_store
- .push(history.clone())
- .await
- .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
-
- // Emit the event
- handle.emit(DaemonEvent::HistoryEnded(history));
-
- let reply = EndHistoryReply {
- id: record_id.0.to_string(),
- idx,
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- return Ok(Response::new(reply));
- }
-
- Err(Status::not_found(format!(
- "could not find history with id: {id}"
- )))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn tail_history(
- &self,
- _request: Request<TailHistoryRequest>,
- ) -> Result<Response<Self::TailHistoryStream>, Status> {
- let handle_guard = self.inner.handle.read().await;
- let handle = handle_guard
- .as_ref()
- .cloned()
- .ok_or_else(|| Status::internal("component not initialized"))?;
-
- let mut rx = handle.subscribe();
- let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
-
- tokio::spawn(async move {
- loop {
- let event = match rx.recv().await {
- Ok(event) => event,
- Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
- let _ = tx
- .send(Err(Status::resource_exhausted(format!(
- "tail stream lagged behind and dropped {skipped} events"
- ))))
- .await;
- break;
- }
- Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
- };
-
- let reply = match event {
- DaemonEvent::HistoryStarted(history) => {
- Some(history_to_tail_reply(HistoryEventKind::Started, history))
- }
- DaemonEvent::HistoryEnded(history) => {
- Some(history_to_tail_reply(HistoryEventKind::Ended, history))
- }
- _ => None,
- };
-
- if let Some(reply) = reply
- && tx.send(Ok(reply)).await.is_err()
- {
- break;
- }
- }
- });
-
- let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
- Ok(Response::new(Box::pin(stream)))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn status(
- &self,
- _request: Request<StatusRequest>,
- ) -> Result<Response<StatusReply>, Status> {
- let reply = StatusReply {
- healthy: true,
- version: env!("CARGO_PKG_VERSION").to_string(),
- pid: std::process::id(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn shutdown(
- &self,
- _request: Request<ShutdownRequest>,
- ) -> Result<Response<ShutdownReply>, Status> {
- // Use the daemon handle to request shutdown
- if let Some(handle) = self.inner.handle.read().await.as_ref() {
- handle.shutdown();
- }
- Ok(Response::new(ShutdownReply { accepted: true }))
- }
-}
diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs
deleted file mode 100644
index 447e31df..00000000
--- a/crates/atuin-daemon/src/components/mod.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-//! Daemon components.
-//!
-//! Components are the building blocks of the daemon. Each component handles
-//! a specific domain and can:
-//!
-//! - Expose gRPC services
-//! - React to events
-//! - Spawn background tasks
-//!
-//! Available components:
-//!
-//! - [`history::HistoryComponent`]: Command history lifecycle management
-//! - [`search::SearchComponent`]: Fuzzy search over history
-//! - [`semantic::SemanticComponent`]: In-memory semantic command captures
-//! - [`sync::SyncComponent`]: Cloud sync
-
-pub mod history;
-pub mod search;
-pub mod semantic;
-pub mod sync;
-
-pub use history::HistoryComponent;
-pub use search::SearchComponent;
-pub use semantic::SemanticComponent;
-pub use sync::SyncComponent;
diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs
deleted file mode 100644
index 9fc87fae..00000000
--- a/crates/atuin-daemon/src/components/search.rs
+++ /dev/null
@@ -1,413 +0,0 @@
-//! Search component.
-//!
-//! Provides fuzzy search over command history using the Nucleo search library
-//! with frecency-based ranking and dynamic filtering.
-
-use std::{pin::Pin, sync::Arc};
-
-use atuin_client::database::Database;
-use eyre::Result;
-use tokio::sync::RwLock;
-use tokio_stream::Stream;
-use tonic::{Request, Response, Status, Streaming};
-use tracing::{Level, debug, info, instrument, span, trace};
-use uuid::Uuid;
-
-use crate::{
- daemon::{Component, DaemonHandle},
- events::DaemonEvent,
- search::{
- FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse,
- search_server::{Search as SearchSvc, SearchServer},
- },
-};
-
-const PAGE_SIZE: usize = 5000;
-const RESULTS_LIMIT: u32 = 200;
-/// How often to rebuild the frecency map (in seconds).
-const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60;
-
-/// Search component - provides fuzzy search over command history.
-///
-/// This component:
-/// - Maintains a deduplicated search index with frecency ranking
-/// - Loads history from the database on startup
-/// - Updates the index when history events occur
-/// - Provides the Search gRPC service
-pub struct SearchComponent {
- index: Arc<RwLock<SearchIndex>>,
- handle: tokio::sync::RwLock<Option<DaemonHandle>>,
- loader_handle: Option<tokio::task::JoinHandle<()>>,
- frecency_handle: Option<tokio::task::JoinHandle<()>>,
-}
-
-impl SearchComponent {
- /// Create a new search component.
- pub fn new() -> Self {
- Self {
- index: Arc::new(RwLock::new(SearchIndex::new())),
- handle: tokio::sync::RwLock::new(None),
- loader_handle: None,
- frecency_handle: None,
- }
- }
-
- /// Get the gRPC service for this component.
- pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> {
- SearchServer::new(SearchGrpcService {
- index: self.index.clone(),
- })
- }
-
- /// Rebuild the entire search index from the database.
- async fn rebuild_index(&self) -> Result<()> {
- let handle_guard = self.handle.read().await;
- let handle = handle_guard
- .as_ref()
- .ok_or_else(|| eyre::eyre!("component not initialized"))?;
-
- info!("Rebuilding search index from database");
-
- // Create a new index
- let new_index = SearchIndex::new();
-
- // Load all history into the new index
- let db = handle.history_db().clone();
- let mut pager = db.all_paged(PAGE_SIZE, false, true);
- loop {
- match pager.next().await {
- Ok(Some(histories)) => {
- info!(
- "Loading {} history entries into search index",
- histories.len()
- );
- new_index.add_histories(&histories);
- }
- Ok(None) => break,
- Err(e) => {
- tracing::error!("Failed to load history during rebuild: {}", e);
- break;
- }
- }
- }
-
- info!(
- "Search index rebuild complete; {} unique commands",
- new_index.command_count()
- );
-
- // Replace the old index with the new one
- *self.index.write().await = new_index;
- Ok(())
- }
-}
-
-impl Default for SearchComponent {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[tonic::async_trait]
-impl Component for SearchComponent {
- fn name(&self) -> &'static str {
- "search"
- }
-
- async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
- *self.handle.write().await = Some(handle.clone());
-
- // Spawn background task to load history into index
- let index = self.index.clone();
- let db = handle.history_db().clone();
- let handle_for_loader = handle.clone();
-
- self.loader_handle = Some(tokio::spawn(async move {
- info!(
- "Loading history into search index; page size = {}",
- PAGE_SIZE
- );
- let mut pager = db.all_paged(PAGE_SIZE, false, true);
- loop {
- match pager.next().await {
- Ok(Some(histories)) => {
- info!(
- "Loading {} history entries into search index",
- histories.len()
- );
- index.read().await.add_histories(&histories);
- }
- Ok(None) => {
- info!(
- "Initial history load complete; {} unique commands indexed",
- index.read().await.command_count()
- );
- // Build initial frecency map with current settings
- let settings = handle_for_loader.settings().await;
- index.read().await.rebuild_frecency(&settings.search).await;
- info!("Initial frecency map built");
- break;
- }
- Err(e) => {
- tracing::error!("Failed to load history: {}", e);
- break;
- }
- }
- }
- }));
-
- // Spawn background task to periodically refresh frecency
- let index_for_frecency = self.index.clone();
- let handle_for_frecency = handle.clone();
- self.frecency_handle = Some(tokio::spawn(async move {
- let mut interval = tokio::time::interval(std::time::Duration::from_secs(
- FRECENCY_REFRESH_INTERVAL_SECS,
- ));
- loop {
- interval.tick().await;
- trace!("Refreshing frecency map");
- let settings = handle_for_frecency.settings().await;
- index_for_frecency
- .read()
- .await
- .rebuild_frecency(&settings.search)
- .await;
- }
- }));
-
- tracing::info!("search component started");
- Ok(())
- }
-
- async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
- match event {
- DaemonEvent::RecordsAdded(records) => {
- debug!(
- count = records.len(),
- "Processing added records for search index"
- );
-
- let handle_guard = self.handle.read().await;
- if let Some(handle) = handle_guard.as_ref() {
- let histories: Vec<_> = handle
- .history_db()
- .query_history(
- format!(
- "select * from history where id in ({})",
- records
- .iter()
- .map(|record| record.0.to_string())
- .collect::<Vec<_>>()
- .join(",")
- )
- .as_str(),
- )
- .await
- .unwrap_or_default();
-
- span!(Level::TRACE, "inject_records", count = histories.len())
- .in_scope(async || {
- self.index.read().await.add_histories(&histories);
- })
- .await;
- }
- }
- DaemonEvent::HistoryStarted(history) => {
- debug!(id = %history.id, command = %history.command, "History started (no index action)");
- }
- DaemonEvent::HistoryEnded(history) => {
- span!(Level::TRACE, "inject_history_ended")
- .in_scope(async || {
- self.index.read().await.add_history(history);
- })
- .await;
- }
- DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => {
- info!("History store pruned or rebuilt, rebuilding search index");
- if let Err(e) = self.rebuild_index().await {
- tracing::error!("Failed to rebuild search index: {}", e);
- }
- }
- DaemonEvent::HistoryDeleted { ids } => {
- info!(
- count = ids.len(),
- "History deleted, rebuilding search index"
- );
- // For now, just rebuild the entire index. A more efficient implementation
- // would remove specific items from the index.
- if let Err(e) = self.rebuild_index().await {
- tracing::error!("Failed to rebuild search index: {}", e);
- }
- }
- DaemonEvent::SettingsReloaded => {
- info!("Settings reloaded, rebuilding frecency map with new multipliers");
- let handle_guard = self.handle.read().await;
- if let Some(handle) = handle_guard.as_ref() {
- let settings = handle.settings().await;
- self.index
- .read()
- .await
- .rebuild_frecency(&settings.search)
- .await;
- }
- }
- // Events we don't care about
- DaemonEvent::SyncCompleted { .. }
- | DaemonEvent::SyncFailed { .. }
- | DaemonEvent::ForceSync
- | DaemonEvent::ShutdownRequested => {}
- }
- Ok(())
- }
-
- async fn stop(&mut self) -> Result<()> {
- if let Some(handle) = self.loader_handle.take() {
- handle.abort();
- }
- if let Some(handle) = self.frecency_handle.take() {
- handle.abort();
- }
- tracing::info!("search component stopped");
- Ok(())
- }
-}
-
-/// The gRPC service implementation.
-pub struct SearchGrpcService {
- index: Arc<RwLock<SearchIndex>>,
-}
-
-#[tonic::async_trait]
-impl SearchSvc for SearchGrpcService {
- type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>;
-
- #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")]
- async fn search(
- &self,
- request: Request<Streaming<SearchRequest>>,
- ) -> Result<Response<Self::SearchStream>, Status> {
- let mut in_stream = request.into_inner();
- let index = self.index.clone();
-
- // Create output channel
- let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128);
-
- // Spawn task to handle incoming requests and send responses
- tokio::spawn(async move {
- while let Some(req) = in_stream.message().await.transpose() {
- match req {
- Ok(search_req) => {
- let query = search_req.query;
- let query_id = search_req.query_id;
- let filter_mode: FilterMode = search_req
- .filter_mode
- .try_into()
- .unwrap_or(FilterMode::Global);
- let proto_context = search_req.context;
-
- debug!(
- "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}",
- query,
- query_id,
- filter_mode.as_str_name(),
- proto_context
- );
-
- // Convert proto FilterMode + context to IndexFilterMode
- let index_filter = convert_filter_mode(filter_mode, &proto_context);
-
- // Build QueryContext from proto context
- let query_context = proto_context
- .map(|ctx| QueryContext {
- cwd: Some(with_trailing_slash(&ctx.cwd)),
- git_root: ctx.git_root.map(|s| with_trailing_slash(&s)),
- hostname: Some(ctx.hostname),
- session_id: Some(ctx.session_id),
- })
- .unwrap_or_default();
-
- // Perform the search
- let history_ids =
- span!(Level::TRACE, "daemon_search_query", %query, query_id)
- .in_scope(|| async {
- let index = index.read().await;
- index
- .search(&query, index_filter, &query_context, RESULTS_LIMIT)
- .await
- })
- .await;
-
- // Convert history IDs to bytes
- let ids: Vec<Vec<u8>> = history_ids
- .iter()
- .filter_map(|id| {
- Uuid::parse_str(id)
- .ok()
- .map(|uuid| uuid.as_bytes().to_vec())
- })
- .collect();
-
- if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() {
- break; // Client disconnected
- }
- }
- Err(e) => {
- let _ = tx.send(Err(e)).await;
- break;
- }
- }
- }
- });
-
- // Convert receiver to stream
- let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
- Ok(Response::new(Box::pin(out_stream)))
- }
-}
-
-/// Convert proto FilterMode and context to IndexFilterMode.
-fn convert_filter_mode(
- mode: FilterMode,
- context: &Option<crate::search::SearchContext>,
-) -> IndexFilterMode {
- match (mode, context) {
- (FilterMode::Global, _) => IndexFilterMode::Global,
- (FilterMode::Directory, Some(ctx)) => {
- IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
- }
- (FilterMode::Workspace, Some(ctx)) => {
- if let Some(ref git_root) = ctx.git_root {
- IndexFilterMode::Workspace(with_trailing_slash(git_root))
- } else {
- // Fall back to directory if no git root
- IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
- }
- }
- (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()),
- (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()),
- (FilterMode::SessionPreload, Some(ctx)) => {
- // SessionPreload is similar to Session - filter by session
- IndexFilterMode::Session(ctx.session_id.clone())
- }
- // If no context provided, fall back to global
- _ => IndexFilterMode::Global,
- }
-}
-
-#[cfg(windows)]
-pub fn with_trailing_slash(s: &str) -> String {
- if s.ends_with('\\') {
- s.to_string()
- } else {
- format!("{}\\", s)
- }
-}
-
-#[cfg(not(windows))]
-pub fn with_trailing_slash(s: &str) -> String {
- if s.ends_with('/') {
- s.to_string()
- } else {
- format!("{}/", s)
- }
-}
diff --git a/crates/atuin-daemon/src/components/semantic.rs b/crates/atuin-daemon/src/components/semantic.rs
deleted file mode 100644
index dff38fd3..00000000
--- a/crates/atuin-daemon/src/components/semantic.rs
+++ /dev/null
@@ -1,900 +0,0 @@
-//! Semantic command capture component.
-//!
-//! This is a prototype in-memory store for completed command captures emitted
-//! by atuin-pty-proxy. It keeps recent captures per Atuin session and indexes
-//! them by history ID for AI tool lookup.
-
-use std::collections::{HashMap, VecDeque};
-use std::fmt::{Display, Formatter};
-use std::sync::Arc;
-
-use atuin_client::history::{History, HistoryId};
-use eyre::Result;
-use tokio::sync::Mutex;
-use tonic::{Request, Response, Status, Streaming};
-use tracing::{Level, instrument};
-
-use crate::{
- daemon::{Component, DaemonHandle},
- events::DaemonEvent,
- semantic::{
- CommandCapture, CommandOutputReply, CommandOutputRequest, OutputLine, RecordCommandsReply,
- semantic_server::{Semantic as SemanticSvc, SemanticServer},
- },
-};
-
-const MAX_SESSIONS: usize = 20;
-const MAX_COMMANDS_PER_SESSION: usize = 128;
-const MAX_BYTES_PER_SESSION: usize = 32 * 1024 * 1024;
-const MAX_PENDING_HISTORIES: usize = 128;
-
-/// Stores completed command captures and associates them with history events.
-pub struct SemanticComponent {
- inner: Arc<SemanticComponentInner>,
-}
-
-struct SemanticComponentInner {
- state: Mutex<SemanticState>,
-}
-
-#[derive(Default)]
-struct SemanticState {
- sessions: HashMap<SessionId, SessionCaptures>,
- session_lru: VecDeque<SessionId>,
- history_index: HashMap<HistoryId, CaptureRef>,
- pending_histories: VecDeque<History>,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-struct SessionId(String);
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-struct CaptureId(u64);
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-struct CaptureRef {
- session_id: SessionId,
- capture_id: CaptureId,
-}
-
-#[derive(Default)]
-struct SessionCaptures {
- next_id: u64,
- records: VecDeque<StoredCapture>,
- output_bytes: usize,
-}
-
-struct StoredCapture {
- id: CaptureId,
- history_id: HistoryId,
- output_bytes: usize,
- record: SemanticCommandRecord,
-}
-
-struct EvictedCapture {
- history_id: HistoryId,
- capture_id: CaptureId,
-}
-
-#[derive(Debug, Clone)]
-struct SemanticCommandRecord {
- capture: CommandCapture,
- history: Option<History>,
-}
-
-impl SemanticComponent {
- pub fn new() -> Self {
- Self {
- inner: Arc::new(SemanticComponentInner {
- state: Mutex::new(SemanticState::default()),
- }),
- }
- }
-
- pub fn grpc_service(&self) -> SemanticServer<SemanticGrpcService> {
- SemanticServer::new(SemanticGrpcService {
- inner: self.inner.clone(),
- })
- }
-}
-
-impl Default for SemanticComponent {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[tonic::async_trait]
-impl Component for SemanticComponent {
- fn name(&self) -> &'static str {
- "semantic"
- }
-
- async fn start(&mut self, _handle: DaemonHandle) -> Result<()> {
- tracing::info!("semantic component started");
- Ok(())
- }
-
- async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
- if let DaemonEvent::HistoryEnded(history) = event {
- self.inner.record_history(history.clone()).await;
- }
-
- Ok(())
- }
-
- async fn stop(&mut self) -> Result<()> {
- let state = self.inner.state.lock().await;
- tracing::info!(
- sessions = state.sessions.len(),
- records = state.record_count(),
- indexed_histories = state.history_index.len(),
- pending_histories = state.pending_histories.len(),
- "semantic component stopped"
- );
- Ok(())
- }
-}
-
-impl SemanticComponentInner {
- async fn record_capture(&self, capture: CommandCapture) -> bool {
- let mut state = self.state.lock().await;
- state.record_capture(capture)
- }
-
- async fn record_history(&self, history: History) {
- let mut state = self.state.lock().await;
- state.record_history(history);
- }
-
- async fn command_output(&self, request: &CommandOutputRequest) -> CommandOutputReply {
- let mut state = self.state.lock().await;
- state.command_output(request)
- }
-}
-
-impl SemanticState {
- fn record_capture(&mut self, mut capture: CommandCapture) -> bool {
- let Some(history_id) = history_id_from_str(capture.history_id.as_deref()) else {
- tracing::debug!(
- command_bytes = capture.command.len(),
- prompt_bytes = capture.prompt.len(),
- output_bytes = capture.output.len(),
- output_truncated = capture.output_truncated,
- "dropping semantic command capture without history id"
- );
- return false;
- };
-
- let history = take_pending_history(&mut self.pending_histories, &history_id);
- let Some(session_id) = capture
- .session_id
- .as_deref()
- .and_then(|session_id| SessionId::try_from(session_id).ok())
- .or_else(|| {
- history
- .as_ref()
- .and_then(|history| SessionId::try_from(history.session.as_str()).ok())
- })
- else {
- tracing::debug!(
- history_id = %history_id,
- command_bytes = capture.command.len(),
- prompt_bytes = capture.prompt.len(),
- output_bytes = capture.output.len(),
- output_truncated = capture.output_truncated,
- "dropping semantic command capture without session id"
- );
- return false;
- };
-
- capture.history_id = Some(history_id.to_string());
- capture.session_id = Some(session_id.to_string());
- if capture.output_observed_bytes == 0 {
- capture.output_observed_bytes = capture.output.len() as u64;
- }
-
- let record = SemanticCommandRecord { capture, history };
- log_record(&record, "recorded semantic command capture");
- self.push_record(session_id, history_id, record);
- true
- }
-
- fn record_history(&mut self, history: History) {
- let history_id = history.id.clone();
-
- if let Some(capture_ref) = self.history_index.get(&history_id).cloned() {
- if let Some(stored) = self.stored_capture_mut(&capture_ref) {
- stored.record.history = Some(history);
- log_record(
- &stored.record,
- "associated semantic command capture with history",
- );
- return;
- }
-
- self.history_index.remove(&history_id);
- }
-
- tracing::debug!(
- id = %history.id,
- command_bytes = history.command.len(),
- "history ended before semantic capture arrived"
- );
- push_pending_history(&mut self.pending_histories, history);
- }
-
- fn command_output(&mut self, request: &CommandOutputRequest) -> CommandOutputReply {
- let Some(history_id) = history_id_from_str(Some(&request.history_id)) else {
- return command_output_not_found();
- };
- let Some(capture_ref) = self.history_index.get(&history_id).cloned() else {
- return command_output_not_found();
- };
-
- let Some(reply) = self.command_output_for_ref(&capture_ref, &request.ranges) else {
- self.history_index.remove(&history_id);
- return command_output_not_found();
- };
-
- self.touch_session(&capture_ref.session_id);
- reply
- }
-
- fn command_output_for_ref(
- &self,
- capture_ref: &CaptureRef,
- ranges: &[crate::semantic::OutputRange],
- ) -> Option<CommandOutputReply> {
- let stored = self
- .sessions
- .get(&capture_ref.session_id)?
- .stored_capture(capture_ref.capture_id)?;
- let output = &stored.record.capture.output;
- let output_observed_bytes = stored
- .record
- .capture
- .output_observed_bytes
- .max(output.len() as u64);
-
- Some(CommandOutputReply {
- found: true,
- output: String::new(),
- total_bytes: output.len() as u64,
- total_lines: output.lines().count() as u64,
- lines: select_output_ranges(output, ranges),
- output_truncated: stored.record.capture.output_truncated,
- output_observed_bytes,
- })
- }
-
- fn push_record(
- &mut self,
- session_id: SessionId,
- history_id: HistoryId,
- record: SemanticCommandRecord,
- ) {
- self.touch_session(&session_id);
-
- let (capture_id, evicted) = {
- let session = self.sessions.entry(session_id.clone()).or_default();
- session.push(history_id.clone(), record)
- };
-
- let capture_ref = CaptureRef {
- session_id: session_id.clone(),
- capture_id,
- };
- self.history_index.insert(history_id, capture_ref);
-
- for evicted in evicted {
- self.remove_history_index_if_matches(
- &session_id,
- &evicted.history_id,
- evicted.capture_id,
- );
- }
-
- self.expire_lru_sessions();
- }
-
- fn touch_session(&mut self, session_id: &SessionId) {
- if let Some(index) = self.session_lru.iter().position(|id| id == session_id) {
- self.session_lru.remove(index);
- }
- self.session_lru.push_back(session_id.clone());
- }
-
- fn expire_lru_sessions(&mut self) {
- while self.session_lru.len() > MAX_SESSIONS {
- let Some(session_id) = self.session_lru.pop_front() else {
- break;
- };
- let Some(session) = self.sessions.remove(&session_id) else {
- continue;
- };
-
- for stored in session.records {
- self.remove_history_index_if_matches(&session_id, &stored.history_id, stored.id);
- }
- }
- }
-
- fn remove_history_index_if_matches(
- &mut self,
- session_id: &SessionId,
- history_id: &HistoryId,
- capture_id: CaptureId,
- ) {
- if self
- .history_index
- .get(history_id)
- .is_some_and(|capture_ref| {
- &capture_ref.session_id == session_id && capture_ref.capture_id == capture_id
- })
- {
- self.history_index.remove(history_id);
- }
- }
-
- fn stored_capture_mut(&mut self, capture_ref: &CaptureRef) -> Option<&mut StoredCapture> {
- self.sessions
- .get_mut(&capture_ref.session_id)?
- .stored_capture_mut(capture_ref.capture_id)
- }
-
- fn record_count(&self) -> usize {
- self.sessions
- .values()
- .map(|session| session.records.len())
- .sum()
- }
-}
-
-impl SessionCaptures {
- fn push(
- &mut self,
- history_id: HistoryId,
- record: SemanticCommandRecord,
- ) -> (CaptureId, Vec<EvictedCapture>) {
- self.push_with_limits(
- history_id,
- record,
- MAX_COMMANDS_PER_SESSION,
- MAX_BYTES_PER_SESSION,
- )
- }
-
- fn push_with_limits(
- &mut self,
- history_id: HistoryId,
- record: SemanticCommandRecord,
- max_commands: usize,
- max_output_bytes: usize,
- ) -> (CaptureId, Vec<EvictedCapture>) {
- let capture_id = CaptureId(self.next_id);
- self.next_id = self.next_id.saturating_add(1);
- let output_bytes = record.capture.output.len();
- self.output_bytes = self.output_bytes.saturating_add(output_bytes);
- self.records.push_back(StoredCapture {
- id: capture_id,
- history_id,
- output_bytes,
- record,
- });
-
- (
- capture_id,
- self.evict_to_limits(max_commands, max_output_bytes),
- )
- }
-
- fn evict_to_limits(
- &mut self,
- max_commands: usize,
- max_output_bytes: usize,
- ) -> Vec<EvictedCapture> {
- let mut evicted = Vec::new();
- while self.records.len() > max_commands || self.output_bytes > max_output_bytes {
- let Some(record) = self.records.pop_front() else {
- break;
- };
- self.output_bytes = self.output_bytes.saturating_sub(record.output_bytes);
- evicted.push(EvictedCapture {
- history_id: record.history_id,
- capture_id: record.id,
- });
- }
- evicted
- }
-
- fn stored_capture(&self, capture_id: CaptureId) -> Option<&StoredCapture> {
- self.records.iter().find(|record| record.id == capture_id)
- }
-
- fn stored_capture_mut(&mut self, capture_id: CaptureId) -> Option<&mut StoredCapture> {
- self.records
- .iter_mut()
- .find(|record| record.id == capture_id)
- }
-}
-
-impl TryFrom<&str> for SessionId {
- type Error = ();
-
- fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
- let value = value.trim();
- if value.is_empty() {
- return Err(());
- }
-
- Ok(Self(value.to_string()))
- }
-}
-
-impl TryFrom<String> for SessionId {
- type Error = ();
-
- fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
- Self::try_from(value.as_str())
- }
-}
-
-impl AsRef<str> for SessionId {
- fn as_ref(&self) -> &str {
- &self.0
- }
-}
-
-impl Display for SessionId {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.write_str(&self.0)
- }
-}
-
-pub struct SemanticGrpcService {
- inner: Arc<SemanticComponentInner>,
-}
-
-#[tonic::async_trait]
-impl SemanticSvc for SemanticGrpcService {
- #[instrument(skip_all, level = Level::INFO)]
- async fn record_commands(
- &self,
- request: Request<Streaming<CommandCapture>>,
- ) -> Result<Response<RecordCommandsReply>, Status> {
- let mut stream = request.into_inner();
- let mut accepted = 0_u64;
-
- while let Some(capture) = stream.message().await? {
- if self.inner.record_capture(capture).await {
- accepted += 1;
- }
- }
-
- Ok(Response::new(RecordCommandsReply { accepted }))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn command_output(
- &self,
- request: Request<CommandOutputRequest>,
- ) -> Result<Response<CommandOutputReply>, Status> {
- let request = request.into_inner();
- if request.history_id.trim().is_empty() {
- return Err(Status::invalid_argument("history_id is required"));
- }
-
- Ok(Response::new(self.inner.command_output(&request).await))
- }
-}
-
-fn history_id_from_str(value: Option<&str>) -> Option<HistoryId> {
- let value = value?.trim();
- (!value.is_empty()).then(|| HistoryId(value.to_string()))
-}
-
-fn take_pending_history(
- histories: &mut VecDeque<History>,
- history_id: &HistoryId,
-) -> Option<History> {
- let index = histories
- .iter()
- .position(|history| &history.id == history_id)?;
- histories.remove(index)
-}
-
-fn push_pending_history(histories: &mut VecDeque<History>, history: History) {
- if let Some(index) = histories
- .iter()
- .position(|pending| pending.id == history.id)
- {
- histories.remove(index);
- }
-
- histories.push_back(history);
- trim_front(histories, MAX_PENDING_HISTORIES);
-}
-
-fn trim_front<T>(records: &mut VecDeque<T>, max_len: usize) {
- while records.len() > max_len {
- records.pop_front();
- }
-}
-
-fn command_output_not_found() -> CommandOutputReply {
- CommandOutputReply {
- found: false,
- output: String::new(),
- total_bytes: 0,
- total_lines: 0,
- lines: Vec::new(),
- output_truncated: false,
- output_observed_bytes: 0,
- }
-}
-
-fn select_output_ranges(output: &str, ranges: &[crate::semantic::OutputRange]) -> Vec<OutputLine> {
- let lines: Vec<&str> = output.lines().collect();
- if lines.is_empty() {
- return Vec::new();
- }
-
- let ranges = if ranges.is_empty() {
- vec![crate::semantic::OutputRange { start: 0, end: 999 }]
- } else {
- ranges.to_vec()
- };
-
- let mut ranges = ranges
- .into_iter()
- .filter_map(|range| normalize_line_range(range.start, range.end, lines.len()))
- .collect::<Vec<_>>();
- ranges.sort_unstable_by_key(|(start, _)| *start);
-
- let mut merged: Vec<(usize, usize)> = Vec::new();
- for (start, end) in ranges {
- match merged.last_mut() {
- Some((_, merged_end)) if start <= merged_end.saturating_add(1) => {
- *merged_end = (*merged_end).max(end);
- }
- _ => merged.push((start, end)),
- }
- }
-
- merged
- .into_iter()
- .flat_map(|(start, end)| {
- lines[start..=end]
- .iter()
- .enumerate()
- .map(move |(offset, line)| OutputLine {
- line_number: (start + offset + 1) as u64,
- content: (*line).to_string(),
- })
- })
- .collect()
-}
-
-fn normalize_line_range(start: i64, end: i64, line_count: usize) -> Option<(usize, usize)> {
- let line_count = i64::try_from(line_count).ok()?;
- let start = if start < 0 { line_count + start } else { start };
- let end = if end < 0 { line_count + end } else { end };
-
- if end < 0 || start >= line_count {
- return None;
- }
-
- let start = start.max(0);
- let end = end.min(line_count - 1);
-
- (start <= end).then_some((start as usize, end as usize))
-}
-
-fn log_record(record: &SemanticCommandRecord, message: &'static str) {
- let history_id = record.capture.history_id.as_deref().unwrap_or("<missing>");
- let associated_history_id = record
- .history
- .as_ref()
- .map(|history| history.id.to_string());
- let exit = record.history.as_ref().map(|history| history.exit);
- let duration = record.history.as_ref().map(|history| history.duration);
- let author = record
- .history
- .as_ref()
- .map(|history| history.author.as_str());
- let session_id = record.capture.session_id.as_deref();
-
- tracing::debug!(
- history_id = %history_id,
- associated_history_id = ?associated_history_id,
- session_id = ?session_id,
- command_bytes = record.capture.command.len(),
- prompt_bytes = record.capture.prompt.len(),
- output_bytes = record.capture.output.len(),
- output_truncated = record.capture.output_truncated,
- output_observed_bytes = record.capture.output_observed_bytes,
- capture_exit_code = ?record.capture.exit_code,
- history_exit = ?exit,
- duration = ?duration,
- author = ?author,
- "{message}"
- );
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use time::OffsetDateTime;
-
- fn history(id: &str, session: &str, command: &str) -> History {
- History {
- id: HistoryId(id.to_string()),
- timestamp: OffsetDateTime::UNIX_EPOCH,
- duration: 0,
- exit: 0,
- command: command.to_string(),
- cwd: String::new(),
- session: session.to_string(),
- hostname: String::new(),
- author: String::new(),
- intent: None,
- deleted_at: None,
- }
- }
-
- fn capture(history_id: Option<&str>, session_id: Option<&str>, output: &str) -> CommandCapture {
- CommandCapture {
- prompt: String::new(),
- command: String::new(),
- output: output.to_string(),
- exit_code: None,
- history_id: history_id.map(str::to_string),
- session_id: session_id.map(str::to_string),
- output_truncated: false,
- output_observed_bytes: output.len() as u64,
- }
- }
-
- fn command_output(state: &mut SemanticState, history_id: &str) -> CommandOutputReply {
- state.command_output(&CommandOutputRequest {
- history_id: history_id.to_string(),
- ranges: Vec::new(),
- })
- }
-
- fn output_line(line_number: u64, content: &str) -> OutputLine {
- OutputLine {
- line_number,
- content: content.to_string(),
- }
- }
-
- #[test]
- fn drops_capture_without_history_id() {
- let mut state = SemanticState::default();
-
- assert!(!state.record_capture(capture(None, Some("session-1"), "output")));
- assert!(!command_output(&mut state, "id-1").found);
- assert_eq!(state.record_count(), 0);
- }
-
- #[test]
- fn stores_capture_by_session_and_history_id() {
- let mut state = SemanticState::default();
-
- assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output")));
-
- let reply = command_output(&mut state, "id-1");
- assert!(reply.found);
- assert_eq!(reply.total_bytes, 6);
- assert_eq!(reply.output_observed_bytes, 6);
- assert_eq!(reply.lines, vec![output_line(1, "output")]);
- }
-
- #[test]
- fn uses_pending_history_session_when_capture_session_is_missing() {
- let mut state = SemanticState::default();
-
- state.record_history(history("id-1", "session-from-history", "cargo test"));
- assert!(state.record_capture(capture(Some("id-1"), None, "output")));
-
- assert!(
- state
- .sessions
- .contains_key(&SessionId("session-from-history".to_string()))
- );
- assert!(command_output(&mut state, "id-1").found);
- }
-
- #[test]
- fn associates_history_by_id_after_capture_arrives() {
- let mut state = SemanticState::default();
-
- assert!(state.record_capture(capture(Some("id-1"), Some("session-1"), "output")));
- state.record_history(history("id-1", "session-1", "different command"));
-
- let capture_ref = state
- .history_index
- .get(&HistoryId("id-1".to_string()))
- .unwrap();
- let stored = state
- .sessions
- .get(&capture_ref.session_id)
- .unwrap()
- .stored_capture(capture_ref.capture_id)
- .unwrap();
- assert!(stored.record.history.is_some());
- }
-
- #[test]
- fn evicts_oldest_command_when_session_ring_is_full() {
- let mut state = SemanticState::default();
-
- for index in 0..=MAX_COMMANDS_PER_SESSION {
- assert!(state.record_capture(capture(
- Some(&format!("id-{index}")),
- Some("session-1"),
- "output",
- )));
- }
-
- assert!(!command_output(&mut state, "id-0").found);
- assert!(command_output(&mut state, &format!("id-{MAX_COMMANDS_PER_SESSION}")).found);
- assert_eq!(state.record_count(), MAX_COMMANDS_PER_SESSION);
- }
-
- #[test]
- fn evicts_oldest_session_after_lru_limit() {
- let mut state = SemanticState::default();
-
- for index in 0..MAX_SESSIONS {
- assert!(state.record_capture(capture(
- Some(&format!("id-{index}")),
- Some(&format!("session-{index}")),
- "output",
- )));
- }
- assert!(command_output(&mut state, "id-0").found);
-
- assert!(state.record_capture(capture(Some("new-id"), Some("new-session"), "output",)));
-
- assert!(command_output(&mut state, "id-0").found);
- assert!(!command_output(&mut state, "id-1").found);
- assert!(command_output(&mut state, "new-id").found);
- assert_eq!(state.sessions.len(), MAX_SESSIONS);
- }
-
- #[test]
- fn evicts_by_session_byte_limit() {
- let mut session = SessionCaptures::default();
- let first_output = "x".repeat(10);
- let second_output = "y";
- let (_, evicted_first) = session.push_with_limits(
- HistoryId("first".to_string()),
- SemanticCommandRecord {
- capture: capture(Some("first"), Some("session-1"), &first_output),
- history: None,
- },
- MAX_COMMANDS_PER_SESSION,
- 10,
- );
- assert!(evicted_first.is_empty());
-
- let (_, evicted_second) = session.push_with_limits(
- HistoryId("second".to_string()),
- SemanticCommandRecord {
- capture: capture(Some("second"), Some("session-1"), second_output),
- history: None,
- },
- MAX_COMMANDS_PER_SESSION,
- 10,
- );
-
- assert_eq!(evicted_second.len(), 1);
- assert_eq!(evicted_second[0].history_id, HistoryId("first".to_string()));
- assert_eq!(session.records.len(), 1);
- assert_eq!(session.output_bytes, 1);
- }
-
- #[test]
- fn command_output_reports_truncation_metadata() {
- let mut state = SemanticState::default();
- let mut capture = capture(Some("id-1"), Some("session-1"), "partial");
- capture.output_truncated = true;
- capture.output_observed_bytes = 1024;
-
- assert!(state.record_capture(capture));
-
- let reply = command_output(&mut state, "id-1");
- assert!(reply.output_truncated);
- assert_eq!(reply.total_bytes, 7);
- assert_eq!(reply.output_observed_bytes, 1024);
- }
-
- #[test]
- fn output_ranges_are_line_based_inclusive_and_support_negative_offsets() {
- let output = "zero\none\ntwo\nthree\nfour";
- let ranges = vec![
- crate::semantic::OutputRange { start: 1, end: 2 },
- crate::semantic::OutputRange { start: -2, end: -1 },
- ];
-
- assert_eq!(
- select_output_ranges(output, &ranges),
- vec![
- output_line(2, "one"),
- output_line(3, "two"),
- output_line(4, "three"),
- output_line(5, "four"),
- ]
- );
- }
-
- #[test]
- fn output_ranges_merge_overlaps_and_adjacent_ranges() {
- let output = (0..100)
- .map(|n| format!("line {n}"))
- .collect::<Vec<_>>()
- .join("\n");
- let ranges = vec![
- crate::semantic::OutputRange { start: 0, end: 100 },
- crate::semantic::OutputRange {
- start: -100,
- end: -1,
- },
- ];
-
- let selected = select_output_ranges(&output, &ranges);
-
- assert_eq!(selected.len(), 100);
- assert_eq!(selected.first(), Some(&output_line(1, "line 0")));
- assert_eq!(selected.last(), Some(&output_line(100, "line 99")));
- }
-
- #[test]
- fn output_ranges_can_leave_gaps_for_client_formatting() {
- let output = "zero\none\ntwo\nthree\nfour";
- let ranges = vec![
- crate::semantic::OutputRange { start: 0, end: 1 },
- crate::semantic::OutputRange { start: 4, end: 4 },
- ];
-
- assert_eq!(
- select_output_ranges(output, &ranges),
- vec![
- output_line(1, "zero"),
- output_line(2, "one"),
- output_line(5, "four"),
- ]
- );
- }
-
- #[test]
- fn empty_output_ranges_default_to_first_thousand_lines() {
- let output = (0..1001)
- .map(|n| format!("line {n}"))
- .collect::<Vec<_>>()
- .join("\n");
-
- let selected = select_output_ranges(&output, &[]);
-
- assert_eq!(selected.len(), 1000);
- assert_eq!(selected.first(), Some(&output_line(1, "line 0")));
- assert_eq!(selected.last(), Some(&output_line(1000, "line 999")));
- }
-
- #[test]
- fn output_ranges_skip_ranges_fully_outside_output() {
- let output = "zero\none\ntwo";
- let ranges = vec![
- crate::semantic::OutputRange { start: 10, end: 20 },
- crate::semantic::OutputRange {
- start: -20,
- end: -10,
- },
- ];
-
- assert_eq!(select_output_ranges(output, &ranges), Vec::new());
- }
-}
diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs
deleted file mode 100644
index 6e486250..00000000
--- a/crates/atuin-daemon/src/components/sync.rs
+++ /dev/null
@@ -1,279 +0,0 @@
-//! Sync component.
-//!
-//! Handles periodic synchronization with the Atuin cloud server.
-
-use std::time::Duration;
-
-use eyre::Result;
-use rand::Rng;
-use tokio::sync::mpsc;
-use tokio::time::{self, MissedTickBehavior};
-
-use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
-
-use crate::{
- daemon::{Component, DaemonHandle},
- events::DaemonEvent,
-};
-
-/// Commands that can be sent to the sync task.
-enum SyncCommand {
- /// Trigger an immediate sync.
- ForceSync,
- /// Stop the sync loop.
- Stop,
-}
-
-/// Sync state - tracks whether we're in normal operation or retrying after failure.
-#[derive(Clone, Copy, PartialEq, Eq)]
-enum SyncState {
- /// Normal operation. Periodic syncs only run if auto_sync is enabled.
- Idle,
- /// Retrying after a sync failure. Retries continue regardless of auto_sync
- /// until the sync succeeds.
- Retrying,
-}
-
-/// Sync component - handles periodic cloud synchronization.
-///
-/// This component:
-/// - Runs a background sync loop on a configurable interval
-/// - Implements exponential backoff on sync failures
-/// - Responds to ForceSync events for immediate sync
-/// - Emits SyncCompleted/SyncFailed events
-pub struct SyncComponent {
- task_handle: Option<tokio::task::JoinHandle<()>>,
- command_tx: Option<mpsc::Sender<SyncCommand>>,
-}
-
-impl SyncComponent {
- /// Create a new sync component.
- pub fn new() -> Self {
- Self {
- task_handle: None,
- command_tx: None,
- }
- }
-}
-
-impl Default for SyncComponent {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[tonic::async_trait]
-impl Component for SyncComponent {
- fn name(&self) -> &'static str {
- "sync"
- }
-
- async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
- let (cmd_tx, cmd_rx) = mpsc::channel(16);
- self.command_tx = Some(cmd_tx);
-
- // Spawn the sync loop with its own copy of the handle
- self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
-
- tracing::info!("sync component started");
- Ok(())
- }
-
- async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
- if let DaemonEvent::ForceSync = event {
- tracing::info!("force sync requested");
- if let Some(tx) = &self.command_tx {
- let _ = tx.send(SyncCommand::ForceSync).await;
- }
- }
- Ok(())
- }
-
- async fn stop(&mut self) -> Result<()> {
- if let Some(tx) = &self.command_tx {
- let _ = tx.send(SyncCommand::Stop).await;
- }
- if let Some(handle) = self.task_handle.take() {
- // Give the task a moment to shut down gracefully
- let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
- }
- tracing::info!("sync component stopped");
- Ok(())
- }
-}
-
-/// The main sync loop.
-///
-/// This runs in a spawned task and handles periodic sync as well as
-/// force sync requests.
-async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
- tracing::info!("sync loop starting");
-
- // Clone settings since we need them across await points
- let settings = handle.settings().await.clone();
- let host_id = match Settings::host_id().await {
- Ok(id) => id,
- Err(e) => {
- tracing::error!("failed to get host id, sync disabled: {e}");
- return;
- }
- };
-
- // Create the stores we need
- let encryption_key = *handle.encryption_key();
- let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
-
- // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
- let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
-
- let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
-
- // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
- // we may end up running a lot of syncs in a hot loop.
- ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
-
- let mut sync_state = SyncState::Idle;
-
- loop {
- tokio::select! {
- _ = ticker.tick() => {
- let settings = handle.settings().await;
-
- // Skip periodic ticks if auto_sync is disabled AND we're not retrying
- // a previous failure. Retries must continue regardless of auto_sync.
- if !settings.auto_sync && sync_state == SyncState::Idle {
- tracing::debug!("auto_sync disabled, skipping periodic sync tick");
- continue;
- }
-
- sync_state = do_sync_tick(
- &handle,
- &history_store,
- &mut ticker,
- max_interval,
- &settings,
- ).await;
- }
- cmd = cmd_rx.recv() => {
- match cmd {
- Some(SyncCommand::ForceSync) => {
- tracing::info!("executing force sync");
- let settings = handle.settings().await;
- sync_state = do_sync_tick(
- &handle,
- &history_store,
- &mut ticker,
- max_interval,
- &settings,
- ).await;
- }
- Some(SyncCommand::Stop) | None => {
- tracing::info!("sync loop stopping");
- break;
- }
- }
- }
- }
- }
-}
-
-/// Execute a single sync tick.
-///
-/// Returns the new sync state: `Idle` on success, `Retrying` on failure.
-async fn do_sync_tick(
- handle: &DaemonHandle,
- history_store: &HistoryStore,
- ticker: &mut time::Interval,
- max_interval: f64,
- settings: &Settings,
-) -> SyncState {
- tracing::info!("sync tick");
-
- // Check if logged in
- let logged_in = match settings.logged_in().await {
- Ok(v) => v,
- Err(e) => {
- tracing::warn!("failed to check login status, skipping sync tick: {e}");
- return SyncState::Idle;
- }
- };
-
- if !logged_in {
- tracing::debug!("not logged in, skipping sync tick");
- return SyncState::Idle;
- }
-
- // Perform the sync
- let res = sync::sync(settings, handle.store(), handle.encryption_key()).await;
-
- match res {
- Err(e) => {
- tracing::error!("sync tick failed with {e}");
-
- // Emit failure event
- handle.emit(DaemonEvent::SyncFailed {
- error: e.to_string(),
- });
-
- // Exponential backoff
- let mut rng = rand::thread_rng();
- let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
-
- if new_interval > max_interval {
- new_interval = max_interval;
- }
-
- *ticker = time::interval_at(
- tokio::time::Instant::now() + Duration::from_secs(new_interval as u64),
- time::Duration::from_secs(new_interval as u64),
- );
- ticker.reset_after(time::Duration::from_secs(new_interval as u64));
- ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
-
- tracing::error!("backing off, next sync tick in {new_interval}");
-
- SyncState::Retrying
- }
- Ok((uploaded_count, downloaded_records)) => {
- tracing::info!(
- uploaded = uploaded_count,
- downloaded = downloaded_records.len(),
- "sync complete"
- );
-
- // Build history from downloaded records
- if let Err(e) = history_store
- .incremental_build(handle.history_db(), &downloaded_records)
- .await
- {
- tracing::error!("failed to build history from downloaded records: {e}");
- }
-
- // Emit the records added event (for search indexing)
- handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone()));
-
- // Emit sync completed event
- handle.emit(DaemonEvent::SyncCompleted {
- uploaded: uploaded_count as usize,
- downloaded: downloaded_records.len(),
- });
-
- // Reset backoff on success
- if ticker.period().as_secs() != settings.daemon.sync_frequency {
- *ticker = time::interval_at(
- tokio::time::Instant::now()
- + Duration::from_secs(settings.daemon.sync_frequency),
- time::Duration::from_secs(settings.daemon.sync_frequency),
- );
- ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
- }
-
- // Store sync time
- if let Err(e) = Settings::save_sync_time().await {
- tracing::error!("failed to save sync time: {e}");
- }
-
- SyncState::Idle
- }
- }
-}
diff --git a/crates/atuin-daemon/src/control/mod.rs b/crates/atuin-daemon/src/control/mod.rs
deleted file mode 100644
index afb29c57..00000000
--- a/crates/atuin-daemon/src/control/mod.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-//! Control module for external event injection.
-//!
-//! This module provides the gRPC service that allows external processes
-//! (like CLI commands) to inject events into the daemon's event bus.
-
-mod service;
-
-// Include the generated proto code
-tonic::include_proto!("control");
-
-// Re-export the service
-pub use service::ControlService;
diff --git a/crates/atuin-daemon/src/control/service.rs b/crates/atuin-daemon/src/control/service.rs
deleted file mode 100644
index 2e7403ce..00000000
--- a/crates/atuin-daemon/src/control/service.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-//! Control service implementation.
-//!
-//! This gRPC service allows external processes (like CLI commands) to inject
-//! events into the daemon's event bus.
-
-use atuin_client::history::HistoryId;
-use tonic::{Request, Response, Status};
-use tracing::{Level, info, instrument};
-
-use super::{
- SendEventRequest, SendEventResponse,
- control_server::{Control, ControlServer},
- send_event_request::Event,
-};
-use crate::{daemon::DaemonHandle, events::DaemonEvent};
-
-/// The Control gRPC service.
-///
-/// This service is used by external processes to inject events into the daemon.
-/// It's not a component - it's part of the daemon's core infrastructure.
-pub struct ControlService {
- handle: DaemonHandle,
-}
-
-impl ControlService {
- /// Create a new control service with the given daemon handle.
- pub fn new(handle: DaemonHandle) -> Self {
- Self { handle }
- }
-
- /// Get a tonic server for this service.
- pub fn into_server(self) -> ControlServer<Self> {
- ControlServer::new(self)
- }
-}
-
-#[tonic::async_trait]
-impl Control for ControlService {
- #[instrument(skip_all, level = Level::INFO, name = "control_send_event")]
- async fn send_event(
- &self,
- request: Request<SendEventRequest>,
- ) -> Result<Response<SendEventResponse>, Status> {
- let req = request.into_inner();
-
- let event = req
- .event
- .ok_or_else(|| Status::invalid_argument("event is required"))?;
-
- let daemon_event = proto_event_to_daemon_event(event)?;
-
- info!(?daemon_event, "received control event");
- self.handle.emit(daemon_event);
-
- Ok(Response::new(SendEventResponse {}))
- }
-}
-
-/// Convert a proto event to a daemon event.
-fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> {
- match event {
- Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned),
- Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt),
- Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted {
- ids: e.ids.into_iter().map(HistoryId).collect(),
- }),
- Event::ForceSync(_) => Ok(DaemonEvent::ForceSync),
- Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded),
- Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested),
- }
-}
diff --git a/crates/atuin-daemon/src/daemon.rs b/crates/atuin-daemon/src/daemon.rs
deleted file mode 100644
index 625ca205..00000000
--- a/crates/atuin-daemon/src/daemon.rs
+++ /dev/null
@@ -1,458 +0,0 @@
-//! Core daemon infrastructure.
-//!
-//! This module provides the foundational types for building the atuin daemon:
-//!
-//! - [`DaemonState`]: Shared state owned by the daemon
-//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state
-//! - [`Component`]: A trait for implementing daemon components
-//! - [`Daemon`]: The main daemon orchestrator
-//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon
-
-use std::sync::Arc;
-
-use atuin_client::{
- database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore,
- settings::Settings,
-};
-use eyre::{Context, Result};
-use tokio::sync::{RwLock, broadcast};
-
-use crate::events::DaemonEvent;
-
-// ============================================================================
-// DaemonState
-// ============================================================================
-
-/// Shared state owned by the daemon.
-///
-/// This contains all the resources that components and services need access to.
-/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`].
-pub struct DaemonState {
- // Event bus
- event_tx: broadcast::Sender<DaemonEvent>,
-
- // Configuration (mutable - can be reloaded)
- settings: RwLock<Settings>,
-
- // Encryption key (immutable - derived at startup)
- encryption_key: [u8; 32],
-
- // Database handles
- history_db: HistoryDatabase,
- store: SqliteStore,
-}
-
-// ============================================================================
-// DaemonHandle
-// ============================================================================
-
-/// A lightweight handle to the daemon's shared state.
-///
-/// This is the primary way for components, gRPC services, and spawned tasks to
-/// interact with the daemon. It provides access to:
-///
-/// - Event emission and subscription
-/// - Configuration (settings, encryption key)
-/// - Database handles
-///
-/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed
-/// around to any code that needs daemon access.
-///
-/// # Example
-///
-/// ```ignore
-/// // Emit an event
-/// handle.emit(DaemonEvent::HistoryPruned);
-///
-/// // Access settings
-/// let settings = handle.settings().await;
-/// let sync_freq = settings.daemon.sync_frequency;
-///
-/// // Access database
-/// let history = handle.history_db().load(id).await?;
-/// ```
-#[derive(Clone)]
-pub struct DaemonHandle {
- state: Arc<DaemonState>,
-}
-
-impl DaemonHandle {
- // ---- Events ----
-
- /// Emit an event to the daemon's event bus.
- ///
- /// This is fire-and-forget - if no receivers are listening (which shouldn't
- /// happen in normal operation), the event is dropped silently.
- pub fn emit(&self, event: DaemonEvent) {
- if let Err(e) = self.state.event_tx.send(event) {
- tracing::warn!("failed to emit event (no receivers?): {e}");
- }
- }
-
- /// Subscribe to the event bus.
- ///
- /// Returns a receiver that will receive all events emitted after this call.
- /// Useful for components that need to listen for events outside of the
- /// normal `handle_event` callback flow.
- pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
- self.state.event_tx.subscribe()
- }
-
- /// Request graceful shutdown of the daemon.
- pub fn shutdown(&self) {
- self.emit(DaemonEvent::ShutdownRequested);
- }
-
- // ---- Configuration ----
-
- /// Get the current settings.
- ///
- /// This acquires a read lock on the settings. For most use cases, clone
- /// the settings if you need to hold onto them.
- pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> {
- self.state.settings.read().await
- }
-
- /// Reload settings from disk and emit a SettingsReloaded event.
- ///
- /// Components listening for `SettingsReloaded` can then re-read settings
- /// via `handle.settings()` to pick up the changes.
- pub async fn reload_settings(&self) -> Result<()> {
- let new_settings = Settings::new()?;
- self.apply_settings(new_settings).await;
- Ok(())
- }
-
- /// Apply already-loaded settings and emit a SettingsReloaded event.
- ///
- /// Use this when settings have already been loaded (e.g., from a file watcher)
- /// to avoid parsing the config file twice.
- pub async fn apply_settings(&self, settings: Settings) {
- *self.state.settings.write().await = settings;
- self.emit(DaemonEvent::SettingsReloaded);
- tracing::info!("settings applied");
- }
-
- /// Get the encryption key.
- pub fn encryption_key(&self) -> &[u8; 32] {
- &self.state.encryption_key
- }
-
- // ---- Database ----
-
- /// Get a reference to the history database.
- pub fn history_db(&self) -> &HistoryDatabase {
- &self.state.history_db
- }
-
- /// Get a reference to the record store.
- pub fn store(&self) -> &SqliteStore {
- &self.state.store
- }
-}
-
-impl std::fmt::Debug for DaemonHandle {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("DaemonHandle").finish_non_exhaustive()
- }
-}
-
-// ============================================================================
-// Component Trait
-// ============================================================================
-
-/// A daemon component that handles a specific domain.
-///
-/// Components are the building blocks of the daemon. Each component:
-///
-/// - Has a unique name for logging and debugging
-/// - Can optionally expose gRPC services
-/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources
-/// - Handles events from the event bus
-/// - Performs cleanup on shutdown
-///
-/// # Lifecycle
-///
-/// 1. **Construction**: Component is created (usually via `new()`)
-/// 2. **Start**: `start()` is called with a [`DaemonHandle`]
-/// 3. **Running**: `handle_event()` is called for each event on the bus
-/// 4. **Shutdown**: `stop()` is called for cleanup
-///
-/// # Example
-///
-/// ```ignore
-/// pub struct MyComponent {
-/// handle: Option<DaemonHandle>,
-/// }
-///
-/// #[async_trait]
-/// impl Component for MyComponent {
-/// fn name(&self) -> &'static str { "my-component" }
-///
-/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
-/// self.handle = Some(handle);
-/// Ok(())
-/// }
-///
-/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
-/// match event {
-/// DaemonEvent::SomeEvent => {
-/// // Handle the event
-/// if let Some(handle) = &self.handle {
-/// handle.emit(DaemonEvent::ResponseEvent);
-/// }
-/// }
-/// _ => {}
-/// }
-/// Ok(())
-/// }
-///
-/// async fn stop(&mut self) -> Result<()> {
-/// Ok(())
-/// }
-/// }
-/// ```
-#[tonic::async_trait]
-pub trait Component: Send + Sync {
- /// Human-readable name for logging and debugging.
- fn name(&self) -> &'static str;
-
- /// Called once at startup.
- ///
- /// Store the handle if you need to emit events or access daemon resources
- /// later. The handle is cheaply cloneable, so feel free to clone it for
- /// spawned tasks.
- async fn start(&mut self, handle: DaemonHandle) -> Result<()>;
-
- /// Handle an incoming event.
- ///
- /// Called for every event on the bus. To emit new events in response,
- /// use the handle stored during `start()`. Events emitted here will be
- /// processed in subsequent event loop iterations.
- async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>;
-
- /// Called on graceful shutdown.
- ///
- /// Use this to clean up resources, abort spawned tasks, etc.
- async fn stop(&mut self) -> Result<()>;
-}
-
-// ============================================================================
-// Daemon
-// ============================================================================
-
-/// The main daemon orchestrator.
-///
-/// The daemon manages components, runs the event loop, and coordinates startup
-/// and shutdown. It is constructed via [`DaemonBuilder`].
-///
-/// # Event Loop
-///
-/// The daemon runs a simple event loop:
-///
-/// 1. Wait for an event on the bus
-/// 2. Dispatch the event to all components (in registration order)
-/// 3. Components may emit new events in response
-/// 4. Repeat until `ShutdownRequested` is received
-///
-/// Events emitted during handling are queued and processed in subsequent
-/// iterations, ensuring the loop eventually drains.
-pub struct Daemon {
- components: Vec<Box<dyn Component>>,
- handle: DaemonHandle,
-}
-
-impl Daemon {
- /// Create a new daemon builder.
- pub fn builder(settings: Settings) -> DaemonBuilder {
- DaemonBuilder::new(settings)
- }
-
- /// Get a clone of the daemon handle.
- ///
- /// The handle can be used to emit events, access settings, etc.
- pub fn handle(&self) -> DaemonHandle {
- self.handle.clone()
- }
-
- /// Start all components.
- ///
- /// This must be called before `run_event_loop()`. It initializes all
- /// registered components with the daemon handle.
- pub async fn start_components(&mut self) -> Result<()> {
- for component in &mut self.components {
- tracing::info!(component = component.name(), "starting component");
- component
- .start(self.handle.clone())
- .await
- .with_context(|| format!("failed to start component: {}", component.name()))?;
- }
- Ok(())
- }
-
- /// Run the daemon event loop.
- ///
- /// This processes events until a ShutdownRequested event is received.
- /// Components must be started first via `start_components()`.
- pub async fn run_event_loop(&mut self) -> Result<()> {
- let mut event_rx = self.handle.subscribe();
- loop {
- match event_rx.recv().await {
- Ok(DaemonEvent::ShutdownRequested) => {
- tracing::info!("shutdown requested, stopping daemon");
- break;
- }
- Ok(event) => {
- tracing::debug!(?event, "processing event");
- self.dispatch_event(&event).await;
- }
- Err(broadcast::error::RecvError::Lagged(n)) => {
- tracing::warn!(
- skipped = n,
- "event receiver lagged, some events were dropped"
- );
- }
- Err(broadcast::error::RecvError::Closed) => {
- tracing::info!("event bus closed, stopping daemon");
- break;
- }
- }
- }
- Ok(())
- }
-
- /// Stop all components.
- ///
- /// This performs graceful shutdown of all components.
- pub async fn stop_components(&mut self) {
- for component in &mut self.components {
- tracing::info!(component = component.name(), "stopping component");
- if let Err(e) = component.stop().await {
- tracing::error!(
- component = component.name(),
- error = ?e,
- "error stopping component"
- );
- }
- }
- tracing::info!("all components stopped");
- }
-
- /// Run the daemon.
- ///
- /// This is a convenience method that starts components, runs the event loop,
- /// and handles shutdown. It does not return until the daemon is shut down.
- pub async fn run(mut self) -> Result<()> {
- self.start_components().await?;
- self.run_event_loop().await?;
- self.stop_components().await;
- tracing::info!("daemon stopped");
- Ok(())
- }
-
- async fn dispatch_event(&mut self, event: &DaemonEvent) {
- for component in &mut self.components {
- if let Err(e) = component.handle_event(event).await {
- tracing::error!(
- component = component.name(),
- error = ?e,
- "error handling event"
- );
- }
- }
- }
-}
-
-// ============================================================================
-// DaemonBuilder
-// ============================================================================
-
-/// Builder for constructing a [`Daemon`].
-///
-/// # Example
-///
-/// ```ignore
-/// let daemon = Daemon::builder(settings)
-/// .store(store)
-/// .history_db(history_db)
-/// .component(HistoryComponent::new())
-/// .component(SearchComponent::new())
-/// .component(SyncComponent::new())
-/// .build()
-/// .await?;
-///
-/// daemon.run().await?;
-/// ```
-pub struct DaemonBuilder {
- settings: Settings,
- store: Option<SqliteStore>,
- history_db: Option<HistoryDatabase>,
- components: Vec<Box<dyn Component>>,
-}
-
-impl DaemonBuilder {
- /// Create a new daemon builder with the given settings.
- pub fn new(settings: Settings) -> Self {
- Self {
- settings,
- store: None,
- history_db: None,
- components: Vec::new(),
- }
- }
-
- /// Set the record store.
- pub fn store(mut self, store: SqliteStore) -> Self {
- self.store = Some(store);
- self
- }
-
- /// Set the history database.
- pub fn history_db(mut self, db: HistoryDatabase) -> Self {
- self.history_db = Some(db);
- self
- }
-
- /// Register a component.
- ///
- /// Components are started in registration order and stopped in reverse order.
- pub fn component(mut self, component: impl Component + 'static) -> Self {
- self.components.push(Box::new(component));
- self
- }
-
- /// Build the daemon.
- ///
- /// This loads the encryption key and creates the daemon state.
- pub async fn build(self) -> Result<Daemon> {
- let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?;
- let history_db = self
- .history_db
- .ok_or_else(|| eyre::eyre!("history_db is required"))?;
-
- // Load encryption key
- let encryption_key: [u8; 32] = encryption::load_key(&self.settings)
- .context("could not load encryption key")?
- .into();
-
- // Create the event bus
- let (event_tx, _) = broadcast::channel(64);
-
- // Create the shared state
- let state = Arc::new(DaemonState {
- event_tx,
- settings: RwLock::new(self.settings),
- encryption_key,
- history_db,
- store,
- });
-
- // Create the handle (just a reference to the state)
- let handle = DaemonHandle { state };
-
- Ok(Daemon {
- components: self.components,
- handle,
- })
- }
-}
diff --git a/crates/atuin-daemon/src/events.rs b/crates/atuin-daemon/src/events.rs
deleted file mode 100644
index 4e6c6ff3..00000000
--- a/crates/atuin-daemon/src/events.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-//! Daemon events.
-//!
-//! Events are the primary communication mechanism within the daemon.
-//! Components emit events to notify others of state changes, and handle
-//! events to react to changes elsewhere in the system.
-//!
-//! External processes (like CLI commands) can also inject events via the
-//! Control gRPC service.
-
-use atuin_client::history::{History, HistoryId};
-use atuin_common::record::RecordId;
-
-/// Events that flow through the daemon's event bus.
-///
-/// Events are broadcast to all components. Each component decides which
-/// events it cares about in its `handle_event` implementation.
-#[derive(Debug, Clone)]
-pub enum DaemonEvent {
- // ---- History lifecycle ----
- /// A command has started running.
- HistoryStarted(History),
-
- /// A command has finished running.
- HistoryEnded(History),
-
- // ---- Sync ----
- /// Records were synced from the server.
- ///
- /// The search component uses this to update its index with new history.
- RecordsAdded(Vec<RecordId>),
-
- /// Sync completed successfully.
- SyncCompleted {
- /// Number of records uploaded.
- uploaded: usize,
- /// Number of records downloaded.
- downloaded: usize,
- },
-
- /// Sync failed.
- SyncFailed {
- /// Error message describing what went wrong.
- error: String,
- },
-
- /// Request an immediate sync (external trigger).
- ForceSync,
-
- // ---- External commands ----
- /// History was pruned - search index needs a full rebuild.
- ///
- /// Emitted when the user runs `atuin history prune` or similar.
- HistoryPruned,
-
- /// History was rebuilt - search index needs a full rebuild.
- ///
- /// Emitted when the user runs `atuin store rebuild history` or similar.
- HistoryRebuilt,
-
- /// Specific history items were deleted.
- ///
- /// The search component should remove these from its index.
- HistoryDeleted {
- /// IDs of the deleted history entries.
- ids: Vec<HistoryId>,
- },
-
- /// Settings have changed, components should reload if needed.
- SettingsReloaded,
-
- // ---- Lifecycle ----
- /// Request graceful shutdown of the daemon.
- ShutdownRequested,
-}
diff --git a/crates/atuin-daemon/src/history/mod.rs b/crates/atuin-daemon/src/history/mod.rs
deleted file mode 100644
index b71853df..00000000
--- a/crates/atuin-daemon/src/history/mod.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-//! History module for the daemon gRPC history service.
-//!
-//! This module contains the proto-generated types for the history gRPC service.
-
-// Include the generated proto code
-tonic::include_proto!("history");
diff --git a/crates/atuin-daemon/src/lib.rs b/crates/atuin-daemon/src/lib.rs
deleted file mode 100644
index 27d3932b..00000000
--- a/crates/atuin-daemon/src/lib.rs
+++ /dev/null
@@ -1,136 +0,0 @@
-use atuin_client::database::Sqlite as HistoryDatabase;
-use atuin_client::record::sqlite_store::SqliteStore;
-use atuin_client::settings::{Settings, watcher::global_settings_watcher};
-use eyre::Result;
-
-pub mod client;
-pub mod components;
-pub mod control;
-pub mod daemon;
-pub mod events;
-pub mod history;
-pub mod search;
-pub mod semantic;
-pub mod server;
-
-// Re-export core daemon types for convenience
-pub use daemon::{Component, Daemon, DaemonBuilder, DaemonHandle};
-pub use events::DaemonEvent;
-
-// Re-export components
-pub use components::{HistoryComponent, SearchComponent, SemanticComponent, SyncComponent};
-
-// Re-export client helpers
-pub use client::{ControlClient, SemanticClient, emit_event, emit_event_with_settings};
-
-/// Boot the daemon using the new component-based architecture.
-///
-/// This creates a daemon with the standard components (history, search, sync),
-/// starts the gRPC server with their services, and runs the event loop.
-pub async fn boot(
- settings: Settings,
- store: SqliteStore,
- history_db: HistoryDatabase,
-) -> Result<()> {
- // Create the components
- let history_component = HistoryComponent::new();
- let search_component = SearchComponent::new();
- let semantic_component = SemanticComponent::new();
- let sync_component = SyncComponent::new();
-
- // Get the gRPC services before moving components into the daemon
- // (The services share state with the components via Arc)
- let history_service = history_component.grpc_service();
- let search_service = search_component.grpc_service();
- let semantic_service = semantic_component.grpc_service();
-
- // Build the daemon
- let mut daemon = Daemon::builder(settings.clone())
- .store(store)
- .history_db(history_db)
- .component(history_component)
- .component(search_component)
- .component(semantic_component)
- .component(sync_component)
- .build()
- .await?;
-
- // Get a handle for the control service and gRPC server shutdown
- let handle = daemon.handle();
-
- // Create the control service
- let control_service = control::ControlService::new(handle.clone());
-
- // Start all components first (so gRPC services can work)
- daemon.start_components().await?;
-
- // Spawn config file watcher to reload settings on changes
- if let Ok(watcher) = global_settings_watcher() {
- let mut settings_rx = watcher.subscribe();
- let watcher_handle = handle.clone();
- tokio::spawn(async move {
- tracing::info!("config file watcher started");
- while settings_rx.changed().await.is_ok() {
- // Use the already-loaded settings from the watcher
- // (avoids parsing the config file twice)
- let new_settings = (*settings_rx.borrow()).clone();
- watcher_handle.apply_settings((*new_settings).clone()).await;
- }
- tracing::debug!("config file watcher stopped");
- });
- } else {
- tracing::warn!(
- "failed to start config file watcher; settings changes will require daemon restart"
- );
- }
-
- // Spawn signal handler to emit ShutdownRequested on Ctrl+C/SIGTERM
- let signal_handle = handle.clone();
- tokio::spawn(async move {
- shutdown_signal().await;
- tracing::info!("received shutdown signal");
- signal_handle.shutdown();
- });
-
- // Start the gRPC server in the background
- server::run_grpc_server(
- settings,
- history_service,
- search_service,
- semantic_service,
- control_service.into_server(),
- handle,
- )
- .await?;
-
- // Run the daemon event loop
- daemon.run_event_loop().await?;
-
- // Stop all components on shutdown
- daemon.stop_components().await;
-
- tracing::info!("daemon shut down complete");
- Ok(())
-}
-
-/// Wait for a shutdown signal (Ctrl+C or SIGTERM).
-#[cfg(unix)]
-async fn shutdown_signal() {
- let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
- .expect("failed to register sigterm handler");
- let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
- .expect("failed to register sigint handler");
-
- tokio::select! {
- _ = term.recv() => {},
- _ = int.recv() => {},
- }
-}
-
-/// Wait for a shutdown signal (Ctrl+C).
-#[cfg(not(unix))]
-async fn shutdown_signal() {
- tokio::signal::ctrl_c()
- .await
- .expect("failed to listen for ctrl+c");
-}
diff --git a/crates/atuin-daemon/src/search/index.rs b/crates/atuin-daemon/src/search/index.rs
deleted file mode 100644
index bb155979..00000000
--- a/crates/atuin-daemon/src/search/index.rs
+++ /dev/null
@@ -1,683 +0,0 @@
-//! Search index with frecency-based ranking.
-//!
-//! This module provides a deduplicated search index where each unique command
-//! is stored once, with metadata about all its invocations. This enables:
-//!
-//! - Efficient fuzzy matching (fewer items to match)
-//! - Frecency-based ranking (frequency + recency)
-//! - Dynamic filtering by directory, host, session, etc.
-
-use std::{
- collections::{HashMap, HashSet},
- sync::Arc,
-};
-
-use atuin_client::history::{History, is_known_agent};
-use atuin_client::settings::Search;
-use atuin_nucleo::{Injector, Nucleo, pattern};
-use dashmap::DashMap;
-use lasso::{Spur, ThreadedRodeo};
-use time::OffsetDateTime;
-use tokio::sync::RwLock;
-use tracing::{Level, instrument};
-use uuid::Uuid;
-
-use crate::components::search::with_trailing_slash;
-
-/// Parse a UUID string into a 16-byte array.
-/// Returns None if the string is not a valid UUID.
-fn parse_uuid_bytes(s: &str) -> Option<[u8; 16]> {
- Uuid::parse_str(s).ok().map(|u| *u.as_bytes())
-}
-
-/// Format a 16-byte array as a UUID string.
-fn format_uuid_bytes(bytes: &[u8; 16]) -> String {
- Uuid::from_bytes(*bytes).to_string()
-}
-
-/// Pre-computed frecency data for O(1) lookup.
-#[derive(Debug, Clone, Default)]
-pub struct FrecencyData {
- /// Total number of times this command was used.
- pub count: u32,
- /// Most recent usage timestamp (unix seconds).
- pub last_used: i64,
-}
-
-impl FrecencyData {
- /// Record a new usage of this command.
- pub fn record_use(&mut self, timestamp: i64) {
- self.count += 1;
- if timestamp > self.last_used {
- self.last_used = timestamp;
- }
- }
-
- /// Compute frecency score based on count and recency.
- ///
- /// Uses a decay function where more recent commands score higher.
- /// The formula balances frequency (how often) with recency (how recent).
- ///
- /// Multipliers allow tuning the relative weights:
- /// - `recency_mul`: Multiplier for recency score (default: 1.0)
- /// - `frequency_mul`: Multiplier for frequency score (default: 1.0)
- ///
- /// A multiplier of 0.0 disables that component, 1.0 is unchanged, 2.0 doubles weight.
- /// Values like 0.5 reduce weight by half, 1.5 increases by 50%, etc.
- #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")]
- pub fn compute(&self, now: i64, recency_mul: f64, frequency_mul: f64) -> u32 {
- if self.count == 0 {
- return 0;
- }
-
- // Time-based decay: score decreases as time passes
- let age_seconds = (now - self.last_used).max(0) as u64;
- let age_hours = age_seconds / 3600;
-
- // Decay factor: recent commands get higher scores
- // - Last hour: multiplier ~1.0
- // - Last day: multiplier ~0.5
- // - Last week: multiplier ~0.1
- // - Older: multiplier approaches 0
- let recency_score: f64 = match age_hours {
- 0 => 100.0,
- 1..=6 => 90.0,
- 7..=24 => 70.0,
- 25..=72 => 50.0,
- 73..=168 => 30.0,
- 169..=720 => 15.0,
- _ => 5.0,
- };
-
- // Frequency boost: more uses = higher score (with diminishing returns)
- let frequency_score = ((self.count as f64).ln() * 20.0).min(100.0);
-
- // Apply multipliers and combine scores, then round to u32
- ((recency_score * recency_mul) + (frequency_score * frequency_mul)).round() as u32
- }
-}
-
-/// Data for a unique command.
-pub struct CommandData {
- /// History ID of the most recent invocation (16-byte UUID).
- most_recent_id: [u8; 16],
- /// Timestamp of the most recent invocation.
- most_recent_timestamp: i64,
- /// Pre-computed global frecency.
- pub global_frecency: FrecencyData,
-
- // Pre-computed indexes for O(1) filter lookups
- // Using HashSet instead of DashSet since CommandData lives inside DashMap (already synchronized)
- /// All directories where this command has been run (interned keys).
- directories: HashSet<Spur>,
- /// All hostnames where this command has been run (interned keys).
- hosts: HashSet<Spur>,
- /// All sessions where this command has been run (as 16-byte UUIDs).
- sessions: HashSet<[u8; 16]>,
-}
-
-impl CommandData {
- /// Create a new CommandData from a history entry.
- /// Returns None if the history entry has invalid UUIDs.
- pub fn new(history: &History, interner: &ThreadedRodeo) -> Option<Self> {
- let history_id = parse_uuid_bytes(&history.id.0)?;
- let session = parse_uuid_bytes(&history.session)?;
- let timestamp = history.timestamp.unix_timestamp();
-
- let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd));
- let host_key = interner.get_or_intern(&history.hostname);
-
- let mut directories = HashSet::new();
- directories.insert(dir_key);
-
- let mut hosts = HashSet::new();
- hosts.insert(host_key);
-
- let mut sessions = HashSet::new();
- sessions.insert(session);
-
- let mut global_frecency = FrecencyData::default();
- global_frecency.record_use(timestamp);
-
- Some(Self {
- most_recent_id: history_id,
- most_recent_timestamp: timestamp,
- global_frecency,
- directories,
- hosts,
- sessions,
- })
- }
-
- /// Add an invocation from a history entry.
- /// Returns false if the history entry has invalid UUIDs.
- pub fn add_invocation(&mut self, history: &History, interner: &ThreadedRodeo) -> bool {
- let Some(history_id) = parse_uuid_bytes(&history.id.0) else {
- return false;
- };
- let Some(session) = parse_uuid_bytes(&history.session) else {
- return false;
- };
-
- let timestamp = history.timestamp.unix_timestamp();
-
- // Update global frecency
- self.global_frecency.record_use(timestamp);
-
- // Update pre-computed indexes for O(1) filter lookups
- let dir_key = interner.get_or_intern(with_trailing_slash(&history.cwd));
- self.directories.insert(dir_key);
- self.hosts.insert(interner.get_or_intern(&history.hostname));
- self.sessions.insert(session);
-
- // Update most recent if this invocation is newer
- if timestamp > self.most_recent_timestamp {
- self.most_recent_id = history_id;
- self.most_recent_timestamp = timestamp;
- }
-
- true
- }
-
- /// Get the most recent history ID for this command.
- pub fn most_recent_id(&self) -> String {
- format_uuid_bytes(&self.most_recent_id)
- }
-
- /// Check if any invocation matches a directory filter (exact match).
- /// O(1) lookup using pre-computed index.
- pub fn has_invocation_in_dir(&self, dir: &str, interner: &ThreadedRodeo) -> bool {
- interner
- .get(dir)
- .is_some_and(|spur| self.directories.contains(&spur))
- }
-
- /// Check if any invocation matches a directory prefix (workspace/git root).
- /// O(n) where n = number of unique directories for this command.
- pub fn has_invocation_in_workspace(&self, prefix: &str, interner: &ThreadedRodeo) -> bool {
- self.directories
- .iter()
- .any(|&spur| interner.resolve(&spur).starts_with(prefix))
- }
-
- /// Check if any invocation matches a hostname.
- /// O(1) lookup using pre-computed index.
- pub fn has_invocation_on_host(&self, hostname: &str, interner: &ThreadedRodeo) -> bool {
- interner
- .get(hostname)
- .is_some_and(|spur| self.hosts.contains(&spur))
- }
-
- /// Check if any invocation matches a session.
- /// O(1) lookup using pre-computed index.
- pub fn has_invocation_in_session(&self, session: &str) -> bool {
- parse_uuid_bytes(session).is_some_and(|bytes| self.sessions.contains(&bytes))
- }
-}
-
-/// Filter mode for search queries.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum IndexFilterMode {
- /// No filtering - search all commands.
- Global,
- /// Filter to commands run in a specific directory.
- Directory(String),
- /// Filter to commands run in a workspace (directory prefix).
- Workspace(String),
- /// Filter to commands run on a specific host.
- Host(String),
- /// Filter to commands run in a specific session.
- Session(String),
-}
-
-/// Context for search queries.
-#[derive(Debug, Clone, Default)]
-pub struct QueryContext {
- pub cwd: Option<String>,
- pub git_root: Option<String>,
- pub hostname: Option<String>,
- pub session_id: Option<String>,
-}
-
-/// Shareable frecency map: command -> frecency score.
-/// Wrapped in Arc for zero-copy sharing with scorer callbacks.
-type FrecencyMap = Arc<HashMap<Arc<str>, u32>>;
-
-/// A deduplicated search index with frecency-based ranking.
-///
-/// Commands are stored by their text, with metadata about all invocations.
-/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback.
-///
-/// Global frecency is precomputed by a background task and used for scoring.
-/// If frecency data is not available, search still works but without frecency ranking;
-/// although this should never happen due to precomputing the frecency map.
-pub struct SearchIndex {
- /// Map from command text to command data.
- /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer.
- /// Keys are Arc<str> to enable zero-copy sharing with frecency_map.
- commands: Arc<DashMap<Arc<str>, CommandData>>,
- /// Nucleo fuzzy matcher - items are command strings.
- nucleo: RwLock<Nucleo<String>>,
- /// Injector for adding new commands to Nucleo.
- injector: Injector<String>,
- /// Precomputed global frecency map. Updated by background task.
- frecency_map: RwLock<Option<FrecencyMap>>,
- /// String interner for deduplicating cwd, hostname, and directory paths.
- interner: Arc<ThreadedRodeo>,
-}
-
-impl SearchIndex {
- /// Create a new empty search index.
- pub fn new() -> Self {
- let nucleo_config = atuin_nucleo::Config::DEFAULT;
- // Single column for command text
- let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1);
- let injector = nucleo.injector();
-
- Self {
- commands: Arc::new(DashMap::new()),
- nucleo: RwLock::new(nucleo),
- injector,
- frecency_map: RwLock::new(None),
- interner: Arc::new(ThreadedRodeo::new()),
- }
- }
-
- /// Add a history entry to the index.
- ///
- /// If the command already exists, updates its invocation data.
- /// If it's a new command, adds it to both the map and Nucleo.
- pub fn add_history(&self, history: &History) {
- if is_known_agent(&history.author) {
- return;
- }
-
- let command = history.command.as_str();
-
- // DashMap with Arc<str> keys can be looked up with &str via Borrow trait
- if let Some(mut entry) = self.commands.get_mut(command) {
- // Existing command - just update invocations
- entry.add_invocation(history, &self.interner);
- } else {
- // New command - create Arc<str> once and share it
- let Some(data) = CommandData::new(history, &self.interner) else {
- return; // Invalid UUIDs, skip this entry
- };
- let command_arc: Arc<str> = command.into();
- self.commands.insert(Arc::clone(&command_arc), data);
- // Nucleo still needs String (unavoidable copy for fuzzy matching)
- self.injector.push(command_arc.to_string(), |cmd, cols| {
- cols[0] = cmd.clone().into();
- });
- }
- // Note: frecency_map is rebuilt by background task, not invalidated here
- }
-
- /// Add multiple history entries to the index.
- pub fn add_histories(&self, histories: &[History]) {
- for history in histories {
- self.add_history(history);
- }
- }
-
- /// Get the number of unique commands in the index.
- pub fn command_count(&self) -> usize {
- self.commands.len()
- }
-
- /// Get the number of items in Nucleo (should match command_count).
- pub async fn nucleo_item_count(&self) -> u32 {
- self.nucleo.read().await.snapshot().item_count()
- }
-
- /// Search for commands matching a query.
- ///
- /// Returns a list of history IDs (most recent invocation per command).
- /// Uses precomputed global frecency for scoring if available.
- #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))]
- pub async fn search(
- &self,
- query: &str,
- filter_mode: IndexFilterMode,
- _context: &QueryContext,
- limit: u32,
- ) -> Vec<String> {
- let mut nucleo = self.nucleo.write().await;
-
- // Get precomputed frecency map (may be None if not yet computed)
- let frecency_map = self.frecency_map.read().await.clone();
-
- // Build filter based on mode
- let filter = self.build_filter(&filter_mode);
- nucleo.set_filter(filter);
-
- // Build scorer from precomputed frecency (or None if not available)
- let scorer = Self::build_scorer(frecency_map);
- nucleo.set_scorer(scorer);
-
- // Update pattern
- nucleo.pattern.reparse(
- 0,
- query,
- pattern::CaseMatching::Smart,
- pattern::Normalization::Smart,
- false,
- );
-
- tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| {
- // Tick until complete
- while nucleo.tick(10).running {}
- });
-
- // Collect results
- let snapshot = nucleo.snapshot();
- let matched_count = snapshot.matched_item_count().min(limit);
-
- tracing::span!(Level::TRACE, "index_search_results").in_scope(|| {
- snapshot
- .matched_items(..matched_count)
- .filter_map(|item| {
- let cmd = item.data;
- // DashMap<Arc<str>, _>::get accepts &str via Borrow trait
- self.commands
- .get(cmd.as_str())
- .map(|data| data.most_recent_id())
- })
- .collect()
- })
- }
-
- /// Rebuild the global frecency map.
- ///
- /// This should be called by a background task periodically.
- /// The map is used for scoring search results.
- ///
- /// Uses multipliers from search settings:
- /// - `recency_score_multiplier`: Weight for recency component
- /// - `frequency_score_multiplier`: Weight for frequency component
- /// - `frecency_score_multiplier`: Overall multiplier for final score
- #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")]
- pub async fn rebuild_frecency(&self, search_settings: &Search) {
- let now = OffsetDateTime::now_utc().unix_timestamp();
- let mut frecency_map: HashMap<Arc<str>, u32> = HashMap::new();
-
- // Clamp multipliers to non-negative values to prevent broken frecency ranking
- // (negative values would produce unexpected results when cast to u32)
- let recency_mul = search_settings.recency_score_multiplier.max(0.0);
- let frequency_mul = search_settings.frequency_score_multiplier.max(0.0);
- let frecency_mul = search_settings.frecency_score_multiplier.max(0.0);
-
- for entry in self.commands.iter() {
- let frecency = entry
- .global_frecency
- .compute(now, recency_mul, frequency_mul);
- // Apply overall frecency multiplier and round to u32
- let frecency = (frecency as f64 * frecency_mul).round() as u32;
- // Arc::clone is cheap - just increments reference count
- frecency_map.insert(Arc::clone(entry.key()), frecency);
- }
-
- *self.frecency_map.write().await = Some(Arc::new(frecency_map));
- }
-
- /// Build filter predicate for the given mode.
- fn build_filter(&self, mode: &IndexFilterMode) -> Option<atuin_nucleo::Filter<String>> {
- // For Global mode, no filter needed
- if matches!(mode, IndexFilterMode::Global) {
- return None;
- }
-
- // Pre-compute which commands pass the filter
- // Use HashSet<String> for the short-lived filter (simpler than Arc lookup)
- let passing_commands: Arc<HashSet<String>> = {
- let mut set = HashSet::new();
- for entry in self.commands.iter() {
- let passes = match mode {
- IndexFilterMode::Global => unreachable!(),
- IndexFilterMode::Directory(dir) => {
- entry.has_invocation_in_dir(dir, &self.interner)
- }
- IndexFilterMode::Workspace(prefix) => {
- entry.has_invocation_in_workspace(prefix, &self.interner)
- }
- IndexFilterMode::Host(hostname) => {
- entry.has_invocation_on_host(hostname, &self.interner)
- }
- IndexFilterMode::Session(session) => entry.has_invocation_in_session(session),
- };
- if passes {
- // Convert Arc<str> to String for filter lookup
- set.insert(entry.key().to_string());
- }
- }
- Arc::new(set)
- };
-
- Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd)))
- }
-
- /// Build scorer from precomputed frecency map.
- ///
- /// Returns None if frecency map is not available (search still works, just without frecency ranking).
- fn build_scorer(frecency_map: Option<FrecencyMap>) -> Option<atuin_nucleo::Scorer<String>> {
- let map = frecency_map?;
- Some(Arc::new(move |cmd: &String, fuzzy_score: u32| {
- // HashMap<Arc<str>, _>::get accepts &str via Borrow trait
- let frecency = map.get(cmd.as_str()).copied().unwrap_or(0);
- fuzzy_score + frecency
- }))
- }
-}
-
-impl Default for SearchIndex {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use time::macros::datetime;
-
- fn make_history(command: &str, cwd: &str, timestamp: OffsetDateTime) -> History {
- History::import()
- .timestamp(timestamp)
- .command(command)
- .cwd(cwd)
- .build()
- .into()
- }
-
- #[test]
- fn frecency_data_compute() {
- let now = 1000000i64;
-
- // Recent command (with default multipliers of 1.0)
- let recent = FrecencyData {
- count: 5,
- last_used: now - 60, // 1 minute ago
- };
- assert!(recent.compute(now, 1.0, 1.0) > 100); // High score
-
- // Old command
- let old = FrecencyData {
- count: 5,
- last_used: now - 86400 * 30, // 30 days ago
- };
- assert!(old.compute(now, 1.0, 1.0) < recent.compute(now, 1.0, 1.0));
-
- // Frequently used old command
- let frequent_old = FrecencyData {
- count: 100,
- last_used: now - 86400 * 7, // 1 week ago
- };
- // Should still have decent score due to frequency
- assert!(frequent_old.compute(now, 1.0, 1.0) > 50);
- }
-
- #[test]
- fn frecency_data_compute_with_multipliers() {
- let now = 1000000i64;
-
- let data = FrecencyData {
- count: 5,
- last_used: now - 60, // 1 minute ago (recency_score = 100)
- };
-
- // Default multipliers (1.0, 1.0)
- let default_score = data.compute(now, 1.0, 1.0);
-
- // Double recency weight
- let double_recency = data.compute(now, 2.0, 1.0);
- assert!(double_recency > default_score);
-
- // Double frequency weight
- let double_frequency = data.compute(now, 1.0, 2.0);
- assert!(double_frequency > default_score);
-
- // Zero out recency (only frequency counts)
- let no_recency = data.compute(now, 0.0, 1.0);
- assert!(no_recency < default_score);
-
- // Zero out frequency (only recency counts)
- let no_frequency = data.compute(now, 1.0, 0.0);
- assert!(no_frequency < default_score);
-
- // Zero both (should be zero)
- let no_score = data.compute(now, 0.0, 0.0);
- assert_eq!(no_score, 0);
-
- // Fractional multipliers
- let half_recency = data.compute(now, 0.5, 1.0);
- assert!(half_recency < default_score);
- assert!(half_recency > no_recency);
-
- // 1.5x multiplier
- let boost_recency = data.compute(now, 1.5, 1.0);
- assert!(boost_recency > default_score);
- assert!(boost_recency < double_recency);
- }
-
- #[test]
- fn command_data_add_invocation() {
- let interner = ThreadedRodeo::new();
-
- let (dir1, dir2) = if cfg!(windows) {
- ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
- } else {
- ("/home/user/project", "/home/user/other")
- };
-
- let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC));
- let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC));
-
- let mut data = CommandData::new(&history1, &interner).unwrap();
- assert_eq!(data.global_frecency.count, 1);
- let id1 = data.most_recent_id();
-
- data.add_invocation(&history2, &interner);
- assert_eq!(data.global_frecency.count, 2);
-
- // Most recent ID should update to history2 (newer timestamp)
- let id2 = data.most_recent_id();
- assert_ne!(id1, id2);
- }
-
- #[test]
- fn command_data_filters() {
- let interner = ThreadedRodeo::new();
-
- let (dir1, dir2) = if cfg!(windows) {
- ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
- } else {
- ("/home/user/project", "/home/user/other")
- };
-
- let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC));
- let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC));
-
- let mut data = CommandData::new(&h1, &interner).unwrap();
- data.add_invocation(&h2, &interner);
-
- let (check1, check2, check3) = if cfg!(windows) {
- (
- with_trailing_slash("C:\\Users\\User\\project"),
- with_trailing_slash("C:\\Users\\User\\other"),
- with_trailing_slash("C:\\Users\\User\\missing"),
- )
- } else {
- (
- with_trailing_slash("/home/user/project"),
- with_trailing_slash("/home/user/other"),
- with_trailing_slash("/home/user/missing"),
- )
- };
-
- assert!(data.has_invocation_in_dir(&check1, &interner));
- assert!(data.has_invocation_in_dir(&check2, &interner));
- assert!(!data.has_invocation_in_dir(&check3, &interner));
-
- let (check1, check2, check3) = if cfg!(windows) {
- (
- with_trailing_slash("C:\\Users\\User"),
- with_trailing_slash("C:\\Users"),
- with_trailing_slash("C:\\Users\\User\\var"),
- )
- } else {
- (
- with_trailing_slash("/home/user"),
- with_trailing_slash("/home"),
- with_trailing_slash("/var"),
- )
- };
-
- assert!(data.has_invocation_in_workspace(&check1, &interner));
- assert!(data.has_invocation_in_workspace(&check2, &interner));
- assert!(!data.has_invocation_in_workspace(&check3, &interner));
- }
-
- #[tokio::test]
- async fn search_index_add_and_search() {
- let index = SearchIndex::new();
-
- let h1 = make_history(
- "git status",
- "/home/user/project",
- datetime!(2024-01-01 10:00 UTC),
- );
- let h2 = make_history(
- "git commit -m 'test'",
- "/home/user/project",
- datetime!(2024-01-01 10:05 UTC),
- );
- let h3 = make_history(
- "ls -la",
- "/home/user/other",
- datetime!(2024-01-01 10:10 UTC),
- );
-
- index.add_history(&h1);
- index.add_history(&h2);
- index.add_history(&h3);
-
- assert_eq!(index.command_count(), 3);
-
- // Search for "git" - should match 2 commands
- let results = index
- .search("git", IndexFilterMode::Global, &QueryContext::default(), 10)
- .await;
- assert_eq!(results.len(), 2);
-
- // Search with directory filter
- let results = index
- .search(
- "",
- IndexFilterMode::Directory(with_trailing_slash("/home/user/project")),
- &QueryContext::default(),
- 10,
- )
- .await;
- assert_eq!(results.len(), 2); // git status and git commit
- }
-}
diff --git a/crates/atuin-daemon/src/search/mod.rs b/crates/atuin-daemon/src/search/mod.rs
deleted file mode 100644
index 4d261956..00000000
--- a/crates/atuin-daemon/src/search/mod.rs
+++ /dev/null
@@ -1,11 +0,0 @@
-//! Search module for the daemon gRPC search service.
-//!
-//! This module provides fuzzy search over command history using Nucleo.
-
-mod index;
-
-// Include the generated proto code
-tonic::include_proto!("search");
-
-// Re-export the service and index
-pub use index::{IndexFilterMode, QueryContext, SearchIndex};
diff --git a/crates/atuin-daemon/src/semantic/mod.rs b/crates/atuin-daemon/src/semantic/mod.rs
deleted file mode 100644
index c3511676..00000000
--- a/crates/atuin-daemon/src/semantic/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-//! Semantic command capture gRPC service types.
-
-tonic::include_proto!("semantic");
diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs
deleted file mode 100644
index b823cff2..00000000
--- a/crates/atuin-daemon/src/server.rs
+++ /dev/null
@@ -1,170 +0,0 @@
-use eyre::Result;
-
-use crate::components::history::HistoryGrpcService;
-use crate::components::search::SearchGrpcService;
-use crate::components::semantic::SemanticGrpcService;
-use crate::control::{ControlService, control_server::ControlServer};
-use crate::daemon::DaemonHandle;
-use crate::history::history_server::HistoryServer;
-use crate::search::search_server::SearchServer;
-use crate::semantic::semantic_server::SemanticServer;
-
-use atuin_client::settings::Settings;
-
-/// Run the gRPC server with the given services.
-///
-/// This starts the gRPC server in the background and returns immediately.
-/// The server will shut down when a ShutdownRequested event is received.
-#[cfg(unix)]
-pub async fn run_grpc_server(
- settings: Settings,
- history_service: HistoryServer<HistoryGrpcService>,
- search_service: SearchServer<SearchGrpcService>,
- semantic_service: SemanticServer<SemanticGrpcService>,
- control_service: ControlServer<ControlService>,
- handle: DaemonHandle,
-) -> Result<()> {
- use tokio::net::UnixListener;
- use tokio_stream::wrappers::UnixListenerStream;
-
- let socket_path = settings.daemon.socket_path.clone();
-
- let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket {
- #[cfg(target_os = "linux")]
- {
- use eyre::{OptionExt, WrapErr};
- use std::os::unix::net::SocketAddr;
- use std::path::PathBuf;
- tracing::info!("getting systemd socket");
- let listener = listenfd::ListenFd::from_env()
- .take_unix_listener(0)?
- .ok_or_eyre("missing systemd socket")?;
- listener.set_nonblocking(true)?;
- let actual_path: Result<PathBuf, eyre::Report> = listener
- .local_addr()
- .context("getting systemd socket's path")
- .and_then(|addr: SocketAddr| {
- addr.as_pathname()
- .ok_or_eyre("systemd socket missing path")
- .map(|path: &std::path::Path| path.to_owned())
- });
- match actual_path {
- Ok(actual_path) => {
- tracing::info!("listening on systemd socket: {actual_path:?}");
- if actual_path != std::path::Path::new(&socket_path) {
- tracing::warn!(
- "systemd socket is not at configured client path: {socket_path:?}"
- );
- }
- }
- Err(err) => {
- tracing::warn!(
- "could not detect systemd socket path, ensure that it's at the configured path: {socket_path:?}, error: {err:?}"
- );
- }
- }
- (UnixListener::from_std(listener)?, false)
- }
- #[cfg(not(target_os = "linux"))]
- unreachable!()
- } else {
- tracing::info!("listening on unix socket {socket_path:?}");
- (UnixListener::bind(socket_path.clone())?, true)
- };
-
- let uds_stream = UnixListenerStream::new(uds);
-
- // Create shutdown signal from daemon handle
- let shutdown_signal = async move {
- let mut rx = handle.subscribe();
- loop {
- use crate::DaemonEvent;
-
- match rx.recv().await {
- Ok(DaemonEvent::ShutdownRequested) => break,
- Ok(_) => continue,
- Err(_) => break, // Channel closed
- }
- }
- if cleanup {
- eprintln!("Removing socket...");
- if let Err(e) = std::fs::remove_file(&socket_path)
- && e.kind() != std::io::ErrorKind::NotFound
- {
- eprintln!("failed to remove socket: {e}");
- }
- }
- eprintln!("Shutting down gRPC server...");
- };
-
- // Spawn the server in the background
- tokio::spawn(async move {
- use tonic::transport::Server;
-
- if let Err(e) = Server::builder()
- .add_service(history_service)
- .add_service(search_service)
- .add_service(semantic_service)
- .add_service(control_service)
- .serve_with_incoming_shutdown(uds_stream, shutdown_signal)
- .await
- {
- tracing::error!("gRPC server error: {e}");
- }
- });
-
- Ok(())
-}
-
-/// Run the gRPC server with the given services (Windows/TCP version).
-#[cfg(not(unix))]
-pub async fn run_grpc_server(
- settings: Settings,
- history_service: HistoryServer<HistoryGrpcService>,
- search_service: SearchServer<SearchGrpcService>,
- semantic_service: SemanticServer<SemanticGrpcService>,
- control_service: ControlServer<ControlService>,
- handle: DaemonHandle,
-) -> Result<()> {
- use tokio::net::TcpListener;
- use tokio_stream::wrappers::TcpListenerStream;
- use tonic::transport::Server;
-
- let port = settings.daemon.tcp_port;
- let url = format!("127.0.0.1:{port}");
- let tcp = TcpListener::bind(&url).await?;
- let tcp_stream = TcpListenerStream::new(tcp);
-
- tracing::info!("listening on tcp port {:?}", port);
-
- // Create shutdown signal from daemon handle
- let shutdown_signal = async move {
- use crate::DaemonEvent;
-
- let mut rx = handle.subscribe();
- loop {
- match rx.recv().await {
- Ok(DaemonEvent::ShutdownRequested) => break,
- Ok(_) => continue,
- Err(_) => break, // Channel closed
- }
- }
- eprintln!("Shutting down gRPC server...");
- };
-
- // Spawn the server in the background
- tokio::spawn(async move {
- if let Err(e) = Server::builder()
- .add_service(history_service)
- .add_service(search_service)
- .add_service(semantic_service)
- .add_service(control_service)
- .serve_with_incoming_shutdown(tcp_stream, shutdown_signal)
- .await
- {
- tracing::error!("gRPC server error: {e}");
- }
- });
-
- Ok(())
-}
diff --git a/crates/atuin-daemon/tests/lifecycle.rs b/crates/atuin-daemon/tests/lifecycle.rs
deleted file mode 100644
index 4a91e5cb..00000000
--- a/crates/atuin-daemon/tests/lifecycle.rs
+++ /dev/null
@@ -1,222 +0,0 @@
-//! Integration tests for the daemon server lifecycle.
-//!
-//! Each test spins up a real gRPC server on a temporary unix socket,
-//! connects a client, and exercises the daemon RPCs.
-
-#[cfg(unix)]
-mod unix {
- use std::time::Duration;
-
- use atuin_client::database::Sqlite;
- use atuin_client::record::sqlite_store::SqliteStore;
- use atuin_client::settings::{Settings, init_meta_config_for_testing};
- use atuin_daemon::client::HistoryClient;
- use atuin_daemon::components::HistoryComponent;
- use atuin_daemon::{Daemon, DaemonHandle};
- use tempfile::TempDir;
- use tokio::net::UnixListener;
- use tokio_stream::wrappers::UnixListenerStream;
- use tonic::transport::Server;
-
- /// Spins up a daemon server on a temp socket and returns a connected client,
- /// the daemon handle (for shutdown), and the temp dir (must be held to keep paths alive).
- async fn start_test_daemon() -> (HistoryClient, DaemonHandle, TempDir) {
- let tmp = tempfile::tempdir().unwrap();
-
- let db_path = tmp.path().join("history.db");
- let record_path = tmp.path().join("records.db");
- let key_path = tmp.path().join("key");
- let socket_path = tmp.path().join("test.sock");
- let meta_path = tmp.path().join("meta.db");
-
- // Initialize the meta store config for testing (required for Settings::host_id())
- init_meta_config_for_testing(meta_path.to_str().unwrap(), 5.0);
-
- // Build settings with test paths
- let settings: Settings = Settings::builder()
- .expect("could not build settings builder")
- .set_override("db_path", db_path.to_str().unwrap())
- .expect("failed to set db_path")
- .set_override("record_store_path", record_path.to_str().unwrap())
- .expect("failed to set record_store_path")
- .set_override("key_path", key_path.to_str().unwrap())
- .expect("failed to set key_path")
- .set_override("daemon.socket_path", socket_path.to_str().unwrap())
- .expect("failed to set socket_path")
- .set_override("meta.db_path", meta_path.to_str().unwrap())
- .expect("failed to set meta.db_path")
- .build()
- .expect("could not build settings")
- .try_deserialize()
- .expect("could not deserialize settings");
-
- // Create databases
- let history_db = Sqlite::new(&db_path, 5.0).await.unwrap();
- let store = SqliteStore::new(&record_path, 5.0).await.unwrap();
-
- // Create the history component and get its gRPC service
- let history_component = HistoryComponent::new();
- let history_service = history_component.grpc_service();
-
- // Build and start the daemon
- let mut daemon = Daemon::builder(settings)
- .store(store)
- .history_db(history_db)
- .component(history_component)
- .build()
- .await
- .unwrap();
-
- let handle = daemon.handle();
-
- // Start components (this initializes the history component with the handle)
- daemon.start_components().await.unwrap();
-
- // Start the gRPC server
- let uds = UnixListener::bind(&socket_path).unwrap();
- let stream = UnixListenerStream::new(uds);
-
- let server_handle = handle.clone();
- tokio::spawn(async move {
- let mut rx = server_handle.subscribe();
- Server::builder()
- .add_service(history_service)
- .serve_with_incoming_shutdown(stream, async move {
- loop {
- match rx.recv().await {
- Ok(atuin_daemon::DaemonEvent::ShutdownRequested) => break,
- Ok(_) => continue,
- Err(_) => break,
- }
- }
- })
- .await
- .unwrap();
- });
-
- // Spawn the daemon event loop in the background
- tokio::spawn(async move {
- daemon.run_event_loop().await.unwrap();
- });
-
- // Give the server a moment to bind.
- tokio::time::sleep(Duration::from_millis(50)).await;
-
- let client = HistoryClient::new(socket_path.to_string_lossy().to_string())
- .await
- .unwrap();
-
- (client, handle, tmp)
- }
-
- #[tokio::test]
- async fn test_status() {
- let (mut client, _handle, _tmp) = start_test_daemon().await;
-
- let status = client.status().await.unwrap();
- assert!(status.healthy);
- assert_eq!(status.version, env!("CARGO_PKG_VERSION"));
- assert_eq!(status.protocol, 1);
- assert!(status.pid > 0);
- }
-
- #[tokio::test]
- async fn test_start_end_history() {
- use atuin_client::history::History;
-
- let (mut client, _handle, _tmp) = start_test_daemon().await;
-
- let history = History::daemon()
- .timestamp(time::OffsetDateTime::now_utc())
- .command("echo hello".to_string())
- .cwd("/tmp".to_string())
- .session("test-session".to_string())
- .hostname("test-host".to_string())
- .build()
- .into();
-
- let start_reply = client.start_history(history).await.unwrap();
- assert!(!start_reply.id.is_empty());
-
- let end_reply = client
- .end_history(start_reply.id, 1_000_000, 0)
- .await
- .unwrap();
- assert!(!end_reply.id.is_empty());
- }
-
- #[tokio::test]
- async fn test_tail_history_streams_started_and_ended_events() {
- use atuin_client::history::History;
- use atuin_daemon::history::HistoryEventKind;
-
- let (mut client, _handle, _tmp) = start_test_daemon().await;
- let mut stream = client.tail_history().await.unwrap();
-
- let history = History::daemon()
- .timestamp(time::OffsetDateTime::now_utc())
- .command("git status".to_string())
- .cwd("/tmp/repo".to_string())
- .session("tail-session".to_string())
- .hostname("test-host:ellie".to_string())
- .author("claude".to_string())
- .intent("inspect repository state".to_string())
- .build()
- .into();
-
- let start_reply = client.start_history(history).await.unwrap();
-
- let started = stream.message().await.unwrap().unwrap();
- assert_eq!(
- HistoryEventKind::try_from(started.kind).unwrap(),
- HistoryEventKind::Started
- );
- let started_history = started.history.unwrap();
- assert_eq!(started_history.id, start_reply.id);
- assert_eq!(started_history.command, "git status");
- assert_eq!(started_history.cwd, "/tmp/repo");
- assert_eq!(started_history.hostname, "test-host:ellie");
- assert_eq!(started_history.author, "claude");
- assert_eq!(started_history.intent, "inspect repository state");
-
- client
- .end_history(start_reply.id.clone(), 1_000_000, 0)
- .await
- .unwrap();
-
- let ended = stream.message().await.unwrap().unwrap();
- assert_eq!(
- HistoryEventKind::try_from(ended.kind).unwrap(),
- HistoryEventKind::Ended
- );
- let ended_history = ended.history.unwrap();
- assert_eq!(ended_history.id, start_reply.id);
- assert_eq!(ended_history.exit, 0);
- assert_eq!(ended_history.duration, 1_000_000);
- }
-
- #[tokio::test]
- async fn test_end_unknown_history_fails() {
- let (mut client, _handle, _tmp) = start_test_daemon().await;
-
- let result = client
- .end_history("nonexistent-id".to_string(), 1000, 0)
- .await;
- assert!(result.is_err());
- }
-
- #[tokio::test]
- async fn test_shutdown() {
- let (mut client, _handle, _tmp) = start_test_daemon().await;
-
- let accepted = client.shutdown().await.unwrap();
- assert!(accepted);
-
- // Give server time to shut down.
- tokio::time::sleep(Duration::from_millis(100)).await;
-
- // Subsequent calls should fail since the server is gone.
- let result = client.status().await;
- assert!(result.is_err());
- }
-}