diff options
| author | Ellie Huxtable <ellie@elliehuxtable.com> | 2024-01-16 11:25:09 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-16 11:25:09 +0000 |
| commit | a2578c4521d4615d8265744ab51a1cc4f291605e (patch) | |
| tree | 26fb3da7a1d7312691703919cc700e433bbd1220 /atuin-client/src/history | |
| parent | fix(sync): save sync time when it starts, not ends (#1573) (diff) | |
| download | atuin-a2578c4521d4615d8265744ab51a1cc4f291605e.zip | |
feat: add history rebuild (#1575)
* feat: add history rebuild
This adds a function that will
1. List all history from the store
2. Segment by create/delete
3. Insert all creates into the database
4. Delete all deleted
This replaces the old history sync.
Presently it's incomplete. There is no incremental rebuild, it can only
do the entire thing at once.
This is ran by `atuin store rebuild history`
* fix tests
* add incremental sync
* add auto sync
Diffstat (limited to 'atuin-client/src/history')
| -rw-r--r-- | atuin-client/src/history/store.rs | 104 |
1 files changed, 94 insertions, 10 deletions
diff --git a/atuin-client/src/history/store.rs b/atuin-client/src/history/store.rs index f4aa9d93..a7785452 100644 --- a/atuin-client/src/history/store.rs +++ b/atuin-client/src/history/store.rs @@ -1,8 +1,11 @@ use eyre::{bail, eyre, Result}; use rmp::decode::Bytes; -use crate::record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store}; -use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordIdx}; +use crate::{ + database::Database, + record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store}, +}; +use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx}; use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION}; @@ -58,14 +61,14 @@ impl HistoryRecord { Ok(DecryptedData(output)) } - pub fn deserialize(bytes: &[u8], version: &str) -> Result<Self> { + pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> { use rmp::decode; fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report { eyre!("{err:?}") } - let mut bytes = Bytes::new(bytes); + let mut bytes = Bytes::new(&bytes.0); let record_type = decode::read_u8(&mut bytes).map_err(error_report)?; @@ -147,10 +150,89 @@ impl HistoryStore { self.push_record(record).await } + + pub async fn history(&self) -> Result<Vec<HistoryRecord>> { + // Atm this loads all history into memory + // Not ideal as that is potentially quite a lot, although history will be small. + let records = self.store.all_tagged(HISTORY_TAG).await?; + let mut ret = Vec::with_capacity(records.len()); + + for record in records.into_iter() { + let hist = match record.version.as_str() { + HISTORY_VERSION => { + let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?; + HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION) + } + version => bail!("unknown history version {version:?}"), + }?; + + ret.push(hist); + } + + Ok(ret) + } + + pub async fn build(&self, database: &dyn Database) -> Result<()> { + // I'd like to change how we rebuild and not couple this with the database, but need to + // consider the structure more deeply. This will be easy to change. + + // TODO(ellie): page or iterate this + let history = self.history().await?; + + // In theory we could flatten this here + // The current issue is that the database may have history in it already, from the old sync + // This didn't actually delete old history + // If we're sure we have a DB only maintained by the new store, we can flatten + // create/delete before we even get to sqlite + let mut creates = Vec::new(); + let mut deletes = Vec::new(); + + for i in history { + match i { + HistoryRecord::Create(h) => { + creates.push(h); + } + HistoryRecord::Delete(id) => { + deletes.push(id); + } + } + } + + database.save_bulk(&creates).await?; + database.delete_rows(&deletes).await?; + + Ok(()) + } + + pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> { + for id in ids { + let record = self.store.get(*id).await?; + + if record.tag != HISTORY_TAG { + continue; + } + + let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?; + let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?; + + match record { + HistoryRecord::Create(h) => { + // TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time + database.save(&h).await?; + } + HistoryRecord::Delete(id) => { + database.delete_rows(&[id]).await?; + } + } + } + + Ok(()) + } } #[cfg(test)] mod tests { + use atuin_common::record::DecryptedData; use time::macros::datetime; use crate::history::{store::HistoryRecord, HISTORY_VERSION}; @@ -187,13 +269,14 @@ mod tests { let serialized = record.serialize().expect("failed to serialize history"); assert_eq!(serialized.0, bytes); - let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION) + let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION) .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); // check the snapshot too - let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION) - .expect("failed to deserialize HistoryRecord"); + let deserialized = + HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); } @@ -208,12 +291,13 @@ mod tests { let serialized = record.serialize().expect("failed to serialize history"); assert_eq!(serialized.0, bytes); - let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION) + let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION) .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); - let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION) - .expect("failed to deserialize HistoryRecord"); + let deserialized = + HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); } } |
