diff options
| author | Ellie Huxtable <ellie@atuin.sh> | 2026-04-11 01:32:24 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-04-11 01:32:24 +0100 |
| commit | 7e47f4df6ceb0fe7e32c166776e4e3b960039b67 (patch) | |
| tree | 5e512c17c6c9d36c6e347a579a9baacc56832817 /crates/atuin-daemon | |
| parent | chore: Prepare 18.14.0-beta.1 release (#3393) (diff) | |
| download | atuin-7e47f4df6ceb0fe7e32c166776e4e3b960039b67.zip | |
feat: add history tail for live monitoring view (#3389)
Useful for watching what agents are doing, or viewing live info from
other machines. Regardless I am liking it for debugging
## Checks
- [ ] I am happy for maintainers to push small adjustments to this PR,
to speed up the review cycle
- [ ] I have checked that there are no existing pull requests for the
same thing
Diffstat (limited to 'crates/atuin-daemon')
| -rw-r--r-- | crates/atuin-daemon/proto/history.proto | 27 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/client.rs | 11 | ||||
| -rw-r--r-- | crates/atuin-daemon/src/components/history.rs | 81 | ||||
| -rw-r--r-- | crates/atuin-daemon/tests/lifecycle.rs | 50 |
4 files changed, 165 insertions, 4 deletions
diff --git a/crates/atuin-daemon/proto/history.proto b/crates/atuin-daemon/proto/history.proto index 2a45b7cf..59c12471 100644 --- a/crates/atuin-daemon/proto/history.proto +++ b/crates/atuin-daemon/proto/history.proto @@ -46,9 +46,36 @@ message ShutdownReply { bool accepted = 1; } +message TailHistoryRequest {} + +enum HistoryEventKind { + HISTORY_EVENT_KIND_UNSPECIFIED = 0; + HISTORY_EVENT_KIND_STARTED = 1; + HISTORY_EVENT_KIND_ENDED = 2; +} + +message HistoryEntry { + uint64 timestamp = 1; // nanosecond unix epoch + string id = 2; + string command = 3; + string cwd = 4; + string session = 5; + string hostname = 6; + string author = 7; + string intent = 8; + int64 exit = 9; + int64 duration = 10; +} + +message TailHistoryReply { + HistoryEventKind kind = 1; + HistoryEntry history = 2; +} + service History { rpc StartHistory(StartHistoryRequest) returns (StartHistoryReply); rpc EndHistory(EndHistoryRequest) returns (EndHistoryReply); + rpc TailHistory(TailHistoryRequest) returns (stream TailHistoryReply); rpc Status(StatusRequest) returns (StatusReply); rpc Shutdown(ShutdownRequest) returns (ShutdownReply); } diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs index 2f492f6b..5f4ce20f 100644 --- a/crates/atuin-daemon/src/client.rs +++ b/crates/atuin-daemon/src/client.rs @@ -23,7 +23,8 @@ use crate::control::{ use crate::events::DaemonEvent; use crate::history::{ EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest, - StatusReply, StatusRequest, history_client::HistoryClient as HistoryServiceClient, + StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest, + history_client::HistoryClient as HistoryServiceClient, }; use crate::search::{ FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse, @@ -140,6 +141,14 @@ impl HistoryClient { Ok(self.client.status(StatusRequest {}).await?.into_inner()) } + pub async fn tail_history(&mut self) -> Result<tonic::Streaming<TailHistoryReply>> { + Ok(self + .client + .tail_history(TailHistoryRequest {}) + .await? + .into_inner()) + } + pub async fn shutdown(&mut self) -> Result<bool> { let resp = self.client.shutdown(ShutdownRequest {}).await?.into_inner(); Ok(resp.accepted) diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs index 23d48c5e..c82c8f94 100644 --- a/crates/atuin-daemon/src/components/history.rs +++ b/crates/atuin-daemon/src/components/history.rs @@ -2,7 +2,7 @@ //! //! Handles command history lifecycle (start/end) and provides the History gRPC service. -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; use atuin_client::{ database::Database, @@ -12,6 +12,7 @@ use atuin_client::{ use dashmap::DashMap; use eyre::Result; use time::OffsetDateTime; +use tokio_stream::Stream; use tonic::{Request, Response, Status}; use tracing::{Level, instrument}; @@ -19,8 +20,9 @@ use crate::{ daemon::{Component, DaemonHandle}, events::DaemonEvent, history::{ - EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply, - StartHistoryRequest, StatusReply, StatusRequest, + EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply, + ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest, + TailHistoryReply, TailHistoryRequest, history_server::{History as HistorySvc, HistoryServer}, }, }; @@ -114,8 +116,28 @@ pub struct HistoryGrpcService { inner: Arc<HistoryComponentInner>, } +fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply { + TailHistoryReply { + kind: kind as i32, + history: Some(HistoryEntry { + timestamp: history.timestamp.unix_timestamp_nanos() as u64, + id: history.id.0, + command: history.command, + cwd: history.cwd, + session: history.session, + hostname: history.hostname, + author: history.author, + intent: history.intent.unwrap_or_default(), + exit: history.exit, + duration: history.duration, + }), + } +} + #[tonic::async_trait] impl HistorySvc for HistoryGrpcService { + type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>; + #[instrument(skip_all, level = Level::INFO)] async fn start_history( &self, @@ -136,6 +158,8 @@ impl HistorySvc for HistoryGrpcService { .cwd(req.cwd) .session(req.session) .hostname(req.hostname) + .author(req.author) + .intent(req.intent) .build() .into(); @@ -224,6 +248,57 @@ impl HistorySvc for HistoryGrpcService { } #[instrument(skip_all, level = Level::INFO)] + async fn tail_history( + &self, + _request: Request<TailHistoryRequest>, + ) -> Result<Response<Self::TailHistoryStream>, Status> { + let handle_guard = self.inner.handle.read().await; + let handle = handle_guard + .as_ref() + .cloned() + .ok_or_else(|| Status::internal("component not initialized"))?; + + let mut rx = handle.subscribe(); + let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128); + + tokio::spawn(async move { + loop { + let event = match rx.recv().await { + Ok(event) => event, + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + let _ = tx + .send(Err(Status::resource_exhausted(format!( + "tail stream lagged behind and dropped {skipped} events" + )))) + .await; + break; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + }; + + let reply = match event { + DaemonEvent::HistoryStarted(history) => { + Some(history_to_tail_reply(HistoryEventKind::Started, history)) + } + DaemonEvent::HistoryEnded(history) => { + Some(history_to_tail_reply(HistoryEventKind::Ended, history)) + } + _ => None, + }; + + if let Some(reply) = reply + && tx.send(Ok(reply)).await.is_err() + { + break; + } + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx); + Ok(Response::new(Box::pin(stream))) + } + + #[instrument(skip_all, level = Level::INFO)] async fn status( &self, _request: Request<StatusRequest>, diff --git a/crates/atuin-daemon/tests/lifecycle.rs b/crates/atuin-daemon/tests/lifecycle.rs index 3b6952de..4a91e5cb 100644 --- a/crates/atuin-daemon/tests/lifecycle.rs +++ b/crates/atuin-daemon/tests/lifecycle.rs @@ -146,6 +146,56 @@ mod unix { } #[tokio::test] + async fn test_tail_history_streams_started_and_ended_events() { + use atuin_client::history::History; + use atuin_daemon::history::HistoryEventKind; + + let (mut client, _handle, _tmp) = start_test_daemon().await; + let mut stream = client.tail_history().await.unwrap(); + + let history = History::daemon() + .timestamp(time::OffsetDateTime::now_utc()) + .command("git status".to_string()) + .cwd("/tmp/repo".to_string()) + .session("tail-session".to_string()) + .hostname("test-host:ellie".to_string()) + .author("claude".to_string()) + .intent("inspect repository state".to_string()) + .build() + .into(); + + let start_reply = client.start_history(history).await.unwrap(); + + let started = stream.message().await.unwrap().unwrap(); + assert_eq!( + HistoryEventKind::try_from(started.kind).unwrap(), + HistoryEventKind::Started + ); + let started_history = started.history.unwrap(); + assert_eq!(started_history.id, start_reply.id); + assert_eq!(started_history.command, "git status"); + assert_eq!(started_history.cwd, "/tmp/repo"); + assert_eq!(started_history.hostname, "test-host:ellie"); + assert_eq!(started_history.author, "claude"); + assert_eq!(started_history.intent, "inspect repository state"); + + client + .end_history(start_reply.id.clone(), 1_000_000, 0) + .await + .unwrap(); + + let ended = stream.message().await.unwrap().unwrap(); + assert_eq!( + HistoryEventKind::try_from(ended.kind).unwrap(), + HistoryEventKind::Ended + ); + let ended_history = ended.history.unwrap(); + assert_eq!(ended_history.id, start_reply.id); + assert_eq!(ended_history.exit, 0); + assert_eq!(ended_history.duration, 1_000_000); + } + + #[tokio::test] async fn test_end_unknown_history_fails() { let (mut client, _handle, _tmp) = start_test_daemon().await; |
