diff options
| author | Michelle Tilley <michelle@michelletilley.net> | 2026-02-26 14:42:08 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-26 14:42:08 -0800 |
| commit | 3ba47446f06d5b0fbeaeb59d4ffed768b70729d8 (patch) | |
| tree | 28348bb3d18e9983e9212c26840f691766cad985 /crates/atuin-daemon | |
| parent | feat: Add history author/intent metadata and v1 record version (#3205) (diff) | |
| download | atuin-3ba47446f06d5b0fbeaeb59d4ffed768b70729d8.zip | |
feat: In-memory search index with atuin daemon (#3201)
## Summary
This PR adds a persistent, in-memory search index to the Atuin daemon,
enabling fast fuzzy search without the startup delay of building an
index each time the TUI opens.
### Key Changes
- **Daemon search service**: A new gRPC service that maintains a Nucleo
fuzzy search index in memory
- **Real-time index updates**: The daemon listens for history events
(new commands, synced records) and updates the index immediately
- **Filter mode support**: All existing filter modes work (Global, Host,
Session, Directory, Workspace)
- **New search engine**: `daemon-fuzzy` search mode that queries the
daemon instead of building a local index
- **Paged history loading**: Database pagination support for efficient
initial index loading
- **Configurable logging**: New `[logs]` settings section for daemon and
search log configuration
- **Component-based daemon architecture**: Refactored daemon internals
into a modular, event-driven system
- **Fallback to DB search for regex**: Since Nucleo doesn't support
regex matching
## Daemon Architecture
The daemon has been refactored to use a component-based, event-driven
architecture that makes it easier to add new functionality and reason
about the system.
### Core Concepts
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Atuin Daemon │
│ │
│ ┌─────────────┐ ┌──────────────────────────────────────────────────┐ │
│ │ Daemon │ │ Components │ │
│ │ Handle │────▶│ │ │
│ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │ │
│ │ • emit() │ │ │ History │ │ Search │ │ Sync │ │ │
│ │ • subscribe │ │ │ Component │ │ Component │ │ Component │ │ │
│ │ • settings │ │ │ │ │ │ │ │ │ │
│ │ • databases │ │ │ gRPC service│ │ gRPC service│ │ background │ │ │
│ └─────────────┘ │ │ WIP history │ │ Nucleo index│ │ sync │ │ │
│ │ │ └─────────────┘ └─────────────┘ └────────────┘ │ │
│ │ └──────────────────────────────────────────────────┘ │
│ │ ▲ │
│ ▼ │ │
│ ┌─────────────────────────────────────┴────────────────────────────────┐ │
│ │ Event Bus (broadcast) │ │
│ │ │ │
│ │ HistoryStarted │ HistoryEnded │ RecordsAdded │ SyncCompleted │ ... │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ ┌───────────────────────────────────┴──────────────────────────────────┐ │
│ │ Control Service (gRPC) │ │
│ │ External event injection from CLI commands │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
```
### DaemonHandle
A lightweight, cloneable handle that provides access to shared daemon
resources:
- **Event emission**: `handle.emit(DaemonEvent::...)` broadcasts to all
components
- **Event subscription**: `handle.subscribe()` returns a receiver for
the event bus
- **Settings**: `handle.settings()` for configuration access
- **Databases**: `handle.history_db()` and `handle.store()` for data
access
### Component Trait
Components implement a simple lifecycle:
```rust
#[async_trait]
trait Component: Send + Sync {
fn name(&self) -> &'static str;
async fn start(&mut self, handle: DaemonHandle) -> Result<()>;
async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>;
async fn stop(&mut self) -> Result<()>;
}
```
### Event-Driven Design
Components communicate via events rather than direct coupling:
| Event | Emitted By | Consumed By |
|-------|-----------|-------------|
| `HistoryStarted` | History gRPC | Search (logging) |
| `HistoryEnded` | History gRPC | Search (index update) |
| `RecordsAdded` | Sync | Search (index update) |
| `HistoryPruned` | CLI (via Control) | Search (index rebuild) |
| `HistoryDeleted` | CLI (via Control) | Search (index rebuild) |
| `ForceSync` | CLI (via Control) | Sync |
| `ShutdownRequested` | Signal handler | All (graceful shutdown) |
### External Event Injection
CLI commands can inject events into a running daemon:
```rust
// After `atuin history prune`
emit_event(DaemonEvent::HistoryPruned).await?;
// After deleting specific items
emit_event(DaemonEvent::HistoryDeleted { ids }).await?;
// Request immediate sync
emit_event(DaemonEvent::ForceSync).await?;
```
This ensures the daemon's search index stays in sync with database
changes made by CLI commands.
## Search Architecture
The search service uses a [forked version of
Nucleo](https://github.com/atuinsh/nucleo-ext) that adds filter and
scorer callbacks, enabling efficient filtering and frecency-based
ranking.
```
┌────────────────────────────────────────────────────────────────┐
│ Atuin Daemon │
│ │
│ ┌─────────────────┐ ┌──────────────────────────────────┐ │
│ │ Event System │───▶│ Search Component │ │
│ │ │ │ │ │
│ │ • RecordsAdded │ │ ┌────────────────────────────┐ │ │
│ │ • HistoryEnded │ │ │ Deduplicated Index │ │ │
│ │ • HistoryPruned │ │ │ │ │ │
│ └─────────────────┘ │ │ CommandData per command: │ │ │
│ │ │ • Global frecency │ │ │
│ ┌─────────────────┐ │ │ • Filter indexes (sets) │ │ │
│ │ Background Task │ │ │ • Invocation history │ │ │
│ │ │ │ └────────────────────────────┘ │ │
│ │ Rebuilds │ │ │ │ │
│ │ frecency map │ │ ▼ │ │
│ │ every 60s │───▶│ ┌────────────────────────────┐ │ │
│ └─────────────────┘ │ │ Nucleo (forked) │ │ │
│ │ │ │ │ │
│ │ │ • Filter callback │ │ │
│ │ │ • Scorer callback │ │ │
│ │ │ • Fuzzy matching │ │ │
│ │ └────────────────────────────┘ │ │
│ └──────────────────────────────────┘ │
│ │ │
│ │ gRPC (Unix socket) │
└──────────────────────────────────────│─────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Search TUI (Client) │
│ │
│ 1. Send query + filter mode + context to daemon │
│ 2. Receive matching history IDs (ranked by frecency) │
│ 3. Hydrate full records from local SQLite database │
│ 4. Display results in TUI │
└─────────────────────────────────────────────────────────────────┘
```
### Nucleo Fork
The [nucleo-ext fork](https://github.com/atuinsh/nucleo-ext) adds two
key features to Nucleo:
1. **Filter callback**: Pre-filter items before fuzzy matching (used for
directory/host/session filtering)
2. **Scorer callback**: Compute custom scores after matching (used for
frecency ranking)
```rust
// Filter: only include commands run in current directory
nucleo.set_filter(Some(Arc::new(|cmd: &String| {
passing_commands.contains(cmd)
})));
// Scorer: combine fuzzy score with frecency
nucleo.set_scorer(Some(Arc::new(|cmd: &String, fuzzy_score: u32| {
let frecency = frecency_map.get(cmd).unwrap_or(0);
fuzzy_score + (frecency * 10)
})));
```
### Deduplicated Index
Commands are stored once per unique command text, with metadata tracking
all invocations:
```rust
struct CommandData {
command: String,
invocations: Vec<Invocation>, // All times this command was run
global_frecency: FrecencyData, // Precomputed frecency score
// O(1) filter indexes
directories: HashSet<String>, // All cwds where command was run
hosts: HashSet<String>, // All hostnames
sessions: HashSet<String>, // All session IDs
}
```
This deduplication means:
- **Fewer items to match**: ~13K unique commands vs ~62K history entries
- **O(1) filter checks**: HashSet lookups instead of scanning
invocations
- **Single frecency score**: Global frecency computed once, used for all
filter modes
### Frecency Scoring
Frecency (frequency + recency) scoring prioritizes recently and
frequently used commands:
```rust
fn compute_frecency(count: u32, last_used: i64, now: i64) -> u32 {
let age_hours = (now - last_used) / 3600;
// Recency: decays over time (half-life ~24 hours)
let recency = (100.0 * (-age_hours as f64 / 24.0).exp()) as u32;
// Frequency: logarithmic scaling
let frequency = (count.ln() * 20.0).min(100.0) as u32;
recency + frequency
}
```
The frecency map is:
- **Precomputed by background task** every 60 seconds
- **Never computed inline** during search (no latency impact)
- **Graceful fallback**: If unavailable, search works without frecency
ranking
### Filter Mode Implementation
| Filter Mode | Implementation |
|-------------|----------------|
| Global | No filter (all commands) |
| Directory | `command.directories.contains(cwd)` |
| Workspace | `command.directories.any(\|d\| d.starts_with(git_root))` |
| Host | `command.hosts.contains(hostname)` |
| Session | `command.sessions.contains(session_id)` |
Filters are pre-computed into a HashSet before the search, making the
filter callback O(1).
### Search Flow
1. **Daemon startup**: Loads history from SQLite in pages, builds
deduplicated index
2. **Frecency precompute**: Background task builds frecency map after
history loads
3. **Search request**: Client sends query with filter mode and context
4. **Filter**: Pre-computed HashSet determines which commands pass the
filter
5. **Match**: Nucleo fuzzy matches the query against command text
6. **Score**: Frecency scorer ranks results (fuzzy score + frecency *
10)
7. **Response**: Returns history IDs for the most recent invocation of
each matching command
8. **Hydration**: Client fetches full records from local SQLite
### Configuration
```toml
# Enable daemon + autostart
[daemon]
enabled = true
autostart = true
# Enable daemon-based fuzzy search
[search]
search_mode = "daemon-fuzzy"
```
## Performance
Performance varies based on several factors, but in most initial testing
with the new architecture shows improvement:
* **Nucleo performs searches up to 4.5x faster**: direct DB search
averages 18.07ms, but the daemon completes the same queries in 3.99ms.
* **IPC overhead is significant, but acceptable**: a significant amount
of wall-time is taken up by the transfer of data over IPC (via UDS in
this case). This averages to about ~7.8ms and accounts for 66% of
client-side wall time.
* **Tail latency improves at every layer**: p99 times correspond to
initial requests, worst-case query patterns, etc. but the average p99
daemon-based response time is 3.6x better than the associated DB-based
search p99 time
* **Query complexity no longer impacts performance**: the Nucleo-based
search shows consistent 2-7ms times regardless of query pattern. The
DB-based search had a 17x variance (3.59ms to 62.46ms).
Interestingly, @ellie - who has a larger history store than I do - gets
even better performance on the IPC layer. This could use a lot more
testing in various edge cases and on various hardware, but seems
promising.
### Regular DB search
```
Individual calls for: db_search
--------------------------------------------------------------------------------------------------------------
# Wall Busy Idle Fields
--------------------------------------------------------------------------------------------------------------
1 32.25ms 32.20ms 47.70µs {"mode":"Fuzzy","query":"^"}
2 19.48ms 19.40ms 84.20µs {"mode":"Fuzzy","query":"^c"}
3 20.40ms 20.10ms 297.00µs {"mode":"Fuzzy","query":"^ca"}
4 13.07ms 13.00ms 69.90µs {"mode":"Fuzzy","query":"^car"}
5 12.17ms 12.10ms 67.10µs {"mode":"Fuzzy","query":"^carg"}
6 20.78ms 20.70ms 76.60µs {"mode":"Fuzzy","query":"^cargo"}
7 9.15ms 9.10ms 53.20µs {"mode":"Fuzzy","query":"^cargo "}
8 10.24ms 10.00ms 237.00µs {"mode":"Fuzzy","query":"^cargo b"}
9 10.01ms 9.68ms 325.00µs {"mode":"Fuzzy","query":"^cargo bu"}
10 5.89ms 5.83ms 57.20µs {"mode":"Fuzzy","query":"^cargo bui"}
11 8.85ms 8.28ms 568.00µs {"mode":"Fuzzy","query":"^cargo buil"}
12 7.70ms 7.49ms 212.00µs {"mode":"Fuzzy","query":"^cargo build"}
13 3.59ms 3.53ms 57.00µs {"mode":"Fuzzy","query":"^cargo build$"}
14 6.50ms 6.44ms 63.60µs {"mode":"Fuzzy","query":"^cargo "}
15 6.48ms 6.38ms 100.00µs {"mode":"Fuzzy","query":"!"}
16 31.68ms 31.60ms 75.90µs {"mode":"Fuzzy","query":"!g"}
17 62.46ms 62.40ms 58.90µs {"mode":"Fuzzy","query":"!gi"}
18 30.35ms 30.30ms 46.90µs {"mode":"Fuzzy","query":"!git"}
19 53.84ms 53.80ms 40.80µs {"mode":"Fuzzy","query":"!git "}
20 19.24ms 19.20ms 39.70µs {"mode":"Fuzzy","query":"!git c"}
21 22.03ms 22.00ms 34.70µs {"mode":"Fuzzy","query":"!git co"}
22 17.13ms 17.00ms 133.00µs {"mode":"Fuzzy","query":"!git com"}
23 16.14ms 15.90ms 242.00µs {"mode":"Fuzzy","query":"!git comm"}
24 5.11ms 5.08ms 28.60µs {"mode":"Fuzzy","query":"!git commi"}
25 7.31ms 7.26ms 52.70µs {"mode":"Fuzzy","query":"!git commit"}
Summary: 25 calls
Wall: avg=18.07ms, min=3.59ms, max=62.46ms, p50=13.07ms, p99=62.46ms
Busy: avg=17.95ms, min=3.53ms, max=62.40ms, p50=13.00ms, p99=62.40ms
```
### Daemon-based search
**Client**
```
Individual calls for: daemon_search
--------------------------------------------------------------------------------------------------------------
# Wall Busy Idle Fields
--------------------------------------------------------------------------------------------------------------
1 13.05ms 2.55ms 10.50ms {"query":"^"}
2 10.65ms 1.40ms 9.25ms {"query":"^c"}
3 10.72ms 1.18ms 9.54ms {"query":"^ca"}
4 5.54ms 485.00µs 5.06ms {"query":"^car"}
5 15.02ms 1.02ms 14.00ms {"query":"^carg"}
6 9.49ms 840.00µs 8.65ms {"query":"^cargo"}
7 5.53ms 555.00µs 4.97ms {"query":"^cargo "}
8 8.56ms 717.00µs 7.84ms {"query":"^cargo b"}
9 12.34ms 1.24ms 11.10ms {"query":"^cargo bu"}
10 8.38ms 650.00µs 7.73ms {"query":"^cargo bui"}
11 13.07ms 770.00µs 12.30ms {"query":"^cargo buil"}
12 17.11ms 709.00µs 16.40ms {"query":"^cargo build"}
13 15.41ms 907.00µs 14.50ms {"query":"^cargo build$"}
14 8.19ms 665.00µs 7.52ms {"query":"^cargo "}
15 7.98ms 1.72ms 6.26ms {"query":"!"}
16 13.56ms 856.00µs 12.70ms {"query":"!g"}
17 8.11ms 624.00µs 7.49ms {"query":"!gi"}
18 14.57ms 775.00µs 13.80ms {"query":"!git"}
19 14.18ms 779.00µs 13.40ms {"query":"!git "}
20 9.62ms 802.00µs 8.82ms {"query":"!git c"}
21 15.50ms 1.50ms 14.00ms {"query":"!git co"}
22 11.58ms 1.48ms 10.10ms {"query":"!git com"}
23 13.82ms 2.12ms 11.70ms {"query":"!git comm"}
24 17.48ms 2.18ms 15.30ms {"query":"!git commi"}
25 14.81ms 1.71ms 13.10ms {"query":"!git commit"}
Summary: 25 calls
Wall: avg=11.77ms, min=5.53ms, max=17.48ms, p50=12.34ms, p99=17.48ms
Busy: avg=1.13ms, min=485.00µs, max=2.55ms, p50=856.00µs, p99=2.55ms
```
**Daemon**
```
Individual calls for: daemon_search_query
--------------------------------------------------------------------------------------------------------------
# Wall Busy Idle Fields
--------------------------------------------------------------------------------------------------------------
1 1.75ms 250ns 1.75ms {"query":"^","query_id":1}
2 4.58ms 125ns 4.58ms {"query":"^c","query_id":2}
3 4.39ms 250ns 4.39ms {"query":"^ca","query_id":3}
4 2.52ms 125ns 2.52ms {"query":"^car","query_id":4}
5 4.44ms 250ns 4.44ms {"query":"^carg","query_id":5}
6 3.66ms 167ns 3.66ms {"query":"^cargo","query_id":6}
7 2.38ms 84ns 2.38ms {"query":"^cargo ","query_id":7}
8 4.13ms 84ns 4.13ms {"query":"^cargo b","query_id":8}
9 4.40ms 167ns 4.40ms {"query":"^cargo bu","query_id":9}
10 3.87ms 125ns 3.87ms {"query":"^cargo bui","query_id":10}
11 4.36ms 84ns 4.36ms {"query":"^cargo buil","query_id":11}
12 3.96ms 333ns 3.96ms {"query":"^cargo build","query_id":12}
13 4.61ms 167ns 4.61ms {"query":"^cargo build$","query_id":13}
14 4.20ms 209ns 4.20ms {"query":"^cargo ","query_id":14}
15 238.17µs 167ns 238.00µs {"query":"!","query_id":15}
16 4.44ms 125ns 4.44ms {"query":"!g","query_id":16}
17 3.47ms 83ns 3.47ms {"query":"!gi","query_id":17}
18 4.57ms 125ns 4.57ms {"query":"!git","query_id":18}
19 7.15ms 167ns 7.15ms {"query":"!git ","query_id":19}
20 4.27ms 250ns 4.27ms {"query":"!git c","query_id":20}
21 5.19ms 292ns 5.19ms {"query":"!git co","query_id":21}
22 4.29ms 417ns 4.29ms {"query":"!git com","query_id":22}
23 4.08ms 125ns 4.08ms {"query":"!git comm","query_id":23}
24 4.50ms 167ns 4.50ms {"query":"!git commi","query_id":24}
25 4.35ms 208ns 4.35ms {"query":"!git commit","query_id":25}
Summary: 25 calls
Wall: avg=3.99ms, min=238.17µs, max=7.15ms, p50=4.29ms, p99=7.15ms
Busy: avg=182ns, min=83ns, max=417ns, p50=167ns, p99=417ns
```
**Nucleo matching time (in daemon)**
```
Individual calls for: nucleo_match
--------------------------------------------------------------------------------------------------------------
# Wall Busy Idle Fields
--------------------------------------------------------------------------------------------------------------
1 1.73ms 125ns 1.73ms {"query":"^","query_id":1}
2 4.57ms 167ns 4.57ms {"query":"^c","query_id":2}
3 4.37ms 125ns 4.37ms {"query":"^ca","query_id":3}
4 2.51ms 84ns 2.51ms {"query":"^car","query_id":4}
5 4.43ms 125ns 4.43ms {"query":"^carg","query_id":5}
6 3.64ms 125ns 3.64ms {"query":"^cargo","query_id":6}
7 2.37ms 84ns 2.37ms {"query":"^cargo ","query_id":7}
8 4.11ms 125ns 4.11ms {"query":"^cargo b","query_id":8}
9 4.36ms 208ns 4.36ms {"query":"^cargo bu","query_id":9}
10 3.85ms 125ns 3.85ms {"query":"^cargo bui","query_id":10}
11 4.35ms 125ns 4.35ms {"query":"^cargo buil","query_id":11}
12 3.94ms 250ns 3.94ms {"query":"^cargo build","query_id":12}
13 4.59ms 125ns 4.59ms {"query":"^cargo build$","query_id":13}
14 4.18ms 84ns 4.18ms {"query":"^cargo ","query_id":14}
15 220.13µs 125ns 220.00µs {"query":"!","query_id":15}
16 4.43ms 125ns 4.43ms {"query":"!g","query_id":16}
17 3.45ms 125ns 3.45ms {"query":"!gi","query_id":17}
18 4.55ms 125ns 4.55ms {"query":"!git","query_id":18}
19 7.12ms 209ns 7.12ms {"query":"!git ","query_id":19}
20 4.25ms 166ns 4.25ms {"query":"!git c","query_id":20}
21 5.18ms 125ns 5.18ms {"query":"!git co","query_id":21}
22 4.27ms 125ns 4.27ms {"query":"!git com","query_id":22}
23 4.06ms 292ns 4.06ms {"query":"!git comm","query_id":23}
24 4.46ms 166ns 4.46ms {"query":"!git commi","query_id":24}
25 4.31ms 208ns 4.31ms {"query":"!git commit","query_id":25}
Summary: 25 calls
Wall: avg=3.97ms, min=220.13µs, max=7.12ms, p50=4.27ms, p99=7.12ms
Busy: avg=147ns, min=84ns, max=292ns, p50=125ns, p99=292ns
```
Diffstat (limited to 'crates/atuin-daemon')
21 files changed, 2785 insertions, 391 deletions
diff --git a/crates/atuin-daemon/Cargo.toml b/crates/atuin-daemon/Cargo.toml index 36917789..97ed88ea 100644 --- a/crates/atuin-daemon/Cargo.toml +++ b/crates/atuin-daemon/Cargo.toml @@ -14,9 +14,10 @@ readme.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -atuin-client = { path = "../atuin-client", version = "18.13.0-beta.1" } -atuin-dotfiles = { path = "../atuin-dotfiles", version = "18.13.0-beta.1" } -atuin-history = { path = "../atuin-history", version = "18.13.0-beta.1" } +atuin-client = { path = "../atuin-client" } +atuin-common = { path = "../atuin-common" } +atuin-dotfiles = { path = "../atuin-dotfiles" } +atuin-history = { path = "../atuin-history" } time = { workspace = true } uuid = { workspace = true } @@ -32,10 +33,11 @@ tonic = "0.14" tonic-prost = "0.14" prost = "0.14" prost-types = "0.14" -tokio-stream = {version="0.1.14", features=["net"]} +tokio-stream = { version = "0.1.14", features = ["net"] } hyper-util = "0.1" rand.workspace = true +nucleo = { git = "https://github.com/atuinsh/nucleo-ext.git", branch = "main" } [target.'cfg(target_os = "linux")'.dependencies] listenfd = "1.0.1" diff --git a/crates/atuin-daemon/build.rs b/crates/atuin-daemon/build.rs index fbe34d12..7034aa04 100644 --- a/crates/atuin-daemon/build.rs +++ b/crates/atuin-daemon/build.rs @@ -3,7 +3,11 @@ use std::{env, fs, path::PathBuf}; use protox::prost::Message; fn main() -> std::io::Result<()> { - let proto_paths = ["proto/history.proto"]; + let proto_paths = [ + "proto/history.proto", + "proto/search.proto", + "proto/control.proto", + ]; let proto_include_dirs = ["proto"]; let file_descriptors = protox::compile(proto_paths, proto_include_dirs).unwrap(); diff --git a/crates/atuin-daemon/proto/control.proto b/crates/atuin-daemon/proto/control.proto new file mode 100644 index 00000000..06347902 --- /dev/null +++ b/crates/atuin-daemon/proto/control.proto @@ -0,0 +1,62 @@ +syntax = "proto3"; +package control; + +// The Control service allows external processes (CLI commands, etc.) +// to inject events into the running daemon. +service Control { + // Send an event to the daemon's event bus + rpc SendEvent(SendEventRequest) returns (SendEventResponse); +} + +message SendEventRequest { + oneof event { + // History was pruned - search index needs full rebuild + HistoryPrunedEvent history_pruned = 1; + + // Specific history items were deleted + HistoryDeletedEvent history_deleted = 2; + + // Request immediate sync + ForceSyncEvent force_sync = 3; + + // Settings have changed, reload if needed + SettingsReloadedEvent settings_reloaded = 4; + + // Request graceful shutdown + ShutdownEvent shutdown = 5; + + // History was rebuilt - search index needs full rebuild + HistoryRebuiltEvent history_rebuilt = 6; + } +} + +message SendEventResponse { + // Empty on success; errors come through gRPC status +} + +// Individual event message types + +message HistoryPrunedEvent { + // No fields needed - just signals that pruning happened +} + +message HistoryRebuiltEvent { + // No fields needed - just signals that rebuilding happened +} + +message HistoryDeletedEvent { + // IDs of deleted history items (UUIDs as strings) + repeated string ids = 1; +} + +message ForceSyncEvent { + // No fields needed - just triggers sync +} + +message SettingsReloadedEvent { + // No fields needed - components should re-read settings +} + +message ShutdownEvent { + // No fields needed - triggers graceful shutdown +} diff --git a/crates/atuin-daemon/proto/search.proto b/crates/atuin-daemon/proto/search.proto new file mode 100644 index 00000000..6b84acbd --- /dev/null +++ b/crates/atuin-daemon/proto/search.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; +package search; + +enum FilterMode { + GLOBAL = 0; + HOST = 1; + SESSION = 2; + DIRECTORY = 3; + WORKSPACE = 4; + SESSION_PRELOAD = 5; +} + +message SearchContext { + string session_id = 1; + string cwd = 2; + string hostname = 3; + string host_id = 4; + optional string git_root = 5; +} + +message SearchRequest { + string query = 1; + uint64 query_id = 2; // Incrementing ID to match responses to queries + FilterMode filter_mode = 3; + SearchContext context = 4; +} + +message SearchResponse { + uint64 query_id = 1; // Echo back the query ID + repeated bytes ids = 2; +} + +service Search { + rpc Search(stream SearchRequest) returns (stream SearchResponse); +} diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs index 3b76a680..2f492f6b 100644 --- a/crates/atuin-daemon/src/client.rs +++ b/crates/atuin-daemon/src/client.rs @@ -1,4 +1,6 @@ -use eyre::{Context, Result}; +use atuin_client::database::Context; +use atuin_client::settings::{FilterMode, Settings}; +use eyre::{Context as EyreContext, Result}; #[cfg(windows)] use tokio::net::TcpStream; use tonic::Code; @@ -11,11 +13,22 @@ use hyper_util::rt::TokioIo; use tokio::net::UnixStream; use atuin_client::history::History; +use tracing::{Level, instrument, span}; +use crate::control::HistoryRebuiltEvent; +use crate::control::{ + ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest, + SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient, +}; +use crate::events::DaemonEvent; use crate::history::{ EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, history_client::HistoryClient as HistoryServiceClient, }; +use crate::search::{ + FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse, + search_client::SearchClient as SearchServiceClient, +}; pub struct HistoryClient { client: HistoryServiceClient<Channel>, @@ -52,6 +65,8 @@ pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind { impl HistoryClient { #[cfg(unix)] pub async fn new(path: String) -> Result<Self> { + use eyre::Context; + let log_path = path.clone(); let channel = Endpoint::try_from("http://atuin_local_daemon:0")? .connect_with_connector(service_fn(move |_: Uri| { @@ -130,3 +145,275 @@ impl HistoryClient { Ok(resp.accepted) } } + +pub struct SearchClient { + client: SearchServiceClient<Channel>, +} + +impl SearchClient { + #[cfg(unix)] + pub async fn new(path: String) -> Result<Self> { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = SearchServiceClient::new(channel); + + Ok(SearchClient { client }) + } + + #[cfg(not(unix))] + pub async fn new(port: u64) -> Result<Self> { + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let url = format!("127.0.0.1:{port}"); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" + ) + })?; + + let client = SearchServiceClient::new(channel); + + Ok(SearchClient { client }) + } + + #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))] + pub async fn search( + &mut self, + query: String, + query_id: u64, + filter_mode: FilterMode, + context: Option<Context>, + ) -> Result<tonic::Streaming<SearchResponse>> { + let request = SearchRequest { + query, + query_id, + filter_mode: RpcFilterMode::from(filter_mode).into(), + context: context.map(RpcSearchContext::from), + }; + let request_stream = tokio_stream::once(request); + let response = span!(Level::TRACE, "daemon_client_search.request") + .in_scope(async || self.client.search(request_stream).await) + .await?; + + Ok(response.into_inner()) + } +} + +impl From<FilterMode> for RpcFilterMode { + fn from(filter_mode: FilterMode) -> Self { + match filter_mode { + FilterMode::Global => RpcFilterMode::Global, + FilterMode::Host => RpcFilterMode::Host, + FilterMode::Session => RpcFilterMode::Session, + FilterMode::Directory => RpcFilterMode::Directory, + FilterMode::Workspace => RpcFilterMode::Workspace, + FilterMode::SessionPreload => RpcFilterMode::SessionPreload, + } + } +} + +impl From<Context> for RpcSearchContext { + fn from(context: Context) -> Self { + RpcSearchContext { + session_id: context.session, + cwd: context.cwd, + hostname: context.hostname, + host_id: context.host_id, + git_root: context + .git_root + .map(|path| path.to_string_lossy().to_string()), + } + } +} + +// ============================================================================ +// Control Client +// ============================================================================ + +/// Client for the Control gRPC service. +/// +/// Used to inject events into a running daemon from external processes. +pub struct ControlClient { + client: ControlServiceClient<Channel>, +} + +impl ControlClient { + /// Connect to the daemon's control service. + #[cfg(unix)] + pub async fn new(path: String) -> Result<Self> { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = ControlServiceClient::new(channel); + + Ok(ControlClient { client }) + } + + /// Connect to the daemon's control service. + #[cfg(not(unix))] + pub async fn new(port: u64) -> Result<Self> { + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let url = format!("127.0.0.1:{port}"); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?" + ) + })?; + + let client = ControlServiceClient::new(channel); + + Ok(ControlClient { client }) + } + + /// Connect using settings. + #[cfg(unix)] + pub async fn from_settings(settings: &Settings) -> Result<Self> { + Self::new(settings.daemon.socket_path.clone()).await + } + + /// Connect using settings. + #[cfg(not(unix))] + pub async fn from_settings(settings: &Settings) -> Result<Self> { + Self::new(settings.daemon.tcp_port).await + } + + /// Send an event to the daemon. + pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> { + let proto_event = daemon_event_to_proto(event); + let request = SendEventRequest { + event: Some(proto_event), + }; + self.client.send_event(request).await?; + Ok(()) + } +} + +/// Convert a daemon event to its proto representation. +fn daemon_event_to_proto(event: DaemonEvent) -> crate::control::send_event_request::Event { + use crate::control::send_event_request::Event; + + match event { + DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}), + DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}), + DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent { + ids: ids.into_iter().map(|id| id.0).collect(), + }), + DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}), + DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}), + DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}), + // These events are internal and not sent via the control service + DaemonEvent::HistoryStarted(_) + | DaemonEvent::HistoryEnded(_) + | DaemonEvent::RecordsAdded(_) + | DaemonEvent::SyncCompleted { .. } + | DaemonEvent::SyncFailed { .. } => { + // Use shutdown as a fallback, though this shouldn't happen + tracing::warn!("attempted to send internal event via control service"); + Event::Shutdown(ShutdownEvent {}) + } + } +} + +// ============================================================================ +// Convenience Functions +// ============================================================================ + +/// Emit an event to the daemon. +/// +/// This is a fire-and-forget helper for sending events to the daemon from +/// external processes like CLI commands. If the daemon isn't running, this +/// will silently succeed (returns Ok). +/// +/// # Example +/// +/// ```ignore +/// // After pruning history +/// emit_event(DaemonEvent::HistoryPruned).await?; +/// +/// // After deleting specific history items +/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] }).await?; +/// +/// // Request immediate sync +/// emit_event(DaemonEvent::ForceSync).await?; +/// ``` +pub async fn emit_event(event: DaemonEvent) -> Result<()> { + emit_event_with_settings(event, None).await +} + +/// Emit an event to the daemon with explicit settings. +/// +/// If settings are not provided, they will be loaded from the default location. +/// If the daemon isn't running, this will silently succeed. +pub async fn emit_event_with_settings( + event: DaemonEvent, + settings: Option<&Settings>, +) -> Result<()> { + // Load settings if not provided + let owned_settings; + let settings = match settings { + Some(s) => s, + None => { + owned_settings = Settings::new()?; + &owned_settings + } + }; + + // Try to connect - if daemon isn't running, that's fine + let mut client = match ControlClient::from_settings(settings).await { + Ok(c) => c, + Err(e) => { + tracing::debug!(?e, "daemon not running, skipping event emission"); + return Ok(()); + } + }; + + // Send the event + if let Err(e) = client.send_event(event).await { + tracing::debug!(?e, "failed to send event to daemon"); + // Don't fail - this is fire-and-forget + } + + Ok(()) +} diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs new file mode 100644 index 00000000..23d48c5e --- /dev/null +++ b/crates/atuin-daemon/src/components/history.rs @@ -0,0 +1,252 @@ +//! History component. +//! +//! Handles command history lifecycle (start/end) and provides the History gRPC service. + +use std::sync::Arc; + +use atuin_client::{ + database::Database, + history::{History, HistoryId, store::HistoryStore}, + settings::Settings, +}; +use dashmap::DashMap; +use eyre::Result; +use time::OffsetDateTime; +use tonic::{Request, Response, Status}; +use tracing::{Level, instrument}; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + history::{ + EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply, + StartHistoryRequest, StatusReply, StatusRequest, + history_server::{History as HistorySvc, HistoryServer}, + }, +}; + +const DAEMON_PROTOCOL_VERSION: u32 = 1; + +/// History component - manages command history lifecycle. +/// +/// This component: +/// - Tracks currently running commands (stored in memory) +/// - Saves completed commands to the database and record store +/// - Emits history events for other components (e.g., search indexing) +/// - Provides the History gRPC service +pub struct HistoryComponent { + inner: Arc<HistoryComponentInner>, +} + +struct HistoryComponentInner { + /// Commands currently running (not yet completed). + running: DashMap<HistoryId, History>, + + /// Handle to the daemon (set during start). + handle: tokio::sync::RwLock<Option<DaemonHandle>>, + + /// History store for pushing records (set during start). + history_store: tokio::sync::RwLock<Option<HistoryStore>>, +} + +impl HistoryComponent { + /// Create a new history component. + pub fn new() -> Self { + Self { + inner: Arc::new(HistoryComponentInner { + running: DashMap::new(), + handle: tokio::sync::RwLock::new(None), + history_store: tokio::sync::RwLock::new(None), + }), + } + } + + /// Get the gRPC service for this component. + /// + /// This returns a tonic service that can be added to a gRPC server. + pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> { + HistoryServer::new(HistoryGrpcService { + inner: self.inner.clone(), + }) + } +} + +impl Default for HistoryComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for HistoryComponent { + fn name(&self) -> &'static str { + "history" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + // Create the history store + let host_id = Settings::host_id().await?; + let history_store = + HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key()); + + *self.inner.history_store.write().await = Some(history_store); + *self.inner.handle.write().await = Some(handle); + + tracing::info!("history component started"); + Ok(()) + } + + async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> { + // History component produces events but doesn't need to react to them + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + tracing::info!("history component stopped"); + Ok(()) + } +} + +/// The gRPC service implementation. +/// +/// This is a thin wrapper that delegates to the component's shared state. +pub struct HistoryGrpcService { + inner: Arc<HistoryComponentInner>, +} + +#[tonic::async_trait] +impl HistorySvc for HistoryGrpcService { + #[instrument(skip_all, level = Level::INFO)] + async fn start_history( + &self, + request: Request<StartHistoryRequest>, + ) -> Result<Response<StartHistoryReply>, Status> { + let req = request.into_inner(); + + let timestamp = + OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { + Status::invalid_argument( + "failed to parse timestamp as unix time (expected nanos since epoch)", + ) + })?; + + let h: History = History::daemon() + .timestamp(timestamp) + .command(req.command) + .cwd(req.cwd) + .session(req.session) + .hostname(req.hostname) + .build() + .into(); + + // Emit the event + if let Some(handle) = self.inner.handle.read().await.as_ref() { + handle.emit(DaemonEvent::HistoryStarted(h.clone())); + } + + let id = h.id.clone(); + tracing::info!(id = id.to_string(), "start history"); + self.inner.running.insert(id.clone(), h); + + let reply = StartHistoryReply { + id: id.to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + Ok(Response::new(reply)) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn end_history( + &self, + request: Request<EndHistoryRequest>, + ) -> Result<Response<EndHistoryReply>, Status> { + let req = request.into_inner(); + let id = HistoryId(req.id); + + if let Some((_, mut history)) = self.inner.running.remove(&id) { + history.exit = req.exit; + history.duration = match req.duration { + 0 => i64::try_from( + (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), + ) + .expect("failed to convert calculated duration to i64"), + value => i64::try_from(value).expect("failed to get i64 duration"), + }; + + // Get the handle and store to save the history + let handle_guard = self.inner.handle.read().await; + let handle = handle_guard + .as_ref() + .ok_or_else(|| Status::internal("component not initialized"))?; + + let store_guard = self.inner.history_store.read().await; + let history_store = store_guard + .as_ref() + .ok_or_else(|| Status::internal("component not initialized"))?; + + // Save to database + handle + .history_db() + .save(&history) + .await + .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; + + tracing::info!( + id = id.0.to_string(), + duration = history.duration, + "end history" + ); + + // Push to record store + let (record_id, idx) = history_store + .push(history.clone()) + .await + .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?; + + // Emit the event + handle.emit(DaemonEvent::HistoryEnded(history)); + + let reply = EndHistoryReply { + id: record_id.0.to_string(), + idx, + version: env!("CARGO_PKG_VERSION").to_string(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + return Ok(Response::new(reply)); + } + + Err(Status::not_found(format!( + "could not find history with id: {id}" + ))) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn status( + &self, + _request: Request<StatusRequest>, + ) -> Result<Response<StatusReply>, Status> { + let reply = StatusReply { + healthy: true, + version: env!("CARGO_PKG_VERSION").to_string(), + pid: std::process::id(), + protocol: DAEMON_PROTOCOL_VERSION, + }; + + Ok(Response::new(reply)) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn shutdown( + &self, + _request: Request<ShutdownRequest>, + ) -> Result<Response<ShutdownReply>, Status> { + // Use the daemon handle to request shutdown + if let Some(handle) = self.inner.handle.read().await.as_ref() { + handle.shutdown(); + } + Ok(Response::new(ShutdownReply { accepted: true })) + } +} diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs new file mode 100644 index 00000000..5950d5d5 --- /dev/null +++ b/crates/atuin-daemon/src/components/mod.rs @@ -0,0 +1,22 @@ +//! Daemon components. +//! +//! Components are the building blocks of the daemon. Each component handles +//! a specific domain and can: +//! +//! - Expose gRPC services +//! - React to events +//! - Spawn background tasks +//! +//! Available components: +//! +//! - [`history::HistoryComponent`]: Command history lifecycle management +//! - [`search::SearchComponent`]: Fuzzy search over history +//! - [`sync::SyncComponent`]: Cloud sync + +pub mod history; +pub mod search; +pub mod sync; + +pub use history::HistoryComponent; +pub use search::SearchComponent; +pub use sync::SyncComponent; diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs new file mode 100644 index 00000000..7fb59dea --- /dev/null +++ b/crates/atuin-daemon/src/components/search.rs @@ -0,0 +1,394 @@ +//! Search component. +//! +//! Provides fuzzy search over command history using the Nucleo search library +//! with frecency-based ranking and dynamic filtering. + +use std::{pin::Pin, sync::Arc}; + +use atuin_client::database::Database; +use eyre::Result; +use tokio::sync::RwLock; +use tokio_stream::Stream; +use tonic::{Request, Response, Status, Streaming}; +use tracing::{Level, debug, info, instrument, span, trace}; +use uuid::Uuid; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, + search::{ + FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse, + search_server::{Search as SearchSvc, SearchServer}, + }, +}; + +const PAGE_SIZE: usize = 5000; +const RESULTS_LIMIT: u32 = 200; +/// How often to rebuild the frecency map (in seconds). +const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60; + +/// Search component - provides fuzzy search over command history. +/// +/// This component: +/// - Maintains a deduplicated search index with frecency ranking +/// - Loads history from the database on startup +/// - Updates the index when history events occur +/// - Provides the Search gRPC service +pub struct SearchComponent { + index: Arc<RwLock<SearchIndex>>, + handle: tokio::sync::RwLock<Option<DaemonHandle>>, + loader_handle: Option<tokio::task::JoinHandle<()>>, + frecency_handle: Option<tokio::task::JoinHandle<()>>, +} + +impl SearchComponent { + /// Create a new search component. + pub fn new() -> Self { + Self { + index: Arc::new(RwLock::new(SearchIndex::new())), + handle: tokio::sync::RwLock::new(None), + loader_handle: None, + frecency_handle: None, + } + } + + /// Get the gRPC service for this component. + pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> { + SearchServer::new(SearchGrpcService { + index: self.index.clone(), + }) + } + + /// Rebuild the entire search index from the database. + async fn rebuild_index(&self) -> Result<()> { + let handle_guard = self.handle.read().await; + let handle = handle_guard + .as_ref() + .ok_or_else(|| eyre::eyre!("component not initialized"))?; + + info!("Rebuilding search index from database"); + + // Create a new index + let new_index = SearchIndex::new(); + + // Load all history into the new index + let db = handle.history_db().clone(); + let mut pager = db.all_paged(PAGE_SIZE, false, true); + loop { + match pager.next().await { + Ok(Some(histories)) => { + info!( + "Loading {} history entries into search index", + histories.len() + ); + new_index.add_histories(&histories); + } + Ok(None) => break, + Err(e) => { + tracing::error!("Failed to load history during rebuild: {}", e); + break; + } + } + } + + info!( + "Search index rebuild complete; {} unique commands", + new_index.command_count() + ); + + // Replace the old index with the new one + *self.index.write().await = new_index; + Ok(()) + } +} + +impl Default for SearchComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SearchComponent { + fn name(&self) -> &'static str { + "search" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + *self.handle.write().await = Some(handle.clone()); + + // Spawn background task to load history into index + let index = self.index.clone(); + let db = handle.history_db().clone(); + + self.loader_handle = Some(tokio::spawn(async move { + info!( + "Loading history into search index; page size = {}", + PAGE_SIZE + ); + let mut pager = db.all_paged(PAGE_SIZE, false, true); + loop { + match pager.next().await { + Ok(Some(histories)) => { + info!( + "Loading {} history entries into search index", + histories.len() + ); + index.read().await.add_histories(&histories); + } + Ok(None) => { + info!( + "Initial history load complete; {} unique commands indexed", + index.read().await.command_count() + ); + // Build initial frecency map + index.read().await.rebuild_frecency().await; + info!("Initial frecency map built"); + break; + } + Err(e) => { + tracing::error!("Failed to load history: {}", e); + break; + } + } + } + })); + + // Spawn background task to periodically refresh frecency + let index_for_frecency = self.index.clone(); + self.frecency_handle = Some(tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs( + FRECENCY_REFRESH_INTERVAL_SECS, + )); + loop { + interval.tick().await; + trace!("Refreshing frecency map"); + index_for_frecency.read().await.rebuild_frecency().await; + } + })); + + tracing::info!("search component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + match event { + DaemonEvent::RecordsAdded(records) => { + debug!( + count = records.len(), + "Processing added records for search index" + ); + + let handle_guard = self.handle.read().await; + if let Some(handle) = handle_guard.as_ref() { + let histories: Vec<_> = handle + .history_db() + .query_history( + format!( + "select * from history where id in ({})", + records + .iter() + .map(|record| record.0.to_string()) + .collect::<Vec<_>>() + .join(",") + ) + .as_str(), + ) + .await + .unwrap_or_default(); + + span!(Level::TRACE, "inject_records", count = histories.len()) + .in_scope(async || { + self.index.read().await.add_histories(&histories); + }) + .await; + } + } + DaemonEvent::HistoryStarted(history) => { + debug!(id = %history.id, command = %history.command, "History started (no index action)"); + } + DaemonEvent::HistoryEnded(history) => { + span!(Level::TRACE, "inject_history_ended") + .in_scope(async || { + self.index.read().await.add_history(history); + }) + .await; + } + DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => { + info!("History store pruned or rebuilt, rebuilding search index"); + if let Err(e) = self.rebuild_index().await { + tracing::error!("Failed to rebuild search index: {}", e); + } + } + DaemonEvent::HistoryDeleted { ids } => { + info!( + count = ids.len(), + "History deleted, rebuilding search index" + ); + // For now, just rebuild the entire index. A more efficient implementation + // would remove specific items from the index. + if let Err(e) = self.rebuild_index().await { + tracing::error!("Failed to rebuild search index: {}", e); + } + } + // Events we don't care about + DaemonEvent::SyncCompleted { .. } + | DaemonEvent::SyncFailed { .. } + | DaemonEvent::ForceSync + | DaemonEvent::SettingsReloaded + | DaemonEvent::ShutdownRequested => {} + } + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + if let Some(handle) = self.loader_handle.take() { + handle.abort(); + } + if let Some(handle) = self.frecency_handle.take() { + handle.abort(); + } + tracing::info!("search component stopped"); + Ok(()) + } +} + +/// The gRPC service implementation. +pub struct SearchGrpcService { + index: Arc<RwLock<SearchIndex>>, +} + +#[tonic::async_trait] +impl SearchSvc for SearchGrpcService { + type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>; + + #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")] + async fn search( + &self, + request: Request<Streaming<SearchRequest>>, + ) -> Result<Response<Self::SearchStream>, Status> { + let mut in_stream = request.into_inner(); + let index = self.index.clone(); + + // Create output channel + let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128); + + // Spawn task to handle incoming requests and send responses + tokio::spawn(async move { + while let Some(req) = in_stream.message().await.transpose() { + match req { + Ok(search_req) => { + let query = search_req.query; + let query_id = search_req.query_id; + let filter_mode: FilterMode = search_req + .filter_mode + .try_into() + .unwrap_or(FilterMode::Global); + let proto_context = search_req.context; + + debug!( + "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}", + query, + query_id, + filter_mode.as_str_name(), + proto_context + ); + + // Convert proto FilterMode + context to IndexFilterMode + let index_filter = convert_filter_mode(filter_mode, &proto_context); + + // Build QueryContext from proto context + let query_context = proto_context + .map(|ctx| QueryContext { + cwd: Some(with_trailing_slash(&ctx.cwd)), + git_root: ctx.git_root.map(|s| with_trailing_slash(&s)), + hostname: Some(ctx.hostname), + session_id: Some(ctx.session_id), + }) + .unwrap_or_default(); + + // Perform the search + let history_ids = + span!(Level::TRACE, "daemon_search_query", %query, query_id) + .in_scope(|| async { + let index = index.read().await; + index + .search(&query, index_filter, &query_context, RESULTS_LIMIT) + .await + }) + .await; + + // Convert history IDs to bytes + let ids: Vec<Vec<u8>> = history_ids + .iter() + .filter_map(|id| { + Uuid::parse_str(id) + .ok() + .map(|uuid| uuid.as_bytes().to_vec()) + }) + .collect(); + + if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() { + break; // Client disconnected + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + break; + } + } + } + }); + + // Convert receiver to stream + let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Response::new(Box::pin(out_stream))) + } +} + +/// Convert proto FilterMode and context to IndexFilterMode. +fn convert_filter_mode( + mode: FilterMode, + context: &Option<crate::search::SearchContext>, +) -> IndexFilterMode { + match (mode, context) { + (FilterMode::Global, _) => IndexFilterMode::Global, + (FilterMode::Directory, Some(ctx)) => { + IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) + } + (FilterMode::Workspace, Some(ctx)) => { + if let Some(ref git_root) = ctx.git_root { + IndexFilterMode::Workspace(with_trailing_slash(git_root)) + } else { + // Fall back to directory if no git root + IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd)) + } + } + (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()), + (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()), + (FilterMode::SessionPreload, Some(ctx)) => { + // SessionPreload is similar to Session - filter by session + IndexFilterMode::Session(ctx.session_id.clone()) + } + // If no context provided, fall back to global + _ => IndexFilterMode::Global, + } +} + +#[cfg(windows)] +pub fn with_trailing_slash(s: &str) -> String { + if s.ends_with('\\') { + s.to_string() + } else { + format!("{}\\", s) + } +} + +#[cfg(not(windows))] +pub fn with_trailing_slash(s: &str) -> String { + if s.ends_with('/') { + s.to_string() + } else { + format!("{}/", s) + } +} diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs new file mode 100644 index 00000000..6217706a --- /dev/null +++ b/crates/atuin-daemon/src/components/sync.rs @@ -0,0 +1,257 @@ +//! Sync component. +//! +//! Handles periodic synchronization with the Atuin cloud server. + +use eyre::Result; +use rand::Rng; +use tokio::sync::mpsc; +use tokio::time::{self, MissedTickBehavior}; + +use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings}; +use atuin_dotfiles::store::{AliasStore, var::VarStore}; + +use crate::{ + daemon::{Component, DaemonHandle}, + events::DaemonEvent, +}; + +/// Commands that can be sent to the sync task. +enum SyncCommand { + /// Trigger an immediate sync. + ForceSync, + /// Stop the sync loop. + Stop, +} + +/// Sync component - handles periodic cloud synchronization. +/// +/// This component: +/// - Runs a background sync loop on a configurable interval +/// - Implements exponential backoff on sync failures +/// - Responds to ForceSync events for immediate sync +/// - Emits SyncCompleted/SyncFailed events +pub struct SyncComponent { + task_handle: Option<tokio::task::JoinHandle<()>>, + command_tx: Option<mpsc::Sender<SyncCommand>>, +} + +impl SyncComponent { + /// Create a new sync component. + pub fn new() -> Self { + Self { + task_handle: None, + command_tx: None, + } + } +} + +impl Default for SyncComponent { + fn default() -> Self { + Self::new() + } +} + +#[tonic::async_trait] +impl Component for SyncComponent { + fn name(&self) -> &'static str { + "sync" + } + + async fn start(&mut self, handle: DaemonHandle) -> Result<()> { + let (cmd_tx, cmd_rx) = mpsc::channel(16); + self.command_tx = Some(cmd_tx); + + // Spawn the sync loop with its own copy of the handle + self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx))); + + tracing::info!("sync component started"); + Ok(()) + } + + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { + if let DaemonEvent::ForceSync = event { + tracing::info!("force sync requested"); + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::ForceSync).await; + } + } + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + if let Some(tx) = &self.command_tx { + let _ = tx.send(SyncCommand::Stop).await; + } + if let Some(handle) = self.task_handle.take() { + // Give the task a moment to shut down gracefully + let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; + } + tracing::info!("sync component stopped"); + Ok(()) + } +} + +/// The main sync loop. +/// +/// This runs in a spawned task and handles periodic sync as well as +/// force sync requests. +async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) { + tracing::info!("sync loop starting"); + + // Clone settings since we need them across await points + let settings = handle.settings().await.clone(); + let host_id = match Settings::host_id().await { + Ok(id) => id, + Err(e) => { + tracing::error!("failed to get host id, sync disabled: {e}"); + return; + } + }; + + // Create the stores we need + let encryption_key = *handle.encryption_key(); + let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key); + let alias_store = AliasStore::new(handle.store().clone(), host_id, encryption_key); + let var_store = VarStore::new(handle.store().clone(), host_id, encryption_key); + + // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) + let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); + + let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + + // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, + // we may end up running a lot of syncs in a hot loop. + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = ticker.tick() => { + do_sync_tick( + &handle, + &history_store, + &alias_store, + &var_store, + &mut ticker, + max_interval, + ).await; + } + cmd = cmd_rx.recv() => { + match cmd { + Some(SyncCommand::ForceSync) => { + tracing::info!("executing force sync"); + do_sync_tick( + &handle, + &history_store, + &alias_store, + &var_store, + &mut ticker, + max_interval, + ).await; + } + Some(SyncCommand::Stop) | None => { + tracing::info!("sync loop stopping"); + break; + } + } + } + } + } +} + +/// Execute a single sync tick. +async fn do_sync_tick( + handle: &DaemonHandle, + history_store: &HistoryStore, + alias_store: &AliasStore, + var_store: &VarStore, + ticker: &mut time::Interval, + max_interval: f64, +) { + // Clone settings since we need them across await points + let settings = handle.settings().await.clone(); + + tracing::info!("sync tick"); + + // Check if logged in + let logged_in = match settings.logged_in().await { + Ok(v) => v, + Err(e) => { + tracing::warn!("failed to check login status, skipping sync tick: {e}"); + return; + } + }; + + if !logged_in { + tracing::debug!("not logged in, skipping sync tick"); + return; + } + + // Perform the sync + let res = sync::sync(&settings, handle.store()).await; + + match res { + Err(e) => { + tracing::error!("sync tick failed with {e}"); + + // Emit failure event + handle.emit(DaemonEvent::SyncFailed { + error: e.to_string(), + }); + + // Exponential backoff + let mut rng = rand::thread_rng(); + let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); + + if new_interval > max_interval { + new_interval = max_interval; + } + + *ticker = time::interval(time::Duration::from_secs(new_interval as u64)); + ticker.reset_after(time::Duration::from_secs(new_interval as u64)); + + tracing::error!("backing off, next sync tick in {new_interval}"); + } + Ok((uploaded_count, downloaded_records)) => { + tracing::info!( + uploaded = uploaded_count, + downloaded = downloaded_records.len(), + "sync complete" + ); + + // Build history from downloaded records + if let Err(e) = history_store + .incremental_build(handle.history_db(), &downloaded_records) + .await + { + tracing::error!("failed to build history from downloaded records: {e}"); + } + + // Emit the records added event (for search indexing) + handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone())); + + // Emit sync completed event + handle.emit(DaemonEvent::SyncCompleted { + uploaded: uploaded_count as usize, + downloaded: downloaded_records.len(), + }); + + // Rebuild alias and var stores + if let Err(e) = alias_store.build().await { + tracing::error!("failed to rebuild alias store: {e}"); + } + if let Err(e) = var_store.build().await { + tracing::error!("failed to rebuild var store: {e}"); + } + + // Reset backoff on success + if ticker.period().as_secs() != settings.daemon.sync_frequency { + *ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + } + + // Store sync time + if let Err(e) = Settings::save_sync_time().await { + tracing::error!("failed to save sync time: {e}"); + } + } + } +} diff --git a/crates/atuin-daemon/src/control/mod.rs b/crates/atuin-daemon/src/control/mod.rs new file mode 100644 index 00000000..afb29c57 --- /dev/null +++ b/crates/atuin-daemon/src/control/mod.rs @@ -0,0 +1,12 @@ +//! Control module for external event injection. +//! +//! This module provides the gRPC service that allows external processes +//! (like CLI commands) to inject events into the daemon's event bus. + +mod service; + +// Include the generated proto code +tonic::include_proto!("control"); + +// Re-export the service +pub use service::ControlService; diff --git a/crates/atuin-daemon/src/control/service.rs b/crates/atuin-daemon/src/control/service.rs new file mode 100644 index 00000000..2e7403ce --- /dev/null +++ b/crates/atuin-daemon/src/control/service.rs @@ -0,0 +1,71 @@ +//! Control service implementation. +//! +//! This gRPC service allows external processes (like CLI commands) to inject +//! events into the daemon's event bus. + +use atuin_client::history::HistoryId; +use tonic::{Request, Response, Status}; +use tracing::{Level, info, instrument}; + +use super::{ + SendEventRequest, SendEventResponse, + control_server::{Control, ControlServer}, + send_event_request::Event, +}; +use crate::{daemon::DaemonHandle, events::DaemonEvent}; + +/// The Control gRPC service. +/// +/// This service is used by external processes to inject events into the daemon. +/// It's not a component - it's part of the daemon's core infrastructure. +pub struct ControlService { + handle: DaemonHandle, +} + +impl ControlService { + /// Create a new control service with the given daemon handle. + pub fn new(handle: DaemonHandle) -> Self { + Self { handle } + } + + /// Get a tonic server for this service. + pub fn into_server(self) -> ControlServer<Self> { + ControlServer::new(self) + } +} + +#[tonic::async_trait] +impl Control for ControlService { + #[instrument(skip_all, level = Level::INFO, name = "control_send_event")] + async fn send_event( + &self, + request: Request<SendEventRequest>, + ) -> Result<Response<SendEventResponse>, Status> { + let req = request.into_inner(); + + let event = req + .event + .ok_or_else(|| Status::invalid_argument("event is required"))?; + + let daemon_event = proto_event_to_daemon_event(event)?; + + info!(?daemon_event, "received control event"); + self.handle.emit(daemon_event); + + Ok(Response::new(SendEventResponse {})) + } +} + +/// Convert a proto event to a daemon event. +fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> { + match event { + Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned), + Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt), + Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted { + ids: e.ids.into_iter().map(HistoryId).collect(), + }), + Event::ForceSync(_) => Ok(DaemonEvent::ForceSync), + Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded), + Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested), + } +} diff --git a/crates/atuin-daemon/src/daemon.rs b/crates/atuin-daemon/src/daemon.rs new file mode 100644 index 00000000..ec0b7b68 --- /dev/null +++ b/crates/atuin-daemon/src/daemon.rs @@ -0,0 +1,450 @@ +//! Core daemon infrastructure. +//! +//! This module provides the foundational types for building the atuin daemon: +//! +//! - [`DaemonState`]: Shared state owned by the daemon +//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state +//! - [`Component`]: A trait for implementing daemon components +//! - [`Daemon`]: The main daemon orchestrator +//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon + +use std::sync::Arc; + +use atuin_client::{ + database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore, + settings::Settings, +}; +use eyre::{Context, Result}; +use tokio::sync::{RwLock, broadcast}; + +use crate::events::DaemonEvent; + +// ============================================================================ +// DaemonState +// ============================================================================ + +/// Shared state owned by the daemon. +/// +/// This contains all the resources that components and services need access to. +/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`]. +pub struct DaemonState { + // Event bus + event_tx: broadcast::Sender<DaemonEvent>, + + // Configuration (mutable - can be reloaded) + settings: RwLock<Settings>, + + // Encryption key (immutable - derived at startup) + encryption_key: [u8; 32], + + // Database handles + history_db: HistoryDatabase, + store: SqliteStore, +} + +// ============================================================================ +// DaemonHandle +// ============================================================================ + +/// A lightweight handle to the daemon's shared state. +/// +/// This is the primary way for components, gRPC services, and spawned tasks to +/// interact with the daemon. It provides access to: +/// +/// - Event emission and subscription +/// - Configuration (settings, encryption key) +/// - Database handles +/// +/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed +/// around to any code that needs daemon access. +/// +/// # Example +/// +/// ```ignore +/// // Emit an event +/// handle.emit(DaemonEvent::HistoryPruned); +/// +/// // Access settings +/// let settings = handle.settings().await; +/// let sync_freq = settings.daemon.sync_frequency; +/// +/// // Access database +/// let history = handle.history_db().load(id).await?; +/// ``` +#[derive(Clone)] +pub struct DaemonHandle { + state: Arc<DaemonState>, +} + +impl DaemonHandle { + // ---- Events ---- + + /// Emit an event to the daemon's event bus. + /// + /// This is fire-and-forget - if no receivers are listening (which shouldn't + /// happen in normal operation), the event is dropped silently. + pub fn emit(&self, event: DaemonEvent) { + if let Err(e) = self.state.event_tx.send(event) { + tracing::warn!("failed to emit event (no receivers?): {e}"); + } + } + + /// Subscribe to the event bus. + /// + /// Returns a receiver that will receive all events emitted after this call. + /// Useful for components that need to listen for events outside of the + /// normal `handle_event` callback flow. + pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> { + self.state.event_tx.subscribe() + } + + /// Request graceful shutdown of the daemon. + pub fn shutdown(&self) { + self.emit(DaemonEvent::ShutdownRequested); + } + + // ---- Configuration ---- + + /// Get the current settings. + /// + /// This acquires a read lock on the settings. For most use cases, clone + /// the settings if you need to hold onto them. + pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> { + self.state.settings.read().await + } + + /// Reload settings from disk and emit a SettingsReloaded event. + /// + /// Components listening for `SettingsReloaded` can then re-read settings + /// via `handle.settings()` to pick up the changes. + pub async fn reload_settings(&self) -> Result<()> { + let new_settings = Settings::new()?; + *self.state.settings.write().await = new_settings; + self.emit(DaemonEvent::SettingsReloaded); + tracing::info!("settings reloaded"); + Ok(()) + } + + /// Get the encryption key. + pub fn encryption_key(&self) -> &[u8; 32] { + &self.state.encryption_key + } + + // ---- Database ---- + + /// Get a reference to the history database. + pub fn history_db(&self) -> &HistoryDatabase { + &self.state.history_db + } + + /// Get a reference to the record store. + pub fn store(&self) -> &SqliteStore { + &self.state.store + } +} + +impl std::fmt::Debug for DaemonHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DaemonHandle").finish_non_exhaustive() + } +} + +// ============================================================================ +// Component Trait +// ============================================================================ + +/// A daemon component that handles a specific domain. +/// +/// Components are the building blocks of the daemon. Each component: +/// +/// - Has a unique name for logging and debugging +/// - Can optionally expose gRPC services +/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources +/// - Handles events from the event bus +/// - Performs cleanup on shutdown +/// +/// # Lifecycle +/// +/// 1. **Construction**: Component is created (usually via `new()`) +/// 2. **Start**: `start()` is called with a [`DaemonHandle`] +/// 3. **Running**: `handle_event()` is called for each event on the bus +/// 4. **Shutdown**: `stop()` is called for cleanup +/// +/// # Example +/// +/// ```ignore +/// pub struct MyComponent { +/// handle: Option<DaemonHandle>, +/// } +/// +/// #[async_trait] +/// impl Component for MyComponent { +/// fn name(&self) -> &'static str { "my-component" } +/// +/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> { +/// self.handle = Some(handle); +/// Ok(()) +/// } +/// +/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> { +/// match event { +/// DaemonEvent::SomeEvent => { +/// // Handle the event +/// if let Some(handle) = &self.handle { +/// handle.emit(DaemonEvent::ResponseEvent); +/// } +/// } +/// _ => {} +/// } +/// Ok(()) +/// } +/// +/// async fn stop(&mut self) -> Result<()> { +/// Ok(()) +/// } +/// } +/// ``` +#[tonic::async_trait] +pub trait Component: Send + Sync { + /// Human-readable name for logging and debugging. + fn name(&self) -> &'static str; + + /// Called once at startup. + /// + /// Store the handle if you need to emit events or access daemon resources + /// later. The handle is cheaply cloneable, so feel free to clone it for + /// spawned tasks. + async fn start(&mut self, handle: DaemonHandle) -> Result<()>; + + /// Handle an incoming event. + /// + /// Called for every event on the bus. To emit new events in response, + /// use the handle stored during `start()`. Events emitted here will be + /// processed in subsequent event loop iterations. + async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>; + + /// Called on graceful shutdown. + /// + /// Use this to clean up resources, abort spawned tasks, etc. + async fn stop(&mut self) -> Result<()>; +} + +// ============================================================================ +// Daemon +// ============================================================================ + +/// The main daemon orchestrator. +/// +/// The daemon manages components, runs the event loop, and coordinates startup +/// and shutdown. It is constructed via [`DaemonBuilder`]. +/// +/// # Event Loop +/// +/// The daemon runs a simple event loop: +/// +/// 1. Wait for an event on the bus +/// 2. Dispatch the event to all components (in registration order) +/// 3. Components may emit new events in response +/// 4. Repeat until `ShutdownRequested` is received +/// +/// Events emitted during handling are queued and processed in subsequent +/// iterations, ensuring the loop eventually drains. +pub struct Daemon { + components: Vec<Box<dyn Component>>, + handle: DaemonHandle, +} + +impl Daemon { + /// Create a new daemon builder. + pub fn builder(settings: Settings) -> DaemonBuilder { + DaemonBuilder::new(settings) + } + + /// Get a clone of the daemon handle. + /// + /// The handle can be used to emit events, access settings, etc. + pub fn handle(&self) -> DaemonHandle { + self.handle.clone() + } + + /// Start all components. + /// + /// This must be called before `run_event_loop()`. It initializes all + /// registered components with the daemon handle. + pub async fn start_components(&mut self) -> Result<()> { + for component in &mut self.components { + tracing::info!(component = component.name(), "starting component"); + component + .start(self.handle.clone()) + .await + .with_context(|| format!("failed to start component: {}", component.name()))?; + } + Ok(()) + } + + /// Run the daemon event loop. + /// + /// This processes events until a ShutdownRequested event is received. + /// Components must be started first via `start_components()`. + pub async fn run_event_loop(&mut self) -> Result<()> { + let mut event_rx = self.handle.subscribe(); + loop { + match event_rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) => { + tracing::info!("shutdown requested, stopping daemon"); + break; + } + Ok(event) => { + tracing::debug!(?event, "processing event"); + self.dispatch_event(&event).await; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!( + skipped = n, + "event receiver lagged, some events were dropped" + ); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::info!("event bus closed, stopping daemon"); + break; + } + } + } + Ok(()) + } + + /// Stop all components. + /// + /// This performs graceful shutdown of all components. + pub async fn stop_components(&mut self) { + for component in &mut self.components { + tracing::info!(component = component.name(), "stopping component"); + if let Err(e) = component.stop().await { + tracing::error!( + component = component.name(), + error = ?e, + "error stopping component" + ); + } + } + tracing::info!("all components stopped"); + } + + /// Run the daemon. + /// + /// This is a convenience method that starts components, runs the event loop, + /// and handles shutdown. It does not return until the daemon is shut down. + pub async fn run(mut self) -> Result<()> { + self.start_components().await?; + self.run_event_loop().await?; + self.stop_components().await; + tracing::info!("daemon stopped"); + Ok(()) + } + + async fn dispatch_event(&mut self, event: &DaemonEvent) { + for component in &mut self.components { + if let Err(e) = component.handle_event(event).await { + tracing::error!( + component = component.name(), + error = ?e, + "error handling event" + ); + } + } + } +} + +// ============================================================================ +// DaemonBuilder +// ============================================================================ + +/// Builder for constructing a [`Daemon`]. +/// +/// # Example +/// +/// ```ignore +/// let daemon = Daemon::builder(settings) +/// .store(store) +/// .history_db(history_db) +/// .component(HistoryComponent::new()) +/// .component(SearchComponent::new()) +/// .component(SyncComponent::new()) +/// .build() +/// .await?; +/// +/// daemon.run().await?; +/// ``` +pub struct DaemonBuilder { + settings: Settings, + store: Option<SqliteStore>, + history_db: Option<HistoryDatabase>, + components: Vec<Box<dyn Component>>, +} + +impl DaemonBuilder { + /// Create a new daemon builder with the given settings. + pub fn new(settings: Settings) -> Self { + Self { + settings, + store: None, + history_db: None, + components: Vec::new(), + } + } + + /// Set the record store. + pub fn store(mut self, store: SqliteStore) -> Self { + self.store = Some(store); + self + } + + /// Set the history database. + pub fn history_db(mut self, db: HistoryDatabase) -> Self { + self.history_db = Some(db); + self + } + + /// Register a component. + /// + /// Components are started in registration order and stopped in reverse order. + pub fn component(mut self, component: impl Component + 'static) -> Self { + self.components.push(Box::new(component)); + self + } + + /// Build the daemon. + /// + /// This loads the encryption key and creates the daemon state. + pub async fn build(self) -> Result<Daemon> { + let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?; + let history_db = self + .history_db + .ok_or_else(|| eyre::eyre!("history_db is required"))?; + + // Load encryption key + let encryption_key: [u8; 32] = encryption::load_key(&self.settings) + .context("could not load encryption key")? + .into(); + + // Create the event bus + let (event_tx, _) = broadcast::channel(64); + + // Create the shared state + let state = Arc::new(DaemonState { + event_tx, + settings: RwLock::new(self.settings), + encryption_key, + history_db, + store, + }); + + // Create the handle (just a reference to the state) + let handle = DaemonHandle { state }; + + Ok(Daemon { + components: self.components, + handle, + }) + } +} diff --git a/crates/atuin-daemon/src/events.rs b/crates/atuin-daemon/src/events.rs new file mode 100644 index 00000000..4e6c6ff3 --- /dev/null +++ b/crates/atuin-daemon/src/events.rs @@ -0,0 +1,74 @@ +//! Daemon events. +//! +//! Events are the primary communication mechanism within the daemon. +//! Components emit events to notify others of state changes, and handle +//! events to react to changes elsewhere in the system. +//! +//! External processes (like CLI commands) can also inject events via the +//! Control gRPC service. + +use atuin_client::history::{History, HistoryId}; +use atuin_common::record::RecordId; + +/// Events that flow through the daemon's event bus. +/// +/// Events are broadcast to all components. Each component decides which +/// events it cares about in its `handle_event` implementation. +#[derive(Debug, Clone)] +pub enum DaemonEvent { + // ---- History lifecycle ---- + /// A command has started running. + HistoryStarted(History), + + /// A command has finished running. + HistoryEnded(History), + + // ---- Sync ---- + /// Records were synced from the server. + /// + /// The search component uses this to update its index with new history. + RecordsAdded(Vec<RecordId>), + + /// Sync completed successfully. + SyncCompleted { + /// Number of records uploaded. + uploaded: usize, + /// Number of records downloaded. + downloaded: usize, + }, + + /// Sync failed. + SyncFailed { + /// Error message describing what went wrong. + error: String, + }, + + /// Request an immediate sync (external trigger). + ForceSync, + + // ---- External commands ---- + /// History was pruned - search index needs a full rebuild. + /// + /// Emitted when the user runs `atuin history prune` or similar. + HistoryPruned, + + /// History was rebuilt - search index needs a full rebuild. + /// + /// Emitted when the user runs `atuin store rebuild history` or similar. + HistoryRebuilt, + + /// Specific history items were deleted. + /// + /// The search component should remove these from its index. + HistoryDeleted { + /// IDs of the deleted history entries. + ids: Vec<HistoryId>, + }, + + /// Settings have changed, components should reload if needed. + SettingsReloaded, + + // ---- Lifecycle ---- + /// Request graceful shutdown of the daemon. + ShutdownRequested, +} diff --git a/crates/atuin-daemon/src/history.rs b/crates/atuin-daemon/src/history.rs deleted file mode 100644 index 57f5b2cf..00000000 --- a/crates/atuin-daemon/src/history.rs +++ /dev/null @@ -1 +0,0 @@ -tonic::include_proto!("history"); diff --git a/crates/atuin-daemon/src/history/mod.rs b/crates/atuin-daemon/src/history/mod.rs new file mode 100644 index 00000000..b71853df --- /dev/null +++ b/crates/atuin-daemon/src/history/mod.rs @@ -0,0 +1,6 @@ +//! History module for the daemon gRPC history service. +//! +//! This module contains the proto-generated types for the history gRPC service. + +// Include the generated proto code +tonic::include_proto!("history"); diff --git a/crates/atuin-daemon/src/lib.rs b/crates/atuin-daemon/src/lib.rs index e00060bc..6dc04db3 100644 --- a/crates/atuin-daemon/src/lib.rs +++ b/crates/atuin-daemon/src/lib.rs @@ -1,3 +1,110 @@ +use atuin_client::database::Sqlite as HistoryDatabase; +use atuin_client::{record::sqlite_store::SqliteStore, settings::Settings}; +use eyre::Result; + pub mod client; +pub mod components; +pub mod control; +pub mod daemon; +pub mod events; pub mod history; +pub mod search; pub mod server; + +// Re-export core daemon types for convenience +pub use daemon::{Component, Daemon, DaemonBuilder, DaemonHandle}; +pub use events::DaemonEvent; + +// Re-export components +pub use components::{HistoryComponent, SearchComponent, SyncComponent}; + +// Re-export client helpers +pub use client::{ControlClient, emit_event, emit_event_with_settings}; + +/// Boot the daemon using the new component-based architecture. +/// +/// This creates a daemon with the standard components (history, search, sync), +/// starts the gRPC server with their services, and runs the event loop. +pub async fn boot( + settings: Settings, + store: SqliteStore, + history_db: HistoryDatabase, +) -> Result<()> { + // Create the components + let history_component = HistoryComponent::new(); + let search_component = SearchComponent::new(); + let sync_component = SyncComponent::new(); + + // Get the gRPC services before moving components into the daemon + // (The services share state with the components via Arc) + let history_service = history_component.grpc_service(); + let search_service = search_component.grpc_service(); + + // Build the daemon + let mut daemon = Daemon::builder(settings.clone()) + .store(store) + .history_db(history_db) + .component(history_component) + .component(search_component) + .component(sync_component) + .build() + .await?; + + // Get a handle for the control service and gRPC server shutdown + let handle = daemon.handle(); + + // Create the control service + let control_service = control::ControlService::new(handle.clone()); + + // Start all components first (so gRPC services can work) + daemon.start_components().await?; + + // Spawn signal handler to emit ShutdownRequested on Ctrl+C/SIGTERM + let signal_handle = handle.clone(); + tokio::spawn(async move { + shutdown_signal().await; + tracing::info!("received shutdown signal"); + signal_handle.shutdown(); + }); + + // Start the gRPC server in the background + server::run_grpc_server( + settings, + history_service, + search_service, + control_service.into_server(), + handle, + ) + .await?; + + // Run the daemon event loop + daemon.run_event_loop().await?; + + // Stop all components on shutdown + daemon.stop_components().await; + + tracing::info!("daemon shut down complete"); + Ok(()) +} + +/// Wait for a shutdown signal (Ctrl+C or SIGTERM). +#[cfg(unix)] +async fn shutdown_signal() { + let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to register sigterm handler"); + let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) + .expect("failed to register sigint handler"); + + tokio::select! { + _ = term.recv() => {}, + _ = int.recv() => {}, + } +} + +/// Wait for a shutdown signal (Ctrl+C). +#[cfg(not(unix))] +async fn shutdown_signal() { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl+c"); +} diff --git a/crates/atuin-daemon/src/search/index.rs b/crates/atuin-daemon/src/search/index.rs new file mode 100644 index 00000000..b15b057f --- /dev/null +++ b/crates/atuin-daemon/src/search/index.rs @@ -0,0 +1,572 @@ +//! Search index with frecency-based ranking. +//! +//! This module provides a deduplicated search index where each unique command +//! is stored once, with metadata about all its invocations. This enables: +//! +//! - Efficient fuzzy matching (fewer items to match) +//! - Frecency-based ranking (frequency + recency) +//! - Dynamic filtering by directory, host, session, etc. + +use std::{collections::HashMap, sync::Arc}; + +use atuin_client::history::History; +use dashmap::{DashMap, DashSet}; +use nucleo::{Injector, Nucleo, pattern}; +use time::OffsetDateTime; +use tokio::sync::RwLock; +use tracing::{Level, instrument}; + +use crate::components::search::with_trailing_slash; + +/// Data for a single invocation of a command. +#[derive(Debug, Clone)] +pub struct Invocation { + /// When the command was run. + pub timestamp: i64, + /// The working directory when the command was run. + #[allow(dead_code)] + pub cwd: String, + /// The hostname where the command was run. + #[allow(dead_code)] + pub hostname: String, + /// The session ID. + #[allow(dead_code)] + pub session: String, + /// The history entry ID (for returning in search results). + pub history_id: String, +} + +impl From<&History> for Invocation { + fn from(history: &History) -> Self { + Self { + timestamp: history.timestamp.unix_timestamp(), + cwd: history.cwd.clone(), + hostname: history.hostname.clone(), + session: history.session.clone(), + history_id: history.id.0.clone(), + } + } +} + +/// Pre-computed frecency data for O(1) lookup. +#[derive(Debug, Clone, Default)] +pub struct FrecencyData { + /// Total number of times this command was used. + pub count: u32, + /// Most recent usage timestamp (unix seconds). + pub last_used: i64, +} + +impl FrecencyData { + /// Record a new usage of this command. + pub fn record_use(&mut self, timestamp: i64) { + self.count += 1; + if timestamp > self.last_used { + self.last_used = timestamp; + } + } + + /// Compute frecency score based on count and recency. + /// + /// Uses a decay function where more recent commands score higher. + /// The formula balances frequency (how often) with recency (how recent). + #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")] + pub fn compute(&self, now: i64) -> u32 { + if self.count == 0 { + return 0; + } + + // Time-based decay: score decreases as time passes + let age_seconds = (now - self.last_used).max(0) as u64; + let age_hours = age_seconds / 3600; + + // Decay factor: recent commands get higher scores + // - Last hour: multiplier ~1.0 + // - Last day: multiplier ~0.5 + // - Last week: multiplier ~0.1 + // - Older: multiplier approaches 0 + let recency_score = match age_hours { + 0 => 100, + 1..=6 => 90, + 7..=24 => 70, + 25..=72 => 50, + 73..=168 => 30, + 169..=720 => 15, + _ => 5, + }; + + // Frequency boost: more uses = higher score (with diminishing returns) + let frequency_score = ((self.count as f64).ln() * 20.0).min(100.0) as u32; + + // Combined score + recency_score + frequency_score + } +} + +/// Data for a unique command, including all its invocations. +pub struct CommandData { + /// The command text (stored for debugging/logging purposes). + #[allow(dead_code)] + pub command: String, + /// All invocations of this command, sorted by timestamp (newest first). + pub invocations: Vec<Invocation>, + /// Pre-computed global frecency. + pub global_frecency: FrecencyData, + + // Pre-computed indexes for O(1) filter lookups + /// All directories where this command has been run. + directories: DashSet<String>, + /// All hostnames where this command has been run. + hosts: DashSet<String>, + /// All sessions where this command has been run. + sessions: DashSet<String>, +} + +impl CommandData { + /// Create a new CommandData from a history entry. + pub fn new(history: &History) -> Self { + let mut data = Self { + command: history.command.clone(), + invocations: Vec::new(), + global_frecency: FrecencyData::default(), + directories: DashSet::new(), + hosts: DashSet::new(), + sessions: DashSet::new(), + }; + data.add_invocation(history); + data + } + + /// Add an invocation from a history entry. + pub fn add_invocation(&mut self, history: &History) { + let timestamp = history.timestamp.unix_timestamp(); + + // Update global frecency + self.global_frecency.record_use(timestamp); + + // Update pre-computed indexes for O(1) filter lookups + self.directories.insert(with_trailing_slash(&history.cwd)); + self.hosts.insert(history.hostname.clone()); + self.sessions.insert(history.session.clone()); + + let invocation = Invocation::from(history); + + // Insert sorted by timestamp (newest first) + let pos = self + .invocations + .iter() + .position(|inv| inv.timestamp < timestamp) + .unwrap_or(self.invocations.len()); + self.invocations.insert(pos, invocation); + } + + /// Get the most recent history ID for this command. + pub fn most_recent_id(&self) -> Option<&str> { + self.invocations.first().map(|inv| inv.history_id.as_str()) + } + + /// Check if any invocation matches a directory filter (exact match). + /// O(1) lookup using pre-computed index. + pub fn has_invocation_in_dir(&self, dir: &str) -> bool { + self.directories.contains(dir) + } + + /// Check if any invocation matches a directory prefix (workspace/git root). + /// O(n) where n = number of unique directories for this command. + pub fn has_invocation_in_workspace(&self, prefix: &str) -> bool { + self.directories.iter().any(|d| d.starts_with(prefix)) + } + + /// Check if any invocation matches a hostname. + /// O(1) lookup using pre-computed index. + pub fn has_invocation_on_host(&self, hostname: &str) -> bool { + self.hosts.contains(hostname) + } + + /// Check if any invocation matches a session. + /// O(1) lookup using pre-computed index. + pub fn has_invocation_in_session(&self, session: &str) -> bool { + self.sessions.contains(session) + } +} + +/// Filter mode for search queries. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum IndexFilterMode { + /// No filtering - search all commands. + Global, + /// Filter to commands run in a specific directory. + Directory(String), + /// Filter to commands run in a workspace (directory prefix). + Workspace(String), + /// Filter to commands run on a specific host. + Host(String), + /// Filter to commands run in a specific session. + Session(String), +} + +/// Context for search queries. +#[derive(Debug, Clone, Default)] +pub struct QueryContext { + pub cwd: Option<String>, + pub git_root: Option<String>, + pub hostname: Option<String>, + pub session_id: Option<String>, +} + +/// A deduplicated search index with frecency-based ranking. +/// +/// Commands are stored by their text, with metadata about all invocations. +/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback. +/// +/// Global frecency is precomputed by a background task and used for scoring. +/// If frecency data is not available, search still works but without frecency ranking; +/// although this should never happen due to precomputing the frecency map. +pub struct SearchIndex { + /// Map from command text to command data. + /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer. + commands: Arc<DashMap<String, CommandData>>, + /// Nucleo fuzzy matcher - items are command strings. + nucleo: RwLock<Nucleo<String>>, + /// Injector for adding new commands to Nucleo. + injector: Injector<String>, + /// Precomputed global frecency map (command -> frecency score). + /// Updated by background task. If None, search works without frecency. + frecency_map: RwLock<Option<Arc<HashMap<String, u32>>>>, +} + +impl SearchIndex { + /// Create a new empty search index. + pub fn new() -> Self { + let nucleo_config = nucleo::Config::DEFAULT; + // Single column for command text + let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1); + let injector = nucleo.injector(); + + Self { + commands: Arc::new(DashMap::new()), + nucleo: RwLock::new(nucleo), + injector, + frecency_map: RwLock::new(None), + } + } + + /// Add a history entry to the index. + /// + /// If the command already exists, updates its invocation data. + /// If it's a new command, adds it to both the map and Nucleo. + pub fn add_history(&self, history: &History) { + let command = &history.command; + + if let Some(mut entry) = self.commands.get_mut(command) { + // Existing command - just update invocations + entry.add_invocation(history); + } else { + // New command - add to both map and Nucleo + let data = CommandData::new(history); + self.commands.insert(command.clone(), data); + self.injector.push(command.clone(), |cmd, cols| { + cols[0] = cmd.clone().into(); + }); + } + // Note: frecency_map is rebuilt by background task, not invalidated here + } + + /// Add multiple history entries to the index. + pub fn add_histories(&self, histories: &[History]) { + for history in histories { + self.add_history(history); + } + } + + /// Get the number of unique commands in the index. + pub fn command_count(&self) -> usize { + self.commands.len() + } + + /// Get the number of items in Nucleo (should match command_count). + pub async fn nucleo_item_count(&self) -> u32 { + self.nucleo.read().await.snapshot().item_count() + } + + /// Search for commands matching a query. + /// + /// Returns a list of history IDs (most recent invocation per command). + /// Uses precomputed global frecency for scoring if available. + #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))] + pub async fn search( + &self, + query: &str, + filter_mode: IndexFilterMode, + _context: &QueryContext, + limit: u32, + ) -> Vec<String> { + let mut nucleo = self.nucleo.write().await; + + // Get precomputed frecency map (may be None if not yet computed) + let frecency_map = self.frecency_map.read().await.clone(); + + // Build filter based on mode + let filter = self.build_filter(&filter_mode); + nucleo.set_filter(filter); + + // Build scorer from precomputed frecency (or None if not available) + let scorer = Self::build_scorer(frecency_map); + nucleo.set_scorer(scorer); + + // Update pattern + nucleo.pattern.reparse( + 0, + query, + pattern::CaseMatching::Smart, + pattern::Normalization::Smart, + false, + ); + + tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| { + // Tick until complete + while nucleo.tick(10).running {} + }); + + // Collect results + let snapshot = nucleo.snapshot(); + let matched_count = snapshot.matched_item_count().min(limit); + + tracing::span!(Level::TRACE, "index_search_results").in_scope(|| { + snapshot + .matched_items(..matched_count) + .filter_map(|item| { + let cmd = item.data; + self.commands + .get(cmd) + .and_then(|data| data.most_recent_id().map(|s| s.to_string())) + }) + .collect() + }) + } + + /// Rebuild the global frecency map. + /// + /// This should be called by a background task periodically. + /// The map is used for scoring search results. + #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")] + pub async fn rebuild_frecency(&self) { + let now = OffsetDateTime::now_utc().unix_timestamp(); + let mut frecency_map: HashMap<String, u32> = HashMap::new(); + + for entry in self.commands.iter() { + let frecency = entry.global_frecency.compute(now); + frecency_map.insert(entry.key().clone(), frecency); + } + + *self.frecency_map.write().await = Some(Arc::new(frecency_map)); + } + + /// Build filter predicate for the given mode. + fn build_filter(&self, mode: &IndexFilterMode) -> Option<nucleo::Filter<String>> { + // For Global mode, no filter needed + if matches!(mode, IndexFilterMode::Global) { + return None; + } + + // Pre-compute which commands pass the filter + let passing_commands: Arc<std::collections::HashSet<String>> = { + let mut set = std::collections::HashSet::new(); + for entry in self.commands.iter() { + let passes = match mode { + IndexFilterMode::Global => unreachable!(), + IndexFilterMode::Directory(dir) => entry.has_invocation_in_dir(dir), + IndexFilterMode::Workspace(prefix) => entry.has_invocation_in_workspace(prefix), + IndexFilterMode::Host(hostname) => entry.has_invocation_on_host(hostname), + IndexFilterMode::Session(session) => entry.has_invocation_in_session(session), + }; + if passes { + set.insert(entry.key().clone()); + } + } + Arc::new(set) + }; + + Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd))) + } + + /// Build scorer from precomputed frecency map. + /// + /// Returns None if frecency map is not available (search still works, just without frecency ranking). + fn build_scorer( + frecency_map: Option<Arc<HashMap<String, u32>>>, + ) -> Option<nucleo::Scorer<String>> { + let map = frecency_map?; + Some(Arc::new(move |cmd: &String, fuzzy_score: u32| { + let frecency = map.get(cmd).copied().unwrap_or(0); + fuzzy_score + frecency + })) + } +} + +impl Default for SearchIndex { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use time::macros::datetime; + + fn make_history(command: &str, cwd: &str, timestamp: OffsetDateTime) -> History { + History::import() + .timestamp(timestamp) + .command(command) + .cwd(cwd) + .build() + .into() + } + + #[test] + fn frecency_data_compute() { + let now = 1000000i64; + + // Recent command + let recent = FrecencyData { + count: 5, + last_used: now - 60, // 1 minute ago + }; + assert!(recent.compute(now) > 100); // High score + + // Old command + let old = FrecencyData { + count: 5, + last_used: now - 86400 * 30, // 30 days ago + }; + assert!(old.compute(now) < recent.compute(now)); + + // Frequently used old command + let frequent_old = FrecencyData { + count: 100, + last_used: now - 86400 * 7, // 1 week ago + }; + // Should still have decent score due to frequency + assert!(frequent_old.compute(now) > 50); + } + + #[test] + fn command_data_add_invocation() { + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&history1); + assert_eq!(data.invocations.len(), 1); + assert_eq!(data.global_frecency.count, 1); + + data.add_invocation(&history2); + assert_eq!(data.invocations.len(), 2); + assert_eq!(data.global_frecency.count, 2); + + // Most recent should be first + assert_eq!(data.invocations[0].cwd, dir2); + assert_eq!(data.invocations[1].cwd, dir1); + } + + #[test] + fn command_data_filters() { + let (dir1, dir2) = if cfg!(windows) { + ("C:\\Users\\User\\project", "C:\\Users\\User\\other") + } else { + ("/home/user/project", "/home/user/other") + }; + + let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC)); + let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC)); + + let mut data = CommandData::new(&h1); + data.add_invocation(&h2); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User\\project"), + with_trailing_slash("C:\\Users\\User\\other"), + with_trailing_slash("C:\\Users\\User\\missing"), + ) + } else { + ( + with_trailing_slash("/home/user/project"), + with_trailing_slash("/home/user/other"), + with_trailing_slash("/home/user/missing"), + ) + }; + + assert!(data.has_invocation_in_dir(&check1)); + assert!(data.has_invocation_in_dir(&check2)); + assert!(!data.has_invocation_in_dir(&check3)); + + let (check1, check2, check3) = if cfg!(windows) { + ( + with_trailing_slash("C:\\Users\\User"), + with_trailing_slash("C:\\Users"), + with_trailing_slash("C:\\Users\\User\\var"), + ) + } else { + ( + with_trailing_slash("/home/user"), + with_trailing_slash("/home"), + with_trailing_slash("/var"), + ) + }; + + assert!(data.has_invocation_in_workspace(&check1)); + assert!(data.has_invocation_in_workspace(&check2)); + assert!(!data.has_invocation_in_workspace(&check3)); + } + + #[tokio::test] + async fn search_index_add_and_search() { + let index = SearchIndex::new(); + + let h1 = make_history( + "git status", + "/home/user/project", + datetime!(2024-01-01 10:00 UTC), + ); + let h2 = make_history( + "git commit -m 'test'", + "/home/user/project", + datetime!(2024-01-01 10:05 UTC), + ); + let h3 = make_history( + "ls -la", + "/home/user/other", + datetime!(2024-01-01 10:10 UTC), + ); + + index.add_history(&h1); + index.add_history(&h2); + index.add_history(&h3); + + assert_eq!(index.command_count(), 3); + + // Search for "git" - should match 2 commands + let results = index + .search("git", IndexFilterMode::Global, &QueryContext::default(), 10) + .await; + assert_eq!(results.len(), 2); + + // Search with directory filter + let results = index + .search( + "", + IndexFilterMode::Directory(with_trailing_slash("/home/user/project")), + &QueryContext::default(), + 10, + ) + .await; + assert_eq!(results.len(), 2); // git status and git commit + } +} diff --git a/crates/atuin-daemon/src/search/mod.rs b/crates/atuin-daemon/src/search/mod.rs new file mode 100644 index 00000000..4d261956 --- /dev/null +++ b/crates/atuin-daemon/src/search/mod.rs @@ -0,0 +1,11 @@ +//! Search module for the daemon gRPC search service. +//! +//! This module provides fuzzy search over command history using Nucleo. + +mod index; + +// Include the generated proto code +tonic::include_proto!("search"); + +// Re-export the service and index +pub use index::{IndexFilterMode, QueryContext, SearchIndex}; diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs index 826d6191..a11de612 100644 --- a/crates/atuin-daemon/src/server.rs +++ b/crates/atuin-daemon/src/server.rs @@ -1,249 +1,49 @@ -use eyre::WrapErr; - -use atuin_client::encryption; -use atuin_client::history::store::HistoryStore; -use atuin_client::record::sqlite_store::SqliteStore; -use atuin_client::settings::Settings; -use std::io::ErrorKind; -#[cfg(unix)] -use std::path::PathBuf; -use std::sync::Arc; -use time::OffsetDateTime; -use tokio::sync::watch; -use tracing::{Level, instrument}; - -use atuin_client::database::{Database, Sqlite as HistoryDatabase}; -use atuin_client::history::{History, HistoryId}; -use dashmap::DashMap; use eyre::Result; -use tonic::{Request, Response, Status, transport::Server}; - -use crate::history::history_server::{History as HistorySvc, HistoryServer}; - -use crate::history::{EndHistoryReply, EndHistoryRequest, StartHistoryReply, StartHistoryRequest}; -use crate::history::{ShutdownReply, ShutdownRequest, StatusReply, StatusRequest}; - -mod sync; - -const DAEMON_PROTOCOL_VERSION: u32 = 1; - -#[derive(Debug)] -pub struct HistoryService { - // A store for WIP history - // This is history that has not yet been completed, aka a command that's current running. - running: Arc<DashMap<HistoryId, History>>, - store: HistoryStore, - history_db: HistoryDatabase, - shutdown_tx: watch::Sender<bool>, -} - -impl HistoryService { - pub fn new( - store: HistoryStore, - history_db: HistoryDatabase, - shutdown_tx: watch::Sender<bool>, - ) -> Self { - Self { - running: Arc::new(DashMap::new()), - store, - history_db, - shutdown_tx, - } - } -} - -#[tonic::async_trait()] -impl HistorySvc for HistoryService { - #[instrument(skip_all, level = Level::INFO)] - async fn start_history( - &self, - request: Request<StartHistoryRequest>, - ) -> Result<Response<StartHistoryReply>, Status> { - let running = self.running.clone(); - let req = request.into_inner(); - - let timestamp = - OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { - Status::invalid_argument( - "failed to parse timestamp as unix time (expected nanos since epoch)", - ) - })?; - - let mut h: History = History::daemon() - .timestamp(timestamp) - .command(req.command) - .cwd(req.cwd) - .session(req.session) - .hostname(req.hostname) - .build() - .into(); - if !req.author.trim().is_empty() { - h.author = req.author; - } - if !req.intent.trim().is_empty() { - h.intent = Some(req.intent); - } - - // The old behaviour had us inserting half-finished history records into the database - // The new behaviour no longer allows that. - // History that's running is stored in-memory by the daemon, and only committed when - // complete. - // If anyone relied on the old behaviour, we could perhaps insert to the history db here - // too. I'd rather keep it pure, unless that ends up being the case. - let id = h.id.clone(); - tracing::info!(id = id.to_string(), "start history"); - running.insert(id.clone(), h); - - let reply = StartHistoryReply { - id: id.to_string(), - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn end_history( - &self, - request: Request<EndHistoryRequest>, - ) -> Result<Response<EndHistoryReply>, Status> { - let running = self.running.clone(); - let req = request.into_inner(); - - let id = HistoryId(req.id); - - if let Some((_, mut history)) = running.remove(&id) { - history.exit = req.exit; - history.duration = match req.duration { - 0 => i64::try_from( - (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), - ) - .expect("failed to convert calculated duration to i64"), - value => i64::try_from(value).expect("failed to get i64 duration"), - }; - - // Perhaps allow the incremental build to handle this entirely. - self.history_db - .save(&history) - .await - .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; - - tracing::info!( - id = id.0.to_string(), - duration = history.duration, - "end history" - ); - - let (id, idx) = - self.store.push(history).await.map_err(|e| { - Status::internal(format!("failed to push record to store: {e:?}")) - })?; - - let reply = EndHistoryReply { - id: id.0.to_string(), - idx, - version: env!("CARGO_PKG_VERSION").to_string(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - return Ok(Response::new(reply)); - } - - Err(Status::not_found(format!( - "could not find history with id: {id}" - ))) - } - - #[instrument(skip_all, level = Level::INFO)] - async fn status( - &self, - _request: Request<StatusRequest>, - ) -> Result<Response<StatusReply>, Status> { - let reply = StatusReply { - // If status RPC responds, the daemon control plane is healthy. - healthy: true, - version: env!("CARGO_PKG_VERSION").to_string(), - pid: std::process::id(), - protocol: DAEMON_PROTOCOL_VERSION, - }; - - Ok(Response::new(reply)) - } - #[instrument(skip_all, level = Level::INFO)] - async fn shutdown( - &self, - _request: Request<ShutdownRequest>, - ) -> Result<Response<ShutdownReply>, Status> { - let _ = self.shutdown_tx.send(true); - Ok(Response::new(ShutdownReply { accepted: true })) - } -} - -#[cfg(unix)] -async fn shutdown_signal(socket: Option<PathBuf>, mut shutdown_rx: watch::Receiver<bool>) { - let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .expect("failed to register sigterm handler"); - let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) - .expect("failed to register sigint handler"); +use crate::components::history::HistoryGrpcService; +use crate::components::search::SearchGrpcService; +use crate::control::{ControlService, control_server::ControlServer}; +use crate::daemon::DaemonHandle; +use crate::history::history_server::HistoryServer; +use crate::search::search_server::SearchServer; - tokio::select! { - _ = term.recv() => {}, - _ = int.recv() => {}, - _ = shutdown_rx.changed() => {}, - } - - eprintln!("Removing socket..."); - if let Some(socket) = socket { - match std::fs::remove_file(socket) { - Ok(()) => {} - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => { - eprintln!("failed to remove socket: {err}"); - } - } - } - eprintln!("Shutting down..."); -} - -#[cfg(windows)] -async fn shutdown_signal(mut shutdown_rx: watch::Receiver<bool>) { - let mut ctrl_c = tokio::signal::windows::ctrl_c().expect("failed to register signal handler"); - tokio::select! { - _ = ctrl_c.recv() => {}, - _ = shutdown_rx.changed() => {}, - } - eprintln!("Shutting down..."); -} +use atuin_client::settings::Settings; +/// Run the gRPC server with the given services. +/// +/// This starts the gRPC server in the background and returns immediately. +/// The server will shut down when a ShutdownRequested event is received. #[cfg(unix)] -async fn start_server( +pub async fn run_grpc_server( settings: Settings, - history: HistoryService, - shutdown_rx: watch::Receiver<bool>, + history_service: HistoryServer<HistoryGrpcService>, + search_service: SearchServer<SearchGrpcService>, + control_service: ControlServer<ControlService>, + handle: DaemonHandle, ) -> Result<()> { use tokio::net::UnixListener; use tokio_stream::wrappers::UnixListenerStream; - let socket_path = settings.daemon.socket_path; + let socket_path = settings.daemon.socket_path.clone(); let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket { #[cfg(target_os = "linux")] { - use eyre::OptionExt; + use eyre::{OptionExt, WrapErr}; + use std::os::unix::net::SocketAddr; + use std::path::PathBuf; tracing::info!("getting systemd socket"); let listener = listenfd::ListenFd::from_env() .take_unix_listener(0)? .ok_or_eyre("missing systemd socket")?; listener.set_nonblocking(true)?; - let actual_path = listener + let actual_path: Result<PathBuf, eyre::Report> = listener .local_addr() .context("getting systemd socket's path") - .and_then(|addr| { + .and_then(|addr: SocketAddr| { addr.as_pathname() .ok_or_eyre("systemd socket missing path") - .map(|path| path.to_owned()) + .map(|path: &std::path::Path| path.to_owned()) }); match actual_path { Ok(actual_path) => { @@ -271,66 +71,94 @@ async fn start_server( let uds_stream = UnixListenerStream::new(uds); - Server::builder() - .add_service(HistoryServer::new(history)) - .serve_with_incoming_shutdown( - uds_stream, - shutdown_signal(cleanup.then_some(socket_path.into()), shutdown_rx), - ) - .await?; + // Create shutdown signal from daemon handle + let shutdown_signal = async move { + let mut rx = handle.subscribe(); + loop { + use crate::DaemonEvent; + + match rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) => break, + Ok(_) => continue, + Err(_) => break, // Channel closed + } + } + if cleanup { + eprintln!("Removing socket..."); + if let Err(e) = std::fs::remove_file(&socket_path) + && e.kind() != std::io::ErrorKind::NotFound + { + eprintln!("failed to remove socket: {e}"); + } + } + eprintln!("Shutting down gRPC server..."); + }; + + // Spawn the server in the background + tokio::spawn(async move { + use tonic::transport::Server; + + if let Err(e) = Server::builder() + .add_service(history_service) + .add_service(search_service) + .add_service(control_service) + .serve_with_incoming_shutdown(uds_stream, shutdown_signal) + .await + { + tracing::error!("gRPC server error: {e}"); + } + }); Ok(()) } +/// Run the gRPC server with the given services (Windows/TCP version). #[cfg(not(unix))] -async fn start_server( +pub async fn run_grpc_server( settings: Settings, - history: HistoryService, - shutdown_rx: watch::Receiver<bool>, + history_service: HistoryServer<HistoryGrpcService>, + search_service: SearchServer<SearchGrpcService>, + control_service: ControlServer<ControlService>, + handle: DaemonHandle, ) -> Result<()> { use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; + use tonic::transport::Server; let port = settings.daemon.tcp_port; let url = format!("127.0.0.1:{port}"); - let tcp = TcpListener::bind(url).await?; + let tcp = TcpListener::bind(&url).await?; let tcp_stream = TcpListenerStream::new(tcp); tracing::info!("listening on tcp port {:?}", port); - Server::builder() - .add_service(HistoryServer::new(history)) - .serve_with_incoming_shutdown(tcp_stream, shutdown_signal(shutdown_rx)) - .await?; - Ok(()) -} - -// break the above down when we end up with multiple services - -/// Listen on a unix socket -/// Pass the path to the socket -pub async fn listen( - settings: Settings, - store: SqliteStore, - history_db: HistoryDatabase, -) -> Result<()> { - let encryption_key: [u8; 32] = encryption::load_key(&settings) - .context("could not load encryption key")? - .into(); - - let host_id = Settings::host_id().await?; - let history_store = HistoryStore::new(store.clone(), host_id, encryption_key); + // Create shutdown signal from daemon handle + let shutdown_signal = async move { + use crate::DaemonEvent; - let (shutdown_tx, shutdown_rx) = watch::channel(false); - let history = HistoryService::new(history_store.clone(), history_db.clone(), shutdown_tx); + let mut rx = handle.subscribe(); + loop { + match rx.recv().await { + Ok(DaemonEvent::ShutdownRequested) => break, + Ok(_) => continue, + Err(_) => break, // Channel closed + } + } + eprintln!("Shutting down gRPC server..."); + }; - // start services - tokio::spawn(sync::worker( - settings.clone(), - store, - history_store, - history_db, - )); + // Spawn the server in the background + tokio::spawn(async move { + if let Err(e) = Server::builder() + .add_service(history_service) + .add_service(search_service) + .add_service(control_service) + .serve_with_incoming_shutdown(tcp_stream, shutdown_signal) + .await + { + tracing::error!("gRPC server error: {e}"); + } + }); - start_server(settings, history, shutdown_rx).await + Ok(()) } diff --git a/crates/atuin-daemon/src/server/sync.rs b/crates/atuin-daemon/src/server/sync.rs deleted file mode 100644 index e1e49597..00000000 --- a/crates/atuin-daemon/src/server/sync.rs +++ /dev/null @@ -1,96 +0,0 @@ -use eyre::Result; -use rand::Rng; -use tokio::time::{self, MissedTickBehavior}; - -use atuin_client::database::Sqlite as HistoryDatabase; -use atuin_client::{ - encryption, - history::store::HistoryStore, - record::{sqlite_store::SqliteStore, sync}, - settings::Settings, -}; - -use atuin_dotfiles::store::{AliasStore, var::VarStore}; - -pub async fn worker( - settings: Settings, - store: SqliteStore, - history_store: HistoryStore, - history_db: HistoryDatabase, -) -> Result<()> { - tracing::info!("booting sync worker"); - - let encryption_key: [u8; 32] = encryption::load_key(&settings)?.into(); - let host_id = Settings::host_id().await?; - let alias_store = AliasStore::new(store.clone(), host_id, encryption_key); - let var_store = VarStore::new(store.clone(), host_id, encryption_key); - - // Don't backoff by more than 30 mins (with a random jitter of up to 1 min) - let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0); - - let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); - - // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, - // we may end up running a lot of syncs in a hot loop. No bueno! - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - ticker.tick().await; - tracing::info!("sync worker tick"); - - let logged_in = match settings.logged_in().await { - Ok(v) => v, - Err(e) => { - tracing::warn!("failed to check login status, skipping sync tick: {e}"); - continue; - } - }; - - if !logged_in { - tracing::debug!("not logged in, skipping sync tick"); - continue; - } - - let res = sync::sync(&settings, &store).await; - - if let Err(e) = res { - tracing::error!("sync tick failed with {e}"); - - let mut rng = rand::thread_rng(); - - let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); - - if new_interval > max_interval { - new_interval = max_interval; - } - - ticker = time::interval(time::Duration::from_secs(new_interval as u64)); - ticker.reset_after(time::Duration::from_secs(new_interval as u64)); - - tracing::error!("backing off, next sync tick in {new_interval}"); - } else { - let (uploaded, downloaded) = res.unwrap(); - - tracing::info!( - uploaded = ?uploaded, - downloaded = ?downloaded, - "sync complete" - ); - - history_store - .incremental_build(&history_db, &downloaded) - .await?; - - alias_store.build().await?; - var_store.build().await?; - - // Reset backoff on success - if ticker.period().as_secs() != settings.daemon.sync_frequency { - ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); - } - - // store sync time - Settings::save_sync_time().await?; - } - } -} diff --git a/crates/atuin-daemon/tests/lifecycle.rs b/crates/atuin-daemon/tests/lifecycle.rs index 56457fa7..3b6952de 100644 --- a/crates/atuin-daemon/tests/lifecycle.rs +++ b/crates/atuin-daemon/tests/lifecycle.rs @@ -8,52 +8,97 @@ mod unix { use std::time::Duration; use atuin_client::database::Sqlite; - use atuin_client::history::store::HistoryStore; use atuin_client::record::sqlite_store::SqliteStore; - use atuin_common::record::HostId; - use atuin_common::utils::uuid_v7; + use atuin_client::settings::{Settings, init_meta_config_for_testing}; use atuin_daemon::client::HistoryClient; - use atuin_daemon::history::history_server::HistoryServer; - use atuin_daemon::server::HistoryService; + use atuin_daemon::components::HistoryComponent; + use atuin_daemon::{Daemon, DaemonHandle}; use tempfile::TempDir; use tokio::net::UnixListener; - use tokio::sync::watch; use tokio_stream::wrappers::UnixListenerStream; use tonic::transport::Server; /// Spins up a daemon server on a temp socket and returns a connected client, - /// the shutdown sender, and the temp dir (must be held to keep paths alive). - async fn start_test_daemon() -> (HistoryClient, watch::Sender<bool>, TempDir) { + /// the daemon handle (for shutdown), and the temp dir (must be held to keep paths alive). + async fn start_test_daemon() -> (HistoryClient, DaemonHandle, TempDir) { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("history.db"); let record_path = tmp.path().join("records.db"); + let key_path = tmp.path().join("key"); + let socket_path = tmp.path().join("test.sock"); + let meta_path = tmp.path().join("meta.db"); + + // Initialize the meta store config for testing (required for Settings::host_id()) + init_meta_config_for_testing(meta_path.to_str().unwrap(), 5.0); + + // Build settings with test paths + let settings: Settings = Settings::builder() + .expect("could not build settings builder") + .set_override("db_path", db_path.to_str().unwrap()) + .expect("failed to set db_path") + .set_override("record_store_path", record_path.to_str().unwrap()) + .expect("failed to set record_store_path") + .set_override("key_path", key_path.to_str().unwrap()) + .expect("failed to set key_path") + .set_override("daemon.socket_path", socket_path.to_str().unwrap()) + .expect("failed to set socket_path") + .set_override("meta.db_path", meta_path.to_str().unwrap()) + .expect("failed to set meta.db_path") + .build() + .expect("could not build settings") + .try_deserialize() + .expect("could not deserialize settings"); + // Create databases let history_db = Sqlite::new(&db_path, 5.0).await.unwrap(); let store = SqliteStore::new(&record_path, 5.0).await.unwrap(); - let host_id = HostId(uuid_v7()); - let encryption_key = [0u8; 32]; - let history_store = HistoryStore::new(store, host_id, encryption_key); + // Create the history component and get its gRPC service + let history_component = HistoryComponent::new(); + let history_service = history_component.grpc_service(); - let (shutdown_tx, shutdown_rx) = watch::channel(false); - let service = HistoryService::new(history_store, history_db, shutdown_tx.clone()); + // Build and start the daemon + let mut daemon = Daemon::builder(settings) + .store(store) + .history_db(history_db) + .component(history_component) + .build() + .await + .unwrap(); - let socket_path = tmp.path().join("test.sock"); + let handle = daemon.handle(); + + // Start components (this initializes the history component with the handle) + daemon.start_components().await.unwrap(); + + // Start the gRPC server let uds = UnixListener::bind(&socket_path).unwrap(); let stream = UnixListenerStream::new(uds); - let mut rx = shutdown_rx.clone(); + let server_handle = handle.clone(); tokio::spawn(async move { + let mut rx = server_handle.subscribe(); Server::builder() - .add_service(HistoryServer::new(service)) + .add_service(history_service) .serve_with_incoming_shutdown(stream, async move { - let _ = rx.changed().await; + loop { + match rx.recv().await { + Ok(atuin_daemon::DaemonEvent::ShutdownRequested) => break, + Ok(_) => continue, + Err(_) => break, + } + } }) .await .unwrap(); }); + // Spawn the daemon event loop in the background + tokio::spawn(async move { + daemon.run_event_loop().await.unwrap(); + }); + // Give the server a moment to bind. tokio::time::sleep(Duration::from_millis(50)).await; @@ -61,12 +106,12 @@ mod unix { .await .unwrap(); - (client, shutdown_tx, tmp) + (client, handle, tmp) } #[tokio::test] async fn test_status() { - let (mut client, _shutdown, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let status = client.status().await.unwrap(); assert!(status.healthy); @@ -79,7 +124,7 @@ mod unix { async fn test_start_end_history() { use atuin_client::history::History; - let (mut client, _shutdown, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let history = History::daemon() .timestamp(time::OffsetDateTime::now_utc()) @@ -102,7 +147,7 @@ mod unix { #[tokio::test] async fn test_end_unknown_history_fails() { - let (mut client, _shutdown, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let result = client .end_history("nonexistent-id".to_string(), 1000, 0) @@ -112,7 +157,7 @@ mod unix { #[tokio::test] async fn test_shutdown() { - let (mut client, _shutdown_tx, _tmp) = start_test_daemon().await; + let (mut client, _handle, _tmp) = start_test_daemon().await; let accepted = client.shutdown().await.unwrap(); assert!(accepted); |
