diff options
| author | Ellie Huxtable <ellie@elliehuxtable.com> | 2024-01-05 17:57:49 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-05 17:57:49 +0000 |
| commit | 7bc6ccdd70422f8fc763e2fd814a481bc79ce7b5 (patch) | |
| tree | a1c064a7c7394d261711c6e046d4c60791e6cf6f /atuin-server | |
| parent | fix: Prevent input to be interpreted as options for zsh autosuggestions (#1506) (diff) | |
| download | atuin-7bc6ccdd70422f8fc763e2fd814a481bc79ce7b5.zip | |
feat: rework record sync for improved reliability (#1478)
* feat: rework record sync for improved reliability
So, to tell a story
1. We introduced the record sync, intended to be the new algorithm to
sync history.
2. On top of this, I added the KV store. This was intended as a simple
test of the record sync, and to see if people wanted that sort of
functionality
3. History remained syncing via the old means, as while it had issues it
worked more-or-less OK. And we are aware of its flaws
4. If KV syncing worked ok, history would be moved across
KV syncing ran ok for 6mo or so, so I started to move across history.
For several weeks, I ran a local fork of Atuin + the server that synced
via records instead.
The record store maintained ordering via a linked list, which was a
mistake. It performed well in testing, but was really difficult to debug
and reason about. So when a few small sync issues occured, they took an
extremely long time to debug.
This PR is huge, which I regret. It involves replacing the "parent"
relationship that records once had (pointing to the previous record)
with a simple index (generally referred to as idx). This also means we
had to change the recordindex, which referenced "tails". Tails were the
last item in the chain.
Now that we use an "array" vs linked list, that logic was also replaced.
And is much simpler :D
Same for the queries that act on this data.
----
This isn't final - we still need to add
1. Proper server/client error handling, which has been lacking for a
while
2. The actual history implementation on top
This exists in a branch, just without deletions. Won't be much to
add that, I just don't want to make this any larger than it already
is
The _only_ caveat here is that we basically lose data synced via the old
record store. This is the KV data from before.
It hasn't been deleted or anything, just no longer hooked up. So it's
totally possible to write a migration script. I just need to do that.
* update .gitignore
* use correct endpoint
* fix for stores with length of 1
* use create/delete enum for history store
* lint, remove unneeded host_id
* remove prints
* add command to import old history
* add enable/disable switch for record sync
* add record sync to auto sync
* satisfy the almighty clippy
* remove file that I did not mean to commit
* feedback
Diffstat (limited to '')
| -rw-r--r-- | atuin-server-database/src/lib.rs | 6 | ||||
| -rw-r--r-- | atuin-server-postgres/migrations/20231202170508_create-store.sql | 15 | ||||
| -rw-r--r-- | atuin-server-postgres/migrations/20231203124112_create-store-idx.sql | 2 | ||||
| -rw-r--r-- | atuin-server-postgres/src/lib.rs | 92 | ||||
| -rw-r--r-- | atuin-server-postgres/src/wrappers.rs | 7 | ||||
| -rw-r--r-- | atuin-server/src/handlers/mod.rs | 1 | ||||
| -rw-r--r-- | atuin-server/src/handlers/record.rs | 107 | ||||
| -rw-r--r-- | atuin-server/src/handlers/v0/mod.rs | 1 | ||||
| -rw-r--r-- | atuin-server/src/handlers/v0/record.rs | 111 | ||||
| -rw-r--r-- | atuin-server/src/router.rs | 11 |
10 files changed, 216 insertions, 137 deletions
diff --git a/atuin-server-database/src/lib.rs b/atuin-server-database/src/lib.rs index d529655e..9b154ea1 100644 --- a/atuin-server-database/src/lib.rs +++ b/atuin-server-database/src/lib.rs @@ -14,7 +14,7 @@ use self::{ models::{History, NewHistory, NewSession, NewUser, Session, User}, }; use async_trait::async_trait; -use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIndex}; +use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx, RecordStatus}; use serde::{de::DeserializeOwned, Serialize}; use time::{Date, Duration, Month, OffsetDateTime, Time, UtcOffset}; use tracing::instrument; @@ -68,12 +68,12 @@ pub trait Database: Sized + Clone + Send + Sync + 'static { user: &User, host: HostId, tag: String, - start: Option<RecordId>, + start: Option<RecordIdx>, count: u64, ) -> DbResult<Vec<Record<EncryptedData>>>; // Return the tail record ID for each store, so (HostID, Tag, TailRecordID) - async fn tail_records(&self, user: &User) -> DbResult<RecordIndex>; + async fn status(&self, user: &User) -> DbResult<RecordStatus>; async fn count_history_range(&self, user: &User, range: Range<OffsetDateTime>) -> DbResult<i64>; diff --git a/atuin-server-postgres/migrations/20231202170508_create-store.sql b/atuin-server-postgres/migrations/20231202170508_create-store.sql new file mode 100644 index 00000000..ffb57966 --- /dev/null +++ b/atuin-server-postgres/migrations/20231202170508_create-store.sql @@ -0,0 +1,15 @@ +-- Add migration script here +create table store ( + id uuid primary key, -- remember to use uuidv7 for happy indices <3 + client_id uuid not null, -- I am too uncomfortable with the idea of a client-generated primary key, even though it's fine mathematically + host uuid not null, -- a unique identifier for the host + idx bigint not null, -- the index of the record in this store, identified by (host, tag) + timestamp bigint not null, -- not a timestamp type, as those do not have nanosecond precision + version text not null, + tag text not null, -- what is this? history, kv, whatever. Remember clients get a log per tag per host + data text not null, -- store the actual history data, encrypted. I don't wanna know! + cek text not null, + + user_id bigint not null, -- allow multiple users + created_at timestamp not null default current_timestamp +); diff --git a/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql b/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql new file mode 100644 index 00000000..56d67145 --- /dev/null +++ b/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql @@ -0,0 +1,2 @@ +-- Add migration script here +create unique index record_uniq ON store(user_id, host, tag, idx); diff --git a/atuin-server-postgres/src/lib.rs b/atuin-server-postgres/src/lib.rs index f22e6bee..c1de4d50 100644 --- a/atuin-server-postgres/src/lib.rs +++ b/atuin-server-postgres/src/lib.rs @@ -1,7 +1,7 @@ use std::ops::Range; use async_trait::async_trait; -use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIndex}; +use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx, RecordStatus}; use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User}; use atuin_server_database::{Database, DbError, DbResult}; use futures_util::TryStreamExt; @@ -11,6 +11,7 @@ use sqlx::Row; use time::{OffsetDateTime, PrimitiveDateTime, UtcOffset}; use tracing::instrument; +use uuid::Uuid; use wrappers::{DbHistory, DbRecord, DbSession, DbUser}; mod wrappers; @@ -361,16 +362,16 @@ impl Database for Postgres { let id = atuin_common::utils::uuid_v7(); sqlx::query( - "insert into records - (id, client_id, host, parent, timestamp, version, tag, data, cek, user_id) + "insert into store + (id, client_id, host, idx, timestamp, version, tag, data, cek, user_id) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) on conflict do nothing ", ) .bind(id) .bind(i.id) - .bind(i.host) - .bind(i.parent) + .bind(i.host.id) + .bind(i.idx as i64) .bind(i.timestamp as i64) // throwing away some data, but i64 is still big in terms of time .bind(&i.version) .bind(&i.tag) @@ -393,62 +394,69 @@ impl Database for Postgres { user: &User, host: HostId, tag: String, - start: Option<RecordId>, + start: Option<RecordIdx>, count: u64, ) -> DbResult<Vec<Record<EncryptedData>>> { tracing::debug!("{:?} - {:?} - {:?}", host, tag, start); - let mut ret = Vec::with_capacity(count as usize); - let mut parent = start; + let start = start.unwrap_or(0); - // yeah let's do something better - for _ in 0..count { - // a very much not ideal query. but it's simple at least? - // we are basically using postgres as a kv store here, so... maybe consider using an actual - // kv store? - let record: Result<DbRecord, DbError> = sqlx::query_as( - "select client_id, host, parent, timestamp, version, tag, data, cek from records + let records: Result<Vec<DbRecord>, DbError> = sqlx::query_as( + "select client_id, host, idx, timestamp, version, tag, data, cek from store where user_id = $1 and tag = $2 and host = $3 - and parent is not distinct from $4", - ) - .bind(user.id) - .bind(tag.clone()) - .bind(host) - .bind(parent) - .fetch_one(&self.pool) - .await - .map_err(fix_error); + and idx >= $4 + order by idx asc + limit $5", + ) + .bind(user.id) + .bind(tag.clone()) + .bind(host) + .bind(start as i64) + .bind(count as i64) + .fetch_all(&self.pool) + .await + .map_err(fix_error); - match record { - Ok(record) => { - let record: Record<EncryptedData> = record.into(); - ret.push(record.clone()); + let ret = match records { + Ok(records) => { + let records: Vec<Record<EncryptedData>> = records + .into_iter() + .map(|f| { + let record: Record<EncryptedData> = f.into(); + record + }) + .collect(); - parent = Some(record.id); - } - Err(DbError::NotFound) => { - tracing::debug!("hit tail of store: {:?}/{}", host, tag); - return Ok(ret); - } - Err(e) => return Err(e), + records } - } + Err(DbError::NotFound) => { + tracing::debug!("no records found in store: {:?}/{}", host, tag); + return Ok(vec![]); + } + Err(e) => return Err(e), + }; Ok(ret) } - async fn tail_records(&self, user: &User) -> DbResult<RecordIndex> { - const TAIL_RECORDS_SQL: &str = "select host, tag, client_id from records rp where (select count(1) from records where parent=rp.client_id and user_id = $1) = 0 and user_id = $1;"; + async fn status(&self, user: &User) -> DbResult<RecordStatus> { + const STATUS_SQL: &str = + "select host, tag, max(idx) from store where user_id = $1 group by host, tag"; - let res = sqlx::query_as(TAIL_RECORDS_SQL) + let res: Vec<(Uuid, String, i64)> = sqlx::query_as(STATUS_SQL) .bind(user.id) - .fetch(&self.pool) - .try_collect() + .fetch_all(&self.pool) .await .map_err(fix_error)?; - Ok(res) + let mut status = RecordStatus::new(); + + for i in res { + status.set_raw(HostId(i.0), i.1, i.2 as u64); + } + + Ok(status) } } diff --git a/atuin-server-postgres/src/wrappers.rs b/atuin-server-postgres/src/wrappers.rs index b4ae48ae..3ccf9c19 100644 --- a/atuin-server-postgres/src/wrappers.rs +++ b/atuin-server-postgres/src/wrappers.rs @@ -1,5 +1,5 @@ use ::sqlx::{FromRow, Result}; -use atuin_common::record::{EncryptedData, Record}; +use atuin_common::record::{EncryptedData, Host, Record}; use atuin_server_database::models::{History, Session, User}; use sqlx::{postgres::PgRow, Row}; use time::PrimitiveDateTime; @@ -51,6 +51,7 @@ impl<'a> ::sqlx::FromRow<'a, PgRow> for DbHistory { impl<'a> ::sqlx::FromRow<'a, PgRow> for DbRecord { fn from_row(row: &'a PgRow) -> ::sqlx::Result<Self> { let timestamp: i64 = row.try_get("timestamp")?; + let idx: i64 = row.try_get("idx")?; let data = EncryptedData { data: row.try_get("data")?, @@ -59,8 +60,8 @@ impl<'a> ::sqlx::FromRow<'a, PgRow> for DbRecord { Ok(Self(Record { id: row.try_get("client_id")?, - host: row.try_get("host")?, - parent: row.try_get("parent")?, + host: Host::new(row.try_get("host")?), + idx: idx as u64, timestamp: timestamp as u64, version: row.try_get("version")?, tag: row.try_get("tag")?, diff --git a/atuin-server/src/handlers/mod.rs b/atuin-server/src/handlers/mod.rs index 18b1af8e..b66a20bf 100644 --- a/atuin-server/src/handlers/mod.rs +++ b/atuin-server/src/handlers/mod.rs @@ -8,6 +8,7 @@ pub mod history; pub mod record; pub mod status; pub mod user; +pub mod v0; const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/atuin-server/src/handlers/record.rs b/atuin-server/src/handlers/record.rs index 91b937b3..b5c07c5b 100644 --- a/atuin-server/src/handlers/record.rs +++ b/atuin-server/src/handlers/record.rs @@ -1,109 +1,46 @@ -use axum::{extract::Query, extract::State, Json}; +use axum::{response::IntoResponse, Json}; use http::StatusCode; -use metrics::counter; -use serde::Deserialize; -use tracing::{error, instrument}; +use serde_json::json; +use tracing::instrument; use super::{ErrorResponse, ErrorResponseStatus, RespExt}; -use crate::router::{AppState, UserAuth}; +use crate::router::UserAuth; use atuin_server_database::Database; -use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIndex}; +use atuin_common::record::{EncryptedData, Record}; #[instrument(skip_all, fields(user.id = user.id))] pub async fn post<DB: Database>( UserAuth(user): UserAuth, - state: State<AppState<DB>>, - Json(records): Json<Vec<Record<EncryptedData>>>, ) -> Result<(), ErrorResponseStatus<'static>> { - let State(AppState { database, settings }) = state; + // anyone who has actually used the old record store (a very small number) will see this error + // upon trying to sync. + // 1. The status endpoint will say that the server has nothing + // 2. The client will try to upload local records + // 3. Sync will fail with this error - tracing::debug!( - count = records.len(), - user = user.username, - "request to add records" + // If the client has no local records, they will see the empty index and do nothing. For the + // vast majority of users, this is the case. + return Err( + ErrorResponse::reply("record store deprecated; please upgrade") + .with_status(StatusCode::BAD_REQUEST), ); - - counter!("atuin_record_uploaded", records.len() as u64); - - let too_big = records - .iter() - .any(|r| r.data.data.len() >= settings.max_record_size || settings.max_record_size == 0); - - if too_big { - counter!("atuin_record_too_large", 1); - - return Err( - ErrorResponse::reply("could not add records; record too large") - .with_status(StatusCode::BAD_REQUEST), - ); - } - - if let Err(e) = database.add_records(&user, &records).await { - error!("failed to add record: {}", e); - - return Err(ErrorResponse::reply("failed to add record") - .with_status(StatusCode::INTERNAL_SERVER_ERROR)); - }; - - Ok(()) } #[instrument(skip_all, fields(user.id = user.id))] -pub async fn index<DB: Database>( - UserAuth(user): UserAuth, - state: State<AppState<DB>>, -) -> Result<Json<RecordIndex>, ErrorResponseStatus<'static>> { - let State(AppState { - database, - settings: _, - }) = state; - - let record_index = match database.tail_records(&user).await { - Ok(index) => index, - Err(e) => { - error!("failed to get record index: {}", e); +pub async fn index<DB: Database>(UserAuth(user): UserAuth) -> axum::response::Response { + let ret = json!({ + "hosts": {} + }); - return Err(ErrorResponse::reply("failed to calculate record index") - .with_status(StatusCode::INTERNAL_SERVER_ERROR)); - } - }; - - Ok(Json(record_index)) -} - -#[derive(Deserialize)] -pub struct NextParams { - host: HostId, - tag: String, - start: Option<RecordId>, - count: u64, + ret.to_string().into_response() } #[instrument(skip_all, fields(user.id = user.id))] -pub async fn next<DB: Database>( - params: Query<NextParams>, +pub async fn next( UserAuth(user): UserAuth, - state: State<AppState<DB>>, ) -> Result<Json<Vec<Record<EncryptedData>>>, ErrorResponseStatus<'static>> { - let State(AppState { - database, - settings: _, - }) = state; - let params = params.0; - - let records = match database - .next_records(&user, params.host, params.tag, params.start, params.count) - .await - { - Ok(records) => records, - Err(e) => { - error!("failed to get record index: {}", e); - - return Err(ErrorResponse::reply("failed to calculate record index") - .with_status(StatusCode::INTERNAL_SERVER_ERROR)); - } - }; + let records = Vec::new(); Ok(Json(records)) } diff --git a/atuin-server/src/handlers/v0/mod.rs b/atuin-server/src/handlers/v0/mod.rs new file mode 100644 index 00000000..78fb47b8 --- /dev/null +++ b/atuin-server/src/handlers/v0/mod.rs @@ -0,0 +1 @@ +pub(crate) mod record; diff --git a/atuin-server/src/handlers/v0/record.rs b/atuin-server/src/handlers/v0/record.rs new file mode 100644 index 00000000..79b2f80c --- /dev/null +++ b/atuin-server/src/handlers/v0/record.rs @@ -0,0 +1,111 @@ +use axum::{extract::Query, extract::State, Json}; +use http::StatusCode; +use metrics::counter; +use serde::Deserialize; +use tracing::{error, instrument}; + +use crate::{ + handlers::{ErrorResponse, ErrorResponseStatus, RespExt}, + router::{AppState, UserAuth}, +}; +use atuin_server_database::Database; + +use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx, RecordStatus}; + +#[instrument(skip_all, fields(user.id = user.id))] +pub async fn post<DB: Database>( + UserAuth(user): UserAuth, + state: State<AppState<DB>>, + Json(records): Json<Vec<Record<EncryptedData>>>, +) -> Result<(), ErrorResponseStatus<'static>> { + let State(AppState { database, settings }) = state; + + tracing::debug!( + count = records.len(), + user = user.username, + "request to add records" + ); + + counter!("atuin_record_uploaded", records.len() as u64); + + let too_big = records + .iter() + .any(|r| r.data.data.len() >= settings.max_record_size || settings.max_record_size == 0); + + if too_big { + counter!("atuin_record_too_large", 1); + + return Err( + ErrorResponse::reply("could not add records; record too large") + .with_status(StatusCode::BAD_REQUEST), + ); + } + + if let Err(e) = database.add_records(&user, &records).await { + error!("failed to add record: {}", e); + + return Err(ErrorResponse::reply("failed to add record") + .with_status(StatusCode::INTERNAL_SERVER_ERROR)); + }; + + Ok(()) +} + +#[instrument(skip_all, fields(user.id = user.id))] +pub async fn index<DB: Database>( + UserAuth(user): UserAuth, + state: State<AppState<DB>>, +) -> Result<Json<RecordStatus>, ErrorResponseStatus<'static>> { + let State(AppState { + database, + settings: _, + }) = state; + + let record_index = match database.status(&user).await { + Ok(index) => index, + Err(e) => { + error!("failed to get record index: {}", e); + + return Err(ErrorResponse::reply("failed to calculate record index") + .with_status(StatusCode::INTERNAL_SERVER_ERROR)); + } + }; + + Ok(Json(record_index)) +} + +#[derive(Deserialize)] +pub struct NextParams { + host: HostId, + tag: String, + start: Option<RecordIdx>, + count: u64, +} + +#[instrument(skip_all, fields(user.id = user.id))] +pub async fn next<DB: Database>( + params: Query<NextParams>, + UserAuth(user): UserAuth, + state: State<AppState<DB>>, +) -> Result<Json<Vec<Record<EncryptedData>>>, ErrorResponseStatus<'static>> { + let State(AppState { + database, + settings: _, + }) = state; + let params = params.0; + + let records = match database + .next_records(&user, params.host, params.tag, params.start, params.count) + .await + { + Ok(records) => records, + Err(e) => { + error!("failed to get record index: {}", e); + + return Err(ErrorResponse::reply("failed to calculate record index") + .with_status(StatusCode::INTERNAL_SERVER_ERROR)); + } + }; + + Ok(Json(records)) +} diff --git a/atuin-server/src/router.rs b/atuin-server/src/router.rs index 42cfaa86..500e1a29 100644 --- a/atuin-server/src/router.rs +++ b/atuin-server/src/router.rs @@ -118,13 +118,16 @@ pub fn router<DB: Database>(database: DB, settings: Settings<DB::Settings>) -> R .route("/sync/status", get(handlers::status::status)) .route("/history", post(handlers::history::add)) .route("/history", delete(handlers::history::delete)) - .route("/record", post(handlers::record::post)) - .route("/record", get(handlers::record::index)) - .route("/record/next", get(handlers::record::next)) .route("/user/:username", get(handlers::user::get)) .route("/account", delete(handlers::user::delete)) .route("/register", post(handlers::user::register)) - .route("/login", post(handlers::user::login)); + .route("/login", post(handlers::user::login)) + .route("/record", post(handlers::record::post::<DB>)) + .route("/record", get(handlers::record::index::<DB>)) + .route("/record/next", get(handlers::record::next)) + .route("/api/v0/record", post(handlers::v0::record::post)) + .route("/api/v0/record", get(handlers::v0::record::index)) + .route("/api/v0/record/next", get(handlers::v0::record::next)); let path = settings.path.as_str(); if path.is_empty() { |
