diff options
Diffstat (limited to 'crates/rocie-server/src/storage/migrate/mod.rs')
| -rw-r--r-- | crates/rocie-server/src/storage/migrate/mod.rs | 313 |
1 files changed, 313 insertions, 0 deletions
diff --git a/crates/rocie-server/src/storage/migrate/mod.rs b/crates/rocie-server/src/storage/migrate/mod.rs new file mode 100644 index 0000000..3fdc400 --- /dev/null +++ b/crates/rocie-server/src/storage/migrate/mod.rs @@ -0,0 +1,313 @@ +use std::{ + fmt::Display, + time::{SystemTime, UNIX_EPOCH}, +}; + +use chrono::TimeDelta; +use log::{debug, info}; +use sqlx::{Sqlite, SqlitePool, Transaction, query}; + +use crate::app::App; + +macro_rules! make_upgrade { + ($app:expr, $old_version:expr, $new_version:expr, $sql_name:expr) => { + let mut tx = $app + .db + .begin() + .await + .map_err(|err| update::Error::TxnStart(err))?; + debug!("Migrating: {} -> {}", $old_version, $new_version); + + sqlx::raw_sql(include_str!($sql_name)) + .execute(&mut *tx) + .await + .map_err(|err| update::Error::SqlUpdate(err))?; + + set_db_version( + &mut tx, + if $old_version == Self::Empty { + // There is no previous version we would need to remove + None + } else { + Some($old_version) + }, + $new_version, + ) + .await + .map_err(|err| update::Error::SetDbVersion { + err, + new_version: $new_version, + })?; + + tx.commit() + .await + .map_err(|err| update::Error::TxnCommit(err))?; + + // NOTE: This is needed, so that sqlite "sees" our changes to the table + // without having to reconnect. <2025-02-18> + query!("VACUUM") + .execute(&$app.db) + .await + .map_err(|err| update::Error::SqlVacuum(err))?; + + Box::pin($new_version.update($app)) + .await + .map_err(|err| update::Error::NextUpdate { + err: Box::new(err), + new_version: $new_version, + })?; + + Ok(()) + }; +} + +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] +pub(crate) enum DbVersion { + /// The database is not yet initialized. + Empty, + + /// Introduced: 2025-09-02. + One, +} +const CURRENT_VERSION: DbVersion = DbVersion::One; + +async fn set_db_version( + tx: &mut Transaction<'_, Sqlite>, + old_version: Option<DbVersion>, + new_version: DbVersion, +) -> Result<(), db_version_set::Error> { + let valid_from = get_current_date(); + + if let Some(old_version) = old_version { + let valid_to = valid_from + 1; + let old_version = old_version.as_sql_integer(); + + query!( + "UPDATE version SET valid_to = ? WHERE namespace = 'rocie' AND number = ?;", + valid_to, + old_version + ) + .execute(&mut *(*tx)) + .await?; + } + + let version = new_version.as_sql_integer(); + + query!( + "INSERT INTO version (namespace, number, valid_from, valid_to) VALUES ('rocie', ?, ?, NULL);", + version, + valid_from + ) + .execute(&mut *(*tx)) + .await?; + + Ok(()) +} + +pub(crate) mod db_version_set { + #[derive(thiserror::Error, Debug)] + pub(crate) enum Error { + #[error("Failed to perform database action: {0}")] + DbError(#[from] sqlx::Error), + } +} + +impl DbVersion { + fn as_sql_integer(self) -> i32 { + match self { + DbVersion::One => 1, + + DbVersion::Empty => unreachable!("A empty version does not have an associated integer"), + } + } + + fn from_db(number: i64, namespace: &str) -> Result<Self, db_version_parse::Error> { + match (number, namespace) { + (1, "rocie") => Ok(DbVersion::One), + (number, namespace) => Err(db_version_parse::Error::UnkownVersion { + namespace: namespace.to_owned(), + number, + }), + } + } + + /// Try to update the database from version [`self`] to the [`CURRENT_VERSION`]. + /// + /// Each update is atomic, so if this function fails you are still guaranteed to have a + /// database at version `get_version`. + #[allow(clippy::too_many_lines)] + async fn update(self, app: &App) -> Result<(), update::Error> { + match self { + Self::Empty => { + make_upgrade! {app, Self::Empty, Self::One, "./sql/0->1.sql"} + } + + // This is the current_version + Self::One => { + assert_eq!(self, CURRENT_VERSION); + assert_eq!(self, get_version(app).await?); + Ok(()) + } + } + } +} +impl Display for DbVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // It is a unit only enum, thus we can simply use the Debug formatting + <Self as std::fmt::Debug>::fmt(self, f) + } +} +pub(crate) mod update { + use crate::storage::migrate::{DbVersion, db_version_set, get_db_version}; + + #[derive(thiserror::Error, Debug)] + pub(crate) enum Error { + #[error("Failed to determine final database version: {0}")] + GetVersion(#[from] get_db_version::Error), + + #[error("Failed to set the db to version {new_version}: {err}")] + SetDbVersion { + err: db_version_set::Error, + new_version: DbVersion, + }, + + #[error("Failed to vacuum sql database after update: {0}")] + SqlVacuum(sqlx::Error), + + #[error("Failed to execute the sql update script: {0}")] + SqlUpdate(sqlx::Error), + + #[error("Failed to start the update transaction: {0}")] + TxnStart(sqlx::Error), + #[error("Failed to commit the update transaction: {0}")] + TxnCommit(sqlx::Error), + + #[error("Failed to perform the next chained update (to ver {new_version}): {err}")] + NextUpdate { + err: Box<Self>, + new_version: DbVersion, + }, + } +} +pub(crate) mod db_version_parse { + #[derive(thiserror::Error, Debug)] + pub(crate) enum Error { + #[error("Db version is {number}, but got unknown namespace: '{namespace}'")] + UnkownVersion { namespace: String, number: i64 }, + } +} + +/// Returns the current data as UNIX time stamp. +fn get_current_date() -> i64 { + let start = SystemTime::now(); + let seconds_since_epoch: TimeDelta = TimeDelta::from_std( + start + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"), + ) + .expect("Time does not go backwards"); + + // All database dates should be after the UNIX_EPOCH (and thus positiv) + seconds_since_epoch.num_milliseconds() +} + +/// Return the current database version. +/// +/// # Panics +/// Only if internal assertions fail. +pub(crate) async fn get_version(app: &App) -> Result<DbVersion, get_db_version::Error> { + get_version_db(&app.db).await +} + +/// Return the current database version. +/// +/// In contrast to the [`get_version`] function, this function does not +/// a fully instantiated [`App`], a database connection suffices. +/// +/// # Panics +/// Only if internal assertions fail. +pub(crate) async fn get_version_db(pool: &SqlitePool) -> Result<DbVersion, get_db_version::Error> { + let version_table_exists = { + let query = query!( + " + SELECT 1 as result + FROM sqlite_master + WHERE type = 'table' + AND name = 'version' + " + ) + .fetch_optional(pool) + .await + .map_err(|err| get_db_version::Error::VersionTableExistance(err))?; + + if let Some(output) = query { + assert_eq!(output.result, 1); + true + } else { + false + } + }; + + if !version_table_exists { + return Ok(DbVersion::Empty); + } + + let current_version = query!( + " + SELECT namespace, number + FROM version + WHERE valid_to IS NULL; + " + ) + .fetch_one(pool) + .await + .map_err(|err| get_db_version::Error::VersionNumberFetch(err))?; + + Ok(DbVersion::from_db( + current_version.number, + current_version.namespace.as_str(), + )?) +} + +pub(crate) mod get_db_version { + use crate::storage::migrate::db_version_parse; + + #[derive(thiserror::Error, Debug)] + pub(crate) enum Error { + #[error("Failed to fetch the version number from db: {0}")] + VersionNumberFetch(sqlx::Error), + + #[error("Failed to check for existance of the `version` table: {0}")] + VersionTableExistance(sqlx::Error), + + #[error("Failed to parse the db version: {0}")] + VersionParse(#[from] db_version_parse::Error), + } +} + +pub(crate) async fn migrate_db(app: &App) -> Result<(), migrate_db::Error> { + let current_version = get_version(app).await?; + + if current_version == CURRENT_VERSION { + return Ok(()); + } + + info!("Migrate database from version '{current_version}' to version '{CURRENT_VERSION}'"); + + current_version.update(app).await?; + + Ok(()) +} + +pub(crate) mod migrate_db { + use crate::storage::migrate::{get_db_version, update}; + + #[derive(thiserror::Error, Debug)] + pub(crate) enum Error { + #[error("Failed to determine the database version: {0}")] + GetVersion(#[from] get_db_version::Error), + + #[error("Failed to update the database: {0}")] + Upadate(#[from] update::Error), + } +} |
