use std::{fmt::Display, mem}; use crate::app::App; use chrono::Utc; use log::{debug, trace}; use serde::{Serialize, de::DeserializeOwned}; use sqlx::{SqliteConnection, query}; pub(crate) mod barcode; pub(crate) mod product; pub(crate) mod unit; pub(crate) trait Transactionable: Sized + std::fmt::Debug + Serialize + DeserializeOwned { type ApplyError: std::error::Error + Display; type UndoError: std::error::Error + Display; /// Apply this transaction. /// /// This should change the db state. async fn apply(self, txn: &mut SqliteConnection) -> Result<(), Self::ApplyError>; /// Undo this transaction. /// /// This should return the db to the state it was in before this transaction. async fn undo(self, txn: &mut SqliteConnection) -> Result<(), Self::UndoError>; } #[derive(Debug)] pub(crate) struct Operations { name: &'static str, ops: Vec, } impl Default for Operations { fn default() -> Self { Self::new("") } } impl Operations { #[must_use] pub(crate) fn new(name: &'static str) -> Self { Self { name, ops: Vec::new(), } } pub(crate) async fn apply(mut self, app: &App) -> Result<(), apply::Error> { let ops = mem::take(&mut self.ops); if ops.is_empty() { return Ok(()); } trace!("Begin commit of {}", self.name); let mut txn = app.db.begin().await?; for op in ops { trace!("Commiting operation: {op:?}"); add_operation_to_txn_log(&op, &mut txn).await?; op.apply(&mut txn) .await .map_err(|err| apply::Error::InnerApply(err))?; } txn.commit().await?; trace!("End commit of {}", self.name); Ok(()) } pub(crate) fn push(&mut self, op: O) { self.ops.push(op); } } pub(crate) mod apply { use actix_web::{ResponseError, http::header::HeaderValue}; use log::error; use crate::storage::sql::insert::{Transactionable, add_operations_to_txn_log}; #[derive(thiserror::Error, Debug)] pub(crate) enum Error { #[error("Failed to execute sql statments")] Sql(#[from] sqlx::Error), #[error("Failed to append operations to the txn log: {0}")] TxnLogAppend(#[from] add_operations_to_txn_log::Error), #[error("Failed to apply one of the operations: {0}")] InnerApply(::ApplyError), } impl ResponseError for Error { fn status_code(&self) -> actix_web::http::StatusCode { actix_web::http::StatusCode::INTERNAL_SERVER_ERROR } fn error_response(&self) -> actix_web::HttpResponse { error!("Emmiting `INTERNAL_SERVER_ERROR`: {self}"); let mut res = actix_web::HttpResponse::new(self.status_code()).set_body(self.to_string()); let mime = actix_web::mime::TEXT_PLAIN_UTF_8; res.headers_mut().insert( actix_web::http::header::CONTENT_TYPE, HeaderValue::from_str(mime.to_string().as_str()).expect("Hard-coded conversion"), ); res.set_body(actix_web::body::BoxBody::new(self.to_string())) } } } impl Drop for Operations { fn drop(&mut self) { assert!( self.ops.is_empty(), "Trying to drop uncommitted operations (name: {}) ({:#?}). This is a bug.", self.name, self.ops ); } } async fn add_operation_to_txn_log( operation: &O, txn: &mut SqliteConnection, ) -> Result<(), add_operations_to_txn_log::Error> { debug!("Adding operation to txn log: {operation:?}"); let now = Utc::now().timestamp(); let operation = serde_json::to_string(&operation).expect("should be serializable"); query!( r#" INSERT INTO txn_log ( timestamp, operation ) VALUES (?, ?); "#, now, operation, ) .execute(txn) .await?; Ok(()) } pub(crate) mod add_operations_to_txn_log { #[derive(thiserror::Error, Debug)] pub(crate) enum Error { #[error("Failed to execute sql statments")] SqlError(#[from] sqlx::Error), } }