diff options
Diffstat (limited to 'crates/atuin-daemon/src/components/history.rs')
| -rw-r--r-- | crates/atuin-daemon/src/components/history.rs | 81 |
1 files changed, 78 insertions, 3 deletions
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs index 23d48c5e..c82c8f94 100644 --- a/crates/atuin-daemon/src/components/history.rs +++ b/crates/atuin-daemon/src/components/history.rs @@ -2,7 +2,7 @@ //! //! Handles command history lifecycle (start/end) and provides the History gRPC service. -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; use atuin_client::{ database::Database, @@ -12,6 +12,7 @@ use atuin_client::{ use dashmap::DashMap; use eyre::Result; use time::OffsetDateTime; +use tokio_stream::Stream; use tonic::{Request, Response, Status}; use tracing::{Level, instrument}; @@ -19,8 +20,9 @@ use crate::{ daemon::{Component, DaemonHandle}, events::DaemonEvent, history::{ - EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply, - StartHistoryRequest, StatusReply, StatusRequest, + EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply, + ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, + TailHistoryReply, TailHistoryRequest, history_server::{History as HistorySvc, HistoryServer}, }, }; @@ -114,8 +116,28 @@ pub struct HistoryGrpcService { inner: Arc<HistoryComponentInner>, } +fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply { + TailHistoryReply { + kind: kind as i32, + history: Some(HistoryEntry { + timestamp: history.timestamp.unix_timestamp_nanos() as u64, + id: history.id.0, + command: history.command, + cwd: history.cwd, + session: history.session, + hostname: history.hostname, + author: history.author, + intent: history.intent.unwrap_or_default(), + exit: history.exit, + duration: history.duration, + }), + } +} + #[tonic::async_trait] impl HistorySvc for HistoryGrpcService { + type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>; + #[instrument(skip_all, level = Level::INFO)] async fn start_history( &self, @@ -136,6 +158,8 @@ impl HistorySvc for HistoryGrpcService { .cwd(req.cwd) .session(req.session) .hostname(req.hostname) + .author(req.author) + .intent(req.intent) .build() .into(); @@ -224,6 +248,57 @@ impl HistorySvc for HistoryGrpcService { } #[instrument(skip_all, level = Level::INFO)] + async fn tail_history( + &self, + _request: Request<TailHistoryRequest>, + ) -> Result<Response<Self::TailHistoryStream>, Status> { + let handle_guard = self.inner.handle.read().await; + let handle = handle_guard + .as_ref() + .cloned() + .ok_or_else(|| Status::internal("component not initialized"))?; + + let mut rx = handle.subscribe(); + let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128); + + tokio::spawn(async move { + loop { + let event = match rx.recv().await { + Ok(event) => event, + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + let _ = tx + .send(Err(Status::resource_exhausted(format!( + "tail stream lagged behind and dropped {skipped} events" + )))) + .await; + break; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + }; + + let reply = match event { + DaemonEvent::HistoryStarted(history) => { + Some(history_to_tail_reply(HistoryEventKind::Started, history)) + } + DaemonEvent::HistoryEnded(history) => { + Some(history_to_tail_reply(HistoryEventKind::Ended, history)) + } + _ => None, + }; + + if let Some(reply) = reply + && tx.send(Ok(reply)).await.is_err() + { + break; + } + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx); + Ok(Response::new(Box::pin(stream))) + } + + #[instrument(skip_all, level = Level::INFO)] async fn status( &self, _request: Request<StatusRequest>, |
