diff options
Diffstat (limited to 'atuin-client/src/record/sync.rs')
| -rw-r--r-- | atuin-client/src/record/sync.rs | 486 |
1 files changed, 311 insertions, 175 deletions
diff --git a/atuin-client/src/record/sync.rs b/atuin-client/src/record/sync.rs index 56be0638..2694e0ff 100644 --- a/atuin-client/src/record/sync.rs +++ b/atuin-client/src/record/sync.rs @@ -1,27 +1,51 @@ // do a sync :O +use std::cmp::Ordering; + use eyre::Result; +use thiserror::Error; use super::store::Store; use crate::{api_client::Client, settings::Settings}; -use atuin_common::record::{Diff, HostId, RecordId, RecordIndex}; +use atuin_common::record::{Diff, HostId, RecordIdx, RecordStatus}; + +#[derive(Error, Debug)] +pub enum SyncError { + #[error("the local store is ahead of the remote, but for another host. has remote lost data?")] + LocalAheadOtherHost, + + #[error("an issue with the local database occured")] + LocalStoreError, + + #[error("something has gone wrong with the sync logic: {msg:?}")] + SyncLogicError { msg: String }, + + #[error("a request to the sync server failed")] + RemoteRequestError, +} #[derive(Debug, Eq, PartialEq)] pub enum Operation { - // Either upload or download until the tail matches the below + // Either upload or download until the states matches the below Upload { - tail: RecordId, + local: RecordIdx, + remote: Option<RecordIdx>, host: HostId, tag: String, }, Download { - tail: RecordId, + local: Option<RecordIdx>, + remote: RecordIdx, + host: HostId, + tag: String, + }, + Noop { host: HostId, tag: String, }, } -pub async fn diff(settings: &Settings, store: &mut impl Store) -> Result<(Vec<Diff>, RecordIndex)> { +pub async fn diff(settings: &Settings, store: &impl Store) -> Result<(Vec<Diff>, RecordStatus)> { let client = Client::new( &settings.sync_address, &settings.session_token, @@ -29,8 +53,8 @@ pub async fn diff(settings: &Settings, store: &mut impl Store) -> Result<(Vec<Di settings.network_timeout, )?; - let local_index = store.tail_records().await?; - let remote_index = client.record_index().await?; + let local_index = store.status().await?; + let remote_index = client.record_status().await?; let diff = local_index.diff(&remote_index); @@ -41,39 +65,57 @@ pub async fn diff(settings: &Settings, store: &mut impl Store) -> Result<(Vec<Di // With the store as context, we can determine if a tail exists locally or not and therefore if it needs uploading or download. // In theory this could be done as a part of the diffing stage, but it's easier to reason // about and test this way -pub async fn operations(diffs: Vec<Diff>, store: &impl Store) -> Result<Vec<Operation>> { +pub async fn operations( + diffs: Vec<Diff>, + _store: &impl Store, +) -> Result<Vec<Operation>, SyncError> { let mut operations = Vec::with_capacity(diffs.len()); for diff in diffs { - // First, try to fetch the tail - // If it exists locally, then that means we need to update the remote - // host until it has the same tail. Ie, upload. - // If it does not exist locally, that means remote is ahead of us. - // Therefore, we need to download until our local tail matches - let record = store.get(diff.tail).await; - - let op = if record.is_ok() { - // if local has the ID, then we should find the actual tail of this - // store, so we know what we need to update the remote to. - let tail = store - .tail(diff.host, diff.tag.as_str()) - .await? - .expect("failed to fetch last record, expected tag/host to exist"); - - // TODO(ellie) update the diffing so that it stores the context of the current tail - // that way, we can determine how much we need to upload. - // For now just keep uploading until tails match + let op = match (diff.local, diff.remote) { + // We both have it! Could be either. Compare. + (Some(local), Some(remote)) => match local.cmp(&remote) { + Ordering::Equal => Operation::Noop { + host: diff.host, + tag: diff.tag, + }, + Ordering::Greater => Operation::Upload { + local, + remote: Some(remote), + host: diff.host, + tag: diff.tag, + }, + Ordering::Less => Operation::Download { + local: Some(local), + remote, + host: diff.host, + tag: diff.tag, + }, + }, - Operation::Upload { - tail: tail.id, + // Remote has it, we don't. Gotta be download + (None, Some(remote)) => Operation::Download { + local: None, + remote, host: diff.host, tag: diff.tag, - } - } else { - Operation::Download { - tail: diff.tail, + }, + + // We have it, remote doesn't. Gotta be upload. + (Some(local), None) => Operation::Upload { + local, + remote: None, host: diff.host, tag: diff.tag, + }, + + // something is pretty fucked. + (None, None) => { + return Err(SyncError::SyncLogicError { + msg: String::from( + "diff has nothing for local or remote - (host, tag) does not exist", + ), + }) } }; @@ -86,149 +128,130 @@ pub async fn operations(diffs: Vec<Diff>, store: &impl Store) -> Result<Vec<Oper // with the same properties operations.sort_by_key(|op| match op { - Operation::Upload { tail, host, .. } => ("upload", *host, *tail), - Operation::Download { tail, host, .. } => ("download", *host, *tail), + Operation::Noop { host, tag } => (0, *host, tag.clone()), + + Operation::Upload { host, tag, .. } => (1, *host, tag.clone()), + + Operation::Download { host, tag, .. } => (2, *host, tag.clone()), }); Ok(operations) } async fn sync_upload( - store: &mut impl Store, - remote_index: &RecordIndex, + store: &impl Store, client: &Client<'_>, - op: (HostId, String, RecordId), -) -> Result<i64> { + host: HostId, + tag: String, + local: RecordIdx, + remote: Option<RecordIdx>, +) -> Result<i64, SyncError> { + let remote = remote.unwrap_or(0); + let expected = local - remote; let upload_page_size = 100; - let mut total = 0; - - // so. we have an upload operation, with the tail representing the state - // we want to get the remote to - let current_tail = remote_index.get(op.0, op.1.clone()); + let mut progress = 0; println!( - "Syncing local {:?}/{}/{:?}, remote has {:?}", - op.0, op.1, op.2, current_tail + "Uploading {} records to {}/{}", + expected, + host.0.as_simple(), + tag ); - let start = if let Some(current_tail) = current_tail { - current_tail - } else { - store - .head(op.0, op.1.as_str()) + // preload with the first entry if remote does not know of this store + loop { + let page = store + .next(host, tag.as_str(), remote + progress, upload_page_size) .await - .expect("failed to fetch host/tag head") - .expect("host/tag not in current index") - .id - }; + .map_err(|e| { + error!("failed to read upload page: {e:?}"); - debug!("starting push to remote from: {:?}", start); + SyncError::LocalStoreError + })?; - // we have the start point for sync. it is either the head of the store if - // the remote has no data for it, or the tail that the remote has - // we need to iterate from the remote tail, and keep going until - // remote tail = current local tail + client.post_records(&page).await.map_err(|e| { + error!("failed to post records: {e:?}"); - let mut record = if current_tail.is_some() { - let r = store.get(start).await.unwrap(); - store.next(&r).await? - } else { - Some(store.get(start).await.unwrap()) - }; + SyncError::RemoteRequestError + })?; - let mut buf = Vec::with_capacity(upload_page_size); - - while let Some(r) = record { - if buf.len() < upload_page_size { - buf.push(r.clone()); - } else { - client.post_records(&buf).await?; + println!( + "uploaded {} to remote, progress {}/{}", + page.len(), + progress, + expected + ); + progress += page.len() as u64; - // can we reset what we have? len = 0 but keep capacity - buf = Vec::with_capacity(upload_page_size); + if progress >= expected { + break; } - record = store.next(&r).await?; - - total += 1; } - if !buf.is_empty() { - client.post_records(&buf).await?; - } - - Ok(total) + Ok(progress as i64) } async fn sync_download( - store: &mut impl Store, - remote_index: &RecordIndex, + store: &impl Store, client: &Client<'_>, - op: (HostId, String, RecordId), -) -> Result<i64> { - // TODO(ellie): implement variable page sizing like on history sync - let download_page_size = 1000; + host: HostId, + tag: String, + local: Option<RecordIdx>, + remote: RecordIdx, +) -> Result<i64, SyncError> { + let local = local.unwrap_or(0); + let expected = remote - local; + let download_page_size = 100; + let mut progress = 0; - let mut total = 0; + println!( + "Downloading {} records from {}/{}", + expected, + host.0.as_simple(), + tag + ); - // We know that the remote is ahead of us, so let's keep downloading until both - // 1) The remote stops returning full pages - // 2) The tail equals what we expect - // - // If (1) occurs without (2), then something is wrong with our index calculation - // and we should bail. - let remote_tail = remote_index - .get(op.0, op.1.clone()) - .expect("remote index does not contain expected tail during download"); - let local_tail = store.tail(op.0, op.1.as_str()).await?; - // - // We expect that the operations diff will represent the desired state - // In this case, that contains the remote tail. - assert_eq!(remote_tail, op.2); + // preload with the first entry if remote does not know of this store + loop { + let page = client + .next_records(host, tag.clone(), local + progress, download_page_size) + .await + .map_err(|_| SyncError::RemoteRequestError)?; - println!("Downloading {:?}/{}/{:?} to local", op.0, op.1, op.2); + store + .push_batch(page.iter()) + .await + .map_err(|_| SyncError::LocalStoreError)?; - let mut records = client - .next_records( - op.0, - op.1.clone(), - local_tail.map(|r| r.id), - download_page_size, - ) - .await?; + println!( + "downloaded {} records from remote, progress {}/{}", + page.len(), + progress, + expected + ); - while !records.is_empty() { - total += std::cmp::min(download_page_size, records.len() as u64); - store.push_batch(records.iter()).await?; + progress += page.len() as u64; - if records.last().unwrap().id == remote_tail { + if progress >= expected { break; } - - records = client - .next_records( - op.0, - op.1.clone(), - records.last().map(|r| r.id), - download_page_size, - ) - .await?; } - Ok(total as i64) + Ok(progress as i64) } pub async fn sync_remote( operations: Vec<Operation>, - remote_index: &RecordIndex, - local_store: &mut impl Store, + local_store: &impl Store, settings: &Settings, -) -> Result<(i64, i64)> { +) -> Result<(i64, i64), SyncError> { let client = Client::new( &settings.sync_address, &settings.session_token, settings.network_connect_timeout, settings.network_timeout, - )?; + ) + .expect("failed to create client"); let mut uploaded = 0; let mut downloaded = 0; @@ -236,14 +259,23 @@ pub async fn sync_remote( // this can totally run in parallel, but lets get it working first for i in operations { match i { - Operation::Upload { tail, host, tag } => { - uploaded += - sync_upload(local_store, remote_index, &client, (host, tag, tail)).await? - } - Operation::Download { tail, host, tag } => { - downloaded += - sync_download(local_store, remote_index, &client, (host, tag, tail)).await? + Operation::Upload { + host, + tag, + local, + remote, + } => uploaded += sync_upload(local_store, &client, host, tag, local, remote).await?, + + Operation::Download { + host, + tag, + local, + remote, + } => { + downloaded += sync_download(local_store, &client, host, tag, local, remote).await? } + + Operation::Noop { .. } => continue, } } @@ -264,13 +296,16 @@ mod tests { fn test_record() -> Record<EncryptedData> { Record::builder() - .host(HostId(atuin_common::utils::uuid_v7())) + .host(atuin_common::record::Host::new(HostId( + atuin_common::utils::uuid_v7(), + ))) .version("v1".into()) .tag(atuin_common::utils::uuid_v7().simple().to_string()) .data(EncryptedData { data: String::new(), content_encryption_key: String::new(), }) + .idx(0) .build() } @@ -296,8 +331,8 @@ mod tests { remote_store.push(&i).await.unwrap(); } - let local_index = local_store.tail_records().await.unwrap(); - let remote_index = remote_store.tail_records().await.unwrap(); + let local_index = local_store.status().await.unwrap(); + let remote_index = remote_store.status().await.unwrap(); let diff = local_index.diff(&remote_index); @@ -320,9 +355,10 @@ mod tests { assert_eq!( operations[0], Operation::Upload { - host: record.host, + host: record.host.id, tag: record.tag, - tail: record.id + local: record.idx, + remote: None, } ); } @@ -333,12 +369,14 @@ mod tests { // another. One upload, one download let shared_record = test_record(); - let remote_ahead = test_record(); + let local_ahead = shared_record - .new_child(vec![1, 2, 3]) + .append(vec![1, 2, 3]) .encrypt::<PASETO_V4>(&[0; 32]); + assert_eq!(local_ahead.idx, 1); + let local = vec![shared_record.clone(), local_ahead.clone()]; // local knows about the already synced, and something newer in the same store let remote = vec![shared_record.clone(), remote_ahead.clone()]; // remote knows about the already-synced, and one new record in a new store @@ -350,15 +388,19 @@ mod tests { assert_eq!( operations, vec![ - Operation::Download { - tail: remote_ahead.id, - host: remote_ahead.host, - tag: remote_ahead.tag, - }, + // Or in otherwords, local is ahead by one Operation::Upload { - tail: local_ahead.id, - host: local_ahead.host, + host: local_ahead.host.id, tag: local_ahead.tag, + local: 1, + remote: Some(0), + }, + // Or in other words, remote knows of a record in an entirely new store (tag) + Operation::Download { + host: remote_ahead.host.id, + tag: remote_ahead.tag, + local: None, + remote: 0, }, ] ); @@ -371,66 +413,160 @@ mod tests { // One known only by remote let shared_record = test_record(); + let local_only = test_record(); + + let local_only_20 = test_record(); + let local_only_21 = local_only_20 + .append(vec![1, 2, 3]) + .encrypt::<PASETO_V4>(&[0; 32]); + let local_only_22 = local_only_21 + .append(vec![1, 2, 3]) + .encrypt::<PASETO_V4>(&[0; 32]); + let local_only_23 = local_only_22 + .append(vec![1, 2, 3]) + .encrypt::<PASETO_V4>(&[0; 32]); - let remote_known = test_record(); - let local_known = test_record(); + let remote_only = test_record(); + + let remote_only_20 = test_record(); + let remote_only_21 = remote_only_20 + .append(vec![2, 3, 2]) + .encrypt::<PASETO_V4>(&[0; 32]); + let remote_only_22 = remote_only_21 + .append(vec![2, 3, 2]) + .encrypt::<PASETO_V4>(&[0; 32]); + let remote_only_23 = remote_only_22 + .append(vec![2, 3, 2]) + .encrypt::<PASETO_V4>(&[0; 32]); + let remote_only_24 = remote_only_23 + .append(vec![2, 3, 2]) + .encrypt::<PASETO_V4>(&[0; 32]); let second_shared = test_record(); let second_shared_remote_ahead = second_shared - .new_child(vec![1, 2, 3]) + .append(vec![1, 2, 3]) + .encrypt::<PASETO_V4>(&[0; 32]); + let second_shared_remote_ahead2 = second_shared_remote_ahead + .append(vec![1, 2, 3]) .encrypt::<PASETO_V4>(&[0; 32]); - let local_ahead = shared_record - .new_child(vec![1, 2, 3]) + let third_shared = test_record(); + let third_shared_local_ahead = third_shared + .append(vec![1, 2, 3]) + .encrypt::<PASETO_V4>(&[0; 32]); + let third_shared_local_ahead2 = third_shared_local_ahead + .append(vec![1, 2, 3]) + .encrypt::<PASETO_V4>(&[0; 32]); + + let fourth_shared = test_record(); + let fourth_shared_remote_ahead = fourth_shared + .append(vec![1, 2, 3]) + .encrypt::<PASETO_V4>(&[0; 32]); + let fourth_shared_remote_ahead2 = fourth_shared_remote_ahead + .append(vec![1, 2, 3]) .encrypt::<PASETO_V4>(&[0; 32]); let local = vec![ shared_record.clone(), second_shared.clone(), - local_known.clone(), - local_ahead.clone(), + third_shared.clone(), + fourth_shared.clone(), + fourth_shared_remote_ahead.clone(), + // single store, only local has it + local_only.clone(), + // bigger store, also only known by local + local_only_20.clone(), + local_only_21.clone(), + local_only_22.clone(), + local_only_23.clone(), + // another shared store, but local is ahead on this one + third_shared_local_ahead.clone(), + third_shared_local_ahead2.clone(), ]; let remote = vec![ + remote_only.clone(), + remote_only_20.clone(), + remote_only_21.clone(), + remote_only_22.clone(), + remote_only_23.clone(), + remote_only_24.clone(), shared_record.clone(), second_shared.clone(), + third_shared.clone(), second_shared_remote_ahead.clone(), - remote_known.clone(), + second_shared_remote_ahead2.clone(), + fourth_shared.clone(), + fourth_shared_remote_ahead.clone(), + fourth_shared_remote_ahead2.clone(), ]; // remote knows about the already-synced, and one new record in a new store let (store, diff) = build_test_diff(local, remote).await; let operations = sync::operations(diff, &store).await.unwrap(); - assert_eq!(operations.len(), 4); + assert_eq!(operations.len(), 7); let mut result_ops = vec![ + // We started with a shared record, but the remote knows of two newer records in the + // same store + Operation::Download { + local: Some(0), + remote: 2, + host: second_shared_remote_ahead.host.id, + tag: second_shared_remote_ahead.tag, + }, + // We have a shared record, local knows of the first two but not the last + Operation::Download { + local: Some(1), + remote: 2, + host: fourth_shared_remote_ahead2.host.id, + tag: fourth_shared_remote_ahead2.tag, + }, + // Remote knows of a store with a single record that local does not have Operation::Download { - tail: remote_known.id, - host: remote_known.host, - tag: remote_known.tag, + local: None, + remote: 0, + host: remote_only.host.id, + tag: remote_only.tag, }, + // Remote knows of a store with a bunch of records that local does not have Operation::Download { - tail: second_shared_remote_ahead.id, - host: second_shared.host, - tag: second_shared.tag, + local: None, + remote: 4, + host: remote_only_20.host.id, + tag: remote_only_20.tag, }, + // Local knows of a record in a store that remote does not have Operation::Upload { - tail: local_ahead.id, - host: local_ahead.host, - tag: local_ahead.tag, + local: 0, + remote: None, + host: local_only.host.id, + tag: local_only.tag, }, + // Local knows of 4 records in a store that remote does not have Operation::Upload { - tail: local_known.id, - host: local_known.host, - tag: local_known.tag, + local: 3, + remote: None, + host: local_only_20.host.id, + tag: local_only_20.tag, + }, + // Local knows of 2 more records in a shared store that remote only has one of + Operation::Upload { + local: 2, + remote: Some(0), + host: third_shared.host.id, + tag: third_shared.tag, }, ]; result_ops.sort_by_key(|op| match op { - Operation::Upload { tail, host, .. } => ("upload", *host, *tail), - Operation::Download { tail, host, .. } => ("download", *host, *tail), + Operation::Noop { host, tag } => (0, *host, tag.clone()), + + Operation::Upload { host, tag, .. } => (1, *host, tag.clone()), + + Operation::Download { host, tag, .. } => (2, *host, tag.clone()), }); - assert_eq!(operations, result_ops); + assert_eq!(result_ops, operations); } } |
