Skip to content

WIP: Option to use core exension for sync logic #283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class PowerSyncDatabaseImpl
@internal
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required SyncOptions options,
required ResolvedSyncOptions options,
required AbortController abort,
required Zone asyncWorkZone,
}) async {
Expand All @@ -135,7 +135,6 @@ class PowerSyncDatabaseImpl
SendPort? initPort;
final hasInitPort = Completer<void>();
final receivedIsolateExit = Completer<void>();
final resolved = ResolvedSyncOptions(options);

Future<void> waitForShutdown() async {
// Only complete the abortion signal after the isolate shuts down. This
Expand Down Expand Up @@ -183,7 +182,7 @@ class PowerSyncDatabaseImpl
final port = initPort = data[1] as SendPort;
hasInitPort.complete();
var crudStream = database
.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);
.onChange(['ps_crud'], throttle: options.crudThrottleTime);
crudUpdateSubscription = crudStream.listen((event) {
port.send(['update']);
});
Expand Down Expand Up @@ -245,7 +244,7 @@ class PowerSyncDatabaseImpl
_PowerSyncDatabaseIsolateArgs(
receiveMessages.sendPort,
dbRef,
resolved,
options,
crudMutex.shared,
syncMutex.shared,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class PowerSyncDatabaseImpl
required PowerSyncBackendConnector connector,
required AbortController abort,
required Zone asyncWorkZone,
required SyncOptions options,
required ResolvedSyncOptions options,
}) {
throw UnimplementedError();
}
Expand Down
11 changes: 6 additions & 5 deletions packages/powersync_core/lib/src/database/powersync_db_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,12 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
// the lock for the connection.
await initialize();

final resolvedOptions = SyncOptions(
crudThrottleTime: options?.crudThrottleTime ?? crudThrottleTime,
final resolvedOptions = ResolvedSyncOptions.resolve(
options,
crudThrottleTime: crudThrottleTime,
// ignore: deprecated_member_use_from_same_package
retryDelay: options?.retryDelay ?? retryDelay,
params: options?.params ?? params,
retryDelay: retryDelay,
params: params,
);

// ignore: deprecated_member_use_from_same_package
Expand Down Expand Up @@ -362,7 +363,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
@internal
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required SyncOptions options,
required ResolvedSyncOptions options,
required AbortController abort,
required Zone asyncWorkZone,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,8 @@ class PowerSyncDatabaseImpl
required PowerSyncBackendConnector connector,
required AbortController abort,
required Zone asyncWorkZone,
required SyncOptions options,
required ResolvedSyncOptions options,
}) async {
final resolved = ResolvedSyncOptions(options);

final storage = BucketStorage(database);
StreamingSync sync;
// Try using a shared worker for the synchronization implementation to avoid
Expand All @@ -130,7 +128,7 @@ class PowerSyncDatabaseImpl
sync = await SyncWorkerHandle.start(
database: this,
connector: connector,
options: options,
options: options.source,
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
);
} catch (e) {
Expand All @@ -139,13 +137,13 @@ class PowerSyncDatabaseImpl
e,
);
final crudStream =
database.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);
database.onChange(['ps_crud'], throttle: options.crudThrottleTime);

sync = StreamingSyncImplementation(
adapter: storage,
connector: InternalConnector.wrap(connector, this),
crudUpdateTriggerStream: crudStream,
options: resolved,
options: options,
client: BrowserClient(),
// Only allows 1 sync implementation to run at a time per database
// This should be global (across tabs) when using Navigator locks.
Expand Down
16 changes: 16 additions & 0 deletions packages/powersync_core/lib/src/sync/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,22 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
});
}

Future<String> control(String op, [Object? payload]) async {
return await writeTransaction(
(tx) async {
final [row] =
await tx.execute('SELECT powersync_control(?, ?)', [op, payload]);
return row.columnAt(0) as String;
},
// We flush when powersync_control yields an instruction to do so.
flush: false,
);
}

Future<void> flushFileSystem() async {
// Noop outside of web.
}

/// Note: The asynchronous nature of this is due to this needing a global
/// lock. The actual database operations are still synchronous, and it
/// is assumed that multiple functions on this instance won't be called
Expand Down
153 changes: 153 additions & 0 deletions packages/powersync_core/lib/src/sync/instruction.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import 'sync_status.dart';

/// An internal instruction emitted by the sync client in the core extension in
/// response to the Dart SDK passing sync data into the extension.
sealed class Instruction {
factory Instruction.fromJson(Map<String, Object?> json) {
return switch (json) {
{'LogLine': final logLine} =>
LogLine.fromJson(logLine as Map<String, Object?>),
{'UpdateSyncStatus': final updateStatus} =>
UpdateSyncStatus.fromJson(updateStatus as Map<String, Object?>),
{'EstablishSyncStream': final establish} =>
EstablishSyncStream.fromJson(establish as Map<String, Object?>),
{'FetchCredentials': final creds} =>
FetchCredentials.fromJson(creds as Map<String, Object?>),
{'CloseSyncStream': _} => const CloseSyncStream(),
{'FlushFileSystem': _} => const FlushFileSystem(),
{'DidCompleteSync': _} => const DidCompleteSync(),
_ => UnknownSyncInstruction(json)
};
}
}

final class LogLine implements Instruction {
final String severity;
final String line;

LogLine({required this.severity, required this.line});

factory LogLine.fromJson(Map<String, Object?> json) {
return LogLine(
severity: json['severity'] as String,
line: json['line'] as String,
);
}
}

final class EstablishSyncStream implements Instruction {
final Map<String, Object?> request;

EstablishSyncStream(this.request);

factory EstablishSyncStream.fromJson(Map<String, Object?> json) {
return EstablishSyncStream(json['request'] as Map<String, Object?>);
}
}

final class UpdateSyncStatus implements Instruction {
final CoreSyncStatus status;

UpdateSyncStatus({required this.status});

factory UpdateSyncStatus.fromJson(Map<String, Object?> json) {
return UpdateSyncStatus(
status:
CoreSyncStatus.fromJson(json['status'] as Map<String, Object?>));
}
}

final class CoreSyncStatus {
final bool connected;
final bool connecting;
final List<SyncPriorityStatus> priorityStatus;
final DownloadProgress? downloading;

CoreSyncStatus({
required this.connected,
required this.connecting,
required this.priorityStatus,
required this.downloading,
});

factory CoreSyncStatus.fromJson(Map<String, Object?> json) {
return CoreSyncStatus(
connected: json['connected'] as bool,
connecting: json['connecting'] as bool,
priorityStatus: [
for (final entry in json['priority_status'] as List)
_priorityStatusFromJson(entry as Map<String, Object?>)
],
downloading: switch (json['downloading']) {
null => null,
final raw as Map<String, Object?> => DownloadProgress.fromJson(raw),
},
);
}

static SyncPriorityStatus _priorityStatusFromJson(Map<String, Object?> json) {
return (
priority: BucketPriority(json['priority'] as int),
hasSynced: json['has_synced'] as bool?,
lastSyncedAt: switch (json['last_synced_at']) {
null => null,
final lastSyncedAt as int =>
DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000),
},
);
}
}

final class DownloadProgress {
final Map<String, BucketProgress> buckets;

DownloadProgress(this.buckets);

factory DownloadProgress.fromJson(Map<String, Object?> line) {
final rawBuckets = line['buckets'] as Map<String, Object?>;

return DownloadProgress(rawBuckets.map((k, v) {
return MapEntry(
k,
_bucketProgressFromJson(v as Map<String, Object?>),
);
}));
}

static BucketProgress _bucketProgressFromJson(Map<String, Object?> json) {
return (
priority: BucketPriority(json['priority'] as int),
atLast: json['at_last'] as int,
sinceLast: json['since_last'] as int,
targetCount: json['target_count'] as int,
);
}
}

final class FetchCredentials implements Instruction {
final bool didExpire;

FetchCredentials(this.didExpire);

factory FetchCredentials.fromJson(Map<String, Object?> line) {
return FetchCredentials(line['did_expire'] as bool);
}
}

final class CloseSyncStream implements Instruction {
const CloseSyncStream();
}

final class FlushFileSystem implements Instruction {
const FlushFileSystem();
}

final class DidCompleteSync implements Instruction {
const DidCompleteSync();
}

final class UnknownSyncInstruction implements Instruction {
final Map<String, Object?> source;

UnknownSyncInstruction(this.source);
}
15 changes: 15 additions & 0 deletions packages/powersync_core/lib/src/sync/mutable_sync_status.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:async';

import 'package:collection/collection.dart';

import 'instruction.dart';
import 'sync_status.dart';
import 'bucket_storage.dart';
import 'protocol.dart';
Expand Down Expand Up @@ -79,6 +80,20 @@ final class MutableSyncStatus {
}
}

void applyFromCore(CoreSyncStatus status) {
connected = status.connected;
connecting = status.connecting;
downloading = status.downloading != null;
priorityStatusEntries = status.priorityStatus;
downloadProgress = switch (status.downloading) {
null => null,
final downloading => InternalSyncDownloadProgress(downloading.buckets),
};
lastSyncedAt = status.priorityStatus
.firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority)
?.lastSyncedAt;
}

SyncStatus immutableSnapshot() {
return SyncStatus(
connected: connected,
Expand Down
56 changes: 56 additions & 0 deletions packages/powersync_core/lib/src/sync/options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,71 @@ final class SyncOptions {
/// When set to null, PowerSync defaults to a delay of 5 seconds.
final Duration? retryDelay;

/// The [SyncClientImplementation] to use.
final SyncClientImplementation syncImplementation;

const SyncOptions({
this.crudThrottleTime,
this.retryDelay,
this.params,
this.syncImplementation = SyncClientImplementation.defaultClient,
});

SyncOptions _copyWith({
Duration? crudThrottleTime,
Duration? retryDelay,
Map<String, dynamic>? params,
}) {
return SyncOptions(
crudThrottleTime: crudThrottleTime ?? this.crudThrottleTime,
retryDelay: retryDelay,
params: params ?? this.params,
syncImplementation: syncImplementation,
);
}
}

/// The PowerSync SDK offers two different implementations for receiving sync
/// lines: One handling most logic in Dart, and a newer one offloading that work
/// to the native PowerSync extension.
enum SyncClientImplementation {
/// A sync implementation that decodes and handles sync lines in Dart.
@Deprecated(
"Don't use SyncClientImplementation.dart directly, "
"use SyncClientImplementation.defaultClient instead.",
)
dart,

/// An experimental sync implementation that parses and handles sync lines in
/// the native PowerSync core extensions.
///
/// This implementation can be more performant than the Dart implementation,
/// and supports receiving sync lines in a more efficient format.
///
/// Note that this option is currently experimental.
@experimental
rust;

/// The default sync client implementation to use.
// ignore: deprecated_member_use_from_same_package
static const defaultClient = dart;
}

@internal
extension type ResolvedSyncOptions(SyncOptions source) {
factory ResolvedSyncOptions.resolve(
SyncOptions? source, {
Duration? crudThrottleTime,
Duration? retryDelay,
Map<String, dynamic>? params,
}) {
return ResolvedSyncOptions((source ?? SyncOptions())._copyWith(
crudThrottleTime: crudThrottleTime,
retryDelay: retryDelay,
params: params,
));
}

Duration get crudThrottleTime =>
source.crudThrottleTime ?? const Duration(milliseconds: 10);

Expand Down
Loading
Loading