From c0b508beb74b8b37f80907b167eb932688f77341 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 19 Jun 2025 16:23:24 +0200 Subject: [PATCH 01/18] Start adding support for sync streams --- crates/core/src/migrations.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 440548d..fd54df1 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -384,5 +384,24 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array( .into_db_result(local_db)?; } + if current_version < 11 && target_version >= 11 { + let stmt = "\ +CREATE TABLE ps_streams ( + id NOT NULL INTEGER PRIMARY KEY, + definition_name TEXT NOT NULL, + is_default INTEGER NOT NULL, + local_priority INTEGER, + local_params TEXT +) STRICT; +ALTER TABLE ps_buckets ADD COLUMN derived_from INTEGER REFERENCES ps_streams (id); + +INSERT INTO ps_migration(id, down_migrations) VALUES(9, json_array( +json_object('sql', 'todo down migration'), +json_object('sql', 'DELETE FROM ps_migration WHERE id >= 10') +)); +"; + local_db.exec_safe(stmt)?; + } + Ok(()) } From 5b157b1af0785069500d4867170134089d7dc84c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 1 Jul 2025 14:56:17 -0600 Subject: [PATCH 02/18] Add protocol changes --- crates/core/src/migrations.rs | 7 +++-- crates/core/src/sync/interface.rs | 17 +++++++++++ crates/core/src/sync/line.rs | 49 +++++++++++++++++++++++++++++++ crates/core/src/util.rs | 28 ++++++------------ 4 files changed, 79 insertions(+), 22 deletions(-) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index fd54df1..d609382 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -386,12 +386,13 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array( if current_version < 11 && target_version >= 11 { let stmt = "\ -CREATE TABLE ps_streams ( +CREATE TABLE ps_stream_subscriptions ( id NOT NULL INTEGER PRIMARY KEY, - definition_name TEXT NOT NULL, + stream_name TEXT NOT NULL, is_default INTEGER NOT NULL, local_priority INTEGER, - local_params TEXT + local_params TEXT, + ttl INTEGER ) STRICT; ALTER TABLE ps_buckets ADD COLUMN derived_from INTEGER REFERENCES ps_streams (id); diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index 65245cf..a10c4e4 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -14,6 +14,7 @@ use sqlite_nostd::{Connection, Context}; use crate::error::PowerSyncError; use crate::schema::Schema; use crate::state::DatabaseState; +use crate::util::serialize_i64_to_string; use super::streaming_sync::SyncClient; use super::sync_status::DownloadSyncStatus; @@ -108,6 +109,22 @@ pub struct StreamingSyncRequest { pub parameters: Option>, } +#[derive(Serialize)] +pub struct StreamSubscriptionRequest { + pub include_defaults: bool, + pub subscriptions: Vec, +} + +#[derive(Serialize)] +pub struct RequestedStreamSubscription { + /// The name of the sync stream to subscribe to. + pub stream: String, + /// Parameters to make available in the stream's definition. + pub parameters: Box, + #[serde(serialize_with = "serialize_i64_to_string")] + pub client_id: i64, +} + #[derive(Serialize)] pub struct BucketRequest { pub name: String, diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs index 8f73078..54d04ed 100644 --- a/crates/core/src/sync/line.rs +++ b/crates/core/src/sync/line.rs @@ -119,11 +119,60 @@ pub struct BucketChecksum<'a> { pub priority: Option, #[serde(default)] pub count: Option, + #[serde( + default, + deserialize_with = "BucketChecksum::deserialize_subscriptions" + )] + pub subscriptions: Option>, // #[serde(default)] // #[serde(deserialize_with = "deserialize_optional_string_to_i64")] // pub last_op_id: Option, } +impl BucketChecksum<'_> { + fn deserialize_subscriptions<'de, D: serde::Deserializer<'de>>( + deserializer: D, + ) -> Result>, D::Error> { + struct MyVisitor; + + impl<'de> Visitor<'de> for MyVisitor { + type Value = Option>; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(formatter, "optional list of subscriptions") + } + + fn visit_none(self) -> Result + where + E: serde::de::Error, + { + Ok(None) + } + + fn visit_some(self, deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_seq(self) + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut result: Vec = Vec::new(); + while let Some(element) = seq.next_element::<&'de str>()? { + result.push(element.parse::().map_err(serde::de::Error::custom)?); + } + + Ok(Some(result)) + } + } + + deserializer.deserialize_option(MyVisitor) + } +} + #[derive(Deserialize, Debug)] pub struct DataLine<'a> { #[serde(borrow)] diff --git a/crates/core/src/util.rs b/crates/core/src/util.rs index 5e77768..cd9ce4d 100644 --- a/crates/core/src/util.rs +++ b/crates/core/src/util.rs @@ -36,29 +36,19 @@ pub fn quote_identifier_prefixed(prefix: &str, name: &str) -> String { return format!("\"{:}{:}\"", prefix, name.replace("\"", "\"\"")); } +pub fn serialize_i64_to_string<'de, S: serde::Serializer>( + value: &i64, + serializer: S, +) -> Result { + serializer.collect_str(value) +} + pub fn deserialize_string_to_i64<'de, D>(deserializer: D) -> Result where D: serde::Deserializer<'de>, { - struct ValueVisitor; - - impl<'de> Visitor<'de> for ValueVisitor { - type Value = i64; - - fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { - formatter.write_str("a string representation of a number") - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - v.parse::().map_err(serde::de::Error::custom) - } - } - - // Using a custom visitor here to avoid an intermediate string allocation - deserializer.deserialize_str(ValueVisitor) + let value: &'de str = serde::Deserialize::deserialize(deserializer)?; + value.parse::().map_err(serde::de::Error::custom) } pub fn deserialize_optional_string_to_i64<'de, D>(deserializer: D) -> Result, D::Error> From b208e1837ef8bca2ff505875611949118f4e9cdd Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 1 Jul 2025 16:19:46 -0600 Subject: [PATCH 03/18] Use serde_with --- Cargo.lock | 144 ++++++++++++++++++++++++++++++ crates/core/Cargo.toml | 1 + crates/core/src/sync/interface.rs | 5 +- crates/core/src/sync/line.rs | 68 +++----------- crates/core/src/util.rs | 47 ---------- 5 files changed, 160 insertions(+), 105 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4bfd084..1b0bb49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bindgen" version = "0.72.0" @@ -73,6 +79,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "num-traits", + "serde", +] + [[package]] name = "clang-sys" version = "1.6.1" @@ -104,12 +120,62 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.100", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + [[package]] name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "futures-core" version = "0.3.31" @@ -143,6 +209,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "itertools" version = "0.13.0" @@ -202,6 +280,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-derive" version = "0.3.3" @@ -239,6 +323,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "powersync_core" version = "0.4.2" @@ -251,6 +341,7 @@ dependencies = [ "rustc-hash", "serde", "serde_json", + "serde_with", "sqlite_nostd", "thiserror", "uuid", @@ -381,6 +472,34 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2c45cd61fefa9db6f254525d46e392b852e0e61d9a1fd36e5bd183450a556d5" +dependencies = [ + "base64", + "chrono", + "hex", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "shlex" version = "1.1.0" @@ -418,6 +537,12 @@ dependencies = [ "sqlite3_capi", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "1.0.109" @@ -460,6 +585,25 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "num-conv", + "powerfmt", + "serde", + "time-core", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + [[package]] name = "unicode-ident" version = "1.0.11" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index af3f330..f1840a6 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -23,6 +23,7 @@ const_format = "0.2.34" futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] } rustc-hash = { version = "2.1", default-features = false } thiserror = { version = "2", default-features = false } +serde_with = { version = "3.14.0", default-features = false, features = ["alloc", "macros"] } [dependencies.uuid] version = "1.4.1" diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index a10c4e4..fff3861 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -7,6 +7,7 @@ use alloc::rc::Rc; use alloc::sync::Arc; use alloc::{string::String, vec::Vec}; use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use sqlite::{ResultCode, Value}; use sqlite_nostd::{self as sqlite, ColumnType}; use sqlite_nostd::{Connection, Context}; @@ -14,7 +15,6 @@ use sqlite_nostd::{Connection, Context}; use crate::error::PowerSyncError; use crate::schema::Schema; use crate::state::DatabaseState; -use crate::util::serialize_i64_to_string; use super::streaming_sync::SyncClient; use super::sync_status::DownloadSyncStatus; @@ -115,13 +115,14 @@ pub struct StreamSubscriptionRequest { pub subscriptions: Vec, } +#[serde_as] #[derive(Serialize)] pub struct RequestedStreamSubscription { /// The name of the sync stream to subscribe to. pub stream: String, /// Parameters to make available in the stream's definition. pub parameters: Box, - #[serde(serialize_with = "serialize_i64_to_string")] + #[serde_as(as = "DisplayFromStr")] pub client_id: i64, } diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs index 54d04ed..6a5a7f2 100644 --- a/crates/core/src/sync/line.rs +++ b/crates/core/src/sync/line.rs @@ -2,8 +2,7 @@ use alloc::borrow::Cow; use alloc::vec::Vec; use serde::de::{IgnoredAny, VariantAccess, Visitor}; use serde::Deserialize; - -use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64}; +use serde_with::{serde_as, DisplayFromStr}; use super::bucket_priority::BucketPriority; use super::Checksum; @@ -73,27 +72,28 @@ impl<'de> Deserialize<'de> for SyncLine<'de> { } } +#[serde_as] #[derive(Deserialize, Debug)] pub struct Checkpoint<'a> { - #[serde(deserialize_with = "deserialize_string_to_i64")] + #[serde_as(as = "DisplayFromStr")] pub last_op_id: i64, #[serde(default)] - #[serde(deserialize_with = "deserialize_optional_string_to_i64")] + #[serde_as(as = "Option")] pub write_checkpoint: Option, #[serde(borrow)] pub buckets: Vec>, } +#[serde_as] #[derive(Deserialize, Debug)] pub struct CheckpointDiff<'a> { - #[serde(deserialize_with = "deserialize_string_to_i64")] + #[serde_as(as = "DisplayFromStr")] pub last_op_id: i64, #[serde(borrow)] pub updated_buckets: Vec>, #[serde(borrow)] pub removed_buckets: Vec>, - #[serde(default)] - #[serde(deserialize_with = "deserialize_optional_string_to_i64")] + #[serde_as(as = "Option")] pub write_checkpoint: Option, } @@ -110,6 +110,7 @@ pub struct CheckpointPartiallyComplete { pub priority: BucketPriority, } +#[serde_as] #[derive(Deserialize, Debug)] pub struct BucketChecksum<'a> { #[serde(borrow)] @@ -119,60 +120,14 @@ pub struct BucketChecksum<'a> { pub priority: Option, #[serde(default)] pub count: Option, - #[serde( - default, - deserialize_with = "BucketChecksum::deserialize_subscriptions" - )] + #[serde_as(as = "Option>")] + #[serde(default)] pub subscriptions: Option>, // #[serde(default)] // #[serde(deserialize_with = "deserialize_optional_string_to_i64")] // pub last_op_id: Option, } -impl BucketChecksum<'_> { - fn deserialize_subscriptions<'de, D: serde::Deserializer<'de>>( - deserializer: D, - ) -> Result>, D::Error> { - struct MyVisitor; - - impl<'de> Visitor<'de> for MyVisitor { - type Value = Option>; - - fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { - write!(formatter, "optional list of subscriptions") - } - - fn visit_none(self) -> Result - where - E: serde::de::Error, - { - Ok(None) - } - - fn visit_some(self, deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - deserializer.deserialize_seq(self) - } - - fn visit_seq(self, mut seq: A) -> Result - where - A: serde::de::SeqAccess<'de>, - { - let mut result: Vec = Vec::new(); - while let Some(element) = seq.next_element::<&'de str>()? { - result.push(element.parse::().map_err(serde::de::Error::custom)?); - } - - Ok(Some(result)) - } - } - - deserializer.deserialize_option(MyVisitor) - } -} - #[derive(Deserialize, Debug)] pub struct DataLine<'a> { #[serde(borrow)] @@ -186,10 +141,11 @@ pub struct DataLine<'a> { // pub next_after: Option>, } +#[serde_as] #[derive(Deserialize, Debug)] pub struct OplogEntry<'a> { pub checksum: Checksum, - #[serde(deserialize_with = "deserialize_string_to_i64")] + #[serde_as(as = "DisplayFromStr")] pub op_id: i64, pub op: OpType, #[serde(default, borrow)] diff --git a/crates/core/src/util.rs b/crates/core/src/util.rs index cd9ce4d..a46b968 100644 --- a/crates/core/src/util.rs +++ b/crates/core/src/util.rs @@ -5,7 +5,6 @@ use alloc::string::String; #[cfg(not(feature = "getrandom"))] use crate::sqlite; -use serde::de::Visitor; use uuid::Uuid; @@ -36,52 +35,6 @@ pub fn quote_identifier_prefixed(prefix: &str, name: &str) -> String { return format!("\"{:}{:}\"", prefix, name.replace("\"", "\"\"")); } -pub fn serialize_i64_to_string<'de, S: serde::Serializer>( - value: &i64, - serializer: S, -) -> Result { - serializer.collect_str(value) -} - -pub fn deserialize_string_to_i64<'de, D>(deserializer: D) -> Result -where - D: serde::Deserializer<'de>, -{ - let value: &'de str = serde::Deserialize::deserialize(deserializer)?; - value.parse::().map_err(serde::de::Error::custom) -} - -pub fn deserialize_optional_string_to_i64<'de, D>(deserializer: D) -> Result, D::Error> -where - D: serde::Deserializer<'de>, -{ - struct ValueVisitor; - - impl<'de> Visitor<'de> for ValueVisitor { - type Value = Option; - - fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { - formatter.write_str("a string or null") - } - - fn visit_none(self) -> Result - where - E: serde::de::Error, - { - Ok(None) - } - - fn visit_some(self, deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - Ok(Some(deserialize_string_to_i64(deserializer)?)) - } - } - - deserializer.deserialize_option(ValueVisitor) -} - // Use getrandom crate to generate UUID. // This is not available in all WASM builds - use the default in those cases. #[cfg(feature = "getrandom")] From 50bedcdbb57bd0adc7397fdb9f521ef9f129a2ca Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 2 Jul 2025 11:33:36 -0600 Subject: [PATCH 04/18] Start with subscription logic --- crates/core/src/sync/mod.rs | 1 + crates/core/src/sync/subscriptions.rs | 30 +++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 crates/core/src/sync/subscriptions.rs diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs index 2a28044..f43a2a5 100644 --- a/crates/core/src/sync/mod.rs +++ b/crates/core/src/sync/mod.rs @@ -9,6 +9,7 @@ pub mod line; pub mod operations; pub mod storage_adapter; mod streaming_sync; +mod subscriptions; mod sync_status; pub use bucket_priority::BucketPriority; diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs new file mode 100644 index 0000000..36e78ed --- /dev/null +++ b/crates/core/src/sync/subscriptions.rs @@ -0,0 +1,30 @@ +use core::time::Duration; + +use alloc::string::String; +use serde::Deserialize; +use serde_with::{serde_as, DurationSeconds}; + +use crate::sync::BucketPriority; + +/// A request sent from a PowerSync SDK to alter the subscriptions managed by this client. +#[derive(Deserialize)] +pub enum SubscriptionChangeRequest { + Subscribe(SubscribeToStream), +} + +#[serde_as] +#[derive(Deserialize)] +pub struct SubscribeToStream { + pub stream: String, + pub params: Option, + #[serde_as(as = "Option")] + pub ttl: Option, + pub priority: Option, +} + +#[derive(Deserialize)] +pub struct UnsubscribeFromStream { + pub stream: String, + pub params: Option, + pub immediate: bool, +} From a8d1d56c25091261bfca38ce2e92dcc58e070930 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 3 Jul 2025 13:00:23 -0600 Subject: [PATCH 05/18] Start tracking subscriptions --- crates/core/src/migrations.rs | 6 +++--- crates/core/src/sync/interface.rs | 21 ++++++++++++++++++++- crates/core/src/sync/line.rs | 4 ++-- crates/core/src/sync/storage_adapter.rs | 17 ++++++++++++++++- crates/core/src/sync/streaming_sync.rs | 3 +++ crates/core/src/sync/subscriptions.rs | 6 +++--- 6 files changed, 47 insertions(+), 10 deletions(-) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index d609382..9a37ccf 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -394,11 +394,11 @@ CREATE TABLE ps_stream_subscriptions ( local_params TEXT, ttl INTEGER ) STRICT; -ALTER TABLE ps_buckets ADD COLUMN derived_from INTEGER REFERENCES ps_streams (id); +ALTER TABLE ps_buckets ADD COLUMN from_subscriptions TEXT NOT NULL DEFAULT '[null]'; -INSERT INTO ps_migration(id, down_migrations) VALUES(9, json_array( +INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array( json_object('sql', 'todo down migration'), -json_object('sql', 'DELETE FROM ps_migration WHERE id >= 10') +json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11') )); "; local_db.exec_safe(stmt)?; diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index fff3861..adbd8f3 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -20,13 +20,31 @@ use super::streaming_sync::SyncClient; use super::sync_status::DownloadSyncStatus; /// Payload provided by SDKs when requesting a sync iteration. -#[derive(Default, Deserialize)] +#[derive(Deserialize)] pub struct StartSyncStream { /// Bucket parameters to include in the request when opening a sync stream. #[serde(default)] pub parameters: Option>, #[serde(default)] pub schema: Schema, + #[serde(default = "StartSyncStream::include_defaults_by_default")] + pub include_defaults: bool, +} + +impl StartSyncStream { + pub const fn include_defaults_by_default() -> bool { + true + } +} + +impl Default for StartSyncStream { + fn default() -> Self { + Self { + parameters: Default::default(), + schema: Default::default(), + include_defaults: Self::include_defaults_by_default(), + } + } } /// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation. @@ -107,6 +125,7 @@ pub struct StreamingSyncRequest { pub binary_data: bool, pub client_id: String, pub parameters: Option>, + pub streams: StreamSubscriptionRequest, } #[derive(Serialize)] diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs index 6a5a7f2..f40329a 100644 --- a/crates/core/src/sync/line.rs +++ b/crates/core/src/sync/line.rs @@ -120,9 +120,9 @@ pub struct BucketChecksum<'a> { pub priority: Option, #[serde(default)] pub count: Option, - #[serde_as(as = "Option>")] + #[serde_as(as = "Vec>")] #[serde(default)] - pub subscriptions: Option>, + pub subscriptions: Vec>, // #[serde(default)] // #[serde(deserialize_with = "deserialize_optional_string_to_i64")] // pub last_op_id: Option, diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index a68d891..6bed18e 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -10,7 +10,10 @@ use crate::{ operations::delete_bucket, schema::Schema, state::DatabaseState, - sync::checkpoint::{validate_checkpoint, ChecksumMismatch}, + sync::{ + checkpoint::{validate_checkpoint, ChecksumMismatch}, + interface::{RequestedStreamSubscription, StreamSubscriptionRequest}, + }, sync_local::{PartialSyncOperation, SyncOperation}, }; @@ -239,6 +242,18 @@ impl StorageAdapter { } } + pub fn collect_subscription_requests( + &self, + include_defaults: bool, + ) -> Result { + let mut subscriptions: Vec = Vec::new(); + + Ok(StreamSubscriptionRequest { + include_defaults, + subscriptions, + }) + } + pub fn now(&self) -> Result { self.time_stmt.step()?; let res = Timestamp(self.time_stmt.column_int64(0)); diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index a344cb6..a633af5 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -589,6 +589,9 @@ impl StreamingSyncIteration { binary_data: true, client_id: client_id(self.db)?, parameters: self.options.parameters.take(), + streams: self + .adapter + .collect_subscription_requests(self.options.include_defaults)?, }; event diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index 36e78ed..d254200 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -1,6 +1,6 @@ use core::time::Duration; -use alloc::string::String; +use alloc::{boxed::Box, string::String}; use serde::Deserialize; use serde_with::{serde_as, DurationSeconds}; @@ -16,7 +16,7 @@ pub enum SubscriptionChangeRequest { #[derive(Deserialize)] pub struct SubscribeToStream { pub stream: String, - pub params: Option, + pub params: Option>, #[serde_as(as = "Option")] pub ttl: Option, pub priority: Option, @@ -25,6 +25,6 @@ pub struct SubscribeToStream { #[derive(Deserialize)] pub struct UnsubscribeFromStream { pub stream: String, - pub params: Option, + pub params: Option>, pub immediate: bool, } From d3d3e42ca898b19a75ba34b35f83d3bd87e354c9 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 3 Jul 2025 21:27:33 -0600 Subject: [PATCH 06/18] Track subscriptions --- crates/core/src/migrations.rs | 7 +- crates/core/src/sync/checkpoint.rs | 7 +- crates/core/src/sync/interface.rs | 2 + crates/core/src/sync/line.rs | 70 +++++++++++- crates/core/src/sync/storage_adapter.rs | 53 +++++++++ crates/core/src/sync/streaming_sync.rs | 146 +++++++++++++++++++++--- crates/core/src/sync/subscriptions.rs | 32 +++++- crates/core/src/sync/sync_status.rs | 48 +++++++- crates/core/src/util.rs | 84 +++++++++++++- 9 files changed, 421 insertions(+), 28 deletions(-) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 9a37ccf..56c8736 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -389,12 +389,13 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array( CREATE TABLE ps_stream_subscriptions ( id NOT NULL INTEGER PRIMARY KEY, stream_name TEXT NOT NULL, - is_default INTEGER NOT NULL, + active INTEGER NOT NULL DEFAULT FALSE, + is_default INTEGER NOT NULL DEFAULT FALSE, local_priority INTEGER, local_params TEXT, - ttl INTEGER + ttl INTEGER, + expires_at INTEGER ) STRICT; -ALTER TABLE ps_buckets ADD COLUMN from_subscriptions TEXT NOT NULL DEFAULT '[null]'; INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array( json_object('sql', 'todo down migration'), diff --git a/crates/core/src/sync/checkpoint.rs b/crates/core/src/sync/checkpoint.rs index 0c105a9..e778af1 100644 --- a/crates/core/src/sync/checkpoint.rs +++ b/crates/core/src/sync/checkpoint.rs @@ -1,7 +1,10 @@ use alloc::{string::String, vec::Vec}; use num_traits::Zero; -use crate::sync::{line::BucketChecksum, BucketPriority, Checksum}; +use crate::sync::{ + line::{BucketChecksum, BucketSubscriptionReason}, + BucketPriority, Checksum, +}; use sqlite_nostd::{self as sqlite, Connection, ResultCode}; /// A structure cloned from [BucketChecksum]s with an owned bucket name instead of one borrowed from @@ -12,6 +15,7 @@ pub struct OwnedBucketChecksum { pub checksum: Checksum, pub priority: BucketPriority, pub count: Option, + pub subscriptions: BucketSubscriptionReason, } impl OwnedBucketChecksum { @@ -30,6 +34,7 @@ impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum { checksum: value.checksum, priority: value.priority.unwrap_or(BucketPriority::FALLBACK), count: value.count, + subscriptions: value.subscriptions.clone(), } } } diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index adbd8f3..f7b0c52 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -15,6 +15,7 @@ use sqlite_nostd::{Connection, Context}; use crate::error::PowerSyncError; use crate::schema::Schema; use crate::state::DatabaseState; +use crate::sync::BucketPriority; use super::streaming_sync::SyncClient; use super::sync_status::DownloadSyncStatus; @@ -141,6 +142,7 @@ pub struct RequestedStreamSubscription { pub stream: String, /// Parameters to make available in the stream's definition. pub parameters: Box, + pub override_priority: Option, #[serde_as(as = "DisplayFromStr")] pub client_id: i64, } diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs index f40329a..eedc770 100644 --- a/crates/core/src/sync/line.rs +++ b/crates/core/src/sync/line.rs @@ -1,6 +1,7 @@ use alloc::borrow::Cow; +use alloc::string::{String, ToString}; use alloc::vec::Vec; -use serde::de::{IgnoredAny, VariantAccess, Visitor}; +use serde::de::{Error, IgnoredAny, VariantAccess, Visitor}; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; @@ -82,6 +83,14 @@ pub struct Checkpoint<'a> { pub write_checkpoint: Option, #[serde(borrow)] pub buckets: Vec>, + #[serde(default, borrow)] + pub streams: Vec>, +} + +#[derive(Deserialize, Debug)] +pub struct StreamDefinition<'a> { + pub name: SyncLineStr<'a>, + pub is_default: bool, } #[serde_as] @@ -120,14 +129,67 @@ pub struct BucketChecksum<'a> { pub priority: Option, #[serde(default)] pub count: Option, - #[serde_as(as = "Vec>")] #[serde(default)] - pub subscriptions: Vec>, + pub subscriptions: BucketSubscriptionReason, // #[serde(default)] // #[serde(deserialize_with = "deserialize_optional_string_to_i64")] // pub last_op_id: Option, } +/// The reason for why a bucket was included in a checkpoint. +#[derive(Debug, Default, Clone)] +pub enum BucketSubscriptionReason { + /// A bucket was created for all of the subscription ids we've explicitly requested in the sync + /// request. + ExplicitlySubscribed { subscriptions: Vec }, + /// A bucket was created from a default stream. + IsDefault { stream_name: String }, + /// We're talking to an older sync service not sending the reason. + #[default] + Unknown, +} + +impl<'de> Deserialize<'de> for BucketSubscriptionReason { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct MyVisitor; + + impl<'de> Visitor<'de> for MyVisitor { + type Value = BucketSubscriptionReason; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(formatter, "a subscription reason") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut subscriptions = Vec::::new(); + + while let Some(item) = seq.next_element::<&'de str>()? { + subscriptions.push(item.parse().map_err(|_| A::Error::custom("not an int"))?); + } + + Ok(BucketSubscriptionReason::ExplicitlySubscribed { subscriptions }) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + Ok(BucketSubscriptionReason::IsDefault { + stream_name: v.to_string(), + }) + } + } + + deserializer.deserialize_any(MyVisitor) + } +} + #[derive(Deserialize, Debug)] pub struct DataLine<'a> { #[serde(borrow)] @@ -229,6 +291,7 @@ mod tests { last_op_id: 10, write_checkpoint: None, buckets: _, + streams: _, }) ); @@ -264,6 +327,7 @@ mod tests { last_op_id: 1, write_checkpoint: None, buckets: _, + streams: _, }) ); } diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index 6bed18e..2f741a2 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -2,6 +2,7 @@ use core::{assert_matches::debug_assert_matches, fmt::Display}; use alloc::{string::ToString, vec::Vec}; use serde::Serialize; +use serde_json::value::RawValue; use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode}; use crate::{ @@ -13,8 +14,11 @@ use crate::{ sync::{ checkpoint::{validate_checkpoint, ChecksumMismatch}, interface::{RequestedStreamSubscription, StreamSubscriptionRequest}, + streaming_sync::OwnedStreamDefinition, + subscriptions::LocallyTrackedSubscription, }, sync_local::{PartialSyncOperation, SyncOperation}, + util::{column_nullable, JsonString}, }; use super::{ @@ -261,6 +265,55 @@ impl StorageAdapter { Ok(res) } + + fn read_stream_subscription( + stmt: &ManagedStmt, + ) -> Result { + Ok(LocallyTrackedSubscription { + id: stmt.column_int64(0), + stream_name: stmt.column_text(1)?.to_string(), + active: stmt.column_int(2) != 0, + is_default: stmt.column_int(3) != 0, + local_priority: column_nullable(&stmt, 4, || { + BucketPriority::try_from(stmt.column_int(4)) + })?, + local_params: column_nullable(&stmt, 5, || { + JsonString::from_string(stmt.column_text(5)?.to_string()) + })?, + ttl: column_nullable(&stmt, 6, || Ok(stmt.column_int64(6)))?, + expires_at: column_nullable(&stmt, 7, || Ok(stmt.column_int64(7)))?, + }) + } + + pub fn iterate_local_subscriptions ()>( + &self, + mut action: F, + ) -> Result<(), PowerSyncError> { + let stmt = self + .db + .prepare_v2("SELECT * FROM ps_stream_subscriptions ORDER BY id ASC")?; + + while stmt.step()? == ResultCode::ROW { + action(Self::read_stream_subscription(&stmt)?); + } + + stmt.finalize()?; + Ok(()) + } + + pub fn create_default_subscription( + &self, + stream: &OwnedStreamDefinition, + ) -> Result { + let stmt = self.db.prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, active, is_default) VALUES (?, TRUE, TRUE) RETURNING *;")?; + stmt.bind_text(1, &stream.name, sqlite_nostd::Destructor::STATIC)?; + + if stmt.step()? == ResultCode::ROW { + Self::read_stream_subscription(&stmt) + } else { + Err(PowerSyncError::unknown_internal()) + } + } } pub struct BucketInfo { diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index a633af5..dd95ad1 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -7,13 +7,17 @@ use core::{ use alloc::{ boxed::Box, - collections::{btree_map::BTreeMap, btree_set::BTreeSet}, + collections::{ + btree_map::{BTreeMap, Entry}, + btree_set::BTreeSet, + }, format, string::{String, ToString}, sync::Arc, vec::Vec, }; use futures_lite::FutureExt; +use serde::de; use crate::{ bson, @@ -21,8 +25,12 @@ use crate::{ kv::client_id, state::DatabaseState, sync::{ - checkpoint::OwnedBucketChecksum, interface::StartSyncStream, line::DataLine, - sync_status::Timestamp, BucketPriority, + checkpoint::OwnedBucketChecksum, + interface::StartSyncStream, + line::{BucketSubscriptionReason, DataLine, StreamDefinition}, + subscriptions::{LocallyTrackedSubscription, SubscriptionKey}, + sync_status::{ActiveStreamSubscription, Timestamp}, + BucketPriority, }, }; use sqlite_nostd::{self as sqlite}; @@ -265,9 +273,11 @@ impl StreamingSyncIteration { self.adapter .delete_buckets(to_delete.iter().map(|b| b.as_str()))?; - let progress = self.load_progress(updated_target.target_checkpoint().unwrap())?; + let target = updated_target.target_checkpoint().unwrap(); + let progress = self.load_progress(&target.checkpoint)?; SyncStateMachineTransition::StartTrackingCheckpoint { progress, + subscription_state: self.resolve_subscription_state(&target)?, updated_target, } } @@ -279,24 +289,26 @@ impl StreamingSyncIteration { )); }; - let mut target = target.clone(); + let mut target = (*target).clone(); target.apply_diff(&diff); self.adapter .delete_buckets(diff.removed_buckets.iter().map(|i| &**i))?; - let progress = self.load_progress(&target)?; + let progress = self.load_progress(&target.checkpoint)?; SyncStateMachineTransition::StartTrackingCheckpoint { progress, + subscription_state: self.resolve_subscription_state(&target)?, updated_target: SyncTarget::Tracking(target), } } SyncLine::CheckpointComplete(_) => { - let Some(target) = target.target_checkpoint() else { + let Some(checkpoint) = target.target_checkpoint() else { return Err(PowerSyncError::sync_protocol_error( "Received checkpoint complete without previous checkpoint", PowerSyncErrorCause::Unknown, )); }; + let target = &checkpoint.checkpoint; let result = self.adapter .sync_local(&self.state, target, None, &self.options.schema)?; @@ -344,7 +356,7 @@ impl StreamingSyncIteration { }; let result = self.adapter.sync_local( &self.state, - target, + &target.checkpoint, Some(priority), &self.options.schema, )?; @@ -419,9 +431,10 @@ impl StreamingSyncIteration { SyncStateMachineTransition::StartTrackingCheckpoint { progress, updated_target, + subscription_state, } => { self.status.update( - |s| s.start_tracking_checkpoint(progress), + |s| s.start_tracking_checkpoint(progress, subscription_state), &mut event.instructions, ); self.validated_but_not_applied = None; @@ -565,6 +578,63 @@ impl StreamingSyncIteration { Ok(progress) } + fn resolve_subscription_state( + &self, + tracked: &TrackedCheckpoint, + ) -> Result, PowerSyncError> { + let mut tracked_subscriptions: Vec = Vec::new(); + + // Load known subscriptions from database + self.adapter.iterate_local_subscriptions(|sub| { + tracked_subscriptions.push(sub); + })?; + + // If they don't exist already, create default subscriptions included in checkpoint + for subscription in &tracked.streams { + if subscription.is_default { + let found = tracked_subscriptions + .iter() + .filter(|s| s.stream_name == subscription.name) + .next(); + + if found.is_none() { + let subscription = self.adapter.create_default_subscription(subscription)?; + tracked_subscriptions.push(subscription); + } + } + } + + debug_assert!(tracked_subscriptions.is_sorted_by_key(|s| s.id)); + + let mut resolved: Vec = tracked_subscriptions + .iter() + .map(|e| ActiveStreamSubscription::from_local(e)) + .collect(); + + // TODO: Cleanup old default subscriptions? + + // Iterate over buckets to associate them with subscriptions + for bucket in tracked.checkpoint.buckets.values() { + match &bucket.subscriptions { + BucketSubscriptionReason::ExplicitlySubscribed { subscriptions } => { + for subscription_id in subscriptions { + if let Ok(index) = + tracked_subscriptions.binary_search_by_key(subscription_id, |s| s.id) + { + resolved[index] + .associated_buckets + .push(bucket.bucket.clone()); + } + } + } + BucketSubscriptionReason::IsDefault { stream_name } => todo!(), + BucketSubscriptionReason::Unknown => {} + } + } + + Ok(resolved) + } + /// Prepares a sync iteration by handling the initial [SyncEvent::Initialize]. /// /// This prepares a [StreamingSyncRequest] by fetching local sync state and the requested bucket @@ -614,16 +684,16 @@ impl StreamingSyncIteration { enum SyncTarget { /// We've received a checkpoint line towards the given checkpoint. The tracked checkpoint is /// updated for subsequent checkpoint or checkpoint_diff lines. - Tracking(OwnedCheckpoint), + Tracking(TrackedCheckpoint), /// We have not received a checkpoint message yet. We still keep a list of local buckets around /// so that we know which ones to delete depending on the first checkpoint message. BeforeCheckpoint(Vec), } impl SyncTarget { - fn target_checkpoint(&self) -> Option<&OwnedCheckpoint> { + fn target_checkpoint(&self) -> Option<&TrackedCheckpoint> { match self { - Self::Tracking(cp) => Some(cp), + Self::Tracking(tracked) => Some(tracked), _ => None, } } @@ -639,7 +709,7 @@ impl SyncTarget { /// buckets fails. fn track_checkpoint<'a>(&self, checkpoint: &Checkpoint<'a>) -> (BTreeSet, Self) { let mut to_delete: BTreeSet = match &self { - SyncTarget::Tracking(checkpoint) => checkpoint.buckets.keys().cloned().collect(), + SyncTarget::Tracking(tracked) => tracked.checkpoint.buckets.keys().cloned().collect(), SyncTarget::BeforeCheckpoint(buckets) => buckets.iter().cloned().collect(), }; @@ -651,11 +721,58 @@ impl SyncTarget { ( to_delete, - SyncTarget::Tracking(OwnedCheckpoint::from_checkpoint(checkpoint, buckets)), + SyncTarget::Tracking(TrackedCheckpoint { + checkpoint: OwnedCheckpoint::from_checkpoint(checkpoint, buckets), + streams: checkpoint + .streams + .iter() + .map(OwnedStreamDefinition::from_definition) + .collect(), + }), ) } } +/// Information about the currently-tracked checkpoint of the sync client. +/// +/// This struct is initially created from the first [Checkpoint] line and then patched as we receive +/// [CheckpointDiff] lines afterwards. +#[derive(Debug, Clone)] +pub struct TrackedCheckpoint { + pub checkpoint: OwnedCheckpoint, + /// Streams included in the checkpoint + pub streams: Vec, +} + +impl TrackedCheckpoint { + fn apply_diff<'a>(&mut self, diff: &CheckpointDiff<'a>) { + self.checkpoint.apply_diff(diff); + // stream definitions are never changed by a checkpoint_diff line + } +} + +#[derive(Debug, Clone)] +pub struct OwnedStreamDefinition { + pub name: String, + pub is_default: bool, +} + +impl OwnedStreamDefinition { + pub fn from_definition<'a>(definition: &StreamDefinition<'a>) -> Self { + Self { + name: definition.name.clone().into_owned(), + is_default: definition.is_default, + } + } + + pub fn key(&self) -> SubscriptionKey { + SubscriptionKey { + stream_name: self.name.clone(), + params: None, + } + } +} + #[derive(Debug, Clone)] pub struct OwnedCheckpoint { pub last_op_id: i64, @@ -704,6 +821,7 @@ enum SyncStateMachineTransition<'a> { StartTrackingCheckpoint { progress: SyncDownloadProgress, updated_target: SyncTarget, + subscription_state: Vec, }, DataLineSaved { line: &'a DataLine<'a>, diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index d254200..6623ee7 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -1,10 +1,38 @@ -use core::time::Duration; +use core::{cmp::Ordering, hash::Hash, time::Duration}; use alloc::{boxed::Box, string::String}; use serde::Deserialize; use serde_with::{serde_as, DurationSeconds}; -use crate::sync::BucketPriority; +use crate::{sync::BucketPriority, util::JsonString}; + +/// A key that uniquely identifies a stream subscription. +#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)] +pub struct SubscriptionKey { + pub stream_name: String, + pub params: Option>, +} + +/// A row in the `ps_stream_subscriptions` table. +pub struct LocallyTrackedSubscription { + pub id: i64, + pub stream_name: String, + pub active: bool, + pub is_default: bool, + pub local_priority: Option, + pub local_params: Option>, + pub ttl: Option, + pub expires_at: Option, +} + +impl LocallyTrackedSubscription { + pub fn key(&self) -> SubscriptionKey { + SubscriptionKey { + stream_name: self.stream_name.clone(), + params: self.local_params.clone(), + } + } +} /// A request sent from a PowerSync SDK to alter the subscriptions managed by this client. #[derive(Deserialize)] diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs index 89a86a3..0c7213a 100644 --- a/crates/core/src/sync/sync_status.rs +++ b/crates/core/src/sync/sync_status.rs @@ -1,10 +1,16 @@ -use alloc::{collections::btree_map::BTreeMap, rc::Rc, string::String, vec::Vec}; -use core::{cell::RefCell, hash::BuildHasher}; +use alloc::{boxed::Box, collections::btree_map::BTreeMap, rc::Rc, string::String, vec::Vec}; +use core::{ + cell::RefCell, + hash::{BuildHasher, Hash}, +}; use rustc_hash::FxBuildHasher; use serde::Serialize; use sqlite_nostd::ResultCode; -use crate::sync::storage_adapter::StorageAdapter; +use crate::{ + sync::{storage_adapter::StorageAdapter, subscriptions::LocallyTrackedSubscription}, + util::JsonString, +}; use super::{ bucket_priority::BucketPriority, interface::Instruction, line::DataLine, @@ -31,6 +37,7 @@ pub struct DownloadSyncStatus { /// When a download is active (that is, a `checkpoint` or `checkpoint_diff` line has been /// received), information about how far the download has progressed. pub downloading: Option, + pub streams: Vec, } impl DownloadSyncStatus { @@ -60,10 +67,15 @@ impl DownloadSyncStatus { /// Transitions state after receiving a checkpoint line. /// /// This sets the [downloading] state to include [progress]. - pub fn start_tracking_checkpoint<'a>(&mut self, progress: SyncDownloadProgress) { + pub fn start_tracking_checkpoint<'a>( + &mut self, + progress: SyncDownloadProgress, + subscriptions: Vec, + ) { self.mark_connected(); self.downloading = Some(progress); + self.streams = subscriptions; } /// Increments [SyncDownloadProgress] progress for the given [DataLine]. @@ -107,6 +119,7 @@ impl Default for DownloadSyncStatus { connecting: false, downloading: None, priority_status: Vec::new(), + streams: Vec::new(), } } } @@ -246,3 +259,30 @@ impl SyncDownloadProgress { } } } + +#[derive(Serialize, Hash)] +pub struct ActiveStreamSubscription { + pub name: String, + pub parameters: Option>, + pub associated_buckets: Vec, + pub active: bool, + pub is_default: bool, + pub expires_at: Option, + pub has_synced: bool, + pub last_synced_at: Option, +} + +impl ActiveStreamSubscription { + pub fn from_local(local: &LocallyTrackedSubscription) -> Self { + Self { + name: local.stream_name.clone(), + parameters: local.local_params.clone(), + is_default: local.is_default, + associated_buckets: Vec::new(), + active: local.active, + expires_at: local.expires_at.clone().map(|e| Timestamp(e)), + has_synced: false, // TODDO + last_synced_at: None, // TODO + } + } +} diff --git a/crates/core/src/util.rs b/crates/core/src/util.rs index a46b968..d33cf97 100644 --- a/crates/core/src/util.rs +++ b/crates/core/src/util.rs @@ -1,8 +1,14 @@ extern crate alloc; -use alloc::format; +use core::{cmp::Ordering, hash::Hash}; + use alloc::string::String; +use alloc::{boxed::Box, format}; +use serde::Serialize; +use serde_json::value::RawValue; +use sqlite_nostd::{ColumnType, ManagedStmt, ResultCode}; +use crate::error::PowerSyncError; #[cfg(not(feature = "getrandom"))] use crate::sqlite; @@ -35,6 +41,82 @@ pub fn quote_identifier_prefixed(prefix: &str, name: &str) -> String { return format!("\"{:}{:}\"", prefix, name.replace("\"", "\"\"")); } +pub fn column_nullable Result>( + stmt: &ManagedStmt, + index: i32, + read: R, +) -> Result, PowerSyncError> { + if stmt.column_type(index)? == ColumnType::Null { + Ok(None) + } else { + Ok(Some(read()?)) + } +} + +/// An opaque wrapper around a JSON-serialized value. +/// +/// This wraps [RawValue] from `serde_json`, adding implementations for comparisons and hashes. +#[derive(Debug)] +#[repr(transparent)] +pub struct JsonString(pub RawValue); + +impl JsonString { + pub fn from_string(string: String) -> Result, PowerSyncError> { + let underlying = + RawValue::from_string(string).map_err(PowerSyncError::as_argument_error)?; + unsafe { + // Safety: repr(transparent) + core::mem::transmute(underlying) + } + } +} + +impl Hash for JsonString { + fn hash(&self, state: &mut H) { + self.0.get().hash(state); + } +} + +impl PartialEq for JsonString { + fn eq(&self, other: &Self) -> bool { + self.0.get() == other.0.get() + } +} + +impl Eq for JsonString {} + +impl PartialOrd for JsonString { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for JsonString { + fn cmp(&self, other: &Self) -> Ordering { + self.0.get().cmp(other.0.get()) + } +} + +impl Serialize for JsonString { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.0.serialize(serializer) + } +} + +impl Clone for Box { + fn clone(&self) -> Self { + let raw_value_box: &Box = unsafe { + // SAFETY: repr(transparent) + core::mem::transmute(self) + }; + + unsafe { core::mem::transmute(raw_value_box.clone()) } + } +} + // Use getrandom crate to generate UUID. // This is not available in all WASM builds - use the default in those cases. #[cfg(feature = "getrandom")] From c791cd4f2d8448b5acd1970d774f020ec1cce8e9 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 9 Jul 2025 12:17:51 +0200 Subject: [PATCH 07/18] Test handling default streams --- crates/core/src/migrations.rs | 9 +-- crates/core/src/sync/storage_adapter.rs | 3 +- crates/core/src/sync/streaming_sync.rs | 28 ++++++-- crates/core/src/sync/subscriptions.rs | 1 + crates/core/src/sync/sync_status.rs | 4 +- crates/core/src/util.rs | 2 + dart/test/error_test.dart | 2 +- dart/test/goldens/simple_iteration.json | 18 +++-- dart/test/goldens/starting_stream.json | 7 +- dart/test/sync_stream_test.dart | 96 +++++++++++++++++++++++++ dart/test/sync_test.dart | 46 ++---------- dart/test/utils/matchers.dart | 8 --- dart/test/utils/native_test_utils.dart | 11 +++ dart/test/utils/test_utils.dart | 55 ++++++++++++++ 14 files changed, 219 insertions(+), 71 deletions(-) create mode 100644 dart/test/sync_stream_test.dart delete mode 100644 dart/test/utils/matchers.dart create mode 100644 dart/test/utils/test_utils.dart diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 56c8736..d33096a 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -12,7 +12,7 @@ use crate::error::{PSResult, PowerSyncError}; use crate::fix_data::apply_v035_fix; use crate::sync::BucketPriority; -pub const LATEST_VERSION: i32 = 10; +pub const LATEST_VERSION: i32 = 11; pub fn powersync_migrate( ctx: *mut sqlite::context, @@ -387,14 +387,15 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array( if current_version < 11 && target_version >= 11 { let stmt = "\ CREATE TABLE ps_stream_subscriptions ( - id NOT NULL INTEGER PRIMARY KEY, + id INTEGER NOT NULL PRIMARY KEY, stream_name TEXT NOT NULL, active INTEGER NOT NULL DEFAULT FALSE, is_default INTEGER NOT NULL DEFAULT FALSE, local_priority INTEGER, local_params TEXT, ttl INTEGER, - expires_at INTEGER + expires_at INTEGER, + last_synced_at INTEGER ) STRICT; INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array( @@ -402,7 +403,7 @@ json_object('sql', 'todo down migration'), json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11') )); "; - local_db.exec_safe(stmt)?; + local_db.exec_safe(stmt).into_db_result(local_db)?; } Ok(()) diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index 2f741a2..96bb8bb 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -282,6 +282,7 @@ impl StorageAdapter { })?, ttl: column_nullable(&stmt, 6, || Ok(stmt.column_int64(6)))?, expires_at: column_nullable(&stmt, 7, || Ok(stmt.column_int64(7)))?, + last_synced_at: column_nullable(&stmt, 8, || Ok(stmt.column_int64(8)))?, }) } @@ -296,8 +297,6 @@ impl StorageAdapter { while stmt.step()? == ResultCode::ROW { action(Self::read_stream_subscription(&stmt)?); } - - stmt.finalize()?; Ok(()) } diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index dd95ad1..6432ea7 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -594,7 +594,7 @@ impl StreamingSyncIteration { if subscription.is_default { let found = tracked_subscriptions .iter() - .filter(|s| s.stream_name == subscription.name) + .filter(|s| s.stream_name == subscription.name && s.local_params.is_none()) .next(); if found.is_none() { @@ -606,10 +606,20 @@ impl StreamingSyncIteration { debug_assert!(tracked_subscriptions.is_sorted_by_key(|s| s.id)); - let mut resolved: Vec = tracked_subscriptions - .iter() - .map(|e| ActiveStreamSubscription::from_local(e)) - .collect(); + let mut resolved: Vec = + Vec::with_capacity(tracked_subscriptions.len()); + // Map of stream name to index in resolved for stream subscriptions without custom + // parameters. This simplifies the association from BucketSubscriptionReason::IsDefault to + // stream subscriptions later. + let mut default_stream_subscriptions = BTreeMap::<&str, usize>::new(); + + for (i, subscription) in tracked_subscriptions.iter().enumerate() { + resolved.push(ActiveStreamSubscription::from_local(subscription)); + + if subscription.local_params.is_none() { + default_stream_subscriptions.insert(&subscription.stream_name, i); + } + } // TODO: Cleanup old default subscriptions? @@ -627,7 +637,13 @@ impl StreamingSyncIteration { } } } - BucketSubscriptionReason::IsDefault { stream_name } => todo!(), + BucketSubscriptionReason::IsDefault { stream_name } => { + if let Some(index) = default_stream_subscriptions.get(stream_name.as_str()) { + resolved[*index] + .associated_buckets + .push(bucket.bucket.clone()); + } + } BucketSubscriptionReason::Unknown => {} } } diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index 6623ee7..868e7a5 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -23,6 +23,7 @@ pub struct LocallyTrackedSubscription { pub local_params: Option>, pub ttl: Option, pub expires_at: Option, + pub last_synced_at: Option, } impl LocallyTrackedSubscription { diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs index 0c7213a..2a2bc37 100644 --- a/crates/core/src/sync/sync_status.rs +++ b/crates/core/src/sync/sync_status.rs @@ -268,7 +268,6 @@ pub struct ActiveStreamSubscription { pub active: bool, pub is_default: bool, pub expires_at: Option, - pub has_synced: bool, pub last_synced_at: Option, } @@ -281,8 +280,7 @@ impl ActiveStreamSubscription { associated_buckets: Vec::new(), active: local.active, expires_at: local.expires_at.clone().map(|e| Timestamp(e)), - has_synced: false, // TODDO - last_synced_at: None, // TODO + last_synced_at: local.last_synced_at.map(|e| Timestamp(e)), } } } diff --git a/crates/core/src/util.rs b/crates/core/src/util.rs index d33cf97..79165e4 100644 --- a/crates/core/src/util.rs +++ b/crates/core/src/util.rs @@ -41,6 +41,8 @@ pub fn quote_identifier_prefixed(prefix: &str, name: &str) -> String { return format!("\"{:}{:}\"", prefix, name.replace("\"", "\"\"")); } +/// Calls [read] to read a column if it's not null, otherwise returns [None]. +#[inline] pub fn column_nullable Result>( stmt: &ManagedStmt, index: i32, diff --git a/dart/test/error_test.dart b/dart/test/error_test.dart index b7349a9..a84c20b 100644 --- a/dart/test/error_test.dart +++ b/dart/test/error_test.dart @@ -3,8 +3,8 @@ import 'dart:convert'; import 'package:sqlite3/common.dart'; import 'package:test/test.dart'; -import 'utils/matchers.dart'; import 'utils/native_test_utils.dart'; +import 'utils/test_utils.dart'; void main() { group('error reporting', () { diff --git a/dart/test/goldens/simple_iteration.json b/dart/test/goldens/simple_iteration.json index ad3f358..8e76a25 100644 --- a/dart/test/goldens/simple_iteration.json +++ b/dart/test/goldens/simple_iteration.json @@ -9,7 +9,8 @@ "connected": false, "connecting": true, "priority_status": [], - "downloading": null + "downloading": null, + "streams": [] } } }, @@ -21,7 +22,11 @@ "raw_data": true, "binary_data": true, "client_id": "test-test-test-test", - "parameters": null + "parameters": null, + "streams": { + "include_defaults": true, + "subscriptions": [] + } } } } @@ -59,7 +64,8 @@ "target_count": 1 } } - } + }, + "streams": [] } } } @@ -108,7 +114,8 @@ "target_count": 1 } } - } + }, + "streams": [] } } } @@ -146,7 +153,8 @@ "has_synced": true } ], - "downloading": null + "downloading": null, + "streams": [] } } } diff --git a/dart/test/goldens/starting_stream.json b/dart/test/goldens/starting_stream.json index 96e505c..2549905 100644 --- a/dart/test/goldens/starting_stream.json +++ b/dart/test/goldens/starting_stream.json @@ -13,7 +13,8 @@ "connected": false, "connecting": true, "priority_status": [], - "downloading": null + "downloading": null, + "streams": [] } } }, @@ -27,6 +28,10 @@ "client_id": "test-test-test-test", "parameters": { "foo": "bar" + }, + "streams": { + "include_defaults": true, + "subscriptions": [] } } } diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart new file mode 100644 index 0000000..a299545 --- /dev/null +++ b/dart/test/sync_stream_test.dart @@ -0,0 +1,96 @@ +import 'dart:convert'; + +import 'package:file/local.dart'; +import 'package:sqlite3/common.dart'; +import 'package:sqlite3/sqlite3.dart'; +import 'package:sqlite3_test/sqlite3_test.dart'; +import 'package:test/test.dart'; + +import 'utils/native_test_utils.dart'; +import 'utils/test_utils.dart'; + +void main() { + final vfs = TestSqliteFileSystem( + fs: const LocalFileSystem(), name: 'vfs-stream-test'); + + setUpAll(() { + loadExtension(); + sqlite3.registerVirtualFileSystem(vfs, makeDefault: false); + }); + tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs)); + + late CommonDatabase db; + Object? lastStatus; + + setUp(() async { + db = openTestDatabase(vfs: vfs) + ..select('select powersync_init();') + ..select('select powersync_replace_schema(?)', [json.encode(testSchema)]) + ..execute('update ps_kv set value = ?2 where key = ?1', + ['client_id', 'test-test-test-test']); + }); + + tearDown(() { + db.dispose(); + }); + + List control(String operation, Object? data) { + db.execute('begin'); + ResultSet result; + + try { + result = db.select('SELECT powersync_control(?, ?)', [operation, data]); + } catch (e) { + db.execute('rollback'); + rethrow; + } + + db.execute('commit'); + final [row] = result; + final instructions = jsonDecode(row.columnAt(0)) as List; + for (final instruction in instructions) { + if (instruction case {'UpdateSyncStatus': final status}) { + lastStatus = status['status']!; + } + } + + return instructions; + } + + group('default streams', () { + test('are created on-demand', () { + control('start', null); + control( + 'line_text', + json.encode( + checkpoint( + lastOpId: 1, + buckets: [ + bucketDescription('a', + subscriptions: 'my_default_stream', priority: 1), + ], + streams: [('my_default_stream', true)], + ), + ), + ); + + expect( + lastStatus, + containsPair( + 'streams', + [ + { + 'name': 'my_default_stream', + 'parameters': null, + 'associated_buckets': ['a'], + 'active': true, + 'is_default': true, + 'expires_at': null, + 'last_synced_at': null + } + ], + ), + ); + }); + }); +} diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index a39d953..893ba22 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -3,9 +3,7 @@ import 'dart:io'; import 'dart:typed_data'; import 'package:bson/bson.dart'; -import 'package:fake_async/fake_async.dart'; import 'package:file/local.dart'; -import 'package:meta/meta.dart'; import 'package:sqlite3/common.dart'; import 'package:sqlite3/sqlite3.dart'; import 'package:sqlite3_test/sqlite3_test.dart'; @@ -13,16 +11,8 @@ import 'package:test/test.dart'; import 'package:test_descriptor/test_descriptor.dart' as d; import 'package:path/path.dart'; -import 'utils/matchers.dart'; import 'utils/native_test_utils.dart'; - -@isTest -void syncTest(String description, void Function(FakeAsync controller) body) { - return test(description, () { - // Give each test the same starting time to make goldens easier to compare. - fakeAsync(body, initialTime: DateTime.utc(2025, 3, 1, 10)); - }); -} +import 'utils/test_utils.dart'; void main() { final vfs = @@ -85,7 +75,7 @@ void _syncTests({ setUp(() async { db = openTestDatabase(vfs: vfs) ..select('select powersync_init();') - ..select('select powersync_replace_schema(?)', [json.encode(_schema)]) + ..select('select powersync_replace_schema(?)', [json.encode(testSchema)]) ..execute('update ps_kv set value = ?2 where key = ?1', ['client_id', 'test-test-test-test']); @@ -132,13 +122,7 @@ void _syncTests({ List pushCheckpoint( {int lastOpId = 1, List buckets = const []}) { - return syncLine({ - 'checkpoint': { - 'last_op_id': '$lastOpId', - 'write_checkpoint': null, - 'buckets': buckets, - }, - }); + return syncLine(checkpoint(lastOpId: lastOpId, buckets: buckets)); } List pushCheckpointComplete({int? priority, String lastOpId = '1'}) { @@ -715,7 +699,8 @@ void _syncTests({ db = openTestDatabase(fileName: fileName) ..select('select powersync_init();') - ..select('select powersync_replace_schema(?)', [json.encode(_schema)]) + ..select( + 'select powersync_replace_schema(?)', [json.encode(testSchema)]) ..execute('update ps_kv set value = ?2 where key = ?1', ['client_id', 'test-test-test-test']); @@ -955,27 +940,6 @@ END; }); } -const _schema = { - 'tables': [ - { - 'name': 'items', - 'columns': [ - {'name': 'col', 'type': 'text'} - ], - } - ] -}; - -Object bucketDescription(String name, - {int checksum = 0, int priority = 3, int count = 1}) { - return { - 'bucket': name, - 'checksum': checksum, - 'priority': priority, - 'count': count, - }; -} - final priorityBuckets = [ for (var i = 0; i < 4; i++) bucketDescription('prio$i', priority: i) ]; diff --git a/dart/test/utils/matchers.dart b/dart/test/utils/matchers.dart deleted file mode 100644 index 4d198ff..0000000 --- a/dart/test/utils/matchers.dart +++ /dev/null @@ -1,8 +0,0 @@ -import 'package:sqlite3/common.dart'; -import 'package:test/test.dart'; - -Matcher isSqliteException(int code, dynamic message) { - return isA() - .having((e) => e.extendedResultCode, 'extendedResultCode', code) - .having((e) => e.message, 'message', message); -} diff --git a/dart/test/utils/native_test_utils.dart b/dart/test/utils/native_test_utils.dart index 36db4a5..b08c9b1 100644 --- a/dart/test/utils/native_test_utils.dart +++ b/dart/test/utils/native_test_utils.dart @@ -1,10 +1,13 @@ import 'dart:ffi'; import 'dart:io'; +import 'package:fake_async/fake_async.dart'; +import 'package:meta/meta.dart'; import 'package:sqlite3/common.dart'; import 'package:sqlite3/open.dart' as sqlite_open; import 'package:sqlite3/sqlite3.dart'; import 'package:path/path.dart' as p; +import 'package:test/test.dart'; const defaultSqlitePath = 'libsqlite3.so.0'; @@ -72,3 +75,11 @@ String getLibraryForPlatform({String? path = "."}) { ) })); } + +@isTest +void syncTest(String description, void Function(FakeAsync controller) body) { + return test(description, () { + // Give each test the same starting time to make goldens easier to compare. + fakeAsync(body, initialTime: DateTime.utc(2025, 3, 1, 10)); + }); +} diff --git a/dart/test/utils/test_utils.dart b/dart/test/utils/test_utils.dart new file mode 100644 index 0000000..23c30e7 --- /dev/null +++ b/dart/test/utils/test_utils.dart @@ -0,0 +1,55 @@ +import 'package:sqlite3/common.dart'; +import 'package:test/test.dart'; + +/// Creates a `checkpoint` line. +Object checkpoint({ + required int lastOpId, + List buckets = const [], + String? writeCheckpoint, + List<(String, bool)> streams = const [], +}) { + return { + 'checkpoint': { + 'last_op_id': '$lastOpId', + 'write_checkpoint': null, + 'buckets': buckets, + 'streams': [ + for (final (name, isDefault) in streams) + {'name': name, 'is_default': isDefault}, + ], + } + }; +} + +Object bucketDescription( + String name, { + int checksum = 0, + int priority = 3, + int count = 1, + Object? subscriptions, +}) { + return { + 'bucket': name, + 'checksum': checksum, + 'priority': priority, + 'count': count, + 'subscriptions': subscriptions, + }; +} + +Matcher isSqliteException(int code, dynamic message) { + return isA() + .having((e) => e.extendedResultCode, 'extendedResultCode', code) + .having((e) => e.message, 'message', message); +} + +const testSchema = { + 'tables': [ + { + 'name': 'items', + 'columns': [ + {'name': 'col', 'type': 'text'} + ], + } + ] +}; From 0916b4575773511ed293ae79cadf58d7ecf7ab32 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 9 Jul 2025 18:17:26 +0200 Subject: [PATCH 08/18] Update last_synced_at for subscriptions --- crates/core/src/sync/streaming_sync.rs | 63 +++++++++++++++++--------- crates/core/src/sync/sync_status.rs | 36 +++++++++++++-- dart/test/sync_stream_test.dart | 21 ++++++++- dart/test/sync_test.dart | 8 +--- dart/test/utils/test_utils.dart | 10 ++++ 5 files changed, 103 insertions(+), 35 deletions(-) diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 6432ea7..ba173c9 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -33,7 +33,7 @@ use crate::{ BucketPriority, }, }; -use sqlite_nostd::{self as sqlite}; +use sqlite_nostd::{self as sqlite, Connection, ResultCode}; use super::{ interface::{Instruction, LogSeverity, StreamingSyncRequest, SyncControlRequest, SyncEvent}, @@ -309,9 +309,7 @@ impl StreamingSyncIteration { )); }; let target = &checkpoint.checkpoint; - let result = - self.adapter - .sync_local(&self.state, target, None, &self.options.schema)?; + let result = self.sync_local(target, None)?; match result { SyncLocalResult::ChecksumFailure(checkpoint_result) => { @@ -354,12 +352,7 @@ impl StreamingSyncIteration { "Received checkpoint complete without previous checkpoint", )); }; - let result = self.adapter.sync_local( - &self.state, - &target.checkpoint, - Some(priority), - &self.options.schema, - )?; + let result = self.sync_local(&target.checkpoint, Some(priority))?; match result { SyncLocalResult::ChecksumFailure(checkpoint_result) => { @@ -505,12 +498,7 @@ impl StreamingSyncIteration { .map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?, SyncEvent::UploadFinished => { if let Some(checkpoint) = self.validated_but_not_applied.take() { - let result = self.adapter.sync_local( - &self.state, - &checkpoint, - None, - &self.options.schema, - )?; + let result = self.sync_local(&checkpoint, None)?; match result { SyncLocalResult::ChangesApplied => { @@ -631,17 +619,13 @@ impl StreamingSyncIteration { if let Ok(index) = tracked_subscriptions.binary_search_by_key(subscription_id, |s| s.id) { - resolved[index] - .associated_buckets - .push(bucket.bucket.clone()); + resolved[index].mark_associated_with_bucket(&bucket); } } } BucketSubscriptionReason::IsDefault { stream_name } => { if let Some(index) = default_stream_subscriptions.get(stream_name.as_str()) { - resolved[*index] - .associated_buckets - .push(bucket.bucket.clone()); + resolved[*index].mark_associated_with_bucket(&bucket); } } BucketSubscriptionReason::Unknown => {} @@ -651,6 +635,41 @@ impl StreamingSyncIteration { Ok(resolved) } + /// Performs a partial or a complete local sync. + fn sync_local( + &self, + target: &OwnedCheckpoint, + priority: Option, + ) -> Result { + let result = + self.adapter + .sync_local(&self.state, target, priority, &self.options.schema)?; + + if matches!(&result, SyncLocalResult::ChangesApplied) { + // Update affected stream subscriptions to mark them as synced. + let mut status = self.status.inner().borrow_mut(); + if let Some(ref mut streams) = status.streams { + let stmt = self.adapter.db.prepare_v2( + "UPDATE ps_stream_subscriptions SET last_synced_at = unixepoch() WHERE id = ? RETURNING last_synced_at", + )?; + + for stream in streams { + if stream.is_in_priority(priority) { + stmt.bind_int64(1, stream.id)?; + if stmt.step()? == ResultCode::ROW { + let timestamp = Timestamp(stmt.column_int64(0)); + stream.last_synced_at = Some(timestamp); + } + + stmt.reset()?; + } + } + } + } + + Ok(result) + } + /// Prepares a sync iteration by handling the initial [SyncEvent::Initialize]. /// /// This prepares a [StreamingSyncRequest] by fetching local sync state and the requested bucket diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs index 2a2bc37..e56ec0a 100644 --- a/crates/core/src/sync/sync_status.rs +++ b/crates/core/src/sync/sync_status.rs @@ -1,6 +1,7 @@ use alloc::{boxed::Box, collections::btree_map::BTreeMap, rc::Rc, string::String, vec::Vec}; use core::{ cell::RefCell, + cmp::min, hash::{BuildHasher, Hash}, }; use rustc_hash::FxBuildHasher; @@ -8,7 +9,10 @@ use serde::Serialize; use sqlite_nostd::ResultCode; use crate::{ - sync::{storage_adapter::StorageAdapter, subscriptions::LocallyTrackedSubscription}, + sync::{ + checkpoint::OwnedBucketChecksum, storage_adapter::StorageAdapter, + subscriptions::LocallyTrackedSubscription, + }, util::JsonString, }; @@ -37,7 +41,7 @@ pub struct DownloadSyncStatus { /// When a download is active (that is, a `checkpoint` or `checkpoint_diff` line has been /// received), information about how far the download has progressed. pub downloading: Option, - pub streams: Vec, + pub streams: Option>, } impl DownloadSyncStatus { @@ -75,7 +79,7 @@ impl DownloadSyncStatus { self.mark_connected(); self.downloading = Some(progress); - self.streams = subscriptions; + self.streams = Some(subscriptions); } /// Increments [SyncDownloadProgress] progress for the given [DataLine]. @@ -119,7 +123,7 @@ impl Default for DownloadSyncStatus { connecting: false, downloading: None, priority_status: Vec::new(), - streams: Vec::new(), + streams: None, } } } @@ -137,6 +141,10 @@ impl SyncStatusContainer { } } + pub fn inner(&self) -> &Rc> { + &self.status + } + /// Invokes a function to update the sync status, then emits an [Instruction::UpdateSyncStatus] /// if the function did indeed change the status. pub fn update ()>( @@ -262,9 +270,12 @@ impl SyncDownloadProgress { #[derive(Serialize, Hash)] pub struct ActiveStreamSubscription { + #[serde(skip)] + pub id: i64, pub name: String, pub parameters: Option>, pub associated_buckets: Vec, + pub priority: Option, pub active: bool, pub is_default: bool, pub expires_at: Option, @@ -274,13 +285,30 @@ pub struct ActiveStreamSubscription { impl ActiveStreamSubscription { pub fn from_local(local: &LocallyTrackedSubscription) -> Self { Self { + id: local.id, name: local.stream_name.clone(), parameters: local.local_params.clone(), is_default: local.is_default, + priority: None, associated_buckets: Vec::new(), active: local.active, expires_at: local.expires_at.clone().map(|e| Timestamp(e)), last_synced_at: local.last_synced_at.map(|e| Timestamp(e)), } } + + pub fn mark_associated_with_bucket(&mut self, bucket: &OwnedBucketChecksum) { + self.associated_buckets.push(bucket.bucket.clone()); + self.priority = Some(match self.priority { + None => bucket.priority, + Some(prio) => min(prio, bucket.priority), + }); + } + + pub fn is_in_priority(&self, prio: Option) -> bool { + match prio { + None => true, + Some(prio) => self.priority >= Some(prio), + } + } } diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index a299545..df2ad88 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -58,7 +58,7 @@ void main() { } group('default streams', () { - test('are created on-demand', () { + syncTest('are created on-demand', (_) { control('start', null); control( 'line_text', @@ -86,11 +86,28 @@ void main() { 'active': true, 'is_default': true, 'expires_at': null, - 'last_synced_at': null + 'last_synced_at': null, + 'priority': 1, } ], ), ); + + control( + 'line_text', + json.encode(checkpointComplete(priority: 1)), + ); + + expect( + lastStatus, + containsPair( + 'streams', + [containsPair('last_synced_at', 1740823200)], + ), + ); + + final [stored] = db.select('SELECT * FROM ps_stream_subscriptions'); + expect(stored, containsPair('last_synced_at', 1740823200)); }); }); } diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 893ba22..804f686 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -126,13 +126,7 @@ void _syncTests({ } List pushCheckpointComplete({int? priority, String lastOpId = '1'}) { - return syncLine({ - priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete': - { - 'last_op_id': lastOpId, - if (priority != null) 'priority': priority, - }, - }); + return syncLine(checkpointComplete(priority: priority, lastOpId: lastOpId)); } ResultSet fetchRows() { diff --git a/dart/test/utils/test_utils.dart b/dart/test/utils/test_utils.dart index 23c30e7..c36c206 100644 --- a/dart/test/utils/test_utils.dart +++ b/dart/test/utils/test_utils.dart @@ -21,6 +21,16 @@ Object checkpoint({ }; } +/// Creates a `checkpoint_complete` or `partial_checkpoint_complete` line. +Object checkpointComplete({int? priority, String lastOpId = '1'}) { + return { + priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete': { + 'last_op_id': lastOpId, + if (priority != null) 'priority': priority, + }, + }; +} + Object bucketDescription( String name, { int checksum = 0, From b25c03c1f27fde79ae4aca2dfac7d165e182c2f7 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 10 Jul 2025 18:55:12 +0200 Subject: [PATCH 09/18] Allow subscribing to streams --- crates/core/src/migrations.rs | 5 +- crates/core/src/sync/interface.rs | 6 ++ crates/core/src/sync/storage_adapter.rs | 23 +++++++- crates/core/src/sync/streaming_sync.rs | 38 ++++++++---- crates/core/src/sync/subscriptions.rs | 46 ++++++++++++++- dart/test/sync_stream_test.dart | 78 +++++++++++++++++++++++-- 6 files changed, 172 insertions(+), 24 deletions(-) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index d33096a..df92ee2 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -392,10 +392,11 @@ CREATE TABLE ps_stream_subscriptions ( active INTEGER NOT NULL DEFAULT FALSE, is_default INTEGER NOT NULL DEFAULT FALSE, local_priority INTEGER, - local_params TEXT, + local_params TEXT NOT NULL DEFAULT 'null', ttl INTEGER, expires_at INTEGER, - last_synced_at INTEGER + last_synced_at INTEGER, + UNIQUE (stream_name, local_params) ) STRICT; INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array( diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index f7b0c52..820990a 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -15,6 +15,7 @@ use sqlite_nostd::{Connection, Context}; use crate::error::PowerSyncError; use crate::schema::Schema; use crate::state::DatabaseState; +use crate::sync::subscriptions::{apply_subscriptions, SubscriptionChangeRequest}; use crate::sync::BucketPriority; use super::streaming_sync::SyncClient; @@ -216,6 +217,11 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<( }), "refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken), "completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished), + "subscriptions" => { + let request = serde_json::from_str(payload.text()) + .map_err(PowerSyncError::as_argument_error)?; + return apply_subscriptions(ctx.db_handle(), request); + } _ => { return Err(PowerSyncError::argument_error("Unknown operation")); } diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index 96bb8bb..70a15d5 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -35,6 +35,7 @@ pub struct StorageAdapter { pub db: *mut sqlite::sqlite3, pub progress_stmt: ManagedStmt, time_stmt: ManagedStmt, + delete_subscription: ManagedStmt, } impl StorageAdapter { @@ -47,10 +48,15 @@ impl StorageAdapter { // language=SQLite let time = db.prepare_v2("SELECT unixepoch()")?; + // language=SQLite + let delete_subscription = + db.prepare_v2("DELETE FROM ps_stream_subscriptions WHERE id = ?")?; + Ok(Self { db, progress_stmt: progress, time_stmt: time, + delete_subscription, }) } @@ -269,6 +275,8 @@ impl StorageAdapter { fn read_stream_subscription( stmt: &ManagedStmt, ) -> Result { + let raw_params = stmt.column_text(5)?; + Ok(LocallyTrackedSubscription { id: stmt.column_int64(0), stream_name: stmt.column_text(1)?.to_string(), @@ -277,9 +285,11 @@ impl StorageAdapter { local_priority: column_nullable(&stmt, 4, || { BucketPriority::try_from(stmt.column_int(4)) })?, - local_params: column_nullable(&stmt, 5, || { - JsonString::from_string(stmt.column_text(5)?.to_string()) - })?, + local_params: if raw_params == "null" { + None + } else { + Some(JsonString::from_string(stmt.column_text(5)?.to_string())?) + }, ttl: column_nullable(&stmt, 6, || Ok(stmt.column_int64(6)))?, expires_at: column_nullable(&stmt, 7, || Ok(stmt.column_int64(7)))?, last_synced_at: column_nullable(&stmt, 8, || Ok(stmt.column_int64(8)))?, @@ -313,6 +323,13 @@ impl StorageAdapter { Err(PowerSyncError::unknown_internal()) } } + + pub fn delete_subscription(&self, id: i64) -> Result<(), PowerSyncError> { + let _ = self.delete_subscription.reset(); + self.delete_subscription.bind_int64(1, id)?; + self.delete_subscription.exec()?; + Ok(()) + } } pub struct BucketInfo { diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index ba173c9..8d74878 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -573,25 +573,41 @@ impl StreamingSyncIteration { let mut tracked_subscriptions: Vec = Vec::new(); // Load known subscriptions from database - self.adapter.iterate_local_subscriptions(|sub| { + self.adapter.iterate_local_subscriptions(|mut sub| { + // We will mark it as active again if it's part of the streams included in the + // checkpoint. + sub.active = false; + tracked_subscriptions.push(sub); })?; // If they don't exist already, create default subscriptions included in checkpoint for subscription in &tracked.streams { - if subscription.is_default { - let found = tracked_subscriptions - .iter() - .filter(|s| s.stream_name == subscription.name && s.local_params.is_none()) - .next(); + let matching_local_subscriptions = tracked_subscriptions + .iter_mut() + .filter(|s| s.stream_name == subscription.name); + + let mut has_local = false; + for subscription in matching_local_subscriptions { + subscription.active = true; + has_local = true; + } - if found.is_none() { - let subscription = self.adapter.create_default_subscription(subscription)?; - tracked_subscriptions.push(subscription); - } + if !has_local && subscription.is_default { + let subscription = self.adapter.create_default_subscription(subscription)?; + tracked_subscriptions.push(subscription); } } + // Clean up default subscriptions that are no longer active. + for subscription in &tracked_subscriptions { + if subscription.is_default && !subscription.active { + self.adapter.delete_subscription(subscription.id)?; + } + } + tracked_subscriptions + .retain(|subscription| !subscription.is_default || subscription.active); + debug_assert!(tracked_subscriptions.is_sorted_by_key(|s| s.id)); let mut resolved: Vec = @@ -609,8 +625,6 @@ impl StreamingSyncIteration { } } - // TODO: Cleanup old default subscriptions? - // Iterate over buckets to associate them with subscriptions for bucket in tracked.checkpoint.buckets.values() { match &bucket.subscriptions { diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index 868e7a5..1c74b55 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -3,8 +3,14 @@ use core::{cmp::Ordering, hash::Hash, time::Duration}; use alloc::{boxed::Box, string::String}; use serde::Deserialize; use serde_with::{serde_as, DurationSeconds}; +use sqlite_nostd::{self as sqlite, Connection}; -use crate::{sync::BucketPriority, util::JsonString}; +use crate::{ + error::{PSResult, PowerSyncError}, + ext::SafeManagedStmt, + sync::BucketPriority, + util::JsonString, +}; /// A key that uniquely identifies a stream subscription. #[derive(Debug, PartialEq, PartialOrd, Eq, Ord)] @@ -38,6 +44,7 @@ impl LocallyTrackedSubscription { /// A request sent from a PowerSync SDK to alter the subscriptions managed by this client. #[derive(Deserialize)] pub enum SubscriptionChangeRequest { + #[serde(rename = "subscribe")] Subscribe(SubscribeToStream), } @@ -45,9 +52,12 @@ pub enum SubscriptionChangeRequest { #[derive(Deserialize)] pub struct SubscribeToStream { pub stream: String, + #[serde(default)] pub params: Option>, #[serde_as(as = "Option")] + #[serde(default)] pub ttl: Option, + #[serde(default)] pub priority: Option, } @@ -57,3 +67,37 @@ pub struct UnsubscribeFromStream { pub params: Option>, pub immediate: bool, } + +pub fn apply_subscriptions( + db: *mut sqlite::sqlite3, + subscription: SubscriptionChangeRequest, +) -> Result<(), PowerSyncError> { + match subscription { + SubscriptionChangeRequest::Subscribe(subscription) => { + let stmt = db + .prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4, is_default = FALSE") + .into_db_result(db)?; + + stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?; + match &subscription.priority { + Some(priority) => stmt.bind_int(2, priority.number), + None => stmt.bind_null(2), + }?; + stmt.bind_text( + 3, + match &subscription.params { + Some(params) => params.get(), + None => "null", + }, + sqlite::Destructor::STATIC, + )?; + match &subscription.ttl { + Some(ttl) => stmt.bind_int64(4, ttl.as_secs() as i64), + None => stmt.bind_null(4), + }?; + stmt.exec()?; + } + } + + Ok(()) +} diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index df2ad88..4f846d7 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -47,14 +47,19 @@ void main() { db.execute('commit'); final [row] = result; - final instructions = jsonDecode(row.columnAt(0)) as List; - for (final instruction in instructions) { - if (instruction case {'UpdateSyncStatus': final status}) { - lastStatus = status['status']!; + + final rawResult = row.columnAt(0); + if (rawResult is String) { + final instructions = jsonDecode(row.columnAt(0)) as List; + for (final instruction in instructions) { + if (instruction case {'UpdateSyncStatus': final status}) { + lastStatus = status['status']!; + } } + return instructions; + } else { + return const []; } - - return instructions; } group('default streams', () { @@ -109,5 +114,66 @@ void main() { final [stored] = db.select('SELECT * FROM ps_stream_subscriptions'); expect(stored, containsPair('last_synced_at', 1740823200)); }); + + syncTest('are deleted', (_) { + control('start', null); + + for (final stream in ['s1', 's2']) { + control( + 'line_text', + json.encode( + checkpoint( + lastOpId: 1, + buckets: [ + bucketDescription('a', subscriptions: stream, priority: 1), + ], + streams: [(stream, true)], + ), + ), + ); + control( + 'line_text', + json.encode(checkpointComplete(priority: 1)), + ); + } + + expect( + lastStatus, + containsPair( + 'streams', + [containsPair('name', 's2')], + ), + ); + }); + + syncTest('can be made explicit', (_) { + control('start', null); + control( + 'line_text', + json.encode( + checkpoint( + lastOpId: 1, + buckets: [ + bucketDescription('a', subscriptions: 'a', priority: 1), + ], + streams: [('a', true)], + ), + ), + ); + + var [stored] = db.select('SELECT * FROM ps_stream_subscriptions'); + expect(stored, containsPair('is_default', 1)); + + control( + 'subscriptions', + json.encode({ + 'subscribe': {'stream': 'a'}, + }), + ); + + [stored] = db.select('SELECT * FROM ps_stream_subscriptions'); + expect(stored, containsPair('active', 1)); + expect(stored, containsPair('is_default', 0)); + }); }); } From 9ddded9d75ee470093531e445078144940d3028d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 14 Jul 2025 13:47:46 +0200 Subject: [PATCH 10/18] Expire subscriptions after TTL --- crates/core/src/sync/interface.rs | 3 +- crates/core/src/sync/storage_adapter.rs | 62 +++++++++++++++++++++ crates/core/src/sync/streaming_sync.rs | 14 +++-- crates/core/src/sync/subscriptions.rs | 7 +++ dart/test/sync_stream_test.dart | 72 +++++++++++++++++++++++++ 5 files changed, 154 insertions(+), 4 deletions(-) diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index 820990a..8538079 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -17,6 +17,7 @@ use crate::schema::Schema; use crate::state::DatabaseState; use crate::sync::subscriptions::{apply_subscriptions, SubscriptionChangeRequest}; use crate::sync::BucketPriority; +use crate::util::JsonString; use super::streaming_sync::SyncClient; use super::sync_status::DownloadSyncStatus; @@ -142,7 +143,7 @@ pub struct RequestedStreamSubscription { /// The name of the sync stream to subscribe to. pub stream: String, /// Parameters to make available in the stream's definition. - pub parameters: Box, + pub parameters: Option>, pub override_priority: Option, #[serde_as(as = "DisplayFromStr")] pub client_id: i64, diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index 70a15d5..9767c15 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -8,6 +8,7 @@ use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode}; use crate::{ error::{PSResult, PowerSyncError}, ext::SafeManagedStmt, + kv::client_id, operations::delete_bucket, schema::Schema, state::DatabaseState, @@ -36,6 +37,7 @@ pub struct StorageAdapter { pub progress_stmt: ManagedStmt, time_stmt: ManagedStmt, delete_subscription: ManagedStmt, + update_subscription: ManagedStmt, } impl StorageAdapter { @@ -52,11 +54,16 @@ impl StorageAdapter { let delete_subscription = db.prepare_v2("DELETE FROM ps_stream_subscriptions WHERE id = ?")?; + // language=SQLite + let update_subscription = + db.prepare_v2("UPDATE ps_stream_subscriptions SET active = ?2, is_default = ?3, ttl = ?, expires_at = ?, last_synced_at = ? WHERE id = ?1")?; + Ok(Self { db, progress_stmt: progress, time_stmt: time, delete_subscription, + update_subscription, }) } @@ -256,7 +263,23 @@ impl StorageAdapter { &self, include_defaults: bool, ) -> Result { + self.delete_outdated_subscriptions()?; + let mut subscriptions: Vec = Vec::new(); + let stmt = self + .db + .prepare_v2("SELECT * FROM ps_stream_subscriptions WHERE NOT is_default;")?; + + while let ResultCode::ROW = stmt.step()? { + let subscription = Self::read_stream_subscription(&stmt)?; + + subscriptions.push(RequestedStreamSubscription { + stream: subscription.stream_name, + parameters: subscription.local_params, + override_priority: subscription.local_priority, + client_id: subscription.id, + }); + } Ok(StreamSubscriptionRequest { include_defaults, @@ -296,6 +319,12 @@ impl StorageAdapter { }) } + fn delete_outdated_subscriptions(&self) -> Result<(), PowerSyncError> { + self.db + .exec_safe("DELETE FROM ps_stream_subscriptions WHERE expires_at < unixepoch()")?; + Ok(()) + } + pub fn iterate_local_subscriptions ()>( &self, mut action: F, @@ -324,6 +353,39 @@ impl StorageAdapter { } } + pub fn update_subscription( + &self, + subscription: &LocallyTrackedSubscription, + ) -> Result<(), PowerSyncError> { + let _ = self.update_subscription.reset(); + + self.update_subscription.bind_int64(1, subscription.id)?; + self.update_subscription + .bind_int(2, if subscription.active { 1 } else { 0 })?; + self.update_subscription + .bind_int(3, if subscription.is_default { 1 } else { 0 })?; + if let Some(ttl) = subscription.ttl { + self.update_subscription.bind_int64(4, ttl)?; + } else { + self.update_subscription.bind_null(4)?; + } + + if let Some(expires_at) = subscription.expires_at { + self.update_subscription.bind_int64(5, expires_at)?; + } else { + self.update_subscription.bind_null(5)?; + } + + if let Some(last_synced_at) = subscription.last_synced_at { + self.update_subscription.bind_int64(6, last_synced_at)?; + } else { + self.update_subscription.bind_null(6)?; + } + + self.update_subscription.exec()?; + Ok(()) + } + pub fn delete_subscription(&self, id: i64) -> Result<(), PowerSyncError> { let _ = self.delete_subscription.reset(); self.delete_subscription.bind_int64(1, id)?; diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 8d74878..f5e7d06 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -571,6 +571,7 @@ impl StreamingSyncIteration { tracked: &TrackedCheckpoint, ) -> Result, PowerSyncError> { let mut tracked_subscriptions: Vec = Vec::new(); + let now = self.adapter.now()?; // Load known subscriptions from database self.adapter.iterate_local_subscriptions(|mut sub| { @@ -588,9 +589,14 @@ impl StreamingSyncIteration { .filter(|s| s.stream_name == subscription.name); let mut has_local = false; - for subscription in matching_local_subscriptions { - subscription.active = true; + for local in matching_local_subscriptions { + local.active = true; + local.is_default = subscription.is_default; has_local = true; + + if let Some(ttl) = local.ttl { + local.expires_at = Some(now.0 + ttl); + } } if !has_local && subscription.is_default { @@ -601,8 +607,10 @@ impl StreamingSyncIteration { // Clean up default subscriptions that are no longer active. for subscription in &tracked_subscriptions { - if subscription.is_default && !subscription.active { + if !subscription.has_subscribed_manually() && !subscription.active { self.adapter.delete_subscription(subscription.id)?; + } else { + self.adapter.update_subscription(subscription)?; } } tracked_subscriptions diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index 1c74b55..ba51a3f 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -33,12 +33,19 @@ pub struct LocallyTrackedSubscription { } impl LocallyTrackedSubscription { + /// The default TTL of non-default subscriptions if none is set: One day. + pub const DEFAULT_TTL: i64 = 60 * 60 * 24; + pub fn key(&self) -> SubscriptionKey { SubscriptionKey { stream_name: self.stream_name.clone(), params: self.local_params.clone(), } } + + pub fn has_subscribed_manually(&self) -> bool { + self.ttl.is_some() + } } /// A request sent from a PowerSync SDK to alter the subscriptions managed by this client. diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index 4f846d7..b7a0760 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -175,5 +175,77 @@ void main() { expect(stored, containsPair('active', 1)); expect(stored, containsPair('is_default', 0)); }); + + syncTest('ttl', (controller) { + db.execute( + 'INSERT INTO ps_stream_subscriptions (stream_name, ttl) VALUES (?, ?);', + ['my_stream', 3600]); + + var startInstructions = control('start', null); + expect( + startInstructions, + contains( + containsPair( + 'EstablishSyncStream', + containsPair( + 'request', + containsPair( + 'streams', + { + 'include_defaults': true, + 'subscriptions': [ + { + 'stream': 'my_stream', + 'parameters': null, + 'override_priority': null, + 'client_id': '1', + } + ], + }, + ), + ), + ), + ), + ); + + // Send a checkpoint containing the stream, increasing the TTL. + control( + 'line_text', + json.encode( + checkpoint( + lastOpId: 1, + buckets: [], + streams: [('my_stream', false)], + ), + ), + ); + + final [row] = db.select('SELECT * FROM ps_stream_subscriptions'); + expect(row, containsPair('expires_at', 1740826800)); + control('stop', null); + + // Elapse beyond end of TTL + controller.elapse(const Duration(hours: 2)); + startInstructions = control('start', null); + expect( + startInstructions, + contains( + containsPair( + 'EstablishSyncStream', + containsPair( + 'request', + containsPair( + 'streams', + { + 'include_defaults': true, + // Outdated subscription should no longer be included. + 'subscriptions': isEmpty, + }, + ), + ), + ), + ), + ); + }); }); } From 35bfc4efaaf6208f163b9c4843cfb10bd49ade18 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 14 Jul 2025 13:56:57 +0200 Subject: [PATCH 11/18] Support unsubscribing --- crates/core/src/sync/storage_adapter.rs | 4 +- crates/core/src/sync/subscriptions.rs | 17 +++++++ dart/test/sync_stream_test.dart | 59 +++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index 9767c15..ee1765a 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -266,9 +266,11 @@ impl StorageAdapter { self.delete_outdated_subscriptions()?; let mut subscriptions: Vec = Vec::new(); + // We have an explicit subscription iff ttl is not null. Checking is_default is not enough, + // because a stream can both be a default stream and have an explicit subscription. let stmt = self .db - .prepare_v2("SELECT * FROM ps_stream_subscriptions WHERE NOT is_default;")?; + .prepare_v2("SELECT * FROM ps_stream_subscriptions WHERE ttl IS NOT NULL;")?; while let ResultCode::ROW = stmt.step()? { let subscription = Self::read_stream_subscription(&stmt)?; diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index ba51a3f..e3c2d7c 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -53,6 +53,8 @@ impl LocallyTrackedSubscription { pub enum SubscriptionChangeRequest { #[serde(rename = "subscribe")] Subscribe(SubscribeToStream), + #[serde(rename = "unsubscribe")] + Unsubscribe(UnsubscribeFromStream), } #[serde_as] @@ -104,6 +106,21 @@ pub fn apply_subscriptions( }?; stmt.exec()?; } + SubscriptionChangeRequest::Unsubscribe(subscription) => { + let stmt = db + .prepare_v2("UPDATE ps_stream_subscriptions SET ttl = NULL WHERE stream_name = ? AND local_params = ?") + .into_db_result(db)?; + stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?; + stmt.bind_text( + 2, + match &subscription.params { + Some(params) => params.get(), + None => "null", + }, + sqlite::Destructor::STATIC, + )?; + stmt.exec()?; + } } Ok(()) diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index b7a0760..bb2c38d 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -175,6 +175,65 @@ void main() { expect(stored, containsPair('active', 1)); expect(stored, containsPair('is_default', 0)); }); + }); + + group('explicit subscriptions', () { + syncTest('unsubscribe', (_) { + db.execute( + 'INSERT INTO ps_stream_subscriptions (stream_name, ttl) VALUES (?, ?);', + ['my_stream', 3600]); + + var startInstructions = control('start', null); + expect( + startInstructions, + contains( + containsPair( + 'EstablishSyncStream', + containsPair( + 'request', + containsPair( + 'streams', + { + 'include_defaults': true, + 'subscriptions': isNotEmpty, + }, + ), + ), + ), + ), + ); + control('stop', null); + + control( + 'subscriptions', + json.encode({ + 'unsubscribe': { + 'stream': 'my_stream', + 'params': null, + 'immediate': false, + } + }), + ); + startInstructions = control('start', null); + expect( + startInstructions, + contains( + containsPair( + 'EstablishSyncStream', + containsPair( + 'request', + containsPair( + 'streams', + { + 'include_defaults': true, + 'subscriptions': isEmpty, + }, + ), + ), + ), + ), + ); + }); syncTest('ttl', (controller) { db.execute( From 0e86ecbab3d085002b514cc2d31db05d5db71063 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 14 Jul 2025 13:57:46 +0200 Subject: [PATCH 12/18] Delete outdated subscriptions --- crates/core/src/sync/storage_adapter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index ee1765a..8befade 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -323,7 +323,7 @@ impl StorageAdapter { fn delete_outdated_subscriptions(&self) -> Result<(), PowerSyncError> { self.db - .exec_safe("DELETE FROM ps_stream_subscriptions WHERE expires_at < unixepoch()")?; + .exec_safe("DELETE FROM ps_stream_subscriptions WHERE (expires_at < unixepoch()) OR (ttl IS NULL AND NOT active)")?; Ok(()) } From 2765848d7db6e1046f7731e2be163e9a7a0d53f0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 14 Jul 2025 20:18:09 +0200 Subject: [PATCH 13/18] Include default ttl --- Cargo.toml | 1 + crates/core/src/sync/subscriptions.rs | 11 +++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2522cd..9100c5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ debug = true [profile.wasm] inherits = "release" +debug = true [profile.wasm_asyncify] inherits = "wasm" diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index e3c2d7c..d1802ce 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -100,10 +100,13 @@ pub fn apply_subscriptions( }, sqlite::Destructor::STATIC, )?; - match &subscription.ttl { - Some(ttl) => stmt.bind_int64(4, ttl.as_secs() as i64), - None => stmt.bind_null(4), - }?; + stmt.bind_int64( + 4, + subscription + .ttl + .map(|f| f.as_secs() as i64) + .unwrap_or(LocallyTrackedSubscription::DEFAULT_TTL) as i64, + )?; stmt.exec()?; } SubscriptionChangeRequest::Unsubscribe(subscription) => { From a93c2e5499daa5bb134d084951eea8bc5d77c800 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 15 Jul 2025 14:14:25 +0200 Subject: [PATCH 14/18] New protocol format --- crates/core/src/sync/checkpoint.rs | 4 +-- crates/core/src/sync/line.rs | 50 +++++++++++++------------- crates/core/src/sync/streaming_sync.rs | 27 +++++++------- crates/core/src/sync/sync_status.rs | 15 +++++++- dart/test/sync_stream_test.dart | 18 ++++++++-- 5 files changed, 69 insertions(+), 45 deletions(-) diff --git a/crates/core/src/sync/checkpoint.rs b/crates/core/src/sync/checkpoint.rs index e778af1..2dd9f59 100644 --- a/crates/core/src/sync/checkpoint.rs +++ b/crates/core/src/sync/checkpoint.rs @@ -1,4 +1,4 @@ -use alloc::{string::String, vec::Vec}; +use alloc::{rc::Rc, string::String, vec::Vec}; use num_traits::Zero; use crate::sync::{ @@ -15,7 +15,7 @@ pub struct OwnedBucketChecksum { pub checksum: Checksum, pub priority: BucketPriority, pub count: Option, - pub subscriptions: BucketSubscriptionReason, + pub subscriptions: Rc>, } impl OwnedBucketChecksum { diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs index eedc770..4568f30 100644 --- a/crates/core/src/sync/line.rs +++ b/crates/core/src/sync/line.rs @@ -1,4 +1,5 @@ use alloc::borrow::Cow; +use alloc::rc::Rc; use alloc::string::{String, ToString}; use alloc::vec::Vec; use serde::de::{Error, IgnoredAny, VariantAccess, Visitor}; @@ -130,23 +131,19 @@ pub struct BucketChecksum<'a> { #[serde(default)] pub count: Option, #[serde(default)] - pub subscriptions: BucketSubscriptionReason, + pub subscriptions: Rc>, // #[serde(default)] // #[serde(deserialize_with = "deserialize_optional_string_to_i64")] // pub last_op_id: Option, } /// The reason for why a bucket was included in a checkpoint. -#[derive(Debug, Default, Clone)] +#[derive(Debug)] pub enum BucketSubscriptionReason { - /// A bucket was created for all of the subscription ids we've explicitly requested in the sync - /// request. - ExplicitlySubscribed { subscriptions: Vec }, /// A bucket was created from a default stream. - IsDefault { stream_name: String }, - /// We're talking to an older sync service not sending the reason. - #[default] - Unknown, + DerivedFromDefaultStream(String), + /// A bucket was created for a subscription id we've explicitly requested in the sync request. + DerivedFromExplicitSubscription(i64), } impl<'de> Deserialize<'de> for BucketSubscriptionReason { @@ -156,6 +153,8 @@ impl<'de> Deserialize<'de> for BucketSubscriptionReason { { struct MyVisitor; + const VARIANTS: &'static [&'static str] = &["def", "sub"]; + impl<'de> Visitor<'de> for MyVisitor { type Value = BucketSubscriptionReason; @@ -163,30 +162,29 @@ impl<'de> Deserialize<'de> for BucketSubscriptionReason { write!(formatter, "a subscription reason") } - fn visit_seq(self, mut seq: A) -> Result + fn visit_enum(self, data: A) -> Result where - A: serde::de::SeqAccess<'de>, + A: serde::de::EnumAccess<'de>, { - let mut subscriptions = Vec::::new(); - - while let Some(item) = seq.next_element::<&'de str>()? { - subscriptions.push(item.parse().map_err(|_| A::Error::custom("not an int"))?); - } - - Ok(BucketSubscriptionReason::ExplicitlySubscribed { subscriptions }) - } + let (key, variant) = data.variant::<&'de str>()?; + Ok(match key { + "def" => BucketSubscriptionReason::DerivedFromDefaultStream( + variant.newtype_variant()?, + ), + "sub" => { + let textual_id = variant.newtype_variant::<&'de str>()?; + let id = textual_id + .parse() + .map_err(|_| A::Error::custom("not an int"))?; - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - Ok(BucketSubscriptionReason::IsDefault { - stream_name: v.to_string(), + BucketSubscriptionReason::DerivedFromExplicitSubscription(id) + } + other => return Err(A::Error::unknown_variant(other, VARIANTS)), }) } } - deserializer.deserialize_any(MyVisitor) + deserializer.deserialize_enum("BucketSubscriptionReason", VARIANTS, MyVisitor) } } diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index f5e7d06..3ad5100 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -635,22 +635,23 @@ impl StreamingSyncIteration { // Iterate over buckets to associate them with subscriptions for bucket in tracked.checkpoint.buckets.values() { - match &bucket.subscriptions { - BucketSubscriptionReason::ExplicitlySubscribed { subscriptions } => { - for subscription_id in subscriptions { - if let Ok(index) = - tracked_subscriptions.binary_search_by_key(subscription_id, |s| s.id) - { - resolved[index].mark_associated_with_bucket(&bucket); - } + for reason in &*bucket.subscriptions { + let subscription_index = match reason { + BucketSubscriptionReason::DerivedFromDefaultStream(stream_name) => { + default_stream_subscriptions + .get(stream_name.as_str()) + .cloned() } - } - BucketSubscriptionReason::IsDefault { stream_name } => { - if let Some(index) = default_stream_subscriptions.get(stream_name.as_str()) { - resolved[*index].mark_associated_with_bucket(&bucket); + BucketSubscriptionReason::DerivedFromExplicitSubscription(subscription_id) => { + tracked_subscriptions + .binary_search_by_key(subscription_id, |s| s.id) + .ok() } + }; + + if let Some(index) = subscription_index { + resolved[index].mark_associated_with_bucket(&bucket); } - BucketSubscriptionReason::Unknown => {} } } diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs index e56ec0a..5924b6c 100644 --- a/crates/core/src/sync/sync_status.rs +++ b/crates/core/src/sync/sync_status.rs @@ -278,6 +278,7 @@ pub struct ActiveStreamSubscription { pub priority: Option, pub active: bool, pub is_default: bool, + pub has_explicit_subscription: bool, pub expires_at: Option, pub last_synced_at: Option, } @@ -292,13 +293,25 @@ impl ActiveStreamSubscription { priority: None, associated_buckets: Vec::new(), active: local.active, + has_explicit_subscription: local.has_subscribed_manually(), expires_at: local.expires_at.clone().map(|e| Timestamp(e)), last_synced_at: local.last_synced_at.map(|e| Timestamp(e)), } } pub fn mark_associated_with_bucket(&mut self, bucket: &OwnedBucketChecksum) { - self.associated_buckets.push(bucket.bucket.clone()); + match self.associated_buckets.binary_search(&bucket.bucket) { + Ok(_) => { + // The bucket is already part of the list + return; + } + Err(position) => { + // Insert here to keep vec sorted + self.associated_buckets + .insert(position, bucket.bucket.clone()); + } + }; + self.priority = Some(match self.priority { None => bucket.priority, Some(prio) => min(prio, bucket.priority), diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index bb2c38d..3d303d1 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -72,7 +72,10 @@ void main() { lastOpId: 1, buckets: [ bucketDescription('a', - subscriptions: 'my_default_stream', priority: 1), + subscriptions: [ + {'def': 'my_default_stream'} + ], + priority: 1), ], streams: [('my_default_stream', true)], ), @@ -90,6 +93,7 @@ void main() { 'associated_buckets': ['a'], 'active': true, 'is_default': true, + 'has_explicit_subscription': false, 'expires_at': null, 'last_synced_at': null, 'priority': 1, @@ -125,7 +129,11 @@ void main() { checkpoint( lastOpId: 1, buckets: [ - bucketDescription('a', subscriptions: stream, priority: 1), + bucketDescription('a', + subscriptions: [ + {'def': stream} + ], + priority: 1), ], streams: [(stream, true)], ), @@ -154,7 +162,11 @@ void main() { checkpoint( lastOpId: 1, buckets: [ - bucketDescription('a', subscriptions: 'a', priority: 1), + bucketDescription('a', + subscriptions: [ + {'def': 'a'} + ], + priority: 1), ], streams: [('a', true)], ), From f915d6b9ad216205725712d4e1cb0bc37673adc1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 22 Jul 2025 11:12:38 +0200 Subject: [PATCH 15/18] Fix tests --- crates/core/src/migrations.rs | 2 +- dart/test/goldens/simple_iteration.json | 2 +- dart/test/goldens/starting_stream.json | 2 +- dart/test/utils/migration_fixtures.dart | 74 ++++++++++++++++++++++++- dart/test/utils/test_utils.dart | 2 +- 5 files changed, 77 insertions(+), 5 deletions(-) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index df92ee2..6adde8e 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -400,7 +400,7 @@ CREATE TABLE ps_stream_subscriptions ( ) STRICT; INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array( -json_object('sql', 'todo down migration'), +json_object('sql', 'DROP TABLE ps_stream_subscriptions'), json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11') )); "; diff --git a/dart/test/goldens/simple_iteration.json b/dart/test/goldens/simple_iteration.json index 8e76a25..ebea369 100644 --- a/dart/test/goldens/simple_iteration.json +++ b/dart/test/goldens/simple_iteration.json @@ -10,7 +10,7 @@ "connecting": true, "priority_status": [], "downloading": null, - "streams": [] + "streams": null } } }, diff --git a/dart/test/goldens/starting_stream.json b/dart/test/goldens/starting_stream.json index 2549905..50dd2a0 100644 --- a/dart/test/goldens/starting_stream.json +++ b/dart/test/goldens/starting_stream.json @@ -14,7 +14,7 @@ "connecting": true, "priority_status": [], "downloading": null, - "streams": [] + "streams": null } } }, diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index ce8b2b8..78075a1 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,5 +1,5 @@ /// The current database version -const databaseVersion = 10; +const databaseVersion = 11; /// This is the base database state that we expect at various schema versions. /// Generated by loading the specific library version, and exporting the schema. @@ -354,6 +354,66 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') +''', + 11: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_stream_subscriptions ( + id INTEGER NOT NULL PRIMARY KEY, + stream_name TEXT NOT NULL, + active INTEGER NOT NULL DEFAULT FALSE, + is_default INTEGER NOT NULL DEFAULT FALSE, + local_priority INTEGER, + local_params TEXT NOT NULL DEFAULT 'null', + ttl INTEGER, + expires_at INTEGER, + last_synced_at INTEGER, + UNIQUE (stream_name, local_params) +) STRICT +;CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL PRIMARY KEY, + last_synced_at TEXT NOT NULL +) STRICT +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT, + PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') ''', }; @@ -456,6 +516,17 @@ const data1 = { (2, 3, 'lists', 'l1', '', '{}', 3) ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') +''', + 11: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0), + (2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') ''' }; @@ -501,6 +572,7 @@ final dataDown1 = { 7: data1[5]!, 8: data1[5]!, 9: data1[9]!, + 10: data1[9]!, }; final finalData1 = data1[databaseVersion]!; diff --git a/dart/test/utils/test_utils.dart b/dart/test/utils/test_utils.dart index c36c206..b0fe7d4 100644 --- a/dart/test/utils/test_utils.dart +++ b/dart/test/utils/test_utils.dart @@ -43,7 +43,7 @@ Object bucketDescription( 'checksum': checksum, 'priority': priority, 'count': count, - 'subscriptions': subscriptions, + if (subscriptions != null) 'subscriptions': subscriptions, }; } From 3e1196c7275e20b3c685887bf1cf08f7b80f9652 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 22 Jul 2025 11:45:38 +0200 Subject: [PATCH 16/18] More stream management tests --- Cargo.toml | 1 - crates/core/src/sync/streaming_sync.rs | 3 +- crates/core/src/sync/subscriptions.rs | 3 +- dart/test/sync_stream_test.dart | 75 ++++++++++++++++++++++++++ 4 files changed, 79 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9100c5c..f2522cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ debug = true [profile.wasm] inherits = "release" -debug = true [profile.wasm_asyncify] inherits = "wasm" diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 3ad5100..f934d01 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -578,6 +578,7 @@ impl StreamingSyncIteration { // We will mark it as active again if it's part of the streams included in the // checkpoint. sub.active = false; + sub.is_default = false; tracked_subscriptions.push(sub); })?; @@ -614,7 +615,7 @@ impl StreamingSyncIteration { } } tracked_subscriptions - .retain(|subscription| !subscription.is_default || subscription.active); + .retain(|subscription| subscription.has_subscribed_manually() || subscription.active); debug_assert!(tracked_subscriptions.is_sorted_by_key(|s| s.id)); diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index d1802ce..91f97c2 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -73,6 +73,7 @@ pub struct SubscribeToStream { #[derive(Deserialize)] pub struct UnsubscribeFromStream { pub stream: String, + #[serde(default)] pub params: Option>, pub immediate: bool, } @@ -84,7 +85,7 @@ pub fn apply_subscriptions( match subscription { SubscriptionChangeRequest::Subscribe(subscription) => { let stmt = db - .prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4, is_default = FALSE") + .prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4") .into_db_result(db)?; stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?; diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index 3d303d1..609a929 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -175,6 +175,7 @@ void main() { var [stored] = db.select('SELECT * FROM ps_stream_subscriptions'); expect(stored, containsPair('is_default', 1)); + expect(stored, containsPair('ttl', isNull)); control( 'subscriptions', @@ -185,7 +186,29 @@ void main() { [stored] = db.select('SELECT * FROM ps_stream_subscriptions'); expect(stored, containsPair('active', 1)); + // It's still a default stream, but it now has a TTL to indicate the + // explicit subscription. + expect(stored, containsPair('is_default', 1)); + expect(stored, containsPair('ttl', isNotNull)); + + // Remove the stream from the checkpoint, should still be included due to + // the explicit subscription. + control( + 'line_text', + json.encode( + checkpoint( + lastOpId: 1, + buckets: [ + bucketDescription('a', priority: 1), + ], + ), + ), + ); + + [stored] = db.select('SELECT * FROM ps_stream_subscriptions'); + expect(stored, containsPair('active', 0)); expect(stored, containsPair('is_default', 0)); + expect(stored, containsPair('ttl', isNotNull)); }); }); @@ -318,5 +341,57 @@ void main() { ), ); }); + + syncTest('can be made implicit', (_) { + control( + 'subscriptions', + json.encode({ + 'subscribe': {'stream': 'a'} + })); + control('start', null); + control( + 'line_text', + json.encode( + checkpoint( + lastOpId: 1, + buckets: [], + streams: [('a', true)], + ), + ), + ); + + var [stored] = db.select('SELECT * FROM ps_stream_subscriptions'); + expect(stored, containsPair('is_default', 1)); + expect(stored, containsPair('ttl', isNotNull)); + + control( + 'subscriptions', + json.encode({ + 'unsubscribe': {'stream': 'a', 'immediate': false} + }), + ); + control('stop', null); + + // The stream should no longer be requested + var startInstructions = control('start', null); + expect( + startInstructions, + contains( + containsPair( + 'EstablishSyncStream', + containsPair( + 'request', + containsPair( + 'streams', + { + 'include_defaults': true, + 'subscriptions': isEmpty, + }, + ), + ), + ), + ), + ); + }); }); } From 24fcc0e75065ef54cd35544b0d80489fa3f2d0f0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 22 Jul 2025 13:55:09 +0200 Subject: [PATCH 17/18] Remove immediate parameter when unsubscribing --- crates/core/src/sync/subscriptions.rs | 1 - dart/test/sync_stream_test.dart | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index 91f97c2..862cf8d 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -75,7 +75,6 @@ pub struct UnsubscribeFromStream { pub stream: String, #[serde(default)] pub params: Option>, - pub immediate: bool, } pub fn apply_subscriptions( diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index 609a929..c3c9b64 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -245,7 +245,6 @@ void main() { 'unsubscribe': { 'stream': 'my_stream', 'params': null, - 'immediate': false, } }), ); @@ -367,7 +366,7 @@ void main() { control( 'subscriptions', json.encode({ - 'unsubscribe': {'stream': 'a', 'immediate': false} + 'unsubscribe': {'stream': 'a'} }), ); control('stop', null); From deb144466b15d0f1a564370297633976f37b7705 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 22 Jul 2025 14:01:19 +0200 Subject: [PATCH 18/18] Increase expires_at only when subscribing again --- crates/core/src/sync/streaming_sync.rs | 4 ---- crates/core/src/sync/subscriptions.rs | 12 ++++++++++- dart/test/sync_stream_test.dart | 30 +++++++++++--------------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index f934d01..b293315 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -594,10 +594,6 @@ impl StreamingSyncIteration { local.active = true; local.is_default = subscription.is_default; has_local = true; - - if let Some(ttl) = local.ttl { - local.expires_at = Some(now.0 + ttl); - } } if !has_local && subscription.is_default { diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index 862cf8d..1f48e7a 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -84,7 +84,17 @@ pub fn apply_subscriptions( match subscription { SubscriptionChangeRequest::Subscribe(subscription) => { let stmt = db - .prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4") + .prepare_v2( + " +INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl, expires_at) + VALUES (?, ?2, ?, ?4, unixepoch() + ?4) + ON CONFLICT DO UPDATE SET + local_priority = min(coalesce(?2, local_priority), + local_priority), + ttl = ?4, + expires_at = unixepoch() + ?4 + ", + ) .into_db_result(db)?; stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?; diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index c3c9b64..6a5464e 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -270,9 +270,18 @@ void main() { }); syncTest('ttl', (controller) { - db.execute( - 'INSERT INTO ps_stream_subscriptions (stream_name, ttl) VALUES (?, ?);', - ['my_stream', 3600]); + control( + 'subscriptions', + json.encode({ + 'subscribe': { + 'stream': 'my_stream', + 'ttl': 3600, + } + }), + ); + + final [row] = db.select('SELECT * FROM ps_stream_subscriptions'); + expect(row, containsPair('expires_at', 1740826800)); var startInstructions = control('start', null); expect( @@ -300,21 +309,6 @@ void main() { ), ), ); - - // Send a checkpoint containing the stream, increasing the TTL. - control( - 'line_text', - json.encode( - checkpoint( - lastOpId: 1, - buckets: [], - streams: [('my_stream', false)], - ), - ), - ); - - final [row] = db.select('SELECT * FROM ps_stream_subscriptions'); - expect(row, containsPair('expires_at', 1740826800)); control('stop', null); // Elapse beyond end of TTL