Skip to content

Commit db430d8

Browse files
committed
Refactor locks and retry logic around connect
1 parent 2b4430b commit db430d8

File tree

4 files changed

+63
-30
lines changed

4 files changed

+63
-30
lines changed

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,8 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
282282
StreamSubscription<void>? localUpdatesSubscription;
283283

284284
Future<void> shutdown() async {
285+
await openedStreamingSync?.abort();
286+
285287
localUpdatesSubscription?.cancel();
286288
db?.dispose();
287289
crudUpdateController.close();
@@ -291,9 +293,10 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
291293
// It needs to be closed before killing the isolate
292294
// in order to free the mutex for other operations.
293295
await mutex.close();
294-
await openedStreamingSync?.abort();
295-
296296
rPort.close();
297+
298+
// TODO: If we closed our resources properly, this wouldn't be necessary...
299+
Isolate.current.kill();
297300
}
298301

299302
rPort.listen((message) async {
@@ -377,12 +380,12 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
377380
updateDebouncer ??=
378381
Timer(const Duration(milliseconds: 1), maybeFireUpdates);
379382
});
380-
}, (error, stack) async {
383+
}, (error, stack) {
381384
// Properly dispose the database if an uncaught error occurs.
382385
// Unfortunately, this does not handle disposing while the database is opening.
383386
// This should be rare - any uncaught error is a bug. And in most cases,
384387
// it should occur after the database is already open.
385-
await shutdown();
388+
shutdown();
386389
throw error;
387390
});
388391
}

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -274,28 +274,27 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
274274
abort: thisConnectAborter,
275275
);
276276

277-
retryHandler();
277+
thisConnectAborter.onCompletion.whenComplete(retryHandler);
278278
}
279279

280280
// If the sync encounters a failure without being aborted, retry
281-
retryHandler = () {
282-
thisConnectAborter.onCompletion.then((_) async {
283-
_activeGroup.syncConnectMutex.lock(() async {
284-
// Still supposed to be active? (abort is only called within mutex)
285-
if (!thisConnectAborter.aborted) {
286-
// We only change _abortActiveSync after disconnecting, which resets
287-
// the abort controller.
288-
assert(identical(_abortActiveSync, thisConnectAborter));
289-
290-
// We need a new abort controller for this attempt
291-
_abortActiveSync = thisConnectAborter = AbortController();
292-
293-
logger.warning('Sync client failed, retrying...');
294-
await connectWithSyncLock();
295-
}
296-
});
281+
retryHandler = Zone.current.bindCallback(() async {
282+
_activeGroup.syncConnectMutex.lock(() async {
283+
// Is this still supposed to be active? (abort is only called within
284+
// mutex)
285+
if (!thisConnectAborter.aborted) {
286+
// We only change _abortActiveSync after disconnecting, which resets
287+
// the abort controller.
288+
assert(identical(_abortActiveSync, thisConnectAborter));
289+
290+
// We need a new abort controller for this attempt
291+
_abortActiveSync = thisConnectAborter = AbortController();
292+
293+
logger.warning('Sync client failed, retrying...');
294+
await connectWithSyncLock();
295+
}
297296
});
298-
};
297+
});
299298

300299
await _activeGroup.syncConnectMutex.lock(() async {
301300
// Disconnect a previous sync client, if one is active.

packages/powersync_core/test/schema_test.dart

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -176,14 +176,11 @@ void main() {
176176
]),
177177
]);
178178

179-
try {
180-
powersync.updateSchema(schema2);
181-
} catch (e) {
182-
expect(
183-
e,
184-
isA<AssertionError>().having((e) => e.message, 'message',
185-
'Invalid characters in table name: #notworking'));
186-
}
179+
await expectLater(
180+
() => powersync.updateSchema(schema2),
181+
throwsA(isA<AssertionError>().having((e) => e.message, 'message',
182+
'Invalid characters in table name: #notworking')),
183+
);
187184
});
188185
});
189186

packages/powersync_core/test/streaming_sync_test.dart

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ library;
55
import 'dart:async';
66
import 'dart:math';
77

8+
import 'package:logging/logging.dart';
89
import 'package:powersync_core/powersync_core.dart';
910
import 'package:test/test.dart';
1011

@@ -27,6 +28,39 @@ void main() {
2728
await testUtils.cleanDb(path: path);
2829
});
2930

31+
test('repeated connect and disconnect calls', () async {
32+
final random = Random();
33+
final server = await createServer();
34+
final ignoreLogger = Logger.detached('powersync.test');
35+
36+
final pdb =
37+
await testUtils.setupPowerSync(path: path, logger: ignoreLogger);
38+
pdb.retryDelay = Duration(milliseconds: 5000);
39+
final connector = TestConnector(() async {
40+
return PowerSyncCredentials(endpoint: server.endpoint, token: 'token');
41+
});
42+
43+
Duration nextDelay() {
44+
return Duration(milliseconds: random.nextInt(100));
45+
}
46+
47+
Future<void> connectAndDisconnect() async {
48+
for (var i = 0; i < 10; i++) {
49+
await Future<void>.delayed(nextDelay());
50+
await pdb.connect(connector: connector);
51+
52+
await Future<void>.delayed(nextDelay());
53+
await pdb.disconnect();
54+
}
55+
}
56+
57+
// Create a bunch of tasks calling connect and disconnect() concurrently.
58+
await Future.wait([for (var i = 0; i < 10; i++) connectAndDisconnect()]);
59+
60+
expect(server.maxConnectionCount, lessThanOrEqualTo(1));
61+
server.close();
62+
});
63+
3064
test('full powersync reconnect', () async {
3165
// Test repeatedly creating new PowerSync connections, then disconnect
3266
// and close the connection.

0 commit comments

Comments
 (0)