From 97e24d0d41bb743833e457de5ba49c5c233eb3b3 Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Fri, 14 Jul 2023 20:44:08 +0100 Subject: Add new sync (#1093) * Add record migration * Add database functions for inserting history No real tests yet :( I would like to avoid running postgres lol * Add index handler, use UUIDs not strings * Fix a bunch of tests, remove Option * Add tests, all passing * Working upload sync * Record downloading works * Sync download works * Don't waste requests * Use a page size for uploads, make it variable later * Aaaaaand they're encrypted now too * Add cek * Allow reading tail across hosts * Revert "Allow reading tail across hosts" Not like that This reverts commit 7b0c72e7e050c358172f9b53cbd21b9e44cf4931. * Handle multiple shards properly * format * Format and make clippy happy * use some fancy types (#1098) * use some fancy types * fmt * Goodbye horrible tuple * Update atuin-server-postgres/migrations/20230623070418_records.sql Co-authored-by: Conrad Ludgate * fmt * Sort tests too because time sucks * fix features --------- Co-authored-by: Conrad Ludgate --- atuin-server-postgres/src/lib.rs | 102 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 2 deletions(-) (limited to 'atuin-server-postgres/src/lib.rs') diff --git a/atuin-server-postgres/src/lib.rs b/atuin-server-postgres/src/lib.rs index 0dc51daf..404188b0 100644 --- a/atuin-server-postgres/src/lib.rs +++ b/atuin-server-postgres/src/lib.rs @@ -1,14 +1,14 @@ use async_trait::async_trait; +use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIndex}; use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User}; use atuin_server_database::{Database, DbError, DbResult}; use futures_util::TryStreamExt; use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPoolOptions; - use sqlx::Row; use tracing::instrument; -use wrappers::{DbHistory, DbSession, DbUser}; +use wrappers::{DbHistory, DbRecord, DbSession, DbUser}; mod wrappers; @@ -329,4 +329,102 @@ impl Database for Postgres { .map_err(fix_error) .map(|DbHistory(h)| h) } + + #[instrument(skip_all)] + async fn add_records(&self, user: &User, records: &[Record]) -> DbResult<()> { + let mut tx = self.pool.begin().await.map_err(fix_error)?; + + for i in records { + let id = atuin_common::utils::uuid_v7(); + + sqlx::query( + "insert into records + (id, client_id, host, parent, timestamp, version, tag, data, cek, user_id) + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + on conflict do nothing + ", + ) + .bind(id) + .bind(i.id) + .bind(i.host) + .bind(i.parent) + .bind(i.timestamp as i64) // throwing away some data, but i64 is still big in terms of time + .bind(&i.version) + .bind(&i.tag) + .bind(&i.data.data) + .bind(&i.data.content_encryption_key) + .bind(user.id) + .execute(&mut tx) + .await + .map_err(fix_error)?; + } + + tx.commit().await.map_err(fix_error)?; + + Ok(()) + } + + #[instrument(skip_all)] + async fn next_records( + &self, + user: &User, + host: HostId, + tag: String, + start: Option, + count: u64, + ) -> DbResult>> { + tracing::debug!("{:?} - {:?} - {:?}", host, tag, start); + let mut ret = Vec::with_capacity(count as usize); + let mut parent = start; + + // yeah let's do something better + for _ in 0..count { + // a very much not ideal query. but it's simple at least? + // we are basically using postgres as a kv store here, so... maybe consider using an actual + // kv store? + let record: Result = sqlx::query_as( + "select client_id, host, parent, timestamp, version, tag, data, cek from records + where user_id = $1 + and tag = $2 + and host = $3 + and parent is not distinct from $4", + ) + .bind(user.id) + .bind(tag.clone()) + .bind(host) + .bind(parent) + .fetch_one(&self.pool) + .await + .map_err(fix_error); + + match record { + Ok(record) => { + let record: Record = record.into(); + ret.push(record.clone()); + + parent = Some(record.id); + } + Err(DbError::NotFound) => { + tracing::debug!("hit tail of store: {:?}/{}", host, tag); + return Ok(ret); + } + Err(e) => return Err(e), + } + } + + Ok(ret) + } + + async fn tail_records(&self, user: &User) -> DbResult { + const TAIL_RECORDS_SQL: &str = "select host, tag, client_id from records rp where (select count(1) from records where parent=rp.client_id and user_id = $1) = 0;"; + + let res = sqlx::query_as(TAIL_RECORDS_SQL) + .bind(user.id) + .fetch(&self.pool) + .try_collect() + .await + .map_err(fix_error)?; + + Ok(res) + } } -- cgit v1.3.1