aboutsummaryrefslogtreecommitdiffstats
path: root/src/local
diff options
context:
space:
mode:
authorEllie Huxtable <e@elm.sh>2021-04-20 17:07:11 +0100
committerGitHub <noreply@github.com>2021-04-20 16:07:11 +0000
commit34888827f8a06de835cbe5833a06914f28cce514 (patch)
tree8b56f20e50065cd2c222d5e8e067ec55cf1947a1 /src/local
parentOptimise docker (#34) (diff)
downloadatuin-34888827f8a06de835cbe5833a06914f28cce514.zip
Switch to Warp + SQLx, use async, switch to Rust stable (#36)
* Switch to warp + sql, use async and stable rust * Update CI to use stable
Diffstat (limited to 'src/local')
-rw-r--r--src/local/api_client.rs87
-rw-r--r--src/local/database.rs8
-rw-r--r--src/local/import.rs7
-rw-r--r--src/local/sync.rs36
4 files changed, 73 insertions, 65 deletions
diff --git a/src/local/api_client.rs b/src/local/api_client.rs
index 434c07ba..1b64a295 100644
--- a/src/local/api_client.rs
+++ b/src/local/api_client.rs
@@ -1,93 +1,94 @@
use chrono::Utc;
use eyre::Result;
-use reqwest::header::AUTHORIZATION;
+use reqwest::header::{HeaderMap, AUTHORIZATION};
+use reqwest::Url;
+use sodiumoxide::crypto::secretbox;
-use crate::api::{AddHistoryRequest, CountResponse, ListHistoryResponse};
-use crate::local::encryption::{decrypt, load_key};
+use crate::api::{AddHistoryRequest, CountResponse, SyncHistoryResponse};
+use crate::local::encryption::decrypt;
use crate::local::history::History;
-use crate::settings::Settings;
use crate::utils::hash_str;
pub struct Client<'a> {
- settings: &'a Settings,
+ sync_addr: &'a str,
+ token: &'a str,
+ key: secretbox::Key,
+ client: reqwest::Client,
}
impl<'a> Client<'a> {
- pub const fn new(settings: &'a Settings) -> Self {
- Client { settings }
+ pub fn new(sync_addr: &'a str, token: &'a str, key: secretbox::Key) -> Self {
+ Client {
+ sync_addr,
+ token,
+ key,
+ client: reqwest::Client::new(),
+ }
}
- pub fn count(&self) -> Result<i64> {
- let url = format!("{}/sync/count", self.settings.local.sync_address);
- let client = reqwest::blocking::Client::new();
+ pub async fn count(&self) -> Result<i64> {
+ let url = format!("{}/sync/count", self.sync_addr);
+ let url = Url::parse(url.as_str())?;
+ let token = format!("Token {}", self.token);
+ let token = token.parse()?;
- let resp = client
- .get(url)
- .header(
- AUTHORIZATION,
- format!("Token {}", self.settings.local.session_token),
- )
- .send()?;
+ let mut headers = HeaderMap::new();
+ headers.insert(AUTHORIZATION, token);
+
+ let resp = self.client.get(url).headers(headers).send().await?;
- let count = resp.json::<CountResponse>()?;
+ let count = resp.json::<CountResponse>().await?;
Ok(count.count)
}
- pub fn get_history(
+ pub async fn get_history(
&self,
sync_ts: chrono::DateTime<Utc>,
history_ts: chrono::DateTime<Utc>,
host: Option<String>,
) -> Result<Vec<History>> {
- let key = load_key(self.settings)?;
-
let host = match host {
None => hash_str(&format!("{}:{}", whoami::hostname(), whoami::username())),
Some(h) => h,
};
- // this allows for syncing between users on the same machine
let url = format!(
"{}/sync/history?sync_ts={}&history_ts={}&host={}",
- self.settings.local.sync_address,
- sync_ts.to_rfc3339(),
- history_ts.to_rfc3339(),
+ self.sync_addr,
+ urlencoding::encode(sync_ts.to_rfc3339().as_str()),
+ urlencoding::encode(history_ts.to_rfc3339().as_str()),
host,
);
- let client = reqwest::blocking::Client::new();
- let resp = client
+ let resp = self
+ .client
.get(url)
- .header(
- AUTHORIZATION,
- format!("Token {}", self.settings.local.session_token),
- )
- .send()?;
+ .header(AUTHORIZATION, format!("Token {}", self.token))
+ .send()
+ .await?;
- let history = resp.json::<ListHistoryResponse>()?;
+ let history = resp.json::<SyncHistoryResponse>().await?;
let history = history
.history
.iter()
.map(|h| serde_json::from_str(h).expect("invalid base64"))
- .map(|h| decrypt(&h, &key).expect("failed to decrypt history! check your key"))
+ .map(|h| decrypt(&h, &self.key).expect("failed to decrypt history! check your key"))
.collect();
Ok(history)
}
- pub fn post_history(&self, history: &[AddHistoryRequest]) -> Result<()> {
- let client = reqwest::blocking::Client::new();
+ pub async fn post_history(&self, history: &[AddHistoryRequest]) -> Result<()> {
+ let url = format!("{}/history", self.sync_addr);
+ let url = Url::parse(url.as_str())?;
- let url = format!("{}/history", self.settings.local.sync_address);
- client
+ self.client
.post(url)
.json(history)
- .header(
- AUTHORIZATION,
- format!("Token {}", self.settings.local.session_token),
- )
- .send()?;
+ .header(AUTHORIZATION, format!("Token {}", self.token))
+ .send()
+ .await?;
Ok(())
}
diff --git a/src/local/database.rs b/src/local/database.rs
index 977f11cc..abc22bb8 100644
--- a/src/local/database.rs
+++ b/src/local/database.rs
@@ -215,9 +215,9 @@ impl Database for Sqlite {
}
fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>> {
- let mut stmt = self.conn.prepare(
- "SELECT * FROM history where timestamp <= ? order by timestamp desc limit ?",
- )?;
+ let mut stmt = self
+ .conn
+ .prepare("SELECT * FROM history where timestamp < ? order by timestamp desc limit ?")?;
let history_iter = stmt.query_map(params![timestamp.timestamp_nanos(), count], |row| {
history_from_sqlite_row(None, row)
@@ -236,7 +236,7 @@ impl Database for Sqlite {
fn prefix_search(&self, query: &str) -> Result<Vec<History>> {
self.query(
- "select * from history where command like ?1 || '%' order by timestamp asc",
+ "select * from history where command like ?1 || '%' order by timestamp asc limit 1000",
&[query],
)
}
diff --git a/src/local/import.rs b/src/local/import.rs
index d0f679c9..3b0b2a69 100644
--- a/src/local/import.rs
+++ b/src/local/import.rs
@@ -7,6 +7,7 @@ use std::{fs::File, path::Path};
use chrono::prelude::*;
use chrono::Utc;
use eyre::{eyre, Result};
+use itertools::Itertools;
use super::history::History;
@@ -42,8 +43,8 @@ impl Zsh {
fn parse_extended(line: &str, counter: i64) -> History {
let line = line.replacen(": ", "", 2);
- let (time, duration) = line.split_once(':').unwrap();
- let (duration, command) = duration.split_once(';').unwrap();
+ let (time, duration) = line.splitn(2, ':').collect_tuple().unwrap();
+ let (duration, command) = duration.splitn(2, ';').collect_tuple().unwrap();
let time = time
.parse::<i64>()
@@ -60,7 +61,7 @@ fn parse_extended(line: &str, counter: i64) -> History {
time,
command.trim_end().to_string(),
String::from("unknown"),
- -1,
+ 0, // assume 0, we have no way of knowing :(
duration,
None,
None,
diff --git a/src/local/sync.rs b/src/local/sync.rs
index c22d2f27..e0feb759 100644
--- a/src/local/sync.rs
+++ b/src/local/sync.rs
@@ -20,12 +20,12 @@ use crate::{api::AddHistoryRequest, utils::hash_str};
// Check if remote has things we don't, and if so, download them.
// Returns (num downloaded, total local)
-fn sync_download(
+async fn sync_download(
force: bool,
- client: &api_client::Client,
- db: &mut impl Database,
+ client: &api_client::Client<'_>,
+ db: &mut (impl Database + Send),
) -> Result<(i64, i64)> {
- let remote_count = client.count()?;
+ let remote_count = client.count().await?;
let initial_local = db.history_count()?;
let mut local_count = initial_local;
@@ -41,7 +41,9 @@ fn sync_download(
let host = if force { Some(String::from("")) } else { None };
while remote_count > local_count {
- let page = client.get_history(last_sync, last_timestamp, host.clone())?;
+ let page = client
+ .get_history(last_sync, last_timestamp, host.clone())
+ .await?;
if page.len() < HISTORY_PAGE_SIZE.try_into().unwrap() {
break;
@@ -71,13 +73,13 @@ fn sync_download(
}
// Check if we have things remote doesn't, and if so, upload them
-fn sync_upload(
+async fn sync_upload(
settings: &Settings,
_force: bool,
- client: &api_client::Client,
- db: &mut impl Database,
+ client: &api_client::Client<'_>,
+ db: &mut (impl Database + Send),
) -> Result<()> {
- let initial_remote_count = client.count()?;
+ let initial_remote_count = client.count().await?;
let mut remote_count = initial_remote_count;
let local_count = db.history_count()?;
@@ -111,21 +113,25 @@ fn sync_upload(
}
// anything left over outside of the 100 block size
- client.post_history(&buffer)?;
+ client.post_history(&buffer).await?;
cursor = buffer.last().unwrap().timestamp;
- remote_count = client.count()?;
+ remote_count = client.count().await?;
}
Ok(())
}
-pub fn sync(settings: &Settings, force: bool, db: &mut impl Database) -> Result<()> {
- let client = api_client::Client::new(settings);
+pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> {
+ let client = api_client::Client::new(
+ settings.local.sync_address.as_str(),
+ settings.local.session_token.as_str(),
+ load_key(settings)?,
+ );
- sync_upload(settings, force, &client, db)?;
+ sync_upload(settings, force, &client, db).await?;
- let download = sync_download(force, &client, db)?;
+ let download = sync_download(force, &client, db).await?;
debug!("sync downloaded {}", download.0);