From 5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 Mon Sep 17 00:00:00 2001 From: Benedikt Peetz Date: Thu, 11 Jun 2026 00:54:30 +0200 Subject: chore: Move everything into one big crate That helps remove duplicated code and rustc/cargo will now also show dead code correctly. --- crates/turtle/src/atuin_daemon/client.rs | 418 +++++++++++++++++++++++++++++++ 1 file changed, 418 insertions(+) create mode 100644 crates/turtle/src/atuin_daemon/client.rs (limited to 'crates/turtle/src/atuin_daemon/client.rs') diff --git a/crates/turtle/src/atuin_daemon/client.rs b/crates/turtle/src/atuin_daemon/client.rs new file mode 100644 index 00000000..45ef19e9 --- /dev/null +++ b/crates/turtle/src/atuin_daemon/client.rs @@ -0,0 +1,418 @@ +use crate::atuin_client::database::Context; +use crate::atuin_client::settings::{FilterMode, Settings}; +use eyre::{Context as EyreContext, Result}; +use tonic::Code; +use tonic::transport::{Channel, Endpoint, Uri}; +use tower::service_fn; + +use hyper_util::rt::TokioIo; + +#[cfg(unix)] +use tokio::net::UnixStream; + +use crate::atuin_client::history::History; +use tracing::{Level, instrument, span}; + +use crate::atuin_daemon::control::HistoryRebuiltEvent; +use crate::atuin_daemon::control::{ + ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest, + SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient, +}; +use crate::atuin_daemon::events::DaemonEvent; +use crate::atuin_daemon::history::{ + EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest, + StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest, + history_client::HistoryClient as HistoryServiceClient, +}; +use crate::atuin_daemon::search::{ + FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse, + search_client::SearchClient as SearchServiceClient, +}; +use crate::atuin_daemon::semantic::{ + CommandCapture, CommandOutputReply, CommandOutputRequest, OutputRange, RecordCommandsReply, + semantic_client::SemanticClient as SemanticServiceClient, +}; + +pub struct HistoryClient { + client: HistoryServiceClient, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum DaemonClientErrorKind { + Connect, + Unavailable, + Unimplemented, + Other, +} + +#[must_use] +pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind { + for cause in error.chain() { + if cause.downcast_ref::().is_some() { + return DaemonClientErrorKind::Connect; + } + + if let Some(status) = cause.downcast_ref::() { + return match status.code() { + Code::Unavailable => DaemonClientErrorKind::Unavailable, + Code::Unimplemented => DaemonClientErrorKind::Unimplemented, + _ => DaemonClientErrorKind::Other, + }; + } + } + + DaemonClientErrorKind::Other +} + +// Wrap the grpc client +impl HistoryClient { + #[cfg(unix)] + pub async fn new(path: String) -> Result { + use eyre::Context; + + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = HistoryServiceClient::new(channel); + + Ok(HistoryClient { client }) + } + + pub async fn start_history(&mut self, h: History) -> Result { + let req = StartHistoryRequest { + command: h.command, + cwd: h.cwd, + hostname: h.hostname, + session: h.session, + timestamp: h.timestamp.unix_timestamp_nanos() as u64, + author: h.author, + intent: h.intent.unwrap_or_default(), + }; + + Ok(self.client.start_history(req).await?.into_inner()) + } + + pub async fn end_history( + &mut self, + id: String, + duration: u64, + exit: i64, + ) -> Result { + let req = EndHistoryRequest { id, duration, exit }; + + Ok(self.client.end_history(req).await?.into_inner()) + } + + pub async fn status(&mut self) -> Result { + Ok(self.client.status(StatusRequest {}).await?.into_inner()) + } + + pub async fn tail_history(&mut self) -> Result> { + Ok(self + .client + .tail_history(TailHistoryRequest {}) + .await? + .into_inner()) + } + + pub async fn shutdown(&mut self) -> Result { + let resp = self.client.shutdown(ShutdownRequest {}).await?.into_inner(); + Ok(resp.accepted) + } +} + +pub struct SearchClient { + client: SearchServiceClient, +} + +impl SearchClient { + #[cfg(unix)] + pub async fn new(path: String) -> Result { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = SearchServiceClient::new(channel); + + Ok(SearchClient { client }) + } + + #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))] + pub async fn search( + &mut self, + query: String, + query_id: u64, + filter_mode: FilterMode, + context: Option, + ) -> Result> { + let request = SearchRequest { + query, + query_id, + filter_mode: RpcFilterMode::from(filter_mode).into(), + context: context.map(RpcSearchContext::from), + }; + let request_stream = tokio_stream::once(request); + let response = span!(Level::TRACE, "daemon_client_search.request") + .in_scope(async || self.client.search(request_stream).await) + .await?; + + Ok(response.into_inner()) + } +} + +impl From for RpcFilterMode { + fn from(filter_mode: FilterMode) -> Self { + match filter_mode { + FilterMode::Global => RpcFilterMode::Global, + FilterMode::Host => RpcFilterMode::Host, + FilterMode::Session => RpcFilterMode::Session, + FilterMode::Directory => RpcFilterMode::Directory, + FilterMode::Workspace => RpcFilterMode::Workspace, + FilterMode::SessionPreload => RpcFilterMode::SessionPreload, + } + } +} + +impl From for RpcSearchContext { + fn from(context: Context) -> Self { + RpcSearchContext { + session_id: context.session, + cwd: context.cwd, + hostname: context.hostname, + host_id: context.host_id, + git_root: context + .git_root + .map(|path| path.to_string_lossy().to_string()), + } + } +} + +pub struct SemanticClient { + client: SemanticServiceClient, +} + +impl SemanticClient { + #[cfg(unix)] + pub async fn new(path: String) -> Result { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = SemanticServiceClient::new(channel); + + Ok(SemanticClient { client }) + } + + #[cfg(unix)] + pub async fn from_settings(settings: &Settings) -> Result { + Self::new(settings.daemon.socket_path.clone()).await + } + + pub async fn record_commands( + &mut self, + captures: Vec, + ) -> Result { + let stream = tokio_stream::iter(captures); + Ok(self.client.record_commands(stream).await?.into_inner()) + } + + pub async fn command_output( + &mut self, + history_id: String, + ranges: Vec<(i64, i64)>, + ) -> Result { + let request = CommandOutputRequest { + history_id, + ranges: ranges + .into_iter() + .map(|(start, end)| OutputRange { start, end }) + .collect(), + }; + + Ok(self.client.command_output(request).await?.into_inner()) + } +} + +// ============================================================================ +// Control Client +// ============================================================================ + +/// Client for the Control gRPC service. +/// +/// Used to inject events into a running daemon from external processes. +pub struct ControlClient { + client: ControlServiceClient, +} + +impl ControlClient { + /// Connect to the daemon's control service. + #[cfg(unix)] + pub async fn new(path: String) -> Result { + let log_path = path.clone(); + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.clone(); + + async move { + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?)) + } + })) + .await + .wrap_err_with(|| { + format!( + "failed to connect to local atuin daemon at {}. Is it running?", + &log_path + ) + })?; + + let client = ControlServiceClient::new(channel); + + Ok(ControlClient { client }) + } + + /// Connect using settings. + #[cfg(unix)] + pub async fn from_settings(settings: &Settings) -> Result { + Self::new(settings.daemon.socket_path.clone()).await + } + + /// Send an event to the daemon. + pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> { + let proto_event = daemon_event_to_proto(event); + let request = SendEventRequest { + event: Some(proto_event), + }; + self.client.send_event(request).await?; + Ok(()) + } +} + +/// Convert a daemon event to its proto representation. +fn daemon_event_to_proto( + event: DaemonEvent, +) -> crate::atuin_daemon::control::send_event_request::Event { + use crate::atuin_daemon::control::send_event_request::Event; + + match event { + DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}), + DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}), + DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent { + ids: ids.into_iter().map(|id| id.0).collect(), + }), + DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}), + DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}), + DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}), + // These events are internal and not sent via the control service + DaemonEvent::HistoryStarted(_) + | DaemonEvent::HistoryEnded(_) + | DaemonEvent::RecordsAdded(_) + | DaemonEvent::SyncCompleted { .. } + | DaemonEvent::SyncFailed { .. } => { + // Use shutdown as a fallback, though this shouldn't happen + tracing::warn!("attempted to send internal event via control service"); + Event::Shutdown(ShutdownEvent {}) + } + } +} + +// ============================================================================ +// Convenience Functions +// ============================================================================ + +/// Emit an event to the daemon. +/// +/// This is a fire-and-forget helper for sending events to the daemon from +/// external processes like CLI commands. If the daemon isn't running, this +/// will silently succeed (returns Ok). +/// +/// # Example +/// +/// ```ignore +/// // After pruning history +/// emit_event(DaemonEvent::HistoryPruned).await?; +/// +/// // After deleting specific history items +/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] }).await?; +/// +/// // Request immediate sync +/// emit_event(DaemonEvent::ForceSync).await?; +/// ``` +pub async fn emit_event(event: DaemonEvent) -> Result<()> { + emit_event_with_settings(event, None).await +} + +/// Emit an event to the daemon with explicit settings. +/// +/// If settings are not provided, they will be loaded from the default location. +/// If the daemon isn't running, this will silently succeed. +pub async fn emit_event_with_settings( + event: DaemonEvent, + settings: Option<&Settings>, +) -> Result<()> { + // Load settings if not provided + let owned_settings; + let settings = match settings { + Some(s) => s, + None => { + owned_settings = Settings::new()?; + &owned_settings + } + }; + + // Try to connect - if daemon isn't running, that's fine + let mut client = match ControlClient::from_settings(settings).await { + Ok(c) => c, + Err(e) => { + tracing::debug!(?e, "daemon not running, skipping event emission"); + return Ok(()); + } + }; + + // Send the event + if let Err(e) = client.send_event(event).await { + tracing::debug!(?e, "failed to send event to daemon"); + // Don't fail - this is fire-and-forget + } + + Ok(()) +} -- cgit v1.3.1