aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon/src/components
diff options
context:
space:
mode:
Diffstat (limited to 'crates/atuin-daemon/src/components')
-rw-r--r--crates/atuin-daemon/src/components/history.rs81
1 files changed, 78 insertions, 3 deletions
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs
index 23d48c5e..c82c8f94 100644
--- a/crates/atuin-daemon/src/components/history.rs
+++ b/crates/atuin-daemon/src/components/history.rs
@@ -2,7 +2,7 @@
//!
//! Handles command history lifecycle (start/end) and provides the History gRPC service.
-use std::sync::Arc;
+use std::{pin::Pin, sync::Arc};
use atuin_client::{
database::Database,
@@ -12,6 +12,7 @@ use atuin_client::{
use dashmap::DashMap;
use eyre::Result;
use time::OffsetDateTime;
+use tokio_stream::Stream;
use tonic::{Request, Response, Status};
use tracing::{Level, instrument};
@@ -19,8 +20,9 @@ use crate::{
daemon::{Component, DaemonHandle},
events::DaemonEvent,
history::{
- EndHistoryReply, EndHistoryRequest, ShutdownReply, ShutdownRequest, StartHistoryReply,
- StartHistoryRequest, StatusReply, StatusRequest,
+ EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
+ ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
+ TailHistoryReply, TailHistoryRequest,
history_server::{History as HistorySvc, HistoryServer},
},
};
@@ -114,8 +116,28 @@ pub struct HistoryGrpcService {
inner: Arc<HistoryComponentInner>,
}
+fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
+ TailHistoryReply {
+ kind: kind as i32,
+ history: Some(HistoryEntry {
+ timestamp: history.timestamp.unix_timestamp_nanos() as u64,
+ id: history.id.0,
+ command: history.command,
+ cwd: history.cwd,
+ session: history.session,
+ hostname: history.hostname,
+ author: history.author,
+ intent: history.intent.unwrap_or_default(),
+ exit: history.exit,
+ duration: history.duration,
+ }),
+ }
+}
+
#[tonic::async_trait]
impl HistorySvc for HistoryGrpcService {
+ type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
+
#[instrument(skip_all, level = Level::INFO)]
async fn start_history(
&self,
@@ -136,6 +158,8 @@ impl HistorySvc for HistoryGrpcService {
.cwd(req.cwd)
.session(req.session)
.hostname(req.hostname)
+ .author(req.author)
+ .intent(req.intent)
.build()
.into();
@@ -224,6 +248,57 @@ impl HistorySvc for HistoryGrpcService {
}
#[instrument(skip_all, level = Level::INFO)]
+ async fn tail_history(
+ &self,
+ _request: Request<TailHistoryRequest>,
+ ) -> Result<Response<Self::TailHistoryStream>, Status> {
+ let handle_guard = self.inner.handle.read().await;
+ let handle = handle_guard
+ .as_ref()
+ .cloned()
+ .ok_or_else(|| Status::internal("component not initialized"))?;
+
+ let mut rx = handle.subscribe();
+ let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
+
+ tokio::spawn(async move {
+ loop {
+ let event = match rx.recv().await {
+ Ok(event) => event,
+ Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
+ let _ = tx
+ .send(Err(Status::resource_exhausted(format!(
+ "tail stream lagged behind and dropped {skipped} events"
+ ))))
+ .await;
+ break;
+ }
+ Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+ };
+
+ let reply = match event {
+ DaemonEvent::HistoryStarted(history) => {
+ Some(history_to_tail_reply(HistoryEventKind::Started, history))
+ }
+ DaemonEvent::HistoryEnded(history) => {
+ Some(history_to_tail_reply(HistoryEventKind::Ended, history))
+ }
+ _ => None,
+ };
+
+ if let Some(reply) = reply
+ && tx.send(Ok(reply)).await.is_err()
+ {
+ break;
+ }
+ }
+ });
+
+ let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
+ Ok(Response::new(Box::pin(stream)))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
async fn status(
&self,
_request: Request<StatusRequest>,