您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
755 行
26 KiB
755 行
26 KiB
using System;
|
|
using Unity.UIWidgets.foundation;
|
|
|
|
namespace Unity.UIWidgets.async {
|
|
static partial class _stream {
|
|
public delegate void ControllerCallback();
|
|
|
|
public delegate Future ControllerCancelCallback();
|
|
|
|
public delegate void _NotificationHandler();
|
|
|
|
|
|
public static void _runGuarded(_NotificationHandler notificationHandler) {
|
|
if (notificationHandler == null) return;
|
|
try {
|
|
notificationHandler();
|
|
}
|
|
catch (Exception e) {
|
|
Zone.current.handleUncaughtError(e);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
public interface IStreamController<T> {
|
|
Stream<T> stream { get; }
|
|
|
|
_stream.ControllerCallback onListen { get; set; }
|
|
|
|
// void onListen(void onListenHandler());
|
|
|
|
_stream.ControllerCallback onPause { get; set; }
|
|
|
|
// void set onPause(void onPauseHandler());
|
|
|
|
_stream.ControllerCallback onResume { get; set; }
|
|
|
|
// void set onResume(void onResumeHandler());
|
|
|
|
_stream.ControllerCancelCallback onCancel { get; set; }
|
|
|
|
// void set onCancel(onCancelHandler());
|
|
|
|
StreamSink<T> sink { get; }
|
|
|
|
bool isClosed { get; }
|
|
|
|
bool isPaused { get; }
|
|
|
|
/** Whether there is a subscriber on the [Stream]. */
|
|
bool hasListener { get; }
|
|
|
|
// public abstract void add(T evt);
|
|
//
|
|
// public abstract void addError(object error, string stackTrace);
|
|
|
|
Future close();
|
|
|
|
Future addStream(Stream<T> source, bool? cancelOnError = false);
|
|
|
|
void add(T evt);
|
|
void addError(object error, string stackTrace);
|
|
|
|
Future done { get; }
|
|
}
|
|
|
|
public abstract class StreamController<T> : StreamSink<T>, IStreamController<T> {
|
|
/** The stream that this controller is controlling. */
|
|
public virtual Stream<T> stream { get; }
|
|
|
|
public static StreamController<T> create(
|
|
_stream.ControllerCallback onListen = null,
|
|
_stream.ControllerCallback onPause = null,
|
|
_stream.ControllerCallback onResume = null,
|
|
_stream.ControllerCancelCallback onCancel = null,
|
|
// Action onListen = null,
|
|
// Action onPause = null,
|
|
// Action onResume = null,
|
|
// Action onCancel = null,
|
|
bool sync = false) {
|
|
return sync
|
|
? (StreamController<T>) new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
|
|
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
|
|
}
|
|
|
|
public static StreamController<T> broadcast(
|
|
Action onListen = null, Action onCancel = null, bool sync = false) {
|
|
return sync
|
|
? (StreamController<T>) new _SyncBroadcastStreamController<T>(() => onListen?.Invoke(), onCancel)
|
|
: new _AsyncBroadcastStreamController<T>(() => onListen?.Invoke(), () => {
|
|
onCancel?.Invoke();
|
|
return Future._nullFuture;
|
|
});
|
|
}
|
|
|
|
public virtual _stream.ControllerCallback onListen { get; set; }
|
|
|
|
// void onListen(void onListenHandler());
|
|
|
|
public virtual _stream.ControllerCallback onPause { get; set; }
|
|
|
|
// void set onPause(void onPauseHandler());
|
|
|
|
public virtual _stream.ControllerCallback onResume { get; set; }
|
|
|
|
// void set onResume(void onResumeHandler());
|
|
|
|
public virtual _stream.ControllerCancelCallback onCancel { get; set; }
|
|
|
|
// void set onCancel(onCancelHandler());
|
|
|
|
public virtual StreamSink<T> sink { get; }
|
|
|
|
public virtual bool isClosed { get; }
|
|
|
|
public virtual bool isPaused { get; }
|
|
|
|
/** Whether there is a subscriber on the [Stream]. */
|
|
public virtual bool hasListener { get; }
|
|
|
|
// public abstract void add(T evt);
|
|
//
|
|
// public abstract void addError(object error, string stackTrace);
|
|
|
|
public abstract override Future close();
|
|
|
|
public abstract Future addStream(Stream<T> source, bool? cancelOnError = false);
|
|
}
|
|
|
|
public interface SynchronousStreamController<T> {
|
|
//: StreamController<T> {
|
|
// public abstract void add(T data);
|
|
|
|
// public abstract void addError(object error, string stackTrace);
|
|
|
|
// public abstract Future close();
|
|
}
|
|
|
|
interface _StreamControllerLifecycle<T> {
|
|
StreamSubscription<T> _subscribe(
|
|
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError);
|
|
|
|
void _recordPause(StreamSubscription<T> subscription);
|
|
void _recordResume(StreamSubscription<T> subscription);
|
|
Future _recordCancel(StreamSubscription<T> subscription);
|
|
}
|
|
|
|
//
|
|
// // Base type for implementations of stream controllers.
|
|
abstract class _StreamControllerBase<T>
|
|
:
|
|
StreamController<T>,
|
|
_StreamControllerLifecycle<T>,
|
|
_EventSink<T>,
|
|
_EventDispatch<T> {
|
|
public abstract StreamSubscription<T> _subscribe(Action<T> onData, Action<object, string> onError,
|
|
Action onDone, bool cancelOnError);
|
|
|
|
|
|
public virtual void _recordPause(StreamSubscription<T> subscription) {
|
|
}
|
|
|
|
public virtual void _recordResume(StreamSubscription<T> subscription) {
|
|
}
|
|
|
|
public virtual Future _recordCancel(StreamSubscription<T> subscription) => null;
|
|
|
|
public abstract void _add(T data);
|
|
|
|
public abstract void _addError(object error, string stackTrace);
|
|
|
|
public abstract void _close();
|
|
|
|
public abstract void _sendData(T data);
|
|
|
|
public abstract void _sendError(object error, string stackTrace);
|
|
|
|
public abstract void _sendDone();
|
|
}
|
|
|
|
abstract class _StreamController<T> : _StreamControllerBase<T> {
|
|
/** The controller is in its initial state with no subscription. */
|
|
internal const int _STATE_INITIAL = 0;
|
|
|
|
internal const int _STATE_SUBSCRIBED = 1;
|
|
|
|
/** The subscription is canceled. */
|
|
internal const int _STATE_CANCELED = 2;
|
|
|
|
/** Mask for the subscription state. */
|
|
internal const int _STATE_SUBSCRIPTION_MASK = 3;
|
|
|
|
// The following state relate to the controller, not the subscription.
|
|
// If closed, adding more events is not allowed.
|
|
// If executing an [addStream], new events are not allowed either, but will
|
|
// be added by the stream.
|
|
|
|
internal const int _STATE_CLOSED = 4;
|
|
internal const int _STATE_ADDSTREAM = 8;
|
|
|
|
// @pragma("vm:entry-point")
|
|
object _varData;
|
|
|
|
/** Current state of the controller. */
|
|
// @pragma("vm:entry-point")
|
|
protected int _state = _STATE_INITIAL;
|
|
|
|
// TODO(lrn): Could this be stored in the varData field too, if it's not
|
|
// accessed until the call to "close"? Then we need to special case if it's
|
|
// accessed earlier, or if close is called before subscribing.
|
|
_Future _doneFuture;
|
|
|
|
public override _stream.ControllerCallback onListen { get; set; }
|
|
public override _stream.ControllerCallback onPause { get; set; }
|
|
public override _stream.ControllerCallback onResume { get; set; }
|
|
public override _stream.ControllerCancelCallback onCancel { get; set; }
|
|
|
|
internal _StreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause,
|
|
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) {
|
|
this.onListen = onListen;
|
|
this.onPause = onPause;
|
|
this.onResume = onResume;
|
|
this.onCancel = onCancel;
|
|
}
|
|
|
|
// Return a new stream every time. The streams are equal, but not identical.
|
|
public override Stream<T> stream {
|
|
get => new _ControllerStream<T>(this);
|
|
}
|
|
|
|
public override StreamSink<T> sink {
|
|
get => new _StreamSinkWrapper<T>(this);
|
|
}
|
|
|
|
bool _isCanceled {
|
|
get => (_state & _STATE_CANCELED) != 0;
|
|
}
|
|
|
|
/** Whether there is an active listener. */
|
|
public override bool hasListener {
|
|
get => (_state & _STATE_SUBSCRIBED) != 0;
|
|
}
|
|
|
|
/** Whether there has not been a listener yet. */
|
|
bool _isInitialState {
|
|
get =>
|
|
(_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
|
|
}
|
|
|
|
public override bool isClosed {
|
|
get => (_state & _STATE_CLOSED) != 0;
|
|
}
|
|
|
|
public override bool isPaused {
|
|
get =>
|
|
hasListener ? _subscription._isInputPaused : !_isCanceled;
|
|
}
|
|
|
|
bool _isAddingStream {
|
|
get => (_state & _STATE_ADDSTREAM) != 0;
|
|
}
|
|
|
|
/** New events may not be added after close, or during addStream. */
|
|
internal bool _mayAddEvent {
|
|
get => (_state < _STATE_CLOSED);
|
|
}
|
|
|
|
// Returns the pending events.
|
|
// Pending events are events added before a subscription exists.
|
|
// They are added to the subscription when it is created.
|
|
// Pending events, if any, are kept in the _varData field until the
|
|
// stream is listened to.
|
|
// While adding a stream, pending events are moved into the
|
|
// state object to allow the state object to use the _varData field.
|
|
_PendingEvents<T> _pendingEvents {
|
|
get {
|
|
D.assert(_isInitialState);
|
|
if (!_isAddingStream) {
|
|
return (_PendingEvents<T>) _varData;
|
|
}
|
|
|
|
_StreamControllerAddStreamState<T> state = (_StreamControllerAddStreamState<T>) _varData;
|
|
return (_PendingEvents<T>) state.varData;
|
|
}
|
|
}
|
|
|
|
// Returns the pending events, and creates the object if necessary.
|
|
_StreamImplEvents<T> _ensurePendingEvents() {
|
|
D.assert(_isInitialState);
|
|
if (!_isAddingStream) {
|
|
_varData = _varData ?? new _StreamImplEvents<T>();
|
|
return (_StreamImplEvents<T>) _varData;
|
|
}
|
|
|
|
_StreamControllerAddStreamState<T> state = (_StreamControllerAddStreamState<T>) _varData;
|
|
if (state.varData == null) state.varData = new _StreamImplEvents<T>();
|
|
return (_StreamImplEvents<T>) state.varData;
|
|
}
|
|
|
|
// Get the current subscription.
|
|
// If we are adding a stream, the subscription is moved into the state
|
|
// object to allow the state object to use the _varData field.
|
|
protected _ControllerSubscription<T> _subscription {
|
|
get {
|
|
D.assert(hasListener);
|
|
if (_isAddingStream) {
|
|
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
|
|
return (_ControllerSubscription<T>) addState.varData;
|
|
}
|
|
|
|
return (_ControllerSubscription<T>) _varData;
|
|
}
|
|
}
|
|
|
|
protected Exception _badEventState() {
|
|
if (isClosed) {
|
|
return new Exception("Cannot add event after closing");
|
|
}
|
|
|
|
D.assert(_isAddingStream);
|
|
return new Exception("Cannot add event while adding a stream");
|
|
}
|
|
|
|
// StreamSink interface.
|
|
public override Future addStream(Stream<T> source, bool? cancelOnError = false) {
|
|
if (!_mayAddEvent) throw _badEventState();
|
|
if (_isCanceled) return _Future.immediate(FutureOr.nil);
|
|
_StreamControllerAddStreamState<T> addState =
|
|
new _StreamControllerAddStreamState<T>(
|
|
this, _varData, source, cancelOnError ?? false);
|
|
_varData = addState;
|
|
_state |= _STATE_ADDSTREAM;
|
|
return addState.addStreamFuture;
|
|
}
|
|
|
|
public override Future done {
|
|
get { return _ensureDoneFuture(); }
|
|
}
|
|
|
|
Future _ensureDoneFuture() {
|
|
_doneFuture = _doneFuture ?? (_isCanceled ? Future._nullFuture : new _Future());
|
|
return _doneFuture;
|
|
}
|
|
|
|
public override void add(T value) {
|
|
if (!_mayAddEvent) throw _badEventState();
|
|
_add(value);
|
|
}
|
|
|
|
public override void addError(object error, string stackTrace) {
|
|
// ArgumentError.checkNotNull(error, "error");
|
|
if (!_mayAddEvent) throw _badEventState();
|
|
error = _async._nonNullError(error);
|
|
AsyncError replacement = Zone.current.errorCallback((Exception) error);
|
|
if (replacement != null) {
|
|
error = _async._nonNullError(replacement);
|
|
// stackTrace = replacement.stackTrace;
|
|
}
|
|
|
|
stackTrace = stackTrace ?? AsyncError.defaultStackTrace(error);
|
|
_addError(error, stackTrace);
|
|
}
|
|
|
|
public override Future close() {
|
|
if (isClosed) {
|
|
return _ensureDoneFuture();
|
|
}
|
|
|
|
if (!_mayAddEvent) throw _badEventState();
|
|
_closeUnchecked();
|
|
return _ensureDoneFuture();
|
|
}
|
|
|
|
internal void _closeUnchecked() {
|
|
_state |= _STATE_CLOSED;
|
|
if (hasListener) {
|
|
_sendDone();
|
|
}
|
|
else if (_isInitialState) {
|
|
_ensurePendingEvents().add(new _DelayedDone<T>());
|
|
}
|
|
}
|
|
|
|
// EventSink interface. Used by the [addStream] events.
|
|
|
|
// Add data event, used both by the [addStream] events and by [add].
|
|
public override void _add(T value) {
|
|
if (hasListener) {
|
|
_sendData(value);
|
|
}
|
|
else if (_isInitialState) {
|
|
_ensurePendingEvents().add(new _DelayedData<T>(value));
|
|
}
|
|
}
|
|
|
|
public override void _addError(object error, string stackTrace) {
|
|
if (hasListener) {
|
|
_sendError(error, stackTrace);
|
|
}
|
|
else if (_isInitialState) {
|
|
_ensurePendingEvents().add(new _DelayedError<T>((Exception) error, stackTrace));
|
|
}
|
|
}
|
|
|
|
public override void _close() {
|
|
// End of addStream stream.
|
|
D.assert(_isAddingStream);
|
|
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
|
|
_varData = addState.varData;
|
|
_state &= ~_STATE_ADDSTREAM;
|
|
addState.complete();
|
|
}
|
|
|
|
// _StreamControllerLifeCycle interface
|
|
|
|
public override StreamSubscription<T> _subscribe(
|
|
Action<T> onData,
|
|
Action<object, string> onError,
|
|
Action onDone, bool cancelOnError) {
|
|
if (!_isInitialState) {
|
|
throw new Exception("Stream has already been listened to.");
|
|
}
|
|
|
|
_ControllerSubscription<T> subscription = new _ControllerSubscription<T>(
|
|
this, onData, onError, onDone, cancelOnError);
|
|
|
|
_PendingEvents<T> pendingEvents = _pendingEvents;
|
|
_state |= _STATE_SUBSCRIBED;
|
|
if (_isAddingStream) {
|
|
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
|
|
addState.varData = subscription;
|
|
addState.resume();
|
|
}
|
|
else {
|
|
_varData = subscription;
|
|
}
|
|
|
|
subscription._setPendingEvents(pendingEvents);
|
|
subscription._guardCallback(() => { _stream._runGuarded(() => onListen?.Invoke()); });
|
|
|
|
return subscription;
|
|
}
|
|
|
|
public override Future _recordCancel(StreamSubscription<T> subscription) {
|
|
// When we cancel, we first cancel any stream being added,
|
|
// Then we call `onCancel`, and finally the _doneFuture is completed.
|
|
// If either of addStream's cancel or `onCancel` returns a future,
|
|
// we wait for it before continuing.
|
|
// Any error during this process ends up in the returned future.
|
|
// If more errors happen, we act as if it happens inside nested try/finallys
|
|
// or whenComplete calls, and only the last error ends up in the
|
|
// returned future.
|
|
Future result = null;
|
|
if (_isAddingStream) {
|
|
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
|
|
result = addState.cancel();
|
|
}
|
|
|
|
_varData = null;
|
|
_state =
|
|
(_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
|
|
|
|
if (onCancel != null) {
|
|
if (result == null) {
|
|
// Only introduce a future if one is needed.
|
|
// If _onCancel returns null, no future is needed.
|
|
try {
|
|
result = onCancel();
|
|
}
|
|
catch (Exception e) {
|
|
// Return the error in the returned future.
|
|
// Complete it asynchronously, so there is time for a listener
|
|
// to handle the error.
|
|
var f = new _Future();
|
|
f._asyncCompleteError(e);
|
|
result = f;
|
|
}
|
|
}
|
|
else {
|
|
// Simpler case when we already know that we will return a future.
|
|
result = result.whenComplete(() => onCancel());
|
|
}
|
|
}
|
|
|
|
void complete() {
|
|
if (_doneFuture != null && _doneFuture._mayComplete) {
|
|
_doneFuture._asyncComplete(FutureOr.nil);
|
|
}
|
|
}
|
|
|
|
if (result != null) {
|
|
result = result.whenComplete(complete);
|
|
}
|
|
else {
|
|
complete();
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
public override void _recordPause(StreamSubscription<T> subscription) {
|
|
if (_isAddingStream) {
|
|
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
|
|
addState.pause();
|
|
}
|
|
|
|
_stream._runGuarded(() => onPause?.Invoke());
|
|
}
|
|
|
|
public override void _recordResume(StreamSubscription<T> subscription) {
|
|
if (_isAddingStream) {
|
|
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
|
|
addState.resume();
|
|
}
|
|
|
|
_stream._runGuarded(() => onResume?.Invoke());
|
|
}
|
|
}
|
|
|
|
//
|
|
abstract class _SyncStreamControllerDispatch<T>
|
|
: _StreamController<T>, SynchronousStreamController<T> {
|
|
internal virtual int _state { get; set; }
|
|
|
|
public override void _sendData(T data) {
|
|
_subscription._add(data);
|
|
}
|
|
|
|
public override void _sendError(object error, string stackTrace) {
|
|
_subscription._addError(error, stackTrace);
|
|
}
|
|
|
|
public override void _sendDone() {
|
|
_subscription._close();
|
|
}
|
|
|
|
protected _SyncStreamControllerDispatch(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause,
|
|
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause,
|
|
onResume, onCancel) {
|
|
}
|
|
}
|
|
|
|
abstract class _AsyncStreamControllerDispatch<T>
|
|
: _StreamController<T> {
|
|
public override void _sendData(T data) {
|
|
_subscription._addPending(new _DelayedData<T>(data));
|
|
}
|
|
|
|
public override void _sendError(object error, string stackTrace) {
|
|
_subscription._addPending(new _DelayedError<T>((Exception) error, stackTrace));
|
|
}
|
|
|
|
public override void _sendDone() {
|
|
_subscription._addPending(new _DelayedDone<T>());
|
|
}
|
|
|
|
protected _AsyncStreamControllerDispatch(_stream.ControllerCallback onListen,
|
|
_stream.ControllerCallback onPause, _stream.ControllerCallback onResume,
|
|
_stream.ControllerCancelCallback onCancel) : base(onListen, onPause, onResume, onCancel) {
|
|
}
|
|
}
|
|
|
|
// TODO(lrn): Use common superclass for callback-controllers when VM supports
|
|
// constructors in mixin superclasses.
|
|
|
|
class _AsyncStreamController<T> : _AsyncStreamControllerDispatch<T> {
|
|
// public override void close() {
|
|
// throw new NotImplementedException();
|
|
// }
|
|
public _AsyncStreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause,
|
|
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause,
|
|
onResume, onCancel) {
|
|
}
|
|
}
|
|
|
|
class _SyncStreamController<T> : _SyncStreamControllerDispatch<T> {
|
|
public _SyncStreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause,
|
|
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause,
|
|
onResume, onCancel) {
|
|
}
|
|
}
|
|
|
|
|
|
class _ControllerStream<T> : _StreamImpl<T>, IEquatable<_ControllerStream<T>> {
|
|
_StreamControllerLifecycle<T> _controller;
|
|
|
|
internal _ControllerStream(_StreamControllerLifecycle<T> _controller) {
|
|
this._controller = _controller;
|
|
}
|
|
|
|
internal override StreamSubscription<T> _createSubscription(
|
|
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) =>
|
|
_controller._subscribe(onData, onError, onDone, cancelOnError);
|
|
|
|
// Override == and hashCode so that new streams returned by the same
|
|
// controller are considered equal. The controller returns a new stream
|
|
// each time it's queried, but doesn't have to cache the result.
|
|
|
|
// int hashCode {
|
|
// get { return _controller.GetHashCode() ^ 0x35323532; }
|
|
// }
|
|
|
|
// bool operator ==(object other) {
|
|
// if (identical(this, other)) return true;
|
|
// return other is _ControllerStream &&
|
|
// identical(other._controller, this._controller);
|
|
// }
|
|
|
|
public bool Equals(_ControllerStream<T> other) {
|
|
if (ReferenceEquals(null, other)) {
|
|
return false;
|
|
}
|
|
|
|
if (ReferenceEquals(this, other)) {
|
|
return true;
|
|
}
|
|
|
|
return Equals(_controller, other._controller);
|
|
}
|
|
|
|
public override bool Equals(object obj) {
|
|
if (ReferenceEquals(null, obj)) {
|
|
return false;
|
|
}
|
|
|
|
if (ReferenceEquals(this, obj)) {
|
|
return true;
|
|
}
|
|
|
|
if (obj.GetType() != GetType()) {
|
|
return false;
|
|
}
|
|
|
|
return Equals((_ControllerStream<T>) obj);
|
|
}
|
|
|
|
public override int GetHashCode() {
|
|
return _controller.GetHashCode() ^ 0x35323532;
|
|
}
|
|
}
|
|
|
|
class _ControllerSubscription<T> : _BufferingStreamSubscription<T> {
|
|
internal readonly _StreamControllerLifecycle<T> _controller;
|
|
|
|
internal _ControllerSubscription(
|
|
_StreamControllerLifecycle<T> _controller,
|
|
Action<T> onData,
|
|
Action<object, string> onError,
|
|
Action onDone, bool cancelOnError
|
|
)
|
|
: base(onData, onError, onDone, cancelOnError) {
|
|
this._controller = _controller;
|
|
}
|
|
|
|
protected override Future _onCancel() {
|
|
return _controller._recordCancel(this);
|
|
}
|
|
|
|
protected override void _onPause() {
|
|
_controller._recordPause(this);
|
|
}
|
|
|
|
protected override void _onResume() {
|
|
_controller._recordResume(this);
|
|
}
|
|
}
|
|
|
|
/** A class that exposes only the [StreamSink] interface of an object. */
|
|
class _StreamSinkWrapper<T> : StreamSink<T> {
|
|
readonly StreamController<T> _target;
|
|
|
|
internal _StreamSinkWrapper(StreamController<T> _target) {
|
|
this._target = _target;
|
|
}
|
|
|
|
public override void add(T data) {
|
|
_target.add(data);
|
|
}
|
|
|
|
public override void addError(object error, string stackTrace) {
|
|
_target.addError(error, stackTrace);
|
|
}
|
|
|
|
public override Future close() => _target.close();
|
|
|
|
public override Future addStream(Stream<T> source) => _target.addStream(source);
|
|
|
|
public override Future done {
|
|
get { return _target.done; }
|
|
}
|
|
}
|
|
|
|
class _AddStreamState<T> {
|
|
// [_Future] returned by call to addStream.
|
|
internal readonly _Future addStreamFuture;
|
|
|
|
// Subscription on stream argument to addStream.
|
|
internal readonly StreamSubscription<T> addSubscription;
|
|
|
|
internal _AddStreamState(
|
|
_EventSink<T> controller, Stream<T> source, bool cancelOnError) {
|
|
addStreamFuture = new _Future();
|
|
addSubscription = source.listen(controller._add,
|
|
onError: cancelOnError
|
|
? makeErrorHandler(controller)
|
|
: controller._addError,
|
|
onDone: controller._close,
|
|
cancelOnError: cancelOnError);
|
|
}
|
|
|
|
public static Action<object, string> makeErrorHandler(_EventSink<T> controller) {
|
|
return (object e, string s) => {
|
|
controller._addError(e, s);
|
|
controller._close();
|
|
};
|
|
}
|
|
|
|
public void pause() {
|
|
addSubscription.pause();
|
|
}
|
|
|
|
public void resume() {
|
|
addSubscription.resume();
|
|
}
|
|
|
|
public Future cancel() {
|
|
var cancel = addSubscription.cancel();
|
|
if (cancel == null) {
|
|
addStreamFuture._asyncComplete(FutureOr.nil);
|
|
return null;
|
|
}
|
|
|
|
return cancel.whenComplete(() => { addStreamFuture._asyncComplete(FutureOr.nil); });
|
|
}
|
|
|
|
public void complete() {
|
|
addStreamFuture._asyncComplete(FutureOr.nil);
|
|
}
|
|
}
|
|
|
|
class _StreamControllerAddStreamState<T> : _AddStreamState<T> {
|
|
// The subscription or pending data of a _StreamController.
|
|
// Stored here because we reuse the `_varData` field in the _StreamController
|
|
// to store this state object.
|
|
public object varData;
|
|
|
|
internal _StreamControllerAddStreamState(_StreamController<T> controller, object varData,
|
|
Stream<T> source, bool cancelOnError)
|
|
: base(controller, source, cancelOnError) {
|
|
if (controller.isPaused) {
|
|
addSubscription.pause();
|
|
}
|
|
}
|
|
}
|
|
}
|