diff options
| author | Ellie Huxtable <ellie@elliehuxtable.com> | 2024-04-18 16:41:28 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-04-18 16:41:28 +0100 |
| commit | 95cc472037fcb3207b510e67f1a44af4e2a2cae9 (patch) | |
| tree | fc1d3e71d8e0bdb806370e4144fd6f373bcc9c5e /crates/atuin-client/src/history | |
| parent | feat: show preview auto (#1804) (diff) | |
| download | atuin-95cc472037fcb3207b510e67f1a44af4e2a2cae9.zip | |
chore: move crates into crates/ dir (#1958)
I'd like to tidy up the root a little, and it's nice to have all the
rust crates in one place
Diffstat (limited to 'crates/atuin-client/src/history')
| -rw-r--r-- | crates/atuin-client/src/history/builder.rs | 99 | ||||
| -rw-r--r-- | crates/atuin-client/src/history/store.rs | 410 |
2 files changed, 509 insertions, 0 deletions
diff --git a/crates/atuin-client/src/history/builder.rs b/crates/atuin-client/src/history/builder.rs new file mode 100644 index 00000000..4e69cf66 --- /dev/null +++ b/crates/atuin-client/src/history/builder.rs @@ -0,0 +1,99 @@ +use typed_builder::TypedBuilder; + +use super::History; + +/// Builder for a history entry that is imported from shell history. +/// +/// The only two required fields are `timestamp` and `command`. +#[derive(Debug, Clone, TypedBuilder)] +pub struct HistoryImported { + timestamp: time::OffsetDateTime, + #[builder(setter(into))] + command: String, + #[builder(default = "unknown".into(), setter(into))] + cwd: String, + #[builder(default = -1)] + exit: i64, + #[builder(default = -1)] + duration: i64, + #[builder(default, setter(strip_option, into))] + session: Option<String>, + #[builder(default, setter(strip_option, into))] + hostname: Option<String>, +} + +impl From<HistoryImported> for History { + fn from(imported: HistoryImported) -> Self { + History::new( + imported.timestamp, + imported.command, + imported.cwd, + imported.exit, + imported.duration, + imported.session, + imported.hostname, + None, + ) + } +} + +/// Builder for a history entry that is captured via hook. +/// +/// This builder is used only at the `start` step of the hook, +/// so it doesn't have any fields which are known only after +/// the command is finished, such as `exit` or `duration`. +#[derive(Debug, Clone, TypedBuilder)] +pub struct HistoryCaptured { + timestamp: time::OffsetDateTime, + #[builder(setter(into))] + command: String, + #[builder(setter(into))] + cwd: String, +} + +impl From<HistoryCaptured> for History { + fn from(captured: HistoryCaptured) -> Self { + History::new( + captured.timestamp, + captured.command, + captured.cwd, + -1, + -1, + None, + None, + None, + ) + } +} + +/// Builder for a history entry that is loaded from the database. +/// +/// All fields are required, as they are all present in the database. +#[derive(Debug, Clone, TypedBuilder)] +pub struct HistoryFromDb { + id: String, + timestamp: time::OffsetDateTime, + command: String, + cwd: String, + exit: i64, + duration: i64, + session: String, + hostname: String, + deleted_at: Option<time::OffsetDateTime>, +} + +impl From<HistoryFromDb> for History { + fn from(from_db: HistoryFromDb) -> Self { + History { + id: from_db.id.into(), + timestamp: from_db.timestamp, + exit: from_db.exit, + command: from_db.command, + cwd: from_db.cwd, + duration: from_db.duration, + session: from_db.session, + hostname: from_db.hostname, + deleted_at: from_db.deleted_at, + } + } +} diff --git a/crates/atuin-client/src/history/store.rs b/crates/atuin-client/src/history/store.rs new file mode 100644 index 00000000..fe2b7b92 --- /dev/null +++ b/crates/atuin-client/src/history/store.rs @@ -0,0 +1,410 @@ +use std::{collections::HashSet, fmt::Write, time::Duration}; + +use eyre::{bail, eyre, Result}; +use indicatif::{ProgressBar, ProgressState, ProgressStyle}; +use rmp::decode::Bytes; + +use crate::{ + database::{current_context, Database}, + record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store}, +}; +use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx}; + +use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION}; + +#[derive(Debug)] +pub struct HistoryStore { + pub store: SqliteStore, + pub host_id: HostId, + pub encryption_key: [u8; 32], +} + +#[derive(Debug, Eq, PartialEq, Clone)] +pub enum HistoryRecord { + Create(History), // Create a history record + Delete(HistoryId), // Delete a history record, identified by ID +} + +impl HistoryRecord { + /// Serialize a history record, returning DecryptedData + /// The record will be of a certain type + /// We map those like so: + /// + /// HistoryRecord::Create -> 0 + /// HistoryRecord::Delete-> 1 + /// + /// This numeric identifier is then written as the first byte to the buffer. For history, we + /// append the serialized history right afterwards, to avoid having to handle serialization + /// twice. + /// + /// Deletion simply refers to the history by ID + pub fn serialize(&self) -> Result<DecryptedData> { + // probably don't actually need to use rmp here, but if we ever need to extend it, it's a + // nice wrapper around raw byte stuff + use rmp::encode; + + let mut output = vec![]; + + match self { + HistoryRecord::Create(history) => { + // 0 -> a history create + encode::write_u8(&mut output, 0)?; + + let bytes = history.serialize()?; + + encode::write_bin(&mut output, &bytes.0)?; + } + HistoryRecord::Delete(id) => { + // 1 -> a history delete + encode::write_u8(&mut output, 1)?; + encode::write_str(&mut output, id.0.as_str())?; + } + }; + + Ok(DecryptedData(output)) + } + + pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> { + use rmp::decode; + + fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report { + eyre!("{err:?}") + } + + let mut bytes = Bytes::new(&bytes.0); + + let record_type = decode::read_u8(&mut bytes).map_err(error_report)?; + + match record_type { + // 0 -> HistoryRecord::Create + 0 => { + // not super useful to us atm, but perhaps in the future + // written by write_bin above + let _ = decode::read_bin_len(&mut bytes).map_err(error_report)?; + + let record = History::deserialize(bytes.remaining_slice(), version)?; + + Ok(HistoryRecord::Create(record)) + } + + // 1 -> HistoryRecord::Delete + 1 => { + let bytes = bytes.remaining_slice(); + let (id, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?; + + if !bytes.is_empty() { + bail!( + "trailing bytes decoding HistoryRecord::Delete - malformed? got {bytes:?}" + ); + } + + Ok(HistoryRecord::Delete(id.to_string().into())) + } + + n => { + bail!("unknown HistoryRecord type {n}") + } + } + } +} + +impl HistoryStore { + pub fn new(store: SqliteStore, host_id: HostId, encryption_key: [u8; 32]) -> Self { + HistoryStore { + store, + host_id, + encryption_key, + } + } + + async fn push_record(&self, record: HistoryRecord) -> Result<(RecordId, RecordIdx)> { + let bytes = record.serialize()?; + let idx = self + .store + .last(self.host_id, HISTORY_TAG) + .await? + .map_or(0, |p| p.idx + 1); + + let record = Record::builder() + .host(Host::new(self.host_id)) + .version(HISTORY_VERSION.to_string()) + .tag(HISTORY_TAG.to_string()) + .idx(idx) + .data(bytes) + .build(); + + let id = record.id; + + self.store + .push(&record.encrypt::<PASETO_V4>(&self.encryption_key)) + .await?; + + Ok((id, idx)) + } + + async fn push_batch(&self, records: impl Iterator<Item = HistoryRecord>) -> Result<()> { + let mut ret = Vec::new(); + + let idx = self + .store + .last(self.host_id, HISTORY_TAG) + .await? + .map_or(0, |p| p.idx + 1); + + // Could probably _also_ do this as an iterator, but let's see how this is for now. + // optimizing for minimal sqlite transactions, this code can be optimised later + for (n, record) in records.enumerate() { + let bytes = record.serialize()?; + + let record = Record::builder() + .host(Host::new(self.host_id)) + .version(HISTORY_VERSION.to_string()) + .tag(HISTORY_TAG.to_string()) + .idx(idx + n as u64) + .data(bytes) + .build(); + + let record = record.encrypt::<PASETO_V4>(&self.encryption_key); + + ret.push(record); + } + + self.store.push_batch(ret.iter()).await?; + + Ok(()) + } + + pub async fn delete(&self, id: HistoryId) -> Result<(RecordId, RecordIdx)> { + let record = HistoryRecord::Delete(id); + + self.push_record(record).await + } + + pub async fn push(&self, history: History) -> Result<(RecordId, RecordIdx)> { + // TODO(ellie): move the history store to its own file + // it's tiny rn so fine as is + let record = HistoryRecord::Create(history); + + self.push_record(record).await + } + + pub async fn history(&self) -> Result<Vec<HistoryRecord>> { + // Atm this loads all history into memory + // Not ideal as that is potentially quite a lot, although history will be small. + let records = self.store.all_tagged(HISTORY_TAG).await?; + let mut ret = Vec::with_capacity(records.len()); + + for record in records.into_iter() { + let hist = match record.version.as_str() { + HISTORY_VERSION => { + let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?; + + HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION) + } + version => bail!("unknown history version {version:?}"), + }?; + + ret.push(hist); + } + + Ok(ret) + } + + pub async fn build(&self, database: &dyn Database) -> Result<()> { + // I'd like to change how we rebuild and not couple this with the database, but need to + // consider the structure more deeply. This will be easy to change. + + // TODO(ellie): page or iterate this + let history = self.history().await?; + + // In theory we could flatten this here + // The current issue is that the database may have history in it already, from the old sync + // This didn't actually delete old history + // If we're sure we have a DB only maintained by the new store, we can flatten + // create/delete before we even get to sqlite + let mut creates = Vec::new(); + let mut deletes = Vec::new(); + + for i in history { + match i { + HistoryRecord::Create(h) => { + creates.push(h); + } + HistoryRecord::Delete(id) => { + deletes.push(id); + } + } + } + + database.save_bulk(&creates).await?; + database.delete_rows(&deletes).await?; + + Ok(()) + } + + pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> { + for id in ids { + let record = self.store.get(*id).await; + + let record = if let Ok(record) = record { + record + } else { + continue; + }; + + if record.tag != HISTORY_TAG { + continue; + } + + let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?; + let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?; + + match record { + HistoryRecord::Create(h) => { + // TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time + database.save(&h).await?; + } + HistoryRecord::Delete(id) => { + database.delete_rows(&[id]).await?; + } + } + } + + Ok(()) + } + + /// Get a list of history IDs that exist in the store + /// Note: This currently involves loading all history into memory. This is not going to be a + /// large amount in absolute terms, but do not all it in a hot loop. + pub async fn history_ids(&self) -> Result<HashSet<HistoryId>> { + let history = self.history().await?; + + let ret = HashSet::from_iter(history.iter().map(|h| match h { + HistoryRecord::Create(h) => h.id.clone(), + HistoryRecord::Delete(id) => id.clone(), + })); + + Ok(ret) + } + + pub async fn init_store(&self, db: &impl Database) -> Result<()> { + let pb = ProgressBar::new_spinner(); + pb.set_style( + ProgressStyle::with_template("{spinner:.blue} {msg}") + .unwrap() + .with_key("eta", |state: &ProgressState, w: &mut dyn Write| { + write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap() + }) + .progress_chars("#>-"), + ); + pb.enable_steady_tick(Duration::from_millis(500)); + + pb.set_message("Fetching history from old database"); + + let context = current_context(); + let history = db.list(&[], &context, None, false, true).await?; + + pb.set_message("Fetching history already in store"); + let store_ids = self.history_ids().await?; + + pb.set_message("Converting old history to new store"); + let mut records = Vec::new(); + + for i in history { + debug!("loaded {}", i.id); + + if store_ids.contains(&i.id) { + debug!("skipping {} - already exists", i.id); + continue; + } + + if i.deleted_at.is_some() { + records.push(HistoryRecord::Delete(i.id)); + } else { + records.push(HistoryRecord::Create(i)); + } + } + + pb.set_message("Writing to db"); + + if !records.is_empty() { + self.push_batch(records.into_iter()).await?; + } + + pb.finish_with_message("Import complete"); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use atuin_common::record::DecryptedData; + use time::macros::datetime; + + use crate::history::{store::HistoryRecord, HISTORY_VERSION}; + + use super::History; + + #[test] + fn test_serialize_deserialize_create() { + let bytes = [ + 204, 0, 196, 141, 205, 0, 0, 153, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 56, 49, + 55, 53, 55, 99, 100, 50, 97, 101, 101, 54, 53, 99, 100, 55, 56, 54, 49, 102, 57, 99, + 56, 49, 207, 23, 166, 251, 212, 181, 82, 0, 0, 100, 0, 162, 108, 115, 217, 41, 47, 85, + 115, 101, 114, 115, 47, 101, 108, 108, 105, 101, 47, 115, 114, 99, 47, 103, 105, 116, + 104, 117, 98, 46, 99, 111, 109, 47, 97, 116, 117, 105, 110, 115, 104, 47, 97, 116, 117, + 105, 110, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 97, 100, 56, 57, 55, 53, 57, 55, + 56, 53, 50, 53, 50, 55, 97, 51, 49, 99, 57, 57, 56, 48, 53, 57, 170, 98, 111, 111, 112, + 58, 101, 108, 108, 105, 101, 192, + ]; + + let history = History { + id: "018cd4fe81757cd2aee65cd7861f9c81".to_owned().into(), + timestamp: datetime!(2024-01-04 00:00:00.000000 +00:00), + duration: 100, + exit: 0, + command: "ls".to_owned(), + cwd: "/Users/ellie/src/github.com/atuinsh/atuin".to_owned(), + session: "018cd4fead897597852527a31c998059".to_owned(), + hostname: "boop:ellie".to_owned(), + deleted_at: None, + }; + + let record = HistoryRecord::Create(history); + + let serialized = record.serialize().expect("failed to serialize history"); + assert_eq!(serialized.0, bytes); + + let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); + assert_eq!(deserialized, record); + + // check the snapshot too + let deserialized = + HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); + assert_eq!(deserialized, record); + } + + #[test] + fn test_serialize_deserialize_delete() { + let bytes = [ + 204, 1, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 56, 49, 55, 53, 55, 99, 100, 50, + 97, 101, 101, 54, 53, 99, 100, 55, 56, 54, 49, 102, 57, 99, 56, 49, + ]; + let record = HistoryRecord::Delete("018cd4fe81757cd2aee65cd7861f9c81".to_string().into()); + + let serialized = record.serialize().expect("failed to serialize history"); + assert_eq!(serialized.0, bytes); + + let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); + assert_eq!(deserialized, record); + + let deserialized = + HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); + assert_eq!(deserialized, record); + } +} |
