diff options
Diffstat (limited to 'atuin-client/src')
| -rw-r--r-- | atuin-client/src/database.rs | 27 | ||||
| -rw-r--r-- | atuin-client/src/history/store.rs | 104 | ||||
| -rw-r--r-- | atuin-client/src/record/store.rs | 5 | ||||
| -rw-r--r-- | atuin-client/src/record/sync.rs | 17 |
4 files changed, 131 insertions, 22 deletions
diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs index 376c7b75..a6957093 100644 --- a/atuin-client/src/database.rs +++ b/atuin-client/src/database.rs @@ -18,7 +18,7 @@ use sqlx::{ }; use time::OffsetDateTime; -use crate::history::HistoryStats; +use crate::history::{HistoryId, HistoryStats}; use super::{ history::History, @@ -93,6 +93,7 @@ pub trait Database: Send + Sync + 'static { async fn before(&self, timestamp: OffsetDateTime, count: i64) -> Result<Vec<History>>; async fn delete(&self, h: History) -> Result<()>; + async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()>; async fn deleted(&self) -> Result<Vec<History>>; // Yes I know, it's a lot. @@ -172,6 +173,18 @@ impl Sqlite { Ok(()) } + async fn delete_row_raw( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + id: HistoryId, + ) -> Result<()> { + sqlx::query("delete from history where id = ?1") + .bind(id.0.as_str()) + .execute(&mut **tx) + .await?; + + Ok(()) + } + fn query_history(row: SqliteRow) -> History { let deleted_at: Option<i64> = row.get("deleted_at"); @@ -567,6 +580,18 @@ impl Database for Sqlite { Ok(()) } + async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()> { + let mut tx = self.pool.begin().await?; + + for id in ids { + Self::delete_row_raw(&mut tx, id.clone()).await?; + } + + tx.commit().await?; + + Ok(()) + } + async fn stats(&self, h: &History) -> Result<HistoryStats> { // We select the previous in the session by time let mut prev = SqlBuilder::select_from("history"); diff --git a/atuin-client/src/history/store.rs b/atuin-client/src/history/store.rs index f4aa9d93..a7785452 100644 --- a/atuin-client/src/history/store.rs +++ b/atuin-client/src/history/store.rs @@ -1,8 +1,11 @@ use eyre::{bail, eyre, Result}; use rmp::decode::Bytes; -use crate::record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store}; -use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordIdx}; +use crate::{ + database::Database, + record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store}, +}; +use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx}; use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION}; @@ -58,14 +61,14 @@ impl HistoryRecord { Ok(DecryptedData(output)) } - pub fn deserialize(bytes: &[u8], version: &str) -> Result<Self> { + pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> { use rmp::decode; fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report { eyre!("{err:?}") } - let mut bytes = Bytes::new(bytes); + let mut bytes = Bytes::new(&bytes.0); let record_type = decode::read_u8(&mut bytes).map_err(error_report)?; @@ -147,10 +150,89 @@ impl HistoryStore { self.push_record(record).await } + + pub async fn history(&self) -> Result<Vec<HistoryRecord>> { + // Atm this loads all history into memory + // Not ideal as that is potentially quite a lot, although history will be small. + let records = self.store.all_tagged(HISTORY_TAG).await?; + let mut ret = Vec::with_capacity(records.len()); + + for record in records.into_iter() { + let hist = match record.version.as_str() { + HISTORY_VERSION => { + let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?; + HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION) + } + version => bail!("unknown history version {version:?}"), + }?; + + ret.push(hist); + } + + Ok(ret) + } + + pub async fn build(&self, database: &dyn Database) -> Result<()> { + // I'd like to change how we rebuild and not couple this with the database, but need to + // consider the structure more deeply. This will be easy to change. + + // TODO(ellie): page or iterate this + let history = self.history().await?; + + // In theory we could flatten this here + // The current issue is that the database may have history in it already, from the old sync + // This didn't actually delete old history + // If we're sure we have a DB only maintained by the new store, we can flatten + // create/delete before we even get to sqlite + let mut creates = Vec::new(); + let mut deletes = Vec::new(); + + for i in history { + match i { + HistoryRecord::Create(h) => { + creates.push(h); + } + HistoryRecord::Delete(id) => { + deletes.push(id); + } + } + } + + database.save_bulk(&creates).await?; + database.delete_rows(&deletes).await?; + + Ok(()) + } + + pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> { + for id in ids { + let record = self.store.get(*id).await?; + + if record.tag != HISTORY_TAG { + continue; + } + + let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?; + let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?; + + match record { + HistoryRecord::Create(h) => { + // TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time + database.save(&h).await?; + } + HistoryRecord::Delete(id) => { + database.delete_rows(&[id]).await?; + } + } + } + + Ok(()) + } } #[cfg(test)] mod tests { + use atuin_common::record::DecryptedData; use time::macros::datetime; use crate::history::{store::HistoryRecord, HISTORY_VERSION}; @@ -187,13 +269,14 @@ mod tests { let serialized = record.serialize().expect("failed to serialize history"); assert_eq!(serialized.0, bytes); - let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION) + let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION) .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); // check the snapshot too - let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION) - .expect("failed to deserialize HistoryRecord"); + let deserialized = + HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); } @@ -208,12 +291,13 @@ mod tests { let serialized = record.serialize().expect("failed to serialize history"); assert_eq!(serialized.0, bytes); - let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION) + let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION) .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); - let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION) - .expect("failed to deserialize HistoryRecord"); + let deserialized = + HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); } } diff --git a/atuin-client/src/record/store.rs b/atuin-client/src/record/store.rs index a5c156d6..efe2eb4a 100644 --- a/atuin-client/src/record/store.rs +++ b/atuin-client/src/record/store.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use eyre::Result; use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIdx, RecordStatus}; + /// A record store stores records /// In more detail - we tend to need to process this into _another_ format to actually query it. /// As is, the record store is intended as the source of truth for arbitratry data, which could @@ -44,8 +45,6 @@ pub trait Store { async fn status(&self) -> Result<RecordStatus>; - /// Get every start record for a given tag, regardless of host. - /// Useful when actually operating on synchronized data, and will often have conflict - /// resolution applied. + /// Get all records for a given tag async fn all_tagged(&self, tag: &str) -> Result<Vec<Record<EncryptedData>>>; } diff --git a/atuin-client/src/record/sync.rs b/atuin-client/src/record/sync.rs index 2694e0ff..19b8dd1b 100644 --- a/atuin-client/src/record/sync.rs +++ b/atuin-client/src/record/sync.rs @@ -7,7 +7,7 @@ use thiserror::Error; use super::store::Store; use crate::{api_client::Client, settings::Settings}; -use atuin_common::record::{Diff, HostId, RecordIdx, RecordStatus}; +use atuin_common::record::{Diff, HostId, RecordId, RecordIdx, RecordStatus}; #[derive(Error, Debug)] pub enum SyncError { @@ -198,11 +198,12 @@ async fn sync_download( tag: String, local: Option<RecordIdx>, remote: RecordIdx, -) -> Result<i64, SyncError> { +) -> Result<Vec<RecordId>, SyncError> { let local = local.unwrap_or(0); let expected = remote - local; let download_page_size = 100; let mut progress = 0; + let mut ret = Vec::new(); println!( "Downloading {} records from {}/{}", @@ -230,6 +231,8 @@ async fn sync_download( expected ); + ret.extend(page.iter().map(|f| f.id)); + progress += page.len() as u64; if progress >= expected { @@ -237,14 +240,14 @@ async fn sync_download( } } - Ok(progress as i64) + Ok(ret) } pub async fn sync_remote( operations: Vec<Operation>, local_store: &impl Store, settings: &Settings, -) -> Result<(i64, i64), SyncError> { +) -> Result<(i64, Vec<RecordId>), SyncError> { let client = Client::new( &settings.sync_address, &settings.session_token, @@ -254,7 +257,7 @@ pub async fn sync_remote( .expect("failed to create client"); let mut uploaded = 0; - let mut downloaded = 0; + let mut downloaded = Vec::new(); // this can totally run in parallel, but lets get it working first for i in operations { @@ -271,9 +274,7 @@ pub async fn sync_remote( tag, local, remote, - } => { - downloaded += sync_download(local_store, &client, host, tag, local, remote).await? - } + } => downloaded = sync_download(local_store, &client, host, tag, local, remote).await?, Operation::Noop { .. } => continue, } |
