aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-server-postgres
diff options
context:
space:
mode:
authorBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
committerBenedikt Peetz <benedikt.peetz@b-peetz.de>2026-06-11 00:54:30 +0200
commit5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8 (patch)
treec64baa8d5866c8e339eaf660dd3f94f30a3f7d8a /crates/atuin-server-postgres
parentchore: Somewhat simplify sync code (diff)
downloadatuin-5c39e7cf284a1f6e9a1657f2deb44e359fc47eb8.zip
chore: Move everything into one big crate
That helps remove duplicated code and rustc/cargo will now also show dead code correctly.
Diffstat (limited to 'crates/atuin-server-postgres')
-rw-r--r--crates/atuin-server-postgres/Cargo.toml25
-rw-r--r--crates/atuin-server-postgres/build.rs5
-rw-r--r--crates/atuin-server-postgres/migrations/20210425153745_create_history.sql11
-rw-r--r--crates/atuin-server-postgres/migrations/20210425153757_create_users.sql10
-rw-r--r--crates/atuin-server-postgres/migrations/20210425153800_create_sessions.sql6
-rw-r--r--crates/atuin-server-postgres/migrations/20220419082412_add_count_trigger.sql51
-rw-r--r--crates/atuin-server-postgres/migrations/20220421073605_fix_count_trigger_delete.sql35
-rw-r--r--crates/atuin-server-postgres/migrations/20220421174016_larger-commands.sql3
-rw-r--r--crates/atuin-server-postgres/migrations/20220426172813_user-created-at.sql1
-rw-r--r--crates/atuin-server-postgres/migrations/20220505082442_create-events.sql14
-rw-r--r--crates/atuin-server-postgres/migrations/20220610074049_history-length.sql2
-rw-r--r--crates/atuin-server-postgres/migrations/20230315220537_drop-events.sql2
-rw-r--r--crates/atuin-server-postgres/migrations/20230315224203_create-deleted.sql5
-rw-r--r--crates/atuin-server-postgres/migrations/20230515221038_trigger-delete-only.sql30
-rw-r--r--crates/atuin-server-postgres/migrations/20230623070418_records.sql15
-rw-r--r--crates/atuin-server-postgres/migrations/20231202170508_create-store.sql15
-rw-r--r--crates/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql2
-rw-r--r--crates/atuin-server-postgres/migrations/20240108124837_drop-some-defaults.sql4
-rw-r--r--crates/atuin-server-postgres/migrations/20240614104159_idx-cache.sql8
-rw-r--r--crates/atuin-server-postgres/migrations/20240621110731_user-verified.sql8
-rw-r--r--crates/atuin-server-postgres/migrations/20240702094825_idx_cache_index.sql1
-rw-r--r--crates/atuin-server-postgres/migrations/20260127000000_remove-email-verification.sql2
-rw-r--r--crates/atuin-server-postgres/src/lib.rs581
-rw-r--r--crates/atuin-server-postgres/src/wrappers.rs77
24 files changed, 0 insertions, 913 deletions
diff --git a/crates/atuin-server-postgres/Cargo.toml b/crates/atuin-server-postgres/Cargo.toml
deleted file mode 100644
index ea19899e..00000000
--- a/crates/atuin-server-postgres/Cargo.toml
+++ /dev/null
@@ -1,25 +0,0 @@
-[package]
-name = "atuin-server-postgres"
-edition = "2024"
-description = "server postgres database library for atuin"
-
-version = { workspace = true }
-authors = { workspace = true }
-license = { workspace = true }
-homepage = { workspace = true }
-repository = { workspace = true }
-
-[dependencies]
-atuin-common = { path = "../atuin-common", version = "18.16.1" }
-atuin-server-database = { path = "../atuin-server-database", version = "18.16.1" }
-
-eyre = { workspace = true }
-tracing = { workspace = true }
-time = { workspace = true }
-serde = { workspace = true }
-sqlx = { workspace = true }
-async-trait = { workspace = true }
-uuid = { workspace = true }
-metrics = "0.24"
-futures-util = "0.3"
-rand.workspace = true \ No newline at end of file
diff --git a/crates/atuin-server-postgres/build.rs b/crates/atuin-server-postgres/build.rs
deleted file mode 100644
index d5068697..00000000
--- a/crates/atuin-server-postgres/build.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-// generated by `sqlx migrate build-script`
-fn main() {
- // trigger recompilation when a new migration is added
- println!("cargo:rerun-if-changed=migrations");
-}
diff --git a/crates/atuin-server-postgres/migrations/20210425153745_create_history.sql b/crates/atuin-server-postgres/migrations/20210425153745_create_history.sql
deleted file mode 100644
index 2c2d17b0..00000000
--- a/crates/atuin-server-postgres/migrations/20210425153745_create_history.sql
+++ /dev/null
@@ -1,11 +0,0 @@
-create table history (
- id bigserial primary key,
- client_id text not null unique, -- the client-generated ID
- user_id bigserial not null, -- allow multiple users
- hostname text not null, -- a unique identifier from the client (can be hashed, random, whatever)
- timestamp timestamp not null, -- one of the few non-encrypted metadatas
-
- data varchar(8192) not null, -- store the actual history data, encrypted. I don't wanna know!
-
- created_at timestamp not null default current_timestamp
-);
diff --git a/crates/atuin-server-postgres/migrations/20210425153757_create_users.sql b/crates/atuin-server-postgres/migrations/20210425153757_create_users.sql
deleted file mode 100644
index a25dcced..00000000
--- a/crates/atuin-server-postgres/migrations/20210425153757_create_users.sql
+++ /dev/null
@@ -1,10 +0,0 @@
-create table users (
- id bigserial primary key, -- also store our own ID
- username varchar(32) not null unique, -- being able to contact users is useful
- email varchar(128) not null unique, -- being able to contact users is useful
- password varchar(128) not null unique
-);
-
--- the prior index is case sensitive :(
-CREATE UNIQUE INDEX email_unique_idx on users (LOWER(email));
-CREATE UNIQUE INDEX username_unique_idx on users (LOWER(username));
diff --git a/crates/atuin-server-postgres/migrations/20210425153800_create_sessions.sql b/crates/atuin-server-postgres/migrations/20210425153800_create_sessions.sql
deleted file mode 100644
index c2fb6559..00000000
--- a/crates/atuin-server-postgres/migrations/20210425153800_create_sessions.sql
+++ /dev/null
@@ -1,6 +0,0 @@
--- Add migration script here
-create table sessions (
- id bigserial primary key,
- user_id bigserial,
- token varchar(128) unique not null
-);
diff --git a/crates/atuin-server-postgres/migrations/20220419082412_add_count_trigger.sql b/crates/atuin-server-postgres/migrations/20220419082412_add_count_trigger.sql
deleted file mode 100644
index dd1afa88..00000000
--- a/crates/atuin-server-postgres/migrations/20220419082412_add_count_trigger.sql
+++ /dev/null
@@ -1,51 +0,0 @@
--- Prior to this, the count endpoint was super naive and just ran COUNT(1).
--- This is slow asf. Now that we have an amount of actual traffic,
--- stop doing that!
--- This basically maintains a count, so we can read ONE row, instead of ALL the
--- rows. Much better.
--- Future optimisation could use some sort of cache so we don't even need to hit
--- postgres at all.
-
-create table total_history_count_user(
- id bigserial primary key,
- user_id bigserial,
- total integer -- try and avoid using keywords - hence total, not count
-);
-
-create or replace function user_history_count()
-returns trigger as
-$func$
-begin
- if (TG_OP='INSERT') then
- update total_history_count_user set total = total + 1 where user_id = new.user_id;
-
- if not found then
- insert into total_history_count_user(user_id, total)
- values (
- new.user_id,
- (select count(1) from history where user_id = new.user_id)
- );
- end if;
-
- elsif (TG_OP='DELETE') then
- update total_history_count_user set total = total - 1 where user_id = new.user_id;
-
- if not found then
- insert into total_history_count_user(user_id, total)
- values (
- new.user_id,
- (select count(1) from history where user_id = new.user_id)
- );
- end if;
- end if;
-
- return NEW; -- this is actually ignored for an after trigger, but oh well
-end;
-$func$
-language plpgsql volatile -- pldfplplpflh
-cost 100; -- default value
-
-create trigger tg_user_history_count
- after insert or delete on history
- for each row
- execute procedure user_history_count();
diff --git a/crates/atuin-server-postgres/migrations/20220421073605_fix_count_trigger_delete.sql b/crates/atuin-server-postgres/migrations/20220421073605_fix_count_trigger_delete.sql
deleted file mode 100644
index 6198f300..00000000
--- a/crates/atuin-server-postgres/migrations/20220421073605_fix_count_trigger_delete.sql
+++ /dev/null
@@ -1,35 +0,0 @@
--- the old version of this function used NEW in the delete part when it should
--- use OLD
-
-create or replace function user_history_count()
-returns trigger as
-$func$
-begin
- if (TG_OP='INSERT') then
- update total_history_count_user set total = total + 1 where user_id = new.user_id;
-
- if not found then
- insert into total_history_count_user(user_id, total)
- values (
- new.user_id,
- (select count(1) from history where user_id = new.user_id)
- );
- end if;
-
- elsif (TG_OP='DELETE') then
- update total_history_count_user set total = total - 1 where user_id = old.user_id;
-
- if not found then
- insert into total_history_count_user(user_id, total)
- values (
- old.user_id,
- (select count(1) from history where user_id = old.user_id)
- );
- end if;
- end if;
-
- return NEW; -- this is actually ignored for an after trigger, but oh well
-end;
-$func$
-language plpgsql volatile -- pldfplplpflh
-cost 100; -- default value
diff --git a/crates/atuin-server-postgres/migrations/20220421174016_larger-commands.sql b/crates/atuin-server-postgres/migrations/20220421174016_larger-commands.sql
deleted file mode 100644
index 0ac43433..00000000
--- a/crates/atuin-server-postgres/migrations/20220421174016_larger-commands.sql
+++ /dev/null
@@ -1,3 +0,0 @@
--- Make it 4x larger. Most commands are less than this, but as it's base64
--- SOME are more than 8192. Should be enough for now.
-ALTER TABLE history ALTER COLUMN data TYPE varchar(32768);
diff --git a/crates/atuin-server-postgres/migrations/20220426172813_user-created-at.sql b/crates/atuin-server-postgres/migrations/20220426172813_user-created-at.sql
deleted file mode 100644
index a9138194..00000000
--- a/crates/atuin-server-postgres/migrations/20220426172813_user-created-at.sql
+++ /dev/null
@@ -1 +0,0 @@
-alter table users add column created_at timestamp not null default now();
diff --git a/crates/atuin-server-postgres/migrations/20220505082442_create-events.sql b/crates/atuin-server-postgres/migrations/20220505082442_create-events.sql
deleted file mode 100644
index 57e16ec7..00000000
--- a/crates/atuin-server-postgres/migrations/20220505082442_create-events.sql
+++ /dev/null
@@ -1,14 +0,0 @@
-create type event_type as enum ('create', 'delete');
-
-create table events (
- id bigserial primary key,
- client_id text not null unique, -- the client-generated ID
- user_id bigserial not null, -- allow multiple users
- hostname text not null, -- a unique identifier from the client (can be hashed, random, whatever)
- timestamp timestamp not null, -- one of the few non-encrypted metadatas
-
- event_type event_type,
- data text not null, -- store the actual history data, encrypted. I don't wanna know!
-
- created_at timestamp not null default current_timestamp
-);
diff --git a/crates/atuin-server-postgres/migrations/20220610074049_history-length.sql b/crates/atuin-server-postgres/migrations/20220610074049_history-length.sql
deleted file mode 100644
index b1c23016..00000000
--- a/crates/atuin-server-postgres/migrations/20220610074049_history-length.sql
+++ /dev/null
@@ -1,2 +0,0 @@
--- Add migration script here
-alter table history alter column data type text;
diff --git a/crates/atuin-server-postgres/migrations/20230315220537_drop-events.sql b/crates/atuin-server-postgres/migrations/20230315220537_drop-events.sql
deleted file mode 100644
index fe3cae17..00000000
--- a/crates/atuin-server-postgres/migrations/20230315220537_drop-events.sql
+++ /dev/null
@@ -1,2 +0,0 @@
--- Add migration script here
-drop table events;
diff --git a/crates/atuin-server-postgres/migrations/20230315224203_create-deleted.sql b/crates/atuin-server-postgres/migrations/20230315224203_create-deleted.sql
deleted file mode 100644
index 9a9e6263..00000000
--- a/crates/atuin-server-postgres/migrations/20230315224203_create-deleted.sql
+++ /dev/null
@@ -1,5 +0,0 @@
--- Add migration script here
-alter table history add column if not exists deleted_at timestamp;
-
--- queries will all be selecting the ids of history for a user, that has been deleted
-create index if not exists history_deleted_index on history(client_id, user_id, deleted_at);
diff --git a/crates/atuin-server-postgres/migrations/20230515221038_trigger-delete-only.sql b/crates/atuin-server-postgres/migrations/20230515221038_trigger-delete-only.sql
deleted file mode 100644
index 3d0bba52..00000000
--- a/crates/atuin-server-postgres/migrations/20230515221038_trigger-delete-only.sql
+++ /dev/null
@@ -1,30 +0,0 @@
--- We do not need to run the trigger on deletes, as the only time we are deleting history is when the user
--- has already been deleted
--- This actually slows down deleting all the history a good bit!
-
-create or replace function user_history_count()
-returns trigger as
-$func$
-begin
- if (TG_OP='INSERT') then
- update total_history_count_user set total = total + 1 where user_id = new.user_id;
-
- if not found then
- insert into total_history_count_user(user_id, total)
- values (
- new.user_id,
- (select count(1) from history where user_id = new.user_id)
- );
- end if;
- end if;
-
- return NEW; -- this is actually ignored for an after trigger, but oh well
-end;
-$func$
-language plpgsql volatile -- pldfplplpflh
-cost 100; -- default value
-
-create or replace trigger tg_user_history_count
- after insert on history
- for each row
- execute procedure user_history_count();
diff --git a/crates/atuin-server-postgres/migrations/20230623070418_records.sql b/crates/atuin-server-postgres/migrations/20230623070418_records.sql
deleted file mode 100644
index 22437595..00000000
--- a/crates/atuin-server-postgres/migrations/20230623070418_records.sql
+++ /dev/null
@@ -1,15 +0,0 @@
--- Add migration script here
-create table records (
- id uuid primary key, -- remember to use uuidv7 for happy indices <3
- client_id uuid not null, -- I am too uncomfortable with the idea of a client-generated primary key
- host uuid not null, -- a unique identifier for the host
- parent uuid default null, -- the ID of the parent record, bearing in mind this is a linked list
- timestamp bigint not null, -- not a timestamp type, as those do not have nanosecond precision
- version text not null,
- tag text not null, -- what is this? history, kv, whatever. Remember clients get a log per tag per host
- data text not null, -- store the actual history data, encrypted. I don't wanna know!
- cek text not null,
-
- user_id bigint not null, -- allow multiple users
- created_at timestamp not null default current_timestamp
-);
diff --git a/crates/atuin-server-postgres/migrations/20231202170508_create-store.sql b/crates/atuin-server-postgres/migrations/20231202170508_create-store.sql
deleted file mode 100644
index ffb57966..00000000
--- a/crates/atuin-server-postgres/migrations/20231202170508_create-store.sql
+++ /dev/null
@@ -1,15 +0,0 @@
--- Add migration script here
-create table store (
- id uuid primary key, -- remember to use uuidv7 for happy indices <3
- client_id uuid not null, -- I am too uncomfortable with the idea of a client-generated primary key, even though it's fine mathematically
- host uuid not null, -- a unique identifier for the host
- idx bigint not null, -- the index of the record in this store, identified by (host, tag)
- timestamp bigint not null, -- not a timestamp type, as those do not have nanosecond precision
- version text not null,
- tag text not null, -- what is this? history, kv, whatever. Remember clients get a log per tag per host
- data text not null, -- store the actual history data, encrypted. I don't wanna know!
- cek text not null,
-
- user_id bigint not null, -- allow multiple users
- created_at timestamp not null default current_timestamp
-);
diff --git a/crates/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql b/crates/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql
deleted file mode 100644
index 56d67145..00000000
--- a/crates/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql
+++ /dev/null
@@ -1,2 +0,0 @@
--- Add migration script here
-create unique index record_uniq ON store(user_id, host, tag, idx);
diff --git a/crates/atuin-server-postgres/migrations/20240108124837_drop-some-defaults.sql b/crates/atuin-server-postgres/migrations/20240108124837_drop-some-defaults.sql
deleted file mode 100644
index ad2af5a1..00000000
--- a/crates/atuin-server-postgres/migrations/20240108124837_drop-some-defaults.sql
+++ /dev/null
@@ -1,4 +0,0 @@
--- Add migration script here
-alter table history alter column user_id drop default;
-alter table sessions alter column user_id drop default;
-alter table total_history_count_user alter column user_id drop default;
diff --git a/crates/atuin-server-postgres/migrations/20240614104159_idx-cache.sql b/crates/atuin-server-postgres/migrations/20240614104159_idx-cache.sql
deleted file mode 100644
index 76425ed7..00000000
--- a/crates/atuin-server-postgres/migrations/20240614104159_idx-cache.sql
+++ /dev/null
@@ -1,8 +0,0 @@
-create table store_idx_cache(
- id bigserial primary key,
- user_id bigint,
-
- host uuid,
- tag text,
- idx bigint
-);
diff --git a/crates/atuin-server-postgres/migrations/20240621110731_user-verified.sql b/crates/atuin-server-postgres/migrations/20240621110731_user-verified.sql
deleted file mode 100644
index 6eba02ec..00000000
--- a/crates/atuin-server-postgres/migrations/20240621110731_user-verified.sql
+++ /dev/null
@@ -1,8 +0,0 @@
-alter table users add verified_at timestamp with time zone default null;
-
-create table user_verification_token(
- id bigserial primary key,
- user_id bigint unique references users(id),
- token text,
- valid_until timestamp with time zone
-);
diff --git a/crates/atuin-server-postgres/migrations/20240702094825_idx_cache_index.sql b/crates/atuin-server-postgres/migrations/20240702094825_idx_cache_index.sql
deleted file mode 100644
index d1a7b194..00000000
--- a/crates/atuin-server-postgres/migrations/20240702094825_idx_cache_index.sql
+++ /dev/null
@@ -1 +0,0 @@
-create unique index store_idx_cache_uniq on store_idx_cache(user_id, host, tag);
diff --git a/crates/atuin-server-postgres/migrations/20260127000000_remove-email-verification.sql b/crates/atuin-server-postgres/migrations/20260127000000_remove-email-verification.sql
deleted file mode 100644
index 15309920..00000000
--- a/crates/atuin-server-postgres/migrations/20260127000000_remove-email-verification.sql
+++ /dev/null
@@ -1,2 +0,0 @@
-drop table if exists user_verification_token;
-alter table users drop column if exists verified_at;
diff --git a/crates/atuin-server-postgres/src/lib.rs b/crates/atuin-server-postgres/src/lib.rs
deleted file mode 100644
index 2e69c7f2..00000000
--- a/crates/atuin-server-postgres/src/lib.rs
+++ /dev/null
@@ -1,581 +0,0 @@
-use std::collections::HashMap;
-use std::ops::Range;
-
-use rand::Rng;
-
-use async_trait::async_trait;
-use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx, RecordStatus};
-use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User};
-use atuin_server_database::{Database, DbError, DbResult, DbSettings, into_utc};
-use futures_util::TryStreamExt;
-use sqlx::Row;
-use sqlx::postgres::PgPoolOptions;
-
-use time::OffsetDateTime;
-use tracing::instrument;
-use uuid::Uuid;
-use wrappers::{DbHistory, DbRecord, DbSession, DbUser};
-
-mod wrappers;
-
-const MIN_PG_VERSION: u32 = 14;
-
-#[derive(Clone)]
-pub struct Postgres {
- pool: sqlx::Pool<sqlx::postgres::Postgres>,
- /// Optional read replica pool for read-only queries
- read_pool: Option<sqlx::Pool<sqlx::postgres::Postgres>>,
-}
-
-impl Postgres {
- /// Returns the appropriate pool for read operations.
- /// Uses read_pool if available, otherwise falls back to the primary pool.
- fn read_pool(&self) -> &sqlx::Pool<sqlx::postgres::Postgres> {
- self.read_pool.as_ref().unwrap_or(&self.pool)
- }
-}
-
-#[async_trait]
-impl Database for Postgres {
- async fn new(settings: &DbSettings) -> DbResult<Self> {
- let pool = PgPoolOptions::new()
- .max_connections(100)
- .connect(settings.db_uri.as_str())
- .await?;
-
- // Call server_version_num to get the DB server's major version number
- // The call returns None for servers older than 8.x.
- let pg_major_version: u32 =
- pool.acquire()
- .await?
- .server_version_num()
- .ok_or(DbError::Other(eyre::Report::msg(
- "could not get PostgreSQL version",
- )))?
- / 10000;
-
- if pg_major_version < MIN_PG_VERSION {
- return Err(DbError::Other(eyre::Report::msg(format!(
- "unsupported PostgreSQL version {pg_major_version}, minimum required is {MIN_PG_VERSION}"
- ))));
- }
-
- sqlx::migrate!("./migrations")
- .run(&pool)
- .await
- .map_err(|error| DbError::Other(error.into()))?;
-
- // Create read replica pool if configured
- let read_pool = if let Some(read_db_uri) = &settings.read_db_uri {
- tracing::info!("Connecting to read replica database");
- let read_pool = PgPoolOptions::new()
- .max_connections(100)
- .connect(read_db_uri.as_str())
- .await?;
-
- // Verify the read replica is also a supported PostgreSQL version
- let read_pg_major_version: u32 = read_pool
- .acquire()
- .await?
- .server_version_num()
- .ok_or(DbError::Other(eyre::Report::msg(
- "could not get PostgreSQL version from read replica",
- )))?
- / 10000;
-
- if read_pg_major_version < MIN_PG_VERSION {
- return Err(DbError::Other(eyre::Report::msg(format!(
- "unsupported PostgreSQL version {read_pg_major_version} on read replica, minimum required is {MIN_PG_VERSION}"
- ))));
- }
-
- Some(read_pool)
- } else {
- None
- };
-
- Ok(Self { pool, read_pool })
- }
-
- #[instrument(skip_all)]
- async fn get_session(&self, token: &str) -> DbResult<Session> {
- sqlx::query_as("select id, user_id, token from sessions where token = $1")
- .bind(token)
- .fetch_one(self.read_pool())
- .await
- .map_err(Into::into)
- .map(|DbSession(session)| session)
- }
-
- #[instrument(skip_all)]
- async fn get_user(&self, username: &str) -> DbResult<User> {
- sqlx::query_as("select id, username, email, password from users where username = $1")
- .bind(username)
- .fetch_one(self.read_pool())
- .await
- .map_err(Into::into)
- .map(|DbUser(user)| user)
- }
-
- #[instrument(skip_all)]
- async fn get_session_user(&self, token: &str) -> DbResult<User> {
- sqlx::query_as(
- "select users.id, users.username, users.email, users.password from users
- inner join sessions
- on users.id = sessions.user_id
- and sessions.token = $1",
- )
- .bind(token)
- .fetch_one(self.read_pool())
- .await
- .map_err(Into::into)
- .map(|DbUser(user)| user)
- }
-
- #[instrument(skip_all)]
- async fn count_history(&self, user: &User) -> DbResult<i64> {
- // The cache is new, and the user might not yet have a cache value.
- // They will have one as soon as they post up some new history, but handle that
- // edge case.
-
- let res: (i64,) = sqlx::query_as(
- "select count(1) from history
- where user_id = $1",
- )
- .bind(user.id)
- .fetch_one(self.read_pool())
- .await?;
-
- Ok(res.0)
- }
-
- #[instrument(skip_all)]
- async fn count_history_cached(&self, user: &User) -> DbResult<i64> {
- let res: (i32,) = sqlx::query_as(
- "select total from total_history_count_user
- where user_id = $1",
- )
- .bind(user.id)
- .fetch_one(self.read_pool())
- .await?;
-
- Ok(res.0 as i64)
- }
-
- async fn delete_store(&self, user: &User) -> DbResult<()> {
- let mut tx = self.pool.begin().await?;
-
- sqlx::query(
- "delete from store
- where user_id = $1",
- )
- .bind(user.id)
- .execute(&mut *tx)
- .await?;
-
- sqlx::query(
- "delete from store_idx_cache
- where user_id = $1",
- )
- .bind(user.id)
- .execute(&mut *tx)
- .await?;
-
- tx.commit().await?;
-
- Ok(())
- }
-
- async fn delete_history(&self, user: &User, id: String) -> DbResult<()> {
- sqlx::query(
- "update history
- set deleted_at = $3
- where user_id = $1
- and client_id = $2
- and deleted_at is null", // don't just keep setting it
- )
- .bind(user.id)
- .bind(id)
- .bind(OffsetDateTime::now_utc())
- .fetch_all(&self.pool)
- .await?;
-
- Ok(())
- }
-
- #[instrument(skip_all)]
- async fn deleted_history(&self, user: &User) -> DbResult<Vec<String>> {
- // The cache is new, and the user might not yet have a cache value.
- // They will have one as soon as they post up some new history, but handle that
- // edge case.
-
- let res = sqlx::query(
- "select client_id from history
- where user_id = $1
- and deleted_at is not null",
- )
- .bind(user.id)
- .fetch_all(self.read_pool())
- .await?;
-
- let res = res
- .iter()
- .map(|row| row.get::<String, _>("client_id"))
- .collect();
-
- Ok(res)
- }
-
- #[instrument(skip_all)]
- async fn count_history_range(
- &self,
- user: &User,
- range: Range<OffsetDateTime>,
- ) -> DbResult<i64> {
- let res: (i64,) = sqlx::query_as(
- "select count(1) from history
- where user_id = $1
- and timestamp >= $2::date
- and timestamp < $3::date",
- )
- .bind(user.id)
- .bind(into_utc(range.start))
- .bind(into_utc(range.end))
- .fetch_one(self.read_pool())
- .await?;
-
- Ok(res.0)
- }
-
- #[instrument(skip_all)]
- async fn list_history(
- &self,
- user: &User,
- created_after: OffsetDateTime,
- since: OffsetDateTime,
- host: &str,
- page_size: i64,
- ) -> DbResult<Vec<History>> {
- let res = sqlx::query_as(
- "select id, client_id, user_id, hostname, timestamp, data, created_at from history
- where user_id = $1
- and hostname != $2
- and created_at >= $3
- and timestamp >= $4
- order by timestamp asc
- limit $5",
- )
- .bind(user.id)
- .bind(host)
- .bind(into_utc(created_after))
- .bind(into_utc(since))
- .bind(page_size)
- .fetch(self.read_pool())
- .map_ok(|DbHistory(h)| h)
- .try_collect()
- .await?;
-
- Ok(res)
- }
-
- #[instrument(skip_all)]
- async fn add_history(&self, history: &[NewHistory]) -> DbResult<()> {
- let mut tx = self.pool.begin().await?;
-
- for i in history {
- let client_id: &str = &i.client_id;
- let hostname: &str = &i.hostname;
- let data: &str = &i.data;
-
- sqlx::query(
- "insert into history
- (client_id, user_id, hostname, timestamp, data)
- values ($1, $2, $3, $4, $5)
- on conflict do nothing
- ",
- )
- .bind(client_id)
- .bind(i.user_id)
- .bind(hostname)
- .bind(i.timestamp)
- .bind(data)
- .execute(&mut *tx)
- .await?;
- }
-
- tx.commit().await?;
-
- Ok(())
- }
-
- #[instrument(skip_all)]
- async fn delete_user(&self, u: &User) -> DbResult<()> {
- sqlx::query("delete from sessions where user_id = $1")
- .bind(u.id)
- .execute(&self.pool)
- .await?;
-
- sqlx::query("delete from history where user_id = $1")
- .bind(u.id)
- .execute(&self.pool)
- .await?;
-
- sqlx::query("delete from store where user_id = $1")
- .bind(u.id)
- .execute(&self.pool)
- .await?;
-
- sqlx::query("delete from total_history_count_user where user_id = $1")
- .bind(u.id)
- .execute(&self.pool)
- .await?;
-
- sqlx::query("delete from users where id = $1")
- .bind(u.id)
- .execute(&self.pool)
- .await?;
-
- Ok(())
- }
-
- #[instrument(skip_all)]
- async fn update_user_password(&self, user: &User) -> DbResult<()> {
- sqlx::query(
- "update users
- set password = $1
- where id = $2",
- )
- .bind(&user.password)
- .bind(user.id)
- .execute(&self.pool)
- .await?;
-
- Ok(())
- }
-
- #[instrument(skip_all)]
- async fn add_user(&self, user: &NewUser) -> DbResult<i64> {
- let email: &str = &user.email;
- let username: &str = &user.username;
- let password: &str = &user.password;
-
- let res: (i64,) = sqlx::query_as(
- "insert into users
- (username, email, password)
- values($1, $2, $3)
- returning id",
- )
- .bind(username)
- .bind(email)
- .bind(password)
- .fetch_one(&self.pool)
- .await?;
-
- Ok(res.0)
- }
-
- #[instrument(skip_all)]
- async fn add_session(&self, session: &NewSession) -> DbResult<()> {
- let token: &str = &session.token;
-
- sqlx::query(
- "insert into sessions
- (user_id, token)
- values($1, $2)",
- )
- .bind(session.user_id)
- .bind(token)
- .execute(&self.pool)
- .await?;
-
- Ok(())
- }
-
- #[instrument(skip_all)]
- async fn get_user_session(&self, u: &User) -> DbResult<Session> {
- sqlx::query_as("select id, user_id, token from sessions where user_id = $1")
- .bind(u.id)
- .fetch_one(self.read_pool())
- .await
- .map_err(Into::into)
- .map(|DbSession(session)| session)
- }
-
- #[instrument(skip_all)]
- async fn oldest_history(&self, user: &User) -> DbResult<History> {
- sqlx::query_as(
- "select id, client_id, user_id, hostname, timestamp, data, created_at from history
- where user_id = $1
- order by timestamp asc
- limit 1",
- )
- .bind(user.id)
- .fetch_one(self.read_pool())
- .await
- .map_err(Into::into)
- .map(|DbHistory(h)| h)
- }
-
- #[instrument(skip_all)]
- async fn add_records(&self, user: &User, records: &[Record<EncryptedData>]) -> DbResult<()> {
- let mut tx = self.pool.begin().await?;
-
- // We won't have uploaded this data if it wasn't the max. Therefore, we can deduce the max
- // idx without having to make further database queries. Doing the query on this small
- // amount of data should be much, much faster.
- //
- // Worst case, say we get this wrong. We end up caching data that isn't actually the max
- // idx, so clients upload again. The cache logic can be verified with a sql query anyway :)
-
- let mut heads = HashMap::<(HostId, &str), u64>::new();
-
- for i in records {
- let id = atuin_common::utils::uuid_v7();
-
- let result = sqlx::query(
- "insert into store
- (id, client_id, host, idx, timestamp, version, tag, data, cek, user_id)
- values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
- on conflict do nothing
- ",
- )
- .bind(id)
- .bind(i.id)
- .bind(i.host.id)
- .bind(i.idx as i64)
- .bind(i.timestamp as i64) // throwing away some data, but i64 is still big in terms of time
- .bind(&i.version)
- .bind(&i.tag)
- .bind(&i.data.data)
- .bind(&i.data.content_encryption_key)
- .bind(user.id)
- .execute(&mut *tx)
- .await?;
-
- // Only update heads if we actually inserted the record
- if result.rows_affected() > 0 {
- heads
- .entry((i.host.id, &i.tag))
- .and_modify(|e| {
- if i.idx > *e {
- *e = i.idx
- }
- })
- .or_insert(i.idx);
- }
- }
-
- // we've built the map of heads for this push, so commit it to the database
- for ((host, tag), idx) in heads {
- sqlx::query(
- "insert into store_idx_cache
- (user_id, host, tag, idx)
- values ($1, $2, $3, $4)
- on conflict(user_id, host, tag) do update set idx = greatest(store_idx_cache.idx, $4)
- ",
- )
- .bind(user.id)
- .bind(host)
- .bind(tag)
- .bind(idx as i64)
- .execute(&mut *tx)
- .await
- ?;
- }
-
- tx.commit().await?;
-
- Ok(())
- }
-
- #[instrument(skip_all)]
- async fn next_records(
- &self,
- user: &User,
- host: HostId,
- tag: String,
- start: Option<RecordIdx>,
- count: u64,
- ) -> DbResult<Vec<Record<EncryptedData>>> {
- tracing::debug!("{:?} - {:?} - {:?}", host, tag, start);
- let start = start.unwrap_or(0);
-
- let records: Result<Vec<DbRecord>, DbError> = sqlx::query_as(
- "select client_id, host, idx, timestamp, version, tag, data, cek from store
- where user_id = $1
- and tag = $2
- and host = $3
- and idx >= $4
- order by idx asc
- limit $5",
- )
- .bind(user.id)
- .bind(tag.clone())
- .bind(host)
- .bind(start as i64)
- .bind(count as i64)
- .fetch_all(self.read_pool())
- .await
- .map_err(Into::into);
-
- let ret = match records {
- Ok(records) => {
- let records: Vec<Record<EncryptedData>> = records
- .into_iter()
- .map(|f| {
- let record: Record<EncryptedData> = f.into();
- record
- })
- .collect();
-
- records
- }
- Err(DbError::NotFound) => {
- tracing::debug!("no records found in store: {:?}/{}", host, tag);
- return Ok(vec![]);
- }
- Err(e) => return Err(e),
- };
-
- Ok(ret)
- }
-
- async fn status(&self, user: &User) -> DbResult<RecordStatus> {
- const STATUS_SQL: &str =
- "select host, tag, max(idx) from store where user_id = $1 group by host, tag";
-
- // If IDX_CACHE_ROLLOUT is set, then we
- // 1. Read the value of the var, use it as a % chance of using the cache
- // 2. If we use the cache, just read from the cache table
- // 3. If we don't use the cache, read from the store table
- // IDX_CACHE_ROLLOUT should be between 0 and 100.
-
- let idx_cache_rollout = std::env::var("IDX_CACHE_ROLLOUT").unwrap_or("0".to_string());
- let idx_cache_rollout = idx_cache_rollout.parse::<f64>().unwrap_or(0.0);
- let use_idx_cache = rand::thread_rng().gen_bool(idx_cache_rollout / 100.0);
-
- let mut res: Vec<(Uuid, String, i64)> = if use_idx_cache {
- tracing::debug!("using idx cache for user {}", user.id);
- sqlx::query_as("select host, tag, idx from store_idx_cache where user_id = $1")
- .bind(user.id)
- .fetch_all(self.read_pool())
- .await?
- } else {
- tracing::debug!("using aggregate query for user {}", user.id);
- sqlx::query_as(STATUS_SQL)
- .bind(user.id)
- .fetch_all(self.read_pool())
- .await?
- };
-
- res.sort();
-
- let mut status = RecordStatus::new();
-
- for i in res.iter() {
- status.set_raw(HostId(i.0), i.1.clone(), i.2 as u64);
- }
-
- Ok(status)
- }
-}
diff --git a/crates/atuin-server-postgres/src/wrappers.rs b/crates/atuin-server-postgres/src/wrappers.rs
deleted file mode 100644
index cde4134c..00000000
--- a/crates/atuin-server-postgres/src/wrappers.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-use ::sqlx::{FromRow, Result};
-use atuin_common::record::{EncryptedData, Host, Record};
-use atuin_server_database::models::{History, Session, User};
-use sqlx::{Row, postgres::PgRow};
-use time::PrimitiveDateTime;
-
-pub struct DbUser(pub User);
-pub struct DbSession(pub Session);
-pub struct DbHistory(pub History);
-pub struct DbRecord(pub Record<EncryptedData>);
-
-impl<'a> FromRow<'a, PgRow> for DbUser {
- fn from_row(row: &'a PgRow) -> Result<Self> {
- Ok(Self(User {
- id: row.try_get("id")?,
- username: row.try_get("username")?,
- email: row.try_get("email")?,
- password: row.try_get("password")?,
- }))
- }
-}
-
-impl<'a> ::sqlx::FromRow<'a, PgRow> for DbSession {
- fn from_row(row: &'a PgRow) -> ::sqlx::Result<Self> {
- Ok(Self(Session {
- id: row.try_get("id")?,
- user_id: row.try_get("user_id")?,
- token: row.try_get("token")?,
- }))
- }
-}
-
-impl<'a> ::sqlx::FromRow<'a, PgRow> for DbHistory {
- fn from_row(row: &'a PgRow) -> ::sqlx::Result<Self> {
- Ok(Self(History {
- id: row.try_get("id")?,
- client_id: row.try_get("client_id")?,
- user_id: row.try_get("user_id")?,
- hostname: row.try_get("hostname")?,
- timestamp: row
- .try_get::<PrimitiveDateTime, _>("timestamp")?
- .assume_utc(),
- data: row.try_get("data")?,
- created_at: row
- .try_get::<PrimitiveDateTime, _>("created_at")?
- .assume_utc(),
- }))
- }
-}
-
-impl<'a> ::sqlx::FromRow<'a, PgRow> for DbRecord {
- fn from_row(row: &'a PgRow) -> ::sqlx::Result<Self> {
- let timestamp: i64 = row.try_get("timestamp")?;
- let idx: i64 = row.try_get("idx")?;
-
- let data = EncryptedData {
- data: row.try_get("data")?,
- content_encryption_key: row.try_get("cek")?,
- };
-
- Ok(Self(Record {
- id: row.try_get("client_id")?,
- host: Host::new(row.try_get("host")?),
- idx: idx as u64,
- timestamp: timestamp as u64,
- version: row.try_get("version")?,
- tag: row.try_get("tag")?,
- data,
- }))
- }
-}
-
-impl From<DbRecord> for Record<EncryptedData> {
- fn from(other: DbRecord) -> Record<EncryptedData> {
- Record { ..other.0 }
- }
-}