浏览代码

async cast, boradcast stream controller, stream transformer

/main
siyao 3 年前
当前提交
0d786a46
共有 8 个文件被更改,包括 957 次插入171 次删除
  1. 104
      com.unity.uiwidgets/Runtime/async/async_cast.cs
  2. 48
      com.unity.uiwidgets/Runtime/async/stream.cs
  3. 164
      com.unity.uiwidgets/Runtime/async/stream_controller.cs
  4. 220
      com.unity.uiwidgets/Runtime/async/stream_impl.cs
  5. 3
      com.unity.uiwidgets/Runtime/async/stream_transformers.cs
  6. 23
      com.unity.uiwidgets/Runtime/widgets/async.cs
  7. 563
      com.unity.uiwidgets/Runtime/async/broadcast_stream_controller.cs
  8. 3
      com.unity.uiwidgets/Runtime/async/broadcast_stream_controller.cs.meta

104
com.unity.uiwidgets/Runtime/async/async_cast.cs


using System;
using System.Data.Common;
public class CastStream<S, T> : Stream<T> where T : class {
readonly Stream<S> _source;
public CastStream(Stream<S> _source) {
this._source = _source;
}
bool isBroadcast {
get { return _source.isBroadcast; }
}
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) {
var result = new CastStreamSubscription<S, T>(
_source.listen(null, onDone: onDone, cancelOnError: cancelOnError));
result.onData(onData);
result.onError(onError);
return result;
}
Stream<R> cast<R>() where R : class => new CastStream<S, R>(_source);
}
class CastStreamSubscription<S, T> : StreamSubscription<T> where T : class {
readonly StreamSubscription<S> _source;
/// Zone where listen was called.
readonly Zone _zone = Zone.current;
/// User's data handler. May be null.
ZoneUnaryCallback _handleData;
/// Copy of _source's handleError so we can report errors in onData.
/// May be null.
ZoneBinaryCallback _handleError;
public CastStreamSubscription( StreamSubscription<S> _source) {
this._source = _source;
_source.onData(_onData);
}
public override Future cancel() => _source.cancel();
public override void onData(Action<T> handleData) {
_handleData = handleData == null
? null
: _zone.registerUnaryCallback(data => {
handleData((T) data);
return null;
});
}
public override void onError(Action<object, string> handleError) {
_source.onError(handleError);
if (handleError == null) {
_handleError = null;
} else {
_handleError = _zone
.registerBinaryCallback((a, b)=> {
handleError(a, (string) b);
return null;
});
}
}
public override void onDone(Action handleDone) {
_source.onDone(handleDone);
}
void _onData(S data) {
if (_handleData == null) return;
T targetData;
try {
targetData = data as T;
} catch (Exception error) {
if (_handleError == null) {
_zone.handleUncaughtError(error);
} else {
_zone.runBinaryGuarded(_handleError, error, error.StackTrace);
}
return;
}
_zone.runUnaryGuarded(_handleData, targetData);
}
public override void pause(Future resumeSignal = null) {
_source.pause(resumeSignal);
}
public override void resume() {
_source.resume();
}
bool isPaused {
get { return _source.isPaused; }
}
public override Future<E> asFuture<E>(E futureValue) => _source.asFuture<E>(futureValue);
}
class CastStreamTransformer<SS, ST, TS, TT>
: StreamTransformerBase<TS, TT> where TT : class where ST : class {

48
com.unity.uiwidgets/Runtime/async/stream.cs


return controller.stream;
}
public static Stream fromIterable(IEnumerable<T> elements) {
public static Stream<T> fromIterable(IEnumerable<T> elements) {
() => new _IterablePendingEvents<T>(elements));
() => (_PendingEvents<T>) new _IterablePendingEvents<T>(elements));
}
public static Stream<T> periodic(TimeSpan period,

onCancel: () => {
if (timer != null) timer.cancel();
timer = null;
// return Future._nullFuture;
return Future._nullFuture;
factory Stream.eventTransformed(
Stream source, EventSink mapSink(EventSink<T> sink)) {
return new _BoundSinkStream(source, mapSink);
public static Stream<T> eventTransformed(
Stream<T> source, _async._SinkMapper<T, T> mapSink) {
return new _BoundSinkStream<T, T>(source, mapSink);
static Stream<T> castFrom<S, T>(Stream<S> source) =>
static Stream<T> castFrom<S, T>(Stream<S> source) where T : class =>
new CastStream<S, T>(source);
public bool isBroadcast {

Stream<T> asBroadcastStream(
{void onListen(StreamSubscription<T> subscription),
void onCancel(StreamSubscription<T> subscription)}) {
public Stream<T> asBroadcastStream(
Action<StreamSubscription<T>> onListen = null,
Action<StreamSubscription<T>> onCancel = null) {
return new _AsBroadcastStream<T>(this, onListen, onCancel);
}

Stream<T> where(bool test(T event)) {
public Stream<T> where(Func<T, bool> test) {
Stream<S> map<S>(S convert(T event)) {
public Stream<S> map<S>(Func<T,S> convert) {
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
public Stream<E> asyncMap<E>(Func<T,FutureOr> convert) {
final add = controller.add;
assert(controller is _StreamController<E> ||
controller is _BroadcastStreamController);
final addError = controller._addError;
subscription = this.listen((T event) {
FutureOr<E> newValue;
var add = new Action<E>(controller.add);
D.assert(controller is _StreamController<E> ||
controller is _BroadcastStreamController<E>);
var addError = new Action<object, string>(controller._addError);
subscription = this.listen((T evt) => {
FutureOr newValue;
newValue = convert(event);
} catch (e, s) {
controller.addError(e, s);
newValue = convert(evt);
} catch (Exception e) {
controller.addError(e, e.StackTrace);
return;
}
if (newValue is Future<E>) {

_StreamControllerBase<E> controller;
StreamSubscription<T> subscription;
void onListen() {
assert(controller is _StreamController ||
D.assert(controller is _StreamController ||
controller is _BroadcastStreamController);
subscription = this.listen((T event) {
Stream<E> newStream;

void onError(error, StackTrace stackTrace) {
timer.cancel();
assert(controller is _StreamController ||
D.assert(controller is _StreamController ||
controller is _BroadcastStreamController);
controller._addError(error, stackTrace); // Avoid Zone error replacement.
timer = zone.createTimer(timeLimit, timeout);

164
com.unity.uiwidgets/Runtime/async/stream_controller.cs


/** The stream that this controller is controlling. */
internal Stream<T> stream { get; }
public StreamController(
Action onListen = null,
Action onPause = null,
Action onResume = null,
Action onCancel = null,
public static StreamController<T> create(
_stream.ControllerCallback onListen,
_stream.ControllerCallback onPause,
_stream.ControllerCallback onResume,
_stream.ControllerCancelCallback onCancel,
// Action onListen = null,
// Action onPause = null,
// Action onResume = null,
// Action onCancel = null,
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
? (StreamController<T>) new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
public static StreamController<T> create(
Action onListen = null,
Action onPause = null,
Action onResume = null,
Action onCancel = null,
bool sync = false) {
return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
? new _SyncBroadcastStreamController<T>(onListen, onCancel)
: new _AsyncBroadcastStreamController<T>(onListen, onCancel);
? (StreamController<T>) new _SyncBroadcastStreamController<T>(()=>onListen(), ()=> {
onCancel();
return Future._nullFuture;
})
: new _AsyncBroadcastStreamController<T>(()=>onListen(), ()=> {
onCancel();
return Future._nullFuture;
});
}
public _stream.ControllerCallback onListen {

//
// public abstract void addError(object error, string stackTrace);
public abstract Future closeConsumer();
public abstract override Future close();
public abstract Future addStream(Stream<T> source, bool cancelOnError = false);
public abstract Future addStream(Stream<T> source, bool? cancelOnError = false);
}
public interface SynchronousStreamController<T> {//: StreamController<T> {

}
// StreamSink interface.
Future addStream(Stream<T> source, bool? cancelOnError = false) {
public override Future addStream(Stream<T> source, bool? cancelOnError = false) {
if (!_mayAddEvent) throw _badEventState();
if (_isCanceled) return _Future.immediate(FutureOr.nil);
_StreamControllerAddStreamState<T> addState =

_addError(error, stackTrace);
}
public override Future closeConsumer() {
public override Future close() {
if (isClosed) {
return _ensureDoneFuture();
}

// EventSink interface. Used by the [addStream] events.
// Add data event, used both by the [addStream] events and by [add].
void _add(T value) {
public override void _add(T value) {
if (hasListener) {
_sendData(value);
} else if (_isInitialState) {

void _addError(object error, string stackTrace) {
public override void _addError(object error, string stackTrace) {
if (hasListener) {
_sendError(error, stackTrace);
} else if (_isInitialState) {

void _close() {
public override void _close() {
// End of addStream stream.
D.assert(_isAddingStream);
_StreamControllerAddStreamState<T> addState = _varData;

: _StreamController<T>, SynchronousStreamController<T> {
int _state { get; set; }
void _sendData(T data) {
public override void _sendData(T data) {
void _sendError(object error, string stackTrace) {
public override void _sendError(object error, string stackTrace) {
void _sendDone() {
public override void _sendDone() {
_subscription._close();
}

abstract class _AsyncStreamControllerDispatch<T>
: _StreamController<T> {
void _sendData(T data) {
public override void _sendData(T data) {
void _sendError(object error, string stackTrace) {
public override void _sendError(object error, string stackTrace) {
void _sendDone() {
public override void _sendDone() {
_subscription._addPending(new _DelayedDone());
}

// public override void close() {
// throw new NotImplementedException();
// }
public _AsyncStreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause, _stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause, onResume, onCancel)
{
}
public _SyncStreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause, _stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause, onResume, onCancel)
{
}
}

return _controller.GetHashCode() ^ 0x35323532;
}
}
//
// class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
// final _StreamControllerLifecycle<T> _controller;
//
// _ControllerSubscription(this._controller, void onData(T data),
// Function onError, void onDone(), bool cancelOnError)
// : super(onData, onError, onDone, cancelOnError);
//
// Future _onCancel() {
// return _controller._recordCancel(this);
// }
//
// void _onPause() {
// _controller._recordPause(this);
// }
//
// void _onResume() {
// _controller._recordResume(this);
// }
// }
//
// /** A class that exposes only the [StreamSink] interface of an object. */
// class _StreamSinkWrapper<T> implements StreamSink<T> {
// final StreamController _target;
// _StreamSinkWrapper(this._target);
// void add(T data) {
// _target.add(data);
// }
//
// void addError(object error, [string stackTrace]) {
// _target.addError(error, stackTrace);
// }
//
// Future close() => _target.close();
//
// Future addStream(Stream<T> source) => _target.addStream(source);
//
// Future get done => _target.done;
// }
//
class _ControllerSubscription<T> : _BufferingStreamSubscription<T> {
internal readonly _StreamControllerLifecycle<T> _controller;
internal _ControllerSubscription(
_StreamControllerLifecycle<T> _controller,
Action<T> onData,
Action<object, string> onError,
Action onDone, bool cancelOnError
)
: base(onData, onError, onDone, cancelOnError) {
this._controller = _controller;
}
Future _onCancel() {
return _controller._recordCancel(this);
}
void _onPause() {
_controller._recordPause(this);
}
void _onResume() {
_controller._recordResume(this);
}
}
/** A class that exposes only the [StreamSink] interface of an object. */
class _StreamSinkWrapper<T> : StreamSink<T> {
readonly StreamController<T> _target;
internal _StreamSinkWrapper(StreamController<T> _target) {
this._target = _target;
}
public override void add(T data) {
_target.add(data);
}
public override void addError(object error, string stackTrace) {
_target.addError(error, stackTrace);
}
public override Future close() => _target.close();
Future addStream(Stream<T> source) => _target.addStream(source);
Future done {
get { return _target.done; }
}
}
class _AddStreamState<T> {
// [_Future] returned by call to addStream.
internal readonly _Future addStreamFuture;

});
}
void complete() {
public void complete() {
addStreamFuture._asyncComplete(FutureOr.nil);
}
}

220
com.unity.uiwidgets/Runtime/async/stream_impl.cs


using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
public delegate void _DataHandler<T>(T value);
internal delegate void _DataHandler<T>(T value);
public delegate void _DoneHandler();
internal delegate void _DoneHandler();
public static void _nullDataHandler<T>(T obj) {
internal static void _nullDataHandler<T>(T obj) {
public static void _nullErrorHandler(Exception error) {
internal static void _nullErrorHandler(Exception error) {
public static void _nullDoneHandler() {
internal static void _nullDoneHandler() {
internal delegate _PendingEvents<T> _EventGenerator<T>();
}
abstract class _StreamImpl<T> : Stream<T> {

StreamSubscription<T> listen(
public override StreamSubscription<T> listen(
Action<T> onData, Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) {
// void onData(T data),
// {Function onError, void onDone(), bool cancelOnError}) {

void _onListen(StreamSubscription<T> subscription) {
}
}
class _GeneratedStreamImpl<T> : _StreamImpl<T> {
readonly _stream._EventGenerator<T> _pending;
bool _isUsed = false;
internal _GeneratedStreamImpl(_stream._EventGenerator<T> _pending) {
this._pending = _pending;
}
StreamSubscription<T> _createSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError ) {
if (_isUsed) throw new Exception("Stream has already been listened to.");
_isUsed = true;
var result = new _BufferingStreamSubscription<T>(
onData, onError, onDone, cancelOnError);
result._setPendingEvents(_pending());
return result;
}
}
class _IterablePendingEvents<T> : _PendingEvents<T> {
IEnumerator<T> _iterator;
internal _IterablePendingEvents(IEnumerable<T> data) {
_iterator = data.GetEnumerator();
}
bool isEmpty {
get { return _iterator == null; }
}
public override void handleNext(_EventDispatch<T> dispatch) {
if (_iterator == null) {
throw new Exception("No events pending.");
}
bool? hasMore = null;
try {
hasMore = _iterator.MoveNext();
if (hasMore ?? false) {
dispatch._sendData(_iterator.Current);
} else {
_iterator = null;
dispatch._sendDone();
}
} catch (Exception e) {
if (hasMore == null) {
// Threw in .moveNext().
// Ensure that we send a done afterwards.
_iterator =Enumerable.Empty<T>().GetEnumerator();// new EmptyIterator<Null>();
dispatch._sendError(e, e.StackTrace);
} else {
// Threw in .current.
dispatch._sendError(e, e.StackTrace);
}
}
}
public override void clear() {
if (isScheduled) cancelSchedule();
_iterator = null;
}
}
abstract class _DelayedEvent<T> {
/** Added as a linked list on the [StreamController]. */

}
/** A delayed error event. */
class _DelayedError : _DelayedEvent<object> {
class _DelayedError<T> : _DelayedEvent<T> {
readonly Exception error;
readonly string stackTrace;

}
public override void perform(_EventDispatch<object> dispatch) {
public override void perform(_EventDispatch<T> dispatch) {
class _DelayedDone : _DelayedEvent<object> {
class _DelayedDone<T> : _DelayedEvent<T> {
public override void perform(_EventDispatch<object> dispatch) {
public override void perform(_EventDispatch<T> dispatch) {
dispatch._sendDone();
}

this.onDone(onDone);
}
void _setPendingEvents(_PendingEvents<T> pendingEvents) {
internal void _setPendingEvents(_PendingEvents<T> pendingEvents) {
D.assert(_pending == null);
if (pendingEvents == null) return;
_pending = pendingEvents;

_sendData(data);
}
else {
_addPending(new _DelayedData<object>(data));
_addPending(new _DelayedData<T>(data));
}
}

_sendError(error, stackTrace); // Reports cancel after sending.
}
else {
_addPending(new _DelayedError((Exception) error, stackTrace));
_addPending(new _DelayedError<T>((Exception) error, stackTrace));
}
}

_sendDone();
}
else {
_addPending(new _DelayedDone());
_addPending(new _DelayedDone<T>());
}
}

// Handle pending events.
void _addPending(_DelayedEvent<object> evt) {
internal void _addPending(_DelayedEvent<T> evt) {
_StreamImplEvents<T> pending = _pending as _StreamImplEvents<T>;
if (_pending == null) {
pending = (_StreamImplEvents<T>) (_pending = new _StreamImplEvents<T>());

class _StreamImplEvents<T> : _PendingEvents<T> {
/// Single linked list of [_DelayedEvent] objects.
_DelayedEvent<object> firstPendingEvent;
_DelayedEvent<T> firstPendingEvent;
_DelayedEvent<object> lastPendingEvent;
_DelayedEvent<T> lastPendingEvent;
internal void add(_DelayedEvent<object> evt) {
internal void add(_DelayedEvent<T> evt) {
if (lastPendingEvent == null) {
firstPendingEvent = lastPendingEvent = evt;
}

public override void handleNext(_EventDispatch<T> dispatch) {
D.assert(!isScheduled);
_DelayedEvent<object> evt = firstPendingEvent;
_DelayedEvent<T> evt = firstPendingEvent;
evt.perform((_EventDispatch<object>) dispatch);
evt.perform((_EventDispatch<T>) dispatch);
}
public override void clear() {

}
internal class _StreamIterator<T> : StreamIterator<T> {
// The stream iterator is always in one of four states.
// The value of the [_stateData] field depends on the state.
//
// When `_subscription == null` and `_stateData != null`:
// The stream iterator has been created, but [moveNext] has not been called
// yet. The [_stateData] field contains the stream to listen to on the first
// call to [moveNext] and [current] returns `null`.
//
// When `_subscription != null` and `!_isPaused`:
// The user has called [moveNext] and the iterator is waiting for the next
// event. The [_stateData] field contains the [_Future] returned by the
// [_moveNext] call and [current] returns `null.`
//
// When `_subscription != null` and `_isPaused`:
// The most recent call to [moveNext] has completed with a `true` value
// and [current] provides the value of the data event.
// The [_stateData] field contains the [current] value.
//
// When `_subscription == null` and `_stateData == null`:
// The stream has completed or been canceled using [cancel].
// The stream completes on either a done event or an error event.
// The last call to [moveNext] has completed with `false` and [current]
// returns `null`.
class _DoneStreamSubscription<T> : StreamSubscription<T> {
internal const int _DONE_SENT = 1;
internal const int _SCHEDULED = 2;
internal const int _PAUSED = 4;
readonly Zone _zone;
int _state = 0;
_stream._DoneHandler _onDone;
internal _DoneStreamSubscription(_stream._DoneHandler _onDone) {
_zone = Zone.current;
this._onDone = _onDone;
_schedule();
}
bool _isSent {
get => (_state & _DONE_SENT) != 0;
}
bool _isScheduled {
get => (_state & _SCHEDULED) != 0;
}
bool isPaused {
get => _state >= _PAUSED;
}
void _schedule() {
if (_isScheduled) return;
_zone.scheduleMicrotask(() => {
_sendDone();
return null;
});
_state |= _SCHEDULED;
}
public override void onData(Action<T> handleData) {
}
public override void onError(Action<object, string> action) {
}
public override void onDone(Action handleDone) {
_onDone = () => handleDone();
}
public override void pause(Future resumeSignal = null) {
_state += _PAUSED;
if (resumeSignal != null) resumeSignal.whenComplete(resume);
}
public override void resume() {
if (isPaused) {
_state -= _PAUSED;
if (!isPaused && !_isSent) {
_schedule();
}
}
}
public override Future cancel() => Future._nullFuture;
public override Future<E> asFuture<E>(E futureValue) {
_Future result = new _Future();
_onDone = () => { result._completeWithValue(futureValue); };
return result.to<E>();
}
void _sendDone() {
_state &= ~_SCHEDULED;
if (isPaused) return;
_state |= _DONE_SENT;
if (_onDone != null) _zone.runGuarded(() => _onDone);
}
}
internal class _StreamIterator<T> : StreamIterator<T> {
StreamSubscription<T> _subscription;
//@pragma("vm:entry-point")

moveNextFuture._complete(false);
}
}
bool isBroadcast {
bool isBroadcast {
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) {
return new _DoneStreamSubscription<T>(onDone);
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null,
Action onDone = null, bool cancelOnError = false) {
return new _DoneStreamSubscription<T>(() => onDone());
}
}
}

3
com.unity.uiwidgets/Runtime/async/stream_transformers.cs


}
class _BoundSubscriptionStream<S, T> : Stream<T> {
_BoundSubscriptionStream(Stream<S> _stream, _async._SubscriptionTransformer<S, T> _onListen) {
internal _BoundSubscriptionStream(Stream<S> _stream, _async._SubscriptionTransformer<S, T> _onListen) {
this._stream = _stream;
this._onListen = _onListen;
}

23
com.unity.uiwidgets/Runtime/widgets/async.cs


// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// import 'dart:async' show Future, Stream, StreamSubscription;
//
// import 'package:flutter/foundation.dart';
//
// import 'framework.dart';
// Examples can assume:
// dynamic _lot;
// Future<String> _calculation;
using System;
using System.Collections.Generic;
using Unity.UIWidgets.async;

// public class Stream<T> {
// }
// public class StreamSubscription<T> {
// }
public abstract class StreamBuilderBase<T, S> : StatefulWidget {
public StreamBuilderBase(Key key = null, Stream<T> stream = null) : base(key: key) {

if (widget.stream != null) {
_subscription = widget.stream.listen(
(T data) => { setState(() => { _summary = widget.afterData(_summary, data); }); },
onError: (object error) => { setState(() => { _summary = widget.afterError(_summary, error); }); },
onError: (object error, string stackTrace) => { setState(() => { _summary = widget.afterError(_summary, error); }); },
onDone: () => { setState(() => { _summary = widget.afterDone(_summary); }); });
_summary = widget.afterConnected(_summary);
}

563
com.unity.uiwidgets/Runtime/async/broadcast_stream_controller.cs


using System;
using Unity.UIWidgets.foundation;
namespace Unity.UIWidgets.async {
class _BroadcastStream<T> : _ControllerStream<T> {
internal _BroadcastStream(_StreamControllerLifecycle<T> controller)
: base(controller) {
}
bool isBroadcast {
get { return true; }
}
}
class _BroadcastSubscription<T> : _ControllerSubscription<T> {
const int _STATE_EVENT_ID = 1;
internal const int _STATE_FIRING = 2;
const int _STATE_REMOVE_AFTER_FIRING = 4;
// TODO(lrn): Use the _state field on _ControllerSubscription to
// also store this state. Requires that the subscription implementation
// does not assume that it's use of the state integer is the only use.
internal int _eventState = 0; // Initialized to help dart2js type inference.
internal _BroadcastSubscription<T> _next;
internal _BroadcastSubscription<T> _previous;
internal _BroadcastSubscription(_StreamControllerLifecycle<T> controller,
Action<T> onData,
Action<object, string> onError,
Action onDone, bool cancelOnError
)
: base(controller, onData, onError, onDone, cancelOnError) {
_next = _previous = this;
}
internal bool _expectsEvent(int eventId) => (_eventState & _STATE_EVENT_ID) == eventId;
internal void _toggleEventId() {
_eventState ^= _STATE_EVENT_ID;
}
internal bool _isFiring {
get { return (_eventState & _STATE_FIRING) != 0; }
}
internal void _setRemoveAfterFiring() {
D.assert(_isFiring);
_eventState |= _STATE_REMOVE_AFTER_FIRING;
}
internal bool _removeAfterFiring {
get { return (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; }
}
// The controller._recordPause doesn't do anything for a broadcast controller,
// so we don't bother calling it.
void _onPause() {
}
// The controller._recordResume doesn't do anything for a broadcast
// controller, so we don't bother calling it.
void _onResume() {
}
// _onCancel is inherited.
}
abstract class _BroadcastStreamController<T>
: _StreamControllerBase<T> {
const int _STATE_INITIAL = 0;
const int _STATE_EVENT_ID = 1;
internal const int _STATE_FIRING = 2;
const int _STATE_CLOSED = 4;
const int _STATE_ADDSTREAM = 8;
_stream.ControllerCallback onListen;
_stream.ControllerCancelCallback onCancel;
// State of the controller.
internal int _state;
// Double-linked list of active listeners.
internal _BroadcastSubscription<T> _firstSubscription;
_BroadcastSubscription<T> _lastSubscription;
// Extra state used during an [addStream] call.
_AddStreamState<T> _addStreamState;
internal _Future _doneFuture;
internal _BroadcastStreamController(_stream.ControllerCallback onListen, _stream.ControllerCancelCallback onCancel) {
this.onListen = onListen;
this.onCancel = onCancel;
_state = _STATE_INITIAL;
}
_stream.ControllerCallback onPause {
get {
throw new Exception(
"Broadcast stream controllers do not support pause callbacks");
}
set {
throw new Exception(
"Broadcast stream controllers do not support pause callbacks");
}
}
_stream.ControllerCallback onResume {
get {
throw new Exception(
"Broadcast stream controllers do not support pause callbacks");
}
set {
throw new Exception(
"Broadcast stream controllers do not support pause callbacks");
}
}
// StreamController interface.
Stream<T> stream {
get { return new _BroadcastStream<T>(this); }
}
StreamSink<T> sink {
get { return new _StreamSinkWrapper<T>(this); }
}
bool isClosed {
get { return (_state & _STATE_CLOSED) != 0; }
}
/**
* A broadcast controller is never paused.
*
* Each receiving stream may be paused individually, and they handle their
* own buffering.
*/
bool isPaused {
get => false;
}
/** Whether there are currently one or more subscribers. */
bool hasListener {
get => !_isEmpty;
}
/**
* Test whether the stream has exactly one listener.
*
* Assumes that the stream has a listener (not [_isEmpty]).
*/
internal bool _hasOneListener {
get {
D.assert(!_isEmpty);
return Equals(_firstSubscription, _lastSubscription);
}
}
/** Whether an event is being fired (sent to some, but not all, listeners). */
internal bool _isFiring {
get => (_state & _STATE_FIRING) != 0;
}
bool _isAddingStream {
get => (_state & _STATE_ADDSTREAM) != 0;
}
internal bool _mayAddEvent {
get => (_state < _STATE_CLOSED);
}
_Future _ensureDoneFuture() {
if (_doneFuture != null) return _doneFuture;
return _doneFuture = new _Future();
}
// Linked list helpers
internal bool _isEmpty {
get { return _firstSubscription == null; }
}
/** Adds subscription to linked list of active listeners. */
void _addListener(_BroadcastSubscription<T> subscription) {
D.assert(Equals(subscription._next, subscription));
subscription._eventState = (_state & _STATE_EVENT_ID);
// Insert in linked list as last subscription.
_BroadcastSubscription<T> oldLast = _lastSubscription;
_lastSubscription = subscription;
subscription._next = null;
subscription._previous = oldLast;
if (oldLast == null) {
_firstSubscription = subscription;
}
else {
oldLast._next = subscription;
}
}
void _removeListener(_BroadcastSubscription<T> subscription) {
D.assert(Equals(subscription._controller, this));
D.assert(!Equals(subscription._next, subscription));
_BroadcastSubscription<T> previous = subscription._previous;
_BroadcastSubscription<T> next = subscription._next;
if (previous == null) {
// This was the first subscription.
_firstSubscription = next;
}
else {
previous._next = next;
}
if (next == null) {
// This was the last subscription.
_lastSubscription = previous;
}
else {
next._previous = previous;
}
subscription._next = subscription._previous = subscription;
}
// _StreamControllerLifecycle interface.
StreamSubscription<T> _subscribe(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
if (isClosed) {
onDone ??= _stream._nullDoneHandler;
return new _DoneStreamSubscription<T>(()=>onDone());
}
StreamSubscription<T> subscription = new _BroadcastSubscription<T>(
this, onData, onError, onDone, cancelOnError);
_addListener((_BroadcastSubscription<T>) subscription);
if (Equals(_firstSubscription, _lastSubscription)) {
// Only one listener, so it must be the first listener.
_stream._runGuarded(() => onListen());
}
return subscription;
}
Future _recordCancel(StreamSubscription<T> sub) {
_BroadcastSubscription<T> subscription = (_BroadcastSubscription<T>) sub;
// If already removed by the stream, don't remove it again.
if (Equals(subscription._next, subscription)) return null;
if (subscription._isFiring) {
subscription._setRemoveAfterFiring();
}
else {
_removeListener(subscription);
// If we are currently firing an event, the empty-check is performed at
// the end of the listener loop instead of here.
if (!_isFiring && _isEmpty) {
_callOnCancel();
}
}
return null;
}
void _recordPause(StreamSubscription<T> subscription) {
}
void _recordResume(StreamSubscription<T> subscription) {
}
// EventSink interface.
internal virtual Exception _addEventError() {
if (isClosed) {
return new Exception("Cannot add new events after calling close");
}
D.assert(_isAddingStream);
return new Exception("Cannot add new events while doing an addStream");
}
public override void add(T data) {
if (!_mayAddEvent) throw _addEventError();
_sendData(data);
}
public override void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
if (!_mayAddEvent) throw _addEventError();
AsyncError replacement = Zone.current.errorCallback((Exception) error);
if (replacement != null) {
error = _async._nonNullError(replacement);
stackTrace = replacement.StackTrace;
}
stackTrace ??= AsyncError.defaultStackTrace(error);
_sendError(error, stackTrace);
}
public override Future close() {
if (isClosed) {
D.assert(_doneFuture != null);
return _doneFuture;
}
if (!_mayAddEvent) throw _addEventError();
_state |= _STATE_CLOSED;
Future doneFuture = _ensureDoneFuture();
_sendDone();
return doneFuture;
}
Future done {
get { return _ensureDoneFuture(); }
}
public override Future addStream(Stream<T> stream, bool? cancelOnError = null) {
if (!_mayAddEvent) throw _addEventError();
_state |= _STATE_ADDSTREAM;
_addStreamState = new _AddStreamState<T>(this, stream, cancelOnError ?? false);
return _addStreamState.addStreamFuture;
}
// _EventSink interface, called from AddStreamState.
public override void _add(T data) {
_sendData(data);
}
public override void _addError(object error, string stackTrace) {
_sendError(error, stackTrace);
}
public override void _close() {
D.assert(_isAddingStream);
_AddStreamState<T> addState = _addStreamState;
_addStreamState = null;
_state &= ~_STATE_ADDSTREAM;
addState.complete();
}
// Event handling.
internal void _forEachListener(Action<_BufferingStreamSubscription<T>> action) {
if (_isFiring) {
throw new Exception(
"Cannot fire new event. Controller is already firing an event");
}
if (_isEmpty) return;
// Get event id of this event.
int id = (_state & _STATE_EVENT_ID);
// Start firing (set the _STATE_FIRING bit). We don't do [onCancel]
// callbacks while firing, and we prevent reentrancy of this function.
//
// Set [_state]'s event id to the next event's id.
// Any listeners added while firing this event will expect the next event,
// not this one, and won't get notified.
_state ^= _STATE_EVENT_ID | _STATE_FIRING;
_BroadcastSubscription<T> subscription = _firstSubscription;
while (subscription != null) {
if (subscription._expectsEvent(id)) {
subscription._eventState |= _BroadcastSubscription<T>._STATE_FIRING;
action(subscription);
subscription._toggleEventId();
_BroadcastSubscription<T> next = subscription._next;
if (subscription._removeAfterFiring) {
_removeListener(subscription);
}
subscription._eventState &= ~_BroadcastSubscription<T>._STATE_FIRING;
subscription = next;
}
else {
subscription = subscription._next;
}
}
_state &= ~_STATE_FIRING;
if (_isEmpty) {
_callOnCancel();
}
}
internal void _callOnCancel() {
D.assert(_isEmpty);
if (isClosed && _doneFuture._mayComplete) {
// When closed, _doneFuture is not null.
_doneFuture._asyncComplete(FutureOr.nil);
}
_stream._runGuarded(() => onCancel());
}
}
class _SyncBroadcastStreamController<T> : _BroadcastStreamController<T>
, SynchronousStreamController<T> {
internal _SyncBroadcastStreamController(
_stream.ControllerCallback onListen, _stream.ControllerCancelCallback onCancel)
: base(onListen, onCancel) {
}
// EventDispatch interface.
bool _mayAddEvent {
get { return base._mayAddEvent && !_isFiring; }
}
internal override Exception _addEventError() {
if (_isFiring) {
return new Exception(
"Cannot fire new event. Controller is already firing an event");
}
return base._addEventError();
}
public override void _sendData(T data) {
if (_isEmpty) return;
if (_hasOneListener) {
_state |= _BroadcastStreamController<T>._STATE_FIRING;
_BroadcastSubscription<T> subscription = _firstSubscription;
subscription._add(data);
_state &= ~_BroadcastStreamController<T>._STATE_FIRING;
if (_isEmpty) {
_callOnCancel();
}
return;
}
_forEachListener((_BufferingStreamSubscription<T> subscription) =>{
subscription._add(data);
});
}
public override void _sendError(object error, string stackTrace) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) => {
subscription._addError(error, stackTrace);
});
}
public override void _sendDone() {
if (!_isEmpty) {
_forEachListener((_BufferingStreamSubscription<T> subscription) => {
subscription._close();
});
} else {
D.assert(_doneFuture != null);
D.assert(_doneFuture._mayComplete);
_doneFuture._asyncComplete(FutureOr.nil);
}
}
}
//
class _AsyncBroadcastStreamController<T> : _BroadcastStreamController<T> {
internal _AsyncBroadcastStreamController(_stream.ControllerCallback onListen, _stream.ControllerCancelCallback onCancel)
: base(onListen, onCancel) {
}
// EventDispatch interface.
public override void _sendData(T data) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedData<T>(data));
}
}
public override void _sendError(object error, string stackTrace) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedError<T>((Exception) error, stackTrace));
}
}
public override void _sendDone() {
if (!_isEmpty) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedDone<T>());
}
} else {
D.assert(_doneFuture != null);
D.assert(_doneFuture._mayComplete);
_doneFuture._asyncComplete(FutureOr.nil);
}
}
}
//
// /**
// * Stream controller that is used by [Stream.asBroadcastStream].
// *
// * This stream controller allows incoming events while it is firing
// * other events. This is handled by delaying the events until the
// * current event is done firing, and then fire the pending events.
// *
// * This class extends [_SyncBroadcastStreamController]. Events of
// * an "asBroadcastStream" stream are always initiated by events
// * on another stream, and it is fine to forward them synchronously.
// */
// class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
// implements _EventDispatch<T> {
// _StreamImplEvents<T> _pending;
//
// _AsBroadcastStreamController(void onListen(), void onCancel())
// : base(onListen, onCancel);
//
// bool get _hasPending => _pending != null && !_pending.isEmpty;
//
// void _addPendingEvent(_DelayedEvent event) {
// _pending ??= new _StreamImplEvents<T>();
// _pending.add(event);
// }
//
// void add(T data) {
// if (!isClosed && _isFiring) {
// _addPendingEvent(new _DelayedData<T>(data));
// return;
// }
// base.add(data);
// while (_hasPending) {
// _pending.handleNext(this);
// }
// }
//
// void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
// stackTrace ??= AsyncError.defaultStackTrace(error);
// if (!isClosed && _isFiring) {
// _addPendingEvent(new _DelayedError(error, stackTrace));
// return;
// }
// if (!_mayAddEvent) throw _addEventError();
// _sendError(error, stackTrace);
// while (_hasPending) {
// _pending.handleNext(this);
// }
// }
//
// Future close() {
// if (!isClosed && _isFiring) {
// _addPendingEvent(const _DelayedDone());
// _state |= _BroadcastStreamController._STATE_CLOSED;
// return base.done;
// }
// Future result = base.close();
// D.assert(!_hasPending);
// return result;
// }
//
// void _callOnCancel() {
// if (_hasPending) {
// _pending.clear();
// _pending = null;
// }
// base._callOnCancel();
// }
// }
}

3
com.unity.uiwidgets/Runtime/async/broadcast_stream_controller.cs.meta


fileFormatVersion: 2
guid: 88bb4d17b79047948e7e36354ad968d4
timeCreated: 1629189231
正在加载...
取消
保存