Skip to content

Commit

Permalink
Handle dry run failure, and abort migration (tursodatabase#1163)
Browse files Browse the repository at this point in the history
* test snapshots

* add test

* handle dry run failure, and abort all tasks

* fmt
  • Loading branch information
MarinPostma authored Mar 8, 2024
1 parent b67550f commit c9ba692
Show file tree
Hide file tree
Showing 22 changed files with 649 additions and 216 deletions.
77 changes: 63 additions & 14 deletions libsql-server/src/schema/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(super) fn setup_schema(conn: &mut rusqlite::Connection) -> Result<(), Error>
schema TEXT NOT NULL,
migration TEXT NOT NULL,
status INTEGER,
error TEXT,
finished BOOLEAN GENERATED ALWAYS AS ({})
)
",
Expand All @@ -37,15 +38,22 @@ pub(super) fn setup_schema(conn: &mut rusqlite::Connection) -> Result<(), Error>
)?;
// this table contains a list of all the tasks that need to be performed for each migration job
txn.execute(
"CREATE TABLE IF NOT EXISTS migration_job_pending_tasks (
&format!(
"CREATE TABLE IF NOT EXISTS migration_job_pending_tasks (
task_id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER,
target_namespace TEXT NOT NULL,
status INTEGER,
error TEXT,
finished BOOLEAN GENERATED ALWAYS AS ({}),
FOREIGN KEY (job_id) REFERENCES migration_jobs (job_id)
)
",
MigrationTaskStatus::finished_states()
.into_iter()
.map(|s| format!("status = {}", *s as u64))
.join(" OR ")
),
(),
)?;
// This temporary table holds the list of tasks that are currently being processed
Expand Down Expand Up @@ -190,10 +198,47 @@ pub(super) fn get_next_pending_migration_tasks_batch(
Ok(tasks)
}

/// returns a batch of tasks that are not in a finished state
pub(super) fn get_unfinished_task_batch(
conn: &mut rusqlite::Connection,
job_id: i64,
limit: usize,
) -> Result<Vec<MigrationTask>, Error> {
let txn = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
let tasks = txn
.prepare(
"SELECT task_id, target_namespace, status, job_id
FROM migration_job_pending_tasks
WHERE job_id = ? AND finished = false AND task_id NOT IN (select * from enqueued_tasks)
LIMIT ?",
)?
.query_map((job_id, limit), |row| {
let task_id = row.get::<_, i64>(0)?;
let namespace = NamespaceName::from_string(row.get::<_, String>(1)?).unwrap();
let status = MigrationTaskStatus::from_int(row.get::<_, u64>(2)?);
let job_id = row.get::<_, i64>(3)?;
Ok(MigrationTask {
namespace,
status,
job_id,
task_id,
})
})?
.map(|r| r.map_err(Into::into))
.collect::<Result<Vec<_>, Error>>()?;

for task in tasks.iter() {
txn.execute("INSERT INTO enqueued_tasks VALUES (?)", [task.task_id])?;
}

txn.commit()?;
Ok(tasks)
}

pub(super) fn update_meta_task_status(
conn: &mut rusqlite::Connection,
task: MigrationTask,
error: Option<String>,
task: &MigrationTask,
error: Option<&str>,
) -> Result<(), Error> {
assert!(error.is_none() || task.status.is_failure());
let txn = conn.transaction()?;
Expand Down Expand Up @@ -242,10 +287,11 @@ pub(super) fn update_job_status(
conn: &mut rusqlite::Connection,
job_id: i64,
status: MigrationJobStatus,
error: Option<&str>,
) -> Result<(), Error> {
conn.execute(
"UPDATE migration_jobs SET status = ? WHERE job_id = ?",
(status as u64, job_id),
"UPDATE migration_jobs SET status = ?, error = coalesce(?, error) WHERE job_id = ?",
(status as u64, error, job_id),
)?;
Ok(())
}
Expand Down Expand Up @@ -275,6 +321,7 @@ pub(super) fn get_next_pending_migration_job(
status,
migration,
progress: Default::default(),
task_error: None,
})
},
)
Expand Down Expand Up @@ -303,15 +350,16 @@ pub fn get_migration_details(
schema: NamespaceName,
job_id: u64,
) -> crate::Result<Option<MigrationDetails>> {
let Some(status) = conn
let Some((status, error)) = conn
.query_row(
"SELECT status
"SELECT status, error
FROM migration_jobs
WHERE schema = ? AND job_id = ?",
params![schema.as_str(), job_id],
|r| {
let status: Option<u64> = r.get(0)?;
Ok(status.map(MigrationJobStatus::from_int))
let status = MigrationJobStatus::from_int(r.get::<_, u64>(0)?);
let error: Option<String> = r.get(1)?;
Ok((status, error))
},
)
.optional()?
Expand Down Expand Up @@ -341,6 +389,7 @@ pub fn get_migration_details(
Ok(Some(MigrationDetails {
job_id,
status,
error,
progress,
}))
}
Expand Down Expand Up @@ -516,7 +565,7 @@ mod test {

let mut task = tasks.pop().unwrap();
*task.status_mut() = MigrationTaskStatus::Success;
update_meta_task_status(&mut conn, task, None).unwrap();
update_meta_task_status(&mut conn, &task, None).unwrap();

assert_debug_snapshot!(get_next_pending_migration_job(&mut conn).unwrap().unwrap());
}
Expand Down Expand Up @@ -565,7 +614,7 @@ mod test {
.unwrap();
for mut task in tasks {
task.status = MigrationTaskStatus::DryRunSuccess;
update_meta_task_status(&mut conn, task, None).unwrap();
update_meta_task_status(&mut conn, &task, None).unwrap();
}

let status = job_step_dry_run_success(&mut conn, job.job_id()).unwrap();
Expand Down Expand Up @@ -612,7 +661,7 @@ mod test {
)
.unwrap();

update_job_status(&mut conn, job_id, MigrationJobStatus::RunSuccess).unwrap();
update_job_status(&mut conn, job_id, MigrationJobStatus::RunSuccess, None).unwrap();

// job is finished, we can enqueue now
register_schema_migration_job(
Expand Down Expand Up @@ -645,7 +694,7 @@ mod test {

assert!(super::has_pending_migration_jobs(&conn, &"schema".into()).unwrap());

update_job_status(&mut conn, job_id, MigrationJobStatus::RunSuccess).unwrap();
update_job_status(&mut conn, job_id, MigrationJobStatus::RunSuccess, None).unwrap();
assert!(!super::has_pending_migration_jobs(&conn, &"schema".into()).unwrap());
}

Expand Down Expand Up @@ -673,7 +722,7 @@ mod test {
.await
.unwrap_err());

update_job_status(&mut conn, job_id, MigrationJobStatus::RunSuccess).unwrap();
update_job_status(&mut conn, job_id, MigrationJobStatus::RunSuccess, None).unwrap();

assert!(register_shared(&meta_store, "ns", "schema").await.is_ok());
}
Expand Down
2 changes: 2 additions & 0 deletions libsql-server/src/schema/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub enum Error {
MigrationError(usize, String),
#[error("migration is invalid: it contains transaction items (BEGIN, COMMIT, SAVEPOINT...) which are not allowed. The migration is already run within a transaction")]
MigrationContainsTransactionStatements,
#[error("an error occured while backing up the meta store")]
MetaStoreBackupFailure,
}

impl ResponseError for Error {}
Expand Down
14 changes: 13 additions & 1 deletion libsql-server/src/schema/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub fn setup_migration_table(conn: &mut rusqlite::Connection) -> Result<(), Erro
migration TEXT NOT NULL,
error TEXT,
finished BOOLEAN GENERATED ALWAYS AS ({})
)",
MigrationTaskStatus::finished_states()
.iter()
Expand Down Expand Up @@ -61,6 +60,19 @@ pub fn enqueue_migration_task(
Ok(())
}

/// Writes (inserting or replacing) a row in `__libsql_migration_tasks` for `task.job_id`
/// with status `MigrationTaskStatus::Failure` and the error text `"aborted"`.
///
/// # Errors
///
/// Returns the converted `rusqlite` error if the statement fails.
pub fn abort_migration_task(
    conn: &rusqlite::Connection,
    task: &MigrationTask,
) -> Result<(), Error> {
    // The `migration` column is `NOT NULL`, yet a task may be aborted before it was ever
    // enqueued, in which case there is no migration text to store. A placeholder is written
    // instead; this is fine because a failed task's migration is never executed.
    const ABORTED: &str = "aborted";
    let failure_status = MigrationTaskStatus::Failure as u64;

    conn.execute(
        "INSERT OR REPLACE INTO __libsql_migration_tasks (job_id, status, error, migration) VALUES (?, ?, ?, ?)",
        (task.job_id, failure_status, ABORTED, ABORTED),
    )?;

    Ok(())
}

/// set the task status to `Run` if its current state is `DryRunSuccess`
pub fn step_migration_task_run(conn: &rusqlite::Connection, job_id: i64) -> Result<(), Error> {
conn.execute(
Expand Down
Loading

0 comments on commit c9ba692

Please sign in to comment.