aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon/src/server.rs
diff options
context:
space:
mode:
authorEllie Huxtable <ellie@elliehuxtable.com>2024-05-08 12:09:04 +0100
committerGitHub <noreply@github.com>2024-05-08 12:09:04 +0100
commitbce0faa1c2dc221b0ff77d2cd647bfb2a48ffa7e (patch)
tree4d66bd95b151d3bab4cabf8799805c739f608bc4 /crates/atuin-daemon/src/server.rs
parentfix(config): add quotes for strategy value in comment (#1993) (diff)
downloadatuin-bce0faa1c2dc221b0ff77d2cd647bfb2a48ffa7e.zip
feat: add background daemon (#2006)
* init daemon crate * wip * minimal functioning daemon, needs cleanup for sure * better errors * add signal cleanup * logging * things * add sync worker * move daemon crate * 30s -> 5mins * make clippy happy * fix stuff maybe? * fmt * trim packages * rate limit fix * more protoc huh * this makes no sense, why linux why * can it install literally just curl * windows in ci is slow, and all the newer things will not work there. disable the daemon feature and it will build * add daemon feature * maybe this * ok wut where is protoc * try setting protoc * hm * try copying protoc * remove optional * add cross config * idk nix * does nix want this? * some random pkg I found does this * uh oh * hack, be gone! * update contributing
Diffstat (limited to '')
-rw-r--r--crates/atuin-daemon/src/server.rs186
1 files changed, 186 insertions, 0 deletions
diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs
new file mode 100644
index 00000000..42ef1701
--- /dev/null
+++ b/crates/atuin-daemon/src/server.rs
@@ -0,0 +1,186 @@
+use eyre::WrapErr;
+
+use atuin_client::encryption;
+use atuin_client::history::store::HistoryStore;
+use atuin_client::record::sqlite_store::SqliteStore;
+use atuin_client::settings::Settings;
+use std::path::PathBuf;
+use std::sync::Arc;
+use time::OffsetDateTime;
+use tracing::{instrument, Level};
+
+use atuin_client::database::{Database, Sqlite as HistoryDatabase};
+use atuin_client::history::{History, HistoryId};
+use dashmap::DashMap;
+use eyre::Result;
+use tokio::net::UnixListener;
+use tokio_stream::wrappers::UnixListenerStream;
+use tonic::{transport::Server, Request, Response, Status};
+
+use crate::history::history_server::{History as HistorySvc, HistoryServer};
+
+use crate::history::{EndHistoryReply, EndHistoryRequest, StartHistoryReply, StartHistoryRequest};
+
+mod sync;
+
+#[derive(Debug)]
+pub struct HistoryService {
+ // A store for WIP history
+ // This is history that has not yet been completed, aka a command that's current running.
+ running: Arc<DashMap<HistoryId, History>>,
+ store: HistoryStore,
+ history_db: HistoryDatabase,
+}
+
+impl HistoryService {
+ pub fn new(store: HistoryStore, history_db: HistoryDatabase) -> Self {
+ Self {
+ running: Arc::new(DashMap::new()),
+ store,
+ history_db,
+ }
+ }
+}
+
+#[tonic::async_trait()]
+impl HistorySvc for HistoryService {
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn start_history(
+ &self,
+ request: Request<StartHistoryRequest>,
+ ) -> Result<Response<StartHistoryReply>, Status> {
+ let running = self.running.clone();
+ let req = request.into_inner();
+
+ let timestamp =
+ OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
+ Status::invalid_argument(
+ "failed to parse timestamp as unix time (expected nanos since epoch)",
+ )
+ })?;
+
+ let h: History = History::daemon()
+ .timestamp(timestamp)
+ .command(req.command)
+ .cwd(req.cwd)
+ .session(req.session)
+ .hostname(req.hostname)
+ .build()
+ .into();
+
+ // The old behaviour had us inserting half-finished history records into the database
+ // The new behaviour no longer allows that.
+ // History that's running is stored in-memory by the daemon, and only committed when
+ // complete.
+ // If anyone relied on the old behaviour, we could perhaps insert to the history db here
+ // too. I'd rather keep it pure, unless that ends up being the case.
+ let id = h.id.clone();
+ tracing::info!(id = id.to_string(), "start history");
+ running.insert(id.clone(), h);
+
+ let reply = StartHistoryReply { id: id.to_string() };
+
+ Ok(Response::new(reply))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn end_history(
+ &self,
+ request: Request<EndHistoryRequest>,
+ ) -> Result<Response<EndHistoryReply>, Status> {
+ let running = self.running.clone();
+ let req = request.into_inner();
+
+ let id = HistoryId(req.id);
+
+ if let Some((_, mut history)) = running.remove(&id) {
+ history.exit = req.exit;
+ history.duration = match req.duration {
+ 0 => i64::try_from(
+ (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
+ )
+ .expect("failed to convert calculated duration to i64"),
+ value => i64::try_from(value).expect("failed to get i64 duration"),
+ };
+
+ // Perhaps allow the incremental build to handle this entirely.
+ self.history_db
+ .save(&history)
+ .await
+ .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
+
+ tracing::info!(
+ id = id.0.to_string(),
+ duration = history.duration,
+ "end history"
+ );
+
+ let (id, idx) =
+ self.store.push(history).await.map_err(|e| {
+ Status::internal(format!("failed to push record to store: {e:?}"))
+ })?;
+
+ let reply = EndHistoryReply {
+ id: id.0.to_string(),
+ idx,
+ };
+
+ return Ok(Response::new(reply));
+ }
+
+ Err(Status::not_found(format!(
+ "could not find history with id: {id}"
+ )))
+ }
+}
+
+async fn shutdown_signal(socket: PathBuf) {
+ let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
+ .expect("failed to register sigterm handler");
+ let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
+ .expect("failed to register sigint handler");
+
+ tokio::select! {
+ _ = term.recv() => {},
+ _ = int.recv() => {},
+ }
+
+ eprintln!("Removing socket...");
+ std::fs::remove_file(socket).expect("failed to remove socket");
+ eprintln!("Shutting down...");
+}
+
+// break the above down when we end up with multiple services
+
+/// Listen on a unix socket
+/// Pass the path to the socket
+pub async fn listen(
+ settings: Settings,
+ store: SqliteStore,
+ history_db: HistoryDatabase,
+) -> Result<()> {
+ let encryption_key: [u8; 32] = encryption::load_key(&settings)
+ .context("could not load encryption key")?
+ .into();
+
+ let host_id = Settings::host_id().expect("failed to get host_id");
+ let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
+
+ let history = HistoryService::new(history_store, history_db);
+
+ let socket = settings.daemon.socket_path.clone();
+ let uds = UnixListener::bind(socket.clone())?;
+ let uds_stream = UnixListenerStream::new(uds);
+
+ tracing::info!("listening on unix socket {:?}", socket);
+
+ // start services
+ tokio::spawn(sync::worker(settings.clone(), store));
+
+ Server::builder()
+ .add_service(HistoryServer::new(history))
+ .serve_with_incoming_shutdown(uds_stream, shutdown_signal(socket.into()))
+ .await?;
+
+ Ok(())
+}