aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon
diff options
context:
space:
mode:
authorEllie Huxtable <ellie@atuin.sh>2026-04-11 01:32:24 +0100
committerGitHub <noreply@github.com>2026-04-11 01:32:24 +0100
commit7e47f4df6ceb0fe7e32c166776e4e3b960039b67 (patch)
tree5e512c17c6c9d36c6e347a579a9baacc56832817 /crates/atuin-daemon
parentchore: Prepare 18.14.0-beta.1 release (#3393) (diff)
downloadatuin-7e47f4df6ceb0fe7e32c166776e4e3b960039b67.zip
feat: add history tail for live monitoring view (#3389)
Useful for watching what agents are doing, or viewing live info from other machines. Regardless I am liking it for debugging ## Checks - [ ] I am happy for maintainers to push small adjustments to this PR, to speed up the review cycle - [ ] I have checked that there are no existing pull requests for the same thing
Diffstat (limited to 'crates/atuin-daemon')
-rw-r--r--crates/atuin-daemon/proto/history.proto27
-rw-r--r--crates/atuin-daemon/src/client.rs11
-rw-r--r--crates/atuin-daemon/src/components/history.rs81
-rw-r--r--crates/atuin-daemon/tests/lifecycle.rs50
4 files changed, 165 insertions, 4 deletions
diff --git a/crates/atuin-daemon/proto/history.proto b/crates/atuin-daemon/proto/history.proto
index 2a45b7cf..59c12471 100644
--- a/crates/atuin-daemon/proto/history.proto
+++ b/crates/atuin-daemon/proto/history.proto
@@ -46,9 +46,36 @@ message ShutdownReply {
bool accepted = 1;
}
+message TailHistoryRequest {}
+
+enum HistoryEventKind {
+ HISTORY_EVENT_KIND_UNSPECIFIED = 0;
+ HISTORY_EVENT_KIND_STARTED = 1;
+ HISTORY_EVENT_KIND_ENDED = 2;
+}
+
+message HistoryEntry {
+ uint64 timestamp = 1; // nanosecond unix epoch
+ string id = 2;
+ string command = 3;
+ string cwd = 4;
+ string session = 5;
+ string hostname = 6;
+ string author = 7;
+ string intent = 8;
+ int64 exit = 9;
+ int64 duration = 10;
+}
+
+message TailHistoryReply {
+ HistoryEventKind kind = 1;
+ HistoryEntry history = 2;
+}
+
service History {
rpc StartHistory(StartHistoryRequest) returns (StartHistoryReply);
rpc EndHistory(EndHistoryRequest) returns (EndHistoryReply);
+ rpc TailHistory(TailHistoryRequest) returns (stream TailHistoryReply);
rpc Status(StatusRequest) returns (StatusReply);
rpc Shutdown(ShutdownRequest) returns (ShutdownReply);
}
diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs
index 2f492f6b..5f4ce20f 100644
--- a/crates/atuin-daemon/src/client.rs
+++ b/crates/atuin-daemon/src/client.rs
@@ -23,7 +23,8 @@ use crate::control::{
use crate::events::DaemonEvent;
use crate::history::{
EndHistoryReply, EndHistoryRequest, ShutdownRequest, StartHistoryReply, StartHistoryRequest,
- StatusReply, StatusRequest, history_client::HistoryClient as HistoryServiceClient,
+ StatusReply, StatusRequest, TailHistoryReply, TailHistoryRequest,
+ history_client::HistoryClient as HistoryServiceClient,
};
use crate::search::{
FilterMode as RpcFilterMode, SearchContext as RpcSearchContext, SearchRequest, SearchResponse,
@@ -140,6 +141,14 @@ impl HistoryClient {
Ok(self.client.status(StatusRequest {}).await?.into_inner())
}
+ pub async fn tail_history(&mut self) -> Result<tonic::Streaming<TailHistoryReply>> {
+ Ok(self
+ .client
+ .tail_history(TailHistoryRequest {})
+ .await?
+ .into_inner())
+ }
+
pub async fn shutdown(&mut self) -> Result<bool> {
let resp = self.client.shutdown(ShutdownRequest {}).await?.into_inner();
Ok(resp.accepted)
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs
index 23d48c5e..c82c8f94 100644
--- a/crates/atuin-daemon/src/components/history.rs
+++ b/crates/atuin-daemon/src/components/history.rs
@@ -2,7 +2,7 @@
//!
//! Handles command history lifecycle (start/end) and provides the History gRPC service.
-use std::sync::Arc;
+use std::{pin::Pin, sync::Arc};
use atuin_client::{
database::Database,
@@ -12,6 +12,7 @@ use atuin_client::{
use dashmap::DashMap;
use eyre::Result;
use time::OffsetDateTime;
+use tokio_stream::Stream;
use tonic::{Request, Response, Status};
use tracing::{Level, instrument};
@@ -19,8 +20,9 @@ use crate::{
daemon::{Component, DaemonHandle},
events::DaemonEvent,
history::{
- EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply,
- StartHistoryRequest, StatusReply, StatusRequest,
+ EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
+ ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
+ TailHistoryReply, TailHistoryRequest,
history_server::{History as HistorySvc, HistoryServer},
},
};
@@ -114,8 +116,28 @@ pub struct HistoryGrpcService {
inner: Arc<HistoryComponentInner>,
}
+fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
+ TailHistoryReply {
+ kind: kind as i32,
+ history: Some(HistoryEntry {
+ timestamp: history.timestamp.unix_timestamp_nanos() as u64,
+ id: history.id.0,
+ command: history.command,
+ cwd: history.cwd,
+ session: history.session,
+ hostname: history.hostname,
+ author: history.author,
+ intent: history.intent.unwrap_or_default(),
+ exit: history.exit,
+ duration: history.duration,
+ }),
+ }
+}
+
#[tonic::async_trait]
impl HistorySvc for HistoryGrpcService {
+ type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
+
#[instrument(skip_all, level = Level::INFO)]
async fn start_history(
&self,
@@ -136,6 +158,8 @@ impl HistorySvc for HistoryGrpcService {
.cwd(req.cwd)
.session(req.session)
.hostname(req.hostname)
+ .author(req.author)
+ .intent(req.intent)
.build()
.into();
@@ -224,6 +248,57 @@ impl HistorySvc for HistoryGrpcService {
}
#[instrument(skip_all, level = Level::INFO)]
+ async fn tail_history(
+ &self,
+ _request: Request<TailHistoryRequest>,
+ ) -> Result<Response<Self::TailHistoryStream>, Status> {
+ let handle_guard = self.inner.handle.read().await;
+ let handle = handle_guard
+ .as_ref()
+ .cloned()
+ .ok_or_else(|| Status::internal("component not initialized"))?;
+
+ let mut rx = handle.subscribe();
+ let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
+
+ tokio::spawn(async move {
+ loop {
+ let event = match rx.recv().await {
+ Ok(event) => event,
+ Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
+ let _ = tx
+ .send(Err(Status::resource_exhausted(format!(
+ "tail stream lagged behind and dropped {skipped} events"
+ ))))
+ .await;
+ break;
+ }
+ Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+ };
+
+ let reply = match event {
+ DaemonEvent::HistoryStarted(history) => {
+ Some(history_to_tail_reply(HistoryEventKind::Started, history))
+ }
+ DaemonEvent::HistoryEnded(history) => {
+ Some(history_to_tail_reply(HistoryEventKind::Ended, history))
+ }
+ _ => None,
+ };
+
+ if let Some(reply) = reply
+ && tx.send(Ok(reply)).await.is_err()
+ {
+ break;
+ }
+ }
+ });
+
+ let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
+ Ok(Response::new(Box::pin(stream)))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
async fn status(
&self,
_request: Request<StatusRequest>,
diff --git a/crates/atuin-daemon/tests/lifecycle.rs b/crates/atuin-daemon/tests/lifecycle.rs
index 3b6952de..4a91e5cb 100644
--- a/crates/atuin-daemon/tests/lifecycle.rs
+++ b/crates/atuin-daemon/tests/lifecycle.rs
@@ -146,6 +146,56 @@ mod unix {
}
#[tokio::test]
+ async fn test_tail_history_streams_started_and_ended_events() {
+ use atuin_client::history::History;
+ use atuin_daemon::history::HistoryEventKind;
+
+ let (mut client, _handle, _tmp) = start_test_daemon().await;
+ let mut stream = client.tail_history().await.unwrap();
+
+ let history = History::daemon()
+ .timestamp(time::OffsetDateTime::now_utc())
+ .command("git status".to_string())
+ .cwd("/tmp/repo".to_string())
+ .session("tail-session".to_string())
+ .hostname("test-host:ellie".to_string())
+ .author("claude".to_string())
+ .intent("inspect repository state".to_string())
+ .build()
+ .into();
+
+ let start_reply = client.start_history(history).await.unwrap();
+
+ let started = stream.message().await.unwrap().unwrap();
+ assert_eq!(
+ HistoryEventKind::try_from(started.kind).unwrap(),
+ HistoryEventKind::Started
+ );
+ let started_history = started.history.unwrap();
+ assert_eq!(started_history.id, start_reply.id);
+ assert_eq!(started_history.command, "git status");
+ assert_eq!(started_history.cwd, "/tmp/repo");
+ assert_eq!(started_history.hostname, "test-host:ellie");
+ assert_eq!(started_history.author, "claude");
+ assert_eq!(started_history.intent, "inspect repository state");
+
+ client
+ .end_history(start_reply.id.clone(), 1_000_000, 0)
+ .await
+ .unwrap();
+
+ let ended = stream.message().await.unwrap().unwrap();
+ assert_eq!(
+ HistoryEventKind::try_from(ended.kind).unwrap(),
+ HistoryEventKind::Ended
+ );
+ let ended_history = ended.history.unwrap();
+ assert_eq!(ended_history.id, start_reply.id);
+ assert_eq!(ended_history.exit, 0);
+ assert_eq!(ended_history.duration, 1_000_000);
+ }
+
+ #[tokio::test]
async fn test_end_unknown_history_fails() {
let (mut client, _handle, _tmp) = start_test_daemon().await;