diff options
Diffstat (limited to 'crates/rocie-server/src/storage/sql/insert/mod.rs')
| -rw-r--r-- | crates/rocie-server/src/storage/sql/insert/mod.rs | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/crates/rocie-server/src/storage/sql/insert/mod.rs b/crates/rocie-server/src/storage/sql/insert/mod.rs new file mode 100644 index 0000000..99f1e71 --- /dev/null +++ b/crates/rocie-server/src/storage/sql/insert/mod.rs @@ -0,0 +1,144 @@ +use std::{fmt::Display, mem}; + +use crate::app::App; + +use chrono::Utc; +use log::{debug, trace}; +use serde::{Serialize, de::DeserializeOwned}; +use sqlx::{SqliteConnection, query}; + +pub(crate) mod product; + +pub(crate) trait Transactionable: + Sized + std::fmt::Debug + Serialize + DeserializeOwned +{ + type ApplyError: std::error::Error + Display; + type UndoError: std::error::Error + Display; + + /// Apply this transaction. + /// + /// This should change the db state. + async fn apply(self, txn: &mut SqliteConnection) -> Result<(), Self::ApplyError>; + + /// Undo this transaction. + /// + /// This should return the db to the state it was in before this transaction. + async fn undo(self, txn: &mut SqliteConnection) -> Result<(), Self::UndoError>; +} + +#[derive(Debug)] +pub(crate) struct Operations<O: Transactionable> { + name: &'static str, + ops: Vec<O>, +} + +impl<O: Transactionable> Default for Operations<O> { + fn default() -> Self { + Self::new("<default impl>") + } +} + +impl<O: Transactionable> Operations<O> { + #[must_use] + pub(crate) fn new(name: &'static str) -> Self { + Self { + name, + ops: Vec::new(), + } + } + + pub(crate) async fn apply(mut self, app: &App) -> Result<(), apply::Error<O>> { + let ops = mem::take(&mut self.ops); + + if ops.is_empty() { + return Ok(()); + } + + trace!("Begin commit of {}", self.name); + let mut txn = app.db.begin().await?; + + for op in ops { + trace!("Commiting operation: {op:?}"); + add_operation_to_txn_log(&op, &mut txn).await?; + op.apply(&mut txn) + .await + .map_err(|err| apply::Error::InnerApply(err))?; + } + + txn.commit().await?; + trace!("End commit of {}", self.name); + + Ok(()) + } + + pub(crate) fn push(&mut self, op: O) { + self.ops.push(op); + } +} + +pub(crate) mod apply { + use actix_web::ResponseError; + + use crate::storage::sql::insert::{Transactionable, add_operations_to_txn_log}; + + #[derive(thiserror::Error, Debug)] + pub(crate) enum Error<O: Transactionable> { + #[error("Failed to execute sql statments")] + Sql(#[from] sqlx::Error), + + #[error("Failed to append operations to the txn log: {0}")] + TxnLogAppend(#[from] add_operations_to_txn_log::Error), + + #[error("Failed to apply one of the operations: {0}")] + InnerApply(<O as Transactionable>::ApplyError), + } + + impl<O: Transactionable> ResponseError for Error<O> { + // TODO(@bpeetz): Actually do something with this. <2025-09-05> + } +} + +impl<O: Transactionable> Drop for Operations<O> { + fn drop(&mut self) { + assert!( + self.ops.is_empty(), + "Trying to drop uncommitted operations (name: {}) ({:#?}). This is a bug.", + self.name, + self.ops + ); + } +} + +async fn add_operation_to_txn_log<O: Transactionable>( + operation: &O, + txn: &mut SqliteConnection, +) -> Result<(), add_operations_to_txn_log::Error> { + debug!("Adding operation to txn log: {operation:?}"); + + let now = Utc::now().timestamp(); + let operation = serde_json::to_string(&operation).expect("should be serializable"); + + query!( + r#" + INSERT INTO txn_log ( + timestamp, + operation + ) + VALUES (?, ?); + "#, + now, + operation, + ) + .execute(txn) + .await?; + + Ok(()) +} + +pub(crate) mod add_operations_to_txn_log { + #[derive(thiserror::Error, Debug)] + pub(crate) enum Error { + #[error("Failed to execute sql statments")] + SqlError(#[from] sqlx::Error), + } +} |
