aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon/src
diff options
context:
space:
mode:
authorMichelle Tilley <michelle@michelletilley.net>2026-02-26 14:42:08 -0800
committerGitHub <noreply@github.com>2026-02-26 14:42:08 -0800
commit3ba47446f06d5b0fbeaeb59d4ffed768b70729d8 (patch)
tree28348bb3d18e9983e9212c26840f691766cad985 /crates/atuin-daemon/src
parentfeat: Add history author/intent metadata and v1 record version (#3205) (diff)
downloadatuin-3ba47446f06d5b0fbeaeb59d4ffed768b70729d8.zip
feat: In-memory search index with atuin daemon (#3201)
## Summary This PR adds a persistent, in-memory search index to the Atuin daemon, enabling fast fuzzy search without the startup delay of building an index each time the TUI opens. ### Key Changes - **Daemon search service**: A new gRPC service that maintains a Nucleo fuzzy search index in memory - **Real-time index updates**: The daemon listens for history events (new commands, synced records) and updates the index immediately - **Filter mode support**: All existing filter modes work (Global, Host, Session, Directory, Workspace) - **New search engine**: `daemon-fuzzy` search mode that queries the daemon instead of building a local index - **Paged history loading**: Database pagination support for efficient initial index loading - **Configurable logging**: New `[logs]` settings section for daemon and search log configuration - **Component-based daemon architecture**: Refactored daemon internals into a modular, event-driven system - **Fallback to DB search for regex**: Since Nucleo doesn't support regex matching ## Daemon Architecture The daemon has been refactored to use a component-based, event-driven architecture that makes it easier to add new functionality and reason about the system. ### Core Concepts ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Atuin Daemon │ │ │ │ ┌─────────────┐ ┌──────────────────────────────────────────────────┐ │ │ │ Daemon │ │ Components │ │ │ │ Handle │────▶│ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │ │ │ │ • emit() │ │ │ History │ │ Search │ │ Sync │ │ │ │ │ • subscribe │ │ │ Component │ │ Component │ │ Component │ │ │ │ │ • settings │ │ │ │ │ │ │ │ │ │ │ │ • databases │ │ │ gRPC service│ │ gRPC service│ │ background │ │ │ │ └─────────────┘ │ │ WIP history │ │ Nucleo index│ │ sync │ │ │ │ │ │ └─────────────┘ └─────────────┘ └────────────┘ │ │ │ │ └──────────────────────────────────────────────────┘ │ │ │ ▲ │ │ ▼ │ │ │ ┌─────────────────────────────────────┴────────────────────────────────┐ │ │ │ Event Bus (broadcast) │ │ │ │ │ │ │ │ HistoryStarted │ HistoryEnded │ RecordsAdded │ SyncCompleted │ ... │ │ │ └──────────────────────────────────────────────────────────────────────┘ │ │ ▲ │ │ ┌───────────────────────────────────┴──────────────────────────────────┐ │ │ │ Control Service (gRPC) │ │ │ │ External event injection from CLI commands │ │ │ └──────────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` ### DaemonHandle A lightweight, cloneable handle that provides access to shared daemon resources: - **Event emission**: `handle.emit(DaemonEvent::...)` broadcasts to all components - **Event subscription**: `handle.subscribe()` returns a receiver for the event bus - **Settings**: `handle.settings()` for configuration access - **Databases**: `handle.history_db()` and `handle.store()` for data access ### Component Trait Components implement a simple lifecycle: ```rust #[async_trait] trait Component: Send + Sync { fn name(&self) -> &'static str; async fn start(&mut self, handle: DaemonHandle) -> Result<()>; async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>; async fn stop(&mut self) -> Result<()>; } ``` ### Event-Driven Design Components communicate via events rather than direct coupling: | Event | Emitted By | Consumed By | |-------|-----------|-------------| | `HistoryStarted` | History gRPC | Search (logging) | | `HistoryEnded` | History gRPC | Search (index update) | | `RecordsAdded` | Sync | Search (index update) | | `HistoryPruned` | CLI (via Control) | Search (index rebuild) | | `HistoryDeleted` | CLI (via Control) | Search (index rebuild) | | `ForceSync` | CLI (via Control) | Sync | | `ShutdownRequested` | Signal handler | All (graceful shutdown) | ### External Event Injection CLI commands can inject events into a running daemon: ```rust // After `atuin history prune` emit_event(DaemonEvent::HistoryPruned).await?; // After deleting specific items emit_event(DaemonEvent::HistoryDeleted { ids }).await?; // Request immediate sync emit_event(DaemonEvent::ForceSync).await?; ``` This ensures the daemon's search index stays in sync with database changes made by CLI commands. ## Search Architecture The search service uses a [forked version of Nucleo](https://github.com/atuinsh/nucleo-ext) that adds filter and scorer callbacks, enabling efficient filtering and frecency-based ranking. ``` ┌────────────────────────────────────────────────────────────────┐ │ Atuin Daemon │ │ │ │ ┌─────────────────┐ ┌──────────────────────────────────┐ │ │ │ Event System │───▶│ Search Component │ │ │ │ │ │ │ │ │ │ • RecordsAdded │ │ ┌────────────────────────────┐ │ │ │ │ • HistoryEnded │ │ │ Deduplicated Index │ │ │ │ │ • HistoryPruned │ │ │ │ │ │ │ └─────────────────┘ │ │ CommandData per command: │ │ │ │ │ │ • Global frecency │ │ │ │ ┌─────────────────┐ │ │ • Filter indexes (sets) │ │ │ │ │ Background Task │ │ │ • Invocation history │ │ │ │ │ │ │ └────────────────────────────┘ │ │ │ │ Rebuilds │ │ │ │ │ │ │ frecency map │ │ ▼ │ │ │ │ every 60s │───▶│ ┌────────────────────────────┐ │ │ │ └─────────────────┘ │ │ Nucleo (forked) │ │ │ │ │ │ │ │ │ │ │ │ • Filter callback │ │ │ │ │ │ • Scorer callback │ │ │ │ │ │ • Fuzzy matching │ │ │ │ │ └────────────────────────────┘ │ │ │ └──────────────────────────────────┘ │ │ │ │ │ │ gRPC (Unix socket) │ └──────────────────────────────────────│─────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Search TUI (Client) │ │ │ │ 1. Send query + filter mode + context to daemon │ │ 2. Receive matching history IDs (ranked by frecency) │ │ 3. Hydrate full records from local SQLite database │ │ 4. Display results in TUI │ └─────────────────────────────────────────────────────────────────┘ ``` ### Nucleo Fork The [nucleo-ext fork](https://github.com/atuinsh/nucleo-ext) adds two key features to Nucleo: 1. **Filter callback**: Pre-filter items before fuzzy matching (used for directory/host/session filtering) 2. **Scorer callback**: Compute custom scores after matching (used for frecency ranking) ```rust // Filter: only include commands run in current directory nucleo.set_filter(Some(Arc::new(|cmd: &String| { passing_commands.contains(cmd) }))); // Scorer: combine fuzzy score with frecency nucleo.set_scorer(Some(Arc::new(|cmd: &String, fuzzy_score: u32| { let frecency = frecency_map.get(cmd).unwrap_or(0); fuzzy_score + (frecency * 10) }))); ``` ### Deduplicated Index Commands are stored once per unique command text, with metadata tracking all invocations: ```rust struct CommandData { command: String, invocations: Vec<Invocation>, // All times this command was run global_frecency: FrecencyData, // Precomputed frecency score // O(1) filter indexes directories: HashSet<String>, // All cwds where command was run hosts: HashSet<String>, // All hostnames sessions: HashSet<String>, // All session IDs } ``` This deduplication means: - **Fewer items to match**: ~13K unique commands vs ~62K history entries - **O(1) filter checks**: HashSet lookups instead of scanning invocations - **Single frecency score**: Global frecency computed once, used for all filter modes ### Frecency Scoring Frecency (frequency + recency) scoring prioritizes recently and frequently used commands: ```rust fn compute_frecency(count: u32, last_used: i64, now: i64) -> u32 { let age_hours = (now - last_used) / 3600; // Recency: decays over time (half-life ~24 hours) let recency = (100.0 * (-age_hours as f64 / 24.0).exp()) as u32; // Frequency: logarithmic scaling let frequency = (count.ln() * 20.0).min(100.0) as u32; recency + frequency } ``` The frecency map is: - **Precomputed by background task** every 60 seconds - **Never computed inline** during search (no latency impact) - **Graceful fallback**: If unavailable, search works without frecency ranking ### Filter Mode Implementation | Filter Mode | Implementation | |-------------|----------------| | Global | No filter (all commands) | | Directory | `command.directories.contains(cwd)` | | Workspace | `command.directories.any(\|d\| d.starts_with(git_root))` | | Host | `command.hosts.contains(hostname)` | | Session | `command.sessions.contains(session_id)` | Filters are pre-computed into a HashSet before the search, making the filter callback O(1). ### Search Flow 1. **Daemon startup**: Loads history from SQLite in pages, builds deduplicated index 2. **Frecency precompute**: Background task builds frecency map after history loads 3. **Search request**: Client sends query with filter mode and context 4. **Filter**: Pre-computed HashSet determines which commands pass the filter 5. **Match**: Nucleo fuzzy matches the query against command text 6. **Score**: Frecency scorer ranks results (fuzzy score + frecency * 10) 7. **Response**: Returns history IDs for the most recent invocation of each matching command 8. **Hydration**: Client fetches full records from local SQLite ### Configuration ```toml # Enable daemon + autostart [daemon] enabled = true autostart = true # Enable daemon-based fuzzy search [search] search_mode = "daemon-fuzzy" ``` ## Performance Performance varies based on several factors, but in most initial testing with the new architecture shows improvement: * **Nucleo performs searches up to 4.5x faster**: direct DB search averages 18.07ms, but the daemon completes the same queries in 3.99ms. * **IPC overhead is significant, but acceptable**: a significant amount of wall-time is taken up by the transfer of data over IPC (via UDS in this case). This averages to about ~7.8ms and accounts for 66% of client-side wall time. * **Tail latency improves at every layer**: p99 times correspond to initial requests, worst-case query patterns, etc. but the average p99 daemon-based response time is 3.6x better than the associated DB-based search p99 time * **Query complexity no longer impacts performance**: the Nucleo-based search shows consistent 2-7ms times regardless of query pattern. The DB-based search had a 17x variance (3.59ms to 62.46ms). Interestingly, @ellie - who has a larger history store than I do - gets even better performance on the IPC layer. This could use a lot more testing in various edge cases and on various hardware, but seems promising. ### Regular DB search ``` Individual calls for: db_search -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 32.25ms 32.20ms 47.70µs {"mode":"Fuzzy","query":"^"} 2 19.48ms 19.40ms 84.20µs {"mode":"Fuzzy","query":"^c"} 3 20.40ms 20.10ms 297.00µs {"mode":"Fuzzy","query":"^ca"} 4 13.07ms 13.00ms 69.90µs {"mode":"Fuzzy","query":"^car"} 5 12.17ms 12.10ms 67.10µs {"mode":"Fuzzy","query":"^carg"} 6 20.78ms 20.70ms 76.60µs {"mode":"Fuzzy","query":"^cargo"} 7 9.15ms 9.10ms 53.20µs {"mode":"Fuzzy","query":"^cargo "} 8 10.24ms 10.00ms 237.00µs {"mode":"Fuzzy","query":"^cargo b"} 9 10.01ms 9.68ms 325.00µs {"mode":"Fuzzy","query":"^cargo bu"} 10 5.89ms 5.83ms 57.20µs {"mode":"Fuzzy","query":"^cargo bui"} 11 8.85ms 8.28ms 568.00µs {"mode":"Fuzzy","query":"^cargo buil"} 12 7.70ms 7.49ms 212.00µs {"mode":"Fuzzy","query":"^cargo build"} 13 3.59ms 3.53ms 57.00µs {"mode":"Fuzzy","query":"^cargo build$"} 14 6.50ms 6.44ms 63.60µs {"mode":"Fuzzy","query":"^cargo "} 15 6.48ms 6.38ms 100.00µs {"mode":"Fuzzy","query":"!"} 16 31.68ms 31.60ms 75.90µs {"mode":"Fuzzy","query":"!g"} 17 62.46ms 62.40ms 58.90µs {"mode":"Fuzzy","query":"!gi"} 18 30.35ms 30.30ms 46.90µs {"mode":"Fuzzy","query":"!git"} 19 53.84ms 53.80ms 40.80µs {"mode":"Fuzzy","query":"!git "} 20 19.24ms 19.20ms 39.70µs {"mode":"Fuzzy","query":"!git c"} 21 22.03ms 22.00ms 34.70µs {"mode":"Fuzzy","query":"!git co"} 22 17.13ms 17.00ms 133.00µs {"mode":"Fuzzy","query":"!git com"} 23 16.14ms 15.90ms 242.00µs {"mode":"Fuzzy","query":"!git comm"} 24 5.11ms 5.08ms 28.60µs {"mode":"Fuzzy","query":"!git commi"} 25 7.31ms 7.26ms 52.70µs {"mode":"Fuzzy","query":"!git commit"} Summary: 25 calls Wall: avg=18.07ms, min=3.59ms, max=62.46ms, p50=13.07ms, p99=62.46ms Busy: avg=17.95ms, min=3.53ms, max=62.40ms, p50=13.00ms, p99=62.40ms ``` ### Daemon-based search **Client** ``` Individual calls for: daemon_search -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 13.05ms 2.55ms 10.50ms {"query":"^"} 2 10.65ms 1.40ms 9.25ms {"query":"^c"} 3 10.72ms 1.18ms 9.54ms {"query":"^ca"} 4 5.54ms 485.00µs 5.06ms {"query":"^car"} 5 15.02ms 1.02ms 14.00ms {"query":"^carg"} 6 9.49ms 840.00µs 8.65ms {"query":"^cargo"} 7 5.53ms 555.00µs 4.97ms {"query":"^cargo "} 8 8.56ms 717.00µs 7.84ms {"query":"^cargo b"} 9 12.34ms 1.24ms 11.10ms {"query":"^cargo bu"} 10 8.38ms 650.00µs 7.73ms {"query":"^cargo bui"} 11 13.07ms 770.00µs 12.30ms {"query":"^cargo buil"} 12 17.11ms 709.00µs 16.40ms {"query":"^cargo build"} 13 15.41ms 907.00µs 14.50ms {"query":"^cargo build$"} 14 8.19ms 665.00µs 7.52ms {"query":"^cargo "} 15 7.98ms 1.72ms 6.26ms {"query":"!"} 16 13.56ms 856.00µs 12.70ms {"query":"!g"} 17 8.11ms 624.00µs 7.49ms {"query":"!gi"} 18 14.57ms 775.00µs 13.80ms {"query":"!git"} 19 14.18ms 779.00µs 13.40ms {"query":"!git "} 20 9.62ms 802.00µs 8.82ms {"query":"!git c"} 21 15.50ms 1.50ms 14.00ms {"query":"!git co"} 22 11.58ms 1.48ms 10.10ms {"query":"!git com"} 23 13.82ms 2.12ms 11.70ms {"query":"!git comm"} 24 17.48ms 2.18ms 15.30ms {"query":"!git commi"} 25 14.81ms 1.71ms 13.10ms {"query":"!git commit"} Summary: 25 calls Wall: avg=11.77ms, min=5.53ms, max=17.48ms, p50=12.34ms, p99=17.48ms Busy: avg=1.13ms, min=485.00µs, max=2.55ms, p50=856.00µs, p99=2.55ms ``` **Daemon** ``` Individual calls for: daemon_search_query -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 1.75ms 250ns 1.75ms {"query":"^","query_id":1} 2 4.58ms 125ns 4.58ms {"query":"^c","query_id":2} 3 4.39ms 250ns 4.39ms {"query":"^ca","query_id":3} 4 2.52ms 125ns 2.52ms {"query":"^car","query_id":4} 5 4.44ms 250ns 4.44ms {"query":"^carg","query_id":5} 6 3.66ms 167ns 3.66ms {"query":"^cargo","query_id":6} 7 2.38ms 84ns 2.38ms {"query":"^cargo ","query_id":7} 8 4.13ms 84ns 4.13ms {"query":"^cargo b","query_id":8} 9 4.40ms 167ns 4.40ms {"query":"^cargo bu","query_id":9} 10 3.87ms 125ns 3.87ms {"query":"^cargo bui","query_id":10} 11 4.36ms 84ns 4.36ms {"query":"^cargo buil","query_id":11} 12 3.96ms 333ns 3.96ms {"query":"^cargo build","query_id":12} 13 4.61ms 167ns 4.61ms {"query":"^cargo build$","query_id":13} 14 4.20ms 209ns 4.20ms {"query":"^cargo ","query_id":14} 15 238.17µs 167ns 238.00µs {"query":"!","query_id":15} 16 4.44ms 125ns 4.44ms {"query":"!g","query_id":16} 17 3.47ms 83ns 3.47ms {"query":"!gi","query_id":17} 18 4.57ms 125ns 4.57ms {"query":"!git","query_id":18} 19 7.15ms 167ns 7.15ms {"query":"!git ","query_id":19} 20 4.27ms 250ns 4.27ms {"query":"!git c","query_id":20} 21 5.19ms 292ns 5.19ms {"query":"!git co","query_id":21} 22 4.29ms 417ns 4.29ms {"query":"!git com","query_id":22} 23 4.08ms 125ns 4.08ms {"query":"!git comm","query_id":23} 24 4.50ms 167ns 4.50ms {"query":"!git commi","query_id":24} 25 4.35ms 208ns 4.35ms {"query":"!git commit","query_id":25} Summary: 25 calls Wall: avg=3.99ms, min=238.17µs, max=7.15ms, p50=4.29ms, p99=7.15ms Busy: avg=182ns, min=83ns, max=417ns, p50=167ns, p99=417ns ``` **Nucleo matching time (in daemon)** ``` Individual calls for: nucleo_match -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 1.73ms 125ns 1.73ms {"query":"^","query_id":1} 2 4.57ms 167ns 4.57ms {"query":"^c","query_id":2} 3 4.37ms 125ns 4.37ms {"query":"^ca","query_id":3} 4 2.51ms 84ns 2.51ms {"query":"^car","query_id":4} 5 4.43ms 125ns 4.43ms {"query":"^carg","query_id":5} 6 3.64ms 125ns 3.64ms {"query":"^cargo","query_id":6} 7 2.37ms 84ns 2.37ms {"query":"^cargo ","query_id":7} 8 4.11ms 125ns 4.11ms {"query":"^cargo b","query_id":8} 9 4.36ms 208ns 4.36ms {"query":"^cargo bu","query_id":9} 10 3.85ms 125ns 3.85ms {"query":"^cargo bui","query_id":10} 11 4.35ms 125ns 4.35ms {"query":"^cargo buil","query_id":11} 12 3.94ms 250ns 3.94ms {"query":"^cargo build","query_id":12} 13 4.59ms 125ns 4.59ms {"query":"^cargo build$","query_id":13} 14 4.18ms 84ns 4.18ms {"query":"^cargo ","query_id":14} 15 220.13µs 125ns 220.00µs {"query":"!","query_id":15} 16 4.43ms 125ns 4.43ms {"query":"!g","query_id":16} 17 3.45ms 125ns 3.45ms {"query":"!gi","query_id":17} 18 4.55ms 125ns 4.55ms {"query":"!git","query_id":18} 19 7.12ms 209ns 7.12ms {"query":"!git ","query_id":19} 20 4.25ms 166ns 4.25ms {"query":"!git c","query_id":20} 21 5.18ms 125ns 5.18ms {"query":"!git co","query_id":21} 22 4.27ms 125ns 4.27ms {"query":"!git com","query_id":22} 23 4.06ms 292ns 4.06ms {"query":"!git comm","query_id":23} 24 4.46ms 166ns 4.46ms {"query":"!git commi","query_id":24} 25 4.31ms 208ns 4.31ms {"query":"!git commit","query_id":25} Summary: 25 calls Wall: avg=3.97ms, min=220.13µs, max=7.12ms, p50=4.27ms, p99=7.12ms Busy: avg=147ns, min=84ns, max=292ns, p50=125ns, p99=292ns ```
Diffstat (limited to 'crates/atuin-daemon/src')
-rw-r--r--crates/atuin-daemon/src/client.rs289
-rw-r--r--crates/atuin-daemon/src/components/history.rs252
-rw-r--r--crates/atuin-daemon/src/components/mod.rs22
-rw-r--r--crates/atuin-daemon/src/components/search.rs394
-rw-r--r--crates/atuin-daemon/src/components/sync.rs257
-rw-r--r--crates/atuin-daemon/src/control/mod.rs12
-rw-r--r--crates/atuin-daemon/src/control/service.rs71
-rw-r--r--crates/atuin-daemon/src/daemon.rs450
-rw-r--r--crates/atuin-daemon/src/events.rs74
-rw-r--r--crates/atuin-daemon/src/history.rs1
-rw-r--r--crates/atuin-daemon/src/history/mod.rs6
-rw-r--r--crates/atuin-daemon/src/lib.rs107
-rw-r--r--crates/atuin-daemon/src/search/index.rs572
-rw-r--r--crates/atuin-daemon/src/search/mod.rs11
-rw-r--r--crates/atuin-daemon/src/server.rs360
-rw-r--r--crates/atuin-daemon/src/server/sync.rs96
16 files changed, 2610 insertions, 364 deletions
diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs
index 3b76a680..2f492f6b 100644
--- a/crates/atuin-daemon/src/client.rs
+++ b/crates/atuin-daemon/src/client.rs
@@ -1,4 +1,6 @@
-use eyre::{Context, Result};
+use atuin_client::database::Context;
+use atuin_client::settings::{FilterMode, Settings};
+use eyre::{Context as EyreContext, Result};
#[cfg(windows)]
use tokio::net::TcpStream;
use tonic::Code;
@@ -11,11 +13,22 @@ use hyper_util::rt::TokioIo;
use tokio::net::UnixStream;
use atuin_client::history::History;
+use tracing::{Level, instrument, span};
+use crate::control::HistoryRebuiltEvent;
+use crate::control::{
+ ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest,
+ SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient,
+};
+use crate::events::DaemonEvent;
use crate::history::{
EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest,
StatusReply, StatusRequest, history_client::HistoryClient as HistoryServiceClient,
};
+use crate::search::{
+ FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse,
+ search_client::SearchClient as SearchServiceClient,
+};
pub struct HistoryClient {
client: HistoryServiceClient<Channel>,
@@ -52,6 +65,8 @@ pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind {
impl HistoryClient {
#[cfg(unix)]
pub async fn new(path: String) -> Result<Self> {
+ use eyre::Context;
+
let log_path = path.clone();
let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
.connect_with_connector(service_fn(move |_: Uri| {
@@ -130,3 +145,275 @@ impl HistoryClient {
Ok(resp.accepted)
}
}
+
+pub struct SearchClient {
+ client: SearchServiceClient<Channel>,
+}
+
+impl SearchClient {
+ #[cfg(unix)]
+ pub async fn new(path: String) -> Result<Self> {
+ let log_path = path.clone();
+ let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+ .connect_with_connector(service_fn(move |_: Uri| {
+ let path = path.clone();
+
+ async move {
+ Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
+ }
+ }))
+ .await
+ .wrap_err_with(|| {
+ format!(
+ "failed to connect to local atuin daemon at {}. Is it running?",
+ &log_path
+ )
+ })?;
+
+ let client = SearchServiceClient::new(channel);
+
+ Ok(SearchClient { client })
+ }
+
+ #[cfg(not(unix))]
+ pub async fn new(port: u64) -> Result<Self> {
+ let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+ .connect_with_connector(service_fn(move |_: Uri| {
+ let url = format!("127.0.0.1:{port}");
+
+ async move {
+ Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
+ }
+ }))
+ .await
+ .wrap_err_with(|| {
+ format!(
+ "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
+ )
+ })?;
+
+ let client = SearchServiceClient::new(channel);
+
+ Ok(SearchClient { client })
+ }
+
+ #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))]
+ pub async fn search(
+ &mut self,
+ query: String,
+ query_id: u64,
+ filter_mode: FilterMode,
+ context: Option<Context>,
+ ) -> Result<tonic::Streaming<SearchResponse>> {
+ let request = SearchRequest {
+ query,
+ query_id,
+ filter_mode: RpcFilterMode::from(filter_mode).into(),
+ context: context.map(RpcSearchContext::from),
+ };
+ let request_stream = tokio_stream::once(request);
+ let response = span!(Level::TRACE, "daemon_client_search.request")
+ .in_scope(async || self.client.search(request_stream).await)
+ .await?;
+
+ Ok(response.into_inner())
+ }
+}
+
+impl From<FilterMode> for RpcFilterMode {
+ fn from(filter_mode: FilterMode) -> Self {
+ match filter_mode {
+ FilterMode::Global => RpcFilterMode::Global,
+ FilterMode::Host => RpcFilterMode::Host,
+ FilterMode::Session => RpcFilterMode::Session,
+ FilterMode::Directory => RpcFilterMode::Directory,
+ FilterMode::Workspace => RpcFilterMode::Workspace,
+ FilterMode::SessionPreload => RpcFilterMode::SessionPreload,
+ }
+ }
+}
+
+impl From<Context> for RpcSearchContext {
+ fn from(context: Context) -> Self {
+ RpcSearchContext {
+ session_id: context.session,
+ cwd: context.cwd,
+ hostname: context.hostname,
+ host_id: context.host_id,
+ git_root: context
+ .git_root
+ .map(|path| path.to_string_lossy().to_string()),
+ }
+ }
+}
+
+// ============================================================================
+// Control Client
+// ============================================================================
+
+/// Client for the Control gRPC service.
+///
+/// Used to inject events into a running daemon from external processes.
+pub struct ControlClient {
+ client: ControlServiceClient<Channel>,
+}
+
+impl ControlClient {
+ /// Connect to the daemon's control service.
+ #[cfg(unix)]
+ pub async fn new(path: String) -> Result<Self> {
+ let log_path = path.clone();
+ let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+ .connect_with_connector(service_fn(move |_: Uri| {
+ let path = path.clone();
+
+ async move {
+ Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
+ }
+ }))
+ .await
+ .wrap_err_with(|| {
+ format!(
+ "failed to connect to local atuin daemon at {}. Is it running?",
+ &log_path
+ )
+ })?;
+
+ let client = ControlServiceClient::new(channel);
+
+ Ok(ControlClient { client })
+ }
+
+ /// Connect to the daemon's control service.
+ #[cfg(not(unix))]
+ pub async fn new(port: u64) -> Result<Self> {
+ let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+ .connect_with_connector(service_fn(move |_: Uri| {
+ let url = format!("127.0.0.1:{port}");
+
+ async move {
+ Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
+ }
+ }))
+ .await
+ .wrap_err_with(|| {
+ format!(
+ "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
+ )
+ })?;
+
+ let client = ControlServiceClient::new(channel);
+
+ Ok(ControlClient { client })
+ }
+
+ /// Connect using settings.
+ #[cfg(unix)]
+ pub async fn from_settings(settings: &Settings) -> Result<Self> {
+ Self::new(settings.daemon.socket_path.clone()).await
+ }
+
+ /// Connect using settings.
+ #[cfg(not(unix))]
+ pub async fn from_settings(settings: &Settings) -> Result<Self> {
+ Self::new(settings.daemon.tcp_port).await
+ }
+
+ /// Send an event to the daemon.
+ pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> {
+ let proto_event = daemon_event_to_proto(event);
+ let request = SendEventRequest {
+ event: Some(proto_event),
+ };
+ self.client.send_event(request).await?;
+ Ok(())
+ }
+}
+
+/// Convert a daemon event to its proto representation.
+fn daemon_event_to_proto(event: DaemonEvent) -> crate::control::send_event_request::Event {
+ use crate::control::send_event_request::Event;
+
+ match event {
+ DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}),
+ DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}),
+ DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent {
+ ids: ids.into_iter().map(|id| id.0).collect(),
+ }),
+ DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}),
+ DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}),
+ DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}),
+ // These events are internal and not sent via the control service
+ DaemonEvent::HistoryStarted(_)
+ | DaemonEvent::HistoryEnded(_)
+ | DaemonEvent::RecordsAdded(_)
+ | DaemonEvent::SyncCompleted { .. }
+ | DaemonEvent::SyncFailed { .. } => {
+ // Use shutdown as a fallback, though this shouldn't happen
+ tracing::warn!("attempted to send internal event via control service");
+ Event::Shutdown(ShutdownEvent {})
+ }
+ }
+}
+
+// ============================================================================
+// Convenience Functions
+// ============================================================================
+
+/// Emit an event to the daemon.
+///
+/// This is a fire-and-forget helper for sending events to the daemon from
+/// external processes like CLI commands. If the daemon isn't running, this
+/// will silently succeed (returns Ok).
+///
+/// # Example
+///
+/// ```ignore
+/// // After pruning history
+/// emit_event(DaemonEvent::HistoryPruned).await?;
+///
+/// // After deleting specific history items
+/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] }).await?;
+///
+/// // Request immediate sync
+/// emit_event(DaemonEvent::ForceSync).await?;
+/// ```
+pub async fn emit_event(event: DaemonEvent) -> Result<()> {
+ emit_event_with_settings(event, None).await
+}
+
+/// Emit an event to the daemon with explicit settings.
+///
+/// If settings are not provided, they will be loaded from the default location.
+/// If the daemon isn't running, this will silently succeed.
+pub async fn emit_event_with_settings(
+ event: DaemonEvent,
+ settings: Option<&Settings>,
+) -> Result<()> {
+ // Load settings if not provided
+ let owned_settings;
+ let settings = match settings {
+ Some(s) => s,
+ None => {
+ owned_settings = Settings::new()?;
+ &owned_settings
+ }
+ };
+
+ // Try to connect - if daemon isn't running, that's fine
+ let mut client = match ControlClient::from_settings(settings).await {
+ Ok(c) => c,
+ Err(e) => {
+ tracing::debug!(?e, "daemon not running, skipping event emission");
+ return Ok(());
+ }
+ };
+
+ // Send the event
+ if let Err(e) = client.send_event(event).await {
+ tracing::debug!(?e, "failed to send event to daemon");
+ // Don't fail - this is fire-and-forget
+ }
+
+ Ok(())
+}
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs
new file mode 100644
index 00000000..23d48c5e
--- /dev/null
+++ b/crates/atuin-daemon/src/components/history.rs
@@ -0,0 +1,252 @@
+//! History component.
+//!
+//! Handles command history lifecycle (start/end) and provides the History gRPC service.
+
+use std::sync::Arc;
+
+use atuin_client::{
+ database::Database,
+ history::{History, HistoryId, store::HistoryStore},
+ settings::Settings,
+};
+use dashmap::DashMap;
+use eyre::Result;
+use time::OffsetDateTime;
+use tonic::{Request, Response, Status};
+use tracing::{Level, instrument};
+
+use crate::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ history::{
+ EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply,
+ StartHistoryRequest, StatusReply, StatusRequest,
+ history_server::{History as HistorySvc, HistoryServer},
+ },
+};
+
+const DAEMON_PROTOCOL_VERSION: u32 = 1;
+
+/// History component - manages command history lifecycle.
+///
+/// This component:
+/// - Tracks currently running commands (stored in memory)
+/// - Saves completed commands to the database and record store
+/// - Emits history events for other components (e.g., search indexing)
+/// - Provides the History gRPC service
+pub struct HistoryComponent {
+ inner: Arc<HistoryComponentInner>,
+}
+
+struct HistoryComponentInner {
+ /// Commands currently running (not yet completed).
+ running: DashMap<HistoryId, History>,
+
+ /// Handle to the daemon (set during start).
+ handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+
+ /// History store for pushing records (set during start).
+ history_store: tokio::sync::RwLock<Option<HistoryStore>>,
+}
+
+impl HistoryComponent {
+ /// Create a new history component.
+ pub fn new() -> Self {
+ Self {
+ inner: Arc::new(HistoryComponentInner {
+ running: DashMap::new(),
+ handle: tokio::sync::RwLock::new(None),
+ history_store: tokio::sync::RwLock::new(None),
+ }),
+ }
+ }
+
+ /// Get the gRPC service for this component.
+ ///
+ /// This returns a tonic service that can be added to a gRPC server.
+ pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
+ HistoryServer::new(HistoryGrpcService {
+ inner: self.inner.clone(),
+ })
+ }
+}
+
+impl Default for HistoryComponent {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for HistoryComponent {
+ fn name(&self) -> &'static str {
+ "history"
+ }
+
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+ // Create the history store
+ let host_id = Settings::host_id().await?;
+ let history_store =
+ HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key());
+
+ *self.inner.history_store.write().await = Some(history_store);
+ *self.inner.handle.write().await = Some(handle);
+
+ tracing::info!("history component started");
+ Ok(())
+ }
+
+ async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
+ // History component produces events but doesn't need to react to them
+ Ok(())
+ }
+
+ async fn stop(&mut self) -> Result<()> {
+ tracing::info!("history component stopped");
+ Ok(())
+ }
+}
+
+/// The gRPC service implementation.
+///
+/// This is a thin wrapper that delegates to the component's shared state.
+pub struct HistoryGrpcService {
+ inner: Arc<HistoryComponentInner>,
+}
+
+#[tonic::async_trait]
+impl HistorySvc for HistoryGrpcService {
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn start_history(
+ &self,
+ request: Request<StartHistoryRequest>,
+ ) -> Result<Response<StartHistoryReply>, Status> {
+ let req = request.into_inner();
+
+ let timestamp =
+ OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
+ Status::invalid_argument(
+ "failed to parse timestamp as unix time (expected nanos since epoch)",
+ )
+ })?;
+
+ let h: History = History::daemon()
+ .timestamp(timestamp)
+ .command(req.command)
+ .cwd(req.cwd)
+ .session(req.session)
+ .hostname(req.hostname)
+ .build()
+ .into();
+
+ // Emit the event
+ if let Some(handle) = self.inner.handle.read().await.as_ref() {
+ handle.emit(DaemonEvent::HistoryStarted(h.clone()));
+ }
+
+ let id = h.id.clone();
+ tracing::info!(id = id.to_string(), "start history");
+ self.inner.running.insert(id.clone(), h);
+
+ let reply = StartHistoryReply {
+ id: id.to_string(),
+ version: env!("CARGO_PKG_VERSION").to_string(),
+ protocol: DAEMON_PROTOCOL_VERSION,
+ };
+
+ Ok(Response::new(reply))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn end_history(
+ &self,
+ request: Request<EndHistoryRequest>,
+ ) -> Result<Response<EndHistoryReply>, Status> {
+ let req = request.into_inner();
+ let id = HistoryId(req.id);
+
+ if let Some((_, mut history)) = self.inner.running.remove(&id) {
+ history.exit = req.exit;
+ history.duration = match req.duration {
+ 0 => i64::try_from(
+ (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
+ )
+ .expect("failed to convert calculated duration to i64"),
+ value => i64::try_from(value).expect("failed to get i64 duration"),
+ };
+
+ // Get the handle and store to save the history
+ let handle_guard = self.inner.handle.read().await;
+ let handle = handle_guard
+ .as_ref()
+ .ok_or_else(|| Status::internal("component not initialized"))?;
+
+ let store_guard = self.inner.history_store.read().await;
+ let history_store = store_guard
+ .as_ref()
+ .ok_or_else(|| Status::internal("component not initialized"))?;
+
+ // Save to database
+ handle
+ .history_db()
+ .save(&history)
+ .await
+ .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
+
+ tracing::info!(
+ id = id.0.to_string(),
+ duration = history.duration,
+ "end history"
+ );
+
+ // Push to record store
+ let (record_id, idx) = history_store
+ .push(history.clone())
+ .await
+ .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
+
+ // Emit the event
+ handle.emit(DaemonEvent::HistoryEnded(history));
+
+ let reply = EndHistoryReply {
+ id: record_id.0.to_string(),
+ idx,
+ version: env!("CARGO_PKG_VERSION").to_string(),
+ protocol: DAEMON_PROTOCOL_VERSION,
+ };
+
+ return Ok(Response::new(reply));
+ }
+
+ Err(Status::not_found(format!(
+ "could not find history with id: {id}"
+ )))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn status(
+ &self,
+ _request: Request<StatusRequest>,
+ ) -> Result<Response<StatusReply>, Status> {
+ let reply = StatusReply {
+ healthy: true,
+ version: env!("CARGO_PKG_VERSION").to_string(),
+ pid: std::process::id(),
+ protocol: DAEMON_PROTOCOL_VERSION,
+ };
+
+ Ok(Response::new(reply))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn shutdown(
+ &self,
+ _request: Request<ShutdownRequest>,
+ ) -> Result<Response<ShutdownReply>, Status> {
+ // Use the daemon handle to request shutdown
+ if let Some(handle) = self.inner.handle.read().await.as_ref() {
+ handle.shutdown();
+ }
+ Ok(Response::new(ShutdownReply { accepted: true }))
+ }
+}
diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs
new file mode 100644
index 00000000..5950d5d5
--- /dev/null
+++ b/crates/atuin-daemon/src/components/mod.rs
@@ -0,0 +1,22 @@
+//! Daemon components.
+//!
+//! Components are the building blocks of the daemon. Each component handles
+//! a specific domain and can:
+//!
+//! - Expose gRPC services
+//! - React to events
+//! - Spawn background tasks
+//!
+//! Available components:
+//!
+//! - [`history::HistoryComponent`]: Command history lifecycle management
+//! - [`search::SearchComponent`]: Fuzzy search over history
+//! - [`sync::SyncComponent`]: Cloud sync
+
+pub mod history;
+pub mod search;
+pub mod sync;
+
+pub use history::HistoryComponent;
+pub use search::SearchComponent;
+pub use sync::SyncComponent;
diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs
new file mode 100644
index 00000000..7fb59dea
--- /dev/null
+++ b/crates/atuin-daemon/src/components/search.rs
@@ -0,0 +1,394 @@
+//! Search component.
+//!
+//! Provides fuzzy search over command history using the Nucleo search library
+//! with frecency-based ranking and dynamic filtering.
+
+use std::{pin::Pin, sync::Arc};
+
+use atuin_client::database::Database;
+use eyre::Result;
+use tokio::sync::RwLock;
+use tokio_stream::Stream;
+use tonic::{Request, Response, Status, Streaming};
+use tracing::{Level, debug, info, instrument, span, trace};
+use uuid::Uuid;
+
+use crate::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ search::{
+ FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse,
+ search_server::{Search as SearchSvc, SearchServer},
+ },
+};
+
+const PAGE_SIZE: usize = 5000;
+const RESULTS_LIMIT: u32 = 200;
+/// How often to rebuild the frecency map (in seconds).
+const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60;
+
+/// Search component - provides fuzzy search over command history.
+///
+/// This component:
+/// - Maintains a deduplicated search index with frecency ranking
+/// - Loads history from the database on startup
+/// - Updates the index when history events occur
+/// - Provides the Search gRPC service
+pub struct SearchComponent {
+ index: Arc<RwLock<SearchIndex>>,
+ handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+ loader_handle: Option<tokio::task::JoinHandle<()>>,
+ frecency_handle: Option<tokio::task::JoinHandle<()>>,
+}
+
+impl SearchComponent {
+ /// Create a new search component.
+ pub fn new() -> Self {
+ Self {
+ index: Arc::new(RwLock::new(SearchIndex::new())),
+ handle: tokio::sync::RwLock::new(None),
+ loader_handle: None,
+ frecency_handle: None,
+ }
+ }
+
+ /// Get the gRPC service for this component.
+ pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> {
+ SearchServer::new(SearchGrpcService {
+ index: self.index.clone(),
+ })
+ }
+
+ /// Rebuild the entire search index from the database.
+ async fn rebuild_index(&self) -> Result<()> {
+ let handle_guard = self.handle.read().await;
+ let handle = handle_guard
+ .as_ref()
+ .ok_or_else(|| eyre::eyre!("component not initialized"))?;
+
+ info!("Rebuilding search index from database");
+
+ // Create a new index
+ let new_index = SearchIndex::new();
+
+ // Load all history into the new index
+ let db = handle.history_db().clone();
+ let mut pager = db.all_paged(PAGE_SIZE, false, true);
+ loop {
+ match pager.next().await {
+ Ok(Some(histories)) => {
+ info!(
+ "Loading {} history entries into search index",
+ histories.len()
+ );
+ new_index.add_histories(&histories);
+ }
+ Ok(None) => break,
+ Err(e) => {
+ tracing::error!("Failed to load history during rebuild: {}", e);
+ break;
+ }
+ }
+ }
+
+ info!(
+ "Search index rebuild complete; {} unique commands",
+ new_index.command_count()
+ );
+
+ // Replace the old index with the new one
+ *self.index.write().await = new_index;
+ Ok(())
+ }
+}
+
+impl Default for SearchComponent {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for SearchComponent {
+ fn name(&self) -> &'static str {
+ "search"
+ }
+
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+ *self.handle.write().await = Some(handle.clone());
+
+ // Spawn background task to load history into index
+ let index = self.index.clone();
+ let db = handle.history_db().clone();
+
+ self.loader_handle = Some(tokio::spawn(async move {
+ info!(
+ "Loading history into search index; page size = {}",
+ PAGE_SIZE
+ );
+ let mut pager = db.all_paged(PAGE_SIZE, false, true);
+ loop {
+ match pager.next().await {
+ Ok(Some(histories)) => {
+ info!(
+ "Loading {} history entries into search index",
+ histories.len()
+ );
+ index.read().await.add_histories(&histories);
+ }
+ Ok(None) => {
+ info!(
+ "Initial history load complete; {} unique commands indexed",
+ index.read().await.command_count()
+ );
+ // Build initial frecency map
+ index.read().await.rebuild_frecency().await;
+ info!("Initial frecency map built");
+ break;
+ }
+ Err(e) => {
+ tracing::error!("Failed to load history: {}", e);
+ break;
+ }
+ }
+ }
+ }));
+
+ // Spawn background task to periodically refresh frecency
+ let index_for_frecency = self.index.clone();
+ self.frecency_handle = Some(tokio::spawn(async move {
+ let mut interval = tokio::time::interval(std::time::Duration::from_secs(
+ FRECENCY_REFRESH_INTERVAL_SECS,
+ ));
+ loop {
+ interval.tick().await;
+ trace!("Refreshing frecency map");
+ index_for_frecency.read().await.rebuild_frecency().await;
+ }
+ }));
+
+ tracing::info!("search component started");
+ Ok(())
+ }
+
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+ match event {
+ DaemonEvent::RecordsAdded(records) => {
+ debug!(
+ count = records.len(),
+ "Processing added records for search index"
+ );
+
+ let handle_guard = self.handle.read().await;
+ if let Some(handle) = handle_guard.as_ref() {
+ let histories: Vec<_> = handle
+ .history_db()
+ .query_history(
+ format!(
+ "select * from history where id in ({})",
+ records
+ .iter()
+ .map(|record| record.0.to_string())
+ .collect::<Vec<_>>()
+ .join(",")
+ )
+ .as_str(),
+ )
+ .await
+ .unwrap_or_default();
+
+ span!(Level::TRACE, "inject_records", count = histories.len())
+ .in_scope(async || {
+ self.index.read().await.add_histories(&histories);
+ })
+ .await;
+ }
+ }
+ DaemonEvent::HistoryStarted(history) => {
+ debug!(id = %history.id, command = %history.command, "History started (no index action)");
+ }
+ DaemonEvent::HistoryEnded(history) => {
+ span!(Level::TRACE, "inject_history_ended")
+ .in_scope(async || {
+ self.index.read().await.add_history(history);
+ })
+ .await;
+ }
+ DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => {
+ info!("History store pruned or rebuilt, rebuilding search index");
+ if let Err(e) = self.rebuild_index().await {
+ tracing::error!("Failed to rebuild search index: {}", e);
+ }
+ }
+ DaemonEvent::HistoryDeleted { ids } => {
+ info!(
+ count = ids.len(),
+ "History deleted, rebuilding search index"
+ );
+ // For now, just rebuild the entire index. A more efficient implementation
+ // would remove specific items from the index.
+ if let Err(e) = self.rebuild_index().await {
+ tracing::error!("Failed to rebuild search index: {}", e);
+ }
+ }
+ // Events we don't care about
+ DaemonEvent::SyncCompleted { .. }
+ | DaemonEvent::SyncFailed { .. }
+ | DaemonEvent::ForceSync
+ | DaemonEvent::SettingsReloaded
+ | DaemonEvent::ShutdownRequested => {}
+ }
+ Ok(())
+ }
+
+ async fn stop(&mut self) -> Result<()> {
+ if let Some(handle) = self.loader_handle.take() {
+ handle.abort();
+ }
+ if let Some(handle) = self.frecency_handle.take() {
+ handle.abort();
+ }
+ tracing::info!("search component stopped");
+ Ok(())
+ }
+}
+
+/// The gRPC service implementation.
+pub struct SearchGrpcService {
+ index: Arc<RwLock<SearchIndex>>,
+}
+
+#[tonic::async_trait]
+impl SearchSvc for SearchGrpcService {
+ type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>;
+
+ #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")]
+ async fn search(
+ &self,
+ request: Request<Streaming<SearchRequest>>,
+ ) -> Result<Response<Self::SearchStream>, Status> {
+ let mut in_stream = request.into_inner();
+ let index = self.index.clone();
+
+ // Create output channel
+ let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128);
+
+ // Spawn task to handle incoming requests and send responses
+ tokio::spawn(async move {
+ while let Some(req) = in_stream.message().await.transpose() {
+ match req {
+ Ok(search_req) => {
+ let query = search_req.query;
+ let query_id = search_req.query_id;
+ let filter_mode: FilterMode = search_req
+ .filter_mode
+ .try_into()
+ .unwrap_or(FilterMode::Global);
+ let proto_context = search_req.context;
+
+ debug!(
+ "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}",
+ query,
+ query_id,
+ filter_mode.as_str_name(),
+ proto_context
+ );
+
+ // Convert proto FilterMode + context to IndexFilterMode
+ let index_filter = convert_filter_mode(filter_mode, &proto_context);
+
+ // Build QueryContext from proto context
+ let query_context = proto_context
+ .map(|ctx| QueryContext {
+ cwd: Some(with_trailing_slash(&ctx.cwd)),
+ git_root: ctx.git_root.map(|s| with_trailing_slash(&s)),
+ hostname: Some(ctx.hostname),
+ session_id: Some(ctx.session_id),
+ })
+ .unwrap_or_default();
+
+ // Perform the search
+ let history_ids =
+ span!(Level::TRACE, "daemon_search_query", %query, query_id)
+ .in_scope(|| async {
+ let index = index.read().await;
+ index
+ .search(&query, index_filter, &query_context, RESULTS_LIMIT)
+ .await
+ })
+ .await;
+
+ // Convert history IDs to bytes
+ let ids: Vec<Vec<u8>> = history_ids
+ .iter()
+ .filter_map(|id| {
+ Uuid::parse_str(id)
+ .ok()
+ .map(|uuid| uuid.as_bytes().to_vec())
+ })
+ .collect();
+
+ if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() {
+ break; // Client disconnected
+ }
+ }
+ Err(e) => {
+ let _ = tx.send(Err(e)).await;
+ break;
+ }
+ }
+ }
+ });
+
+ // Convert receiver to stream
+ let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
+ Ok(Response::new(Box::pin(out_stream)))
+ }
+}
+
+/// Convert proto FilterMode and context to IndexFilterMode.
+fn convert_filter_mode(
+ mode: FilterMode,
+ context: &Option<crate::search::SearchContext>,
+) -> IndexFilterMode {
+ match (mode, context) {
+ (FilterMode::Global, _) => IndexFilterMode::Global,
+ (FilterMode::Directory, Some(ctx)) => {
+ IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
+ }
+ (FilterMode::Workspace, Some(ctx)) => {
+ if let Some(ref git_root) = ctx.git_root {
+ IndexFilterMode::Workspace(with_trailing_slash(git_root))
+ } else {
+ // Fall back to directory if no git root
+ IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
+ }
+ }
+ (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()),
+ (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()),
+ (FilterMode::SessionPreload, Some(ctx)) => {
+ // SessionPreload is similar to Session - filter by session
+ IndexFilterMode::Session(ctx.session_id.clone())
+ }
+ // If no context provided, fall back to global
+ _ => IndexFilterMode::Global,
+ }
+}
+
+#[cfg(windows)]
+pub fn with_trailing_slash(s: &str) -> String {
+ if s.ends_with('\\') {
+ s.to_string()
+ } else {
+ format!("{}\\", s)
+ }
+}
+
+#[cfg(not(windows))]
+pub fn with_trailing_slash(s: &str) -> String {
+ if s.ends_with('/') {
+ s.to_string()
+ } else {
+ format!("{}/", s)
+ }
+}
diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs
new file mode 100644
index 00000000..6217706a
--- /dev/null
+++ b/crates/atuin-daemon/src/components/sync.rs
@@ -0,0 +1,257 @@
+//! Sync component.
+//!
+//! Handles periodic synchronization with the Atuin cloud server.
+
+use eyre::Result;
+use rand::Rng;
+use tokio::sync::mpsc;
+use tokio::time::{self, MissedTickBehavior};
+
+use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
+use atuin_dotfiles::store::{AliasStore, var::VarStore};
+
+use crate::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+};
+
+/// Commands that can be sent to the sync task.
+enum SyncCommand {
+ /// Trigger an immediate sync.
+ ForceSync,
+ /// Stop the sync loop.
+ Stop,
+}
+
+/// Sync component - handles periodic cloud synchronization.
+///
+/// This component:
+/// - Runs a background sync loop on a configurable interval
+/// - Implements exponential backoff on sync failures
+/// - Responds to ForceSync events for immediate sync
+/// - Emits SyncCompleted/SyncFailed events
+pub struct SyncComponent {
+ task_handle: Option<tokio::task::JoinHandle<()>>,
+ command_tx: Option<mpsc::Sender<SyncCommand>>,
+}
+
+impl SyncComponent {
+ /// Create a new sync component.
+ pub fn new() -> Self {
+ Self {
+ task_handle: None,
+ command_tx: None,
+ }
+ }
+}
+
+impl Default for SyncComponent {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for SyncComponent {
+ fn name(&self) -> &'static str {
+ "sync"
+ }
+
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+ let (cmd_tx, cmd_rx) = mpsc::channel(16);
+ self.command_tx = Some(cmd_tx);
+
+ // Spawn the sync loop with its own copy of the handle
+ self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
+
+ tracing::info!("sync component started");
+ Ok(())
+ }
+
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+ if let DaemonEvent::ForceSync = event {
+ tracing::info!("force sync requested");
+ if let Some(tx) = &self.command_tx {
+ let _ = tx.send(SyncCommand::ForceSync).await;
+ }
+ }
+ Ok(())
+ }
+
+ async fn stop(&mut self) -> Result<()> {
+ if let Some(tx) = &self.command_tx {
+ let _ = tx.send(SyncCommand::Stop).await;
+ }
+ if let Some(handle) = self.task_handle.take() {
+ // Give the task a moment to shut down gracefully
+ let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
+ }
+ tracing::info!("sync component stopped");
+ Ok(())
+ }
+}
+
+/// The main sync loop.
+///
+/// This runs in a spawned task and handles periodic sync as well as
+/// force sync requests.
+async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
+ tracing::info!("sync loop starting");
+
+ // Clone settings since we need them across await points
+ let settings = handle.settings().await.clone();
+ let host_id = match Settings::host_id().await {
+ Ok(id) => id,
+ Err(e) => {
+ tracing::error!("failed to get host id, sync disabled: {e}");
+ return;
+ }
+ };
+
+ // Create the stores we need
+ let encryption_key = *handle.encryption_key();
+ let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
+ let alias_store = AliasStore::new(handle.store().clone(), host_id, encryption_key);
+ let var_store = VarStore::new(handle.store().clone(), host_id, encryption_key);
+
+ // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
+ let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
+
+ let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
+
+ // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
+ // we may end up running a lot of syncs in a hot loop.
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+ loop {
+ tokio::select! {
+ _ = ticker.tick() => {
+ do_sync_tick(
+ &handle,
+ &history_store,
+ &alias_store,
+ &var_store,
+ &mut ticker,
+ max_interval,
+ ).await;
+ }
+ cmd = cmd_rx.recv() => {
+ match cmd {
+ Some(SyncCommand::ForceSync) => {
+ tracing::info!("executing force sync");
+ do_sync_tick(
+ &handle,
+ &history_store,
+ &alias_store,
+ &var_store,
+ &mut ticker,
+ max_interval,
+ ).await;
+ }
+ Some(SyncCommand::Stop) | None => {
+ tracing::info!("sync loop stopping");
+ break;
+ }
+ }
+ }
+ }
+ }
+}
+
+/// Execute a single sync tick.
+async fn do_sync_tick(
+ handle: &DaemonHandle,
+ history_store: &HistoryStore,
+ alias_store: &AliasStore,
+ var_store: &VarStore,
+ ticker: &mut time::Interval,
+ max_interval: f64,
+) {
+ // Clone settings since we need them across await points
+ let settings = handle.settings().await.clone();
+
+ tracing::info!("sync tick");
+
+ // Check if logged in
+ let logged_in = match settings.logged_in().await {
+ Ok(v) => v,
+ Err(e) => {
+ tracing::warn!("failed to check login status, skipping sync tick: {e}");
+ return;
+ }
+ };
+
+ if !logged_in {
+ tracing::debug!("not logged in, skipping sync tick");
+ return;
+ }
+
+ // Perform the sync
+ let res = sync::sync(&settings, handle.store()).await;
+
+ match res {
+ Err(e) => {
+ tracing::error!("sync tick failed with {e}");
+
+ // Emit failure event
+ handle.emit(DaemonEvent::SyncFailed {
+ error: e.to_string(),
+ });
+
+ // Exponential backoff
+ let mut rng = rand::thread_rng();
+ let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
+
+ if new_interval > max_interval {
+ new_interval = max_interval;
+ }
+
+ *ticker = time::interval(time::Duration::from_secs(new_interval as u64));
+ ticker.reset_after(time::Duration::from_secs(new_interval as u64));
+
+ tracing::error!("backing off, next sync tick in {new_interval}");
+ }
+ Ok((uploaded_count, downloaded_records)) => {
+ tracing::info!(
+ uploaded = uploaded_count,
+ downloaded = downloaded_records.len(),
+ "sync complete"
+ );
+
+ // Build history from downloaded records
+ if let Err(e) = history_store
+ .incremental_build(handle.history_db(), &downloaded_records)
+ .await
+ {
+ tracing::error!("failed to build history from downloaded records: {e}");
+ }
+
+ // Emit the records added event (for search indexing)
+ handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone()));
+
+ // Emit sync completed event
+ handle.emit(DaemonEvent::SyncCompleted {
+ uploaded: uploaded_count as usize,
+ downloaded: downloaded_records.len(),
+ });
+
+ // Rebuild alias and var stores
+ if let Err(e) = alias_store.build().await {
+ tracing::error!("failed to rebuild alias store: {e}");
+ }
+ if let Err(e) = var_store.build().await {
+ tracing::error!("failed to rebuild var store: {e}");
+ }
+
+ // Reset backoff on success
+ if ticker.period().as_secs() != settings.daemon.sync_frequency {
+ *ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
+ }
+
+ // Store sync time
+ if let Err(e) = Settings::save_sync_time().await {
+ tracing::error!("failed to save sync time: {e}");
+ }
+ }
+ }
+}
diff --git a/crates/atuin-daemon/src/control/mod.rs b/crates/atuin-daemon/src/control/mod.rs
new file mode 100644
index 00000000..afb29c57
--- /dev/null
+++ b/crates/atuin-daemon/src/control/mod.rs
@@ -0,0 +1,12 @@
+//! Control module for external event injection.
+//!
+//! This module provides the gRPC service that allows external processes
+//! (like CLI commands) to inject events into the daemon's event bus.
+
+mod service;
+
+// Include the generated proto code
+tonic::include_proto!("control");
+
+// Re-export the service
+pub use service::ControlService;
diff --git a/crates/atuin-daemon/src/control/service.rs b/crates/atuin-daemon/src/control/service.rs
new file mode 100644
index 00000000..2e7403ce
--- /dev/null
+++ b/crates/atuin-daemon/src/control/service.rs
@@ -0,0 +1,71 @@
+//! Control service implementation.
+//!
+//! This gRPC service allows external processes (like CLI commands) to inject
+//! events into the daemon's event bus.
+
+use atuin_client::history::HistoryId;
+use tonic::{Request, Response, Status};
+use tracing::{Level, info, instrument};
+
+use super::{
+ SendEventRequest, SendEventResponse,
+ control_server::{Control, ControlServer},
+ send_event_request::Event,
+};
+use crate::{daemon::DaemonHandle, events::DaemonEvent};
+
+/// The Control gRPC service.
+///
+/// This service is used by external processes to inject events into the daemon.
+/// It's not a component - it's part of the daemon's core infrastructure.
+pub struct ControlService {
+ handle: DaemonHandle,
+}
+
+impl ControlService {
+ /// Create a new control service with the given daemon handle.
+ pub fn new(handle: DaemonHandle) -> Self {
+ Self { handle }
+ }
+
+ /// Get a tonic server for this service.
+ pub fn into_server(self) -> ControlServer<Self> {
+ ControlServer::new(self)
+ }
+}
+
+#[tonic::async_trait]
+impl Control for ControlService {
+ #[instrument(skip_all, level = Level::INFO, name = "control_send_event")]
+ async fn send_event(
+ &self,
+ request: Request<SendEventRequest>,
+ ) -> Result<Response<SendEventResponse>, Status> {
+ let req = request.into_inner();
+
+ let event = req
+ .event
+ .ok_or_else(|| Status::invalid_argument("event is required"))?;
+
+ let daemon_event = proto_event_to_daemon_event(event)?;
+
+ info!(?daemon_event, "received control event");
+ self.handle.emit(daemon_event);
+
+ Ok(Response::new(SendEventResponse {}))
+ }
+}
+
+/// Convert a proto event to a daemon event.
+fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> {
+ match event {
+ Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned),
+ Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt),
+ Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted {
+ ids: e.ids.into_iter().map(HistoryId).collect(),
+ }),
+ Event::ForceSync(_) => Ok(DaemonEvent::ForceSync),
+ Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded),
+ Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested),
+ }
+}
diff --git a/crates/atuin-daemon/src/daemon.rs b/crates/atuin-daemon/src/daemon.rs
new file mode 100644
index 00000000..ec0b7b68
--- /dev/null
+++ b/crates/atuin-daemon/src/daemon.rs
@@ -0,0 +1,450 @@
+//! Core daemon infrastructure.
+//!
+//! This module provides the foundational types for building the atuin daemon:
+//!
+//! - [`DaemonState`]: Shared state owned by the daemon
+//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state
+//! - [`Component`]: A trait for implementing daemon components
+//! - [`Daemon`]: The main daemon orchestrator
+//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon
+
+use std::sync::Arc;
+
+use atuin_client::{
+ database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore,
+ settings::Settings,
+};
+use eyre::{Context, Result};
+use tokio::sync::{RwLock, broadcast};
+
+use crate::events::DaemonEvent;
+
+// ============================================================================
+// DaemonState
+// ============================================================================
+
+/// Shared state owned by the daemon.
+///
+/// This contains all the resources that components and services need access to.
+/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`].
+pub struct DaemonState {
+ // Event bus
+ event_tx: broadcast::Sender<DaemonEvent>,
+
+ // Configuration (mutable - can be reloaded)
+ settings: RwLock<Settings>,
+
+ // Encryption key (immutable - derived at startup)
+ encryption_key: [u8; 32],
+
+ // Database handles
+ history_db: HistoryDatabase,
+ store: SqliteStore,
+}
+
+// ============================================================================
+// DaemonHandle
+// ============================================================================
+
+/// A lightweight handle to the daemon's shared state.
+///
+/// This is the primary way for components, gRPC services, and spawned tasks to
+/// interact with the daemon. It provides access to:
+///
+/// - Event emission and subscription
+/// - Configuration (settings, encryption key)
+/// - Database handles
+///
+/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed
+/// around to any code that needs daemon access.
+///
+/// # Example
+///
+/// ```ignore
+/// // Emit an event
+/// handle.emit(DaemonEvent::HistoryPruned);
+///
+/// // Access settings
+/// let settings = handle.settings().await;
+/// let sync_freq = settings.daemon.sync_frequency;
+///
+/// // Access database
+/// let history = handle.history_db().load(id).await?;
+/// ```
+#[derive(Clone)]
+pub struct DaemonHandle {
+ state: Arc<DaemonState>,
+}
+
+impl DaemonHandle {
+ // ---- Events ----
+
+ /// Emit an event to the daemon's event bus.
+ ///
+ /// This is fire-and-forget - if no receivers are listening (which shouldn't
+ /// happen in normal operation), the event is dropped silently.
+ pub fn emit(&self, event: DaemonEvent) {
+ if let Err(e) = self.state.event_tx.send(event) {
+ tracing::warn!("failed to emit event (no receivers?): {e}");
+ }
+ }
+
+ /// Subscribe to the event bus.
+ ///
+ /// Returns a receiver that will receive all events emitted after this call.
+ /// Useful for components that need to listen for events outside of the
+ /// normal `handle_event` callback flow.
+ pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
+ self.state.event_tx.subscribe()
+ }
+
+ /// Request graceful shutdown of the daemon.
+ pub fn shutdown(&self) {
+ self.emit(DaemonEvent::ShutdownRequested);
+ }
+
+ // ---- Configuration ----
+
+ /// Get the current settings.
+ ///
+ /// This acquires a read lock on the settings. For most use cases, clone
+ /// the settings if you need to hold onto them.
+ pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> {
+ self.state.settings.read().await
+ }
+
+ /// Reload settings from disk and emit a SettingsReloaded event.
+ ///
+ /// Components listening for `SettingsReloaded` can then re-read settings
+ /// via `handle.settings()` to pick up the changes.
+ pub async fn reload_settings(&self) -> Result<()> {
+ let new_settings = Settings::new()?;
+ *self.state.settings.write().await = new_settings;
+ self.emit(DaemonEvent::SettingsReloaded);
+ tracing::info!("settings reloaded");
+ Ok(())
+ }
+
+ /// Get the encryption key.
+ pub fn encryption_key(&self) -> &[u8; 32] {
+ &self.state.encryption_key
+ }
+
+ // ---- Database ----
+
+ /// Get a reference to the history database.
+ pub fn history_db(&self) -> &HistoryDatabase {
+ &self.state.history_db
+ }
+
+ /// Get a reference to the record store.
+ pub fn store(&self) -> &SqliteStore {
+ &self.state.store
+ }
+}
+
+impl std::fmt::Debug for DaemonHandle {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("DaemonHandle").finish_non_exhaustive()
+ }
+}
+
+// ============================================================================
+// Component Trait
+// ============================================================================
+
+/// A daemon component that handles a specific domain.
+///
+/// Components are the building blocks of the daemon. Each component:
+///
+/// - Has a unique name for logging and debugging
+/// - Can optionally expose gRPC services
+/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources
+/// - Handles events from the event bus
+/// - Performs cleanup on shutdown
+///
+/// # Lifecycle
+///
+/// 1. **Construction**: Component is created (usually via `new()`)
+/// 2. **Start**: `start()` is called with a [`DaemonHandle`]
+/// 3. **Running**: `handle_event()` is called for each event on the bus
+/// 4. **Shutdown**: `stop()` is called for cleanup
+///
+/// # Example
+///
+/// ```ignore
+/// pub struct MyComponent {
+/// handle: Option<DaemonHandle>,
+/// }
+///
+/// #[async_trait]
+/// impl Component for MyComponent {
+/// fn name(&self) -> &'static str { "my-component" }
+///
+/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+/// self.handle = Some(handle);
+/// Ok(())
+/// }
+///
+/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+/// match event {
+/// DaemonEvent::SomeEvent => {
+/// // Handle the event
+/// if let Some(handle) = &self.handle {
+/// handle.emit(DaemonEvent::ResponseEvent);
+/// }
+/// }
+/// _ => {}
+/// }
+/// Ok(())
+/// }
+///
+/// async fn stop(&mut self) -> Result<()> {
+/// Ok(())
+/// }
+/// }
+/// ```
+#[tonic::async_trait]
+pub trait Component: Send + Sync {
+ /// Human-readable name for logging and debugging.
+ fn name(&self) -> &'static str;
+
+ /// Called once at startup.
+ ///
+ /// Store the handle if you need to emit events or access daemon resources
+ /// later. The handle is cheaply cloneable, so feel free to clone it for
+ /// spawned tasks.
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()>;
+
+ /// Handle an incoming event.
+ ///
+ /// Called for every event on the bus. To emit new events in response,
+ /// use the handle stored during `start()`. Events emitted here will be
+ /// processed in subsequent event loop iterations.
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>;
+
+ /// Called on graceful shutdown.
+ ///
+ /// Use this to clean up resources, abort spawned tasks, etc.
+ async fn stop(&mut self) -> Result<()>;
+}
+
+// ============================================================================
+// Daemon
+// ============================================================================
+
+/// The main daemon orchestrator.
+///
+/// The daemon manages components, runs the event loop, and coordinates startup
+/// and shutdown. It is constructed via [`DaemonBuilder`].
+///
+/// # Event Loop
+///
+/// The daemon runs a simple event loop:
+///
+/// 1. Wait for an event on the bus
+/// 2. Dispatch the event to all components (in registration order)
+/// 3. Components may emit new events in response
+/// 4. Repeat until `ShutdownRequested` is received
+///
+/// Events emitted during handling are queued and processed in subsequent
+/// iterations, ensuring the loop eventually drains.
+pub struct Daemon {
+ components: Vec<Box<dyn Component>>,
+ handle: DaemonHandle,
+}
+
+impl Daemon {
+ /// Create a new daemon builder.
+ pub fn builder(settings: Settings) -> DaemonBuilder {
+ DaemonBuilder::new(settings)
+ }
+
+ /// Get a clone of the daemon handle.
+ ///
+ /// The handle can be used to emit events, access settings, etc.
+ pub fn handle(&self) -> DaemonHandle {
+ self.handle.clone()
+ }
+
+ /// Start all components.
+ ///
+ /// This must be called before `run_event_loop()`. It initializes all
+ /// registered components with the daemon handle.
+ pub async fn start_components(&mut self) -> Result<()> {
+ for component in &mut self.components {
+ tracing::info!(component = component.name(), "starting component");
+ component
+ .start(self.handle.clone())
+ .await
+ .with_context(|| format!("failed to start component: {}", component.name()))?;
+ }
+ Ok(())
+ }
+
+ /// Run the daemon event loop.
+ ///
+ /// This processes events until a ShutdownRequested event is received.
+ /// Components must be started first via `start_components()`.
+ pub async fn run_event_loop(&mut self) -> Result<()> {
+ let mut event_rx = self.handle.subscribe();
+ loop {
+ match event_rx.recv().await {
+ Ok(DaemonEvent::ShutdownRequested) => {
+ tracing::info!("shutdown requested, stopping daemon");
+ break;
+ }
+ Ok(event) => {
+ tracing::debug!(?event, "processing event");
+ self.dispatch_event(&event).await;
+ }
+ Err(broadcast::error::RecvError::Lagged(n)) => {
+ tracing::warn!(
+ skipped = n,
+ "event receiver lagged, some events were dropped"
+ );
+ }
+ Err(broadcast::error::RecvError::Closed) => {
+ tracing::info!("event bus closed, stopping daemon");
+ break;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Stop all components.
+ ///
+ /// This performs graceful shutdown of all components.
+ pub async fn stop_components(&mut self) {
+ for component in &mut self.components {
+ tracing::info!(component = component.name(), "stopping component");
+ if let Err(e) = component.stop().await {
+ tracing::error!(
+ component = component.name(),
+ error = ?e,
+ "error stopping component"
+ );
+ }
+ }
+ tracing::info!("all components stopped");
+ }
+
+ /// Run the daemon.
+ ///
+ /// This is a convenience method that starts components, runs the event loop,
+ /// and handles shutdown. It does not return until the daemon is shut down.
+ pub async fn run(mut self) -> Result<()> {
+ self.start_components().await?;
+ self.run_event_loop().await?;
+ self.stop_components().await;
+ tracing::info!("daemon stopped");
+ Ok(())
+ }
+
+ async fn dispatch_event(&mut self, event: &DaemonEvent) {
+ for component in &mut self.components {
+ if let Err(e) = component.handle_event(event).await {
+ tracing::error!(
+ component = component.name(),
+ error = ?e,
+ "error handling event"
+ );
+ }
+ }
+ }
+}
+
+// ============================================================================
+// DaemonBuilder
+// ============================================================================
+
+/// Builder for constructing a [`Daemon`].
+///
+/// # Example
+///
+/// ```ignore
+/// let daemon = Daemon::builder(settings)
+/// .store(store)
+/// .history_db(history_db)
+/// .component(HistoryComponent::new())
+/// .component(SearchComponent::new())
+/// .component(SyncComponent::new())
+/// .build()
+/// .await?;
+///
+/// daemon.run().await?;
+/// ```
+pub struct DaemonBuilder {
+ settings: Settings,
+ store: Option<SqliteStore>,
+ history_db: Option<HistoryDatabase>,
+ components: Vec<Box<dyn Component>>,
+}
+
+impl DaemonBuilder {
+ /// Create a new daemon builder with the given settings.
+ pub fn new(settings: Settings) -> Self {
+ Self {
+ settings,
+ store: None,
+ history_db: None,
+ components: Vec::new(),
+ }
+ }
+
+ /// Set the record store.
+ pub fn store(mut self, store: SqliteStore) -> Self {
+ self.store = Some(store);
+ self
+ }
+
+ /// Set the history database.
+ pub fn history_db(mut self, db: HistoryDatabase) -> Self {
+ self.history_db = Some(db);
+ self
+ }
+
+ /// Register a component.
+ ///
+ /// Components are started in registration order and stopped in reverse order.
+ pub fn component(mut self, component: impl Component + 'static) -> Self {
+ self.components.push(Box::new(component));
+ self
+ }
+
+ /// Build the daemon.
+ ///
+ /// This loads the encryption key and creates the daemon state.
+ pub async fn build(self) -> Result<Daemon> {
+ let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?;
+ let history_db = self
+ .history_db
+ .ok_or_else(|| eyre::eyre!("history_db is required"))?;
+
+ // Load encryption key
+ let encryption_key: [u8; 32] = encryption::load_key(&self.settings)
+ .context("could not load encryption key")?
+ .into();
+
+ // Create the event bus
+ let (event_tx, _) = broadcast::channel(64);
+
+ // Create the shared state
+ let state = Arc::new(DaemonState {
+ event_tx,
+ settings: RwLock::new(self.settings),
+ encryption_key,
+ history_db,
+ store,
+ });
+
+ // Create the handle (just a reference to the state)
+ let handle = DaemonHandle { state };
+
+ Ok(Daemon {
+ components: self.components,
+ handle,
+ })
+ }
+}
diff --git a/crates/atuin-daemon/src/events.rs b/crates/atuin-daemon/src/events.rs
new file mode 100644
index 00000000..4e6c6ff3
--- /dev/null
+++ b/crates/atuin-daemon/src/events.rs
@@ -0,0 +1,74 @@
+//! Daemon events.
+//!
+//! Events are the primary communication mechanism within the daemon.
+//! Components emit events to notify others of state changes, and handle
+//! events to react to changes elsewhere in the system.
+//!
+//! External processes (like CLI commands) can also inject events via the
+//! Control gRPC service.
+
+use atuin_client::history::{History, HistoryId};
+use atuin_common::record::RecordId;
+
+/// Events that flow through the daemon's event bus.
+///
+/// Events are broadcast to all components. Each component decides which
+/// events it cares about in its `handle_event` implementation.
+#[derive(Debug, Clone)]
+pub enum DaemonEvent {
+ // ---- History lifecycle ----
+ /// A command has started running.
+ HistoryStarted(History),
+
+ /// A command has finished running.
+ HistoryEnded(History),
+
+ // ---- Sync ----
+ /// Records were synced from the server.
+ ///
+ /// The search component uses this to update its index with new history.
+ RecordsAdded(Vec<RecordId>),
+
+ /// Sync completed successfully.
+ SyncCompleted {
+ /// Number of records uploaded.
+ uploaded: usize,
+ /// Number of records downloaded.
+ downloaded: usize,
+ },
+
+ /// Sync failed.
+ SyncFailed {
+ /// Error message describing what went wrong.
+ error: String,
+ },
+
+ /// Request an immediate sync (external trigger).
+ ForceSync,
+
+ // ---- External commands ----
+ /// History was pruned - search index needs a full rebuild.
+ ///
+ /// Emitted when the user runs `atuin history prune` or similar.
+ HistoryPruned,
+
+ /// History was rebuilt - search index needs a full rebuild.
+ ///
+ /// Emitted when the user runs `atuin store rebuild history` or similar.
+ HistoryRebuilt,
+
+ /// Specific history items were deleted.
+ ///
+ /// The search component should remove these from its index.
+ HistoryDeleted {
+ /// IDs of the deleted history entries.
+ ids: Vec<HistoryId>,
+ },
+
+ /// Settings have changed, components should reload if needed.
+ SettingsReloaded,
+
+ // ---- Lifecycle ----
+ /// Request graceful shutdown of the daemon.
+ ShutdownRequested,
+}
diff --git a/crates/atuin-daemon/src/history.rs b/crates/atuin-daemon/src/history.rs
deleted file mode 100644
index 57f5b2cf..00000000
--- a/crates/atuin-daemon/src/history.rs
+++ /dev/null
@@ -1 +0,0 @@
-tonic::include_proto!("history");
diff --git a/crates/atuin-daemon/src/history/mod.rs b/crates/atuin-daemon/src/history/mod.rs
new file mode 100644
index 00000000..b71853df
--- /dev/null
+++ b/crates/atuin-daemon/src/history/mod.rs
@@ -0,0 +1,6 @@
+//! History module for the daemon gRPC history service.
+//!
+//! This module contains the proto-generated types for the history gRPC service.
+
+// Include the generated proto code
+tonic::include_proto!("history");
diff --git a/crates/atuin-daemon/src/lib.rs b/crates/atuin-daemon/src/lib.rs
index e00060bc..6dc04db3 100644
--- a/crates/atuin-daemon/src/lib.rs
+++ b/crates/atuin-daemon/src/lib.rs
@@ -1,3 +1,110 @@
+use atuin_client::database::Sqlite as HistoryDatabase;
+use atuin_client::{record::sqlite_store::SqliteStore, settings::Settings};
+use eyre::Result;
+
pub mod client;
+pub mod components;
+pub mod control;
+pub mod daemon;
+pub mod events;
pub mod history;
+pub mod search;
pub mod server;
+
+// Re-export core daemon types for convenience
+pub use daemon::{Component, Daemon, DaemonBuilder, DaemonHandle};
+pub use events::DaemonEvent;
+
+// Re-export components
+pub use components::{HistoryComponent, SearchComponent, SyncComponent};
+
+// Re-export client helpers
+pub use client::{ControlClient, emit_event, emit_event_with_settings};
+
+/// Boot the daemon using the new component-based architecture.
+///
+/// This creates a daemon with the standard components (history, search, sync),
+/// starts the gRPC server with their services, and runs the event loop.
+pub async fn boot(
+ settings: Settings,
+ store: SqliteStore,
+ history_db: HistoryDatabase,
+) -> Result<()> {
+ // Create the components
+ let history_component = HistoryComponent::new();
+ let search_component = SearchComponent::new();
+ let sync_component = SyncComponent::new();
+
+ // Get the gRPC services before moving components into the daemon
+ // (The services share state with the components via Arc)
+ let history_service = history_component.grpc_service();
+ let search_service = search_component.grpc_service();
+
+ // Build the daemon
+ let mut daemon = Daemon::builder(settings.clone())
+ .store(store)
+ .history_db(history_db)
+ .component(history_component)
+ .component(search_component)
+ .component(sync_component)
+ .build()
+ .await?;
+
+ // Get a handle for the control service and gRPC server shutdown
+ let handle = daemon.handle();
+
+ // Create the control service
+ let control_service = control::ControlService::new(handle.clone());
+
+ // Start all components first (so gRPC services can work)
+ daemon.start_components().await?;
+
+ // Spawn signal handler to emit ShutdownRequested on Ctrl+C/SIGTERM
+ let signal_handle = handle.clone();
+ tokio::spawn(async move {
+ shutdown_signal().await;
+ tracing::info!("received shutdown signal");
+ signal_handle.shutdown();
+ });
+
+ // Start the gRPC server in the background
+ server::run_grpc_server(
+ settings,
+ history_service,
+ search_service,
+ control_service.into_server(),
+ handle,
+ )
+ .await?;
+
+ // Run the daemon event loop
+ daemon.run_event_loop().await?;
+
+ // Stop all components on shutdown
+ daemon.stop_components().await;
+
+ tracing::info!("daemon shut down complete");
+ Ok(())
+}
+
+/// Wait for a shutdown signal (Ctrl+C or SIGTERM).
+#[cfg(unix)]
+async fn shutdown_signal() {
+ let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
+ .expect("failed to register sigterm handler");
+ let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
+ .expect("failed to register sigint handler");
+
+ tokio::select! {
+ _ = term.recv() => {},
+ _ = int.recv() => {},
+ }
+}
+
+/// Wait for a shutdown signal (Ctrl+C).
+#[cfg(not(unix))]
+async fn shutdown_signal() {
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to listen for ctrl+c");
+}
diff --git a/crates/atuin-daemon/src/search/index.rs b/crates/atuin-daemon/src/search/index.rs
new file mode 100644
index 00000000..b15b057f
--- /dev/null
+++ b/crates/atuin-daemon/src/search/index.rs
@@ -0,0 +1,572 @@
+//! Search index with frecency-based ranking.
+//!
+//! This module provides a deduplicated search index where each unique command
+//! is stored once, with metadata about all its invocations. This enables:
+//!
+//! - Efficient fuzzy matching (fewer items to match)
+//! - Frecency-based ranking (frequency + recency)
+//! - Dynamic filtering by directory, host, session, etc.
+
+use std::{collections::HashMap, sync::Arc};
+
+use atuin_client::history::History;
+use dashmap::{DashMap, DashSet};
+use nucleo::{Injector, Nucleo, pattern};
+use time::OffsetDateTime;
+use tokio::sync::RwLock;
+use tracing::{Level, instrument};
+
+use crate::components::search::with_trailing_slash;
+
+/// Data for a single invocation of a command.
+#[derive(Debug, Clone)]
+pub struct Invocation {
+ /// When the command was run.
+ pub timestamp: i64,
+ /// The working directory when the command was run.
+ #[allow(dead_code)]
+ pub cwd: String,
+ /// The hostname where the command was run.
+ #[allow(dead_code)]
+ pub hostname: String,
+ /// The session ID.
+ #[allow(dead_code)]
+ pub session: String,
+ /// The history entry ID (for returning in search results).
+ pub history_id: String,
+}
+
+impl From<&History> for Invocation {
+ fn from(history: &History) -> Self {
+ Self {
+ timestamp: history.timestamp.unix_timestamp(),
+ cwd: history.cwd.clone(),
+ hostname: history.hostname.clone(),
+ session: history.session.clone(),
+ history_id: history.id.0.clone(),
+ }
+ }
+}
+
+/// Pre-computed frecency data for O(1) lookup.
+#[derive(Debug, Clone, Default)]
+pub struct FrecencyData {
+ /// Total number of times this command was used.
+ pub count: u32,
+ /// Most recent usage timestamp (unix seconds).
+ pub last_used: i64,
+}
+
+impl FrecencyData {
+ /// Record a new usage of this command.
+ pub fn record_use(&mut self, timestamp: i64) {
+ self.count += 1;
+ if timestamp > self.last_used {
+ self.last_used = timestamp;
+ }
+ }
+
+ /// Compute frecency score based on count and recency.
+ ///
+ /// Uses a decay function where more recent commands score higher.
+ /// The formula balances frequency (how often) with recency (how recent).
+ #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")]
+ pub fn compute(&self, now: i64) -> u32 {
+ if self.count == 0 {
+ return 0;
+ }
+
+ // Time-based decay: score decreases as time passes
+ let age_seconds = (now - self.last_used).max(0) as u64;
+ let age_hours = age_seconds / 3600;
+
+ // Decay factor: recent commands get higher scores
+ // - Last hour: multiplier ~1.0
+ // - Last day: multiplier ~0.5
+ // - Last week: multiplier ~0.1
+ // - Older: multiplier approaches 0
+ let recency_score = match age_hours {
+ 0 => 100,
+ 1..=6 => 90,
+ 7..=24 => 70,
+ 25..=72 => 50,
+ 73..=168 => 30,
+ 169..=720 => 15,
+ _ => 5,
+ };
+
+ // Frequency boost: more uses = higher score (with diminishing returns)
+ let frequency_score = ((self.count as f64).ln() * 20.0).min(100.0) as u32;
+
+ // Combined score
+ recency_score + frequency_score
+ }
+}
+
+/// Data for a unique command, including all its invocations.
+pub struct CommandData {
+ /// The command text (stored for debugging/logging purposes).
+ #[allow(dead_code)]
+ pub command: String,
+ /// All invocations of this command, sorted by timestamp (newest first).
+ pub invocations: Vec<Invocation>,
+ /// Pre-computed global frecency.
+ pub global_frecency: FrecencyData,
+
+ // Pre-computed indexes for O(1) filter lookups
+ /// All directories where this command has been run.
+ directories: DashSet<String>,
+ /// All hostnames where this command has been run.
+ hosts: DashSet<String>,
+ /// All sessions where this command has been run.
+ sessions: DashSet<String>,
+}
+
+impl CommandData {
+ /// Create a new CommandData from a history entry.
+ pub fn new(history: &History) -> Self {
+ let mut data = Self {
+ command: history.command.clone(),
+ invocations: Vec::new(),
+ global_frecency: FrecencyData::default(),
+ directories: DashSet::new(),
+ hosts: DashSet::new(),
+ sessions: DashSet::new(),
+ };
+ data.add_invocation(history);
+ data
+ }
+
+ /// Add an invocation from a history entry.
+ pub fn add_invocation(&mut self, history: &History) {
+ let timestamp = history.timestamp.unix_timestamp();
+
+ // Update global frecency
+ self.global_frecency.record_use(timestamp);
+
+ // Update pre-computed indexes for O(1) filter lookups
+ self.directories.insert(with_trailing_slash(&history.cwd));
+ self.hosts.insert(history.hostname.clone());
+ self.sessions.insert(history.session.clone());
+
+ let invocation = Invocation::from(history);
+
+ // Insert sorted by timestamp (newest first)
+ let pos = self
+ .invocations
+ .iter()
+ .position(|inv| inv.timestamp < timestamp)
+ .unwrap_or(self.invocations.len());
+ self.invocations.insert(pos, invocation);
+ }
+
+ /// Get the most recent history ID for this command.
+ pub fn most_recent_id(&self) -> Option<&str> {
+ self.invocations.first().map(|inv| inv.history_id.as_str())
+ }
+
+ /// Check if any invocation matches a directory filter (exact match).
+ /// O(1) lookup using pre-computed index.
+ pub fn has_invocation_in_dir(&self, dir: &str) -> bool {
+ self.directories.contains(dir)
+ }
+
+ /// Check if any invocation matches a directory prefix (workspace/git root).
+ /// O(n) where n = number of unique directories for this command.
+ pub fn has_invocation_in_workspace(&self, prefix: &str) -> bool {
+ self.directories.iter().any(|d| d.starts_with(prefix))
+ }
+
+ /// Check if any invocation matches a hostname.
+ /// O(1) lookup using pre-computed index.
+ pub fn has_invocation_on_host(&self, hostname: &str) -> bool {
+ self.hosts.contains(hostname)
+ }
+
+ /// Check if any invocation matches a session.
+ /// O(1) lookup using pre-computed index.
+ pub fn has_invocation_in_session(&self, session: &str) -> bool {
+ self.sessions.contains(session)
+ }
+}
+
+/// Filter mode for search queries.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum IndexFilterMode {
+ /// No filtering - search all commands.
+ Global,
+ /// Filter to commands run in a specific directory.
+ Directory(String),
+ /// Filter to commands run in a workspace (directory prefix).
+ Workspace(String),
+ /// Filter to commands run on a specific host.
+ Host(String),
+ /// Filter to commands run in a specific session.
+ Session(String),
+}
+
+/// Context for search queries.
+#[derive(Debug, Clone, Default)]
+pub struct QueryContext {
+ pub cwd: Option<String>,
+ pub git_root: Option<String>,
+ pub hostname: Option<String>,
+ pub session_id: Option<String>,
+}
+
+/// A deduplicated search index with frecency-based ranking.
+///
+/// Commands are stored by their text, with metadata about all invocations.
+/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback.
+///
+/// Global frecency is precomputed by a background task and used for scoring.
+/// If frecency data is not available, search still works but without frecency ranking;
+/// although this should never happen due to precomputing the frecency map.
+pub struct SearchIndex {
+ /// Map from command text to command data.
+ /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer.
+ commands: Arc<DashMap<String, CommandData>>,
+ /// Nucleo fuzzy matcher - items are command strings.
+ nucleo: RwLock<Nucleo<String>>,
+ /// Injector for adding new commands to Nucleo.
+ injector: Injector<String>,
+ /// Precomputed global frecency map (command -> frecency score).
+ /// Updated by background task. If None, search works without frecency.
+ frecency_map: RwLock<Option<Arc<HashMap<String, u32>>>>,
+}
+
+impl SearchIndex {
+ /// Create a new empty search index.
+ pub fn new() -> Self {
+ let nucleo_config = nucleo::Config::DEFAULT;
+ // Single column for command text
+ let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1);
+ let injector = nucleo.injector();
+
+ Self {
+ commands: Arc::new(DashMap::new()),
+ nucleo: RwLock::new(nucleo),
+ injector,
+ frecency_map: RwLock::new(None),
+ }
+ }
+
+ /// Add a history entry to the index.
+ ///
+ /// If the command already exists, updates its invocation data.
+ /// If it's a new command, adds it to both the map and Nucleo.
+ pub fn add_history(&self, history: &History) {
+ let command = &history.command;
+
+ if let Some(mut entry) = self.commands.get_mut(command) {
+ // Existing command - just update invocations
+ entry.add_invocation(history);
+ } else {
+ // New command - add to both map and Nucleo
+ let data = CommandData::new(history);
+ self.commands.insert(command.clone(), data);
+ self.injector.push(command.clone(), |cmd, cols| {
+ cols[0] = cmd.clone().into();
+ });
+ }
+ // Note: frecency_map is rebuilt by background task, not invalidated here
+ }
+
+ /// Add multiple history entries to the index.
+ pub fn add_histories(&self, histories: &[History]) {
+ for history in histories {
+ self.add_history(history);
+ }
+ }
+
+ /// Get the number of unique commands in the index.
+ pub fn command_count(&self) -> usize {
+ self.commands.len()
+ }
+
+ /// Get the number of items in Nucleo (should match command_count).
+ pub async fn nucleo_item_count(&self) -> u32 {
+ self.nucleo.read().await.snapshot().item_count()
+ }
+
+ /// Search for commands matching a query.
+ ///
+ /// Returns a list of history IDs (most recent invocation per command).
+ /// Uses precomputed global frecency for scoring if available.
+ #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))]
+ pub async fn search(
+ &self,
+ query: &str,
+ filter_mode: IndexFilterMode,
+ _context: &QueryContext,
+ limit: u32,
+ ) -> Vec<String> {
+ let mut nucleo = self.nucleo.write().await;
+
+ // Get precomputed frecency map (may be None if not yet computed)
+ let frecency_map = self.frecency_map.read().await.clone();
+
+ // Build filter based on mode
+ let filter = self.build_filter(&filter_mode);
+ nucleo.set_filter(filter);
+
+ // Build scorer from precomputed frecency (or None if not available)
+ let scorer = Self::build_scorer(frecency_map);
+ nucleo.set_scorer(scorer);
+
+ // Update pattern
+ nucleo.pattern.reparse(
+ 0,
+ query,
+ pattern::CaseMatching::Smart,
+ pattern::Normalization::Smart,
+ false,
+ );
+
+ tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| {
+ // Tick until complete
+ while nucleo.tick(10).running {}
+ });
+
+ // Collect results
+ let snapshot = nucleo.snapshot();
+ let matched_count = snapshot.matched_item_count().min(limit);
+
+ tracing::span!(Level::TRACE, "index_search_results").in_scope(|| {
+ snapshot
+ .matched_items(..matched_count)
+ .filter_map(|item| {
+ let cmd = item.data;
+ self.commands
+ .get(cmd)
+ .and_then(|data| data.most_recent_id().map(|s| s.to_string()))
+ })
+ .collect()
+ })
+ }
+
+ /// Rebuild the global frecency map.
+ ///
+ /// This should be called by a background task periodically.
+ /// The map is used for scoring search results.
+ #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")]
+ pub async fn rebuild_frecency(&self) {
+ let now = OffsetDateTime::now_utc().unix_timestamp();
+ let mut frecency_map: HashMap<String, u32> = HashMap::new();
+
+ for entry in self.commands.iter() {
+ let frecency = entry.global_frecency.compute(now);
+ frecency_map.insert(entry.key().clone(), frecency);
+ }
+
+ *self.frecency_map.write().await = Some(Arc::new(frecency_map));
+ }
+
+ /// Build filter predicate for the given mode.
+ fn build_filter(&self, mode: &IndexFilterMode) -> Option<nucleo::Filter<String>> {
+ // For Global mode, no filter needed
+ if matches!(mode, IndexFilterMode::Global) {
+ return None;
+ }
+
+ // Pre-compute which commands pass the filter
+ let passing_commands: Arc<std::collections::HashSet<String>> = {
+ let mut set = std::collections::HashSet::new();
+ for entry in self.commands.iter() {
+ let passes = match mode {
+ IndexFilterMode::Global => unreachable!(),
+ IndexFilterMode::Directory(dir) => entry.has_invocation_in_dir(dir),
+ IndexFilterMode::Workspace(prefix) => entry.has_invocation_in_workspace(prefix),
+ IndexFilterMode::Host(hostname) => entry.has_invocation_on_host(hostname),
+ IndexFilterMode::Session(session) => entry.has_invocation_in_session(session),
+ };
+ if passes {
+ set.insert(entry.key().clone());
+ }
+ }
+ Arc::new(set)
+ };
+
+ Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd)))
+ }
+
+ /// Build scorer from precomputed frecency map.
+ ///
+ /// Returns None if frecency map is not available (search still works, just without frecency ranking).
+ fn build_scorer(
+ frecency_map: Option<Arc<HashMap<String, u32>>>,
+ ) -> Option<nucleo::Scorer<String>> {
+ let map = frecency_map?;
+ Some(Arc::new(move |cmd: &String, fuzzy_score: u32| {
+ let frecency = map.get(cmd).copied().unwrap_or(0);
+ fuzzy_score + frecency
+ }))
+ }
+}
+
+impl Default for SearchIndex {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use time::macros::datetime;
+
+ fn make_history(command: &str, cwd: &str, timestamp: OffsetDateTime) -> History {
+ History::import()
+ .timestamp(timestamp)
+ .command(command)
+ .cwd(cwd)
+ .build()
+ .into()
+ }
+
+ #[test]
+ fn frecency_data_compute() {
+ let now = 1000000i64;
+
+ // Recent command
+ let recent = FrecencyData {
+ count: 5,
+ last_used: now - 60, // 1 minute ago
+ };
+ assert!(recent.compute(now) > 100); // High score
+
+ // Old command
+ let old = FrecencyData {
+ count: 5,
+ last_used: now - 86400 * 30, // 30 days ago
+ };
+ assert!(old.compute(now) < recent.compute(now));
+
+ // Frequently used old command
+ let frequent_old = FrecencyData {
+ count: 100,
+ last_used: now - 86400 * 7, // 1 week ago
+ };
+ // Should still have decent score due to frequency
+ assert!(frequent_old.compute(now) > 50);
+ }
+
+ #[test]
+ fn command_data_add_invocation() {
+ let (dir1, dir2) = if cfg!(windows) {
+ ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
+ } else {
+ ("/home/user/project", "/home/user/other")
+ };
+
+ let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC));
+ let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC));
+
+ let mut data = CommandData::new(&history1);
+ assert_eq!(data.invocations.len(), 1);
+ assert_eq!(data.global_frecency.count, 1);
+
+ data.add_invocation(&history2);
+ assert_eq!(data.invocations.len(), 2);
+ assert_eq!(data.global_frecency.count, 2);
+
+ // Most recent should be first
+ assert_eq!(data.invocations[0].cwd, dir2);
+ assert_eq!(data.invocations[1].cwd, dir1);
+ }
+
+ #[test]
+ fn command_data_filters() {
+ let (dir1, dir2) = if cfg!(windows) {
+ ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
+ } else {
+ ("/home/user/project", "/home/user/other")
+ };
+
+ let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC));
+ let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC));
+
+ let mut data = CommandData::new(&h1);
+ data.add_invocation(&h2);
+
+ let (check1, check2, check3) = if cfg!(windows) {
+ (
+ with_trailing_slash("C:\\Users\\User\\project"),
+ with_trailing_slash("C:\\Users\\User\\other"),
+ with_trailing_slash("C:\\Users\\User\\missing"),
+ )
+ } else {
+ (
+ with_trailing_slash("/home/user/project"),
+ with_trailing_slash("/home/user/other"),
+ with_trailing_slash("/home/user/missing"),
+ )
+ };
+
+ assert!(data.has_invocation_in_dir(&check1));
+ assert!(data.has_invocation_in_dir(&check2));
+ assert!(!data.has_invocation_in_dir(&check3));
+
+ let (check1, check2, check3) = if cfg!(windows) {
+ (
+ with_trailing_slash("C:\\Users\\User"),
+ with_trailing_slash("C:\\Users"),
+ with_trailing_slash("C:\\Users\\User\\var"),
+ )
+ } else {
+ (
+ with_trailing_slash("/home/user"),
+ with_trailing_slash("/home"),
+ with_trailing_slash("/var"),
+ )
+ };
+
+ assert!(data.has_invocation_in_workspace(&check1));
+ assert!(data.has_invocation_in_workspace(&check2));
+ assert!(!data.has_invocation_in_workspace(&check3));
+ }
+
+ #[tokio::test]
+ async fn search_index_add_and_search() {
+ let index = SearchIndex::new();
+
+ let h1 = make_history(
+ "git status",
+ "/home/user/project",
+ datetime!(2024-01-01 10:00 UTC),
+ );
+ let h2 = make_history(
+ "git commit -m 'test'",
+ "/home/user/project",
+ datetime!(2024-01-01 10:05 UTC),
+ );
+ let h3 = make_history(
+ "ls -la",
+ "/home/user/other",
+ datetime!(2024-01-01 10:10 UTC),
+ );
+
+ index.add_history(&h1);
+ index.add_history(&h2);
+ index.add_history(&h3);
+
+ assert_eq!(index.command_count(), 3);
+
+ // Search for "git" - should match 2 commands
+ let results = index
+ .search("git", IndexFilterMode::Global, &QueryContext::default(), 10)
+ .await;
+ assert_eq!(results.len(), 2);
+
+ // Search with directory filter
+ let results = index
+ .search(
+ "",
+ IndexFilterMode::Directory(with_trailing_slash("/home/user/project")),
+ &QueryContext::default(),
+ 10,
+ )
+ .await;
+ assert_eq!(results.len(), 2); // git status and git commit
+ }
+}
diff --git a/crates/atuin-daemon/src/search/mod.rs b/crates/atuin-daemon/src/search/mod.rs
new file mode 100644
index 00000000..4d261956
--- /dev/null
+++ b/crates/atuin-daemon/src/search/mod.rs
@@ -0,0 +1,11 @@
+//! Search module for the daemon gRPC search service.
+//!
+//! This module provides fuzzy search over command history using Nucleo.
+
+mod index;
+
+// Include the generated proto code
+tonic::include_proto!("search");
+
+// Re-export the service and index
+pub use index::{IndexFilterMode, QueryContext, SearchIndex};
diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs
index 826d6191..a11de612 100644
--- a/crates/atuin-daemon/src/server.rs
+++ b/crates/atuin-daemon/src/server.rs
@@ -1,249 +1,49 @@
-use eyre::WrapErr;
-
-use atuin_client::encryption;
-use atuin_client::history::store::HistoryStore;
-use atuin_client::record::sqlite_store::SqliteStore;
-use atuin_client::settings::Settings;
-use std::io::ErrorKind;
-#[cfg(unix)]
-use std::path::PathBuf;
-use std::sync::Arc;
-use time::OffsetDateTime;
-use tokio::sync::watch;
-use tracing::{Level, instrument};
-
-use atuin_client::database::{Database, Sqlite as HistoryDatabase};
-use atuin_client::history::{History, HistoryId};
-use dashmap::DashMap;
use eyre::Result;
-use tonic::{Request, Response, Status, transport::Server};
-
-use crate::history::history_server::{History as HistorySvc, HistoryServer};
-
-use crate::history::{EndHistoryReply, EndHistoryRequest, StartHistoryReply, StartHistoryRequest};
-use crate::history::{ShutdownReply, ShutdownRequest, StatusReply, StatusRequest};
-
-mod sync;
-
-const DAEMON_PROTOCOL_VERSION: u32 = 1;
-
-#[derive(Debug)]
-pub struct HistoryService {
- // A store for WIP history
- // This is history that has not yet been completed, aka a command that's current running.
- running: Arc<DashMap<HistoryId, History>>,
- store: HistoryStore,
- history_db: HistoryDatabase,
- shutdown_tx: watch::Sender<bool>,
-}
-
-impl HistoryService {
- pub fn new(
- store: HistoryStore,
- history_db: HistoryDatabase,
- shutdown_tx: watch::Sender<bool>,
- ) -> Self {
- Self {
- running: Arc::new(DashMap::new()),
- store,
- history_db,
- shutdown_tx,
- }
- }
-}
-
-#[tonic::async_trait()]
-impl HistorySvc for HistoryService {
- #[instrument(skip_all, level = Level::INFO)]
- async fn start_history(
- &self,
- request: Request<StartHistoryRequest>,
- ) -> Result<Response<StartHistoryReply>, Status> {
- let running = self.running.clone();
- let req = request.into_inner();
-
- let timestamp =
- OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
- Status::invalid_argument(
- "failed to parse timestamp as unix time (expected nanos since epoch)",
- )
- })?;
-
- let mut h: History = History::daemon()
- .timestamp(timestamp)
- .command(req.command)
- .cwd(req.cwd)
- .session(req.session)
- .hostname(req.hostname)
- .build()
- .into();
- if !req.author.trim().is_empty() {
- h.author = req.author;
- }
- if !req.intent.trim().is_empty() {
- h.intent = Some(req.intent);
- }
-
- // The old behaviour had us inserting half-finished history records into the database
- // The new behaviour no longer allows that.
- // History that's running is stored in-memory by the daemon, and only committed when
- // complete.
- // If anyone relied on the old behaviour, we could perhaps insert to the history db here
- // too. I'd rather keep it pure, unless that ends up being the case.
- let id = h.id.clone();
- tracing::info!(id = id.to_string(), "start history");
- running.insert(id.clone(), h);
-
- let reply = StartHistoryReply {
- id: id.to_string(),
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn end_history(
- &self,
- request: Request<EndHistoryRequest>,
- ) -> Result<Response<EndHistoryReply>, Status> {
- let running = self.running.clone();
- let req = request.into_inner();
-
- let id = HistoryId(req.id);
-
- if let Some((_, mut history)) = running.remove(&id) {
- history.exit = req.exit;
- history.duration = match req.duration {
- 0 => i64::try_from(
- (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
- )
- .expect("failed to convert calculated duration to i64"),
- value => i64::try_from(value).expect("failed to get i64 duration"),
- };
-
- // Perhaps allow the incremental build to handle this entirely.
- self.history_db
- .save(&history)
- .await
- .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
-
- tracing::info!(
- id = id.0.to_string(),
- duration = history.duration,
- "end history"
- );
-
- let (id, idx) =
- self.store.push(history).await.map_err(|e| {
- Status::internal(format!("failed to push record to store: {e:?}"))
- })?;
-
- let reply = EndHistoryReply {
- id: id.0.to_string(),
- idx,
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- return Ok(Response::new(reply));
- }
-
- Err(Status::not_found(format!(
- "could not find history with id: {id}"
- )))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn status(
- &self,
- _request: Request<StatusRequest>,
- ) -> Result<Response<StatusReply>, Status> {
- let reply = StatusReply {
- // If status RPC responds, the daemon control plane is healthy.
- healthy: true,
- version: env!("CARGO_PKG_VERSION").to_string(),
- pid: std::process::id(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
- #[instrument(skip_all, level = Level::INFO)]
- async fn shutdown(
- &self,
- _request: Request<ShutdownRequest>,
- ) -> Result<Response<ShutdownReply>, Status> {
- let _ = self.shutdown_tx.send(true);
- Ok(Response::new(ShutdownReply { accepted: true }))
- }
-}
-
-#[cfg(unix)]
-async fn shutdown_signal(socket: Option<PathBuf>, mut shutdown_rx: watch::Receiver<bool>) {
- let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
- .expect("failed to register sigterm handler");
- let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
- .expect("failed to register sigint handler");
+use crate::components::history::HistoryGrpcService;
+use crate::components::search::SearchGrpcService;
+use crate::control::{ControlService, control_server::ControlServer};
+use crate::daemon::DaemonHandle;
+use crate::history::history_server::HistoryServer;
+use crate::search::search_server::SearchServer;
- tokio::select! {
- _ = term.recv() => {},
- _ = int.recv() => {},
- _ = shutdown_rx.changed() => {},
- }
-
- eprintln!("Removing socket...");
- if let Some(socket) = socket {
- match std::fs::remove_file(socket) {
- Ok(()) => {}
- Err(err) if err.kind() == ErrorKind::NotFound => {}
- Err(err) => {
- eprintln!("failed to remove socket: {err}");
- }
- }
- }
- eprintln!("Shutting down...");
-}
-
-#[cfg(windows)]
-async fn shutdown_signal(mut shutdown_rx: watch::Receiver<bool>) {
- let mut ctrl_c = tokio::signal::windows::ctrl_c().expect("failed to register signal handler");
- tokio::select! {
- _ = ctrl_c.recv() => {},
- _ = shutdown_rx.changed() => {},
- }
- eprintln!("Shutting down...");
-}
+use atuin_client::settings::Settings;
+/// Run the gRPC server with the given services.
+///
+/// This starts the gRPC server in the background and returns immediately.
+/// The server will shut down when a ShutdownRequested event is received.
#[cfg(unix)]
-async fn start_server(
+pub async fn run_grpc_server(
settings: Settings,
- history: HistoryService,
- shutdown_rx: watch::Receiver<bool>,
+ history_service: HistoryServer<HistoryGrpcService>,
+ search_service: SearchServer<SearchGrpcService>,
+ control_service: ControlServer<ControlService>,
+ handle: DaemonHandle,
) -> Result<()> {
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
- let socket_path = settings.daemon.socket_path;
+ let socket_path = settings.daemon.socket_path.clone();
let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket {
#[cfg(target_os = "linux")]
{
- use eyre::OptionExt;
+ use eyre::{OptionExt, WrapErr};
+ use std::os::unix::net::SocketAddr;
+ use std::path::PathBuf;
tracing::info!("getting systemd socket");
let listener = listenfd::ListenFd::from_env()
.take_unix_listener(0)?
.ok_or_eyre("missing systemd socket")?;
listener.set_nonblocking(true)?;
- let actual_path = listener
+ let actual_path: Result<PathBuf, eyre::Report> = listener
.local_addr()
.context("getting systemd socket's path")
- .and_then(|addr| {
+ .and_then(|addr: SocketAddr| {
addr.as_pathname()
.ok_or_eyre("systemd socket missing path")
- .map(|path| path.to_owned())
+ .map(|path: &std::path::Path| path.to_owned())
});
match actual_path {
Ok(actual_path) => {
@@ -271,66 +71,94 @@ async fn start_server(
let uds_stream = UnixListenerStream::new(uds);
- Server::builder()
- .add_service(HistoryServer::new(history))
- .serve_with_incoming_shutdown(
- uds_stream,
- shutdown_signal(cleanup.then_some(socket_path.into()), shutdown_rx),
- )
- .await?;
+ // Create shutdown signal from daemon handle
+ let shutdown_signal = async move {
+ let mut rx = handle.subscribe();
+ loop {
+ use crate::DaemonEvent;
+
+ match rx.recv().await {
+ Ok(DaemonEvent::ShutdownRequested) => break,
+ Ok(_) => continue,
+ Err(_) => break, // Channel closed
+ }
+ }
+ if cleanup {
+ eprintln!("Removing socket...");
+ if let Err(e) = std::fs::remove_file(&socket_path)
+ && e.kind() != std::io::ErrorKind::NotFound
+ {
+ eprintln!("failed to remove socket: {e}");
+ }
+ }
+ eprintln!("Shutting down gRPC server...");
+ };
+
+ // Spawn the server in the background
+ tokio::spawn(async move {
+ use tonic::transport::Server;
+
+ if let Err(e) = Server::builder()
+ .add_service(history_service)
+ .add_service(search_service)
+ .add_service(control_service)
+ .serve_with_incoming_shutdown(uds_stream, shutdown_signal)
+ .await
+ {
+ tracing::error!("gRPC server error: {e}");
+ }
+ });
Ok(())
}
+/// Run the gRPC server with the given services (Windows/TCP version).
#[cfg(not(unix))]
-async fn start_server(
+pub async fn run_grpc_server(
settings: Settings,
- history: HistoryService,
- shutdown_rx: watch::Receiver<bool>,
+ history_service: HistoryServer<HistoryGrpcService>,
+ search_service: SearchServer<SearchGrpcService>,
+ control_service: ControlServer<ControlService>,
+ handle: DaemonHandle,
) -> Result<()> {
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
+ use tonic::transport::Server;
let port = settings.daemon.tcp_port;
let url = format!("127.0.0.1:{port}");
- let tcp = TcpListener::bind(url).await?;
+ let tcp = TcpListener::bind(&url).await?;
let tcp_stream = TcpListenerStream::new(tcp);
tracing::info!("listening on tcp port {:?}", port);
- Server::builder()
- .add_service(HistoryServer::new(history))
- .serve_with_incoming_shutdown(tcp_stream, shutdown_signal(shutdown_rx))
- .await?;
- Ok(())
-}
-
-// break the above down when we end up with multiple services
-
-/// Listen on a unix socket
-/// Pass the path to the socket
-pub async fn listen(
- settings: Settings,
- store: SqliteStore,
- history_db: HistoryDatabase,
-) -> Result<()> {
- let encryption_key: [u8; 32] = encryption::load_key(&settings)
- .context("could not load encryption key")?
- .into();
-
- let host_id = Settings::host_id().await?;
- let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
+ // Create shutdown signal from daemon handle
+ let shutdown_signal = async move {
+ use crate::DaemonEvent;
- let (shutdown_tx, shutdown_rx) = watch::channel(false);
- let history = HistoryService::new(history_store.clone(), history_db.clone(), shutdown_tx);
+ let mut rx = handle.subscribe();
+ loop {
+ match rx.recv().await {
+ Ok(DaemonEvent::ShutdownRequested) => break,
+ Ok(_) => continue,
+ Err(_) => break, // Channel closed
+ }
+ }
+ eprintln!("Shutting down gRPC server...");
+ };
- // start services
- tokio::spawn(sync::worker(
- settings.clone(),
- store,
- history_store,
- history_db,
- ));
+ // Spawn the server in the background
+ tokio::spawn(async move {
+ if let Err(e) = Server::builder()
+ .add_service(history_service)
+ .add_service(search_service)
+ .add_service(control_service)
+ .serve_with_incoming_shutdown(tcp_stream, shutdown_signal)
+ .await
+ {
+ tracing::error!("gRPC server error: {e}");
+ }
+ });
- start_server(settings, history, shutdown_rx).await
+ Ok(())
}
diff --git a/crates/atuin-daemon/src/server/sync.rs b/crates/atuin-daemon/src/server/sync.rs
deleted file mode 100644
index e1e49597..00000000
--- a/crates/atuin-daemon/src/server/sync.rs
+++ /dev/null
@@ -1,96 +0,0 @@
-use eyre::Result;
-use rand::Rng;
-use tokio::time::{self, MissedTickBehavior};
-
-use atuin_client::database::Sqlite as HistoryDatabase;
-use atuin_client::{
- encryption,
- history::store::HistoryStore,
- record::{sqlite_store::SqliteStore, sync},
- settings::Settings,
-};
-
-use atuin_dotfiles::store::{AliasStore, var::VarStore};
-
-pub async fn worker(
- settings: Settings,
- store: SqliteStore,
- history_store: HistoryStore,
- history_db: HistoryDatabase,
-) -> Result<()> {
- tracing::info!("booting sync worker");
-
- let encryption_key: [u8; 32] = encryption::load_key(&settings)?.into();
- let host_id = Settings::host_id().await?;
- let alias_store = AliasStore::new(store.clone(), host_id, encryption_key);
- let var_store = VarStore::new(store.clone(), host_id, encryption_key);
-
- // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
- let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
-
- let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
-
- // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
- // we may end up running a lot of syncs in a hot loop. No bueno!
- ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
-
- loop {
- ticker.tick().await;
- tracing::info!("sync worker tick");
-
- let logged_in = match settings.logged_in().await {
- Ok(v) => v,
- Err(e) => {
- tracing::warn!("failed to check login status, skipping sync tick: {e}");
- continue;
- }
- };
-
- if !logged_in {
- tracing::debug!("not logged in, skipping sync tick");
- continue;
- }
-
- let res = sync::sync(&settings, &store).await;
-
- if let Err(e) = res {
- tracing::error!("sync tick failed with {e}");
-
- let mut rng = rand::thread_rng();
-
- let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
-
- if new_interval > max_interval {
- new_interval = max_interval;
- }
-
- ticker = time::interval(time::Duration::from_secs(new_interval as u64));
- ticker.reset_after(time::Duration::from_secs(new_interval as u64));
-
- tracing::error!("backing off, next sync tick in {new_interval}");
- } else {
- let (uploaded, downloaded) = res.unwrap();
-
- tracing::info!(
- uploaded = ?uploaded,
- downloaded = ?downloaded,
- "sync complete"
- );
-
- history_store
- .incremental_build(&history_db, &downloaded)
- .await?;
-
- alias_store.build().await?;
- var_store.build().await?;
-
- // Reset backoff on success
- if ticker.period().as_secs() != settings.daemon.sync_frequency {
- ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
- }
-
- // store sync time
- Settings::save_sync_time().await?;
- }
- }
-}