aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon/src/server.rs
diff options
context:
space:
mode:
authorMichelle Tilley <michelle@michelletilley.net>2026-02-26 14:42:08 -0800
committerGitHub <noreply@github.com>2026-02-26 14:42:08 -0800
commit3ba47446f06d5b0fbeaeb59d4ffed768b70729d8 (patch)
tree28348bb3d18e9983e9212c26840f691766cad985 /crates/atuin-daemon/src/server.rs
parentfeat: Add history author/intent metadata and v1 record version (#3205) (diff)
downloadatuin-3ba47446f06d5b0fbeaeb59d4ffed768b70729d8.zip
feat: In-memory search index with atuin daemon (#3201)
## Summary This PR adds a persistent, in-memory search index to the Atuin daemon, enabling fast fuzzy search without the startup delay of building an index each time the TUI opens. ### Key Changes - **Daemon search service**: A new gRPC service that maintains a Nucleo fuzzy search index in memory - **Real-time index updates**: The daemon listens for history events (new commands, synced records) and updates the index immediately - **Filter mode support**: All existing filter modes work (Global, Host, Session, Directory, Workspace) - **New search engine**: `daemon-fuzzy` search mode that queries the daemon instead of building a local index - **Paged history loading**: Database pagination support for efficient initial index loading - **Configurable logging**: New `[logs]` settings section for daemon and search log configuration - **Component-based daemon architecture**: Refactored daemon internals into a modular, event-driven system - **Fallback to DB search for regex**: Since Nucleo doesn't support regex matching ## Daemon Architecture The daemon has been refactored to use a component-based, event-driven architecture that makes it easier to add new functionality and reason about the system. ### Core Concepts ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Atuin Daemon │ │ │ │ ┌─────────────┐ ┌──────────────────────────────────────────────────┐ │ │ │ Daemon │ │ Components │ │ │ │ Handle │────▶│ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │ │ │ │ • emit() │ │ │ History │ │ Search │ │ Sync │ │ │ │ │ • subscribe │ │ │ Component │ │ Component │ │ Component │ │ │ │ │ • settings │ │ │ │ │ │ │ │ │ │ │ │ • databases │ │ │ gRPC service│ │ gRPC service│ │ background │ │ │ │ └─────────────┘ │ │ WIP history │ │ Nucleo index│ │ sync │ │ │ │ │ │ └─────────────┘ └─────────────┘ └────────────┘ │ │ │ │ └──────────────────────────────────────────────────┘ │ │ │ ▲ │ │ ▼ │ │ │ ┌─────────────────────────────────────┴────────────────────────────────┐ │ │ │ Event Bus (broadcast) │ │ │ │ │ │ │ │ HistoryStarted │ HistoryEnded │ RecordsAdded │ SyncCompleted │ ... │ │ │ └──────────────────────────────────────────────────────────────────────┘ │ │ ▲ │ │ ┌───────────────────────────────────┴──────────────────────────────────┐ │ │ │ Control Service (gRPC) │ │ │ │ External event injection from CLI commands │ │ │ └──────────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` ### DaemonHandle A lightweight, cloneable handle that provides access to shared daemon resources: - **Event emission**: `handle.emit(DaemonEvent::...)` broadcasts to all components - **Event subscription**: `handle.subscribe()` returns a receiver for the event bus - **Settings**: `handle.settings()` for configuration access - **Databases**: `handle.history_db()` and `handle.store()` for data access ### Component Trait Components implement a simple lifecycle: ```rust #[async_trait] trait Component: Send + Sync { fn name(&self) -> &'static str; async fn start(&mut self, handle: DaemonHandle) -> Result<()>; async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>; async fn stop(&mut self) -> Result<()>; } ``` ### Event-Driven Design Components communicate via events rather than direct coupling: | Event | Emitted By | Consumed By | |-------|-----------|-------------| | `HistoryStarted` | History gRPC | Search (logging) | | `HistoryEnded` | History gRPC | Search (index update) | | `RecordsAdded` | Sync | Search (index update) | | `HistoryPruned` | CLI (via Control) | Search (index rebuild) | | `HistoryDeleted` | CLI (via Control) | Search (index rebuild) | | `ForceSync` | CLI (via Control) | Sync | | `ShutdownRequested` | Signal handler | All (graceful shutdown) | ### External Event Injection CLI commands can inject events into a running daemon: ```rust // After `atuin history prune` emit_event(DaemonEvent::HistoryPruned).await?; // After deleting specific items emit_event(DaemonEvent::HistoryDeleted { ids }).await?; // Request immediate sync emit_event(DaemonEvent::ForceSync).await?; ``` This ensures the daemon's search index stays in sync with database changes made by CLI commands. ## Search Architecture The search service uses a [forked version of Nucleo](https://github.com/atuinsh/nucleo-ext) that adds filter and scorer callbacks, enabling efficient filtering and frecency-based ranking. ``` ┌────────────────────────────────────────────────────────────────┐ │ Atuin Daemon │ │ │ │ ┌─────────────────┐ ┌──────────────────────────────────┐ │ │ │ Event System │───▶│ Search Component │ │ │ │ │ │ │ │ │ │ • RecordsAdded │ │ ┌────────────────────────────┐ │ │ │ │ • HistoryEnded │ │ │ Deduplicated Index │ │ │ │ │ • HistoryPruned │ │ │ │ │ │ │ └─────────────────┘ │ │ CommandData per command: │ │ │ │ │ │ • Global frecency │ │ │ │ ┌─────────────────┐ │ │ • Filter indexes (sets) │ │ │ │ │ Background Task │ │ │ • Invocation history │ │ │ │ │ │ │ └────────────────────────────┘ │ │ │ │ Rebuilds │ │ │ │ │ │ │ frecency map │ │ ▼ │ │ │ │ every 60s │───▶│ ┌────────────────────────────┐ │ │ │ └─────────────────┘ │ │ Nucleo (forked) │ │ │ │ │ │ │ │ │ │ │ │ • Filter callback │ │ │ │ │ │ • Scorer callback │ │ │ │ │ │ • Fuzzy matching │ │ │ │ │ └────────────────────────────┘ │ │ │ └──────────────────────────────────┘ │ │ │ │ │ │ gRPC (Unix socket) │ └──────────────────────────────────────│─────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Search TUI (Client) │ │ │ │ 1. Send query + filter mode + context to daemon │ │ 2. Receive matching history IDs (ranked by frecency) │ │ 3. Hydrate full records from local SQLite database │ │ 4. Display results in TUI │ └─────────────────────────────────────────────────────────────────┘ ``` ### Nucleo Fork The [nucleo-ext fork](https://github.com/atuinsh/nucleo-ext) adds two key features to Nucleo: 1. **Filter callback**: Pre-filter items before fuzzy matching (used for directory/host/session filtering) 2. **Scorer callback**: Compute custom scores after matching (used for frecency ranking) ```rust // Filter: only include commands run in current directory nucleo.set_filter(Some(Arc::new(|cmd: &String| { passing_commands.contains(cmd) }))); // Scorer: combine fuzzy score with frecency nucleo.set_scorer(Some(Arc::new(|cmd: &String, fuzzy_score: u32| { let frecency = frecency_map.get(cmd).unwrap_or(0); fuzzy_score + (frecency * 10) }))); ``` ### Deduplicated Index Commands are stored once per unique command text, with metadata tracking all invocations: ```rust struct CommandData { command: String, invocations: Vec<Invocation>, // All times this command was run global_frecency: FrecencyData, // Precomputed frecency score // O(1) filter indexes directories: HashSet<String>, // All cwds where command was run hosts: HashSet<String>, // All hostnames sessions: HashSet<String>, // All session IDs } ``` This deduplication means: - **Fewer items to match**: ~13K unique commands vs ~62K history entries - **O(1) filter checks**: HashSet lookups instead of scanning invocations - **Single frecency score**: Global frecency computed once, used for all filter modes ### Frecency Scoring Frecency (frequency + recency) scoring prioritizes recently and frequently used commands: ```rust fn compute_frecency(count: u32, last_used: i64, now: i64) -> u32 { let age_hours = (now - last_used) / 3600; // Recency: decays over time (half-life ~24 hours) let recency = (100.0 * (-age_hours as f64 / 24.0).exp()) as u32; // Frequency: logarithmic scaling let frequency = (count.ln() * 20.0).min(100.0) as u32; recency + frequency } ``` The frecency map is: - **Precomputed by background task** every 60 seconds - **Never computed inline** during search (no latency impact) - **Graceful fallback**: If unavailable, search works without frecency ranking ### Filter Mode Implementation | Filter Mode | Implementation | |-------------|----------------| | Global | No filter (all commands) | | Directory | `command.directories.contains(cwd)` | | Workspace | `command.directories.any(\|d\| d.starts_with(git_root))` | | Host | `command.hosts.contains(hostname)` | | Session | `command.sessions.contains(session_id)` | Filters are pre-computed into a HashSet before the search, making the filter callback O(1). ### Search Flow 1. **Daemon startup**: Loads history from SQLite in pages, builds deduplicated index 2. **Frecency precompute**: Background task builds frecency map after history loads 3. **Search request**: Client sends query with filter mode and context 4. **Filter**: Pre-computed HashSet determines which commands pass the filter 5. **Match**: Nucleo fuzzy matches the query against command text 6. **Score**: Frecency scorer ranks results (fuzzy score + frecency * 10) 7. **Response**: Returns history IDs for the most recent invocation of each matching command 8. **Hydration**: Client fetches full records from local SQLite ### Configuration ```toml # Enable daemon + autostart [daemon] enabled = true autostart = true # Enable daemon-based fuzzy search [search] search_mode = "daemon-fuzzy" ``` ## Performance Performance varies based on several factors, but in most initial testing with the new architecture shows improvement: * **Nucleo performs searches up to 4.5x faster**: direct DB search averages 18.07ms, but the daemon completes the same queries in 3.99ms. * **IPC overhead is significant, but acceptable**: a significant amount of wall-time is taken up by the transfer of data over IPC (via UDS in this case). This averages to about ~7.8ms and accounts for 66% of client-side wall time. * **Tail latency improves at every layer**: p99 times correspond to initial requests, worst-case query patterns, etc. but the average p99 daemon-based response time is 3.6x better than the associated DB-based search p99 time * **Query complexity no longer impacts performance**: the Nucleo-based search shows consistent 2-7ms times regardless of query pattern. The DB-based search had a 17x variance (3.59ms to 62.46ms). Interestingly, @ellie - who has a larger history store than I do - gets even better performance on the IPC layer. This could use a lot more testing in various edge cases and on various hardware, but seems promising. ### Regular DB search ``` Individual calls for: db_search -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 32.25ms 32.20ms 47.70µs {"mode":"Fuzzy","query":"^"} 2 19.48ms 19.40ms 84.20µs {"mode":"Fuzzy","query":"^c"} 3 20.40ms 20.10ms 297.00µs {"mode":"Fuzzy","query":"^ca"} 4 13.07ms 13.00ms 69.90µs {"mode":"Fuzzy","query":"^car"} 5 12.17ms 12.10ms 67.10µs {"mode":"Fuzzy","query":"^carg"} 6 20.78ms 20.70ms 76.60µs {"mode":"Fuzzy","query":"^cargo"} 7 9.15ms 9.10ms 53.20µs {"mode":"Fuzzy","query":"^cargo "} 8 10.24ms 10.00ms 237.00µs {"mode":"Fuzzy","query":"^cargo b"} 9 10.01ms 9.68ms 325.00µs {"mode":"Fuzzy","query":"^cargo bu"} 10 5.89ms 5.83ms 57.20µs {"mode":"Fuzzy","query":"^cargo bui"} 11 8.85ms 8.28ms 568.00µs {"mode":"Fuzzy","query":"^cargo buil"} 12 7.70ms 7.49ms 212.00µs {"mode":"Fuzzy","query":"^cargo build"} 13 3.59ms 3.53ms 57.00µs {"mode":"Fuzzy","query":"^cargo build$"} 14 6.50ms 6.44ms 63.60µs {"mode":"Fuzzy","query":"^cargo "} 15 6.48ms 6.38ms 100.00µs {"mode":"Fuzzy","query":"!"} 16 31.68ms 31.60ms 75.90µs {"mode":"Fuzzy","query":"!g"} 17 62.46ms 62.40ms 58.90µs {"mode":"Fuzzy","query":"!gi"} 18 30.35ms 30.30ms 46.90µs {"mode":"Fuzzy","query":"!git"} 19 53.84ms 53.80ms 40.80µs {"mode":"Fuzzy","query":"!git "} 20 19.24ms 19.20ms 39.70µs {"mode":"Fuzzy","query":"!git c"} 21 22.03ms 22.00ms 34.70µs {"mode":"Fuzzy","query":"!git co"} 22 17.13ms 17.00ms 133.00µs {"mode":"Fuzzy","query":"!git com"} 23 16.14ms 15.90ms 242.00µs {"mode":"Fuzzy","query":"!git comm"} 24 5.11ms 5.08ms 28.60µs {"mode":"Fuzzy","query":"!git commi"} 25 7.31ms 7.26ms 52.70µs {"mode":"Fuzzy","query":"!git commit"} Summary: 25 calls Wall: avg=18.07ms, min=3.59ms, max=62.46ms, p50=13.07ms, p99=62.46ms Busy: avg=17.95ms, min=3.53ms, max=62.40ms, p50=13.00ms, p99=62.40ms ``` ### Daemon-based search **Client** ``` Individual calls for: daemon_search -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 13.05ms 2.55ms 10.50ms {"query":"^"} 2 10.65ms 1.40ms 9.25ms {"query":"^c"} 3 10.72ms 1.18ms 9.54ms {"query":"^ca"} 4 5.54ms 485.00µs 5.06ms {"query":"^car"} 5 15.02ms 1.02ms 14.00ms {"query":"^carg"} 6 9.49ms 840.00µs 8.65ms {"query":"^cargo"} 7 5.53ms 555.00µs 4.97ms {"query":"^cargo "} 8 8.56ms 717.00µs 7.84ms {"query":"^cargo b"} 9 12.34ms 1.24ms 11.10ms {"query":"^cargo bu"} 10 8.38ms 650.00µs 7.73ms {"query":"^cargo bui"} 11 13.07ms 770.00µs 12.30ms {"query":"^cargo buil"} 12 17.11ms 709.00µs 16.40ms {"query":"^cargo build"} 13 15.41ms 907.00µs 14.50ms {"query":"^cargo build$"} 14 8.19ms 665.00µs 7.52ms {"query":"^cargo "} 15 7.98ms 1.72ms 6.26ms {"query":"!"} 16 13.56ms 856.00µs 12.70ms {"query":"!g"} 17 8.11ms 624.00µs 7.49ms {"query":"!gi"} 18 14.57ms 775.00µs 13.80ms {"query":"!git"} 19 14.18ms 779.00µs 13.40ms {"query":"!git "} 20 9.62ms 802.00µs 8.82ms {"query":"!git c"} 21 15.50ms 1.50ms 14.00ms {"query":"!git co"} 22 11.58ms 1.48ms 10.10ms {"query":"!git com"} 23 13.82ms 2.12ms 11.70ms {"query":"!git comm"} 24 17.48ms 2.18ms 15.30ms {"query":"!git commi"} 25 14.81ms 1.71ms 13.10ms {"query":"!git commit"} Summary: 25 calls Wall: avg=11.77ms, min=5.53ms, max=17.48ms, p50=12.34ms, p99=17.48ms Busy: avg=1.13ms, min=485.00µs, max=2.55ms, p50=856.00µs, p99=2.55ms ``` **Daemon** ``` Individual calls for: daemon_search_query -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 1.75ms 250ns 1.75ms {"query":"^","query_id":1} 2 4.58ms 125ns 4.58ms {"query":"^c","query_id":2} 3 4.39ms 250ns 4.39ms {"query":"^ca","query_id":3} 4 2.52ms 125ns 2.52ms {"query":"^car","query_id":4} 5 4.44ms 250ns 4.44ms {"query":"^carg","query_id":5} 6 3.66ms 167ns 3.66ms {"query":"^cargo","query_id":6} 7 2.38ms 84ns 2.38ms {"query":"^cargo ","query_id":7} 8 4.13ms 84ns 4.13ms {"query":"^cargo b","query_id":8} 9 4.40ms 167ns 4.40ms {"query":"^cargo bu","query_id":9} 10 3.87ms 125ns 3.87ms {"query":"^cargo bui","query_id":10} 11 4.36ms 84ns 4.36ms {"query":"^cargo buil","query_id":11} 12 3.96ms 333ns 3.96ms {"query":"^cargo build","query_id":12} 13 4.61ms 167ns 4.61ms {"query":"^cargo build$","query_id":13} 14 4.20ms 209ns 4.20ms {"query":"^cargo ","query_id":14} 15 238.17µs 167ns 238.00µs {"query":"!","query_id":15} 16 4.44ms 125ns 4.44ms {"query":"!g","query_id":16} 17 3.47ms 83ns 3.47ms {"query":"!gi","query_id":17} 18 4.57ms 125ns 4.57ms {"query":"!git","query_id":18} 19 7.15ms 167ns 7.15ms {"query":"!git ","query_id":19} 20 4.27ms 250ns 4.27ms {"query":"!git c","query_id":20} 21 5.19ms 292ns 5.19ms {"query":"!git co","query_id":21} 22 4.29ms 417ns 4.29ms {"query":"!git com","query_id":22} 23 4.08ms 125ns 4.08ms {"query":"!git comm","query_id":23} 24 4.50ms 167ns 4.50ms {"query":"!git commi","query_id":24} 25 4.35ms 208ns 4.35ms {"query":"!git commit","query_id":25} Summary: 25 calls Wall: avg=3.99ms, min=238.17µs, max=7.15ms, p50=4.29ms, p99=7.15ms Busy: avg=182ns, min=83ns, max=417ns, p50=167ns, p99=417ns ``` **Nucleo matching time (in daemon)** ``` Individual calls for: nucleo_match -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 1.73ms 125ns 1.73ms {"query":"^","query_id":1} 2 4.57ms 167ns 4.57ms {"query":"^c","query_id":2} 3 4.37ms 125ns 4.37ms {"query":"^ca","query_id":3} 4 2.51ms 84ns 2.51ms {"query":"^car","query_id":4} 5 4.43ms 125ns 4.43ms {"query":"^carg","query_id":5} 6 3.64ms 125ns 3.64ms {"query":"^cargo","query_id":6} 7 2.37ms 84ns 2.37ms {"query":"^cargo ","query_id":7} 8 4.11ms 125ns 4.11ms {"query":"^cargo b","query_id":8} 9 4.36ms 208ns 4.36ms {"query":"^cargo bu","query_id":9} 10 3.85ms 125ns 3.85ms {"query":"^cargo bui","query_id":10} 11 4.35ms 125ns 4.35ms {"query":"^cargo buil","query_id":11} 12 3.94ms 250ns 3.94ms {"query":"^cargo build","query_id":12} 13 4.59ms 125ns 4.59ms {"query":"^cargo build$","query_id":13} 14 4.18ms 84ns 4.18ms {"query":"^cargo ","query_id":14} 15 220.13µs 125ns 220.00µs {"query":"!","query_id":15} 16 4.43ms 125ns 4.43ms {"query":"!g","query_id":16} 17 3.45ms 125ns 3.45ms {"query":"!gi","query_id":17} 18 4.55ms 125ns 4.55ms {"query":"!git","query_id":18} 19 7.12ms 209ns 7.12ms {"query":"!git ","query_id":19} 20 4.25ms 166ns 4.25ms {"query":"!git c","query_id":20} 21 5.18ms 125ns 5.18ms {"query":"!git co","query_id":21} 22 4.27ms 125ns 4.27ms {"query":"!git com","query_id":22} 23 4.06ms 292ns 4.06ms {"query":"!git comm","query_id":23} 24 4.46ms 166ns 4.46ms {"query":"!git commi","query_id":24} 25 4.31ms 208ns 4.31ms {"query":"!git commit","query_id":25} Summary: 25 calls Wall: avg=3.97ms, min=220.13µs, max=7.12ms, p50=4.27ms, p99=7.12ms Busy: avg=147ns, min=84ns, max=292ns, p50=125ns, p99=292ns ```
Diffstat (limited to 'crates/atuin-daemon/src/server.rs')
-rw-r--r--crates/atuin-daemon/src/server.rs360
1 files changed, 94 insertions, 266 deletions
diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs
index 826d6191..a11de612 100644
--- a/crates/atuin-daemon/src/server.rs
+++ b/crates/atuin-daemon/src/server.rs
@@ -1,249 +1,49 @@
-use eyre::WrapErr;
-
-use atuin_client::encryption;
-use atuin_client::history::store::HistoryStore;
-use atuin_client::record::sqlite_store::SqliteStore;
-use atuin_client::settings::Settings;
-use std::io::ErrorKind;
-#[cfg(unix)]
-use std::path::PathBuf;
-use std::sync::Arc;
-use time::OffsetDateTime;
-use tokio::sync::watch;
-use tracing::{Level, instrument};
-
-use atuin_client::database::{Database, Sqlite as HistoryDatabase};
-use atuin_client::history::{History, HistoryId};
-use dashmap::DashMap;
use eyre::Result;
-use tonic::{Request, Response, Status, transport::Server};
-
-use crate::history::history_server::{History as HistorySvc, HistoryServer};
-
-use crate::history::{EndHistoryReply, EndHistoryRequest, StartHistoryReply, StartHistoryRequest};
-use crate::history::{ShutdownReply, ShutdownRequest, StatusReply, StatusRequest};
-
-mod sync;
-
-const DAEMON_PROTOCOL_VERSION: u32 = 1;
-
-#[derive(Debug)]
-pub struct HistoryService {
- // A store for WIP history
- // This is history that has not yet been completed, aka a command that's current running.
- running: Arc<DashMap<HistoryId, History>>,
- store: HistoryStore,
- history_db: HistoryDatabase,
- shutdown_tx: watch::Sender<bool>,
-}
-
-impl HistoryService {
- pub fn new(
- store: HistoryStore,
- history_db: HistoryDatabase,
- shutdown_tx: watch::Sender<bool>,
- ) -> Self {
- Self {
- running: Arc::new(DashMap::new()),
- store,
- history_db,
- shutdown_tx,
- }
- }
-}
-
-#[tonic::async_trait()]
-impl HistorySvc for HistoryService {
- #[instrument(skip_all, level = Level::INFO)]
- async fn start_history(
- &self,
- request: Request<StartHistoryRequest>,
- ) -> Result<Response<StartHistoryReply>, Status> {
- let running = self.running.clone();
- let req = request.into_inner();
-
- let timestamp =
- OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
- Status::invalid_argument(
- "failed to parse timestamp as unix time (expected nanos since epoch)",
- )
- })?;
-
- let mut h: History = History::daemon()
- .timestamp(timestamp)
- .command(req.command)
- .cwd(req.cwd)
- .session(req.session)
- .hostname(req.hostname)
- .build()
- .into();
- if !req.author.trim().is_empty() {
- h.author = req.author;
- }
- if !req.intent.trim().is_empty() {
- h.intent = Some(req.intent);
- }
-
- // The old behaviour had us inserting half-finished history records into the database
- // The new behaviour no longer allows that.
- // History that's running is stored in-memory by the daemon, and only committed when
- // complete.
- // If anyone relied on the old behaviour, we could perhaps insert to the history db here
- // too. I'd rather keep it pure, unless that ends up being the case.
- let id = h.id.clone();
- tracing::info!(id = id.to_string(), "start history");
- running.insert(id.clone(), h);
-
- let reply = StartHistoryReply {
- id: id.to_string(),
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn end_history(
- &self,
- request: Request<EndHistoryRequest>,
- ) -> Result<Response<EndHistoryReply>, Status> {
- let running = self.running.clone();
- let req = request.into_inner();
-
- let id = HistoryId(req.id);
-
- if let Some((_, mut history)) = running.remove(&id) {
- history.exit = req.exit;
- history.duration = match req.duration {
- 0 => i64::try_from(
- (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
- )
- .expect("failed to convert calculated duration to i64"),
- value => i64::try_from(value).expect("failed to get i64 duration"),
- };
-
- // Perhaps allow the incremental build to handle this entirely.
- self.history_db
- .save(&history)
- .await
- .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
-
- tracing::info!(
- id = id.0.to_string(),
- duration = history.duration,
- "end history"
- );
-
- let (id, idx) =
- self.store.push(history).await.map_err(|e| {
- Status::internal(format!("failed to push record to store: {e:?}"))
- })?;
-
- let reply = EndHistoryReply {
- id: id.0.to_string(),
- idx,
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- return Ok(Response::new(reply));
- }
-
- Err(Status::not_found(format!(
- "could not find history with id: {id}"
- )))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn status(
- &self,
- _request: Request<StatusRequest>,
- ) -> Result<Response<StatusReply>, Status> {
- let reply = StatusReply {
- // If status RPC responds, the daemon control plane is healthy.
- healthy: true,
- version: env!("CARGO_PKG_VERSION").to_string(),
- pid: std::process::id(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
- #[instrument(skip_all, level = Level::INFO)]
- async fn shutdown(
- &self,
- _request: Request<ShutdownRequest>,
- ) -> Result<Response<ShutdownReply>, Status> {
- let _ = self.shutdown_tx.send(true);
- Ok(Response::new(ShutdownReply { accepted: true }))
- }
-}
-
-#[cfg(unix)]
-async fn shutdown_signal(socket: Option<PathBuf>, mut shutdown_rx: watch::Receiver<bool>) {
- let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
- .expect("failed to register sigterm handler");
- let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
- .expect("failed to register sigint handler");
+use crate::components::history::HistoryGrpcService;
+use crate::components::search::SearchGrpcService;
+use crate::control::{ControlService, control_server::ControlServer};
+use crate::daemon::DaemonHandle;
+use crate::history::history_server::HistoryServer;
+use crate::search::search_server::SearchServer;
- tokio::select! {
- _ = term.recv() => {},
- _ = int.recv() => {},
- _ = shutdown_rx.changed() => {},
- }
-
- eprintln!("Removing socket...");
- if let Some(socket) = socket {
- match std::fs::remove_file(socket) {
- Ok(()) => {}
- Err(err) if err.kind() == ErrorKind::NotFound => {}
- Err(err) => {
- eprintln!("failed to remove socket: {err}");
- }
- }
- }
- eprintln!("Shutting down...");
-}
-
-#[cfg(windows)]
-async fn shutdown_signal(mut shutdown_rx: watch::Receiver<bool>) {
- let mut ctrl_c = tokio::signal::windows::ctrl_c().expect("failed to register signal handler");
- tokio::select! {
- _ = ctrl_c.recv() => {},
- _ = shutdown_rx.changed() => {},
- }
- eprintln!("Shutting down...");
-}
+use atuin_client::settings::Settings;
+/// Run the gRPC server with the given services.
+///
+/// This starts the gRPC server in the background and returns immediately.
+/// The server will shut down when a ShutdownRequested event is received.
#[cfg(unix)]
-async fn start_server(
+pub async fn run_grpc_server(
settings: Settings,
- history: HistoryService,
- shutdown_rx: watch::Receiver<bool>,
+ history_service: HistoryServer<HistoryGrpcService>,
+ search_service: SearchServer<SearchGrpcService>,
+ control_service: ControlServer<ControlService>,
+ handle: DaemonHandle,
) -> Result<()> {
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
- let socket_path = settings.daemon.socket_path;
+ let socket_path = settings.daemon.socket_path.clone();
let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket {
#[cfg(target_os = "linux")]
{
- use eyre::OptionExt;
+ use eyre::{OptionExt, WrapErr};
+ use std::os::unix::net::SocketAddr;
+ use std::path::PathBuf;
tracing::info!("getting systemd socket");
let listener = listenfd::ListenFd::from_env()
.take_unix_listener(0)?
.ok_or_eyre("missing systemd socket")?;
listener.set_nonblocking(true)?;
- let actual_path = listener
+ let actual_path: Result<PathBuf, eyre::Report> = listener
.local_addr()
.context("getting systemd socket's path")
- .and_then(|addr| {
+ .and_then(|addr: SocketAddr| {
addr.as_pathname()
.ok_or_eyre("systemd socket missing path")
- .map(|path| path.to_owned())
+ .map(|path: &std::path::Path| path.to_owned())
});
match actual_path {
Ok(actual_path) => {
@@ -271,66 +71,94 @@ async fn start_server(
let uds_stream = UnixListenerStream::new(uds);
- Server::builder()
- .add_service(HistoryServer::new(history))
- .serve_with_incoming_shutdown(
- uds_stream,
- shutdown_signal(cleanup.then_some(socket_path.into()), shutdown_rx),
- )
- .await?;
+ // Create shutdown signal from daemon handle
+ let shutdown_signal = async move {
+ let mut rx = handle.subscribe();
+ loop {
+ use crate::DaemonEvent;
+
+ match rx.recv().await {
+ Ok(DaemonEvent::ShutdownRequested) => break,
+ Ok(_) => continue,
+ Err(_) => break, // Channel closed
+ }
+ }
+ if cleanup {
+ eprintln!("Removing socket...");
+ if let Err(e) = std::fs::remove_file(&socket_path)
+ && e.kind() != std::io::ErrorKind::NotFound
+ {
+ eprintln!("failed to remove socket: {e}");
+ }
+ }
+ eprintln!("Shutting down gRPC server...");
+ };
+
+ // Spawn the server in the background
+ tokio::spawn(async move {
+ use tonic::transport::Server;
+
+ if let Err(e) = Server::builder()
+ .add_service(history_service)
+ .add_service(search_service)
+ .add_service(control_service)
+ .serve_with_incoming_shutdown(uds_stream, shutdown_signal)
+ .await
+ {
+ tracing::error!("gRPC server error: {e}");
+ }
+ });
Ok(())
}
+/// Run the gRPC server with the given services (Windows/TCP version).
#[cfg(not(unix))]
-async fn start_server(
+pub async fn run_grpc_server(
settings: Settings,
- history: HistoryService,
- shutdown_rx: watch::Receiver<bool>,
+ history_service: HistoryServer<HistoryGrpcService>,
+ search_service: SearchServer<SearchGrpcService>,
+ control_service: ControlServer<ControlService>,
+ handle: DaemonHandle,
) -> Result<()> {
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
+ use tonic::transport::Server;
let port = settings.daemon.tcp_port;
let url = format!("127.0.0.1:{port}");
- let tcp = TcpListener::bind(url).await?;
+ let tcp = TcpListener::bind(&url).await?;
let tcp_stream = TcpListenerStream::new(tcp);
tracing::info!("listening on tcp port {:?}", port);
- Server::builder()
- .add_service(HistoryServer::new(history))
- .serve_with_incoming_shutdown(tcp_stream, shutdown_signal(shutdown_rx))
- .await?;
- Ok(())
-}
-
-// break the above down when we end up with multiple services
-
-/// Listen on a unix socket
-/// Pass the path to the socket
-pub async fn listen(
- settings: Settings,
- store: SqliteStore,
- history_db: HistoryDatabase,
-) -> Result<()> {
- let encryption_key: [u8; 32] = encryption::load_key(&settings)
- .context("could not load encryption key")?
- .into();
-
- let host_id = Settings::host_id().await?;
- let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
+ // Create shutdown signal from daemon handle
+ let shutdown_signal = async move {
+ use crate::DaemonEvent;
- let (shutdown_tx, shutdown_rx) = watch::channel(false);
- let history = HistoryService::new(history_store.clone(), history_db.clone(), shutdown_tx);
+ let mut rx = handle.subscribe();
+ loop {
+ match rx.recv().await {
+ Ok(DaemonEvent::ShutdownRequested) => break,
+ Ok(_) => continue,
+ Err(_) => break, // Channel closed
+ }
+ }
+ eprintln!("Shutting down gRPC server...");
+ };
- // start services
- tokio::spawn(sync::worker(
- settings.clone(),
- store,
- history_store,
- history_db,
- ));
+ // Spawn the server in the background
+ tokio::spawn(async move {
+ if let Err(e) = Server::builder()
+ .add_service(history_service)
+ .add_service(search_service)
+ .add_service(control_service)
+ .serve_with_incoming_shutdown(tcp_stream, shutdown_signal)
+ .await
+ {
+ tracing::error!("gRPC server error: {e}");
+ }
+ });
- start_server(settings, history, shutdown_rx).await
+ Ok(())
}