aboutsummaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to 'crates')
-rw-r--r--crates/atuin-ai/src/commands.rs103
-rw-r--r--crates/atuin-ai/src/commands/inline.rs40
-rw-r--r--crates/atuin-client/src/database.rs192
-rw-r--r--crates/atuin-client/src/hub.rs20
-rw-r--r--crates/atuin-client/src/settings.rs221
-rw-r--r--crates/atuin-common/src/utils.rs4
-rw-r--r--crates/atuin-daemon/Cargo.toml10
-rw-r--r--crates/atuin-daemon/build.rs6
-rw-r--r--crates/atuin-daemon/proto/control.proto62
-rw-r--r--crates/atuin-daemon/proto/search.proto35
-rw-r--r--crates/atuin-daemon/src/client.rs289
-rw-r--r--crates/atuin-daemon/src/components/history.rs252
-rw-r--r--crates/atuin-daemon/src/components/mod.rs22
-rw-r--r--crates/atuin-daemon/src/components/search.rs394
-rw-r--r--crates/atuin-daemon/src/components/sync.rs257
-rw-r--r--crates/atuin-daemon/src/control/mod.rs12
-rw-r--r--crates/atuin-daemon/src/control/service.rs71
-rw-r--r--crates/atuin-daemon/src/daemon.rs450
-rw-r--r--crates/atuin-daemon/src/events.rs74
-rw-r--r--crates/atuin-daemon/src/history.rs1
-rw-r--r--crates/atuin-daemon/src/history/mod.rs6
-rw-r--r--crates/atuin-daemon/src/lib.rs107
-rw-r--r--crates/atuin-daemon/src/search/index.rs572
-rw-r--r--crates/atuin-daemon/src/search/mod.rs11
-rw-r--r--crates/atuin-daemon/src/server.rs360
-rw-r--r--crates/atuin-daemon/src/server/sync.rs96
-rw-r--r--crates/atuin-daemon/tests/lifecycle.rs89
-rw-r--r--crates/atuin/Cargo.toml2
-rw-r--r--crates/atuin/src/command/client.rs185
-rw-r--r--crates/atuin/src/command/client/daemon.rs103
-rw-r--r--crates/atuin/src/command/client/history.rs12
-rw-r--r--crates/atuin/src/command/client/search.rs5
-rw-r--r--crates/atuin/src/command/client/search/engines.rs12
-rw-r--r--crates/atuin/src/command/client/search/engines/daemon.rs206
-rw-r--r--crates/atuin/src/command/client/search/engines/db.rs11
-rw-r--r--crates/atuin/src/command/client/search/engines/skim.rs17
-rw-r--r--crates/atuin/src/command/client/search/interactive.rs22
-rw-r--r--crates/atuin/src/command/client/store/rebuild.rs6
38 files changed, 3876 insertions, 461 deletions
diff --git a/crates/atuin-ai/src/commands.rs b/crates/atuin-ai/src/commands.rs
index 7d5ca16b..b35cec9e 100644
--- a/crates/atuin-ai/src/commands.rs
+++ b/crates/atuin-ai/src/commands.rs
@@ -1,8 +1,13 @@
+use std::{
+ fs,
+ path::{Path, PathBuf},
+};
+
use atuin_common::shell::Shell;
use clap::{Parser, Subcommand};
-use tracing::Level;
+use eyre::Result;
+use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{EnvFilter, Layer, fmt, layer::SubscriberExt, util::SubscriberInitExt};
-
#[cfg(debug_assertions)]
pub mod debug_render;
@@ -72,7 +77,11 @@ enum Commands {
pub async fn run() -> eyre::Result<()> {
let cli = Cli::parse();
- init_tracing(cli.verbose);
+ let settings = atuin_client::settings::Settings::new()?;
+
+ if settings.logs.ai_enabled() {
+ init_logging(&settings, cli.verbose)?;
+ }
match cli.command {
Commands::Init { shell } => init::run(shell).await,
@@ -89,6 +98,7 @@ pub async fn run() -> eyre::Result<()> {
cli.api_token,
keep,
debug_state,
+ &settings,
)
.await
}
@@ -104,39 +114,90 @@ pub async fn run() -> eyre::Result<()> {
}
}
-fn init_tracing(verbose: bool) {
- let level = if verbose { Level::DEBUG } else { Level::INFO };
+pub fn detect_shell() -> Option<String> {
+ Some(Shell::current().to_string())
+}
- // Create env filter
- let env_filter = EnvFilter::from_default_env().add_directive(
- format!("atuin_ai={}", level.as_str().to_lowercase())
- .parse()
- .unwrap(),
- );
+/// Initializes logging for the AI commands.
+fn init_logging(settings: &atuin_client::settings::Settings, verbose: bool) -> Result<()> {
+ // ATUIN_LOG env var overrides config file level settings
+ let env_log_set = std::env::var("ATUIN_LOG").is_ok();
+
+ // Base filter from env var (or empty if not set)
+ let base_filter =
+ EnvFilter::from_env("ATUIN_LOG").add_directive("sqlx_sqlite::regexp=off".parse()?);
+
+ // Use config level unless ATUIN_LOG is set
+ let filter = if env_log_set {
+ base_filter
+ } else {
+ EnvFilter::default()
+ .add_directive(settings.logs.ai_level().as_directive().parse()?)
+ .add_directive("sqlx_sqlite::regexp=off".parse()?)
+ };
+
+ let log_dir = PathBuf::from(&settings.logs.dir);
+ fs::create_dir_all(&log_dir)?;
+
+ let filename = settings.logs.ai.file.clone();
+
+ // Clean up old log files
+ cleanup_old_logs(&log_dir, &filename, settings.logs.ai_retention());
- // Create console layer (only for verbose mode)
let console_layer = if verbose {
Some(
fmt::layer()
.with_writer(std::io::stderr)
.with_ansi(true)
.with_target(false)
- .with_filter(env_filter),
+ .with_filter(filter.clone()),
)
} else {
None
};
- // Initialize subscriber
- let subscriber = tracing_subscriber::registry();
+ let file_appender = RollingFileAppender::new(Rotation::DAILY, &log_dir, &filename);
- if let Some(console) = console_layer {
- subscriber.with(console).init();
+ let base = tracing_subscriber::registry().with(
+ fmt::layer()
+ .with_writer(file_appender)
+ .with_ansi(false)
+ .with_filter(filter),
+ );
+
+ if let Some(console_layer) = console_layer {
+ base.with(console_layer).init();
} else {
- subscriber.init();
- }
+ base.init();
+ };
+
+ Ok(())
}
-pub fn detect_shell() -> Option<String> {
- Some(Shell::current().to_string())
+fn cleanup_old_logs(log_dir: &Path, prefix: &str, retention_days: u64) {
+ let cutoff = std::time::SystemTime::now()
+ - std::time::Duration::from_secs(retention_days * 24 * 60 * 60);
+
+ let Ok(entries) = fs::read_dir(log_dir) else {
+ return;
+ };
+
+ for entry in entries.flatten() {
+ let path = entry.path();
+ let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
+ continue;
+ };
+
+ // Match files like "search.log.2024-02-23" or "daemon.log.2024-02-23"
+ if !name.starts_with(prefix) || name == prefix {
+ continue;
+ }
+
+ if let Ok(metadata) = entry.metadata()
+ && let Ok(modified) = metadata.modified()
+ && modified < cutoff
+ {
+ let _ = fs::remove_file(&path);
+ }
+ }
}
diff --git a/crates/atuin-ai/src/commands/inline.rs b/crates/atuin-ai/src/commands/inline.rs
index 3f9278a2..b49bfece 100644
--- a/crates/atuin-ai/src/commands/inline.rs
+++ b/crates/atuin-ai/src/commands/inline.rs
@@ -15,6 +15,7 @@ use eyre::{Context as _, Result, bail};
use futures::StreamExt;
use reqwest::Url;
use std::io::Write;
+use tracing::{debug, error, info, trace};
pub async fn run(
initial_command: Option<String>,
@@ -23,6 +24,7 @@ pub async fn run(
api_token: Option<String>,
keep_output: bool,
debug_state_file: Option<String>,
+ settings: &atuin_client::settings::Settings,
) -> Result<()> {
// Install panic hook once at entry point to ensure terminal restoration
install_panic_hook();
@@ -31,7 +33,6 @@ pub async fn run(
// 1. Command line arguments/environment variables
// 2. Settings file
// 3. Default
- let settings = atuin_client::settings::Settings::new()?;
let endpoint = api_endpoint.as_deref().unwrap_or(
settings
.ai
@@ -44,7 +45,7 @@ pub async fn run(
let token = if let Some(token) = &api_token {
token.to_string()
} else {
- ensure_hub_session(&settings, endpoint).await?
+ ensure_hub_session(settings, endpoint).await?
};
let action = run_inline_tui(
@@ -57,6 +58,7 @@ pub async fn run(
},
keep_output,
debug_state_file,
+ settings,
)
.await?;
emit_shell_result(action.0, &action.1);
@@ -69,9 +71,12 @@ async fn ensure_hub_session(
hub_address: &str,
) -> Result<String> {
if let Some(token) = atuin_client::hub::get_session_token().await? {
+ debug!("Found Hub session, using existing token");
return Ok(token);
}
+ info!("No Hub session found, prompting for authentication");
+
println!("Atuin AI requires authenticating with Atuin Hub.");
println!("This is separate from your sync setup.");
println!("Press enter to begin (or esc to cancel).");
@@ -79,6 +84,8 @@ async fn ensure_hub_session(
bail!("authentication canceled");
}
+ debug!("Starting Atuin Hub authentication...");
+
println!("Authenticating with Atuin Hub...");
let mut auth_settings = settings.clone();
auth_settings.hub_address = hub_address.to_string();
@@ -93,6 +100,8 @@ async fn ensure_hub_session(
)
.await?;
+ info!("Authentication complete, saving session token");
+
atuin_client::hub::save_session(&token).await?;
Ok(token)
}
@@ -141,6 +150,8 @@ fn create_chat_stream(
}
};
+ debug!("Sending SSE request to {endpoint}");
+
// Build request body
let mut request_body = serde_json::json!({
"messages": messages,
@@ -155,6 +166,7 @@ fn create_chat_stream(
// Include session_id only if present (not on first request)
if let Some(ref sid) = session_id {
+ trace!("Including session_id in request: {sid}");
request_body["session_id"] = serde_json::json!(sid);
}
@@ -178,12 +190,14 @@ fn create_chat_stream(
let status = response.status();
if status == reqwest::StatusCode::UNAUTHORIZED {
// Clear saved session on auth error
+ error!("SSE request failed with status: {status}, clearing session");
let _ = atuin_client::hub::delete_session().await;
yield Err(eyre::eyre!("Hub session expired. Re-run to authenticate again."));
return;
}
if !status.is_success() {
let body = response.text().await.unwrap_or_default();
+ error!("SSE request failed ({}): {}", status, body);
yield Err(eyre::eyre!("SSE request failed ({}): {}", status, body));
return;
}
@@ -197,7 +211,7 @@ fn create_chat_stream(
let event_type = sse_event.event.as_str();
let data = sse_event.data.clone();
- tracing::debug!(event_type = %event_type, data = %data, "SSE event received");
+ debug!(event_type = %event_type, "SSE event received");
match event_type {
"text" => {
@@ -245,8 +259,10 @@ fn create_chat_stream(
"error" => {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&data) {
let message = json.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error").to_string();
+ error!("SSE error: {}", message);
yield Ok(ChatStreamEvent::Error(message));
} else {
+ error!("SSE error: {}", data);
yield Ok(ChatStreamEvent::Error(data));
}
break;
@@ -391,6 +407,7 @@ async fn run_inline_tui(
initial_prompt: Option<String>,
keep_output: bool,
debug_state_file: Option<String>,
+ settings: &atuin_client::settings::Settings,
) -> Result<(Action, String)> {
// Initialize terminal guard and app state
let mut guard = TerminalGuard::new(keep_output)?;
@@ -425,7 +442,6 @@ async fn run_inline_tui(
log_state!("init");
// Load theme
- let settings = atuin_client::settings::Settings::new()?;
let mut theme_manager = ThemeManager::new(None, None);
let theme = theme_manager.load_theme(&settings.theme.name, None);
@@ -486,12 +502,12 @@ async fn run_inline_tui(
match stream.as_mut().poll_next(&mut cx) {
std::task::Poll::Ready(Some(Ok(event))) => match event {
ChatStreamEvent::TextChunk(text) => {
- tracing::debug!(text = %text, "Processing TextChunk");
+ trace!(text = %text, "Processing TextChunk");
app.state.append_streaming_text(&text);
log_state!("text_chunk");
}
ChatStreamEvent::ToolCall { id, name, input } => {
- tracing::debug!(id = %id, name = %name, "Processing ToolCall");
+ trace!(id = %id, name = %name, "Processing ToolCall");
app.state.add_tool_call(id, name, input);
log_state!("tool_call");
}
@@ -500,17 +516,17 @@ async fn run_inline_tui(
content,
is_error,
} => {
- tracing::debug!(tool_use_id = %tool_use_id, "Processing ToolResult");
+ trace!(tool_use_id = %tool_use_id, "Processing ToolResult");
app.state.add_tool_result(tool_use_id, content, is_error);
log_state!("tool_result");
}
ChatStreamEvent::Status(status) => {
- tracing::debug!(status = %status, "Processing Status");
+ trace!(status = %status, "Processing Status");
app.state.update_streaming_status(&status);
log_state!("status");
}
ChatStreamEvent::Done { session_id } => {
- tracing::debug!(session_id = %session_id, "Processing Done");
+ trace!(session_id = %session_id, "Processing Done");
chat_stream = None;
if !session_id.is_empty() {
app.state.store_session_id(session_id);
@@ -519,7 +535,7 @@ async fn run_inline_tui(
log_state!("done");
}
ChatStreamEvent::Error(msg) => {
- tracing::debug!(error = %msg, "Processing Error");
+ trace!(error = %msg, "Processing Error");
chat_stream = None;
app.state.streaming_error(msg);
log_state!("error");
@@ -544,7 +560,7 @@ async fn run_inline_tui(
// Handle user cancellation (Esc during streaming) - drop the stream
if app.state.was_interrupted && chat_stream.is_some() {
- tracing::debug!("User cancelled streaming, dropping chat stream");
+ debug!("User cancelled streaming, dropping chat stream");
chat_stream = None;
app.state.was_interrupted = false; // Reset the flag
}
@@ -579,7 +595,7 @@ async fn run_inline_tui(
token.clone(),
app.state.session_id.clone(),
messages,
- &settings,
+ settings,
));
}
}
diff --git a/crates/atuin-client/src/database.rs b/crates/atuin-client/src/database.rs
index 5f292bec..7c63368d 100644
--- a/crates/atuin-client/src/database.rs
+++ b/crates/atuin-client/src/database.rs
@@ -138,9 +138,13 @@ pub trait Database: Send + Sync + 'static {
async fn all_with_count(&self) -> Result<Vec<(History, i32)>>;
+ fn all_paged(&self, page_size: usize, include_deleted: bool, unique: bool) -> Paged;
+
async fn stats(&self, h: &History) -> Result<HistoryStats>;
async fn get_dups(&self, before: i64, dupkeep: u32) -> Result<Vec<History>>;
+
+ fn clone_boxed(&self) -> Box<dyn Database + 'static>;
}
// Intended for use on a developer machine and not a sync server.
@@ -650,6 +654,10 @@ impl Database for Sqlite {
Ok(res)
}
+ fn all_paged(&self, page_size: usize, include_deleted: bool, unique: bool) -> Paged {
+ Paged::new(Box::new(self.clone()), page_size, include_deleted, unique)
+ }
+
// deleted_at doesn't mean the actual time that the user deleted it,
// but the time that the system marks it as deleted
async fn delete(&self, mut h: History) -> Result<()> {
@@ -814,6 +822,70 @@ impl Database for Sqlite {
Ok(res)
}
+
+ fn clone_boxed(&self) -> Box<dyn Database + 'static> {
+ Box::new(self.clone())
+ }
+}
+
+pub struct Paged {
+ database: Box<dyn Database + 'static>,
+ page_size: usize,
+ last_id: Option<String>,
+ include_deleted: bool,
+ unique: bool,
+}
+
+impl Paged {
+ pub fn new(
+ database: Box<dyn Database + 'static>,
+ page_size: usize,
+ include_deleted: bool,
+ unique: bool,
+ ) -> Self {
+ Self {
+ database,
+ page_size,
+ last_id: None,
+ include_deleted,
+ unique,
+ }
+ }
+
+ pub async fn next(&mut self) -> Result<Option<Vec<History>>> {
+ let mut query = SqlBuilder::select_from(SqlName::new("history").alias("h").baquoted());
+
+ query.field("*").order_desc("id");
+
+ if !self.include_deleted {
+ query.and_where_is_null("deleted_at");
+ }
+
+ if self.unique {
+ // We want to deduplicate on command, but the user can search via cwd, hostname, and session.
+ // Without those fields, filter modes won't work right. With those fields, we get duplicates.
+ // This must be handled upstream.
+ query
+ .group_by("command, cwd, hostname, session")
+ .having("max(timestamp)");
+ }
+
+ query.limit(self.page_size);
+
+ if let Some(last_id) = &self.last_id {
+ query.and_where_lt("id", quote(last_id));
+ }
+
+ let query = query.sql().expect("bug in list query. please report");
+ let res = self.database.query_history(&query).await?;
+
+ if res.is_empty() {
+ Ok(None)
+ } else {
+ self.last_id = Some(res.last().unwrap().id.0.clone());
+ Ok(Some(res))
+ }
+ }
}
trait SqlBuilderExt {
@@ -1166,6 +1238,126 @@ mod test {
}
#[tokio::test(flavor = "multi_thread")]
+ async fn test_paged_basic() {
+ let mut db = Sqlite::new("sqlite::memory:", test_local_timeout())
+ .await
+ .unwrap();
+
+ // Add 5 history items
+ for i in 0..5 {
+ new_history_item(&mut db, &format!("command{}", i))
+ .await
+ .unwrap();
+ }
+
+ // Create a paged iterator with page_size of 2
+ let mut paged = db.all_paged(2, false, false);
+
+ // First page should have 2 items
+ let page1 = paged.next().await.unwrap();
+ assert!(page1.is_some());
+ assert_eq!(page1.unwrap().len(), 2);
+
+ // Second page should have 2 items
+ let page2 = paged.next().await.unwrap();
+ assert!(page2.is_some());
+ assert_eq!(page2.unwrap().len(), 2);
+
+ // Third page should have 1 item
+ let page3 = paged.next().await.unwrap();
+ assert!(page3.is_some());
+ assert_eq!(page3.unwrap().len(), 1);
+
+ // Fourth page should be None (exhausted)
+ let page4 = paged.next().await.unwrap();
+ assert!(page4.is_none());
+ }
+
+ #[tokio::test(flavor = "multi_thread")]
+ async fn test_paged_empty() {
+ let db = Sqlite::new("sqlite::memory:", test_local_timeout())
+ .await
+ .unwrap();
+
+ // Create a paged iterator on empty database
+ let mut paged = db.all_paged(10, false, false);
+
+ // Should return None immediately
+ let page = paged.next().await.unwrap();
+ assert!(page.is_none());
+ }
+
+ #[tokio::test(flavor = "multi_thread")]
+ async fn test_paged_unique() {
+ let mut db = Sqlite::new("sqlite::memory:", test_local_timeout())
+ .await
+ .unwrap();
+
+ // Add duplicate commands
+ new_history_item(&mut db, "duplicate").await.unwrap();
+ new_history_item(&mut db, "duplicate").await.unwrap();
+ new_history_item(&mut db, "unique1").await.unwrap();
+ new_history_item(&mut db, "unique2").await.unwrap();
+
+ // Without unique flag - should get all 4
+ let mut paged = db.all_paged(10, false, false);
+ let page = paged.next().await.unwrap().unwrap();
+ assert_eq!(page.len(), 4);
+
+ // With unique flag - should get 3 (duplicates collapsed)
+ let mut paged_unique = db.all_paged(10, false, true);
+ let page_unique = paged_unique.next().await.unwrap().unwrap();
+ assert_eq!(page_unique.len(), 3);
+ }
+
+ #[tokio::test(flavor = "multi_thread")]
+ async fn test_paged_include_deleted() {
+ let mut db = Sqlite::new("sqlite::memory:", test_local_timeout())
+ .await
+ .unwrap();
+
+ // Add items
+ new_history_item(&mut db, "keep1").await.unwrap();
+ new_history_item(&mut db, "keep2").await.unwrap();
+ new_history_item(&mut db, "delete_me").await.unwrap();
+
+ // Delete one item
+ let all = db
+ .list(
+ &[],
+ &Context {
+ hostname: "".to_string(),
+ session: "".to_string(),
+ cwd: "".to_string(),
+ host_id: "".to_string(),
+ git_root: None,
+ },
+ None,
+ false,
+ false,
+ )
+ .await
+ .unwrap();
+
+ let to_delete = all
+ .iter()
+ .find(|h| h.command == "delete_me")
+ .unwrap()
+ .clone();
+ db.delete(to_delete).await.unwrap();
+
+ // Without include_deleted - should get 2
+ let mut paged = db.all_paged(10, false, false);
+ let page = paged.next().await.unwrap().unwrap();
+ assert_eq!(page.len(), 2);
+
+ // With include_deleted - should get 3
+ let mut paged_deleted = db.all_paged(10, true, false);
+ let page_deleted = paged_deleted.next().await.unwrap().unwrap();
+ assert_eq!(page_deleted.len(), 3);
+ }
+
+ #[tokio::test(flavor = "multi_thread")]
async fn test_search_bench_dupes() {
let context = Context {
hostname: "test:host".to_string(),
diff --git a/crates/atuin-client/src/hub.rs b/crates/atuin-client/src/hub.rs
index 5b34574b..b94c69ea 100644
--- a/crates/atuin-client/src/hub.rs
+++ b/crates/atuin-client/src/hub.rs
@@ -58,10 +58,14 @@ impl HubAuthSession {
///
/// Returns a session containing the code and auth URL that the user should visit.
pub async fn start(settings: &Settings) -> Result<Self> {
+ debug!("Starting Hub authentication process...");
+
let code_response = request_code(&settings.hub_address)
.await
.context("Failed to request authentication code from Hub")?;
+ debug!("Received code from Hub");
+
let code = code_response.code;
let auth_url = format!("{}/auth/cli?code={}", settings.hub_address, code);
@@ -79,8 +83,10 @@ impl HubAuthSession {
match verify_code(&self.hub_address, &self.code).await {
Ok(response) => {
if let Some(token) = response.token {
+ debug!("Authentication complete, received token");
Ok(HubAuthStatus::Complete(token))
} else if let Some(error) = response.error {
+ error!("Authentication failed: {}", error);
Ok(HubAuthStatus::Failed(error))
} else {
Ok(HubAuthStatus::Pending)
@@ -105,8 +111,11 @@ impl HubAuthSession {
) -> Result<String> {
let start = std::time::Instant::now();
+ debug!("Polling for Hub authentication completion...");
+
loop {
if start.elapsed() > timeout {
+ warn!("Authentication loop exited due to timeout");
bail!("Authentication timed out. Please try again.");
}
@@ -181,17 +190,21 @@ async fn handle_resp_error(resp: reqwest::Response) -> Result<reqwest::Response>
let status = resp.status();
if status == StatusCode::SERVICE_UNAVAILABLE {
+ error!("Service unavailable: check https://status.atuin.sh");
bail!("Service unavailable: check https://status.atuin.sh");
}
if status == StatusCode::TOO_MANY_REQUESTS {
+ error!("Rate limited; please wait before trying again");
bail!("Rate limited; please wait before trying again");
}
if !status.is_success() {
if let Ok(error) = resp.json::<ErrorResponse>().await {
+ error!("Hub error: {} - {}", status, error.reason);
bail!("Hub error: {} - {}", status, error.reason);
}
+ error!("Hub request failed with status: {}", status);
bail!("Hub request failed with status: {}", status);
}
@@ -204,6 +217,8 @@ async fn request_code(address: &str) -> Result<CliCodeResponse> {
let url = make_url(address, "/auth/cli/code")?;
let client = reqwest::Client::new();
+ debug!("Requesting code from Hub at {url}");
+
let resp = client
.post(&url)
.header(USER_AGENT, APP_USER_AGENT)
@@ -219,9 +234,12 @@ async fn request_code(address: &str) -> Result<CliCodeResponse> {
/// Poll to verify the CLI auth code and get the session token
async fn verify_code(address: &str, code: &str) -> Result<CliVerifyResponse> {
ensure_crypto_provider();
- let url = make_url(address, &format!("/auth/cli/verify?code={}", code))?;
+ let base = make_url(address, "/auth/cli/verify")?;
+ let url = format!("{base}?code={code}");
let client = reqwest::Client::new();
+ debug!("Verifying code with Hub at {base}?code=******");
+
let resp = client
.post(&url)
.header(USER_AGENT, APP_USER_AGENT)
diff --git a/crates/atuin-client/src/settings.rs b/crates/atuin-client/src/settings.rs
index a15ce461..8e874832 100644
--- a/crates/atuin-client/src/settings.rs
+++ b/crates/atuin-client/src/settings.rs
@@ -42,6 +42,10 @@ pub enum SearchMode {
#[serde(rename = "skim")]
Skim,
+
+ #[serde(rename = "daemon-fuzzy")]
+ #[clap(aliases = &["daemon-fuzzy"])]
+ DaemonFuzzy,
}
impl SearchMode {
@@ -51,6 +55,7 @@ impl SearchMode {
SearchMode::FullText => "FULLTXT",
SearchMode::Fuzzy => "FUZZY",
SearchMode::Skim => "SKIM",
+ SearchMode::DaemonFuzzy => "DAEMON",
}
}
pub fn next(&self, settings: &Settings) -> Self {
@@ -58,9 +63,13 @@ impl SearchMode {
SearchMode::Prefix => SearchMode::FullText,
// if the user is using skim, we go to skim
SearchMode::FullText if settings.search_mode == SearchMode::Skim => SearchMode::Skim,
+ // if the user is using daemon-fuzzy, we go to daemon-fuzzy
+ SearchMode::FullText if settings.search_mode == SearchMode::DaemonFuzzy => {
+ SearchMode::DaemonFuzzy
+ }
// otherwise fuzzy.
SearchMode::FullText => SearchMode::Fuzzy,
- SearchMode::Fuzzy | SearchMode::Skim => SearchMode::Prefix,
+ SearchMode::Fuzzy | SearchMode::Skim | SearchMode::DaemonFuzzy => SearchMode::Prefix,
}
}
}
@@ -477,6 +486,78 @@ pub struct Tmux {
pub height: String,
}
+/// Log level for file logging. Maps to tracing's LevelFilter.
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum LogLevel {
+ Trace,
+ Debug,
+ #[default]
+ Info,
+ Warn,
+ Error,
+}
+
+impl LogLevel {
+ /// Convert to a tracing directive string for use with EnvFilter.
+ pub fn as_directive(&self) -> &'static str {
+ match self {
+ LogLevel::Trace => "trace",
+ LogLevel::Debug => "debug",
+ LogLevel::Info => "info",
+ LogLevel::Warn => "warn",
+ LogLevel::Error => "error",
+ }
+ }
+}
+
+/// Configuration for a specific log type (search or daemon).
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
+pub struct LogConfig {
+ /// Log file name (relative to dir) or absolute path.
+ pub file: String,
+
+ /// Override global enabled setting for this log type.
+ pub enabled: Option<bool>,
+
+ /// Override global level setting for this log type.
+ pub level: Option<LogLevel>,
+
+ /// Override global retention days setting for this log type.
+ pub retention: Option<u64>,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct Logs {
+ /// Enable file logging globally. Defaults to true.
+ #[serde(default = "Logs::default_enabled")]
+ pub enabled: bool,
+
+ /// Directory for log files. Defaults to ~/.atuin/logs
+ pub dir: String,
+
+ /// Default log level for file logging. Defaults to "info".
+ /// Note: ATUIN_LOG environment variable overrides this.
+ #[serde(default)]
+ pub level: LogLevel,
+
+ /// Default retention days for log files. Defaults to 4.
+ #[serde(default = "Logs::default_retention")]
+ pub retention: u64,
+
+ /// Search log settings
+ #[serde(default)]
+ pub search: LogConfig,
+
+ /// Daemon log settings
+ #[serde(default)]
+ pub daemon: LogConfig,
+
+ /// AI log settings
+ #[serde(default)]
+ pub ai: LogConfig,
+}
+
#[derive(Default, Clone, Debug, Deserialize, Serialize)]
pub struct Ai {
/// The address of the Atuin AI endpoint. Used for AI features like command generation.
@@ -523,6 +604,117 @@ impl Default for Daemon {
}
}
+impl Default for Logs {
+ fn default() -> Self {
+ Self {
+ enabled: true,
+ dir: "".to_string(),
+ level: LogLevel::default(),
+ retention: Self::default_retention(),
+ search: LogConfig {
+ file: "search.log".to_string(),
+ ..Default::default()
+ },
+ daemon: LogConfig {
+ file: "daemon.log".to_string(),
+ ..Default::default()
+ },
+ ai: LogConfig {
+ file: "ai.log".to_string(),
+ ..Default::default()
+ },
+ }
+ }
+}
+
+impl Logs {
+ fn default_enabled() -> bool {
+ true
+ }
+
+ fn default_retention() -> u64 {
+ 4
+ }
+
+ /// Returns whether search logging is enabled.
+ /// Uses search-specific setting if set, otherwise falls back to global.
+ pub fn search_enabled(&self) -> bool {
+ self.search.enabled.unwrap_or(self.enabled)
+ }
+
+ /// Returns whether daemon logging is enabled.
+ /// Uses daemon-specific setting if set, otherwise falls back to global.
+ pub fn daemon_enabled(&self) -> bool {
+ self.daemon.enabled.unwrap_or(self.enabled)
+ }
+
+ /// Returns whether AI logging is enabled.
+ /// Uses AI-specific setting if set, otherwise falls back to global.
+ pub fn ai_enabled(&self) -> bool {
+ self.ai.enabled.unwrap_or(self.enabled)
+ }
+
+ /// Returns the log level for search logging.
+ /// Uses search-specific setting if set, otherwise falls back to global.
+ pub fn search_level(&self) -> LogLevel {
+ self.search.level.unwrap_or(self.level)
+ }
+
+ /// Returns the log level for daemon logging.
+ /// Uses daemon-specific setting if set, otherwise falls back to global.
+ pub fn daemon_level(&self) -> LogLevel {
+ self.daemon.level.unwrap_or(self.level)
+ }
+
+ /// Returns the log level for AI logging.
+ /// Uses AI-specific setting if set, otherwise falls back to global.
+ pub fn ai_level(&self) -> LogLevel {
+ self.ai.level.unwrap_or(self.level)
+ }
+
+ /// Returns the retention days for search logging.
+ /// Uses search-specific setting if set, otherwise falls back to global.
+ pub fn search_retention(&self) -> u64 {
+ self.search.retention.unwrap_or(self.retention)
+ }
+
+ /// Returns the retention days for daemon logging.
+ /// Uses daemon-specific setting if set, otherwise falls back to global.
+ pub fn daemon_retention(&self) -> u64 {
+ self.daemon.retention.unwrap_or(self.retention)
+ }
+
+ /// Returns the retention days for AI logging.
+ /// Uses AI-specific setting if set, otherwise falls back to global.
+ pub fn ai_retention(&self) -> u64 {
+ self.ai.retention.unwrap_or(self.retention)
+ }
+
+ /// Returns the full path for the search log file.
+ /// If `file` is an absolute path, returns it directly.
+ /// Otherwise, joins it with `dir`.
+ pub fn search_path(&self) -> PathBuf {
+ let path = PathBuf::from(&self.search.file);
+ if path.is_absolute() {
+ path
+ } else {
+ PathBuf::from(&self.dir).join(path)
+ }
+ }
+
+ /// Returns the full path for the daemon log file.
+ /// If `file` is an absolute path, returns it directly.
+ /// Otherwise, joins it with `dir`.
+ pub fn daemon_path(&self) -> PathBuf {
+ let path = PathBuf::from(&self.daemon.file);
+ if path.is_absolute() {
+ path
+ } else {
+ PathBuf::from(&self.dir).join(path)
+ }
+ }
+}
+
impl Default for Search {
fn default() -> Self {
Self {
@@ -849,6 +1041,9 @@ pub struct Settings {
pub tmux: Tmux,
#[serde(default)]
+ pub logs: Logs,
+
+ #[serde(default)]
pub meta: meta::Settings,
#[serde(default)]
@@ -1033,6 +1228,7 @@ impl Settings {
let scripts_path = data_dir.join("scripts.db");
let socket_path = atuin_common::utils::runtime_dir().join("atuin.sock");
let pidfile_path = data_dir.join("atuin-daemon.pid");
+ let logs_dir = atuin_common::utils::logs_dir();
let key_path = data_dir.join("key");
let meta_path = data_dir.join("meta.db");
@@ -1101,6 +1297,12 @@ impl Settings {
.set_default("daemon.pidfile_path", pidfile_path.to_str())?
.set_default("daemon.systemd_socket", false)?
.set_default("daemon.tcp_port", 8889)?
+ .set_default("logs.enabled", true)?
+ .set_default("logs.dir", logs_dir.to_str())?
+ .set_default("logs.level", "info")?
+ .set_default("logs.search.file", "search.log")?
+ .set_default("logs.daemon.file", "daemon.log")?
+ .set_default("logs.ai.file", "ai.log")?
.set_default("kv.db_path", kv_path.to_str())?
.set_default("scripts.db_path", scripts_path.to_str())?
.set_default("meta.db_path", meta_path.to_str())?
@@ -1218,6 +1420,9 @@ impl Settings {
settings.key_path = Self::expand_path(settings.key_path)?;
settings.daemon.socket_path = Self::expand_path(settings.daemon.socket_path)?;
settings.daemon.pidfile_path = Self::expand_path(settings.daemon.pidfile_path)?;
+ settings.logs.dir = Self::expand_path(settings.logs.dir)?;
+ settings.logs.search.file = Self::expand_path(settings.logs.search.file)?;
+ settings.logs.daemon.file = Self::expand_path(settings.logs.daemon.file)?;
// Validate UI settings
settings.ui.validate()?;
@@ -1264,6 +1469,20 @@ impl Default for Settings {
}
}
+/// Initialize the meta store configuration for testing.
+///
+/// This should only be used in tests. It allows tests to bypass the normal
+/// Settings::new() flow while still being able to use Settings::host_id()
+/// and other meta store dependent functions.
+///
+/// # Safety
+/// This function is not thread-safe with concurrent calls to Settings::new()
+/// or other meta store initialization. Only call from tests.
+#[doc(hidden)]
+pub fn init_meta_config_for_testing(meta_db_path: impl Into<String>, local_timeout: f64) {
+ META_CONFIG.set((meta_db_path.into(), local_timeout)).ok();
+}
+
#[cfg(test)]
pub(crate) fn test_local_timeout() -> f64 {
std::env::var("ATUIN_TEST_LOCAL_TIMEOUT")
diff --git a/crates/atuin-common/src/utils.rs b/crates/atuin-common/src/utils.rs
index bb291ebf..b885423e 100644
--- a/crates/atuin-common/src/utils.rs
+++ b/crates/atuin-common/src/utils.rs
@@ -88,6 +88,10 @@ pub fn runtime_dir() -> PathBuf {
std::env::var("XDG_RUNTIME_DIR").map_or_else(|_| data_dir(), PathBuf::from)
}
+pub fn logs_dir() -> PathBuf {
+ home_dir().join(".atuin").join("logs")
+}
+
pub fn dotfiles_cache_dir() -> PathBuf {
// In most cases, this will be ~/.local/share/atuin/dotfiles/cache
let data_dir = std::env::var("XDG_DATA_HOME")
diff --git a/crates/atuin-daemon/Cargo.toml b/crates/atuin-daemon/Cargo.toml
index 36917789..97ed88ea 100644
--- a/crates/atuin-daemon/Cargo.toml
+++ b/crates/atuin-daemon/Cargo.toml
@@ -14,9 +14,10 @@ readme.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-atuin-client = { path = "../atuin-client", version = "18.13.0-beta.1" }
-atuin-dotfiles = { path = "../atuin-dotfiles", version = "18.13.0-beta.1" }
-atuin-history = { path = "../atuin-history", version = "18.13.0-beta.1" }
+atuin-client = { path = "../atuin-client" }
+atuin-common = { path = "../atuin-common" }
+atuin-dotfiles = { path = "../atuin-dotfiles" }
+atuin-history = { path = "../atuin-history" }
time = { workspace = true }
uuid = { workspace = true }
@@ -32,10 +33,11 @@ tonic = "0.14"
tonic-prost = "0.14"
prost = "0.14"
prost-types = "0.14"
-tokio-stream = {version="0.1.14", features=["net"]}
+tokio-stream = { version = "0.1.14", features = ["net"] }
hyper-util = "0.1"
rand.workspace = true
+nucleo = { git = "https://github.com/atuinsh/nucleo-ext.git", branch = "main" }
[target.'cfg(target_os = "linux")'.dependencies]
listenfd = "1.0.1"
diff --git a/crates/atuin-daemon/build.rs b/crates/atuin-daemon/build.rs
index fbe34d12..7034aa04 100644
--- a/crates/atuin-daemon/build.rs
+++ b/crates/atuin-daemon/build.rs
@@ -3,7 +3,11 @@ use std::{env, fs, path::PathBuf};
use protox::prost::Message;
fn main() -> std::io::Result<()> {
- let proto_paths = ["proto/history.proto"];
+ let proto_paths = [
+ "proto/history.proto",
+ "proto/search.proto",
+ "proto/control.proto",
+ ];
let proto_include_dirs = ["proto"];
let file_descriptors = protox::compile(proto_paths, proto_include_dirs).unwrap();
diff --git a/crates/atuin-daemon/proto/control.proto b/crates/atuin-daemon/proto/control.proto
new file mode 100644
index 00000000..06347902
--- /dev/null
+++ b/crates/atuin-daemon/proto/control.proto
@@ -0,0 +1,62 @@
+syntax = "proto3";
+package control;
+
+// The Control service allows external processes (CLI commands, etc.)
+// to inject events into the running daemon.
+service Control {
+ // Send an event to the daemon's event bus
+ rpc SendEvent(SendEventRequest) returns (SendEventResponse);
+}
+
+message SendEventRequest {
+ oneof event {
+ // History was pruned - search index needs full rebuild
+ HistoryPrunedEvent history_pruned = 1;
+
+ // Specific history items were deleted
+ HistoryDeletedEvent history_deleted = 2;
+
+ // Request immediate sync
+ ForceSyncEvent force_sync = 3;
+
+ // Settings have changed, reload if needed
+ SettingsReloadedEvent settings_reloaded = 4;
+
+ // Request graceful shutdown
+ ShutdownEvent shutdown = 5;
+
+ // History was rebuilt - search index needs full rebuild
+ HistoryRebuiltEvent history_rebuilt = 6;
+ }
+}
+
+message SendEventResponse {
+ // Empty on success; errors come through gRPC status
+}
+
+// Individual event message types
+
+message HistoryPrunedEvent {
+ // No fields needed - just signals that pruning happened
+}
+
+message HistoryRebuiltEvent {
+ // No fields needed - just signals that rebuilding happened
+}
+
+message HistoryDeletedEvent {
+ // IDs of deleted history items (UUIDs as strings)
+ repeated string ids = 1;
+}
+
+message ForceSyncEvent {
+ // No fields needed - just triggers sync
+}
+
+message SettingsReloadedEvent {
+ // No fields needed - components should re-read settings
+}
+
+message ShutdownEvent {
+ // No fields needed - triggers graceful shutdown
+}
diff --git a/crates/atuin-daemon/proto/search.proto b/crates/atuin-daemon/proto/search.proto
new file mode 100644
index 00000000..6b84acbd
--- /dev/null
+++ b/crates/atuin-daemon/proto/search.proto
@@ -0,0 +1,35 @@
+syntax = "proto3";
+package search;
+
+enum FilterMode {
+ GLOBAL = 0;
+ HOST = 1;
+ SESSION = 2;
+ DIRECTORY = 3;
+ WORKSPACE = 4;
+ SESSION_PRELOAD = 5;
+}
+
+message SearchContext {
+ string session_id = 1;
+ string cwd = 2;
+ string hostname = 3;
+ string host_id = 4;
+ optional string git_root = 5;
+}
+
+message SearchRequest {
+ string query = 1;
+ uint64 query_id = 2; // Incrementing ID to match responses to queries
+ FilterMode filter_mode = 3;
+ SearchContext context = 4;
+}
+
+message SearchResponse {
+ uint64 query_id = 1; // Echo back the query ID
+ repeated bytes ids = 2;
+}
+
+service Search {
+ rpc Search(stream SearchRequest) returns (stream SearchResponse);
+}
diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs
index 3b76a680..2f492f6b 100644
--- a/crates/atuin-daemon/src/client.rs
+++ b/crates/atuin-daemon/src/client.rs
@@ -1,4 +1,6 @@
-use eyre::{Context, Result};
+use atuin_client::database::Context;
+use atuin_client::settings::{FilterMode, Settings};
+use eyre::{Context as EyreContext, Result};
#[cfg(windows)]
use tokio::net::TcpStream;
use tonic::Code;
@@ -11,11 +13,22 @@ use hyper_util::rt::TokioIo;
use tokio::net::UnixStream;
use atuin_client::history::History;
+use tracing::{Level, instrument, span};
+use crate::control::HistoryRebuiltEvent;
+use crate::control::{
+ ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest,
+ SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient,
+};
+use crate::events::DaemonEvent;
use crate::history::{
EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest,
StatusReply, StatusRequest, history_client::HistoryClient as HistoryServiceClient,
};
+use crate::search::{
+ FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse,
+ search_client::SearchClient as SearchServiceClient,
+};
pub struct HistoryClient {
client: HistoryServiceClient<Channel>,
@@ -52,6 +65,8 @@ pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind {
impl HistoryClient {
#[cfg(unix)]
pub async fn new(path: String) -> Result<Self> {
+ use eyre::Context;
+
let log_path = path.clone();
let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
.connect_with_connector(service_fn(move |_: Uri| {
@@ -130,3 +145,275 @@ impl HistoryClient {
Ok(resp.accepted)
}
}
+
+pub struct SearchClient {
+ client: SearchServiceClient<Channel>,
+}
+
+impl SearchClient {
+ #[cfg(unix)]
+ pub async fn new(path: String) -> Result<Self> {
+ let log_path = path.clone();
+ let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+ .connect_with_connector(service_fn(move |_: Uri| {
+ let path = path.clone();
+
+ async move {
+ Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
+ }
+ }))
+ .await
+ .wrap_err_with(|| {
+ format!(
+ "failed to connect to local atuin daemon at {}. Is it running?",
+ &log_path
+ )
+ })?;
+
+ let client = SearchServiceClient::new(channel);
+
+ Ok(SearchClient { client })
+ }
+
+ #[cfg(not(unix))]
+ pub async fn new(port: u64) -> Result<Self> {
+ let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+ .connect_with_connector(service_fn(move |_: Uri| {
+ let url = format!("127.0.0.1:{port}");
+
+ async move {
+ Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
+ }
+ }))
+ .await
+ .wrap_err_with(|| {
+ format!(
+ "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
+ )
+ })?;
+
+ let client = SearchServiceClient::new(channel);
+
+ Ok(SearchClient { client })
+ }
+
+ #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))]
+ pub async fn search(
+ &mut self,
+ query: String,
+ query_id: u64,
+ filter_mode: FilterMode,
+ context: Option<Context>,
+ ) -> Result<tonic::Streaming<SearchResponse>> {
+ let request = SearchRequest {
+ query,
+ query_id,
+ filter_mode: RpcFilterMode::from(filter_mode).into(),
+ context: context.map(RpcSearchContext::from),
+ };
+ let request_stream = tokio_stream::once(request);
+ let response = span!(Level::TRACE, "daemon_client_search.request")
+ .in_scope(async || self.client.search(request_stream).await)
+ .await?;
+
+ Ok(response.into_inner())
+ }
+}
+
+impl From<FilterMode> for RpcFilterMode {
+ fn from(filter_mode: FilterMode) -> Self {
+ match filter_mode {
+ FilterMode::Global => RpcFilterMode::Global,
+ FilterMode::Host => RpcFilterMode::Host,
+ FilterMode::Session => RpcFilterMode::Session,
+ FilterMode::Directory => RpcFilterMode::Directory,
+ FilterMode::Workspace => RpcFilterMode::Workspace,
+ FilterMode::SessionPreload => RpcFilterMode::SessionPreload,
+ }
+ }
+}
+
+impl From<Context> for RpcSearchContext {
+ fn from(context: Context) -> Self {
+ RpcSearchContext {
+ session_id: context.session,
+ cwd: context.cwd,
+ hostname: context.hostname,
+ host_id: context.host_id,
+ git_root: context
+ .git_root
+ .map(|path| path.to_string_lossy().to_string()),
+ }
+ }
+}
+
+// ============================================================================
+// Control Client
+// ============================================================================
+
+/// Client for the Control gRPC service.
+///
+/// Used to inject events into a running daemon from external processes.
+pub struct ControlClient {
+ client: ControlServiceClient<Channel>,
+}
+
+impl ControlClient {
+ /// Connect to the daemon's control service.
+ #[cfg(unix)]
+ pub async fn new(path: String) -> Result<Self> {
+ let log_path = path.clone();
+ let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+ .connect_with_connector(service_fn(move |_: Uri| {
+ let path = path.clone();
+
+ async move {
+ Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
+ }
+ }))
+ .await
+ .wrap_err_with(|| {
+ format!(
+ "failed to connect to local atuin daemon at {}. Is it running?",
+ &log_path
+ )
+ })?;
+
+ let client = ControlServiceClient::new(channel);
+
+ Ok(ControlClient { client })
+ }
+
+ /// Connect to the daemon's control service.
+ #[cfg(not(unix))]
+ pub async fn new(port: u64) -> Result<Self> {
+ let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+ .connect_with_connector(service_fn(move |_: Uri| {
+ let url = format!("127.0.0.1:{port}");
+
+ async move {
+ Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
+ }
+ }))
+ .await
+ .wrap_err_with(|| {
+ format!(
+ "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
+ )
+ })?;
+
+ let client = ControlServiceClient::new(channel);
+
+ Ok(ControlClient { client })
+ }
+
+ /// Connect using settings.
+ #[cfg(unix)]
+ pub async fn from_settings(settings: &Settings) -> Result<Self> {
+ Self::new(settings.daemon.socket_path.clone()).await
+ }
+
+ /// Connect using settings.
+ #[cfg(not(unix))]
+ pub async fn from_settings(settings: &Settings) -> Result<Self> {
+ Self::new(settings.daemon.tcp_port).await
+ }
+
+ /// Send an event to the daemon.
+ pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> {
+ let proto_event = daemon_event_to_proto(event);
+ let request = SendEventRequest {
+ event: Some(proto_event),
+ };
+ self.client.send_event(request).await?;
+ Ok(())
+ }
+}
+
+/// Convert a daemon event to its proto representation.
+fn daemon_event_to_proto(event: DaemonEvent) -> crate::control::send_event_request::Event {
+ use crate::control::send_event_request::Event;
+
+ match event {
+ DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}),
+ DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}),
+ DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent {
+ ids: ids.into_iter().map(|id| id.0).collect(),
+ }),
+ DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}),
+ DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}),
+ DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}),
+ // These events are internal and not sent via the control service
+ DaemonEvent::HistoryStarted(_)
+ | DaemonEvent::HistoryEnded(_)
+ | DaemonEvent::RecordsAdded(_)
+ | DaemonEvent::SyncCompleted { .. }
+ | DaemonEvent::SyncFailed { .. } => {
+ // Use shutdown as a fallback, though this shouldn't happen
+ tracing::warn!("attempted to send internal event via control service");
+ Event::Shutdown(ShutdownEvent {})
+ }
+ }
+}
+
+// ============================================================================
+// Convenience Functions
+// ============================================================================
+
+/// Emit an event to the daemon.
+///
+/// This is a fire-and-forget helper for sending events to the daemon from
+/// external processes like CLI commands. If the daemon isn't running, this
+/// will silently succeed (returns Ok).
+///
+/// # Example
+///
+/// ```ignore
+/// // After pruning history
+/// emit_event(DaemonEvent::HistoryPruned).await?;
+///
+/// // After deleting specific history items
+/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] }).await?;
+///
+/// // Request immediate sync
+/// emit_event(DaemonEvent::ForceSync).await?;
+/// ```
+pub async fn emit_event(event: DaemonEvent) -> Result<()> {
+ emit_event_with_settings(event, None).await
+}
+
+/// Emit an event to the daemon with explicit settings.
+///
+/// If settings are not provided, they will be loaded from the default location.
+/// If the daemon isn't running, this will silently succeed.
+pub async fn emit_event_with_settings(
+ event: DaemonEvent,
+ settings: Option<&Settings>,
+) -> Result<()> {
+ // Load settings if not provided
+ let owned_settings;
+ let settings = match settings {
+ Some(s) => s,
+ None => {
+ owned_settings = Settings::new()?;
+ &owned_settings
+ }
+ };
+
+ // Try to connect - if daemon isn't running, that's fine
+ let mut client = match ControlClient::from_settings(settings).await {
+ Ok(c) => c,
+ Err(e) => {
+ tracing::debug!(?e, "daemon not running, skipping event emission");
+ return Ok(());
+ }
+ };
+
+ // Send the event
+ if let Err(e) = client.send_event(event).await {
+ tracing::debug!(?e, "failed to send event to daemon");
+ // Don't fail - this is fire-and-forget
+ }
+
+ Ok(())
+}
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs
new file mode 100644
index 00000000..23d48c5e
--- /dev/null
+++ b/crates/atuin-daemon/src/components/history.rs
@@ -0,0 +1,252 @@
+//! History component.
+//!
+//! Handles command history lifecycle (start/end) and provides the History gRPC service.
+
+use std::sync::Arc;
+
+use atuin_client::{
+ database::Database,
+ history::{History, HistoryId, store::HistoryStore},
+ settings::Settings,
+};
+use dashmap::DashMap;
+use eyre::Result;
+use time::OffsetDateTime;
+use tonic::{Request, Response, Status};
+use tracing::{Level, instrument};
+
+use crate::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ history::{
+ EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply,
+ StartHistoryRequest, StatusReply, StatusRequest,
+ history_server::{History as HistorySvc, HistoryServer},
+ },
+};
+
+const DAEMON_PROTOCOL_VERSION: u32 = 1;
+
+/// History component - manages command history lifecycle.
+///
+/// This component:
+/// - Tracks currently running commands (stored in memory)
+/// - Saves completed commands to the database and record store
+/// - Emits history events for other components (e.g., search indexing)
+/// - Provides the History gRPC service
+pub struct HistoryComponent {
+ inner: Arc<HistoryComponentInner>,
+}
+
+struct HistoryComponentInner {
+ /// Commands currently running (not yet completed).
+ running: DashMap<HistoryId, History>,
+
+ /// Handle to the daemon (set during start).
+ handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+
+ /// History store for pushing records (set during start).
+ history_store: tokio::sync::RwLock<Option<HistoryStore>>,
+}
+
+impl HistoryComponent {
+ /// Create a new history component.
+ pub fn new() -> Self {
+ Self {
+ inner: Arc::new(HistoryComponentInner {
+ running: DashMap::new(),
+ handle: tokio::sync::RwLock::new(None),
+ history_store: tokio::sync::RwLock::new(None),
+ }),
+ }
+ }
+
+ /// Get the gRPC service for this component.
+ ///
+ /// This returns a tonic service that can be added to a gRPC server.
+ pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
+ HistoryServer::new(HistoryGrpcService {
+ inner: self.inner.clone(),
+ })
+ }
+}
+
+impl Default for HistoryComponent {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for HistoryComponent {
+ fn name(&self) -> &'static str {
+ "history"
+ }
+
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+ // Create the history store
+ let host_id = Settings::host_id().await?;
+ let history_store =
+ HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key());
+
+ *self.inner.history_store.write().await = Some(history_store);
+ *self.inner.handle.write().await = Some(handle);
+
+ tracing::info!("history component started");
+ Ok(())
+ }
+
+ async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
+ // History component produces events but doesn't need to react to them
+ Ok(())
+ }
+
+ async fn stop(&mut self) -> Result<()> {
+ tracing::info!("history component stopped");
+ Ok(())
+ }
+}
+
+/// The gRPC service implementation.
+///
+/// This is a thin wrapper that delegates to the component's shared state.
+pub struct HistoryGrpcService {
+ inner: Arc<HistoryComponentInner>,
+}
+
+#[tonic::async_trait]
+impl HistorySvc for HistoryGrpcService {
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn start_history(
+ &self,
+ request: Request<StartHistoryRequest>,
+ ) -> Result<Response<StartHistoryReply>, Status> {
+ let req = request.into_inner();
+
+ let timestamp =
+ OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
+ Status::invalid_argument(
+ "failed to parse timestamp as unix time (expected nanos since epoch)",
+ )
+ })?;
+
+ let h: History = History::daemon()
+ .timestamp(timestamp)
+ .command(req.command)
+ .cwd(req.cwd)
+ .session(req.session)
+ .hostname(req.hostname)
+ .build()
+ .into();
+
+ // Emit the event
+ if let Some(handle) = self.inner.handle.read().await.as_ref() {
+ handle.emit(DaemonEvent::HistoryStarted(h.clone()));
+ }
+
+ let id = h.id.clone();
+ tracing::info!(id = id.to_string(), "start history");
+ self.inner.running.insert(id.clone(), h);
+
+ let reply = StartHistoryReply {
+ id: id.to_string(),
+ version: env!("CARGO_PKG_VERSION").to_string(),
+ protocol: DAEMON_PROTOCOL_VERSION,
+ };
+
+ Ok(Response::new(reply))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn end_history(
+ &self,
+ request: Request<EndHistoryRequest>,
+ ) -> Result<Response<EndHistoryReply>, Status> {
+ let req = request.into_inner();
+ let id = HistoryId(req.id);
+
+ if let Some((_, mut history)) = self.inner.running.remove(&id) {
+ history.exit = req.exit;
+ history.duration = match req.duration {
+ 0 => i64::try_from(
+ (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
+ )
+ .expect("failed to convert calculated duration to i64"),
+ value => i64::try_from(value).expect("failed to get i64 duration"),
+ };
+
+ // Get the handle and store to save the history
+ let handle_guard = self.inner.handle.read().await;
+ let handle = handle_guard
+ .as_ref()
+ .ok_or_else(|| Status::internal("component not initialized"))?;
+
+ let store_guard = self.inner.history_store.read().await;
+ let history_store = store_guard
+ .as_ref()
+ .ok_or_else(|| Status::internal("component not initialized"))?;
+
+ // Save to database
+ handle
+ .history_db()
+ .save(&history)
+ .await
+ .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
+
+ tracing::info!(
+ id = id.0.to_string(),
+ duration = history.duration,
+ "end history"
+ );
+
+ // Push to record store
+ let (record_id, idx) = history_store
+ .push(history.clone())
+ .await
+ .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
+
+ // Emit the event
+ handle.emit(DaemonEvent::HistoryEnded(history));
+
+ let reply = EndHistoryReply {
+ id: record_id.0.to_string(),
+ idx,
+ version: env!("CARGO_PKG_VERSION").to_string(),
+ protocol: DAEMON_PROTOCOL_VERSION,
+ };
+
+ return Ok(Response::new(reply));
+ }
+
+ Err(Status::not_found(format!(
+ "could not find history with id: {id}"
+ )))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn status(
+ &self,
+ _request: Request<StatusRequest>,
+ ) -> Result<Response<StatusReply>, Status> {
+ let reply = StatusReply {
+ healthy: true,
+ version: env!("CARGO_PKG_VERSION").to_string(),
+ pid: std::process::id(),
+ protocol: DAEMON_PROTOCOL_VERSION,
+ };
+
+ Ok(Response::new(reply))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn shutdown(
+ &self,
+ _request: Request<ShutdownRequest>,
+ ) -> Result<Response<ShutdownReply>, Status> {
+ // Use the daemon handle to request shutdown
+ if let Some(handle) = self.inner.handle.read().await.as_ref() {
+ handle.shutdown();
+ }
+ Ok(Response::new(ShutdownReply { accepted: true }))
+ }
+}
diff --git a/crates/atuin-daemon/src/components/mod.rs b/crates/atuin-daemon/src/components/mod.rs
new file mode 100644
index 00000000..5950d5d5
--- /dev/null
+++ b/crates/atuin-daemon/src/components/mod.rs
@@ -0,0 +1,22 @@
+//! Daemon components.
+//!
+//! Components are the building blocks of the daemon. Each component handles
+//! a specific domain and can:
+//!
+//! - Expose gRPC services
+//! - React to events
+//! - Spawn background tasks
+//!
+//! Available components:
+//!
+//! - [`history::HistoryComponent`]: Command history lifecycle management
+//! - [`search::SearchComponent`]: Fuzzy search over history
+//! - [`sync::SyncComponent`]: Cloud sync
+
+pub mod history;
+pub mod search;
+pub mod sync;
+
+pub use history::HistoryComponent;
+pub use search::SearchComponent;
+pub use sync::SyncComponent;
diff --git a/crates/atuin-daemon/src/components/search.rs b/crates/atuin-daemon/src/components/search.rs
new file mode 100644
index 00000000..7fb59dea
--- /dev/null
+++ b/crates/atuin-daemon/src/components/search.rs
@@ -0,0 +1,394 @@
+//! Search component.
+//!
+//! Provides fuzzy search over command history using the Nucleo search library
+//! with frecency-based ranking and dynamic filtering.
+
+use std::{pin::Pin, sync::Arc};
+
+use atuin_client::database::Database;
+use eyre::Result;
+use tokio::sync::RwLock;
+use tokio_stream::Stream;
+use tonic::{Request, Response, Status, Streaming};
+use tracing::{Level, debug, info, instrument, span, trace};
+use uuid::Uuid;
+
+use crate::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+ search::{
+ FilterMode, IndexFilterMode, QueryContext, SearchIndex, SearchRequest, SearchResponse,
+ search_server::{Search as SearchSvc, SearchServer},
+ },
+};
+
+const PAGE_SIZE: usize = 5000;
+const RESULTS_LIMIT: u32 = 200;
+/// How often to rebuild the frecency map (in seconds).
+const FRECENCY_REFRESH_INTERVAL_SECS: u64 = 60;
+
+/// Search component - provides fuzzy search over command history.
+///
+/// This component:
+/// - Maintains a deduplicated search index with frecency ranking
+/// - Loads history from the database on startup
+/// - Updates the index when history events occur
+/// - Provides the Search gRPC service
+pub struct SearchComponent {
+ index: Arc<RwLock<SearchIndex>>,
+ handle: tokio::sync::RwLock<Option<DaemonHandle>>,
+ loader_handle: Option<tokio::task::JoinHandle<()>>,
+ frecency_handle: Option<tokio::task::JoinHandle<()>>,
+}
+
+impl SearchComponent {
+ /// Create a new search component.
+ pub fn new() -> Self {
+ Self {
+ index: Arc::new(RwLock::new(SearchIndex::new())),
+ handle: tokio::sync::RwLock::new(None),
+ loader_handle: None,
+ frecency_handle: None,
+ }
+ }
+
+ /// Get the gRPC service for this component.
+ pub fn grpc_service(&self) -> SearchServer<SearchGrpcService> {
+ SearchServer::new(SearchGrpcService {
+ index: self.index.clone(),
+ })
+ }
+
+ /// Rebuild the entire search index from the database.
+ async fn rebuild_index(&self) -> Result<()> {
+ let handle_guard = self.handle.read().await;
+ let handle = handle_guard
+ .as_ref()
+ .ok_or_else(|| eyre::eyre!("component not initialized"))?;
+
+ info!("Rebuilding search index from database");
+
+ // Create a new index
+ let new_index = SearchIndex::new();
+
+ // Load all history into the new index
+ let db = handle.history_db().clone();
+ let mut pager = db.all_paged(PAGE_SIZE, false, true);
+ loop {
+ match pager.next().await {
+ Ok(Some(histories)) => {
+ info!(
+ "Loading {} history entries into search index",
+ histories.len()
+ );
+ new_index.add_histories(&histories);
+ }
+ Ok(None) => break,
+ Err(e) => {
+ tracing::error!("Failed to load history during rebuild: {}", e);
+ break;
+ }
+ }
+ }
+
+ info!(
+ "Search index rebuild complete; {} unique commands",
+ new_index.command_count()
+ );
+
+ // Replace the old index with the new one
+ *self.index.write().await = new_index;
+ Ok(())
+ }
+}
+
+impl Default for SearchComponent {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for SearchComponent {
+ fn name(&self) -> &'static str {
+ "search"
+ }
+
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+ *self.handle.write().await = Some(handle.clone());
+
+ // Spawn background task to load history into index
+ let index = self.index.clone();
+ let db = handle.history_db().clone();
+
+ self.loader_handle = Some(tokio::spawn(async move {
+ info!(
+ "Loading history into search index; page size = {}",
+ PAGE_SIZE
+ );
+ let mut pager = db.all_paged(PAGE_SIZE, false, true);
+ loop {
+ match pager.next().await {
+ Ok(Some(histories)) => {
+ info!(
+ "Loading {} history entries into search index",
+ histories.len()
+ );
+ index.read().await.add_histories(&histories);
+ }
+ Ok(None) => {
+ info!(
+ "Initial history load complete; {} unique commands indexed",
+ index.read().await.command_count()
+ );
+ // Build initial frecency map
+ index.read().await.rebuild_frecency().await;
+ info!("Initial frecency map built");
+ break;
+ }
+ Err(e) => {
+ tracing::error!("Failed to load history: {}", e);
+ break;
+ }
+ }
+ }
+ }));
+
+ // Spawn background task to periodically refresh frecency
+ let index_for_frecency = self.index.clone();
+ self.frecency_handle = Some(tokio::spawn(async move {
+ let mut interval = tokio::time::interval(std::time::Duration::from_secs(
+ FRECENCY_REFRESH_INTERVAL_SECS,
+ ));
+ loop {
+ interval.tick().await;
+ trace!("Refreshing frecency map");
+ index_for_frecency.read().await.rebuild_frecency().await;
+ }
+ }));
+
+ tracing::info!("search component started");
+ Ok(())
+ }
+
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+ match event {
+ DaemonEvent::RecordsAdded(records) => {
+ debug!(
+ count = records.len(),
+ "Processing added records for search index"
+ );
+
+ let handle_guard = self.handle.read().await;
+ if let Some(handle) = handle_guard.as_ref() {
+ let histories: Vec<_> = handle
+ .history_db()
+ .query_history(
+ format!(
+ "select * from history where id in ({})",
+ records
+ .iter()
+ .map(|record| record.0.to_string())
+ .collect::<Vec<_>>()
+ .join(",")
+ )
+ .as_str(),
+ )
+ .await
+ .unwrap_or_default();
+
+ span!(Level::TRACE, "inject_records", count = histories.len())
+ .in_scope(async || {
+ self.index.read().await.add_histories(&histories);
+ })
+ .await;
+ }
+ }
+ DaemonEvent::HistoryStarted(history) => {
+ debug!(id = %history.id, command = %history.command, "History started (no index action)");
+ }
+ DaemonEvent::HistoryEnded(history) => {
+ span!(Level::TRACE, "inject_history_ended")
+ .in_scope(async || {
+ self.index.read().await.add_history(history);
+ })
+ .await;
+ }
+ DaemonEvent::HistoryPruned | DaemonEvent::HistoryRebuilt => {
+ info!("History store pruned or rebuilt, rebuilding search index");
+ if let Err(e) = self.rebuild_index().await {
+ tracing::error!("Failed to rebuild search index: {}", e);
+ }
+ }
+ DaemonEvent::HistoryDeleted { ids } => {
+ info!(
+ count = ids.len(),
+ "History deleted, rebuilding search index"
+ );
+ // For now, just rebuild the entire index. A more efficient implementation
+ // would remove specific items from the index.
+ if let Err(e) = self.rebuild_index().await {
+ tracing::error!("Failed to rebuild search index: {}", e);
+ }
+ }
+ // Events we don't care about
+ DaemonEvent::SyncCompleted { .. }
+ | DaemonEvent::SyncFailed { .. }
+ | DaemonEvent::ForceSync
+ | DaemonEvent::SettingsReloaded
+ | DaemonEvent::ShutdownRequested => {}
+ }
+ Ok(())
+ }
+
+ async fn stop(&mut self) -> Result<()> {
+ if let Some(handle) = self.loader_handle.take() {
+ handle.abort();
+ }
+ if let Some(handle) = self.frecency_handle.take() {
+ handle.abort();
+ }
+ tracing::info!("search component stopped");
+ Ok(())
+ }
+}
+
+/// The gRPC service implementation.
+pub struct SearchGrpcService {
+ index: Arc<RwLock<SearchIndex>>,
+}
+
+#[tonic::async_trait]
+impl SearchSvc for SearchGrpcService {
+ type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResponse, Status>> + Send>>;
+
+ #[instrument(skip_all, level = Level::TRACE, name = "search_rpc")]
+ async fn search(
+ &self,
+ request: Request<Streaming<SearchRequest>>,
+ ) -> Result<Response<Self::SearchStream>, Status> {
+ let mut in_stream = request.into_inner();
+ let index = self.index.clone();
+
+ // Create output channel
+ let (tx, rx) = tokio::sync::mpsc::channel::<Result<SearchResponse, Status>>(128);
+
+ // Spawn task to handle incoming requests and send responses
+ tokio::spawn(async move {
+ while let Some(req) = in_stream.message().await.transpose() {
+ match req {
+ Ok(search_req) => {
+ let query = search_req.query;
+ let query_id = search_req.query_id;
+ let filter_mode: FilterMode = search_req
+ .filter_mode
+ .try_into()
+ .unwrap_or(FilterMode::Global);
+ let proto_context = search_req.context;
+
+ debug!(
+ "search request: query = {}, query_id = {}, filter_mode = {}, context = {:?}",
+ query,
+ query_id,
+ filter_mode.as_str_name(),
+ proto_context
+ );
+
+ // Convert proto FilterMode + context to IndexFilterMode
+ let index_filter = convert_filter_mode(filter_mode, &proto_context);
+
+ // Build QueryContext from proto context
+ let query_context = proto_context
+ .map(|ctx| QueryContext {
+ cwd: Some(with_trailing_slash(&ctx.cwd)),
+ git_root: ctx.git_root.map(|s| with_trailing_slash(&s)),
+ hostname: Some(ctx.hostname),
+ session_id: Some(ctx.session_id),
+ })
+ .unwrap_or_default();
+
+ // Perform the search
+ let history_ids =
+ span!(Level::TRACE, "daemon_search_query", %query, query_id)
+ .in_scope(|| async {
+ let index = index.read().await;
+ index
+ .search(&query, index_filter, &query_context, RESULTS_LIMIT)
+ .await
+ })
+ .await;
+
+ // Convert history IDs to bytes
+ let ids: Vec<Vec<u8>> = history_ids
+ .iter()
+ .filter_map(|id| {
+ Uuid::parse_str(id)
+ .ok()
+ .map(|uuid| uuid.as_bytes().to_vec())
+ })
+ .collect();
+
+ if tx.send(Ok(SearchResponse { query_id, ids })).await.is_err() {
+ break; // Client disconnected
+ }
+ }
+ Err(e) => {
+ let _ = tx.send(Err(e)).await;
+ break;
+ }
+ }
+ }
+ });
+
+ // Convert receiver to stream
+ let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
+ Ok(Response::new(Box::pin(out_stream)))
+ }
+}
+
+/// Convert proto FilterMode and context to IndexFilterMode.
+fn convert_filter_mode(
+ mode: FilterMode,
+ context: &Option<crate::search::SearchContext>,
+) -> IndexFilterMode {
+ match (mode, context) {
+ (FilterMode::Global, _) => IndexFilterMode::Global,
+ (FilterMode::Directory, Some(ctx)) => {
+ IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
+ }
+ (FilterMode::Workspace, Some(ctx)) => {
+ if let Some(ref git_root) = ctx.git_root {
+ IndexFilterMode::Workspace(with_trailing_slash(git_root))
+ } else {
+ // Fall back to directory if no git root
+ IndexFilterMode::Directory(with_trailing_slash(&ctx.cwd))
+ }
+ }
+ (FilterMode::Host, Some(ctx)) => IndexFilterMode::Host(ctx.hostname.clone()),
+ (FilterMode::Session, Some(ctx)) => IndexFilterMode::Session(ctx.session_id.clone()),
+ (FilterMode::SessionPreload, Some(ctx)) => {
+ // SessionPreload is similar to Session - filter by session
+ IndexFilterMode::Session(ctx.session_id.clone())
+ }
+ // If no context provided, fall back to global
+ _ => IndexFilterMode::Global,
+ }
+}
+
+#[cfg(windows)]
+pub fn with_trailing_slash(s: &str) -> String {
+ if s.ends_with('\\') {
+ s.to_string()
+ } else {
+ format!("{}\\", s)
+ }
+}
+
+#[cfg(not(windows))]
+pub fn with_trailing_slash(s: &str) -> String {
+ if s.ends_with('/') {
+ s.to_string()
+ } else {
+ format!("{}/", s)
+ }
+}
diff --git a/crates/atuin-daemon/src/components/sync.rs b/crates/atuin-daemon/src/components/sync.rs
new file mode 100644
index 00000000..6217706a
--- /dev/null
+++ b/crates/atuin-daemon/src/components/sync.rs
@@ -0,0 +1,257 @@
+//! Sync component.
+//!
+//! Handles periodic synchronization with the Atuin cloud server.
+
+use eyre::Result;
+use rand::Rng;
+use tokio::sync::mpsc;
+use tokio::time::{self, MissedTickBehavior};
+
+use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
+use atuin_dotfiles::store::{AliasStore, var::VarStore};
+
+use crate::{
+ daemon::{Component, DaemonHandle},
+ events::DaemonEvent,
+};
+
+/// Commands that can be sent to the sync task.
+enum SyncCommand {
+ /// Trigger an immediate sync.
+ ForceSync,
+ /// Stop the sync loop.
+ Stop,
+}
+
+/// Sync component - handles periodic cloud synchronization.
+///
+/// This component:
+/// - Runs a background sync loop on a configurable interval
+/// - Implements exponential backoff on sync failures
+/// - Responds to ForceSync events for immediate sync
+/// - Emits SyncCompleted/SyncFailed events
+pub struct SyncComponent {
+ task_handle: Option<tokio::task::JoinHandle<()>>,
+ command_tx: Option<mpsc::Sender<SyncCommand>>,
+}
+
+impl SyncComponent {
+ /// Create a new sync component.
+ pub fn new() -> Self {
+ Self {
+ task_handle: None,
+ command_tx: None,
+ }
+ }
+}
+
+impl Default for SyncComponent {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[tonic::async_trait]
+impl Component for SyncComponent {
+ fn name(&self) -> &'static str {
+ "sync"
+ }
+
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+ let (cmd_tx, cmd_rx) = mpsc::channel(16);
+ self.command_tx = Some(cmd_tx);
+
+ // Spawn the sync loop with its own copy of the handle
+ self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
+
+ tracing::info!("sync component started");
+ Ok(())
+ }
+
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+ if let DaemonEvent::ForceSync = event {
+ tracing::info!("force sync requested");
+ if let Some(tx) = &self.command_tx {
+ let _ = tx.send(SyncCommand::ForceSync).await;
+ }
+ }
+ Ok(())
+ }
+
+ async fn stop(&mut self) -> Result<()> {
+ if let Some(tx) = &self.command_tx {
+ let _ = tx.send(SyncCommand::Stop).await;
+ }
+ if let Some(handle) = self.task_handle.take() {
+ // Give the task a moment to shut down gracefully
+ let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
+ }
+ tracing::info!("sync component stopped");
+ Ok(())
+ }
+}
+
+/// The main sync loop.
+///
+/// This runs in a spawned task and handles periodic sync as well as
+/// force sync requests.
+async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
+ tracing::info!("sync loop starting");
+
+ // Clone settings since we need them across await points
+ let settings = handle.settings().await.clone();
+ let host_id = match Settings::host_id().await {
+ Ok(id) => id,
+ Err(e) => {
+ tracing::error!("failed to get host id, sync disabled: {e}");
+ return;
+ }
+ };
+
+ // Create the stores we need
+ let encryption_key = *handle.encryption_key();
+ let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
+ let alias_store = AliasStore::new(handle.store().clone(), host_id, encryption_key);
+ let var_store = VarStore::new(handle.store().clone(), host_id, encryption_key);
+
+ // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
+ let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
+
+ let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
+
+ // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
+ // we may end up running a lot of syncs in a hot loop.
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+ loop {
+ tokio::select! {
+ _ = ticker.tick() => {
+ do_sync_tick(
+ &handle,
+ &history_store,
+ &alias_store,
+ &var_store,
+ &mut ticker,
+ max_interval,
+ ).await;
+ }
+ cmd = cmd_rx.recv() => {
+ match cmd {
+ Some(SyncCommand::ForceSync) => {
+ tracing::info!("executing force sync");
+ do_sync_tick(
+ &handle,
+ &history_store,
+ &alias_store,
+ &var_store,
+ &mut ticker,
+ max_interval,
+ ).await;
+ }
+ Some(SyncCommand::Stop) | None => {
+ tracing::info!("sync loop stopping");
+ break;
+ }
+ }
+ }
+ }
+ }
+}
+
+/// Execute a single sync tick.
+async fn do_sync_tick(
+ handle: &DaemonHandle,
+ history_store: &HistoryStore,
+ alias_store: &AliasStore,
+ var_store: &VarStore,
+ ticker: &mut time::Interval,
+ max_interval: f64,
+) {
+ // Clone settings since we need them across await points
+ let settings = handle.settings().await.clone();
+
+ tracing::info!("sync tick");
+
+ // Check if logged in
+ let logged_in = match settings.logged_in().await {
+ Ok(v) => v,
+ Err(e) => {
+ tracing::warn!("failed to check login status, skipping sync tick: {e}");
+ return;
+ }
+ };
+
+ if !logged_in {
+ tracing::debug!("not logged in, skipping sync tick");
+ return;
+ }
+
+ // Perform the sync
+ let res = sync::sync(&settings, handle.store()).await;
+
+ match res {
+ Err(e) => {
+ tracing::error!("sync tick failed with {e}");
+
+ // Emit failure event
+ handle.emit(DaemonEvent::SyncFailed {
+ error: e.to_string(),
+ });
+
+ // Exponential backoff
+ let mut rng = rand::thread_rng();
+ let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
+
+ if new_interval > max_interval {
+ new_interval = max_interval;
+ }
+
+ *ticker = time::interval(time::Duration::from_secs(new_interval as u64));
+ ticker.reset_after(time::Duration::from_secs(new_interval as u64));
+
+ tracing::error!("backing off, next sync tick in {new_interval}");
+ }
+ Ok((uploaded_count, downloaded_records)) => {
+ tracing::info!(
+ uploaded = uploaded_count,
+ downloaded = downloaded_records.len(),
+ "sync complete"
+ );
+
+ // Build history from downloaded records
+ if let Err(e) = history_store
+ .incremental_build(handle.history_db(), &downloaded_records)
+ .await
+ {
+ tracing::error!("failed to build history from downloaded records: {e}");
+ }
+
+ // Emit the records added event (for search indexing)
+ handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone()));
+
+ // Emit sync completed event
+ handle.emit(DaemonEvent::SyncCompleted {
+ uploaded: uploaded_count as usize,
+ downloaded: downloaded_records.len(),
+ });
+
+ // Rebuild alias and var stores
+ if let Err(e) = alias_store.build().await {
+ tracing::error!("failed to rebuild alias store: {e}");
+ }
+ if let Err(e) = var_store.build().await {
+ tracing::error!("failed to rebuild var store: {e}");
+ }
+
+ // Reset backoff on success
+ if ticker.period().as_secs() != settings.daemon.sync_frequency {
+ *ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
+ }
+
+ // Store sync time
+ if let Err(e) = Settings::save_sync_time().await {
+ tracing::error!("failed to save sync time: {e}");
+ }
+ }
+ }
+}
diff --git a/crates/atuin-daemon/src/control/mod.rs b/crates/atuin-daemon/src/control/mod.rs
new file mode 100644
index 00000000..afb29c57
--- /dev/null
+++ b/crates/atuin-daemon/src/control/mod.rs
@@ -0,0 +1,12 @@
+//! Control module for external event injection.
+//!
+//! This module provides the gRPC service that allows external processes
+//! (like CLI commands) to inject events into the daemon's event bus.
+
+mod service;
+
+// Include the generated proto code
+tonic::include_proto!("control");
+
+// Re-export the service
+pub use service::ControlService;
diff --git a/crates/atuin-daemon/src/control/service.rs b/crates/atuin-daemon/src/control/service.rs
new file mode 100644
index 00000000..2e7403ce
--- /dev/null
+++ b/crates/atuin-daemon/src/control/service.rs
@@ -0,0 +1,71 @@
+//! Control service implementation.
+//!
+//! This gRPC service allows external processes (like CLI commands) to inject
+//! events into the daemon's event bus.
+
+use atuin_client::history::HistoryId;
+use tonic::{Request, Response, Status};
+use tracing::{Level, info, instrument};
+
+use super::{
+ SendEventRequest, SendEventResponse,
+ control_server::{Control, ControlServer},
+ send_event_request::Event,
+};
+use crate::{daemon::DaemonHandle, events::DaemonEvent};
+
+/// The Control gRPC service.
+///
+/// This service is used by external processes to inject events into the daemon.
+/// It's not a component - it's part of the daemon's core infrastructure.
+pub struct ControlService {
+ handle: DaemonHandle,
+}
+
+impl ControlService {
+ /// Create a new control service with the given daemon handle.
+ pub fn new(handle: DaemonHandle) -> Self {
+ Self { handle }
+ }
+
+ /// Get a tonic server for this service.
+ pub fn into_server(self) -> ControlServer<Self> {
+ ControlServer::new(self)
+ }
+}
+
+#[tonic::async_trait]
+impl Control for ControlService {
+ #[instrument(skip_all, level = Level::INFO, name = "control_send_event")]
+ async fn send_event(
+ &self,
+ request: Request<SendEventRequest>,
+ ) -> Result<Response<SendEventResponse>, Status> {
+ let req = request.into_inner();
+
+ let event = req
+ .event
+ .ok_or_else(|| Status::invalid_argument("event is required"))?;
+
+ let daemon_event = proto_event_to_daemon_event(event)?;
+
+ info!(?daemon_event, "received control event");
+ self.handle.emit(daemon_event);
+
+ Ok(Response::new(SendEventResponse {}))
+ }
+}
+
+/// Convert a proto event to a daemon event.
+fn proto_event_to_daemon_event(event: Event) -> Result<DaemonEvent, Status> {
+ match event {
+ Event::HistoryPruned(_) => Ok(DaemonEvent::HistoryPruned),
+ Event::HistoryRebuilt(_) => Ok(DaemonEvent::HistoryRebuilt),
+ Event::HistoryDeleted(e) => Ok(DaemonEvent::HistoryDeleted {
+ ids: e.ids.into_iter().map(HistoryId).collect(),
+ }),
+ Event::ForceSync(_) => Ok(DaemonEvent::ForceSync),
+ Event::SettingsReloaded(_) => Ok(DaemonEvent::SettingsReloaded),
+ Event::Shutdown(_) => Ok(DaemonEvent::ShutdownRequested),
+ }
+}
diff --git a/crates/atuin-daemon/src/daemon.rs b/crates/atuin-daemon/src/daemon.rs
new file mode 100644
index 00000000..ec0b7b68
--- /dev/null
+++ b/crates/atuin-daemon/src/daemon.rs
@@ -0,0 +1,450 @@
+//! Core daemon infrastructure.
+//!
+//! This module provides the foundational types for building the atuin daemon:
+//!
+//! - [`DaemonState`]: Shared state owned by the daemon
+//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state
+//! - [`Component`]: A trait for implementing daemon components
+//! - [`Daemon`]: The main daemon orchestrator
+//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon
+
+use std::sync::Arc;
+
+use atuin_client::{
+ database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore,
+ settings::Settings,
+};
+use eyre::{Context, Result};
+use tokio::sync::{RwLock, broadcast};
+
+use crate::events::DaemonEvent;
+
+// ============================================================================
+// DaemonState
+// ============================================================================
+
+/// Shared state owned by the daemon.
+///
+/// This contains all the resources that components and services need access to.
+/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`].
+pub struct DaemonState {
+ // Event bus
+ event_tx: broadcast::Sender<DaemonEvent>,
+
+ // Configuration (mutable - can be reloaded)
+ settings: RwLock<Settings>,
+
+ // Encryption key (immutable - derived at startup)
+ encryption_key: [u8; 32],
+
+ // Database handles
+ history_db: HistoryDatabase,
+ store: SqliteStore,
+}
+
+// ============================================================================
+// DaemonHandle
+// ============================================================================
+
+/// A lightweight handle to the daemon's shared state.
+///
+/// This is the primary way for components, gRPC services, and spawned tasks to
+/// interact with the daemon. It provides access to:
+///
+/// - Event emission and subscription
+/// - Configuration (settings, encryption key)
+/// - Database handles
+///
+/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed
+/// around to any code that needs daemon access.
+///
+/// # Example
+///
+/// ```ignore
+/// // Emit an event
+/// handle.emit(DaemonEvent::HistoryPruned);
+///
+/// // Access settings
+/// let settings = handle.settings().await;
+/// let sync_freq = settings.daemon.sync_frequency;
+///
+/// // Access database
+/// let history = handle.history_db().load(id).await?;
+/// ```
+#[derive(Clone)]
+pub struct DaemonHandle {
+ state: Arc<DaemonState>,
+}
+
+impl DaemonHandle {
+ // ---- Events ----
+
+ /// Emit an event to the daemon's event bus.
+ ///
+ /// This is fire-and-forget - if no receivers are listening (which shouldn't
+ /// happen in normal operation), the event is dropped silently.
+ pub fn emit(&self, event: DaemonEvent) {
+ if let Err(e) = self.state.event_tx.send(event) {
+ tracing::warn!("failed to emit event (no receivers?): {e}");
+ }
+ }
+
+ /// Subscribe to the event bus.
+ ///
+ /// Returns a receiver that will receive all events emitted after this call.
+ /// Useful for components that need to listen for events outside of the
+ /// normal `handle_event` callback flow.
+ pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
+ self.state.event_tx.subscribe()
+ }
+
+ /// Request graceful shutdown of the daemon.
+ pub fn shutdown(&self) {
+ self.emit(DaemonEvent::ShutdownRequested);
+ }
+
+ // ---- Configuration ----
+
+ /// Get the current settings.
+ ///
+ /// This acquires a read lock on the settings. For most use cases, clone
+ /// the settings if you need to hold onto them.
+ pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> {
+ self.state.settings.read().await
+ }
+
+ /// Reload settings from disk and emit a SettingsReloaded event.
+ ///
+ /// Components listening for `SettingsReloaded` can then re-read settings
+ /// via `handle.settings()` to pick up the changes.
+ pub async fn reload_settings(&self) -> Result<()> {
+ let new_settings = Settings::new()?;
+ *self.state.settings.write().await = new_settings;
+ self.emit(DaemonEvent::SettingsReloaded);
+ tracing::info!("settings reloaded");
+ Ok(())
+ }
+
+ /// Get the encryption key.
+ pub fn encryption_key(&self) -> &[u8; 32] {
+ &self.state.encryption_key
+ }
+
+ // ---- Database ----
+
+ /// Get a reference to the history database.
+ pub fn history_db(&self) -> &HistoryDatabase {
+ &self.state.history_db
+ }
+
+ /// Get a reference to the record store.
+ pub fn store(&self) -> &SqliteStore {
+ &self.state.store
+ }
+}
+
+impl std::fmt::Debug for DaemonHandle {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("DaemonHandle").finish_non_exhaustive()
+ }
+}
+
+// ============================================================================
+// Component Trait
+// ============================================================================
+
+/// A daemon component that handles a specific domain.
+///
+/// Components are the building blocks of the daemon. Each component:
+///
+/// - Has a unique name for logging and debugging
+/// - Can optionally expose gRPC services
+/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources
+/// - Handles events from the event bus
+/// - Performs cleanup on shutdown
+///
+/// # Lifecycle
+///
+/// 1. **Construction**: Component is created (usually via `new()`)
+/// 2. **Start**: `start()` is called with a [`DaemonHandle`]
+/// 3. **Running**: `handle_event()` is called for each event on the bus
+/// 4. **Shutdown**: `stop()` is called for cleanup
+///
+/// # Example
+///
+/// ```ignore
+/// pub struct MyComponent {
+/// handle: Option<DaemonHandle>,
+/// }
+///
+/// #[async_trait]
+/// impl Component for MyComponent {
+/// fn name(&self) -> &'static str { "my-component" }
+///
+/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
+/// self.handle = Some(handle);
+/// Ok(())
+/// }
+///
+/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
+/// match event {
+/// DaemonEvent::SomeEvent => {
+/// // Handle the event
+/// if let Some(handle) = &self.handle {
+/// handle.emit(DaemonEvent::ResponseEvent);
+/// }
+/// }
+/// _ => {}
+/// }
+/// Ok(())
+/// }
+///
+/// async fn stop(&mut self) -> Result<()> {
+/// Ok(())
+/// }
+/// }
+/// ```
+#[tonic::async_trait]
+pub trait Component: Send + Sync {
+ /// Human-readable name for logging and debugging.
+ fn name(&self) -> &'static str;
+
+ /// Called once at startup.
+ ///
+ /// Store the handle if you need to emit events or access daemon resources
+ /// later. The handle is cheaply cloneable, so feel free to clone it for
+ /// spawned tasks.
+ async fn start(&mut self, handle: DaemonHandle) -> Result<()>;
+
+ /// Handle an incoming event.
+ ///
+ /// Called for every event on the bus. To emit new events in response,
+ /// use the handle stored during `start()`. Events emitted here will be
+ /// processed in subsequent event loop iterations.
+ async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>;
+
+ /// Called on graceful shutdown.
+ ///
+ /// Use this to clean up resources, abort spawned tasks, etc.
+ async fn stop(&mut self) -> Result<()>;
+}
+
+// ============================================================================
+// Daemon
+// ============================================================================
+
+/// The main daemon orchestrator.
+///
+/// The daemon manages components, runs the event loop, and coordinates startup
+/// and shutdown. It is constructed via [`DaemonBuilder`].
+///
+/// # Event Loop
+///
+/// The daemon runs a simple event loop:
+///
+/// 1. Wait for an event on the bus
+/// 2. Dispatch the event to all components (in registration order)
+/// 3. Components may emit new events in response
+/// 4. Repeat until `ShutdownRequested` is received
+///
+/// Events emitted during handling are queued and processed in subsequent
+/// iterations, ensuring the loop eventually drains.
+pub struct Daemon {
+ components: Vec<Box<dyn Component>>,
+ handle: DaemonHandle,
+}
+
+impl Daemon {
+ /// Create a new daemon builder.
+ pub fn builder(settings: Settings) -> DaemonBuilder {
+ DaemonBuilder::new(settings)
+ }
+
+ /// Get a clone of the daemon handle.
+ ///
+ /// The handle can be used to emit events, access settings, etc.
+ pub fn handle(&self) -> DaemonHandle {
+ self.handle.clone()
+ }
+
+ /// Start all components.
+ ///
+ /// This must be called before `run_event_loop()`. It initializes all
+ /// registered components with the daemon handle.
+ pub async fn start_components(&mut self) -> Result<()> {
+ for component in &mut self.components {
+ tracing::info!(component = component.name(), "starting component");
+ component
+ .start(self.handle.clone())
+ .await
+ .with_context(|| format!("failed to start component: {}", component.name()))?;
+ }
+ Ok(())
+ }
+
+ /// Run the daemon event loop.
+ ///
+ /// This processes events until a ShutdownRequested event is received.
+ /// Components must be started first via `start_components()`.
+ pub async fn run_event_loop(&mut self) -> Result<()> {
+ let mut event_rx = self.handle.subscribe();
+ loop {
+ match event_rx.recv().await {
+ Ok(DaemonEvent::ShutdownRequested) => {
+ tracing::info!("shutdown requested, stopping daemon");
+ break;
+ }
+ Ok(event) => {
+ tracing::debug!(?event, "processing event");
+ self.dispatch_event(&event).await;
+ }
+ Err(broadcast::error::RecvError::Lagged(n)) => {
+ tracing::warn!(
+ skipped = n,
+ "event receiver lagged, some events were dropped"
+ );
+ }
+ Err(broadcast::error::RecvError::Closed) => {
+ tracing::info!("event bus closed, stopping daemon");
+ break;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Stop all components.
+ ///
+ /// This performs graceful shutdown of all components.
+ pub async fn stop_components(&mut self) {
+ for component in &mut self.components {
+ tracing::info!(component = component.name(), "stopping component");
+ if let Err(e) = component.stop().await {
+ tracing::error!(
+ component = component.name(),
+ error = ?e,
+ "error stopping component"
+ );
+ }
+ }
+ tracing::info!("all components stopped");
+ }
+
+ /// Run the daemon.
+ ///
+ /// This is a convenience method that starts components, runs the event loop,
+ /// and handles shutdown. It does not return until the daemon is shut down.
+ pub async fn run(mut self) -> Result<()> {
+ self.start_components().await?;
+ self.run_event_loop().await?;
+ self.stop_components().await;
+ tracing::info!("daemon stopped");
+ Ok(())
+ }
+
+ async fn dispatch_event(&mut self, event: &DaemonEvent) {
+ for component in &mut self.components {
+ if let Err(e) = component.handle_event(event).await {
+ tracing::error!(
+ component = component.name(),
+ error = ?e,
+ "error handling event"
+ );
+ }
+ }
+ }
+}
+
+// ============================================================================
+// DaemonBuilder
+// ============================================================================
+
+/// Builder for constructing a [`Daemon`].
+///
+/// # Example
+///
+/// ```ignore
+/// let daemon = Daemon::builder(settings)
+/// .store(store)
+/// .history_db(history_db)
+/// .component(HistoryComponent::new())
+/// .component(SearchComponent::new())
+/// .component(SyncComponent::new())
+/// .build()
+/// .await?;
+///
+/// daemon.run().await?;
+/// ```
+pub struct DaemonBuilder {
+ settings: Settings,
+ store: Option<SqliteStore>,
+ history_db: Option<HistoryDatabase>,
+ components: Vec<Box<dyn Component>>,
+}
+
+impl DaemonBuilder {
+ /// Create a new daemon builder with the given settings.
+ pub fn new(settings: Settings) -> Self {
+ Self {
+ settings,
+ store: None,
+ history_db: None,
+ components: Vec::new(),
+ }
+ }
+
+ /// Set the record store.
+ pub fn store(mut self, store: SqliteStore) -> Self {
+ self.store = Some(store);
+ self
+ }
+
+ /// Set the history database.
+ pub fn history_db(mut self, db: HistoryDatabase) -> Self {
+ self.history_db = Some(db);
+ self
+ }
+
+ /// Register a component.
+ ///
+ /// Components are started in registration order and stopped in reverse order.
+ pub fn component(mut self, component: impl Component + 'static) -> Self {
+ self.components.push(Box::new(component));
+ self
+ }
+
+ /// Build the daemon.
+ ///
+ /// This loads the encryption key and creates the daemon state.
+ pub async fn build(self) -> Result<Daemon> {
+ let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?;
+ let history_db = self
+ .history_db
+ .ok_or_else(|| eyre::eyre!("history_db is required"))?;
+
+ // Load encryption key
+ let encryption_key: [u8; 32] = encryption::load_key(&self.settings)
+ .context("could not load encryption key")?
+ .into();
+
+ // Create the event bus
+ let (event_tx, _) = broadcast::channel(64);
+
+ // Create the shared state
+ let state = Arc::new(DaemonState {
+ event_tx,
+ settings: RwLock::new(self.settings),
+ encryption_key,
+ history_db,
+ store,
+ });
+
+ // Create the handle (just a reference to the state)
+ let handle = DaemonHandle { state };
+
+ Ok(Daemon {
+ components: self.components,
+ handle,
+ })
+ }
+}
diff --git a/crates/atuin-daemon/src/events.rs b/crates/atuin-daemon/src/events.rs
new file mode 100644
index 00000000..4e6c6ff3
--- /dev/null
+++ b/crates/atuin-daemon/src/events.rs
@@ -0,0 +1,74 @@
+//! Daemon events.
+//!
+//! Events are the primary communication mechanism within the daemon.
+//! Components emit events to notify others of state changes, and handle
+//! events to react to changes elsewhere in the system.
+//!
+//! External processes (like CLI commands) can also inject events via the
+//! Control gRPC service.
+
+use atuin_client::history::{History, HistoryId};
+use atuin_common::record::RecordId;
+
+/// Events that flow through the daemon's event bus.
+///
+/// Events are broadcast to all components. Each component decides which
+/// events it cares about in its `handle_event` implementation.
+#[derive(Debug, Clone)]
+pub enum DaemonEvent {
+ // ---- History lifecycle ----
+ /// A command has started running.
+ HistoryStarted(History),
+
+ /// A command has finished running.
+ HistoryEnded(History),
+
+ // ---- Sync ----
+ /// Records were synced from the server.
+ ///
+ /// The search component uses this to update its index with new history.
+ RecordsAdded(Vec<RecordId>),
+
+ /// Sync completed successfully.
+ SyncCompleted {
+ /// Number of records uploaded.
+ uploaded: usize,
+ /// Number of records downloaded.
+ downloaded: usize,
+ },
+
+ /// Sync failed.
+ SyncFailed {
+ /// Error message describing what went wrong.
+ error: String,
+ },
+
+ /// Request an immediate sync (external trigger).
+ ForceSync,
+
+ // ---- External commands ----
+ /// History was pruned - search index needs a full rebuild.
+ ///
+ /// Emitted when the user runs `atuin history prune` or similar.
+ HistoryPruned,
+
+ /// History was rebuilt - search index needs a full rebuild.
+ ///
+ /// Emitted when the user runs `atuin store rebuild history` or similar.
+ HistoryRebuilt,
+
+ /// Specific history items were deleted.
+ ///
+ /// The search component should remove these from its index.
+ HistoryDeleted {
+ /// IDs of the deleted history entries.
+ ids: Vec<HistoryId>,
+ },
+
+ /// Settings have changed, components should reload if needed.
+ SettingsReloaded,
+
+ // ---- Lifecycle ----
+ /// Request graceful shutdown of the daemon.
+ ShutdownRequested,
+}
diff --git a/crates/atuin-daemon/src/history.rs b/crates/atuin-daemon/src/history.rs
deleted file mode 100644
index 57f5b2cf..00000000
--- a/crates/atuin-daemon/src/history.rs
+++ /dev/null
@@ -1 +0,0 @@
-tonic::include_proto!("history");
diff --git a/crates/atuin-daemon/src/history/mod.rs b/crates/atuin-daemon/src/history/mod.rs
new file mode 100644
index 00000000..b71853df
--- /dev/null
+++ b/crates/atuin-daemon/src/history/mod.rs
@@ -0,0 +1,6 @@
+//! History module for the daemon gRPC history service.
+//!
+//! This module contains the proto-generated types for the history gRPC service.
+
+// Include the generated proto code
+tonic::include_proto!("history");
diff --git a/crates/atuin-daemon/src/lib.rs b/crates/atuin-daemon/src/lib.rs
index e00060bc..6dc04db3 100644
--- a/crates/atuin-daemon/src/lib.rs
+++ b/crates/atuin-daemon/src/lib.rs
@@ -1,3 +1,110 @@
+use atuin_client::database::Sqlite as HistoryDatabase;
+use atuin_client::{record::sqlite_store::SqliteStore, settings::Settings};
+use eyre::Result;
+
pub mod client;
+pub mod components;
+pub mod control;
+pub mod daemon;
+pub mod events;
pub mod history;
+pub mod search;
pub mod server;
+
+// Re-export core daemon types for convenience
+pub use daemon::{Component, Daemon, DaemonBuilder, DaemonHandle};
+pub use events::DaemonEvent;
+
+// Re-export components
+pub use components::{HistoryComponent, SearchComponent, SyncComponent};
+
+// Re-export client helpers
+pub use client::{ControlClient, emit_event, emit_event_with_settings};
+
+/// Boot the daemon using the new component-based architecture.
+///
+/// This creates a daemon with the standard components (history, search, sync),
+/// starts the gRPC server with their services, and runs the event loop.
+pub async fn boot(
+ settings: Settings,
+ store: SqliteStore,
+ history_db: HistoryDatabase,
+) -> Result<()> {
+ // Create the components
+ let history_component = HistoryComponent::new();
+ let search_component = SearchComponent::new();
+ let sync_component = SyncComponent::new();
+
+ // Get the gRPC services before moving components into the daemon
+ // (The services share state with the components via Arc)
+ let history_service = history_component.grpc_service();
+ let search_service = search_component.grpc_service();
+
+ // Build the daemon
+ let mut daemon = Daemon::builder(settings.clone())
+ .store(store)
+ .history_db(history_db)
+ .component(history_component)
+ .component(search_component)
+ .component(sync_component)
+ .build()
+ .await?;
+
+ // Get a handle for the control service and gRPC server shutdown
+ let handle = daemon.handle();
+
+ // Create the control service
+ let control_service = control::ControlService::new(handle.clone());
+
+ // Start all components first (so gRPC services can work)
+ daemon.start_components().await?;
+
+ // Spawn signal handler to emit ShutdownRequested on Ctrl+C/SIGTERM
+ let signal_handle = handle.clone();
+ tokio::spawn(async move {
+ shutdown_signal().await;
+ tracing::info!("received shutdown signal");
+ signal_handle.shutdown();
+ });
+
+ // Start the gRPC server in the background
+ server::run_grpc_server(
+ settings,
+ history_service,
+ search_service,
+ control_service.into_server(),
+ handle,
+ )
+ .await?;
+
+ // Run the daemon event loop
+ daemon.run_event_loop().await?;
+
+ // Stop all components on shutdown
+ daemon.stop_components().await;
+
+ tracing::info!("daemon shut down complete");
+ Ok(())
+}
+
+/// Wait for a shutdown signal (Ctrl+C or SIGTERM).
+#[cfg(unix)]
+async fn shutdown_signal() {
+ let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
+ .expect("failed to register sigterm handler");
+ let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
+ .expect("failed to register sigint handler");
+
+ tokio::select! {
+ _ = term.recv() => {},
+ _ = int.recv() => {},
+ }
+}
+
+/// Wait for a shutdown signal (Ctrl+C).
+#[cfg(not(unix))]
+async fn shutdown_signal() {
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to listen for ctrl+c");
+}
diff --git a/crates/atuin-daemon/src/search/index.rs b/crates/atuin-daemon/src/search/index.rs
new file mode 100644
index 00000000..b15b057f
--- /dev/null
+++ b/crates/atuin-daemon/src/search/index.rs
@@ -0,0 +1,572 @@
+//! Search index with frecency-based ranking.
+//!
+//! This module provides a deduplicated search index where each unique command
+//! is stored once, with metadata about all its invocations. This enables:
+//!
+//! - Efficient fuzzy matching (fewer items to match)
+//! - Frecency-based ranking (frequency + recency)
+//! - Dynamic filtering by directory, host, session, etc.
+
+use std::{collections::HashMap, sync::Arc};
+
+use atuin_client::history::History;
+use dashmap::{DashMap, DashSet};
+use nucleo::{Injector, Nucleo, pattern};
+use time::OffsetDateTime;
+use tokio::sync::RwLock;
+use tracing::{Level, instrument};
+
+use crate::components::search::with_trailing_slash;
+
+/// Data for a single invocation of a command.
+#[derive(Debug, Clone)]
+pub struct Invocation {
+ /// When the command was run.
+ pub timestamp: i64,
+ /// The working directory when the command was run.
+ #[allow(dead_code)]
+ pub cwd: String,
+ /// The hostname where the command was run.
+ #[allow(dead_code)]
+ pub hostname: String,
+ /// The session ID.
+ #[allow(dead_code)]
+ pub session: String,
+ /// The history entry ID (for returning in search results).
+ pub history_id: String,
+}
+
+impl From<&History> for Invocation {
+ fn from(history: &History) -> Self {
+ Self {
+ timestamp: history.timestamp.unix_timestamp(),
+ cwd: history.cwd.clone(),
+ hostname: history.hostname.clone(),
+ session: history.session.clone(),
+ history_id: history.id.0.clone(),
+ }
+ }
+}
+
+/// Pre-computed frecency data for O(1) lookup.
+#[derive(Debug, Clone, Default)]
+pub struct FrecencyData {
+ /// Total number of times this command was used.
+ pub count: u32,
+ /// Most recent usage timestamp (unix seconds).
+ pub last_used: i64,
+}
+
+impl FrecencyData {
+ /// Record a new usage of this command.
+ pub fn record_use(&mut self, timestamp: i64) {
+ self.count += 1;
+ if timestamp > self.last_used {
+ self.last_used = timestamp;
+ }
+ }
+
+ /// Compute frecency score based on count and recency.
+ ///
+ /// Uses a decay function where more recent commands score higher.
+ /// The formula balances frequency (how often) with recency (how recent).
+ #[instrument(level = tracing::Level::TRACE, name = "index_frecency_compute")]
+ pub fn compute(&self, now: i64) -> u32 {
+ if self.count == 0 {
+ return 0;
+ }
+
+ // Time-based decay: score decreases as time passes
+ let age_seconds = (now - self.last_used).max(0) as u64;
+ let age_hours = age_seconds / 3600;
+
+ // Decay factor: recent commands get higher scores
+ // - Last hour: multiplier ~1.0
+ // - Last day: multiplier ~0.5
+ // - Last week: multiplier ~0.1
+ // - Older: multiplier approaches 0
+ let recency_score = match age_hours {
+ 0 => 100,
+ 1..=6 => 90,
+ 7..=24 => 70,
+ 25..=72 => 50,
+ 73..=168 => 30,
+ 169..=720 => 15,
+ _ => 5,
+ };
+
+ // Frequency boost: more uses = higher score (with diminishing returns)
+ let frequency_score = ((self.count as f64).ln() * 20.0).min(100.0) as u32;
+
+ // Combined score
+ recency_score + frequency_score
+ }
+}
+
+/// Data for a unique command, including all its invocations.
+pub struct CommandData {
+ /// The command text (stored for debugging/logging purposes).
+ #[allow(dead_code)]
+ pub command: String,
+ /// All invocations of this command, sorted by timestamp (newest first).
+ pub invocations: Vec<Invocation>,
+ /// Pre-computed global frecency.
+ pub global_frecency: FrecencyData,
+
+ // Pre-computed indexes for O(1) filter lookups
+ /// All directories where this command has been run.
+ directories: DashSet<String>,
+ /// All hostnames where this command has been run.
+ hosts: DashSet<String>,
+ /// All sessions where this command has been run.
+ sessions: DashSet<String>,
+}
+
+impl CommandData {
+ /// Create a new CommandData from a history entry.
+ pub fn new(history: &History) -> Self {
+ let mut data = Self {
+ command: history.command.clone(),
+ invocations: Vec::new(),
+ global_frecency: FrecencyData::default(),
+ directories: DashSet::new(),
+ hosts: DashSet::new(),
+ sessions: DashSet::new(),
+ };
+ data.add_invocation(history);
+ data
+ }
+
+ /// Add an invocation from a history entry.
+ pub fn add_invocation(&mut self, history: &History) {
+ let timestamp = history.timestamp.unix_timestamp();
+
+ // Update global frecency
+ self.global_frecency.record_use(timestamp);
+
+ // Update pre-computed indexes for O(1) filter lookups
+ self.directories.insert(with_trailing_slash(&history.cwd));
+ self.hosts.insert(history.hostname.clone());
+ self.sessions.insert(history.session.clone());
+
+ let invocation = Invocation::from(history);
+
+ // Insert sorted by timestamp (newest first)
+ let pos = self
+ .invocations
+ .iter()
+ .position(|inv| inv.timestamp < timestamp)
+ .unwrap_or(self.invocations.len());
+ self.invocations.insert(pos, invocation);
+ }
+
+ /// Get the most recent history ID for this command.
+ pub fn most_recent_id(&self) -> Option<&str> {
+ self.invocations.first().map(|inv| inv.history_id.as_str())
+ }
+
+ /// Check if any invocation matches a directory filter (exact match).
+ /// O(1) lookup using pre-computed index.
+ pub fn has_invocation_in_dir(&self, dir: &str) -> bool {
+ self.directories.contains(dir)
+ }
+
+ /// Check if any invocation matches a directory prefix (workspace/git root).
+ /// O(n) where n = number of unique directories for this command.
+ pub fn has_invocation_in_workspace(&self, prefix: &str) -> bool {
+ self.directories.iter().any(|d| d.starts_with(prefix))
+ }
+
+ /// Check if any invocation matches a hostname.
+ /// O(1) lookup using pre-computed index.
+ pub fn has_invocation_on_host(&self, hostname: &str) -> bool {
+ self.hosts.contains(hostname)
+ }
+
+ /// Check if any invocation matches a session.
+ /// O(1) lookup using pre-computed index.
+ pub fn has_invocation_in_session(&self, session: &str) -> bool {
+ self.sessions.contains(session)
+ }
+}
+
+/// Filter mode for search queries.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum IndexFilterMode {
+ /// No filtering - search all commands.
+ Global,
+ /// Filter to commands run in a specific directory.
+ Directory(String),
+ /// Filter to commands run in a workspace (directory prefix).
+ Workspace(String),
+ /// Filter to commands run on a specific host.
+ Host(String),
+ /// Filter to commands run in a specific session.
+ Session(String),
+}
+
+/// Context for search queries.
+#[derive(Debug, Clone, Default)]
+pub struct QueryContext {
+ pub cwd: Option<String>,
+ pub git_root: Option<String>,
+ pub hostname: Option<String>,
+ pub session_id: Option<String>,
+}
+
+/// A deduplicated search index with frecency-based ranking.
+///
+/// Commands are stored by their text, with metadata about all invocations.
+/// Nucleo handles fuzzy matching, while frecency is computed via scorer callback.
+///
+/// Global frecency is precomputed by a background task and used for scoring.
+/// If frecency data is not available, search still works but without frecency ranking;
+/// although this should never happen due to precomputing the frecency map.
+pub struct SearchIndex {
+ /// Map from command text to command data.
+ /// Using DashMap for concurrent read/write access, wrapped in Arc for sharing with scorer.
+ commands: Arc<DashMap<String, CommandData>>,
+ /// Nucleo fuzzy matcher - items are command strings.
+ nucleo: RwLock<Nucleo<String>>,
+ /// Injector for adding new commands to Nucleo.
+ injector: Injector<String>,
+ /// Precomputed global frecency map (command -> frecency score).
+ /// Updated by background task. If None, search works without frecency.
+ frecency_map: RwLock<Option<Arc<HashMap<String, u32>>>>,
+}
+
+impl SearchIndex {
+ /// Create a new empty search index.
+ pub fn new() -> Self {
+ let nucleo_config = nucleo::Config::DEFAULT;
+ // Single column for command text
+ let nucleo = Nucleo::<String>::new(nucleo_config, Arc::new(|| {}), None, 1);
+ let injector = nucleo.injector();
+
+ Self {
+ commands: Arc::new(DashMap::new()),
+ nucleo: RwLock::new(nucleo),
+ injector,
+ frecency_map: RwLock::new(None),
+ }
+ }
+
+ /// Add a history entry to the index.
+ ///
+ /// If the command already exists, updates its invocation data.
+ /// If it's a new command, adds it to both the map and Nucleo.
+ pub fn add_history(&self, history: &History) {
+ let command = &history.command;
+
+ if let Some(mut entry) = self.commands.get_mut(command) {
+ // Existing command - just update invocations
+ entry.add_invocation(history);
+ } else {
+ // New command - add to both map and Nucleo
+ let data = CommandData::new(history);
+ self.commands.insert(command.clone(), data);
+ self.injector.push(command.clone(), |cmd, cols| {
+ cols[0] = cmd.clone().into();
+ });
+ }
+ // Note: frecency_map is rebuilt by background task, not invalidated here
+ }
+
+ /// Add multiple history entries to the index.
+ pub fn add_histories(&self, histories: &[History]) {
+ for history in histories {
+ self.add_history(history);
+ }
+ }
+
+ /// Get the number of unique commands in the index.
+ pub fn command_count(&self) -> usize {
+ self.commands.len()
+ }
+
+ /// Get the number of items in Nucleo (should match command_count).
+ pub async fn nucleo_item_count(&self) -> u32 {
+ self.nucleo.read().await.snapshot().item_count()
+ }
+
+ /// Search for commands matching a query.
+ ///
+ /// Returns a list of history IDs (most recent invocation per command).
+ /// Uses precomputed global frecency for scoring if available.
+ #[instrument(skip_all, level = tracing::Level::TRACE, name = "index_search", fields(query = %query))]
+ pub async fn search(
+ &self,
+ query: &str,
+ filter_mode: IndexFilterMode,
+ _context: &QueryContext,
+ limit: u32,
+ ) -> Vec<String> {
+ let mut nucleo = self.nucleo.write().await;
+
+ // Get precomputed frecency map (may be None if not yet computed)
+ let frecency_map = self.frecency_map.read().await.clone();
+
+ // Build filter based on mode
+ let filter = self.build_filter(&filter_mode);
+ nucleo.set_filter(filter);
+
+ // Build scorer from precomputed frecency (or None if not available)
+ let scorer = Self::build_scorer(frecency_map);
+ nucleo.set_scorer(scorer);
+
+ // Update pattern
+ nucleo.pattern.reparse(
+ 0,
+ query,
+ pattern::CaseMatching::Smart,
+ pattern::Normalization::Smart,
+ false,
+ );
+
+ tracing::span!(Level::TRACE, "index_search_tick").in_scope(|| {
+ // Tick until complete
+ while nucleo.tick(10).running {}
+ });
+
+ // Collect results
+ let snapshot = nucleo.snapshot();
+ let matched_count = snapshot.matched_item_count().min(limit);
+
+ tracing::span!(Level::TRACE, "index_search_results").in_scope(|| {
+ snapshot
+ .matched_items(..matched_count)
+ .filter_map(|item| {
+ let cmd = item.data;
+ self.commands
+ .get(cmd)
+ .and_then(|data| data.most_recent_id().map(|s| s.to_string()))
+ })
+ .collect()
+ })
+ }
+
+ /// Rebuild the global frecency map.
+ ///
+ /// This should be called by a background task periodically.
+ /// The map is used for scoring search results.
+ #[instrument(skip_all, level = tracing::Level::DEBUG, name = "rebuild_frecency")]
+ pub async fn rebuild_frecency(&self) {
+ let now = OffsetDateTime::now_utc().unix_timestamp();
+ let mut frecency_map: HashMap<String, u32> = HashMap::new();
+
+ for entry in self.commands.iter() {
+ let frecency = entry.global_frecency.compute(now);
+ frecency_map.insert(entry.key().clone(), frecency);
+ }
+
+ *self.frecency_map.write().await = Some(Arc::new(frecency_map));
+ }
+
+ /// Build filter predicate for the given mode.
+ fn build_filter(&self, mode: &IndexFilterMode) -> Option<nucleo::Filter<String>> {
+ // For Global mode, no filter needed
+ if matches!(mode, IndexFilterMode::Global) {
+ return None;
+ }
+
+ // Pre-compute which commands pass the filter
+ let passing_commands: Arc<std::collections::HashSet<String>> = {
+ let mut set = std::collections::HashSet::new();
+ for entry in self.commands.iter() {
+ let passes = match mode {
+ IndexFilterMode::Global => unreachable!(),
+ IndexFilterMode::Directory(dir) => entry.has_invocation_in_dir(dir),
+ IndexFilterMode::Workspace(prefix) => entry.has_invocation_in_workspace(prefix),
+ IndexFilterMode::Host(hostname) => entry.has_invocation_on_host(hostname),
+ IndexFilterMode::Session(session) => entry.has_invocation_in_session(session),
+ };
+ if passes {
+ set.insert(entry.key().clone());
+ }
+ }
+ Arc::new(set)
+ };
+
+ Some(Arc::new(move |cmd: &String| passing_commands.contains(cmd)))
+ }
+
+ /// Build scorer from precomputed frecency map.
+ ///
+ /// Returns None if frecency map is not available (search still works, just without frecency ranking).
+ fn build_scorer(
+ frecency_map: Option<Arc<HashMap<String, u32>>>,
+ ) -> Option<nucleo::Scorer<String>> {
+ let map = frecency_map?;
+ Some(Arc::new(move |cmd: &String, fuzzy_score: u32| {
+ let frecency = map.get(cmd).copied().unwrap_or(0);
+ fuzzy_score + frecency
+ }))
+ }
+}
+
+impl Default for SearchIndex {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use time::macros::datetime;
+
+ fn make_history(command: &str, cwd: &str, timestamp: OffsetDateTime) -> History {
+ History::import()
+ .timestamp(timestamp)
+ .command(command)
+ .cwd(cwd)
+ .build()
+ .into()
+ }
+
+ #[test]
+ fn frecency_data_compute() {
+ let now = 1000000i64;
+
+ // Recent command
+ let recent = FrecencyData {
+ count: 5,
+ last_used: now - 60, // 1 minute ago
+ };
+ assert!(recent.compute(now) > 100); // High score
+
+ // Old command
+ let old = FrecencyData {
+ count: 5,
+ last_used: now - 86400 * 30, // 30 days ago
+ };
+ assert!(old.compute(now) < recent.compute(now));
+
+ // Frequently used old command
+ let frequent_old = FrecencyData {
+ count: 100,
+ last_used: now - 86400 * 7, // 1 week ago
+ };
+ // Should still have decent score due to frequency
+ assert!(frequent_old.compute(now) > 50);
+ }
+
+ #[test]
+ fn command_data_add_invocation() {
+ let (dir1, dir2) = if cfg!(windows) {
+ ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
+ } else {
+ ("/home/user/project", "/home/user/other")
+ };
+
+ let history1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC));
+ let history2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC));
+
+ let mut data = CommandData::new(&history1);
+ assert_eq!(data.invocations.len(), 1);
+ assert_eq!(data.global_frecency.count, 1);
+
+ data.add_invocation(&history2);
+ assert_eq!(data.invocations.len(), 2);
+ assert_eq!(data.global_frecency.count, 2);
+
+ // Most recent should be first
+ assert_eq!(data.invocations[0].cwd, dir2);
+ assert_eq!(data.invocations[1].cwd, dir1);
+ }
+
+ #[test]
+ fn command_data_filters() {
+ let (dir1, dir2) = if cfg!(windows) {
+ ("C:\\Users\\User\\project", "C:\\Users\\User\\other")
+ } else {
+ ("/home/user/project", "/home/user/other")
+ };
+
+ let h1 = make_history("git status", dir1, datetime!(2024-01-01 10:00 UTC));
+ let h2 = make_history("git status", dir2, datetime!(2024-01-01 12:00 UTC));
+
+ let mut data = CommandData::new(&h1);
+ data.add_invocation(&h2);
+
+ let (check1, check2, check3) = if cfg!(windows) {
+ (
+ with_trailing_slash("C:\\Users\\User\\project"),
+ with_trailing_slash("C:\\Users\\User\\other"),
+ with_trailing_slash("C:\\Users\\User\\missing"),
+ )
+ } else {
+ (
+ with_trailing_slash("/home/user/project"),
+ with_trailing_slash("/home/user/other"),
+ with_trailing_slash("/home/user/missing"),
+ )
+ };
+
+ assert!(data.has_invocation_in_dir(&check1));
+ assert!(data.has_invocation_in_dir(&check2));
+ assert!(!data.has_invocation_in_dir(&check3));
+
+ let (check1, check2, check3) = if cfg!(windows) {
+ (
+ with_trailing_slash("C:\\Users\\User"),
+ with_trailing_slash("C:\\Users"),
+ with_trailing_slash("C:\\Users\\User\\var"),
+ )
+ } else {
+ (
+ with_trailing_slash("/home/user"),
+ with_trailing_slash("/home"),
+ with_trailing_slash("/var"),
+ )
+ };
+
+ assert!(data.has_invocation_in_workspace(&check1));
+ assert!(data.has_invocation_in_workspace(&check2));
+ assert!(!data.has_invocation_in_workspace(&check3));
+ }
+
+ #[tokio::test]
+ async fn search_index_add_and_search() {
+ let index = SearchIndex::new();
+
+ let h1 = make_history(
+ "git status",
+ "/home/user/project",
+ datetime!(2024-01-01 10:00 UTC),
+ );
+ let h2 = make_history(
+ "git commit -m 'test'",
+ "/home/user/project",
+ datetime!(2024-01-01 10:05 UTC),
+ );
+ let h3 = make_history(
+ "ls -la",
+ "/home/user/other",
+ datetime!(2024-01-01 10:10 UTC),
+ );
+
+ index.add_history(&h1);
+ index.add_history(&h2);
+ index.add_history(&h3);
+
+ assert_eq!(index.command_count(), 3);
+
+ // Search for "git" - should match 2 commands
+ let results = index
+ .search("git", IndexFilterMode::Global, &QueryContext::default(), 10)
+ .await;
+ assert_eq!(results.len(), 2);
+
+ // Search with directory filter
+ let results = index
+ .search(
+ "",
+ IndexFilterMode::Directory(with_trailing_slash("/home/user/project")),
+ &QueryContext::default(),
+ 10,
+ )
+ .await;
+ assert_eq!(results.len(), 2); // git status and git commit
+ }
+}
diff --git a/crates/atuin-daemon/src/search/mod.rs b/crates/atuin-daemon/src/search/mod.rs
new file mode 100644
index 00000000..4d261956
--- /dev/null
+++ b/crates/atuin-daemon/src/search/mod.rs
@@ -0,0 +1,11 @@
+//! Search module for the daemon gRPC search service.
+//!
+//! This module provides fuzzy search over command history using Nucleo.
+
+mod index;
+
+// Include the generated proto code
+tonic::include_proto!("search");
+
+// Re-export the service and index
+pub use index::{IndexFilterMode, QueryContext, SearchIndex};
diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs
index 826d6191..a11de612 100644
--- a/crates/atuin-daemon/src/server.rs
+++ b/crates/atuin-daemon/src/server.rs
@@ -1,249 +1,49 @@
-use eyre::WrapErr;
-
-use atuin_client::encryption;
-use atuin_client::history::store::HistoryStore;
-use atuin_client::record::sqlite_store::SqliteStore;
-use atuin_client::settings::Settings;
-use std::io::ErrorKind;
-#[cfg(unix)]
-use std::path::PathBuf;
-use std::sync::Arc;
-use time::OffsetDateTime;
-use tokio::sync::watch;
-use tracing::{Level, instrument};
-
-use atuin_client::database::{Database, Sqlite as HistoryDatabase};
-use atuin_client::history::{History, HistoryId};
-use dashmap::DashMap;
use eyre::Result;
-use tonic::{Request, Response, Status, transport::Server};
-
-use crate::history::history_server::{History as HistorySvc, HistoryServer};
-
-use crate::history::{EndHistoryReply, EndHistoryRequest, StartHistoryReply, StartHistoryRequest};
-use crate::history::{ShutdownReply, ShutdownRequest, StatusReply, StatusRequest};
-
-mod sync;
-
-const DAEMON_PROTOCOL_VERSION: u32 = 1;
-
-#[derive(Debug)]
-pub struct HistoryService {
- // A store for WIP history
- // This is history that has not yet been completed, aka a command that's current running.
- running: Arc<DashMap<HistoryId, History>>,
- store: HistoryStore,
- history_db: HistoryDatabase,
- shutdown_tx: watch::Sender<bool>,
-}
-
-impl HistoryService {
- pub fn new(
- store: HistoryStore,
- history_db: HistoryDatabase,
- shutdown_tx: watch::Sender<bool>,
- ) -> Self {
- Self {
- running: Arc::new(DashMap::new()),
- store,
- history_db,
- shutdown_tx,
- }
- }
-}
-
-#[tonic::async_trait()]
-impl HistorySvc for HistoryService {
- #[instrument(skip_all, level = Level::INFO)]
- async fn start_history(
- &self,
- request: Request<StartHistoryRequest>,
- ) -> Result<Response<StartHistoryReply>, Status> {
- let running = self.running.clone();
- let req = request.into_inner();
-
- let timestamp =
- OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
- Status::invalid_argument(
- "failed to parse timestamp as unix time (expected nanos since epoch)",
- )
- })?;
-
- let mut h: History = History::daemon()
- .timestamp(timestamp)
- .command(req.command)
- .cwd(req.cwd)
- .session(req.session)
- .hostname(req.hostname)
- .build()
- .into();
- if !req.author.trim().is_empty() {
- h.author = req.author;
- }
- if !req.intent.trim().is_empty() {
- h.intent = Some(req.intent);
- }
-
- // The old behaviour had us inserting half-finished history records into the database
- // The new behaviour no longer allows that.
- // History that's running is stored in-memory by the daemon, and only committed when
- // complete.
- // If anyone relied on the old behaviour, we could perhaps insert to the history db here
- // too. I'd rather keep it pure, unless that ends up being the case.
- let id = h.id.clone();
- tracing::info!(id = id.to_string(), "start history");
- running.insert(id.clone(), h);
-
- let reply = StartHistoryReply {
- id: id.to_string(),
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn end_history(
- &self,
- request: Request<EndHistoryRequest>,
- ) -> Result<Response<EndHistoryReply>, Status> {
- let running = self.running.clone();
- let req = request.into_inner();
-
- let id = HistoryId(req.id);
-
- if let Some((_, mut history)) = running.remove(&id) {
- history.exit = req.exit;
- history.duration = match req.duration {
- 0 => i64::try_from(
- (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
- )
- .expect("failed to convert calculated duration to i64"),
- value => i64::try_from(value).expect("failed to get i64 duration"),
- };
-
- // Perhaps allow the incremental build to handle this entirely.
- self.history_db
- .save(&history)
- .await
- .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
-
- tracing::info!(
- id = id.0.to_string(),
- duration = history.duration,
- "end history"
- );
-
- let (id, idx) =
- self.store.push(history).await.map_err(|e| {
- Status::internal(format!("failed to push record to store: {e:?}"))
- })?;
-
- let reply = EndHistoryReply {
- id: id.0.to_string(),
- idx,
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- return Ok(Response::new(reply));
- }
-
- Err(Status::not_found(format!(
- "could not find history with id: {id}"
- )))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn status(
- &self,
- _request: Request<StatusRequest>,
- ) -> Result<Response<StatusReply>, Status> {
- let reply = StatusReply {
- // If status RPC responds, the daemon control plane is healthy.
- healthy: true,
- version: env!("CARGO_PKG_VERSION").to_string(),
- pid: std::process::id(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
- #[instrument(skip_all, level = Level::INFO)]
- async fn shutdown(
- &self,
- _request: Request<ShutdownRequest>,
- ) -> Result<Response<ShutdownReply>, Status> {
- let _ = self.shutdown_tx.send(true);
- Ok(Response::new(ShutdownReply { accepted: true }))
- }
-}
-
-#[cfg(unix)]
-async fn shutdown_signal(socket: Option<PathBuf>, mut shutdown_rx: watch::Receiver<bool>) {
- let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
- .expect("failed to register sigterm handler");
- let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
- .expect("failed to register sigint handler");
+use crate::components::history::HistoryGrpcService;
+use crate::components::search::SearchGrpcService;
+use crate::control::{ControlService, control_server::ControlServer};
+use crate::daemon::DaemonHandle;
+use crate::history::history_server::HistoryServer;
+use crate::search::search_server::SearchServer;
- tokio::select! {
- _ = term.recv() => {},
- _ = int.recv() => {},
- _ = shutdown_rx.changed() => {},
- }
-
- eprintln!("Removing socket...");
- if let Some(socket) = socket {
- match std::fs::remove_file(socket) {
- Ok(()) => {}
- Err(err) if err.kind() == ErrorKind::NotFound => {}
- Err(err) => {
- eprintln!("failed to remove socket: {err}");
- }
- }
- }
- eprintln!("Shutting down...");
-}
-
-#[cfg(windows)]
-async fn shutdown_signal(mut shutdown_rx: watch::Receiver<bool>) {
- let mut ctrl_c = tokio::signal::windows::ctrl_c().expect("failed to register signal handler");
- tokio::select! {
- _ = ctrl_c.recv() => {},
- _ = shutdown_rx.changed() => {},
- }
- eprintln!("Shutting down...");
-}
+use atuin_client::settings::Settings;
+/// Run the gRPC server with the given services.
+///
+/// This starts the gRPC server in the background and returns immediately.
+/// The server will shut down when a ShutdownRequested event is received.
#[cfg(unix)]
-async fn start_server(
+pub async fn run_grpc_server(
settings: Settings,
- history: HistoryService,
- shutdown_rx: watch::Receiver<bool>,
+ history_service: HistoryServer<HistoryGrpcService>,
+ search_service: SearchServer<SearchGrpcService>,
+ control_service: ControlServer<ControlService>,
+ handle: DaemonHandle,
) -> Result<()> {
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
- let socket_path = settings.daemon.socket_path;
+ let socket_path = settings.daemon.socket_path.clone();
let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket {
#[cfg(target_os = "linux")]
{
- use eyre::OptionExt;
+ use eyre::{OptionExt, WrapErr};
+ use std::os::unix::net::SocketAddr;
+ use std::path::PathBuf;
tracing::info!("getting systemd socket");
let listener = listenfd::ListenFd::from_env()
.take_unix_listener(0)?
.ok_or_eyre("missing systemd socket")?;
listener.set_nonblocking(true)?;
- let actual_path = listener
+ let actual_path: Result<PathBuf, eyre::Report> = listener
.local_addr()
.context("getting systemd socket's path")
- .and_then(|addr| {
+ .and_then(|addr: SocketAddr| {
addr.as_pathname()
.ok_or_eyre("systemd socket missing path")
- .map(|path| path.to_owned())
+ .map(|path: &std::path::Path| path.to_owned())
});
match actual_path {
Ok(actual_path) => {
@@ -271,66 +71,94 @@ async fn start_server(
let uds_stream = UnixListenerStream::new(uds);
- Server::builder()
- .add_service(HistoryServer::new(history))
- .serve_with_incoming_shutdown(
- uds_stream,
- shutdown_signal(cleanup.then_some(socket_path.into()), shutdown_rx),
- )
- .await?;
+ // Create shutdown signal from daemon handle
+ let shutdown_signal = async move {
+ let mut rx = handle.subscribe();
+ loop {
+ use crate::DaemonEvent;
+
+ match rx.recv().await {
+ Ok(DaemonEvent::ShutdownRequested) => break,
+ Ok(_) => continue,
+ Err(_) => break, // Channel closed
+ }
+ }
+ if cleanup {
+ eprintln!("Removing socket...");
+ if let Err(e) = std::fs::remove_file(&socket_path)
+ && e.kind() != std::io::ErrorKind::NotFound
+ {
+ eprintln!("failed to remove socket: {e}");
+ }
+ }
+ eprintln!("Shutting down gRPC server...");
+ };
+
+ // Spawn the server in the background
+ tokio::spawn(async move {
+ use tonic::transport::Server;
+
+ if let Err(e) = Server::builder()
+ .add_service(history_service)
+ .add_service(search_service)
+ .add_service(control_service)
+ .serve_with_incoming_shutdown(uds_stream, shutdown_signal)
+ .await
+ {
+ tracing::error!("gRPC server error: {e}");
+ }
+ });
Ok(())
}
+/// Run the gRPC server with the given services (Windows/TCP version).
#[cfg(not(unix))]
-async fn start_server(
+pub async fn run_grpc_server(
settings: Settings,
- history: HistoryService,
- shutdown_rx: watch::Receiver<bool>,
+ history_service: HistoryServer<HistoryGrpcService>,
+ search_service: SearchServer<SearchGrpcService>,
+ control_service: ControlServer<ControlService>,
+ handle: DaemonHandle,
) -> Result<()> {
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
+ use tonic::transport::Server;
let port = settings.daemon.tcp_port;
let url = format!("127.0.0.1:{port}");
- let tcp = TcpListener::bind(url).await?;
+ let tcp = TcpListener::bind(&url).await?;
let tcp_stream = TcpListenerStream::new(tcp);
tracing::info!("listening on tcp port {:?}", port);
- Server::builder()
- .add_service(HistoryServer::new(history))
- .serve_with_incoming_shutdown(tcp_stream, shutdown_signal(shutdown_rx))
- .await?;
- Ok(())
-}
-
-// break the above down when we end up with multiple services
-
-/// Listen on a unix socket
-/// Pass the path to the socket
-pub async fn listen(
- settings: Settings,
- store: SqliteStore,
- history_db: HistoryDatabase,
-) -> Result<()> {
- let encryption_key: [u8; 32] = encryption::load_key(&settings)
- .context("could not load encryption key")?
- .into();
-
- let host_id = Settings::host_id().await?;
- let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
+ // Create shutdown signal from daemon handle
+ let shutdown_signal = async move {
+ use crate::DaemonEvent;
- let (shutdown_tx, shutdown_rx) = watch::channel(false);
- let history = HistoryService::new(history_store.clone(), history_db.clone(), shutdown_tx);
+ let mut rx = handle.subscribe();
+ loop {
+ match rx.recv().await {
+ Ok(DaemonEvent::ShutdownRequested) => break,
+ Ok(_) => continue,
+ Err(_) => break, // Channel closed
+ }
+ }
+ eprintln!("Shutting down gRPC server...");
+ };
- // start services
- tokio::spawn(sync::worker(
- settings.clone(),
- store,
- history_store,
- history_db,
- ));
+ // Spawn the server in the background
+ tokio::spawn(async move {
+ if let Err(e) = Server::builder()
+ .add_service(history_service)
+ .add_service(search_service)
+ .add_service(control_service)
+ .serve_with_incoming_shutdown(tcp_stream, shutdown_signal)
+ .await
+ {
+ tracing::error!("gRPC server error: {e}");
+ }
+ });
- start_server(settings, history, shutdown_rx).await
+ Ok(())
}
diff --git a/crates/atuin-daemon/src/server/sync.rs b/crates/atuin-daemon/src/server/sync.rs
deleted file mode 100644
index e1e49597..00000000
--- a/crates/atuin-daemon/src/server/sync.rs
+++ /dev/null
@@ -1,96 +0,0 @@
-use eyre::Result;
-use rand::Rng;
-use tokio::time::{self, MissedTickBehavior};
-
-use atuin_client::database::Sqlite as HistoryDatabase;
-use atuin_client::{
- encryption,
- history::store::HistoryStore,
- record::{sqlite_store::SqliteStore, sync},
- settings::Settings,
-};
-
-use atuin_dotfiles::store::{AliasStore, var::VarStore};
-
-pub async fn worker(
- settings: Settings,
- store: SqliteStore,
- history_store: HistoryStore,
- history_db: HistoryDatabase,
-) -> Result<()> {
- tracing::info!("booting sync worker");
-
- let encryption_key: [u8; 32] = encryption::load_key(&settings)?.into();
- let host_id = Settings::host_id().await?;
- let alias_store = AliasStore::new(store.clone(), host_id, encryption_key);
- let var_store = VarStore::new(store.clone(), host_id, encryption_key);
-
- // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
- let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
-
- let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
-
- // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
- // we may end up running a lot of syncs in a hot loop. No bueno!
- ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
-
- loop {
- ticker.tick().await;
- tracing::info!("sync worker tick");
-
- let logged_in = match settings.logged_in().await {
- Ok(v) => v,
- Err(e) => {
- tracing::warn!("failed to check login status, skipping sync tick: {e}");
- continue;
- }
- };
-
- if !logged_in {
- tracing::debug!("not logged in, skipping sync tick");
- continue;
- }
-
- let res = sync::sync(&settings, &store).await;
-
- if let Err(e) = res {
- tracing::error!("sync tick failed with {e}");
-
- let mut rng = rand::thread_rng();
-
- let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
-
- if new_interval > max_interval {
- new_interval = max_interval;
- }
-
- ticker = time::interval(time::Duration::from_secs(new_interval as u64));
- ticker.reset_after(time::Duration::from_secs(new_interval as u64));
-
- tracing::error!("backing off, next sync tick in {new_interval}");
- } else {
- let (uploaded, downloaded) = res.unwrap();
-
- tracing::info!(
- uploaded = ?uploaded,
- downloaded = ?downloaded,
- "sync complete"
- );
-
- history_store
- .incremental_build(&history_db, &downloaded)
- .await?;
-
- alias_store.build().await?;
- var_store.build().await?;
-
- // Reset backoff on success
- if ticker.period().as_secs() != settings.daemon.sync_frequency {
- ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
- }
-
- // store sync time
- Settings::save_sync_time().await?;
- }
- }
-}
diff --git a/crates/atuin-daemon/tests/lifecycle.rs b/crates/atuin-daemon/tests/lifecycle.rs
index 56457fa7..3b6952de 100644
--- a/crates/atuin-daemon/tests/lifecycle.rs
+++ b/crates/atuin-daemon/tests/lifecycle.rs
@@ -8,52 +8,97 @@ mod unix {
use std::time::Duration;
use atuin_client::database::Sqlite;
- use atuin_client::history::store::HistoryStore;
use atuin_client::record::sqlite_store::SqliteStore;
- use atuin_common::record::HostId;
- use atuin_common::utils::uuid_v7;
+ use atuin_client::settings::{Settings, init_meta_config_for_testing};
use atuin_daemon::client::HistoryClient;
- use atuin_daemon::history::history_server::HistoryServer;
- use atuin_daemon::server::HistoryService;
+ use atuin_daemon::components::HistoryComponent;
+ use atuin_daemon::{Daemon, DaemonHandle};
use tempfile::TempDir;
use tokio::net::UnixListener;
- use tokio::sync::watch;
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
/// Spins up a daemon server on a temp socket and returns a connected client,
- /// the shutdown sender, and the temp dir (must be held to keep paths alive).
- async fn start_test_daemon() -> (HistoryClient, watch::Sender<bool>, TempDir) {
+ /// the daemon handle (for shutdown), and the temp dir (must be held to keep paths alive).
+ async fn start_test_daemon() -> (HistoryClient, DaemonHandle, TempDir) {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("history.db");
let record_path = tmp.path().join("records.db");
+ let key_path = tmp.path().join("key");
+ let socket_path = tmp.path().join("test.sock");
+ let meta_path = tmp.path().join("meta.db");
+
+ // Initialize the meta store config for testing (required for Settings::host_id())
+ init_meta_config_for_testing(meta_path.to_str().unwrap(), 5.0);
+
+ // Build settings with test paths
+ let settings: Settings = Settings::builder()
+ .expect("could not build settings builder")
+ .set_override("db_path", db_path.to_str().unwrap())
+ .expect("failed to set db_path")
+ .set_override("record_store_path", record_path.to_str().unwrap())
+ .expect("failed to set record_store_path")
+ .set_override("key_path", key_path.to_str().unwrap())
+ .expect("failed to set key_path")
+ .set_override("daemon.socket_path", socket_path.to_str().unwrap())
+ .expect("failed to set socket_path")
+ .set_override("meta.db_path", meta_path.to_str().unwrap())
+ .expect("failed to set meta.db_path")
+ .build()
+ .expect("could not build settings")
+ .try_deserialize()
+ .expect("could not deserialize settings");
+ // Create databases
let history_db = Sqlite::new(&db_path, 5.0).await.unwrap();
let store = SqliteStore::new(&record_path, 5.0).await.unwrap();
- let host_id = HostId(uuid_v7());
- let encryption_key = [0u8; 32];
- let history_store = HistoryStore::new(store, host_id, encryption_key);
+ // Create the history component and get its gRPC service
+ let history_component = HistoryComponent::new();
+ let history_service = history_component.grpc_service();
- let (shutdown_tx, shutdown_rx) = watch::channel(false);
- let service = HistoryService::new(history_store, history_db, shutdown_tx.clone());
+ // Build and start the daemon
+ let mut daemon = Daemon::builder(settings)
+ .store(store)
+ .history_db(history_db)
+ .component(history_component)
+ .build()
+ .await
+ .unwrap();
- let socket_path = tmp.path().join("test.sock");
+ let handle = daemon.handle();
+
+ // Start components (this initializes the history component with the handle)
+ daemon.start_components().await.unwrap();
+
+ // Start the gRPC server
let uds = UnixListener::bind(&socket_path).unwrap();
let stream = UnixListenerStream::new(uds);
- let mut rx = shutdown_rx.clone();
+ let server_handle = handle.clone();
tokio::spawn(async move {
+ let mut rx = server_handle.subscribe();
Server::builder()
- .add_service(HistoryServer::new(service))
+ .add_service(history_service)
.serve_with_incoming_shutdown(stream, async move {
- let _ = rx.changed().await;
+ loop {
+ match rx.recv().await {
+ Ok(atuin_daemon::DaemonEvent::ShutdownRequested) => break,
+ Ok(_) => continue,
+ Err(_) => break,
+ }
+ }
})
.await
.unwrap();
});
+ // Spawn the daemon event loop in the background
+ tokio::spawn(async move {
+ daemon.run_event_loop().await.unwrap();
+ });
+
// Give the server a moment to bind.
tokio::time::sleep(Duration::from_millis(50)).await;
@@ -61,12 +106,12 @@ mod unix {
.await
.unwrap();
- (client, shutdown_tx, tmp)
+ (client, handle, tmp)
}
#[tokio::test]
async fn test_status() {
- let (mut client, _shutdown, _tmp) = start_test_daemon().await;
+ let (mut client, _handle, _tmp) = start_test_daemon().await;
let status = client.status().await.unwrap();
assert!(status.healthy);
@@ -79,7 +124,7 @@ mod unix {
async fn test_start_end_history() {
use atuin_client::history::History;
- let (mut client, _shutdown, _tmp) = start_test_daemon().await;
+ let (mut client, _handle, _tmp) = start_test_daemon().await;
let history = History::daemon()
.timestamp(time::OffsetDateTime::now_utc())
@@ -102,7 +147,7 @@ mod unix {
#[tokio::test]
async fn test_end_unknown_history_fails() {
- let (mut client, _shutdown, _tmp) = start_test_daemon().await;
+ let (mut client, _handle, _tmp) = start_test_daemon().await;
let result = client
.end_history("nonexistent-id".to_string(), 1000, 0)
@@ -112,7 +157,7 @@ mod unix {
#[tokio::test]
async fn test_shutdown() {
- let (mut client, _shutdown_tx, _tmp) = start_test_daemon().await;
+ let (mut client, _handle, _tmp) = start_test_daemon().await;
let accepted = client.shutdown().await.unwrap();
assert!(accepted);
diff --git a/crates/atuin/Cargo.toml b/crates/atuin/Cargo.toml
index e7a0daaa..d8938121 100644
--- a/crates/atuin/Cargo.toml
+++ b/crates/atuin/Cargo.toml
@@ -78,10 +78,12 @@ open = "5"
ratatui = { workspace = true }
tracing = "0.1"
tracing-subscriber = { workspace = true }
+tracing-appender = "0.2"
uuid = { workspace = true }
sysinfo = "0.30.7"
regex = "1.10.5"
norm = { version = "0.1.1", features = ["fzf-v2"] }
+nucleo-matcher = { git = "https://github.com/atuinsh/nucleo-ext.git", branch = "main" }
tempfile = { workspace = true }
shlex = "1.3.0"
diff --git a/crates/atuin/src/command/client.rs b/crates/atuin/src/command/client.rs
index 0cb0a2ae..ba55466d 100644
--- a/crates/atuin/src/command/client.rs
+++ b/crates/atuin/src/command/client.rs
@@ -1,4 +1,5 @@
-use std::path::PathBuf;
+use std::fs::{self, OpenOptions};
+use std::path::{Path, PathBuf};
use clap::Subcommand;
use eyre::{Result, WrapErr};
@@ -6,7 +7,38 @@ use eyre::{Result, WrapErr};
use atuin_client::{
database::Sqlite, record::sqlite_store::SqliteStore, settings::Settings, theme,
};
-use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
+use tracing_appender::rolling::{RollingFileAppender, Rotation};
+use tracing_subscriber::{
+ Layer, filter::EnvFilter, filter::LevelFilter, fmt, fmt::format::FmtSpan, prelude::*,
+};
+
+fn cleanup_old_logs(log_dir: &Path, prefix: &str, retention_days: u64) {
+ let cutoff = std::time::SystemTime::now()
+ - std::time::Duration::from_secs(retention_days * 24 * 60 * 60);
+
+ let Ok(entries) = fs::read_dir(log_dir) else {
+ return;
+ };
+
+ for entry in entries.flatten() {
+ let path = entry.path();
+ let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
+ continue;
+ };
+
+ // Match files like "search.log.2024-02-23" or "daemon.log.2024-02-23"
+ if !name.starts_with(prefix) || name == prefix {
+ continue;
+ }
+
+ if let Ok(metadata) = entry.metadata()
+ && let Ok(modified) = metadata.modified()
+ && modified < cutoff
+ {
+ let _ = fs::remove_file(&path);
+ }
+ }
+}
#[cfg(feature = "sync")]
mod sync;
@@ -122,18 +154,157 @@ impl Cmd {
res
}
+ #[allow(clippy::too_many_lines)]
async fn run_inner(
self,
mut settings: Settings,
mut theme_manager: theme::ThemeManager,
) -> Result<()> {
- let filter =
+ // ATUIN_LOG env var overrides config file level settings
+ let env_log_set = std::env::var("ATUIN_LOG").is_ok();
+
+ // Base filter from env var (or empty if not set)
+ let base_filter =
EnvFilter::from_env("ATUIN_LOG").add_directive("sqlx_sqlite::regexp=off".parse()?);
- tracing_subscriber::registry()
- .with(fmt::layer())
- .with(filter)
- .init();
+ let is_interactive_search = matches!(&self, Self::Search(cmd) if cmd.is_interactive());
+ // Use file-based logging for interactive search (TUI mode)
+ let use_search_logging = is_interactive_search && settings.logs.search_enabled();
+
+ // Use file-based logging for daemon
+ #[cfg(feature = "daemon")]
+ let use_daemon_logging = matches!(&self, Self::Daemon(_)) && settings.logs.daemon_enabled();
+
+ #[cfg(not(feature = "daemon"))]
+ let use_daemon_logging = false;
+
+ // Check if daemon should also log to console
+ #[cfg(feature = "daemon")]
+ let daemon_show_logs = matches!(&self, Self::Daemon(cmd) if cmd.show_logs());
+
+ #[cfg(not(feature = "daemon"))]
+ let daemon_show_logs = false;
+
+ // Set up span timing JSON logs if ATUIN_SPAN is set
+ let span_path = std::env::var("ATUIN_SPAN").ok().map(|p| {
+ if p.is_empty() {
+ "atuin-spans.json".to_string()
+ } else {
+ p
+ }
+ });
+
+ // Helper to create span timing layer
+ macro_rules! make_span_layer {
+ ($path:expr) => {{
+ let span_file = OpenOptions::new()
+ .create(true)
+ .truncate(true)
+ .write(true)
+ .open($path)?;
+ Some(
+ fmt::layer()
+ .json()
+ .with_writer(span_file)
+ .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
+ .with_filter(LevelFilter::TRACE),
+ )
+ }};
+ }
+
+ // Build the subscriber with all configured layers
+ if use_search_logging {
+ let search_filename = settings.logs.search.file.clone();
+ let log_dir = PathBuf::from(&settings.logs.dir);
+ fs::create_dir_all(&log_dir)?;
+
+ // Clean up old log files
+ cleanup_old_logs(&log_dir, &search_filename, settings.logs.search_retention());
+
+ let file_appender =
+ RollingFileAppender::new(Rotation::DAILY, &log_dir, &search_filename);
+
+ // Use config level unless ATUIN_LOG is set
+ let filter = if env_log_set {
+ base_filter
+ } else {
+ EnvFilter::default()
+ .add_directive(settings.logs.search_level().as_directive().parse()?)
+ .add_directive("sqlx_sqlite::regexp=off".parse()?)
+ };
+
+ let base = tracing_subscriber::registry().with(
+ fmt::layer()
+ .with_writer(file_appender)
+ .with_ansi(false)
+ .with_filter(filter),
+ );
+
+ match &span_path {
+ Some(sp) => {
+ base.with(make_span_layer!(sp)).init();
+ }
+ None => {
+ base.init();
+ }
+ }
+ } else if use_daemon_logging {
+ let daemon_filename = settings.logs.daemon.file.clone();
+ let log_dir = PathBuf::from(&settings.logs.dir);
+ fs::create_dir_all(&log_dir)?;
+
+ // Clean up old log files
+ cleanup_old_logs(&log_dir, &daemon_filename, settings.logs.daemon_retention());
+
+ let file_appender =
+ RollingFileAppender::new(Rotation::DAILY, &log_dir, &daemon_filename);
+
+ // Use config level unless ATUIN_LOG is set
+ let file_filter = if env_log_set {
+ base_filter
+ } else {
+ EnvFilter::default()
+ .add_directive(settings.logs.daemon_level().as_directive().parse()?)
+ .add_directive("sqlx_sqlite::regexp=off".parse()?)
+ };
+
+ let file_layer = fmt::layer()
+ .with_writer(file_appender)
+ .with_ansi(false)
+ .with_filter(file_filter);
+
+ // Optionally add console layer for --show-logs
+ if daemon_show_logs {
+ let console_filter = EnvFilter::from_env("ATUIN_LOG")
+ .add_directive("sqlx_sqlite::regexp=off".parse()?);
+
+ let console_layer = fmt::layer().with_filter(console_filter);
+
+ let base = tracing_subscriber::registry()
+ .with(file_layer)
+ .with(console_layer);
+
+ match &span_path {
+ Some(sp) => {
+ base.with(make_span_layer!(sp)).init();
+ }
+ None => {
+ base.init();
+ }
+ }
+ } else {
+ let base = tracing_subscriber::registry().with(file_layer);
+
+ match &span_path {
+ Some(sp) => {
+ base.with(make_span_layer!(sp)).init();
+ }
+ None => {
+ base.init();
+ }
+ }
+ }
+ }
tracing::trace!(command = ?self, "client command");
diff --git a/crates/atuin/src/command/client/daemon.rs b/crates/atuin/src/command/client/daemon.rs
index a92e8f8e..64547505 100644
--- a/crates/atuin/src/command/client/daemon.rs
+++ b/crates/atuin/src/command/client/daemon.rs
@@ -9,10 +9,7 @@ use std::time::{Duration, Instant};
use atuin_client::{
database::Sqlite, history::History, record::sqlite_store::SqliteStore, settings::Settings,
};
-use atuin_daemon::{
- client::{DaemonClientErrorKind, HistoryClient, classify_error},
- server::listen,
-};
+use atuin_daemon::client::{DaemonClientErrorKind, HistoryClient, classify_error};
use clap::Subcommand;
#[cfg(unix)]
use daemonize::Daemonize;
@@ -26,6 +23,10 @@ pub struct Cmd {
#[arg(long, hide = true)]
daemonize: bool,
+ /// Also write daemon logs to the console (useful for debugging)
+ #[arg(long)]
+ show_logs: bool,
+
#[command(subcommand)]
subcmd: Option<SubCmd>,
}
@@ -37,6 +38,14 @@ pub enum SubCmd {
Start {
#[arg(long, hide = true)]
daemonize: bool,
+
+ /// Also write daemon logs to the console (useful for debugging)
+ #[arg(long)]
+ show_logs: bool,
+
+ /// Force start: kill existing daemon process and reset the socket
+ #[arg(long)]
+ force: bool,
},
/// Show the daemon's current status
@@ -55,12 +64,21 @@ impl Cmd {
#[cfg(unix)]
pub fn should_daemonize(&self) -> bool {
match &self.subcmd {
- Some(SubCmd::Start { daemonize }) => *daemonize,
+ Some(SubCmd::Start { daemonize, .. }) => *daemonize,
None => self.daemonize,
_ => false,
}
}
+ /// Returns `true` when logs should also be written to the console.
+ pub fn show_logs(&self) -> bool {
+ match &self.subcmd {
+ Some(SubCmd::Start { show_logs, .. }) => *show_logs,
+ None => self.show_logs,
+ _ => false,
+ }
+ }
+
pub async fn run(
self,
settings: Settings,
@@ -70,9 +88,9 @@ impl Cmd {
match self.subcmd {
None => {
eprintln!("Warning: `atuin daemon` is deprecated, use `atuin daemon start`");
- run(settings, store, history_db).await
+ run(settings, store, history_db, false).await
}
- Some(SubCmd::Start { .. }) => run(settings, store, history_db).await,
+ Some(SubCmd::Start { force, .. }) => run(settings, store, history_db, force).await,
Some(SubCmd::Status) => status_cmd(&settings).await,
Some(SubCmd::Stop) => stop_cmd(&settings).await,
Some(SubCmd::Restart) => restart_cmd(&settings).await,
@@ -547,15 +565,82 @@ pub fn daemonize_current_process() -> Result<()> {
Ok(())
}
-async fn run(settings: Settings, store: SqliteStore, history_db: Sqlite) -> Result<()> {
+async fn run(
+ settings: Settings,
+ store: SqliteStore,
+ history_db: Sqlite,
+ force: bool,
+) -> Result<()> {
+ if force {
+ force_cleanup(&settings);
+ }
+
let pidfile_path = PathBuf::from(&settings.daemon.pidfile_path);
let _pidfile_guard = PidfileGuard::acquire(&pidfile_path)?;
- listen(settings, store, history_db).await?;
+ atuin_daemon::boot(settings, store, history_db).await?;
Ok(())
}
+/// Force cleanup: kill existing daemon process and remove socket.
+fn force_cleanup(settings: &Settings) {
+ let pidfile_path = Path::new(&settings.daemon.pidfile_path);
+
+ // Read and kill the existing process if pidfile exists
+ if pidfile_path.exists() {
+ if let Ok(contents) = fs::read_to_string(pidfile_path)
+ && let Some(pid_str) = contents.lines().next()
+ && let Ok(pid) = pid_str.parse::<u32>()
+ {
+ kill_process(pid);
+ // Give it a moment to release resources
+ std::thread::sleep(Duration::from_millis(100));
+ }
+
+ // Remove the pidfile
+ if let Err(e) = fs::remove_file(pidfile_path)
+ && e.kind() != ErrorKind::NotFound
+ {
+ tracing::warn!("failed to remove pidfile: {e}");
+ }
+ }
+
+ // Remove the socket file
+ #[cfg(unix)]
+ {
+ let socket_path = Path::new(&settings.daemon.socket_path);
+ if socket_path.exists()
+ && let Err(e) = fs::remove_file(socket_path)
+ && e.kind() != ErrorKind::NotFound
+ {
+ tracing::warn!("failed to remove socket: {e}");
+ }
+ }
+}
+
+/// Kill a process by PID.
+#[cfg(unix)]
+fn kill_process(pid: u32) {
+ // Use kill command to send SIGTERM for graceful shutdown
+ let _ = Command::new("kill")
+ .args(["-TERM", &pid.to_string()])
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .status();
+}
+
+/// Kill a process by PID.
+#[cfg(not(unix))]
+fn kill_process(pid: u32) {
+ // On Windows, use taskkill
+ let _ = Command::new("taskkill")
+ .args(["/PID", &pid.to_string(), "/F"])
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .status();
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/crates/atuin/src/command/client/history.rs b/crates/atuin/src/command/client/history.rs
index c20f64a3..fe9a7e32 100644
--- a/crates/atuin/src/command/client/history.rs
+++ b/crates/atuin/src/command/client/history.rs
@@ -10,6 +10,9 @@ use clap::Subcommand;
use eyre::{Context, Result};
use runtime_format::{FormatKey, FormatKeyError, ParseSegment, ParsedFmt};
+#[cfg(feature = "daemon")]
+use atuin_daemon::emit_event;
+
use atuin_client::{
database::{Database, Sqlite, current_context},
encryption,
@@ -624,6 +627,9 @@ impl Cmd {
db.delete(entry.clone()).await?;
}
}
+
+ #[cfg(feature = "daemon")]
+ let _ = emit_event(atuin_daemon::DaemonEvent::HistoryPruned).await;
}
Ok(())
}
@@ -670,6 +676,9 @@ impl Cmd {
let host_id = Settings::host_id().await?;
let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
+ #[cfg(feature = "daemon")]
+ let ids = matches.iter().map(|h| h.id.clone()).collect::<Vec<_>>();
+
for entry in matches {
eprintln!("deleting {}", entry.id);
if settings.sync.records {
@@ -679,6 +688,9 @@ impl Cmd {
db.delete(entry).await?;
}
}
+
+ #[cfg(feature = "daemon")]
+ let _ = emit_event(atuin_daemon::DaemonEvent::HistoryDeleted { ids }).await;
}
Ok(())
}
diff --git a/crates/atuin/src/command/client/search.rs b/crates/atuin/src/command/client/search.rs
index 70a25ed9..3f3687b8 100644
--- a/crates/atuin/src/command/client/search.rs
+++ b/crates/atuin/src/command/client/search.rs
@@ -141,6 +141,11 @@ pub struct Cmd {
}
impl Cmd {
+ /// Returns true if this search command will run in interactive (TUI) mode
+ pub fn is_interactive(&self) -> bool {
+ self.interactive
+ }
+
// clippy: please write this instead
// clippy: now it has too many lines
// me: I'll do it later OKAY
diff --git a/crates/atuin/src/command/client/search/engines.rs b/crates/atuin/src/command/client/search/engines.rs
index 5c53817e..8cbee0c3 100644
--- a/crates/atuin/src/command/client/search/engines.rs
+++ b/crates/atuin/src/command/client/search/engines.rs
@@ -8,12 +8,22 @@ use eyre::Result;
use super::cursor::Cursor;
+#[cfg(feature = "daemon")]
+pub mod daemon;
pub mod db;
pub mod skim;
-pub fn engine(search_mode: SearchMode) -> Box<dyn SearchEngine> {
+#[allow(unused)] // settings is only used if daemon feature is enabled
+pub fn engine(search_mode: SearchMode, settings: &Settings) -> Box<dyn SearchEngine> {
match search_mode {
SearchMode::Skim => Box::new(skim::Search::new()) as Box<_>,
+ #[cfg(feature = "daemon")]
+ SearchMode::DaemonFuzzy => Box::new(daemon::Search::new(settings)) as Box<_>,
+ #[cfg(not(feature = "daemon"))]
+ SearchMode::DaemonFuzzy => {
+ // Fall back to fuzzy mode if daemon feature is not enabled
+ Box::new(db::Search(SearchMode::Fuzzy)) as Box<_>
+ }
mode => Box::new(db::Search(mode)) as Box<_>,
}
}
diff --git a/crates/atuin/src/command/client/search/engines/daemon.rs b/crates/atuin/src/command/client/search/engines/daemon.rs
new file mode 100644
index 00000000..d317a4f6
--- /dev/null
+++ b/crates/atuin/src/command/client/search/engines/daemon.rs
@@ -0,0 +1,206 @@
+use async_trait::async_trait;
+use atuin_client::{
+ database::{Database, OptFilters},
+ history::History,
+ settings::{SearchMode, Settings},
+};
+use atuin_daemon::client::SearchClient;
+use eyre::Result;
+use nucleo_matcher::{
+ Config, Matcher, Utf32Str,
+ pattern::{CaseMatching, Normalization, Pattern},
+};
+use tracing::{Level, debug, instrument, span};
+use uuid::Uuid;
+
+use super::{SearchEngine, SearchState};
+
+pub struct Search {
+ client: Option<SearchClient>,
+ query_id: u64,
+ socket_path: String,
+ #[cfg(not(unix))]
+ tcp_port: u64,
+}
+
+impl Search {
+ pub fn new(settings: &Settings) -> Self {
+ Search {
+ client: None,
+ query_id: 0,
+ socket_path: settings.daemon.socket_path.clone(),
+ #[cfg(not(unix))]
+ tcp_port: settings.daemon.tcp_port,
+ }
+ }
+
+ #[instrument(skip_all, level = Level::TRACE, name = "get_daemon_client")]
+ async fn get_client(&mut self) -> Result<&mut SearchClient> {
+ if self.client.is_none() {
+ #[cfg(unix)]
+ let client = SearchClient::new(self.socket_path.clone()).await?;
+
+ #[cfg(not(unix))]
+ let client = SearchClient::new(self.tcp_port).await?;
+
+ self.client = Some(client);
+ }
+ Ok(self.client.as_mut().unwrap())
+ }
+
+ fn next_query_id(&mut self) -> u64 {
+ self.query_id += 1;
+ self.query_id
+ }
+
+ /// Check if query contains regex pattern (r/.../)
+ /// Nucleo doesn't support regex, so we fall back to database search
+ fn contains_regex_pattern(query: &str) -> bool {
+ query.starts_with("r/") || query.contains(" r/")
+ }
+
+ #[instrument(skip_all, level = Level::TRACE, name = "daemon_db_fallback")]
+ async fn fallback_to_db_search(
+ &self,
+ state: &SearchState,
+ db: &dyn Database,
+ ) -> Result<Vec<History>> {
+ let results = db
+ .search(
+ SearchMode::FullText,
+ state.filter_mode,
+ &state.context,
+ state.input.as_str(),
+ OptFilters {
+ limit: Some(200),
+ ..Default::default()
+ },
+ )
+ .await
+ .map_or(Vec::new(), |r| r.into_iter().collect());
+ Ok(results)
+ }
+
+ #[instrument(skip_all, level = Level::TRACE, name = "hydrate_from_db", fields(count = ids.len()))]
+ async fn hydrate_from_db(&self, db: &dyn Database, ids: &[String]) -> Result<Vec<History>> {
+ let placeholders: Vec<String> = ids.iter().map(|id| format!("'{id}'")).collect();
+ let sql_query = format!(
+ "SELECT * FROM history WHERE id IN ({}) ORDER BY timestamp DESC",
+ placeholders.join(",")
+ );
+ Ok(db.query_history(&sql_query).await?)
+ }
+}
+
+#[async_trait]
+impl SearchEngine for Search {
+ #[instrument(skip_all, level = Level::TRACE, name = "daemon_search", fields(query = %state.input.as_str()))]
+ async fn full_query(
+ &mut self,
+ state: &SearchState,
+ db: &mut dyn Database,
+ ) -> Result<Vec<History>> {
+ let query = state.input.as_str().to_string();
+
+ // Fall back to database for regex queries (Nucleo doesn't support regex)
+ if Self::contains_regex_pattern(&query) {
+ debug!(query = %query, "[daemon-client] regex detected, falling back to db");
+ return self.fallback_to_db_search(state, db).await;
+ }
+
+ let query_id = self.next_query_id();
+
+ let span =
+ span!(Level::TRACE, "daemon_search.req_resp", query = %query, query_id = query_id);
+
+ let client = self.get_client().await?;
+
+ let _span = span.enter();
+ let mut stream = client
+ .search(
+ query.clone(),
+ query_id,
+ state.filter_mode,
+ Some(state.context.clone()),
+ )
+ .await?;
+
+ let mut ids = Vec::with_capacity(200);
+ span!(Level::TRACE, "daemon_search.resp")
+ .in_scope(async || {
+ while let Ok(Some(response)) = stream.message().await {
+ let span2 = span!(
+ Level::TRACE,
+ "daemon_search.resp.item",
+ query_id = response.query_id
+ );
+ let _span2 = span2.enter();
+ // Only process if the query_id matches (prevents stale responses)
+ if response.query_id == query_id {
+ let uuids = response
+ .ids
+ .iter()
+ .map(|id| {
+ let bytes: [u8; 16] =
+ id.as_slice().try_into().expect("id should be 16 bytes");
+ Uuid::from_bytes(bytes).as_simple().to_string()
+ })
+ .collect::<Vec<_>>();
+ ids.extend(uuids);
+ }
+ drop(_span2);
+ drop(span2);
+ }
+ })
+ .await;
+ drop(_span);
+ drop(span);
+
+ if ids.is_empty() {
+ debug!(query = %query, results = 0, "[daemon-client] empty results");
+ return Ok(Vec::new());
+ }
+
+ // // Hydrate from local database
+ let results = self.hydrate_from_db(db, &ids).await?;
+
+ // // Reorder results to match the order from the daemon (which is ranked by relevance)
+ let ordered_results = span!(Level::TRACE, "reorder_results").in_scope(|| {
+ let mut ordered_results = Vec::with_capacity(results.len());
+ for id in &ids {
+ if let Some(history) = results.iter().find(|h| h.id.0 == *id) {
+ ordered_results.push(history.clone());
+ }
+ }
+ ordered_results
+ });
+
+ debug!(
+ query = %query,
+ results = results.len(),
+ "[daemon-client]"
+ );
+
+ Ok(ordered_results)
+ }
+
+ #[instrument(skip_all, level = Level::TRACE, name = "daemon_highlight")]
+ fn get_highlight_indices(&self, command: &str, search_input: &str) -> Vec<usize> {
+ // Use fulltext highlighting for regex queries
+ if Self::contains_regex_pattern(search_input) {
+ return super::db::get_highlight_indices_fulltext(command, search_input);
+ }
+
+ let mut matcher = Matcher::new(Config::DEFAULT);
+ let pattern = Pattern::parse(search_input, CaseMatching::Smart, Normalization::Smart);
+
+ let mut indices: Vec<u32> = Vec::new();
+ let mut haystack_buf = Vec::new();
+
+ let haystack = Utf32Str::new(command, &mut haystack_buf);
+ pattern.indices(haystack, &mut matcher, &mut indices);
+
+ // Convert u32 indices to usize
+ indices.into_iter().map(|i| i as usize).collect()
+ }
+}
diff --git a/crates/atuin/src/command/client/search/engines/db.rs b/crates/atuin/src/command/client/search/engines/db.rs
index f0ed424e..476462f5 100644
--- a/crates/atuin/src/command/client/search/engines/db.rs
+++ b/crates/atuin/src/command/client/search/engines/db.rs
@@ -11,17 +11,19 @@ use eyre::Result;
use norm::Metric;
use norm::fzf::{FzfParser, FzfV2};
use std::ops::Range;
+use tracing::{Level, instrument};
pub struct Search(pub SearchMode);
#[async_trait]
impl SearchEngine for Search {
+ #[instrument(skip_all, level = Level::TRACE, name = "db_search", fields(mode = ?self.0, query = %state.input.as_str()))]
async fn full_query(
&mut self,
state: &SearchState,
db: &mut dyn Database,
) -> Result<Vec<History>> {
- Ok(db
+ let results = db
.search(
self.0,
state.filter_mode,
@@ -34,9 +36,11 @@ impl SearchEngine for Search {
)
.await
// ignore errors as it may be caused by incomplete regex
- .map_or(Vec::new(), |r| r.into_iter().collect()))
+ .map_or(Vec::new(), |r| r.into_iter().collect());
+ Ok(results)
}
+ #[instrument(skip_all, level = Level::TRACE, name = "db_highlight")]
fn get_highlight_indices(&self, command: &str, search_input: &str) -> Vec<usize> {
if self.0 == SearchMode::Prefix {
return vec![];
@@ -54,7 +58,8 @@ impl SearchEngine for Search {
}
}
-fn get_highlight_indices_fulltext(command: &str, search_input: &str) -> Vec<usize> {
+#[instrument(skip_all, level = Level::TRACE, name = "db_highlight_fulltext")]
+pub fn get_highlight_indices_fulltext(command: &str, search_input: &str) -> Vec<usize> {
let mut ranges = vec![];
let lower_command = command.to_ascii_lowercase();
diff --git a/crates/atuin/src/command/client/search/engines/skim.rs b/crates/atuin/src/command/client/search/engines/skim.rs
index cb7ce24f..7d9feb40 100644
--- a/crates/atuin/src/command/client/search/engines/skim.rs
+++ b/crates/atuin/src/command/client/search/engines/skim.rs
@@ -7,6 +7,7 @@ use fuzzy_matcher::{FuzzyMatcher, skim::SkimMatcherV2};
use itertools::Itertools;
use time::OffsetDateTime;
use tokio::task::yield_now;
+use tracing::{Level, instrument, warn};
use uuid;
use super::{SearchEngine, SearchState};
@@ -27,18 +28,20 @@ impl Search {
#[async_trait]
impl SearchEngine for Search {
+ #[instrument(skip_all, level = Level::TRACE, name = "skim_search", fields(query = %state.input.as_str()))]
async fn full_query(
&mut self,
state: &SearchState,
db: &mut dyn Database,
) -> Result<Vec<History>> {
if self.all_history.is_empty() {
- self.all_history = db.all_with_count().await.unwrap();
+ self.all_history = load_all_history(db).await;
}
Ok(fuzzy_search(&self.engine, state, &self.all_history).await)
}
+ #[instrument(skip_all, level = Level::TRACE, name = "skim_highlight")]
fn get_highlight_indices(&self, command: &str, search_input: &str) -> Vec<usize> {
let (_, indices) = self
.engine
@@ -48,7 +51,13 @@ impl SearchEngine for Search {
}
}
+#[instrument(skip_all, level = Level::TRACE, name = "load_all_history")]
+async fn load_all_history(db: &dyn Database) -> Vec<(History, i32)> {
+ db.all_with_count().await.unwrap()
+}
+
#[allow(clippy::too_many_lines)]
+#[instrument(skip_all, level = Level::TRACE, name = "fuzzy_match", fields(history_count = all_history.len()))]
async fn fuzzy_search(
engine: &SkimMatcherV2,
state: &SearchState,
@@ -97,11 +106,11 @@ async fn fuzzy_search(
if !is_current_session {
let Ok(uuid) = uuid::Uuid::parse_str(&context.session) else {
- log::warn!("failed to parse session id '{}'", context.session);
+ warn!("failed to parse session id '{}'", context.session);
continue;
};
let Some(timestamp) = uuid.get_timestamp() else {
- log::warn!(
+ warn!(
"failed to get timestamp from uuid '{}'",
uuid.as_hyphenated()
);
@@ -111,7 +120,7 @@ async fn fuzzy_search(
let Ok(session_start) = time::OffsetDateTime::from_unix_timestamp_nanos(
i128::from(seconds) * 1_000_000_000 + i128::from(nanos),
) else {
- log::warn!(
+ warn!(
"failed to create OffsetDateTime from second: {seconds}, nanosecond: {nanos}"
);
continue;
diff --git a/crates/atuin/src/command/client/search/interactive.rs b/crates/atuin/src/command/client/search/interactive.rs
index c6a6064a..729c80ce 100644
--- a/crates/atuin/src/command/client/search/interactive.rs
+++ b/crates/atuin/src/command/client/search/interactive.rs
@@ -657,7 +657,7 @@ impl State {
Action::CycleSearchMode => {
self.switched_search_mode = true;
self.search_mode = self.search_mode.next(settings);
- self.engine = engines::engine(self.search_mode);
+ self.engine = engines::engine(self.search_mode, settings);
InputAction::Continue
}
Action::SwitchContext => {
@@ -1419,7 +1419,7 @@ pub async fn history(
context: initial_context.clone(),
custom_context: None,
},
- engine: engines::engine(search_mode),
+ engine: engines::engine(search_mode, settings),
results_len: 0,
accept: false,
keymap_mode: match settings.keymap_mode {
@@ -1875,7 +1875,7 @@ mod tests {
},
custom_context: None,
},
- engine: engines::engine(SearchMode::Fuzzy),
+ engine: engines::engine(SearchMode::Fuzzy, &settings),
now: Box::new(OffsetDateTime::now_utc),
};
@@ -1930,7 +1930,7 @@ mod tests {
},
custom_context: None,
},
- engine: engines::engine(SearchMode::Fuzzy),
+ engine: engines::engine(SearchMode::Fuzzy, &settings),
now: Box::new(OffsetDateTime::now_utc),
};
@@ -2049,7 +2049,7 @@ mod tests {
},
custom_context: None,
},
- engine: engines::engine(SearchMode::Fuzzy),
+ engine: engines::engine(SearchMode::Fuzzy, &settings),
now: Box::new(OffsetDateTime::now_utc),
};
@@ -2108,7 +2108,7 @@ mod tests {
},
custom_context: None,
},
- engine: engines::engine(SearchMode::Fuzzy),
+ engine: engines::engine(SearchMode::Fuzzy, &settings),
now: Box::new(OffsetDateTime::now_utc),
};
@@ -2163,7 +2163,7 @@ mod tests {
},
custom_context: None,
},
- engine: engines::engine(SearchMode::Fuzzy),
+ engine: engines::engine(SearchMode::Fuzzy, &settings),
now: Box::new(OffsetDateTime::now_utc),
};
@@ -2214,7 +2214,7 @@ mod tests {
},
custom_context: None,
},
- engine: engines::engine(SearchMode::Fuzzy),
+ engine: engines::engine(SearchMode::Fuzzy, &settings),
now: Box::new(OffsetDateTime::now_utc),
};
@@ -2274,7 +2274,7 @@ mod tests {
},
custom_context: None,
},
- engine: engines::engine(SearchMode::Fuzzy),
+ engine: engines::engine(SearchMode::Fuzzy, &settings),
now: Box::new(OffsetDateTime::now_utc),
};
@@ -2335,7 +2335,7 @@ mod tests {
},
custom_context: None,
},
- engine: engines::engine(SearchMode::Fuzzy),
+ engine: engines::engine(SearchMode::Fuzzy, &settings),
now: Box::new(OffsetDateTime::now_utc),
};
state.results_state.select(selected);
@@ -2714,7 +2714,7 @@ mod tests {
},
custom_context: None,
},
- engine: engines::engine(SearchMode::Fuzzy),
+ engine: engines::engine(SearchMode::Fuzzy, &settings),
now: Box::new(OffsetDateTime::now_utc),
};
diff --git a/crates/atuin/src/command/client/store/rebuild.rs b/crates/atuin/src/command/client/store/rebuild.rs
index 8acec531..a98f8142 100644
--- a/crates/atuin/src/command/client/store/rebuild.rs
+++ b/crates/atuin/src/command/client/store/rebuild.rs
@@ -3,6 +3,9 @@ use atuin_scripts::store::ScriptStore;
use clap::Args;
use eyre::{Result, bail};
+#[cfg(feature = "daemon")]
+use atuin_daemon::emit_event;
+
use atuin_client::{
database::Database, encryption, history::store::HistoryStore,
record::sqlite_store::SqliteStore, settings::Settings,
@@ -57,6 +60,9 @@ impl Rebuild {
history_store.build(database).await?;
+ #[cfg(feature = "daemon")]
+ let _ = emit_event(atuin_daemon::DaemonEvent::HistoryRebuilt).await;
+
Ok(())
}