diff options
| author | Ellie Huxtable <ellie@atuin.sh> | 2026-04-11 01:32:24 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-04-11 01:32:24 +0100 |
| commit | 7e47f4df6ceb0fe7e32c166776e4e3b960039b67 (patch) | |
| tree | 5e512c17c6c9d36c6e347a579a9baacc56832817 /crates/atuin-daemon/src/components/history.rs | |
| parent | chore: Prepare 18.14.0-beta.1 release (#3393) (diff) | |
| download | atuin-7e47f4df6ceb0fe7e32c166776e4e3b960039b67.zip | |
feat: add history tail for live monitoring view (#3389)
Useful for watching what agents are doing, or viewing live info from
other machines. Regardless I am liking it for debugging
## Checks
- [ ] I am happy for maintainers to push small adjustments to this PR,
to speed up the review cycle
- [ ] I have checked that there are no existing pull requests for the
same thing
Diffstat (limited to 'crates/atuin-daemon/src/components/history.rs')
| -rw-r--r-- | crates/atuin-daemon/src/components/history.rs | 81 |
1 files changed, 78 insertions, 3 deletions
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs index 23d48c5e..c82c8f94 100644 --- a/crates/atuin-daemon/src/components/history.rs +++ b/crates/atuin-daemon/src/components/history.rs @@ -2,7 +2,7 @@ //! //! Handles command history lifecycle (start/end) and provides the History gRPC service. -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; use atuin_client::{ database::Database, @@ -12,6 +12,7 @@ use atuin_client::{ use dashmap::DashMap; use eyre::Result; use time::OffsetDateTime; +use tokio_stream::Stream; use tonic::{Request, Response, Status}; use tracing::{Level, instrument}; @@ -19,8 +20,9 @@ use crate::{ daemon::{Component, DaemonHandle}, events::DaemonEvent, history::{ - EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply, - StartHistoryRequest, StatusReply, StatusRequest, + EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply, + ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, + TailHistoryReply, TailHistoryRequest, history_server::{History as HistorySvc, HistoryServer}, }, }; @@ -114,8 +116,28 @@ pub struct HistoryGrpcService { inner: Arc<HistoryComponentInner>, } +fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply { + TailHistoryReply { + kind: kind as i32, + history: Some(HistoryEntry { + timestamp: history.timestamp.unix_timestamp_nanos() as u64, + id: history.id.0, + command: history.command, + cwd: history.cwd, + session: history.session, + hostname: history.hostname, + author: history.author, + intent: history.intent.unwrap_or_default(), + exit: history.exit, + duration: history.duration, + }), + } +} + #[tonic::async_trait] impl HistorySvc for HistoryGrpcService { + type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>; + #[instrument(skip_all, level = Level::INFO)] async fn start_history( &self, @@ -136,6 +158,8 @@ impl HistorySvc for HistoryGrpcService { .cwd(req.cwd) .session(req.session) .hostname(req.hostname) + .author(req.author) + .intent(req.intent) .build() .into(); @@ -224,6 +248,57 @@ impl HistorySvc for HistoryGrpcService { } #[instrument(skip_all, level = Level::INFO)] + async fn tail_history( + &self, + _request: Request<TailHistoryRequest>, + ) -> Result<Response<Self::TailHistoryStream>, Status> { + let handle_guard = self.inner.handle.read().await; + let handle = handle_guard + .as_ref() + .cloned() + .ok_or_else(|| Status::internal("component not initialized"))?; + + let mut rx = handle.subscribe(); + let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128); + + tokio::spawn(async move { + loop { + let event = match rx.recv().await { + Ok(event) => event, + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + let _ = tx + .send(Err(Status::resource_exhausted(format!( + "tail stream lagged behind and dropped {skipped} events" + )))) + .await; + break; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + }; + + let reply = match event { + DaemonEvent::HistoryStarted(history) => { + Some(history_to_tail_reply(HistoryEventKind::Started, history)) + } + DaemonEvent::HistoryEnded(history) => { + Some(history_to_tail_reply(HistoryEventKind::Ended, history)) + } + _ => None, + }; + + if let Some(reply) = reply + && tx.send(Ok(reply)).await.is_err() + { + break; + } + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx); + Ok(Response::new(Box::pin(stream))) + } + + #[instrument(skip_all, level = Level::INFO)] async fn status( &self, _request: Request<StatusRequest>, |
