From 3ba47446f06d5b0fbeaeb59d4ffed768b70729d8 Mon Sep 17 00:00:00 2001 From: Michelle Tilley Date: Thu, 26 Feb 2026 14:42:08 -0800 Subject: feat: In-memory search index with atuin daemon (#3201) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary This PR adds a persistent, in-memory search index to the Atuin daemon, enabling fast fuzzy search without the startup delay of building an index each time the TUI opens. ### Key Changes - **Daemon search service**: A new gRPC service that maintains a Nucleo fuzzy search index in memory - **Real-time index updates**: The daemon listens for history events (new commands, synced records) and updates the index immediately - **Filter mode support**: All existing filter modes work (Global, Host, Session, Directory, Workspace) - **New search engine**: `daemon-fuzzy` search mode that queries the daemon instead of building a local index - **Paged history loading**: Database pagination support for efficient initial index loading - **Configurable logging**: New `[logs]` settings section for daemon and search log configuration - **Component-based daemon architecture**: Refactored daemon internals into a modular, event-driven system - **Fallback to DB search for regex**: Since Nucleo doesn't support regex matching ## Daemon Architecture The daemon has been refactored to use a component-based, event-driven architecture that makes it easier to add new functionality and reason about the system. 
### Core Concepts ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Atuin Daemon │ │ │ │ ┌─────────────┐ ┌──────────────────────────────────────────────────┐ │ │ │ Daemon │ │ Components │ │ │ │ Handle │────▶│ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │ │ │ │ • emit() │ │ │ History │ │ Search │ │ Sync │ │ │ │ │ • subscribe │ │ │ Component │ │ Component │ │ Component │ │ │ │ │ • settings │ │ │ │ │ │ │ │ │ │ │ │ • databases │ │ │ gRPC service│ │ gRPC service│ │ background │ │ │ │ └─────────────┘ │ │ WIP history │ │ Nucleo index│ │ sync │ │ │ │ │ │ └─────────────┘ └─────────────┘ └────────────┘ │ │ │ │ └──────────────────────────────────────────────────┘ │ │ │ ▲ │ │ ▼ │ │ │ ┌─────────────────────────────────────┴────────────────────────────────┐ │ │ │ Event Bus (broadcast) │ │ │ │ │ │ │ │ HistoryStarted │ HistoryEnded │ RecordsAdded │ SyncCompleted │ ... │ │ │ └──────────────────────────────────────────────────────────────────────┘ │ │ ▲ │ │ ┌───────────────────────────────────┴──────────────────────────────────┐ │ │ │ Control Service (gRPC) │ │ │ │ External event injection from CLI commands │ │ │ └──────────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` ### DaemonHandle A lightweight, cloneable handle that provides access to shared daemon resources: - **Event emission**: `handle.emit(DaemonEvent::...)` broadcasts to all components - **Event subscription**: `handle.subscribe()` returns a receiver for the event bus - **Settings**: `handle.settings()` for configuration access - **Databases**: `handle.history_db()` and `handle.store()` for data access ### Component Trait Components implement a simple lifecycle: ```rust #[async_trait] trait Component: Send + Sync { fn name(&self) -> &'static str; async fn start(&mut self, handle: DaemonHandle) -> Result<()>; async fn handle_event(&mut self, event: &DaemonEvent) -> 
Result<()>; async fn stop(&mut self) -> Result<()>; } ``` ### Event-Driven Design Components communicate via events rather than direct coupling: | Event | Emitted By | Consumed By | |-------|-----------|-------------| | `HistoryStarted` | History gRPC | Search (logging) | | `HistoryEnded` | History gRPC | Search (index update) | | `RecordsAdded` | Sync | Search (index update) | | `HistoryPruned` | CLI (via Control) | Search (index rebuild) | | `HistoryDeleted` | CLI (via Control) | Search (index rebuild) | | `ForceSync` | CLI (via Control) | Sync | | `ShutdownRequested` | Signal handler | All (graceful shutdown) | ### External Event Injection CLI commands can inject events into a running daemon: ```rust // After `atuin history prune` emit_event(DaemonEvent::HistoryPruned).await?; // After deleting specific items emit_event(DaemonEvent::HistoryDeleted { ids }).await?; // Request immediate sync emit_event(DaemonEvent::ForceSync).await?; ``` This ensures the daemon's search index stays in sync with database changes made by CLI commands. ## Search Architecture The search service uses a [forked version of Nucleo](https://github.com/atuinsh/nucleo-ext) that adds filter and scorer callbacks, enabling efficient filtering and frecency-based ranking. 
``` ┌────────────────────────────────────────────────────────────────┐ │ Atuin Daemon │ │ │ │ ┌─────────────────┐ ┌──────────────────────────────────┐ │ │ │ Event System │───▶│ Search Component │ │ │ │ │ │ │ │ │ │ • RecordsAdded │ │ ┌────────────────────────────┐ │ │ │ │ • HistoryEnded │ │ │ Deduplicated Index │ │ │ │ │ • HistoryPruned │ │ │ │ │ │ │ └─────────────────┘ │ │ CommandData per command: │ │ │ │ │ │ • Global frecency │ │ │ │ ┌─────────────────┐ │ │ • Filter indexes (sets) │ │ │ │ │ Background Task │ │ │ • Invocation history │ │ │ │ │ │ │ └────────────────────────────┘ │ │ │ │ Rebuilds │ │ │ │ │ │ │ frecency map │ │ ▼ │ │ │ │ every 60s │───▶│ ┌────────────────────────────┐ │ │ │ └─────────────────┘ │ │ Nucleo (forked) │ │ │ │ │ │ │ │ │ │ │ │ • Filter callback │ │ │ │ │ │ • Scorer callback │ │ │ │ │ │ • Fuzzy matching │ │ │ │ │ └────────────────────────────┘ │ │ │ └──────────────────────────────────┘ │ │ │ │ │ │ gRPC (Unix socket) │ └──────────────────────────────────────│─────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Search TUI (Client) │ │ │ │ 1. Send query + filter mode + context to daemon │ │ 2. Receive matching history IDs (ranked by frecency) │ │ 3. Hydrate full records from local SQLite database │ │ 4. Display results in TUI │ └─────────────────────────────────────────────────────────────────┘ ``` ### Nucleo Fork The [nucleo-ext fork](https://github.com/atuinsh/nucleo-ext) adds two key features to Nucleo: 1. **Filter callback**: Pre-filter items before fuzzy matching (used for directory/host/session filtering) 2. 
**Scorer callback**: Compute custom scores after matching (used for frecency ranking) ```rust // Filter: only include commands run in current directory nucleo.set_filter(Some(Arc::new(|cmd: &String| { passing_commands.contains(cmd) }))); // Scorer: combine fuzzy score with frecency nucleo.set_scorer(Some(Arc::new(|cmd: &String, fuzzy_score: u32| { let frecency = frecency_map.get(cmd).unwrap_or(0); fuzzy_score + (frecency * 10) }))); ``` ### Deduplicated Index Commands are stored once per unique command text, with metadata tracking all invocations: ```rust struct CommandData { command: String, invocations: Vec, // All times this command was run global_frecency: FrecencyData, // Precomputed frecency score // O(1) filter indexes directories: HashSet, // All cwds where command was run hosts: HashSet, // All hostnames sessions: HashSet, // All session IDs } ``` This deduplication means: - **Fewer items to match**: ~13K unique commands vs ~62K history entries - **O(1) filter checks**: HashSet lookups instead of scanning invocations - **Single frecency score**: Global frecency computed once, used for all filter modes ### Frecency Scoring Frecency (frequency + recency) scoring prioritizes recently and frequently used commands: ```rust fn compute_frecency(count: u32, last_used: i64, now: i64) -> u32 { let age_hours = (now - last_used) / 3600; // Recency: exponential decay with a 24-hour time constant (half-life ~16.6 hours) let recency = (100.0 * (-age_hours as f64 / 24.0).exp()) as u32; // Frequency: logarithmic scaling let frequency = (count.ln() * 20.0).min(100.0) as u32; recency + frequency } ``` The frecency map is: - **Precomputed by background task** every 60 seconds - **Never computed inline** during search (no latency impact) - **Graceful fallback**: If unavailable, search works without frecency ranking ### Filter Mode Implementation | Filter Mode | Implementation | |-------------|----------------| | Global | No filter (all commands) | | Directory | `command.directories.contains(cwd)` | | 
Workspace | `command.directories.any(\|d\| d.starts_with(git_root))` | | Host | `command.hosts.contains(hostname)` | | Session | `command.sessions.contains(session_id)` | Filters are pre-computed into a HashSet before the search, making the filter callback O(1). ### Search Flow 1. **Daemon startup**: Loads history from SQLite in pages, builds deduplicated index 2. **Frecency precompute**: Background task builds frecency map after history loads 3. **Search request**: Client sends query with filter mode and context 4. **Filter**: Pre-computed HashSet determines which commands pass the filter 5. **Match**: Nucleo fuzzy matches the query against command text 6. **Score**: Frecency scorer ranks results (fuzzy score + frecency * 10) 7. **Response**: Returns history IDs for the most recent invocation of each matching command 8. **Hydration**: Client fetches full records from local SQLite ### Configuration ```toml # Enable daemon + autostart [daemon] enabled = true autostart = true # Enable daemon-based fuzzy search [search] search_mode = "daemon-fuzzy" ``` ## Performance Performance varies based on several factors, but most initial testing with the new architecture shows an improvement: * **Nucleo performs searches about 4.5x faster on average**: direct DB search averages 18.07ms, but the daemon completes the same queries in 3.99ms. * **IPC overhead is significant, but acceptable**: a significant amount of wall-time is taken up by the transfer of data over IPC (via UDS in this case). This averages about 7.8ms and accounts for 66% of client-side wall time. * **Tail latency improves at every layer**: p99 times correspond to initial requests, worst-case query patterns, etc., but the average p99 daemon-based response time is 3.6x better than the associated DB-based search p99 time. * **Query complexity no longer impacts performance**: the Nucleo-based search shows consistent 2-7ms times regardless of query pattern. The DB-based search had a 17x variance (3.59ms to 62.46ms). 
Interestingly, @ellie - who has a larger history store than I do - gets even better performance on the IPC layer. This could use a lot more testing in various edge cases and on various hardware, but seems promising. ### Regular DB search ``` Individual calls for: db_search -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 32.25ms 32.20ms 47.70µs {"mode":"Fuzzy","query":"^"} 2 19.48ms 19.40ms 84.20µs {"mode":"Fuzzy","query":"^c"} 3 20.40ms 20.10ms 297.00µs {"mode":"Fuzzy","query":"^ca"} 4 13.07ms 13.00ms 69.90µs {"mode":"Fuzzy","query":"^car"} 5 12.17ms 12.10ms 67.10µs {"mode":"Fuzzy","query":"^carg"} 6 20.78ms 20.70ms 76.60µs {"mode":"Fuzzy","query":"^cargo"} 7 9.15ms 9.10ms 53.20µs {"mode":"Fuzzy","query":"^cargo "} 8 10.24ms 10.00ms 237.00µs {"mode":"Fuzzy","query":"^cargo b"} 9 10.01ms 9.68ms 325.00µs {"mode":"Fuzzy","query":"^cargo bu"} 10 5.89ms 5.83ms 57.20µs {"mode":"Fuzzy","query":"^cargo bui"} 11 8.85ms 8.28ms 568.00µs {"mode":"Fuzzy","query":"^cargo buil"} 12 7.70ms 7.49ms 212.00µs {"mode":"Fuzzy","query":"^cargo build"} 13 3.59ms 3.53ms 57.00µs {"mode":"Fuzzy","query":"^cargo build$"} 14 6.50ms 6.44ms 63.60µs {"mode":"Fuzzy","query":"^cargo "} 15 6.48ms 6.38ms 100.00µs {"mode":"Fuzzy","query":"!"} 16 31.68ms 31.60ms 75.90µs {"mode":"Fuzzy","query":"!g"} 17 62.46ms 62.40ms 58.90µs {"mode":"Fuzzy","query":"!gi"} 18 30.35ms 30.30ms 46.90µs {"mode":"Fuzzy","query":"!git"} 19 53.84ms 53.80ms 40.80µs {"mode":"Fuzzy","query":"!git "} 20 19.24ms 19.20ms 39.70µs {"mode":"Fuzzy","query":"!git c"} 21 22.03ms 22.00ms 34.70µs {"mode":"Fuzzy","query":"!git co"} 22 17.13ms 17.00ms 133.00µs {"mode":"Fuzzy","query":"!git com"} 23 16.14ms 15.90ms 242.00µs {"mode":"Fuzzy","query":"!git comm"} 24 5.11ms 5.08ms 28.60µs {"mode":"Fuzzy","query":"!git commi"} 25 7.31ms 7.26ms 52.70µs 
{"mode":"Fuzzy","query":"!git commit"} Summary: 25 calls Wall: avg=18.07ms, min=3.59ms, max=62.46ms, p50=13.07ms, p99=62.46ms Busy: avg=17.95ms, min=3.53ms, max=62.40ms, p50=13.00ms, p99=62.40ms ``` ### Daemon-based search **Client** ``` Individual calls for: daemon_search -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 13.05ms 2.55ms 10.50ms {"query":"^"} 2 10.65ms 1.40ms 9.25ms {"query":"^c"} 3 10.72ms 1.18ms 9.54ms {"query":"^ca"} 4 5.54ms 485.00µs 5.06ms {"query":"^car"} 5 15.02ms 1.02ms 14.00ms {"query":"^carg"} 6 9.49ms 840.00µs 8.65ms {"query":"^cargo"} 7 5.53ms 555.00µs 4.97ms {"query":"^cargo "} 8 8.56ms 717.00µs 7.84ms {"query":"^cargo b"} 9 12.34ms 1.24ms 11.10ms {"query":"^cargo bu"} 10 8.38ms 650.00µs 7.73ms {"query":"^cargo bui"} 11 13.07ms 770.00µs 12.30ms {"query":"^cargo buil"} 12 17.11ms 709.00µs 16.40ms {"query":"^cargo build"} 13 15.41ms 907.00µs 14.50ms {"query":"^cargo build$"} 14 8.19ms 665.00µs 7.52ms {"query":"^cargo "} 15 7.98ms 1.72ms 6.26ms {"query":"!"} 16 13.56ms 856.00µs 12.70ms {"query":"!g"} 17 8.11ms 624.00µs 7.49ms {"query":"!gi"} 18 14.57ms 775.00µs 13.80ms {"query":"!git"} 19 14.18ms 779.00µs 13.40ms {"query":"!git "} 20 9.62ms 802.00µs 8.82ms {"query":"!git c"} 21 15.50ms 1.50ms 14.00ms {"query":"!git co"} 22 11.58ms 1.48ms 10.10ms {"query":"!git com"} 23 13.82ms 2.12ms 11.70ms {"query":"!git comm"} 24 17.48ms 2.18ms 15.30ms {"query":"!git commi"} 25 14.81ms 1.71ms 13.10ms {"query":"!git commit"} Summary: 25 calls Wall: avg=11.77ms, min=5.53ms, max=17.48ms, p50=12.34ms, p99=17.48ms Busy: avg=1.13ms, min=485.00µs, max=2.55ms, p50=856.00µs, p99=2.55ms ``` **Daemon** ``` Individual calls for: daemon_search_query -------------------------------------------------------------------------------------------------------------- # Wall Busy 
Idle Fields -------------------------------------------------------------------------------------------------------------- 1 1.75ms 250ns 1.75ms {"query":"^","query_id":1} 2 4.58ms 125ns 4.58ms {"query":"^c","query_id":2} 3 4.39ms 250ns 4.39ms {"query":"^ca","query_id":3} 4 2.52ms 125ns 2.52ms {"query":"^car","query_id":4} 5 4.44ms 250ns 4.44ms {"query":"^carg","query_id":5} 6 3.66ms 167ns 3.66ms {"query":"^cargo","query_id":6} 7 2.38ms 84ns 2.38ms {"query":"^cargo ","query_id":7} 8 4.13ms 84ns 4.13ms {"query":"^cargo b","query_id":8} 9 4.40ms 167ns 4.40ms {"query":"^cargo bu","query_id":9} 10 3.87ms 125ns 3.87ms {"query":"^cargo bui","query_id":10} 11 4.36ms 84ns 4.36ms {"query":"^cargo buil","query_id":11} 12 3.96ms 333ns 3.96ms {"query":"^cargo build","query_id":12} 13 4.61ms 167ns 4.61ms {"query":"^cargo build$","query_id":13} 14 4.20ms 209ns 4.20ms {"query":"^cargo ","query_id":14} 15 238.17µs 167ns 238.00µs {"query":"!","query_id":15} 16 4.44ms 125ns 4.44ms {"query":"!g","query_id":16} 17 3.47ms 83ns 3.47ms {"query":"!gi","query_id":17} 18 4.57ms 125ns 4.57ms {"query":"!git","query_id":18} 19 7.15ms 167ns 7.15ms {"query":"!git ","query_id":19} 20 4.27ms 250ns 4.27ms {"query":"!git c","query_id":20} 21 5.19ms 292ns 5.19ms {"query":"!git co","query_id":21} 22 4.29ms 417ns 4.29ms {"query":"!git com","query_id":22} 23 4.08ms 125ns 4.08ms {"query":"!git comm","query_id":23} 24 4.50ms 167ns 4.50ms {"query":"!git commi","query_id":24} 25 4.35ms 208ns 4.35ms {"query":"!git commit","query_id":25} Summary: 25 calls Wall: avg=3.99ms, min=238.17µs, max=7.15ms, p50=4.29ms, p99=7.15ms Busy: avg=182ns, min=83ns, max=417ns, p50=167ns, p99=417ns ``` **Nucleo matching time (in daemon)** ``` Individual calls for: nucleo_match -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 1.73ms 
125ns 1.73ms {"query":"^","query_id":1} 2 4.57ms 167ns 4.57ms {"query":"^c","query_id":2} 3 4.37ms 125ns 4.37ms {"query":"^ca","query_id":3} 4 2.51ms 84ns 2.51ms {"query":"^car","query_id":4} 5 4.43ms 125ns 4.43ms {"query":"^carg","query_id":5} 6 3.64ms 125ns 3.64ms {"query":"^cargo","query_id":6} 7 2.37ms 84ns 2.37ms {"query":"^cargo ","query_id":7} 8 4.11ms 125ns 4.11ms {"query":"^cargo b","query_id":8} 9 4.36ms 208ns 4.36ms {"query":"^cargo bu","query_id":9} 10 3.85ms 125ns 3.85ms {"query":"^cargo bui","query_id":10} 11 4.35ms 125ns 4.35ms {"query":"^cargo buil","query_id":11} 12 3.94ms 250ns 3.94ms {"query":"^cargo build","query_id":12} 13 4.59ms 125ns 4.59ms {"query":"^cargo build$","query_id":13} 14 4.18ms 84ns 4.18ms {"query":"^cargo ","query_id":14} 15 220.13µs 125ns 220.00µs {"query":"!","query_id":15} 16 4.43ms 125ns 4.43ms {"query":"!g","query_id":16} 17 3.45ms 125ns 3.45ms {"query":"!gi","query_id":17} 18 4.55ms 125ns 4.55ms {"query":"!git","query_id":18} 19 7.12ms 209ns 7.12ms {"query":"!git ","query_id":19} 20 4.25ms 166ns 4.25ms {"query":"!git c","query_id":20} 21 5.18ms 125ns 5.18ms {"query":"!git co","query_id":21} 22 4.27ms 125ns 4.27ms {"query":"!git com","query_id":22} 23 4.06ms 292ns 4.06ms {"query":"!git comm","query_id":23} 24 4.46ms 166ns 4.46ms {"query":"!git commi","query_id":24} 25 4.31ms 208ns 4.31ms {"query":"!git commit","query_id":25} Summary: 25 calls Wall: avg=3.97ms, min=220.13µs, max=7.12ms, p50=4.27ms, p99=7.12ms Busy: avg=147ns, min=84ns, max=292ns, p50=125ns, p99=292ns ``` --- CONTRIBUTING.md | 45 ++ Cargo.lock | 35 ++ Cargo.toml | 8 +- crates/atuin-ai/src/commands.rs | 103 +++- crates/atuin-ai/src/commands/inline.rs | 40 +- crates/atuin-client/src/database.rs | 192 +++++++ crates/atuin-client/src/hub.rs | 20 +- crates/atuin-client/src/settings.rs | 221 +++++++- crates/atuin-common/src/utils.rs | 4 + crates/atuin-daemon/Cargo.toml | 10 +- crates/atuin-daemon/build.rs | 6 +- crates/atuin-daemon/proto/control.proto | 62 +++ 
crates/atuin-daemon/proto/search.proto | 35 ++ crates/atuin-daemon/src/client.rs | 289 ++++++++++- crates/atuin-daemon/src/components/history.rs | 252 +++++++++ crates/atuin-daemon/src/components/mod.rs | 22 + crates/atuin-daemon/src/components/search.rs | 394 ++++++++++++++ crates/atuin-daemon/src/components/sync.rs | 257 +++++++++ crates/atuin-daemon/src/control/mod.rs | 12 + crates/atuin-daemon/src/control/service.rs | 71 +++ crates/atuin-daemon/src/daemon.rs | 450 ++++++++++++++++ crates/atuin-daemon/src/events.rs | 74 +++ crates/atuin-daemon/src/history.rs | 1 - crates/atuin-daemon/src/history/mod.rs | 6 + crates/atuin-daemon/src/lib.rs | 107 ++++ crates/atuin-daemon/src/search/index.rs | 572 +++++++++++++++++++++ crates/atuin-daemon/src/search/mod.rs | 11 + crates/atuin-daemon/src/server.rs | 360 ++++--------- crates/atuin-daemon/src/server/sync.rs | 96 ---- crates/atuin-daemon/tests/lifecycle.rs | 89 +++- crates/atuin/Cargo.toml | 2 + crates/atuin/src/command/client.rs | 185 ++++++- crates/atuin/src/command/client/daemon.rs | 103 +++- crates/atuin/src/command/client/history.rs | 12 + crates/atuin/src/command/client/search.rs | 5 + crates/atuin/src/command/client/search/engines.rs | 12 +- .../src/command/client/search/engines/daemon.rs | 206 ++++++++ .../atuin/src/command/client/search/engines/db.rs | 11 +- .../src/command/client/search/engines/skim.rs | 17 +- .../atuin/src/command/client/search/interactive.rs | 22 +- crates/atuin/src/command/client/store/rebuild.rs | 6 + docs/docs/configuration/config.md | 57 ++ scripts/span-table.ts | 420 +++++++++++++++ 43 files changed, 4439 insertions(+), 463 deletions(-) create mode 100644 crates/atuin-daemon/proto/control.proto create mode 100644 crates/atuin-daemon/proto/search.proto create mode 100644 crates/atuin-daemon/src/components/history.rs create mode 100644 crates/atuin-daemon/src/components/mod.rs create mode 100644 crates/atuin-daemon/src/components/search.rs create mode 100644 
crates/atuin-daemon/src/components/sync.rs create mode 100644 crates/atuin-daemon/src/control/mod.rs create mode 100644 crates/atuin-daemon/src/control/service.rs create mode 100644 crates/atuin-daemon/src/daemon.rs create mode 100644 crates/atuin-daemon/src/events.rs delete mode 100644 crates/atuin-daemon/src/history.rs create mode 100644 crates/atuin-daemon/src/history/mod.rs create mode 100644 crates/atuin-daemon/src/search/index.rs create mode 100644 crates/atuin-daemon/src/search/mod.rs delete mode 100644 crates/atuin-daemon/src/server/sync.rs create mode 100644 crates/atuin/src/command/client/search/engines/daemon.rs create mode 100755 scripts/span-table.ts diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8c1f45b7..f47e1ff4 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -67,6 +67,51 @@ While iterating on the server, I find it helpful to run a new user on my system, Our test coverage is currently not the best, but we are working on it! Generally tests live in the file next to the functionality they are testing, and are executed just with `cargo test`. +## Logging and Debugging + +### Log Files + +Atuin writes logs to `~/.atuin/logs` unless configured otherwise. 
Log files are rotated daily and retained for 4 days by default: + +- `search.log.*` - Interactive search session logs +- `daemon.log.*` - Background daemon logs + +### Log Levels + +You can set the `ATUIN_LOG` environment variable to override log verbosity from the config file: + +```shell +ATUIN_LOG=debug atuin search # Enable debug logging +ATUIN_LOG=trace atuin search # Enable trace logging (very verbose) +``` + +### Span Timing (Performance Profiling) + +For performance analysis, you can capture detailed span timing data as JSON: + +```shell +ATUIN_SPAN=spans.json atuin search +``` + +This creates a JSON file with timing information for each instrumented span, including: +- `time.busy` - Time actively executing code +- `time.idle` - Time awaiting async operations (I/O, child tasks) + +The `scripts/span-table.ts` script analyzes these logs: + +```shell +# Summary view - shows all spans with timing stats +bun scripts/span-table.ts spans.json + +# Detail view - shows individual calls for a specific span +bun scripts/span-table.ts spans.json --detail daemon_search + +# Filter to specific spans +bun scripts/span-table.ts spans.json --filter "search|hydrate" +``` + +This is useful for comparing performance between different search implementations or identifying bottlenecks. 
+ ## Migrations Be careful creating database migrations - once your database has migrated ahead of current stable, there is no going back diff --git a/Cargo.lock b/Cargo.lock index 329df7a4..5ef8d9f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,6 +241,7 @@ dependencies = [ "itertools", "log", "norm", + "nucleo-matcher", "open", "ratatui", "regex", @@ -257,6 +258,7 @@ dependencies = [ "tiny-bip39", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", "tracing-tree", "unicode-width 0.2.2", @@ -377,6 +379,7 @@ dependencies = [ "eyre", "hyper-util", "listenfd", + "nucleo", "prost", "prost-types", "protox", @@ -2865,6 +2868,25 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nucleo" +version = "0.5.0" +source = "git+https://github.com/atuinsh/nucleo-ext.git?branch=main#74bd786e98f7c88d68f967855d6f57b3ac2d09ef" +dependencies = [ + "nucleo-matcher", + "parking_lot", + "rayon", +] + +[[package]] +name = "nucleo-matcher" +version = "0.3.1" +source = "git+https://github.com/atuinsh/nucleo-ext.git?branch=main#74bd786e98f7c88d68f967855d6f57b3ac2d09ef" +dependencies = [ + "memchr", + "unicode-segmentation", +] + [[package]] name = "num-bigint-dig" version = "0.8.6" @@ -5329,6 +5351,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.22" @@ -5339,12 +5371,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4acb21ef..c064bf5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,11 +47,15 @@ ratatui = "0.30.0" sql-builder = "3" tempfile = { version = "3.19" } 
minijinja = "2.9.0" -rustls = { version = "0.23", default-features = false, features = ["ring", "std", "tls12"] } +rustls = { version = "0.23", default-features = false, features = [ + "ring", + "std", + "tls12", +] } [workspace.dependencies.tracing-subscriber] version = "0.3" -features = ["ansi", "fmt", "registry", "env-filter"] +features = ["ansi", "fmt", "registry", "env-filter", "json"] [workspace.dependencies.reqwest] version = "0.13" diff --git a/crates/atuin-ai/src/commands.rs b/crates/atuin-ai/src/commands.rs index 7d5ca16b..b35cec9e 100644 --- a/crates/atuin-ai/src/commands.rs +++ b/crates/atuin-ai/src/commands.rs @@ -1,8 +1,13 @@ +use std::{ + fs, + path::{Path, PathBuf}, +}; + use atuin_common::shell::Shell; use clap::{Parser, Subcommand}; -use tracing::Level; +use eyre::Result; +use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_subscriber::{EnvFilter, Layer, fmt, layer::SubscriberExt, util::SubscriberInitExt}; - #[cfg(debug_assertions)] pub mod debug_render; @@ -72,7 +77,11 @@ enum Commands { pub async fn run() -> eyre::Result<()> { let cli = Cli::parse(); - init_tracing(cli.verbose); + let settings = atuin_client::settings::Settings::new()?; + + if settings.logs.ai_enabled() { + init_logging(&settings, cli.verbose)?; + } match cli.command { Commands::Init { shell } => init::run(shell).await, @@ -89,6 +98,7 @@ pub async fn run() -> eyre::Result<()> { cli.api_token, keep, debug_state, + &settings, ) .await } @@ -104,39 +114,90 @@ pub async fn run() -> eyre::Result<()> { } } -fn init_tracing(verbose: bool) { - let level = if verbose { Level::DEBUG } else { Level::INFO }; +pub fn detect_shell() -> Option { + Some(Shell::current().to_string()) +} - // Create env filter - let env_filter = EnvFilter::from_default_env().add_directive( - format!("atuin_ai={}", level.as_str().to_lowercase()) - .parse() - .unwrap(), - ); +/// Initializes logging for the AI commands. 
+fn init_logging(settings: &atuin_client::settings::Settings, verbose: bool) -> Result<()> { + // ATUIN_LOG env var overrides config file level settings + let env_log_set = std::env::var("ATUIN_LOG").is_ok(); + + // Base filter from env var (or empty if not set) + let base_filter = + EnvFilter::from_env("ATUIN_LOG").add_directive("sqlx_sqlite::regexp=off".parse()?); + + // Use config level unless ATUIN_LOG is set + let filter = if env_log_set { + base_filter + } else { + EnvFilter::default() + .add_directive(settings.logs.ai_level().as_directive().parse()?) + .add_directive("sqlx_sqlite::regexp=off".parse()?) + }; + + let log_dir = PathBuf::from(&settings.logs.dir); + fs::create_dir_all(&log_dir)?; + + let filename = settings.logs.ai.file.clone(); + + // Clean up old log files + cleanup_old_logs(&log_dir, &filename, settings.logs.ai_retention()); - // Create console layer (only for verbose mode) let console_layer = if verbose { Some( fmt::layer() .with_writer(std::io::stderr) .with_ansi(true) .with_target(false) - .with_filter(env_filter), + .with_filter(filter.clone()), ) } else { None }; - // Initialize subscriber - let subscriber = tracing_subscriber::registry(); + let file_appender = RollingFileAppender::new(Rotation::DAILY, &log_dir, &filename); - if let Some(console) = console_layer { - subscriber.with(console).init(); + let base = tracing_subscriber::registry().with( + fmt::layer() + .with_writer(file_appender) + .with_ansi(false) + .with_filter(filter), + ); + + if let Some(console_layer) = console_layer { + base.with(console_layer).init(); } else { - subscriber.init(); - } + base.init(); + }; + + Ok(()) } -pub fn detect_shell() -> Option { - Some(Shell::current().to_string()) +fn cleanup_old_logs(log_dir: &Path, prefix: &str, retention_days: u64) { + let cutoff = std::time::SystemTime::now() + - std::time::Duration::from_secs(retention_days * 24 * 60 * 60); + + let Ok(entries) = fs::read_dir(log_dir) else { + return; + }; + + for entry in entries.flatten() 
{ + let path = entry.path(); + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + + // Match files like "search.log.2024-02-23" or "daemon.log.2024-02-23" + if !name.starts_with(prefix) || name == prefix { + continue; + } + + if let Ok(metadata) = entry.metadata() + && let Ok(modified) = metadata.modified() + && modified < cutoff + { + let _ = fs::remove_file(&path); + } + } } diff --git a/crates/atuin-ai/src/commands/inline.rs b/crates/atuin-ai/src/commands/inline.rs index 3f9278a2..b49bfece 100644 --- a/crates/atuin-ai/src/commands/inline.rs +++ b/crates/atuin-ai/src/commands/inline.rs @@ -15,6 +15,7 @@ use eyre::{Context as _, Result, bail}; use futures::StreamExt; use reqwest::Url; use std::io::Write; +use tracing::{debug, error, info, trace}; pub async fn run( initial_command: Option, @@ -23,6 +24,7 @@ pub async fn run( api_token: Option, keep_output: bool, debug_state_file: Option, + settings: &atuin_client::settings::Settings, ) -> Result<()> { // Install panic hook once at entry point to ensure terminal restoration install_panic_hook(); @@ -31,7 +33,6 @@ pub async fn run( // 1. Command line arguments/environment variables // 2. Settings file // 3. Default - let settings = atuin_client::settings::Settings::new()?; let endpoint = api_endpoint.as_deref().unwrap_or( settings .ai @@ -44,7 +45,7 @@ pub async fn run( let token = if let Some(token) = &api_token { token.to_string() } else { - ensure_hub_session(&settings, endpoint).await? + ensure_hub_session(settings, endpoint).await? }; let action = run_inline_tui( @@ -57,6 +58,7 @@ pub async fn run( }, keep_output, debug_state_file, + settings, ) .await?; emit_shell_result(action.0, &action.1); @@ -69,9 +71,12 @@ async fn ensure_hub_session( hub_address: &str, ) -> Result { if let Some(token) = atuin_client::hub::get_session_token().await? 
{ + debug!("Found Hub session, using existing token"); return Ok(token); } + info!("No Hub session found, prompting for authentication"); + println!("Atuin AI requires authenticating with Atuin Hub."); println!("This is separate from your sync setup."); println!("Press enter to begin (or esc to cancel)."); @@ -79,6 +84,8 @@ async fn ensure_hub_session( bail!("authentication canceled"); } + debug!("Starting Atuin Hub authentication..."); + println!("Authenticating with Atuin Hub..."); let mut auth_settings = settings.clone(); auth_settings.hub_address = hub_address.to_string(); @@ -93,6 +100,8 @@ async fn ensure_hub_session( ) .await?; + info!("Authentication complete, saving session token"); + atuin_client::hub::save_session(&token).await?; Ok(token) } @@ -141,6 +150,8 @@ fn create_chat_stream( } }; + debug!("Sending SSE request to {endpoint}"); + // Build request body let mut request_body = serde_json::json!({ "messages": messages, @@ -155,6 +166,7 @@ fn create_chat_stream( // Include session_id only if present (not on first request) if let Some(ref sid) = session_id { + trace!("Including session_id in request: {sid}"); request_body["session_id"] = serde_json::json!(sid); } @@ -178,12 +190,14 @@ fn create_chat_stream( let status = response.status(); if status == reqwest::StatusCode::UNAUTHORIZED { // Clear saved session on auth error + error!("SSE request failed with status: {status}, clearing session"); let _ = atuin_client::hub::delete_session().await; yield Err(eyre::eyre!("Hub session expired. 
Re-run to authenticate again.")); return; } if !status.is_success() { let body = response.text().await.unwrap_or_default(); + error!("SSE request failed ({}): {}", status, body); yield Err(eyre::eyre!("SSE request failed ({}): {}", status, body)); return; } @@ -197,7 +211,7 @@ fn create_chat_stream( let event_type = sse_event.event.as_str(); let data = sse_event.data.clone(); - tracing::debug!(event_type = %event_type, data = %data, "SSE event received"); + debug!(event_type = %event_type, "SSE event received"); match event_type { "text" => { @@ -245,8 +259,10 @@ fn create_chat_stream( "error" => { if let Ok(json) = serde_json::from_str::(&data) { let message = json.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error").to_string(); + error!("SSE error: {}", message); yield Ok(ChatStreamEvent::Error(message)); } else { + error!("SSE error: {}", data); yield Ok(ChatStreamEvent::Error(data)); } break; @@ -391,6 +407,7 @@ async fn run_inline_tui( initial_prompt: Option, keep_output: bool, debug_state_file: Option, + settings: &atuin_client::settings::Settings, ) -> Result<(Action, String)> { // Initialize terminal guard and app state let mut guard = TerminalGuard::new(keep_output)?; @@ -425,7 +442,6 @@ async fn run_inline_tui( log_state!("init"); // Load theme - let settings = atuin_client::settings::Settings::new()?; let mut theme_manager = ThemeManager::new(None, None); let theme = theme_manager.load_theme(&settings.theme.name, None); @@ -486,12 +502,12 @@ async fn run_inline_tui( match stream.as_mut().poll_next(&mut cx) { std::task::Poll::Ready(Some(Ok(event))) => match event { ChatStreamEvent::TextChunk(text) => { - tracing::debug!(text = %text, "Processing TextChunk"); + trace!(text = %text, "Processing TextChunk"); app.state.append_streaming_text(&text); log_state!("text_chunk"); } ChatStreamEvent::ToolCall { id, name, input } => { - tracing::debug!(id = %id, name = %name, "Processing ToolCall"); + trace!(id = %id, name = %name, "Processing 
ToolCall"); app.state.add_tool_call(id, name, input); log_state!("tool_call"); } @@ -500,17 +516,17 @@ async fn run_inline_tui( content, is_error, } => { - tracing::debug!(tool_use_id = %tool_use_id, "Processing ToolResult"); + trace!(tool_use_id = %tool_use_id, "Processing ToolResult"); app.state.add_tool_result(tool_use_id, content, is_error); log_state!("tool_result"); } ChatStreamEvent::Status(status) => { - tracing::debug!(status = %status, "Processing Status"); + trace!(status = %status, "Processing Status"); app.state.update_streaming_status(&status); log_state!("status"); } ChatStreamEvent::Done { session_id } => { - tracing::debug!(session_id = %session_id, "Processing Done"); + trace!(session_id = %session_id, "Processing Done"); chat_stream = None; if !session_id.is_empty() { app.state.store_session_id(session_id); @@ -519,7 +535,7 @@ async fn run_inline_tui( log_state!("done"); } ChatStreamEvent::Error(msg) => { - tracing::debug!(error = %msg, "Processing Error"); + trace!(error = %msg, "Processing Error"); chat_stream = None; app.state.streaming_error(msg); log_state!("error"); @@ -544,7 +560,7 @@ async fn run_inline_tui( // Handle user cancellation (Esc during streaming) - drop the stream if app.state.was_interrupted && chat_stream.is_some() { - tracing::debug!("User cancelled streaming, dropping chat stream"); + debug!("User cancelled streaming, dropping chat stream"); chat_stream = None; app.state.was_interrupted = false; // Reset the flag } @@ -579,7 +595,7 @@ async fn run_inline_tui( token.clone(), app.state.session_id.clone(), messages, - &settings, + settings, )); } } diff --git a/crates/atuin-client/src/database.rs b/crates/atuin-client/src/database.rs index 5f292bec..7c63368d 100644 --- a/crates/atuin-client/src/database.rs +++ b/crates/atuin-client/src/database.rs @@ -138,9 +138,13 @@ pub trait Database: Send + Sync + 'static { async fn all_with_count(&self) -> Result>; + fn all_paged(&self, page_size: usize, include_deleted: bool, unique: 
bool) -> Paged; + async fn stats(&self, h: &History) -> Result; async fn get_dups(&self, before: i64, dupkeep: u32) -> Result>; + + fn clone_boxed(&self) -> Box; } // Intended for use on a developer machine and not a sync server. @@ -650,6 +654,10 @@ impl Database for Sqlite { Ok(res) } + fn all_paged(&self, page_size: usize, include_deleted: bool, unique: bool) -> Paged { + Paged::new(Box::new(self.clone()), page_size, include_deleted, unique) + } + // deleted_at doesn't mean the actual time that the user deleted it, // but the time that the system marks it as deleted async fn delete(&self, mut h: History) -> Result<()> { @@ -814,6 +822,70 @@ impl Database for Sqlite { Ok(res) } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } +} + +pub struct Paged { + database: Box, + page_size: usize, + last_id: Option, + include_deleted: bool, + unique: bool, +} + +impl Paged { + pub fn new( + database: Box, + page_size: usize, + include_deleted: bool, + unique: bool, + ) -> Self { + Self { + database, + page_size, + last_id: None, + include_deleted, + unique, + } + } + + pub async fn next(&mut self) -> Result>> { + let mut query = SqlBuilder::select_from(SqlName::new("history").alias("h").baquoted()); + + query.field("*").order_desc("id"); + + if !self.include_deleted { + query.and_where_is_null("deleted_at"); + } + + if self.unique { + // We want to deduplicate on command, but the user can search via cwd, hostname, and session. + // Without those fields, filter modes won't work right. With those fields, we get duplicates. + // This must be handled upstream. + query + .group_by("command, cwd, hostname, session") + .having("max(timestamp)"); + } + + query.limit(self.page_size); + + if let Some(last_id) = &self.last_id { + query.and_where_lt("id", quote(last_id)); + } + + let query = query.sql().expect("bug in list query. 
please report"); + let res = self.database.query_history(&query).await?; + + if res.is_empty() { + Ok(None) + } else { + self.last_id = Some(res.last().unwrap().id.0.clone()); + Ok(Some(res)) + } + } } trait SqlBuilderExt { @@ -1165,6 +1237,126 @@ mod test { .unwrap(); } + #[tokio::test(flavor = "multi_thread")] + async fn test_paged_basic() { + let mut db = Sqlite::new("sqlite::memory:", test_local_timeout()) + .await + .unwrap(); + + // Add 5 history items + for i in 0..5 { + new_history_item(&mut db, &format!("command{}", i)) + .await + .unwrap(); + } + + // Create a paged iterator with page_size of 2 + let mut paged = db.all_paged(2, false, false); + + // First page should have 2 items + let page1 = paged.next().await.unwrap(); + assert!(page1.is_some()); + assert_eq!(page1.unwrap().len(), 2); + + // Second page should have 2 items + let page2 = paged.next().await.unwrap(); + assert!(page2.is_some()); + assert_eq!(page2.unwrap().len(), 2); + + // Third page should have 1 item + let page3 = paged.next().await.unwrap(); + assert!(page3.is_some()); + assert_eq!(page3.unwrap().len(), 1); + + // Fourth page should be None (exhausted) + let page4 = paged.next().await.unwrap(); + assert!(page4.is_none()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_paged_empty() { + let db = Sqlite::new("sqlite::memory:", test_local_timeout()) + .await + .unwrap(); + + // Create a paged iterator on empty database + let mut paged = db.all_paged(10, false, false); + + // Should return None immediately + let page = paged.next().await.unwrap(); + assert!(page.is_none()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_paged_unique() { + let mut db = Sqlite::new("sqlite::memory:", test_local_timeout()) + .await + .unwrap(); + + // Add duplicate commands + new_history_item(&mut db, "duplicate").await.unwrap(); + new_history_item(&mut db, "duplicate").await.unwrap(); + new_history_item(&mut db, "unique1").await.unwrap(); + new_history_item(&mut db, 
"unique2").await.unwrap(); + + // Without unique flag - should get all 4 + let mut paged = db.all_paged(10, false, false); + let page = paged.next().await.unwrap().unwrap(); + assert_eq!(page.len(), 4); + + // With unique flag - should get 3 (duplicates collapsed) + let mut paged_unique = db.all_paged(10, false, true); + let page_unique = paged_unique.next().await.unwrap().unwrap(); + assert_eq!(page_unique.len(), 3); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_paged_include_deleted() { + let mut db = Sqlite::new("sqlite::memory:", test_local_timeout()) + .await + .unwrap(); + + // Add items + new_history_item(&mut db, "keep1").await.unwrap(); + new_history_item(&mut db, "keep2").await.unwrap(); + new_history_item(&mut db, "delete_me").await.unwrap(); + + // Delete one item + let all = db + .list( + &[], + &Context { + hostname: "".to_string(), + session: "".to_string(), + cwd: "".to_string(), + host_id: "".to_string(), + git_root: None, + }, + None, + false, + false, + ) + .await + .unwrap(); + + let to_delete = all + .iter() + .find(|h| h.command == "delete_me") + .unwrap() + .clone(); + db.delete(to_delete).await.unwrap(); + + // Without include_deleted - should get 2 + let mut paged = db.all_paged(10, false, false); + let page = paged.next().await.unwrap().unwrap(); + assert_eq!(page.len(), 2); + + // With include_deleted - should get 3 + let mut paged_deleted = db.all_paged(10, true, false); + let page_deleted = paged_deleted.next().await.unwrap().unwrap(); + assert_eq!(page_deleted.len(), 3); + } + #[tokio::test(flavor = "multi_thread")] async fn test_search_bench_dupes() { let context = Context { diff --git a/crates/atuin-client/src/hub.rs b/crates/atuin-client/src/hub.rs index 5b34574b..b94c69ea 100644 --- a/crates/atuin-client/src/hub.rs +++ b/crates/atuin-client/src/hub.rs @@ -58,10 +58,14 @@ impl HubAuthSession { /// /// Returns a session containing the code and auth URL that the user should visit. 
pub async fn start(settings: &Settings) -> Result { + debug!("Starting Hub authentication process..."); + let code_response = request_code(&settings.hub_address) .await .context("Failed to request authentication code from Hub")?; + debug!("Received code from Hub"); + let code = code_response.code; let auth_url = format!("{}/auth/cli?code={}", settings.hub_address, code); @@ -79,8 +83,10 @@ impl HubAuthSession { match verify_code(&self.hub_address, &self.code).await { Ok(response) => { if let Some(token) = response.token { + debug!("Authentication complete, received token"); Ok(HubAuthStatus::Complete(token)) } else if let Some(error) = response.error { + error!("Authentication failed: {}", error); Ok(HubAuthStatus::Failed(error)) } else { Ok(HubAuthStatus::Pending) @@ -105,8 +111,11 @@ impl HubAuthSession { ) -> Result { let start = std::time::Instant::now(); + debug!("Polling for Hub authentication completion..."); + loop { if start.elapsed() > timeout { + warn!("Authentication loop exited due to timeout"); bail!("Authentication timed out. 
Please try again."); } @@ -181,17 +190,21 @@ async fn handle_resp_error(resp: reqwest::Response) -> Result let status = resp.status(); if status == StatusCode::SERVICE_UNAVAILABLE { + error!("Service unavailable: check https://status.atuin.sh"); bail!("Service unavailable: check https://status.atuin.sh"); } if status == StatusCode::TOO_MANY_REQUESTS { + error!("Rate limited; please wait before trying again"); bail!("Rate limited; please wait before trying again"); } if !status.is_success() { if let Ok(error) = resp.json::().await { + error!("Hub error: {} - {}", status, error.reason); bail!("Hub error: {} - {}", status, error.reason); } + error!("Hub request failed with status: {}", status); bail!("Hub request failed with status: {}", status); } @@ -204,6 +217,8 @@ async fn request_code(address: &str) -> Result { let url = make_url(address, "/auth/cli/code")?; let client = reqwest::Client::new(); + debug!("Requesting code from Hub at {url}"); + let resp = client .post(&url) .header(USER_AGENT, APP_USER_AGENT) @@ -219,9 +234,12 @@ async fn request_code(address: &str) -> Result { /// Poll to verify the CLI auth code and get the session token async fn verify_code(address: &str, code: &str) -> Result { ensure_crypto_provider(); - let url = make_url(address, &format!("/auth/cli/verify?code={}", code))?; + let base = make_url(address, "/auth/cli/verify")?; + let url = format!("{base}?code={code}"); let client = reqwest::Client::new(); + debug!("Verifying code with Hub at {base}?code=******"); + let resp = client .post(&url) .header(USER_AGENT, APP_USER_AGENT) diff --git a/crates/atuin-client/src/settings.rs b/crates/atuin-client/src/settings.rs index a15ce461..8e874832 100644 --- a/crates/atuin-client/src/settings.rs +++ b/crates/atuin-client/src/settings.rs @@ -42,6 +42,10 @@ pub enum SearchMode { #[serde(rename = "skim")] Skim, + + #[serde(rename = "daemon-fuzzy")] + #[clap(aliases = &["daemon-fuzzy"])] + DaemonFuzzy, } impl SearchMode { @@ -51,6 +55,7 @@ impl 
SearchMode { SearchMode::FullText => "FULLTXT", SearchMode::Fuzzy => "FUZZY", SearchMode::Skim => "SKIM", + SearchMode::DaemonFuzzy => "DAEMON", } } pub fn next(&self, settings: &Settings) -> Self { @@ -58,9 +63,13 @@ impl SearchMode { SearchMode::Prefix => SearchMode::FullText, // if the user is using skim, we go to skim SearchMode::FullText if settings.search_mode == SearchMode::Skim => SearchMode::Skim, + // if the user is using daemon-fuzzy, we go to daemon-fuzzy + SearchMode::FullText if settings.search_mode == SearchMode::DaemonFuzzy => { + SearchMode::DaemonFuzzy + } // otherwise fuzzy. SearchMode::FullText => SearchMode::Fuzzy, - SearchMode::Fuzzy | SearchMode::Skim => SearchMode::Prefix, + SearchMode::Fuzzy | SearchMode::Skim | SearchMode::DaemonFuzzy => SearchMode::Prefix, } } } @@ -477,6 +486,78 @@ pub struct Tmux { pub height: String, } +/// Log level for file logging. Maps to tracing's LevelFilter. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum LogLevel { + Trace, + Debug, + #[default] + Info, + Warn, + Error, +} + +impl LogLevel { + /// Convert to a tracing directive string for use with EnvFilter. + pub fn as_directive(&self) -> &'static str { + match self { + LogLevel::Trace => "trace", + LogLevel::Debug => "debug", + LogLevel::Info => "info", + LogLevel::Warn => "warn", + LogLevel::Error => "error", + } + } +} + +/// Configuration for a specific log type (search or daemon). +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct LogConfig { + /// Log file name (relative to dir) or absolute path. + pub file: String, + + /// Override global enabled setting for this log type. + pub enabled: Option, + + /// Override global level setting for this log type. + pub level: Option, + + /// Override global retention days setting for this log type. 
+ pub retention: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Logs { + /// Enable file logging globally. Defaults to true. + #[serde(default = "Logs::default_enabled")] + pub enabled: bool, + + /// Directory for log files. Defaults to ~/.atuin/logs + pub dir: String, + + /// Default log level for file logging. Defaults to "info". + /// Note: ATUIN_LOG environment variable overrides this. + #[serde(default)] + pub level: LogLevel, + + /// Default retention days for log files. Defaults to 4. + #[serde(default = "Logs::default_retention")] + pub retention: u64, + + /// Search log settings + #[serde(default)] + pub search: LogConfig, + + /// Daemon log settings + #[serde(default)] + pub daemon: LogConfig, + + /// AI log settings + #[serde(default)] + pub ai: LogConfig, +} + #[derive(Default, Clone, Debug, Deserialize, Serialize)] pub struct Ai { /// The address of the Atuin AI endpoint. Used for AI features like command generation. @@ -523,6 +604,117 @@ impl Default for Daemon { } } +impl Default for Logs { + fn default() -> Self { + Self { + enabled: true, + dir: "".to_string(), + level: LogLevel::default(), + retention: Self::default_retention(), + search: LogConfig { + file: "search.log".to_string(), + ..Default::default() + }, + daemon: LogConfig { + file: "daemon.log".to_string(), + ..Default::default() + }, + ai: LogConfig { + file: "ai.log".to_string(), + ..Default::default() + }, + } + } +} + +impl Logs { + fn default_enabled() -> bool { + true + } + + fn default_retention() -> u64 { + 4 + } + + /// Returns whether search logging is enabled. + /// Uses search-specific setting if set, otherwise falls back to global. + pub fn search_enabled(&self) -> bool { + self.search.enabled.unwrap_or(self.enabled) + } + + /// Returns whether daemon logging is enabled. + /// Uses daemon-specific setting if set, otherwise falls back to global. 
+ pub fn daemon_enabled(&self) -> bool { + self.daemon.enabled.unwrap_or(self.enabled) + } + + /// Returns whether AI logging is enabled. + /// Uses AI-specific setting if set, otherwise falls back to global. + pub fn ai_enabled(&self) -> bool { + self.ai.enabled.unwrap_or(self.enabled) + } + + /// Returns the log level for search logging. + /// Uses search-specific setting if set, otherwise falls back to global. + pub fn search_level(&self) -> LogLevel { + self.search.level.unwrap_or(self.level) + } + + /// Returns the log level for daemon logging. + /// Uses daemon-specific setting if set, otherwise falls back to global. + pub fn daemon_level(&self) -> LogLevel { + self.daemon.level.unwrap_or(self.level) + } + + /// Returns the log level for AI logging. + /// Uses AI-specific setting if set, otherwise falls back to global. + pub fn ai_level(&self) -> LogLevel { + self.ai.level.unwrap_or(self.level) + } + + /// Returns the retention days for search logging. + /// Uses search-specific setting if set, otherwise falls back to global. + pub fn search_retention(&self) -> u64 { + self.search.retention.unwrap_or(self.retention) + } + + /// Returns the retention days for daemon logging. + /// Uses daemon-specific setting if set, otherwise falls back to global. + pub fn daemon_retention(&self) -> u64 { + self.daemon.retention.unwrap_or(self.retention) + } + + /// Returns the retention days for AI logging. + /// Uses AI-specific setting if set, otherwise falls back to global. + pub fn ai_retention(&self) -> u64 { + self.ai.retention.unwrap_or(self.retention) + } + + /// Returns the full path for the search log file. + /// If `file` is an absolute path, returns it directly. + /// Otherwise, joins it with `dir`. + pub fn search_path(&self) -> PathBuf { + let path = PathBuf::from(&self.search.file); + if path.is_absolute() { + path + } else { + PathBuf::from(&self.dir).join(path) + } + } + + /// Returns the full path for the daemon log file. 
+ /// If `file` is an absolute path, returns it directly. + /// Otherwise, joins it with `dir`. + pub fn daemon_path(&self) -> PathBuf { + let path = PathBuf::from(&self.daemon.file); + if path.is_absolute() { + path + } else { + PathBuf::from(&self.dir).join(path) + } + } +} + impl Default for Search { fn default() -> Self { Self { @@ -848,6 +1040,9 @@ pub struct Settings { #[serde(default)] pub tmux: Tmux, + #[serde(default)] + pub logs: Logs, + #[serde(default)] pub meta: meta::Settings, @@ -1033,6 +1228,7 @@ impl Settings { let scripts_path = data_dir.join("scripts.db"); let socket_path = atuin_common::utils::runtime_dir().join("atuin.sock"); let pidfile_path = data_dir.join("atuin-daemon.pid"); + let logs_dir = atuin_common::utils::logs_dir(); let key_path = data_dir.join("key"); let meta_path = data_dir.join("meta.db"); @@ -1101,6 +1297,12 @@ impl Settings { .set_default("daemon.pidfile_path", pidfile_path.to_str())? .set_default("daemon.systemd_socket", false)? .set_default("daemon.tcp_port", 8889)? + .set_default("logs.enabled", true)? + .set_default("logs.dir", logs_dir.to_str())? + .set_default("logs.level", "info")? + .set_default("logs.search.file", "search.log")? + .set_default("logs.daemon.file", "daemon.log")? + .set_default("logs.ai.file", "ai.log")? .set_default("kv.db_path", kv_path.to_str())? .set_default("scripts.db_path", scripts_path.to_str())? .set_default("meta.db_path", meta_path.to_str())? 
@@ -1218,6 +1420,9 @@ impl Settings { settings.key_path = Self::expand_path(settings.key_path)?; settings.daemon.socket_path = Self::expand_path(settings.daemon.socket_path)?; settings.daemon.pidfile_path = Self::expand_path(settings.daemon.pidfile_path)?; + settings.logs.dir = Self::expand_path(settings.logs.dir)?; + settings.logs.search.file = Self::expand_path(settings.logs.search.file)?; + settings.logs.daemon.file = Self::expand_path(settings.logs.daemon.file)?; // Validate UI settings settings.ui.validate()?; @@ -1264,6 +1469,20 @@ impl Default for Settings { } } +/// Initialize the meta store configuration for testing. +/// +/// This should only be used in tests. It allows tests to bypass the normal +/// Settings::new() flow while still being able to use Settings::host_id() +/// and other meta store dependent functions. +/// +/// # Safety +/// This function is not thread-safe with concurrent calls to Settings::new() +/// or other meta store initialization. Only call from tests. +#[doc(hidden)] +pub fn init_meta_config_for_testing(meta_db_path: impl Into, local_timeout: f64) { + META_CONFIG.set((meta_db_path.into(), local_timeout)).ok(); +} + #[cfg(test)] pub(crate) fn test_local_timeout() -> f64 { std::env::var("ATUIN_TEST_LOCAL_TIMEOUT") diff --git a/crates/atuin-common/src/utils.rs b/crates/atuin-common/src/utils.rs index bb291ebf..b885423e 100644 --- a/crates/atuin-common/src/utils.rs +++ b/crates/atuin-common/src/utils.rs @@ -88,6 +88,10 @@ pub fn runtime_dir() -> PathBuf { std::env::var("XDG_RUNTIME_DIR").map_or_else(|_| data_dir(), PathBuf::from) } +pub fn logs_dir() -> PathBuf { + home_dir().join(".atuin").join("logs") +} + pub fn dotfiles_cache_dir() -> PathBuf { // In most cases, this will be ~/.local/share/atuin/dotfiles/cache let data_dir = std::env::var("XDG_DATA_HOME") diff --git a/crates/atuin-daemon/Cargo.toml b/crates/atuin-daemon/Cargo.toml index 36917789..97ed88ea 100644 --- a/crates/atuin-daemon/Cargo.toml +++ 
b/crates/atuin-daemon/Cargo.toml @@ -14,9 +14,10 @@ readme.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -atuin-client = { path = "../atuin-client", version = "18.13.0-beta.1" } -atuin-dotfiles = { path = "../atuin-dotfiles", version = "18.13.0-beta.1" } -atuin-history = { path = "../atuin-history", version = "18.13.0-beta.1" } +atuin-client = { path = "../atuin-client" } +atuin-common = { path = "../atuin-common" } +atuin-dotfiles = { path = "../atuin-dotfiles" } +atuin-history = { path = "../atuin-history" } time = { workspace = true } uuid = { workspace = true } @@ -32,10 +33,11 @@ tonic = "0.14" tonic-prost = "0.14" prost = "0.14" prost-types = "0.14" -tokio-stream = {version="0.1.14", features=["net"]} +tokio-stream = { version = "0.1.14", features = ["net"] } hyper-util = "0.1" rand.workspace = true +nucleo = { git = "https://github.com/atuinsh/nucleo-ext.git", branch = "main" } [target.'cfg(target_os = "linux")'.dependencies] listenfd = "1.0.1" diff --git a/crates/atuin-daemon/build.rs b/crates/atuin-daemon/build.rs index fbe34d12..7034aa04 100644 --- a/crates/atuin-daemon/build.rs +++ b/crates/atuin-daemon/build.rs @@ -3,7 +3,11 @@ use std::{env, fs, path::PathBuf}; use protox::prost::Message; fn main() -> std::io::Result<()> { - let proto_paths = ["proto/history.proto"]; + let proto_paths = [ + "proto/history.proto", + "proto/search.proto", + "proto/control.proto", + ]; let proto_include_dirs = ["proto"]; let file_descriptors = protox::compile(proto_paths, proto_include_dirs).unwrap(); diff --git a/crates/atuin-daemon/proto/control.proto b/crates/atuin-daemon/proto/control.proto new file mode 100644 index 00000000..06347902 --- /dev/null +++ b/crates/atuin-daemon/proto/control.proto @@ -0,0 +1,62 @@ +syntax = "proto3"; +package control; + +// The Control service allows external processes (CLI commands, etc.) +// to inject events into the running daemon. 
+service Control { + // Send an event to the daemon's event bus + rpc SendEvent(SendEventRequest) returns (SendEventResponse); +} + +message SendEventRequest { + oneof event { + // History was pruned - search index needs full rebuild + HistoryPrunedEvent history_pruned = 1; + + // Specific history items were deleted + HistoryDeletedEvent history_deleted = 2; + + // Request immediate sync + ForceSyncEvent force_sync = 3; + + // Settings have changed, reload if needed + SettingsReloadedEvent settings_reloaded = 4; + + // Request graceful shutdown + ShutdownEvent shutdown = 5; + + // History was rebuilt - search index needs full rebuild + HistoryRebuiltEvent history_rebuilt = 6; + } +} + +message SendEventResponse { + // Empty on success; errors come through gRPC status +} + +// Individual event message types + +message HistoryPrunedEvent { + // No fields needed - just signals that pruning happened +} + +message HistoryRebuiltEvent { + // No fields needed - just signals that rebuilding happened +} + +message HistoryDeletedEvent { + // IDs of deleted history items (UUIDs as strings) + repeated string ids = 1; +} + +message ForceSyncEvent { + // No fields needed - just triggers sync +} + +message SettingsReloadedEvent { + // No fields needed - components should re-read settings +} + +message ShutdownEvent { + // No fields needed - triggers graceful shutdown +} diff --git a/crates/atuin-daemon/proto/search.proto b/crates/atuin-daemon/proto/search.proto new file mode 100644 index 00000000..6b84acbd --- /dev/null +++ b/crates/atuin-daemon/proto/search.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; +package search; + +enum FilterMode { + GLOBAL = 0; + HOST = 1; + SESSION = 2; + DIRECTORY = 3; + WORKSPACE = 4; + SESSION_PRELOAD = 5; +} + +message SearchContext { + string session_id = 1; + string cwd = 2; + string hostname = 3; + string host_id = 4; + optional string git_root = 5; +} + +message SearchRequest { + string query = 1; + uint64 query_id = 2; // Incrementing ID to match 
responses to queries + FilterMode filter_mode = 3; + SearchContext context = 4; +} + +message SearchResponse { + uint64 query_id = 1; // Echo back the query ID + repeated bytes ids = 2; +} + +service Search { + rpc Search(stream SearchRequest) returns (stream SearchResponse); +} diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs index 3b76a680..2f492f6b 100644 --- a/crates/atuin-daemon/src/client.rs +++ b/crates/atuin-daemon/src/client.rs @@ -1,4 +1,6 @@ -use eyre::{Context, Result}; +use atuin_client::database::Context; +use atuin_client::settings::{FilterMode, Settings}; +use eyre::{Context as EyreContext, Result}; #[cfg(windows)] use tokio::net::TcpStream; use tonic::Code; @@ -11,11 +13,22 @@ use hyper_util::rt::TokioIo; use tokio::net::UnixStream; use atuin_client::history::History; +use tracing::{Level, instrument, span}; +use crate::control::HistoryRebuiltEvent; +use crate::control::{ + ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest, + SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient, +}; +use crate::events::DaemonEvent; use crate::history::{ EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, history_client::HistoryClient as HistoryServiceClient, }; +use crate::search::{ + FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse, + search_client::SearchClient as SearchServiceClient, +}; pub struct HistoryClient { client: HistoryServiceClient, @@ -52,6 +65,8 @@ pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind { impl HistoryClient { #[cfg(unix)] pub async fn new(path: String) -> Result { + use eyre::Context; + let log_path = path.clone(); let channel = Endpoint::try_from("http://atuin_local_daemon:0")? 
.connect_with_connector(service_fn(move |_: Uri| { @@ -130,3 +145,275 @@ impl HistoryClient { Ok(resp.accepted) } } + +pub struct SearchClient { + client: SearchServiceClient, +} + +impl SearchClient { + #[cfg(unix)] + pub async fn new(path: String) -> Result { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = SearchServiceClient::new(channel); + + Ok(SearchClient { client }) + } + + #[cfg(not(unix))] + pub async fn new(port: u64) -> Result { + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let url = format!("127.0.0.1:{port}"); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" 
+ ) + })?; + + let client = SearchServiceClient::new(channel); + + Ok(SearchClient { client }) + } + + #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))] + pub async fn search( + &mut self, + query: String, + query_id: u64, + filter_mode: FilterMode, + context: Option, + ) -> Result> { + let request = SearchRequest { + query, + query_id, + filter_mode: RpcFilterMode::from(filter_mode).into(), + context: context.map(RpcSearchContext::from), + }; + let request_stream = tokio_stream::once(request); + let response = span!(Level::TRACE, "daemon_client_search.request") + .in_scope(async || self.client.search(request_stream).await) + .await?; + + Ok(response.into_inner()) + } +} + +impl From for RpcFilterMode { + fn from(filter_mode: FilterMode) -> Self { + match filter_mode { + FilterMode::Global => RpcFilterMode::Global, + FilterMode::Host => RpcFilterMode::Host, + FilterMode::Session => RpcFilterMode::Session, + FilterMode::Directory => RpcFilterMode::Directory, + FilterMode::Workspace => RpcFilterMode::Workspace, + FilterMode::SessionPreload => RpcFilterMode::SessionPreload, + } + } +} + +impl From for RpcSearchContext { + fn from(context: Context) -> Self { + RpcSearchContext { + session_id: context.session, + cwd: context.cwd, + hostname: context.hostname, + host_id: context.host_id, + git_root: context + .git_root + .map(|path| path.to_string_lossy().to_string()), + } + } +} + +// ============================================================================ +// Control Client +// ============================================================================ + +/// Client for the Control gRPC service. +/// +/// Used to inject events into a running daemon from external processes. +pub struct ControlClient { + client: ControlServiceClient, +} + +impl ControlClient { + /// Connect to the daemon's control service. 
+ #[cfg(unix)] + pub async fn new(path: String) -> Result { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = ControlServiceClient::new(channel); + + Ok(ControlClient { client }) + } + + /// Connect to the daemon's control service. + #[cfg(not(unix))] + pub async fn new(port: u64) -> Result { + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let url = format!("127.0.0.1:{port}"); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" + ) + })?; + + let client = ControlServiceClient::new(channel); + + Ok(ControlClient { client }) + } + + /// Connect using settings. + #[cfg(unix)] + pub async fn from_settings(settings: &Settings) -> Result { + Self::new(settings.daemon.socket_path.clone()).await + } + + /// Connect using settings. + #[cfg(not(unix))] + pub async fn from_settings(settings: &Settings) -> Result { + Self::new(settings.daemon.tcp_port).await + } + + /// Send an event to the daemon. + pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> { + let proto_event = daemon_event_to_proto(event); + let request = SendEventRequest { + event: Some(proto_event), + }; + self.client.send_event(request).await?; + Ok(()) + } +} + +/// Convert a daemon event to its proto representation. 
+fn daemon_event_to_proto(event: DaemonEvent) -> crate::control::send_event_request::Event { + use crate::control::send_event_request::Event; + + match event { + DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}), + DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}), + DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent { + ids: ids.into_iter().map(|id| id.0).collect(), + }), + DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}), + DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}), + DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}), + // These events are internal and not sent via the control service + DaemonEvent::HistoryStarted(_) + | DaemonEvent::HistoryEnded(_) + | DaemonEvent::RecordsAdded(_) + | DaemonEvent::SyncCompleted { .. } + | DaemonEvent::SyncFailed { .. } => { + // Use shutdown as a fallback, though this shouldn't happen + tracing::warn!("attempted to send internal event via control service"); + Event::Shutdown(ShutdownEvent {}) + } + } +} + +// ============================================================================ +// Convenience Functions +// ============================================================================ + +/// Emit an event to the daemon. +/// +/// This is a fire-and-forget helper for sending events to the daemon from +/// external processes like CLI commands. If the daemon isn't running, this +/// will silently succeed (returns Ok). +/// +/// # Example +/// +/// ```ignore +/// // After pruning history +/// emit_event(DaemonEvent::HistoryPruned).await?; +/// +/// // After deleting specific history items +/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] 
}).await?; +/// +/// // Request immediate sync +/// emit_event(DaemonEvent::ForceSync).await?; +/// ``` +pub async fn emit_event(event: DaemonEvent) -> Result<()> { + emit_event_with_settings(event, None).await +} + +/// Emit an event to the daemon with explicit settings. +/// +/// If settings are not provided, they will be loaded from the default location. +/// If the daemon isn't running, this will silently succeed. +pub async fn emit_event_with_settings( + event: DaemonEvent, + settings: Option<&Settings>, +) -> Result<()> { + // Load settings if not provided + let owned_settings; + let settings = match settings { + Some(s) => s, + None => { + owned_settings = Settings::new()?; + &owned_settings + } + }; + + // Try to connect - if daemon isn't running, that's fine + let mut client = match ControlClient::from_settings(settings).await { + Ok(c) => c, + Err(e) => { + tracing::debug!(?e, "daemon not running, skipping event emission"); + return Ok(()); + } + }; + + // Send the event + if let Err(e) = client.send_event(event).await { + tracing::debug!(?e, "failed to send event to daemon"); + // Don't fail - this is fire-and-forget + } + + Ok(()) +} diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs new file mode 100644 index 00000000..23d48c5e --- /dev/null +++ b/crates/atuin-daemon/src/components/history.rs @@ -0,0 +1,252 @@ +//! History component. +//! +//! Handles command history lifecycle (start/end) and provides the History gRPC service. 
+ +use std::sync::Arc; + +use atuin_client::{ + database::Database, + history::{History, HistoryId, store::HistoryStore}, + settings::Settings, +}; +use dashmap::DashMap; +use eyre::Result; +use time::OffsetDateTime; +use tonic::{Request, Response, Status}; +use tracing::{Level, instrument}; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + history::{ + EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply, + StartHistoryRequest, StatusReply, StatusRequest, + history_server::{History as HistorySvc, HistoryServer}, + }, +}; + +const DAEMON_PROTOCOL_VERSION: u32 = 1; + +/// History component - manages command history lifecycle. +/// +/// This component: +/// - Tracks currently running commands (stored in memory) +/// - Saves completed commands to the database and record store +/// - Emits history events for other components (e.g., search indexing) +/// - Provides the History gRPC service +pub struct HistoryComponent { + inner: Arc, +} + +struct HistoryComponentInner { + /// Commands currently running (not yet completed). + running: DashMap, + + /// Handle to the daemon (set during start). + handle: tokio::sync::RwLock>, + + /// History store for pushing records (set during start). + history_store: tokio::sync::RwLock>, +} + +impl HistoryComponent { + /// Create a new history component. + pub fn new() -> Self { + Self { + inner: Arc::new(HistoryComponentInner { + running: DashMap::new(), + handle: tokio::sync::RwLock::new(None), + history_store: tokio::sync::RwLock::new(None), + }), + } + } + + /// Get the gRPC service for this component. + /// + /// This returns a tonic service that can be added to a gRPC server. 
+ pub fn grpc_service(&self) -> HistoryServer { + HistoryServer::new(HistoryGrpcService { + inner: self.inner.clone(), + }) + } +} + +impl Default for HistoryComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for HistoryComponent { + fn name(&self) -> &'static str { + "history" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + // Create the history store + let host_id = Settings::host_id().await?; + let history_store = + HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key()); + + *self.inner.history_store.write().await = Some(history_store); + *self.inner.handle.write().await = Some(handle); + + tracing::info!("history component started"); + Ok(()) + } + + async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> { + // History component produces events but doesn't need to react to them + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + tracing::info!("history component stopped"); + Ok(()) + } +} + +/// The gRPC service implementation. +/// +/// This is a thin wrapper that delegates to the component's shared state. 
+pub struct HistoryGrpcService { + inner: Arc, +} + +#[tonic::async_trait] +impl HistorySvc for HistoryGrpcService { + #[instrument(skip_all, level = Level::INFO)] + async fn start_history( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let timestamp = + OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { + Status::invalid_argument( + "failed to parse timestamp as unix time (expected nanos since epoch)", + ) + })?; + + let h: History = History::daemon() + .timestamp(timestamp) + .command(req.command) + .cwd(req.cwd) + .session(req.session) + .hostname(req.hostname) + .build() + .into(); + + // Emit the event + if let Some(handle) = self.inner.handle.read().await.as_ref() { + handle.emit(DaemonEvent::HistoryStarted(h.clone())); + } + + let id = h.id.clone(); + tracing::info!(id = id.to_string(), "start history"); + self.inner.running.insert(id.clone(), h); + + let reply = StartHistoryReply { + id: id.to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + Ok(Response::new(reply)) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn end_history( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let id = HistoryId(req.id); + + if let Some((_, mut history)) = self.inner.running.remove(&id) { + history.exit = req.exit; + history.duration = match req.duration { + 0 => i64::try_from( + (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), + ) + .expect("failed to convert calculated duration to i64"), + value => i64::try_from(value).expect("failed to get i64 duration"), + }; + + // Get the handle and store to save the history + let handle_guard = self.inner.handle.read().await; + let handle = handle_guard + .as_ref() + .ok_or_else(|| Status::internal("component not initialized"))?; + + let store_guard = self.inner.history_store.read().await; + let history_store = store_guard + 
.as_ref() + .ok_or_else(|| Status::internal("component not initialized"))?; + + // Save to database + handle + .history_db() + .save(&history) + .await + .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; + + tracing::info!( + id = id.0.to_string(), + duration = history.duration, + "end history" + ); + + // Push to record store + let (record_id, idx) = history_store + .push(history.clone()) + .await + .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?; + + // Emit the event + handle.emit(DaemonEvent::HistoryEnded(history)); + + let reply = EndHistoryReply { + id: record_id.0.to_string(), + idx, + version: env!("CARGO_PKG_VERSION").to_string(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + return Ok(Response::new(reply)); + } + + Err(Status::not_found(format!( + "could not find history with id: {id}" + ))) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn status( + &self, + _request: Request, + ) -> Result, Status> { + let reply = StatusReply { + healthy: true, + version: env!("CARGO_PKG_VERSION").to_string(), + pid: std::process::id(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + Ok(Response::new(reply)) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn shutdown( + &self, + _request: Request, + ) -> Result, Status> { + // Use the daemon handle to request shutdown + if let Some(handle) = self.inner.handle.read().await.as_ref() { + handle.shutdown(); + } + Ok(Response::new(ShutdownReply { accepted: true })) + } +} diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs new file mode 100644 index 00000000..5950d5d5 --- /dev/null +++ b/crates/atuin-daemon/src/components/mod.rs @@ -0,0 +1,22 @@ +//! Daemon components. +//! +//! Components are the building blocks of the daemon. Each component handles +//! a specific domain and can: +//! +//! - Expose gRPC services +//! - React to events +//! - Spawn background tasks +//! +//! 
Available components: +//! +//! - [`history::HistoryComponent`]: Command history lifecycle management +//! - [`search::SearchComponent`]: Fuzzy search over history +//! - [`sync::SyncComponent`]: Cloud sync + +pub mod history; +pub mod search; +pub mod sync; + +pub use history::HistoryComponent; +pub use search::SearchComponent; +pub use sync::SyncComponent; diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs new file mode 100644 index 00000000..7fb59dea --- /dev/null +++ b/crates/atuin-daemon/src/components/search.rs @@ -0,0 +1,394 @@ +//! Search component. +//! +//! Provides fuzzy search over command history using the Nucleo search library +//! with frecency-based ranking and dynamic filtering. + +use std::{pin::Pin, sync::Arc}; + +use atuin_client::database::Database; +use eyre::Result; +use tokio::sync::RwLock; +use tokio_stream::Stream; +use tonic::{Request, Response, Status, Streaming}; +use tracing::{Level, debug, info, instrument, span, trace}; +use uuid::Uuid; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + search::{ + FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse, + search_server::{Search as SearchSvc, SearchServer}, + }, +}; + +const PAGE_SIZE: usize = 5000; +const RESULTS_LIMIT: u32 = 200; +/// How often to rebuild the frecency map (in seconds). +const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60; + +/// Search component - provides fuzzy search over command history. +/// +/// This component: +/// - Maintains a deduplicated search index with frecency ranking +/// - Loads history from the database on startup +/// - Updates the index when history events occur +/// - Provides the Search gRPC service +pub struct SearchComponent { + index: Arc>, + handle: tokio::sync::RwLock>, + loader_handle: Option>, + frecency_handle: Option>, +} + +impl SearchComponent { + /// Create a new search component. 
+ pub fn new() -> Self { + Self { + index: Arc::new(RwLock::new(SearchIndex::new())), + handle: tokio::sync::RwLock::new(None), + loader_handle: None, + frecency_handle: None, + } + } + + /// Get the gRPC service for this component. + pub fn grpc_service(&self) -> SearchServer { + SearchServer::new(SearchGrpcService { + index: self.index.clone(), + }) + } + + /// Rebuild the entire search index from the database. + async fn rebuild_index(&self) -> Result<()> { + let handle_guard = self.handle.read().await; + let handle = handle_guard + .as_ref() + .ok_or_else(|| eyre::eyre!("component not initialized"))?; + + info!("Rebuilding search index from database"); + + // Create a new index + let new_index = SearchIndex::new(); + + // Load all history into the new index + let db = handle.history_db().clone(); + let mut pager = db.all_paged(PAGE_SIZE, false, true); + loop { + match pager.next().await { + Ok(Some(histories)) => { + info!( + "Loading {} history entries into search index", + histories.len() + ); + new_index.add_histories(&histories); + } + Ok(None) => break, + Err(e) => { + tracing::error!("Failed to load history during rebuild: {}", e); + break; + } + } + } + + info!( + "Search index rebuild complete; {} unique commands", + new_index.command_count() + ); + + // Replace the old index with the new one + *self.index.write().await = new_index; + Ok(()) + } +} + +impl Default for SearchComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SearchComponent { + fn name(&self) -> &'static str { + "search" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + *self.handle.write().await = Some(handle.clone()); + + // Spawn background task to load history into index + let index = self.index.clone(); + let db = handle.history_db().clone(); + + self.loader_handle = Some(tokio::spawn(async move { + info!( + "Loading history into search index; page size = {}", + PAGE_SIZE + ); + let mut pager = 
db.all_paged(PAGE_SIZE, false, true); + loop { + match pager.next().await { + Ok(Some(histories)) => { + info!( + "Loading {} history entries into search index", + histories.len() + ); + index.read().await.add_histories(&histories); + } + Ok(None) => { + info!( + "Initial history load complete; {} unique commands indexed", + index.read().await.command_count() + ); + // Build initial frecency map + index.read().await.rebuild_frecency().await; + info!("Initial frecency map built"); + break; + } + Err(e) => { + tracing::error!("Failed to load history: {}", e); + break; + } + } + } + })); + + // Spawn background task to periodically refresh frecency + let index_for_frecency = self.index.clone(); + self.frecency_handle = Some(tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs( + FRECENCY_REFRESH_INTERVAL_SECS, + )); + loop { + interval.tick().await; + trace!("Refreshing frecency map"); + index_for_frecency.read().await.rebuild_frecency().await; + } + })); + + tracing::info!("search component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + match event { + DaemonEvent::RecordsAdded(records) => { + debug!( + count = records.len(), + "Processing added records for search index" + ); + + let handle_guard = self.handle.read().await; + if let Some(handle) = handle_guard.as_ref() { + let histories: Vec<_> = handle + .history_db() + .query_history( + format!( + "select * from history where id in ({})", + records + .iter() + .map(|record| record.0.to_string()) + .collect::>() + .join(",") + ) + .as_str(), + ) + .await + .unwrap_or_default(); + + span!(Level::TRACE, "inject_records", count = histories.len()) + .in_scope(async || { + self.index.read().await.add_histories(&histories); + }) + .await; + } + } + DaemonEvent::HistoryStarted(history) => { + debug!(id = %history.id, command = %history.command, "History started (no index action)"); + } + DaemonEvent::HistoryEnded(history) => 
{ + span!(Level::TRACE, "inject_history_ended") + .in_scope(async || { + self.index.read().await.add_history(history); + }) + .await; + } + DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => { + info!("History store pruned or rebuilt, rebuilding search index"); + if let Err(e) = self.rebuild_index().await { + tracing::error!("Failed to rebuild search index: {}", e); + } + } + DaemonEvent::HistoryDeleted { ids } => { + info!( + count = ids.len(), + "History deleted, rebuilding search index" + ); + // For now, just rebuild the entire index. A more efficient implementation + // would remove specific items from the index. + if let Err(e) = self.rebuild_index().await { + tracing::error!("Failed to rebuild search index: {}", e); + } + } + // Events we don't care about + DaemonEvent::SyncCompleted { .. } + | DaemonEvent::SyncFailed { .. } + | DaemonEvent::ForceSync + | DaemonEvent::SettingsReloaded + | DaemonEvent::ShutdownRequested => {} + } + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + if let Some(handle) = self.loader_handle.take() { + handle.abort(); + } + if let Some(handle) = self.frecency_handle.take() { + handle.abort(); + } + tracing::info!("search component stopped"); + Ok(()) + } +} + +/// The gRPC service implementation. 
+pub struct SearchGrpcService { + index: Arc>, +} + +#[tonic::async_trait] +impl SearchSvc for SearchGrpcService { + type SearchStream = Pin> + Send>>; + + #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")] + async fn search( + &self, + request: Request>, + ) -> Result, Status> { + let mut in_stream = request.into_inner(); + let index = self.index.clone(); + + // Create output channel + let (tx, rx) = tokio::sync::mpsc::channel::>(128); + + // Spawn task to handle incoming requests and send responses + tokio::spawn(async move { + while let Some(req) = in_stream.message().await.transpose() { + match req { + Ok(search_req) => { + let query = search_req.query; + let query_id = search_req.query_id; + let filter_mode: FilterMode = search_req + .filter_mode + .try_into() + .unwrap_or(FilterMode::Global); + let proto_context = search_req.context; + + debug!( + "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}", + query, + query_id, + filter_mode.as_str_name(), + proto_context + ); + + // Convert proto FilterMode + context to IndexFilterMode + let index_filter = convert_filter_mode(filter_mode, &proto_context); + + // Build QueryContext from proto context + let query_context = proto_context + .map(|ctx| QueryContext { + cwd: Some(with_trailing_slash(&ctx.cwd)), + git_root: ctx.git_root.map(|s| with_trailing_slash(&s)), + hostname: Some(ctx.hostname), + session_id: Some(ctx.session_id), + }) + .unwrap_or_default(); + + // Perform the search + let history_ids = + span!(Level::TRACE, "daemon_search_query", %query, query_id) + .in_scope(|| async { + let index = index.read().await; + index + .search(&query, index_filter, &query_context, RESULTS_LIMIT) + .await + }) + .await; + + // Convert history IDs to bytes + let ids: Vec> = history_ids + .iter() + .filter_map(|id| { + Uuid::parse_str(id) + .ok() + .map(|uuid| uuid.as_bytes().to_vec()) + }) + .collect(); + + if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() { + break; 
// Client disconnected + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + break; + } + } + } + }); + + // Convert receiver to stream + let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Response::new(Box::pin(out_stream))) + } +} + +/// Convert proto FilterMode and context to IndexFilterMode. +fn convert_filter_mode( + mode: FilterMode, + context: &Option, +) -> IndexFilterMode { + match (mode, context) { + (FilterMode::Global, _) => IndexFilterMode::Global, + (FilterMode::Directory, Some(ctx)) => { + IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) + } + (FilterMode::Workspace, Some(ctx)) => { + if let Some(ref git_root) = ctx.git_root { + IndexFilterMode::Workspace(with_trailing_slash(git_root)) + } else { + // Fall back to directory if no git root + IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) + } + } + (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()), + (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()), + (FilterMode::SessionPreload, Some(ctx)) => { + // SessionPreload is similar to Session - filter by session + IndexFilterMode::Session(ctx.session_id.clone()) + } + // If no context provided, fall back to global + _ => IndexFilterMode::Global, + } +} + +#[cfg(windows)] +pub fn with_trailing_slash(s: &str) -> String { + if s.ends_with('\\') { + s.to_string() + } else { + format!("{}\\", s) + } +} + +#[cfg(not(windows))] +pub fn with_trailing_slash(s: &str) -> String { + if s.ends_with('/') { + s.to_string() + } else { + format!("{}/", s) + } +} diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs new file mode 100644 index 00000000..6217706a --- /dev/null +++ b/crates/atuin-daemon/src/components/sync.rs @@ -0,0 +1,257 @@ +//! Sync component. +//! +//! Handles periodic synchronization with the Atuin cloud server. 
+ +use eyre::Result; +use rand::Rng; +use tokio::sync::mpsc; +use tokio::time::{self, MissedTickBehavior}; + +use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings}; +use atuin_dotfiles::store::{AliasStore, var::VarStore}; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, +}; + +/// Commands that can be sent to the sync task. +enum SyncCommand { + /// Trigger an immediate sync. + ForceSync, + /// Stop the sync loop. + Stop, +} + +/// Sync component - handles periodic cloud synchronization. +/// +/// This component: +/// - Runs a background sync loop on a configurable interval +/// - Implements exponential backoff on sync failures +/// - Responds to ForceSync events for immediate sync +/// - Emits SyncCompleted/SyncFailed events +pub struct SyncComponent { + task_handle: Option>, + command_tx: Option>, +} + +impl SyncComponent { + /// Create a new sync component. + pub fn new() -> Self { + Self { + task_handle: None, + command_tx: None, + } + } +} + +impl Default for SyncComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SyncComponent { + fn name(&self) -> &'static str { + "sync" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + let (cmd_tx, cmd_rx) = mpsc::channel(16); + self.command_tx = Some(cmd_tx); + + // Spawn the sync loop with its own copy of the handle + self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx))); + + tracing::info!("sync component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + if let DaemonEvent::ForceSync = event { + tracing::info!("force sync requested"); + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::ForceSync).await; + } + } + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::Stop).await; + } + if let Some(handle) = self.task_handle.take() { + // Give 
the task a moment to shut down gracefully + let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; + } + tracing::info!("sync component stopped"); + Ok(()) + } +} + +/// The main sync loop. +/// +/// This runs in a spawned task and handles periodic sync as well as +/// force sync requests. +async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver) { + tracing::info!("sync loop starting"); + + // Clone settings since we need them across await points + let settings = handle.settings().await.clone(); + let host_id = match Settings::host_id().await { + Ok(id) => id, + Err(e) => { + tracing::error!("failed to get host id, sync disabled: {e}"); + return; + } + }; + + // Create the stores we need + let encryption_key = *handle.encryption_key(); + let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key); + let alias_store = AliasStore::new(handle.store().clone(), host_id, encryption_key); + let var_store = VarStore::new(handle.store().clone(), host_id, encryption_key); + + // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) + let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); + + let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + + // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, + // we may end up running a lot of syncs in a hot loop. + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + tokio::select! 
{ + _ = ticker.tick() => { + do_sync_tick( + &handle, + &history_store, + &alias_store, + &var_store, + &mut ticker, + max_interval, + ).await; + } + cmd = cmd_rx.recv() => { + match cmd { + Some(SyncCommand::ForceSync) => { + tracing::info!("executing force sync"); + do_sync_tick( + &handle, + &history_store, + &alias_store, + &var_store, + &mut ticker, + max_interval, + ).await; + } + Some(SyncCommand::Stop) | None => { + tracing::info!("sync loop stopping"); + break; + } + } + } + } + } +} + +/// Execute a single sync tick. +async fn do_sync_tick( + handle: &DaemonHandle, + history_store: &HistoryStore, + alias_store: &AliasStore, + var_store: &VarStore, + ticker: &mut time::Interval, + max_interval: f64, +) { + // Clone settings since we need them across await points + let settings = handle.settings().await.clone(); + + tracing::info!("sync tick"); + + // Check if logged in + let logged_in = match settings.logged_in().await { + Ok(v) => v, + Err(e) => { + tracing::warn!("failed to check login status, skipping sync tick: {e}"); + return; + } + }; + + if !logged_in { + tracing::debug!("not logged in, skipping sync tick"); + return; + } + + // Perform the sync + let res = sync::sync(&settings, handle.store()).await; + + match res { + Err(e) => { + tracing::error!("sync tick failed with {e}"); + + // Emit failure event + handle.emit(DaemonEvent::SyncFailed { + error: e.to_string(), + }); + + // Exponential backoff + let mut rng = rand::thread_rng(); + let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); + + if new_interval > max_interval { + new_interval = max_interval; + } + + *ticker = time::interval(time::Duration::from_secs(new_interval as u64)); + ticker.reset_after(time::Duration::from_secs(new_interval as u64)); + + tracing::error!("backing off, next sync tick in {new_interval}"); + } + Ok((uploaded_count, downloaded_records)) => { + tracing::info!( + uploaded = uploaded_count, + downloaded = downloaded_records.len(), + "sync 
complete" + ); + + // Build history from downloaded records + if let Err(e) = history_store + .incremental_build(handle.history_db(), &downloaded_records) + .await + { + tracing::error!("failed to build history from downloaded records: {e}"); + } + + // Emit the records added event (for search indexing) + handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone())); + + // Emit sync completed event + handle.emit(DaemonEvent::SyncCompleted { + uploaded: uploaded_count as usize, + downloaded: downloaded_records.len(), + }); + + // Rebuild alias and var stores + if let Err(e) = alias_store.build().await { + tracing::error!("failed to rebuild alias store: {e}"); + } + if let Err(e) = var_store.build().await { + tracing::error!("failed to rebuild var store: {e}"); + } + + // Reset backoff on success + if ticker.period().as_secs() != settings.daemon.sync_frequency { + *ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + } + + // Store sync time + if let Err(e) = Settings::save_sync_time().await { + tracing::error!("failed to save sync time: {e}"); + } + } + } +} diff --git a/crates/atuin-daemon/src/control/mod.rs b/crates/atuin-daemon/src/control/mod.rs new file mode 100644 index 00000000..afb29c57 --- /dev/null +++ b/crates/atuin-daemon/src/control/mod.rs @@ -0,0 +1,12 @@ +//! Control module for external event injection. +//! +//! This module provides the gRPC service that allows external processes +//! (like CLI commands) to inject events into the daemon's event bus. + +mod service; + +// Include the generated proto code +tonic::include_proto!("control"); + +// Re-export the service +pub use service::ControlService; diff --git a/crates/atuin-daemon/src/control/service.rs b/crates/atuin-daemon/src/control/service.rs new file mode 100644 index 00000000..2e7403ce --- /dev/null +++ b/crates/atuin-daemon/src/control/service.rs @@ -0,0 +1,71 @@ +//! Control service implementation. +//! +//! 
This gRPC service allows external processes (like CLI commands) to inject +//! events into the daemon's event bus. + +use atuin_client::history::HistoryId; +use tonic::{Request, Response, Status}; +use tracing::{Level, info, instrument}; + +use super::{ + SendEventRequest, SendEventResponse, + control_server::{Control, ControlServer}, + send_event_request::Event, +}; +use crate::{daemon::DaemonHandle, events::DaemonEvent}; + +/// The Control gRPC service. +/// +/// This service is used by external processes to inject events into the daemon. +/// It's not a component - it's part of the daemon's core infrastructure. +pub struct ControlService { + handle: DaemonHandle, +} + +impl ControlService { + /// Create a new control service with the given daemon handle. + pub fn new(handle: DaemonHandle) -> Self { + Self { handle } + } + + /// Get a tonic server for this service. + pub fn into_server(self) -> ControlServer { + ControlServer::new(self) + } +} + +#[tonic::async_trait] +impl Control for ControlService { + #[instrument(skip_all, level = Level::INFO, name = "control_send_event")] + async fn send_event( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let event = req + .event + .ok_or_else(|| Status::invalid_argument("event is required"))?; + + let daemon_event = proto_event_to_daemon_event(event)?; + + info!(?daemon_event, "received control event"); + self.handle.emit(daemon_event); + + Ok(Response::new(SendEventResponse {})) + } +} + +/// Convert a proto event to a daemon event. 
+fn proto_event_to_daemon_event(event: Event) -> Result { + match event { + Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned), + Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt), + Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted { + ids: e.ids.into_iter().map(HistoryId).collect(), + }), + Event::ForceSync(_) => Ok(DaemonEvent::ForceSync), + Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded), + Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested), + } +} diff --git a/crates/atuin-daemon/src/daemon.rs b/crates/atuin-daemon/src/daemon.rs new file mode 100644 index 00000000..ec0b7b68 --- /dev/null +++ b/crates/atuin-daemon/src/daemon.rs @@ -0,0 +1,450 @@ +//! Core daemon infrastructure. +//! +//! This module provides the foundational types for building the atuin daemon: +//! +//! - [`DaemonState`]: Shared state owned by the daemon +//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state +//! - [`Component`]: A trait for implementing daemon components +//! - [`Daemon`]: The main daemon orchestrator +//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon + +use std::sync::Arc; + +use atuin_client::{ + database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore, + settings::Settings, +}; +use eyre::{Context, Result}; +use tokio::sync::{RwLock, broadcast}; + +use crate::events::DaemonEvent; + +// ============================================================================ +// DaemonState +// ============================================================================ + +/// Shared state owned by the daemon. +/// +/// This contains all the resources that components and services need access to. +/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`]. 
+pub struct DaemonState { + // Event bus + event_tx: broadcast::Sender, + + // Configuration (mutable - can be reloaded) + settings: RwLock, + + // Encryption key (immutable - derived at startup) + encryption_key: [u8; 32], + + // Database handles + history_db: HistoryDatabase, + store: SqliteStore, +} + +// ============================================================================ +// DaemonHandle +// ============================================================================ + +/// A lightweight handle to the daemon's shared state. +/// +/// This is the primary way for components, gRPC services, and spawned tasks to +/// interact with the daemon. It provides access to: +/// +/// - Event emission and subscription +/// - Configuration (settings, encryption key) +/// - Database handles +/// +/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed +/// around to any code that needs daemon access. +/// +/// # Example +/// +/// ```ignore +/// // Emit an event +/// handle.emit(DaemonEvent::HistoryPruned); +/// +/// // Access settings +/// let settings = handle.settings().await; +/// let sync_freq = settings.daemon.sync_frequency; +/// +/// // Access database +/// let history = handle.history_db().load(id).await?; +/// ``` +#[derive(Clone)] +pub struct DaemonHandle { + state: Arc, +} + +impl DaemonHandle { + // ---- Events ---- + + /// Emit an event to the daemon's event bus. + /// + /// This is fire-and-forget: the caller gets no result. If the send fails + /// (no receivers are listening, which shouldn't happen in normal operation), + /// the event is dropped and a warning is logged. + pub fn emit(&self, event: DaemonEvent) { + if let Err(e) = self.state.event_tx.send(event) { + tracing::warn!("failed to emit event (no receivers?): {e}"); + } + } + + /// Subscribe to the event bus. + /// + /// Returns a receiver that will receive all events emitted after this call. + /// Useful for components that need to listen for events outside of the + /// normal `handle_event` callback flow.
+ pub fn subscribe(&self) -> broadcast::Receiver { + self.state.event_tx.subscribe() + } + + /// Request graceful shutdown of the daemon. + pub fn shutdown(&self) { + self.emit(DaemonEvent::ShutdownRequested); + } + + // ---- Configuration ---- + + /// Get the current settings. + /// + /// This acquires a read lock on the settings. For most use cases, clone + /// the settings if you need to hold onto them. + pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> { + self.state.settings.read().await + } + + /// Reload settings from disk and emit a SettingsReloaded event. + /// + /// Components listening for `SettingsReloaded` can then re-read settings + /// via `handle.settings()` to pick up the changes. + pub async fn reload_settings(&self) -> Result<()> { + let new_settings = Settings::new()?; + *self.state.settings.write().await = new_settings; + self.emit(DaemonEvent::SettingsReloaded); + tracing::info!("settings reloaded"); + Ok(()) + } + + /// Get the encryption key. + pub fn encryption_key(&self) -> &[u8; 32] { + &self.state.encryption_key + } + + // ---- Database ---- + + /// Get a reference to the history database. + pub fn history_db(&self) -> &HistoryDatabase { + &self.state.history_db + } + + /// Get a reference to the record store. + pub fn store(&self) -> &SqliteStore { + &self.state.store + } +} + +impl std::fmt::Debug for DaemonHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DaemonHandle").finish_non_exhaustive() + } +} + +// ============================================================================ +// Component Trait +// ============================================================================ + +/// A daemon component that handles a specific domain. +/// +/// Components are the building blocks of the daemon. 
Each component: +/// +/// - Has a unique name for logging and debugging +/// - Can optionally expose gRPC services +/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources +/// - Handles events from the event bus +/// - Performs cleanup on shutdown +/// +/// # Lifecycle +/// +/// 1. **Construction**: Component is created (usually via `new()`) +/// 2. **Start**: `start()` is called with a [`DaemonHandle`] +/// 3. **Running**: `handle_event()` is called for each event the daemon's event loop dispatches +/// 4. **Shutdown**: `stop()` is called for cleanup +/// +/// # Example +/// +/// ```ignore +/// pub struct MyComponent { +/// handle: Option<DaemonHandle>, +/// } +/// +/// #[async_trait] +/// impl Component for MyComponent { +/// fn name(&self) -> &'static str { "my-component" } +/// +/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> { +/// self.handle = Some(handle); +/// Ok(()) +/// } +/// +/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { +/// match event { +/// DaemonEvent::SomeEvent => { +/// // Handle the event +/// if let Some(handle) = &self.handle { +/// handle.emit(DaemonEvent::ResponseEvent); +/// } +/// } +/// _ => {} +/// } +/// Ok(()) +/// } +/// +/// async fn stop(&mut self) -> Result<()> { +/// Ok(()) +/// } +/// } +/// ``` +#[tonic::async_trait] +pub trait Component: Send + Sync { + /// Human-readable name for logging and debugging. + fn name(&self) -> &'static str; + + /// Called once at startup. + /// + /// Store the handle if you need to emit events or access daemon resources + /// later. The handle is cheaply cloneable, so feel free to clone it for + /// spawned tasks. + async fn start(&mut self, handle: DaemonHandle) -> Result<()>; + + /// Handle an incoming event. + /// + /// Called for each event the daemon's event loop dispatches, in component + /// registration order. `ShutdownRequested` ends the loop and is not passed + /// to this method. A returned error is logged by the daemon and does not + /// stop the event from reaching the remaining components. + /// + /// To emit new events in response, use the handle stored during `start()`. + /// Events emitted here will be processed in subsequent event loop iterations.
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>; + + /// Called on graceful shutdown. + /// + /// Use this to clean up resources, abort spawned tasks, etc. + async fn stop(&mut self) -> Result<()>; +} + +// ============================================================================ +// Daemon +// ============================================================================ + +/// The main daemon orchestrator. +/// +/// The daemon manages components, runs the event loop, and coordinates startup +/// and shutdown. It is constructed via [`DaemonBuilder`]. +/// +/// # Event Loop +/// +/// The daemon runs a simple event loop: +/// +/// 1. Wait for an event on the bus +/// 2. Dispatch the event to all components (in registration order) +/// 3. Components may emit new events in response +/// 4. Repeat until `ShutdownRequested` is received +/// +/// Events emitted during handling are queued and processed in subsequent +/// iterations, ensuring the loop eventually drains. +pub struct Daemon { + components: Vec>, + handle: DaemonHandle, +} + +impl Daemon { + /// Create a new daemon builder. + pub fn builder(settings: Settings) -> DaemonBuilder { + DaemonBuilder::new(settings) + } + + /// Get a clone of the daemon handle. + /// + /// The handle can be used to emit events, access settings, etc. + pub fn handle(&self) -> DaemonHandle { + self.handle.clone() + } + + /// Start all components. + /// + /// This must be called before `run_event_loop()`. It initializes all + /// registered components with the daemon handle. + pub async fn start_components(&mut self) -> Result<()> { + for component in &mut self.components { + tracing::info!(component = component.name(), "starting component"); + component + .start(self.handle.clone()) + .await + .with_context(|| format!("failed to start component: {}", component.name()))?; + } + Ok(()) + } + + /// Run the daemon event loop. + /// + /// This processes events until a ShutdownRequested event is received. 
+ /// Components must be started first via `start_components()`. + pub async fn run_event_loop(&mut self) -> Result<()> { + // Subscribing here means events emitted before this call are not seen by the loop. + let mut event_rx = self.handle.subscribe(); + loop { + match event_rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) => { + // Shutdown is handled by the loop itself and is not dispatched to components. + tracing::info!("shutdown requested, stopping daemon"); + break; + } + Ok(event) => { + tracing::debug!(?event, "processing event"); + self.dispatch_event(&event).await; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + // The receiver fell behind and `n` events were skipped; keep running. + tracing::warn!( + skipped = n, + "event receiver lagged, some events were dropped" + ); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::info!("event bus closed, stopping daemon"); + break; + } + } + } + Ok(()) + } + + /// Stop all components. + /// + /// This performs graceful shutdown of all components in reverse registration + /// order (the opposite of the order used by `start_components()`), as + /// documented on `DaemonBuilder::component`. An error returned by one + /// component's `stop()` is logged and does not prevent the remaining + /// components from being stopped. + pub async fn stop_components(&mut self) { + for component in self.components.iter_mut().rev() { + tracing::info!(component = component.name(), "stopping component"); + if let Err(e) = component.stop().await { + tracing::error!( + component = component.name(), + error = ?e, + "error stopping component" + ); + } + } + tracing::info!("all components stopped"); + } + + /// Run the daemon. + /// + /// This is a convenience method that starts components, runs the event loop, + /// and handles shutdown. It does not return until the daemon is shut down.
+ pub async fn run(mut self) -> Result<()> { + self.start_components().await?; + self.run_event_loop().await?; + self.stop_components().await; + tracing::info!("daemon stopped"); + Ok(()) + } + + async fn dispatch_event(&mut self, event: &DaemonEvent) { + for component in &mut self.components { + if let Err(e) = component.handle_event(event).await { + tracing::error!( + component = component.name(), + error = ?e, + "error handling event" + ); + } + } + } +} + +// ============================================================================ +// DaemonBuilder +// ============================================================================ + +/// Builder for constructing a [`Daemon`]. +/// +/// # Example +/// +/// ```ignore +/// let daemon = Daemon::builder(settings) +/// .store(store) +/// .history_db(history_db) +/// .component(HistoryComponent::new()) +/// .component(SearchComponent::new()) +/// .component(SyncComponent::new()) +/// .build() +/// .await?; +/// +/// daemon.run().await?; +/// ``` +pub struct DaemonBuilder { + settings: Settings, + store: Option, + history_db: Option, + components: Vec>, +} + +impl DaemonBuilder { + /// Create a new daemon builder with the given settings. + pub fn new(settings: Settings) -> Self { + Self { + settings, + store: None, + history_db: None, + components: Vec::new(), + } + } + + /// Set the record store. + pub fn store(mut self, store: SqliteStore) -> Self { + self.store = Some(store); + self + } + + /// Set the history database. + pub fn history_db(mut self, db: HistoryDatabase) -> Self { + self.history_db = Some(db); + self + } + + /// Register a component. + /// + /// Components are started in registration order and stopped in reverse order. + pub fn component(mut self, component: impl Component + 'static) -> Self { + self.components.push(Box::new(component)); + self + } + + /// Build the daemon. + /// + /// This loads the encryption key and creates the daemon state. 
+ pub async fn build(self) -> Result { + let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?; + let history_db = self + .history_db + .ok_or_else(|| eyre::eyre!("history_db is required"))?; + + // Load encryption key + let encryption_key: [u8; 32] = encryption::load_key(&self.settings) + .context("could not load encryption key")? + .into(); + + // Create the event bus + let (event_tx, _) = broadcast::channel(64); + + // Create the shared state + let state = Arc::new(DaemonState { + event_tx, + settings: RwLock::new(self.settings), + encryption_key, + history_db, + store, + }); + + // Create the handle (just a reference to the state) + let handle = DaemonHandle { state }; + + Ok(Daemon { + components: self.components, + handle, + }) + } +} diff --git a/crates/atuin-daemon/src/events.rs b/crates/atuin-daemon/src/events.rs new file mode 100644 index 00000000..4e6c6ff3 --- /dev/null +++ b/crates/atuin-daemon/src/events.rs @@ -0,0 +1,74 @@ +//! Daemon events. +//! +//! Events are the primary communication mechanism within the daemon. +//! Components emit events to notify others of state changes, and handle +//! events to react to changes elsewhere in the system. +//! +//! External processes (like CLI commands) can also inject events via the +//! Control gRPC service. + +use atuin_client::history::{History, HistoryId}; +use atuin_common::record::RecordId; + +/// Events that flow through the daemon's event bus. +/// +/// Events are broadcast to all components. Each component decides which +/// events it cares about in its `handle_event` implementation. +#[derive(Debug, Clone)] +pub enum DaemonEvent { + // ---- History lifecycle ---- + /// A command has started running. + HistoryStarted(History), + + /// A command has finished running. + HistoryEnded(History), + + // ---- Sync ---- + /// Records were synced from the server. + /// + /// The search component uses this to update its index with new history. 
+ RecordsAdded(Vec), + + /// Sync completed successfully. + SyncCompleted { + /// Number of records uploaded. + uploaded: usize, + /// Number of records downloaded. + downloaded: usize, + }, + + /// Sync failed. + SyncFailed { + /// Error message describing what went wrong. + error: String, + }, + + /// Request an immediate sync (external trigger). + ForceSync, + + // ---- External commands ---- + /// History was pruned - search index needs a full rebuild. + /// + /// Emitted when the user runs `atuin history prune` or similar. + HistoryPruned, + + /// History was rebuilt - search index needs a full rebuild. + /// + /// Emitted when the user runs `atuin store rebuild history` or similar. + HistoryRebuilt, + + /// Specific history items were deleted. + /// + /// The search component should remove these from its index. + HistoryDeleted { + /// IDs of the deleted history entries. + ids: Vec, + }, + + /// Settings have changed, components should reload if needed. + SettingsReloaded, + + // ---- Lifecycle ---- + /// Request graceful shutdown of the daemon. + ShutdownRequested, +} diff --git a/crates/atuin-daemon/src/history.rs b/crates/atuin-daemon/src/history.rs deleted file mode 100644 index 57f5b2cf..00000000 --- a/crates/atuin-daemon/src/history.rs +++ /dev/null @@ -1 +0,0 @@ -tonic::include_proto!("history"); diff --git a/crates/atuin-daemon/src/history/mod.rs b/crates/atuin-daemon/src/history/mod.rs new file mode 100644 index 00000000..b71853df --- /dev/null +++ b/crates/atuin-daemon/src/history/mod.rs @@ -0,0 +1,6 @@ +//! History module for the daemon gRPC history service. +//! +//! This module contains the proto-generated types for the history gRPC service. 
+ +// Include the generated proto code +tonic::include_proto!("history"); diff --git a/crates/atuin-daemon/src/lib.rs b/crates/atuin-daemon/src/lib.rs index e00060bc..6dc04db3 100644 --- a/crates/atuin-daemon/src/lib.rs +++ b/crates/atuin-daemon/src/lib.rs @@ -1,3 +1,110 @@ +use atuin_client::database::Sqlite as HistoryDatabase; +use atuin_client::{record::sqlite_store::SqliteStore, settings::Settings}; +use eyre::Result; + pub mod client; +pub mod components; +pub mod control; +pub mod daemon; +pub mod events; pub mod history; +pub mod search; pub mod server; + +// Re-export core daemon types for convenience +pub use daemon::{Component, Daemon, DaemonBuilder, DaemonHandle}; +pub use events::DaemonEvent; + +// Re-export components +pub use components::{HistoryComponent, SearchComponent, SyncComponent}; + +// Re-export client helpers +pub use client::{ControlClient, emit_event, emit_event_with_settings}; + +/// Boot the daemon using the new component-based architecture. +/// +/// This creates a daemon with the standard components (history, search, sync), +/// starts the gRPC server with their services, and runs the event loop. 
+pub async fn boot( + settings: Settings, + store: SqliteStore, + history_db: HistoryDatabase, +) -> Result<()> { + // Create the components + let history_component = HistoryComponent::new(); + let search_component = SearchComponent::new(); + let sync_component = SyncComponent::new(); + + // Get the gRPC services before moving components into the daemon + // (The services share state with the components via Arc) + let history_service = history_component.grpc_service(); + let search_service = search_component.grpc_service(); + + // Build the daemon + let mut daemon = Daemon::builder(settings.clone()) + .store(store) + .history_db(history_db) + .component(history_component) + .component(search_component) + .component(sync_component) + .build() + .await?; + + // Get a handle for the control service and gRPC server shutdown + let handle = daemon.handle(); + + // Create the control service + let control_service = control::ControlService::new(handle.clone()); + + // Start all components first (so gRPC services can work) + daemon.start_components().await?; + + // Spawn signal handler to emit ShutdownRequested on Ctrl+C/SIGTERM + let signal_handle = handle.clone(); + tokio::spawn(async move { + shutdown_signal().await; + tracing::info!("received shutdown signal"); + signal_handle.shutdown(); + }); + + // Start the gRPC server in the background + server::run_grpc_server( + settings, + history_service, + search_service, + control_service.into_server(), + handle, + ) + .await?; + + // Run the daemon event loop + daemon.run_event_loop().await?; + + // Stop all components on shutdown + daemon.stop_components().await; + + tracing::info!("daemon shut down complete"); + Ok(()) +} + +/// Wait for a shutdown signal (Ctrl+C or SIGTERM). 
+#[cfg(unix)] +async fn shutdown_signal() { + let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to register sigterm handler"); + let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) + .expect("failed to register sigint handler"); + + tokio::select! { + _ = term.recv() => {}, + _ = int.recv() => {}, + } +} + +/// Wait for a shutdown signal (Ctrl+C). +#[cfg(not(unix))] +async fn shutdown_signal() { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl+c"); +} diff --git a/crates/atuin-daemon/src/search/index.rs b/crates/atuin-daemon/src/search/index.rs new file mode 100644 index 00000000..b15b057f --- /dev/null +++ b/crates/atuin-daemon/src/search/index.rs @@ -0,0 +1,572 @@ +//! Search index with frecency-based ranking. +//! +//! This module provides a deduplicated search index where each unique command +//! is stored once, with metadata about all its invocations. This enables: +//! +//! - Efficient fuzzy matching (fewer items to match) +//! - Frecency-based ranking (frequency + recency) +//! - Dynamic filtering by directory, host, session, etc. + +use std::{collections::HashMap, sync::Arc}; + +use atuin_client::history::History; +use dashmap::{DashMap, DashSet}; +use nucleo::{Injector, Nucleo, pattern}; +use time::OffsetDateTime; +use tokio::sync::RwLock; +use tracing::{Level, instrument}; + +use crate::components::search::with_trailing_slash; + +/// Data for a single invocation of a command. +#[derive(Debug, Clone)] +pub struct Invocation { + /// When the command was run. + pub timestamp: i64, + /// The working directory when the command was run. + #[allow(dead_code)] + pub cwd: String, + /// The hostname where the command was run. + #[allow(dead_code)] + pub hostname: String, + /// The session ID. + #[allow(dead_code)] + pub session: String, + /// The history entry ID (for returning in search results). 
+ pub history_id: String, +} + +impl From<&History> for Invocation { + fn from(history: &History) -> Self { + Self { + timestamp: history.timestamp.unix_timestamp(), + cwd: history.cwd.clone(), + hostname: history.hostname.clone(), + session: history.session.clone(), + history_id: history.id.0.clone(), + } + } +} + +/// Pre-computed frecency data for O(1) lookup. +#[derive(Debug, Clone, Default)] +pub struct FrecencyData { + /// Total number of times this command was used. + pub count: u32, + /// Most recent usage timestamp (unix seconds). + pub last_used: i64, +} + +impl FrecencyData { + /// Record a new usage of this command. + /// + /// Increments `count` and raises `last_used` if `timestamp` is newer; + /// an older timestamp never moves `last_used` backwards. + pub fn record_use(&mut self, timestamp: i64) { + self.count += 1; + if timestamp > self.last_used { + self.last_used = timestamp; + } + } + + /// Compute frecency score based on count and recency. + /// + /// Returns 0 when `count` is 0. Otherwise the result is the sum of a + /// recency score (5..=100, stepped by age) and a frequency score (0..=100, + /// logarithmic in `count`), so more recent and more frequent commands score higher. + #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")] + pub fn compute(&self, now: i64) -> u32 { + if self.count == 0 { + return 0; + } + + // Age of the last use in whole hours; a `last_used` in the future is clamped to age 0 + let age_seconds = (now - self.last_used).max(0) as u64; + let age_hours = age_seconds / 3600; + + // Recency score: a step function of age (added to the frequency score, not a multiplier) + // - Under 1 hour: 100 + // - Up to 6 hours: 90, up to 24 hours: 70 + // - Up to 72 hours: 50, up to 168 hours (1 week): 30 + // - Up to 720 hours (30 days): 15, older: 5 + let recency_score = match age_hours { + 0 => 100, + 1..=6 => 90, + 7..=24 => 70, + 25..=72 => 50, + 73..=168 => 30, + 169..=720 => 15, + _ => 5, + }; + + // Frequency boost: 20 * ln(count), capped at 100 (a single use contributes 0) + let frequency_score = ((self.count as f64).ln() * 20.0).min(100.0) as u32; + + // Combined score + recency_score + frequency_score + } +} + +/// Data for a unique command, including all its invocations.
+pub struct CommandData { + /// The command text (stored for debugging/logging purposes). + #[allow(dead_code)] + pub command: String, + /// All invocations of this command, sorted by timestamp (newest first). + pub invocations: Vec, + /// Pre-computed global frecency. + pub global_frecency: FrecencyData, + + // Pre-computed indexes for O(1) filter lookups + /// All directories where this command has been run. + directories: DashSet, + /// All hostnames where this command has been run. + hosts: DashSet, + /// All sessions where this command has been run. + sessions: DashSet, +} + +impl CommandData { + /// Create a new CommandData from a history entry. + pub fn new(history: &History) -> Self { + let mut data = Self { + command: history.command.clone(), + invocations: Vec::new(), + global_frecency: FrecencyData::default(), + directories: DashSet::new(), + hosts: DashSet::new(), + sessions: DashSet::new(), + }; + data.add_invocation(history); + data + } + + /// Add an invocation from a history entry. + pub fn add_invocation(&mut self, history: &History) { + let timestamp = history.timestamp.unix_timestamp(); + + // Update global frecency + self.global_frecency.record_use(timestamp); + + // Update pre-computed indexes for O(1) filter lookups + self.directories.insert(with_trailing_slash(&history.cwd)); + self.hosts.insert(history.hostname.clone()); + self.sessions.insert(history.session.clone()); + + let invocation = Invocation::from(history); + + // Insert sorted by timestamp (newest first) + let pos = self + .invocations + .iter() + .position(|inv| inv.timestamp < timestamp) + .unwrap_or(self.invocations.len()); + self.invocations.insert(pos, invocation); + } + + /// Get the most recent history ID for this command. + pub fn most_recent_id(&self) -> Option<&str> { + self.invocations.first().map(|inv| inv.history_id.as_str()) + } + + /// Check if any invocation matches a directory filter (exact match). + /// O(1) lookup using pre-computed index. 
+ pub fn has_invocation_in_dir(&self, dir: &str) -> bool { + // `directories` holds paths as passed through `with_trailing_slash` in + // `add_invocation`, so `dir` must be given in that same form to match. + self.directories.contains(dir) + } + + /// Check if any invocation matches a directory prefix (workspace/git root). + /// O(n) where n = number of unique directories for this command. + /// + /// This is a plain string-prefix test against the stored directories (which + /// were passed through `with_trailing_slash`); `prefix` is used as given. + pub fn has_invocation_in_workspace(&self, prefix: &str) -> bool { + self.directories.iter().any(|d| d.starts_with(prefix)) + } + + /// Check if any invocation matches a hostname. + /// O(1) lookup using pre-computed index. + pub fn has_invocation_on_host(&self, hostname: &str) -> bool { + self.hosts.contains(hostname) + } + + /// Check if any invocation matches a session. + /// O(1) lookup using pre-computed index. + pub fn has_invocation_in_session(&self, session: &str) -> bool { + self.sessions.contains(session) + } +} + +/// Filter mode for search queries. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum IndexFilterMode { + /// No filtering - search all commands. + Global, + /// Filter to commands run in a specific directory. + /// The path is matched exactly against stored directories, so it should be + /// passed through `with_trailing_slash` first (as the tests in this module do). + Directory(String), + /// Filter to commands run in a workspace (directory prefix). + /// The prefix is matched with `starts_with` against stored directories. + Workspace(String), + /// Filter to commands run on a specific host. + Host(String), + /// Filter to commands run in a specific session. + Session(String), +} + +/// Context for search queries. +/// +/// NOTE: `SearchIndex::search` currently takes this as `_context` and does not +/// read it; filtering is driven by `IndexFilterMode` alone. +#[derive(Debug, Clone, Default)] +pub struct QueryContext { + pub cwd: Option, + pub git_root: Option, + pub hostname: Option, + pub session_id: Option, +} + +/// A deduplicated search index with frecency-based ranking. +/// +/// Commands are stored by their text, with metadata about all invocations. +/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback. +/// +/// Global frecency is precomputed by a background task and used for scoring. +/// The frecency map starts out as `None` and is only filled by `rebuild_frecency`; +/// until that has run, search still works but without frecency ranking. +pub struct SearchIndex { + /// Map from command text to command data.
+ /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer. + commands: Arc>, + /// Nucleo fuzzy matcher - items are command strings. + nucleo: RwLock>, + /// Injector for adding new commands to Nucleo. + injector: Injector, + /// Precomputed global frecency map (command -> frecency score). + /// Updated by background task. If None, search works without frecency. + frecency_map: RwLock>>>, +} + +impl SearchIndex { + /// Create a new empty search index. + /// + /// The frecency map starts as `None`; it is filled by `rebuild_frecency`. + pub fn new() -> Self { + let nucleo_config = nucleo::Config::DEFAULT; + // Single column for command text + let nucleo = Nucleo::::new(nucleo_config, Arc::new(|| {}), None, 1); + let injector = nucleo.injector(); + + Self { + commands: Arc::new(DashMap::new()), + nucleo: RwLock::new(nucleo), + injector, + frecency_map: RwLock::new(None), + } + } + + /// Add a history entry to the index. + /// + /// If the command already exists, updates its invocation data. + /// If it's a new command, adds it to both the map and Nucleo. + /// + /// The new-vs-existing decision is made through the map's entry API, so + /// concurrent calls for the same new command insert it exactly once + /// instead of overwriting each other and pushing duplicate Nucleo items. + pub fn add_history(&self, history: &History) { + let command = &history.command; + + // Fast path: existing command - just update invocations (no key clone) + if let Some(mut entry) = self.commands.get_mut(command) { + entry.add_invocation(history); + return; + } + + // Slow path: re-check under the entry lock, since another caller may + // have inserted this command after the lookup above returned None + match self.commands.entry(command.clone()) { + dashmap::mapref::entry::Entry::Occupied(mut existing) => { + existing.get_mut().add_invocation(history); + } + dashmap::mapref::entry::Entry::Vacant(slot) => { + // New command - add to both map and Nucleo + slot.insert(CommandData::new(history)); + self.injector.push(command.clone(), |cmd, cols| { + cols[0] = cmd.clone().into(); + }); + } + } + // Note: frecency_map is rebuilt by background task, not invalidated here + } + + /// Add multiple history entries to the index. + pub fn add_histories(&self, histories: &[History]) { + for history in histories { + self.add_history(history); + } + } + + /// Get the number of unique commands in the index. + pub fn command_count(&self) -> usize { + self.commands.len() + } + + /// Get the number of items in Nucleo (should match command_count).
+ pub async fn nucleo_item_count(&self) -> u32 { + self.nucleo.read().await.snapshot().item_count() + } + + /// Search for commands matching a query. + /// + /// Returns a list of history IDs (most recent invocation per command). + /// Uses precomputed global frecency for scoring if available. + #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))] + pub async fn search( + &self, + query: &str, + filter_mode: IndexFilterMode, + _context: &QueryContext, + limit: u32, + ) -> Vec { + let mut nucleo = self.nucleo.write().await; + + // Get precomputed frecency map (may be None if not yet computed) + let frecency_map = self.frecency_map.read().await.clone(); + + // Build filter based on mode + let filter = self.build_filter(&filter_mode); + nucleo.set_filter(filter); + + // Build scorer from precomputed frecency (or None if not available) + let scorer = Self::build_scorer(frecency_map); + nucleo.set_scorer(scorer); + + // Update pattern + nucleo.pattern.reparse( + 0, + query, + pattern::CaseMatching::Smart, + pattern::Normalization::Smart, + false, + ); + + tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| { + // Tick until complete + while nucleo.tick(10).running {} + }); + + // Collect results + let snapshot = nucleo.snapshot(); + let matched_count = snapshot.matched_item_count().min(limit); + + tracing::span!(Level::TRACE, "index_search_results").in_scope(|| { + snapshot + .matched_items(..matched_count) + .filter_map(|item| { + let cmd = item.data; + self.commands + .get(cmd) + .and_then(|data| data.most_recent_id().map(|s| s.to_string())) + }) + .collect() + }) + } + + /// Rebuild the global frecency map. + /// + /// This should be called by a background task periodically. + /// The map is used for scoring search results. 
+ #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")] + pub async fn rebuild_frecency(&self) { + let now = OffsetDateTime::now_utc().unix_timestamp(); + let mut frecency_map: HashMap = HashMap::new(); + + for entry in self.commands.iter() { + let frecency = entry.global_frecency.compute(now); + frecency_map.insert(entry.key().clone(), frecency); + } + + *self.frecency_map.write().await = Some(Arc::new(frecency_map)); + } + + /// Build filter predicate for the given mode. + fn build_filter(&self, mode: &IndexFilterMode) -> Option> { + // For Global mode, no filter needed + if matches!(mode, IndexFilterMode::Global) { + return None; + } + + // Pre-compute which commands pass the filter + let passing_commands: Arc> = { + let mut set = std::collections::HashSet::new(); + for entry in self.commands.iter() { + let passes = match mode { + IndexFilterMode::Global => unreachable!(), + IndexFilterMode::Directory(dir) => entry.has_invocation_in_dir(dir), + IndexFilterMode::Workspace(prefix) => entry.has_invocation_in_workspace(prefix), + IndexFilterMode::Host(hostname) => entry.has_invocation_on_host(hostname), + IndexFilterMode::Session(session) => entry.has_invocation_in_session(session), + }; + if passes { + set.insert(entry.key().clone()); + } + } + Arc::new(set) + }; + + Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd))) + } + + /// Build scorer from precomputed frecency map. + /// + /// Returns None if frecency map is not available (search still works, just without frecency ranking). 
+ fn build_scorer( + frecency_map: Option>>, + ) -> Option> { + let map = frecency_map?; + Some(Arc::new(move |cmd: &String, fuzzy_score: u32| { + let frecency = map.get(cmd).copied().unwrap_or(0); + fuzzy_score + frecency + })) + } +} + +impl Default for SearchIndex { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use time::macros::datetime; + + fn make_history(command: &str, cwd: &str, timestamp: OffsetDateTime) -> History { + History::import() + .timestamp(timestamp) + .command(command) + .cwd(cwd) + .build() + .into() + } + + #[test] + fn frecency_data_compute() { + let now = 1000000i64; + + // Recent command + let recent = FrecencyData { + count: 5, + last_used: now - 60, // 1 minute ago + }; + assert!(recent.compute(now) > 100); // High score + + // Old command + let old = FrecencyData { + count: 5, + last_used: now - 86400 * 30, // 30 days ago + }; + assert!(old.compute(now) < recent.compute(now)); + + // Frequently used old command + let frequent_old = FrecencyData { + count: 100, + last_used: now - 86400 * 7, // 1 week ago + }; + // Should still have decent score due to frequency + assert!(frequent_old.compute(now) > 50); + } + + #[test] + fn command_data_add_invocation() { + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&history1); + assert_eq!(data.invocations.len(), 1); + assert_eq!(data.global_frecency.count, 1); + + data.add_invocation(&history2); + assert_eq!(data.invocations.len(), 2); + assert_eq!(data.global_frecency.count, 2); + + // Most recent should be first + assert_eq!(data.invocations[0].cwd, dir2); + assert_eq!(data.invocations[1].cwd, dir1); + } + + #[test] + fn 
command_data_filters() { + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&h1); + data.add_invocation(&h2); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User\\project"), + with_trailing_slash("C:\\Users\\User\\other"), + with_trailing_slash("C:\\Users\\User\\missing"), + ) + } else { + ( + with_trailing_slash("/home/user/project"), + with_trailing_slash("/home/user/other"), + with_trailing_slash("/home/user/missing"), + ) + }; + + assert!(data.has_invocation_in_dir(&check1)); + assert!(data.has_invocation_in_dir(&check2)); + assert!(!data.has_invocation_in_dir(&check3)); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User"), + with_trailing_slash("C:\\Users"), + with_trailing_slash("C:\\Users\\User\\var"), + ) + } else { + ( + with_trailing_slash("/home/user"), + with_trailing_slash("/home"), + with_trailing_slash("/var"), + ) + }; + + assert!(data.has_invocation_in_workspace(&check1)); + assert!(data.has_invocation_in_workspace(&check2)); + assert!(!data.has_invocation_in_workspace(&check3)); + } + + #[tokio::test] + async fn search_index_add_and_search() { + let index = SearchIndex::new(); + + let h1 = make_history( + "git status", + "/home/user/project", + datetime!(2024-01-01 10:00 UTC), + ); + let h2 = make_history( + "git commit -m 'test'", + "/home/user/project", + datetime!(2024-01-01 10:05 UTC), + ); + let h3 = make_history( + "ls -la", + "/home/user/other", + datetime!(2024-01-01 10:10 UTC), + ); + + index.add_history(&h1); + index.add_history(&h2); + index.add_history(&h3); + + assert_eq!(index.command_count(), 3); + + // Search for "git" - should match 2 commands + 
let results = index + .search("git", IndexFilterMode::Global, &QueryContext::default(), 10) + .await; + assert_eq!(results.len(), 2); + + // Search with directory filter + let results = index + .search( + "", + IndexFilterMode::Directory(with_trailing_slash("/home/user/project")), + &QueryContext::default(), + 10, + ) + .await; + assert_eq!(results.len(), 2); // git status and git commit + } +} diff --git a/crates/atuin-daemon/src/search/mod.rs b/crates/atuin-daemon/src/search/mod.rs new file mode 100644 index 00000000..4d261956 --- /dev/null +++ b/crates/atuin-daemon/src/search/mod.rs @@ -0,0 +1,11 @@ +//! Search module for the daemon gRPC search service. +//! +//! This module provides fuzzy search over command history using Nucleo. + +mod index; + +// Include the generated proto code +tonic::include_proto!("search"); + +// Re-export the service and index +pub use index::{IndexFilterMode, QueryContext, SearchIndex}; diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs index 826d6191..a11de612 100644 --- a/crates/atuin-daemon/src/server.rs +++ b/crates/atuin-daemon/src/server.rs @@ -1,249 +1,49 @@ -use eyre::WrapErr; - -use atuin_client::encryption; -use atuin_client::history::store::HistoryStore; -use atuin_client::record::sqlite_store::SqliteStore; -use atuin_client::settings::Settings; -use std::io::ErrorKind; -#[cfg(unix)] -use std::path::PathBuf; -use std::sync::Arc; -use time::OffsetDateTime; -use tokio::sync::watch; -use tracing::{Level, instrument}; - -use atuin_client::database::{Database, Sqlite as HistoryDatabase}; -use atuin_client::history::{History, HistoryId}; -use dashmap::DashMap; use eyre::Result; -use tonic::{Request, Response, Status, transport::Server}; - -use crate::history::history_server::{History as HistorySvc, HistoryServer}; - -use crate::history::{EndHistoryReply, EndHistoryRequest, StartHistoryReply, StartHistoryRequest}; -use crate::history::{ShutdownReply, ShutdownRequest, StatusReply, StatusRequest}; - 
-mod sync; - -const DAEMON_PROTOCOL_VERSION: u32 = 1; - -#[derive(Debug)] -pub struct HistoryService { - // A store for WIP history - // This is history that has not yet been completed, aka a command that's current running. - running: Arc>, - store: HistoryStore, - history_db: HistoryDatabase, - shutdown_tx: watch::Sender, -} - -impl HistoryService { - pub fn new( - store: HistoryStore, - history_db: HistoryDatabase, - shutdown_tx: watch::Sender, - ) -> Self { - Self { - running: Arc::new(DashMap::new()), - store, - history_db, - shutdown_tx, - } - } -} - -#[tonic::async_trait()] -impl HistorySvc for HistoryService { - #[instrument(skip_all, level = Level::INFO)] - async fn start_history( - &self, - request: Request, - ) -> Result, Status> { - let running = self.running.clone(); - let req = request.into_inner(); - - let timestamp = - OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { - Status::invalid_argument( - "failed to parse timestamp as unix time (expected nanos since epoch)", - ) - })?; - - let mut h: History = History::daemon() - .timestamp(timestamp) - .command(req.command) - .cwd(req.cwd) - .session(req.session) - .hostname(req.hostname) - .build() - .into(); - if !req.author.trim().is_empty() { - h.author = req.author; - } - if !req.intent.trim().is_empty() { - h.intent = Some(req.intent); - } - - // The old behaviour had us inserting half-finished history records into the database - // The new behaviour no longer allows that. - // History that's running is stored in-memory by the daemon, and only committed when - // complete. - // If anyone relied on the old behaviour, we could perhaps insert to the history db here - // too. I'd rather keep it pure, unless that ends up being the case. 
- let id = h.id.clone(); - tracing::info!(id = id.to_string(), "start history"); - running.insert(id.clone(), h); - - let reply = StartHistoryReply { - id: id.to_string(), - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn end_history( - &self, - request: Request, - ) -> Result, Status> { - let running = self.running.clone(); - let req = request.into_inner(); - - let id = HistoryId(req.id); - - if let Some((_, mut history)) = running.remove(&id) { - history.exit = req.exit; - history.duration = match req.duration { - 0 => i64::try_from( - (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), - ) - .expect("failed to convert calculated duration to i64"), - value => i64::try_from(value).expect("failed to get i64 duration"), - }; - - // Perhaps allow the incremental build to handle this entirely. - self.history_db - .save(&history) - .await - .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; - - tracing::info!( - id = id.0.to_string(), - duration = history.duration, - "end history" - ); - - let (id, idx) = - self.store.push(history).await.map_err(|e| { - Status::internal(format!("failed to push record to store: {e:?}")) - })?; - - let reply = EndHistoryReply { - id: id.0.to_string(), - idx, - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - return Ok(Response::new(reply)); - } - - Err(Status::not_found(format!( - "could not find history with id: {id}" - ))) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn status( - &self, - _request: Request, - ) -> Result, Status> { - let reply = StatusReply { - // If status RPC responds, the daemon control plane is healthy. 
- healthy: true, - version: env!("CARGO_PKG_VERSION").to_string(), - pid: std::process::id(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - #[instrument(skip_all, level = Level::INFO)] - async fn shutdown( - &self, - _request: Request, - ) -> Result, Status> { - let _ = self.shutdown_tx.send(true); - Ok(Response::new(ShutdownReply { accepted: true })) - } -} - -#[cfg(unix)] -async fn shutdown_signal(socket: Option, mut shutdown_rx: watch::Receiver) { - let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .expect("failed to register sigterm handler"); - let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) - .expect("failed to register sigint handler"); +use crate::components::history::HistoryGrpcService; +use crate::components::search::SearchGrpcService; +use crate::control::{ControlService, control_server::ControlServer}; +use crate::daemon::DaemonHandle; +use crate::history::history_server::HistoryServer; +use crate::search::search_server::SearchServer; - tokio::select! { - _ = term.recv() => {}, - _ = int.recv() => {}, - _ = shutdown_rx.changed() => {}, - } - - eprintln!("Removing socket..."); - if let Some(socket) = socket { - match std::fs::remove_file(socket) { - Ok(()) => {} - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => { - eprintln!("failed to remove socket: {err}"); - } - } - } - eprintln!("Shutting down..."); -} - -#[cfg(windows)] -async fn shutdown_signal(mut shutdown_rx: watch::Receiver) { - let mut ctrl_c = tokio::signal::windows::ctrl_c().expect("failed to register signal handler"); - tokio::select! { - _ = ctrl_c.recv() => {}, - _ = shutdown_rx.changed() => {}, - } - eprintln!("Shutting down..."); -} +use atuin_client::settings::Settings; +/// Run the gRPC server with the given services. +/// +/// This starts the gRPC server in the background and returns immediately. 
+/// The server will shut down when a ShutdownRequested event is received. #[cfg(unix)] -async fn start_server( +pub async fn run_grpc_server( settings: Settings, - history: HistoryService, - shutdown_rx: watch::Receiver, + history_service: HistoryServer, + search_service: SearchServer, + control_service: ControlServer, + handle: DaemonHandle, ) -> Result<()> { use tokio::net::UnixListener; use tokio_stream::wrappers::UnixListenerStream; - let socket_path = settings.daemon.socket_path; + let socket_path = settings.daemon.socket_path.clone(); let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket { #[cfg(target_os = "linux")] { - use eyre::OptionExt; + use eyre::{OptionExt, WrapErr}; + use std::os::unix::net::SocketAddr; + use std::path::PathBuf; tracing::info!("getting systemd socket"); let listener = listenfd::ListenFd::from_env() .take_unix_listener(0)? .ok_or_eyre("missing systemd socket")?; listener.set_nonblocking(true)?; - let actual_path = listener + let actual_path: Result = listener .local_addr() .context("getting systemd socket's path") - .and_then(|addr| { + .and_then(|addr: SocketAddr| { addr.as_pathname() .ok_or_eyre("systemd socket missing path") - .map(|path| path.to_owned()) + .map(|path: &std::path::Path| path.to_owned()) }); match actual_path { Ok(actual_path) => { @@ -271,66 +71,94 @@ async fn start_server( let uds_stream = UnixListenerStream::new(uds); - Server::builder() - .add_service(HistoryServer::new(history)) - .serve_with_incoming_shutdown( - uds_stream, - shutdown_signal(cleanup.then_some(socket_path.into()), shutdown_rx), - ) - .await?; + // Create shutdown signal from daemon handle + let shutdown_signal = async move { + let mut rx = handle.subscribe(); + loop { + use crate::DaemonEvent; + + match rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) => break, + Ok(_) => continue, + Err(_) => break, // Channel closed + } + } + if cleanup { + eprintln!("Removing socket..."); + if let Err(e) = 
std::fs::remove_file(&socket_path) + && e.kind() != std::io::ErrorKind::NotFound + { + eprintln!("failed to remove socket: {e}"); + } + } + eprintln!("Shutting down gRPC server..."); + }; + + // Spawn the server in the background + tokio::spawn(async move { + use tonic::transport::Server; + + if let Err(e) = Server::builder() + .add_service(history_service) + .add_service(search_service) + .add_service(control_service) + .serve_with_incoming_shutdown(uds_stream, shutdown_signal) + .await + { + tracing::error!("gRPC server error: {e}"); + } + }); Ok(()) } +/// Run the gRPC server with the given services (Windows/TCP version). #[cfg(not(unix))] -async fn start_server( +pub async fn run_grpc_server( settings: Settings, - history: HistoryService, - shutdown_rx: watch::Receiver, + history_service: HistoryServer, + search_service: SearchServer, + control_service: ControlServer, + handle: DaemonHandle, ) -> Result<()> { use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; + use tonic::transport::Server; let port = settings.daemon.tcp_port; let url = format!("127.0.0.1:{port}"); - let tcp = TcpListener::bind(url).await?; + let tcp = TcpListener::bind(&url).await?; let tcp_stream = TcpListenerStream::new(tcp); tracing::info!("listening on tcp port {:?}", port); - Server::builder() - .add_service(HistoryServer::new(history)) - .serve_with_incoming_shutdown(tcp_stream, shutdown_signal(shutdown_rx)) - .await?; - Ok(()) -} - -// break the above down when we end up with multiple services - -/// Listen on a unix socket -/// Pass the path to the socket -pub async fn listen( - settings: Settings, - store: SqliteStore, - history_db: HistoryDatabase, -) -> Result<()> { - let encryption_key: [u8; 32] = encryption::load_key(&settings) - .context("could not load encryption key")? 
- .into(); - - let host_id = Settings::host_id().await?; - let history_store = HistoryStore::new(store.clone(), host_id, encryption_key); + // Create shutdown signal from daemon handle + let shutdown_signal = async move { + use crate::DaemonEvent; - let (shutdown_tx, shutdown_rx) = watch::channel(false); - let history = HistoryService::new(history_store.clone(), history_db.clone(), shutdown_tx); + let mut rx = handle.subscribe(); + loop { + match rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) => break, + Ok(_) => continue, + Err(_) => break, // Channel closed + } + } + eprintln!("Shutting down gRPC server..."); + }; - // start services - tokio::spawn(sync::worker( - settings.clone(), - store, - history_store, - history_db, - )); + // Spawn the server in the background + tokio::spawn(async move { + if let Err(e) = Server::builder() + .add_service(history_service) + .add_service(search_service) + .add_service(control_service) + .serve_with_incoming_shutdown(tcp_stream, shutdown_signal) + .await + { + tracing::error!("gRPC server error: {e}"); + } + }); - start_server(settings, history, shutdown_rx).await + Ok(()) } diff --git a/crates/atuin-daemon/src/server/sync.rs b/crates/atuin-daemon/src/server/sync.rs deleted file mode 100644 index e1e49597..00000000 --- a/crates/atuin-daemon/src/server/sync.rs +++ /dev/null @@ -1,96 +0,0 @@ -use eyre::Result; -use rand::Rng; -use tokio::time::{self, MissedTickBehavior}; - -use atuin_client::database::Sqlite as HistoryDatabase; -use atuin_client::{ - encryption, - history::store::HistoryStore, - record::{sqlite_store::SqliteStore, sync}, - settings::Settings, -}; - -use atuin_dotfiles::store::{AliasStore, var::VarStore}; - -pub async fn worker( - settings: Settings, - store: SqliteStore, - history_store: HistoryStore, - history_db: HistoryDatabase, -) -> Result<()> { - tracing::info!("booting sync worker"); - - let encryption_key: [u8; 32] = encryption::load_key(&settings)?.into(); - let host_id = 
Settings::host_id().await?; - let alias_store = AliasStore::new(store.clone(), host_id, encryption_key); - let var_store = VarStore::new(store.clone(), host_id, encryption_key); - - // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) - let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); - - let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); - - // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, - // we may end up running a lot of syncs in a hot loop. No bueno! - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - ticker.tick().await; - tracing::info!("sync worker tick"); - - let logged_in = match settings.logged_in().await { - Ok(v) => v, - Err(e) => { - tracing::warn!("failed to check login status, skipping sync tick: {e}"); - continue; - } - }; - - if !logged_in { - tracing::debug!("not logged in, skipping sync tick"); - continue; - } - - let res = sync::sync(&settings, &store).await; - - if let Err(e) = res { - tracing::error!("sync tick failed with {e}"); - - let mut rng = rand::thread_rng(); - - let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); - - if new_interval > max_interval { - new_interval = max_interval; - } - - ticker = time::interval(time::Duration::from_secs(new_interval as u64)); - ticker.reset_after(time::Duration::from_secs(new_interval as u64)); - - tracing::error!("backing off, next sync tick in {new_interval}"); - } else { - let (uploaded, downloaded) = res.unwrap(); - - tracing::info!( - uploaded = ?uploaded, - downloaded = ?downloaded, - "sync complete" - ); - - history_store - .incremental_build(&history_db, &downloaded) - .await?; - - alias_store.build().await?; - var_store.build().await?; - - // Reset backoff on success - if ticker.period().as_secs() != settings.daemon.sync_frequency { - ticker = 
time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); - } - - // store sync time - Settings::save_sync_time().await?; - } - } -} diff --git a/crates/atuin-daemon/tests/lifecycle.rs b/crates/atuin-daemon/tests/lifecycle.rs index 56457fa7..3b6952de 100644 --- a/crates/atuin-daemon/tests/lifecycle.rs +++ b/crates/atuin-daemon/tests/lifecycle.rs @@ -8,52 +8,97 @@ mod unix { use std::time::Duration; use atuin_client::database::Sqlite; - use atuin_client::history::store::HistoryStore; use atuin_client::record::sqlite_store::SqliteStore; - use atuin_common::record::HostId; - use atuin_common::utils::uuid_v7; + use atuin_client::settings::{Settings, init_meta_config_for_testing}; use atuin_daemon::client::HistoryClient; - use atuin_daemon::history::history_server::HistoryServer; - use atuin_daemon::server::HistoryService; + use atuin_daemon::components::HistoryComponent; + use atuin_daemon::{Daemon, DaemonHandle}; use tempfile::TempDir; use tokio::net::UnixListener; - use tokio::sync::watch; use tokio_stream::wrappers::UnixListenerStream; use tonic::transport::Server; /// Spins up a daemon server on a temp socket and returns a connected client, - /// the shutdown sender, and the temp dir (must be held to keep paths alive). - async fn start_test_daemon() -> (HistoryClient, watch::Sender, TempDir) { + /// the daemon handle (for shutdown), and the temp dir (must be held to keep paths alive). 
+ async fn start_test_daemon() -> (HistoryClient, DaemonHandle, TempDir) { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("history.db"); let record_path = tmp.path().join("records.db"); + let key_path = tmp.path().join("key"); + let socket_path = tmp.path().join("test.sock"); + let meta_path = tmp.path().join("meta.db"); + + // Initialize the meta store config for testing (required for Settings::host_id()) + init_meta_config_for_testing(meta_path.to_str().unwrap(), 5.0); + + // Build settings with test paths + let settings: Settings = Settings::builder() + .expect("could not build settings builder") + .set_override("db_path", db_path.to_str().unwrap()) + .expect("failed to set db_path") + .set_override("record_store_path", record_path.to_str().unwrap()) + .expect("failed to set record_store_path") + .set_override("key_path", key_path.to_str().unwrap()) + .expect("failed to set key_path") + .set_override("daemon.socket_path", socket_path.to_str().unwrap()) + .expect("failed to set socket_path") + .set_override("meta.db_path", meta_path.to_str().unwrap()) + .expect("failed to set meta.db_path") + .build() + .expect("could not build settings") + .try_deserialize() + .expect("could not deserialize settings"); + // Create databases let history_db = Sqlite::new(&db_path, 5.0).await.unwrap(); let store = SqliteStore::new(&record_path, 5.0).await.unwrap(); - let host_id = HostId(uuid_v7()); - let encryption_key = [0u8; 32]; - let history_store = HistoryStore::new(store, host_id, encryption_key); + // Create the history component and get its gRPC service + let history_component = HistoryComponent::new(); + let history_service = history_component.grpc_service(); - let (shutdown_tx, shutdown_rx) = watch::channel(false); - let service = HistoryService::new(history_store, history_db, shutdown_tx.clone()); + // Build and start the daemon + let mut daemon = Daemon::builder(settings) + .store(store) + .history_db(history_db) + .component(history_component) + 
.build() + .await + .unwrap(); - let socket_path = tmp.path().join("test.sock"); + let handle = daemon.handle(); + + // Start components (this initializes the history component with the handle) + daemon.start_components().await.unwrap(); + + // Start the gRPC server let uds = UnixListener::bind(&socket_path).unwrap(); let stream = UnixListenerStream::new(uds); - let mut rx = shutdown_rx.clone(); + let server_handle = handle.clone(); tokio::spawn(async move { + let mut rx = server_handle.subscribe(); Server::builder() - .add_service(HistoryServer::new(service)) + .add_service(history_service) .serve_with_incoming_shutdown(stream, async move { - let _ = rx.changed().await; + loop { + match rx.recv().await { + Ok(atuin_daemon::DaemonEvent::ShutdownRequested) => break, + Ok(_) => continue, + Err(_) => break, + } + } }) .await .unwrap(); }); + // Spawn the daemon event loop in the background + tokio::spawn(async move { + daemon.run_event_loop().await.unwrap(); + }); + // Give the server a moment to bind. 
tokio::time::sleep(Duration::from_millis(50)).await; @@ -61,12 +106,12 @@ mod unix { .await .unwrap(); - (client, shutdown_tx, tmp) + (client, handle, tmp) } #[tokio::test] async fn test_status() { - let (mut client, _shutdown, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let status = client.status().await.unwrap(); assert!(status.healthy); @@ -79,7 +124,7 @@ mod unix { async fn test_start_end_history() { use atuin_client::history::History; - let (mut client, _shutdown, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let history = History::daemon() .timestamp(time::OffsetDateTime::now_utc()) @@ -102,7 +147,7 @@ mod unix { #[tokio::test] async fn test_end_unknown_history_fails() { - let (mut client, _shutdown, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let result = client .end_history("nonexistent-id".to_string(), 1000, 0) @@ -112,7 +157,7 @@ mod unix { #[tokio::test] async fn test_shutdown() { - let (mut client, _shutdown_tx, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let accepted = client.shutdown().await.unwrap(); assert!(accepted); diff --git a/crates/atuin/Cargo.toml b/crates/atuin/Cargo.toml index e7a0daaa..d8938121 100644 --- a/crates/atuin/Cargo.toml +++ b/crates/atuin/Cargo.toml @@ -78,10 +78,12 @@ open = "5" ratatui = { workspace = true } tracing = "0.1" tracing-subscriber = { workspace = true } +tracing-appender = "0.2" uuid = { workspace = true } sysinfo = "0.30.7" regex = "1.10.5" norm = { version = "0.1.1", features = ["fzf-v2"] } +nucleo-matcher = { git = "https://github.com/atuinsh/nucleo-ext.git", branch = "main" } tempfile = { workspace = true } shlex = "1.3.0" diff --git a/crates/atuin/src/command/client.rs b/crates/atuin/src/command/client.rs index 0cb0a2ae..ba55466d 100644 --- a/crates/atuin/src/command/client.rs +++ 
b/crates/atuin/src/command/client.rs @@ -1,4 +1,5 @@ -use std::path::PathBuf; +use std::fs::{self, OpenOptions}; +use std::path::{Path, PathBuf}; use clap::Subcommand; use eyre::{Result, WrapErr}; @@ -6,7 +7,38 @@ use eyre::{Result, WrapErr}; use atuin_client::{ database::Sqlite, record::sqlite_store::SqliteStore, settings::Settings, theme, }; -use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; +use tracing_appender::rolling::{RollingFileAppender, Rotation}; +use tracing_subscriber::{ + Layer, filter::EnvFilter, filter::LevelFilter, fmt, fmt::format::FmtSpan, prelude::*, +}; + +fn cleanup_old_logs(log_dir: &Path, prefix: &str, retention_days: u64) { + let cutoff = std::time::SystemTime::now() + - std::time::Duration::from_secs(retention_days * 24 * 60 * 60); + + let Ok(entries) = fs::read_dir(log_dir) else { + return; + }; + + for entry in entries.flatten() { + let path = entry.path(); + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + + // Match files like "search.log.2024-02-23" or "daemon.log.2024-02-23" + if !name.starts_with(prefix) || name == prefix { + continue; + } + + if let Ok(metadata) = entry.metadata() + && let Ok(modified) = metadata.modified() + && modified < cutoff + { + let _ = fs::remove_file(&path); + } + } +} #[cfg(feature = "sync")] mod sync; @@ -122,18 +154,157 @@ impl Cmd { res } + #[allow(clippy::too_many_lines)] async fn run_inner( self, mut settings: Settings, mut theme_manager: theme::ThemeManager, ) -> Result<()> { - let filter = + // ATUIN_LOG env var overrides config file level settings + let env_log_set = std::env::var("ATUIN_LOG").is_ok(); + + // Base filter from env var (or empty if not set) + let base_filter = EnvFilter::from_env("ATUIN_LOG").add_directive("sqlx_sqlite::regexp=off".parse()?); - tracing_subscriber::registry() - .with(fmt::layer()) - .with(filter) - .init(); + let is_interactive_search = matches!(&self, Self::Search(cmd) if cmd.is_interactive()); + // Use file-based 
logging for interactive search (TUI mode) + let use_search_logging = is_interactive_search && settings.logs.search_enabled(); + + // Use file-based logging for daemon + #[cfg(feature = "daemon")] + let use_daemon_logging = matches!(&self, Self::Daemon(_)) && settings.logs.daemon_enabled(); + + #[cfg(not(feature = "daemon"))] + let use_daemon_logging = false; + + // Check if daemon should also log to console + #[cfg(feature = "daemon")] + let daemon_show_logs = matches!(&self, Self::Daemon(cmd) if cmd.show_logs()); + + #[cfg(not(feature = "daemon"))] + let daemon_show_logs = false; + + // Set up span timing JSON logs if ATUIN_SPAN is set + let span_path = std::env::var("ATUIN_SPAN").ok().map(|p| { + if p.is_empty() { + "atuin-spans.json".to_string() + } else { + p + } + }); + + // Helper to create span timing layer + macro_rules! make_span_layer { + ($path:expr) => {{ + let span_file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open($path)?; + Some( + fmt::layer() + .json() + .with_writer(span_file) + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .with_filter(LevelFilter::TRACE), + ) + }}; + } + + // Build the subscriber with all configured layers + if use_search_logging { + let search_filename = settings.logs.search.file.clone(); + let log_dir = PathBuf::from(&settings.logs.dir); + fs::create_dir_all(&log_dir)?; + + // Clean up old log files + cleanup_old_logs(&log_dir, &search_filename, settings.logs.search_retention()); + + let file_appender = + RollingFileAppender::new(Rotation::DAILY, &log_dir, &search_filename); + + // Use config level unless ATUIN_LOG is set + let filter = if env_log_set { + base_filter + } else { + EnvFilter::default() + .add_directive(settings.logs.search_level().as_directive().parse()?) + .add_directive("sqlx_sqlite::regexp=off".parse()?) 
+ }; + + let base = tracing_subscriber::registry().with( + fmt::layer() + .with_writer(file_appender) + .with_ansi(false) + .with_filter(filter), + ); + + match &span_path { + Some(sp) => { + base.with(make_span_layer!(sp)).init(); + } + None => { + base.init(); + } + } + } else if use_daemon_logging { + let daemon_filename = settings.logs.daemon.file.clone(); + let log_dir = PathBuf::from(&settings.logs.dir); + fs::create_dir_all(&log_dir)?; + + // Clean up old log files + cleanup_old_logs(&log_dir, &daemon_filename, settings.logs.daemon_retention()); + + let file_appender = + RollingFileAppender::new(Rotation::DAILY, &log_dir, &daemon_filename); + + // Use config level unless ATUIN_LOG is set + let file_filter = if env_log_set { + base_filter + } else { + EnvFilter::default() + .add_directive(settings.logs.daemon_level().as_directive().parse()?) + .add_directive("sqlx_sqlite::regexp=off".parse()?) + }; + + let file_layer = fmt::layer() + .with_writer(file_appender) + .with_ansi(false) + .with_filter(file_filter); + + // Optionally add console layer for --show-logs + if daemon_show_logs { + let console_filter = EnvFilter::from_env("ATUIN_LOG") + .add_directive("sqlx_sqlite::regexp=off".parse()?); + + let console_layer = fmt::layer().with_filter(console_filter); + + let base = tracing_subscriber::registry() + .with(file_layer) + .with(console_layer); + + match &span_path { + Some(sp) => { + base.with(make_span_layer!(sp)).init(); + } + None => { + base.init(); + } + } + } else { + let base = tracing_subscriber::registry().with(file_layer); + + match &span_path { + Some(sp) => { + base.with(make_span_layer!(sp)).init(); + } + None => { + base.init(); + } + } + } + } tracing::trace!(command = ?self, "client command"); diff --git a/crates/atuin/src/command/client/daemon.rs b/crates/atuin/src/command/client/daemon.rs index a92e8f8e..64547505 100644 --- a/crates/atuin/src/command/client/daemon.rs +++ b/crates/atuin/src/command/client/daemon.rs @@ -9,10 +9,7 @@ use 
std::time::{Duration, Instant}; use atuin_client::{ database::Sqlite, history::History, record::sqlite_store::SqliteStore, settings::Settings, }; -use atuin_daemon::{ - client::{DaemonClientErrorKind, HistoryClient, classify_error}, - server::listen, -}; +use atuin_daemon::client::{DaemonClientErrorKind, HistoryClient, classify_error}; use clap::Subcommand; #[cfg(unix)] use daemonize::Daemonize; @@ -26,6 +23,10 @@ pub struct Cmd { #[arg(long, hide = true)] daemonize: bool, + /// Also write daemon logs to the console (useful for debugging) + #[arg(long)] + show_logs: bool, + #[command(subcommand)] subcmd: Option, } @@ -37,6 +38,14 @@ pub enum SubCmd { Start { #[arg(long, hide = true)] daemonize: bool, + + /// Also write daemon logs to the console (useful for debugging) + #[arg(long)] + show_logs: bool, + + /// Force start: kill existing daemon process and reset the socket + #[arg(long)] + force: bool, }, /// Show the daemon's current status @@ -55,12 +64,21 @@ impl Cmd { #[cfg(unix)] pub fn should_daemonize(&self) -> bool { match &self.subcmd { - Some(SubCmd::Start { daemonize }) => *daemonize, + Some(SubCmd::Start { daemonize, .. }) => *daemonize, None => self.daemonize, _ => false, } } + /// Returns `true` when logs should also be written to the console. + pub fn show_logs(&self) -> bool { + match &self.subcmd { + Some(SubCmd::Start { show_logs, .. }) => *show_logs, + None => self.show_logs, + _ => false, + } + } + pub async fn run( self, settings: Settings, @@ -70,9 +88,9 @@ impl Cmd { match self.subcmd { None => { eprintln!("Warning: `atuin daemon` is deprecated, use `atuin daemon start`"); - run(settings, store, history_db).await + run(settings, store, history_db, false).await } - Some(SubCmd::Start { .. }) => run(settings, store, history_db).await, + Some(SubCmd::Start { force, .. 
}) => run(settings, store, history_db, force).await, Some(SubCmd::Status) => status_cmd(&settings).await, Some(SubCmd::Stop) => stop_cmd(&settings).await, Some(SubCmd::Restart) => restart_cmd(&settings).await, @@ -547,15 +565,82 @@ pub fn daemonize_current_process() -> Result<()> { Ok(()) } -async fn run(settings: Settings, store: SqliteStore, history_db: Sqlite) -> Result<()> { +async fn run( + settings: Settings, + store: SqliteStore, + history_db: Sqlite, + force: bool, +) -> Result<()> { + if force { + force_cleanup(&settings); + } + let pidfile_path = PathBuf::from(&settings.daemon.pidfile_path); let _pidfile_guard = PidfileGuard::acquire(&pidfile_path)?; - listen(settings, store, history_db).await?; + atuin_daemon::boot(settings, store, history_db).await?; Ok(()) } +/// Force cleanup: kill existing daemon process and remove socket. +fn force_cleanup(settings: &Settings) { + let pidfile_path = Path::new(&settings.daemon.pidfile_path); + + // Read and kill the existing process if pidfile exists + if pidfile_path.exists() { + if let Ok(contents) = fs::read_to_string(pidfile_path) + && let Some(pid_str) = contents.lines().next() + && let Ok(pid) = pid_str.parse::() + { + kill_process(pid); + // Give it a moment to release resources + std::thread::sleep(Duration::from_millis(100)); + } + + // Remove the pidfile + if let Err(e) = fs::remove_file(pidfile_path) + && e.kind() != ErrorKind::NotFound + { + tracing::warn!("failed to remove pidfile: {e}"); + } + } + + // Remove the socket file + #[cfg(unix)] + { + let socket_path = Path::new(&settings.daemon.socket_path); + if socket_path.exists() + && let Err(e) = fs::remove_file(socket_path) + && e.kind() != ErrorKind::NotFound + { + tracing::warn!("failed to remove socket: {e}"); + } + } +} + +/// Kill a process by PID. 
+#[cfg(unix)] +fn kill_process(pid: u32) { + // Use kill command to send SIGTERM for graceful shutdown + let _ = Command::new("kill") + .args(["-TERM", &pid.to_string()]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); +} + +/// Kill a process by PID. +#[cfg(not(unix))] +fn kill_process(pid: u32) { + // On Windows, use taskkill + let _ = Command::new("taskkill") + .args(["/PID", &pid.to_string(), "/F"]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/atuin/src/command/client/history.rs b/crates/atuin/src/command/client/history.rs index c20f64a3..fe9a7e32 100644 --- a/crates/atuin/src/command/client/history.rs +++ b/crates/atuin/src/command/client/history.rs @@ -10,6 +10,9 @@ use clap::Subcommand; use eyre::{Context, Result}; use runtime_format::{FormatKey, FormatKeyError, ParseSegment, ParsedFmt}; +#[cfg(feature = "daemon")] +use atuin_daemon::emit_event; + use atuin_client::{ database::{Database, Sqlite, current_context}, encryption, @@ -624,6 +627,9 @@ impl Cmd { db.delete(entry.clone()).await?; } } + + #[cfg(feature = "daemon")] + let _ = emit_event(atuin_daemon::DaemonEvent::HistoryPruned).await; } Ok(()) } @@ -670,6 +676,9 @@ impl Cmd { let host_id = Settings::host_id().await?; let history_store = HistoryStore::new(store.clone(), host_id, encryption_key); + #[cfg(feature = "daemon")] + let ids = matches.iter().map(|h| h.id.clone()).collect::>(); + for entry in matches { eprintln!("deleting {}", entry.id); if settings.sync.records { @@ -679,6 +688,9 @@ impl Cmd { db.delete(entry).await?; } } + + #[cfg(feature = "daemon")] + let _ = emit_event(atuin_daemon::DaemonEvent::HistoryDeleted { ids }).await; } Ok(()) } diff --git a/crates/atuin/src/command/client/search.rs b/crates/atuin/src/command/client/search.rs index 70a25ed9..3f3687b8 100644 --- a/crates/atuin/src/command/client/search.rs +++ b/crates/atuin/src/command/client/search.rs @@ -141,6 +141,11 @@ pub struct Cmd 
{ } impl Cmd { + /// Returns true if this search command will run in interactive (TUI) mode + pub fn is_interactive(&self) -> bool { + self.interactive + } + // clippy: please write this instead // clippy: now it has too many lines // me: I'll do it later OKAY diff --git a/crates/atuin/src/command/client/search/engines.rs b/crates/atuin/src/command/client/search/engines.rs index 5c53817e..8cbee0c3 100644 --- a/crates/atuin/src/command/client/search/engines.rs +++ b/crates/atuin/src/command/client/search/engines.rs @@ -8,12 +8,22 @@ use eyre::Result; use super::cursor::Cursor; +#[cfg(feature = "daemon")] +pub mod daemon; pub mod db; pub mod skim; -pub fn engine(search_mode: SearchMode) -> Box { +#[allow(unused)] // settings is only used if daemon feature is enabled +pub fn engine(search_mode: SearchMode, settings: &Settings) -> Box { match search_mode { SearchMode::Skim => Box::new(skim::Search::new()) as Box<_>, + #[cfg(feature = "daemon")] + SearchMode::DaemonFuzzy => Box::new(daemon::Search::new(settings)) as Box<_>, + #[cfg(not(feature = "daemon"))] + SearchMode::DaemonFuzzy => { + // Fall back to fuzzy mode if daemon feature is not enabled + Box::new(db::Search(SearchMode::Fuzzy)) as Box<_> + } mode => Box::new(db::Search(mode)) as Box<_>, } } diff --git a/crates/atuin/src/command/client/search/engines/daemon.rs b/crates/atuin/src/command/client/search/engines/daemon.rs new file mode 100644 index 00000000..d317a4f6 --- /dev/null +++ b/crates/atuin/src/command/client/search/engines/daemon.rs @@ -0,0 +1,206 @@ +use async_trait::async_trait; +use atuin_client::{ + database::{Database, OptFilters}, + history::History, + settings::{SearchMode, Settings}, +}; +use atuin_daemon::client::SearchClient; +use eyre::Result; +use nucleo_matcher::{ + Config, Matcher, Utf32Str, + pattern::{CaseMatching, Normalization, Pattern}, +}; +use tracing::{Level, debug, instrument, span}; +use uuid::Uuid; + +use super::{SearchEngine, SearchState}; + +pub struct Search { + client: 
Option, + query_id: u64, + socket_path: String, + #[cfg(not(unix))] + tcp_port: u64, +} + +impl Search { + pub fn new(settings: &Settings) -> Self { + Search { + client: None, + query_id: 0, + socket_path: settings.daemon.socket_path.clone(), + #[cfg(not(unix))] + tcp_port: settings.daemon.tcp_port, + } + } + + #[instrument(skip_all, level = Level::TRACE, name = "get_daemon_client")] + async fn get_client(&mut self) -> Result<&mut SearchClient> { + if self.client.is_none() { + #[cfg(unix)] + let client = SearchClient::new(self.socket_path.clone()).await?; + + #[cfg(not(unix))] + let client = SearchClient::new(self.tcp_port).await?; + + self.client = Some(client); + } + Ok(self.client.as_mut().unwrap()) + } + + fn next_query_id(&mut self) -> u64 { + self.query_id += 1; + self.query_id + } + + /// Check if query contains regex pattern (r/.../) + /// Nucleo doesn't support regex, so we fall back to database search + fn contains_regex_pattern(query: &str) -> bool { + query.starts_with("r/") || query.contains(" r/") + } + + #[instrument(skip_all, level = Level::TRACE, name = "daemon_db_fallback")] + async fn fallback_to_db_search( + &self, + state: &SearchState, + db: &dyn Database, + ) -> Result> { + let results = db + .search( + SearchMode::FullText, + state.filter_mode, + &state.context, + state.input.as_str(), + OptFilters { + limit: Some(200), + ..Default::default() + }, + ) + .await + .map_or(Vec::new(), |r| r.into_iter().collect()); + Ok(results) + } + + #[instrument(skip_all, level = Level::TRACE, name = "hydrate_from_db", fields(count = ids.len()))] + async fn hydrate_from_db(&self, db: &dyn Database, ids: &[String]) -> Result> { + let placeholders: Vec = ids.iter().map(|id| format!("'{id}'")).collect(); + let sql_query = format!( + "SELECT * FROM history WHERE id IN ({}) ORDER BY timestamp DESC", + placeholders.join(",") + ); + Ok(db.query_history(&sql_query).await?) 
+ } +} + +#[async_trait] +impl SearchEngine for Search { + #[instrument(skip_all, level = Level::TRACE, name = "daemon_search", fields(query = %state.input.as_str()))] + async fn full_query( + &mut self, + state: &SearchState, + db: &mut dyn Database, + ) -> Result> { + let query = state.input.as_str().to_string(); + + // Fall back to database for regex queries (Nucleo doesn't support regex) + if Self::contains_regex_pattern(&query) { + debug!(query = %query, "[daemon-client] regex detected, falling back to db"); + return self.fallback_to_db_search(state, db).await; + } + + let query_id = self.next_query_id(); + + let span = + span!(Level::TRACE, "daemon_search.req_resp", query = %query, query_id = query_id); + + let client = self.get_client().await?; + + let _span = span.enter(); + let mut stream = client + .search( + query.clone(), + query_id, + state.filter_mode, + Some(state.context.clone()), + ) + .await?; + + let mut ids = Vec::with_capacity(200); + span!(Level::TRACE, "daemon_search.resp") + .in_scope(async || { + while let Ok(Some(response)) = stream.message().await { + let span2 = span!( + Level::TRACE, + "daemon_search.resp.item", + query_id = response.query_id + ); + let _span2 = span2.enter(); + // Only process if the query_id matches (prevents stale responses) + if response.query_id == query_id { + let uuids = response + .ids + .iter() + .map(|id| { + let bytes: [u8; 16] = + id.as_slice().try_into().expect("id should be 16 bytes"); + Uuid::from_bytes(bytes).as_simple().to_string() + }) + .collect::>(); + ids.extend(uuids); + } + drop(_span2); + drop(span2); + } + }) + .await; + drop(_span); + drop(span); + + if ids.is_empty() { + debug!(query = %query, results = 0, "[daemon-client] empty results"); + return Ok(Vec::new()); + } + + // // Hydrate from local database + let results = self.hydrate_from_db(db, &ids).await?; + + // // Reorder results to match the order from the daemon (which is ranked by relevance) + let ordered_results = span!(Level::TRACE, 
"reorder_results").in_scope(|| { + let mut ordered_results = Vec::with_capacity(results.len()); + for id in &ids { + if let Some(history) = results.iter().find(|h| h.id.0 == *id) { + ordered_results.push(history.clone()); + } + } + ordered_results + }); + + debug!( + query = %query, + results = results.len(), + "[daemon-client]" + ); + + Ok(ordered_results) + } + + #[instrument(skip_all, level = Level::TRACE, name = "daemon_highlight")] + fn get_highlight_indices(&self, command: &str, search_input: &str) -> Vec { + // Use fulltext highlighting for regex queries + if Self::contains_regex_pattern(search_input) { + return super::db::get_highlight_indices_fulltext(command, search_input); + } + + let mut matcher = Matcher::new(Config::DEFAULT); + let pattern = Pattern::parse(search_input, CaseMatching::Smart, Normalization::Smart); + + let mut indices: Vec = Vec::new(); + let mut haystack_buf = Vec::new(); + + let haystack = Utf32Str::new(command, &mut haystack_buf); + pattern.indices(haystack, &mut matcher, &mut indices); + + // Convert u32 indices to usize + indices.into_iter().map(|i| i as usize).collect() + } +} diff --git a/crates/atuin/src/command/client/search/engines/db.rs b/crates/atuin/src/command/client/search/engines/db.rs index f0ed424e..476462f5 100644 --- a/crates/atuin/src/command/client/search/engines/db.rs +++ b/crates/atuin/src/command/client/search/engines/db.rs @@ -11,17 +11,19 @@ use eyre::Result; use norm::Metric; use norm::fzf::{FzfParser, FzfV2}; use std::ops::Range; +use tracing::{Level, instrument}; pub struct Search(pub SearchMode); #[async_trait] impl SearchEngine for Search { + #[instrument(skip_all, level = Level::TRACE, name = "db_search", fields(mode = ?self.0, query = %state.input.as_str()))] async fn full_query( &mut self, state: &SearchState, db: &mut dyn Database, ) -> Result> { - Ok(db + let results = db .search( self.0, state.filter_mode, @@ -34,9 +36,11 @@ impl SearchEngine for Search { ) .await // ignore errors as it may be 
caused by incomplete regex - .map_or(Vec::new(), |r| r.into_iter().collect())) + .map_or(Vec::new(), |r| r.into_iter().collect()); + Ok(results) } + #[instrument(skip_all, level = Level::TRACE, name = "db_highlight")] fn get_highlight_indices(&self, command: &str, search_input: &str) -> Vec { if self.0 == SearchMode::Prefix { return vec![]; @@ -54,7 +58,8 @@ impl SearchEngine for Search { } } -fn get_highlight_indices_fulltext(command: &str, search_input: &str) -> Vec { +#[instrument(skip_all, level = Level::TRACE, name = "db_highlight_fulltext")] +pub fn get_highlight_indices_fulltext(command: &str, search_input: &str) -> Vec { let mut ranges = vec![]; let lower_command = command.to_ascii_lowercase(); diff --git a/crates/atuin/src/command/client/search/engines/skim.rs b/crates/atuin/src/command/client/search/engines/skim.rs index cb7ce24f..7d9feb40 100644 --- a/crates/atuin/src/command/client/search/engines/skim.rs +++ b/crates/atuin/src/command/client/search/engines/skim.rs @@ -7,6 +7,7 @@ use fuzzy_matcher::{FuzzyMatcher, skim::SkimMatcherV2}; use itertools::Itertools; use time::OffsetDateTime; use tokio::task::yield_now; +use tracing::{Level, instrument, warn}; use uuid; use super::{SearchEngine, SearchState}; @@ -27,18 +28,20 @@ impl Search { #[async_trait] impl SearchEngine for Search { + #[instrument(skip_all, level = Level::TRACE, name = "skim_search", fields(query = %state.input.as_str()))] async fn full_query( &mut self, state: &SearchState, db: &mut dyn Database, ) -> Result> { if self.all_history.is_empty() { - self.all_history = db.all_with_count().await.unwrap(); + self.all_history = load_all_history(db).await; } Ok(fuzzy_search(&self.engine, state, &self.all_history).await) } + #[instrument(skip_all, level = Level::TRACE, name = "skim_highlight")] fn get_highlight_indices(&self, command: &str, search_input: &str) -> Vec { let (_, indices) = self .engine @@ -48,7 +51,13 @@ impl SearchEngine for Search { } } +#[instrument(skip_all, level = 
Level::TRACE, name = "load_all_history")] +async fn load_all_history(db: &dyn Database) -> Vec<(History, i32)> { + db.all_with_count().await.unwrap() +} + #[allow(clippy::too_many_lines)] +#[instrument(skip_all, level = Level::TRACE, name = "fuzzy_match", fields(history_count = all_history.len()))] async fn fuzzy_search( engine: &SkimMatcherV2, state: &SearchState, @@ -97,11 +106,11 @@ async fn fuzzy_search( if !is_current_session { let Ok(uuid) = uuid::Uuid::parse_str(&context.session) else { - log::warn!("failed to parse session id '{}'", context.session); + warn!("failed to parse session id '{}'", context.session); continue; }; let Some(timestamp) = uuid.get_timestamp() else { - log::warn!( + warn!( "failed to get timestamp from uuid '{}'", uuid.as_hyphenated() ); @@ -111,7 +120,7 @@ async fn fuzzy_search( let Ok(session_start) = time::OffsetDateTime::from_unix_timestamp_nanos( i128::from(seconds) * 1_000_000_000 + i128::from(nanos), ) else { - log::warn!( + warn!( "failed to create OffsetDateTime from second: {seconds}, nanosecond: {nanos}" ); continue; diff --git a/crates/atuin/src/command/client/search/interactive.rs b/crates/atuin/src/command/client/search/interactive.rs index c6a6064a..729c80ce 100644 --- a/crates/atuin/src/command/client/search/interactive.rs +++ b/crates/atuin/src/command/client/search/interactive.rs @@ -657,7 +657,7 @@ impl State { Action::CycleSearchMode => { self.switched_search_mode = true; self.search_mode = self.search_mode.next(settings); - self.engine = engines::engine(self.search_mode); + self.engine = engines::engine(self.search_mode, settings); InputAction::Continue } Action::SwitchContext => { @@ -1419,7 +1419,7 @@ pub async fn history( context: initial_context.clone(), custom_context: None, }, - engine: engines::engine(search_mode), + engine: engines::engine(search_mode, settings), results_len: 0, accept: false, keymap_mode: match settings.keymap_mode { @@ -1875,7 +1875,7 @@ mod tests { }, custom_context: None, }, - engine: 
engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -1930,7 +1930,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2049,7 +2049,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2108,7 +2108,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2163,7 +2163,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2214,7 +2214,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2274,7 +2274,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2335,7 +2335,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; state.results_state.select(selected); @@ -2714,7 +2714,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; diff --git a/crates/atuin/src/command/client/store/rebuild.rs b/crates/atuin/src/command/client/store/rebuild.rs index 8acec531..a98f8142 100644 --- 
a/crates/atuin/src/command/client/store/rebuild.rs +++ b/crates/atuin/src/command/client/store/rebuild.rs @@ -3,6 +3,9 @@ use atuin_scripts::store::ScriptStore; use clap::Args; use eyre::{Result, bail}; +#[cfg(feature = "daemon")] +use atuin_daemon::emit_event; + use atuin_client::{ database::Database, encryption, history::store::HistoryStore, record::sqlite_store::SqliteStore, settings::Settings, @@ -57,6 +60,9 @@ impl Rebuild { history_store.build(database).await?; + #[cfg(feature = "daemon")] + let _ = emit_event(atuin_daemon::DaemonEvent::HistoryRebuilt).await; + Ok(()) } diff --git a/docs/docs/configuration/config.md b/docs/docs/configuration/config.md index 2bc1a682..693a528d 100644 --- a/docs/docs/configuration/config.md +++ b/docs/docs/configuration/config.md @@ -765,6 +765,63 @@ The port to use for client -> daemon communication. Only used on non-unix system tcp_port = 8889 ``` +## logs + +Atuin version: >= 18.13 + +Behavior of log files. + +### enabled + +Default: `true` + +Whether or not to enable file-based logging. + +### dir + +Default: `"~/.atuin/logs"` + +The directory in which to store log files. + +### level + +Default: `"info"` + +The logging level to use. Valid values are `"trace"`, `"debug"`, `"info"`, `"warn"`, and `"error"`, in order of highest-to-lowest verbosity. + +### retention + +Default: `4` + +How many days of log files to keep (per file type). Files older than this will be removed. + +### ai + +A sub-object with specific options for AI logging: + +* `enabled` - whether to output AI logs; defaults to `logs.enabled` +* `file` - the filename to use for the AI logs; defaults to `"ai.log"`. Can be absolute, or relative to `logs.dir`. 
+* `level` - override the log level for the AI logs; defaults to `logs.level` +* `retention` - how many days to store AI logs; defaults to `logs.retention` + +### daemon + +A sub-object with specific options for daemon logging: + +* `enabled` - whether to output daemon logs; defaults to `logs.enabled` +* `file` - the filename to use for the daemon logs; defaults to `"daemon.log"`. Can be absolute, or relative to `logs.dir`. +* `level` - override the log level for the daemon logs; defaults to `logs.level` +* `retention` - how many days to store daemon logs; defaults to `logs.retention` + +### search + +A sub-object with specific options for search logging: + +* `enabled` - whether to output search logs; defaults to `logs.enabled` +* `file` - the filename to use for the search logs; defaults to `"search.log"`. Can be absolute, or relative to `logs.dir`. +* `level` - override the log level for the search logs; defaults to `logs.level` +* `retention` - how many days to store search logs; defaults to `logs.retention` + ## theme Atuin version: >= 18.4 diff --git a/scripts/span-table.ts b/scripts/span-table.ts new file mode 100755 index 00000000..3656f129 --- /dev/null +++ b/scripts/span-table.ts @@ -0,0 +1,420 @@ +#!/usr/bin/env bun +/** + * Analyze span timing JSON logs generated with ATUIN_SPAN + * + * Usage: bun scripts/span-table.ts [options] + * --filter Only show spans matching pattern (regex) + * --sort Sort by: calls, avg, total, p99 (default: total) + * --top Show top N spans (default: 20) + * --detail Show individual calls for a specific span + * --all Include internal/library spans + */ + +import { readFileSync } from "fs"; + +interface SpanEvent { + timestamp: string; + level: string; + fields: { + message: string; + "time.busy"?: string; + "time.idle"?: string; + }; + target: string; + span?: { + name: string; + [key: string]: unknown; + }; + spans?: Array<{ name: string; [key: string]: unknown }>; +} + +interface SpanStats { + name: string; + calls: number; 
+ busyTimes: number[]; // in microseconds + idleTimes: number[]; + parentCounts: Map; // parent span name -> count +} + +// Parse duration strings like "1.23ms", "456µs", "789ns" to microseconds +function parseDuration(duration: string): number { + const match = duration.match(/^([\d.]+)(ns|µs|us|ms|s)$/); + if (!match) return 0; + + const value = parseFloat(match[1]); + const unit = match[2]; + + switch (unit) { + case "ns": + return value / 1000; + case "µs": + case "us": + return value; + case "ms": + return value * 1000; + case "s": + return value * 1_000_000; + default: + return 0; + } +} + +// Format microseconds for display +function formatDuration(us: number): string { + if (us < 1) { + return `${(us * 1000).toFixed(0)}ns`; + } else if (us < 1000) { + return `${us.toFixed(2)}µs`; + } else if (us < 1_000_000) { + return `${(us / 1000).toFixed(2)}ms`; + } else { + return `${(us / 1_000_000).toFixed(2)}s`; + } +} + +function percentile(arr: number[], p: number): number { + if (arr.length === 0) return 0; + const sorted = [...arr].sort((a, b) => a - b); + const idx = Math.floor(sorted.length * p); + return sorted[Math.min(idx, sorted.length - 1)]; +} + +function parseJsonLines(content: string): SpanEvent[] { + const events: SpanEvent[] = []; + for (const line of content.trim().split("\n")) { + if (!line.trim()) continue; + try { + events.push(JSON.parse(line)); + } catch { + // Skip malformed lines + } + } + return events; +} + +function main() { + const args = process.argv.slice(2); + + // Parse arguments + let filterPattern: RegExp | null = null; + let sortField = "total"; + let topN = 20; + let detailSpan: string | null = null; + let showAll = false; + const files: string[] = []; + + for (let i = 0; i < args.length; i++) { + if (args[i] === "--filter" && args[i + 1]) { + filterPattern = new RegExp(args[++i]); + } else if (args[i] === "--sort" && args[i + 1]) { + sortField = args[++i]; + } else if (args[i] === "--top" && args[i + 1]) { + topN = 
parseInt(args[++i], 10); + } else if (args[i] === "--detail" && args[i + 1]) { + detailSpan = args[++i]; + } else if (args[i] === "--all") { + showAll = true; + } else if (!args[i].startsWith("-")) { + files.push(args[i]); + } + } + + if (files.length === 0) { + console.error("Usage: bun scripts/span-table.ts [options]"); + console.error(" --filter Only show spans matching pattern (regex)"); + console.error(" --sort Sort by: calls, avg, total, p99 (default: total)"); + console.error(" --top Show top N spans (default: 20)"); + console.error(" --detail Show individual calls for a specific span"); + console.error(" --all Include internal/library spans"); + process.exit(1); + } + + // Parse all files + const allEvents: SpanEvent[] = []; + for (const file of files) { + const content = readFileSync(file, "utf-8"); + for (const event of parseJsonLines(content)) { + allEvents.push(event); + } + } + + // Filter to close events and aggregate by span name + const spans = new Map(); + + for (const event of allEvents) { + if (event.fields?.message !== "close") continue; + if (!event.span?.name) continue; + if (!event.fields["time.busy"]) continue; + + const name = event.span.name; + + // Apply filter if specified + if (filterPattern && !filterPattern.test(name)) continue; + + // Skip noisy internal spans unless explicitly requested + if ( + !showAll && + !filterPattern && + !detailSpan && + (name.startsWith("FramedRead::") || + name.startsWith("FramedWrite::") || + name.startsWith("Prioritize::") || + name === "poll" || + name === "poll_ready" || + name === "Connection" || + name.startsWith("assign_") || + name.startsWith("reserve_") || + name.startsWith("try_") || + name.startsWith("send_") || + name.startsWith("pop_")) + ) { + continue; + } + + if (!spans.has(name)) { + spans.set(name, { name, calls: 0, busyTimes: [], idleTimes: [], parentCounts: new Map() }); + } + + const stats = spans.get(name)!; + stats.calls++; + 
stats.busyTimes.push(parseDuration(event.fields["time.busy"])); + if (event.fields["time.idle"]) { + stats.idleTimes.push(parseDuration(event.fields["time.idle"])); + } + + // Track parent relationship (immediate parent is the last element in spans array) + const parents = event.spans || []; + const parentName = parents.length > 0 ? parents[parents.length - 1].name : "__root__"; + stats.parentCounts.set(parentName, (stats.parentCounts.get(parentName) || 0) + 1); + } + + if (spans.size === 0) { + console.error("No matching span close events found"); + process.exit(1); + } + + // Detail mode: show individual calls for a specific span + if (detailSpan) { + const detailEvents: Array<{ + timestamp: string; + busy: number; + idle: number; + fields: Record; + parents: string[]; + }> = []; + + for (const event of allEvents) { + if (event.fields?.message !== "close") continue; + if (event.span?.name !== detailSpan) continue; + if (!event.fields["time.busy"]) continue; + + // Extract span fields (excluding name) + const fields: Record = {}; + if (event.span) { + for (const [k, v] of Object.entries(event.span)) { + if (k !== "name") fields[k] = v; + } + } + + // Get parent span names + const parents = (event.spans || []).map((s) => s.name); + + detailEvents.push({ + timestamp: event.timestamp, + busy: parseDuration(event.fields["time.busy"]), + idle: event.fields["time.idle"] ? parseDuration(event.fields["time.idle"]) : 0, + fields, + parents, + }); + } + + if (detailEvents.length === 0) { + console.error(`No events found for span "${detailSpan}"`); + process.exit(1); + } + + console.log(""); + console.log(`Individual calls for: ${detailSpan}`); + console.log("-".repeat(110)); + console.log( + "#".padStart(4) + + "Wall".padStart(12) + + "Busy".padStart(12) + + "Idle".padStart(12) + + " Fields" + ); + console.log("-".repeat(110)); + + detailEvents.forEach((e, i) => { + const fieldsStr = Object.keys(e.fields).length > 0 + ? 
JSON.stringify(e.fields) + : ""; + + console.log( + (i + 1).toString().padStart(4) + + formatDuration(e.busy + e.idle).padStart(12) + + formatDuration(e.busy).padStart(12) + + formatDuration(e.idle).padStart(12) + + " " + + fieldsStr + ); + }); + + // Summary stats + const busyTimes = detailEvents.map((e) => e.busy); + const wallTimes = detailEvents.map((e) => e.busy + e.idle); + console.log(""); + console.log( + `Summary: ${detailEvents.length} calls\n` + + ` Wall: avg=${formatDuration(wallTimes.reduce((a, b) => a + b, 0) / wallTimes.length)}, ` + + `min=${formatDuration(Math.min(...wallTimes))}, ` + + `max=${formatDuration(Math.max(...wallTimes))}, ` + + `p50=${formatDuration(percentile(wallTimes, 0.5))}, ` + + `p99=${formatDuration(percentile(wallTimes, 0.99))}\n` + + ` Busy: avg=${formatDuration(busyTimes.reduce((a, b) => a + b, 0) / busyTimes.length)}, ` + + `min=${formatDuration(Math.min(...busyTimes))}, ` + + `max=${formatDuration(Math.max(...busyTimes))}, ` + + `p50=${formatDuration(percentile(busyTimes, 0.5))}, ` + + `p99=${formatDuration(percentile(busyTimes, 0.99))}` + ); + return; + } + + // Calculate stats + const results = [...spans.values()].map((s) => { + // Calculate wall times (busy + idle) for each call + const wallTimes = s.busyTimes.map((busy, i) => busy + (s.idleTimes[i] || 0)); + + // Find most common parent + let mostCommonParent = "__root__"; + let maxCount = 0; + for (const [parent, count] of s.parentCounts) { + if (count > maxCount) { + maxCount = count; + mostCommonParent = parent; + } + } + + return { + name: s.name, + calls: s.calls, + total: s.busyTimes.reduce((a, b) => a + b, 0), + avg: s.busyTimes.reduce((a, b) => a + b, 0) / s.calls, + min: Math.min(...s.busyTimes), + max: Math.max(...s.busyTimes), + p50: percentile(s.busyTimes, 0.5), + p99: percentile(s.busyTimes, 0.99), + avgWall: wallTimes.reduce((a, b) => a + b, 0) / s.calls, + p50Wall: percentile(wallTimes, 0.5), + p99Wall: percentile(wallTimes, 0.99), + parent: 
mostCommonParent, + }; + }); + + // Build tree structure + const childrenOf = new Map(); + childrenOf.set("__root__", []); + for (const r of results) { + if (!childrenOf.has(r.name)) { + childrenOf.set(r.name, []); + } + if (!childrenOf.has(r.parent)) { + childrenOf.set(r.parent, []); + } + childrenOf.get(r.parent)!.push(r.name); + } + + // Sort children by the specified field + const resultMap = new Map(results.map(r => [r.name, r])); + const sortChildren = (children: string[]) => { + children.sort((a, b) => { + const ra = resultMap.get(a); + const rb = resultMap.get(b); + if (!ra || !rb) return 0; + switch (sortField) { + case "calls": + return rb.calls - ra.calls; + case "avg": + return rb.avg - ra.avg; + case "p99": + return rb.p99 - ra.p99; + case "total": + default: + return rb.total - ra.total; + } + }); + }; + + // Traverse tree to build ordered display list with depths + const displayResults: Array<{ result: typeof results[0]; depth: number }> = []; + const visited = new Set(); + + function traverse(name: string, depth: number) { + if (visited.has(name)) return; + visited.add(name); + + const result = resultMap.get(name); + if (result) { + displayResults.push({ result, depth }); + } + + const children = childrenOf.get(name) || []; + sortChildren(children); + for (const child of children) { + traverse(child, depth + 1); + } + } + + // Start from roots + const roots = childrenOf.get("__root__") || []; + sortChildren(roots); + for (const root of roots) { + traverse(root, 0); + } + + // Add any orphaned spans (whose parent wasn't in our span list) + for (const r of results) { + if (!visited.has(r.name)) { + displayResults.push({ result: r, depth: 0 }); + } + } + + // Apply topN limit + const limitedResults = displayResults.slice(0, topN); + + console.log(""); + console.log( + "Span Name".padEnd(40) + + "Calls".padStart(6) + + "Avg(wall)".padStart(11) + + "P50(wall)".padStart(11) + + "P99(wall)".padStart(11) + + "Avg(busy)".padStart(11) + + 
"P50(busy)".padStart(11) + + "P99(busy)".padStart(11) + ); + console.log("-".repeat(112)); + + for (const { result: r, depth } of limitedResults) { + const indent = " ".repeat(depth); + const maxNameLen = 38 - indent.length; + const truncatedName = r.name.length > maxNameLen ? "..." + r.name.slice(-(maxNameLen - 3)) : r.name; + const displayName = indent + truncatedName; + + console.log( + displayName.padEnd(40) + + r.calls.toString().padStart(6) + + formatDuration(r.avgWall).padStart(11) + + formatDuration(r.p50Wall).padStart(11) + + formatDuration(r.p99Wall).padStart(11) + + formatDuration(r.avg).padStart(11) + + formatDuration(r.p50).padStart(11) + + formatDuration(r.p99).padStart(11) + ); + } + + console.log(""); + console.log(`Showing ${limitedResults.length} of ${results.length} spans (sorted by ${sortField})`); +} + +main(); -- cgit v1.3.1