From 3ba47446f06d5b0fbeaeb59d4ffed768b70729d8 Mon Sep 17 00:00:00 2001 From: Michelle Tilley Date: Thu, 26 Feb 2026 14:42:08 -0800 Subject: feat: In-memory search index with atuin daemon (#3201) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary This PR adds a persistent, in-memory search index to the Atuin daemon, enabling fast fuzzy search without the startup delay of building an index each time the TUI opens. ### Key Changes - **Daemon search service**: A new gRPC service that maintains a Nucleo fuzzy search index in memory - **Real-time index updates**: The daemon listens for history events (new commands, synced records) and updates the index immediately - **Filter mode support**: All existing filter modes work (Global, Host, Session, Directory, Workspace) - **New search engine**: `daemon-fuzzy` search mode that queries the daemon instead of building a local index - **Paged history loading**: Database pagination support for efficient initial index loading - **Configurable logging**: New `[logs]` settings section for daemon and search log configuration - **Component-based daemon architecture**: Refactored daemon internals into a modular, event-driven system - **Fallback to DB search for regex**: Since Nucleo doesn't support regex matching ## Daemon Architecture The daemon has been refactored to use a component-based, event-driven architecture that makes it easier to add new functionality and reason about the system. 
### Core Concepts ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Atuin Daemon │ │ │ │ ┌─────────────┐ ┌──────────────────────────────────────────────────┐ │ │ │ Daemon │ │ Components │ │ │ │ Handle │────▶│ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │ │ │ │ • emit() │ │ │ History │ │ Search │ │ Sync │ │ │ │ │ • subscribe │ │ │ Component │ │ Component │ │ Component │ │ │ │ │ • settings │ │ │ │ │ │ │ │ │ │ │ │ • databases │ │ │ gRPC service│ │ gRPC service│ │ background │ │ │ │ └─────────────┘ │ │ WIP history │ │ Nucleo index│ │ sync │ │ │ │ │ │ └─────────────┘ └─────────────┘ └────────────┘ │ │ │ │ └──────────────────────────────────────────────────┘ │ │ │ ▲ │ │ ▼ │ │ │ ┌─────────────────────────────────────┴────────────────────────────────┐ │ │ │ Event Bus (broadcast) │ │ │ │ │ │ │ │ HistoryStarted │ HistoryEnded │ RecordsAdded │ SyncCompleted │ ... │ │ │ └──────────────────────────────────────────────────────────────────────┘ │ │ ▲ │ │ ┌───────────────────────────────────┴──────────────────────────────────┐ │ │ │ Control Service (gRPC) │ │ │ │ External event injection from CLI commands │ │ │ └──────────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` ### DaemonHandle A lightweight, cloneable handle that provides access to shared daemon resources: - **Event emission**: `handle.emit(DaemonEvent::...)` broadcasts to all components - **Event subscription**: `handle.subscribe()` returns a receiver for the event bus - **Settings**: `handle.settings()` for configuration access - **Databases**: `handle.history_db()` and `handle.store()` for data access ### Component Trait Components implement a simple lifecycle: ```rust #[async_trait] trait Component: Send + Sync { fn name(&self) -> &'static str; async fn start(&mut self, handle: DaemonHandle) -> Result<()>; async fn handle_event(&mut self, event: &DaemonEvent) -> 
Result<()>; async fn stop(&mut self) -> Result<()>; } ``` ### Event-Driven Design Components communicate via events rather than direct coupling: | Event | Emitted By | Consumed By | |-------|-----------|-------------| | `HistoryStarted` | History gRPC | Search (logging) | | `HistoryEnded` | History gRPC | Search (index update) | | `RecordsAdded` | Sync | Search (index update) | | `HistoryPruned` | CLI (via Control) | Search (index rebuild) | | `HistoryDeleted` | CLI (via Control) | Search (index rebuild) | | `ForceSync` | CLI (via Control) | Sync | | `ShutdownRequested` | Signal handler | All (graceful shutdown) | ### External Event Injection CLI commands can inject events into a running daemon: ```rust // After `atuin history prune` emit_event(DaemonEvent::HistoryPruned).await?; // After deleting specific items emit_event(DaemonEvent::HistoryDeleted { ids }).await?; // Request immediate sync emit_event(DaemonEvent::ForceSync).await?; ``` This ensures the daemon's search index stays in sync with database changes made by CLI commands. ## Search Architecture The search service uses a [forked version of Nucleo](https://github.com/atuinsh/nucleo-ext) that adds filter and scorer callbacks, enabling efficient filtering and frecency-based ranking. 
``` ┌────────────────────────────────────────────────────────────────┐ │ Atuin Daemon │ │ │ │ ┌─────────────────┐ ┌──────────────────────────────────┐ │ │ │ Event System │───▶│ Search Component │ │ │ │ │ │ │ │ │ │ • RecordsAdded │ │ ┌────────────────────────────┐ │ │ │ │ • HistoryEnded │ │ │ Deduplicated Index │ │ │ │ │ • HistoryPruned │ │ │ │ │ │ │ └─────────────────┘ │ │ CommandData per command: │ │ │ │ │ │ • Global frecency │ │ │ │ ┌─────────────────┐ │ │ • Filter indexes (sets) │ │ │ │ │ Background Task │ │ │ • Invocation history │ │ │ │ │ │ │ └────────────────────────────┘ │ │ │ │ Rebuilds │ │ │ │ │ │ │ frecency map │ │ ▼ │ │ │ │ every 60s │───▶│ ┌────────────────────────────┐ │ │ │ └─────────────────┘ │ │ Nucleo (forked) │ │ │ │ │ │ │ │ │ │ │ │ • Filter callback │ │ │ │ │ │ • Scorer callback │ │ │ │ │ │ • Fuzzy matching │ │ │ │ │ └────────────────────────────┘ │ │ │ └──────────────────────────────────┘ │ │ │ │ │ │ gRPC (Unix socket) │ └──────────────────────────────────────│─────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Search TUI (Client) │ │ │ │ 1. Send query + filter mode + context to daemon │ │ 2. Receive matching history IDs (ranked by frecency) │ │ 3. Hydrate full records from local SQLite database │ │ 4. Display results in TUI │ └─────────────────────────────────────────────────────────────────┘ ``` ### Nucleo Fork The [nucleo-ext fork](https://github.com/atuinsh/nucleo-ext) adds two key features to Nucleo: 1. **Filter callback**: Pre-filter items before fuzzy matching (used for directory/host/session filtering) 2. 
**Scorer callback**: Compute custom scores after matching (used for frecency ranking) ```rust // Filter: only include commands run in current directory nucleo.set_filter(Some(Arc::new(|cmd: &String| { passing_commands.contains(cmd) }))); // Scorer: combine fuzzy score with frecency nucleo.set_scorer(Some(Arc::new(|cmd: &String, fuzzy_score: u32| { let frecency = frecency_map.get(cmd).unwrap_or(0); fuzzy_score + (frecency * 10) }))); ``` ### Deduplicated Index Commands are stored once per unique command text, with metadata tracking all invocations: ```rust struct CommandData { command: String, invocations: Vec, // All times this command was run global_frecency: FrecencyData, // Precomputed frecency score // O(1) filter indexes directories: HashSet, // All cwds where command was run hosts: HashSet, // All hostnames sessions: HashSet, // All session IDs } ``` This deduplication means: - **Fewer items to match**: ~13K unique commands vs ~62K history entries - **O(1) filter checks**: HashSet lookups instead of scanning invocations - **Single frecency score**: Global frecency computed once, used for all filter modes ### Frecency Scoring Frecency (frequency + recency) scoring prioritizes recently and frequently used commands: ```rust fn compute_frecency(count: u32, last_used: i64, now: i64) -> u32 { let age_hours = (now - last_used) / 3600; // Recency: decays exponentially over time (time constant 24 hours, i.e. half-life ~17 hours) let recency = (100.0 * (-age_hours as f64 / 24.0).exp()) as u32; // Frequency: logarithmic scaling let frequency = (count.ln() * 20.0).min(100.0) as u32; recency + frequency } ``` The frecency map is: - **Precomputed by background task** every 60 seconds - **Never computed inline** during search (no latency impact) - **Graceful fallback**: If unavailable, search works without frecency ranking ### Filter Mode Implementation | Filter Mode | Implementation | |-------------|----------------| | Global | No filter (all commands) | | Directory | `command.directories.contains(cwd)` | | 
Workspace | `command.directories.any(\|d\| d.starts_with(git_root))` | | Host | `command.hosts.contains(hostname)` | | Session | `command.sessions.contains(session_id)` | Filters are pre-computed into a HashSet before the search, making the filter callback O(1). ### Search Flow 1. **Daemon startup**: Loads history from SQLite in pages, builds deduplicated index 2. **Frecency precompute**: Background task builds frecency map after history loads 3. **Search request**: Client sends query with filter mode and context 4. **Filter**: Pre-computed HashSet determines which commands pass the filter 5. **Match**: Nucleo fuzzy matches the query against command text 6. **Score**: Frecency scorer ranks results (fuzzy score + frecency * 10) 7. **Response**: Returns history IDs for the most recent invocation of each matching command 8. **Hydration**: Client fetches full records from local SQLite ### Configuration ```toml # Enable daemon + autostart [daemon] enabled = true autostart = true # Enable daemon-based fuzzy search [search] search_mode = "daemon-fuzzy" ``` ## Performance Performance varies based on several factors, but most initial testing with the new architecture shows improvement: * **Nucleo performs searches up to 4.5x faster**: direct DB search averages 18.07ms, but the daemon completes the same queries in 3.99ms. * **IPC overhead is significant, but acceptable**: a significant amount of wall-time is taken up by the transfer of data over IPC (via UDS in this case). This averages about 7.8ms and accounts for 66% of client-side wall time. * **Tail latency improves at every layer**: p99 times correspond to initial requests, worst-case query patterns, etc., but the average p99 daemon-based response time is 3.6x better than the associated DB-based search p99 time. * **Query complexity no longer impacts performance**: the Nucleo-based search shows consistent 2-7ms times regardless of query pattern. The DB-based search had a 17x variance (3.59ms to 62.46ms). 
Interestingly, @ellie - who has a larger history store than I do - gets even better performance on the IPC layer. This could use a lot more testing in various edge cases and on various hardware, but seems promising. ### Regular DB search ``` Individual calls for: db_search -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 32.25ms 32.20ms 47.70µs {"mode":"Fuzzy","query":"^"} 2 19.48ms 19.40ms 84.20µs {"mode":"Fuzzy","query":"^c"} 3 20.40ms 20.10ms 297.00µs {"mode":"Fuzzy","query":"^ca"} 4 13.07ms 13.00ms 69.90µs {"mode":"Fuzzy","query":"^car"} 5 12.17ms 12.10ms 67.10µs {"mode":"Fuzzy","query":"^carg"} 6 20.78ms 20.70ms 76.60µs {"mode":"Fuzzy","query":"^cargo"} 7 9.15ms 9.10ms 53.20µs {"mode":"Fuzzy","query":"^cargo "} 8 10.24ms 10.00ms 237.00µs {"mode":"Fuzzy","query":"^cargo b"} 9 10.01ms 9.68ms 325.00µs {"mode":"Fuzzy","query":"^cargo bu"} 10 5.89ms 5.83ms 57.20µs {"mode":"Fuzzy","query":"^cargo bui"} 11 8.85ms 8.28ms 568.00µs {"mode":"Fuzzy","query":"^cargo buil"} 12 7.70ms 7.49ms 212.00µs {"mode":"Fuzzy","query":"^cargo build"} 13 3.59ms 3.53ms 57.00µs {"mode":"Fuzzy","query":"^cargo build$"} 14 6.50ms 6.44ms 63.60µs {"mode":"Fuzzy","query":"^cargo "} 15 6.48ms 6.38ms 100.00µs {"mode":"Fuzzy","query":"!"} 16 31.68ms 31.60ms 75.90µs {"mode":"Fuzzy","query":"!g"} 17 62.46ms 62.40ms 58.90µs {"mode":"Fuzzy","query":"!gi"} 18 30.35ms 30.30ms 46.90µs {"mode":"Fuzzy","query":"!git"} 19 53.84ms 53.80ms 40.80µs {"mode":"Fuzzy","query":"!git "} 20 19.24ms 19.20ms 39.70µs {"mode":"Fuzzy","query":"!git c"} 21 22.03ms 22.00ms 34.70µs {"mode":"Fuzzy","query":"!git co"} 22 17.13ms 17.00ms 133.00µs {"mode":"Fuzzy","query":"!git com"} 23 16.14ms 15.90ms 242.00µs {"mode":"Fuzzy","query":"!git comm"} 24 5.11ms 5.08ms 28.60µs {"mode":"Fuzzy","query":"!git commi"} 25 7.31ms 7.26ms 52.70µs 
{"mode":"Fuzzy","query":"!git commit"} Summary: 25 calls Wall: avg=18.07ms, min=3.59ms, max=62.46ms, p50=13.07ms, p99=62.46ms Busy: avg=17.95ms, min=3.53ms, max=62.40ms, p50=13.00ms, p99=62.40ms ``` ### Daemon-based search **Client** ``` Individual calls for: daemon_search -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 13.05ms 2.55ms 10.50ms {"query":"^"} 2 10.65ms 1.40ms 9.25ms {"query":"^c"} 3 10.72ms 1.18ms 9.54ms {"query":"^ca"} 4 5.54ms 485.00µs 5.06ms {"query":"^car"} 5 15.02ms 1.02ms 14.00ms {"query":"^carg"} 6 9.49ms 840.00µs 8.65ms {"query":"^cargo"} 7 5.53ms 555.00µs 4.97ms {"query":"^cargo "} 8 8.56ms 717.00µs 7.84ms {"query":"^cargo b"} 9 12.34ms 1.24ms 11.10ms {"query":"^cargo bu"} 10 8.38ms 650.00µs 7.73ms {"query":"^cargo bui"} 11 13.07ms 770.00µs 12.30ms {"query":"^cargo buil"} 12 17.11ms 709.00µs 16.40ms {"query":"^cargo build"} 13 15.41ms 907.00µs 14.50ms {"query":"^cargo build$"} 14 8.19ms 665.00µs 7.52ms {"query":"^cargo "} 15 7.98ms 1.72ms 6.26ms {"query":"!"} 16 13.56ms 856.00µs 12.70ms {"query":"!g"} 17 8.11ms 624.00µs 7.49ms {"query":"!gi"} 18 14.57ms 775.00µs 13.80ms {"query":"!git"} 19 14.18ms 779.00µs 13.40ms {"query":"!git "} 20 9.62ms 802.00µs 8.82ms {"query":"!git c"} 21 15.50ms 1.50ms 14.00ms {"query":"!git co"} 22 11.58ms 1.48ms 10.10ms {"query":"!git com"} 23 13.82ms 2.12ms 11.70ms {"query":"!git comm"} 24 17.48ms 2.18ms 15.30ms {"query":"!git commi"} 25 14.81ms 1.71ms 13.10ms {"query":"!git commit"} Summary: 25 calls Wall: avg=11.77ms, min=5.53ms, max=17.48ms, p50=12.34ms, p99=17.48ms Busy: avg=1.13ms, min=485.00µs, max=2.55ms, p50=856.00µs, p99=2.55ms ``` **Daemon** ``` Individual calls for: daemon_search_query -------------------------------------------------------------------------------------------------------------- # Wall Busy 
Idle Fields -------------------------------------------------------------------------------------------------------------- 1 1.75ms 250ns 1.75ms {"query":"^","query_id":1} 2 4.58ms 125ns 4.58ms {"query":"^c","query_id":2} 3 4.39ms 250ns 4.39ms {"query":"^ca","query_id":3} 4 2.52ms 125ns 2.52ms {"query":"^car","query_id":4} 5 4.44ms 250ns 4.44ms {"query":"^carg","query_id":5} 6 3.66ms 167ns 3.66ms {"query":"^cargo","query_id":6} 7 2.38ms 84ns 2.38ms {"query":"^cargo ","query_id":7} 8 4.13ms 84ns 4.13ms {"query":"^cargo b","query_id":8} 9 4.40ms 167ns 4.40ms {"query":"^cargo bu","query_id":9} 10 3.87ms 125ns 3.87ms {"query":"^cargo bui","query_id":10} 11 4.36ms 84ns 4.36ms {"query":"^cargo buil","query_id":11} 12 3.96ms 333ns 3.96ms {"query":"^cargo build","query_id":12} 13 4.61ms 167ns 4.61ms {"query":"^cargo build$","query_id":13} 14 4.20ms 209ns 4.20ms {"query":"^cargo ","query_id":14} 15 238.17µs 167ns 238.00µs {"query":"!","query_id":15} 16 4.44ms 125ns 4.44ms {"query":"!g","query_id":16} 17 3.47ms 83ns 3.47ms {"query":"!gi","query_id":17} 18 4.57ms 125ns 4.57ms {"query":"!git","query_id":18} 19 7.15ms 167ns 7.15ms {"query":"!git ","query_id":19} 20 4.27ms 250ns 4.27ms {"query":"!git c","query_id":20} 21 5.19ms 292ns 5.19ms {"query":"!git co","query_id":21} 22 4.29ms 417ns 4.29ms {"query":"!git com","query_id":22} 23 4.08ms 125ns 4.08ms {"query":"!git comm","query_id":23} 24 4.50ms 167ns 4.50ms {"query":"!git commi","query_id":24} 25 4.35ms 208ns 4.35ms {"query":"!git commit","query_id":25} Summary: 25 calls Wall: avg=3.99ms, min=238.17µs, max=7.15ms, p50=4.29ms, p99=7.15ms Busy: avg=182ns, min=83ns, max=417ns, p50=167ns, p99=417ns ``` **Nucleo matching time (in daemon)** ``` Individual calls for: nucleo_match -------------------------------------------------------------------------------------------------------------- # Wall Busy Idle Fields -------------------------------------------------------------------------------------------------------------- 1 1.73ms 
125ns 1.73ms {"query":"^","query_id":1} 2 4.57ms 167ns 4.57ms {"query":"^c","query_id":2} 3 4.37ms 125ns 4.37ms {"query":"^ca","query_id":3} 4 2.51ms 84ns 2.51ms {"query":"^car","query_id":4} 5 4.43ms 125ns 4.43ms {"query":"^carg","query_id":5} 6 3.64ms 125ns 3.64ms {"query":"^cargo","query_id":6} 7 2.37ms 84ns 2.37ms {"query":"^cargo ","query_id":7} 8 4.11ms 125ns 4.11ms {"query":"^cargo b","query_id":8} 9 4.36ms 208ns 4.36ms {"query":"^cargo bu","query_id":9} 10 3.85ms 125ns 3.85ms {"query":"^cargo bui","query_id":10} 11 4.35ms 125ns 4.35ms {"query":"^cargo buil","query_id":11} 12 3.94ms 250ns 3.94ms {"query":"^cargo build","query_id":12} 13 4.59ms 125ns 4.59ms {"query":"^cargo build$","query_id":13} 14 4.18ms 84ns 4.18ms {"query":"^cargo ","query_id":14} 15 220.13µs 125ns 220.00µs {"query":"!","query_id":15} 16 4.43ms 125ns 4.43ms {"query":"!g","query_id":16} 17 3.45ms 125ns 3.45ms {"query":"!gi","query_id":17} 18 4.55ms 125ns 4.55ms {"query":"!git","query_id":18} 19 7.12ms 209ns 7.12ms {"query":"!git ","query_id":19} 20 4.25ms 166ns 4.25ms {"query":"!git c","query_id":20} 21 5.18ms 125ns 5.18ms {"query":"!git co","query_id":21} 22 4.27ms 125ns 4.27ms {"query":"!git com","query_id":22} 23 4.06ms 292ns 4.06ms {"query":"!git comm","query_id":23} 24 4.46ms 166ns 4.46ms {"query":"!git commi","query_id":24} 25 4.31ms 208ns 4.31ms {"query":"!git commit","query_id":25} Summary: 25 calls Wall: avg=3.97ms, min=220.13µs, max=7.12ms, p50=4.27ms, p99=7.12ms Busy: avg=147ns, min=84ns, max=292ns, p50=125ns, p99=292ns ``` --- crates/atuin-ai/src/commands.rs | 103 +++- crates/atuin-ai/src/commands/inline.rs | 40 +- crates/atuin-client/src/database.rs | 192 +++++++ crates/atuin-client/src/hub.rs | 20 +- crates/atuin-client/src/settings.rs | 221 +++++++- crates/atuin-common/src/utils.rs | 4 + crates/atuin-daemon/Cargo.toml | 10 +- crates/atuin-daemon/build.rs | 6 +- crates/atuin-daemon/proto/control.proto | 62 +++ crates/atuin-daemon/proto/search.proto | 35 ++ 
crates/atuin-daemon/src/client.rs | 289 ++++++++++- crates/atuin-daemon/src/components/history.rs | 252 +++++++++ crates/atuin-daemon/src/components/mod.rs | 22 + crates/atuin-daemon/src/components/search.rs | 394 ++++++++++++++ crates/atuin-daemon/src/components/sync.rs | 257 +++++++++ crates/atuin-daemon/src/control/mod.rs | 12 + crates/atuin-daemon/src/control/service.rs | 71 +++ crates/atuin-daemon/src/daemon.rs | 450 ++++++++++++++++ crates/atuin-daemon/src/events.rs | 74 +++ crates/atuin-daemon/src/history.rs | 1 - crates/atuin-daemon/src/history/mod.rs | 6 + crates/atuin-daemon/src/lib.rs | 107 ++++ crates/atuin-daemon/src/search/index.rs | 572 +++++++++++++++++++++ crates/atuin-daemon/src/search/mod.rs | 11 + crates/atuin-daemon/src/server.rs | 360 ++++--------- crates/atuin-daemon/src/server/sync.rs | 96 ---- crates/atuin-daemon/tests/lifecycle.rs | 89 +++- crates/atuin/Cargo.toml | 2 + crates/atuin/src/command/client.rs | 185 ++++++- crates/atuin/src/command/client/daemon.rs | 103 +++- crates/atuin/src/command/client/history.rs | 12 + crates/atuin/src/command/client/search.rs | 5 + crates/atuin/src/command/client/search/engines.rs | 12 +- .../src/command/client/search/engines/daemon.rs | 206 ++++++++ .../atuin/src/command/client/search/engines/db.rs | 11 +- .../src/command/client/search/engines/skim.rs | 17 +- .../atuin/src/command/client/search/interactive.rs | 22 +- crates/atuin/src/command/client/store/rebuild.rs | 6 + 38 files changed, 3876 insertions(+), 461 deletions(-) create mode 100644 crates/atuin-daemon/proto/control.proto create mode 100644 crates/atuin-daemon/proto/search.proto create mode 100644 crates/atuin-daemon/src/components/history.rs create mode 100644 crates/atuin-daemon/src/components/mod.rs create mode 100644 crates/atuin-daemon/src/components/search.rs create mode 100644 crates/atuin-daemon/src/components/sync.rs create mode 100644 crates/atuin-daemon/src/control/mod.rs create mode 100644 crates/atuin-daemon/src/control/service.rs 
create mode 100644 crates/atuin-daemon/src/daemon.rs create mode 100644 crates/atuin-daemon/src/events.rs delete mode 100644 crates/atuin-daemon/src/history.rs create mode 100644 crates/atuin-daemon/src/history/mod.rs create mode 100644 crates/atuin-daemon/src/search/index.rs create mode 100644 crates/atuin-daemon/src/search/mod.rs delete mode 100644 crates/atuin-daemon/src/server/sync.rs create mode 100644 crates/atuin/src/command/client/search/engines/daemon.rs (limited to 'crates') diff --git a/crates/atuin-ai/src/commands.rs b/crates/atuin-ai/src/commands.rs index 7d5ca16b..b35cec9e 100644 --- a/crates/atuin-ai/src/commands.rs +++ b/crates/atuin-ai/src/commands.rs @@ -1,8 +1,13 @@ +use std::{ + fs, + path::{Path, PathBuf}, +}; + use atuin_common::shell::Shell; use clap::{Parser, Subcommand}; -use tracing::Level; +use eyre::Result; +use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_subscriber::{EnvFilter, Layer, fmt, layer::SubscriberExt, util::SubscriberInitExt}; - #[cfg(debug_assertions)] pub mod debug_render; @@ -72,7 +77,11 @@ enum Commands { pub async fn run() -> eyre::Result<()> { let cli = Cli::parse(); - init_tracing(cli.verbose); + let settings = atuin_client::settings::Settings::new()?; + + if settings.logs.ai_enabled() { + init_logging(&settings, cli.verbose)?; + } match cli.command { Commands::Init { shell } => init::run(shell).await, @@ -89,6 +98,7 @@ pub async fn run() -> eyre::Result<()> { cli.api_token, keep, debug_state, + &settings, ) .await } @@ -104,39 +114,90 @@ pub async fn run() -> eyre::Result<()> { } } -fn init_tracing(verbose: bool) { - let level = if verbose { Level::DEBUG } else { Level::INFO }; +pub fn detect_shell() -> Option { + Some(Shell::current().to_string()) +} - // Create env filter - let env_filter = EnvFilter::from_default_env().add_directive( - format!("atuin_ai={}", level.as_str().to_lowercase()) - .parse() - .unwrap(), - ); +/// Initializes logging for the AI commands. 
+fn init_logging(settings: &atuin_client::settings::Settings, verbose: bool) -> Result<()> { + // ATUIN_LOG env var overrides config file level settings + let env_log_set = std::env::var("ATUIN_LOG").is_ok(); + + // Base filter from env var (or empty if not set) + let base_filter = + EnvFilter::from_env("ATUIN_LOG").add_directive("sqlx_sqlite::regexp=off".parse()?); + + // Use config level unless ATUIN_LOG is set + let filter = if env_log_set { + base_filter + } else { + EnvFilter::default() + .add_directive(settings.logs.ai_level().as_directive().parse()?) + .add_directive("sqlx_sqlite::regexp=off".parse()?) + }; + + let log_dir = PathBuf::from(&settings.logs.dir); + fs::create_dir_all(&log_dir)?; + + let filename = settings.logs.ai.file.clone(); + + // Clean up old log files + cleanup_old_logs(&log_dir, &filename, settings.logs.ai_retention()); - // Create console layer (only for verbose mode) let console_layer = if verbose { Some( fmt::layer() .with_writer(std::io::stderr) .with_ansi(true) .with_target(false) - .with_filter(env_filter), + .with_filter(filter.clone()), ) } else { None }; - // Initialize subscriber - let subscriber = tracing_subscriber::registry(); + let file_appender = RollingFileAppender::new(Rotation::DAILY, &log_dir, &filename); - if let Some(console) = console_layer { - subscriber.with(console).init(); + let base = tracing_subscriber::registry().with( + fmt::layer() + .with_writer(file_appender) + .with_ansi(false) + .with_filter(filter), + ); + + if let Some(console_layer) = console_layer { + base.with(console_layer).init(); } else { - subscriber.init(); - } + base.init(); + }; + + Ok(()) } -pub fn detect_shell() -> Option { - Some(Shell::current().to_string()) +fn cleanup_old_logs(log_dir: &Path, prefix: &str, retention_days: u64) { + let cutoff = std::time::SystemTime::now() + - std::time::Duration::from_secs(retention_days * 24 * 60 * 60); + + let Ok(entries) = fs::read_dir(log_dir) else { + return; + }; + + for entry in entries.flatten() 
{ + let path = entry.path(); + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + + // Match files like "search.log.2024-02-23" or "daemon.log.2024-02-23" + if !name.starts_with(prefix) || name == prefix { + continue; + } + + if let Ok(metadata) = entry.metadata() + && let Ok(modified) = metadata.modified() + && modified < cutoff + { + let _ = fs::remove_file(&path); + } + } } diff --git a/crates/atuin-ai/src/commands/inline.rs b/crates/atuin-ai/src/commands/inline.rs index 3f9278a2..b49bfece 100644 --- a/crates/atuin-ai/src/commands/inline.rs +++ b/crates/atuin-ai/src/commands/inline.rs @@ -15,6 +15,7 @@ use eyre::{Context as _, Result, bail}; use futures::StreamExt; use reqwest::Url; use std::io::Write; +use tracing::{debug, error, info, trace}; pub async fn run( initial_command: Option, @@ -23,6 +24,7 @@ pub async fn run( api_token: Option, keep_output: bool, debug_state_file: Option, + settings: &atuin_client::settings::Settings, ) -> Result<()> { // Install panic hook once at entry point to ensure terminal restoration install_panic_hook(); @@ -31,7 +33,6 @@ pub async fn run( // 1. Command line arguments/environment variables // 2. Settings file // 3. Default - let settings = atuin_client::settings::Settings::new()?; let endpoint = api_endpoint.as_deref().unwrap_or( settings .ai @@ -44,7 +45,7 @@ pub async fn run( let token = if let Some(token) = &api_token { token.to_string() } else { - ensure_hub_session(&settings, endpoint).await? + ensure_hub_session(settings, endpoint).await? }; let action = run_inline_tui( @@ -57,6 +58,7 @@ pub async fn run( }, keep_output, debug_state_file, + settings, ) .await?; emit_shell_result(action.0, &action.1); @@ -69,9 +71,12 @@ async fn ensure_hub_session( hub_address: &str, ) -> Result { if let Some(token) = atuin_client::hub::get_session_token().await? 
{ + debug!("Found Hub session, using existing token"); return Ok(token); } + info!("No Hub session found, prompting for authentication"); + println!("Atuin AI requires authenticating with Atuin Hub."); println!("This is separate from your sync setup."); println!("Press enter to begin (or esc to cancel)."); @@ -79,6 +84,8 @@ async fn ensure_hub_session( bail!("authentication canceled"); } + debug!("Starting Atuin Hub authentication..."); + println!("Authenticating with Atuin Hub..."); let mut auth_settings = settings.clone(); auth_settings.hub_address = hub_address.to_string(); @@ -93,6 +100,8 @@ async fn ensure_hub_session( ) .await?; + info!("Authentication complete, saving session token"); + atuin_client::hub::save_session(&token).await?; Ok(token) } @@ -141,6 +150,8 @@ fn create_chat_stream( } }; + debug!("Sending SSE request to {endpoint}"); + // Build request body let mut request_body = serde_json::json!({ "messages": messages, @@ -155,6 +166,7 @@ fn create_chat_stream( // Include session_id only if present (not on first request) if let Some(ref sid) = session_id { + trace!("Including session_id in request: {sid}"); request_body["session_id"] = serde_json::json!(sid); } @@ -178,12 +190,14 @@ fn create_chat_stream( let status = response.status(); if status == reqwest::StatusCode::UNAUTHORIZED { // Clear saved session on auth error + error!("SSE request failed with status: {status}, clearing session"); let _ = atuin_client::hub::delete_session().await; yield Err(eyre::eyre!("Hub session expired. 
Re-run to authenticate again.")); return; } if !status.is_success() { let body = response.text().await.unwrap_or_default(); + error!("SSE request failed ({}): {}", status, body); yield Err(eyre::eyre!("SSE request failed ({}): {}", status, body)); return; } @@ -197,7 +211,7 @@ fn create_chat_stream( let event_type = sse_event.event.as_str(); let data = sse_event.data.clone(); - tracing::debug!(event_type = %event_type, data = %data, "SSE event received"); + debug!(event_type = %event_type, "SSE event received"); match event_type { "text" => { @@ -245,8 +259,10 @@ fn create_chat_stream( "error" => { if let Ok(json) = serde_json::from_str::(&data) { let message = json.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error").to_string(); + error!("SSE error: {}", message); yield Ok(ChatStreamEvent::Error(message)); } else { + error!("SSE error: {}", data); yield Ok(ChatStreamEvent::Error(data)); } break; @@ -391,6 +407,7 @@ async fn run_inline_tui( initial_prompt: Option, keep_output: bool, debug_state_file: Option, + settings: &atuin_client::settings::Settings, ) -> Result<(Action, String)> { // Initialize terminal guard and app state let mut guard = TerminalGuard::new(keep_output)?; @@ -425,7 +442,6 @@ async fn run_inline_tui( log_state!("init"); // Load theme - let settings = atuin_client::settings::Settings::new()?; let mut theme_manager = ThemeManager::new(None, None); let theme = theme_manager.load_theme(&settings.theme.name, None); @@ -486,12 +502,12 @@ async fn run_inline_tui( match stream.as_mut().poll_next(&mut cx) { std::task::Poll::Ready(Some(Ok(event))) => match event { ChatStreamEvent::TextChunk(text) => { - tracing::debug!(text = %text, "Processing TextChunk"); + trace!(text = %text, "Processing TextChunk"); app.state.append_streaming_text(&text); log_state!("text_chunk"); } ChatStreamEvent::ToolCall { id, name, input } => { - tracing::debug!(id = %id, name = %name, "Processing ToolCall"); + trace!(id = %id, name = %name, "Processing 
ToolCall"); app.state.add_tool_call(id, name, input); log_state!("tool_call"); } @@ -500,17 +516,17 @@ async fn run_inline_tui( content, is_error, } => { - tracing::debug!(tool_use_id = %tool_use_id, "Processing ToolResult"); + trace!(tool_use_id = %tool_use_id, "Processing ToolResult"); app.state.add_tool_result(tool_use_id, content, is_error); log_state!("tool_result"); } ChatStreamEvent::Status(status) => { - tracing::debug!(status = %status, "Processing Status"); + trace!(status = %status, "Processing Status"); app.state.update_streaming_status(&status); log_state!("status"); } ChatStreamEvent::Done { session_id } => { - tracing::debug!(session_id = %session_id, "Processing Done"); + trace!(session_id = %session_id, "Processing Done"); chat_stream = None; if !session_id.is_empty() { app.state.store_session_id(session_id); @@ -519,7 +535,7 @@ async fn run_inline_tui( log_state!("done"); } ChatStreamEvent::Error(msg) => { - tracing::debug!(error = %msg, "Processing Error"); + trace!(error = %msg, "Processing Error"); chat_stream = None; app.state.streaming_error(msg); log_state!("error"); @@ -544,7 +560,7 @@ async fn run_inline_tui( // Handle user cancellation (Esc during streaming) - drop the stream if app.state.was_interrupted && chat_stream.is_some() { - tracing::debug!("User cancelled streaming, dropping chat stream"); + debug!("User cancelled streaming, dropping chat stream"); chat_stream = None; app.state.was_interrupted = false; // Reset the flag } @@ -579,7 +595,7 @@ async fn run_inline_tui( token.clone(), app.state.session_id.clone(), messages, - &settings, + settings, )); } } diff --git a/crates/atuin-client/src/database.rs b/crates/atuin-client/src/database.rs index 5f292bec..7c63368d 100644 --- a/crates/atuin-client/src/database.rs +++ b/crates/atuin-client/src/database.rs @@ -138,9 +138,13 @@ pub trait Database: Send + Sync + 'static { async fn all_with_count(&self) -> Result>; + fn all_paged(&self, page_size: usize, include_deleted: bool, unique: 
bool) -> Paged; + async fn stats(&self, h: &History) -> Result; async fn get_dups(&self, before: i64, dupkeep: u32) -> Result>; + + fn clone_boxed(&self) -> Box; } // Intended for use on a developer machine and not a sync server. @@ -650,6 +654,10 @@ impl Database for Sqlite { Ok(res) } + fn all_paged(&self, page_size: usize, include_deleted: bool, unique: bool) -> Paged { + Paged::new(Box::new(self.clone()), page_size, include_deleted, unique) + } + // deleted_at doesn't mean the actual time that the user deleted it, // but the time that the system marks it as deleted async fn delete(&self, mut h: History) -> Result<()> { @@ -814,6 +822,70 @@ impl Database for Sqlite { Ok(res) } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } +} + +pub struct Paged { + database: Box, + page_size: usize, + last_id: Option, + include_deleted: bool, + unique: bool, +} + +impl Paged { + pub fn new( + database: Box, + page_size: usize, + include_deleted: bool, + unique: bool, + ) -> Self { + Self { + database, + page_size, + last_id: None, + include_deleted, + unique, + } + } + + pub async fn next(&mut self) -> Result>> { + let mut query = SqlBuilder::select_from(SqlName::new("history").alias("h").baquoted()); + + query.field("*").order_desc("id"); + + if !self.include_deleted { + query.and_where_is_null("deleted_at"); + } + + if self.unique { + // We want to deduplicate on command, but the user can search via cwd, hostname, and session. + // Without those fields, filter modes won't work right. With those fields, we get duplicates. + // This must be handled upstream. + query + .group_by("command, cwd, hostname, session") + .having("max(timestamp)"); + } + + query.limit(self.page_size); + + if let Some(last_id) = &self.last_id { + query.and_where_lt("id", quote(last_id)); + } + + let query = query.sql().expect("bug in list query. 
please report"); + let res = self.database.query_history(&query).await?; + + if res.is_empty() { + Ok(None) + } else { + self.last_id = Some(res.last().unwrap().id.0.clone()); + Ok(Some(res)) + } + } } trait SqlBuilderExt { @@ -1165,6 +1237,126 @@ mod test { .unwrap(); } + #[tokio::test(flavor = "multi_thread")] + async fn test_paged_basic() { + let mut db = Sqlite::new("sqlite::memory:", test_local_timeout()) + .await + .unwrap(); + + // Add 5 history items + for i in 0..5 { + new_history_item(&mut db, &format!("command{}", i)) + .await + .unwrap(); + } + + // Create a paged iterator with page_size of 2 + let mut paged = db.all_paged(2, false, false); + + // First page should have 2 items + let page1 = paged.next().await.unwrap(); + assert!(page1.is_some()); + assert_eq!(page1.unwrap().len(), 2); + + // Second page should have 2 items + let page2 = paged.next().await.unwrap(); + assert!(page2.is_some()); + assert_eq!(page2.unwrap().len(), 2); + + // Third page should have 1 item + let page3 = paged.next().await.unwrap(); + assert!(page3.is_some()); + assert_eq!(page3.unwrap().len(), 1); + + // Fourth page should be None (exhausted) + let page4 = paged.next().await.unwrap(); + assert!(page4.is_none()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_paged_empty() { + let db = Sqlite::new("sqlite::memory:", test_local_timeout()) + .await + .unwrap(); + + // Create a paged iterator on empty database + let mut paged = db.all_paged(10, false, false); + + // Should return None immediately + let page = paged.next().await.unwrap(); + assert!(page.is_none()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_paged_unique() { + let mut db = Sqlite::new("sqlite::memory:", test_local_timeout()) + .await + .unwrap(); + + // Add duplicate commands + new_history_item(&mut db, "duplicate").await.unwrap(); + new_history_item(&mut db, "duplicate").await.unwrap(); + new_history_item(&mut db, "unique1").await.unwrap(); + new_history_item(&mut db, 
"unique2").await.unwrap(); + + // Without unique flag - should get all 4 + let mut paged = db.all_paged(10, false, false); + let page = paged.next().await.unwrap().unwrap(); + assert_eq!(page.len(), 4); + + // With unique flag - should get 3 (duplicates collapsed) + let mut paged_unique = db.all_paged(10, false, true); + let page_unique = paged_unique.next().await.unwrap().unwrap(); + assert_eq!(page_unique.len(), 3); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_paged_include_deleted() { + let mut db = Sqlite::new("sqlite::memory:", test_local_timeout()) + .await + .unwrap(); + + // Add items + new_history_item(&mut db, "keep1").await.unwrap(); + new_history_item(&mut db, "keep2").await.unwrap(); + new_history_item(&mut db, "delete_me").await.unwrap(); + + // Delete one item + let all = db + .list( + &[], + &Context { + hostname: "".to_string(), + session: "".to_string(), + cwd: "".to_string(), + host_id: "".to_string(), + git_root: None, + }, + None, + false, + false, + ) + .await + .unwrap(); + + let to_delete = all + .iter() + .find(|h| h.command == "delete_me") + .unwrap() + .clone(); + db.delete(to_delete).await.unwrap(); + + // Without include_deleted - should get 2 + let mut paged = db.all_paged(10, false, false); + let page = paged.next().await.unwrap().unwrap(); + assert_eq!(page.len(), 2); + + // With include_deleted - should get 3 + let mut paged_deleted = db.all_paged(10, true, false); + let page_deleted = paged_deleted.next().await.unwrap().unwrap(); + assert_eq!(page_deleted.len(), 3); + } + #[tokio::test(flavor = "multi_thread")] async fn test_search_bench_dupes() { let context = Context { diff --git a/crates/atuin-client/src/hub.rs b/crates/atuin-client/src/hub.rs index 5b34574b..b94c69ea 100644 --- a/crates/atuin-client/src/hub.rs +++ b/crates/atuin-client/src/hub.rs @@ -58,10 +58,14 @@ impl HubAuthSession { /// /// Returns a session containing the code and auth URL that the user should visit. 
pub async fn start(settings: &Settings) -> Result { + debug!("Starting Hub authentication process..."); + let code_response = request_code(&settings.hub_address) .await .context("Failed to request authentication code from Hub")?; + debug!("Received code from Hub"); + let code = code_response.code; let auth_url = format!("{}/auth/cli?code={}", settings.hub_address, code); @@ -79,8 +83,10 @@ impl HubAuthSession { match verify_code(&self.hub_address, &self.code).await { Ok(response) => { if let Some(token) = response.token { + debug!("Authentication complete, received token"); Ok(HubAuthStatus::Complete(token)) } else if let Some(error) = response.error { + error!("Authentication failed: {}", error); Ok(HubAuthStatus::Failed(error)) } else { Ok(HubAuthStatus::Pending) @@ -105,8 +111,11 @@ impl HubAuthSession { ) -> Result { let start = std::time::Instant::now(); + debug!("Polling for Hub authentication completion..."); + loop { if start.elapsed() > timeout { + warn!("Authentication loop exited due to timeout"); bail!("Authentication timed out. 
Please try again."); } @@ -181,17 +190,21 @@ async fn handle_resp_error(resp: reqwest::Response) -> Result let status = resp.status(); if status == StatusCode::SERVICE_UNAVAILABLE { + error!("Service unavailable: check https://status.atuin.sh"); bail!("Service unavailable: check https://status.atuin.sh"); } if status == StatusCode::TOO_MANY_REQUESTS { + error!("Rate limited; please wait before trying again"); bail!("Rate limited; please wait before trying again"); } if !status.is_success() { if let Ok(error) = resp.json::().await { + error!("Hub error: {} - {}", status, error.reason); bail!("Hub error: {} - {}", status, error.reason); } + error!("Hub request failed with status: {}", status); bail!("Hub request failed with status: {}", status); } @@ -204,6 +217,8 @@ async fn request_code(address: &str) -> Result { let url = make_url(address, "/auth/cli/code")?; let client = reqwest::Client::new(); + debug!("Requesting code from Hub at {url}"); + let resp = client .post(&url) .header(USER_AGENT, APP_USER_AGENT) @@ -219,9 +234,12 @@ async fn request_code(address: &str) -> Result { /// Poll to verify the CLI auth code and get the session token async fn verify_code(address: &str, code: &str) -> Result { ensure_crypto_provider(); - let url = make_url(address, &format!("/auth/cli/verify?code={}", code))?; + let base = make_url(address, "/auth/cli/verify")?; + let url = format!("{base}?code={code}"); let client = reqwest::Client::new(); + debug!("Verifying code with Hub at {base}?code=******"); + let resp = client .post(&url) .header(USER_AGENT, APP_USER_AGENT) diff --git a/crates/atuin-client/src/settings.rs b/crates/atuin-client/src/settings.rs index a15ce461..8e874832 100644 --- a/crates/atuin-client/src/settings.rs +++ b/crates/atuin-client/src/settings.rs @@ -42,6 +42,10 @@ pub enum SearchMode { #[serde(rename = "skim")] Skim, + + #[serde(rename = "daemon-fuzzy")] + #[clap(aliases = &["daemon-fuzzy"])] + DaemonFuzzy, } impl SearchMode { @@ -51,6 +55,7 @@ impl 
SearchMode { SearchMode::FullText => "FULLTXT", SearchMode::Fuzzy => "FUZZY", SearchMode::Skim => "SKIM", + SearchMode::DaemonFuzzy => "DAEMON", } } pub fn next(&self, settings: &Settings) -> Self { @@ -58,9 +63,13 @@ impl SearchMode { SearchMode::Prefix => SearchMode::FullText, // if the user is using skim, we go to skim SearchMode::FullText if settings.search_mode == SearchMode::Skim => SearchMode::Skim, + // if the user is using daemon-fuzzy, we go to daemon-fuzzy + SearchMode::FullText if settings.search_mode == SearchMode::DaemonFuzzy => { + SearchMode::DaemonFuzzy + } // otherwise fuzzy. SearchMode::FullText => SearchMode::Fuzzy, - SearchMode::Fuzzy | SearchMode::Skim => SearchMode::Prefix, + SearchMode::Fuzzy | SearchMode::Skim | SearchMode::DaemonFuzzy => SearchMode::Prefix, } } } @@ -477,6 +486,78 @@ pub struct Tmux { pub height: String, } +/// Log level for file logging. Maps to tracing's LevelFilter. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum LogLevel { + Trace, + Debug, + #[default] + Info, + Warn, + Error, +} + +impl LogLevel { + /// Convert to a tracing directive string for use with EnvFilter. + pub fn as_directive(&self) -> &'static str { + match self { + LogLevel::Trace => "trace", + LogLevel::Debug => "debug", + LogLevel::Info => "info", + LogLevel::Warn => "warn", + LogLevel::Error => "error", + } + } +} + +/// Configuration for a specific log type (search or daemon). +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct LogConfig { + /// Log file name (relative to dir) or absolute path. + pub file: String, + + /// Override global enabled setting for this log type. + pub enabled: Option, + + /// Override global level setting for this log type. + pub level: Option, + + /// Override global retention days setting for this log type. 
+ pub retention: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Logs { + /// Enable file logging globally. Defaults to true. + #[serde(default = "Logs::default_enabled")] + pub enabled: bool, + + /// Directory for log files. Defaults to ~/.atuin/logs + pub dir: String, + + /// Default log level for file logging. Defaults to "info". + /// Note: ATUIN_LOG environment variable overrides this. + #[serde(default)] + pub level: LogLevel, + + /// Default retention days for log files. Defaults to 4. + #[serde(default = "Logs::default_retention")] + pub retention: u64, + + /// Search log settings + #[serde(default)] + pub search: LogConfig, + + /// Daemon log settings + #[serde(default)] + pub daemon: LogConfig, + + /// AI log settings + #[serde(default)] + pub ai: LogConfig, +} + #[derive(Default, Clone, Debug, Deserialize, Serialize)] pub struct Ai { /// The address of the Atuin AI endpoint. Used for AI features like command generation. @@ -523,6 +604,117 @@ impl Default for Daemon { } } +impl Default for Logs { + fn default() -> Self { + Self { + enabled: true, + dir: "".to_string(), + level: LogLevel::default(), + retention: Self::default_retention(), + search: LogConfig { + file: "search.log".to_string(), + ..Default::default() + }, + daemon: LogConfig { + file: "daemon.log".to_string(), + ..Default::default() + }, + ai: LogConfig { + file: "ai.log".to_string(), + ..Default::default() + }, + } + } +} + +impl Logs { + fn default_enabled() -> bool { + true + } + + fn default_retention() -> u64 { + 4 + } + + /// Returns whether search logging is enabled. + /// Uses search-specific setting if set, otherwise falls back to global. + pub fn search_enabled(&self) -> bool { + self.search.enabled.unwrap_or(self.enabled) + } + + /// Returns whether daemon logging is enabled. + /// Uses daemon-specific setting if set, otherwise falls back to global. 
+ pub fn daemon_enabled(&self) -> bool { + self.daemon.enabled.unwrap_or(self.enabled) + } + + /// Returns whether AI logging is enabled. + /// Uses AI-specific setting if set, otherwise falls back to global. + pub fn ai_enabled(&self) -> bool { + self.ai.enabled.unwrap_or(self.enabled) + } + + /// Returns the log level for search logging. + /// Uses search-specific setting if set, otherwise falls back to global. + pub fn search_level(&self) -> LogLevel { + self.search.level.unwrap_or(self.level) + } + + /// Returns the log level for daemon logging. + /// Uses daemon-specific setting if set, otherwise falls back to global. + pub fn daemon_level(&self) -> LogLevel { + self.daemon.level.unwrap_or(self.level) + } + + /// Returns the log level for AI logging. + /// Uses AI-specific setting if set, otherwise falls back to global. + pub fn ai_level(&self) -> LogLevel { + self.ai.level.unwrap_or(self.level) + } + + /// Returns the retention days for search logging. + /// Uses search-specific setting if set, otherwise falls back to global. + pub fn search_retention(&self) -> u64 { + self.search.retention.unwrap_or(self.retention) + } + + /// Returns the retention days for daemon logging. + /// Uses daemon-specific setting if set, otherwise falls back to global. + pub fn daemon_retention(&self) -> u64 { + self.daemon.retention.unwrap_or(self.retention) + } + + /// Returns the retention days for AI logging. + /// Uses AI-specific setting if set, otherwise falls back to global. + pub fn ai_retention(&self) -> u64 { + self.ai.retention.unwrap_or(self.retention) + } + + /// Returns the full path for the search log file. + /// If `file` is an absolute path, returns it directly. + /// Otherwise, joins it with `dir`. + pub fn search_path(&self) -> PathBuf { + let path = PathBuf::from(&self.search.file); + if path.is_absolute() { + path + } else { + PathBuf::from(&self.dir).join(path) + } + } + + /// Returns the full path for the daemon log file. 
+ /// If `file` is an absolute path, returns it directly. + /// Otherwise, joins it with `dir`. + pub fn daemon_path(&self) -> PathBuf { + let path = PathBuf::from(&self.daemon.file); + if path.is_absolute() { + path + } else { + PathBuf::from(&self.dir).join(path) + } + } +} + impl Default for Search { fn default() -> Self { Self { @@ -848,6 +1040,9 @@ pub struct Settings { #[serde(default)] pub tmux: Tmux, + #[serde(default)] + pub logs: Logs, + #[serde(default)] pub meta: meta::Settings, @@ -1033,6 +1228,7 @@ impl Settings { let scripts_path = data_dir.join("scripts.db"); let socket_path = atuin_common::utils::runtime_dir().join("atuin.sock"); let pidfile_path = data_dir.join("atuin-daemon.pid"); + let logs_dir = atuin_common::utils::logs_dir(); let key_path = data_dir.join("key"); let meta_path = data_dir.join("meta.db"); @@ -1101,6 +1297,12 @@ impl Settings { .set_default("daemon.pidfile_path", pidfile_path.to_str())? .set_default("daemon.systemd_socket", false)? .set_default("daemon.tcp_port", 8889)? + .set_default("logs.enabled", true)? + .set_default("logs.dir", logs_dir.to_str())? + .set_default("logs.level", "info")? + .set_default("logs.search.file", "search.log")? + .set_default("logs.daemon.file", "daemon.log")? + .set_default("logs.ai.file", "ai.log")? .set_default("kv.db_path", kv_path.to_str())? .set_default("scripts.db_path", scripts_path.to_str())? .set_default("meta.db_path", meta_path.to_str())? 
@@ -1218,6 +1420,9 @@ impl Settings { settings.key_path = Self::expand_path(settings.key_path)?; settings.daemon.socket_path = Self::expand_path(settings.daemon.socket_path)?; settings.daemon.pidfile_path = Self::expand_path(settings.daemon.pidfile_path)?; + settings.logs.dir = Self::expand_path(settings.logs.dir)?; + settings.logs.search.file = Self::expand_path(settings.logs.search.file)?; + settings.logs.daemon.file = Self::expand_path(settings.logs.daemon.file)?; // Validate UI settings settings.ui.validate()?; @@ -1264,6 +1469,20 @@ impl Default for Settings { } } +/// Initialize the meta store configuration for testing. +/// +/// This should only be used in tests. It allows tests to bypass the normal +/// Settings::new() flow while still being able to use Settings::host_id() +/// and other meta store dependent functions. +/// +/// # Safety +/// This function is not thread-safe with concurrent calls to Settings::new() +/// or other meta store initialization. Only call from tests. +#[doc(hidden)] +pub fn init_meta_config_for_testing(meta_db_path: impl Into, local_timeout: f64) { + META_CONFIG.set((meta_db_path.into(), local_timeout)).ok(); +} + #[cfg(test)] pub(crate) fn test_local_timeout() -> f64 { std::env::var("ATUIN_TEST_LOCAL_TIMEOUT") diff --git a/crates/atuin-common/src/utils.rs b/crates/atuin-common/src/utils.rs index bb291ebf..b885423e 100644 --- a/crates/atuin-common/src/utils.rs +++ b/crates/atuin-common/src/utils.rs @@ -88,6 +88,10 @@ pub fn runtime_dir() -> PathBuf { std::env::var("XDG_RUNTIME_DIR").map_or_else(|_| data_dir(), PathBuf::from) } +pub fn logs_dir() -> PathBuf { + home_dir().join(".atuin").join("logs") +} + pub fn dotfiles_cache_dir() -> PathBuf { // In most cases, this will be ~/.local/share/atuin/dotfiles/cache let data_dir = std::env::var("XDG_DATA_HOME") diff --git a/crates/atuin-daemon/Cargo.toml b/crates/atuin-daemon/Cargo.toml index 36917789..97ed88ea 100644 --- a/crates/atuin-daemon/Cargo.toml +++ 
b/crates/atuin-daemon/Cargo.toml @@ -14,9 +14,10 @@ readme.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -atuin-client = { path = "../atuin-client", version = "18.13.0-beta.1" } -atuin-dotfiles = { path = "../atuin-dotfiles", version = "18.13.0-beta.1" } -atuin-history = { path = "../atuin-history", version = "18.13.0-beta.1" } +atuin-client = { path = "../atuin-client" } +atuin-common = { path = "../atuin-common" } +atuin-dotfiles = { path = "../atuin-dotfiles" } +atuin-history = { path = "../atuin-history" } time = { workspace = true } uuid = { workspace = true } @@ -32,10 +33,11 @@ tonic = "0.14" tonic-prost = "0.14" prost = "0.14" prost-types = "0.14" -tokio-stream = {version="0.1.14", features=["net"]} +tokio-stream = { version = "0.1.14", features = ["net"] } hyper-util = "0.1" rand.workspace = true +nucleo = { git = "https://github.com/atuinsh/nucleo-ext.git", branch = "main" } [target.'cfg(target_os = "linux")'.dependencies] listenfd = "1.0.1" diff --git a/crates/atuin-daemon/build.rs b/crates/atuin-daemon/build.rs index fbe34d12..7034aa04 100644 --- a/crates/atuin-daemon/build.rs +++ b/crates/atuin-daemon/build.rs @@ -3,7 +3,11 @@ use std::{env, fs, path::PathBuf}; use protox::prost::Message; fn main() -> std::io::Result<()> { - let proto_paths = ["proto/history.proto"]; + let proto_paths = [ + "proto/history.proto", + "proto/search.proto", + "proto/control.proto", + ]; let proto_include_dirs = ["proto"]; let file_descriptors = protox::compile(proto_paths, proto_include_dirs).unwrap(); diff --git a/crates/atuin-daemon/proto/control.proto b/crates/atuin-daemon/proto/control.proto new file mode 100644 index 00000000..06347902 --- /dev/null +++ b/crates/atuin-daemon/proto/control.proto @@ -0,0 +1,62 @@ +syntax = "proto3"; +package control; + +// The Control service allows external processes (CLI commands, etc.) +// to inject events into the running daemon. 
+service Control { + // Send an event to the daemon's event bus + rpc SendEvent(SendEventRequest) returns (SendEventResponse); +} + +message SendEventRequest { + oneof event { + // History was pruned - search index needs full rebuild + HistoryPrunedEvent history_pruned = 1; + + // Specific history items were deleted + HistoryDeletedEvent history_deleted = 2; + + // Request immediate sync + ForceSyncEvent force_sync = 3; + + // Settings have changed, reload if needed + SettingsReloadedEvent settings_reloaded = 4; + + // Request graceful shutdown + ShutdownEvent shutdown = 5; + + // History was rebuilt - search index needs full rebuild + HistoryRebuiltEvent history_rebuilt = 6; + } +} + +message SendEventResponse { + // Empty on success; errors come through gRPC status +} + +// Individual event message types + +message HistoryPrunedEvent { + // No fields needed - just signals that pruning happened +} + +message HistoryRebuiltEvent { + // No fields needed - just signals that rebuilding happened +} + +message HistoryDeletedEvent { + // IDs of deleted history items (UUIDs as strings) + repeated string ids = 1; +} + +message ForceSyncEvent { + // No fields needed - just triggers sync +} + +message SettingsReloadedEvent { + // No fields needed - components should re-read settings +} + +message ShutdownEvent { + // No fields needed - triggers graceful shutdown +} diff --git a/crates/atuin-daemon/proto/search.proto b/crates/atuin-daemon/proto/search.proto new file mode 100644 index 00000000..6b84acbd --- /dev/null +++ b/crates/atuin-daemon/proto/search.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; +package search; + +enum FilterMode { + GLOBAL = 0; + HOST = 1; + SESSION = 2; + DIRECTORY = 3; + WORKSPACE = 4; + SESSION_PRELOAD = 5; +} + +message SearchContext { + string session_id = 1; + string cwd = 2; + string hostname = 3; + string host_id = 4; + optional string git_root = 5; +} + +message SearchRequest { + string query = 1; + uint64 query_id = 2; // Incrementing ID to match 
responses to queries + FilterMode filter_mode = 3; + SearchContext context = 4; +} + +message SearchResponse { + uint64 query_id = 1; // Echo back the query ID + repeated bytes ids = 2; +} + +service Search { + rpc Search(stream SearchRequest) returns (stream SearchResponse); +} diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs index 3b76a680..2f492f6b 100644 --- a/crates/atuin-daemon/src/client.rs +++ b/crates/atuin-daemon/src/client.rs @@ -1,4 +1,6 @@ -use eyre::{Context, Result}; +use atuin_client::database::Context; +use atuin_client::settings::{FilterMode, Settings}; +use eyre::{Context as EyreContext, Result}; #[cfg(windows)] use tokio::net::TcpStream; use tonic::Code; @@ -11,11 +13,22 @@ use hyper_util::rt::TokioIo; use tokio::net::UnixStream; use atuin_client::history::History; +use tracing::{Level, instrument, span}; +use crate::control::HistoryRebuiltEvent; +use crate::control::{ + ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest, + SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient, +}; +use crate::events::DaemonEvent; use crate::history::{ EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, history_client::HistoryClient as HistoryServiceClient, }; +use crate::search::{ + FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse, + search_client::SearchClient as SearchServiceClient, +}; pub struct HistoryClient { client: HistoryServiceClient, @@ -52,6 +65,8 @@ pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind { impl HistoryClient { #[cfg(unix)] pub async fn new(path: String) -> Result { + use eyre::Context; + let log_path = path.clone(); let channel = Endpoint::try_from("http://atuin_local_daemon:0")? 
.connect_with_connector(service_fn(move |_: Uri| { @@ -130,3 +145,275 @@ impl HistoryClient { Ok(resp.accepted) } } + +pub struct SearchClient { + client: SearchServiceClient, +} + +impl SearchClient { + #[cfg(unix)] + pub async fn new(path: String) -> Result { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = SearchServiceClient::new(channel); + + Ok(SearchClient { client }) + } + + #[cfg(not(unix))] + pub async fn new(port: u64) -> Result { + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let url = format!("127.0.0.1:{port}"); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" 
+ ) + })?; + + let client = SearchServiceClient::new(channel); + + Ok(SearchClient { client }) + } + + #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))] + pub async fn search( + &mut self, + query: String, + query_id: u64, + filter_mode: FilterMode, + context: Option, + ) -> Result> { + let request = SearchRequest { + query, + query_id, + filter_mode: RpcFilterMode::from(filter_mode).into(), + context: context.map(RpcSearchContext::from), + }; + let request_stream = tokio_stream::once(request); + let response = span!(Level::TRACE, "daemon_client_search.request") + .in_scope(async || self.client.search(request_stream).await) + .await?; + + Ok(response.into_inner()) + } +} + +impl From for RpcFilterMode { + fn from(filter_mode: FilterMode) -> Self { + match filter_mode { + FilterMode::Global => RpcFilterMode::Global, + FilterMode::Host => RpcFilterMode::Host, + FilterMode::Session => RpcFilterMode::Session, + FilterMode::Directory => RpcFilterMode::Directory, + FilterMode::Workspace => RpcFilterMode::Workspace, + FilterMode::SessionPreload => RpcFilterMode::SessionPreload, + } + } +} + +impl From for RpcSearchContext { + fn from(context: Context) -> Self { + RpcSearchContext { + session_id: context.session, + cwd: context.cwd, + hostname: context.hostname, + host_id: context.host_id, + git_root: context + .git_root + .map(|path| path.to_string_lossy().to_string()), + } + } +} + +// ============================================================================ +// Control Client +// ============================================================================ + +/// Client for the Control gRPC service. +/// +/// Used to inject events into a running daemon from external processes. +pub struct ControlClient { + client: ControlServiceClient, +} + +impl ControlClient { + /// Connect to the daemon's control service. 
+ #[cfg(unix)] + pub async fn new(path: String) -> Result { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = ControlServiceClient::new(channel); + + Ok(ControlClient { client }) + } + + /// Connect to the daemon's control service. + #[cfg(not(unix))] + pub async fn new(port: u64) -> Result { + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let url = format!("127.0.0.1:{port}"); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" + ) + })?; + + let client = ControlServiceClient::new(channel); + + Ok(ControlClient { client }) + } + + /// Connect using settings. + #[cfg(unix)] + pub async fn from_settings(settings: &Settings) -> Result { + Self::new(settings.daemon.socket_path.clone()).await + } + + /// Connect using settings. + #[cfg(not(unix))] + pub async fn from_settings(settings: &Settings) -> Result { + Self::new(settings.daemon.tcp_port).await + } + + /// Send an event to the daemon. + pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> { + let proto_event = daemon_event_to_proto(event); + let request = SendEventRequest { + event: Some(proto_event), + }; + self.client.send_event(request).await?; + Ok(()) + } +} + +/// Convert a daemon event to its proto representation. 
+fn daemon_event_to_proto(event: DaemonEvent) -> crate::control::send_event_request::Event { + use crate::control::send_event_request::Event; + + match event { + DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}), + DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}), + DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent { + ids: ids.into_iter().map(|id| id.0).collect(), + }), + DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}), + DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}), + DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}), + // These events are internal and not sent via the control service + DaemonEvent::HistoryStarted(_) + | DaemonEvent::HistoryEnded(_) + | DaemonEvent::RecordsAdded(_) + | DaemonEvent::SyncCompleted { .. } + | DaemonEvent::SyncFailed { .. } => { + // Use shutdown as a fallback, though this shouldn't happen + tracing::warn!("attempted to send internal event via control service"); + Event::Shutdown(ShutdownEvent {}) + } + } +} + +// ============================================================================ +// Convenience Functions +// ============================================================================ + +/// Emit an event to the daemon. +/// +/// This is a fire-and-forget helper for sending events to the daemon from +/// external processes like CLI commands. If the daemon isn't running, this +/// will silently succeed (returns Ok). +/// +/// # Example +/// +/// ```ignore +/// // After pruning history +/// emit_event(DaemonEvent::HistoryPruned).await?; +/// +/// // After deleting specific history items +/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] 
}).await?; +/// +/// // Request immediate sync +/// emit_event(DaemonEvent::ForceSync).await?; +/// ``` +pub async fn emit_event(event: DaemonEvent) -> Result<()> { + emit_event_with_settings(event, None).await +} + +/// Emit an event to the daemon with explicit settings. +/// +/// If settings are not provided, they will be loaded from the default location. +/// If the daemon isn't running, this will silently succeed. +pub async fn emit_event_with_settings( + event: DaemonEvent, + settings: Option<&Settings>, +) -> Result<()> { + // Load settings if not provided + let owned_settings; + let settings = match settings { + Some(s) => s, + None => { + owned_settings = Settings::new()?; + &owned_settings + } + }; + + // Try to connect - if daemon isn't running, that's fine + let mut client = match ControlClient::from_settings(settings).await { + Ok(c) => c, + Err(e) => { + tracing::debug!(?e, "daemon not running, skipping event emission"); + return Ok(()); + } + }; + + // Send the event + if let Err(e) = client.send_event(event).await { + tracing::debug!(?e, "failed to send event to daemon"); + // Don't fail - this is fire-and-forget + } + + Ok(()) +} diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs new file mode 100644 index 00000000..23d48c5e --- /dev/null +++ b/crates/atuin-daemon/src/components/history.rs @@ -0,0 +1,252 @@ +//! History component. +//! +//! Handles command history lifecycle (start/end) and provides the History gRPC service. 
+ +use std::sync::Arc; + +use atuin_client::{ + database::Database, + history::{History, HistoryId, store::HistoryStore}, + settings::Settings, +}; +use dashmap::DashMap; +use eyre::Result; +use time::OffsetDateTime; +use tonic::{Request, Response, Status}; +use tracing::{Level, instrument}; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + history::{ + EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply, + StartHistoryRequest, StatusReply, StatusRequest, + history_server::{History as HistorySvc, HistoryServer}, + }, +}; + +const DAEMON_PROTOCOL_VERSION: u32 = 1; + +/// History component - manages command history lifecycle. +/// +/// This component: +/// - Tracks currently running commands (stored in memory) +/// - Saves completed commands to the database and record store +/// - Emits history events for other components (e.g., search indexing) +/// - Provides the History gRPC service +pub struct HistoryComponent { + inner: Arc, +} + +struct HistoryComponentInner { + /// Commands currently running (not yet completed). + running: DashMap, + + /// Handle to the daemon (set during start). + handle: tokio::sync::RwLock>, + + /// History store for pushing records (set during start). + history_store: tokio::sync::RwLock>, +} + +impl HistoryComponent { + /// Create a new history component. + pub fn new() -> Self { + Self { + inner: Arc::new(HistoryComponentInner { + running: DashMap::new(), + handle: tokio::sync::RwLock::new(None), + history_store: tokio::sync::RwLock::new(None), + }), + } + } + + /// Get the gRPC service for this component. + /// + /// This returns a tonic service that can be added to a gRPC server. 
+ pub fn grpc_service(&self) -> HistoryServer { + HistoryServer::new(HistoryGrpcService { + inner: self.inner.clone(), + }) + } +} + +impl Default for HistoryComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for HistoryComponent { + fn name(&self) -> &'static str { + "history" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + // Create the history store + let host_id = Settings::host_id().await?; + let history_store = + HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key()); + + *self.inner.history_store.write().await = Some(history_store); + *self.inner.handle.write().await = Some(handle); + + tracing::info!("history component started"); + Ok(()) + } + + async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> { + // History component produces events but doesn't need to react to them + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + tracing::info!("history component stopped"); + Ok(()) + } +} + +/// The gRPC service implementation. +/// +/// This is a thin wrapper that delegates to the component's shared state. 
+pub struct HistoryGrpcService {
+    inner: Arc<HistoryComponentInner>,
+}
+
+#[tonic::async_trait]
+impl HistorySvc for HistoryGrpcService {
+    /// Record a command that has just started running.
+    ///
+    /// The entry is kept in memory only; it is persisted by `end_history`.
+    ///
+    /// # Errors
+    ///
+    /// `invalid_argument` if the timestamp is not a valid count of
+    /// nanoseconds since the Unix epoch.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn start_history(
+        &self,
+        request: Request<StartHistoryRequest>,
+    ) -> Result<Response<StartHistoryReply>, Status> {
+        let req = request.into_inner();
+
+        let timestamp = OffsetDateTime::from_unix_timestamp_nanos(i128::from(req.timestamp))
+            .map_err(|_| {
+                Status::invalid_argument(
+                    "failed to parse timestamp as unix time (expected nanos since epoch)",
+                )
+            })?;
+
+        let h: History = History::daemon()
+            .timestamp(timestamp)
+            .command(req.command)
+            .cwd(req.cwd)
+            .session(req.session)
+            .hostname(req.hostname)
+            .build()
+            .into();
+
+        // Emit the event
+        if let Some(handle) = self.inner.handle.read().await.as_ref() {
+            handle.emit(DaemonEvent::HistoryStarted(h.clone()));
+        }
+
+        let id = h.id.clone();
+        tracing::info!(id = id.to_string(), "start history");
+        self.inner.running.insert(id.clone(), h);
+
+        Ok(Response::new(StartHistoryReply {
+            id: id.to_string(),
+            version: env!("CARGO_PKG_VERSION").to_string(),
+            protocol: DAEMON_PROTOCOL_VERSION,
+        }))
+    }
+
+    /// Complete a running command: save it to the database, push it to the
+    /// record store and emit `HistoryEnded`.
+    ///
+    /// A `duration` of `0` means "not measured by the client"; the duration is
+    /// then computed from the start timestamp.
+    ///
+    /// # Errors
+    ///
+    /// - `invalid_argument` if the reported duration does not fit in an `i64`
+    /// - `internal` if the component has not been started, or a write fails
+    /// - `not_found` if no running command has the given id
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn end_history(
+        &self,
+        request: Request<EndHistoryRequest>,
+    ) -> Result<Response<EndHistoryReply>, Status> {
+        let req = request.into_inner();
+        let id = HistoryId(req.id);
+
+        // Validate client input and component state *before* removing the
+        // entry from `running`, so a rejected request does not lose it.
+        let reported_duration = match req.duration {
+            0 => None,
+            value => Some(i64::try_from(value).map_err(|_| {
+                Status::invalid_argument("duration does not fit in an i64 nanosecond count")
+            })?),
+        };
+
+        let handle_guard = self.inner.handle.read().await;
+        let handle = handle_guard
+            .as_ref()
+            .ok_or_else(|| Status::internal("component not initialized"))?;
+
+        let store_guard = self.inner.history_store.read().await;
+        let history_store = store_guard
+            .as_ref()
+            .ok_or_else(|| Status::internal("component not initialized"))?;
+
+        let Some((_, mut history)) = self.inner.running.remove(&id) else {
+            return Err(Status::not_found(format!(
+                "could not find history with id: {id}"
+            )));
+        };
+
+        history.exit = req.exit;
+        history.duration = reported_duration.unwrap_or_else(|| {
+            let elapsed = (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds();
+            // Clamp instead of panicking on an absurd or negative clock delta;
+            // after the clamp the cast is lossless.
+            elapsed.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
+        });
+
+        // Save to database
+        handle
+            .history_db()
+            .save(&history)
+            .await
+            .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
+
+        tracing::info!(
+            id = id.0.to_string(),
+            duration = history.duration,
+            "end history"
+        );
+
+        // Push to record store
+        let (record_id, idx) = history_store
+            .push(history.clone())
+            .await
+            .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
+
+        // Emit the event
+        handle.emit(DaemonEvent::HistoryEnded(history));
+
+        Ok(Response::new(EndHistoryReply {
+            id: record_id.0.to_string(),
+            idx,
+            version: env!("CARGO_PKG_VERSION").to_string(),
+            protocol: DAEMON_PROTOCOL_VERSION,
+        }))
+    }
+
+    /// Report daemon health, version, process id and protocol version.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn status(
+        &self,
+        _request: Request<StatusRequest>,
+    ) -> Result<Response<StatusReply>, Status> {
+        Ok(Response::new(StatusReply {
+            healthy: true,
+            version: env!("CARGO_PKG_VERSION").to_string(),
+            pid: std::process::id(),
+            protocol: DAEMON_PROTOCOL_VERSION,
+        }))
+    }
+
+    /// Ask the daemon to shut down.
+    ///
+    /// The reply is always `accepted: true`, even when the component has not
+    /// been started and no request could be forwarded.
+    #[instrument(skip_all, level = Level::INFO)]
+    async fn shutdown(
+        &self,
+        _request: Request<ShutdownRequest>,
+    ) -> Result<Response<ShutdownReply>, Status> {
+        // Use the daemon handle to request shutdown
+        if let Some(handle) = self.inner.handle.read().await.as_ref() {
+            handle.shutdown();
+        }
+        Ok(Response::new(ShutdownReply { accepted: true }))
+    }
+}
diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs
new file mode 100644
index 00000000..5950d5d5
--- /dev/null
+++ b/crates/atuin-daemon/src/components/mod.rs
@@ -0,0 +1,22 @@
+//! Daemon components.
+//!
+//! Components are the building blocks of the daemon. Each component handles
+//! a specific domain and can:
+//!
+//! - Expose gRPC services
+//! - React to events
+//! - Spawn background tasks
+//!
+//!
Available components:
+//!
+//! - [`history::HistoryComponent`]: Command history lifecycle management
+//! - [`search::SearchComponent`]: Fuzzy search over history
+//! - [`sync::SyncComponent`]: Cloud sync
+
+pub mod history;
+pub mod search;
+pub mod sync;
+
+pub use history::HistoryComponent;
+pub use search::SearchComponent;
+pub use sync::SyncComponent;
diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs
new file mode 100644
index 00000000..7fb59dea
--- /dev/null
+++ b/crates/atuin-daemon/src/components/search.rs
@@ -0,0 +1,394 @@
+//! Search component.
+//!
+//! Provides fuzzy search over command history using the Nucleo search library
+//! with frecency-based ranking and dynamic filtering.
+
+use std::{pin::Pin, sync::Arc};
+
+use atuin_client::database::Database;
+use eyre::Result;
+use tokio::sync::RwLock;
+use tokio_stream::Stream;
+use tonic::{Request, Response, Status, Streaming};
+use tracing::{Level, debug, info, instrument, span, trace};
+use uuid::Uuid;
+
+use crate::{
+    daemon::{Component, DaemonHandle},
+    events::DaemonEvent,
+    search::{
+        FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse,
+        search_server::{Search as SearchSvc, SearchServer},
+    },
+};
+
+/// Number of history rows fetched per database page while (re)building the index.
+const PAGE_SIZE: usize = 5000;
+/// Maximum number of results requested from the index per search query.
+const RESULTS_LIMIT: u32 = 200;
+/// How often to rebuild the frecency map (in seconds).
+const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60;
+
+/// Search component - provides fuzzy search over command history.
+///
+/// This component:
+/// - Maintains a deduplicated search index with frecency ranking
+/// - Loads history from the database on startup
+/// - Updates the index when history events occur
+/// - Provides the Search gRPC service
+pub struct SearchComponent {
+    /// The index, shared with the gRPC service and the background tasks.
+    index: Arc<RwLock<SearchIndex>>,
+    /// Handle to the daemon (`None` until `start` has run).
+    handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+    /// Task performing the initial history load.
+    loader_handle: Option<tokio::task::JoinHandle<()>>,
+    /// Task periodically refreshing the frecency map.
+    frecency_handle: Option<tokio::task::JoinHandle<()>>,
+}
+
+impl SearchComponent {
+    /// Create a new search component with an empty index.
+    pub fn new() -> Self {
+        Self {
+            index: Arc::new(RwLock::new(SearchIndex::new())),
+            handle: tokio::sync::RwLock::new(None),
+            loader_handle: None,
+            frecency_handle: None,
+        }
+    }
+
+    /// Get the gRPC service for this component.
+    pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> {
+        SearchServer::new(SearchGrpcService {
+            index: Arc::clone(&self.index),
+        })
+    }
+
+    /// Rebuild the entire search index from the database.
+    ///
+    /// The replacement index is built off to the side and swapped in only
+    /// after every page has loaded, so a failed rebuild leaves the current
+    /// index untouched instead of replacing it with a partial one.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the component has not been started or a page cannot be read.
+    async fn rebuild_index(&self) -> Result<()> {
+        // Clone the database handle so the lock is not held during the rebuild.
+        let db = {
+            let handle_guard = self.handle.read().await;
+            handle_guard
+                .as_ref()
+                .ok_or_else(|| eyre::eyre!("component not initialized"))?
+                .history_db()
+                .clone()
+        };
+
+        info!("Rebuilding search index from database");
+
+        let new_index = SearchIndex::new();
+        let mut pager = db.all_paged(PAGE_SIZE, false, true);
+        loop {
+            match pager.next().await {
+                Ok(Some(histories)) => {
+                    info!(
+                        "Loading {} history entries into search index",
+                        histories.len()
+                    );
+                    new_index.add_histories(&histories);
+                }
+                Ok(None) => break,
+                Err(e) => {
+                    return Err(eyre::eyre!("Failed to load history during rebuild: {}", e));
+                }
+            }
+        }
+
+        // Build the frecency map up front, as the initial load does, rather
+        // than waiting for the next periodic refresh.
+        new_index.rebuild_frecency().await;
+
+        info!(
+            "Search index rebuild complete; {} unique commands",
+            new_index.command_count()
+        );
+
+        // Replace the old index with the new one
+        *self.index.write().await = new_index;
+        Ok(())
+    }
+}
+
+impl Default for SearchComponent {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[tonic::async_trait]
+impl Component for SearchComponent {
+    fn name(&self) -> &'static str {
+        "search"
+    }
+
+    /// Store the handle and spawn the two background tasks: the initial
+    /// history load and the periodic frecency refresh.
+    async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+        *self.handle.write().await = Some(handle.clone());
+
+        // Spawn background task to load history into index
+        let index = Arc::clone(&self.index);
+        let db = handle.history_db().clone();
+
+        self.loader_handle = Some(tokio::spawn(async move {
+            info!(
+                "Loading history into search index; page size = {}",
+                PAGE_SIZE
+            );
+            let mut pager = db.all_paged(PAGE_SIZE, false, true);
+            loop {
+                match pager.next().await {
+                    Ok(Some(histories)) => {
+                        info!(
+                            "Loading {} history entries into search index",
+                            histories.len()
+                        );
+                        index.read().await.add_histories(&histories);
+                    }
+                    Ok(None) => {
+                        info!(
+                            "Initial history load complete; {} unique commands indexed",
+                            index.read().await.command_count()
+                        );
+                        // Build initial frecency map
+                        index.read().await.rebuild_frecency().await;
+                        info!("Initial frecency map built");
+                        break;
+                    }
+                    Err(e) => {
+                        tracing::error!("Failed to load history: {}", e);
+                        break;
+                    }
+                }
+            }
+        }));
+
+        // Spawn background task to periodically refresh frecency
+        let index_for_frecency = Arc::clone(&self.index);
+        self.frecency_handle = Some(tokio::spawn(async move {
+            let mut interval = tokio::time::interval(std::time::Duration::from_secs(
+                FRECENCY_REFRESH_INTERVAL_SECS,
+            ));
+            loop {
+                interval.tick().await;
+                trace!("Refreshing frecency map");
+                index_for_frecency.read().await.rebuild_frecency().await;
+            }
+        }));
+
+        tracing::info!("search component started");
+        Ok(())
+    }
+
+    /// Keep the index in step with history events.
+    ///
+    /// Added and ended history is inserted incrementally; pruned, rebuilt or
+    /// deleted history triggers a full rebuild.
+    async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+        match event {
+            DaemonEvent::RecordsAdded(records) => {
+                debug!(
+                    count = records.len(),
+                    "Processing added records for search index"
+                );
+
+                // Nothing to look up; avoid a pointless query.
+                if records.is_empty() {
+                    return Ok(());
+                }
+
+                let handle_guard = self.handle.read().await;
+                if let Some(handle) = handle_guard.as_ref() {
+                    // Ids are text and must be quoted to form valid SQL.
+                    // NOTE(review): this looks up history rows by *record* id;
+                    // confirm record ids and history ids share a format.
+                    let id_list = records
+                        .iter()
+                        .map(|record| format!("'{}'", record.0))
+                        .collect::<Vec<_>>()
+                        .join(",");
+                    let sql = format!("select * from history where id in ({id_list})");
+
+                    // Log the failure instead of silently indexing nothing.
+                    let histories = match handle.history_db().query_history(sql.as_str()).await {
+                        Ok(histories) => histories,
+                        Err(e) => {
+                            tracing::error!(
+                                "Failed to load added records for search index: {}",
+                                e
+                            );
+                            Vec::new()
+                        }
+                    };
+
+                    span!(Level::TRACE, "inject_records", count = histories.len())
+                        .in_scope(async || {
+                            self.index.read().await.add_histories(&histories);
+                        })
+                        .await;
+                }
+            }
+            DaemonEvent::HistoryStarted(history) => {
+                debug!(id = %history.id, command = %history.command, "History started (no index action)");
+            }
+            DaemonEvent::HistoryEnded(history) => {
+                span!(Level::TRACE, "inject_history_ended")
+                    .in_scope(async || {
+                        self.index.read().await.add_history(history);
+                    })
+                    .await;
+            }
+            DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => {
+                info!("History store pruned or rebuilt, rebuilding search index");
+                if let Err(e) = self.rebuild_index().await {
+                    tracing::error!("Failed to rebuild search index: {}", e);
+                }
+            }
+            DaemonEvent::HistoryDeleted { ids } => {
+                info!(
+                    count = ids.len(),
+                    "History deleted, rebuilding search index"
+                );
+                // For now, just rebuild the entire index. A more efficient implementation
+                // would remove specific items from the index.
+                if let Err(e) = self.rebuild_index().await {
+                    tracing::error!("Failed to rebuild search index: {}", e);
+                }
+            }
+            // Events we don't care about
+            DaemonEvent::SyncCompleted { .. }
+            | DaemonEvent::SyncFailed { .. }
+            | DaemonEvent::ForceSync
+            | DaemonEvent::SettingsReloaded
+            | DaemonEvent::ShutdownRequested => {}
+        }
+        Ok(())
+    }
+
+    /// Abort both background tasks.
+    async fn stop(&mut self) -> Result<()> {
+        if let Some(task) = self.loader_handle.take() {
+            task.abort();
+        }
+        if let Some(task) = self.frecency_handle.take() {
+            task.abort();
+        }
+        tracing::info!("search component stopped");
+        Ok(())
+    }
+}
+
+/// The gRPC service implementation.
+pub struct SearchGrpcService {
+    /// The index shared with the owning `SearchComponent`.
+    index: Arc<RwLock<SearchIndex>>,
+}
+
+#[tonic::async_trait]
+impl SearchSvc for SearchGrpcService {
+    type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>;
+
+    /// Bidirectional streaming search.
+    ///
+    /// Each incoming request is answered with one response carrying the same
+    /// `query_id` and up to `RESULTS_LIMIT` history ids as raw UUID bytes.
+    /// The stream ends when the client disconnects or sends an error.
+    #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")]
+    async fn search(
+        &self,
+        request: Request<Streaming<SearchRequest>>,
+    ) -> Result<Response<Self::SearchStream>, Status> {
+        let mut in_stream = request.into_inner();
+        let index = Arc::clone(&self.index);
+
+        // Create output channel
+        let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128);
+
+        // Spawn task to handle incoming requests and send responses
+        tokio::spawn(async move {
+            while let Some(req) = in_stream.message().await.transpose() {
+                let search_req = match req {
+                    Ok(search_req) => search_req,
+                    Err(e) => {
+                        // Forward the error, then end the stream.
+                        let _ = tx.send(Err(e)).await;
+                        break;
+                    }
+                };
+
+                let query = search_req.query;
+                let query_id = search_req.query_id;
+                // Unknown filter modes fall back to a global search.
+                let filter_mode: FilterMode = search_req
+                    .filter_mode
+                    .try_into()
+                    .unwrap_or(FilterMode::Global);
+                let proto_context = search_req.context;
+
+                debug!(
+                    "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}",
+                    query,
+                    query_id,
+                    filter_mode.as_str_name(),
+                    proto_context
+                );
+
+                // Convert proto FilterMode + context to IndexFilterMode
+                let index_filter = convert_filter_mode(filter_mode, &proto_context);
+
+                // Build QueryContext from proto context
+                let query_context = proto_context
+                    .map(|ctx| QueryContext {
+                        cwd: Some(with_trailing_slash(&ctx.cwd)),
+                        git_root: ctx.git_root.map(|s| with_trailing_slash(&s)),
+                        hostname: Some(ctx.hostname),
+                        session_id: Some(ctx.session_id),
+                    })
+                    .unwrap_or_default();
+
+                // Perform the search
+                let history_ids = span!(Level::TRACE, "daemon_search_query", %query, query_id)
+                    .in_scope(|| async {
+                        let index = index.read().await;
+                        index
+                            .search(&query, index_filter, &query_context, RESULTS_LIMIT)
+                            .await
+                    })
+                    .await;
+
+                // Convert history IDs to bytes; ids that are not valid UUIDs are skipped.
+                let ids: Vec<Vec<u8>> = history_ids
+                    .iter()
+                    .filter_map(|id| Uuid::parse_str(id).ok())
+                    .map(|uuid| uuid.as_bytes().to_vec())
+                    .collect();
+
+                if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() {
+                    break; // Client disconnected
+                }
+            }
+        });
+
+        // Convert receiver to stream
+        let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
+        Ok(Response::new(Box::pin(out_stream)))
+    }
+}
+
+/// Convert proto FilterMode and context to IndexFilterMode.
+///
+/// Every mode other than `Global` needs a context; without one the filter
+/// falls back to `Global`. `Workspace` falls back to `Directory` when the
+/// context carries no git root.
+// NOTE(review): the context type name is presumed to be
+// `crate::search::SearchContext`; confirm against the generated proto module.
+fn convert_filter_mode(
+    mode: FilterMode,
+    context: &Option<crate::search::SearchContext>,
+) -> IndexFilterMode {
+    match (mode, context) {
+        (FilterMode::Global, _) => IndexFilterMode::Global,
+        (FilterMode::Directory, Some(ctx)) => {
+            IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
+        }
+        (FilterMode::Workspace, Some(ctx)) => match ctx.git_root {
+            Some(ref git_root) => IndexFilterMode::Workspace(with_trailing_slash(git_root)),
+            // Fall back to directory if no git root
+            None => IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)),
+        },
+        (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()),
+        (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()),
+        (FilterMode::SessionPreload, Some(ctx)) => {
+            // SessionPreload is similar to Session - filter by session
+            IndexFilterMode::Session(ctx.session_id.clone())
+        }
+        // If no context provided, fall back to global
+        _ => IndexFilterMode::Global,
+    }
+}
+
+/// Return `s` terminated by exactly one platform path separator.
+///
+/// The separator is `\` on Windows and `/` elsewhere. A string that already
+/// ends with it is returned unchanged; an empty string becomes the bare
+/// separator.
+pub fn with_trailing_slash(s: &str) -> String {
+    let separator = std::path::MAIN_SEPARATOR;
+    if s.ends_with(separator) {
+        s.to_string()
+    } else {
+        format!("{s}{separator}")
+    }
+}
diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs
new file mode 100644
index 00000000..6217706a
--- /dev/null
+++ b/crates/atuin-daemon/src/components/sync.rs
@@ -0,0 +1,257 @@
+//! Sync component.
+//!
+//! Handles periodic synchronization with the Atuin cloud server.
+
+use eyre::Result;
+use rand::Rng;
+use tokio::sync::mpsc;
+use tokio::time::{self, MissedTickBehavior};
+
+use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
+use atuin_dotfiles::store::{AliasStore, var::VarStore};
+
+use crate::{
+    daemon::{Component, DaemonHandle},
+    events::DaemonEvent,
+};
+
+/// Commands that can be sent to the sync task.
+enum SyncCommand {
+    /// Trigger an immediate sync.
+    ForceSync,
+    /// Stop the sync loop.
+    Stop,
+}
+
+/// Sync component - handles periodic cloud synchronization.
+///
+/// This component:
+/// - Runs a background sync loop on a configurable interval
+/// - Implements exponential backoff on sync failures
+/// - Responds to ForceSync events for immediate sync
+/// - Emits SyncCompleted/SyncFailed events
+pub struct SyncComponent {
+    /// The spawned sync loop (`None` until `start` has run).
+    task_handle: Option<tokio::task::JoinHandle<()>>,
+    /// Channel used to send commands to the sync loop.
+    command_tx: Option<mpsc::Sender<SyncCommand>>,
+}
+
+impl SyncComponent {
+    /// Create a new sync component.
+    pub fn new() -> Self {
+        Self {
+            task_handle: None,
+            command_tx: None,
+        }
+    }
+}
+
+impl Default for SyncComponent {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[tonic::async_trait]
+impl Component for SyncComponent {
+    fn name(&self) -> &'static str {
+        "sync"
+    }
+
+    /// Spawn the sync loop and keep the sending half of its command channel.
+    async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+        let (cmd_tx, cmd_rx) = mpsc::channel(16);
+        self.command_tx = Some(cmd_tx);
+
+        // Spawn the sync loop with its own copy of the handle
+        self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
+
+        tracing::info!("sync component started");
+        Ok(())
+    }
+
+    /// Forward `ForceSync` events to the sync loop; ignore everything else.
+    async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+        if let DaemonEvent::ForceSync = event {
+            tracing::info!("force sync requested");
+            if let Some(tx) = &self.command_tx {
+                let _ = tx.send(SyncCommand::ForceSync).await;
+            }
+        }
+        Ok(())
+    }
+
+    /// Ask the sync loop to stop and wait up to five seconds for it.
+    ///
+    /// A loop that does not finish in time is aborted, so no sync task
+    /// outlives the component.
+    async fn stop(&mut self) -> Result<()> {
+        if let Some(tx) = &self.command_tx {
+            let _ = tx.send(SyncCommand::Stop).await;
+        }
+        if let Some(mut task) = self.task_handle.take() {
+            // Give the task a moment to shut down gracefully
+            let grace = std::time::Duration::from_secs(5);
+            if tokio::time::timeout(grace, &mut task).await.is_err() {
+                tracing::warn!("sync loop did not stop in time, aborting it");
+                task.abort();
+            }
+        }
+        tracing::info!("sync component stopped");
+        Ok(())
+    }
+}
+
+/// Build a sync ticker with the given period in seconds.
+///
+/// The period is clamped to at least one second because `time::interval`
+/// panics on a zero period. When `immediate` is false the first tick fires
+/// one full period from now instead of straight away.
+fn make_ticker(period_secs: u64, immediate: bool) -> time::Interval {
+    let period = time::Duration::from_secs(period_secs.max(1));
+    let mut ticker = if immediate {
+        time::interval(period)
+    } else {
+        time::interval_at(time::Instant::now() + period, period)
+    };
+
+    // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
+    // we may end up running a lot of syncs in a hot loop.
+    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+    ticker
+}
+
+/// The main sync loop.
+///
+/// This runs in a spawned task and handles periodic sync as well as
+/// force sync requests. It ends on `SyncCommand::Stop` or when the command
+/// channel closes.
+async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
+    tracing::info!("sync loop starting");
+
+    // Clone settings since we need them across await points
+    let settings = handle.settings().await.clone();
+    let host_id = match Settings::host_id().await {
+        Ok(id) => id,
+        Err(e) => {
+            tracing::error!("failed to get host id, sync disabled: {e}");
+            return;
+        }
+    };
+
+    // Create the stores we need
+    let encryption_key = *handle.encryption_key();
+    let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
+    let alias_store = AliasStore::new(handle.store().clone(), host_id, encryption_key);
+    let var_store = VarStore::new(handle.store().clone(), host_id, encryption_key);
+
+    // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
+    let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
+
+    // The first tick fires immediately, so a sync runs at startup.
+    let mut ticker = make_ticker(settings.daemon.sync_frequency, true);
+
+    loop {
+        tokio::select! {
+            _ = ticker.tick() => {
+                do_sync_tick(
+                    &handle,
+                    &history_store,
+                    &alias_store,
+                    &var_store,
+                    &mut ticker,
+                    max_interval,
+                ).await;
+            }
+            cmd = cmd_rx.recv() => {
+                match cmd {
+                    Some(SyncCommand::ForceSync) => {
+                        tracing::info!("executing force sync");
+                        do_sync_tick(
+                            &handle,
+                            &history_store,
+                            &alias_store,
+                            &var_store,
+                            &mut ticker,
+                            max_interval,
+                        ).await;
+                    }
+                    Some(SyncCommand::Stop) | None => {
+                        tracing::info!("sync loop stopping");
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Execute a single sync tick.
+///
+/// Skips the tick when the login state cannot be read or the user is not
+/// logged in. On failure `ticker` is replaced with a longer-period one
+/// (roughly doubling, capped at `max_interval` seconds); on success it is
+/// restored to the configured frequency. Either way the replacement keeps
+/// `MissedTickBehavior::Skip` and first fires one full period later.
+async fn do_sync_tick(
+    handle: &DaemonHandle,
+    history_store: &HistoryStore,
+    alias_store: &AliasStore,
+    var_store: &VarStore,
+    ticker: &mut time::Interval,
+    max_interval: f64,
+) {
+    // Clone settings since we need them across await points
+    let settings = handle.settings().await.clone();
+
+    tracing::info!("sync tick");
+
+    // Check if logged in
+    let logged_in = match settings.logged_in().await {
+        Ok(v) => v,
+        Err(e) => {
+            tracing::warn!("failed to check login status, skipping sync tick: {e}");
+            return;
+        }
+    };
+
+    if !logged_in {
+        tracing::debug!("not logged in, skipping sync tick");
+        return;
+    }
+
+    // Perform the sync
+    match sync::sync(&settings, handle.store()).await {
+        Err(e) => {
+            tracing::error!("sync tick failed with {e}");
+
+            // Emit failure event
+            handle.emit(DaemonEvent::SyncFailed {
+                error: e.to_string(),
+            });
+
+            // Exponential backoff with jitter, capped at `max_interval`
+            let factor = rand::thread_rng().gen_range(2.0..2.2);
+            let new_interval = (ticker.period().as_secs_f64() * factor).min(max_interval);
+
+            *ticker = make_ticker(new_interval as u64, false);
+
+            tracing::error!("backing off, next sync tick in {new_interval}");
+        }
+        Ok((uploaded_count, downloaded_records)) => {
+            let downloaded = downloaded_records.len();
+            tracing::info!(
+                uploaded = uploaded_count,
+                downloaded = downloaded,
+                "sync complete"
+            );
+
+            // Build history from downloaded records
+            if let Err(e) = history_store
+                .incremental_build(handle.history_db(), &downloaded_records)
+                .await
+            {
+                tracing::error!("failed to build history from downloaded records: {e}");
+            }
+
+            // Emit the records added event (for search indexing)
+            handle.emit(DaemonEvent::RecordsAdded(downloaded_records));
+
+            // Emit sync completed event
+            handle.emit(DaemonEvent::SyncCompleted {
+                // A count that does not fit in `usize` is reported as 0.
+                uploaded: usize::try_from(uploaded_count).unwrap_or(0),
+                downloaded,
+            });
+
+            // Rebuild alias and var stores
+            if let Err(e) = alias_store.build().await {
+                tracing::error!("failed to rebuild alias store: {e}");
+            }
+            if let Err(e) = var_store.build().await {
+                tracing::error!("failed to rebuild var store: {e}");
+            }
+
+            // Reset backoff on success. Compare against the clamped value
+            // `make_ticker` uses so a configured frequency of 0 does not
+            // recreate the ticker on every tick.
+            if ticker.period().as_secs() != settings.daemon.sync_frequency.max(1) {
+                *ticker = make_ticker(settings.daemon.sync_frequency, false);
+            }
+
+            // Store sync time
+            if let Err(e) = Settings::save_sync_time().await {
+                tracing::error!("failed to save sync time: {e}");
+            }
+        }
+    }
+}
diff --git a/crates/atuin-daemon/src/control/mod.rs b/crates/atuin-daemon/src/control/mod.rs
new file mode 100644
index 00000000..afb29c57
--- /dev/null
+++ b/crates/atuin-daemon/src/control/mod.rs
@@ -0,0 +1,12 @@
+//! Control module for external event injection.
+//!
+//! This module provides the gRPC service that allows external processes
+//! (like CLI commands) to inject events into the daemon's event bus.
+
+mod service;
+
+// Include the generated proto code
+tonic::include_proto!("control");
+
+// Re-export the service
+pub use service::ControlService;
diff --git a/crates/atuin-daemon/src/control/service.rs b/crates/atuin-daemon/src/control/service.rs
new file mode 100644
index 00000000..2e7403ce
--- /dev/null
+++ b/crates/atuin-daemon/src/control/service.rs
@@ -0,0 +1,71 @@
+//! Control service implementation.
+//!
+//!
This gRPC service allows external processes (like CLI commands) to inject
+//! events into the daemon's event bus.
+
+use atuin_client::history::HistoryId;
+use tonic::{Request, Response, Status};
+use tracing::{Level, info, instrument};
+
+use super::{
+    SendEventRequest, SendEventResponse,
+    control_server::{Control, ControlServer},
+    send_event_request::Event,
+};
+use crate::{daemon::DaemonHandle, events::DaemonEvent};
+
+/// The Control gRPC service.
+///
+/// This service is used by external processes to inject events into the daemon.
+/// It's not a component - it's part of the daemon's core infrastructure.
+pub struct ControlService {
+    /// Handle used to emit the injected events onto the daemon's event bus.
+    handle: DaemonHandle,
+}
+
+impl ControlService {
+    /// Create a new control service with the given daemon handle.
+    pub fn new(handle: DaemonHandle) -> Self {
+        Self { handle }
+    }
+
+    /// Consume the service and wrap it in a tonic server.
+    pub fn into_server(self) -> ControlServer<Self> {
+        ControlServer::new(self)
+    }
+}
+
+#[tonic::async_trait]
+impl Control for ControlService {
+    /// Convert the request's event to a `DaemonEvent` and emit it.
+    ///
+    /// # Errors
+    ///
+    /// `invalid_argument` if the request carries no event.
+    #[instrument(skip_all, level = Level::INFO, name = "control_send_event")]
+    async fn send_event(
+        &self,
+        request: Request<SendEventRequest>,
+    ) -> Result<Response<SendEventResponse>, Status> {
+        let Some(event) = request.into_inner().event else {
+            return Err(Status::invalid_argument("event is required"));
+        };
+
+        let daemon_event = proto_event_to_daemon_event(event)?;
+
+        info!(?daemon_event, "received control event");
+        self.handle.emit(daemon_event);
+
+        Ok(Response::new(SendEventResponse {}))
+    }
+}
+
+/// Convert a proto event to a daemon event.
+fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> {
+    // Every current variant maps directly, so this never fails today; the
+    // `Result` return type is kept for the caller's `?`.
+    let daemon_event = match event {
+        Event::HistoryPruned(_) => DaemonEvent::HistoryPruned,
+        Event::HistoryRebuilt(_) => DaemonEvent::HistoryRebuilt,
+        Event::HistoryDeleted(e) => DaemonEvent::HistoryDeleted {
+            ids: e.ids.into_iter().map(HistoryId).collect(),
+        },
+        Event::ForceSync(_) => DaemonEvent::ForceSync,
+        Event::SettingsReloaded(_) => DaemonEvent::SettingsReloaded,
+        Event::Shutdown(_) => DaemonEvent::ShutdownRequested,
+    };
+    Ok(daemon_event)
+}
diff --git a/crates/atuin-daemon/src/daemon.rs b/crates/atuin-daemon/src/daemon.rs
new file mode 100644
index 00000000..ec0b7b68
--- /dev/null
+++ b/crates/atuin-daemon/src/daemon.rs
@@ -0,0 +1,450 @@
+//! Core daemon infrastructure.
+//!
+//! This module provides the foundational types for building the atuin daemon:
+//!
+//! - [`DaemonState`]: Shared state owned by the daemon
+//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state
+//! - [`Component`]: A trait for implementing daemon components
+//! - [`Daemon`]: The main daemon orchestrator
+//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon
+
+use std::sync::Arc;
+
+use atuin_client::{
+    database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore,
+    settings::Settings,
+};
+use eyre::{Context, Result};
+use tokio::sync::{RwLock, broadcast};
+
+use crate::events::DaemonEvent;
+
+// ============================================================================
+// DaemonState
+// ============================================================================
+
+/// Shared state owned by the daemon.
+///
+/// This contains all the resources that components and services need access to.
+/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`].
+pub struct DaemonState {
+    // Event bus
+    event_tx: broadcast::Sender<DaemonEvent>,
+
+    // Configuration (mutable - can be reloaded)
+    settings: RwLock<Settings>,
+
+    // Encryption key (immutable - derived at startup)
+    encryption_key: [u8; 32],
+
+    // Database handles
+    history_db: HistoryDatabase,
+    store: SqliteStore,
+}
+
+// ============================================================================
+// DaemonHandle
+// ============================================================================
+
+/// A lightweight handle to the daemon's shared state.
+///
+/// This is the primary way for components, gRPC services, and spawned tasks to
+/// interact with the daemon. It provides access to:
+///
+/// - Event emission and subscription
+/// - Configuration (settings, encryption key)
+/// - Database handles
+///
+/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed
+/// around to any code that needs daemon access.
+///
+/// # Example
+///
+/// ```ignore
+/// // Emit an event
+/// handle.emit(DaemonEvent::HistoryPruned);
+///
+/// // Access settings
+/// let settings = handle.settings().await;
+/// let sync_freq = settings.daemon.sync_frequency;
+///
+/// // Access database
+/// let history = handle.history_db().load(id).await?;
+/// ```
+#[derive(Clone)]
+pub struct DaemonHandle {
+    state: Arc<DaemonState>,
+}
+
+impl DaemonHandle {
+    // ---- Events ----
+
+    /// Emit an event to the daemon's event bus.
+    ///
+    /// This is fire-and-forget: if no receivers are listening (which shouldn't
+    /// happen in normal operation), the event is dropped and a warning is
+    /// logged; the caller is not notified.
+    pub fn emit(&self, event: DaemonEvent) {
+        if let Err(e) = self.state.event_tx.send(event) {
+            tracing::warn!("failed to emit event (no receivers?): {e}");
+        }
+    }
+
+    /// Subscribe to the event bus.
+    ///
+    /// Returns a receiver that will receive all events emitted after this call.
+    /// Useful for components that need to listen for events outside of the
+    /// normal `handle_event` callback flow.
+    pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
+        self.state.event_tx.subscribe()
+    }
+
+    /// Request graceful shutdown of the daemon.
+    ///
+    /// Implemented by emitting `DaemonEvent::ShutdownRequested`.
+    pub fn shutdown(&self) {
+        self.emit(DaemonEvent::ShutdownRequested);
+    }
+
+    // ---- Configuration ----
+
+    /// Get the current settings.
+    ///
+    /// This acquires a read lock on the settings. For most use cases, clone
+    /// the settings if you need to hold onto them.
+    pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> {
+        self.state.settings.read().await
+    }
+
+    /// Reload settings from disk and emit a SettingsReloaded event.
+    ///
+    /// Components listening for `SettingsReloaded` can then re-read settings
+    /// via `handle.settings()` to pick up the changes.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the settings cannot be loaded; the current settings are then
+    /// left in place and no event is emitted.
+    pub async fn reload_settings(&self) -> Result<()> {
+        let new_settings = Settings::new()?;
+        *self.state.settings.write().await = new_settings;
+        self.emit(DaemonEvent::SettingsReloaded);
+        tracing::info!("settings reloaded");
+        Ok(())
+    }
+
+    /// Get the encryption key.
+    pub fn encryption_key(&self) -> &[u8; 32] {
+        &self.state.encryption_key
+    }
+
+    // ---- Database ----
+
+    /// Get a reference to the history database.
+    pub fn history_db(&self) -> &HistoryDatabase {
+        &self.state.history_db
+    }
+
+    /// Get a reference to the record store.
+    pub fn store(&self) -> &SqliteStore {
+        &self.state.store
+    }
+}
+
+impl std::fmt::Debug for DaemonHandle {
+    /// Print only the type name; no field of the shared state (which
+    /// includes the encryption key) is shown.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DaemonHandle").finish_non_exhaustive()
+    }
+}
+
+// ============================================================================
+// Component Trait
+// ============================================================================
+
+/// A daemon component that handles a specific domain.
+///
+/// Components are the building blocks of the daemon.
Each component: +/// +/// - Has a unique name for logging and debugging +/// - Can optionally expose gRPC services +/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources +/// - Handles events from the event bus +/// - Performs cleanup on shutdown +/// +/// # Lifecycle +/// +/// 1. **Construction**: Component is created (usually via `new()`) +/// 2. **Start**: `start()` is called with a [`DaemonHandle`] +/// 3. **Running**: `handle_event()` is called for each event on the bus +/// 4. **Shutdown**: `stop()` is called for cleanup +/// +/// # Example +/// +/// ```ignore +/// pub struct MyComponent { +/// handle: Option, +/// } +/// +/// #[async_trait] +/// impl Component for MyComponent { +/// fn name(&self) -> &'static str { "my-component" } +/// +/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> { +/// self.handle = Some(handle); +/// Ok(()) +/// } +/// +/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { +/// match event { +/// DaemonEvent::SomeEvent => { +/// // Handle the event +/// if let Some(handle) = &self.handle { +/// handle.emit(DaemonEvent::ResponseEvent); +/// } +/// } +/// _ => {} +/// } +/// Ok(()) +/// } +/// +/// async fn stop(&mut self) -> Result<()> { +/// Ok(()) +/// } +/// } +/// ``` +#[tonic::async_trait] +pub trait Component: Send + Sync { + /// Human-readable name for logging and debugging. + fn name(&self) -> &'static str; + + /// Called once at startup. + /// + /// Store the handle if you need to emit events or access daemon resources + /// later. The handle is cheaply cloneable, so feel free to clone it for + /// spawned tasks. + async fn start(&mut self, handle: DaemonHandle) -> Result<()>; + + /// Handle an incoming event. + /// + /// Called for every event on the bus. To emit new events in response, + /// use the handle stored during `start()`. Events emitted here will be + /// processed in subsequent event loop iterations. 
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>; + + /// Called on graceful shutdown. + /// + /// Use this to clean up resources, abort spawned tasks, etc. + async fn stop(&mut self) -> Result<()>; +} + +// ============================================================================ +// Daemon +// ============================================================================ + +/// The main daemon orchestrator. +/// +/// The daemon manages components, runs the event loop, and coordinates startup +/// and shutdown. It is constructed via [`DaemonBuilder`]. +/// +/// # Event Loop +/// +/// The daemon runs a simple event loop: +/// +/// 1. Wait for an event on the bus +/// 2. Dispatch the event to all components (in registration order) +/// 3. Components may emit new events in response +/// 4. Repeat until `ShutdownRequested` is received +/// +/// Events emitted during handling are queued and processed in subsequent +/// iterations, ensuring the loop eventually drains. +pub struct Daemon { + components: Vec>, + handle: DaemonHandle, +} + +impl Daemon { + /// Create a new daemon builder. + pub fn builder(settings: Settings) -> DaemonBuilder { + DaemonBuilder::new(settings) + } + + /// Get a clone of the daemon handle. + /// + /// The handle can be used to emit events, access settings, etc. + pub fn handle(&self) -> DaemonHandle { + self.handle.clone() + } + + /// Start all components. + /// + /// This must be called before `run_event_loop()`. It initializes all + /// registered components with the daemon handle. + pub async fn start_components(&mut self) -> Result<()> { + for component in &mut self.components { + tracing::info!(component = component.name(), "starting component"); + component + .start(self.handle.clone()) + .await + .with_context(|| format!("failed to start component: {}", component.name()))?; + } + Ok(()) + } + + /// Run the daemon event loop. + /// + /// This processes events until a ShutdownRequested event is received. 
+ /// Components must be started first via `start_components()`. + pub async fn run_event_loop(&mut self) -> Result<()> { + let mut event_rx = self.handle.subscribe(); + loop { + match event_rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) => { + tracing::info!("shutdown requested, stopping daemon"); + break; + } + Ok(event) => { + tracing::debug!(?event, "processing event"); + self.dispatch_event(&event).await; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!( + skipped = n, + "event receiver lagged, some events were dropped" + ); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::info!("event bus closed, stopping daemon"); + break; + } + } + } + Ok(()) + } + + /// Stop all components. + /// + /// This performs graceful shutdown of all components. + pub async fn stop_components(&mut self) { + for component in &mut self.components { + tracing::info!(component = component.name(), "stopping component"); + if let Err(e) = component.stop().await { + tracing::error!( + component = component.name(), + error = ?e, + "error stopping component" + ); + } + } + tracing::info!("all components stopped"); + } + + /// Run the daemon. + /// + /// This is a convenience method that starts components, runs the event loop, + /// and handles shutdown. It does not return until the daemon is shut down. 
+ pub async fn run(mut self) -> Result<()> { + self.start_components().await?; + self.run_event_loop().await?; + self.stop_components().await; + tracing::info!("daemon stopped"); + Ok(()) + } + + async fn dispatch_event(&mut self, event: &DaemonEvent) { + for component in &mut self.components { + if let Err(e) = component.handle_event(event).await { + tracing::error!( + component = component.name(), + error = ?e, + "error handling event" + ); + } + } + } +} + +// ============================================================================ +// DaemonBuilder +// ============================================================================ + +/// Builder for constructing a [`Daemon`]. +/// +/// # Example +/// +/// ```ignore +/// let daemon = Daemon::builder(settings) +/// .store(store) +/// .history_db(history_db) +/// .component(HistoryComponent::new()) +/// .component(SearchComponent::new()) +/// .component(SyncComponent::new()) +/// .build() +/// .await?; +/// +/// daemon.run().await?; +/// ``` +pub struct DaemonBuilder { + settings: Settings, + store: Option, + history_db: Option, + components: Vec>, +} + +impl DaemonBuilder { + /// Create a new daemon builder with the given settings. + pub fn new(settings: Settings) -> Self { + Self { + settings, + store: None, + history_db: None, + components: Vec::new(), + } + } + + /// Set the record store. + pub fn store(mut self, store: SqliteStore) -> Self { + self.store = Some(store); + self + } + + /// Set the history database. + pub fn history_db(mut self, db: HistoryDatabase) -> Self { + self.history_db = Some(db); + self + } + + /// Register a component. + /// + /// Components are started, dispatched events, and stopped in registration order. + pub fn component(mut self, component: impl Component + 'static) -> Self { + self.components.push(Box::new(component)); + self + } + + /// Build the daemon. + /// + /// This loads the encryption key and creates the daemon state.
+ pub async fn build(self) -> Result { + let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?; + let history_db = self + .history_db + .ok_or_else(|| eyre::eyre!("history_db is required"))?; + + // Load encryption key + let encryption_key: [u8; 32] = encryption::load_key(&self.settings) + .context("could not load encryption key")? + .into(); + + // Create the event bus + let (event_tx, _) = broadcast::channel(64); + + // Create the shared state + let state = Arc::new(DaemonState { + event_tx, + settings: RwLock::new(self.settings), + encryption_key, + history_db, + store, + }); + + // Create the handle (just a reference to the state) + let handle = DaemonHandle { state }; + + Ok(Daemon { + components: self.components, + handle, + }) + } +} diff --git a/crates/atuin-daemon/src/events.rs b/crates/atuin-daemon/src/events.rs new file mode 100644 index 00000000..4e6c6ff3 --- /dev/null +++ b/crates/atuin-daemon/src/events.rs @@ -0,0 +1,74 @@ +//! Daemon events. +//! +//! Events are the primary communication mechanism within the daemon. +//! Components emit events to notify others of state changes, and handle +//! events to react to changes elsewhere in the system. +//! +//! External processes (like CLI commands) can also inject events via the +//! Control gRPC service. + +use atuin_client::history::{History, HistoryId}; +use atuin_common::record::RecordId; + +/// Events that flow through the daemon's event bus. +/// +/// Events are broadcast to all components. Each component decides which +/// events it cares about in its `handle_event` implementation. +#[derive(Debug, Clone)] +pub enum DaemonEvent { + // ---- History lifecycle ---- + /// A command has started running. + HistoryStarted(History), + + /// A command has finished running. + HistoryEnded(History), + + // ---- Sync ---- + /// Records were synced from the server. + /// + /// The search component uses this to update its index with new history. 
+ RecordsAdded(Vec), + + /// Sync completed successfully. + SyncCompleted { + /// Number of records uploaded. + uploaded: usize, + /// Number of records downloaded. + downloaded: usize, + }, + + /// Sync failed. + SyncFailed { + /// Error message describing what went wrong. + error: String, + }, + + /// Request an immediate sync (external trigger). + ForceSync, + + // ---- External commands ---- + /// History was pruned - search index needs a full rebuild. + /// + /// Emitted when the user runs `atuin history prune` or similar. + HistoryPruned, + + /// History was rebuilt - search index needs a full rebuild. + /// + /// Emitted when the user runs `atuin store rebuild history` or similar. + HistoryRebuilt, + + /// Specific history items were deleted. + /// + /// The search component should remove these from its index. + HistoryDeleted { + /// IDs of the deleted history entries. + ids: Vec, + }, + + /// Settings have changed, components should reload if needed. + SettingsReloaded, + + // ---- Lifecycle ---- + /// Request graceful shutdown of the daemon. + ShutdownRequested, +} diff --git a/crates/atuin-daemon/src/history.rs b/crates/atuin-daemon/src/history.rs deleted file mode 100644 index 57f5b2cf..00000000 --- a/crates/atuin-daemon/src/history.rs +++ /dev/null @@ -1 +0,0 @@ -tonic::include_proto!("history"); diff --git a/crates/atuin-daemon/src/history/mod.rs b/crates/atuin-daemon/src/history/mod.rs new file mode 100644 index 00000000..b71853df --- /dev/null +++ b/crates/atuin-daemon/src/history/mod.rs @@ -0,0 +1,6 @@ +//! History module for the daemon gRPC history service. +//! +//! This module contains the proto-generated types for the history gRPC service. 
+ +// Include the generated proto code +tonic::include_proto!("history"); diff --git a/crates/atuin-daemon/src/lib.rs b/crates/atuin-daemon/src/lib.rs index e00060bc..6dc04db3 100644 --- a/crates/atuin-daemon/src/lib.rs +++ b/crates/atuin-daemon/src/lib.rs @@ -1,3 +1,110 @@ +use atuin_client::database::Sqlite as HistoryDatabase; +use atuin_client::{record::sqlite_store::SqliteStore, settings::Settings}; +use eyre::Result; + pub mod client; +pub mod components; +pub mod control; +pub mod daemon; +pub mod events; pub mod history; +pub mod search; pub mod server; + +// Re-export core daemon types for convenience +pub use daemon::{Component, Daemon, DaemonBuilder, DaemonHandle}; +pub use events::DaemonEvent; + +// Re-export components +pub use components::{HistoryComponent, SearchComponent, SyncComponent}; + +// Re-export client helpers +pub use client::{ControlClient, emit_event, emit_event_with_settings}; + +/// Boot the daemon using the new component-based architecture. +/// +/// This creates a daemon with the standard components (history, search, sync), +/// starts the gRPC server with their services, and runs the event loop. 
+pub async fn boot( + settings: Settings, + store: SqliteStore, + history_db: HistoryDatabase, +) -> Result<()> { + // Create the components + let history_component = HistoryComponent::new(); + let search_component = SearchComponent::new(); + let sync_component = SyncComponent::new(); + + // Get the gRPC services before moving components into the daemon + // (The services share state with the components via Arc) + let history_service = history_component.grpc_service(); + let search_service = search_component.grpc_service(); + + // Build the daemon + let mut daemon = Daemon::builder(settings.clone()) + .store(store) + .history_db(history_db) + .component(history_component) + .component(search_component) + .component(sync_component) + .build() + .await?; + + // Get a handle for the control service and gRPC server shutdown + let handle = daemon.handle(); + + // Create the control service + let control_service = control::ControlService::new(handle.clone()); + + // Start all components first (so gRPC services can work) + daemon.start_components().await?; + + // Spawn signal handler to emit ShutdownRequested on Ctrl+C/SIGTERM + let signal_handle = handle.clone(); + tokio::spawn(async move { + shutdown_signal().await; + tracing::info!("received shutdown signal"); + signal_handle.shutdown(); + }); + + // Start the gRPC server in the background + server::run_grpc_server( + settings, + history_service, + search_service, + control_service.into_server(), + handle, + ) + .await?; + + // Run the daemon event loop + daemon.run_event_loop().await?; + + // Stop all components on shutdown + daemon.stop_components().await; + + tracing::info!("daemon shut down complete"); + Ok(()) +} + +/// Wait for a shutdown signal (Ctrl+C or SIGTERM). 
+#[cfg(unix)] +async fn shutdown_signal() { + let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to register sigterm handler"); + let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) + .expect("failed to register sigint handler"); + + tokio::select! { + _ = term.recv() => {}, + _ = int.recv() => {}, + } +} + +/// Wait for a shutdown signal (Ctrl+C). +#[cfg(not(unix))] +async fn shutdown_signal() { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl+c"); +} diff --git a/crates/atuin-daemon/src/search/index.rs b/crates/atuin-daemon/src/search/index.rs new file mode 100644 index 00000000..b15b057f --- /dev/null +++ b/crates/atuin-daemon/src/search/index.rs @@ -0,0 +1,572 @@ +//! Search index with frecency-based ranking. +//! +//! This module provides a deduplicated search index where each unique command +//! is stored once, with metadata about all its invocations. This enables: +//! +//! - Efficient fuzzy matching (fewer items to match) +//! - Frecency-based ranking (frequency + recency) +//! - Dynamic filtering by directory, host, session, etc. + +use std::{collections::HashMap, sync::Arc}; + +use atuin_client::history::History; +use dashmap::{DashMap, DashSet}; +use nucleo::{Injector, Nucleo, pattern}; +use time::OffsetDateTime; +use tokio::sync::RwLock; +use tracing::{Level, instrument}; + +use crate::components::search::with_trailing_slash; + +/// Data for a single invocation of a command. +#[derive(Debug, Clone)] +pub struct Invocation { + /// When the command was run. + pub timestamp: i64, + /// The working directory when the command was run. + #[allow(dead_code)] + pub cwd: String, + /// The hostname where the command was run. + #[allow(dead_code)] + pub hostname: String, + /// The session ID. + #[allow(dead_code)] + pub session: String, + /// The history entry ID (for returning in search results). 
+ pub history_id: String, +} + +impl From<&History> for Invocation { + fn from(history: &History) -> Self { + Self { + timestamp: history.timestamp.unix_timestamp(), + cwd: history.cwd.clone(), + hostname: history.hostname.clone(), + session: history.session.clone(), + history_id: history.id.0.clone(), + } + } +} + +/// Pre-computed frecency data for O(1) lookup. +#[derive(Debug, Clone, Default)] +pub struct FrecencyData { + /// Total number of times this command was used. + pub count: u32, + /// Most recent usage timestamp (unix seconds). + pub last_used: i64, +} + +impl FrecencyData { + /// Record a new usage of this command. + pub fn record_use(&mut self, timestamp: i64) { + self.count += 1; + if timestamp > self.last_used { + self.last_used = timestamp; + } + } + + /// Compute frecency score based on count and recency. + /// + /// Uses a decay function where more recent commands score higher. + /// The formula balances frequency (how often) with recency (how recent). + #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")] + pub fn compute(&self, now: i64) -> u32 { + if self.count == 0 { + return 0; + } + + // Time-based decay: score decreases as time passes + let age_seconds = (now - self.last_used).max(0) as u64; + let age_hours = age_seconds / 3600; + + // Stepped recency score (added to the frequency score below): + // - Last hour: 100 + // - Last day: 90 down to 70 + // - Last week: 50 down to 30 + // - Older: 15 up to 30 days, then a floor of 5 + let recency_score = match age_hours { + 0 => 100, + 1..=6 => 90, + 7..=24 => 70, + 25..=72 => 50, + 73..=168 => 30, + 169..=720 => 15, + _ => 5, + }; + + // Frequency boost: more uses = higher score (with diminishing returns) + let frequency_score = ((self.count as f64).ln() * 20.0).min(100.0) as u32; + + // Combined score + recency_score + frequency_score + } +} + +/// Data for a unique command, including all its invocations.
+pub struct CommandData { + /// The command text (stored for debugging/logging purposes). + #[allow(dead_code)] + pub command: String, + /// All invocations of this command, sorted by timestamp (newest first). + pub invocations: Vec, + /// Pre-computed global frecency. + pub global_frecency: FrecencyData, + + // Pre-computed indexes for O(1) filter lookups + /// All directories where this command has been run. + directories: DashSet, + /// All hostnames where this command has been run. + hosts: DashSet, + /// All sessions where this command has been run. + sessions: DashSet, +} + +impl CommandData { + /// Create a new CommandData from a history entry. + pub fn new(history: &History) -> Self { + let mut data = Self { + command: history.command.clone(), + invocations: Vec::new(), + global_frecency: FrecencyData::default(), + directories: DashSet::new(), + hosts: DashSet::new(), + sessions: DashSet::new(), + }; + data.add_invocation(history); + data + } + + /// Add an invocation from a history entry. + pub fn add_invocation(&mut self, history: &History) { + let timestamp = history.timestamp.unix_timestamp(); + + // Update global frecency + self.global_frecency.record_use(timestamp); + + // Update pre-computed indexes for O(1) filter lookups + self.directories.insert(with_trailing_slash(&history.cwd)); + self.hosts.insert(history.hostname.clone()); + self.sessions.insert(history.session.clone()); + + let invocation = Invocation::from(history); + + // Insert sorted by timestamp (newest first) + let pos = self + .invocations + .iter() + .position(|inv| inv.timestamp < timestamp) + .unwrap_or(self.invocations.len()); + self.invocations.insert(pos, invocation); + } + + /// Get the most recent history ID for this command. + pub fn most_recent_id(&self) -> Option<&str> { + self.invocations.first().map(|inv| inv.history_id.as_str()) + } + + /// Check if any invocation matches a directory filter (exact match). + /// O(1) lookup using pre-computed index. 
+ pub fn has_invocation_in_dir(&self, dir: &str) -> bool { + self.directories.contains(dir) + } + + /// Check if any invocation matches a directory prefix (workspace/git root). + /// O(n) where n = number of unique directories for this command. + pub fn has_invocation_in_workspace(&self, prefix: &str) -> bool { + self.directories.iter().any(|d| d.starts_with(prefix)) + } + + /// Check if any invocation matches a hostname. + /// O(1) lookup using pre-computed index. + pub fn has_invocation_on_host(&self, hostname: &str) -> bool { + self.hosts.contains(hostname) + } + + /// Check if any invocation matches a session. + /// O(1) lookup using pre-computed index. + pub fn has_invocation_in_session(&self, session: &str) -> bool { + self.sessions.contains(session) + } +} + +/// Filter mode for search queries. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum IndexFilterMode { + /// No filtering - search all commands. + Global, + /// Filter to commands run in a specific directory. + Directory(String), + /// Filter to commands run in a workspace (directory prefix). + Workspace(String), + /// Filter to commands run on a specific host. + Host(String), + /// Filter to commands run in a specific session. + Session(String), +} + +/// Context for search queries. +#[derive(Debug, Clone, Default)] +pub struct QueryContext { + pub cwd: Option, + pub git_root: Option, + pub hostname: Option, + pub session_id: Option, +} + +/// A deduplicated search index with frecency-based ranking. +/// +/// Commands are stored by their text, with metadata about all invocations. +/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback. +/// +/// Global frecency is precomputed by a background task and used for scoring. +/// Until the first rebuild has run, no frecency data is available; search still +/// works in that state, just without frecency ranking. +pub struct SearchIndex { + /// Map from command text to command data.
+ /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer. + commands: Arc>, + /// Nucleo fuzzy matcher - items are command strings. + nucleo: RwLock>, + /// Injector for adding new commands to Nucleo. + injector: Injector, + /// Precomputed global frecency map (command -> frecency score). + /// Updated by background task. If None, search works without frecency. + frecency_map: RwLock>>>, +} + +impl SearchIndex { + /// Create a new empty search index. + pub fn new() -> Self { + let nucleo_config = nucleo::Config::DEFAULT; + // Single column for command text + let nucleo = Nucleo::::new(nucleo_config, Arc::new(|| {}), None, 1); + let injector = nucleo.injector(); + + Self { + commands: Arc::new(DashMap::new()), + nucleo: RwLock::new(nucleo), + injector, + frecency_map: RwLock::new(None), + } + } + + /// Add a history entry to the index. + /// + /// If the command already exists, updates its invocation data. + /// If it's a new command, adds it to both the map and Nucleo. + pub fn add_history(&self, history: &History) { + let command = &history.command; + + if let Some(mut entry) = self.commands.get_mut(command) { + // Existing command - just update invocations + entry.add_invocation(history); + } else { + // New command - add to both map and Nucleo + let data = CommandData::new(history); + self.commands.insert(command.clone(), data); + self.injector.push(command.clone(), |cmd, cols| { + cols[0] = cmd.clone().into(); + }); + } + // Note: frecency_map is rebuilt by background task, not invalidated here + } + + /// Add multiple history entries to the index. + pub fn add_histories(&self, histories: &[History]) { + for history in histories { + self.add_history(history); + } + } + + /// Get the number of unique commands in the index. + pub fn command_count(&self) -> usize { + self.commands.len() + } + + /// Get the number of items in Nucleo (should match command_count). 
+ pub async fn nucleo_item_count(&self) -> u32 { + self.nucleo.read().await.snapshot().item_count() + } + + /// Search for commands matching a query. + /// + /// Returns a list of history IDs (most recent invocation per command). + /// Uses precomputed global frecency for scoring if available. + #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))] + pub async fn search( + &self, + query: &str, + filter_mode: IndexFilterMode, + _context: &QueryContext, + limit: u32, + ) -> Vec { + let mut nucleo = self.nucleo.write().await; + + // Get precomputed frecency map (may be None if not yet computed) + let frecency_map = self.frecency_map.read().await.clone(); + + // Build filter based on mode + let filter = self.build_filter(&filter_mode); + nucleo.set_filter(filter); + + // Build scorer from precomputed frecency (or None if not available) + let scorer = Self::build_scorer(frecency_map); + nucleo.set_scorer(scorer); + + // Update pattern + nucleo.pattern.reparse( + 0, + query, + pattern::CaseMatching::Smart, + pattern::Normalization::Smart, + false, + ); + + tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| { + // Tick until complete + while nucleo.tick(10).running {} + }); + + // Collect results + let snapshot = nucleo.snapshot(); + let matched_count = snapshot.matched_item_count().min(limit); + + tracing::span!(Level::TRACE, "index_search_results").in_scope(|| { + snapshot + .matched_items(..matched_count) + .filter_map(|item| { + let cmd = item.data; + self.commands + .get(cmd) + .and_then(|data| data.most_recent_id().map(|s| s.to_string())) + }) + .collect() + }) + } + + /// Rebuild the global frecency map. + /// + /// This should be called by a background task periodically. + /// The map is used for scoring search results. 
+ #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")] + pub async fn rebuild_frecency(&self) { + let now = OffsetDateTime::now_utc().unix_timestamp(); + let mut frecency_map: HashMap = HashMap::new(); + + for entry in self.commands.iter() { + let frecency = entry.global_frecency.compute(now); + frecency_map.insert(entry.key().clone(), frecency); + } + + *self.frecency_map.write().await = Some(Arc::new(frecency_map)); + } + + /// Build filter predicate for the given mode. + fn build_filter(&self, mode: &IndexFilterMode) -> Option> { + // For Global mode, no filter needed + if matches!(mode, IndexFilterMode::Global) { + return None; + } + + // Pre-compute which commands pass the filter + let passing_commands: Arc> = { + let mut set = std::collections::HashSet::new(); + for entry in self.commands.iter() { + let passes = match mode { + IndexFilterMode::Global => unreachable!(), + IndexFilterMode::Directory(dir) => entry.has_invocation_in_dir(dir), + IndexFilterMode::Workspace(prefix) => entry.has_invocation_in_workspace(prefix), + IndexFilterMode::Host(hostname) => entry.has_invocation_on_host(hostname), + IndexFilterMode::Session(session) => entry.has_invocation_in_session(session), + }; + if passes { + set.insert(entry.key().clone()); + } + } + Arc::new(set) + }; + + Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd))) + } + + /// Build scorer from precomputed frecency map. + /// + /// Returns None if frecency map is not available (search still works, just without frecency ranking). 
+ fn build_scorer( + frecency_map: Option>>, + ) -> Option> { + let map = frecency_map?; + Some(Arc::new(move |cmd: &String, fuzzy_score: u32| { + let frecency = map.get(cmd).copied().unwrap_or(0); + fuzzy_score + frecency + })) + } +} + +impl Default for SearchIndex { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use time::macros::datetime; + + fn make_history(command: &str, cwd: &str, timestamp: OffsetDateTime) -> History { + History::import() + .timestamp(timestamp) + .command(command) + .cwd(cwd) + .build() + .into() + } + + #[test] + fn frecency_data_compute() { + let now = 1000000i64; + + // Recent command + let recent = FrecencyData { + count: 5, + last_used: now - 60, // 1 minute ago + }; + assert!(recent.compute(now) > 100); // High score + + // Old command + let old = FrecencyData { + count: 5, + last_used: now - 86400 * 30, // 30 days ago + }; + assert!(old.compute(now) < recent.compute(now)); + + // Frequently used old command + let frequent_old = FrecencyData { + count: 100, + last_used: now - 86400 * 7, // 1 week ago + }; + // Should still have decent score due to frequency + assert!(frequent_old.compute(now) > 50); + } + + #[test] + fn command_data_add_invocation() { + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&history1); + assert_eq!(data.invocations.len(), 1); + assert_eq!(data.global_frecency.count, 1); + + data.add_invocation(&history2); + assert_eq!(data.invocations.len(), 2); + assert_eq!(data.global_frecency.count, 2); + + // Most recent should be first + assert_eq!(data.invocations[0].cwd, dir2); + assert_eq!(data.invocations[1].cwd, dir1); + } + + #[test] + fn 
command_data_filters() { + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&h1); + data.add_invocation(&h2); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User\\project"), + with_trailing_slash("C:\\Users\\User\\other"), + with_trailing_slash("C:\\Users\\User\\missing"), + ) + } else { + ( + with_trailing_slash("/home/user/project"), + with_trailing_slash("/home/user/other"), + with_trailing_slash("/home/user/missing"), + ) + }; + + assert!(data.has_invocation_in_dir(&check1)); + assert!(data.has_invocation_in_dir(&check2)); + assert!(!data.has_invocation_in_dir(&check3)); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User"), + with_trailing_slash("C:\\Users"), + with_trailing_slash("C:\\Users\\User\\var"), + ) + } else { + ( + with_trailing_slash("/home/user"), + with_trailing_slash("/home"), + with_trailing_slash("/var"), + ) + }; + + assert!(data.has_invocation_in_workspace(&check1)); + assert!(data.has_invocation_in_workspace(&check2)); + assert!(!data.has_invocation_in_workspace(&check3)); + } + + #[tokio::test] + async fn search_index_add_and_search() { + let index = SearchIndex::new(); + + let h1 = make_history( + "git status", + "/home/user/project", + datetime!(2024-01-01 10:00 UTC), + ); + let h2 = make_history( + "git commit -m 'test'", + "/home/user/project", + datetime!(2024-01-01 10:05 UTC), + ); + let h3 = make_history( + "ls -la", + "/home/user/other", + datetime!(2024-01-01 10:10 UTC), + ); + + index.add_history(&h1); + index.add_history(&h2); + index.add_history(&h3); + + assert_eq!(index.command_count(), 3); + + // Search for "git" - should match 2 commands + 
let results = index + .search("git", IndexFilterMode::Global, &QueryContext::default(), 10) + .await; + assert_eq!(results.len(), 2); + + // Search with directory filter + let results = index + .search( + "", + IndexFilterMode::Directory(with_trailing_slash("/home/user/project")), + &QueryContext::default(), + 10, + ) + .await; + assert_eq!(results.len(), 2); // git status and git commit + } +} diff --git a/crates/atuin-daemon/src/search/mod.rs b/crates/atuin-daemon/src/search/mod.rs new file mode 100644 index 00000000..4d261956 --- /dev/null +++ b/crates/atuin-daemon/src/search/mod.rs @@ -0,0 +1,11 @@ +//! Search module for the daemon gRPC search service. +//! +//! This module provides fuzzy search over command history using Nucleo. + +mod index; + +// Include the generated proto code +tonic::include_proto!("search"); + +// Re-export the service and index +pub use index::{IndexFilterMode, QueryContext, SearchIndex}; diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs index 826d6191..a11de612 100644 --- a/crates/atuin-daemon/src/server.rs +++ b/crates/atuin-daemon/src/server.rs @@ -1,249 +1,49 @@ -use eyre::WrapErr; - -use atuin_client::encryption; -use atuin_client::history::store::HistoryStore; -use atuin_client::record::sqlite_store::SqliteStore; -use atuin_client::settings::Settings; -use std::io::ErrorKind; -#[cfg(unix)] -use std::path::PathBuf; -use std::sync::Arc; -use time::OffsetDateTime; -use tokio::sync::watch; -use tracing::{Level, instrument}; - -use atuin_client::database::{Database, Sqlite as HistoryDatabase}; -use atuin_client::history::{History, HistoryId}; -use dashmap::DashMap; use eyre::Result; -use tonic::{Request, Response, Status, transport::Server}; - -use crate::history::history_server::{History as HistorySvc, HistoryServer}; - -use crate::history::{EndHistoryReply, EndHistoryRequest, StartHistoryReply, StartHistoryRequest}; -use crate::history::{ShutdownReply, ShutdownRequest, StatusReply, StatusRequest}; - 
-mod sync; - -const DAEMON_PROTOCOL_VERSION: u32 = 1; - -#[derive(Debug)] -pub struct HistoryService { - // A store for WIP history - // This is history that has not yet been completed, aka a command that's current running. - running: Arc>, - store: HistoryStore, - history_db: HistoryDatabase, - shutdown_tx: watch::Sender, -} - -impl HistoryService { - pub fn new( - store: HistoryStore, - history_db: HistoryDatabase, - shutdown_tx: watch::Sender, - ) -> Self { - Self { - running: Arc::new(DashMap::new()), - store, - history_db, - shutdown_tx, - } - } -} - -#[tonic::async_trait()] -impl HistorySvc for HistoryService { - #[instrument(skip_all, level = Level::INFO)] - async fn start_history( - &self, - request: Request, - ) -> Result, Status> { - let running = self.running.clone(); - let req = request.into_inner(); - - let timestamp = - OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { - Status::invalid_argument( - "failed to parse timestamp as unix time (expected nanos since epoch)", - ) - })?; - - let mut h: History = History::daemon() - .timestamp(timestamp) - .command(req.command) - .cwd(req.cwd) - .session(req.session) - .hostname(req.hostname) - .build() - .into(); - if !req.author.trim().is_empty() { - h.author = req.author; - } - if !req.intent.trim().is_empty() { - h.intent = Some(req.intent); - } - - // The old behaviour had us inserting half-finished history records into the database - // The new behaviour no longer allows that. - // History that's running is stored in-memory by the daemon, and only committed when - // complete. - // If anyone relied on the old behaviour, we could perhaps insert to the history db here - // too. I'd rather keep it pure, unless that ends up being the case. 
- let id = h.id.clone(); - tracing::info!(id = id.to_string(), "start history"); - running.insert(id.clone(), h); - - let reply = StartHistoryReply { - id: id.to_string(), - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn end_history( - &self, - request: Request, - ) -> Result, Status> { - let running = self.running.clone(); - let req = request.into_inner(); - - let id = HistoryId(req.id); - - if let Some((_, mut history)) = running.remove(&id) { - history.exit = req.exit; - history.duration = match req.duration { - 0 => i64::try_from( - (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), - ) - .expect("failed to convert calculated duration to i64"), - value => i64::try_from(value).expect("failed to get i64 duration"), - }; - - // Perhaps allow the incremental build to handle this entirely. - self.history_db - .save(&history) - .await - .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; - - tracing::info!( - id = id.0.to_string(), - duration = history.duration, - "end history" - ); - - let (id, idx) = - self.store.push(history).await.map_err(|e| { - Status::internal(format!("failed to push record to store: {e:?}")) - })?; - - let reply = EndHistoryReply { - id: id.0.to_string(), - idx, - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - return Ok(Response::new(reply)); - } - - Err(Status::not_found(format!( - "could not find history with id: {id}" - ))) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn status( - &self, - _request: Request, - ) -> Result, Status> { - let reply = StatusReply { - // If status RPC responds, the daemon control plane is healthy. 
- healthy: true, - version: env!("CARGO_PKG_VERSION").to_string(), - pid: std::process::id(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - #[instrument(skip_all, level = Level::INFO)] - async fn shutdown( - &self, - _request: Request, - ) -> Result, Status> { - let _ = self.shutdown_tx.send(true); - Ok(Response::new(ShutdownReply { accepted: true })) - } -} - -#[cfg(unix)] -async fn shutdown_signal(socket: Option, mut shutdown_rx: watch::Receiver) { - let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .expect("failed to register sigterm handler"); - let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) - .expect("failed to register sigint handler"); +use crate::components::history::HistoryGrpcService; +use crate::components::search::SearchGrpcService; +use crate::control::{ControlService, control_server::ControlServer}; +use crate::daemon::DaemonHandle; +use crate::history::history_server::HistoryServer; +use crate::search::search_server::SearchServer; - tokio::select! { - _ = term.recv() => {}, - _ = int.recv() => {}, - _ = shutdown_rx.changed() => {}, - } - - eprintln!("Removing socket..."); - if let Some(socket) = socket { - match std::fs::remove_file(socket) { - Ok(()) => {} - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => { - eprintln!("failed to remove socket: {err}"); - } - } - } - eprintln!("Shutting down..."); -} - -#[cfg(windows)] -async fn shutdown_signal(mut shutdown_rx: watch::Receiver) { - let mut ctrl_c = tokio::signal::windows::ctrl_c().expect("failed to register signal handler"); - tokio::select! { - _ = ctrl_c.recv() => {}, - _ = shutdown_rx.changed() => {}, - } - eprintln!("Shutting down..."); -} +use atuin_client::settings::Settings; +/// Run the gRPC server with the given services. +/// +/// This starts the gRPC server in the background and returns immediately. 
+/// The server will shut down when a ShutdownRequested event is received. #[cfg(unix)] -async fn start_server( +pub async fn run_grpc_server( settings: Settings, - history: HistoryService, - shutdown_rx: watch::Receiver<bool>, + history_service: HistoryServer<HistoryGrpcService>, + search_service: SearchServer<SearchGrpcService>, + control_service: ControlServer<ControlService>, + handle: DaemonHandle, ) -> Result<()> { use tokio::net::UnixListener; use tokio_stream::wrappers::UnixListenerStream; - let socket_path = settings.daemon.socket_path; + let socket_path = settings.daemon.socket_path.clone(); let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket { #[cfg(target_os = "linux")] { - use eyre::OptionExt; + use eyre::{OptionExt, WrapErr}; + use std::os::unix::net::SocketAddr; + use std::path::PathBuf; tracing::info!("getting systemd socket"); let listener = listenfd::ListenFd::from_env() .take_unix_listener(0)? .ok_or_eyre("missing systemd socket")?; listener.set_nonblocking(true)?; - let actual_path = listener + let actual_path: Result<PathBuf> = listener .local_addr() .context("getting systemd socket's path") - .and_then(|addr| { + .and_then(|addr: SocketAddr| { addr.as_pathname() .ok_or_eyre("systemd socket missing path") - .map(|path| path.to_owned()) + .map(|path: &std::path::Path| path.to_owned()) }); match actual_path { Ok(actual_path) => { @@ -271,66 +71,94 @@ async fn start_server( let uds_stream = UnixListenerStream::new(uds); - Server::builder() - .add_service(HistoryServer::new(history)) - .serve_with_incoming_shutdown( - uds_stream, - shutdown_signal(cleanup.then_some(socket_path.into()), shutdown_rx), - ) - .await?; + // Subscribe now, before the server task is spawned, so that a shutdown + // request emitted in the meantime is not missed + let mut rx = handle.subscribe(); + let shutdown_signal = async move { + use crate::DaemonEvent; + use tokio::sync::broadcast::error::RecvError; + loop { + match rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) | Err(RecvError::Closed) => break, + Ok(_) | Err(RecvError::Lagged(_)) => continue, // lag only drops events + } + } + if cleanup { + eprintln!("Removing socket..."); + if let Err(e) = 
std::fs::remove_file(&socket_path) + && e.kind() != std::io::ErrorKind::NotFound + { + eprintln!("failed to remove socket: {e}"); + } + } + eprintln!("Shutting down gRPC server..."); + }; + + // Spawn the server in the background + tokio::spawn(async move { + use tonic::transport::Server; + + if let Err(e) = Server::builder() + .add_service(history_service) + .add_service(search_service) + .add_service(control_service) + .serve_with_incoming_shutdown(uds_stream, shutdown_signal) + .await + { + tracing::error!("gRPC server error: {e}"); + } + }); Ok(()) } +/// Run the gRPC server with the given services (Windows/TCP version). #[cfg(not(unix))] -async fn start_server( +pub async fn run_grpc_server( settings: Settings, - history: HistoryService, - shutdown_rx: watch::Receiver, + history_service: HistoryServer, + search_service: SearchServer, + control_service: ControlServer, + handle: DaemonHandle, ) -> Result<()> { use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; + use tonic::transport::Server; let port = settings.daemon.tcp_port; let url = format!("127.0.0.1:{port}"); - let tcp = TcpListener::bind(url).await?; + let tcp = TcpListener::bind(&url).await?; let tcp_stream = TcpListenerStream::new(tcp); tracing::info!("listening on tcp port {:?}", port); - Server::builder() - .add_service(HistoryServer::new(history)) - .serve_with_incoming_shutdown(tcp_stream, shutdown_signal(shutdown_rx)) - .await?; - Ok(()) -} - -// break the above down when we end up with multiple services - -/// Listen on a unix socket -/// Pass the path to the socket -pub async fn listen( - settings: Settings, - store: SqliteStore, - history_db: HistoryDatabase, -) -> Result<()> { - let encryption_key: [u8; 32] = encryption::load_key(&settings) - .context("could not load encryption key")? 
- .into(); - - let host_id = Settings::host_id().await?; - let history_store = HistoryStore::new(store.clone(), host_id, encryption_key); + // Subscribe now, before the server task is spawned, so that a shutdown + // request emitted in the meantime is not missed + let mut rx = handle.subscribe(); - let (shutdown_tx, shutdown_rx) = watch::channel(false); - let history = HistoryService::new(history_store.clone(), history_db.clone(), shutdown_tx); + let shutdown_signal = async move { + use tokio::sync::broadcast::error::RecvError; + loop { + match rx.recv().await { + Ok(crate::DaemonEvent::ShutdownRequested) | Err(RecvError::Closed) => break, + Ok(_) | Err(RecvError::Lagged(_)) => continue, // lag only drops events + } + } + eprintln!("Shutting down gRPC server..."); + }; - // start services - tokio::spawn(sync::worker( - settings.clone(), - store, - history_store, - history_db, - )); + // Spawn the server in the background + tokio::spawn(async move { + if let Err(e) = Server::builder() + .add_service(history_service) + .add_service(search_service) + .add_service(control_service) + .serve_with_incoming_shutdown(tcp_stream, shutdown_signal) + .await + { + tracing::error!("gRPC server error: {e}"); + } + }); - start_server(settings, history, shutdown_rx).await + Ok(()) } diff --git a/crates/atuin-daemon/src/server/sync.rs b/crates/atuin-daemon/src/server/sync.rs deleted file mode 100644 index e1e49597..00000000 --- a/crates/atuin-daemon/src/server/sync.rs +++ /dev/null @@ -1,96 +0,0 @@ -use eyre::Result; -use rand::Rng; -use tokio::time::{self, MissedTickBehavior}; - -use atuin_client::database::Sqlite as HistoryDatabase; -use atuin_client::{ - encryption, - history::store::HistoryStore, - record::{sqlite_store::SqliteStore, sync}, - settings::Settings, -}; - -use atuin_dotfiles::store::{AliasStore, var::VarStore}; - -pub async fn worker( - settings: Settings, - store: SqliteStore, - history_store: HistoryStore, - history_db: HistoryDatabase, -) -> Result<()> { - tracing::info!("booting sync worker"); - - let encryption_key: [u8; 32] = encryption::load_key(&settings)?.into(); - let host_id = 
Settings::host_id().await?; - let alias_store = AliasStore::new(store.clone(), host_id, encryption_key); - let var_store = VarStore::new(store.clone(), host_id, encryption_key); - - // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) - let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); - - let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); - - // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, - // we may end up running a lot of syncs in a hot loop. No bueno! - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - ticker.tick().await; - tracing::info!("sync worker tick"); - - let logged_in = match settings.logged_in().await { - Ok(v) => v, - Err(e) => { - tracing::warn!("failed to check login status, skipping sync tick: {e}"); - continue; - } - }; - - if !logged_in { - tracing::debug!("not logged in, skipping sync tick"); - continue; - } - - let res = sync::sync(&settings, &store).await; - - if let Err(e) = res { - tracing::error!("sync tick failed with {e}"); - - let mut rng = rand::thread_rng(); - - let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); - - if new_interval > max_interval { - new_interval = max_interval; - } - - ticker = time::interval(time::Duration::from_secs(new_interval as u64)); - ticker.reset_after(time::Duration::from_secs(new_interval as u64)); - - tracing::error!("backing off, next sync tick in {new_interval}"); - } else { - let (uploaded, downloaded) = res.unwrap(); - - tracing::info!( - uploaded = ?uploaded, - downloaded = ?downloaded, - "sync complete" - ); - - history_store - .incremental_build(&history_db, &downloaded) - .await?; - - alias_store.build().await?; - var_store.build().await?; - - // Reset backoff on success - if ticker.period().as_secs() != settings.daemon.sync_frequency { - ticker = 
time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); - } - - // store sync time - Settings::save_sync_time().await?; - } - } -} diff --git a/crates/atuin-daemon/tests/lifecycle.rs b/crates/atuin-daemon/tests/lifecycle.rs index 56457fa7..3b6952de 100644 --- a/crates/atuin-daemon/tests/lifecycle.rs +++ b/crates/atuin-daemon/tests/lifecycle.rs @@ -8,52 +8,97 @@ mod unix { use std::time::Duration; use atuin_client::database::Sqlite; - use atuin_client::history::store::HistoryStore; use atuin_client::record::sqlite_store::SqliteStore; - use atuin_common::record::HostId; - use atuin_common::utils::uuid_v7; + use atuin_client::settings::{Settings, init_meta_config_for_testing}; use atuin_daemon::client::HistoryClient; - use atuin_daemon::history::history_server::HistoryServer; - use atuin_daemon::server::HistoryService; + use atuin_daemon::components::HistoryComponent; + use atuin_daemon::{Daemon, DaemonHandle}; use tempfile::TempDir; use tokio::net::UnixListener; - use tokio::sync::watch; use tokio_stream::wrappers::UnixListenerStream; use tonic::transport::Server; /// Spins up a daemon server on a temp socket and returns a connected client, - /// the shutdown sender, and the temp dir (must be held to keep paths alive). - async fn start_test_daemon() -> (HistoryClient, watch::Sender, TempDir) { + /// the daemon handle (for shutdown), and the temp dir (must be held to keep paths alive). 
+ async fn start_test_daemon() -> (HistoryClient, DaemonHandle, TempDir) { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("history.db"); let record_path = tmp.path().join("records.db"); + let key_path = tmp.path().join("key"); + let socket_path = tmp.path().join("test.sock"); + let meta_path = tmp.path().join("meta.db"); + + // Initialize the meta store config for testing (required for Settings::host_id()) + init_meta_config_for_testing(meta_path.to_str().unwrap(), 5.0); + + // Build settings with test paths + let settings: Settings = Settings::builder() + .expect("could not build settings builder") + .set_override("db_path", db_path.to_str().unwrap()) + .expect("failed to set db_path") + .set_override("record_store_path", record_path.to_str().unwrap()) + .expect("failed to set record_store_path") + .set_override("key_path", key_path.to_str().unwrap()) + .expect("failed to set key_path") + .set_override("daemon.socket_path", socket_path.to_str().unwrap()) + .expect("failed to set socket_path") + .set_override("meta.db_path", meta_path.to_str().unwrap()) + .expect("failed to set meta.db_path") + .build() + .expect("could not build settings") + .try_deserialize() + .expect("could not deserialize settings"); + // Create databases let history_db = Sqlite::new(&db_path, 5.0).await.unwrap(); let store = SqliteStore::new(&record_path, 5.0).await.unwrap(); - let host_id = HostId(uuid_v7()); - let encryption_key = [0u8; 32]; - let history_store = HistoryStore::new(store, host_id, encryption_key); + // Create the history component and get its gRPC service + let history_component = HistoryComponent::new(); + let history_service = history_component.grpc_service(); - let (shutdown_tx, shutdown_rx) = watch::channel(false); - let service = HistoryService::new(history_store, history_db, shutdown_tx.clone()); + // Build and start the daemon + let mut daemon = Daemon::builder(settings) + .store(store) + .history_db(history_db) + .component(history_component) + 
.build() + .await + .unwrap(); - let socket_path = tmp.path().join("test.sock"); + let handle = daemon.handle(); + + // Start components (this initializes the history component with the handle) + daemon.start_components().await.unwrap(); + + // Start the gRPC server let uds = UnixListener::bind(&socket_path).unwrap(); let stream = UnixListenerStream::new(uds); - let mut rx = shutdown_rx.clone(); + let server_handle = handle.clone(); tokio::spawn(async move { + let mut rx = server_handle.subscribe(); Server::builder() - .add_service(HistoryServer::new(service)) + .add_service(history_service) .serve_with_incoming_shutdown(stream, async move { - let _ = rx.changed().await; + loop { + match rx.recv().await { + Ok(atuin_daemon::DaemonEvent::ShutdownRequested) => break, + Ok(_) => continue, + Err(_) => break, + } + } }) .await .unwrap(); }); + // Spawn the daemon event loop in the background + tokio::spawn(async move { + daemon.run_event_loop().await.unwrap(); + }); + // Give the server a moment to bind. 
tokio::time::sleep(Duration::from_millis(50)).await; @@ -61,12 +106,12 @@ mod unix { .await .unwrap(); - (client, shutdown_tx, tmp) + (client, handle, tmp) } #[tokio::test] async fn test_status() { - let (mut client, _shutdown, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let status = client.status().await.unwrap(); assert!(status.healthy); @@ -79,7 +124,7 @@ mod unix { async fn test_start_end_history() { use atuin_client::history::History; - let (mut client, _shutdown, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let history = History::daemon() .timestamp(time::OffsetDateTime::now_utc()) @@ -102,7 +147,7 @@ mod unix { #[tokio::test] async fn test_end_unknown_history_fails() { - let (mut client, _shutdown, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let result = client .end_history("nonexistent-id".to_string(), 1000, 0) @@ -112,7 +157,7 @@ mod unix { #[tokio::test] async fn test_shutdown() { - let (mut client, _shutdown_tx, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let accepted = client.shutdown().await.unwrap(); assert!(accepted); diff --git a/crates/atuin/Cargo.toml b/crates/atuin/Cargo.toml index e7a0daaa..d8938121 100644 --- a/crates/atuin/Cargo.toml +++ b/crates/atuin/Cargo.toml @@ -78,10 +78,12 @@ open = "5" ratatui = { workspace = true } tracing = "0.1" tracing-subscriber = { workspace = true } +tracing-appender = "0.2" uuid = { workspace = true } sysinfo = "0.30.7" regex = "1.10.5" norm = { version = "0.1.1", features = ["fzf-v2"] } +nucleo-matcher = { git = "https://github.com/atuinsh/nucleo-ext.git", branch = "main" } tempfile = { workspace = true } shlex = "1.3.0" diff --git a/crates/atuin/src/command/client.rs b/crates/atuin/src/command/client.rs index 0cb0a2ae..ba55466d 100644 --- a/crates/atuin/src/command/client.rs +++ 
b/crates/atuin/src/command/client.rs @@ -1,4 +1,5 @@ -use std::path::PathBuf; +use std::fs::{self, OpenOptions}; +use std::path::{Path, PathBuf}; use clap::Subcommand; use eyre::{Result, WrapErr}; @@ -6,7 +7,47 @@ use eyre::{Result, WrapErr}; use atuin_client::{ database::Sqlite, record::sqlite_store::SqliteStore, settings::Settings, theme, }; -use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; +use tracing_appender::rolling::{RollingFileAppender, Rotation}; +use tracing_subscriber::{ + Layer, filter::EnvFilter, filter::LevelFilter, fmt, fmt::format::FmtSpan, prelude::*, +}; + +/// Best-effort removal of rotated log files in `log_dir` that are older than +/// `retention_days` days, judged by their modification time. +/// +/// Only files whose name starts with `prefix` but is not exactly `prefix` are +/// considered. Every I/O error is ignored. +fn cleanup_old_logs(log_dir: &Path, prefix: &str, retention_days: u64) { + // A huge `retention_days` must not overflow the multiplication or panic in + // `SystemTime - Duration`; no representable cutoff means nothing is old enough. + let retention = std::time::Duration::from_secs(retention_days.saturating_mul(24 * 60 * 60)); + let Some(cutoff) = std::time::SystemTime::now().checked_sub(retention) else { + return; + }; + + let Ok(entries) = fs::read_dir(log_dir) else { + return; + }; + + for entry in entries.flatten() { + let path = entry.path(); + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + + // Match files like "search.log.2024-02-23" or "daemon.log.2024-02-23" + if !name.starts_with(prefix) || name == prefix { + continue; + } + + if let Ok(metadata) = entry.metadata() + && let Ok(modified) = metadata.modified() + && modified < cutoff + { + let _ = fs::remove_file(&path); + } + } +} #[cfg(feature = "sync")] mod sync; @@ -122,18 +163,157 @@ impl Cmd { res } + #[allow(clippy::too_many_lines)] async fn run_inner( self, mut settings: Settings, mut theme_manager: theme::ThemeManager, ) -> Result<()> { - let filter = + // ATUIN_LOG env var overrides config file level settings + let env_log_set = std::env::var("ATUIN_LOG").is_ok(); + + // Base filter from env var (or empty if not set) + let base_filter = EnvFilter::from_env("ATUIN_LOG").add_directive("sqlx_sqlite::regexp=off".parse()?); - tracing_subscriber::registry() - .with(fmt::layer()) - .with(filter) - .init(); + let is_interactive_search = matches!(&self, Self::Search(cmd) if cmd.is_interactive()); + // Use file-based 
logging for interactive search (TUI mode) + let use_search_logging = is_interactive_search && settings.logs.search_enabled(); + + // Use file-based logging for daemon + #[cfg(feature = "daemon")] + let use_daemon_logging = matches!(&self, Self::Daemon(_)) && settings.logs.daemon_enabled(); + + #[cfg(not(feature = "daemon"))] + let use_daemon_logging = false; + + // Check if daemon should also log to console + #[cfg(feature = "daemon")] + let daemon_show_logs = matches!(&self, Self::Daemon(cmd) if cmd.show_logs()); + + #[cfg(not(feature = "daemon"))] + let daemon_show_logs = false; + + // Set up span timing JSON logs if ATUIN_SPAN is set + let span_path = std::env::var("ATUIN_SPAN").ok().map(|p| { + if p.is_empty() { + "atuin-spans.json".to_string() + } else { + p + } + }); + + // Helper to create span timing layer + macro_rules! make_span_layer { + ($path:expr) => {{ + let span_file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open($path)?; + Some( + fmt::layer() + .json() + .with_writer(span_file) + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .with_filter(LevelFilter::TRACE), + ) + }}; + } + + // Build the subscriber with all configured layers + if use_search_logging { + let search_filename = settings.logs.search.file.clone(); + let log_dir = PathBuf::from(&settings.logs.dir); + fs::create_dir_all(&log_dir)?; + + // Clean up old log files + cleanup_old_logs(&log_dir, &search_filename, settings.logs.search_retention()); + + let file_appender = + RollingFileAppender::new(Rotation::DAILY, &log_dir, &search_filename); + + // Use config level unless ATUIN_LOG is set + let filter = if env_log_set { + base_filter + } else { + EnvFilter::default() + .add_directive(settings.logs.search_level().as_directive().parse()?) + .add_directive("sqlx_sqlite::regexp=off".parse()?) 
+ }; + + let base = tracing_subscriber::registry().with( + fmt::layer() + .with_writer(file_appender) + .with_ansi(false) + .with_filter(filter), + ); + + match &span_path { + Some(sp) => { + base.with(make_span_layer!(sp)).init(); + } + None => { + base.init(); + } + } + } else if use_daemon_logging { + let daemon_filename = settings.logs.daemon.file.clone(); + let log_dir = PathBuf::from(&settings.logs.dir); + fs::create_dir_all(&log_dir)?; + + // Clean up old log files + cleanup_old_logs(&log_dir, &daemon_filename, settings.logs.daemon_retention()); + + let file_appender = + RollingFileAppender::new(Rotation::DAILY, &log_dir, &daemon_filename); + + // Use config level unless ATUIN_LOG is set + let file_filter = if env_log_set { + base_filter + } else { + EnvFilter::default() + .add_directive(settings.logs.daemon_level().as_directive().parse()?) + .add_directive("sqlx_sqlite::regexp=off".parse()?) + }; + + let file_layer = fmt::layer() + .with_writer(file_appender) + .with_ansi(false) + .with_filter(file_filter); + + // Optionally add console layer for --show-logs + if daemon_show_logs { + let console_filter = EnvFilter::from_env("ATUIN_LOG") + .add_directive("sqlx_sqlite::regexp=off".parse()?); + + let console_layer = fmt::layer().with_filter(console_filter); + + let base = tracing_subscriber::registry() + .with(file_layer) + .with(console_layer); + + match &span_path { + Some(sp) => { + base.with(make_span_layer!(sp)).init(); + } + None => { + base.init(); + } + } + } else { + let base = tracing_subscriber::registry().with(file_layer); + + match &span_path { + Some(sp) => { + base.with(make_span_layer!(sp)).init(); + } + None => { + base.init(); + } + } + } + } tracing::trace!(command = ?self, "client command"); diff --git a/crates/atuin/src/command/client/daemon.rs b/crates/atuin/src/command/client/daemon.rs index a92e8f8e..64547505 100644 --- a/crates/atuin/src/command/client/daemon.rs +++ b/crates/atuin/src/command/client/daemon.rs @@ -9,10 +9,7 @@ use 
std::time::{Duration, Instant}; use atuin_client::{ database::Sqlite, history::History, record::sqlite_store::SqliteStore, settings::Settings, }; -use atuin_daemon::{ - client::{DaemonClientErrorKind, HistoryClient, classify_error}, - server::listen, -}; +use atuin_daemon::client::{DaemonClientErrorKind, HistoryClient, classify_error}; use clap::Subcommand; #[cfg(unix)] use daemonize::Daemonize; @@ -26,6 +23,10 @@ pub struct Cmd { #[arg(long, hide = true)] daemonize: bool, + /// Also write daemon logs to the console (useful for debugging) + #[arg(long)] + show_logs: bool, + #[command(subcommand)] subcmd: Option, } @@ -37,6 +38,14 @@ pub enum SubCmd { Start { #[arg(long, hide = true)] daemonize: bool, + + /// Also write daemon logs to the console (useful for debugging) + #[arg(long)] + show_logs: bool, + + /// Force start: kill existing daemon process and reset the socket + #[arg(long)] + force: bool, }, /// Show the daemon's current status @@ -55,12 +64,21 @@ impl Cmd { #[cfg(unix)] pub fn should_daemonize(&self) -> bool { match &self.subcmd { - Some(SubCmd::Start { daemonize }) => *daemonize, + Some(SubCmd::Start { daemonize, .. }) => *daemonize, None => self.daemonize, _ => false, } } + /// Returns `true` when logs should also be written to the console. + pub fn show_logs(&self) -> bool { + match &self.subcmd { + Some(SubCmd::Start { show_logs, .. }) => *show_logs, + None => self.show_logs, + _ => false, + } + } + pub async fn run( self, settings: Settings, @@ -70,9 +88,9 @@ impl Cmd { match self.subcmd { None => { eprintln!("Warning: `atuin daemon` is deprecated, use `atuin daemon start`"); - run(settings, store, history_db).await + run(settings, store, history_db, false).await } - Some(SubCmd::Start { .. }) => run(settings, store, history_db).await, + Some(SubCmd::Start { force, .. 
}) => run(settings, store, history_db, force).await, Some(SubCmd::Status) => status_cmd(&settings).await, Some(SubCmd::Stop) => stop_cmd(&settings).await, Some(SubCmd::Restart) => restart_cmd(&settings).await, @@ -547,15 +565,86 @@ pub fn daemonize_current_process() -> Result<()> { Ok(()) } -async fn run(settings: Settings, store: SqliteStore, history_db: Sqlite) -> Result<()> { +async fn run( + settings: Settings, + store: SqliteStore, + history_db: Sqlite, + force: bool, +) -> Result<()> { + if force { + force_cleanup(&settings); + } + let pidfile_path = PathBuf::from(&settings.daemon.pidfile_path); let _pidfile_guard = PidfileGuard::acquire(&pidfile_path)?; - listen(settings, store, history_db).await?; + atuin_daemon::boot(settings, store, history_db).await?; Ok(()) } +/// Force cleanup: kill existing daemon process and remove socket. +fn force_cleanup(settings: &Settings) { + let pidfile_path = Path::new(&settings.daemon.pidfile_path); + + // Read and kill the existing process if pidfile exists + if pidfile_path.exists() { + // Never signal PID 0 (for `kill` that is the caller's whole process group) + // or this very process, either of which a corrupt or stale pidfile could name. + if let Ok(contents) = fs::read_to_string(pidfile_path) + && let Some(pid_str) = contents.lines().next() + && let Ok(pid) = pid_str.trim().parse::<u32>() + && pid != 0 + && pid != std::process::id() + { + kill_process(pid); + // Give it a moment to release resources + std::thread::sleep(Duration::from_millis(100)); + } + + // Remove the pidfile + if let Err(e) = fs::remove_file(pidfile_path) + && e.kind() != ErrorKind::NotFound + { + tracing::warn!("failed to remove pidfile: {e}"); + } + } + + // Remove the socket file + #[cfg(unix)] + { + let socket_path = Path::new(&settings.daemon.socket_path); + if socket_path.exists() + && let Err(e) = fs::remove_file(socket_path) + && e.kind() != ErrorKind::NotFound + { + tracing::warn!("failed to remove socket: {e}"); + } + } +} + +/// Kill a process by PID. 
+#[cfg(unix)] +fn kill_process(pid: u32) { + // Use kill command to send SIGTERM for graceful shutdown + let _ = Command::new("kill") + .args(["-TERM", &pid.to_string()]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); +} + +/// Kill a process by PID. +#[cfg(not(unix))] +fn kill_process(pid: u32) { + // On Windows, use taskkill + let _ = Command::new("taskkill") + .args(["/PID", &pid.to_string(), "/F"]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/atuin/src/command/client/history.rs b/crates/atuin/src/command/client/history.rs index c20f64a3..fe9a7e32 100644 --- a/crates/atuin/src/command/client/history.rs +++ b/crates/atuin/src/command/client/history.rs @@ -10,6 +10,9 @@ use clap::Subcommand; use eyre::{Context, Result}; use runtime_format::{FormatKey, FormatKeyError, ParseSegment, ParsedFmt}; +#[cfg(feature = "daemon")] +use atuin_daemon::emit_event; + use atuin_client::{ database::{Database, Sqlite, current_context}, encryption, @@ -624,6 +627,9 @@ impl Cmd { db.delete(entry.clone()).await?; } } + + #[cfg(feature = "daemon")] + let _ = emit_event(atuin_daemon::DaemonEvent::HistoryPruned).await; } Ok(()) } @@ -670,6 +676,9 @@ impl Cmd { let host_id = Settings::host_id().await?; let history_store = HistoryStore::new(store.clone(), host_id, encryption_key); + #[cfg(feature = "daemon")] + let ids = matches.iter().map(|h| h.id.clone()).collect::>(); + for entry in matches { eprintln!("deleting {}", entry.id); if settings.sync.records { @@ -679,6 +688,9 @@ impl Cmd { db.delete(entry).await?; } } + + #[cfg(feature = "daemon")] + let _ = emit_event(atuin_daemon::DaemonEvent::HistoryDeleted { ids }).await; } Ok(()) } diff --git a/crates/atuin/src/command/client/search.rs b/crates/atuin/src/command/client/search.rs index 70a25ed9..3f3687b8 100644 --- a/crates/atuin/src/command/client/search.rs +++ b/crates/atuin/src/command/client/search.rs @@ -141,6 +141,11 @@ pub struct Cmd 
{ } impl Cmd { + /// Returns true if this search command will run in interactive (TUI) mode + pub fn is_interactive(&self) -> bool { + self.interactive + } + // clippy: please write this instead // clippy: now it has too many lines // me: I'll do it later OKAY diff --git a/crates/atuin/src/command/client/search/engines.rs b/crates/atuin/src/command/client/search/engines.rs index 5c53817e..8cbee0c3 100644 --- a/crates/atuin/src/command/client/search/engines.rs +++ b/crates/atuin/src/command/client/search/engines.rs @@ -8,12 +8,22 @@ use eyre::Result; use super::cursor::Cursor; +#[cfg(feature = "daemon")] +pub mod daemon; pub mod db; pub mod skim; -pub fn engine(search_mode: SearchMode) -> Box { +#[allow(unused)] // settings is only used if daemon feature is enabled +pub fn engine(search_mode: SearchMode, settings: &Settings) -> Box { match search_mode { SearchMode::Skim => Box::new(skim::Search::new()) as Box<_>, + #[cfg(feature = "daemon")] + SearchMode::DaemonFuzzy => Box::new(daemon::Search::new(settings)) as Box<_>, + #[cfg(not(feature = "daemon"))] + SearchMode::DaemonFuzzy => { + // Fall back to fuzzy mode if daemon feature is not enabled + Box::new(db::Search(SearchMode::Fuzzy)) as Box<_> + } mode => Box::new(db::Search(mode)) as Box<_>, } } diff --git a/crates/atuin/src/command/client/search/engines/daemon.rs b/crates/atuin/src/command/client/search/engines/daemon.rs new file mode 100644 index 00000000..d317a4f6 --- /dev/null +++ b/crates/atuin/src/command/client/search/engines/daemon.rs @@ -0,0 +1,206 @@ +use async_trait::async_trait; +use atuin_client::{ + database::{Database, OptFilters}, + history::History, + settings::{SearchMode, Settings}, +}; +use atuin_daemon::client::SearchClient; +use eyre::Result; +use nucleo_matcher::{ + Config, Matcher, Utf32Str, + pattern::{CaseMatching, Normalization, Pattern}, +}; +use tracing::{Level, debug, instrument, span}; +use uuid::Uuid; + +use super::{SearchEngine, SearchState}; + +pub struct Search { + client: 
Option, + query_id: u64, + socket_path: String, + #[cfg(not(unix))] + tcp_port: u64, +} + +impl Search { + pub fn new(settings: &Settings) -> Self { + Search { + client: None, + query_id: 0, + socket_path: settings.daemon.socket_path.clone(), + #[cfg(not(unix))] + tcp_port: settings.daemon.tcp_port, + } + } + + #[instrument(skip_all, level = Level::TRACE, name = "get_daemon_client")] + async fn get_client(&mut self) -> Result<&mut SearchClient> { + if self.client.is_none() { + #[cfg(unix)] + let client = SearchClient::new(self.socket_path.clone()).await?; + + #[cfg(not(unix))] + let client = SearchClient::new(self.tcp_port).await?; + + self.client = Some(client); + } + Ok(self.client.as_mut().unwrap()) + } + + fn next_query_id(&mut self) -> u64 { + self.query_id += 1; + self.query_id + } + + /// Check if query contains regex pattern (r/.../) + /// Nucleo doesn't support regex, so we fall back to database search + fn contains_regex_pattern(query: &str) -> bool { + query.starts_with("r/") || query.contains(" r/") + } + + #[instrument(skip_all, level = Level::TRACE, name = "daemon_db_fallback")] + async fn fallback_to_db_search( + &self, + state: &SearchState, + db: &dyn Database, + ) -> Result> { + let results = db + .search( + SearchMode::FullText, + state.filter_mode, + &state.context, + state.input.as_str(), + OptFilters { + limit: Some(200), + ..Default::default() + }, + ) + .await + .map_or(Vec::new(), |r| r.into_iter().collect()); + Ok(results) + } + + #[instrument(skip_all, level = Level::TRACE, name = "hydrate_from_db", fields(count = ids.len()))] + async fn hydrate_from_db(&self, db: &dyn Database, ids: &[String]) -> Result> { + let placeholders: Vec = ids.iter().map(|id| format!("'{id}'")).collect(); + let sql_query = format!( + "SELECT * FROM history WHERE id IN ({}) ORDER BY timestamp DESC", + placeholders.join(",") + ); + Ok(db.query_history(&sql_query).await?) 
+ } +} + +#[async_trait] +impl SearchEngine for Search { + #[instrument(skip_all, level = Level::TRACE, name = "daemon_search", fields(query = %state.input.as_str()))] + async fn full_query( + &mut self, + state: &SearchState, + db: &mut dyn Database, + ) -> Result> { + let query = state.input.as_str().to_string(); + + // Fall back to database for regex queries (Nucleo doesn't support regex) + if Self::contains_regex_pattern(&query) { + debug!(query = %query, "[daemon-client] regex detected, falling back to db"); + return self.fallback_to_db_search(state, db).await; + } + + let query_id = self.next_query_id(); + + let span = + span!(Level::TRACE, "daemon_search.req_resp", query = %query, query_id = query_id); + + let client = self.get_client().await?; + + let _span = span.enter(); + let mut stream = client + .search( + query.clone(), + query_id, + state.filter_mode, + Some(state.context.clone()), + ) + .await?; + + let mut ids = Vec::with_capacity(200); + span!(Level::TRACE, "daemon_search.resp") + .in_scope(async || { + while let Ok(Some(response)) = stream.message().await { + let span2 = span!( + Level::TRACE, + "daemon_search.resp.item", + query_id = response.query_id + ); + let _span2 = span2.enter(); + // Only process if the query_id matches (prevents stale responses) + if response.query_id == query_id { + let uuids = response + .ids + .iter() + .map(|id| { + let bytes: [u8; 16] = + id.as_slice().try_into().expect("id should be 16 bytes"); + Uuid::from_bytes(bytes).as_simple().to_string() + }) + .collect::>(); + ids.extend(uuids); + } + drop(_span2); + drop(span2); + } + }) + .await; + drop(_span); + drop(span); + + if ids.is_empty() { + debug!(query = %query, results = 0, "[daemon-client] empty results"); + return Ok(Vec::new()); + } + + // // Hydrate from local database + let results = self.hydrate_from_db(db, &ids).await?; + + // // Reorder results to match the order from the daemon (which is ranked by relevance) + let ordered_results = span!(Level::TRACE, 
"reorder_results").in_scope(|| { + let mut ordered_results = Vec::with_capacity(results.len()); + for id in &ids { + if let Some(history) = results.iter().find(|h| h.id.0 == *id) { + ordered_results.push(history.clone()); + } + } + ordered_results + }); + + debug!( + query = %query, + results = results.len(), + "[daemon-client]" + ); + + Ok(ordered_results) + } + + #[instrument(skip_all, level = Level::TRACE, name = "daemon_highlight")] + fn get_highlight_indices(&self, command: &str, search_input: &str) -> Vec { + // Use fulltext highlighting for regex queries + if Self::contains_regex_pattern(search_input) { + return super::db::get_highlight_indices_fulltext(command, search_input); + } + + let mut matcher = Matcher::new(Config::DEFAULT); + let pattern = Pattern::parse(search_input, CaseMatching::Smart, Normalization::Smart); + + let mut indices: Vec = Vec::new(); + let mut haystack_buf = Vec::new(); + + let haystack = Utf32Str::new(command, &mut haystack_buf); + pattern.indices(haystack, &mut matcher, &mut indices); + + // Convert u32 indices to usize + indices.into_iter().map(|i| i as usize).collect() + } +} diff --git a/crates/atuin/src/command/client/search/engines/db.rs b/crates/atuin/src/command/client/search/engines/db.rs index f0ed424e..476462f5 100644 --- a/crates/atuin/src/command/client/search/engines/db.rs +++ b/crates/atuin/src/command/client/search/engines/db.rs @@ -11,17 +11,19 @@ use eyre::Result; use norm::Metric; use norm::fzf::{FzfParser, FzfV2}; use std::ops::Range; +use tracing::{Level, instrument}; pub struct Search(pub SearchMode); #[async_trait] impl SearchEngine for Search { + #[instrument(skip_all, level = Level::TRACE, name = "db_search", fields(mode = ?self.0, query = %state.input.as_str()))] async fn full_query( &mut self, state: &SearchState, db: &mut dyn Database, ) -> Result> { - Ok(db + let results = db .search( self.0, state.filter_mode, @@ -34,9 +36,11 @@ impl SearchEngine for Search { ) .await // ignore errors as it may be 
caused by incomplete regex - .map_or(Vec::new(), |r| r.into_iter().collect())) + .map_or(Vec::new(), |r| r.into_iter().collect()); + Ok(results) } + #[instrument(skip_all, level = Level::TRACE, name = "db_highlight")] fn get_highlight_indices(&self, command: &str, search_input: &str) -> Vec { if self.0 == SearchMode::Prefix { return vec![]; @@ -54,7 +58,8 @@ impl SearchEngine for Search { } } -fn get_highlight_indices_fulltext(command: &str, search_input: &str) -> Vec { +#[instrument(skip_all, level = Level::TRACE, name = "db_highlight_fulltext")] +pub fn get_highlight_indices_fulltext(command: &str, search_input: &str) -> Vec { let mut ranges = vec![]; let lower_command = command.to_ascii_lowercase(); diff --git a/crates/atuin/src/command/client/search/engines/skim.rs b/crates/atuin/src/command/client/search/engines/skim.rs index cb7ce24f..7d9feb40 100644 --- a/crates/atuin/src/command/client/search/engines/skim.rs +++ b/crates/atuin/src/command/client/search/engines/skim.rs @@ -7,6 +7,7 @@ use fuzzy_matcher::{FuzzyMatcher, skim::SkimMatcherV2}; use itertools::Itertools; use time::OffsetDateTime; use tokio::task::yield_now; +use tracing::{Level, instrument, warn}; use uuid; use super::{SearchEngine, SearchState}; @@ -27,18 +28,20 @@ impl Search { #[async_trait] impl SearchEngine for Search { + #[instrument(skip_all, level = Level::TRACE, name = "skim_search", fields(query = %state.input.as_str()))] async fn full_query( &mut self, state: &SearchState, db: &mut dyn Database, ) -> Result> { if self.all_history.is_empty() { - self.all_history = db.all_with_count().await.unwrap(); + self.all_history = load_all_history(db).await; } Ok(fuzzy_search(&self.engine, state, &self.all_history).await) } + #[instrument(skip_all, level = Level::TRACE, name = "skim_highlight")] fn get_highlight_indices(&self, command: &str, search_input: &str) -> Vec { let (_, indices) = self .engine @@ -48,7 +51,13 @@ impl SearchEngine for Search { } } +#[instrument(skip_all, level = 
Level::TRACE, name = "load_all_history")] +async fn load_all_history(db: &dyn Database) -> Vec<(History, i32)> { + db.all_with_count().await.unwrap() +} + #[allow(clippy::too_many_lines)] +#[instrument(skip_all, level = Level::TRACE, name = "fuzzy_match", fields(history_count = all_history.len()))] async fn fuzzy_search( engine: &SkimMatcherV2, state: &SearchState, @@ -97,11 +106,11 @@ async fn fuzzy_search( if !is_current_session { let Ok(uuid) = uuid::Uuid::parse_str(&context.session) else { - log::warn!("failed to parse session id '{}'", context.session); + warn!("failed to parse session id '{}'", context.session); continue; }; let Some(timestamp) = uuid.get_timestamp() else { - log::warn!( + warn!( "failed to get timestamp from uuid '{}'", uuid.as_hyphenated() ); @@ -111,7 +120,7 @@ async fn fuzzy_search( let Ok(session_start) = time::OffsetDateTime::from_unix_timestamp_nanos( i128::from(seconds) * 1_000_000_000 + i128::from(nanos), ) else { - log::warn!( + warn!( "failed to create OffsetDateTime from second: {seconds}, nanosecond: {nanos}" ); continue; diff --git a/crates/atuin/src/command/client/search/interactive.rs b/crates/atuin/src/command/client/search/interactive.rs index c6a6064a..729c80ce 100644 --- a/crates/atuin/src/command/client/search/interactive.rs +++ b/crates/atuin/src/command/client/search/interactive.rs @@ -657,7 +657,7 @@ impl State { Action::CycleSearchMode => { self.switched_search_mode = true; self.search_mode = self.search_mode.next(settings); - self.engine = engines::engine(self.search_mode); + self.engine = engines::engine(self.search_mode, settings); InputAction::Continue } Action::SwitchContext => { @@ -1419,7 +1419,7 @@ pub async fn history( context: initial_context.clone(), custom_context: None, }, - engine: engines::engine(search_mode), + engine: engines::engine(search_mode, settings), results_len: 0, accept: false, keymap_mode: match settings.keymap_mode { @@ -1875,7 +1875,7 @@ mod tests { }, custom_context: None, }, - engine: 
engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -1930,7 +1930,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2049,7 +2049,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2108,7 +2108,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2163,7 +2163,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2214,7 +2214,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2274,7 +2274,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; @@ -2335,7 +2335,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; state.results_state.select(selected); @@ -2714,7 +2714,7 @@ mod tests { }, custom_context: None, }, - engine: engines::engine(SearchMode::Fuzzy), + engine: engines::engine(SearchMode::Fuzzy, &settings), now: Box::new(OffsetDateTime::now_utc), }; diff --git a/crates/atuin/src/command/client/store/rebuild.rs b/crates/atuin/src/command/client/store/rebuild.rs index 8acec531..a98f8142 100644 --- 
a/crates/atuin/src/command/client/store/rebuild.rs +++ b/crates/atuin/src/command/client/store/rebuild.rs @@ -3,6 +3,9 @@ use atuin_scripts::store::ScriptStore; use clap::Args; use eyre::{Result, bail}; +#[cfg(feature = "daemon")] +use atuin_daemon::emit_event; + use atuin_client::{ database::Database, encryption, history::store::HistoryStore, record::sqlite_store::SqliteStore, settings::Settings, @@ -57,6 +60,9 @@ impl Rebuild { history_store.build(database).await?; + #[cfg(feature = "daemon")] + let _ = emit_event(atuin_daemon::DaemonEvent::HistoryRebuilt).await; + Ok(()) } -- cgit v1.3.1