diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 6f6cad4..cb1814b 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -35,19 +35,19 @@ jobs: include: - sqlite_version: "3440200" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3440200.tar.gz" - dart_sdk: 3.5.0 + dart_sdk: stable - sqlite_version: "3430200" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3430200.tar.gz" - dart_sdk: 3.5.0 + dart_sdk: stable - sqlite_version: "3420000" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3420000.tar.gz" - dart_sdk: 3.5.0 + dart_sdk: stable - sqlite_version: "3410100" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3410100.tar.gz" - dart_sdk: 3.5.0 + dart_sdk: stable - sqlite_version: "3380000" sqlite_url: "https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz" - dart_sdk: 3.5.0 + dart_sdk: stable steps: - uses: actions/checkout@v3 - uses: dart-lang/setup-dart@v1 @@ -68,3 +68,4 @@ jobs: run: | export LD_LIBRARY_PATH=$(pwd)/sqlite-autoconf-${{ matrix.sqlite_version }}/.libs melos test + melos test_build diff --git a/CHANGELOG.md b/CHANGELOG.md index aba9311..d9cda07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,66 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. +## 2025-06-03 + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.11.7`](#sqlite_async---v0117) + +--- + +#### `sqlite_async` - `v0.11.7` + +- Shared worker: Release locks owned by connected client tab when it closes. +- Fix web concurrency issues: Consistently apply a shared mutex or let a shared + worker coordinate access. + +## 2025-05-28 + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.11.6`](#sqlite_async---v0116) + +--- + +#### `sqlite_async` - `v0.11.6` + +- Native: Consistently report errors when opening the database instead of + causing unhandled exceptions. + +## 2025-05-22 + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.11.5`](#sqlite_async---v0115) + +--- + +#### `sqlite_async` - `v0.11.5` + +- Allow profiling queries. Queries are profiled by default in debug and profile builds, the runtime + for queries is added to profiling timelines under the `sqlite_async` tag. +- Fix cancelling `watch()` queries sometimes taking longer than necessary. +- Fix web databases not respecting lock timeouts. + ## 2024-11-06 ### Changes diff --git a/melos.yaml b/melos.yaml index 8de8bfe..4555e43 100644 --- a/melos.yaml +++ b/melos.yaml @@ -41,7 +41,7 @@ scripts: test: description: Run tests in a specific package. - run: dart test -p chrome,vm + run: dart test -p chrome,vm --compiler dart2js,dart2wasm exec: concurrency: 1 packageFilters: @@ -51,3 +51,11 @@ scripts: # as they could change the behaviour of how tests filter packages. env: MELOS_TEST: true + + test_build: + description: Runs tests with build_test + run: dart run build_runner test -- -p chrome + exec: + concurrency: 1 + packageFilters: + dependsOn: build_test diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index 4c23e85..b101ac4 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,11 @@ +## 0.2.2 + +- Fix write detection when using UPDATE/INSERT/DELETE with RETURNING in raw queries. + +## 0.2.1 + +- Fix lints. + ## 0.2.0 - Automatically run Drift migrations diff --git a/analysis_options.yaml b/packages/drift_sqlite_async/analysis_options.yaml similarity index 100% rename from analysis_options.yaml rename to packages/drift_sqlite_async/analysis_options.yaml diff --git a/packages/drift_sqlite_async/lib/drift_sqlite_async.dart b/packages/drift_sqlite_async/lib/drift_sqlite_async.dart index f842c5b..83d04fc 100644 --- a/packages/drift_sqlite_async/lib/drift_sqlite_async.dart +++ b/packages/drift_sqlite_async/lib/drift_sqlite_async.dart @@ -1,4 +1,2 @@ -library drift_sqlite_async; - export './src/connection.dart'; export './src/executor.dart'; diff --git a/packages/drift_sqlite_async/lib/src/executor.dart b/packages/drift_sqlite_async/lib/src/executor.dart index 0727dd8..41886f3 100644 --- a/packages/drift_sqlite_async/lib/src/executor.dart +++ b/packages/drift_sqlite_async/lib/src/executor.dart @@ -1,14 +1,15 @@ import 'dart:async'; import 'package:drift/backends.dart'; -import 'package:drift/src/runtime/query_builder/query_builder.dart'; +import 'package:drift/drift.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; // Ends with " RETURNING *", or starts with insert/update/delete. // Drift-generated queries will always have the RETURNING *. // The INSERT/UPDATE/DELETE check is for custom queries, and is not exhaustive. -final _returningCheck = RegExp(r'( RETURNING \*;?$)|(^(INSERT|UPDATE|DELETE))', +final _returningCheck = RegExp( + r'( RETURNING \*;?\s*$)|(^\s*(INSERT|UPDATE|DELETE))', caseSensitive: false); class _SqliteAsyncDelegate extends _SqliteAsyncQueryDelegate @@ -19,6 +20,7 @@ class _SqliteAsyncDelegate extends _SqliteAsyncQueryDelegate _SqliteAsyncDelegate(this.db) : super(db, db.writeLock); + @override bool isInTransaction = false; // unused @override diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index f88dccb..62a64da 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.0 +version: 0.2.2 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. @@ -16,10 +16,12 @@ environment: dependencies: drift: ">=2.19.0 <3.0.0" sqlite_async: ^0.11.0 + dev_dependencies: build_runner: ^2.4.8 drift_dev: ">=2.19.0 <3.0.0" glob: ^2.1.2 + lints: ^5.0.0 sqlite3: ^2.4.0 test: ^1.25.2 test_api: ^0.7.0 diff --git a/packages/drift_sqlite_async/test/basic_test.dart b/packages/drift_sqlite_async/test/basic_test.dart index 1b33562..339cc04 100644 --- a/packages/drift_sqlite_async/test/basic_test.dart +++ b/packages/drift_sqlite_async/test/basic_test.dart @@ -1,9 +1,12 @@ // TODO @TestOn('!browser') +library; + import 'dart:async'; import 'package:drift/drift.dart'; import 'package:drift_sqlite_async/drift_sqlite_async.dart'; +import 'package:sqlite3/common.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; @@ -217,5 +220,29 @@ void main() { .data, equals({'count': 1})); }); + + test('cannot update database with read', () async { + await expectLater(() => dbu.customSelect(''' +-- trick to circumvent regex detecting writes +INSERT INTO test_data(description) VALUES('test data'); +''').get(), throwsA(isA())); + }); + + test('allows spaces after returning', () async { + // This tests that the statement is forwarded to the write connection + // despite using customSelect(). If it wasn't, we'd get an error about + // the database being read-only. + final row = await dbu.customSelect( + 'INSERT INTO test_data(description) VALUES(?) RETURNING * ', + variables: [Variable('Test Data')]).getSingle(); + expect(row.data['description'], equals('Test Data')); + }); + + test('allows spaces before insert', () async { + final row = await dbu.customSelect( + ' INSERT INTO test_data(description) VALUES(?) ', + variables: [Variable('Test Data')]).get(); + expect(row, isEmpty); + }); }); } diff --git a/packages/drift_sqlite_async/test/db_test.dart b/packages/drift_sqlite_async/test/db_test.dart index 372f0fa..bda09df 100644 --- a/packages/drift_sqlite_async/test/db_test.dart +++ b/packages/drift_sqlite_async/test/db_test.dart @@ -1,5 +1,7 @@ // TODO @TestOn('!browser') +library; + import 'package:drift/drift.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; @@ -104,5 +106,16 @@ void main() { '[]' ])); }); + + test('delete returns affected rows', () async { + for (var i = 0; i < 10; i++) { + await dbu + .into(dbu.todoItems) + .insert(TodoItemsCompanion.insert(description: 'desc $i')); + } + + final deleted = await dbu.delete(dbu.todoItems).go(); + expect(deleted, 10); + }); }); } diff --git a/packages/drift_sqlite_async/test/generated/database.dart b/packages/drift_sqlite_async/test/generated/database.dart index 7747be4..928c7dd 100644 --- a/packages/drift_sqlite_async/test/generated/database.dart +++ b/packages/drift_sqlite_async/test/generated/database.dart @@ -21,7 +21,7 @@ class TodoDatabase extends _$TodoDatabase { } class TodosMigrationDatabase extends TodoDatabase { - TodosMigrationDatabase(SqliteConnection db) : super(db); + TodosMigrationDatabase(super.db); @override MigrationStrategy get migration { diff --git a/packages/drift_sqlite_async/test/migration_test.dart b/packages/drift_sqlite_async/test/migration_test.dart index e05c212..ca9cb3f 100644 --- a/packages/drift_sqlite_async/test/migration_test.dart +++ b/packages/drift_sqlite_async/test/migration_test.dart @@ -1,4 +1,6 @@ @TestOn('!browser') +library; + import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index b49c6ab..97dbf50 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,37 @@ +## 0.11.7 + +- Shared worker: Release locks owned by connected client tab when it closes. +- Fix web concurrency issues: Consistently apply a shared mutex or let a shared + worker coordinate access. + +## 0.11.6 + +- Native: Consistently report errors when opening the database instead of + causing unhandled exceptions. + +## 0.11.5 + +- Allow profiling queries. Queries are profiled by default in debug and profile builds, the runtime + for queries is added to profiling timelines under the `sqlite_async` tag. +- Fix cancelling `watch()` queries sometimes taking longer than necessary. +- Fix web databases not respecting lock timeouts. + +## 0.11.4 + +- Add `SqliteConnection.synchronousWrapper` and `SqliteDatabase.singleConnection`. + Together, these can be used to wrap raw `CommonDatabase` instances from `package:sqlite3` + as a `Database` (without an automated worker or isolate setup). This can be useful in tests + where synchronous access to the underlying database is convenient. + +## 0.11.3 + +- Support being compiled with `package:build_web_compilers`. + +## 0.11.2 + +- Support latest version of `package:sqlite3_web`. +- Support `dart2wasm`. + ## 0.11.1 - Remove remaining `dart:js_util` imports in favor of new interop APIs. diff --git a/packages/sqlite_async/analysis_options.yaml b/packages/sqlite_async/analysis_options.yaml new file mode 100644 index 0000000..572dd23 --- /dev/null +++ b/packages/sqlite_async/analysis_options.yaml @@ -0,0 +1 @@ +include: package:lints/recommended.yaml diff --git a/packages/sqlite_async/build.yaml b/packages/sqlite_async/build.yaml new file mode 100644 index 0000000..0774cc7 --- /dev/null +++ b/packages/sqlite_async/build.yaml @@ -0,0 +1,7 @@ +targets: + $default: + builders: + build_web_compilers:entrypoint: + options: + # Workers can't be compiled with dartdevc, so use dart2js for the example + compiler: dart2js diff --git a/packages/sqlite_async/example/web/index.html b/packages/sqlite_async/example/web/index.html new file mode 100644 index 0000000..4bc69de --- /dev/null +++ b/packages/sqlite_async/example/web/index.html @@ -0,0 +1,29 @@ + + + + + + sqlite_async web demo + + + + + +

sqlite_async demo

+ +
+This page is used to test the sqlite_async package on the web. +Use the console to open and interact with databases. + +
+
+const db = await open('test.db');
+const lock = await write_lock(db);
+release_lock(lock);
+
+
+
+ + + + diff --git a/packages/sqlite_async/example/web/main.dart b/packages/sqlite_async/example/web/main.dart new file mode 100644 index 0000000..95bd1f1 --- /dev/null +++ b/packages/sqlite_async/example/web/main.dart @@ -0,0 +1,41 @@ +import 'dart:async'; +import 'dart:js_interop'; +import 'dart:js_interop_unsafe'; + +import 'package:sqlite_async/sqlite_async.dart'; + +void main() { + globalContext['open'] = (String path) { + return Future(() async { + final db = SqliteDatabase( + path: path, + options: SqliteOptions( + webSqliteOptions: WebSqliteOptions( + wasmUri: + 'https://cdn.jsdelivr.net/npm/@powersync/dart-wasm-bundles@latest/dist/sqlite3.wasm', + workerUri: 'worker.dart.js', + ), + ), + ); + await db.initialize(); + return db.toJSBox; + }).toJS; + }.toJS; + + globalContext['write_lock'] = (JSBoxedDartObject db) { + final hasLock = Completer(); + final completer = Completer(); + + (db.toDart as SqliteDatabase).writeLock((_) async { + print('has write lock!'); + hasLock.complete(); + await completer.future; + }); + + return hasLock.future.then((_) => completer.toJSBox).toJS; + }.toJS; + + globalContext['release_lock'] = (JSBoxedDartObject db) { + (db.toDart as Completer).complete(); + }.toJS; +} diff --git a/packages/sqlite_async/example/web/worker.dart b/packages/sqlite_async/example/web/worker.dart new file mode 100644 index 0000000..481455d --- /dev/null +++ b/packages/sqlite_async/example/web/worker.dart @@ -0,0 +1,6 @@ +import 'package:sqlite_async/sqlite3_web.dart'; +import 'package:sqlite_async/sqlite3_web_worker.dart'; + +void main() { + WebSqlite.workerEntrypoint(controller: AsyncSqliteController()); +} diff --git a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart index f29520f..f63e0df 100644 --- a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart @@ -1,12 +1,18 @@ +import 'dart:developer'; + import 'package:sqlite3/common.dart'; import 'package:sqlite_async/src/common/mutex.dart'; import 'package:sqlite_async/src/sqlite_connection.dart'; +import 'package:sqlite_async/src/sqlite_options.dart'; import 'package:sqlite_async/src/sqlite_queries.dart'; import 'package:sqlite_async/src/update_notification.dart'; +import 'package:sqlite_async/src/utils/profiler.dart'; + +import '../../impl/context.dart'; /// A simple "synchronous" connection which provides the async SqliteConnection /// implementation using a synchronous SQLite connection -class SyncSqliteConnection extends SqliteConnection with SqliteQueries { +class SyncSqliteConnection with SqliteQueries implements SqliteConnection { final CommonDatabase db; late Mutex mutex; @override @@ -14,7 +20,15 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { bool _closed = false; - SyncSqliteConnection(this.db, Mutex m) { + /// Whether queries should be added to the `dart:developer` timeline. + /// + /// This is enabled by default outside of release builds, see + /// [SqliteOptions.profileQueries] for details. + final bool profileQueries; + + SyncSqliteConnection(this.db, Mutex m, {bool? profileQueries}) + : profileQueries = + profileQueries ?? const SqliteOptions().profileQueries { mutex = m.open(); updates = db.updates.map( (event) { @@ -26,15 +40,37 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { @override Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) { - return mutex.lock(() => callback(SyncReadContext(db)), - timeout: lockTimeout); + final task = profileQueries ? TimelineTask() : null; + task?.start('${profilerPrefix}mutex_lock'); + + return mutex.lock( + () { + task?.finish(); + return ScopedReadContext.assumeReadLock( + _UnsafeSyncContext(db, parent: task), + callback, + ); + }, + timeout: lockTimeout, + ); } @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) { - return mutex.lock(() => callback(SyncWriteContext(db)), - timeout: lockTimeout); + final task = profileQueries ? TimelineTask() : null; + task?.start('${profilerPrefix}mutex_lock'); + + return mutex.lock( + () { + task?.finish(); + return ScopedWriteContext.assumeWriteLock( + _UnsafeSyncContext(db, parent: task), + callback, + ); + }, + timeout: lockTimeout, + ); } @override @@ -52,10 +88,13 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { } } -class SyncReadContext implements SqliteReadContext { +final class _UnsafeSyncContext extends UnscopedContext { + final TimelineTask? task; + CommonDatabase db; - SyncReadContext(this.db); + _UnsafeSyncContext(this.db, {TimelineTask? parent}) + : task = TimelineTask(parent: parent); @override Future computeWithDatabase( @@ -65,13 +104,23 @@ class SyncReadContext implements SqliteReadContext { @override Future get(String sql, [List parameters = const []]) async { - return db.select(sql, parameters).first; + return task.timeSync( + 'get', + () => db.select(sql, parameters).first, + sql: sql, + parameters: parameters, + ); } @override Future getAll(String sql, [List parameters = const []]) async { - return db.select(sql, parameters); + return task.timeSync( + 'getAll', + () => db.select(sql, parameters), + sql: sql, + parameters: parameters, + ); } @override @@ -88,29 +137,31 @@ class SyncReadContext implements SqliteReadContext { Future getAutoCommit() async { return db.autocommit; } -} - -class SyncWriteContext extends SyncReadContext implements SqliteWriteContext { - SyncWriteContext(super.db); @override Future execute(String sql, [List parameters = const []]) async { - return db.select(sql, parameters); + return task.timeSync( + 'execute', + () => db.select(sql, parameters), + sql: sql, + parameters: parameters, + ); } @override Future executeBatch( String sql, List> parameterSets) async { - return computeWithDatabase((db) async { + task.timeSync('executeBatch', () { final statement = db.prepare(sql, checkNoTail: true); try { for (var parameters in parameterSets) { - statement.execute(parameters); + task.timeSync('iteration', () => statement.execute(parameters), + parameters: parameters); } } finally { statement.dispose(); } - }); + }, sql: sql); } } diff --git a/packages/sqlite_async/lib/src/common/port_channel.dart b/packages/sqlite_async/lib/src/common/port_channel.dart index 8b05feb..fc5e69c 100644 --- a/packages/sqlite_async/lib/src/common/port_channel.dart +++ b/packages/sqlite_async/lib/src/common/port_channel.dart @@ -1,352 +1,2 @@ -import 'dart:async'; -import 'dart:collection'; -import 'dart:isolate'; - -abstract class PortClient { - Future post(Object message); - void fire(Object message); - - factory PortClient.parent() { - return ParentPortClient(); - } - - factory PortClient.child(SendPort upstream) { - return ChildPortClient(upstream); - } -} - -class ParentPortClient implements PortClient { - late Future sendPortFuture; - SendPort? sendPort; - final ReceivePort _receivePort = ReceivePort(); - final ReceivePort _errorPort = ReceivePort(); - bool closed = false; - Object? _closeError; - String? _isolateDebugName; - int _nextId = 1; - - Map> handlers = HashMap(); - - ParentPortClient() { - final initCompleter = Completer.sync(); - sendPortFuture = initCompleter.future; - sendPortFuture.then((value) { - sendPort = value; - }); - _receivePort.listen((message) { - if (message is _InitMessage) { - assert(!initCompleter.isCompleted); - initCompleter.complete(message.port); - } else if (message is _PortChannelResult) { - final handler = handlers.remove(message.requestId); - assert(handler != null); - if (message.success) { - handler!.complete(message.result); - } else { - handler!.completeError(message.error, message.stackTrace); - } - } else if (message == _closeMessage) { - close(); - } - }, onError: (e) { - if (!initCompleter.isCompleted) { - initCompleter.completeError(e); - } - - close(); - }, onDone: () { - if (!initCompleter.isCompleted) { - initCompleter.completeError(ClosedException()); - } - close(); - }); - _errorPort.listen((message) { - final [error, stackTraceString] = message; - final stackTrace = stackTraceString == null - ? null - : StackTrace.fromString(stackTraceString); - if (!initCompleter.isCompleted) { - initCompleter.completeError(error, stackTrace); - } - _close(IsolateError(cause: error, isolateDebugName: _isolateDebugName), - stackTrace); - }); - } - - Future get ready async { - await sendPortFuture; - } - - void _cancelAll(Object error, [StackTrace? stackTrace]) { - var handlers = this.handlers; - this.handlers = {}; - for (var message in handlers.values) { - message.completeError(error, stackTrace); - } - } - - @override - Future post(Object message) async { - if (closed) { - throw _closeError ?? const ClosedException(); - } - var completer = Completer.sync(); - var id = _nextId++; - handlers[id] = completer; - final port = sendPort ?? await sendPortFuture; - port.send(_RequestMessage(id, message, null)); - return await completer.future; - } - - @override - void fire(Object message) async { - if (closed) { - throw _closeError ?? ClosedException(); - } - final port = sendPort ?? await sendPortFuture; - port.send(_FireMessage(message)); - } - - RequestPortServer server() { - return RequestPortServer(_receivePort.sendPort); - } - - void _close([Object? error, StackTrace? stackTrace]) { - if (!closed) { - closed = true; - - _receivePort.close(); - _errorPort.close(); - if (error == null) { - _cancelAll(const ClosedException()); - } else { - _closeError = error; - _cancelAll(error, stackTrace); - } - } - } - - void close() { - _close(); - } - - tieToIsolate(Isolate isolate) { - _isolateDebugName = isolate.debugName; - isolate.addErrorListener(_errorPort.sendPort); - isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); - } -} - -class SerializedPortClient { - final SendPort sendPort; - - SerializedPortClient(this.sendPort); - - ChildPortClient open() { - return ChildPortClient(sendPort); - } -} - -class ChildPortClient implements PortClient { - final SendPort sendPort; - final ReceivePort receivePort = ReceivePort(); - int _nextId = 1; - bool closed = false; - - final Map> handlers = HashMap(); - - ChildPortClient(this.sendPort) { - receivePort.listen((message) { - if (message is _PortChannelResult) { - final handler = handlers.remove(message.requestId); - assert(handler != null); - if (message.success) { - handler!.complete(message.result); - } else { - handler!.completeError(message.error, message.stackTrace); - } - } - }); - } - - @override - Future post(Object message) async { - if (closed) { - throw const ClosedException(); - } - var completer = Completer.sync(); - var id = _nextId++; - handlers[id] = completer; - sendPort.send(_RequestMessage(id, message, receivePort.sendPort)); - return await completer.future; - } - - @override - void fire(Object message) { - if (closed) { - throw ClosedException(); - } - sendPort.send(_FireMessage(message)); - } - - void _cancelAll(Object error) { - var handlers = HashMap>.from(this.handlers); - this.handlers.clear(); - for (var message in handlers.values) { - message.completeError(error); - } - } - - void close() { - closed = true; - _cancelAll(const ClosedException()); - receivePort.close(); - } -} - -class RequestPortServer { - final SendPort port; - - RequestPortServer(this.port); - - open(Future Function(Object? message) handle) { - return PortServer.forSendPort(port, handle); - } -} - -class PortServer { - final ReceivePort _receivePort = ReceivePort(); - final Future Function(Object? message) handle; - final SendPort? replyPort; - - PortServer(this.handle) : replyPort = null { - _init(); - } - - PortServer.forSendPort(SendPort port, this.handle) : replyPort = port { - port.send(_InitMessage(_receivePort.sendPort)); - _init(); - } - - SendPort get sendPort { - return _receivePort.sendPort; - } - - SerializedPortClient client() { - return SerializedPortClient(sendPort); - } - - void close() { - _receivePort.close(); - } - - void _init() { - _receivePort.listen((request) async { - if (request is _FireMessage) { - handle(request.message); - } else if (request is _RequestMessage) { - if (request.id == 0) { - // Fire and forget - handle(request.message); - } else { - final replyPort = request.reply ?? this.replyPort; - try { - var result = await handle(request.message); - replyPort!.send(_PortChannelResult.success(request.id, result)); - } catch (e, stacktrace) { - replyPort! - .send(_PortChannelResult.error(request.id, e, stacktrace)); - } - } - } - }); - } -} - -const _closeMessage = '_Close'; - -class _InitMessage { - final SendPort port; - - _InitMessage(this.port); -} - -class _FireMessage { - final Object message; - - const _FireMessage(this.message); -} - -class _RequestMessage { - final int id; - final Object message; - final SendPort? reply; - - _RequestMessage(this.id, this.message, this.reply); -} - -class ClosedException implements Exception { - const ClosedException(); - - @override - String toString() { - return 'ClosedException'; - } -} - -class IsolateError extends Error { - final Object cause; - final String? isolateDebugName; - - IsolateError({required this.cause, this.isolateDebugName}); - - @override - String toString() { - if (isolateDebugName != null) { - return 'IsolateError in $isolateDebugName: $cause'; - } else { - return 'IsolateError: $cause'; - } - } -} - -class _PortChannelResult { - final int requestId; - final bool success; - final T? _result; - final Object? _error; - final StackTrace? stackTrace; - - const _PortChannelResult.success(this.requestId, T result) - : success = true, - _error = null, - stackTrace = null, - _result = result; - const _PortChannelResult.error(this.requestId, Object error, - [this.stackTrace]) - : success = false, - _result = null, - _error = error; - - T get value { - if (success) { - return _result as T; - } else { - if (_error != null && stackTrace != null) { - Error.throwWithStackTrace(_error, stackTrace!); - } else { - throw _error!; - } - } - } - - T get result { - assert(success); - return _result as T; - } - - Object get error { - assert(!success); - return _error!; - } -} +export 'port_channel_native.dart' + if (dart.library.js_interop) 'port_channel_stub.dart'; diff --git a/packages/sqlite_async/lib/src/common/port_channel_native.dart b/packages/sqlite_async/lib/src/common/port_channel_native.dart new file mode 100644 index 0000000..bb8318e --- /dev/null +++ b/packages/sqlite_async/lib/src/common/port_channel_native.dart @@ -0,0 +1,350 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:isolate'; + +abstract class PortClient { + Future post(Object message); + void fire(Object message); + + factory PortClient.parent() { + return ParentPortClient(); + } + + factory PortClient.child(SendPort upstream) { + return ChildPortClient(upstream); + } +} + +class ParentPortClient implements PortClient { + late Future sendPortFuture; + SendPort? sendPort; + final ReceivePort _receivePort = ReceivePort(); + final ReceivePort _errorPort = ReceivePort(); + bool closed = false; + Object? _closeError; + String? _isolateDebugName; + int _nextId = 1; + + Map> handlers = HashMap(); + + ParentPortClient() { + final initCompleter = Completer.sync(); + sendPortFuture = initCompleter.future; + _receivePort.listen((message) { + if (message is _InitMessage) { + assert(!initCompleter.isCompleted); + sendPort = message.port; + initCompleter.complete(message.port); + } else if (message is _PortChannelResult) { + final handler = handlers.remove(message.requestId); + assert(handler != null); + if (message.success) { + handler!.complete(message.result); + } else { + handler!.completeError(message.error, message.stackTrace); + } + } else if (message == _closeMessage) { + close(); + } + }, onError: (e) { + if (!initCompleter.isCompleted) { + initCompleter.completeError(e); + } + + close(); + }, onDone: () { + if (!initCompleter.isCompleted) { + initCompleter.completeError(ClosedException()); + } + close(); + }); + _errorPort.listen((message) { + final [error, stackTraceString] = message; + final stackTrace = stackTraceString == null + ? null + : StackTrace.fromString(stackTraceString); + if (!initCompleter.isCompleted) { + initCompleter.completeError(error, stackTrace); + } + _close(IsolateError(cause: error, isolateDebugName: _isolateDebugName), + stackTrace); + }); + } + + Future get ready async { + await sendPortFuture; + } + + void _cancelAll(Object error, [StackTrace? stackTrace]) { + var handlers = this.handlers; + this.handlers = {}; + for (var message in handlers.values) { + message.completeError(error, stackTrace); + } + } + + @override + Future post(Object message) async { + if (closed) { + throw _closeError ?? const ClosedException(); + } + var completer = Completer.sync(); + var id = _nextId++; + handlers[id] = completer; + final port = sendPort ?? await sendPortFuture; + port.send(_RequestMessage(id, message, null)); + return await completer.future; + } + + @override + void fire(Object message) async { + if (closed) { + throw _closeError ?? ClosedException(); + } + final port = sendPort ?? await sendPortFuture; + port.send(_FireMessage(message)); + } + + RequestPortServer server() { + return RequestPortServer(_receivePort.sendPort); + } + + void _close([Object? error, StackTrace? stackTrace]) { + if (!closed) { + closed = true; + + _receivePort.close(); + _errorPort.close(); + if (error == null) { + _cancelAll(const ClosedException()); + } else { + _closeError = error; + _cancelAll(error, stackTrace); + } + } + } + + void close() { + _close(); + } + + tieToIsolate(Isolate isolate) { + _isolateDebugName = isolate.debugName; + isolate.addErrorListener(_errorPort.sendPort); + isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); + } +} + +class SerializedPortClient { + final SendPort sendPort; + + SerializedPortClient(this.sendPort); + + ChildPortClient open() { + return ChildPortClient(sendPort); + } +} + +class ChildPortClient implements PortClient { + final SendPort sendPort; + final ReceivePort receivePort = ReceivePort(); + int _nextId = 1; + bool closed = false; + + final Map> handlers = HashMap(); + + ChildPortClient(this.sendPort) { + receivePort.listen((message) { + if (message is _PortChannelResult) { + final handler = handlers.remove(message.requestId); + assert(handler != null); + if (message.success) { + handler!.complete(message.result); + } else { + handler!.completeError(message.error, message.stackTrace); + } + } + }); + } + + @override + Future post(Object message) async { + if (closed) { + throw const ClosedException(); + } + var completer = Completer.sync(); + var id = _nextId++; + handlers[id] = completer; + sendPort.send(_RequestMessage(id, message, receivePort.sendPort)); + return await completer.future; + } + + @override + void fire(Object message) { + if (closed) { + throw ClosedException(); + } + sendPort.send(_FireMessage(message)); + } + + void _cancelAll(Object error) { + var handlers = HashMap>.from(this.handlers); + this.handlers.clear(); + for (var message in handlers.values) { + message.completeError(error); + } + } + + void close() { + closed = true; + _cancelAll(const ClosedException()); + receivePort.close(); + } +} + +class RequestPortServer { + final SendPort port; + + RequestPortServer(this.port); + + open(Future Function(Object? message) handle) { + return PortServer.forSendPort(port, handle); + } +} + +class PortServer { + final ReceivePort _receivePort = ReceivePort(); + final Future Function(Object? message) handle; + final SendPort? replyPort; + + PortServer(this.handle) : replyPort = null { + _init(); + } + + PortServer.forSendPort(SendPort port, this.handle) : replyPort = port { + port.send(_InitMessage(_receivePort.sendPort)); + _init(); + } + + SendPort get sendPort { + return _receivePort.sendPort; + } + + SerializedPortClient client() { + return SerializedPortClient(sendPort); + } + + void close() { + _receivePort.close(); + } + + void _init() { + _receivePort.listen((request) async { + if (request is _FireMessage) { + handle(request.message); + } else if (request is _RequestMessage) { + if (request.id == 0) { + // Fire and forget + handle(request.message); + } else { + final replyPort = request.reply ?? this.replyPort; + try { + var result = await handle(request.message); + replyPort!.send(_PortChannelResult.success(request.id, result)); + } catch (e, stacktrace) { + replyPort! + .send(_PortChannelResult.error(request.id, e, stacktrace)); + } + } + } + }); + } +} + +const _closeMessage = '_Close'; + +class _InitMessage { + final SendPort port; + + _InitMessage(this.port); +} + +class _FireMessage { + final Object message; + + const _FireMessage(this.message); +} + +class _RequestMessage { + final int id; + final Object message; + final SendPort? reply; + + _RequestMessage(this.id, this.message, this.reply); +} + +class ClosedException implements Exception { + const ClosedException(); + + @override + String toString() { + return 'ClosedException'; + } +} + +class IsolateError extends Error { + final Object cause; + final String? isolateDebugName; + + IsolateError({required this.cause, this.isolateDebugName}); + + @override + String toString() { + if (isolateDebugName != null) { + return 'IsolateError in $isolateDebugName: $cause'; + } else { + return 'IsolateError: $cause'; + } + } +} + +class _PortChannelResult { + final int requestId; + final bool success; + final T? _result; + final Object? _error; + final StackTrace? stackTrace; + + const _PortChannelResult.success(this.requestId, T result) + : success = true, + _error = null, + stackTrace = null, + _result = result; + const _PortChannelResult.error(this.requestId, Object error, + [this.stackTrace]) + : success = false, + _result = null, + _error = error; + + T get value { + if (success) { + return _result as T; + } else { + if (_error != null && stackTrace != null) { + Error.throwWithStackTrace(_error, stackTrace!); + } else { + throw _error!; + } + } + } + + T get result { + assert(success); + return _result as T; + } + + Object get error { + assert(!success); + return _error!; + } +} diff --git a/packages/sqlite_async/lib/src/common/port_channel_stub.dart b/packages/sqlite_async/lib/src/common/port_channel_stub.dart new file mode 100644 index 0000000..6c6e5cc --- /dev/null +++ b/packages/sqlite_async/lib/src/common/port_channel_stub.dart @@ -0,0 +1,149 @@ +import 'dart:async'; +import 'dart:collection'; + +typedef SendPort = Never; +typedef ReceivePort = Never; +typedef Isolate = Never; + +Never _stub() { + throw UnsupportedError('Isolates are not supported on this platform'); +} + +abstract class PortClient { + Future post(Object message); + void fire(Object message); + + factory PortClient.parent() { + return ParentPortClient(); + } + + factory PortClient.child(SendPort upstream) { + return ChildPortClient(upstream); + } +} + +class ParentPortClient implements PortClient { + late Future sendPortFuture; + SendPort? sendPort; + bool closed = false; + + Map> handlers = HashMap(); + + ParentPortClient(); + + Future get ready async { + await sendPortFuture; + } + + @override + Future post(Object message) async { + _stub(); + } + + @override + void fire(Object message) async { + _stub(); + } + + RequestPortServer server() { + _stub(); + } + + void close() { + _stub(); + } + + tieToIsolate(Isolate isolate) { + _stub(); + } +} + +class SerializedPortClient { + final SendPort sendPort; + + SerializedPortClient(this.sendPort); + + ChildPortClient open() { + return ChildPortClient(sendPort); + } +} + +class ChildPortClient implements PortClient { + final SendPort sendPort; + ReceivePort get receivePort => _stub(); + bool closed = false; + + final Map> handlers = HashMap(); + + ChildPortClient(this.sendPort); + + @override + Future post(Object message) async { + _stub(); + } + + @override + void fire(Object message) { + _stub(); + } + + void close() { + _stub(); + } +} + +class RequestPortServer { + final SendPort port; + + RequestPortServer(this.port); + + open(Future Function(Object? message) handle) { + _stub(); + } +} + +class PortServer { + final Future Function(Object? message) handle; + final SendPort? replyPort; + + PortServer(this.handle) : replyPort = null; + + PortServer.forSendPort(SendPort port, this.handle) : replyPort = port; + + SendPort get sendPort { + _stub(); + } + + SerializedPortClient client() { + return SerializedPortClient(sendPort); + } + + void close() { + _stub(); + } +} + +class ClosedException implements Exception { + const ClosedException(); + + @override + String toString() { + return 'ClosedException'; + } +} + +class IsolateError extends Error { + final Object cause; + final String? isolateDebugName; + + IsolateError({required this.cause, this.isolateDebugName}); + + @override + String toString() { + if (isolateDebugName != null) { + return 'IsolateError in $isolateDebugName: $cause'; + } else { + return 'IsolateError: $cause'; + } + } +} diff --git a/packages/sqlite_async/lib/src/common/sqlite_database.dart b/packages/sqlite_async/lib/src/common/sqlite_database.dart index f8e0be5..3cb12bb 100644 --- a/packages/sqlite_async/lib/src/common/sqlite_database.dart +++ b/packages/sqlite_async/lib/src/common/sqlite_database.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; +import 'package:sqlite_async/src/impl/single_connection_database.dart'; import 'package:sqlite_async/src/impl/sqlite_database_impl.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; import 'package:sqlite_async/src/sqlite_queries.dart'; @@ -82,4 +83,24 @@ abstract class SqliteDatabase {int maxReaders = SqliteDatabase.defaultMaxReaders}) { return SqliteDatabaseImpl.withFactory(openFactory, maxReaders: maxReaders); } + + /// Opens a [SqliteDatabase] that only wraps an underlying connection. + /// + /// This function may be useful in some instances like tests, but should not + /// typically be used by applications. Compared to the other ways to open + /// databases, it has the following downsides: + /// + /// 1. No connection pool / concurrent readers for native databases. + /// 2. No reliable update notifications on the web. + /// 3. There is no reliable transaction management in Dart, and opening the + /// same database with [SqliteDatabase.singleConnection] multiple times + /// may cause "database is locked" errors. + /// + /// Together with [SqliteConnection.synchronousWrapper], this can be used to + /// open in-memory databases (e.g. via [SqliteOpenFactory.open]). That + /// bypasses most convenience features, but may still be useful for + /// short-lived databases used in tests. + factory SqliteDatabase.singleConnection(SqliteConnection connection) { + return SingleConnectionDatabase(connection); + } } diff --git a/packages/sqlite_async/lib/src/impl/context.dart b/packages/sqlite_async/lib/src/impl/context.dart new file mode 100644 index 0000000..0e3eef6 --- /dev/null +++ b/packages/sqlite_async/lib/src/impl/context.dart @@ -0,0 +1,208 @@ +import 'package:sqlite3/common.dart'; + +import '../sqlite_connection.dart'; + +/// A context that can be used to run both reading and writing queries - +/// basically a [SqliteWriteContext] without the ability to start transactions. +/// +/// Instances of this are not given out to clients - instead, they are wrapped +/// with [ScopedReadContext] and [ScopedWriteContext] after obtaining a lock. +/// Those wrapped views have a shorter lifetime (they can be closed +/// independently, and verify that they're not being used after being closed). +abstract base class UnscopedContext implements SqliteReadContext { + Future execute(String sql, List parameters); + Future executeBatch(String sql, List> parameterSets); + + /// Returns an [UnscopedContext] useful as the outermost transaction. + /// + /// This is called by [ScopedWriteContext.writeTransaction] _after_ executing + /// the first `BEGIN` statement. + /// This is used on the web to assert that the auto-commit state is false + /// before running statements. + UnscopedContext interceptOutermostTransaction() { + return this; + } +} + +/// A view over an [UnscopedContext] implementing [SqliteReadContext]. +final class ScopedReadContext implements SqliteReadContext { + final UnscopedContext _context; + + /// Whether this context view is locked on an inner operation like a + /// transaction. + /// + /// We don't use a mutex because we don't want to serialize access - we just + /// want to forbid concurrent operations. + bool _isLocked = false; + + /// Whether this particular view of a read context has been closed, e.g. + /// because the callback owning it has returned. + bool _closed = false; + + ScopedReadContext(this._context); + + void _checkNotLocked() { + _checkStillOpen(); + + if (_isLocked) { + throw StateError( + 'The context from the callback was locked, e.g. due to a nested ' + 'transaction.'); + } + } + + void _checkStillOpen() { + if (_closed) { + throw StateError('This context to a callback is no longer open. ' + 'Make sure to await all statements on a database to avoid a context ' + 'still being used after its callback has finished.'); + } + } + + @override + bool get closed => _closed || _context.closed; + + @override + Future computeWithDatabase( + Future Function(CommonDatabase db) compute) async { + _checkNotLocked(); + return await _context.computeWithDatabase(compute); + } + + @override + Future get(String sql, [List parameters = const []]) async { + _checkNotLocked(); + return _context.get(sql, parameters); + } + + @override + Future getAll(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return await _context.getAll(sql, parameters); + } + + @override + Future getAutoCommit() async { + _checkStillOpen(); + return _context.getAutoCommit(); + } + + @override + Future getOptional(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return _context.getOptional(sql, parameters); + } + + void invalidate() => _closed = true; + + /// Creates a short-lived wrapper around the [unsafe] context to safely give + /// [callback] read-access to the database. + /// + /// Assumes that a read lock providing sound access to the inner + /// [UnscopedContext] is held until this future returns. + static Future assumeReadLock( + UnscopedContext unsafe, + Future Function(SqliteReadContext) callback, + ) async { + final scoped = ScopedReadContext(unsafe); + try { + return await callback(scoped); + } finally { + scoped.invalidate(); + } + } +} + +final class ScopedWriteContext extends ScopedReadContext + implements SqliteWriteContext { + /// The "depth" of virtual nested transaction. + /// + /// A value of `0` indicates that this is operating outside of a transaction. + /// A value of `1` indicates a regular transaction (guarded with `BEGIN` and + /// `COMMIT` statements). + /// All higher values indicate a nested transaction implemented with + /// savepoint statements. + final int transactionDepth; + + ScopedWriteContext(super._context, {this.transactionDepth = 0}); + + @override + Future execute(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return await _context.execute(sql, parameters); + } + + @override + Future executeBatch( + String sql, List> parameterSets) async { + _checkNotLocked(); + + return await _context.executeBatch(sql, parameterSets); + } + + @override + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback) async { + _checkNotLocked(); + final (begin, commit, rollback) = _beginCommitRollback(transactionDepth); + ScopedWriteContext? inner; + + final innerContext = transactionDepth == 0 + ? _context.interceptOutermostTransaction() + : _context; + + try { + _isLocked = true; + + await _context.execute(begin, const []); + + inner = ScopedWriteContext(innerContext, + transactionDepth: transactionDepth + 1); + final result = await callback(inner); + await innerContext.execute(commit, const []); + return result; + } catch (e) { + try { + await innerContext.execute(rollback, const []); + } catch (e) { + // In rare cases, a ROLLBACK may fail. + // Safe to ignore. + } + rethrow; + } finally { + _isLocked = false; + inner?.invalidate(); + } + } + + static (String, String, String) _beginCommitRollback(int level) { + return switch (level) { + 0 => ('BEGIN IMMEDIATE', 'COMMIT', 'ROLLBACK'), + final nested => ( + 'SAVEPOINT s$nested', + 'RELEASE s$nested', + 'ROLLBACK TO s$nested' + ) + }; + } + + /// Creates a short-lived wrapper around the [unsafe] context to safely give + /// [callback] access to the database. + /// + /// Assumes that a write lock providing sound access to the inner + /// [UnscopedContext] is held until this future returns. + static Future assumeWriteLock( + UnscopedContext unsafe, + Future Function(SqliteWriteContext) callback, + ) async { + final scoped = ScopedWriteContext(unsafe); + try { + return await callback(scoped); + } finally { + scoped.invalidate(); + } + } +} diff --git a/packages/sqlite_async/lib/src/impl/single_connection_database.dart b/packages/sqlite_async/lib/src/impl/single_connection_database.dart new file mode 100644 index 0000000..4cd3144 --- /dev/null +++ b/packages/sqlite_async/lib/src/impl/single_connection_database.dart @@ -0,0 +1,60 @@ +import 'package:sqlite3/common.dart'; +import 'package:sqlite_async/sqlite_async.dart'; + +/// A database implementation that delegates everything to a single connection. +/// +/// This doesn't provide an automatic connection pool or the web worker +/// management, but it can still be useful in cases like unit tests where those +/// features might not be necessary. Since only a single sqlite connection is +/// used internally, this also allows using in-memory databases. +final class SingleConnectionDatabase + with SqliteQueries, SqliteDatabaseMixin + implements SqliteDatabase { + final SqliteConnection connection; + + SingleConnectionDatabase(this.connection); + + @override + Future close() => connection.close(); + + @override + bool get closed => connection.closed; + + @override + Future getAutoCommit() => connection.getAutoCommit(); + + @override + Future get isInitialized => Future.value(); + + @override + IsolateConnectionFactory isolateConnectionFactory() { + throw UnsupportedError( + "SqliteDatabase.singleConnection instances can't be used across " + 'isolates.'); + } + + @override + int get maxReaders => 1; + + @override + AbstractDefaultSqliteOpenFactory get openFactory => + throw UnimplementedError(); + + @override + Future readLock(Future Function(SqliteReadContext tx) callback, + {Duration? lockTimeout, String? debugContext}) { + return connection.readLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); + } + + @override + Stream get updates => + connection.updates ?? const Stream.empty(); + + @override + Future writeLock(Future Function(SqliteWriteContext tx) callback, + {Duration? lockTimeout, String? debugContext}) { + return connection.writeLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); + } +} diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 7df4ac8..e2df0f3 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:developer'; import 'dart:isolate'; import 'package:sqlite3/sqlite3.dart' as sqlite; @@ -10,8 +11,10 @@ import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_queries.dart'; import 'package:sqlite_async/src/update_notification.dart'; +import 'package:sqlite_async/src/utils/profiler.dart'; import 'package:sqlite_async/src/utils/shared_utils.dart'; +import '../../impl/context.dart'; import 'upstream_updates.dart'; typedef TxCallback = Future Function(CommonDatabase db); @@ -33,20 +36,24 @@ class SqliteConnectionImpl final String? debugName; final bool readOnly; - SqliteConnectionImpl( - {required openFactory, - required Mutex mutex, - SerializedPortClient? upstreamPort, - Stream? updates, - this.debugName, - this.readOnly = false, - bool primary = false}) - : _writeMutex = mutex { - isInitialized = _isolateClient.ready; + final bool profileQueries; + bool _didOpenSuccessfully = false; + + SqliteConnectionImpl({ + required AbstractDefaultSqliteOpenFactory openFactory, + required Mutex mutex, + SerializedPortClient? upstreamPort, + Stream? updates, + this.debugName, + this.readOnly = false, + bool primary = false, + }) : _writeMutex = mutex, + profileQueries = openFactory.sqliteOptions.profileQueries { this.upstreamPort = upstreamPort ?? listenForEvents(); // Accept an incoming stream of updates, or expose one if not given. this.updates = updates ?? updatesController.stream; - _open(openFactory, primary: primary, upstreamPort: this.upstreamPort); + isInitialized = + _open(openFactory, primary: primary, upstreamPort: this.upstreamPort); } Future get ready async { @@ -58,6 +65,11 @@ class SqliteConnectionImpl return _isolateClient.closed; } + _UnsafeContext _context() { + return _UnsafeContext( + _isolateClient, profileQueries ? TimelineTask() : null); + } + @override Future getAutoCommit() async { if (closed) { @@ -65,7 +77,7 @@ class SqliteConnectionImpl } // We use a _TransactionContext without a lock here. // It is safe to call this in the middle of another transaction. - final ctx = _TransactionContext(_isolateClient); + final ctx = _context(); try { return await ctx.getAutoCommit(); } finally { @@ -90,6 +102,7 @@ class SqliteConnectionImpl _isolateClient.tieToIsolate(_isolate); _isolate.resume(_isolate.pauseCapability!); await _isolateClient.ready; + _didOpenSuccessfully = true; }); } @@ -97,15 +110,18 @@ class SqliteConnectionImpl Future close() async { eventsPort?.close(); await _connectionMutex.lock(() async { - if (readOnly) { - await _isolateClient.post(const _SqliteIsolateConnectionClose()); - } else { - // In some cases, disposing a write connection lock the database. - // We use the lock here to avoid "database is locked" errors. - await _writeMutex.lock(() async { + if (_didOpenSuccessfully) { + if (readOnly) { await _isolateClient.post(const _SqliteIsolateConnectionClose()); - }); + } else { + // In some cases, disposing a write connection lock the database. + // We use the lock here to avoid "database is locked" errors. + await _writeMutex.lock(() async { + await _isolateClient.post(const _SqliteIsolateConnectionClose()); + }); + } } + _isolate.kill(); }); } @@ -120,9 +136,9 @@ class SqliteConnectionImpl // Private lock to synchronize this with other statements on the same connection, // to ensure that transactions aren't interleaved. return _connectionMutex.lock(() async { - final ctx = _TransactionContext(_isolateClient); + final ctx = _context(); try { - return await callback(ctx); + return await ScopedReadContext.assumeReadLock(ctx, callback); } finally { await ctx.close(); } @@ -143,9 +159,9 @@ class SqliteConnectionImpl } // DB lock so that only one write happens at a time return await _writeMutex.lock(() async { - final ctx = _TransactionContext(_isolateClient); + final ctx = _context(); try { - return await callback(ctx); + return await ScopedWriteContext.assumeWriteLock(ctx, callback); } finally { await ctx.close(); } @@ -162,12 +178,14 @@ class SqliteConnectionImpl int _nextCtxId = 1; -class _TransactionContext implements SqliteWriteContext { +final class _UnsafeContext extends UnscopedContext { final PortClient _sendPort; bool _closed = false; final int ctxId = _nextCtxId++; - _TransactionContext(this._sendPort); + final TimelineTask? task; + + _UnsafeContext(this._sendPort, this.task); @override bool get closed { @@ -187,8 +205,13 @@ class _TransactionContext implements SqliteWriteContext { throw sqlite.SqliteException(0, 'Transaction closed', null, sql); } try { - var future = _sendPort.post( - _SqliteIsolateStatement(ctxId, sql, parameters, readOnly: false)); + var future = _sendPort.post(_SqliteIsolateStatement( + ctxId, + sql, + parameters, + readOnly: false, + timelineTask: task?.pass(), + )); return await future; } on sqlite.SqliteException catch (e) { @@ -314,74 +337,101 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, Timer(const Duration(milliseconds: 1), maybeFireUpdates); }); - server.open((data) async { - if (data is _SqliteIsolateClose) { - // This is a transaction close message + ResultSet runStatement(_SqliteIsolateStatement data) { + if (data.sql == 'BEGIN' || data.sql == 'BEGIN IMMEDIATE') { if (txId != null) { - if (!db.autocommit) { - db.execute('ROLLBACK'); - } - txId = null; - txError = null; - throw sqlite.SqliteException( - 0, 'Transaction must be closed within the read or write lock'); + // This will error on db.select } - // We would likely have received updates by this point - fire now. - maybeFireUpdates(); - return null; - } else if (data is _SqliteIsolateStatement) { - if (data.sql == 'BEGIN' || data.sql == 'BEGIN IMMEDIATE') { - if (txId != null) { - // This will error on db.select + txId = data.ctxId; + } else if (txId != null && txId != data.ctxId) { + // Locks should prevent this from happening + throw sqlite.SqliteException( + 0, 'Mixed transactions: $txId and ${data.ctxId}'); + } else if (data.sql == 'ROLLBACK') { + // This is the only valid way to clear an error + txError = null; + txId = null; + } else if (txError != null) { + // Any statement (including COMMIT) after the first error will also error, until the + // transaction is aborted. + throw txError!; + } else if (data.sql == 'COMMIT' || data.sql == 'END TRANSACTION') { + txId = null; + } + try { + final result = db.select(data.sql, mapParameters(data.args)); + return result; + } catch (err) { + if (txId != null) { + if (db.autocommit) { + // Transaction rolled back + txError = sqlite.SqliteException(0, + 'Transaction rolled back by earlier statement: ${err.toString()}'); + } else { + // Recoverable error } - txId = data.ctxId; - } else if (txId != null && txId != data.ctxId) { - // Locks should prevent this from happening - throw sqlite.SqliteException( - 0, 'Mixed transactions: $txId and ${data.ctxId}'); - } else if (data.sql == 'ROLLBACK') { - // This is the only valid way to clear an error - txError = null; - txId = null; - } else if (txError != null) { - // Any statement (including COMMIT) after the first error will also error, until the - // transaction is aborted. - throw txError!; - } else if (data.sql == 'COMMIT' || data.sql == 'END TRANSACTION') { - txId = null; } - try { - final result = db.select(data.sql, mapParameters(data.args)); - return result; - } catch (err) { + rethrow; + } + } + + Future handle(_RemoteIsolateRequest data, TimelineTask? task) async { + switch (data) { + case _SqliteIsolateClose(): + // This is a transaction close message if (txId != null) { - if (db.autocommit) { - // Transaction rolled back - txError = sqlite.SqliteException(0, - 'Transaction rolled back by earlier statement: ${err.toString()}'); - } else { - // Recoverable error + if (!db.autocommit) { + db.execute('ROLLBACK'); } + txId = null; + txError = null; + throw sqlite.SqliteException( + 0, 'Transaction must be closed within the read or write lock'); } - rethrow; - } - } else if (data is _SqliteIsolateClosure) { - try { - return await data.cb(db); - } finally { + // We would likely have received updates by this point - fire now. maybeFireUpdates(); - } - } else if (data is _SqliteIsolateConnectionClose) { - db.dispose(); - return null; - } else { + return null; + case _SqliteIsolateStatement(): + return task.timeSync( + 'execute_remote', + () => runStatement(data), + sql: data.sql, + parameters: data.args, + ); + case _SqliteIsolateClosure(): + try { + return await data.cb(db); + } finally { + maybeFireUpdates(); + } + case _SqliteIsolateConnectionClose(): + db.dispose(); + return null; + } + } + + server.open((data) async { + if (data is! _RemoteIsolateRequest) { throw ArgumentError('Unknown data type $data'); } + + final task = switch (data.timelineTask) { + null => null, + final id => TimelineTask.withTaskId(id), + }; + + return await handle(data, task); }); commandPort.listen((data) async {}); } +sealed class _RemoteIsolateRequest { + final int? timelineTask; + + const _RemoteIsolateRequest({required this.timelineTask}); +} + class _SqliteConnectionParams { final RequestPortServer portServer; final bool readOnly; @@ -398,28 +448,28 @@ class _SqliteConnectionParams { required this.primary}); } -class _SqliteIsolateStatement { +class _SqliteIsolateStatement extends _RemoteIsolateRequest { final int ctxId; final String sql; final List args; final bool readOnly; _SqliteIsolateStatement(this.ctxId, this.sql, this.args, - {this.readOnly = false}); + {this.readOnly = false, super.timelineTask}); } -class _SqliteIsolateClosure { +class _SqliteIsolateClosure extends _RemoteIsolateRequest { final TxCallback cb; - _SqliteIsolateClosure(this.cb); + _SqliteIsolateClosure(this.cb, {super.timelineTask}); } -class _SqliteIsolateClose { +class _SqliteIsolateClose extends _RemoteIsolateRequest { final int ctxId; - const _SqliteIsolateClose(this.ctxId); + const _SqliteIsolateClose(this.ctxId, {super.timelineTask}); } -class _SqliteIsolateConnectionClose { - const _SqliteIsolateConnectionClose(); +class _SqliteIsolateConnectionClose extends _RemoteIsolateRequest { + const _SqliteIsolateConnectionClose({super.timelineTask}); } diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart index 5cb60f3..7bea111 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart @@ -34,8 +34,8 @@ class SqliteDatabaseImpl @override @protected - // Native doesn't require any asynchronous initialization - late Future isInitialized = Future.value(); + // ignore: invalid_use_of_protected_member + late Future isInitialized = _internalConnection.isInitialized; late final SqliteConnectionImpl _internalConnection; late final SqliteConnectionPool _pool; diff --git a/packages/sqlite_async/lib/src/sqlite_connection.dart b/packages/sqlite_async/lib/src/sqlite_connection.dart index f1b721a..0e360fc 100644 --- a/packages/sqlite_async/lib/src/sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/sqlite_connection.dart @@ -1,10 +1,14 @@ import 'dart:async'; import 'package:sqlite3/common.dart' as sqlite; +import 'package:sqlite_async/mutex.dart'; +import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/src/update_notification.dart'; +import 'common/connection/sync_sqlite_connection.dart'; + /// Abstract class representing calls available in a read-only or read-write context. -abstract class SqliteReadContext { +abstract interface class SqliteReadContext { /// Execute a read-only (SELECT) query and return the results. Future getAll(String sql, [List parameters = const []]); @@ -62,7 +66,7 @@ abstract class SqliteReadContext { } /// Abstract class representing calls available in a read-write context. -abstract class SqliteWriteContext extends SqliteReadContext { +abstract interface class SqliteWriteContext extends SqliteReadContext { /// Execute a write query (INSERT, UPDATE, DELETE) and return the results (if any). Future execute(String sql, [List parameters = const []]); @@ -71,10 +75,48 @@ abstract class SqliteWriteContext extends SqliteReadContext { /// parameter set. This is faster than executing separately with each /// parameter set. Future executeBatch(String sql, List> parameterSets); + + /// Open a read-write transaction on this write context. + /// + /// When called on a [SqliteConnection], this takes a global lock - only one + /// write write transaction can execute against the database at a time. This + /// applies even when constructing separate [SqliteDatabase] instances for the + /// same database file. + /// + /// Statements within the transaction must be done on the provided + /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] + /// instance will error. + /// It is forbidden to use the [SqliteWriteContext] after the [callback] + /// completes. + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback); } /// Abstract class representing a connection to the SQLite database. -abstract class SqliteConnection extends SqliteWriteContext { +/// +/// This package typically pools multiple [SqliteConnection] instances into a +/// managed [SqliteDatabase] automatically. +abstract interface class SqliteConnection extends SqliteWriteContext { + /// Default constructor for subclasses. + SqliteConnection(); + + /// Creates a [SqliteConnection] instance that wraps a raw [CommonDatabase] + /// from the `sqlite3` package. + /// + /// Users should not typically create connections manually at all. Instead, + /// open a [SqliteDatabase] through a factory. In special scenarios where it + /// may be easier to wrap a [raw] databases (like unit tests), this method + /// may be used as an escape hatch for the asynchronous wrappers provided by + /// this package. + /// + /// When [profileQueries] is enabled (it's enabled by default outside of + /// release builds, queries are posted to the `dart:developer` timeline). + factory SqliteConnection.synchronousWrapper(CommonDatabase raw, + {Mutex? mutex, bool? profileQueries}) { + return SyncSqliteConnection(raw, mutex ?? Mutex(), + profileQueries: profileQueries); + } + /// Reports table change update notifications Stream? get updates; @@ -96,6 +138,7 @@ abstract class SqliteConnection extends SqliteWriteContext { /// Statements within the transaction must be done on the provided /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] /// instance will error. + @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}); diff --git a/packages/sqlite_async/lib/src/sqlite_options.dart b/packages/sqlite_async/lib/src/sqlite_options.dart index 3767213..0489f6b 100644 --- a/packages/sqlite_async/lib/src/sqlite_options.dart +++ b/packages/sqlite_async/lib/src/sqlite_options.dart @@ -30,19 +30,27 @@ class SqliteOptions { /// Set to null or [Duration.zero] to fail immediately when the database is locked. final Duration? lockTimeout; - const SqliteOptions.defaults() - : journalMode = SqliteJournalMode.wal, - journalSizeLimit = 6 * 1024 * 1024, // 1.5x the default checkpoint size - synchronous = SqliteSynchronous.normal, - webSqliteOptions = const WebSqliteOptions.defaults(), - lockTimeout = const Duration(seconds: 30); - - const SqliteOptions( - {this.journalMode = SqliteJournalMode.wal, - this.journalSizeLimit = 6 * 1024 * 1024, - this.synchronous = SqliteSynchronous.normal, - this.webSqliteOptions = const WebSqliteOptions.defaults(), - this.lockTimeout = const Duration(seconds: 30)}); + /// Whether queries should be added to the `dart:developer` timeline. + /// + /// By default, this is enabled if the `dart.vm.product` compile-time variable + /// is not set to `true`. For Flutter apps, this means that [profileQueries] + /// is enabled by default in debug and profile mode. + final bool profileQueries; + + const factory SqliteOptions.defaults() = SqliteOptions; + + const SqliteOptions({ + this.journalMode = SqliteJournalMode.wal, + this.journalSizeLimit = 6 * 1024 * 1024, + this.synchronous = SqliteSynchronous.normal, + this.webSqliteOptions = const WebSqliteOptions.defaults(), + this.lockTimeout = const Duration(seconds: 30), + this.profileQueries = _profileQueriesByDefault, + }); + + // https://api.flutter.dev/flutter/foundation/kReleaseMode-constant.html + static const _profileQueriesByDefault = + !bool.fromEnvironment('dart.vm.product'); } /// SQLite journal mode. Set on the primary connection. diff --git a/packages/sqlite_async/lib/src/sqlite_queries.dart b/packages/sqlite_async/lib/src/sqlite_queries.dart index dc91dd1..367d23f 100644 --- a/packages/sqlite_async/lib/src/sqlite_queries.dart +++ b/packages/sqlite_async/lib/src/sqlite_queries.dart @@ -44,25 +44,23 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { Stream watch(String sql, {List parameters = const [], Duration throttle = const Duration(milliseconds: 30), - Iterable? triggerOnTables}) async* { + Iterable? triggerOnTables}) { assert(updates != null, 'updates stream must be provided to allow query watching'); - final tables = - triggerOnTables ?? await getSourceTables(this, sql, parameters); - final filteredStream = - updates!.transform(UpdateNotification.filterTablesTransformer(tables)); - final throttledStream = UpdateNotification.throttleStream( - filteredStream, throttle, - addOne: UpdateNotification.empty()); - // FIXME: - // When the subscription is cancelled, this performs a final query on the next - // update. - // The loop only stops once the "yield" is reached. - // Using asyncMap instead of a generator would solve it, but then the body - // here can't be async for getSourceTables(). - await for (var _ in throttledStream) { - yield await getAll(sql, parameters); + Stream watchInner(Iterable trigger) { + return onChange( + trigger, + throttle: throttle, + triggerImmediately: true, + ).asyncMap((_) => getAll(sql, parameters)); + } + + if (triggerOnTables case final knownTrigger?) { + return watchInner(knownTrigger); + } else { + return Stream.fromFuture(getSourceTables(this, sql, parameters)) + .asyncExpand(watchInner); } } @@ -109,7 +107,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}) async { return writeLock((ctx) async { - return await internalWriteTransaction(ctx, callback); + return ctx.writeTransaction(callback); }, lockTimeout: lockTimeout, debugContext: 'writeTransaction()'); } diff --git a/packages/sqlite_async/lib/src/update_notification.dart b/packages/sqlite_async/lib/src/update_notification.dart index 8141df9..21f3541 100644 --- a/packages/sqlite_async/lib/src/update_notification.dart +++ b/packages/sqlite_async/lib/src/update_notification.dart @@ -52,68 +52,138 @@ class UpdateNotification { static Stream throttleStream( Stream input, Duration timeout, {UpdateNotification? addOne}) { - return _throttleStream(input, timeout, addOne: addOne, throttleFirst: true, - add: (a, b) { - return a.union(b); - }); + return _throttleStream( + input: input, + timeout: timeout, + throttleFirst: true, + add: (a, b) => a.union(b), + addOne: addOne, + ); } /// Filter an update stream by specific tables. static StreamTransformer filterTablesTransformer(Iterable tables) { Set normalized = {for (var table in tables) table.toLowerCase()}; - return StreamTransformer.fromHandlers(handleData: (data, sink) { - if (data.containsAny(normalized)) { - sink.add(data); - } - }); + return StreamTransformer.fromBind( + (source) => source.where((data) => data.containsAny(normalized))); } } -/// Given a broadcast stream, return a singular throttled stream that is throttled. -/// This immediately starts listening. +/// Throttles an [input] stream to not emit events more often than with a +/// frequency of 1/[timeout]. +/// +/// When an event is received and no timeout window is active, it is forwarded +/// downstream and a timeout window is started. For events received within a +/// timeout window, [add] is called to fold events. Then when the window +/// expires, pending events are emitted. +/// The subscription to the [input] stream is never paused. +/// +/// When the returned stream is paused, an active timeout window is reset and +/// restarts after the stream is resumed. /// -/// Behaviour: -/// If there was no event in "timeout", and one comes in, it is pushed immediately. -/// Otherwise, we wait until the timeout is over. -Stream _throttleStream(Stream input, Duration timeout, - {bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* { - var nextPing = Completer(); - T? lastData; - - var listener = input.listen((data) { - if (lastData is T && add != null) { - lastData = add(lastData as T, data); - } else { - lastData = data; +/// If [addOne] is not null, that event will always be added when the stream is +/// subscribed to. +/// When [throttleFirst] is true, a timeout window begins immediately after +/// listening (so that the first event, apart from [addOne], is emitted no +/// earlier than after [timeout]). +Stream _throttleStream({ + required Stream input, + required Duration timeout, + required bool throttleFirst, + required T Function(T, T) add, + required T? addOne, +}) { + return Stream.multi((listener) { + T? pendingData; + Timer? activeTimeoutWindow; + var needsTimeoutWindowAfterResume = false; + + /// Add pending data, bypassing the active timeout window. + /// + /// This is used to forward error and done events immediately. + bool addPendingEvents() { + if (pendingData case final data?) { + pendingData = null; + listener.addSync(data); + activeTimeoutWindow?.cancel(); + activeTimeoutWindow = null; + return true; + } else { + return false; + } } - if (!nextPing.isCompleted) { - nextPing.complete(); + + late void Function() setTimeout; + + /// Emits [pendingData] if no timeout window is active, and then starts a + /// timeout window if necessary. + void maybeEmit() { + if (activeTimeoutWindow == null && !listener.isPaused) { + final didAdd = addPendingEvents(); + if (didAdd) { + // Schedule a pause after resume if the subscription was paused + // directly in response to receiving the event. Otherwise, begin the + // timeout window immediately. + if (listener.isPaused) { + needsTimeoutWindowAfterResume = true; + } else { + setTimeout(); + } + } + } } - }); - try { + setTimeout = () { + activeTimeoutWindow = Timer(timeout, () { + activeTimeoutWindow = null; + maybeEmit(); + }); + }; + + void onData(T data) { + pendingData = switch (pendingData) { + null => data, + final pending => add(pending, data), + }; + maybeEmit(); + } + + void onError(Object error, StackTrace trace) { + addPendingEvents(); + listener.addErrorSync(error, trace); + } + + void onDone() { + addPendingEvents(); + listener.closeSync(); + } + + final subscription = input.listen(onData, onError: onError, onDone: onDone); + + listener.onPause = () { + needsTimeoutWindowAfterResume = activeTimeoutWindow != null; + activeTimeoutWindow?.cancel(); + activeTimeoutWindow = null; + }; + listener.onResume = () { + if (needsTimeoutWindowAfterResume) { + setTimeout(); + } else { + maybeEmit(); + } + }; + listener.onCancel = () async { + activeTimeoutWindow?.cancel(); + return subscription.cancel(); + }; + if (addOne != null) { - yield addOne; + // This must not be sync, we're doing this directly in onListen + listener.add(addOne); } if (throttleFirst) { - await Future.delayed(timeout); - } - while (true) { - // If a value is available now, we'll use it immediately. - // If not, this waits for it. - await nextPing.future; - // Capture any new values coming in while we wait. - nextPing = Completer(); - T data = lastData as T; - // Clear before we yield, so that we capture new changes while yielding - lastData = null; - yield data; - // Wait a minimum of this duration between tasks - await Future.delayed(timeout); + setTimeout(); } - } finally { - listener.cancel(); - } + }); } diff --git a/packages/sqlite_async/lib/src/utils/profiler.dart b/packages/sqlite_async/lib/src/utils/profiler.dart new file mode 100644 index 0000000..bffbf39 --- /dev/null +++ b/packages/sqlite_async/lib/src/utils/profiler.dart @@ -0,0 +1,64 @@ +import 'dart:developer'; + +extension TimeSync on TimelineTask? { + T timeSync(String name, TimelineSyncFunction function, + {String? sql, List? parameters}) { + final currentTask = this; + if (currentTask == null) { + return function(); + } + + final (resolvedName, args) = + profilerNameAndArgs(name, sql: sql, parameters: parameters); + currentTask.start(resolvedName, arguments: args); + + try { + return function(); + } finally { + currentTask.finish(); + } + } + + Future timeAsync(String name, TimelineSyncFunction> function, + {String? sql, List? parameters}) { + final currentTask = this; + if (currentTask == null) { + return function(); + } + + final (resolvedName, args) = + profilerNameAndArgs(name, sql: sql, parameters: parameters); + currentTask.start(resolvedName, arguments: args); + + return Future.sync(function).whenComplete(() { + currentTask.finish(); + }); + } +} + +(String, Map) profilerNameAndArgs(String name, + {String? sql, List? parameters}) { + // On native platforms, we want static names for tasks because every + // unique key here shows up in a separate line in Perfetto: https://github.com/dart-lang/sdk/issues/56274 + // On the web however, the names are embedded in the timeline slices and + // it's convenient to include the SQL there. + const isWeb = bool.fromEnvironment('dart.library.js_interop'); + var resolvedName = '$profilerPrefix$name'; + if (isWeb && sql != null) { + resolvedName = '$resolvedName $sql'; + } + + return ( + resolvedName, + { + if (sql != null) 'sql': sql, + if (parameters != null) + 'parameters': [ + for (final parameter in parameters) + if (parameter is List) '' else parameter + ], + } + ); +} + +const profilerPrefix = 'sqlite_async:'; diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index c911bbc..9faf928 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -21,24 +21,6 @@ Future internalReadTransaction(SqliteReadContext ctx, } } -Future internalWriteTransaction(SqliteWriteContext ctx, - Future Function(SqliteWriteContext tx) callback) async { - try { - await ctx.execute('BEGIN IMMEDIATE'); - final result = await callback(ctx); - await ctx.execute('COMMIT'); - return result; - } catch (e) { - try { - await ctx.execute('ROLLBACK'); - } catch (e) { - // In rare cases, a ROLLBACK may fail. - // Safe to ignore. - } - rethrow; - } -} - /// Given a SELECT query, return the tables that the query depends on. Future> getSourceTablesText( SqliteReadContext ctx, String sql) async { diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index a4f0ddf..3e0797b 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -1,12 +1,16 @@ import 'dart:async'; +import 'dart:developer'; import 'dart:js_interop'; +import 'dart:js_interop_unsafe'; import 'package:sqlite3/common.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; +import 'package:sqlite3_web/protocol_utils.dart' as proto; import 'package:sqlite_async/sqlite_async.dart'; -import 'package:sqlite_async/src/utils/shared_utils.dart'; +import 'package:sqlite_async/src/utils/profiler.dart'; import 'package:sqlite_async/src/web/database/broadcast_updates.dart'; import 'package:sqlite_async/web.dart'; +import '../impl/context.dart'; import 'protocol.dart'; import 'web_mutex.dart'; @@ -15,6 +19,7 @@ class WebDatabase implements SqliteDatabase, WebSqliteConnection { final Database _database; final Mutex? _mutex; + final bool profileQueries; /// For persistent databases that aren't backed by a shared worker, we use /// web broadcast channels to forward local update events to other tabs. @@ -23,7 +28,12 @@ class WebDatabase @override bool closed = false; - WebDatabase(this._database, this._mutex, {this.broadcastUpdates}); + WebDatabase( + this._database, + this._mutex, { + required this.profileQueries, + this.broadcastUpdates, + }); @override Future close() async { @@ -84,13 +94,9 @@ class WebDatabase Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { if (_mutex case var mutex?) { - return await mutex.lock(() async { - final context = _SharedContext(this); - try { - return await callback(context); - } finally { - context.markClosed(); - } + return await mutex.lock(timeout: lockTimeout, () { + return ScopedReadContext.assumeReadLock( + _UnscopedContext(this), callback); }); } else { // No custom mutex, coordinate locks through shared worker. @@ -98,7 +104,8 @@ class WebDatabase CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock)); try { - return await callback(_SharedContext(this)); + return await ScopedReadContext.assumeReadLock( + _UnscopedContext(this), callback); } finally { await _database.customRequest( CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); @@ -115,30 +122,28 @@ class WebDatabase Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, bool? flush}) { - return writeLock( - (writeContext) => - internalWriteTransaction(writeContext, (context) async { - // All execute calls done in the callback will be checked for the - // autocommit state - return callback(_ExclusiveTransactionContext(this, writeContext)); - }), + return writeLock((writeContext) { + return ScopedWriteContext.assumeWriteLock( + _UnscopedContext(this), + (ctx) async { + return await ctx.writeTransaction(callback); + }, + ); + }, debugContext: 'writeTransaction()', lockTimeout: lockTimeout, flush: flush); } @override - - /// Internal writeLock which intercepts transaction context's to verify auto commit is not active Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext, bool? flush}) async { if (_mutex case var mutex?) { - return await mutex.lock(() async { - final context = _ExclusiveContext(this); + return await mutex.lock(timeout: lockTimeout, () async { + final context = _UnscopedContext(this); try { - return await callback(context); + return await ScopedWriteContext.assumeWriteLock(context, callback); } finally { - context.markClosed(); if (flush != false) { await this.flush(); } @@ -148,11 +153,10 @@ class WebDatabase // No custom mutex, coordinate locks through shared worker. await _database.customRequest(CustomDatabaseMessage( CustomDatabaseMessageKind.requestExclusiveLock)); - final context = _ExclusiveContext(this); + final context = _UnscopedContext(this); try { - return await callback(context); + return await ScopedWriteContext.assumeWriteLock(context, callback); } finally { - context.markClosed(); if (flush != false) { await this.flush(); } @@ -169,14 +173,16 @@ class WebDatabase } } -class _SharedContext implements SqliteReadContext { +final class _UnscopedContext extends UnscopedContext { final WebDatabase _database; - bool _contextClosed = false; - _SharedContext(this._database); + final TimelineTask? _task; + + _UnscopedContext(this._database) + : _task = _database.profileQueries ? TimelineTask() : null; @override - bool get closed => _contextClosed || _database.closed; + bool get closed => _database.closed; @override Future computeWithDatabase( @@ -194,8 +200,15 @@ class _SharedContext implements SqliteReadContext { @override Future getAll(String sql, [List parameters = const []]) async { - return await wrapSqliteException( - () => _database._database.select(sql, parameters)); + return _task.timeAsync( + 'getAll', + sql: sql, + parameters: parameters, + () async { + return await wrapSqliteException( + () => _database._database.select(sql, parameters)); + }, + ); } @override @@ -210,44 +223,40 @@ class _SharedContext implements SqliteReadContext { return results.firstOrNull; } - void markClosed() { - _contextClosed = true; + @override + Future execute(String sql, [List parameters = const []]) { + return _task.timeAsync('execute', sql: sql, parameters: parameters, () { + return wrapSqliteException( + () => _database._database.select(sql, parameters)); + }); } -} - -class _ExclusiveContext extends _SharedContext implements SqliteWriteContext { - _ExclusiveContext(super.database); @override - Future execute(String sql, - [List parameters = const []]) async { - return wrapSqliteException( - () => _database._database.select(sql, parameters)); + Future executeBatch(String sql, List> parameterSets) { + return _task.timeAsync('executeBatch', sql: sql, () { + return wrapSqliteException(() async { + for (final set in parameterSets) { + // use execute instead of select to avoid transferring rows from the + // worker to this context. + await _database._database.execute(sql, set); + } + }); + }); } @override - Future executeBatch( - String sql, List> parameterSets) async { - return wrapSqliteException(() async { - for (final set in parameterSets) { - // use execute instead of select to avoid transferring rows from the - // worker to this context. - await _database._database.execute(sql, set); - } - }); + UnscopedContext interceptOutermostTransaction() { + // All execute calls done in the callback will be checked for the + // autocommit state + return _ExclusiveTransactionContext(_database); } } -class _ExclusiveTransactionContext extends _ExclusiveContext { - SqliteWriteContext baseContext; - _ExclusiveTransactionContext(super.database, this.baseContext); +final class _ExclusiveTransactionContext extends _UnscopedContext { + _ExclusiveTransactionContext(super._database); - @override - bool get closed => baseContext.closed; - - @override - Future execute(String sql, - [List parameters = const []]) async { + Future _executeInternal( + String sql, List parameters) async { // Operations inside transactions are executed with custom requests // in order to verify that the connection does not have autocommit enabled. // The worker will check if autocommit = true before executing the SQL. @@ -256,9 +265,15 @@ class _ExclusiveTransactionContext extends _ExclusiveContext { // JavaScript object. This is the converted into a Dart ResultSet. return await wrapSqliteException(() async { var res = await _database._database.customRequest(CustomDatabaseMessage( - CustomDatabaseMessageKind.executeInTransaction, sql, parameters)); - var result = - Map.from((res as JSObject).dartify() as Map); + CustomDatabaseMessageKind.executeInTransaction, sql, parameters)) + as JSObject; + + if (res.has('format') && (res['format'] as JSNumber).toDartInt == 2) { + // Newer workers use a serialization format more efficient than dartify(). + return proto.deserializeResultSet(res['r'] as JSObject); + } + + var result = Map.from(res.dartify() as Map); final columnNames = [ for (final entry in result['columnNames']) entry as String ]; @@ -285,15 +300,22 @@ class _ExclusiveTransactionContext extends _ExclusiveContext { }); } + @override + Future execute(String sql, + [List parameters = const []]) async { + return _task.timeAsync('execute', sql: sql, parameters: parameters, () { + return _executeInternal(sql, parameters); + }); + } + @override Future executeBatch( String sql, List> parameterSets) async { - return await wrapSqliteException(() async { + return _task.timeAsync('executeBatch', sql: sql, () async { for (final set in parameterSets) { await _database._database.customRequest(CustomDatabaseMessage( CustomDatabaseMessageKind.executeBatchInTransaction, sql, set)); } - return; }); } } @@ -303,9 +325,14 @@ Future wrapSqliteException(Future Function() callback) async { try { return await callback(); } on RemoteException catch (ex) { + if (ex.exception case final serializedCause?) { + throw serializedCause; + } + + // Older versions of package:sqlite_web reported SqliteExceptions as strings + // only. if (ex.toString().contains('SqliteException')) { RegExp regExp = RegExp(r'SqliteException\((\d+)\)'); - // The SQLite Web package wraps these in remote errors throw SqliteException( int.parse(regExp.firstMatch(ex.message)?.group(1) ?? '0'), ex.message); diff --git a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart index 0f38b1c..c6d1b75 100644 --- a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart @@ -8,7 +8,6 @@ import 'package:sqlite_async/src/common/sqlite_database.dart'; import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; import 'package:sqlite_async/src/update_notification.dart'; -import 'package:sqlite_async/src/web/web_mutex.dart'; import 'package:sqlite_async/src/web/web_sqlite_open_factory.dart'; import 'package:sqlite_async/web.dart'; @@ -43,7 +42,6 @@ class SqliteDatabaseImpl @override AbstractDefaultSqliteOpenFactory openFactory; - late final Mutex mutex; late final WebDatabase _connection; StreamSubscription? _broadcastUpdatesSubscription; @@ -77,15 +75,15 @@ class SqliteDatabaseImpl /// 4. Creating temporary views or triggers. SqliteDatabaseImpl.withFactory(this.openFactory, {this.maxReaders = SqliteDatabase.defaultMaxReaders}) { - mutex = MutexImpl(); // This way the `updates` member is available synchronously updates = updatesController.stream; isInitialized = _init(); } Future _init() async { - _connection = await openFactory.openConnection(SqliteOpenOptions( - primaryConnection: true, readOnly: false, mutex: mutex)) as WebDatabase; + _connection = await openFactory.openConnection( + SqliteOpenOptions(primaryConnection: true, readOnly: false)) + as WebDatabase; final broadcastUpdates = _connection.broadcastUpdates; if (broadcastUpdates == null) { diff --git a/packages/sqlite_async/lib/src/web/protocol.dart b/packages/sqlite_async/lib/src/web/protocol.dart index e6206d6..cb3a5fd 100644 --- a/packages/sqlite_async/lib/src/web/protocol.dart +++ b/packages/sqlite_async/lib/src/web/protocol.dart @@ -3,6 +3,7 @@ library; import 'dart:js_interop'; +import 'package:sqlite3_web/protocol_utils.dart' as proto; enum CustomDatabaseMessageKind { requestSharedLock, @@ -19,15 +20,24 @@ extension type CustomDatabaseMessage._raw(JSObject _) implements JSObject { required JSString rawKind, JSString rawSql, JSArray rawParameters, + JSArrayBuffer typeInfo, }); factory CustomDatabaseMessage(CustomDatabaseMessageKind kind, [String? sql, List parameters = const []]) { - final rawSql = sql?.toJS ?? ''.toJS; - final rawParameters = - [for (final parameter in parameters) parameter.jsify()].toJS; + final rawSql = (sql ?? '').toJS; + // Serializing parameters this way is backwards-compatible with dartify() + // on the other end, but a bit more efficient while also suppporting sound + // communcation between dart2js workers and dart2wasm clients. + // Older workers ignore the typeInfo, but that's not a problem. + final (rawParameters, typeInfo) = proto.serializeParameters(parameters); + return CustomDatabaseMessage._( - rawKind: kind.name.toJS, rawSql: rawSql, rawParameters: rawParameters); + rawKind: kind.name.toJS, + rawSql: rawSql, + rawParameters: rawParameters, + typeInfo: typeInfo, + ); } external JSString get rawKind; @@ -36,6 +46,9 @@ extension type CustomDatabaseMessage._raw(JSObject _) implements JSObject { external JSArray get rawParameters; + /// Not set in earlier versions of this package. + external JSArrayBuffer? get typeInfo; + CustomDatabaseMessageKind get kind { return CustomDatabaseMessageKind.values.byName(rawKind.toDart); } diff --git a/packages/sqlite_async/lib/src/web/web_mutex.dart b/packages/sqlite_async/lib/src/web/web_mutex.dart index 8c2baa5..6972b2f 100644 --- a/packages/sqlite_async/lib/src/web/web_mutex.dart +++ b/packages/sqlite_async/lib/src/web/web_mutex.dart @@ -16,10 +16,9 @@ external Navigator get _navigator; /// Web implementation of [Mutex] class MutexImpl implements Mutex { late final mutex.Mutex fallback; - String? identifier; final String resolvedIdentifier; - MutexImpl({this.identifier}) + MutexImpl({String? identifier}) /// On web a lock name is required for Navigator locks. /// Having exclusive Mutex instances requires a somewhat unique lock name. @@ -40,7 +39,7 @@ class MutexImpl implements Mutex { @override Future lock(Future Function() callback, {Duration? timeout}) { - if ((_navigator as JSObject).hasProperty('locks'.toJS).toDart) { + if (_navigator.has('locks')) { return _webLock(callback, timeout: timeout); } else { return _fallbackLock(callback, timeout: timeout); diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index 247999d..a724329 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -15,7 +15,7 @@ Map> webSQLiteImplementations = {}; /// Web implementation of [AbstractDefaultSqliteOpenFactory] class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory - implements WebSqliteOpenFactory { + with WebSqliteOpenFactory { late final Future _initialized = Future.sync(() { final cacheKey = sqliteOptions.webSqliteOptions.wasmUri + sqliteOptions.webSqliteOptions.workerUri; @@ -45,9 +45,8 @@ class DefaultSqliteOpenFactory ); } - @override - /// This is currently not supported on web + @override CommonDatabase openDB(SqliteOpenOptions options) { throw UnimplementedError( 'Direct access to CommonDatabase is not available on web.'); @@ -61,7 +60,7 @@ class DefaultSqliteOpenFactory /// Due to being asynchronous, the under laying CommonDatabase is not accessible Future openConnection(SqliteOpenOptions options) async { final workers = await _initialized; - final connection = await workers.connectToRecommended(path); + final connection = await connectToWorker(workers, path); // When the database is accessed through a shared worker, we implement // mutexes over custom messages sent through the shared worker. In other @@ -77,7 +76,8 @@ class DefaultSqliteOpenFactory } return WebDatabase(connection.database, options.mutex ?? mutex, - broadcastUpdates: updates); + broadcastUpdates: updates, + profileQueries: sqliteOptions.profileQueries); } @override diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart index 07264bf..5f33b6d 100644 --- a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart +++ b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart @@ -107,6 +107,7 @@ class ThrottledCommonDatabase extends CommonDatabase { @override VoidPredicate? get commitFilter => _db.commitFilter; + @override set commitFilter(VoidPredicate? filter) => _db.commitFilter = filter; @override diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 5a6e1b3..3ecb257 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -1,8 +1,12 @@ import 'dart:js_interop'; +import 'dart:js_interop_unsafe'; +import 'package:meta/meta.dart'; import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; +import 'package:sqlite3_web/protocol_utils.dart' as proto; + import 'throttled_common_database.dart'; import '../protocol.dart'; @@ -12,9 +16,9 @@ import '../protocol.dart'; /// can be extended to perform custom requests. base class AsyncSqliteController extends DatabaseController { @override - Future openDatabase( - WasmSqlite3 sqlite3, String path, String vfs) async { - final db = sqlite3.open(path, vfs: vfs); + Future openDatabase(WasmSqlite3 sqlite3, String path, + String vfs, JSAny? additionalData) async { + final db = openUnderlying(sqlite3, path, vfs, additionalData); // Register any custom functions here if needed @@ -23,6 +27,18 @@ base class AsyncSqliteController extends DatabaseController { return AsyncSqliteDatabase(database: throttled); } + /// Opens a database with the `sqlite3` package that will be wrapped in a + /// [ThrottledCommonDatabase] for [openDatabase]. + @visibleForOverriding + CommonDatabase openUnderlying( + WasmSqlite3 sqlite3, + String path, + String vfs, + JSAny? additionalData, + ) { + return sqlite3.open(path, vfs: vfs); + } + @override Future handleCustomRequest( ClientConnection connection, JSAny? request) { @@ -40,9 +56,27 @@ class AsyncSqliteDatabase extends WorkerDatabase { // these requests for shared workers, so we can assume each database is only // opened once and we don't need web locks here. final mutex = ReadWriteMutex(); + final Map _state = {}; AsyncSqliteDatabase({required this.database}); + _ConnectionState _findState(ClientConnection connection) { + return _state.putIfAbsent(connection, _ConnectionState.new); + } + + void _markHoldsMutex(ClientConnection connection) { + final state = _findState(connection); + state.holdsMutex = true; + if (!state.hasOnCloseListener) { + state.hasOnCloseListener = true; + connection.closed.then((_) { + if (state.holdsMutex) { + mutex.release(); + } + }); + } + } + @override Future handleCustomRequest( ClientConnection connection, JSAny? request) async { @@ -51,9 +85,12 @@ class AsyncSqliteDatabase extends WorkerDatabase { switch (message.kind) { case CustomDatabaseMessageKind.requestSharedLock: await mutex.acquireRead(); + _markHoldsMutex(connection); case CustomDatabaseMessageKind.requestExclusiveLock: await mutex.acquireWrite(); + _markHoldsMutex(connection); case CustomDatabaseMessageKind.releaseLock: + _findState(connection).holdsMutex = false; mutex.release(); case CustomDatabaseMessageKind.lockObtained: throw UnsupportedError('This is a response, not a request'); @@ -61,25 +98,32 @@ class AsyncSqliteDatabase extends WorkerDatabase { return database.autocommit.toJS; case CustomDatabaseMessageKind.executeInTransaction: final sql = message.rawSql.toDart; - final parameters = [ - for (final raw in (message.rawParameters).toDart) raw.dartify() - ]; + final hasTypeInfo = message.typeInfo.isDefinedAndNotNull; + final parameters = proto.deserializeParameters( + message.rawParameters, message.typeInfo); if (database.autocommit) { throw SqliteException(0, "Transaction rolled back by earlier statement. Cannot execute: $sql"); } - var res = database.select(sql, parameters); - var dartMap = resultSetToMap(res); - - var jsObject = dartMap.jsify(); + var res = database.select(sql, parameters); + if (hasTypeInfo) { + // If the client is sending a request that has parameters with type + // information, it will also support a newer serialization format for + // result sets. + return JSObject() + ..['format'] = 2.toJS + ..['r'] = proto.serializeResultSet(res); + } else { + var dartMap = resultSetToMap(res); + var jsObject = dartMap.jsify(); + return jsObject; + } - return jsObject; case CustomDatabaseMessageKind.executeBatchInTransaction: final sql = message.rawSql.toDart; - final parameters = [ - for (final raw in (message.rawParameters).toDart) raw.dartify() - ]; + final parameters = proto.deserializeParameters( + message.rawParameters, message.typeInfo); if (database.autocommit) { throw SqliteException(0, "Transaction rolled back by earlier statement. Cannot execute: $sql"); @@ -100,3 +144,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { return resultSetMap; } } + +final class _ConnectionState { + bool hasOnCloseListener = false; + bool holdsMutex = false; +} diff --git a/packages/sqlite_async/lib/web.dart b/packages/sqlite_async/lib/web.dart index c7f9628..3a65115 100644 --- a/packages/sqlite_async/lib/web.dart +++ b/packages/sqlite_async/lib/web.dart @@ -2,7 +2,7 @@ /// /// These expose methods allowing database instances to be shared across web /// workers. -library sqlite_async.web; +library; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:web/web.dart'; @@ -31,7 +31,7 @@ typedef WebDatabaseEndpoint = ({ /// /// The [DefaultSqliteOpenFactory] class implements this interface only when /// compiling for the web. -abstract interface class WebSqliteOpenFactory +abstract mixin class WebSqliteOpenFactory implements SqliteOpenFactory { /// Opens a [WebSqlite] instance for the given [options]. /// @@ -39,7 +39,21 @@ abstract interface class WebSqliteOpenFactory /// opened needs to be customized. Implementers should be aware that the /// result of this method is cached and will be re-used by the open factory /// when provided with the same [options] again. - Future openWebSqlite(WebSqliteOptions options); + Future openWebSqlite(WebSqliteOptions options) async { + return WebSqlite.open( + worker: Uri.parse(options.workerUri), + wasmModule: Uri.parse(options.wasmUri), + ); + } + + /// Uses [WebSqlite] to connects to the recommended database setup for [name]. + /// + /// This typically just calls [WebSqlite.connectToRecommended], but subclasses + /// can customize the behavior where needed. + Future connectToWorker( + WebSqlite sqlite, String name) { + return sqlite.connectToRecommended(name); + } } /// A [SqliteConnection] interface implemented by opened connections when @@ -80,6 +94,7 @@ abstract class WebSqliteConnection implements SqliteConnection { var lock? => Mutex(identifier: lock), null => null, }, + profileQueries: false, ); return database; } @@ -91,6 +106,7 @@ abstract class WebSqliteConnection implements SqliteConnection { /// This only has an effect when IndexedDB storage is used. /// /// See [flush] for details. + @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext, bool? flush}); @@ -101,6 +117,7 @@ abstract class WebSqliteConnection implements SqliteConnection { /// This only has an effect when IndexedDB storage is used. /// /// See [flush] for details. + @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 1ffa8b4..bb27d60 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.11.1 +version: 0.11.7 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0" @@ -13,7 +13,7 @@ topics: dependencies: sqlite3: ^2.7.2 - sqlite3_web: ^0.2.2 + sqlite3_web: ^0.3.0 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 @@ -21,8 +21,10 @@ dependencies: web: ^1.0.0 dev_dependencies: - dcli: ^4.0.0 - lints: ^3.0.0 + build_runner: ^2.4.14 + build_web_compilers: ^4.1.1 + build_test: ^2.2.3 + lints: ^5.0.0 test: ^1.21.0 test_api: ^0.7.0 glob: ^2.1.1 @@ -31,6 +33,8 @@ dev_dependencies: shelf_static: ^1.1.2 stream_channel: ^2.1.2 path: ^1.9.0 + test_descriptor: ^2.0.2 + fake_async: ^1.3.3 platforms: android: diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart index e07daf4..6a315da 100644 --- a/packages/sqlite_async/test/basic_test.dart +++ b/packages/sqlite_async/test/basic_test.dart @@ -6,6 +6,7 @@ import 'package:test/test.dart'; import 'utils/test_utils_impl.dart'; final testUtils = TestUtils(); +const _isDart2Wasm = bool.fromEnvironment('dart.tool.dart2wasm'); void main() { group('Shared Basic Tests', () { @@ -121,52 +122,184 @@ void main() { ['Test Data']); expect(rs.rows[0], equals(['Test Data'])); }); - expect(await savedTx!.getAutoCommit(), equals(true)); + expect(await db.getAutoCommit(), equals(true)); expect(savedTx!.closed, equals(true)); }); - test('should properly report errors in transactions', () async { - final db = await testUtils.setupDatabase(path: path); - await createTables(db); + test( + 'should properly report errors in transactions', + () async { + final db = await testUtils.setupDatabase(path: path); + await createTables(db); - var tp = db.writeTransaction((tx) async { - await tx.execute( - 'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)', - [1, 'test1']); - await tx.execute( - 'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)', - [2, 'test2']); - expect(await tx.getAutoCommit(), equals(false)); - try { + var tp = db.writeTransaction((tx) async { await tx.execute( 'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)', - [2, 'test3']); - } catch (e) { - // Ignore - } + [1, 'test1']); + await tx.execute( + 'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)', + [2, 'test2']); + expect(await tx.getAutoCommit(), equals(false)); + try { + await tx.execute( + 'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)', + [2, 'test3']); + } catch (e) { + // Ignore + } - expect(await tx.getAutoCommit(), equals(true)); - expect(tx.closed, equals(false)); + expect(await tx.getAutoCommit(), equals(true)); + expect(tx.closed, equals(false)); - // Will not be executed because of the above rollback - await tx.execute( - 'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)', - [4, 'test4']); - }); + // Will not be executed because of the above rollback + await tx.execute( + 'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)', + [4, 'test4']); + }); + + // The error propagates up to the transaction + await expectLater( + tp, + throwsA((e) => + e is SqliteException && + e.message + .contains('Transaction rolled back by earlier statement'))); + + expect(await db.get('SELECT count() count FROM test_data'), + equals({'count': 0})); - // The error propagates up to the transaction + // Check that we can open another transaction afterwards + await db.writeTransaction((tx) async {}); + }, + skip: _isDart2Wasm + ? 'Fails due to compiler bug, https://dartbug.com/59981' + : null, + ); + + test('reports exceptions as SqliteExceptions', () async { + final db = await testUtils.setupDatabase(path: path); await expectLater( - tp, - throwsA((e) => - e is SqliteException && - e.message - .contains('Transaction rolled back by earlier statement'))); + db.get('SELECT invalid_statement;'), + throwsA( + isA() + .having((e) => e.causingStatement, 'causingStatement', + 'SELECT invalid_statement;') + .having((e) => e.extendedResultCode, 'extendedResultCode', 1), + ), + ); + }); + + group('nested transaction', () { + const insert = 'INSERT INTO test_data (description) VALUES(?);'; + late SqliteDatabase db; + + setUp(() async { + db = await testUtils.setupDatabase(path: path); + await createTables(db); + }); + + tearDown(() => db.close()); + + test('run in outer transaction', () async { + await db.writeTransaction((tx) async { + await tx.execute(insert, ['first']); + + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['second']); + }); - expect(await db.get('SELECT count() count FROM test_data'), - equals({'count': 0})); + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + expect(await db.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + test('can rollback inner transaction', () async { + await db.writeTransaction((tx) async { + await tx.execute(insert, ['first']); + + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['second']); + }); + + await expectLater(() async { + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['third']); + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(3)); + throw 'rollback please'; + }); + }, throwsA(anything)); + + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + expect(await db.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + test('cannot use outer transaction while inner is active', () async { + await db.writeTransaction((outer) async { + await outer.writeTransaction((inner) async { + await expectLater(outer.execute('SELECT 1'), throwsStateError); + }); + }); + }); + + test('cannot use inner after leaving scope', () async { + await db.writeTransaction((tx) async { + late SqliteWriteContext inner; + await tx.writeTransaction((tx) async { + inner = tx; + }); + + await expectLater(inner.execute('SELECT 1'), throwsStateError); + }); + }); + }); + + test('can use raw database instance', () async { + final factory = await testUtils.testFactory(); + final raw = await factory.openDatabaseForSingleConnection(); + // Creating a fuction ensures that this database is actually used - if + // a connection were set up in a background isolate, it wouldn't have this + // function. + raw.createFunction( + functionName: 'my_function', function: (args) => 'test'); + + final db = SqliteDatabase.singleConnection( + SqliteConnection.synchronousWrapper(raw)); + await createTables(db); + + expect(db.updates, emits(UpdateNotification({'test_data'}))); + await db + .execute('INSERT INTO test_data(description) VALUES (my_function())'); + + expect(await db.get('SELECT description FROM test_data'), + {'description': 'test'}); + }); + + test('respects lock timeouts', () async { + // Unfortunately this test can't use fakeAsync because it uses actual + // lock APIs on the web. + final db = await testUtils.setupDatabase(path: path); + final lockAcquired = Completer(); + + final completion = db.writeLock((context) async { + lockAcquired.complete(); + await Future.delayed(const Duration(seconds: 1)); + }); + + await lockAcquired.future; + await expectLater( + () => db.writeLock( + lockTimeout: Duration(milliseconds: 200), (_) async => {}), + throwsA(isA()), + ); - // Check that we can open another transaction afterwards - await db.writeTransaction((tx) async {}); + await completion; + }, onPlatform: { + 'browser': Skip( + 'Web locks are managed with a shared worker, which does not support timeouts', + ) }); }); } diff --git a/packages/sqlite_async/test/close_test.dart b/packages/sqlite_async/test/close_test.dart index dcb3390..6a72f3d 100644 --- a/packages/sqlite_async/test/close_test.dart +++ b/packages/sqlite_async/test/close_test.dart @@ -1,4 +1,6 @@ @TestOn('!browser') +library; + import 'dart:io'; import 'package:sqlite_async/sqlite_async.dart'; diff --git a/packages/sqlite_async/test/isolate_test.dart b/packages/sqlite_async/test/isolate_test.dart index 60bea87..3a3af98 100644 --- a/packages/sqlite_async/test/isolate_test.dart +++ b/packages/sqlite_async/test/isolate_test.dart @@ -1,4 +1,6 @@ @TestOn('!browser') +library; + import 'dart:isolate'; import 'package:test/test.dart'; diff --git a/packages/sqlite_async/test/native/basic_test.dart b/packages/sqlite_async/test/native/basic_test.dart index 263ab39..dec1fed 100644 --- a/packages/sqlite_async/test/native/basic_test.dart +++ b/packages/sqlite_async/test/native/basic_test.dart @@ -1,4 +1,6 @@ @TestOn('!browser') +library; + import 'dart:async'; import 'dart:math'; @@ -342,6 +344,22 @@ void main() { await Future.wait([f1, f2]); }); + + test('reports open error', () async { + // Ensure that a db that fails to open doesn't report any unhandled + // exceptions. This could happen when e.g. SQLCipher is used and the open + // factory supplies a wrong key pragma (because a subsequent pragma to + // change the journal mode then fails with a "not a database" error). + final db = + SqliteDatabase.withFactory(_InvalidPragmaOnOpenFactory(path: path)); + await expectLater( + db.initialize(), + throwsA( + isA().having( + (e) => e.toString(), 'toString()', contains('syntax error')), + ), + ); + }); }); } @@ -349,3 +367,15 @@ void main() { void ignore(Future future) { future.then((_) {}, onError: (_) {}); } + +class _InvalidPragmaOnOpenFactory extends DefaultSqliteOpenFactory { + const _InvalidPragmaOnOpenFactory({required super.path}); + + @override + List pragmaStatements(SqliteOpenOptions options) { + return [ + 'invalid syntax to fail open in test', + ...super.pragmaStatements(options), + ]; + } +} diff --git a/packages/sqlite_async/test/native/native_mutex_test.dart b/packages/sqlite_async/test/native/native_mutex_test.dart index 699a877..e9b5a54 100644 --- a/packages/sqlite_async/test/native/native_mutex_test.dart +++ b/packages/sqlite_async/test/native/native_mutex_test.dart @@ -1,4 +1,6 @@ @TestOn('!browser') +library; + import 'dart:isolate'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; diff --git a/packages/sqlite_async/test/native/schema_test.dart b/packages/sqlite_async/test/native/schema_test.dart index c358402..423d33b 100644 --- a/packages/sqlite_async/test/native/schema_test.dart +++ b/packages/sqlite_async/test/native/schema_test.dart @@ -1,4 +1,6 @@ @TestOn('!browser') +library; + import 'dart:async'; import 'package:sqlite_async/sqlite_async.dart'; diff --git a/packages/sqlite_async/test/native/watch_test.dart b/packages/sqlite_async/test/native/watch_test.dart index 67077db..4e4fb83 100644 --- a/packages/sqlite_async/test/native/watch_test.dart +++ b/packages/sqlite_async/test/native/watch_test.dart @@ -1,4 +1,6 @@ @TestOn('!browser') +library; + import 'dart:async'; import 'dart:isolate'; import 'dart:math'; diff --git a/packages/sqlite_async/test/server/worker_server.dart b/packages/sqlite_async/test/server/worker_server.dart index 30cffe9..4d060e2 100644 --- a/packages/sqlite_async/test/server/worker_server.dart +++ b/packages/sqlite_async/test/server/worker_server.dart @@ -1,6 +1,5 @@ import 'dart:io'; -import 'package:dcli/dcli.dart'; import 'package:path/path.dart' as p; import 'package:shelf/shelf.dart'; import 'package:shelf/shelf_io.dart' as io; @@ -10,8 +9,7 @@ import 'package:stream_channel/stream_channel.dart'; import 'asset_server.dart'; Future hybridMain(StreamChannel channel) async { - final directory = p.normalize( - p.join(DartScript.self.pathToScriptDirectory, '../../../../assets')); + final directory = p.normalize('../../assets'); final sqliteOutputPath = p.join(directory, 'sqlite3.wasm'); diff --git a/packages/sqlite_async/test/update_notification_test.dart b/packages/sqlite_async/test/update_notification_test.dart new file mode 100644 index 0000000..05c94c6 --- /dev/null +++ b/packages/sqlite_async/test/update_notification_test.dart @@ -0,0 +1,209 @@ +import 'dart:async'; + +import 'package:fake_async/fake_async.dart'; +import 'package:sqlite_async/src/update_notification.dart'; +import 'package:test/test.dart'; + +void main() { + group('Update notifications', () { + const timeout = Duration(seconds: 10); + const halfTimeout = Duration(seconds: 5); + + group('throttle', () { + test('can add initial', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + UpdateNotification.throttleStream(source.stream, timeout, + addOne: UpdateNotification({'a'})).listen(events.add); + + control.flushMicrotasks(); + expect(events, hasLength(1)); + control.elapse(halfTimeout); + + source.add(UpdateNotification({'b'})); + expect(events, hasLength(1)); // Still a delay from the initial one + + control.elapse(halfTimeout); + expect(events, hasLength(2)); + }); + }); + + test('sends events after initial throttle', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add); + + source.add(UpdateNotification({'a'})); + control.elapse(halfTimeout); + expect(events, isEmpty); + + control.elapse(halfTimeout); + expect(events, hasLength(1)); + }); + }); + + test('increases delay after pause', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + final sub = UpdateNotification.throttleStream(source.stream, timeout) + .listen(null); + sub.onData((event) { + events.add(event); + sub.pause(); + }); + + source.add(UpdateNotification({'a'})); + control.elapse(timeout); + expect(events, hasLength(1)); + + // Assume the stream stays paused for the timeout window that would + // be created after emitting the notification. + control.elapse(timeout * 2); + source.add(UpdateNotification({'b'})); + control.elapse(timeout * 2); + + // A full timeout needs to pass after resuming before a new item is + // emitted. + sub.resume(); + expect(events, hasLength(1)); + + control.elapse(halfTimeout); + expect(events, hasLength(1)); + control.elapse(halfTimeout); + expect(events, hasLength(2)); + }); + }); + + test('does not introduce artificial delay in pause', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + final sub = UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add); + + // Await the initial delay + control.elapse(timeout); + + sub.pause(); + source.add(UpdateNotification({'a'})); + // Resuming should not introduce a timeout window because no window + // was active when the stream was paused. + sub.resume(); + control.flushMicrotasks(); + expect(events, hasLength(1)); + }); + }); + + test('merges events', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add); + + source.add(UpdateNotification({'a'})); + control.elapse(halfTimeout); + expect(events, isEmpty); + + source.add(UpdateNotification({'b'})); + control.elapse(halfTimeout); + expect(events, [ + UpdateNotification({'a', 'b'}) + ]); + }); + }); + + test('forwards cancellations', () { + fakeAsync((control) { + var cancelled = false; + final source = StreamController(sync: true) + ..onCancel = () => cancelled = true; + + final sub = UpdateNotification.throttleStream(source.stream, timeout) + .listen((_) => fail('unexpected event'), + onDone: () => fail('unexpected done')); + + source.add(UpdateNotification({'a'})); + control.elapse(halfTimeout); + + sub.cancel(); + control.flushTimers(); + + expect(cancelled, isTrue); + expect(control.pendingTimers, isEmpty); + }); + }); + + test('closes when source closes', () { + fakeAsync((control) { + final source = StreamController(sync: true) + ..onCancel = () => Future.value(); + final events = []; + var done = false; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add, onDone: () => done = true); + + source + // These two are combined due to throttleFirst + ..add(UpdateNotification({'a'})) + ..add(UpdateNotification({'b'})) + ..close(); + + control.flushTimers(); + expect(events, [ + UpdateNotification({'a', 'b'}) + ]); + expect(done, isTrue); + expect(control.pendingTimers, isEmpty); + }); + }); + + test('closes when source closes after delay', () { + fakeAsync((control) { + final source = StreamController(sync: true) + ..onCancel = () => Future.value(); + final events = []; + var done = false; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add, onDone: () => done = true); + + control.elapse(const Duration(hours: 1)); + source.close(); + + control.flushTimers(); + expect(events, isEmpty); + expect(done, isTrue); + expect(control.pendingTimers, isEmpty); + }); + }); + }); + + test('filter tables', () async { + final source = StreamController(sync: true); + final events = []; + final subscription = UpdateNotification.filterTablesTransformer(['a']) + .bind(source.stream) + .listen(events.add); + + source.add(UpdateNotification({'a', 'b'})); + expect(events, hasLength(1)); + + source.add(UpdateNotification({'b'})); + expect(events, hasLength(1)); + + await subscription.cancel(); + expect(source.hasListener, isFalse); + }); + }); +} diff --git a/packages/sqlite_async/test/utils/abstract_test_utils.dart b/packages/sqlite_async/test/utils/abstract_test_utils.dart index f1ec6ea..b388c4d 100644 --- a/packages/sqlite_async/test/utils/abstract_test_utils.dart +++ b/packages/sqlite_async/test/utils/abstract_test_utils.dart @@ -1,22 +1,19 @@ +import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; -import 'package:test_api/src/backend/invoker.dart'; class TestDefaultSqliteOpenFactory extends DefaultSqliteOpenFactory { final String sqlitePath; TestDefaultSqliteOpenFactory( {required super.path, super.sqliteOptions, this.sqlitePath = ''}); + + Future openDatabaseForSingleConnection() async { + return openDB(SqliteOpenOptions(primaryConnection: true, readOnly: false)); + } } abstract class AbstractTestUtils { - String dbPath() { - final test = Invoker.current!.liveTest; - var testName = test.test.name; - var testShortName = - testName.replaceAll(RegExp(r'[\s\./]'), '_').toLowerCase(); - var dbName = "test-db/$testShortName.db"; - return dbName; - } + String dbPath(); /// Generates a test open factory Future testFactory( diff --git a/packages/sqlite_async/test/utils/native_test_utils.dart b/packages/sqlite_async/test/utils/native_test_utils.dart index e23bb65..83dea18 100644 --- a/packages/sqlite_async/test/utils/native_test_utils.dart +++ b/packages/sqlite_async/test/utils/native_test_utils.dart @@ -5,9 +5,11 @@ import 'dart:isolate'; import 'package:glob/glob.dart'; import 'package:glob/list_local_fs.dart'; +import 'package:sqlite_async/sqlite3.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite3/open.dart' as sqlite_open; +import 'package:test_descriptor/test_descriptor.dart' as d; import 'abstract_test_utils.dart'; @@ -20,11 +22,25 @@ class TestSqliteOpenFactory extends TestDefaultSqliteOpenFactory { super.sqlitePath = defaultSqlitePath, initStatements}); - @override - CommonDatabase open(SqliteOpenOptions options) { + void _applyOpenOverrides() { sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.linux, () { return DynamicLibrary.open(sqlitePath); }); + + sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.macOS, () { + // Prefer using Homebrew's SQLite which allows loading extensions. + const fromHomebrew = '/opt/homebrew/opt/sqlite/lib/libsqlite3.dylib'; + if (File(fromHomebrew).existsSync()) { + return DynamicLibrary.open(fromHomebrew); + } + + return DynamicLibrary.open('libsqlite3.dylib'); + }); + } + + @override + CommonDatabase open(SqliteOpenOptions options) { + _applyOpenOverrides(); final db = super.open(options); db.createFunction( @@ -47,13 +63,18 @@ class TestSqliteOpenFactory extends TestDefaultSqliteOpenFactory { return db; } + + @override + Future openDatabaseForSingleConnection() async { + _applyOpenOverrides(); + return sqlite3.openInMemory(); + } } class TestUtils extends AbstractTestUtils { @override String dbPath() { - Directory("test-db").createSync(recursive: false); - return super.dbPath(); + return d.path('test.db'); } @override diff --git a/packages/sqlite_async/test/utils/stub_test_utils.dart b/packages/sqlite_async/test/utils/stub_test_utils.dart index 5e3a953..852009f 100644 --- a/packages/sqlite_async/test/utils/stub_test_utils.dart +++ b/packages/sqlite_async/test/utils/stub_test_utils.dart @@ -1,6 +1,11 @@ import 'abstract_test_utils.dart'; class TestUtils extends AbstractTestUtils { + @override + String dbPath() { + throw UnimplementedError(); + } + @override Future cleanDb({required String path}) { throw UnimplementedError(); diff --git a/packages/sqlite_async/test/utils/web_test_utils.dart b/packages/sqlite_async/test/utils/web_test_utils.dart index 32b7e05..33f7d64 100644 --- a/packages/sqlite_async/test/utils/web_test_utils.dart +++ b/packages/sqlite_async/test/utils/web_test_utils.dart @@ -1,6 +1,8 @@ import 'dart:async'; import 'dart:js_interop'; +import 'dart:math'; +import 'package:sqlite_async/sqlite3_wasm.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; import 'package:web/web.dart' show Blob, BlobPart, BlobPropertyBag; @@ -9,6 +11,22 @@ import 'abstract_test_utils.dart'; @JS('URL.createObjectURL') external String _createObjectURL(Blob blob); +String? _dbPath; + +class TestSqliteOpenFactory extends TestDefaultSqliteOpenFactory { + TestSqliteOpenFactory( + {required super.path, super.sqliteOptions, super.sqlitePath = ''}); + + @override + Future openDatabaseForSingleConnection() async { + final sqlite = await WasmSqlite3.loadFromUrl( + Uri.parse(sqliteOptions.webSqliteOptions.wasmUri)); + sqlite.registerVirtualFileSystem(InMemoryFileSystem(), makeDefault: true); + + return sqlite.openInMemory(); + } +} + class TestUtils extends AbstractTestUtils { late Future _isInitialized; late final SqliteOptions webOptions; @@ -19,7 +37,7 @@ class TestUtils extends AbstractTestUtils { Future _init() async { final channel = spawnHybridUri('/test/server/worker_server.dart'); - final port = await channel.stream.first as int; + final port = (await channel.stream.first as num).toInt(); final sqliteWasmUri = 'http://localhost:$port/sqlite3.wasm'; // Cross origin workers are not supported, but we can supply a Blob var sqliteUri = 'http://localhost:$port/db_worker.js'; @@ -33,18 +51,36 @@ class TestUtils extends AbstractTestUtils { wasmUri: sqliteWasmUri.toString(), workerUri: sqliteUri)); } + @override + String dbPath() { + if (_dbPath case final path?) { + return path; + } + + final created = _dbPath = 'test-db/${Random().nextInt(1 << 31)}/test.db'; + addTearDown(() { + // Pick a new path for the next test. + _dbPath = null; + }); + + return created; + } + @override Future cleanDb({required String path}) async {} @override Future testFactory( {String? path, - String? sqlitePath, + String sqlitePath = '', List initStatements = const [], SqliteOptions options = const SqliteOptions.defaults()}) async { await _isInitialized; - return super.testFactory( - path: path, options: webOptions, initStatements: initStatements); + return TestSqliteOpenFactory( + path: path ?? dbPath(), + sqlitePath: sqlitePath, + sqliteOptions: webOptions, + ); } @override diff --git a/packages/sqlite_async/test/watch_test.dart b/packages/sqlite_async/test/watch_test.dart index 7e74790..dc28add 100644 --- a/packages/sqlite_async/test/watch_test.dart +++ b/packages/sqlite_async/test/watch_test.dart @@ -113,8 +113,10 @@ void main() { lastCount = count; } - // The number of read queries must not be greater than the number of writes overall. - expect(numberOfQueries, lessThanOrEqualTo(results.last.first['count'])); + // The number of read queries must not be greater than the number of + // writes overall, plus one for the initial stream emission. + expect(numberOfQueries, + lessThanOrEqualTo(results.last.first['count'] + 1)); DateTime? lastTime; for (var r in times) { @@ -283,7 +285,7 @@ void main() { }); await Future.delayed(delay); - subscription.cancel(); + await subscription.cancel(); expect( counts, diff --git a/packages/sqlite_async/test/web/watch_test.dart b/packages/sqlite_async/test/web/watch_test.dart index 50bbae0..c6757b9 100644 --- a/packages/sqlite_async/test/web/watch_test.dart +++ b/packages/sqlite_async/test/web/watch_test.dart @@ -1,4 +1,6 @@ @TestOn('browser') +library; + import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; diff --git a/scripts/sqlite3_wasm_download.dart b/scripts/sqlite3_wasm_download.dart index 28d91b4..62acbbe 100644 --- a/scripts/sqlite3_wasm_download.dart +++ b/scripts/sqlite3_wasm_download.dart @@ -1,4 +1,6 @@ /// Downloads sqlite3.wasm +library; + import 'dart:io'; final sqliteUrl =