From 8644fe21112747335f85af62e53b1da0c714d290 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 26 Jun 2025 15:06:48 +0200 Subject: [PATCH] WIP: Replace key with subkey. --- crates/core/src/migrations.rs | 31 ++++++++++++- crates/core/src/sync/operations.rs | 34 +++++++++----- dart/test/utils/migration_fixtures.dart | 62 ++++++++++++++++++++++++- 3 files changed, 114 insertions(+), 13 deletions(-) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 676fcd8..9aaa16b 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -12,7 +12,7 @@ use crate::error::{PSResult, SQLiteError}; use crate::fix_data::apply_v035_fix; use crate::sync::BucketPriority; -pub const LATEST_VERSION: i32 = 10; +pub const LATEST_VERSION: i32 = 11; pub fn powersync_migrate( ctx: *mut sqlite::context, @@ -386,5 +386,34 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array( .into_db_result(local_db)?; } + if current_version < 11 && target_version >= 11 { + let stmt = "\ +CREATE TABLE ps_oplog_new( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT NOT NULL, + row_id TEXT NOT NULL, + subkey TEXT NOT NULL, + data TEXT, + hash INTEGER NOT NULL) STRICT; + +INSERT INTO ps_oplog_new (bucket, op_id, row_type, row_id, subkey, data, hash) +SELECT bucket, op_id, row_type, row_id, null, data, hash FROM ps_oplog; + +DROP TABLE ps_oplog; +ALTER TABLE ps_oplog_new RENAME TO ps_oplog; + +CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id); +CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id); +CREATE INDEX ps_oplog_key ON ps_oplog (bucket, row_type, row_id, subkey); + +INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array( + json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11') +)); +"; + + local_db.exec_safe(stmt).into_db_result(local_db)?; + } + Ok(()) } diff --git a/crates/core/src/sync/operations.rs b/crates/core/src/sync/operations.rs index 29c23f3..7b90a48 100644 --- a/crates/core/src/sync/operations.rs +++ b/crates/core/src/sync/operations.rs @@ -38,14 +38,16 @@ pub fn insert_bucket_operations( "\ DELETE FROM ps_oplog WHERE unlikely(ps_oplog.bucket = ?1) - AND ps_oplog.key = ?2 + AND ps_oplog.row_type = ?2 + AND ps_oplog.row_id = ?3 + AND ps_oplog.subkey = ?4 RETURNING op_id, hash", )?; supersede_statement.bind_int64(1, bucket_id)?; // language=SQLite let insert_statement = db.prepare_v2("\ -INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?; +INSERT INTO ps_oplog(bucket, op_id, subkey, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?; insert_statement.bind_int64(1, bucket_id)?; let updated_row_statement = db.prepare_v2( @@ -70,15 +72,25 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", added_ops += 1; if op == OpType::PUT || op == OpType::REMOVE { - let key: String; - if let (Some(object_type), Some(object_id)) = (object_type, object_id) { - let subkey = line.subkey.as_ref().map(|i| &**i).unwrap_or("null"); - key = format!("{}/{}/{}", &object_type, &object_id, subkey); + let subkey = line.subkey.as_ref().map(|i| &**i); + + if let Some(subkey) = subkey { + supersede_statement.bind_text(4, &subkey, sqlite::Destructor::STATIC)?; } else { - key = String::from(""); + supersede_statement.bind_text(4, "", sqlite::Destructor::STATIC)?; } - supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?; + if let Some(object_type) = object_type { + supersede_statement.bind_text(2, &object_type, sqlite::Destructor::STATIC)?; + } else { + supersede_statement.bind_text(2, "", sqlite::Destructor::STATIC)?; + } + + if let Some(object_id) = object_id { + supersede_statement.bind_text(3, &object_id, sqlite::Destructor::STATIC)?; + } else { + supersede_statement.bind_text(3, "", sqlite::Destructor::STATIC)?; + } let mut superseded = false; @@ -124,10 +136,10 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", } insert_statement.bind_int64(2, op_id)?; - if key != "" { - insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?; + if let Some(subkey) = subkey { + insert_statement.bind_text(3, &subkey, sqlite::Destructor::STATIC)?; } else { - insert_statement.bind_null(3)?; + insert_statement.bind_text(3, "", sqlite::Destructor::STATIC)?; } if let (Some(object_type), Some(object_id)) = (object_type, object_id) { diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index ce8b2b8..56c4cf5 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,5 +1,5 @@ /// The current database version -const databaseVersion = 10; +const databaseVersion = 11; /// This is the base database state that we expect at various schema versions. /// Generated by loading the specific library version, and exporting the schema. @@ -355,6 +355,54 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') ''', + 11: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE "ps_oplog"( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT NOT NULL, + row_id TEXT NOT NULL, + subkey TEXT NOT NULL, + data TEXT, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL PRIMARY KEY, + last_synced_at TEXT NOT NULL +) STRICT +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT, + PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, row_type, row_id, subkey) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') +''' }; final finalState = expectedState[databaseVersion]!; @@ -456,6 +504,17 @@ const data1 = { (2, 3, 'lists', 'l1', '', '{}', 3) ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') +''', + 11: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0), + (2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, subkey, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') ''' }; @@ -501,6 +560,7 @@ final dataDown1 = { 7: data1[5]!, 8: data1[5]!, 9: data1[9]!, + 10: data1[10]!, }; final finalData1 = data1[databaseVersion]!;