From bce0faa1c2dc221b0ff77d2cd647bfb2a48ffa7e Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Wed, 8 May 2024 12:09:04 +0100 Subject: feat: add background daemon (#2006) * init daemon crate * wip * minimal functioning daemon, needs cleanup for sure * better errors * add signal cleanup * logging * things * add sync worker * move daemon crate * 30s -> 5mins * make clippy happy * fix stuff maybe? * fmt * trim packages * rate limit fix * more protoc huh * this makes no sense, why linux why * can it install literally just curl * windows in ci is slow, and all the newer things will not work there. disable the daemon feature and it will build * add daemon feature * maybe this * ok wut where is protoc * try setting protoc * hm * try copying protoc * remove optional * add cross config * idk nix * does nix want this? * some random pkg I found does this * uh oh * hack, be gone! * update contributing --- crates/atuin-daemon/Cargo.toml | 34 ++++++ crates/atuin-daemon/build.rs | 4 + crates/atuin-daemon/proto/history.proto | 33 ++++++ crates/atuin-daemon/src/client.rs | 60 +++++++++++ crates/atuin-daemon/src/history.rs | 1 + crates/atuin-daemon/src/lib.rs | 3 + crates/atuin-daemon/src/server.rs | 186 ++++++++++++++++++++++++++++++++ crates/atuin-daemon/src/server/sync.rs | 55 ++++++++++ 8 files changed, 376 insertions(+) create mode 100644 crates/atuin-daemon/Cargo.toml create mode 100644 crates/atuin-daemon/build.rs create mode 100644 crates/atuin-daemon/proto/history.proto create mode 100644 crates/atuin-daemon/src/client.rs create mode 100644 crates/atuin-daemon/src/history.rs create mode 100644 crates/atuin-daemon/src/lib.rs create mode 100644 crates/atuin-daemon/src/server.rs create mode 100644 crates/atuin-daemon/src/server/sync.rs (limited to 'crates/atuin-daemon') diff --git a/crates/atuin-daemon/Cargo.toml b/crates/atuin-daemon/Cargo.toml new file mode 100644 index 00000000..5bcd1611 --- /dev/null +++ b/crates/atuin-daemon/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "atuin-daemon" +edition = "2021" +version = "0.1.0" +authors.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +readme.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +atuin-client = { path = "../atuin-client", version = "18.0.1" } + +time = { workspace = true } +uuid = { workspace = true } +tokio = { workspace = true } +tower = { workspace = true } +eyre = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } + +dashmap = "5.5.3" +tonic-types = "0.11.0" +tonic = "0.11" +prost = "0.12" +prost-types = "0.12" +tokio-stream = {version="0.1.14", features=["net"]} +rand.workspace = true + +[build-dependencies] +tonic-build = "0.11" diff --git a/crates/atuin-daemon/build.rs b/crates/atuin-daemon/build.rs new file mode 100644 index 00000000..95118f6f --- /dev/null +++ b/crates/atuin-daemon/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("./proto/history.proto")?; + Ok(()) +} diff --git a/crates/atuin-daemon/proto/history.proto b/crates/atuin-daemon/proto/history.proto new file mode 100644 index 00000000..95a12282 --- /dev/null +++ b/crates/atuin-daemon/proto/history.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; +package history; + +import "google/protobuf/timestamp.proto"; + +message StartHistoryRequest { + // If people are still using my software in ~530 years, they can figure out a u128 migration + uint64 timestamp = 1; // nanosecond unix epoch + string command = 2; + string cwd = 3; + string session = 4; + string hostname = 5; +} + +message EndHistoryRequest { + string id = 1; + int64 exit = 2; + uint64 duration = 3; +} + +message StartHistoryReply { + string id = 1; +} + +message EndHistoryReply { + string id = 1; + uint64 idx = 2; +} + +service History { + rpc StartHistory(StartHistoryRequest) returns (StartHistoryReply); + rpc EndHistory(EndHistoryRequest) returns (EndHistoryReply); +} diff --git a/crates/atuin-daemon/src/client.rs b/crates/atuin-daemon/src/client.rs new file mode 100644 index 00000000..a832f9a9 --- /dev/null +++ b/crates/atuin-daemon/src/client.rs @@ -0,0 +1,60 @@ +use eyre::{eyre, Result}; +use tokio::net::UnixStream; +use tonic::transport::{Channel, Endpoint, Uri}; +use tower::service_fn; + +use atuin_client::history::History; + +use crate::history::{ + history_client::HistoryClient as HistoryServiceClient, EndHistoryRequest, StartHistoryRequest, +}; + +pub struct HistoryClient { + client: HistoryServiceClient, +} + +// Wrap the grpc client +impl HistoryClient { + pub async fn new(path: String) -> Result { + let channel = Endpoint::try_from("http://atuin_local_daemon:0")? + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.to_string(); + + UnixStream::connect(path) + })) + .await + .map_err(|_| eyre!("failed to connect to local atuin daemon. Is it running?"))?; + + let client = HistoryServiceClient::new(channel); + + Ok(HistoryClient { client }) + } + + pub async fn start_history(&mut self, h: History) -> Result { + let req = StartHistoryRequest { + command: h.command, + cwd: h.cwd, + hostname: h.hostname, + session: h.session, + timestamp: h.timestamp.unix_timestamp_nanos() as u64, + }; + + let resp = self.client.start_history(req).await?; + + Ok(resp.into_inner().id) + } + + pub async fn end_history( + &mut self, + id: String, + duration: u64, + exit: i64, + ) -> Result<(String, u64)> { + let req = EndHistoryRequest { id, duration, exit }; + + let resp = self.client.end_history(req).await?; + let resp = resp.into_inner(); + + Ok((resp.id, resp.idx)) + } +} diff --git a/crates/atuin-daemon/src/history.rs b/crates/atuin-daemon/src/history.rs new file mode 100644 index 00000000..57f5b2cf --- /dev/null +++ b/crates/atuin-daemon/src/history.rs @@ -0,0 +1 @@ +tonic::include_proto!("history"); diff --git a/crates/atuin-daemon/src/lib.rs b/crates/atuin-daemon/src/lib.rs new file mode 100644 index 00000000..e00060bc --- /dev/null +++ b/crates/atuin-daemon/src/lib.rs @@ -0,0 +1,3 @@ +pub mod client; +pub mod history; +pub mod server; diff --git a/crates/atuin-daemon/src/server.rs b/crates/atuin-daemon/src/server.rs new file mode 100644 index 00000000..42ef1701 --- /dev/null +++ b/crates/atuin-daemon/src/server.rs @@ -0,0 +1,186 @@ +use eyre::WrapErr; + +use atuin_client::encryption; +use atuin_client::history::store::HistoryStore; +use atuin_client::record::sqlite_store::SqliteStore; +use atuin_client::settings::Settings; +use std::path::PathBuf; +use std::sync::Arc; +use time::OffsetDateTime; +use tracing::{instrument, Level}; + +use atuin_client::database::{Database, Sqlite as HistoryDatabase}; +use atuin_client::history::{History, HistoryId}; +use dashmap::DashMap; +use eyre::Result; +use tokio::net::UnixListener; +use tokio_stream::wrappers::UnixListenerStream; +use tonic::{transport::Server, Request, Response, Status}; + +use crate::history::history_server::{History as HistorySvc, HistoryServer}; + +use crate::history::{EndHistoryReply, EndHistoryRequest, StartHistoryReply, StartHistoryRequest}; + +mod sync; + +#[derive(Debug)] +pub struct HistoryService { + // A store for WIP history + // This is history that has not yet been completed, aka a command that's current running. + running: Arc>, + store: HistoryStore, + history_db: HistoryDatabase, +} + +impl HistoryService { + pub fn new(store: HistoryStore, history_db: HistoryDatabase) -> Self { + Self { + running: Arc::new(DashMap::new()), + store, + history_db, + } + } +} + +#[tonic::async_trait()] +impl HistorySvc for HistoryService { + #[instrument(skip_all, level = Level::INFO)] + async fn start_history( + &self, + request: Request, + ) -> Result, Status> { + let running = self.running.clone(); + let req = request.into_inner(); + + let timestamp = + OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| { + Status::invalid_argument( + "failed to parse timestamp as unix time (expected nanos since epoch)", + ) + })?; + + let h: History = History::daemon() + .timestamp(timestamp) + .command(req.command) + .cwd(req.cwd) + .session(req.session) + .hostname(req.hostname) + .build() + .into(); + + // The old behaviour had us inserting half-finished history records into the database + // The new behaviour no longer allows that. + // History that's running is stored in-memory by the daemon, and only committed when + // complete. + // If anyone relied on the old behaviour, we could perhaps insert to the history db here + // too. I'd rather keep it pure, unless that ends up being the case. + let id = h.id.clone(); + tracing::info!(id = id.to_string(), "start history"); + running.insert(id.clone(), h); + + let reply = StartHistoryReply { id: id.to_string() }; + + Ok(Response::new(reply)) + } + + #[instrument(skip_all, level = Level::INFO)] + async fn end_history( + &self, + request: Request, + ) -> Result, Status> { + let running = self.running.clone(); + let req = request.into_inner(); + + let id = HistoryId(req.id); + + if let Some((_, mut history)) = running.remove(&id) { + history.exit = req.exit; + history.duration = match req.duration { + 0 => i64::try_from( + (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(), + ) + .expect("failed to convert calculated duration to i64"), + value => i64::try_from(value).expect("failed to get i64 duration"), + }; + + // Perhaps allow the incremental build to handle this entirely. + self.history_db + .save(&history) + .await + .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?; + + tracing::info!( + id = id.0.to_string(), + duration = history.duration, + "end history" + ); + + let (id, idx) = + self.store.push(history).await.map_err(|e| { + Status::internal(format!("failed to push record to store: {e:?}")) + })?; + + let reply = EndHistoryReply { + id: id.0.to_string(), + idx, + }; + + return Ok(Response::new(reply)); + } + + Err(Status::not_found(format!( + "could not find history with id: {id}" + ))) + } +} + +async fn shutdown_signal(socket: PathBuf) { + let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to register sigterm handler"); + let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) + .expect("failed to register sigint handler"); + + tokio::select! { + _ = term.recv() => {}, + _ = int.recv() => {}, + } + + eprintln!("Removing socket..."); + std::fs::remove_file(socket).expect("failed to remove socket"); + eprintln!("Shutting down..."); +} + +// break the above down when we end up with multiple services + +/// Listen on a unix socket +/// Pass the path to the socket +pub async fn listen( + settings: Settings, + store: SqliteStore, + history_db: HistoryDatabase, +) -> Result<()> { + let encryption_key: [u8; 32] = encryption::load_key(&settings) + .context("could not load encryption key")? + .into(); + + let host_id = Settings::host_id().expect("failed to get host_id"); + let history_store = HistoryStore::new(store.clone(), host_id, encryption_key); + + let history = HistoryService::new(history_store, history_db); + + let socket = settings.daemon.socket_path.clone(); + let uds = UnixListener::bind(socket.clone())?; + let uds_stream = UnixListenerStream::new(uds); + + tracing::info!("listening on unix socket {:?}", socket); + + // start services + tokio::spawn(sync::worker(settings.clone(), store)); + + Server::builder() + .add_service(HistoryServer::new(history)) + .serve_with_incoming_shutdown(uds_stream, shutdown_signal(socket.into())) + .await?; + + Ok(()) +} diff --git a/crates/atuin-daemon/src/server/sync.rs b/crates/atuin-daemon/src/server/sync.rs new file mode 100644 index 00000000..de34779c --- /dev/null +++ b/crates/atuin-daemon/src/server/sync.rs @@ -0,0 +1,55 @@ +use eyre::Result; +use rand::Rng; +use tokio::time::{self, MissedTickBehavior}; + +use atuin_client::{ + record::{sqlite_store::SqliteStore, sync}, + settings::Settings, +}; + +pub async fn worker(settings: Settings, store: SqliteStore) -> Result<()> { + tracing::info!("booting sync worker"); + + let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + + // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed, + // we may end up running a lot of syncs in a hot loop. No bueno! + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + ticker.tick().await; + tracing::info!("sync worker tick"); + + let res = sync::sync(&settings, &store).await; + + if let Err(e) = res { + tracing::error!("sync tick failed with {e}"); + let mut rng = rand::thread_rng(); + + let new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2); + + // Don't backoff by more than 30 mins + if new_interval > 60.0 * 30.0 { + continue; + } + + ticker = time::interval(time::Duration::from_secs(new_interval as u64)); + ticker.reset_after(time::Duration::from_secs(new_interval as u64)); + + tracing::error!("backing off, next sync tick in {new_interval}"); + } else { + let (uploaded, downloaded) = res.unwrap(); + + tracing::info!( + uploaded = ?uploaded, + downloaded = ?downloaded, + "sync complete" + ); + + // Reset backoff on success + if ticker.period().as_secs() != settings.daemon.sync_frequency { + ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency)); + } + } + } +} -- cgit v1.3.1