Skip to content

Commit 5b157b1

Browse files
committed
Add protocol changes
1 parent c0b508b commit 5b157b1

File tree

4 files changed

+79
-22
lines changed

4 files changed

+79
-22
lines changed

crates/core/src/migrations.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -386,12 +386,13 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
386386

387387
if current_version < 11 && target_version >= 11 {
388388
let stmt = "\
389-
CREATE TABLE ps_streams (
389+
CREATE TABLE ps_stream_subscriptions (
390390
id NOT NULL INTEGER PRIMARY KEY,
391-
definition_name TEXT NOT NULL,
391+
stream_name TEXT NOT NULL,
392392
is_default INTEGER NOT NULL,
393393
local_priority INTEGER,
394-
local_params TEXT
394+
local_params TEXT,
395+
ttl INTEGER
395396
) STRICT;
396397
ALTER TABLE ps_buckets ADD COLUMN derived_from INTEGER REFERENCES ps_streams (id);
397398

crates/core/src/sync/interface.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use sqlite_nostd::{Connection, Context};
1414
use crate::error::PowerSyncError;
1515
use crate::schema::Schema;
1616
use crate::state::DatabaseState;
17+
use crate::util::serialize_i64_to_string;
1718

1819
use super::streaming_sync::SyncClient;
1920
use super::sync_status::DownloadSyncStatus;
@@ -108,6 +109,22 @@ pub struct StreamingSyncRequest {
108109
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
109110
}
110111

112+
#[derive(Serialize)]
113+
pub struct StreamSubscriptionRequest {
114+
pub include_defaults: bool,
115+
pub subscriptions: Vec<RequestedStreamSubscription>,
116+
}
117+
118+
#[derive(Serialize)]
119+
pub struct RequestedStreamSubscription {
120+
/// The name of the sync stream to subscribe to.
121+
pub stream: String,
122+
/// Parameters to make available in the stream's definition.
123+
pub parameters: Box<serde_json::value::RawValue>,
124+
#[serde(serialize_with = "serialize_i64_to_string")]
125+
pub client_id: i64,
126+
}
127+
111128
#[derive(Serialize)]
112129
pub struct BucketRequest {
113130
pub name: String,

crates/core/src/sync/line.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,60 @@ pub struct BucketChecksum<'a> {
119119
pub priority: Option<BucketPriority>,
120120
#[serde(default)]
121121
pub count: Option<i64>,
122+
#[serde(
123+
default,
124+
deserialize_with = "BucketChecksum::deserialize_subscriptions"
125+
)]
126+
pub subscriptions: Option<Vec<i64>>,
122127
// #[serde(default)]
123128
// #[serde(deserialize_with = "deserialize_optional_string_to_i64")]
124129
// pub last_op_id: Option<i64>,
125130
}
126131

132+
impl BucketChecksum<'_> {
133+
fn deserialize_subscriptions<'de, D: serde::Deserializer<'de>>(
134+
deserializer: D,
135+
) -> Result<Option<Vec<i64>>, D::Error> {
136+
struct MyVisitor;
137+
138+
impl<'de> Visitor<'de> for MyVisitor {
139+
type Value = Option<Vec<i64>>;
140+
141+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
142+
write!(formatter, "optional list of subscriptions")
143+
}
144+
145+
fn visit_none<E>(self) -> Result<Self::Value, E>
146+
where
147+
E: serde::de::Error,
148+
{
149+
Ok(None)
150+
}
151+
152+
fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
153+
where
154+
D: serde::Deserializer<'de>,
155+
{
156+
deserializer.deserialize_seq(self)
157+
}
158+
159+
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
160+
where
161+
A: serde::de::SeqAccess<'de>,
162+
{
163+
let mut result: Vec<i64> = Vec::new();
164+
while let Some(element) = seq.next_element::<&'de str>()? {
165+
result.push(element.parse::<i64>().map_err(serde::de::Error::custom)?);
166+
}
167+
168+
Ok(Some(result))
169+
}
170+
}
171+
172+
deserializer.deserialize_option(MyVisitor)
173+
}
174+
}
175+
127176
#[derive(Deserialize, Debug)]
128177
pub struct DataLine<'a> {
129178
#[serde(borrow)]

crates/core/src/util.rs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,29 +36,19 @@ pub fn quote_identifier_prefixed(prefix: &str, name: &str) -> String {
3636
return format!("\"{:}{:}\"", prefix, name.replace("\"", "\"\""));
3737
}
3838

39+
pub fn serialize_i64_to_string<'de, S: serde::Serializer>(
40+
value: &i64,
41+
serializer: S,
42+
) -> Result<S::Ok, S::Error> {
43+
serializer.collect_str(value)
44+
}
45+
3946
pub fn deserialize_string_to_i64<'de, D>(deserializer: D) -> Result<i64, D::Error>
4047
where
4148
D: serde::Deserializer<'de>,
4249
{
43-
struct ValueVisitor;
44-
45-
impl<'de> Visitor<'de> for ValueVisitor {
46-
type Value = i64;
47-
48-
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
49-
formatter.write_str("a string representation of a number")
50-
}
51-
52-
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
53-
where
54-
E: serde::de::Error,
55-
{
56-
v.parse::<i64>().map_err(serde::de::Error::custom)
57-
}
58-
}
59-
60-
// Using a custom visitor here to avoid an intermediate string allocation
61-
deserializer.deserialize_str(ValueVisitor)
50+
let value: &'de str = serde::Deserialize::deserialize(deserializer)?;
51+
value.parse::<i64>().map_err(serde::de::Error::custom)
6252
}
6353

6454
pub fn deserialize_optional_string_to_i64<'de, D>(deserializer: D) -> Result<Option<i64>, D::Error>

0 commit comments

Comments
 (0)