aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-daemon
diff options
context:
space:
mode:
Diffstat (limited to 'crates/atuin-daemon')
-rw-r--r--crates/atuin-daemon/Cargo.toml34
-rw-r--r--crates/atuin-daemon/build.rs4
-rw-r--r--crates/atuin-daemon/proto/history.proto33
-rw-r--r--crates/atuin-daemon/src/client.rs60
-rw-r--r--crates/atuin-daemon/src/history.rs1
-rw-r--r--crates/atuin-daemon/src/lib.rs3
-rw-r--r--crates/atuin-daemon/src/server.rs186
-rw-r--r--crates/atuin-daemon/src/server/sync.rs55
8 files changed, 376 insertions, 0 deletions
diff --git a/crates/atuin-daemon/Cargo.toml b/crates/atuin-daemon/Cargo.toml
new file mode 100644
index 00000000..5bcd1611
--- /dev/null
+++ b/crates/atuin-daemon/Cargo.toml
@@ -0,0 +1,34 @@
+[package]
+name = "atuin-daemon"
+edition = "2021"
+version = "0.1.0"
+authors.workspace = true
+rust-version.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository.workspace = true
+readme.workspace = true
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+atuin-client = { path = "../atuin-client", version = "18.0.1" }
+
+time = { workspace = true }
+uuid = { workspace = true }
+tokio = { workspace = true }
+tower = { workspace = true }
+eyre = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
+
+dashmap = "5.5.3"
+tonic-types = "0.11.0"
+tonic = "0.11"
+prost = "0.12"
+prost-types = "0.12"
+tokio-stream = {version="0.1.14", features=["net"]}
+rand.workspace = true
+
+[build-dependencies]
+tonic-build = "0.11"
diff --git a/crates/atuin-daemon/build.rs b/crates/atuin-daemon/build.rs
new file mode 100644
index 00000000..95118f6f
--- /dev/null
+++ b/crates/atuin-daemon/build.rs
@@ -0,0 +1,4 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+ tonic_build::compile_protos("./proto/history.proto")?;
+ Ok(())
+}
diff --git a/crates/atuin-daemon/proto/history.proto b/crates/atuin-daemon/proto/history.proto
new file mode 100644
index 00000000..95a12282
--- /dev/null
+++ b/crates/atuin-daemon/proto/history.proto
@@ -0,0 +1,33 @@
+syntax = "proto3";
+package history;
+
+import "google/protobuf/timestamp.proto";
+
+message StartHistoryRequest {
+ // If people are still using my software in ~530 years, they can figure out a u128 migration
+ uint64 timestamp = 1; // nanosecond unix epoch
+ string command = 2;
+ string cwd = 3;
+ string session = 4;
+ string hostname = 5;
+}
+
+message EndHistoryRequest {
+ string id = 1;
+ int64 exit = 2;
+ uint64 duration = 3;
+}
+
+message StartHistoryReply {
+ string id = 1;
+}
+
+message EndHistoryReply {
+ string id = 1;
+ uint64 idx = 2;
+}
+
+service History {
+ rpc StartHistory(StartHistoryRequest) returns (StartHistoryReply);
+ rpc EndHistory(EndHistoryRequest) returns (EndHistoryReply);
+}
diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs
new file mode 100644
index 00000000..a832f9a9
--- /dev/null
+++ b/crates/atuin-daemon/src/client.rs
@@ -0,0 +1,60 @@
+use eyre::{eyre, Result};
+use tokio::net::UnixStream;
+use tonic::transport::{Channel, Endpoint, Uri};
+use tower::service_fn;
+
+use atuin_client::history::History;
+
+use crate::history::{
+ history_client::HistoryClient as HistoryServiceClient, EndHistoryRequest, StartHistoryRequest,
+};
+
+pub struct HistoryClient {
+ client: HistoryServiceClient<Channel>,
+}
+
+// Wrap the grpc client
+impl HistoryClient {
+ pub async fn new(path: String) -> Result<Self> {
+ let channel = Endpoint::try_from("http://atuin_local_daemon:0")?
+ .connect_with_connector(service_fn(move |_: Uri| {
+ let path = path.to_string();
+
+ UnixStream::connect(path)
+ }))
+ .await
+ .map_err(|_| eyre!("failed to connect to local atuin daemon. Is it running?"))?;
+
+ let client = HistoryServiceClient::new(channel);
+
+ Ok(HistoryClient { client })
+ }
+
+ pub async fn start_history(&mut self, h: History) -> Result<String> {
+ let req = StartHistoryRequest {
+ command: h.command,
+ cwd: h.cwd,
+ hostname: h.hostname,
+ session: h.session,
+ timestamp: h.timestamp.unix_timestamp_nanos() as u64,
+ };
+
+ let resp = self.client.start_history(req).await?;
+
+ Ok(resp.into_inner().id)
+ }
+
+ pub async fn end_history(
+ &mut self,
+ id: String,
+ duration: u64,
+ exit: i64,
+ ) -> Result<(String, u64)> {
+ let req = EndHistoryRequest { id, duration, exit };
+
+ let resp = self.client.end_history(req).await?;
+ let resp = resp.into_inner();
+
+ Ok((resp.id, resp.idx))
+ }
+}
diff --git a/crates/atuin-daemon/src/history.rs b/crates/atuin-daemon/src/history.rs
new file mode 100644
index 00000000..57f5b2cf
--- /dev/null
+++ b/crates/atuin-daemon/src/history.rs
@@ -0,0 +1 @@
+tonic::include_proto!("history");
diff --git a/crates/atuin-daemon/src/lib.rs b/crates/atuin-daemon/src/lib.rs
new file mode 100644
index 00000000..e00060bc
--- /dev/null
+++ b/crates/atuin-daemon/src/lib.rs
@@ -0,0 +1,3 @@
+pub mod client;
+pub mod history;
+pub mod server;
diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs
new file mode 100644
index 00000000..42ef1701
--- /dev/null
+++ b/crates/atuin-daemon/src/server.rs
@@ -0,0 +1,186 @@
+use eyre::WrapErr;
+
+use atuin_client::encryption;
+use atuin_client::history::store::HistoryStore;
+use atuin_client::record::sqlite_store::SqliteStore;
+use atuin_client::settings::Settings;
+use std::path::PathBuf;
+use std::sync::Arc;
+use time::OffsetDateTime;
+use tracing::{instrument, Level};
+
+use atuin_client::database::{Database, Sqlite as HistoryDatabase};
+use atuin_client::history::{History, HistoryId};
+use dashmap::DashMap;
+use eyre::Result;
+use tokio::net::UnixListener;
+use tokio_stream::wrappers::UnixListenerStream;
+use tonic::{transport::Server, Request, Response, Status};
+
+use crate::history::history_server::{History as HistorySvc, HistoryServer};
+
+use crate::history::{EndHistoryReply, EndHistoryRequest, StartHistoryReply, StartHistoryRequest};
+
+mod sync;
+
+#[derive(Debug)]
+pub struct HistoryService {
+ // A store for WIP history
+ // This is history that has not yet been completed, aka a command that's current running.
+ running: Arc<DashMap<HistoryId, History>>,
+ store: HistoryStore,
+ history_db: HistoryDatabase,
+}
+
+impl HistoryService {
+ pub fn new(store: HistoryStore, history_db: HistoryDatabase) -> Self {
+ Self {
+ running: Arc::new(DashMap::new()),
+ store,
+ history_db,
+ }
+ }
+}
+
+#[tonic::async_trait()]
+impl HistorySvc for HistoryService {
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn start_history(
+ &self,
+ request: Request<StartHistoryRequest>,
+ ) -> Result<Response<StartHistoryReply>, Status> {
+ let running = self.running.clone();
+ let req = request.into_inner();
+
+ let timestamp =
+ OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
+ Status::invalid_argument(
+ "failed to parse timestamp as unix time (expected nanos since epoch)",
+ )
+ })?;
+
+ let h: History = History::daemon()
+ .timestamp(timestamp)
+ .command(req.command)
+ .cwd(req.cwd)
+ .session(req.session)
+ .hostname(req.hostname)
+ .build()
+ .into();
+
+ // The old behaviour had us inserting half-finished history records into the database
+ // The new behaviour no longer allows that.
+ // History that's running is stored in-memory by the daemon, and only committed when
+ // complete.
+ // If anyone relied on the old behaviour, we could perhaps insert to the history db here
+ // too. I'd rather keep it pure, unless that ends up being the case.
+ let id = h.id.clone();
+ tracing::info!(id = id.to_string(), "start history");
+ running.insert(id.clone(), h);
+
+ let reply = StartHistoryReply { id: id.to_string() };
+
+ Ok(Response::new(reply))
+ }
+
+ #[instrument(skip_all, level = Level::INFO)]
+ async fn end_history(
+ &self,
+ request: Request<EndHistoryRequest>,
+ ) -> Result<Response<EndHistoryReply>, Status> {
+ let running = self.running.clone();
+ let req = request.into_inner();
+
+ let id = HistoryId(req.id);
+
+ if let Some((_, mut history)) = running.remove(&id) {
+ history.exit = req.exit;
+ history.duration = match req.duration {
+ 0 => i64::try_from(
+ (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
+ )
+ .expect("failed to convert calculated duration to i64"),
+ value => i64::try_from(value).expect("failed to get i64 duration"),
+ };
+
+ // Perhaps allow the incremental build to handle this entirely.
+ self.history_db
+ .save(&history)
+ .await
+ .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
+
+ tracing::info!(
+ id = id.0.to_string(),
+ duration = history.duration,
+ "end history"
+ );
+
+ let (id, idx) =
+ self.store.push(history).await.map_err(|e| {
+ Status::internal(format!("failed to push record to store: {e:?}"))
+ })?;
+
+ let reply = EndHistoryReply {
+ id: id.0.to_string(),
+ idx,
+ };
+
+ return Ok(Response::new(reply));
+ }
+
+ Err(Status::not_found(format!(
+ "could not find history with id: {id}"
+ )))
+ }
+}
+
+async fn shutdown_signal(socket: PathBuf) {
+ let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
+ .expect("failed to register sigterm handler");
+ let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
+ .expect("failed to register sigint handler");
+
+ tokio::select! {
+ _ = term.recv() => {},
+ _ = int.recv() => {},
+ }
+
+ eprintln!("Removing socket...");
+ std::fs::remove_file(socket).expect("failed to remove socket");
+ eprintln!("Shutting down...");
+}
+
+// break the above down when we end up with multiple services
+
+/// Listen on a unix socket
+/// Pass the path to the socket
+pub async fn listen(
+ settings: Settings,
+ store: SqliteStore,
+ history_db: HistoryDatabase,
+) -> Result<()> {
+ let encryption_key: [u8; 32] = encryption::load_key(&settings)
+ .context("could not load encryption key")?
+ .into();
+
+ let host_id = Settings::host_id().expect("failed to get host_id");
+ let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
+
+ let history = HistoryService::new(history_store, history_db);
+
+ let socket = settings.daemon.socket_path.clone();
+ let uds = UnixListener::bind(socket.clone())?;
+ let uds_stream = UnixListenerStream::new(uds);
+
+ tracing::info!("listening on unix socket {:?}", socket);
+
+ // start services
+ tokio::spawn(sync::worker(settings.clone(), store));
+
+ Server::builder()
+ .add_service(HistoryServer::new(history))
+ .serve_with_incoming_shutdown(uds_stream, shutdown_signal(socket.into()))
+ .await?;
+
+ Ok(())
+}
diff --git a/crates/atuin-daemon/src/server/sync.rs b/crates/atuin-daemon/src/server/sync.rs
new file mode 100644
index 00000000..de34779c
--- /dev/null
+++ b/crates/atuin-daemon/src/server/sync.rs
@@ -0,0 +1,55 @@
+use eyre::Result;
+use rand::Rng;
+use tokio::time::{self, MissedTickBehavior};
+
+use atuin_client::{
+ record::{sqlite_store::SqliteStore, sync},
+ settings::Settings,
+};
+
+pub async fn worker(settings: Settings, store: SqliteStore) -> Result<()> {
+ tracing::info!("booting sync worker");
+
+ let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
+
+ // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
+ // we may end up running a lot of syncs in a hot loop. No bueno!
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+ loop {
+ ticker.tick().await;
+ tracing::info!("sync worker tick");
+
+ let res = sync::sync(&settings, &store).await;
+
+ if let Err(e) = res {
+ tracing::error!("sync tick failed with {e}");
+ let mut rng = rand::thread_rng();
+
+ let new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
+
+ // Don't backoff by more than 30 mins
+ if new_interval > 60.0 * 30.0 {
+ continue;
+ }
+
+ ticker = time::interval(time::Duration::from_secs(new_interval as u64));
+ ticker.reset_after(time::Duration::from_secs(new_interval as u64));
+
+ tracing::error!("backing off, next sync tick in {new_interval}");
+ } else {
+ let (uploaded, downloaded) = res.unwrap();
+
+ tracing::info!(
+ uploaded = ?uploaded,
+ downloaded = ?downloaded,
+ "sync complete"
+ );
+
+ // Reset backoff on success
+ if ticker.period().as_secs() != settings.daemon.sync_frequency {
+ ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
+ }
+ }
+ }
+}