aboutsummaryrefslogtreecommitdiffstats
path: root/atuin-client/src
diff options
context:
space:
mode:
Diffstat (limited to 'atuin-client/src')
-rw-r--r--atuin-client/src/database.rs27
-rw-r--r--atuin-client/src/history/store.rs104
-rw-r--r--atuin-client/src/record/store.rs5
-rw-r--r--atuin-client/src/record/sync.rs17
4 files changed, 131 insertions, 22 deletions
diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs
index 376c7b75..a6957093 100644
--- a/atuin-client/src/database.rs
+++ b/atuin-client/src/database.rs
@@ -18,7 +18,7 @@ use sqlx::{
};
use time::OffsetDateTime;
-use crate::history::HistoryStats;
+use crate::history::{HistoryId, HistoryStats};
use super::{
history::History,
@@ -93,6 +93,7 @@ pub trait Database: Send + Sync + 'static {
async fn before(&self, timestamp: OffsetDateTime, count: i64) -> Result<Vec<History>>;
async fn delete(&self, h: History) -> Result<()>;
+ async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()>;
async fn deleted(&self) -> Result<Vec<History>>;
// Yes I know, it's a lot.
@@ -172,6 +173,18 @@ impl Sqlite {
Ok(())
}
+ async fn delete_row_raw(
+ tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
+ id: HistoryId,
+ ) -> Result<()> {
+ sqlx::query("delete from history where id = ?1")
+ .bind(id.0.as_str())
+ .execute(&mut **tx)
+ .await?;
+
+ Ok(())
+ }
+
fn query_history(row: SqliteRow) -> History {
let deleted_at: Option<i64> = row.get("deleted_at");
@@ -567,6 +580,18 @@ impl Database for Sqlite {
Ok(())
}
+ async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()> {
+ let mut tx = self.pool.begin().await?;
+
+ for id in ids {
+ Self::delete_row_raw(&mut tx, id.clone()).await?;
+ }
+
+ tx.commit().await?;
+
+ Ok(())
+ }
+
async fn stats(&self, h: &History) -> Result<HistoryStats> {
// We select the previous in the session by time
let mut prev = SqlBuilder::select_from("history");
diff --git a/atuin-client/src/history/store.rs b/atuin-client/src/history/store.rs
index f4aa9d93..a7785452 100644
--- a/atuin-client/src/history/store.rs
+++ b/atuin-client/src/history/store.rs
@@ -1,8 +1,11 @@
use eyre::{bail, eyre, Result};
use rmp::decode::Bytes;
-use crate::record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store};
-use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordIdx};
+use crate::{
+ database::Database,
+ record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store},
+};
+use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx};
use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION};
@@ -58,14 +61,14 @@ impl HistoryRecord {
Ok(DecryptedData(output))
}
- pub fn deserialize(bytes: &[u8], version: &str) -> Result<Self> {
+ pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> {
use rmp::decode;
fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
eyre!("{err:?}")
}
- let mut bytes = Bytes::new(bytes);
+ let mut bytes = Bytes::new(&bytes.0);
let record_type = decode::read_u8(&mut bytes).map_err(error_report)?;
@@ -147,10 +150,89 @@ impl HistoryStore {
self.push_record(record).await
}
+
+ pub async fn history(&self) -> Result<Vec<HistoryRecord>> {
+ // Atm this loads all history into memory
+ // Not ideal as that is potentially quite a lot, although history will be small.
+ let records = self.store.all_tagged(HISTORY_TAG).await?;
+ let mut ret = Vec::with_capacity(records.len());
+
+ for record in records.into_iter() {
+ let hist = match record.version.as_str() {
+ HISTORY_VERSION => {
+ let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
+ HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)
+ }
+ version => bail!("unknown history version {version:?}"),
+ }?;
+
+ ret.push(hist);
+ }
+
+ Ok(ret)
+ }
+
+ pub async fn build(&self, database: &dyn Database) -> Result<()> {
+ // I'd like to change how we rebuild and not couple this with the database, but need to
+ // consider the structure more deeply. This will be easy to change.
+
+ // TODO(ellie): page or iterate this
+ let history = self.history().await?;
+
+ // In theory we could flatten this here
+ // The current issue is that the database may have history in it already, from the old sync
+ // This didn't actually delete old history
+ // If we're sure we have a DB only maintained by the new store, we can flatten
+ // create/delete before we even get to sqlite
+ let mut creates = Vec::new();
+ let mut deletes = Vec::new();
+
+ for i in history {
+ match i {
+ HistoryRecord::Create(h) => {
+ creates.push(h);
+ }
+ HistoryRecord::Delete(id) => {
+ deletes.push(id);
+ }
+ }
+ }
+
+ database.save_bulk(&creates).await?;
+ database.delete_rows(&deletes).await?;
+
+ Ok(())
+ }
+
+ pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> {
+ for id in ids {
+ let record = self.store.get(*id).await?;
+
+ if record.tag != HISTORY_TAG {
+ continue;
+ }
+
+ let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
+ let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?;
+
+ match record {
+ HistoryRecord::Create(h) => {
+ // TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time
+ database.save(&h).await?;
+ }
+ HistoryRecord::Delete(id) => {
+ database.delete_rows(&[id]).await?;
+ }
+ }
+ }
+
+ Ok(())
+ }
}
#[cfg(test)]
mod tests {
+ use atuin_common::record::DecryptedData;
use time::macros::datetime;
use crate::history::{store::HistoryRecord, HISTORY_VERSION};
@@ -187,13 +269,14 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
- let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
// check the snapshot too
- let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
- .expect("failed to deserialize HistoryRecord");
+ let deserialized =
+ HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
@@ -208,12 +291,13 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
- let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
- let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
- .expect("failed to deserialize HistoryRecord");
+ let deserialized =
+ HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
}
diff --git a/atuin-client/src/record/store.rs b/atuin-client/src/record/store.rs
index a5c156d6..efe2eb4a 100644
--- a/atuin-client/src/record/store.rs
+++ b/atuin-client/src/record/store.rs
@@ -2,6 +2,7 @@ use async_trait::async_trait;
use eyre::Result;
use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIdx, RecordStatus};
+
/// A record store stores records
/// In more detail - we tend to need to process this into _another_ format to actually query it.
/// As is, the record store is intended as the source of truth for arbitratry data, which could
@@ -44,8 +45,6 @@ pub trait Store {
async fn status(&self) -> Result<RecordStatus>;
- /// Get every start record for a given tag, regardless of host.
- /// Useful when actually operating on synchronized data, and will often have conflict
- /// resolution applied.
+ /// Get all records for a given tag
async fn all_tagged(&self, tag: &str) -> Result<Vec<Record<EncryptedData>>>;
}
diff --git a/atuin-client/src/record/sync.rs b/atuin-client/src/record/sync.rs
index 2694e0ff..19b8dd1b 100644
--- a/atuin-client/src/record/sync.rs
+++ b/atuin-client/src/record/sync.rs
@@ -7,7 +7,7 @@ use thiserror::Error;
use super::store::Store;
use crate::{api_client::Client, settings::Settings};
-use atuin_common::record::{Diff, HostId, RecordIdx, RecordStatus};
+use atuin_common::record::{Diff, HostId, RecordId, RecordIdx, RecordStatus};
#[derive(Error, Debug)]
pub enum SyncError {
@@ -198,11 +198,12 @@ async fn sync_download(
tag: String,
local: Option<RecordIdx>,
remote: RecordIdx,
-) -> Result<i64, SyncError> {
+) -> Result<Vec<RecordId>, SyncError> {
let local = local.unwrap_or(0);
let expected = remote - local;
let download_page_size = 100;
let mut progress = 0;
+ let mut ret = Vec::new();
println!(
"Downloading {} records from {}/{}",
@@ -230,6 +231,8 @@ async fn sync_download(
expected
);
+ ret.extend(page.iter().map(|f| f.id));
+
progress += page.len() as u64;
if progress >= expected {
@@ -237,14 +240,14 @@ async fn sync_download(
}
}
- Ok(progress as i64)
+ Ok(ret)
}
pub async fn sync_remote(
operations: Vec<Operation>,
local_store: &impl Store,
settings: &Settings,
-) -> Result<(i64, i64), SyncError> {
+) -> Result<(i64, Vec<RecordId>), SyncError> {
let client = Client::new(
&settings.sync_address,
&settings.session_token,
@@ -254,7 +257,7 @@ pub async fn sync_remote(
.expect("failed to create client");
let mut uploaded = 0;
- let mut downloaded = 0;
+ let mut downloaded = Vec::new();
// this can totally run in parallel, but lets get it working first
for i in operations {
@@ -271,9 +274,7 @@ pub async fn sync_remote(
tag,
local,
remote,
- } => {
- downloaded += sync_download(local_store, &client, host, tag, local, remote).await?
- }
+ } => downloaded = sync_download(local_store, &client, host, tag, local, remote).await?,
Operation::Noop { .. } => continue,
}