Skip to content

Commit

Permalink
fix(subject): Subject.stream now returns a read-only Stream, fix …
Browse files Browse the repository at this point in the history
…`StateSubject.addStream`, add more tests (hoc081098#79)

* readonly statestream

* format

* refactor StateSubject

* add tests

* refactor(not_replay_value_stream): `ValueSubject.stream` returns a read-only `NotReplayValueStream`

* tests

* tests

* docs(not_replay_value_stream): ValueSubject
  • Loading branch information
hoc081098 authored Nov 18, 2022
1 parent 19de39e commit d3df59a
Show file tree
Hide file tree
Showing 9 changed files with 785 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import 'stream_event.dart';
@internal
mixin NotReplayValueStreamMixin<T> implements NotReplayValueStream<T> {
/// Keep latest state.
/// **DO NOT USE THIS METHOD**
@visibleForOverriding
@internal
StreamEvent<T> get event;

@nonVirtual
@override
Object get error {
final errorAndSt = event.errorAndStackTrace;
Expand All @@ -21,21 +23,27 @@ mixin NotReplayValueStreamMixin<T> implements NotReplayValueStream<T> {
throw ValueStreamError.hasNoError();
}

@nonVirtual
@override
Object? get errorOrNull => event.errorAndStackTrace?.error;

@nonVirtual
@override
bool get hasError => event.errorAndStackTrace != null;

@nonVirtual
@override
StackTrace? get stackTrace => event.errorAndStackTrace?.stackTrace;

@nonVirtual
@override
T get value => event.value;

@nonVirtual
@override
T get valueOrNull => event.value;

@nonVirtual
@override
bool get hasValue => true;
}
3 changes: 2 additions & 1 deletion lib/src/not_replay_value_stream/value_stream_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:async';
import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart' show ErrorAndStackTrace;

import '../utils/stream_sink_wrapper.dart';
import 'not_replay_value_stream.dart';
import 'not_replay_value_stream_mixin.dart';
import 'stream_event.dart';
Expand Down Expand Up @@ -118,7 +119,7 @@ class ValueStreamController<T> implements StreamController<T> {
bool get isPaused => _delegate.isPaused;

@override
StreamSink<T> get sink => _delegate.sink;
StreamSink<T> get sink => StreamSinkWrapper(this);

/// The stream that this controller is controlling.
/// It is a single-subscription [NotReplayValueStream].
Expand Down
45 changes: 44 additions & 1 deletion lib/src/not_replay_value_stream/value_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import 'stream_event.dart';
///
/// [ValueSubject] is the same as [PublishSubject], with the ability to capture
/// the latest item has been added to the controller.
/// This [ValueSubject] always has the value, ie. [hasValue] is always true.
///
/// [ValueSubject] is, by default, a broadcast (aka hot) controller, in order
/// to fulfill the Rx Subject contract. This means the Subject's `stream` can
Expand Down Expand Up @@ -74,9 +75,51 @@ class ValueSubject<T> extends Subject<T>
_dataOrError.onError(ErrorAndStackTrace(error, stackTrace));

@override
NotReplayValueStream<T> get stream => this;
NotReplayValueStream<T> get stream => _ValueSubjectStream(this);

@internal
@override
StreamEvent<T> get event => _dataOrError;
}

class _ValueSubjectStream<T> extends Stream<T>
with NotReplayValueStreamMixin<T>
implements NotReplayValueStream<T> {
final ValueSubject<T> _subject;

_ValueSubjectStream(this._subject);

@override
bool get isBroadcast => true;

// Override == and hashCode so that new streams returned by the same
// subject are considered equal.
// The subject returns a new stream each time it's queried,
// but doesn't have to cache the result.

@override
int get hashCode => _subject.hashCode ^ 0x35323532;

@override
bool operator ==(Object other) {
if (identical(this, other)) return true;
return other is _ValueSubjectStream && identical(other._subject, _subject);
}

@override
StreamEvent<T> get event => _subject.event;

@override
StreamSubscription<T> listen(
void Function(T event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
_subject.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);
}
6 changes: 6 additions & 0 deletions lib/src/state_stream/state_stream_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,27 @@ import 'state_stream.dart';
/// This mixin implements all [StateStream] members except [StateStream.value].
@internal
mixin StateStreamMixin<T> implements StateStream<T> {
@nonVirtual
@override
Never get error => throw ValueStreamError.hasNoError();

@nonVirtual
@override
Null get errorOrNull => null;

@nonVirtual
@override
bool get hasError => false;

@nonVirtual
@override
Null get stackTrace => null;

@nonVirtual
@override
bool get hasValue => true;

@nonVirtual
@override
T get valueOrNull => value;
}
113 changes: 98 additions & 15 deletions lib/src/state_stream/state_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart'
show PublishSubject, Subject, BehaviorSubject;

import '../not_replay_value_stream/not_replay_value_stream.dart';
import '../not_replay_value_stream/value_subject.dart';
import 'state_stream.dart';
import 'state_stream_mixin.dart';
import '../not_replay_value_stream/not_replay_value_stream.dart';

/// A special [Subject] / [StreamController] that captures the latest item that has been
/// added to the controller.
Expand Down Expand Up @@ -53,15 +53,18 @@ import '../not_replay_value_stream/not_replay_value_stream.dart';
class StateSubject<T> extends Subject<T>
with StateStreamMixin<T>
implements StateStream<T> {
final ValueSubject<T> _subject;
T _value;
bool _isAddingStreamItems = false;
final StreamController<T> _controller;

@override
final Equality<T> equals;

StateSubject._(
this._value,
this.equals,
this._subject,
) : super(_subject, _subject.stream);
this._controller,
) : super(_controller, _controller.stream);

/// Constructs a [StateSubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
Expand All @@ -77,25 +80,46 @@ class StateSubject<T> extends Subject<T>
FutureOr<void> Function()? onCancel,
bool sync = false,
}) {
final subject = ValueSubject<T>(
seedValue,
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
return StateSubject._(equals ?? StateStream.defaultEquality, subject);
return StateSubject._(
seedValue,
equals ?? StateStream.defaultEquality,
controller,
);
}

@nonVirtual
@override
void add(T event) {
if (!equals(value, event)) {
_subject.add(event);
if (_isAddingStreamItems) {
throw StateError(
'You cannot add items while items are being added from addStream');
}

_addInternal(event);
}

@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _addInternal(T event) {
if (!equals(_value, event)) {
_value = event;
_controller.add(event);
}
}

@override
Future<void> close() => _subject.close();
Future<void> close() {
if (_isAddingStreamItems) {
throw StateError(
'You cannot close the subject while items are being added from addStream');
}
return _controller.close();
}

/// Cannot send an error to this subject.
/// **Always throws** an [UnsupportedError].
Expand All @@ -105,21 +129,80 @@ class StateSubject<T> extends Subject<T>

@override
Future<void> addStream(Stream<T> source, {bool? cancelOnError}) {
final completer = Completer<void>.sync();
if (_isAddingStreamItems) {
throw StateError(
'You cannot add items while items are being added from addStream');
}
_isAddingStreamItems = true;

final completer = Completer<void>();
void complete() {
if (!completer.isCompleted) {
_isAddingStreamItems = false;
completer.complete();
}
}

source.listen(
add,
_addInternal,
onError: addError,
onDone: completer.complete,
onDone: complete,
cancelOnError: cancelOnError,
);

return completer.future;
}

@override
StateStream<T> get stream => this;
StateStream<T> get stream => _StateSubjectStream<T>(this);

@override
T get value => _subject.value;
T get value => _value;

set value(T newValue) => add(newValue);
}

class _StateSubjectStream<T> extends Stream<T>
with StateStreamMixin<T>
implements StateStream<T> {
final StateSubject<T> _subject;

_StateSubjectStream(this._subject);

@override
bool get isBroadcast => true;

// Override == and hashCode so that new streams returned by the same
// subject are considered equal.
// The subject returns a new stream each time it's queried,
// but doesn't have to cache the result.

@override
int get hashCode => _subject.hashCode ^ 0x35323532;

@override
bool operator ==(Object other) {
if (identical(this, other)) return true;
return other is _StateSubjectStream && identical(other._subject, _subject);
}

@override
Equality<T> get equals => _subject.equals;

@override
T get value => _subject.value;

@override
StreamSubscription<T> listen(
void Function(T event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
_subject.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);
}
28 changes: 28 additions & 0 deletions lib/src/utils/stream_sink_wrapper.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import 'dart:async';

import 'package:meta/meta.dart';

/// A class that exposes only the [StreamSink] interface of an object.
@internal
class StreamSinkWrapper<T> implements StreamSink<T> {
final StreamSink<T> _target;

/// Creates a [StreamSinkWrapper] that wraps [target].
StreamSinkWrapper(this._target);

@override
void add(T data) => _target.add(data);

@override
void addError(Object error, [StackTrace? stackTrace]) =>
_target.addError(error, stackTrace);

@override
Future<dynamic> close() => _target.close();

@override
Future<dynamic> addStream(Stream<T> source) => _target.addStream(source);

@override
Future<dynamic> get done => _target.done;
}
29 changes: 22 additions & 7 deletions test/not_replay_value_stream/value_controller_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,6 @@ void main() {
expect(valueController.isPaused, false);
verify(mockController.isPaused).called(1);
});

test('sink', () {
final sink = StreamController<int>().sink;
when(mockController.sink).thenReturn(sink);
expect(valueController.sink, sink);
verify(mockController.sink).called(1);
});
});

group('stream', () {
Expand Down Expand Up @@ -303,6 +296,28 @@ void main() {
subscription.resume();
}
});

test(
'adding to sink has same behavior as adding to ValueStreamController itself',
() async {
final controller = ValueStreamController<int>(0);

scheduleMicrotask(() {
controller.sink.add(1);
expect(controller.stream.value, 1);

controller.sink.add(2);
expect(controller.stream.value, 2);

controller.sink.add(3);
expect(controller.stream.value, 3);

controller.sink.close();
});

await expectLater(
controller.stream, emitsInOrder(<dynamic>[1, 2, 3, emitsDone]));
});
});
});

Expand Down
Loading

0 comments on commit d3df59a

Please sign in to comment.