diff options
Diffstat (limited to 'crates/atuin-daemon/src')
| -rw-r--r-- | crates/atuin-daemon/src/client.rs | 11 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/history.rs | 81 |
2 files changed, 88 insertions, 4 deletions
diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs index 2f492f6b..5f4ce20f 100644 --- a/crates/atuin-daemon/src/client.rs +++ b/crates/atuin-daemon/src/client.rs @@ -23,7 +23,8 @@ use crate::control::{ use crate::events::DaemonEvent; use crate::history::{ EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest, - StatusReply, StatusRequest, history_client::HistoryClient as HistoryServiceClient, + StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest, + history_client::HistoryClient as HistoryServiceClient, }; use crate::search::{ FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse, @@ -140,6 +141,14 @@ impl HistoryClient { Ok(self.client.status(StatusRequest {}).await?.into_inner()) } + pub async fn tail_history(&mut self) -> Result<tonic::Streaming<TailHistoryReply>> { + Ok(self + .client + .tail_history(TailHistoryRequest {}) + .await? + .into_inner()) + } + pub async fn shutdown(&mut self) -> Result<bool> { let resp = self.client.shutdown(ShutdownRequest {}).await?.into_inner(); Ok(resp.accepted) diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs index 23d48c5e..c82c8f94 100644 --- a/crates/atuin-daemon/src/components/history.rs +++ b/crates/atuin-daemon/src/components/history.rs @@ -2,7 +2,7 @@ //! //! Handles command history lifecycle (start/end) and provides the History gRPC service. -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; use atuin_client::{ database::Database, @@ -12,6 +12,7 @@ use atuin_client::{ use dashmap::DashMap; use eyre::Result; use time::OffsetDateTime; +use tokio_stream::Stream; use tonic::{Request, Response, Status}; use tracing::{Level, instrument}; @@ -19,8 +20,9 @@ use crate::{ daemon::{Component, DaemonHandle}, events::DaemonEvent, history::{ - EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply, - StartHistoryRequest, StatusReply, StatusRequest, + EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply, + ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, + TailHistoryReply, TailHistoryRequest, history_server::{History as HistorySvc, HistoryServer}, }, }; @@ -114,8 +116,28 @@ pub struct HistoryGrpcService { inner: Arc<HistoryComponentInner>, } +fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply { + TailHistoryReply { + kind: kind as i32, + history: Some(HistoryEntry { + timestamp: history.timestamp.unix_timestamp_nanos() as u64, + id: history.id.0, + command: history.command, + cwd: history.cwd, + session: history.session, + hostname: history.hostname, + author: history.author, + intent: history.intent.unwrap_or_default(), + exit: history.exit, + duration: history.duration, + }), + } +} + #[tonic::async_trait] impl HistorySvc for HistoryGrpcService { + type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>; + #[instrument(skip_all, level = Level::INFO)] async fn start_history( &self, @@ -136,6 +158,8 @@ impl HistorySvc for HistoryGrpcService { .cwd(req.cwd) .session(req.session) .hostname(req.hostname) + .author(req.author) + .intent(req.intent) .build() .into(); @@ -224,6 +248,57 @@ impl HistorySvc for HistoryGrpcService { } #[instrument(skip_all, level = Level::INFO)] + async fn tail_history( + &self, + _request: Request<TailHistoryRequest>, + ) -> Result<Response<Self::TailHistoryStream>, Status> { + let handle_guard = self.inner.handle.read().await; + let handle = handle_guard + .as_ref() + .cloned() + .ok_or_else(|| Status::internal("component not initialized"))?; + + let mut rx = handle.subscribe(); + let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128); + + tokio::spawn(async move { + loop { + let event = match rx.recv().await { + Ok(event) => event, + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + let _ = tx + .send(Err(Status::resource_exhausted(format!( + "tail stream lagged behind and dropped {skipped} events" + )))) + .await; + break; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + }; + + let reply = match event { + DaemonEvent::HistoryStarted(history) => { + Some(history_to_tail_reply(HistoryEventKind::Started, history)) + } + DaemonEvent::HistoryEnded(history) => { + Some(history_to_tail_reply(HistoryEventKind::Ended, history)) + } + _ => None, + }; + + if let Some(reply) = reply + && tx.send(Ok(reply)).await.is_err() + { + break; + } + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx); + Ok(Response::new(Box::pin(stream))) + } + + #[instrument(skip_all, level = Level::INFO)] async fn status( &self, _request: Request<StatusRequest>, |
