Skip to content

Commit 2b4430b

Browse files
committed
Refactor connect/disconnect logic
1 parent 12d6188 commit 2b4430b

File tree

6 files changed

+228
-185
lines changed

6 files changed

+228
-185
lines changed

packages/powersync_core/lib/src/abort_controller.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ class AbortController {
1414
return _abortRequested.future;
1515
}
1616

17+
Future<void> get onCompletion {
18+
return _abortCompleter.future;
19+
}
20+
1721
/// Abort, and wait until aborting is complete.
1822
Future<void> abort() async {
1923
aborted = true;

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 98 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import 'package:powersync_core/src/log_internal.dart';
1515
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
1616
import 'package:powersync_core/src/open_factory/native/native_open_factory.dart';
1717
import 'package:powersync_core/src/schema.dart';
18-
import 'package:powersync_core/src/schema_logic.dart';
1918
import 'package:powersync_core/src/streaming_sync.dart';
2019
import 'package:powersync_core/src/sync_status.dart';
2120
import 'package:sqlite_async/sqlite3_common.dart';
@@ -109,42 +108,55 @@ class PowerSyncDatabaseImpl
109108
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.s
110109
PowerSyncDatabaseImpl.withDatabase(
111110
{required this.schema, required this.database, Logger? logger}) {
112-
if (logger != null) {
113-
this.logger = logger;
114-
} else {
115-
this.logger = autoLogger;
116-
}
111+
this.logger = logger ?? autoLogger;
117112
isInitialized = baseInit();
118113
}
119114

120115
@override
121116
@internal
122-
123-
/// Connect to the PowerSync service, and keep the databases in sync.
124-
///
125-
/// The connection is automatically re-opened if it fails for any reason.
126-
///
127-
/// Status changes are reported on [statusStream].
128-
baseConnect(
129-
{required PowerSyncBackendConnector connector,
130-
131-
/// Throttle time between CRUD operations
132-
/// Defaults to 10 milliseconds.
133-
required Duration crudThrottleTime,
134-
required Future<void> Function() reconnect,
135-
Map<String, dynamic>? params}) async {
117+
Future<void> connectInternal({
118+
required PowerSyncBackendConnector connector,
119+
required Duration crudThrottleTime,
120+
required AbortController abort,
121+
Map<String, dynamic>? params,
122+
}) async {
136123
await initialize();
137-
138-
// Disconnect if connected
139-
await disconnect();
140-
final disconnector = AbortController();
141-
disconnecter = disconnector;
142-
143-
await isInitialized;
144124
final dbRef = database.isolateConnectionFactory();
145-
ReceivePort rPort = ReceivePort();
125+
126+
Isolate? isolate;
146127
StreamSubscription<UpdateNotification>? crudUpdateSubscription;
147-
rPort.listen((data) async {
128+
final receiveMessages = ReceivePort();
129+
final receiveUnhandledErrors = ReceivePort();
130+
final receiveExit = ReceivePort();
131+
132+
SendPort? initPort;
133+
final hasInitPort = Completer<void>();
134+
final receivedIsolateExit = Completer<void>();
135+
136+
Future<void> waitForShutdown() async {
137+
// Only complete the abortion signal after the isolate shuts down. This
138+
// ensures absolutely no trace of this sync iteration remains.
139+
if (isolate != null) {
140+
await receivedIsolateExit.future;
141+
}
142+
143+
// Cleanup
144+
crudUpdateSubscription?.cancel();
145+
receiveMessages.close();
146+
receiveUnhandledErrors.close();
147+
receiveExit.close();
148+
149+
// Clear status apart from lastSyncedAt
150+
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
151+
abort.completeAbort();
152+
}
153+
154+
Future<void> close() async {
155+
initPort?.send(['close']);
156+
await waitForShutdown();
157+
}
158+
159+
receiveMessages.listen((data) async {
148160
if (data is List) {
149161
String action = data[0] as String;
150162
if (action == "getCredentials") {
@@ -159,27 +171,20 @@ class PowerSyncDatabaseImpl
159171
await connector.prefetchCredentials();
160172
});
161173
} else if (action == 'init') {
162-
SendPort port = data[1] as SendPort;
174+
final port = initPort = data[1] as SendPort;
175+
hasInitPort.complete();
163176
var crudStream =
164177
database.onChange(['ps_crud'], throttle: crudThrottleTime);
165178
crudUpdateSubscription = crudStream.listen((event) {
166179
port.send(['update']);
167180
});
168-
disconnector.onAbort.then((_) {
169-
port.send(['close']);
170-
}).ignore();
171181
} else if (action == 'uploadCrud') {
172182
await (data[1] as PortCompleter).handle(() async {
173183
await connector.uploadData(this);
174184
});
175185
} else if (action == 'status') {
176186
final SyncStatus status = data[1] as SyncStatus;
177187
setStatus(status);
178-
} else if (action == 'close') {
179-
// Clear status apart from lastSyncedAt
180-
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
181-
rPort.close();
182-
crudUpdateSubscription?.cancel();
183188
} else if (action == 'log') {
184189
LogRecord record = data[1] as LogRecord;
185190
logger.log(
@@ -188,8 +193,7 @@ class PowerSyncDatabaseImpl
188193
}
189194
});
190195

191-
var errorPort = ReceivePort();
192-
errorPort.listen((message) async {
196+
receiveUnhandledErrors.listen((message) async {
193197
// Sample error:
194198
// flutter: [PowerSync] WARNING: 2023-06-28 16:34:11.566122: Sync Isolate error
195199
// flutter: [Connection closed while receiving data, #0 IOClient.send.<anonymous closure> (package:http/src/io_client.dart:76:13)
@@ -200,38 +204,37 @@ class PowerSyncDatabaseImpl
200204
// ...
201205
logger.severe('Sync Isolate error', message);
202206

203-
// Reconnect
204-
// Use the param like this instead of directly calling connect(), to avoid recursive
205-
// locks in some edge cases.
206-
reconnect();
207+
// Fatal errors are enabled, so the isolate will exit soon, causing us to
208+
// complete the abort controller which will make the db mixin reconnect if
209+
// necessary. There's no need to reconnect manually.
207210
});
208211

209-
disconnected() {
210-
disconnector.completeAbort();
211-
disconnecter = null;
212-
rPort.close();
213-
// Clear status apart from lastSyncedAt
214-
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
212+
// Don't spawn isolate if this operation was cancelled already.
213+
if (abort.aborted) {
214+
return waitForShutdown();
215215
}
216216

217-
var exitPort = ReceivePort();
218-
exitPort.listen((message) {
217+
receiveExit.listen((message) {
219218
logger.fine('Sync Isolate exit');
220-
disconnected();
219+
receivedIsolateExit.complete();
221220
});
222221

223-
if (disconnecter?.aborted == true) {
224-
disconnected();
225-
return;
226-
}
227-
228-
Isolate.spawn(
229-
_powerSyncDatabaseIsolate,
230-
_PowerSyncDatabaseIsolateArgs(
231-
rPort.sendPort, dbRef, retryDelay, clientParams),
232-
debugName: 'PowerSyncDatabase',
233-
onError: errorPort.sendPort,
234-
onExit: exitPort.sendPort);
222+
// Spawning the isolate can't be interrupted
223+
isolate = await Isolate.spawn(
224+
_syncIsolate,
225+
_PowerSyncDatabaseIsolateArgs(
226+
receiveMessages.sendPort, dbRef, retryDelay, clientParams),
227+
debugName: 'Sync ${database.openFactory.path}',
228+
onError: receiveUnhandledErrors.sendPort,
229+
errorsAreFatal: true,
230+
onExit: receiveExit.sendPort,
231+
);
232+
await hasInitPort.future;
233+
234+
abort.onAbort.whenComplete(close);
235+
236+
// Automatically complete the abort controller once the isolate exits.
237+
unawaited(waitForShutdown());
235238
}
236239

237240
/// Takes a read lock, without starting a transaction.
@@ -255,16 +258,6 @@ class PowerSyncDatabaseImpl
255258
return database.writeLock(callback,
256259
debugContext: debugContext, lockTimeout: lockTimeout);
257260
}
258-
259-
@override
260-
Future<void> updateSchema(Schema schema) {
261-
if (disconnecter != null) {
262-
throw AssertionError('Cannot update schema while connected');
263-
}
264-
schema.validate();
265-
this.schema = schema;
266-
return updateSchemaInIsolate(database, schema);
267-
}
268261
}
269262

270263
class _PowerSyncDatabaseIsolateArgs {
@@ -277,64 +270,70 @@ class _PowerSyncDatabaseIsolateArgs {
277270
this.sPort, this.dbRef, this.retryDelay, this.parameters);
278271
}
279272

280-
Future<void> _powerSyncDatabaseIsolate(
281-
_PowerSyncDatabaseIsolateArgs args) async {
273+
Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
282274
final sPort = args.sPort;
283-
ReceivePort rPort = ReceivePort();
275+
final rPort = ReceivePort();
284276
StreamController<String> crudUpdateController = StreamController.broadcast();
285277
final upstreamDbClient = args.dbRef.upstreamPort.open();
286278

287279
CommonDatabase? db;
288280
final Mutex mutex = args.dbRef.mutex.open();
289281
StreamingSyncImplementation? openedStreamingSync;
282+
StreamSubscription<void>? localUpdatesSubscription;
283+
284+
Future<void> shutdown() async {
285+
localUpdatesSubscription?.cancel();
286+
db?.dispose();
287+
crudUpdateController.close();
288+
upstreamDbClient.close();
289+
290+
// The SyncSqliteConnection uses this mutex
291+
// It needs to be closed before killing the isolate
292+
// in order to free the mutex for other operations.
293+
await mutex.close();
294+
await openedStreamingSync?.abort();
295+
296+
rPort.close();
297+
}
290298

291299
rPort.listen((message) async {
292300
if (message is List) {
293301
String action = message[0] as String;
294302
if (action == 'update') {
295-
crudUpdateController.add('update');
303+
if (!crudUpdateController.isClosed) {
304+
crudUpdateController.add('update');
305+
}
296306
} else if (action == 'close') {
297-
// The SyncSqliteConnection uses this mutex
298-
// It needs to be closed before killing the isolate
299-
// in order to free the mutex for other operations.
300-
await mutex.close();
301-
db?.dispose();
302-
crudUpdateController.close();
303-
upstreamDbClient.close();
304-
// Abort any open http requests, and wait for it to be closed properly
305-
await openedStreamingSync?.abort();
306-
// No kill the Isolate
307-
Isolate.current.kill();
307+
await shutdown();
308308
}
309309
}
310310
});
311-
Isolate.current.addOnExitListener(sPort, response: const ['close']);
312-
sPort.send(["init", rPort.sendPort]);
311+
sPort.send(['init', rPort.sendPort]);
313312

314313
// Is there a way to avoid the overhead if logging is not enabled?
315314
// This only takes effect in this isolate.
316315
isolateLogger.level = Level.ALL;
317316
isolateLogger.onRecord.listen((record) {
318317
var copy = LogRecord(record.level, record.message, record.loggerName,
319318
record.error, record.stackTrace);
320-
sPort.send(["log", copy]);
319+
sPort.send(['log', copy]);
321320
});
322321

323322
Future<PowerSyncCredentials?> loadCredentials() async {
324323
final r = IsolateResult<PowerSyncCredentials?>();
325-
sPort.send(["getCredentials", r.completer]);
324+
sPort.send(['getCredentials', r.completer]);
326325
return r.future;
327326
}
328327

329328
Future<void> invalidateCredentials() async {
330329
final r = IsolateResult<void>();
331-
sPort.send(["invalidateCredentials", r.completer]);
330+
sPort.send(['invalidateCredentials', r.completer]);
332331
return r.future;
333332
}
334333

335334
Future<void> uploadCrud() async {
336335
final r = IsolateResult<void>();
337-
sPort.send(["uploadCrud", r.completer]);
336+
sPort.send(['uploadCrud', r.completer]);
338337
return r.future;
339338
}
340339

@@ -372,18 +371,18 @@ Future<void> _powerSyncDatabaseIsolate(
372371
}
373372
}
374373

375-
db!.updates.listen((event) {
374+
localUpdatesSubscription = db!.updates.listen((event) {
376375
updatedTables.add(event.tableName);
377376

378377
updateDebouncer ??=
379378
Timer(const Duration(milliseconds: 1), maybeFireUpdates);
380379
});
381-
}, (error, stack) {
380+
}, (error, stack) async {
382381
// Properly dispose the database if an uncaught error occurs.
383382
// Unfortunately, this does not handle disposing while the database is opening.
384383
// This should be rare - any uncaught error is a bug. And in most cases,
385384
// it should occur after the database is already open.
386-
db?.dispose();
385+
await shutdown();
387386
throw error;
388387
});
389388
}

packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:async';
33
import 'package:logging/logging.dart';
44
import 'package:meta/meta.dart';
55
import 'package:powersync_core/sqlite_async.dart';
6+
import 'package:powersync_core/src/abort_controller.dart';
67
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
78
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
89
import 'powersync_database.dart';
@@ -24,6 +25,9 @@ class PowerSyncDatabaseImpl
2425
@override
2526
Schema get schema => throw UnimplementedError();
2627

28+
@override
29+
set schema(Schema s) => throw UnimplementedError();
30+
2731
@override
2832
SqliteDatabase get database => throw UnimplementedError();
2933

@@ -101,20 +105,15 @@ class PowerSyncDatabaseImpl
101105
throw UnimplementedError();
102106
}
103107

104-
@override
105-
Future<void> updateSchema(Schema schema) {
106-
throw UnimplementedError();
107-
}
108-
109108
@override
110109
Logger get logger => throw UnimplementedError();
111110

112111
@override
113112
@internal
114-
Future<void> baseConnect(
113+
Future<void> connectInternal(
115114
{required PowerSyncBackendConnector connector,
116115
required Duration crudThrottleTime,
117-
required Future<void> Function() reconnect,
116+
required AbortController abort,
118117
Map<String, dynamic>? params}) {
119118
throw UnimplementedError();
120119
}

0 commit comments

Comments
 (0)