Skip to content

Sync streams #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const_format = "0.2.34"
futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] }
rustc-hash = { version = "2.1", default-features = false }
thiserror = { version = "2", default-features = false }
serde_with = { version = "3.14.0", default-features = false, features = ["alloc", "macros"] }

[dependencies.uuid]
version = "1.4.1"
Expand Down
25 changes: 24 additions & 1 deletion crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::error::{PSResult, PowerSyncError};
use crate::fix_data::apply_v035_fix;
use crate::sync::BucketPriority;

pub const LATEST_VERSION: i32 = 10;
pub const LATEST_VERSION: i32 = 11;

pub fn powersync_migrate(
ctx: *mut sqlite::context,
Expand Down Expand Up @@ -384,5 +384,28 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
.into_db_result(local_db)?;
}

if current_version < 11 && target_version >= 11 {
let stmt = "\
CREATE TABLE ps_stream_subscriptions (
id INTEGER NOT NULL PRIMARY KEY,
stream_name TEXT NOT NULL,
active INTEGER NOT NULL DEFAULT FALSE,
is_default INTEGER NOT NULL DEFAULT FALSE,
local_priority INTEGER,
local_params TEXT NOT NULL DEFAULT 'null',
ttl INTEGER,
expires_at INTEGER,
last_synced_at INTEGER,
UNIQUE (stream_name, local_params)
) STRICT;

INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
json_object('sql', 'DROP TABLE ps_stream_subscriptions'),
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
));
";
local_db.exec_safe(stmt).into_db_result(local_db)?;
}

Ok(())
}
9 changes: 7 additions & 2 deletions crates/core/src/sync/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use alloc::{string::String, vec::Vec};
use alloc::{rc::Rc, string::String, vec::Vec};
use num_traits::Zero;

use crate::sync::{line::BucketChecksum, BucketPriority, Checksum};
use crate::sync::{
line::{BucketChecksum, BucketSubscriptionReason},
BucketPriority, Checksum,
};
use sqlite_nostd::{self as sqlite, Connection, ResultCode};

/// A structure cloned from [BucketChecksum]s with an owned bucket name instead of one borrowed from
Expand All @@ -12,6 +15,7 @@ pub struct OwnedBucketChecksum {
pub checksum: Checksum,
pub priority: BucketPriority,
pub count: Option<i64>,
pub subscriptions: Rc<Vec<BucketSubscriptionReason>>,
}

impl OwnedBucketChecksum {
Expand All @@ -30,6 +34,7 @@ impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum {
checksum: value.checksum,
priority: value.priority.unwrap_or(BucketPriority::FALLBACK),
count: value.count,
subscriptions: value.subscriptions.clone(),
}
}
}
Expand Down
48 changes: 47 additions & 1 deletion crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,47 @@ use alloc::rc::Rc;
use alloc::sync::Arc;
use alloc::{string::String, vec::Vec};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use sqlite::{ResultCode, Value};
use sqlite_nostd::{self as sqlite, ColumnType};
use sqlite_nostd::{Connection, Context};

use crate::error::PowerSyncError;
use crate::schema::Schema;
use crate::state::DatabaseState;
use crate::sync::subscriptions::{apply_subscriptions, SubscriptionChangeRequest};
use crate::sync::BucketPriority;
use crate::util::JsonString;

use super::streaming_sync::SyncClient;
use super::sync_status::DownloadSyncStatus;

/// Payload provided by SDKs when requesting a sync iteration.
#[derive(Default, Deserialize)]
#[derive(Deserialize)]
pub struct StartSyncStream {
/// Bucket parameters to include in the request when opening a sync stream.
#[serde(default)]
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
#[serde(default)]
pub schema: Schema,
#[serde(default = "StartSyncStream::include_defaults_by_default")]
pub include_defaults: bool,
}

impl StartSyncStream {
pub const fn include_defaults_by_default() -> bool {
true
}
}

impl Default for StartSyncStream {
fn default() -> Self {
Self {
parameters: Default::default(),
schema: Default::default(),
include_defaults: Self::include_defaults_by_default(),
}
}
}

/// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation.
Expand Down Expand Up @@ -106,6 +128,25 @@ pub struct StreamingSyncRequest {
pub binary_data: bool,
pub client_id: String,
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
pub streams: StreamSubscriptionRequest,
}

#[derive(Serialize)]
pub struct StreamSubscriptionRequest {
pub include_defaults: bool,
pub subscriptions: Vec<RequestedStreamSubscription>,
}

#[serde_as]
#[derive(Serialize)]
pub struct RequestedStreamSubscription {
/// The name of the sync stream to subscribe to.
pub stream: String,
/// Parameters to make available in the stream's definition.
pub parameters: Option<Box<JsonString>>,
pub override_priority: Option<BucketPriority>,
#[serde_as(as = "DisplayFromStr")]
pub client_id: i64,
}

#[derive(Serialize)]
Expand Down Expand Up @@ -177,6 +218,11 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(
}),
"refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken),
"completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished),
"subscriptions" => {
let request = serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?;
return apply_subscriptions(ctx.db_handle(), request);
}
_ => {
return Err(PowerSyncError::argument_error("Unknown operation"));
}
Expand Down
Loading
Loading