aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon/src/components/history.rs
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
commit5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch)
treec64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/atuin-daemon/src/components/history.rs
parentchore: Somewhat simplify sync code (diff)
downloadatuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show dead code correctly.
Diffstat (limited to 'crates/atuin-daemon/src/components/history.rs')
-rw-r--r--crates/atuin-daemon/src/components/history.rs327
1 files changed, 0 insertions, 327 deletions
diff --git a/crates/atuin-daemon/src/components/history.rs b/crates/atuin-daemon/src/components/history.rs
deleted file mode 100644
index c82c8f94..00000000
--- a/crates/atuin-daemon/src/components/history.rs
+++ /dev/null
@@ -1,327 +0,0 @@
-//! History component.
-//!
-//! Handles command history lifecycle (start/end) and provides the History gRPC service.
-
-use std::{pin::Pin, sync::Arc};
-
-use atuin_client::{
- database::Database,
- history::{History, HistoryId, store::HistoryStore},
- settings::Settings,
-};
-use dashmap::DashMap;
-use eyre::Result;
-use time::OffsetDateTime;
-use tokio_stream::Stream;
-use tonic::{Request, Response, Status};
-use tracing::{Level, instrument};
-
-use crate::{
- daemon::{Component, DaemonHandle},
- events::DaemonEvent,
- history::{
- EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
- ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
- TailHistoryReply, TailHistoryRequest,
- history_server::{History as HistorySvc, HistoryServer},
- },
-};
-
-const DAEMON_PROTOCOL_VERSION: u32 = 1;
-
-/// History component - manages command history lifecycle.
-///
-/// This component:
-/// - Tracks currently running commands (stored in memory)
-/// - Saves completed commands to the database and record store
-/// - Emits history events for other components (e.g., search indexing)
-/// - Provides the History gRPC service
-pub struct HistoryComponent {
- inner: Arc<HistoryComponentInner>,
-}
-
-struct HistoryComponentInner {
- /// Commands currently running (not yet completed).
- running: DashMap<HistoryId, History>,
-
- /// Handle to the daemon (set during start).
- handle: tokio::sync::RwLock<Option<DaemonHandle>>,
-
- /// History store for pushing records (set during start).
- history_store: tokio::sync::RwLock<Option<HistoryStore>>,
-}
-
-impl HistoryComponent {
- /// Create a new history component.
- pub fn new() -> Self {
- Self {
- inner: Arc::new(HistoryComponentInner {
- running: DashMap::new(),
- handle: tokio::sync::RwLock::new(None),
- history_store: tokio::sync::RwLock::new(None),
- }),
- }
- }
-
- /// Get the gRPC service for this component.
- ///
- /// This returns a tonic service that can be added to a gRPC server.
- pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
- HistoryServer::new(HistoryGrpcService {
- inner: self.inner.clone(),
- })
- }
-}
-
-impl Default for HistoryComponent {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[tonic::async_trait]
-impl Component for HistoryComponent {
- fn name(&self) -> &'static str {
- "history"
- }
-
- async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
- // Create the history store
- let host_id = Settings::host_id().await?;
- let history_store =
- HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key());
-
- *self.inner.history_store.write().await = Some(history_store);
- *self.inner.handle.write().await = Some(handle);
-
- tracing::info!("history component started");
- Ok(())
- }
-
- async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
- // History component produces events but doesn't need to react to them
- Ok(())
- }
-
- async fn stop(&mut self) -> Result<()> {
- tracing::info!("history component stopped");
- Ok(())
- }
-}
-
-/// The gRPC service implementation.
-///
-/// This is a thin wrapper that delegates to the component's shared state.
-pub struct HistoryGrpcService {
- inner: Arc<HistoryComponentInner>,
-}
-
-fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
- TailHistoryReply {
- kind: kind as i32,
- history: Some(HistoryEntry {
- timestamp: history.timestamp.unix_timestamp_nanos() as u64,
- id: history.id.0,
- command: history.command,
- cwd: history.cwd,
- session: history.session,
- hostname: history.hostname,
- author: history.author,
- intent: history.intent.unwrap_or_default(),
- exit: history.exit,
- duration: history.duration,
- }),
- }
-}
-
-#[tonic::async_trait]
-impl HistorySvc for HistoryGrpcService {
- type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn start_history(
- &self,
- request: Request<StartHistoryRequest>,
- ) -> Result<Response<StartHistoryReply>, Status> {
- let req = request.into_inner();
-
- let timestamp =
- OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
- Status::invalid_argument(
- "failed to parse timestamp as unix time (expected nanos since epoch)",
- )
- })?;
-
- let h: History = History::daemon()
- .timestamp(timestamp)
- .command(req.command)
- .cwd(req.cwd)
- .session(req.session)
- .hostname(req.hostname)
- .author(req.author)
- .intent(req.intent)
- .build()
- .into();
-
- // Emit the event
- if let Some(handle) = self.inner.handle.read().await.as_ref() {
- handle.emit(DaemonEvent::HistoryStarted(h.clone()));
- }
-
- let id = h.id.clone();
- tracing::info!(id = id.to_string(), "start history");
- self.inner.running.insert(id.clone(), h);
-
- let reply = StartHistoryReply {
- id: id.to_string(),
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn end_history(
- &self,
- request: Request<EndHistoryRequest>,
- ) -> Result<Response<EndHistoryReply>, Status> {
- let req = request.into_inner();
- let id = HistoryId(req.id);
-
- if let Some((_, mut history)) = self.inner.running.remove(&id) {
- history.exit = req.exit;
- history.duration = match req.duration {
- 0 => i64::try_from(
- (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
- )
- .expect("failed to convert calculated duration to i64"),
- value => i64::try_from(value).expect("failed to get i64 duration"),
- };
-
- // Get the handle and store to save the history
- let handle_guard = self.inner.handle.read().await;
- let handle = handle_guard
- .as_ref()
- .ok_or_else(|| Status::internal("component not initialized"))?;
-
- let store_guard = self.inner.history_store.read().await;
- let history_store = store_guard
- .as_ref()
- .ok_or_else(|| Status::internal("component not initialized"))?;
-
- // Save to database
- handle
- .history_db()
- .save(&history)
- .await
- .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
-
- tracing::info!(
- id = id.0.to_string(),
- duration = history.duration,
- "end history"
- );
-
- // Push to record store
- let (record_id, idx) = history_store
- .push(history.clone())
- .await
- .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
-
- // Emit the event
- handle.emit(DaemonEvent::HistoryEnded(history));
-
- let reply = EndHistoryReply {
- id: record_id.0.to_string(),
- idx,
- version: env!("CARGO_PKG_VERSION").to_string(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- return Ok(Response::new(reply));
- }
-
- Err(Status::not_found(format!(
- "could not find history with id: {id}"
- )))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn tail_history(
- &self,
- _request: Request<TailHistoryRequest>,
- ) -> Result<Response<Self::TailHistoryStream>, Status> {
- let handle_guard = self.inner.handle.read().await;
- let handle = handle_guard
- .as_ref()
- .cloned()
- .ok_or_else(|| Status::internal("component not initialized"))?;
-
- let mut rx = handle.subscribe();
- let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
-
- tokio::spawn(async move {
- loop {
- let event = match rx.recv().await {
- Ok(event) => event,
- Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
- let _ = tx
- .send(Err(Status::resource_exhausted(format!(
- "tail stream lagged behind and dropped {skipped} events"
- ))))
- .await;
- break;
- }
- Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
- };
-
- let reply = match event {
- DaemonEvent::HistoryStarted(history) => {
- Some(history_to_tail_reply(HistoryEventKind::Started, history))
- }
- DaemonEvent::HistoryEnded(history) => {
- Some(history_to_tail_reply(HistoryEventKind::Ended, history))
- }
- _ => None,
- };
-
- if let Some(reply) = reply
- && tx.send(Ok(reply)).await.is_err()
- {
- break;
- }
- }
- });
-
- let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
- Ok(Response::new(Box::pin(stream)))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn status(
- &self,
- _request: Request<StatusRequest>,
- ) -> Result<Response<StatusReply>, Status> {
- let reply = StatusReply {
- healthy: true,
- version: env!("CARGO_PKG_VERSION").to_string(),
- pid: std::process::id(),
- protocol: DAEMON_PROTOCOL_VERSION,
- };
-
- Ok(Response::new(reply))
- }
-
- #[instrument(skip_all, level = Level::INFO)]
- async fn shutdown(
- &self,
- _request: Request<ShutdownRequest>,
- ) -> Result<Response<ShutdownReply>, Status> {
- // Use the daemon handle to request shutdown
- if let Some(handle) = self.inner.handle.read().await.as_ref() {
- handle.shutdown();
- }
- Ok(Response::new(ShutdownReply { accepted: true }))
- }
-}