diff options
| -rw-r--r-- | atuin-client/src/database.rs | 27 | ||||
| -rw-r--r-- | atuin-client/src/history/store.rs | 104 | ||||
| -rw-r--r-- | atuin-client/src/record/store.rs | 5 | ||||
| -rw-r--r-- | atuin-client/src/record/sync.rs | 17 | ||||
| -rw-r--r-- | atuin/src/command/client.rs | 14 | ||||
| -rw-r--r-- | atuin/src/command/client/history.rs | 10 | ||||
| -rw-r--r-- | atuin/src/command/client/record.rs | 63 | ||||
| -rw-r--r-- | atuin/src/command/client/store.rs | 123 | ||||
| -rw-r--r-- | atuin/src/command/client/sync.rs | 29 |
9 files changed, 286 insertions, 106 deletions
diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs index 376c7b75..a6957093 100644 --- a/atuin-client/src/database.rs +++ b/atuin-client/src/database.rs @@ -18,7 +18,7 @@ use sqlx::{ }; use time::OffsetDateTime; -use crate::history::HistoryStats; +use crate::history::{HistoryId, HistoryStats}; use super::{ history::History, @@ -93,6 +93,7 @@ pub trait Database: Send + Sync + 'static { async fn before(&self, timestamp: OffsetDateTime, count: i64) -> Result<Vec<History>>; async fn delete(&self, h: History) -> Result<()>; + async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()>; async fn deleted(&self) -> Result<Vec<History>>; // Yes I know, it's a lot. @@ -172,6 +173,18 @@ impl Sqlite { Ok(()) } + async fn delete_row_raw( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + id: HistoryId, + ) -> Result<()> { + sqlx::query("delete from history where id = ?1") + .bind(id.0.as_str()) + .execute(&mut **tx) + .await?; + + Ok(()) + } + fn query_history(row: SqliteRow) -> History { let deleted_at: Option<i64> = row.get("deleted_at"); @@ -567,6 +580,18 @@ impl Database for Sqlite { Ok(()) } + async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()> { + let mut tx = self.pool.begin().await?; + + for id in ids { + Self::delete_row_raw(&mut tx, id.clone()).await?; + } + + tx.commit().await?; + + Ok(()) + } + async fn stats(&self, h: &History) -> Result<HistoryStats> { // We select the previous in the session by time let mut prev = SqlBuilder::select_from("history"); diff --git a/atuin-client/src/history/store.rs b/atuin-client/src/history/store.rs index f4aa9d93..a7785452 100644 --- a/atuin-client/src/history/store.rs +++ b/atuin-client/src/history/store.rs @@ -1,8 +1,11 @@ use eyre::{bail, eyre, Result}; use rmp::decode::Bytes; -use crate::record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store}; -use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordIdx}; +use crate::{ + database::Database, + record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store}, +}; +use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx}; use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION}; @@ -58,14 +61,14 @@ impl HistoryRecord { Ok(DecryptedData(output)) } - pub fn deserialize(bytes: &[u8], version: &str) -> Result<Self> { + pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> { use rmp::decode; fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report { eyre!("{err:?}") } - let mut bytes = Bytes::new(bytes); + let mut bytes = Bytes::new(&bytes.0); let record_type = decode::read_u8(&mut bytes).map_err(error_report)?; @@ -147,10 +150,89 @@ impl HistoryStore { self.push_record(record).await } + + pub async fn history(&self) -> Result<Vec<HistoryRecord>> { + // Atm this loads all history into memory + // Not ideal as that is potentially quite a lot, although history will be small. + let records = self.store.all_tagged(HISTORY_TAG).await?; + let mut ret = Vec::with_capacity(records.len()); + + for record in records.into_iter() { + let hist = match record.version.as_str() { + HISTORY_VERSION => { + let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?; + HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION) + } + version => bail!("unknown history version {version:?}"), + }?; + + ret.push(hist); + } + + Ok(ret) + } + + pub async fn build(&self, database: &dyn Database) -> Result<()> { + // I'd like to change how we rebuild and not couple this with the database, but need to + // consider the structure more deeply. This will be easy to change. + + // TODO(ellie): page or iterate this + let history = self.history().await?; + + // In theory we could flatten this here + // The current issue is that the database may have history in it already, from the old sync + // This didn't actually delete old history + // If we're sure we have a DB only maintained by the new store, we can flatten + // create/delete before we even get to sqlite + let mut creates = Vec::new(); + let mut deletes = Vec::new(); + + for i in history { + match i { + HistoryRecord::Create(h) => { + creates.push(h); + } + HistoryRecord::Delete(id) => { + deletes.push(id); + } + } + } + + database.save_bulk(&creates).await?; + database.delete_rows(&deletes).await?; + + Ok(()) + } + + pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> { + for id in ids { + let record = self.store.get(*id).await?; + + if record.tag != HISTORY_TAG { + continue; + } + + let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?; + let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?; + + match record { + HistoryRecord::Create(h) => { + // TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time + database.save(&h).await?; + } + HistoryRecord::Delete(id) => { + database.delete_rows(&[id]).await?; + } + } + } + + Ok(()) + } } #[cfg(test)] mod tests { + use atuin_common::record::DecryptedData; use time::macros::datetime; use crate::history::{store::HistoryRecord, HISTORY_VERSION}; @@ -187,13 +269,14 @@ mod tests { let serialized = record.serialize().expect("failed to serialize history"); assert_eq!(serialized.0, bytes); - let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION) + let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION) .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); // check the snapshot too - let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION) - .expect("failed to deserialize HistoryRecord"); + let deserialized = + HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); } @@ -208,12 +291,13 @@ mod tests { let serialized = record.serialize().expect("failed to serialize history"); assert_eq!(serialized.0, bytes); - let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION) + let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION) .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); - let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION) - .expect("failed to deserialize HistoryRecord"); + let deserialized = + HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION) + .expect("failed to deserialize HistoryRecord"); assert_eq!(deserialized, record); } } diff --git a/atuin-client/src/record/store.rs b/atuin-client/src/record/store.rs index a5c156d6..efe2eb4a 100644 --- a/atuin-client/src/record/store.rs +++ b/atuin-client/src/record/store.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use eyre::Result; use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIdx, RecordStatus}; + /// A record store stores records /// In more detail - we tend to need to process this into _another_ format to actually query it. /// As is, the record store is intended as the source of truth for arbitratry data, which could @@ -44,8 +45,6 @@ pub trait Store { async fn status(&self) -> Result<RecordStatus>; - /// Get every start record for a given tag, regardless of host. - /// Useful when actually operating on synchronized data, and will often have conflict - /// resolution applied. + /// Get all records for a given tag async fn all_tagged(&self, tag: &str) -> Result<Vec<Record<EncryptedData>>>; } diff --git a/atuin-client/src/record/sync.rs b/atuin-client/src/record/sync.rs index 2694e0ff..19b8dd1b 100644 --- a/atuin-client/src/record/sync.rs +++ b/atuin-client/src/record/sync.rs @@ -7,7 +7,7 @@ use thiserror::Error; use super::store::Store; use crate::{api_client::Client, settings::Settings}; -use atuin_common::record::{Diff, HostId, RecordIdx, RecordStatus}; +use atuin_common::record::{Diff, HostId, RecordId, RecordIdx, RecordStatus}; #[derive(Error, Debug)] pub enum SyncError { @@ -198,11 +198,12 @@ async fn sync_download( tag: String, local: Option<RecordIdx>, remote: RecordIdx, -) -> Result<i64, SyncError> { +) -> Result<Vec<RecordId>, SyncError> { let local = local.unwrap_or(0); let expected = remote - local; let download_page_size = 100; let mut progress = 0; + let mut ret = Vec::new(); println!( "Downloading {} records from {}/{}", @@ -230,6 +231,8 @@ async fn sync_download( expected ); + ret.extend(page.iter().map(|f| f.id)); + progress += page.len() as u64; if progress >= expected { @@ -237,14 +240,14 @@ async fn sync_download( } } - Ok(progress as i64) + Ok(ret) } pub async fn sync_remote( operations: Vec<Operation>, local_store: &impl Store, settings: &Settings, -) -> Result<(i64, i64), SyncError> { +) -> Result<(i64, Vec<RecordId>), SyncError> { let client = Client::new( &settings.sync_address, &settings.session_token, @@ -254,7 +257,7 @@ pub async fn sync_remote( .expect("failed to create client"); let mut uploaded = 0; - let mut downloaded = 0; + let mut downloaded = Vec::new(); // this can totally run in parallel, but lets get it working first for i in operations { @@ -271,9 +274,7 @@ pub async fn sync_remote( tag, local, remote, - } => { - downloaded += sync_download(local_store, &client, host, tag, local, remote).await? - } + } => downloaded = sync_download(local_store, &client, host, tag, local, remote).await?, Operation::Noop { .. } => continue, } diff --git a/atuin/src/command/client.rs b/atuin/src/command/client.rs index 9ca199fd..6292d263 100644 --- a/atuin/src/command/client.rs +++ b/atuin/src/command/client.rs @@ -16,9 +16,9 @@ mod config; mod history; mod import; mod kv; -mod record; mod search; mod stats; +mod store; #[derive(Subcommand, Debug)] #[command(infer_subcommands = true)] @@ -48,7 +48,7 @@ pub enum Cmd { Kv(kv::Cmd), #[command(subcommand)] - Record(record::Cmd), + Store(store::Cmd), /// Print example configuration #[command()] @@ -83,23 +83,23 @@ impl Cmd { let record_store_path = PathBuf::from(settings.record_store_path.as_str()); let db = Sqlite::new(db_path).await?; - let store = SqliteStore::new(record_store_path).await?; + let sqlite_store = SqliteStore::new(record_store_path).await?; match self { - Self::History(history) => history.run(&settings, &db, store).await, + Self::History(history) => history.run(&settings, &db, sqlite_store).await, Self::Import(import) => import.run(&db).await, Self::Stats(stats) => stats.run(&db, &settings).await, Self::Search(search) => search.run(db, &mut settings).await, #[cfg(feature = "sync")] - Self::Sync(sync) => sync.run(settings, &db, &store).await, + Self::Sync(sync) => sync.run(settings, &db, sqlite_store).await, #[cfg(feature = "sync")] Self::Account(account) => account.run(settings).await, - Self::Kv(kv) => kv.run(&settings, &store).await, + Self::Kv(kv) => kv.run(&settings, &sqlite_store).await, - Self::Record(record) => record.run(&settings, &store).await, + Self::Store(store) => store.run(&settings, &db, sqlite_store).await, Self::DefaultConfig => { config::run(); diff --git a/atuin/src/command/client/history.rs b/atuin/src/command/client/history.rs index e22ee6db..10f1feb6 100644 --- a/atuin/src/command/client/history.rs +++ b/atuin/src/command/client/history.rs @@ -317,14 +317,14 @@ impl Cmd { if settings.sync.records { let (diff, _) = record::sync::diff(settings, &store).await?; let operations = record::sync::operations(diff, &store).await?; - let (uploaded, downloaded) = + let (_, downloaded) = record::sync::sync_remote(operations, &store, settings).await?; - println!("{uploaded}/{downloaded} up/down to record store"); + history_store.incremental_build(db, &downloaded).await?; + } else { + debug!("running periodic background sync"); + sync::sync(settings, false, db).await?; } - - debug!("running periodic background sync"); - sync::sync(settings, false, db).await?; } #[cfg(not(feature = "sync"))] debug!("not compiled with sync support"); diff --git a/atuin/src/command/client/record.rs b/atuin/src/command/client/record.rs deleted file mode 100644 index 3c91cdcc..00000000 --- a/atuin/src/command/client/record.rs +++ /dev/null @@ -1,63 +0,0 @@ -use clap::Subcommand; -use eyre::Result; - -use atuin_client::{record::store::Store, settings::Settings}; -use time::OffsetDateTime; - -#[derive(Subcommand, Debug)] -#[command(infer_subcommands = true)] -pub enum Cmd { - Status, -} - -impl Cmd { - pub async fn run( - &self, - _settings: &Settings, - store: &(impl Store + Send + Sync), - ) -> Result<()> { - let host_id = Settings::host_id().expect("failed to get host_id"); - - let status = store.status().await?; - - // TODO: should probs build some data structure and then pretty-print it or smth - for (host, st) in &status.hosts { - let host_string = if host == &host_id { - format!("host: {} <- CURRENT HOST", host.0.as_hyphenated()) - } else { - format!("host: {}", host.0.as_hyphenated()) - }; - - println!("{host_string}"); - - for (tag, idx) in st { - println!("\tstore: {tag}"); - - let first = store.first(*host, tag).await?; - let last = store.last(*host, tag).await?; - - println!("\t\tidx: {idx}"); - - if let Some(first) = first { - println!("\t\tfirst: {}", first.id.0.as_hyphenated()); - - let time = - OffsetDateTime::from_unix_timestamp_nanos(i128::from(first.timestamp))?; - println!("\t\t\tcreated: {time}"); - } - - if let Some(last) = last { - println!("\t\tlast: {}", last.id.0.as_hyphenated()); - - let time = - OffsetDateTime::from_unix_timestamp_nanos(i128::from(last.timestamp))?; - println!("\t\t\tcreated: {time}"); - } - } - - println!(); - } - - Ok(()) - } -} diff --git a/atuin/src/command/client/store.rs b/atuin/src/command/client/store.rs new file mode 100644 index 00000000..640a284b --- /dev/null +++ b/atuin/src/command/client/store.rs @@ -0,0 +1,123 @@ +use clap::{Args, Subcommand}; +use eyre::{bail, Result}; + +use atuin_client::{ + database::Database, + encryption, + history::store::HistoryStore, + record::{sqlite_store::SqliteStore, store::Store}, + settings::Settings, +}; +use time::OffsetDateTime; + +#[derive(Args, Debug)] +pub struct Rebuild { + pub tag: String, +} + +impl Rebuild { + pub async fn run( + &self, + settings: &Settings, + store: SqliteStore, + database: &dyn Database, + ) -> Result<()> { + // keep it as a string and not an enum atm + // would be super cool to build this dynamically in the future + // eg register handles for rebuilding various tags without having to make this part of the + // binary big + match self.tag.as_str() { + "history" => { + self.rebuild_history(settings, store.clone(), database) + .await?; + } + + tag => bail!("unknown tag: {tag}"), + } + + Ok(()) + } + + async fn rebuild_history( + &self, + settings: &Settings, + store: SqliteStore, + database: &dyn Database, + ) -> Result<()> { + let encryption_key: [u8; 32] = encryption::load_key(settings)?.into(); + + let host_id = Settings::host_id().expect("failed to get host_id"); + let history_store = HistoryStore::new(store, host_id, encryption_key); + + history_store.build(database).await?; + + Ok(()) + } +} + +#[derive(Subcommand, Debug)] +#[command(infer_subcommands = true)] +pub enum Cmd { + Status, + Rebuild(Rebuild), +} + +impl Cmd { + pub async fn run( + &self, + settings: &Settings, + database: &dyn Database, + store: SqliteStore, + ) -> Result<()> { + match self { + Self::Status => self.status(store).await, + Self::Rebuild(rebuild) => rebuild.run(settings, store, database).await, + } + } + + pub async fn status(&self, store: SqliteStore) -> Result<()> { + let host_id = Settings::host_id().expect("failed to get host_id"); + + let status = store.status().await?; + + // TODO: should probs build some data structure and then pretty-print it or smth + for (host, st) in &status.hosts { + let host_string = if host == &host_id { + format!("host: {} <- CURRENT HOST", host.0.as_hyphenated()) + } else { + format!("host: {}", host.0.as_hyphenated()) + }; + + println!("{host_string}"); + + for (tag, idx) in st { + println!("\tstore: {tag}"); + + let first = store.first(*host, tag).await?; + let last = store.last(*host, tag).await?; + + println!("\t\tidx: {idx}"); + + if let Some(first) = first { + println!("\t\tfirst: {}", first.id.0.as_hyphenated()); + + let time = + OffsetDateTime::from_unix_timestamp_nanos(i128::from(first.timestamp))?; + println!("\t\t\tcreated: {time}"); + } + + if let Some(last) = last { + println!("\t\tlast: {}", last.id.0.as_hyphenated()); + + let time = + OffsetDateTime::from_unix_timestamp_nanos(i128::from(last.timestamp))?; + println!("\t\t\tcreated: {time}"); + } + } + + println!(); + } + + Ok(()) + } +} diff --git a/atuin/src/command/client/sync.rs b/atuin/src/command/client/sync.rs index 1d2cdf4f..2e58f07d 100644 --- a/atuin/src/command/client/sync.rs +++ b/atuin/src/command/client/sync.rs @@ -3,7 +3,9 @@ use eyre::{Result, WrapErr}; use atuin_client::{ database::Database, - record::{store::Store, sync}, + encryption, + history::store::HistoryStore, + record::{sqlite_store::SqliteStore, sync}, settings::Settings, }; @@ -45,7 +47,7 @@ impl Cmd { self, settings: Settings, db: &impl Database, - store: &(impl Store + Send + Sync), + store: SqliteStore, ) -> Result<()> { match self { Self::Sync { force } => run(&settings, force, db, store).await, @@ -75,17 +77,26 @@ async fn run( settings: &Settings, force: bool, db: &impl Database, - store: &(impl Store + Send + Sync), + store: SqliteStore, ) -> Result<()> { if settings.sync.records { - let (diff, _) = sync::diff(settings, store).await?; - let operations = sync::operations(diff, store).await?; - let (uploaded, downloaded) = sync::sync_remote(operations, store, settings).await?; + let (diff, _) = sync::diff(settings, &store).await?; + let operations = sync::operations(diff, &store).await?; + let (uploaded, downloaded) = sync::sync_remote(operations, &store, settings).await?; - println!("{uploaded}/{downloaded} up/down to record store"); - } + let encryption_key: [u8; 32] = encryption::load_key(settings) + .context("could not load encryption key")? + .into(); + + let host_id = Settings::host_id().expect("failed to get host_id"); + let history_store = HistoryStore::new(store.clone(), host_id, encryption_key); - atuin_client::sync::sync(settings, force, db).await?; + history_store.incremental_build(db, &downloaded).await?; + + println!("{uploaded}/{} up/down to record store", downloaded.len()); + } else { + atuin_client::sync::sync(settings, force, db).await?; + } println!( "Sync complete! {} items in history database, force: {}", |
