diff options
| author | Ellie Huxtable <e@elm.sh> | 2021-04-20 21:53:07 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-04-20 20:53:07 +0000 |
| commit | a21737e2b7f8d1e426726bdd7536033f299d476a (patch) | |
| tree | e940afdff9c145d25d9a2895fd44a77d70719a2e /src/local | |
| parent | Switch to Warp + SQLx, use async, switch to Rust stable (#36) (diff) | |
| download | atuin-a21737e2b7f8d1e426726bdd7536033f299d476a.zip | |
Use cargo workspaces (#37)
* Switch to Cargo workspaces
Breaking things into "client", "server" and "common" makes managing the
codebase much easier!
client - anything running on a user's machine for adding history
server - handles storing/syncing history and running a HTTP server
common - request/response API definitions, common utils, etc
* Update dockerfile
Diffstat (limited to 'src/local')
| -rw-r--r-- | src/local/api_client.rs | 95 | ||||
| -rw-r--r-- | src/local/database.rs | 272 | ||||
| -rw-r--r-- | src/local/encryption.rs | 108 | ||||
| -rw-r--r-- | src/local/history.rs | 66 | ||||
| -rw-r--r-- | src/local/import.rs | 176 | ||||
| -rw-r--r-- | src/local/mod.rs | 6 | ||||
| -rw-r--r-- | src/local/sync.rs | 141 |
7 files changed, 0 insertions, 864 deletions
diff --git a/src/local/api_client.rs b/src/local/api_client.rs deleted file mode 100644 index 1b64a295..00000000 --- a/src/local/api_client.rs +++ /dev/null @@ -1,95 +0,0 @@ -use chrono::Utc; -use eyre::Result; -use reqwest::header::{HeaderMap, AUTHORIZATION}; -use reqwest::Url; -use sodiumoxide::crypto::secretbox; - -use crate::api::{AddHistoryRequest, CountResponse, SyncHistoryResponse}; -use crate::local::encryption::decrypt; -use crate::local::history::History; -use crate::utils::hash_str; - -pub struct Client<'a> { - sync_addr: &'a str, - token: &'a str, - key: secretbox::Key, - client: reqwest::Client, -} - -impl<'a> Client<'a> { - pub fn new(sync_addr: &'a str, token: &'a str, key: secretbox::Key) -> Self { - Client { - sync_addr, - token, - key, - client: reqwest::Client::new(), - } - } - - pub async fn count(&self) -> Result<i64> { - let url = format!("{}/sync/count", self.sync_addr); - let url = Url::parse(url.as_str())?; - let token = format!("Token {}", self.token); - let token = token.parse()?; - - let mut headers = HeaderMap::new(); - headers.insert(AUTHORIZATION, token); - - let resp = self.client.get(url).headers(headers).send().await?; - - let count = resp.json::<CountResponse>().await?; - - Ok(count.count) - } - - pub async fn get_history( - &self, - sync_ts: chrono::DateTime<Utc>, - history_ts: chrono::DateTime<Utc>, - host: Option<String>, - ) -> Result<Vec<History>> { - let host = match host { - None => hash_str(&format!("{}:{}", whoami::hostname(), whoami::username())), - Some(h) => h, - }; - - let url = format!( - "{}/sync/history?sync_ts={}&history_ts={}&host={}", - self.sync_addr, - urlencoding::encode(sync_ts.to_rfc3339().as_str()), - urlencoding::encode(history_ts.to_rfc3339().as_str()), - host, - ); - - let resp = self - .client - .get(url) - .header(AUTHORIZATION, format!("Token {}", self.token)) - .send() - .await?; - - let history = resp.json::<SyncHistoryResponse>().await?; - let history = history - .history - .iter() - .map(|h| serde_json::from_str(h).expect("invalid base64")) - .map(|h| decrypt(&h, &self.key).expect("failed to decrypt history! check your key")) - .collect(); - - Ok(history) - } - - pub async fn post_history(&self, history: &[AddHistoryRequest]) -> Result<()> { - let url = format!("{}/history", self.sync_addr); - let url = Url::parse(url.as_str())?; - - self.client - .post(url) - .json(history) - .header(AUTHORIZATION, format!("Token {}", self.token)) - .send() - .await?; - - Ok(()) - } -} diff --git a/src/local/database.rs b/src/local/database.rs deleted file mode 100644 index abc22bb8..00000000 --- a/src/local/database.rs +++ /dev/null @@ -1,272 +0,0 @@ -use chrono::prelude::*; -use chrono::Utc; -use std::path::Path; - -use eyre::Result; - -use rusqlite::{params, Connection}; -use rusqlite::{Params, Transaction}; - -use super::history::History; - -pub trait Database { - fn save(&mut self, h: &History) -> Result<()>; - fn save_bulk(&mut self, h: &[History]) -> Result<()>; - - fn load(&self, id: &str) -> Result<History>; - fn list(&self) -> Result<Vec<History>>; - fn range(&self, from: chrono::DateTime<Utc>, to: chrono::DateTime<Utc>) - -> Result<Vec<History>>; - - fn query(&self, query: &str, params: impl Params) -> Result<Vec<History>>; - fn update(&self, h: &History) -> Result<()>; - fn history_count(&self) -> Result<i64>; - - fn first(&self) -> Result<History>; - fn last(&self) -> Result<History>; - fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>>; - - fn prefix_search(&self, query: &str) -> Result<Vec<History>>; -} - -// Intended for use on a developer machine and not a sync server. -// TODO: implement IntoIterator -pub struct Sqlite { - conn: Connection, -} - -impl Sqlite { - pub fn new(path: impl AsRef<Path>) -> Result<Self> { - let path = path.as_ref(); - debug!("opening sqlite database at {:?}", path); - - let create = !path.exists(); - if create { - if let Some(dir) = path.parent() { - std::fs::create_dir_all(dir)?; - } - } - - let conn = Connection::open(path)?; - - Self::setup_db(&conn)?; - - Ok(Self { conn }) - } - - fn setup_db(conn: &Connection) -> Result<()> { - debug!("running sqlite database setup"); - - conn.execute( - "create table if not exists history ( - id text primary key, - timestamp integer not null, - duration integer not null, - exit integer not null, - command text not null, - cwd text not null, - session text not null, - hostname text not null, - - unique(timestamp, cwd, command) - )", - [], - )?; - - conn.execute( - "create table if not exists history_encrypted ( - id text primary key, - data blob not null - )", - [], - )?; - - Ok(()) - } - - fn save_raw(tx: &Transaction, h: &History) -> Result<()> { - tx.execute( - "insert or ignore into history ( - id, - timestamp, - duration, - exit, - command, - cwd, - session, - hostname - ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", - params![ - h.id, - h.timestamp.timestamp_nanos(), - h.duration, - h.exit, - h.command, - h.cwd, - h.session, - h.hostname - ], - )?; - - Ok(()) - } -} - -impl Database for Sqlite { - fn save(&mut self, h: &History) -> Result<()> { - debug!("saving history to sqlite"); - - let tx = self.conn.transaction()?; - Self::save_raw(&tx, h)?; - tx.commit()?; - - Ok(()) - } - - fn save_bulk(&mut self, h: &[History]) -> Result<()> { - debug!("saving history to sqlite"); - - let tx = self.conn.transaction()?; - for i in h { - Self::save_raw(&tx, i)? - } - tx.commit()?; - - Ok(()) - } - - fn load(&self, id: &str) -> Result<History> { - debug!("loading history item"); - - let mut stmt = self.conn.prepare( - "select id, timestamp, duration, exit, command, cwd, session, hostname from history - where id = ?1", - )?; - - let history = stmt.query_row(params![id], |row| { - history_from_sqlite_row(Some(id.to_string()), row) - })?; - - Ok(history) - } - - fn update(&self, h: &History) -> Result<()> { - debug!("updating sqlite history"); - - self.conn.execute( - "update history - set timestamp = ?2, duration = ?3, exit = ?4, command = ?5, cwd = ?6, session = ?7, hostname = ?8 - where id = ?1", - params![h.id, h.timestamp.timestamp_nanos(), h.duration, h.exit, h.command, h.cwd, h.session, h.hostname], - )?; - - Ok(()) - } - - fn list(&self) -> Result<Vec<History>> { - debug!("listing history"); - - let mut stmt = self - .conn - .prepare("SELECT * FROM history order by timestamp asc")?; - - let history_iter = stmt.query_map(params![], |row| history_from_sqlite_row(None, row))?; - - Ok(history_iter.filter_map(Result::ok).collect()) - } - - fn range( - &self, - from: chrono::DateTime<Utc>, - to: chrono::DateTime<Utc>, - ) -> Result<Vec<History>> { - debug!("listing history from {:?} to {:?}", from, to); - - let mut stmt = self.conn.prepare( - "SELECT * FROM history where timestamp >= ?1 and timestamp <= ?2 order by timestamp asc", - )?; - - let history_iter = stmt.query_map( - params![from.timestamp_nanos(), to.timestamp_nanos()], - |row| history_from_sqlite_row(None, row), - )?; - - Ok(history_iter.filter_map(Result::ok).collect()) - } - - fn first(&self) -> Result<History> { - let mut stmt = self - .conn - .prepare("SELECT * FROM history order by timestamp asc limit 1")?; - - let history = stmt.query_row(params![], |row| history_from_sqlite_row(None, row))?; - - Ok(history) - } - - fn last(&self) -> Result<History> { - let mut stmt = self - .conn - .prepare("SELECT * FROM history order by timestamp desc limit 1")?; - - let history = stmt.query_row(params![], |row| history_from_sqlite_row(None, row))?; - - Ok(history) - } - - fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>> { - let mut stmt = self - .conn - .prepare("SELECT * FROM history where timestamp < ? order by timestamp desc limit ?")?; - - let history_iter = stmt.query_map(params![timestamp.timestamp_nanos(), count], |row| { - history_from_sqlite_row(None, row) - })?; - - Ok(history_iter.filter_map(Result::ok).collect()) - } - - fn query(&self, query: &str, params: impl Params) -> Result<Vec<History>> { - let mut stmt = self.conn.prepare(query)?; - - let history_iter = stmt.query_map(params, |row| history_from_sqlite_row(None, row))?; - - Ok(history_iter.filter_map(Result::ok).collect()) - } - - fn prefix_search(&self, query: &str) -> Result<Vec<History>> { - self.query( - "select * from history where command like ?1 || '%' order by timestamp asc limit 1000", - &[query], - ) - } - - fn history_count(&self) -> Result<i64> { - let res: i64 = - self.conn - .query_row_and_then("select count(1) from history;", params![], |row| row.get(0))?; - - Ok(res) - } -} - -fn history_from_sqlite_row( - id: Option<String>, - row: &rusqlite::Row, -) -> Result<History, rusqlite::Error> { - let id = match id { - Some(id) => id, - None => row.get(0)?, - }; - - Ok(History { - id, - timestamp: Utc.timestamp_nanos(row.get(1)?), - duration: row.get(2)?, - exit: row.get(3)?, - command: row.get(4)?, - cwd: row.get(5)?, - session: row.get(6)?, - hostname: row.get(7)?, - }) -} diff --git a/src/local/encryption.rs b/src/local/encryption.rs deleted file mode 100644 index 3c1699e3..00000000 --- a/src/local/encryption.rs +++ /dev/null @@ -1,108 +0,0 @@ -// The general idea is that we NEVER send cleartext history to the server -// This way the odds of anything private ending up where it should not are -// very low -// The server authenticates via the usual username and password. This has -// nothing to do with the encryption, and is purely authentication! The client -// generates its own secret key, and encrypts all shell history with libsodium's -// secretbox. The data is then sent to the server, where it is stored. All -// clients must share the secret in order to be able to sync, as it is needed -// to decrypt - -use std::fs::File; -use std::io::prelude::*; -use std::path::PathBuf; - -use eyre::{eyre, Result}; -use sodiumoxide::crypto::secretbox; - -use crate::local::history::History; -use crate::settings::Settings; - -#[derive(Debug, Serialize, Deserialize)] -pub struct EncryptedHistory { - pub ciphertext: Vec<u8>, - pub nonce: secretbox::Nonce, -} - -// Loads the secret key, will create + save if it doesn't exist -pub fn load_key(settings: &Settings) -> Result<secretbox::Key> { - let path = settings.local.key_path.as_str(); - - if PathBuf::from(path).exists() { - let bytes = std::fs::read(path)?; - let key: secretbox::Key = rmp_serde::from_read_ref(&bytes)?; - Ok(key) - } else { - let key = secretbox::gen_key(); - let buf = rmp_serde::to_vec(&key)?; - - let mut file = File::create(path)?; - file.write_all(&buf)?; - - Ok(key) - } -} - -pub fn encrypt(history: &History, key: &secretbox::Key) -> Result<EncryptedHistory> { - // serialize with msgpack - let buf = rmp_serde::to_vec(history)?; - - let nonce = secretbox::gen_nonce(); - - let ciphertext = secretbox::seal(&buf, &nonce, key); - - Ok(EncryptedHistory { ciphertext, nonce }) -} - -pub fn decrypt(encrypted_history: &EncryptedHistory, key: &secretbox::Key) -> Result<History> { - let plaintext = secretbox::open(&encrypted_history.ciphertext, &encrypted_history.nonce, key) - .map_err(|_| eyre!("failed to open secretbox - invalid key?"))?; - - let history = rmp_serde::from_read_ref(&plaintext)?; - - Ok(history) -} - -#[cfg(test)] -mod test { - use sodiumoxide::crypto::secretbox; - - use crate::local::history::History; - - use super::{decrypt, encrypt}; - - #[test] - fn test_encrypt_decrypt() { - let key1 = secretbox::gen_key(); - let key2 = secretbox::gen_key(); - - let history = History::new( - chrono::Utc::now(), - "ls".to_string(), - "/home/ellie".to_string(), - 0, - 1, - Some("beep boop".to_string()), - Some("booop".to_string()), - ); - - let e1 = encrypt(&history, &key1).unwrap(); - let e2 = encrypt(&history, &key2).unwrap(); - - assert_ne!(e1.ciphertext, e2.ciphertext); - assert_ne!(e1.nonce, e2.nonce); - - // test decryption works - // this should pass - match decrypt(&e1, &key1) { - Err(e) => assert!(false, "failed to decrypt, got {}", e), - Ok(h) => assert_eq!(h, history), - }; - - // this should err - match decrypt(&e2, &key1) { - Ok(_) => assert!(false, "expected an error decrypting with invalid key"), - Err(_) => {} - }; - } -} diff --git a/src/local/history.rs b/src/local/history.rs deleted file mode 100644 index 1712f8b9..00000000 --- a/src/local/history.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::env; -use std::hash::{Hash, Hasher}; - -use chrono::Utc; - -use crate::command::uuid_v4; - -// Any new fields MUST be Optional<>! -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct History { - pub id: String, - pub timestamp: chrono::DateTime<Utc>, - pub duration: i64, - pub exit: i64, - pub command: String, - pub cwd: String, - pub session: String, - pub hostname: String, -} - -impl History { - pub fn new( - timestamp: chrono::DateTime<Utc>, - command: String, - cwd: String, - exit: i64, - duration: i64, - session: Option<String>, - hostname: Option<String>, - ) -> Self { - let session = session - .or_else(|| env::var("ATUIN_SESSION").ok()) - .unwrap_or_else(uuid_v4); - let hostname = - hostname.unwrap_or_else(|| format!("{}:{}", whoami::hostname(), whoami::username())); - - Self { - id: uuid_v4(), - timestamp, - command, - cwd, - exit, - duration, - session, - hostname, - } - } -} - -impl PartialEq for History { - // for the sakes of listing unique history only, we do not care about - // anything else - // obviously this does not refer to the *same* item of history, but when - // we only render the command, it looks the same - fn eq(&self, other: &Self) -> bool { - self.command == other.command - } -} - -impl Eq for History {} - -impl Hash for History { - fn hash<H: Hasher>(&self, state: &mut H) { - self.command.hash(state); - } -} diff --git a/src/local/import.rs b/src/local/import.rs deleted file mode 100644 index 3b0b2a69..00000000 --- a/src/local/import.rs +++ /dev/null @@ -1,176 +0,0 @@ -// import old shell history! -// automatically hoover up all that we can find - -use std::io::{BufRead, BufReader, Seek, SeekFrom}; -use std::{fs::File, path::Path}; - -use chrono::prelude::*; -use chrono::Utc; -use eyre::{eyre, Result}; -use itertools::Itertools; - -use super::history::History; - -#[derive(Debug)] -pub struct Zsh { - file: BufReader<File>, - - pub loc: u64, - pub counter: i64, -} - -// this could probably be sped up -fn count_lines(buf: &mut BufReader<File>) -> Result<usize> { - let lines = buf.lines().count(); - buf.seek(SeekFrom::Start(0))?; - - Ok(lines) -} - -impl Zsh { - pub fn new(path: impl AsRef<Path>) -> Result<Self> { - let file = File::open(path)?; - let mut buf = BufReader::new(file); - let loc = count_lines(&mut buf)?; - - Ok(Self { - file: buf, - loc: loc as u64, - counter: 0, - }) - } -} - -fn parse_extended(line: &str, counter: i64) -> History { - let line = line.replacen(": ", "", 2); - let (time, duration) = line.splitn(2, ':').collect_tuple().unwrap(); - let (duration, command) = duration.splitn(2, ';').collect_tuple().unwrap(); - - let time = time - .parse::<i64>() - .unwrap_or_else(|_| chrono::Utc::now().timestamp()); - - let offset = chrono::Duration::milliseconds(counter); - let time = Utc.timestamp(time, 0); - let time = time + offset; - - let duration = duration.parse::<i64>().map_or(-1, |t| t * 1_000_000_000); - - // use nanos, because why the hell not? we won't display them. - History::new( - time, - command.trim_end().to_string(), - String::from("unknown"), - 0, // assume 0, we have no way of knowing :( - duration, - None, - None, - ) -} - -impl Zsh { - fn read_line(&mut self) -> Option<Result<String>> { - let mut line = String::new(); - - match self.file.read_line(&mut line) { - Ok(0) => None, - Ok(_) => Some(Ok(line)), - Err(e) => Some(Err(eyre!("failed to read line: {}", e))), // we can skip past things like invalid utf8 - } - } -} - -impl Iterator for Zsh { - type Item = Result<History>; - - fn next(&mut self) -> Option<Self::Item> { - // ZSH extended history records the timestamp + command duration - // These lines begin with : - // So, if the line begins with :, parse it. Otherwise it's just - // the command - let line = self.read_line()?; - - if let Err(e) = line { - return Some(Err(e)); // :( - } - - let mut line = line.unwrap(); - - while line.ends_with("\\\n") { - let next_line = self.read_line()?; - - if next_line.is_err() { - // There's a chance that the last line of a command has invalid - // characters, the only safe thing to do is break :/ - // usually just invalid utf8 or smth - // however, we really need to avoid missing history, so it's - // better to have some items that should have been part of - // something else, than to miss things. So break. - break; - } - - line.push_str(next_line.unwrap().as_str()); - } - - // We have to handle the case where a line has escaped newlines. - // Keep reading until we have a non-escaped newline - - let extended = line.starts_with(':'); - - if extended { - self.counter += 1; - Some(Ok(parse_extended(line.as_str(), self.counter))) - } else { - let time = chrono::Utc::now(); - let offset = chrono::Duration::seconds(self.counter); - let time = time - offset; - - self.counter += 1; - - Some(Ok(History::new( - time, - line.trim_end().to_string(), - String::from("unknown"), - -1, - -1, - None, - None, - ))) - } - } -} - -#[cfg(test)] -mod test { - use chrono::prelude::*; - use chrono::Utc; - - use super::parse_extended; - - #[test] - fn test_parse_extended_simple() { - let parsed = parse_extended(": 1613322469:0;cargo install atuin", 0); - - assert_eq!(parsed.command, "cargo install atuin"); - assert_eq!(parsed.duration, 0); - assert_eq!(parsed.timestamp, Utc.timestamp(1_613_322_469, 0)); - - let parsed = parse_extended(": 1613322469:10;cargo install atuin;cargo update", 0); - - assert_eq!(parsed.command, "cargo install atuin;cargo update"); - assert_eq!(parsed.duration, 10_000_000_000); - assert_eq!(parsed.timestamp, Utc.timestamp(1_613_322_469, 0)); - - let parsed = parse_extended(": 1613322469:10;cargo :b̷i̶t̴r̵o̴t̴ ̵i̷s̴ ̷r̶e̵a̸l̷", 0); - - assert_eq!(parsed.command, "cargo :b̷i̶t̴r̵o̴t̴ ̵i̷s̴ ̷r̶e̵a̸l̷"); - assert_eq!(parsed.duration, 10_000_000_000); - assert_eq!(parsed.timestamp, Utc.timestamp(1_613_322_469, 0)); - - let parsed = parse_extended(": 1613322469:10;cargo install \\n atuin\n", 0); - - assert_eq!(parsed.command, "cargo install \\n atuin"); - assert_eq!(parsed.duration, 10_000_000_000); - assert_eq!(parsed.timestamp, Utc.timestamp(1_613_322_469, 0)); - } -} diff --git a/src/local/mod.rs b/src/local/mod.rs deleted file mode 100644 index 9fe31292..00000000 --- a/src/local/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod api_client; -pub mod database; -pub mod encryption; -pub mod history; -pub mod import; -pub mod sync; diff --git a/src/local/sync.rs b/src/local/sync.rs deleted file mode 100644 index e0feb759..00000000 --- a/src/local/sync.rs +++ /dev/null @@ -1,141 +0,0 @@ -use std::convert::TryInto; - -use chrono::prelude::*; -use eyre::Result; - -use crate::local::api_client; -use crate::local::database::Database; -use crate::local::encryption::{encrypt, load_key}; -use crate::settings::{Local, Settings, HISTORY_PAGE_SIZE}; -use crate::{api::AddHistoryRequest, utils::hash_str}; - -// Currently sync is kinda naive, and basically just pages backwards through -// history. This means newly added stuff shows up properly! We also just use -// the total count in each database to indicate whether a sync is needed. -// I think this could be massively improved! If we had a way of easily -// indicating count per time period (hour, day, week, year, etc) then we can -// easily pinpoint where we are missing data and what needs downloading. Start -// with year, then find the week, then the day, then the hour, then download it -// all! The current naive approach will do for now. - -// Check if remote has things we don't, and if so, download them. -// Returns (num downloaded, total local) -async fn sync_download( - force: bool, - client: &api_client::Client<'_>, - db: &mut (impl Database + Send), -) -> Result<(i64, i64)> { - let remote_count = client.count().await?; - - let initial_local = db.history_count()?; - let mut local_count = initial_local; - - let mut last_sync = if force { - Utc.timestamp_millis(0) - } else { - Local::last_sync()? - }; - - let mut last_timestamp = Utc.timestamp_millis(0); - - let host = if force { Some(String::from("")) } else { None }; - - while remote_count > local_count { - let page = client - .get_history(last_sync, last_timestamp, host.clone()) - .await?; - - if page.len() < HISTORY_PAGE_SIZE.try_into().unwrap() { - break; - } - - db.save_bulk(&page)?; - - local_count = db.history_count()?; - - let page_last = page - .last() - .expect("could not get last element of page") - .timestamp; - - // in the case of a small sync frequency, it's possible for history to - // be "lost" between syncs. In this case we need to rewind the sync - // timestamps - if page_last == last_timestamp { - last_timestamp = Utc.timestamp_millis(0); - last_sync = last_sync - chrono::Duration::hours(1); - } else { - last_timestamp = page_last; - } - } - - Ok((local_count - initial_local, local_count)) -} - -// Check if we have things remote doesn't, and if so, upload them -async fn sync_upload( - settings: &Settings, - _force: bool, - client: &api_client::Client<'_>, - db: &mut (impl Database + Send), -) -> Result<()> { - let initial_remote_count = client.count().await?; - let mut remote_count = initial_remote_count; - - let local_count = db.history_count()?; - - let key = load_key(settings)?; // encryption key - - // first just try the most recent set - - let mut cursor = Utc::now(); - - while local_count > remote_count { - let last = db.before(cursor, HISTORY_PAGE_SIZE)?; - let mut buffer = Vec::<AddHistoryRequest>::new(); - - if last.is_empty() { - break; - } - - for i in last { - let data = encrypt(&i, &key)?; - let data = serde_json::to_string(&data)?; - - let add_hist = AddHistoryRequest { - id: i.id, - timestamp: i.timestamp, - data, - hostname: hash_str(i.hostname.as_str()), - }; - - buffer.push(add_hist); - } - - // anything left over outside of the 100 block size - client.post_history(&buffer).await?; - cursor = buffer.last().unwrap().timestamp; - - remote_count = client.count().await?; - } - - Ok(()) -} - -pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> { - let client = api_client::Client::new( - settings.local.sync_address.as_str(), - settings.local.session_token.as_str(), - load_key(settings)?, - ); - - sync_upload(settings, force, &client, db).await?; - - let download = sync_download(force, &client, db).await?; - - debug!("sync downloaded {}", download.0); - - Local::save_sync_time()?; - - Ok(()) -} |
