From 7bc6ccdd70422f8fc763e2fd814a481bc79ce7b5 Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Fri, 5 Jan 2024 17:57:49 +0000 Subject: feat: rework record sync for improved reliability (#1478) * feat: rework record sync for improved reliability So, to tell a story 1. We introduced the record sync, intended to be the new algorithm to sync history. 2. On top of this, I added the KV store. This was intended as a simple test of the record sync, and to see if people wanted that sort of functionality 3. History remained syncing via the old means, as while it had issues it worked more-or-less OK. And we are aware of its flaws 4. If KV syncing worked ok, history would be moved across KV syncing ran ok for 6mo or so, so I started to move across history. For several weeks, I ran a local fork of Atuin + the server that synced via records instead. The record store maintained ordering via a linked list, which was a mistake. It performed well in testing, but was really difficult to debug and reason about. So when a few small sync issues occured, they took an extremely long time to debug. This PR is huge, which I regret. It involves replacing the "parent" relationship that records once had (pointing to the previous record) with a simple index (generally referred to as idx). This also means we had to change the recordindex, which referenced "tails". Tails were the last item in the chain. Now that we use an "array" vs linked list, that logic was also replaced. And is much simpler :D Same for the queries that act on this data. ---- This isn't final - we still need to add 1. Proper server/client error handling, which has been lacking for a while 2. The actual history implementation on top This exists in a branch, just without deletions. Won't be much to add that, I just don't want to make this any larger than it already is The _only_ caveat here is that we basically lose data synced via the old record store. This is the KV data from before. It hasn't been deleted or anything, just no longer hooked up. So it's totally possible to write a migration script. I just need to do that. * update .gitignore * use correct endpoint * fix for stores with length of 1 * use create/delete enum for history store * lint, remove unneeded host_id * remove prints * add command to import old history * add enable/disable switch for record sync * add record sync to auto sync * satisfy the almighty clippy * remove file that I did not mean to commit * feedback --- atuin-client/src/kv.rs | 100 +++++++++++++++++++++---------------------------- 1 file changed, 42 insertions(+), 58 deletions(-) (limited to 'atuin-client/src/kv.rs') diff --git a/atuin-client/src/kv.rs b/atuin-client/src/kv.rs index 1ca6b5e8..cee7063d 100644 --- a/atuin-client/src/kv.rs +++ b/atuin-client/src/kv.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use atuin_common::record::{DecryptedData, HostId}; +use atuin_common::record::{DecryptedData, Host, HostId}; use eyre::{bail, ensure, eyre, Result}; use serde::Deserialize; @@ -89,7 +89,7 @@ impl KvStore { pub async fn set( &self, - store: &mut (impl Store + Send + Sync), + store: &(impl Store + Send + Sync), encryption_key: &[u8; 32], host_id: HostId, namespace: &str, @@ -111,13 +111,16 @@ impl KvStore { let bytes = record.serialize()?; - let parent = store.tail(host_id, KV_TAG).await?.map(|entry| entry.id); + let idx = store + .last(host_id, KV_TAG) + .await? + .map_or(0, |entry| entry.idx + 1); let record = atuin_common::record::Record::builder() - .host(host_id) + .host(Host::new(host_id)) .version(KV_VERSION.to_string()) .tag(KV_TAG.to_string()) - .parent(parent) + .idx(idx) .data(bytes) .build(); @@ -137,43 +140,18 @@ impl KvStore { namespace: &str, key: &str, ) -> Result> { - // Currently, this is O(n). When we have an actual KV store, it can be better - // Just a poc for now! + // TODO: don't rebuild every time... + let map = self.build_kv(store, encryption_key).await?; - // iterate records to find the value we want - // start at the end, so we get the most recent version - let tails = store.tag_tails(KV_TAG).await?; + let res = map.get(namespace); - if tails.is_empty() { - return Ok(None); - } - - // first, decide on a record. - // try getting the newest first - // we always need a way of deciding the "winner" of a write - // TODO(ellie): something better than last-write-wins, what if two write at the same time? - let mut record = tails.iter().max_by_key(|r| r.timestamp).unwrap().clone(); - - loop { - let decrypted = match record.version.as_str() { - KV_VERSION => record.decrypt::(encryption_key)?, - version => bail!("unknown version {version:?}"), - }; - - let kv = KvRecord::deserialize(&decrypted.data, &decrypted.version)?; - if kv.key == key && kv.namespace == namespace { - return Ok(Some(kv)); - } + if let Some(ns) = res { + let value = ns.get(key); - if let Some(parent) = decrypted.parent { - record = store.get(parent).await?; - } else { - break; - } + Ok(value.cloned()) + } else { + Ok(None) } - - // if we get here, then... we didn't find the record with that key :( - Ok(None) } // Build a kv map out of the linked list kv store @@ -184,32 +162,30 @@ impl KvStore { &self, store: &impl Store, encryption_key: &[u8; 32], - ) -> Result>> { + ) -> Result>> { let mut map = BTreeMap::new(); - let tails = store.tag_tails(KV_TAG).await?; - - if tails.is_empty() { - return Ok(map); - } - let mut record = tails.iter().max_by_key(|r| r.timestamp).unwrap().clone(); + // TODO: maybe don't load the entire tag into memory to build the kv + // we can be smart about it and only load values since the last build + // or, iterate/paginate + let tagged = store.all_tagged(KV_TAG).await?; - loop { + // iterate through all tags and play each KV record at a time + // this is "last write wins" + // probably good enough for now, but revisit in future + for record in tagged { let decrypted = match record.version.as_str() { KV_VERSION => record.decrypt::(encryption_key)?, version => bail!("unknown version {version:?}"), }; - let kv = KvRecord::deserialize(&decrypted.data, &decrypted.version)?; + let kv = KvRecord::deserialize(&decrypted.data, KV_VERSION)?; - let ns = map.entry(kv.namespace).or_insert_with(BTreeMap::new); - ns.entry(kv.key).or_insert_with(|| kv.value); + let ns = map + .entry(kv.namespace.clone()) + .or_insert_with(BTreeMap::new); - if let Some(parent) = decrypted.parent { - record = store.get(parent).await?; - } else { - break; - } + ns.insert(kv.key.clone(), kv); } Ok(map) @@ -261,19 +237,27 @@ mod tests { let map = kv.build_kv(&store, &key).await.unwrap(); assert_eq!( - map.get("test-kv") + *map.get("test-kv") .expect("map namespace not set") .get("foo") .expect("map key not set"), - "bar" + KvRecord { + namespace: String::from("test-kv"), + key: String::from("foo"), + value: String::from("bar") + } ); assert_eq!( - map.get("test-kv") + *map.get("test-kv") .expect("map namespace not set") .get("1") .expect("map key not set"), - "2" + KvRecord { + namespace: String::from("test-kv"), + key: String::from("1"), + value: String::from("2") + } ); } } -- cgit v1.3.1