aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon/src/client.rs
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
commit5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch)
treec64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/atuin-daemon/src/client.rs
parentchore: Somewhat simplify sync code (diff)
downloadatuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show dead code correctly.
Diffstat (limited to 'crates/atuin-daemon/src/client.rs')
-rw-r--r--crates/atuin-daemon/src/client.rs518
1 files changed, 0 insertions, 518 deletions
diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs
deleted file mode 100644
index c18e0e46..00000000
--- a/crates/atuin-daemon/src/client.rs
+++ /dev/null
@@ -1,518 +0,0 @@
-use atuin_client::database::Context;
-use atuin_client::settings::{FilterMode, Settings};
-use eyre::{Context as EyreContext, Result};
-#[cfg(windows)]
-use tokio::net::TcpStream;
-use tonic::Code;
-use tonic::transport::{Channel, Endpoint, Uri};
-use tower::service_fn;
-
-use hyper_util::rt::TokioIo;
-
-#[cfg(unix)]
-use tokio::net::UnixStream;
-
-use atuin_client::history::History;
-use tracing::{Level, instrument, span};
-
-use crate::control::HistoryRebuiltEvent;
-use crate::control::{
- ForceSyncEvent, HistoryDeletedEvent, HistoryPrunedEvent, SendEventRequest,
- SettingsReloadedEvent, ShutdownEvent, control_client::ControlClient as ControlServiceClient,
-};
-use crate::events::DaemonEvent;
-use crate::history::{
- EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest,
- StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest,
- history_client::HistoryClient as HistoryServiceClient,
-};
-use crate::search::{
- FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse,
- search_client::SearchClient as SearchServiceClient,
-};
-use crate::semantic::{
- CommandCapture, CommandOutputReply, CommandOutputRequest, OutputRange, RecordCommandsReply,
- semantic_client::SemanticClient as SemanticServiceClient,
-};
-
-pub struct HistoryClient {
- client: HistoryServiceClient<Channel>,
-}
-
-#[derive(Clone, Copy, Debug, Eq, PartialEq)]
-pub enum DaemonClientErrorKind {
- Connect,
- Unavailable,
- Unimplemented,
- Other,
-}
-
-#[must_use]
-pub fn classify_error(error: &eyre::Report) -> DaemonClientErrorKind {
- for cause in error.chain() {
- if cause.downcast_ref::<tonic::transport::Error>().is_some() {
- return DaemonClientErrorKind::Connect;
- }
-
- if let Some(status) = cause.downcast_ref::<tonic::Status>() {
- return match status.code() {
- Code::Unavailable => DaemonClientErrorKind::Unavailable,
- Code::Unimplemented => DaemonClientErrorKind::Unimplemented,
- _ => DaemonClientErrorKind::Other,
- };
- }
- }
-
- DaemonClientErrorKind::Other
-}
-
-// Wrap the grpc client
-impl HistoryClient {
- #[cfg(unix)]
- pub async fn new(path: String) -> Result<Self> {
- use eyre::Context;
-
- let log_path = path.clone();
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let path = path.clone();
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at {}. Is it running?",
- &log_path
- )
- })?;
-
- let client = HistoryServiceClient::new(channel);
-
- Ok(HistoryClient { client })
- }
-
- #[cfg(not(unix))]
- pub async fn new(port: u64) -> Result<Self> {
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let url = format!("127.0.0.1:{port}");
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
- )
- })?;
-
- let client = HistoryServiceClient::new(channel);
-
- Ok(HistoryClient { client })
- }
-
- pub async fn start_history(&mut self, h: History) -> Result<StartHistoryReply> {
- let req = StartHistoryRequest {
- command: h.command,
- cwd: h.cwd,
- hostname: h.hostname,
- session: h.session,
- timestamp: h.timestamp.unix_timestamp_nanos() as u64,
- author: h.author,
- intent: h.intent.unwrap_or_default(),
- };
-
- Ok(self.client.start_history(req).await?.into_inner())
- }
-
- pub async fn end_history(
- &mut self,
- id: String,
- duration: u64,
- exit: i64,
- ) -> Result<EndHistoryReply> {
- let req = EndHistoryRequest { id, duration, exit };
-
- Ok(self.client.end_history(req).await?.into_inner())
- }
-
- pub async fn status(&mut self) -> Result<StatusReply> {
- Ok(self.client.status(StatusRequest {}).await?.into_inner())
- }
-
- pub async fn tail_history(&mut self) -> Result<tonic::Streaming<TailHistoryReply>> {
- Ok(self
- .client
- .tail_history(TailHistoryRequest {})
- .await?
- .into_inner())
- }
-
- pub async fn shutdown(&mut self) -> Result<bool> {
- let resp = self.client.shutdown(ShutdownRequest {}).await?.into_inner();
- Ok(resp.accepted)
- }
-}
-
-pub struct SearchClient {
- client: SearchServiceClient<Channel>,
-}
-
-impl SearchClient {
- #[cfg(unix)]
- pub async fn new(path: String) -> Result<Self> {
- let log_path = path.clone();
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let path = path.clone();
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at {}. Is it running?",
- &log_path
- )
- })?;
-
- let client = SearchServiceClient::new(channel);
-
- Ok(SearchClient { client })
- }
-
- #[cfg(not(unix))]
- pub async fn new(port: u64) -> Result<Self> {
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let url = format!("127.0.0.1:{port}");
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
- )
- })?;
-
- let client = SearchServiceClient::new(channel);
-
- Ok(SearchClient { client })
- }
-
- #[instrument(skip_all, level = Level::TRACE, name = "daemon_client_search", fields(query = %query, query_id = query_id))]
- pub async fn search(
- &mut self,
- query: String,
- query_id: u64,
- filter_mode: FilterMode,
- context: Option<Context>,
- ) -> Result<tonic::Streaming<SearchResponse>> {
- let request = SearchRequest {
- query,
- query_id,
- filter_mode: RpcFilterMode::from(filter_mode).into(),
- context: context.map(RpcSearchContext::from),
- };
- let request_stream = tokio_stream::once(request);
- let response = span!(Level::TRACE, "daemon_client_search.request")
- .in_scope(async || self.client.search(request_stream).await)
- .await?;
-
- Ok(response.into_inner())
- }
-}
-
-impl From<FilterMode> for RpcFilterMode {
- fn from(filter_mode: FilterMode) -> Self {
- match filter_mode {
- FilterMode::Global => RpcFilterMode::Global,
- FilterMode::Host => RpcFilterMode::Host,
- FilterMode::Session => RpcFilterMode::Session,
- FilterMode::Directory => RpcFilterMode::Directory,
- FilterMode::Workspace => RpcFilterMode::Workspace,
- FilterMode::SessionPreload => RpcFilterMode::SessionPreload,
- }
- }
-}
-
-impl From<Context> for RpcSearchContext {
- fn from(context: Context) -> Self {
- RpcSearchContext {
- session_id: context.session,
- cwd: context.cwd,
- hostname: context.hostname,
- host_id: context.host_id,
- git_root: context
- .git_root
- .map(|path| path.to_string_lossy().to_string()),
- }
- }
-}
-
-pub struct SemanticClient {
- client: SemanticServiceClient<Channel>,
-}
-
-impl SemanticClient {
- #[cfg(unix)]
- pub async fn new(path: String) -> Result<Self> {
- let log_path = path.clone();
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let path = path.clone();
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at {}. Is it running?",
- &log_path
- )
- })?;
-
- let client = SemanticServiceClient::new(channel);
-
- Ok(SemanticClient { client })
- }
-
- #[cfg(not(unix))]
- pub async fn new(port: u64) -> Result<Self> {
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let url = format!("127.0.0.1:{port}");
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
- )
- })?;
-
- let client = SemanticServiceClient::new(channel);
-
- Ok(SemanticClient { client })
- }
-
- #[cfg(unix)]
- pub async fn from_settings(settings: &Settings) -> Result<Self> {
- Self::new(settings.daemon.socket_path.clone()).await
- }
-
- #[cfg(not(unix))]
- pub async fn from_settings(settings: &Settings) -> Result<Self> {
- Self::new(settings.daemon.tcp_port).await
- }
-
- pub async fn record_commands(
- &mut self,
- captures: Vec<CommandCapture>,
- ) -> Result<RecordCommandsReply> {
- let stream = tokio_stream::iter(captures);
- Ok(self.client.record_commands(stream).await?.into_inner())
- }
-
- pub async fn command_output(
- &mut self,
- history_id: String,
- ranges: Vec<(i64, i64)>,
- ) -> Result<CommandOutputReply> {
- let request = CommandOutputRequest {
- history_id,
- ranges: ranges
- .into_iter()
- .map(|(start, end)| OutputRange { start, end })
- .collect(),
- };
-
- Ok(self.client.command_output(request).await?.into_inner())
- }
-}
-
-// ============================================================================
-// Control Client
-// ============================================================================
-
-/// Client for the Control gRPC service.
-///
-/// Used to inject events into a running daemon from external processes.
-pub struct ControlClient {
- client: ControlServiceClient<Channel>,
-}
-
-impl ControlClient {
- /// Connect to the daemon's control service.
- #[cfg(unix)]
- pub async fn new(path: String) -> Result<Self> {
- let log_path = path.clone();
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let path = path.clone();
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at {}. Is it running?",
- &log_path
- )
- })?;
-
- let client = ControlServiceClient::new(channel);
-
- Ok(ControlClient { client })
- }
-
- /// Connect to the daemon's control service.
- #[cfg(not(unix))]
- pub async fn new(port: u64) -> Result<Self> {
- let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
- .connect_with_connector(service_fn(move |_: Uri| {
- let url = format!("127.0.0.1:{port}");
-
- async move {
- Ok::<_, std::io::Error>(TokioIo::new(TcpStream::connect(url.clone()).await?))
- }
- }))
- .await
- .wrap_err_with(|| {
- format!(
- "failed to connect to local atuin daemon at 127.0.0.1:{port}. Is it running?"
- )
- })?;
-
- let client = ControlServiceClient::new(channel);
-
- Ok(ControlClient { client })
- }
-
- /// Connect using settings.
- #[cfg(unix)]
- pub async fn from_settings(settings: &Settings) -> Result<Self> {
- Self::new(settings.daemon.socket_path.clone()).await
- }
-
- /// Connect using settings.
- #[cfg(not(unix))]
- pub async fn from_settings(settings: &Settings) -> Result<Self> {
- Self::new(settings.daemon.tcp_port).await
- }
-
- /// Send an event to the daemon.
- pub async fn send_event(&mut self, event: DaemonEvent) -> Result<()> {
- let proto_event = daemon_event_to_proto(event);
- let request = SendEventRequest {
- event: Some(proto_event),
- };
- self.client.send_event(request).await?;
- Ok(())
- }
-}
-
-/// Convert a daemon event to its proto representation.
-fn daemon_event_to_proto(event: DaemonEvent) -> crate::control::send_event_request::Event {
- use crate::control::send_event_request::Event;
-
- match event {
- DaemonEvent::HistoryPruned => Event::HistoryPruned(HistoryPrunedEvent {}),
- DaemonEvent::HistoryRebuilt => Event::HistoryRebuilt(HistoryRebuiltEvent {}),
- DaemonEvent::HistoryDeleted { ids } => Event::HistoryDeleted(HistoryDeletedEvent {
- ids: ids.into_iter().map(|id| id.0).collect(),
- }),
- DaemonEvent::ForceSync => Event::ForceSync(ForceSyncEvent {}),
- DaemonEvent::SettingsReloaded => Event::SettingsReloaded(SettingsReloadedEvent {}),
- DaemonEvent::ShutdownRequested => Event::Shutdown(ShutdownEvent {}),
- // These events are internal and not sent via the control service
- DaemonEvent::HistoryStarted(_)
- | DaemonEvent::HistoryEnded(_)
- | DaemonEvent::RecordsAdded(_)
- | DaemonEvent::SyncCompleted { .. }
- | DaemonEvent::SyncFailed { .. } => {
- // Use shutdown as a fallback, though this shouldn't happen
- tracing::warn!("attempted to send internal event via control service");
- Event::Shutdown(ShutdownEvent {})
- }
- }
-}
-
-// ============================================================================
-// Convenience Functions
-// ============================================================================
-
-/// Emit an event to the daemon.
-///
-/// This is a fire-and-forget helper for sending events to the daemon from
-/// external processes like CLI commands. If the daemon isn't running, this
-/// will silently succeed (returns Ok).
-///
-/// # Example
-///
-/// ```ignore
-/// // After pruning history
-/// emit_event(DaemonEvent::HistoryPruned).await?;
-///
-/// // After deleting specific history items
-/// emit_event(DaemonEvent::HistoryDeleted { ids: vec![...] }).await?;
-///
-/// // Request immediate sync
-/// emit_event(DaemonEvent::ForceSync).await?;
-/// ```
-pub async fn emit_event(event: DaemonEvent) -> Result<()> {
- emit_event_with_settings(event, None).await
-}
-
-/// Emit an event to the daemon with explicit settings.
-///
-/// If settings are not provided, they will be loaded from the default location.
-/// If the daemon isn't running, this will silently succeed.
-pub async fn emit_event_with_settings(
- event: DaemonEvent,
- settings: Option<&Settings>,
-) -> Result<()> {
- // Load settings if not provided
- let owned_settings;
- let settings = match settings {
- Some(s) => s,
- None => {
- owned_settings = Settings::new()?;
- &owned_settings
- }
- };
-
- // Try to connect - if daemon isn't running, that's fine
- let mut client = match ControlClient::from_settings(settings).await {
- Ok(c) => c,
- Err(e) => {
- tracing::debug!(?e, "daemon not running, skipping event emission");
- return Ok(());
- }
- };
-
- // Send the event
- if let Err(e) = client.send_event(event).await {
- tracing::debug!(?e, "failed to send event to daemon");
- // Don't fail - this is fire-and-forget
- }
-
- Ok(())
-}