diff options
| -rw-r--r-- | Cargo.lock | 1 | ||||
| -rw-r--r-- | crates/atuin-server-postgres/Cargo.toml | 1 | ||||
| -rw-r--r-- | crates/atuin-server-postgres/src/lib.rs | 53 |
3 files changed, 25 insertions, 30 deletions
@@ -489,6 +489,7 @@ dependencies = [ "eyre", "futures-util", "metrics", + "rand 0.8.5", "serde", "sqlx", "time", diff --git a/crates/atuin-server-postgres/Cargo.toml b/crates/atuin-server-postgres/Cargo.toml index 8bb989d2..b75ec224 100644 --- a/crates/atuin-server-postgres/Cargo.toml +++ b/crates/atuin-server-postgres/Cargo.toml @@ -22,3 +22,4 @@ async-trait = { workspace = true } uuid = { workspace = true } metrics = "0.21.1" futures-util = "0.3" +rand.workspace = true
\ No newline at end of file diff --git a/crates/atuin-server-postgres/src/lib.rs b/crates/atuin-server-postgres/src/lib.rs index d5824fec..39c25256 100644 --- a/crates/atuin-server-postgres/src/lib.rs +++ b/crates/atuin-server-postgres/src/lib.rs @@ -1,13 +1,14 @@ use std::collections::HashMap; use std::ops::Range; +use rand::Rng; + use async_trait::async_trait; use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx, RecordStatus}; use atuin_common::utils::crypto_random_string; use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User}; use atuin_server_database::{Database, DbError, DbResult, DbSettings}; use futures_util::TryStreamExt; -use metrics::counter; use sqlx::Row; use sqlx::postgres::PgPoolOptions; @@ -635,44 +636,36 @@ impl Database for Postgres { const STATUS_SQL: &str = "select host, tag, max(idx) from store where user_id = $1 group by host, tag"; - // Use a transaction to ensure consistent reads from both tables - let mut tx = self.pool.begin().await.map_err(fix_error)?; + // If IDX_CACHE_ROLLOUT is set, then we + // 1. Read the value of the var, use it as a % chance of using the cache + // 2. If we use the cache, just read from the cache table + // 3. If we don't use the cache, read from the store table + // IDX_CACHE_ROLLOUT should be between 0 and 100. - let mut res: Vec<(Uuid, String, i64)> = sqlx::query_as(STATUS_SQL) - .bind(user.id) - .fetch_all(&mut *tx) - .await - .map_err(fix_error)?; - res.sort(); - - // We're temporarily increasing latency in order to improve confidence in the cache - // If it runs for a few days, and we confirm that cached values are equal to realtime, we - // can replace realtime with cached. - // - // But let's check so sync doesn't do Weird Things. + let idx_cache_rollout = std::env::var("IDX_CACHE_ROLLOUT").unwrap_or("0".to_string()); + let idx_cache_rollout = idx_cache_rollout.parse::<f64>().unwrap_or(0.0); + let use_idx_cache = rand::thread_rng().gen_bool(idx_cache_rollout / 100.0); - let mut cached_res: Vec<(Uuid, String, i64)> = + let mut res: Vec<(Uuid, String, i64)> = if use_idx_cache { + tracing::debug!("using idx cache for user {}", user.id); sqlx::query_as("select host, tag, idx from store_idx_cache where user_id = $1") .bind(user.id) - .fetch_all(&mut *tx) + .fetch_all(&self.pool) .await - .map_err(fix_error)?; - cached_res.sort(); + .map_err(fix_error)? + } else { + tracing::debug!("using aggregate query for user {}", user.id); + sqlx::query_as(STATUS_SQL) + .bind(user.id) + .fetch_all(&self.pool) + .await + .map_err(fix_error)? + }; - // No need to commit a read-only transaction + res.sort(); let mut status = RecordStatus::new(); - let equal = res == cached_res; - - if equal { - counter!("atuin_store_idx_cache_consistent", 1); - } else { - // log the values if we have an inconsistent cache - tracing::debug!(user = user.username, cache_match = equal, res = ?res, cached = ?cached_res, "record store index request"); - counter!("atuin_store_idx_cache_inconsistent", 1); - }; - for i in res.iter() { status.set_raw(HostId(i.0), i.1.clone(), i.2 as u64); } |
