From cea388bbacf1af3377718c8e6fb9164b513842aa Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 15 May 2025 09:24:04 +0200 Subject: [PATCH 1/5] Start integrating Rust extension --- .../lib/src/sync/bucket_storage.dart | 16 ++ .../lib/src/sync/instruction.dart | 147 ++++++++++++++++++ .../lib/src/sync/mutable_sync_status.dart | 15 ++ .../lib/src/sync/streaming_sync.dart | 103 +++++++++++- .../lib/src/sync/sync_status.dart | 3 + .../lib/src/web/web_bucket_storage.dart | 5 + 6 files changed, 286 insertions(+), 3 deletions(-) create mode 100644 packages/powersync_core/lib/src/sync/instruction.dart diff --git a/packages/powersync_core/lib/src/sync/bucket_storage.dart b/packages/powersync_core/lib/src/sync/bucket_storage.dart index 5f6e15ff..d6145918 100644 --- a/packages/powersync_core/lib/src/sync/bucket_storage.dart +++ b/packages/powersync_core/lib/src/sync/bucket_storage.dart @@ -365,6 +365,22 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name }); } + Future control(String op, [Object? payload]) async { + return await writeTransaction( + (tx) async { + final [row] = + await tx.execute('SELECT powersync_control(?, ?)', [op, payload]); + return row.columnAt(0) as String; + }, + // We flush when powersync_control yields an instruction to do so. + flush: false, + ); + } + + Future flushFileSystem() async { + // Noop outside of web. + } + /// Note: The asynchronous nature of this is due to this needing a global /// lock. The actual database operations are still synchronous, and it /// is assumed that multiple functions on this instance won't be called diff --git a/packages/powersync_core/lib/src/sync/instruction.dart b/packages/powersync_core/lib/src/sync/instruction.dart new file mode 100644 index 00000000..c5c7d32a --- /dev/null +++ b/packages/powersync_core/lib/src/sync/instruction.dart @@ -0,0 +1,147 @@ +import 'sync_status.dart'; + +/// An internal instruction emitted by the sync client in the core extension in +/// response to the Dart SDK passing sync data into the extension. +sealed class Instruction { + factory Instruction.fromJson(Map json) { + return switch (json) { + {'LogLine': final logLine} => + LogLine.fromJson(logLine as Map), + {'UpdateSyncStatus': final updateStatus} => + UpdateSyncStatus.fromJson(updateStatus as Map), + {'EstablishSyncStream': final establish} => + EstablishSyncStream.fromJson(establish as Map), + {'FetchCredentials': final creds} => + FetchCredentials.fromJson(creds as Map), + {'CloseSyncStream': _} => const CloseSyncStream(), + {'FlushFileSystem': _} => const FlushFileSystem(), + {'DidCompleteSync': _} => const DidCompleteSync(), + _ => UnknownSyncInstruction(json) + }; + } +} + +final class LogLine implements Instruction { + final String severity; + final String line; + + LogLine({required this.severity, required this.line}); + + factory LogLine.fromJson(Map json) { + return LogLine( + severity: json['severity'] as String, + line: json['line'] as String, + ); + } +} + +final class EstablishSyncStream implements Instruction { + final Map request; + + EstablishSyncStream(this.request); + + factory EstablishSyncStream.fromJson(Map json) { + return EstablishSyncStream(json['request'] as Map); + } +} + +final class UpdateSyncStatus implements Instruction { + final CoreSyncStatus status; + + UpdateSyncStatus({required this.status}); + + factory UpdateSyncStatus.fromJson(Map json) { + return UpdateSyncStatus( + status: + CoreSyncStatus.fromJson(json['status'] as Map)); + } +} + +final class CoreSyncStatus { + final bool connected; + final bool connecting; + final List priorityStatus; + final DownloadProgress? downloading; + + CoreSyncStatus({ + required this.connected, + required this.connecting, + required this.priorityStatus, + required this.downloading, + }); + + factory CoreSyncStatus.fromJson(Map json) { + return CoreSyncStatus( + connected: json['connected'] as bool, + connecting: json['connecting'] as bool, + priorityStatus: [ + for (final entry in json['priority_status'] as List) + _priorityStatusFromJson(entry as Map) + ], + downloading: switch (json['downloading']) { + null => null, + final raw as Map => DownloadProgress.fromJson(raw), + }, + ); + } + + static SyncPriorityStatus _priorityStatusFromJson(Map json) { + return ( + priority: BucketPriority(json['priority'] as int), + hasSynced: json['has_synced'] as bool?, + lastSyncedAt: switch (json['last_synced_at']) { + null => null, + final lastSyncedAt as int => + DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000), + }, + ); + } +} + +final class DownloadProgress { + final Map progress; + + DownloadProgress(this.progress); + + factory DownloadProgress.fromJson(Map line) { + return DownloadProgress(line.map((k, v) => + MapEntry(k, _bucketProgressFromJson(v as Map)))); + } + + static BucketProgress _bucketProgressFromJson(Map json) { + return ( + priority: BucketPriority(json['priority'] as int), + atLast: json['at_last'] as int, + sinceLast: json['since_last'] as int, + targetCount: json['target_count'] as int, + ); + } +} + +final class FetchCredentials implements Instruction { + final bool didExpire; + + FetchCredentials(this.didExpire); + + factory FetchCredentials.fromJson(Map line) { + return FetchCredentials(line['did_expire'] as bool); + } +} + +final class CloseSyncStream implements Instruction { + const CloseSyncStream(); +} + +final class FlushFileSystem implements Instruction { + const FlushFileSystem(); +} + +final class DidCompleteSync implements Instruction { + const DidCompleteSync(); +} + +final class UnknownSyncInstruction implements Instruction { + final Map source; + + UnknownSyncInstruction(this.source); +} diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 6da63654..51d010d4 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'package:collection/collection.dart'; +import 'package:powersync_core/src/sync/instruction.dart'; import 'sync_status.dart'; import 'bucket_storage.dart'; @@ -79,6 +80,20 @@ final class MutableSyncStatus { } } + void applyFromCore(CoreSyncStatus status) { + connected = status.connected; + connecting = status.connecting; + downloading = status.downloading != null; + priorityStatusEntries = status.priorityStatus; + downloadProgress = switch (status.downloading) { + null => null, + final downloading => InternalSyncDownloadProgress(downloading.progress), + }; + lastSyncedAt = status.priorityStatus + .firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority) + ?.lastSyncedAt; + } + SyncStatus immutableSnapshot() { return SyncStatus( connected: connected, diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 24284751..a0c3e250 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -1,8 +1,10 @@ import 'dart:async'; import 'dart:convert' as convert; +import 'dart:typed_data'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; +import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; @@ -13,7 +15,7 @@ import 'package:sqlite_async/mutex.dart'; import 'bucket_storage.dart'; import '../crud.dart'; - +import 'instruction.dart'; import 'internal_connector.dart'; import 'mutable_sync_status.dart'; import 'stream_utils.dart'; @@ -137,7 +139,12 @@ class StreamingSyncImplementation implements StreamingSync { } // Protect sync iterations with exclusivity (if a valid Mutex is provided) await syncMutex.lock(() { - return _streamingSyncIteration(); + switch (options.source.syncImplementation) { + case SyncClientImplementation.dart: + return _dartStreamingSyncIteration(); + case SyncClientImplementation.rust: + return _rustStreamingSyncIteration(); + } }, timeout: _retryDelay); } catch (e, stacktrace) { if (aborted && e is http.ClientException) { @@ -238,6 +245,7 @@ class StreamingSyncImplementation implements StreamingSync { } assert(identical(_activeCrudUpload, completer)); + _nonLineSyncEvents.add(const UploadCompleted()); _activeCrudUpload = null; completer.complete(); }); @@ -281,6 +289,10 @@ class StreamingSyncImplementation implements StreamingSync { }); } + Future _rustStreamingSyncIteration() async { + await _ActiveRustStreamingIteration(this).syncIteration(); + } + Future<(List, Map)> _collectLocalBucketState() async { final bucketEntries = await adapter.getBucketStates(); @@ -295,7 +307,7 @@ class StreamingSyncImplementation implements StreamingSync { return (initialRequests, localDescriptions); } - Future _streamingSyncIteration() async { + Future _dartStreamingSyncIteration() async { var (bucketRequests, bucketMap) = await _collectLocalBucketState(); if (aborted) { return; @@ -565,6 +577,91 @@ typedef BucketDescription = ({ int priority, }); +final class _ActiveRustStreamingIteration { + final StreamingSyncImplementation sync; + + StreamSubscription? _completedUploads; + final Completer _completedStream = Completer(); + + _ActiveRustStreamingIteration(this.sync); + + Future syncIteration() async { + try { + await _control('start', convert.json.encode(sync.options.params)); + assert(_completedStream.isCompleted, 'Should have started streaming'); + await _completedStream.future; + } finally { + _completedUploads?.cancel(); + await _stop(); + } + } + + Stream _receiveLines(Object? data) { + return sync._rawStreamingSyncRequest(data).map(ReceivedLine.new); + } + + Future _handleLines(EstablishSyncStream request) async { + final events = addBroadcast( + _receiveLines(request.request), sync._nonLineSyncEvents.stream); + + listen: + await for (final event in events) { + switch (event) { + case ReceivedLine(line: final Uint8List line): + await _control('line_binary', line); + case ReceivedLine(line: final line as String): + await _control('line_text', line); + case UploadCompleted(): + await _control('completed_upload'); + case TokenRefreshComplete(): + await _control('refreshed_token'); + case AbortRequested(): + break listen; + } + } + } + + Future _stop() => _control('stop'); + + Future _control(String operation, [Object? payload]) async { + final rawResponse = await sync.adapter.control(operation, payload); + final instructions = convert.json.decode(rawResponse) as List; + + for (final instruction in instructions) { + await _handleInstruction( + Instruction.fromJson(instruction as Map)); + } + } + + Future _handleInstruction(Instruction instruction) async { + switch (instruction) { + case LogLine(:final severity, :final line): + sync.logger.log( + switch (severity) { + 'DEBUG' => Level.FINE, + 'INFO' => Level.INFO, + _ => Level.WARNING, + }, + line); + case EstablishSyncStream(): + _completedStream.complete(_handleLines(instruction)); + case UpdateSyncStatus(:final status): + sync._state.updateStatus((m) => m.applyFromCore(status)); + case FetchCredentials(): + // TODO: Handle this case. + throw UnimplementedError(); + case CloseSyncStream(): + sync._nonLineSyncEvents.add(AbortRequested()); + case FlushFileSystem(): + await sync.adapter.flushFileSystem(); + case DidCompleteSync(): + sync._state.updateStatus((m) => m.downloadError = null); + case UnknownSyncInstruction(:final source): + sync.logger.warning('Unknown instruction: $source'); + } + } +} + sealed class SyncEvent {} final class ReceivedLine implements SyncEvent { diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index fceece50..9c1fcfba 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -189,6 +189,9 @@ extension type const BucketPriority._(int priorityNumber) { /// A [Comparator] instance suitable for comparing [BucketPriority] values. static int comparator(BucketPriority a, BucketPriority b) => -a.priorityNumber.compareTo(b.priorityNumber); + + /// The priority used by PowerSync to indicate that a full sync was completed. + static const fullSyncPriority = BucketPriority._(2147483647); } /// Partial information about the synchronization status for buckets within a diff --git a/packages/powersync_core/lib/src/web/web_bucket_storage.dart b/packages/powersync_core/lib/src/web/web_bucket_storage.dart index 4ba46b07..a430d569 100644 --- a/packages/powersync_core/lib/src/web/web_bucket_storage.dart +++ b/packages/powersync_core/lib/src/web/web_bucket_storage.dart @@ -17,4 +17,9 @@ class WebBucketStorage extends BucketStorage { return _webDb.writeTransaction(callback, lockTimeout: lockTimeout, flush: flush); } + + @override + Future flushFileSystem() { + return _webDb.flush(); + } } From 1421bb61c476b20de8ab0bb9ad4baa64a928a27b Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 15 May 2025 14:29:19 +0200 Subject: [PATCH 2/5] Add new sync implementation --- .../powersync_core/lib/src/sync/options.dart | 30 ++++++++++++++++++ .../lib/src/sync/streaming_sync.dart | 31 ++++++++++++++----- .../lib/src/web/sync_worker.dart | 4 +++ .../lib/src/web/sync_worker_protocol.dart | 5 ++- 4 files changed, 61 insertions(+), 9 deletions(-) diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index f13f7e38..f9017dec 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -24,13 +24,43 @@ final class SyncOptions { /// When set to null, PowerSync defaults to a delay of 5 seconds. final Duration? retryDelay; + /// The [SyncClientImplementation] to use. + final SyncClientImplementation syncImplementation; + const SyncOptions({ this.crudThrottleTime, this.retryDelay, this.params, + this.syncImplementation = SyncClientImplementation.defaultClient, }); } +/// The PowerSync SDK offers two different implementations for receiving sync +/// lines: One handling most logic in Dart, and a newer one offloading that work +/// to the native PowerSync extension. +enum SyncClientImplementation { + /// A sync implementation that decodes and handles sync lines in Dart. + @Deprecated( + "Don't use SyncClientImplementation.dart directly, " + "use SyncClientImplementation.defaultClient instead.", + ) + dart, + + /// An experimental sync implementation that parses and handles sync lines in + /// the native PowerSync core extensions. + /// + /// This implementation can be more performant than the Dart implementation, + /// and supports receiving sync lines in a more efficient format. + /// + /// Note that this option is currently experimental. + @experimental + rust; + + /// The default sync client implementation to use. + // ignore: deprecated_member_use_from_same_package + static const defaultClient = dart; +} + @internal extension type ResolvedSyncOptions(SyncOptions source) { Duration get crudThrottleTime => diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index a0c3e250..d793fdf4 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -4,7 +4,6 @@ import 'dart:typed_data'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; -import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; @@ -444,6 +443,7 @@ class StreamingSyncImplementation implements StreamingSync { case UploadCompleted(): // Only relevant for the Rust sync implementation. break; + case AbortCurrentIteration(): case TokenRefreshComplete(): // We have a new token, so stop the iteration. haveInvalidated = true; @@ -579,6 +579,7 @@ typedef BucketDescription = ({ final class _ActiveRustStreamingIteration { final StreamingSyncImplementation sync; + var _isActive = true; StreamSubscription? _completedUploads; final Completer _completedStream = Completer(); @@ -591,6 +592,7 @@ final class _ActiveRustStreamingIteration { assert(_completedStream.isCompleted, 'Should have started streaming'); await _completedStream.future; } finally { + _isActive = true; _completedUploads?.cancel(); await _stop(); } @@ -604,7 +606,7 @@ final class _ActiveRustStreamingIteration { final events = addBroadcast( _receiveLines(request.request), sync._nonLineSyncEvents.stream); - listen: + loop: await for (final event in events) { switch (event) { case ReceivedLine(line: final Uint8List line): @@ -613,10 +615,10 @@ final class _ActiveRustStreamingIteration { await _control('line_text', line); case UploadCompleted(): await _control('completed_upload'); + case AbortCurrentIteration(): + break loop; case TokenRefreshComplete(): await _control('refreshed_token'); - case AbortRequested(): - break listen; } } } @@ -647,11 +649,20 @@ final class _ActiveRustStreamingIteration { _completedStream.complete(_handleLines(instruction)); case UpdateSyncStatus(:final status): sync._state.updateStatus((m) => m.applyFromCore(status)); - case FetchCredentials(): - // TODO: Handle this case. - throw UnimplementedError(); + case FetchCredentials(:final didExpire): + if (didExpire) { + await sync.connector.prefetchCredentials(invalidate: true); + } else { + sync.connector.prefetchCredentials().then((_) { + if (_isActive && !sync.aborted) { + sync._nonLineSyncEvents.add(const TokenRefreshComplete()); + } + }, onError: (Object e, StackTrace s) { + sync.logger.warning('Could not prefetch credentials', e, s); + }); + } case CloseSyncStream(): - sync._nonLineSyncEvents.add(AbortRequested()); + sync._nonLineSyncEvents.add(const AbortCurrentIteration()); case FlushFileSystem(): await sync.adapter.flushFileSystem(); case DidCompleteSync(): @@ -677,3 +688,7 @@ final class UploadCompleted implements SyncEvent { final class TokenRefreshComplete implements SyncEvent { const TokenRefreshComplete(); } + +final class AbortCurrentIteration implements SyncEvent { + const AbortCurrentIteration(); +} diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index e2443e23..b5e8ed63 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -80,6 +80,10 @@ class _ConnectedClient { final encodedParams => jsonDecode(encodedParams) as Map, }, + syncImplementation: switch (request.implementationName) { + null => SyncClientImplementation.defaultClient, + final name => SyncClientImplementation.values.byName(name), + }, ); _runner = _worker.referenceSyncTask( diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index f4de6cc9..2b859e53 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -69,7 +69,8 @@ extension type StartSynchronization._(JSObject _) implements JSObject { required String databaseName, required int crudThrottleTimeMs, required int requestId, - required int? retryDelayMs, + required int retryDelayMs, + required String implementationName, String? syncParamsEncoded, }); @@ -77,6 +78,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external int get requestId; external int get crudThrottleTimeMs; external int? get retryDelayMs; + external String? get implementationName; external String? get syncParamsEncoded; } @@ -417,6 +419,7 @@ final class WorkerCommunicationChannel { crudThrottleTimeMs: options.crudThrottleTime.inMilliseconds, retryDelayMs: options.retryDelay.inMilliseconds, requestId: id, + implementationName: options.source.syncImplementation.name, syncParamsEncoded: switch (options.source.params) { null => null, final params => jsonEncode(params), From 88923e9917065ad77457d166d33a69018ca918b5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 15 May 2025 16:20:45 +0200 Subject: [PATCH 3/5] Use in-memory sync tests --- .../lib/src/sync/instruction.dart | 14 +- .../lib/src/sync/mutable_sync_status.dart | 2 +- .../lib/src/sync/streaming_sync.dart | 18 ++- .../test/in_memory_sync_test.dart | 136 ++++++++++-------- 4 files changed, 104 insertions(+), 66 deletions(-) diff --git a/packages/powersync_core/lib/src/sync/instruction.dart b/packages/powersync_core/lib/src/sync/instruction.dart index c5c7d32a..f0146e8e 100644 --- a/packages/powersync_core/lib/src/sync/instruction.dart +++ b/packages/powersync_core/lib/src/sync/instruction.dart @@ -99,13 +99,19 @@ final class CoreSyncStatus { } final class DownloadProgress { - final Map progress; + final Map buckets; - DownloadProgress(this.progress); + DownloadProgress(this.buckets); factory DownloadProgress.fromJson(Map line) { - return DownloadProgress(line.map((k, v) => - MapEntry(k, _bucketProgressFromJson(v as Map)))); + final rawBuckets = line['buckets'] as Map; + + return DownloadProgress(rawBuckets.map((k, v) { + return MapEntry( + k, + _bucketProgressFromJson(v as Map), + ); + })); } static BucketProgress _bucketProgressFromJson(Map json) { diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 51d010d4..df49e1c2 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -87,7 +87,7 @@ final class MutableSyncStatus { priorityStatusEntries = status.priorityStatus; downloadProgress = switch (status.downloading) { null => null, - final downloading => InternalSyncDownloadProgress(downloading.progress), + final downloading => InternalSyncDownloadProgress(downloading.buckets), }; lastSyncedAt = status.priorityStatus .firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority) diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index d793fdf4..c81f9929 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -139,6 +139,7 @@ class StreamingSyncImplementation implements StreamingSync { // Protect sync iterations with exclusivity (if a valid Mutex is provided) await syncMutex.lock(() { switch (options.source.syncImplementation) { + // ignore: deprecated_member_use_from_same_package case SyncClientImplementation.dart: return _dartStreamingSyncIteration(); case SyncClientImplementation.rust: @@ -568,7 +569,7 @@ String _syncErrorMessage(Object? error) { } else if (error is PowerSyncProtocolException) { return 'Protocol error'; } else { - return '${error.runtimeType}'; + return '${error.runtimeType}: $error'; } } @@ -592,7 +593,7 @@ final class _ActiveRustStreamingIteration { assert(_completedStream.isCompleted, 'Should have started streaming'); await _completedStream.future; } finally { - _isActive = true; + _isActive = false; _completedUploads?.cancel(); await _stop(); } @@ -608,6 +609,10 @@ final class _ActiveRustStreamingIteration { loop: await for (final event in events) { + if (!_isActive || sync.aborted) { + break; + } + switch (event) { case ReceivedLine(line: final Uint8List line): await _control('line_binary', line); @@ -623,7 +628,9 @@ final class _ActiveRustStreamingIteration { } } - Future _stop() => _control('stop'); + Future _stop() { + return _control('stop'); + } Future _control(String operation, [Object? payload]) async { final rawResponse = await sync.adapter.control(operation, payload); @@ -662,7 +669,10 @@ final class _ActiveRustStreamingIteration { }); } case CloseSyncStream(): - sync._nonLineSyncEvents.add(const AbortCurrentIteration()); + if (!sync.aborted) { + _isActive = false; + sync._nonLineSyncEvents.add(const AbortCurrentIteration()); + } case FlushFileSystem(): await sync.adapter.flushFileSystem(); case DidCompleteSync(): diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 3ab8ae76..9455bc94 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -16,9 +16,26 @@ import 'utils/in_memory_http.dart'; import 'utils/test_utils_impl.dart'; void main() { + _declareTests( + 'dart sync client', + SyncOptions( + // ignore: deprecated_member_use_from_same_package + syncImplementation: SyncClientImplementation.dart, + ), + ); + + _declareTests( + 'rust sync client', + SyncOptions( + syncImplementation: SyncClientImplementation.rust, + ), + ); +} + +void _declareTests(String name, SyncOptions options) { final ignoredLogger = Logger.detached('powersync.test')..level = Level.OFF; - group('in-memory sync tests', () { + group(name, () { late final testUtils = TestUtils(); late TestPowerSyncFactory factory; @@ -44,6 +61,7 @@ void main() { expiresAt: DateTime.now(), ); }, uploadData: (db) => uploadData(db)), + options: options, ); } @@ -107,6 +125,7 @@ void main() { }); await expectLater( status, emits(isSyncStatus(downloading: false, hasSynced: true))); + await syncClient.abort(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -122,65 +141,68 @@ void main() { isTrue); }); - test('can save independent buckets in same transaction', () async { - final status = await waitForConnection(); - - syncService.addLine({ - 'checkpoint': Checkpoint( - lastOpId: '0', - writeCheckpoint: null, - checksums: [ - BucketChecksum(bucket: 'a', checksum: 0, priority: 3), - BucketChecksum(bucket: 'b', checksum: 0, priority: 3), - ], - ) - }); - await expectLater(status, emits(isSyncStatus(downloading: true))); - - var commits = 0; - raw.commits.listen((_) => commits++); + // ignore: deprecated_member_use_from_same_package + if (options.syncImplementation == SyncClientImplementation.dart) { + test('can save independent buckets in same transaction', () async { + final status = await waitForConnection(); - syncService - ..addLine({ - 'data': { - 'bucket': 'a', - 'data': >[ - { - 'op_id': '1', - 'op': 'PUT', - 'object_type': 'a', - 'object_id': '1', - 'checksum': 0, - 'data': {}, - } - ], - } - }) - ..addLine({ - 'data': { - 'bucket': 'b', - 'data': >[ - { - 'op_id': '2', - 'op': 'PUT', - 'object_type': 'b', - 'object_id': '1', - 'checksum': 0, - 'data': {}, - } + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '0', + writeCheckpoint: null, + checksums: [ + BucketChecksum(bucket: 'a', checksum: 0, priority: 3), + BucketChecksum(bucket: 'b', checksum: 0, priority: 3), ], - } + ) }); + await expectLater(status, emits(isSyncStatus(downloading: true))); - // Wait for the operations to be inserted. - while (raw.select('SELECT * FROM ps_oplog;').length < 2) { - await pumpEventQueue(); - } + var commits = 0; + raw.commits.listen((_) => commits++); - // The two buckets should have been inserted in a single transaction - // because the messages were received in quick succession. - expect(commits, 1); - }); + syncService + ..addLine({ + 'data': { + 'bucket': 'a', + 'data': >[ + { + 'op_id': '1', + 'op': 'PUT', + 'object_type': 'a', + 'object_id': '1', + 'checksum': 0, + 'data': {}, + } + ], + } + }) + ..addLine({ + 'data': { + 'bucket': 'b', + 'data': >[ + { + 'op_id': '2', + 'op': 'PUT', + 'object_type': 'b', + 'object_id': '1', + 'checksum': 0, + 'data': {}, + } + ], + } + }); + + // Wait for the operations to be inserted. + while (raw.select('SELECT * FROM ps_oplog;').length < 2) { + await pumpEventQueue(); + } + + // The two buckets should have been inserted in a single transaction + // because the messages were received in quick succession. + expect(commits, 1); + }); + } group('partial sync', () { test('updates sync state incrementally', () async { @@ -281,6 +303,7 @@ void main() { }); await database.waitForFirstSync(priority: BucketPriority(1)); expect(database.currentStatus.hasSynced, isFalse); + await syncClient.abort(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -485,7 +508,7 @@ void main() { }) async { await expectLater( status, - emits(isSyncStatus( + emitsThrough(isSyncStatus( downloading: true, downloadProgress: isSyncDownloadProgress( progress: total, @@ -644,7 +667,6 @@ void main() { await checkProgress(progress(8, 8), progress(10, 14)); addCheckpointComplete(0); - await checkProgress(progress(8, 8), progress(10, 14)); addDataLine('b', 4); await checkProgress(progress(8, 8), progress(14, 14)); From 814baded86b3416bc92e4011c3ea280ad48e264e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 20 May 2025 18:15:42 +0200 Subject: [PATCH 4/5] Properly copy options --- .../native/native_powersync_database.dart | 7 +++--- .../powersync_database_impl_stub.dart | 2 +- .../lib/src/database/powersync_db_mixin.dart | 11 ++++----- .../database/web/web_powersync_database.dart | 10 ++++---- .../powersync_core/lib/src/sync/options.dart | 23 +++++++++++++++++++ 5 files changed, 36 insertions(+), 17 deletions(-) diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 01fddb76..2f846fd5 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -120,7 +120,7 @@ class PowerSyncDatabaseImpl @internal Future connectInternal({ required PowerSyncBackendConnector connector, - required SyncOptions options, + required ResolvedSyncOptions options, required AbortController abort, required Zone asyncWorkZone, }) async { @@ -135,7 +135,6 @@ class PowerSyncDatabaseImpl SendPort? initPort; final hasInitPort = Completer(); final receivedIsolateExit = Completer(); - final resolved = ResolvedSyncOptions(options); Future waitForShutdown() async { // Only complete the abortion signal after the isolate shuts down. This @@ -183,7 +182,7 @@ class PowerSyncDatabaseImpl final port = initPort = data[1] as SendPort; hasInitPort.complete(); var crudStream = database - .onChange(['ps_crud'], throttle: resolved.crudThrottleTime); + .onChange(['ps_crud'], throttle: options.crudThrottleTime); crudUpdateSubscription = crudStream.listen((event) { port.send(['update']); }); @@ -245,7 +244,7 @@ class PowerSyncDatabaseImpl _PowerSyncDatabaseIsolateArgs( receiveMessages.sendPort, dbRef, - resolved, + options, crudMutex.shared, syncMutex.shared, ), diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index 83498b17..2a795497 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -115,7 +115,7 @@ class PowerSyncDatabaseImpl required PowerSyncBackendConnector connector, required AbortController abort, required Zone asyncWorkZone, - required SyncOptions options, + required ResolvedSyncOptions options, }) { throw UnimplementedError(); } diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 6db00e0d..62e2bec5 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -289,11 +289,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { // the lock for the connection. await initialize(); - final resolvedOptions = SyncOptions( - crudThrottleTime: options?.crudThrottleTime ?? crudThrottleTime, - // ignore: deprecated_member_use_from_same_package - retryDelay: options?.retryDelay ?? retryDelay, - params: options?.params ?? params, + final resolvedOptions = ResolvedSyncOptions.resolve( + options, + crudThrottleTime: crudThrottleTime, + params: params, ); // ignore: deprecated_member_use_from_same_package @@ -362,7 +361,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @internal Future connectInternal({ required PowerSyncBackendConnector connector, - required SyncOptions options, + required ResolvedSyncOptions options, required AbortController abort, required Zone asyncWorkZone, }); diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 44fa01d9..6b40a6a2 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -118,10 +118,8 @@ class PowerSyncDatabaseImpl required PowerSyncBackendConnector connector, required AbortController abort, required Zone asyncWorkZone, - required SyncOptions options, + required ResolvedSyncOptions options, }) async { - final resolved = ResolvedSyncOptions(options); - final storage = BucketStorage(database); StreamingSync sync; // Try using a shared worker for the synchronization implementation to avoid @@ -130,7 +128,7 @@ class PowerSyncDatabaseImpl sync = await SyncWorkerHandle.start( database: this, connector: connector, - options: options, + options: options.source, workerUri: Uri.base.resolve('/powersync_sync.worker.js'), ); } catch (e) { @@ -139,13 +137,13 @@ class PowerSyncDatabaseImpl e, ); final crudStream = - database.onChange(['ps_crud'], throttle: resolved.crudThrottleTime); + database.onChange(['ps_crud'], throttle: options.crudThrottleTime); sync = StreamingSyncImplementation( adapter: storage, connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, - options: resolved, + options: options, client: BrowserClient(), // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index f9017dec..130161f9 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -33,6 +33,18 @@ final class SyncOptions { this.params, this.syncImplementation = SyncClientImplementation.defaultClient, }); + + SyncOptions _copyWith({ + Duration? crudThrottleTime, + Map? params, + }) { + return SyncOptions( + crudThrottleTime: crudThrottleTime ?? this.crudThrottleTime, + retryDelay: retryDelay, + params: params ?? this.params, + syncImplementation: syncImplementation, + ); + } } /// The PowerSync SDK offers two different implementations for receiving sync @@ -63,6 +75,17 @@ enum SyncClientImplementation { @internal extension type ResolvedSyncOptions(SyncOptions source) { + factory ResolvedSyncOptions.resolve( + SyncOptions? source, { + Duration? crudThrottleTime, + Map? params, + }) { + return ResolvedSyncOptions((source ?? SyncOptions())._copyWith( + crudThrottleTime: crudThrottleTime, + params: params, + )); + } + Duration get crudThrottleTime => source.crudThrottleTime ?? const Duration(milliseconds: 10); From d104fd2029968b8505a347a39c6b3711db3f9140 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 21 May 2025 09:43:08 +0200 Subject: [PATCH 5/5] Revert intended changes --- .../powersync_core/lib/src/database/powersync_db_mixin.dart | 2 ++ packages/powersync_core/lib/src/sync/mutable_sync_status.dart | 2 +- packages/powersync_core/lib/src/sync/options.dart | 3 +++ packages/powersync_core/lib/src/sync/streaming_sync.dart | 1 - 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 62e2bec5..808efc71 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -292,6 +292,8 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { final resolvedOptions = ResolvedSyncOptions.resolve( options, crudThrottleTime: crudThrottleTime, + // ignore: deprecated_member_use_from_same_package + retryDelay: retryDelay, params: params, ); diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index df49e1c2..23e3becb 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -1,8 +1,8 @@ import 'dart:async'; import 'package:collection/collection.dart'; -import 'package:powersync_core/src/sync/instruction.dart'; +import 'instruction.dart'; import 'sync_status.dart'; import 'bucket_storage.dart'; import 'protocol.dart'; diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index 130161f9..d9b0833b 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -36,6 +36,7 @@ final class SyncOptions { SyncOptions _copyWith({ Duration? crudThrottleTime, + Duration? retryDelay, Map? params, }) { return SyncOptions( @@ -78,10 +79,12 @@ extension type ResolvedSyncOptions(SyncOptions source) { factory ResolvedSyncOptions.resolve( SyncOptions? source, { Duration? crudThrottleTime, + Duration? retryDelay, Map? params, }) { return ResolvedSyncOptions((source ?? SyncOptions())._copyWith( crudThrottleTime: crudThrottleTime, + retryDelay: retryDelay, params: params, )); } diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index c81f9929..b2a4a935 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -245,7 +245,6 @@ class StreamingSyncImplementation implements StreamingSync { } assert(identical(_activeCrudUpload, completer)); - _nonLineSyncEvents.add(const UploadCompleted()); _activeCrudUpload = null; completer.complete(); });