diff options
Diffstat (limited to 'atuin-client')
| -rw-r--r-- | atuin-client/migrations/20230315220114_drop-events.sql | 2 | ||||
| -rw-r--r-- | atuin-client/migrations/20230319185725_deleted_at.sql | 2 | ||||
| -rw-r--r-- | atuin-client/src/api_client.rs | 43 | ||||
| -rw-r--r-- | atuin-client/src/database.rs | 98 | ||||
| -rw-r--r-- | atuin-client/src/encryption.rs | 1 | ||||
| -rw-r--r-- | atuin-client/src/event.rs | 47 | ||||
| -rw-r--r-- | atuin-client/src/history.rs | 4 | ||||
| -rw-r--r-- | atuin-client/src/import/bash.rs | 1 | ||||
| -rw-r--r-- | atuin-client/src/import/fish.rs | 2 | ||||
| -rw-r--r-- | atuin-client/src/import/resh.rs | 1 | ||||
| -rw-r--r-- | atuin-client/src/import/zsh.rs | 2 | ||||
| -rw-r--r-- | atuin-client/src/import/zsh_histdb.rs | 1 | ||||
| -rw-r--r-- | atuin-client/src/lib.rs | 1 | ||||
| -rw-r--r-- | atuin-client/src/sync.rs | 31 |
14 files changed, 112 insertions, 124 deletions
diff --git a/atuin-client/migrations/20230315220114_drop-events.sql b/atuin-client/migrations/20230315220114_drop-events.sql new file mode 100644 index 00000000..fe3cae17 --- /dev/null +++ b/atuin-client/migrations/20230315220114_drop-events.sql @@ -0,0 +1,2 @@ +-- Add migration script here +drop table events; diff --git a/atuin-client/migrations/20230319185725_deleted_at.sql b/atuin-client/migrations/20230319185725_deleted_at.sql new file mode 100644 index 00000000..6c422abc --- /dev/null +++ b/atuin-client/migrations/20230319185725_deleted_at.sql @@ -0,0 +1,2 @@ +-- Add migration script here +alter table history add column deleted_at integer; diff --git a/atuin-client/src/api_client.rs b/atuin-client/src/api_client.rs index 44375c06..dee98613 100644 --- a/atuin-client/src/api_client.rs +++ b/atuin-client/src/api_client.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::collections::HashSet; use chrono::Utc; use eyre::{bail, Result}; @@ -9,8 +10,8 @@ use reqwest::{ use sodiumoxide::crypto::secretbox; use atuin_common::api::{ - AddHistoryRequest, CountResponse, ErrorResponse, IndexResponse, LoginRequest, LoginResponse, - RegisterResponse, SyncHistoryResponse, + AddHistoryRequest, CountResponse, DeleteHistoryRequest, ErrorResponse, IndexResponse, + LoginRequest, LoginResponse, RegisterResponse, StatusResponse, SyncHistoryResponse, }; use semver::Version; @@ -138,11 +139,27 @@ impl<'a> Client<'a> { Ok(count.count) } + pub async fn status(&self) -> Result<StatusResponse> { + let url = format!("{}/sync/status", self.sync_addr); + let url = Url::parse(url.as_str())?; + + let resp = self.client.get(url).send().await?; + + if resp.status() != StatusCode::OK { + bail!("failed to get status (are you logged in?)"); + } + + let status = resp.json::<StatusResponse>().await?; + + Ok(status) + } + pub async fn get_history( &self, sync_ts: chrono::DateTime<Utc>, history_ts: chrono::DateTime<Utc>, host: Option<String>, + deleted: HashSet<String>, ) -> Result<Vec<History>> { let host = match host { None => hash_str(&format!("{}:{}", whoami::hostname(), whoami::username())), @@ -163,8 +180,17 @@ impl<'a> Client<'a> { let history = history .history .iter() + // TODO: handle deletion earlier in this chain .map(|h| serde_json::from_str(h).expect("invalid base64")) .map(|h| decrypt(&h, &self.key).expect("failed to decrypt history! check your key")) + .map(|mut h| { + if deleted.contains(&h.id) { + h.deleted_at = Some(chrono::Utc::now()); + h.command = String::from(""); + } + + h + }) .collect(); Ok(history) @@ -178,4 +204,17 @@ impl<'a> Client<'a> { Ok(()) } + + pub async fn delete_history(&self, h: History) -> Result<()> { + let url = format!("{}/history", self.sync_addr); + let url = Url::parse(url.as_str())?; + + self.client + .delete(url) + .json(&DeleteHistoryRequest { client_id: h.id }) + .send() + .await?; + + Ok(()) + } } diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs index b24020c0..a29dfa13 100644 --- a/atuin-client/src/database.rs +++ b/atuin-client/src/database.rs @@ -14,7 +14,6 @@ use sqlx::{ }; use super::{ - event::{Event, EventType}, history::History, ordering, settings::{FilterMode, SearchMode}, @@ -62,13 +61,14 @@ pub trait Database: Send + Sync { async fn update(&self, h: &History) -> Result<()>; async fn history_count(&self) -> Result<i64>; - async fn event_count(&self) -> Result<i64>; - async fn merge_events(&self) -> Result<i64>; async fn first(&self) -> Result<History>; async fn last(&self) -> Result<History>; async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>>; + async fn delete(&self, mut h: History) -> Result<()>; + async fn deleted(&self) -> Result<Vec<History>>; + // Yes I know, it's a lot. // Could maybe break it down to a searchparams struct or smth but that feels a little... pointless. // Been debating maybe a DSL for search? eg "before:time limit:1 the query" @@ -126,31 +126,10 @@ impl Sqlite { Ok(()) } - async fn save_event(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, e: &Event) -> Result<()> { - let event_type = match e.event_type { - EventType::Create => "create", - EventType::Delete => "delete", - }; - - sqlx::query( - "insert or ignore into events(id, timestamp, hostname, event_type, history_id) - values(?1, ?2, ?3, ?4, ?5)", - ) - .bind(e.id.as_str()) - .bind(e.timestamp.timestamp_nanos()) - .bind(e.hostname.as_str()) - .bind(event_type) - .bind(e.history_id.as_str()) - .execute(tx) - .await?; - - Ok(()) - } - async fn save_raw(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, h: &History) -> Result<()> { sqlx::query( - "insert or ignore into history(id, timestamp, duration, exit, command, cwd, session, hostname) - values(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + "insert or ignore into history(id, timestamp, duration, exit, command, cwd, session, hostname, deleted_at) + values(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", ) .bind(h.id.as_str()) .bind(h.timestamp.timestamp_nanos()) @@ -160,6 +139,7 @@ impl Sqlite { .bind(h.cwd.as_str()) .bind(h.session.as_str()) .bind(h.hostname.as_str()) + .bind(h.deleted_at.map(|t|t.timestamp_nanos())) .execute(tx) .await?; @@ -167,6 +147,8 @@ impl Sqlite { } fn query_history(row: SqliteRow) -> History { + let deleted_at: Option<i64> = row.get("deleted_at"); + History { id: row.get("id"), timestamp: Utc.timestamp_nanos(row.get("timestamp")), @@ -176,6 +158,7 @@ impl Sqlite { cwd: row.get("cwd"), session: row.get("session"), hostname: row.get("hostname"), + deleted_at: deleted_at.map(|t| Utc.timestamp_nanos(t)), } } } @@ -184,11 +167,8 @@ impl Sqlite { impl Database for Sqlite { async fn save(&mut self, h: &History) -> Result<()> { debug!("saving history to sqlite"); - let event = Event::new_create(h); - let mut tx = self.pool.begin().await?; Self::save_raw(&mut tx, h).await?; - Self::save_event(&mut tx, &event).await?; tx.commit().await?; Ok(()) @@ -200,9 +180,7 @@ impl Database for Sqlite { let mut tx = self.pool.begin().await?; for i in h { - let event = Event::new_create(i); Self::save_raw(&mut tx, i).await?; - Self::save_event(&mut tx, &event).await?; } tx.commit().await?; @@ -227,7 +205,7 @@ impl Database for Sqlite { sqlx::query( "update history - set timestamp = ?2, duration = ?3, exit = ?4, command = ?5, cwd = ?6, session = ?7, hostname = ?8 + set timestamp = ?2, duration = ?3, exit = ?4, command = ?5, cwd = ?6, session = ?7, hostname = ?8, deleted_at = ?9 where id = ?1", ) .bind(h.id.as_str()) @@ -238,6 +216,7 @@ impl Database for Sqlite { .bind(h.cwd.as_str()) .bind(h.session.as_str()) .bind(h.hostname.as_str()) + .bind(h.deleted_at.map(|t|t.timestamp_nanos())) .execute(&self.pool) .await?; @@ -338,49 +317,15 @@ impl Database for Sqlite { Ok(res) } - async fn event_count(&self) -> Result<i64> { - let res: i64 = sqlx::query_scalar("select count(1) from events") - .fetch_one(&self.pool) + async fn deleted(&self) -> Result<Vec<History>> { + let res = sqlx::query("select * from history where deleted_at is not null") + .map(Self::query_history) + .fetch_all(&self.pool) .await?; Ok(res) } - // Ensure that we have correctly merged the event log - async fn merge_events(&self) -> Result<i64> { - // Ensure that we do not have more history locally than we do events. - // We can think of history as the merged log of events. There should never be more history than - // events, and the only time this could happen is if someone is upgrading from an old Atuin version - // from before we stored events. - let history_count = self.history_count().await?; - let event_count = self.event_count().await?; - - if history_count > event_count { - // pass an empty context, because with global listing we don't care - let no_context = Context { - cwd: String::from(""), - session: String::from(""), - hostname: String::from(""), - }; - - // We're just gonna load everything into memory here. That sucks, I know, sorry. - // But also even if you have a LOT of history that should be fine, and we're only going to be doing this once EVER. - let all_the_history = self - .list(FilterMode::Global, &no_context, None, false) - .await?; - - let mut tx = self.pool.begin().await?; - for i in all_the_history.iter() { - // A CREATE for every single history item is to be expected. - let event = Event::new_create(i); - Self::save_event(&mut tx, &event).await?; - } - tx.commit().await?; - } - - Ok(0) - } - async fn history_count(&self) -> Result<i64> { let res: (i64,) = sqlx::query_as("select count(1) from history") .fetch_one(&self.pool) @@ -528,6 +473,18 @@ impl Database for Sqlite { Ok(res) } + + // deleted_at doesn't mean the actual time that the user deleted it, + // but the time that the system marks it as deleted + async fn delete(&self, mut h: History) -> Result<()> { + let now = chrono::Utc::now(); + h.command = String::from(""); // blank it + h.deleted_at = Some(now); // delete it + + self.update(&h).await?; // save it + + Ok(()) + } } #[cfg(test)] @@ -585,6 +542,7 @@ mod test { 1, Some("beep boop".to_string()), Some("booop".to_string()), + None, ); db.save(&history).await } diff --git a/atuin-client/src/encryption.rs b/atuin-client/src/encryption.rs index bb017b74..c718d4da 100644 --- a/atuin-client/src/encryption.rs +++ b/atuin-client/src/encryption.rs @@ -124,6 +124,7 @@ mod test { 1, Some("beep boop".to_string()), Some("booop".to_string()), + None, ); let e1 = encrypt(&history, &key1).unwrap(); diff --git a/atuin-client/src/event.rs b/atuin-client/src/event.rs deleted file mode 100644 index 4e76c077..00000000 --- a/atuin-client/src/event.rs +++ /dev/null @@ -1,47 +0,0 @@ -use chrono::Utc; -use serde::{Deserialize, Serialize}; - -use crate::history::History; -use atuin_common::utils::uuid_v4; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum EventType { - Create, - Delete, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow)] -pub struct Event { - pub id: String, - pub timestamp: chrono::DateTime<Utc>, - pub hostname: String, - pub event_type: EventType, - - pub history_id: String, -} - -impl Event { - pub fn new_create(history: &History) -> Event { - Event { - id: uuid_v4(), - timestamp: history.timestamp, - hostname: history.hostname.clone(), - event_type: EventType::Create, - - history_id: history.id.clone(), - } - } - - pub fn new_delete(history_id: &str) -> Event { - let hostname = format!("{}:{}", whoami::hostname(), whoami::username()); - - Event { - id: uuid_v4(), - timestamp: chrono::Utc::now(), - hostname, - event_type: EventType::Create, - - history_id: history_id.to_string(), - } - } -} diff --git a/atuin-client/src/history.rs b/atuin-client/src/history.rs index 9a26c95d..f3778612 100644 --- a/atuin-client/src/history.rs +++ b/atuin-client/src/history.rs @@ -16,9 +16,11 @@ pub struct History { pub cwd: String, pub session: String, pub hostname: String, + pub deleted_at: Option<chrono::DateTime<Utc>>, } impl History { + #[allow(clippy::too_many_arguments)] pub fn new( timestamp: chrono::DateTime<Utc>, command: String, @@ -27,6 +29,7 @@ impl History { duration: i64, session: Option<String>, hostname: Option<String>, + deleted_at: Option<chrono::DateTime<Utc>>, ) -> Self { let session = session .or_else(|| env::var("ATUIN_SESSION").ok()) @@ -43,6 +46,7 @@ impl History { duration, session, hostname, + deleted_at, } } diff --git a/atuin-client/src/import/bash.rs b/atuin-client/src/import/bash.rs index 520b49c8..bfba47de 100644 --- a/atuin-client/src/import/bash.rs +++ b/atuin-client/src/import/bash.rs @@ -78,6 +78,7 @@ impl Importer for Bash { -1, None, None, + None, ); h.push(entry).await?; next_timestamp += timestamp_increment; diff --git a/atuin-client/src/import/fish.rs b/atuin-client/src/import/fish.rs index 850814c7..7186537e 100644 --- a/atuin-client/src/import/fish.rs +++ b/atuin-client/src/import/fish.rs @@ -80,6 +80,7 @@ impl Importer for Fish { -1, None, None, + None, )) .await?; } @@ -115,6 +116,7 @@ impl Importer for Fish { -1, None, None, + None, )) .await?; } diff --git a/atuin-client/src/import/resh.rs b/atuin-client/src/import/resh.rs index 75487fee..41f54836 100644 --- a/atuin-client/src/import/resh.rs +++ b/atuin-client/src/import/resh.rs @@ -131,6 +131,7 @@ impl Importer for Resh { cwd: entry.pwd, session: uuid_v4(), hostname: entry.host, + deleted_at: None, }) .await?; } diff --git a/atuin-client/src/import/zsh.rs b/atuin-client/src/import/zsh.rs index 634ce9ae..50c64cb2 100644 --- a/atuin-client/src/import/zsh.rs +++ b/atuin-client/src/import/zsh.rs @@ -86,6 +86,7 @@ impl Importer for Zsh { -1, None, None, + None, )) .await?; } @@ -119,6 +120,7 @@ fn parse_extended(line: &str, counter: i64) -> History { duration, None, None, + None, ) } diff --git a/atuin-client/src/import/zsh_histdb.rs b/atuin-client/src/import/zsh_histdb.rs index b9bce34d..2f9a192d 100644 --- a/atuin-client/src/import/zsh_histdb.rs +++ b/atuin-client/src/import/zsh_histdb.rs @@ -80,6 +80,7 @@ impl From<HistDbEntry> for History { .trim_end() .to_string(), ), + None, ) } } diff --git a/atuin-client/src/lib.rs b/atuin-client/src/lib.rs index 37d1b0f0..497c5e74 100644 --- a/atuin-client/src/lib.rs +++ b/atuin-client/src/lib.rs @@ -11,7 +11,6 @@ pub mod encryption; pub mod sync; pub mod database; -pub mod event; pub mod history; pub mod import; pub mod ordering; diff --git a/atuin-client/src/sync.rs b/atuin-client/src/sync.rs index 94ae24c4..1c0acaf8 100644 --- a/atuin-client/src/sync.rs +++ b/atuin-client/src/sync.rs @@ -1,4 +1,6 @@ +use std::collections::HashSet; use std::convert::TryInto; +use std::iter::FromIterator; use chrono::prelude::*; use eyre::Result; @@ -37,7 +39,11 @@ async fn sync_download( ) -> Result<(i64, i64)> { debug!("starting sync download"); - let remote_count = client.count().await?; + let remote_status = client.status().await?; + let remote_count = remote_status.count; + + // useful to ensure we don't even save something that hasn't yet been synced + deleted + let remote_deleted = HashSet::from_iter(remote_status.deleted.clone()); let initial_local = db.history_count().await?; let mut local_count = initial_local; @@ -54,7 +60,12 @@ async fn sync_download( while remote_count > local_count { let page = client - .get_history(last_sync, last_timestamp, host.clone()) + .get_history( + last_sync, + last_timestamp, + host.clone(), + remote_deleted.clone(), + ) .await?; db.save_bulk(&page).await?; @@ -81,6 +92,13 @@ async fn sync_download( } } + for i in remote_status.deleted { + // we will update the stored history to have this data + // pretty much everything can be nullified + let h = db.load(i.as_str()).await?; + db.delete(h).await?; + } + Ok((local_count - initial_local, local_count)) } @@ -136,12 +154,17 @@ async fn sync_upload( debug!("upload cursor: {:?}", cursor); } + let deleted = db.deleted().await?; + + for i in deleted { + info!("deleting {} on remote", i.id); + client.delete_history(i).await?; + } + Ok(()) } pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> { - db.merge_events().await?; - let client = api_client::Client::new( &settings.sync_address, &settings.session_token, |
