1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
|
//! History component.
//!
//! Handles command history lifecycle (start/end) and provides the History gRPC service.
use std::{pin::Pin, sync::Arc};
use atuin_client::{
database::Database,
history::{History, HistoryId, store::HistoryStore},
settings::Settings,
};
use dashmap::DashMap;
use eyre::Result;
use time::OffsetDateTime;
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
use tracing::{Level, instrument};
use crate::{
daemon::{Component, DaemonHandle},
events::DaemonEvent,
history::{
EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
TailHistoryReply, TailHistoryRequest,
history_server::{History as HistorySvc, HistoryServer},
},
};
const DAEMON_PROTOCOL_VERSION: u32 = 1;
/// History component - manages command history lifecycle.
///
/// This component:
/// - Tracks currently running commands (stored in memory)
/// - Saves completed commands to the database and record store
/// - Emits history events for other components (e.g., search indexing)
/// - Provides the History gRPC service
pub struct HistoryComponent {
inner: Arc<HistoryComponentInner>,
}
struct HistoryComponentInner {
/// Commands currently running (not yet completed).
running: DashMap<HistoryId, History>,
/// Handle to the daemon (set during start).
handle: tokio::sync::RwLock<Option<DaemonHandle>>,
/// History store for pushing records (set during start).
history_store: tokio::sync::RwLock<Option<HistoryStore>>,
}
impl HistoryComponent {
/// Create a new history component.
pub fn new() -> Self {
Self {
inner: Arc::new(HistoryComponentInner {
running: DashMap::new(),
handle: tokio::sync::RwLock::new(None),
history_store: tokio::sync::RwLock::new(None),
}),
}
}
/// Get the gRPC service for this component.
///
/// This returns a tonic service that can be added to a gRPC server.
pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
HistoryServer::new(HistoryGrpcService {
inner: self.inner.clone(),
})
}
}
impl Default for HistoryComponent {
fn default() -> Self {
Self::new()
}
}
#[tonic::async_trait]
impl Component for HistoryComponent {
fn name(&self) -> &'static str {
"history"
}
async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
// Create the history store
let host_id = Settings::host_id().await?;
let history_store =
HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key());
*self.inner.history_store.write().await = Some(history_store);
*self.inner.handle.write().await = Some(handle);
tracing::info!("history component started");
Ok(())
}
async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
// History component produces events but doesn't need to react to them
Ok(())
}
async fn stop(&mut self) -> Result<()> {
tracing::info!("history component stopped");
Ok(())
}
}
/// The gRPC service implementation.
///
/// This is a thin wrapper that delegates to the component's shared state.
pub struct HistoryGrpcService {
inner: Arc<HistoryComponentInner>,
}
fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
TailHistoryReply {
kind: kind as i32,
history: Some(HistoryEntry {
timestamp: history.timestamp.unix_timestamp_nanos() as u64,
id: history.id.0,
command: history.command,
cwd: history.cwd,
session: history.session,
hostname: history.hostname,
author: history.author,
intent: history.intent.unwrap_or_default(),
exit: history.exit,
duration: history.duration,
}),
}
}
#[tonic::async_trait]
impl HistorySvc for HistoryGrpcService {
type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
#[instrument(skip_all, level = Level::INFO)]
async fn start_history(
&self,
request: Request<StartHistoryRequest>,
) -> Result<Response<StartHistoryReply>, Status> {
let req = request.into_inner();
let timestamp =
OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
Status::invalid_argument(
"failed to parse timestamp as unix time (expected nanos since epoch)",
)
})?;
let h: History = History::daemon()
.timestamp(timestamp)
.command(req.command)
.cwd(req.cwd)
.session(req.session)
.hostname(req.hostname)
.author(req.author)
.intent(req.intent)
.build()
.into();
// Emit the event
if let Some(handle) = self.inner.handle.read().await.as_ref() {
handle.emit(DaemonEvent::HistoryStarted(h.clone()));
}
let id = h.id.clone();
tracing::info!(id = id.to_string(), "start history");
self.inner.running.insert(id.clone(), h);
let reply = StartHistoryReply {
id: id.to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
protocol: DAEMON_PROTOCOL_VERSION,
};
Ok(Response::new(reply))
}
#[instrument(skip_all, level = Level::INFO)]
async fn end_history(
&self,
request: Request<EndHistoryRequest>,
) -> Result<Response<EndHistoryReply>, Status> {
let req = request.into_inner();
let id = HistoryId(req.id);
if let Some((_, mut history)) = self.inner.running.remove(&id) {
history.exit = req.exit;
history.duration = match req.duration {
0 => i64::try_from(
(OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
)
.expect("failed to convert calculated duration to i64"),
value => i64::try_from(value).expect("failed to get i64 duration"),
};
// Get the handle and store to save the history
let handle_guard = self.inner.handle.read().await;
let handle = handle_guard
.as_ref()
.ok_or_else(|| Status::internal("component not initialized"))?;
let store_guard = self.inner.history_store.read().await;
let history_store = store_guard
.as_ref()
.ok_or_else(|| Status::internal("component not initialized"))?;
// Save to database
handle
.history_db()
.save(&history)
.await
.map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
tracing::info!(
id = id.0.to_string(),
duration = history.duration,
"end history"
);
// Push to record store
let (record_id, idx) = history_store
.push(history.clone())
.await
.map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
// Emit the event
handle.emit(DaemonEvent::HistoryEnded(history));
let reply = EndHistoryReply {
id: record_id.0.to_string(),
idx,
version: env!("CARGO_PKG_VERSION").to_string(),
protocol: DAEMON_PROTOCOL_VERSION,
};
return Ok(Response::new(reply));
}
Err(Status::not_found(format!(
"could not find history with id: {id}"
)))
}
#[instrument(skip_all, level = Level::INFO)]
async fn tail_history(
&self,
_request: Request<TailHistoryRequest>,
) -> Result<Response<Self::TailHistoryStream>, Status> {
let handle_guard = self.inner.handle.read().await;
let handle = handle_guard
.as_ref()
.cloned()
.ok_or_else(|| Status::internal("component not initialized"))?;
let mut rx = handle.subscribe();
let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
tokio::spawn(async move {
loop {
let event = match rx.recv().await {
Ok(event) => event,
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
let _ = tx
.send(Err(Status::resource_exhausted(format!(
"tail stream lagged behind and dropped {skipped} events"
))))
.await;
break;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
};
let reply = match event {
DaemonEvent::HistoryStarted(history) => {
Some(history_to_tail_reply(HistoryEventKind::Started, history))
}
DaemonEvent::HistoryEnded(history) => {
Some(history_to_tail_reply(HistoryEventKind::Ended, history))
}
_ => None,
};
if let Some(reply) = reply
&& tx.send(Ok(reply)).await.is_err()
{
break;
}
}
});
let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
Ok(Response::new(Box::pin(stream)))
}
#[instrument(skip_all, level = Level::INFO)]
async fn status(
&self,
_request: Request<StatusRequest>,
) -> Result<Response<StatusReply>, Status> {
let reply = StatusReply {
healthy: true,
version: env!("CARGO_PKG_VERSION").to_string(),
pid: std::process::id(),
protocol: DAEMON_PROTOCOL_VERSION,
};
Ok(Response::new(reply))
}
#[instrument(skip_all, level = Level::INFO)]
async fn shutdown(
&self,
_request: Request<ShutdownRequest>,
) -> Result<Response<ShutdownReply>, Status> {
// Use the daemon handle to request shutdown
if let Some(handle) = self.inner.handle.read().await.as_ref() {
handle.shutdown();
}
Ok(Response::new(ShutdownReply { accepted: true }))
}
}
|