diff --git a/lib/src/realtime_mixin.dart b/lib/src/realtime_mixin.dart index 84437699..ec3727aa 100644 --- a/lib/src/realtime_mixin.dart +++ b/lib/src/realtime_mixin.dart @@ -2,7 +2,7 @@ import 'dart:async'; import 'dart:convert'; import 'package:flutter/foundation.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; -import 'package:web_socket_channel/status.dart'; +import 'package:web_socket_channel/status.dart' as status; import 'exception.dart'; import 'realtime_subscription.dart'; import 'client.dart'; @@ -15,15 +15,20 @@ typedef GetFallbackCookie = String? Function(); mixin RealtimeMixin { late Client client; - final Map>> _channels = {}; + final Set _channels = {}; WebSocketChannel? _websok; String? _lastUrl; late WebSocketFactory getWebSocket; GetFallbackCookie? getFallbackCookie; int? get closeCode => _websok?.closeCode; + int _subscriptionsCounter = 0; + Map _subscriptions = {}; + bool _notifyDone = true; + StreamSubscription? _websocketSubscription; Future _closeConnection() async { - await _websok?.sink.close(normalClosure); + await _websocketSubscription?.cancel(); + await _websok?.sink.close(status.normalClosure, 'Ending session'); _lastUrl = null; } @@ -36,14 +41,16 @@ mixin RealtimeMixin { if (_lastUrl == uri.toString() && _websok?.closeCode == null) { return; } + _notifyDone = false; await _closeConnection(); _lastUrl = uri.toString(); _websok = await getWebSocket(uri); + _notifyDone = true; } debugPrint('subscription: $_lastUrl'); try { - _websok?.stream.listen((response) { + _websocketSubscription = _websok?.stream.listen((response) { final data = RealtimeResponse.fromJson(response); switch (data.type) { case 'error': @@ -67,28 +74,25 @@ mixin RealtimeMixin { break; case 'event': final message = RealtimeMessage.fromMap(data.data); - for(var channel in message.channels) { - if (_channels[channel] != null) { - for( var stream in _channels[channel]!) { - stream.sink.add(message); + for (var subscription in _subscriptions.values) { + for (var channel in message.channels) { + if (subscription.channels.contains(channel)) { + subscription.controller.add(message); } } } break; } }, onDone: () { - for (var list in _channels.values) { - for (var stream in list) { - stream.close(); - } + if (!_notifyDone) return; + for (var subscription in _subscriptions.values) { + subscription.close(); } _channels.clear(); _closeConnection(); }, onError: (err, stack) { - for (var list in _channels.values) { - for (var stream in list) { - stream.sink.addError(err, stack); - } + for (var subscription in _subscriptions.values) { + subscription.controller.addError(err, stack); } if (_websok?.closeCode != null && _websok?.closeCode != 1008) { debugPrint("Reconnecting in one second."); @@ -118,7 +122,7 @@ mixin RealtimeMixin { port: uri.port, queryParameters: { "project": client.config['project'], - "channels[]": _channels.keys.toList(), + "channels[]": _channels.toList(), }, path: uri.path + "/realtime", ); @@ -126,32 +130,38 @@ mixin RealtimeMixin { RealtimeSubscription subscribeTo(List channels) { StreamController controller = StreamController.broadcast(); - for(var channel in channels) { - if (!_channels.containsKey(channel)) { - _channels[channel] = []; - } - _channels[channel]!.add(controller); - } + _channels.addAll(channels); Future.delayed(Duration.zero, () => _createSocket()); + int id = DateTime.now().microsecondsSinceEpoch; RealtimeSubscription subscription = RealtimeSubscription( - stream: controller.stream, + controller: controller, + channels: channels, close: () async { + _subscriptions.remove(id); + _subscriptionsCounter--; controller.close(); - for(var channel in channels) { - _channels[channel]!.remove(controller); - if (_channels[channel]!.isEmpty) { - _channels.remove(channel); - } - } - if(_channels.isNotEmpty) { + _cleanup(channels); + + if (_channels.isNotEmpty) { await Future.delayed(Duration.zero, () => _createSocket()); } else { await _closeConnection(); } }); + _subscriptions[id] = subscription; return subscription; } + void _cleanup(List channels) { + for (var channel in channels) { + bool found = _subscriptions.values + .any((subscription) => subscription.channels.contains(channel)); + if (!found) { + _channels.remove(channel); + } + } + } + void handleError(RealtimeResponse response) { if (response.data['code'] == 1008) { throw AppwriteException(response.data["message"], response.data["code"]); diff --git a/lib/src/realtime_subscription.dart b/lib/src/realtime_subscription.dart index e45d3b41..17076916 100644 --- a/lib/src/realtime_subscription.dart +++ b/lib/src/realtime_subscription.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'realtime_message.dart'; /// Realtime Subscription @@ -5,9 +7,16 @@ class RealtimeSubscription { /// Stream of [RealtimeMessage]s final Stream stream; + final StreamController controller; + + /// List of channels + List channels; + /// Closes the subscription final Future Function() close; /// Initializes a [RealtimeSubscription] - RealtimeSubscription({required this.stream, required this.close}); + RealtimeSubscription( + {required this.close, required this.channels, required this.controller}) + : stream = controller.stream; } diff --git a/test/src/realtime_subscription_test.dart b/test/src/realtime_subscription_test.dart index 346e5167..34bf159c 100644 --- a/test/src/realtime_subscription_test.dart +++ b/test/src/realtime_subscription_test.dart @@ -1,20 +1,20 @@ -import 'package:mockito/mockito.dart'; -import 'package:appwrite/src/realtime_message.dart'; +mport 'package:appwrite/src/realtime_message.dart'; import 'package:appwrite/src/realtime_subscription.dart'; import 'package:flutter_test/flutter_test.dart'; - -class MockStream extends Mock implements Stream {} - - +import 'dart:async'; void main() { group('RealtimeSubscription', () { - final mockStream = MockStream(); + final mockStream = StreamController.broadcast(); final mockCloseFunction = () async {}; - final subscription = RealtimeSubscription(stream: mockStream, close: mockCloseFunction); + final subscription = RealtimeSubscription( + controller: mockStream, + close: mockCloseFunction, + channels: ['documents']); test('should have the correct stream and close function', () { - expect(subscription.stream, equals(mockStream)); + expect(subscription.controller, equals(mockStream)); + expect(subscription.stream, equals(mockStream.stream)); expect(subscription.close, equals(mockCloseFunction)); }); });