|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// class _AsBroadcastStream<T> : Stream<T> {
|
|
|
|
// readonly Stream<T> _source;
|
|
|
|
// readonly _stream._BroadcastCallback<T> _onListenHandler;
|
|
|
|
// readonly _stream._BroadcastCallback<T> _onCancelHandler;
|
|
|
|
// readonly Zone _zone;
|
|
|
|
//
|
|
|
|
// _AsBroadcastStreamController<T> _controller;
|
|
|
|
// StreamSubscription<T> _subscription;
|
|
|
|
//
|
|
|
|
// _AsBroadcastStream(
|
|
|
|
// this._source,
|
|
|
|
// void onListenHandler(StreamSubscription<T> subscription),
|
|
|
|
// void onCancelHandler(StreamSubscription<T> subscription))
|
|
|
|
// // TODO(floitsch): the return type should be void and should be
|
|
|
|
// // inferred.
|
|
|
|
// : _onListenHandler = Zone.current
|
|
|
|
// .registerUnaryCallback<dynamic, StreamSubscription<T>>(
|
|
|
|
// onListenHandler),
|
|
|
|
// _onCancelHandler = Zone.current
|
|
|
|
// .registerUnaryCallback<dynamic, StreamSubscription<T>>(
|
|
|
|
// onCancelHandler),
|
|
|
|
// _zone = Zone.current {
|
|
|
|
// _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// bool get isBroadcast => true;
|
|
|
|
//
|
|
|
|
// StreamSubscription<T> listen(void onData(T data),
|
|
|
|
// {Function onError, void onDone(), bool cancelOnError}) {
|
|
|
|
// if (_controller == null || _controller.isClosed) {
|
|
|
|
// // Return a dummy subscription backed by nothing, since
|
|
|
|
// // it will only ever send one done event.
|
|
|
|
// return new _DoneStreamSubscription<T>(onDone);
|
|
|
|
// }
|
|
|
|
// _subscription ??= _source.listen(_controller.add,
|
|
|
|
// onError: _controller.addError, onDone: _controller.close);
|
|
|
|
// cancelOnError = identical(true, cancelOnError);
|
|
|
|
// return _controller._subscribe(onData, onError, onDone, cancelOnError);
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// void _onCancel() {
|
|
|
|
// bool shutdown = (_controller == null) || _controller.isClosed;
|
|
|
|
// if (_onCancelHandler != null) {
|
|
|
|
// _zone.runUnary(
|
|
|
|
// _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this));
|
|
|
|
// }
|
|
|
|
// if (shutdown) {
|
|
|
|
// if (_subscription != null) {
|
|
|
|
// _subscription.cancel();
|
|
|
|
// _subscription = null;
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// void _onListen() {
|
|
|
|
// if (_onListenHandler != null) {
|
|
|
|
// _zone.runUnary(
|
|
|
|
// _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this));
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// // Methods called from _BroadcastSubscriptionWrapper.
|
|
|
|
// void _cancelSubscription() {
|
|
|
|
// if (_subscription == null) return;
|
|
|
|
// // Called by [_controller] when it has no subscribers left.
|
|
|
|
// StreamSubscription subscription = _subscription;
|
|
|
|
// _subscription = null;
|
|
|
|
// _controller = null; // Marks the stream as no longer listenable.
|
|
|
|
// subscription.cancel();
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// void _pauseSubscription(Future resumeSignal) {
|
|
|
|
// if (_subscription == null) return;
|
|
|
|
// _subscription.pause(resumeSignal);
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// void _resumeSubscription() {
|
|
|
|
// if (_subscription == null) return;
|
|
|
|
// _subscription.resume();
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// bool get _isSubscriptionPaused {
|
|
|
|
// if (_subscription == null) return false;
|
|
|
|
// return _subscription.isPaused;
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
|
|
|
|
class _AsBroadcastStream<T> : Stream<T> { |
|
|
|
readonly Stream<T> _source; |
|
|
|
readonly _stream._BroadcastCallback<T> _onListenHandler; |
|
|
|
readonly _stream._BroadcastCallback<T> _onCancelHandler; |
|
|
|
readonly Zone _zone; |
|
|
|
|
|
|
|
_AsBroadcastStreamController<T> _controller; |
|
|
|
StreamSubscription<T> _subscription; |
|
|
|
|
|
|
|
internal _AsBroadcastStream( |
|
|
|
Stream<T> _source, |
|
|
|
Action<StreamSubscription<T>> onListenHandler, |
|
|
|
|
|
|
|
Action<StreamSubscription<T>> onCancelHandler) |
|
|
|
// TODO(floitsch): the return type should be void and should be
|
|
|
|
// inferred.
|
|
|
|
{ |
|
|
|
this._source = _source; |
|
|
|
_onListenHandler = a=>Zone.current |
|
|
|
.registerUnaryCallback( |
|
|
|
b => { |
|
|
|
onListenHandler((StreamSubscription<T>) b); |
|
|
|
return default; |
|
|
|
} |
|
|
|
)(a); |
|
|
|
_onCancelHandler = d=> Zone.current |
|
|
|
.registerUnaryCallback( |
|
|
|
c=> { |
|
|
|
onCancelHandler((StreamSubscription<T>) c); |
|
|
|
return default; |
|
|
|
})(d); |
|
|
|
_zone = Zone.current; |
|
|
|
_controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
|
|
|
} |
|
|
|
|
|
|
|
bool isBroadcast { |
|
|
|
get { return true; } |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) { |
|
|
|
|
|
|
|
if (_controller == null || _controller.isClosed) { |
|
|
|
// Return a dummy subscription backed by nothing, since
|
|
|
|
// it will only ever send one done event.
|
|
|
|
return new _DoneStreamSubscription<T>(()=>onDone()); |
|
|
|
} |
|
|
|
_subscription ??= _source.listen(_controller.add, |
|
|
|
onError: _controller.addError, onDone: ()=>_controller.close()); |
|
|
|
cancelOnError = Equals(true, cancelOnError); |
|
|
|
return _controller._subscribe(onData, onError, onDone, cancelOnError); |
|
|
|
} |
|
|
|
|
|
|
|
void _onCancel() { |
|
|
|
bool shutdown = (_controller == null) || _controller.isClosed; |
|
|
|
if (_onCancelHandler != null) { |
|
|
|
_zone.runUnary( |
|
|
|
a=> { |
|
|
|
_onCancelHandler((StreamSubscription<T>) a); |
|
|
|
return default; |
|
|
|
}, new _BroadcastSubscriptionWrapper<T>(this)); |
|
|
|
} |
|
|
|
if (shutdown) { |
|
|
|
if (_subscription != null) { |
|
|
|
_subscription.cancel(); |
|
|
|
_subscription = null; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void _onListen() { |
|
|
|
if (_onListenHandler != null) { |
|
|
|
_zone.runUnary( |
|
|
|
a => { |
|
|
|
_onListenHandler((StreamSubscription<T>) a); |
|
|
|
return default; |
|
|
|
}, new _BroadcastSubscriptionWrapper<T>(this)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Methods called from _BroadcastSubscriptionWrapper.
|
|
|
|
void _cancelSubscription() { |
|
|
|
if (_subscription == null) return; |
|
|
|
// Called by [_controller] when it has no subscribers left.
|
|
|
|
StreamSubscription<T> subscription = _subscription; |
|
|
|
_subscription = null; |
|
|
|
_controller = null; // Marks the stream as no longer listenable.
|
|
|
|
subscription.cancel(); |
|
|
|
} |
|
|
|
|
|
|
|
void _pauseSubscription(Future resumeSignal) { |
|
|
|
if (_subscription == null) return; |
|
|
|
_subscription.pause(resumeSignal); |
|
|
|
} |
|
|
|
|
|
|
|
void _resumeSubscription() { |
|
|
|
if (_subscription == null) return; |
|
|
|
_subscription.resume(); |
|
|
|
} |
|
|
|
|
|
|
|
bool _isSubscriptionPaused{get { |
|
|
|
if (_subscription == null) return false; |
|
|
|
return _subscription.isPaused; |
|
|
|
}} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
internal class _StreamIterator<T> : StreamIterator<T> { |
|
|
|