diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 01fddb76..2f846fd5 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -120,7 +120,7 @@ class PowerSyncDatabaseImpl @internal Future connectInternal({ required PowerSyncBackendConnector connector, - required SyncOptions options, + required ResolvedSyncOptions options, required AbortController abort, required Zone asyncWorkZone, }) async { @@ -135,7 +135,6 @@ class PowerSyncDatabaseImpl SendPort? initPort; final hasInitPort = Completer(); final receivedIsolateExit = Completer(); - final resolved = ResolvedSyncOptions(options); Future waitForShutdown() async { // Only complete the abortion signal after the isolate shuts down. This @@ -183,7 +182,7 @@ class PowerSyncDatabaseImpl final port = initPort = data[1] as SendPort; hasInitPort.complete(); var crudStream = database - .onChange(['ps_crud'], throttle: resolved.crudThrottleTime); + .onChange(['ps_crud'], throttle: options.crudThrottleTime); crudUpdateSubscription = crudStream.listen((event) { port.send(['update']); }); @@ -245,7 +244,7 @@ class PowerSyncDatabaseImpl _PowerSyncDatabaseIsolateArgs( receiveMessages.sendPort, dbRef, - resolved, + options, crudMutex.shared, syncMutex.shared, ), diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index 83498b17..2a795497 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -115,7 +115,7 @@ class PowerSyncDatabaseImpl required PowerSyncBackendConnector connector, required AbortController abort, required Zone asyncWorkZone, - required SyncOptions options, + required ResolvedSyncOptions options, }) { throw UnimplementedError(); } diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 6db00e0d..808efc71 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -289,11 +289,12 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { // the lock for the connection. await initialize(); - final resolvedOptions = SyncOptions( - crudThrottleTime: options?.crudThrottleTime ?? crudThrottleTime, + final resolvedOptions = ResolvedSyncOptions.resolve( + options, + crudThrottleTime: crudThrottleTime, // ignore: deprecated_member_use_from_same_package - retryDelay: options?.retryDelay ?? retryDelay, - params: options?.params ?? params, + retryDelay: retryDelay, + params: params, ); // ignore: deprecated_member_use_from_same_package @@ -362,7 +363,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @internal Future connectInternal({ required PowerSyncBackendConnector connector, - required SyncOptions options, + required ResolvedSyncOptions options, required AbortController abort, required Zone asyncWorkZone, }); diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 44fa01d9..6b40a6a2 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -118,10 +118,8 @@ class PowerSyncDatabaseImpl required PowerSyncBackendConnector connector, required AbortController abort, required Zone asyncWorkZone, - required SyncOptions options, + required ResolvedSyncOptions options, }) async { - final resolved = ResolvedSyncOptions(options); - final storage = BucketStorage(database); StreamingSync sync; // Try using a shared worker for the synchronization implementation to avoid @@ -130,7 +128,7 @@ class PowerSyncDatabaseImpl sync = await SyncWorkerHandle.start( database: this, connector: connector, - options: options, + options: options.source, workerUri: Uri.base.resolve('/powersync_sync.worker.js'), ); } catch (e) { @@ -139,13 +137,13 @@ class PowerSyncDatabaseImpl e, ); final crudStream = - database.onChange(['ps_crud'], throttle: resolved.crudThrottleTime); + database.onChange(['ps_crud'], throttle: options.crudThrottleTime); sync = StreamingSyncImplementation( adapter: storage, connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, - options: resolved, + options: options, client: BrowserClient(), // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. diff --git a/packages/powersync_core/lib/src/sync/bucket_storage.dart b/packages/powersync_core/lib/src/sync/bucket_storage.dart index 5f6e15ff..d6145918 100644 --- a/packages/powersync_core/lib/src/sync/bucket_storage.dart +++ b/packages/powersync_core/lib/src/sync/bucket_storage.dart @@ -365,6 +365,22 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name }); } + Future control(String op, [Object? payload]) async { + return await writeTransaction( + (tx) async { + final [row] = + await tx.execute('SELECT powersync_control(?, ?)', [op, payload]); + return row.columnAt(0) as String; + }, + // We flush when powersync_control yields an instruction to do so. + flush: false, + ); + } + + Future flushFileSystem() async { + // Noop outside of web. + } + /// Note: The asynchronous nature of this is due to this needing a global /// lock. The actual database operations are still synchronous, and it /// is assumed that multiple functions on this instance won't be called diff --git a/packages/powersync_core/lib/src/sync/instruction.dart b/packages/powersync_core/lib/src/sync/instruction.dart new file mode 100644 index 00000000..f0146e8e --- /dev/null +++ b/packages/powersync_core/lib/src/sync/instruction.dart @@ -0,0 +1,153 @@ +import 'sync_status.dart'; + +/// An internal instruction emitted by the sync client in the core extension in +/// response to the Dart SDK passing sync data into the extension. +sealed class Instruction { + factory Instruction.fromJson(Map json) { + return switch (json) { + {'LogLine': final logLine} => + LogLine.fromJson(logLine as Map), + {'UpdateSyncStatus': final updateStatus} => + UpdateSyncStatus.fromJson(updateStatus as Map), + {'EstablishSyncStream': final establish} => + EstablishSyncStream.fromJson(establish as Map), + {'FetchCredentials': final creds} => + FetchCredentials.fromJson(creds as Map), + {'CloseSyncStream': _} => const CloseSyncStream(), + {'FlushFileSystem': _} => const FlushFileSystem(), + {'DidCompleteSync': _} => const DidCompleteSync(), + _ => UnknownSyncInstruction(json) + }; + } +} + +final class LogLine implements Instruction { + final String severity; + final String line; + + LogLine({required this.severity, required this.line}); + + factory LogLine.fromJson(Map json) { + return LogLine( + severity: json['severity'] as String, + line: json['line'] as String, + ); + } +} + +final class EstablishSyncStream implements Instruction { + final Map request; + + EstablishSyncStream(this.request); + + factory EstablishSyncStream.fromJson(Map json) { + return EstablishSyncStream(json['request'] as Map); + } +} + +final class UpdateSyncStatus implements Instruction { + final CoreSyncStatus status; + + UpdateSyncStatus({required this.status}); + + factory UpdateSyncStatus.fromJson(Map json) { + return UpdateSyncStatus( + status: + CoreSyncStatus.fromJson(json['status'] as Map)); + } +} + +final class CoreSyncStatus { + final bool connected; + final bool connecting; + final List priorityStatus; + final DownloadProgress? downloading; + + CoreSyncStatus({ + required this.connected, + required this.connecting, + required this.priorityStatus, + required this.downloading, + }); + + factory CoreSyncStatus.fromJson(Map json) { + return CoreSyncStatus( + connected: json['connected'] as bool, + connecting: json['connecting'] as bool, + priorityStatus: [ + for (final entry in json['priority_status'] as List) + _priorityStatusFromJson(entry as Map) + ], + downloading: switch (json['downloading']) { + null => null, + final raw as Map => DownloadProgress.fromJson(raw), + }, + ); + } + + static SyncPriorityStatus _priorityStatusFromJson(Map json) { + return ( + priority: BucketPriority(json['priority'] as int), + hasSynced: json['has_synced'] as bool?, + lastSyncedAt: switch (json['last_synced_at']) { + null => null, + final lastSyncedAt as int => + DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000), + }, + ); + } +} + +final class DownloadProgress { + final Map buckets; + + DownloadProgress(this.buckets); + + factory DownloadProgress.fromJson(Map line) { + final rawBuckets = line['buckets'] as Map; + + return DownloadProgress(rawBuckets.map((k, v) { + return MapEntry( + k, + _bucketProgressFromJson(v as Map), + ); + })); + } + + static BucketProgress _bucketProgressFromJson(Map json) { + return ( + priority: BucketPriority(json['priority'] as int), + atLast: json['at_last'] as int, + sinceLast: json['since_last'] as int, + targetCount: json['target_count'] as int, + ); + } +} + +final class FetchCredentials implements Instruction { + final bool didExpire; + + FetchCredentials(this.didExpire); + + factory FetchCredentials.fromJson(Map line) { + return FetchCredentials(line['did_expire'] as bool); + } +} + +final class CloseSyncStream implements Instruction { + const CloseSyncStream(); +} + +final class FlushFileSystem implements Instruction { + const FlushFileSystem(); +} + +final class DidCompleteSync implements Instruction { + const DidCompleteSync(); +} + +final class UnknownSyncInstruction implements Instruction { + final Map source; + + UnknownSyncInstruction(this.source); +} diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 6da63654..23e3becb 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'package:collection/collection.dart'; +import 'instruction.dart'; import 'sync_status.dart'; import 'bucket_storage.dart'; import 'protocol.dart'; @@ -79,6 +80,20 @@ final class MutableSyncStatus { } } + void applyFromCore(CoreSyncStatus status) { + connected = status.connected; + connecting = status.connecting; + downloading = status.downloading != null; + priorityStatusEntries = status.priorityStatus; + downloadProgress = switch (status.downloading) { + null => null, + final downloading => InternalSyncDownloadProgress(downloading.buckets), + }; + lastSyncedAt = status.priorityStatus + .firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority) + ?.lastSyncedAt; + } + SyncStatus immutableSnapshot() { return SyncStatus( connected: connected, diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index f13f7e38..d9b0833b 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -24,15 +24,71 @@ final class SyncOptions { /// When set to null, PowerSync defaults to a delay of 5 seconds. final Duration? retryDelay; + /// The [SyncClientImplementation] to use. + final SyncClientImplementation syncImplementation; + const SyncOptions({ this.crudThrottleTime, this.retryDelay, this.params, + this.syncImplementation = SyncClientImplementation.defaultClient, }); + + SyncOptions _copyWith({ + Duration? crudThrottleTime, + Duration? retryDelay, + Map? params, + }) { + return SyncOptions( + crudThrottleTime: crudThrottleTime ?? this.crudThrottleTime, + retryDelay: retryDelay, + params: params ?? this.params, + syncImplementation: syncImplementation, + ); + } +} + +/// The PowerSync SDK offers two different implementations for receiving sync +/// lines: One handling most logic in Dart, and a newer one offloading that work +/// to the native PowerSync extension. +enum SyncClientImplementation { + /// A sync implementation that decodes and handles sync lines in Dart. + @Deprecated( + "Don't use SyncClientImplementation.dart directly, " + "use SyncClientImplementation.defaultClient instead.", + ) + dart, + + /// An experimental sync implementation that parses and handles sync lines in + /// the native PowerSync core extensions. + /// + /// This implementation can be more performant than the Dart implementation, + /// and supports receiving sync lines in a more efficient format. + /// + /// Note that this option is currently experimental. + @experimental + rust; + + /// The default sync client implementation to use. + // ignore: deprecated_member_use_from_same_package + static const defaultClient = dart; } @internal extension type ResolvedSyncOptions(SyncOptions source) { + factory ResolvedSyncOptions.resolve( + SyncOptions? source, { + Duration? crudThrottleTime, + Duration? retryDelay, + Map? params, + }) { + return ResolvedSyncOptions((source ?? SyncOptions())._copyWith( + crudThrottleTime: crudThrottleTime, + retryDelay: retryDelay, + params: params, + )); + } + Duration get crudThrottleTime => source.crudThrottleTime ?? const Duration(milliseconds: 10); diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 24284751..b2a4a935 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:convert' as convert; +import 'dart:typed_data'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; @@ -13,7 +14,7 @@ import 'package:sqlite_async/mutex.dart'; import 'bucket_storage.dart'; import '../crud.dart'; - +import 'instruction.dart'; import 'internal_connector.dart'; import 'mutable_sync_status.dart'; import 'stream_utils.dart'; @@ -137,7 +138,13 @@ class StreamingSyncImplementation implements StreamingSync { } // Protect sync iterations with exclusivity (if a valid Mutex is provided) await syncMutex.lock(() { - return _streamingSyncIteration(); + switch (options.source.syncImplementation) { + // ignore: deprecated_member_use_from_same_package + case SyncClientImplementation.dart: + return _dartStreamingSyncIteration(); + case SyncClientImplementation.rust: + return _rustStreamingSyncIteration(); + } }, timeout: _retryDelay); } catch (e, stacktrace) { if (aborted && e is http.ClientException) { @@ -281,6 +288,10 @@ class StreamingSyncImplementation implements StreamingSync { }); } + Future _rustStreamingSyncIteration() async { + await _ActiveRustStreamingIteration(this).syncIteration(); + } + Future<(List, Map)> _collectLocalBucketState() async { final bucketEntries = await adapter.getBucketStates(); @@ -295,7 +306,7 @@ class StreamingSyncImplementation implements StreamingSync { return (initialRequests, localDescriptions); } - Future _streamingSyncIteration() async { + Future _dartStreamingSyncIteration() async { var (bucketRequests, bucketMap) = await _collectLocalBucketState(); if (aborted) { return; @@ -432,6 +443,7 @@ class StreamingSyncImplementation implements StreamingSync { case UploadCompleted(): // Only relevant for the Rust sync implementation. break; + case AbortCurrentIteration(): case TokenRefreshComplete(): // We have a new token, so stop the iteration. haveInvalidated = true; @@ -556,7 +568,7 @@ String _syncErrorMessage(Object? error) { } else if (error is PowerSyncProtocolException) { return 'Protocol error'; } else { - return '${error.runtimeType}'; + return '${error.runtimeType}: $error'; } } @@ -565,6 +577,111 @@ typedef BucketDescription = ({ int priority, }); +final class _ActiveRustStreamingIteration { + final StreamingSyncImplementation sync; + var _isActive = true; + + StreamSubscription? _completedUploads; + final Completer _completedStream = Completer(); + + _ActiveRustStreamingIteration(this.sync); + + Future syncIteration() async { + try { + await _control('start', convert.json.encode(sync.options.params)); + assert(_completedStream.isCompleted, 'Should have started streaming'); + await _completedStream.future; + } finally { + _isActive = false; + _completedUploads?.cancel(); + await _stop(); + } + } + + Stream _receiveLines(Object? data) { + return sync._rawStreamingSyncRequest(data).map(ReceivedLine.new); + } + + Future _handleLines(EstablishSyncStream request) async { + final events = addBroadcast( + _receiveLines(request.request), sync._nonLineSyncEvents.stream); + + loop: + await for (final event in events) { + if (!_isActive || sync.aborted) { + break; + } + + switch (event) { + case ReceivedLine(line: final Uint8List line): + await _control('line_binary', line); + case ReceivedLine(line: final line as String): + await _control('line_text', line); + case UploadCompleted(): + await _control('completed_upload'); + case AbortCurrentIteration(): + break loop; + case TokenRefreshComplete(): + await _control('refreshed_token'); + } + } + } + + Future _stop() { + return _control('stop'); + } + + Future _control(String operation, [Object? payload]) async { + final rawResponse = await sync.adapter.control(operation, payload); + final instructions = convert.json.decode(rawResponse) as List; + + for (final instruction in instructions) { + await _handleInstruction( + Instruction.fromJson(instruction as Map)); + } + } + + Future _handleInstruction(Instruction instruction) async { + switch (instruction) { + case LogLine(:final severity, :final line): + sync.logger.log( + switch (severity) { + 'DEBUG' => Level.FINE, + 'INFO' => Level.INFO, + _ => Level.WARNING, + }, + line); + case EstablishSyncStream(): + _completedStream.complete(_handleLines(instruction)); + case UpdateSyncStatus(:final status): + sync._state.updateStatus((m) => m.applyFromCore(status)); + case FetchCredentials(:final didExpire): + if (didExpire) { + await sync.connector.prefetchCredentials(invalidate: true); + } else { + sync.connector.prefetchCredentials().then((_) { + if (_isActive && !sync.aborted) { + sync._nonLineSyncEvents.add(const TokenRefreshComplete()); + } + }, onError: (Object e, StackTrace s) { + sync.logger.warning('Could not prefetch credentials', e, s); + }); + } + case CloseSyncStream(): + if (!sync.aborted) { + _isActive = false; + sync._nonLineSyncEvents.add(const AbortCurrentIteration()); + } + case FlushFileSystem(): + await sync.adapter.flushFileSystem(); + case DidCompleteSync(): + sync._state.updateStatus((m) => m.downloadError = null); + case UnknownSyncInstruction(:final source): + sync.logger.warning('Unknown instruction: $source'); + } + } +} + sealed class SyncEvent {} final class ReceivedLine implements SyncEvent { @@ -580,3 +697,7 @@ final class UploadCompleted implements SyncEvent { final class TokenRefreshComplete implements SyncEvent { const TokenRefreshComplete(); } + +final class AbortCurrentIteration implements SyncEvent { + const AbortCurrentIteration(); +} diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index fceece50..9c1fcfba 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -189,6 +189,9 @@ extension type const BucketPriority._(int priorityNumber) { /// A [Comparator] instance suitable for comparing [BucketPriority] values. static int comparator(BucketPriority a, BucketPriority b) => -a.priorityNumber.compareTo(b.priorityNumber); + + /// The priority used by PowerSync to indicate that a full sync was completed. + static const fullSyncPriority = BucketPriority._(2147483647); } /// Partial information about the synchronization status for buckets within a diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index e2443e23..b5e8ed63 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -80,6 +80,10 @@ class _ConnectedClient { final encodedParams => jsonDecode(encodedParams) as Map, }, + syncImplementation: switch (request.implementationName) { + null => SyncClientImplementation.defaultClient, + final name => SyncClientImplementation.values.byName(name), + }, ); _runner = _worker.referenceSyncTask( diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index f4de6cc9..2b859e53 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -69,7 +69,8 @@ extension type StartSynchronization._(JSObject _) implements JSObject { required String databaseName, required int crudThrottleTimeMs, required int requestId, - required int? retryDelayMs, + required int retryDelayMs, + required String implementationName, String? syncParamsEncoded, }); @@ -77,6 +78,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external int get requestId; external int get crudThrottleTimeMs; external int? get retryDelayMs; + external String? get implementationName; external String? get syncParamsEncoded; } @@ -417,6 +419,7 @@ final class WorkerCommunicationChannel { crudThrottleTimeMs: options.crudThrottleTime.inMilliseconds, retryDelayMs: options.retryDelay.inMilliseconds, requestId: id, + implementationName: options.source.syncImplementation.name, syncParamsEncoded: switch (options.source.params) { null => null, final params => jsonEncode(params), diff --git a/packages/powersync_core/lib/src/web/web_bucket_storage.dart b/packages/powersync_core/lib/src/web/web_bucket_storage.dart index 4ba46b07..a430d569 100644 --- a/packages/powersync_core/lib/src/web/web_bucket_storage.dart +++ b/packages/powersync_core/lib/src/web/web_bucket_storage.dart @@ -17,4 +17,9 @@ class WebBucketStorage extends BucketStorage { return _webDb.writeTransaction(callback, lockTimeout: lockTimeout, flush: flush); } + + @override + Future flushFileSystem() { + return _webDb.flush(); + } } diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 3ab8ae76..9455bc94 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -16,9 +16,26 @@ import 'utils/in_memory_http.dart'; import 'utils/test_utils_impl.dart'; void main() { + _declareTests( + 'dart sync client', + SyncOptions( + // ignore: deprecated_member_use_from_same_package + syncImplementation: SyncClientImplementation.dart, + ), + ); + + _declareTests( + 'rust sync client', + SyncOptions( + syncImplementation: SyncClientImplementation.rust, + ), + ); +} + +void _declareTests(String name, SyncOptions options) { final ignoredLogger = Logger.detached('powersync.test')..level = Level.OFF; - group('in-memory sync tests', () { + group(name, () { late final testUtils = TestUtils(); late TestPowerSyncFactory factory; @@ -44,6 +61,7 @@ void main() { expiresAt: DateTime.now(), ); }, uploadData: (db) => uploadData(db)), + options: options, ); } @@ -107,6 +125,7 @@ void main() { }); await expectLater( status, emits(isSyncStatus(downloading: false, hasSynced: true))); + await syncClient.abort(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -122,65 +141,68 @@ void main() { isTrue); }); - test('can save independent buckets in same transaction', () async { - final status = await waitForConnection(); - - syncService.addLine({ - 'checkpoint': Checkpoint( - lastOpId: '0', - writeCheckpoint: null, - checksums: [ - BucketChecksum(bucket: 'a', checksum: 0, priority: 3), - BucketChecksum(bucket: 'b', checksum: 0, priority: 3), - ], - ) - }); - await expectLater(status, emits(isSyncStatus(downloading: true))); - - var commits = 0; - raw.commits.listen((_) => commits++); + // ignore: deprecated_member_use_from_same_package + if (options.syncImplementation == SyncClientImplementation.dart) { + test('can save independent buckets in same transaction', () async { + final status = await waitForConnection(); - syncService - ..addLine({ - 'data': { - 'bucket': 'a', - 'data': >[ - { - 'op_id': '1', - 'op': 'PUT', - 'object_type': 'a', - 'object_id': '1', - 'checksum': 0, - 'data': {}, - } - ], - } - }) - ..addLine({ - 'data': { - 'bucket': 'b', - 'data': >[ - { - 'op_id': '2', - 'op': 'PUT', - 'object_type': 'b', - 'object_id': '1', - 'checksum': 0, - 'data': {}, - } + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '0', + writeCheckpoint: null, + checksums: [ + BucketChecksum(bucket: 'a', checksum: 0, priority: 3), + BucketChecksum(bucket: 'b', checksum: 0, priority: 3), ], - } + ) }); + await expectLater(status, emits(isSyncStatus(downloading: true))); - // Wait for the operations to be inserted. - while (raw.select('SELECT * FROM ps_oplog;').length < 2) { - await pumpEventQueue(); - } + var commits = 0; + raw.commits.listen((_) => commits++); - // The two buckets should have been inserted in a single transaction - // because the messages were received in quick succession. - expect(commits, 1); - }); + syncService + ..addLine({ + 'data': { + 'bucket': 'a', + 'data': >[ + { + 'op_id': '1', + 'op': 'PUT', + 'object_type': 'a', + 'object_id': '1', + 'checksum': 0, + 'data': {}, + } + ], + } + }) + ..addLine({ + 'data': { + 'bucket': 'b', + 'data': >[ + { + 'op_id': '2', + 'op': 'PUT', + 'object_type': 'b', + 'object_id': '1', + 'checksum': 0, + 'data': {}, + } + ], + } + }); + + // Wait for the operations to be inserted. + while (raw.select('SELECT * FROM ps_oplog;').length < 2) { + await pumpEventQueue(); + } + + // The two buckets should have been inserted in a single transaction + // because the messages were received in quick succession. + expect(commits, 1); + }); + } group('partial sync', () { test('updates sync state incrementally', () async { @@ -281,6 +303,7 @@ void main() { }); await database.waitForFirstSync(priority: BucketPriority(1)); expect(database.currentStatus.hasSynced, isFalse); + await syncClient.abort(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -485,7 +508,7 @@ void main() { }) async { await expectLater( status, - emits(isSyncStatus( + emitsThrough(isSyncStatus( downloading: true, downloadProgress: isSyncDownloadProgress( progress: total, @@ -644,7 +667,6 @@ void main() { await checkProgress(progress(8, 8), progress(10, 14)); addCheckpointComplete(0); - await checkProgress(progress(8, 8), progress(10, 14)); addDataLine('b', 4); await checkProgress(progress(8, 8), progress(14, 14));