about summary refs log tree commit diff stats
path: root/crates/rocie-server/src/storage/migrate/mod.rs
blob: 3fdc40092e604d5ea9252169f5edaa2515d620fe (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
use std::{
    fmt::Display,
    time::{SystemTime, UNIX_EPOCH},
};

use chrono::TimeDelta;
use log::{debug, info};
use sqlx::{Sqlite, SqlitePool, Transaction, query};

use crate::app::App;

// Expands to the body of a single migration step.
//
// Arguments, in order: the app (its `db` pool is used), the version that is
// migrated from, the version that is migrated to, and the path of the SQL
// script that is handed to `include_str!`.
//
// The script and the version bookkeeping run inside one transaction. After
// the commit the database is vacuumed and the update of `$new_version` is
// awaited, which chains the individual steps. The expansion refers to
// `Self::Empty`, uses `?` on `update::Error` values and ends in `Ok(())`.
macro_rules! make_upgrade {
    ($app:expr, $old_version:expr, $new_version:expr, $sql_name:expr) => {
        let mut txn = $app.db.begin().await.map_err(update::Error::TxnStart)?;
        debug!("Migrating: {} -> {}", $old_version, $new_version);

        sqlx::raw_sql(include_str!($sql_name))
            .execute(&mut *txn)
            .await
            .map_err(update::Error::SqlUpdate)?;

        // Coming from an empty database there is no previous version row
        // that would have to be closed.
        let previous = if $old_version == Self::Empty {
            None
        } else {
            Some($old_version)
        };

        set_db_version(&mut txn, previous, $new_version)
            .await
            .map_err(|source| update::Error::SetDbVersion {
                err: source,
                new_version: $new_version,
            })?;

        txn.commit().await.map_err(update::Error::TxnCommit)?;

        // NOTE: This is needed, so that sqlite "sees" our changes to the table
        // without having to reconnect. <2025-02-18>
        query!("VACUUM")
            .execute(&$app.db)
            .await
            .map_err(update::Error::SqlVacuum)?;

        // Continue with the next step; boxed because the call is recursive.
        Box::pin($new_version.update($app))
            .await
            .map_err(|source| update::Error::NextUpdate {
                err: Box::new(source),
                new_version: $new_version,
            })?;

        Ok(())
    };
}

/// Schema version of the database.
///
/// The variants are declared from oldest to newest, so the derived ordering
/// follows the order in which the versions were introduced.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum DbVersion {
    /// The database is not yet initialized.
    Empty,

    /// Introduced: 2025-09-02.
    One,
}
/// The newest schema version; a database that is already at this version is not migrated.
const CURRENT_VERSION: DbVersion = DbVersion::One;

async fn set_db_version(
    tx: &mut Transaction<'_, Sqlite>,
    old_version: Option<DbVersion>,
    new_version: DbVersion,
) -> Result<(), db_version_set::Error> {
    let valid_from = get_current_date();

    if let Some(old_version) = old_version {
        let valid_to = valid_from + 1;
        let old_version = old_version.as_sql_integer();

        query!(
            "UPDATE version SET valid_to = ? WHERE namespace = 'rocie' AND number = ?;",
            valid_to,
            old_version
        )
        .execute(&mut *(*tx))
        .await?;
    }

    let version = new_version.as_sql_integer();

    query!(
        "INSERT INTO version (namespace, number, valid_from, valid_to) VALUES ('rocie', ?, ?, NULL);",
        version,
        valid_from
    )
    .execute(&mut *(*tx))
    .await?;

    Ok(())
}

/// Error type returned by `set_db_version`.
pub(crate) mod db_version_set {
    #[derive(thiserror::Error, Debug)]
    pub(crate) enum Error {
        /// One of the statements that write to the `version` table failed.
        #[error("Failed to perform database action: {0}")]
        DbError(#[from] sqlx::Error),
    }
}

impl DbVersion {
    fn as_sql_integer(self) -> i32 {
        match self {
            DbVersion::One => 1,

            DbVersion::Empty => unreachable!("A empty version does not have an associated integer"),
        }
    }

    fn from_db(number: i64, namespace: &str) -> Result<Self, db_version_parse::Error> {
        match (number, namespace) {
            (1, "rocie") => Ok(DbVersion::One),
            (number, namespace) => Err(db_version_parse::Error::UnkownVersion {
                namespace: namespace.to_owned(),
                number,
            }),
        }
    }

    /// Try to update the database from version [`self`] to the [`CURRENT_VERSION`].
    ///
    /// Each update is atomic, so if this function fails you are still guaranteed to have a
    /// database at version `get_version`.
    #[allow(clippy::too_many_lines)]
    async fn update(self, app: &App) -> Result<(), update::Error> {
        match self {
            Self::Empty => {
                make_upgrade! {app, Self::Empty, Self::One, "./sql/0->1.sql"}
            }

            // This is the current_version
            Self::One => {
                assert_eq!(self, CURRENT_VERSION);
                assert_eq!(self, get_version(app).await?);
                Ok(())
            }
        }
    }
}
impl Display for DbVersion {
    /// Writes the same text as the derived `Debug` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant is a unit variant, so the `Debug` output is already
        // suitable for display.
        std::fmt::Debug::fmt(self, f)
    }
}
/// Error type returned by `DbVersion::update`.
pub(crate) mod update {
    use crate::storage::migrate::{DbVersion, db_version_set, get_db_version};

    #[derive(thiserror::Error, Debug)]
    pub(crate) enum Error {
        /// Reading the version back from the database failed.
        #[error("Failed to determine final database version: {0}")]
        GetVersion(#[from] get_db_version::Error),

        /// Recording `new_version` in the database failed.
        #[error("Failed to set the db to version {new_version}: {err}")]
        SetDbVersion {
            err: db_version_set::Error,
            new_version: DbVersion,
        },

        /// The `VACUUM` that follows a committed migration step failed.
        #[error("Failed to vacuum sql database after update: {0}")]
        SqlVacuum(sqlx::Error),

        /// Executing the SQL script of a migration step failed.
        #[error("Failed to execute the sql update script: {0}")]
        SqlUpdate(sqlx::Error),

        /// The transaction of a migration step could not be started.
        #[error("Failed to start the update transaction: {0}")]
        TxnStart(sqlx::Error),
        /// The transaction of a migration step could not be committed.
        #[error("Failed to commit the update transaction: {0}")]
        TxnCommit(sqlx::Error),

        /// The update that was chained after a completed step failed.
        /// Boxed because the error type is recursive.
        #[error("Failed to perform the next chained update (to ver {new_version}): {err}")]
        NextUpdate {
            err: Box<Self>,
            new_version: DbVersion,
        },
    }
}
pub(crate) mod db_version_parse {
    #[derive(thiserror::Error, Debug)]
    pub(crate) enum Error {
        #[error("Db version is {number}, but got unknown namespace: '{namespace}'")]
        UnkownVersion { namespace: String, number: i64 },
    }
}

/// Returns the current data as UNIX time stamp.
fn get_current_date() -> i64 {
    let start = SystemTime::now();
    let seconds_since_epoch: TimeDelta = TimeDelta::from_std(
        start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards"),
    )
    .expect("Time does not go backwards");

    // All database dates should be after the UNIX_EPOCH (and thus positiv)
    seconds_since_epoch.num_milliseconds()
}

/// Look up the version of the database that belongs to `app`.
///
/// This hands the database pool of `app` to [`get_version_db`].
///
/// # Panics
/// Only if internal assertions fail.
pub(crate) async fn get_version(app: &App) -> Result<DbVersion, get_db_version::Error> {
    let pool = &app.db;
    get_version_db(pool).await
}

/// Return the current database version.
///
/// In contrast to the [`get_version`] function, this function does not
/// a fully instantiated [`App`], a database connection suffices.
///
/// # Panics
/// Only if internal assertions fail.
pub(crate) async fn get_version_db(pool: &SqlitePool) -> Result<DbVersion, get_db_version::Error> {
    let version_table_exists = {
        let query = query!(
            "
            SELECT 1 as result
            FROM sqlite_master
            WHERE type = 'table'
            AND name = 'version'
            "
        )
        .fetch_optional(pool)
        .await
        .map_err(|err| get_db_version::Error::VersionTableExistance(err))?;

        if let Some(output) = query {
            assert_eq!(output.result, 1);
            true
        } else {
            false
        }
    };

    if !version_table_exists {
        return Ok(DbVersion::Empty);
    }

    let current_version = query!(
        "
        SELECT namespace, number
        FROM version
        WHERE valid_to IS NULL;
        "
    )
    .fetch_one(pool)
    .await
    .map_err(|err| get_db_version::Error::VersionNumberFetch(err))?;

    Ok(DbVersion::from_db(
        current_version.number,
        current_version.namespace.as_str(),
    )?)
}

pub(crate) mod get_db_version {
    use crate::storage::migrate::db_version_parse;

    #[derive(thiserror::Error, Debug)]
    pub(crate) enum Error {
        #[error("Failed to fetch the version number from db: {0}")]
        VersionNumberFetch(sqlx::Error),

        #[error("Failed to check for existance of the `version` table: {0}")]
        VersionTableExistance(sqlx::Error),

        #[error("Failed to parse the db version: {0}")]
        VersionParse(#[from] db_version_parse::Error),
    }
}

/// Bring the database of `app` up to [`CURRENT_VERSION`].
///
/// Nothing is done if the database is already at that version.
///
/// # Errors
/// If the version cannot be determined, or if the update fails.
pub(crate) async fn migrate_db(app: &App) -> Result<(), migrate_db::Error> {
    let current_version = get_version(app).await?;

    if current_version != CURRENT_VERSION {
        info!("Migrate database from version '{current_version}' to version '{CURRENT_VERSION}'");

        current_version.update(app).await?;
    }

    Ok(())
}

/// Error type returned by the `migrate_db` function.
pub(crate) mod migrate_db {
    use crate::storage::migrate::{get_db_version, update};

    #[derive(thiserror::Error, Debug)]
    pub(crate) enum Error {
        /// The version of the database could not be determined.
        #[error("Failed to determine the database version: {0}")]
        GetVersion(#[from] get_db_version::Error),

        /// Updating the database failed.
        // NOTE(review): `Upadate` looks like a typo for `Update`; renaming it
        // would affect every user of the variant, so it is left unchanged.
        #[error("Failed to update the database: {0}")]
        Upadate(#[from] update::Error),
    }
}