From 34888827f8a06de835cbe5833a06914f28cce514 Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Tue, 20 Apr 2021 17:07:11 +0100 Subject: Switch to Warp + SQLx, use async, switch to Rust stable (#36) * Switch to warp + sql, use async and stable rust * Update CI to use stable --- src/local/api_client.rs | 87 +++++++++++++++++++++++++------------------------ src/local/database.rs | 8 ++--- src/local/import.rs | 7 ++-- src/local/sync.rs | 36 +++++++++++--------- 4 files changed, 73 insertions(+), 65 deletions(-) (limited to 'src/local') diff --git a/src/local/api_client.rs b/src/local/api_client.rs index 434c07ba..1b64a295 100644 --- a/src/local/api_client.rs +++ b/src/local/api_client.rs @@ -1,93 +1,94 @@ use chrono::Utc; use eyre::Result; -use reqwest::header::AUTHORIZATION; +use reqwest::header::{HeaderMap, AUTHORIZATION}; +use reqwest::Url; +use sodiumoxide::crypto::secretbox; -use crate::api::{AddHistoryRequest, CountResponse, ListHistoryResponse}; -use crate::local::encryption::{decrypt, load_key}; +use crate::api::{AddHistoryRequest, CountResponse, SyncHistoryResponse}; +use crate::local::encryption::decrypt; use crate::local::history::History; -use crate::settings::Settings; use crate::utils::hash_str; pub struct Client<'a> { - settings: &'a Settings, + sync_addr: &'a str, + token: &'a str, + key: secretbox::Key, + client: reqwest::Client, } impl<'a> Client<'a> { - pub const fn new(settings: &'a Settings) -> Self { - Client { settings } + pub fn new(sync_addr: &'a str, token: &'a str, key: secretbox::Key) -> Self { + Client { + sync_addr, + token, + key, + client: reqwest::Client::new(), + } } - pub fn count(&self) -> Result { - let url = format!("{}/sync/count", self.settings.local.sync_address); - let client = reqwest::blocking::Client::new(); + pub async fn count(&self) -> Result { + let url = format!("{}/sync/count", self.sync_addr); + let url = Url::parse(url.as_str())?; + let token = format!("Token {}", self.token); + let token = token.parse()?; - let resp = client - .get(url) - .header( - AUTHORIZATION, - format!("Token {}", self.settings.local.session_token), - ) - .send()?; + let mut headers = HeaderMap::new(); + headers.insert(AUTHORIZATION, token); + + let resp = self.client.get(url).headers(headers).send().await?; - let count = resp.json::()?; + let count = resp.json::().await?; Ok(count.count) } - pub fn get_history( + pub async fn get_history( &self, sync_ts: chrono::DateTime, history_ts: chrono::DateTime, host: Option, ) -> Result> { - let key = load_key(self.settings)?; - let host = match host { None => hash_str(&format!("{}:{}", whoami::hostname(), whoami::username())), Some(h) => h, }; - // this allows for syncing between users on the same machine let url = format!( "{}/sync/history?sync_ts={}&history_ts={}&host={}", - self.settings.local.sync_address, - sync_ts.to_rfc3339(), - history_ts.to_rfc3339(), + self.sync_addr, + urlencoding::encode(sync_ts.to_rfc3339().as_str()), + urlencoding::encode(history_ts.to_rfc3339().as_str()), host, ); - let client = reqwest::blocking::Client::new(); - let resp = client + let resp = self + .client .get(url) - .header( - AUTHORIZATION, - format!("Token {}", self.settings.local.session_token), - ) - .send()?; + .header(AUTHORIZATION, format!("Token {}", self.token)) + .send() + .await?; - let history = resp.json::()?; + let history = resp.json::().await?; let history = history .history .iter() .map(|h| serde_json::from_str(h).expect("invalid base64")) - .map(|h| decrypt(&h, &key).expect("failed to decrypt history! check your key")) + .map(|h| decrypt(&h, &self.key).expect("failed to decrypt history! check your key")) .collect(); Ok(history) } - pub fn post_history(&self, history: &[AddHistoryRequest]) -> Result<()> { - let client = reqwest::blocking::Client::new(); + pub async fn post_history(&self, history: &[AddHistoryRequest]) -> Result<()> { + let url = format!("{}/history", self.sync_addr); + let url = Url::parse(url.as_str())?; - let url = format!("{}/history", self.settings.local.sync_address); - client + self.client .post(url) .json(history) - .header( - AUTHORIZATION, - format!("Token {}", self.settings.local.session_token), - ) - .send()?; + .header(AUTHORIZATION, format!("Token {}", self.token)) + .send() + .await?; Ok(()) } diff --git a/src/local/database.rs b/src/local/database.rs index 977f11cc..abc22bb8 100644 --- a/src/local/database.rs +++ b/src/local/database.rs @@ -215,9 +215,9 @@ impl Database for Sqlite { } fn before(&self, timestamp: chrono::DateTime, count: i64) -> Result> { - let mut stmt = self.conn.prepare( - "SELECT * FROM history where timestamp <= ? order by timestamp desc limit ?", - )?; + let mut stmt = self + .conn + .prepare("SELECT * FROM history where timestamp < ? order by timestamp desc limit ?")?; let history_iter = stmt.query_map(params![timestamp.timestamp_nanos(), count], |row| { history_from_sqlite_row(None, row) @@ -236,7 +236,7 @@ impl Database for Sqlite { fn prefix_search(&self, query: &str) -> Result> { self.query( - "select * from history where command like ?1 || '%' order by timestamp asc", + "select * from history where command like ?1 || '%' order by timestamp asc limit 1000", &[query], ) } diff --git a/src/local/import.rs b/src/local/import.rs index d0f679c9..3b0b2a69 100644 --- a/src/local/import.rs +++ b/src/local/import.rs @@ -7,6 +7,7 @@ use std::{fs::File, path::Path}; use chrono::prelude::*; use chrono::Utc; use eyre::{eyre, Result}; +use itertools::Itertools; use super::history::History; @@ -42,8 +43,8 @@ impl Zsh { fn parse_extended(line: &str, counter: i64) -> History { let line = line.replacen(": ", "", 2); - let (time, duration) = line.split_once(':').unwrap(); - let (duration, command) = duration.split_once(';').unwrap(); + let (time, duration) = line.splitn(2, ':').collect_tuple().unwrap(); + let (duration, command) = duration.splitn(2, ';').collect_tuple().unwrap(); let time = time .parse::() @@ -60,7 +61,7 @@ fn parse_extended(line: &str, counter: i64) -> History { time, command.trim_end().to_string(), String::from("unknown"), - -1, + 0, // assume 0, we have no way of knowing :( duration, None, None, diff --git a/src/local/sync.rs b/src/local/sync.rs index c22d2f27..e0feb759 100644 --- a/src/local/sync.rs +++ b/src/local/sync.rs @@ -20,12 +20,12 @@ use crate::{api::AddHistoryRequest, utils::hash_str}; // Check if remote has things we don't, and if so, download them. // Returns (num downloaded, total local) -fn sync_download( +async fn sync_download( force: bool, - client: &api_client::Client, - db: &mut impl Database, + client: &api_client::Client<'_>, + db: &mut (impl Database + Send), ) -> Result<(i64, i64)> { - let remote_count = client.count()?; + let remote_count = client.count().await?; let initial_local = db.history_count()?; let mut local_count = initial_local; @@ -41,7 +41,9 @@ fn sync_download( let host = if force { Some(String::from("")) } else { None }; while remote_count > local_count { - let page = client.get_history(last_sync, last_timestamp, host.clone())?; + let page = client + .get_history(last_sync, last_timestamp, host.clone()) + .await?; if page.len() < HISTORY_PAGE_SIZE.try_into().unwrap() { break; @@ -71,13 +73,13 @@ fn sync_download( } // Check if we have things remote doesn't, and if so, upload them -fn sync_upload( +async fn sync_upload( settings: &Settings, _force: bool, - client: &api_client::Client, - db: &mut impl Database, + client: &api_client::Client<'_>, + db: &mut (impl Database + Send), ) -> Result<()> { - let initial_remote_count = client.count()?; + let initial_remote_count = client.count().await?; let mut remote_count = initial_remote_count; let local_count = db.history_count()?; @@ -111,21 +113,25 @@ fn sync_upload( } // anything left over outside of the 100 block size - client.post_history(&buffer)?; + client.post_history(&buffer).await?; cursor = buffer.last().unwrap().timestamp; - remote_count = client.count()?; + remote_count = client.count().await?; } Ok(()) } -pub fn sync(settings: &Settings, force: bool, db: &mut impl Database) -> Result<()> { - let client = api_client::Client::new(settings); +pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> { + let client = api_client::Client::new( + settings.local.sync_address.as_str(), + settings.local.session_token.as_str(), + load_key(settings)?, + ); - sync_upload(settings, force, &client, db)?; + sync_upload(settings, force, &client, db).await?; - let download = sync_download(force, &client, db)?; + let download = sync_download(force, &client, db).await?; debug!("sync downloaded {}", download.0); -- cgit v1.3.1