diff --git a/lib/src/realtime_mixin.dart b/lib/src/realtime_mixin.dart index da2b3b75..9f6cc45c 100644 --- a/lib/src/realtime_mixin.dart +++ b/lib/src/realtime_mixin.dart @@ -67,15 +67,32 @@ mixin RealtimeMixin { break; case 'event': final message = RealtimeMessage.fromMap(data.data); - for(var channel in message.channels) { + for (var channel in message.channels) { if (_channels[channel] != null) { - for( var stream in _channels[channel]!) { + for (var stream in _channels[channel]!) { stream.sink.add(message); } } } break; } + }, onDone: () { + for (var list in _channels.values) { + for (var stream in list) { + stream.close(); + } + } + _closeConnection(); + }, onError: (err, stack) { + for (var list in _channels.values) { + for (var stream in list) { + stream.sink.addError(err, stack); + } + } + if (_websok?.closeCode != null && _websok?.closeCode != 1008) { + debugPrint("Reconnecting in one second."); + Future.delayed(Duration(seconds: 1), _createSocket); + } }); } catch (e) { if (e is AppwriteException) { @@ -108,7 +125,7 @@ mixin RealtimeMixin { RealtimeSubscription subscribeTo(List channels) { StreamController controller = StreamController.broadcast(); - for(var channel in channels) { + for (var channel in channels) { if (!_channels.containsKey(channel)) { _channels[channel] = []; } @@ -119,13 +136,13 @@ mixin RealtimeMixin { stream: controller.stream, close: () async { controller.close(); - for(var channel in channels) { + for (var channel in channels) { _channels[channel]!.remove(controller); if (_channels[channel]!.isEmpty) { _channels.remove(channel); } } - if(_channels.isNotEmpty) { + if (_channels.isNotEmpty) { await Future.delayed(Duration.zero, () => _createSocket()); } else { await _closeConnection();