aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-server-postgres/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/atuin-server-postgres/src/lib.rs')
-rw-r--r--crates/atuin-server-postgres/src/lib.rs53
1 files changed, 23 insertions, 30 deletions
diff --git a/crates/atuin-server-postgres/src/lib.rs b/crates/atuin-server-postgres/src/lib.rs
index d5824fec..39c25256 100644
--- a/crates/atuin-server-postgres/src/lib.rs
+++ b/crates/atuin-server-postgres/src/lib.rs
@@ -1,13 +1,14 @@
use std::collections::HashMap;
use std::ops::Range;
+use rand::Rng;
+
use async_trait::async_trait;
use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx, RecordStatus};
use atuin_common::utils::crypto_random_string;
use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User};
use atuin_server_database::{Database, DbError, DbResult, DbSettings};
use futures_util::TryStreamExt;
-use metrics::counter;
use sqlx::Row;
use sqlx::postgres::PgPoolOptions;
@@ -635,44 +636,36 @@ impl Database for Postgres {
const STATUS_SQL: &str =
"select host, tag, max(idx) from store where user_id = $1 group by host, tag";
- // Use a transaction to ensure consistent reads from both tables
- let mut tx = self.pool.begin().await.map_err(fix_error)?;
+ // If IDX_CACHE_ROLLOUT is set, then we
+ // 1. Read the value of the var, use it as a % chance of using the cache
+ // 2. If we use the cache, just read from the cache table
+ // 3. If we don't use the cache, read from the store table
+ // IDX_CACHE_ROLLOUT should be between 0 and 100.
- let mut res: Vec<(Uuid, String, i64)> = sqlx::query_as(STATUS_SQL)
- .bind(user.id)
- .fetch_all(&mut *tx)
- .await
- .map_err(fix_error)?;
- res.sort();
-
- // We're temporarily increasing latency in order to improve confidence in the cache
- // If it runs for a few days, and we confirm that cached values are equal to realtime, we
- // can replace realtime with cached.
- //
- // But let's check so sync doesn't do Weird Things.
+ let idx_cache_rollout = std::env::var("IDX_CACHE_ROLLOUT").unwrap_or("0".to_string());
+ let idx_cache_rollout = idx_cache_rollout.parse::<f64>().unwrap_or(0.0);
+ let use_idx_cache = rand::thread_rng().gen_bool(idx_cache_rollout / 100.0);
- let mut cached_res: Vec<(Uuid, String, i64)> =
+ let mut res: Vec<(Uuid, String, i64)> = if use_idx_cache {
+ tracing::debug!("using idx cache for user {}", user.id);
sqlx::query_as("select host, tag, idx from store_idx_cache where user_id = $1")
.bind(user.id)
- .fetch_all(&mut *tx)
+ .fetch_all(&self.pool)
.await
- .map_err(fix_error)?;
- cached_res.sort();
+ .map_err(fix_error)?
+ } else {
+ tracing::debug!("using aggregate query for user {}", user.id);
+ sqlx::query_as(STATUS_SQL)
+ .bind(user.id)
+ .fetch_all(&self.pool)
+ .await
+ .map_err(fix_error)?
+ };
- // No need to commit a read-only transaction
+ res.sort();
let mut status = RecordStatus::new();
- let equal = res == cached_res;
-
- if equal {
- counter!("atuin_store_idx_cache_consistent", 1);
- } else {
- // log the values if we have an inconsistent cache
- tracing::debug!(user = user.username, cache_match = equal, res = ?res, cached = ?cached_res, "record store index request");
- counter!("atuin_store_idx_cache_inconsistent", 1);
- };
-
for i in res.iter() {
status.set_raw(HostId(i.0), i.1.clone(), i.2 as u64);
}