浏览代码

stream impl, controller, stopwatch, transformer

/main
siyao 3 年前
当前提交
433d5a95
共有 12 个文件被更改,包括 2046 次插入1153 次删除
  1. 2
      com.unity.uiwidgets/Runtime/async/async_cast.cs
  2. 4
      com.unity.uiwidgets/Runtime/async/sink.cs
  3. 1001
      com.unity.uiwidgets/Runtime/async/stream.cs
  4. 994
      com.unity.uiwidgets/Runtime/async/stream_impl.cs
  5. 445
      com.unity.uiwidgets/Runtime/async/stream_transformers.cs
  6. 8
      com.unity.uiwidgets/Runtime/widgets/async.cs
  7. 7
      com.unity.uiwidgets/Runtime/async/async.cs
  8. 3
      com.unity.uiwidgets/Runtime/async/async.cs.meta
  9. 65
      com.unity.uiwidgets/Runtime/async/stopwatch.cs
  10. 3
      com.unity.uiwidgets/Runtime/async/stopwatch.cs.meta
  11. 664
      com.unity.uiwidgets/Runtime/async/stream_controller.cs
  12. 3
      com.unity.uiwidgets/Runtime/async/stream_controller.cs.meta

2
com.unity.uiwidgets/Runtime/async/async_cast.cs


using Unity.UIWidgets.async;
class CastStreamTransformer<SS, ST, TS, TT>
: StreamTransformerBase<TS, TT> {
: StreamTransformerBase<TS, TT> where TT : class where ST : class {
public readonly StreamTransformer<SS, ST> _source;
public CastStreamTransformer(StreamTransformer<SS, ST> _source) {

4
com.unity.uiwidgets/Runtime/async/sink.cs


using Unity.UIWidgets.async;
public abstract void close();
public abstract Future close();
}
}

1001
com.unity.uiwidgets/Runtime/async/stream.cs
文件差异内容过多而无法显示
查看文件

994
com.unity.uiwidgets/Runtime/async/stream_impl.cs
文件差异内容过多而无法显示
查看文件

445
com.unity.uiwidgets/Runtime/async/stream_transformers.cs


using System;
using Unity.UIWidgets.async;
class _EventSinkWrapper<T> : EventSink<T> {
_EventSink<object> _sink;
namespace Unity.UIWidgets.async {
class _EventSinkWrapper<T> : EventSink<T> {
_EventSink<object> _sink;
internal _EventSinkWrapper(_EventSink<object> _sink) {
this._sink = _sink;
}
internal _EventSinkWrapper(_EventSink<object> _sink) {
this._sink = _sink;
}
public override void add(T data) {
_sink._add(data);
}
public override void add(T data) {
_sink._add(data);
}
public override void addError(object error, string stackTrace) {
_sink._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error));
}
public override void addError(object error, string stackTrace) {
_sink._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error));
}
public override void close() {
_sink._close();
public override Future close() {
_sink._close();
return Future._nullFuture;
}
}
class _SinkTransformerStreamSubscription<S, T>
: _BufferingStreamSubscription<T> {
/// The transformer's input sink.
EventSink<S> _transformerSink;
class _SinkTransformerStreamSubscription<S, T>
: _BufferingStreamSubscription<T> {
/// The transformer's input sink.
EventSink<S> _transformerSink;
/// The subscription to the input stream.
StreamSubscription<S> _subscription;
/// The subscription to the input stream.
StreamSubscription<S> _subscription;
internal _SinkTransformerStreamSubscription(Stream<S> source, _async._SinkMapper<S, T> mapper,
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError)
// We set the adapter's target only when the user is allowed to send data.
: base(onData, onError, onDone, cancelOnError) {
_EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>((_EventSink<object>) this);
_transformerSink = mapper(eventSink);
_subscription =
source.listen(_handleData, onError: _handleError, onDone: _handleDone);
}
internal _SinkTransformerStreamSubscription(Stream<S> source, _async._SinkMapper<S, T> mapper,
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError)
// We set the adapter's target only when the user is allowed to send data.
: base(onData, onError, onDone, cancelOnError) {
_EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>((_EventSink<object>) this);
_transformerSink = mapper(eventSink);
_subscription =
source.listen(_handleData, onError: _handleError, onDone: _handleDone);
}
/** Whether this subscription is still subscribed to its source. */
bool _isSubscribed {
get { return _subscription != null; }
}
/** Whether this subscription is still subscribed to its source. */
bool _isSubscribed {
get { return _subscription != null; }
}
// _EventSink interface.
// _EventSink interface.
void _add(T data) {
if (_isClosed) {
throw new Exception("Stream is already closed");
void _add(T data) {
if (_isClosed) {
throw new Exception("Stream is already closed");
}
base._add(data);
base._add(data);
}
void _addError(object error, string stackTrace) {
if (_isClosed) {
throw new Exception("Stream is already closed");
}
void _addError(Object error, string stackTrace) {
if (_isClosed) {
throw new Exception("Stream is already closed");
base._addError(error, stackTrace);
base._addError(error, stackTrace);
}
void _close() {
if (_isClosed) {
throw new Exception("Stream is already closed");
}
void _close() {
if (_isClosed) {
throw new Exception("Stream is already closed");
base._close();
base._close();
}
// _BufferingStreamSubscription hooks.
// _BufferingStreamSubscription hooks.
void _onPause() {
if (_isSubscribed) _subscription.pause();
}
void _onPause() {
if (_isSubscribed) _subscription.pause();
}
void _onResume() {
if (_isSubscribed) _subscription.resume();
}
void _onResume() {
if (_isSubscribed) _subscription.resume();
}
Future _onCancel() {
if (_isSubscribed) {
StreamSubscription<S> subscription = _subscription;
_subscription = null;
return subscription.cancel();
}
Future _onCancel() {
if (_isSubscribed) {
StreamSubscription<S> subscription = _subscription;
_subscription = null;
return subscription.cancel();
return null;
return null;
}
void _handleData(S data) {
try {
_transformerSink.add(data);
}
catch (Exception e) {
_addError(e, e.StackTrace);
void _handleData(S data) {
try {
_transformerSink.add(data);
}
catch (Exception e) {
_addError(e, e.StackTrace);
}
}
void _handleError(object error, string stackTrace) {
try {
_transformerSink.addError(error, stackTrace);
void _handleError(object error, string stackTrace) {
try {
_transformerSink.addError(error, stackTrace);
}
catch (Exception e) {
if (Equals(e, error)) {
_addError(error, stackTrace);
}
else {
_addError(e, e.StackTrace);
}
}
catch (Exception e) {
if (Equals(e, error)) {
_addError(error, stackTrace);
void _handleDone() {
try {
_subscription = null;
_transformerSink.close();
else {
catch (Exception e) {
void _handleDone() {
try {
_subscription = null;
_transformerSink.close();
}
catch (Exception e) {
_addError(e, e.StackTrace);
class _StreamSinkTransformer<S, T> : StreamTransformerBase<S, T> where T : class {
readonly _async._SinkMapper<S, T> _sinkMapper;
public _StreamSinkTransformer(_async._SinkMapper<S, T> _sinkMapper) {
this._sinkMapper = _sinkMapper;
public override Stream<T> bind(Stream<S> stream) =>
new _BoundSinkStream<S, T>(stream, _sinkMapper);
}
class _StreamSinkTransformer<S, T> : StreamTransformerBase<S, T> {
readonly _async._SinkMapper<S, T> _sinkMapper;
class _BoundSinkStream<S, T> : Stream<T> {
readonly _async._SinkMapper<S, T> _sinkMapper;
readonly Stream<S> _stream;
public _StreamSinkTransformer(_async._SinkMapper<S, T> _sinkMapper) {
this._sinkMapper = _sinkMapper;
}
bool isBroadcast {
get { return _stream.isBroadcast; }
}
public override Stream<T> bind(Stream<S> stream) =>
new _BoundSinkStream<S, T>(stream, _sinkMapper);
}
internal _BoundSinkStream(Stream<S> _stream, _async._SinkMapper<S, T> _sinkMapper) {
this._stream = _stream;
this._sinkMapper = _sinkMapper;
}
class _BoundSinkStream<S, T> : Stream<T> {
readonly _async._SinkMapper<S, T> _sinkMapper;
readonly Stream<S> _stream;
public override StreamSubscription<T> listen(Action<T> onData,
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = default) {
StreamSubscription<T> subscription =
new _SinkTransformerStreamSubscription<S, T>(
_stream, _sinkMapper, onData, onError, onDone, cancelOnError);
return subscription;
}
}
bool isBroadcast {
get { return _stream.isBroadcast; }
}
static partial class _stream {
public delegate void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
internal _BoundSinkStream(Stream<S> _stream, _async._SinkMapper<S, T> _sinkMapper) {
this._stream = _stream;
this._sinkMapper = _sinkMapper;
}
/// Error-handler coming from [StreamTransformer.fromHandlers].
public delegate void _TransformErrorHandler<T>(
object error, string stackTrace, EventSink<T> sink);
StreamSubscription<T> listen(Action<T> onData,
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = default) {
StreamSubscription<T> subscription =
new _SinkTransformerStreamSubscription<S, T>(
_stream, _sinkMapper, onData, onError, onDone, cancelOnError);
return subscription;
/// Done-handler coming from [StreamTransformer.fromHandlers].
public delegate void _TransformDoneHandler<T>(EventSink<T> sink);
}
static partial class _stream {
internal delegate void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
class _HandlerEventSink<S, T> : EventSink<S> where T : class {
readonly _stream._TransformDataHandler<S, T> _handleData;
readonly _stream._TransformErrorHandler<T> _handleError;
readonly _stream._TransformDoneHandler<T> _handleDone;
/// Error-handler coming from [StreamTransformer.fromHandlers].
internal delegate void _TransformErrorHandler<T>(
object error, string stackTrace, EventSink<T> sink);
/// The output sink where the handlers should send their data into.
EventSink<T> _sink;
/// Done-handler coming from [StreamTransformer.fromHandlers].
internal delegate void _TransformDoneHandler<T>(EventSink<T> sink);
}
internal _HandlerEventSink(
_stream._TransformDataHandler<S, T> _handleData, _stream._TransformErrorHandler<T> _handleError,
_stream._TransformDoneHandler<T> _handleDone, EventSink<T> _sink) {
this._handleData = _handleData;
this._handleError = _handleError;
this._handleDone = _handleDone;
this._sink = _sink;
if (_sink == null) {
throw new Exception("The provided sink must not be null.");
}
}
class _HandlerEventSink<S, T> : EventSink<S> where T : class {
readonly _stream._TransformDataHandler<S, T> _handleData;
readonly _stream._TransformErrorHandler<T> _handleError;
readonly _stream._TransformDoneHandler<T> _handleDone;
bool _isClosed {
get { return _sink == null; }
}
/// The output sink where the handlers should send their data into.
EventSink<T> _sink;
public override void add(S data) {
if (_isClosed) {
throw new Exception("Sink is closed");
}
internal _HandlerEventSink(
_stream._TransformDataHandler<S, T> _handleData, _stream._TransformErrorHandler<T> _handleError,
_stream._TransformDoneHandler<T> _handleDone, EventSink<T> _sink) {
this._handleData = _handleData;
this._handleError = _handleError;
this._handleDone = _handleDone;
this._sink = _sink;
if (_sink == null) {
throw new Exception("The provided sink must not be null.");
if (_handleData != null) {
_handleData(data, _sink);
}
else {
_sink.add(data as T);
}
}
bool _isClosed {
get { return _sink == null; }
}
public override void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
if (_isClosed) {
throw new Exception("Sink is closed");
}
public override void add(S data) {
if (_isClosed) {
throw new Exception("Sink is closed");
if (_handleError != null) {
stackTrace ??= AsyncError.defaultStackTrace(error);
_handleError(error, stackTrace, _sink);
}
else {
_sink.addError(error, stackTrace);
}
if (_handleData != null) {
_handleData(data, _sink);
}
else {
_sink.add(data as T);
public override Future close() {
if (_isClosed) return Future._nullFuture;
var sink = _sink;
_sink = null;
if (_handleDone != null) {
_handleDone(sink);
}
else {
sink.close();
}
return Future._nullFuture;
public override void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
if (_isClosed) {
throw new Exception("Sink is closed");
class _StreamHandlerTransformer<S, T> : _StreamSinkTransformer<S, T> where T : class {
internal _StreamHandlerTransformer(
_stream._TransformDataHandler<S, T> handleData = null,
_stream._TransformErrorHandler<T> handleError = null,
_stream._TransformDoneHandler<T> handleDone = null)
: base((EventSink<T> outputSink) => {
return new _HandlerEventSink<S, T>(
handleData, handleError, handleDone, outputSink);
}) {
if (_handleError != null) {
stackTrace ??= AsyncError.defaultStackTrace(error);
_handleError(error, stackTrace, _sink);
}
else {
_sink.addError(error, stackTrace);
Stream<T> bind(Stream<S> stream) {
return base.bind(stream);
public override void close() {
if (_isClosed) return;
var sink = _sink;
_sink = null;
if (_handleDone != null) {
_handleDone(sink);
}
else {
sink.close();
}
}
}
class _StreamBindTransformer<S, T> : StreamTransformerBase<S, T> where T : class {
readonly Func<Stream<S>, Stream<T>> _bind;
class _StreamHandlerTransformer<S, T> : _StreamSinkTransformer<S, T> where T : class {
_StreamHandlerTransformer(
_stream._TransformDataHandler<S, T> handleData = null,
_stream._TransformErrorHandler<T> handleError = null,
_stream._TransformDoneHandler<T> handleDone = null)
: base((EventSink<T> outputSink) => {
return new _HandlerEventSink<S, T>(
handleData, handleError, handleDone, outputSink);
}) {
}
internal _StreamBindTransformer(Func<Stream<S>, Stream<T>> _bind) {
this._bind = _bind;
}
Stream<T> bind(Stream<S> stream) {
return base.bind(stream);
public override Stream<T> bind(Stream<S> stream) => _bind(stream);
}
class _StreamBindTransformer<S, T> : StreamTransformerBase<S, T> {
readonly Func<Stream<S>, Stream<T>> _bind;
public partial class _async {
public delegate EventSink<S> _SinkMapper<S, T>(EventSink<T> output);
_StreamBindTransformer(Func<Stream<S>, Stream<T>> _bind) {
this._bind = _bind;
public delegate StreamSubscription<T> _SubscriptionTransformer<S, T>(Stream<S> stream, bool cancelOnError);
public override Stream<T> bind(Stream<S> stream) => _bind(stream);
}
class _StreamSubscriptionTransformer<S, T> : StreamTransformerBase<S, T> where T : class {
readonly _async._SubscriptionTransformer<S, T> _onListen;
public partial class _async {
public delegate EventSink<S> _SinkMapper<S, T>(EventSink<T> output);
public delegate StreamSubscription<T> _SubscriptionTransformer<S, T>(Stream<S> stream, bool cancelOnError);
}
class _StreamSubscriptionTransformer<S, T> : StreamTransformerBase<S, T> {
readonly _async._SubscriptionTransformer<S, T> _onListen;
internal _StreamSubscriptionTransformer(_async._SubscriptionTransformer<S, T> _onListen) {
this._onListen = _onListen;
}
internal _StreamSubscriptionTransformer(_async._SubscriptionTransformer<S, T> _onListen) {
this._onListen = _onListen;
public override Stream<T> bind(Stream<S> stream) =>
new _BoundSubscriptionStream<S, T>(stream, _onListen);
public override Stream<T> bind(Stream<S> stream) =>
new _BoundSubscriptionStream<S, T>(stream, _onListen);
}
class _BoundSubscriptionStream<S, T> : Stream<T> {
_BoundSubscriptionStream(Stream<S> _stream, _async._SubscriptionTransformer<S, T> _onListen) {
this._stream = _stream;
this._onListen = _onListen;
}
readonly _async._SubscriptionTransformer<S, T> _onListen;
readonly Stream<S> _stream;
class _BoundSubscriptionStream<S, T> : Stream<T> {
readonly _async._SubscriptionTransformer<S, T> _onListen;
readonly Stream<S> _stream;
bool isBroadcast {
get { return _stream.isBroadcast; }
}
bool isBroadcast {
get { return _stream.isBroadcast; }
}
internal _BoundSubscriptionStream(Stream<S> _stream, _async._SubscriptionTransformer<S, T> _onListen) {
this._stream = _stream;
this._onListen = _onListen;
}
StreamSubscription<T> listen(Action<T> onData,
Action onError = null, Action onDone = null, bool cancelOnError = false) {
//cancelOnError = cancelOnError;
StreamSubscription<T> result = _onListen(_stream, cancelOnError);
result.onData(onData);
result.onError(onError);
result.onDone(onDone);
return result;
public override StreamSubscription<T> listen(Action<T> onData,
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) {
//cancelOnError = cancelOnError;
StreamSubscription<T> result = _onListen(_stream, cancelOnError);
result.onData(onData);
result.onError(onError);
result.onDone(onDone);
return result;
}
}
}

8
com.unity.uiwidgets/Runtime/widgets/async.cs


using Unity.UIWidgets.widgets;
public class Stream<T> {
}
// public class Stream<T> {
// }
public class StreamSubscription<T> {
}
// public class StreamSubscription<T> {
// }
public abstract class StreamBuilderBase<T, S> : StatefulWidget {
public StreamBuilderBase(Key key = null, Stream<T> stream = null) : base(key: key) {

7
com.unity.uiwidgets/Runtime/async/async.cs


using System;
namespace Unity.UIWidgets.async {
public static partial class _async {
public static object _nonNullError(object error) => error ?? new NullReferenceException();
}
}

3
com.unity.uiwidgets/Runtime/async/async.cs.meta


fileFormatVersion: 2
guid: 2682c09c8f8c447cbd5a400f4db9852a
timeCreated: 1629171016

65
com.unity.uiwidgets/Runtime/async/stopwatch.cs


using System;
namespace Unity.UIWidgets.core {
public class Stopwatch {
static int _frequency;
// The _start and _stop fields capture the time when [start] and [stop]
// are called respectively.
// If _stop is null, the stopwatch is running.
int? _start = 0;
int? _stop = 0;
public Stopwatch() {
if (_frequency == null) _initTicker();
}
public int frequency {
get { return _frequency; }
}
public void start() {
if (_stop != null) {
// (Re)start this stopwatch.
// Don't count the time while the stopwatch has been stopped.
_start += _now() - _stop;
_stop = null;
}
}
public void stop() {
_stop ??= _now();
}
public void reset() {
_start = _stop ?? _now();
}
public int? elapsedTicks {
get { return (_stop ?? _now()) - _start; }
}
public TimeSpan elapsed {
get { return TimeSpan.FromMilliseconds(elapsedMicroseconds); }
}
// This is external, we might need to reimplement it
int elapsedMicroseconds { get; }
// This is external, we might need to reimplement it
int elapsedMilliseconds { get; }
bool isRunning {
get { return _stop == null; }
}
// This is external, we might need to reimplement it
static void _initTicker() {
}
// This is external, we might need to reimplement it
static int _now() {
return DateTime.Now.Millisecond;
}
}
}

3
com.unity.uiwidgets/Runtime/async/stopwatch.cs.meta


fileFormatVersion: 2
guid: 63c86df370684414b7b148702eacd440
timeCreated: 1629184886

664
com.unity.uiwidgets/Runtime/async/stream_controller.cs


using System;
using System.Diagnostics;
using Unity.UIWidgets.async;
using Unity.UIWidgets.foundation;
using UnityEditor.PackageManager;
namespace Unity.UIWidgets.async {
static partial class _stream {
public delegate void ControllerCallback();
public delegate Future ControllerCancelCallback();
public delegate void _NotificationHandler();
public static void _runGuarded(_NotificationHandler notificationHandler) {
if (notificationHandler == null) return;
try {
notificationHandler();
} catch (Exception e) {
Zone.current.handleUncaughtError(e);
}
}
}
public abstract class StreamController<T> : StreamSink<T> {
/** The stream that this controller is controlling. */
internal Stream<T> stream { get; }
public StreamController(
Action onListen = null,
Action onPause = null,
Action onResume = null,
Action onCancel = null,
bool sync = false) {
return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
public static StreamController<T> create(
Action onListen = null,
Action onPause = null,
Action onResume = null,
Action onCancel = null,
bool sync = false) {
return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
public static StreamController<T> broadcast(
Action onListen = null, Action onCancel = null, bool sync = false) {
return sync
? new _SyncBroadcastStreamController<T>(onListen, onCancel)
: new _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
public _stream.ControllerCallback onListen {
get;
set;
}
// void onListen(void onListenHandler());
public _stream.ControllerCallback onPause { get; set; }
// void set onPause(void onPauseHandler());
public _stream.ControllerCallback onResume { get; set; }
// void set onResume(void onResumeHandler());
public _stream.ControllerCancelCallback onCancel { get; set; }
// void set onCancel(onCancelHandler());
StreamSink<T> sink { get; }
public bool isClosed{get;}
public bool isPaused{get;}
/** Whether there is a subscriber on the [Stream]. */
public bool hasListener{get;}
// public abstract void add(T evt);
//
// public abstract void addError(object error, string stackTrace);
public abstract Future closeConsumer();
public abstract Future addStream(Stream<T> source, bool cancelOnError = false);
}
public interface SynchronousStreamController<T> {//: StreamController<T> {
// public abstract void add(T data);
// public abstract void addError(object error, string stackTrace);
// public abstract Future close();
}
interface _StreamControllerLifecycle<T> {
StreamSubscription<T> _subscribe(
Action<T> onData, Action onError, Action onDone, bool cancelOnError);
void _recordPause(StreamSubscription<T> subscription);
void _recordResume(StreamSubscription<T> subscription);
Future _recordCancel(StreamSubscription<T> subscription);
}
//
// // Base type for implementations of stream controllers.
abstract class _StreamControllerBase<T>
:
StreamController<T>,
_StreamControllerLifecycle<T>,
_EventSink<T>,
_EventDispatch<T> {
public StreamSubscription<T> _subscribe(Action<T> onData, Action onError, Action onDone, bool cancelOnError) {
throw new NotImplementedException();
}
public void _recordPause(StreamSubscription<T> subscription) {}
public void _recordResume(StreamSubscription<T> subscription) {}
public Future _recordCancel(StreamSubscription<T> subscription) => null;
public abstract void _add(T data);
public abstract void _addError(object error, string stackTrace);
public abstract void _close();
public abstract void _sendData(T data);
public abstract void _sendError(object error, string stackTrace);
public abstract void _sendDone();
}
abstract class _StreamController<T> : _StreamControllerBase<T> {
/** The controller is in its initial state with no subscription. */
const int _STATE_INITIAL = 0;
const int _STATE_SUBSCRIBED = 1;
/** The subscription is canceled. */
const int _STATE_CANCELED = 2;
/** Mask for the subscription state. */
const int _STATE_SUBSCRIPTION_MASK = 3;
// The following state relate to the controller, not the subscription.
// If closed, adding more events is not allowed.
// If executing an [addStream], new events are not allowed either, but will
// be added by the stream.
const int _STATE_CLOSED = 4;
const int _STATE_ADDSTREAM = 8;
// @pragma("vm:entry-point")
object _varData;
/** Current state of the controller. */
// @pragma("vm:entry-point")
int _state = _STATE_INITIAL;
// TODO(lrn): Could this be stored in the varData field too, if it's not
// accessed until the call to "close"? Then we need to special case if it's
// accessed earlier, or if close is called before subscribing.
_Future _doneFuture;
_stream.ControllerCallback onListen;
_stream.ControllerCallback onPause;
_stream.ControllerCallback onResume;
_stream.ControllerCancelCallback onCancel;
internal _StreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause,
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) {
this.onListen = onListen;
this.onPause = onPause;
this.onResume = onResume;
this.onCancel = onCancel;
}
// Return a new stream every time. The streams are equal, but not identical.
public Stream<T> stream{get => new _ControllerStream<T>(this);}
StreamSink<T> sink{get => new _StreamSinkWrapper<T>(this);}
bool _isCanceled{get => (_state & _STATE_CANCELED) != 0;}
/** Whether there is an active listener. */
bool hasListener{get => (_state & _STATE_SUBSCRIBED) != 0;}
/** Whether there has not been a listener yet. */
bool _isInitialState {
get =>
(_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
}
bool isClosed{get => (_state & _STATE_CLOSED) != 0;}
bool isPaused {
get =>
hasListener ? _subscription._isInputPaused : !_isCanceled;
}
bool _isAddingStream{get => (_state & _STATE_ADDSTREAM) != 0;}
/** New events may not be added after close, or during addStream. */
bool _mayAddEvent{get => (_state < _STATE_CLOSED);}
// Returns the pending events.
// Pending events are events added before a subscription exists.
// They are added to the subscription when it is created.
// Pending events, if any, are kept in the _varData field until the
// stream is listened to.
// While adding a stream, pending events are moved into the
// state object to allow the state object to use the _varData field.
_PendingEvents<T> _pendingEvents {
get {
D.assert(_isInitialState);
if (!_isAddingStream) {
return (_PendingEvents<T>) _varData;
}
_StreamControllerAddStreamState<T> state = (_StreamControllerAddStreamState<T>) _varData;
return (_PendingEvents<T>) state.varData;
}
}
// Returns the pending events, and creates the object if necessary.
_StreamImplEvents<T> _ensurePendingEvents() {
D.assert(_isInitialState);
if (!_isAddingStream) {
_varData ??= new _StreamImplEvents<T>();
return (_StreamImplEvents<T>) _varData;
}
_StreamControllerAddStreamState<T> state = (_StreamControllerAddStreamState<T>) _varData;
if (state.varData == null) state.varData = new _StreamImplEvents<T>();
return (_StreamImplEvents<T>) state.varData;
}
// Get the current subscription.
// If we are adding a stream, the subscription is moved into the state
// object to allow the state object to use the _varData field.
protected _ControllerSubscription<T> _subscription {
get {
D.assert(hasListener);
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
return addState.varData;
}
return _varData;
}
}
Exception _badEventState() {
if (isClosed) {
return new Exception("Cannot add event after closing");
}
D.assert(_isAddingStream);
return new Exception("Cannot add event while adding a stream");
}
// StreamSink interface.
Future addStream(Stream<T> source, bool? cancelOnError = false) {
if (!_mayAddEvent) throw _badEventState();
if (_isCanceled) return _Future.immediate(FutureOr.nil);
_StreamControllerAddStreamState<T> addState =
new _StreamControllerAddStreamState<T>(
this, _varData, source, cancelOnError ?? false);
_varData = addState;
_state |= _STATE_ADDSTREAM;
return addState.addStreamFuture;
}
Future done {
get { return _ensureDoneFuture(); }
}
Future _ensureDoneFuture() {
_doneFuture ??= _isCanceled ? Future._nullFuture : new _Future();
return _doneFuture;
}
public override void add(T value) {
if (!_mayAddEvent) throw _badEventState();
_add(value);
}
public override void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
if (!_mayAddEvent) throw _badEventState();
error = _async._nonNullError(error);
AsyncError replacement = Zone.current.errorCallback((Exception) error);
if (replacement != null) {
error = _async._nonNullError(replacement);
// stackTrace = replacement.stackTrace;
}
stackTrace ??= AsyncError.defaultStackTrace(error);
_addError(error, stackTrace);
}
public override Future closeConsumer() {
if (isClosed) {
return _ensureDoneFuture();
}
if (!_mayAddEvent) throw _badEventState();
_closeUnchecked();
return _ensureDoneFuture();
}
internal void _closeUnchecked() {
_state |= _STATE_CLOSED;
if (hasListener) {
_sendDone();
} else if (_isInitialState) {
_ensurePendingEvents().add(new _DelayedDone());
}
}
// EventSink interface. Used by the [addStream] events.
// Add data event, used both by the [addStream] events and by [add].
void _add(T value) {
if (hasListener) {
_sendData(value);
} else if (_isInitialState) {
_ensurePendingEvents().add(new _DelayedData<object>(value));
}
}
void _addError(object error, string stackTrace) {
if (hasListener) {
_sendError(error, stackTrace);
} else if (_isInitialState) {
_ensurePendingEvents().add(new _DelayedError((Exception) error, stackTrace));
}
}
void _close() {
// End of addStream stream.
D.assert(_isAddingStream);
_StreamControllerAddStreamState<T> addState = _varData;
_varData = addState.varData;
_state &= ~_STATE_ADDSTREAM;
addState.complete();
}
// _StreamControllerLifeCycle interface
StreamSubscription<T> _subscribe(void onData(T data), Function onError,
void onDone(), bool cancelOnError) {
if (!_isInitialState) {
throw new StateError("Stream has already been listened to.");
}
_ControllerSubscription<T> subscription = new _ControllerSubscription<T>(
this, onData, onError, onDone, cancelOnError);
_PendingEvents<T> pendingEvents = _pendingEvents;
_state |= _STATE_SUBSCRIBED;
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = _varData;
addState.varData = subscription;
addState.resume();
} else {
_varData = subscription;
}
subscription._setPendingEvents(pendingEvents);
subscription._guardCallback(() {
_runGuarded(onListen);
});
return subscription;
}
Future _recordCancel(StreamSubscription<T> subscription) {
// When we cancel, we first cancel any stream being added,
// Then we call `onCancel`, and finally the _doneFuture is completed.
// If either of addStream's cancel or `onCancel` returns a future,
// we wait for it before continuing.
// Any error during this process ends up in the returned future.
// If more errors happen, we act as if it happens inside nested try/finallys
// or whenComplete calls, and only the last error ends up in the
// returned future.
Future result;
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = _varData;
result = addState.cancel();
}
_varData = null;
_state =
(_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
if (onCancel != null) {
if (result == null) {
// Only introduce a future if one is needed.
// If _onCancel returns null, no future is needed.
try {
result = onCancel();
} catch (e, s) {
// Return the error in the returned future.
// Complete it asynchronously, so there is time for a listener
// to handle the error.
result = new _Future().._asyncCompleteError(e, s);
}
} else {
// Simpler case when we already know that we will return a future.
result = result.whenComplete(onCancel);
}
}
void complete() {
if (_doneFuture != null && _doneFuture._mayComplete) {
_doneFuture._asyncComplete(null);
}
}
if (result != null) {
result = result.whenComplete(complete);
} else {
complete();
}
return result;
}
void _recordPause(StreamSubscription<T> subscription) {
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = _varData;
addState.pause();
}
_runGuarded(onPause);
}
void _recordResume(StreamSubscription<T> subscription) {
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = _varData;
addState.resume();
}
_runGuarded(onResume);
}
}
//
abstract class _SyncStreamControllerDispatch<T>
: _StreamController<T>, SynchronousStreamController<T> {
int _state { get; set; }
void _sendData(T data) {
_subscription._add(data);
}
void _sendError(object error, string stackTrace) {
_subscription._addError(error, stackTrace);
}
void _sendDone() {
_subscription._close();
}
protected _SyncStreamControllerDispatch(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause, _stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause, onResume, onCancel)
{
}
}
abstract class _AsyncStreamControllerDispatch<T>
: _StreamController<T> {
void _sendData(T data) {
_subscription._addPending(new _DelayedData<T>(data));
}
void _sendError(object error, string stackTrace) {
_subscription._addPending(new _DelayedError((Exception) error, stackTrace));
}
void _sendDone() {
_subscription._addPending(new _DelayedDone());
}
protected _AsyncStreamControllerDispatch(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause, _stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause, onResume, onCancel)
{
}
}
// TODO(lrn): Use common superclass for callback-controllers when VM supports
// constructors in mixin superclasses.
class _AsyncStreamController<T> : _AsyncStreamControllerDispatch<T> {
// public override void close() {
// throw new NotImplementedException();
// }
}
class _SyncStreamController<T> : _SyncStreamControllerDispatch<T> {
}
class _ControllerStream<T> : _StreamImpl<T>, IEquatable<_ControllerStream<T>> {
_StreamControllerLifecycle<T> _controller;
internal _ControllerStream( _StreamControllerLifecycle<T> _controller) {
this._controller = _controller;
}
StreamSubscription<T> _createSubscription(void onData(T data),
Function onError, void onDone(), bool cancelOnError) =>
_controller._subscribe(onData, onError, onDone, cancelOnError);
// Override == and hashCode so that new streams returned by the same
// controller are considered equal. The controller returns a new stream
// each time it's queried, but doesn't have to cache the result.
// int hashCode {
// get { return _controller.GetHashCode() ^ 0x35323532; }
// }
// bool operator ==(object other) {
// if (identical(this, other)) return true;
// return other is _ControllerStream &&
// identical(other._controller, this._controller);
// }
public bool Equals(_ControllerStream<T> other)
{
if (ReferenceEquals(null, other)) {
return false;
}
if (ReferenceEquals(this, other)) {
return true;
}
return Equals(_controller, other._controller);
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) {
return false;
}
if (ReferenceEquals(this, obj)) {
return true;
}
if (obj.GetType() != GetType()) {
return false;
}
return Equals((_ControllerStream<T>) obj);
}
public override int GetHashCode()
{
return _controller.GetHashCode() ^ 0x35323532;
}
}
//
// class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
// final _StreamControllerLifecycle<T> _controller;
//
// _ControllerSubscription(this._controller, void onData(T data),
// Function onError, void onDone(), bool cancelOnError)
// : super(onData, onError, onDone, cancelOnError);
//
// Future _onCancel() {
// return _controller._recordCancel(this);
// }
//
// void _onPause() {
// _controller._recordPause(this);
// }
//
// void _onResume() {
// _controller._recordResume(this);
// }
// }
//
// /** A class that exposes only the [StreamSink] interface of an object. */
// class _StreamSinkWrapper<T> implements StreamSink<T> {
// final StreamController _target;
// _StreamSinkWrapper(this._target);
// void add(T data) {
// _target.add(data);
// }
//
// void addError(object error, [string stackTrace]) {
// _target.addError(error, stackTrace);
// }
//
// Future close() => _target.close();
//
// Future addStream(Stream<T> source) => _target.addStream(source);
//
// Future get done => _target.done;
// }
//
class _AddStreamState<T> {
// [_Future] returned by call to addStream.
internal readonly _Future addStreamFuture;
// Subscription on stream argument to addStream.
internal readonly StreamSubscription<T> addSubscription;
internal _AddStreamState(
_EventSink<T> controller, Stream<T> source, bool cancelOnError) {
addStreamFuture = new _Future();
addSubscription = source.listen(controller._add,
onError: cancelOnError
? makeErrorHandler(controller)
: controller._addError,
onDone: controller._close,
cancelOnError: cancelOnError);
}
public static Action<object, string> makeErrorHandler(_EventSink<T> controller) {
return (object e, string s) => {
controller._addError(e, s);
controller._close();
};
}
void pause() {
addSubscription.pause();
}
void resume() {
addSubscription.resume();
}
Future cancel() {
var cancel = addSubscription.cancel();
if (cancel == null) {
addStreamFuture._asyncComplete(FutureOr.nil);
return null;
}
return cancel.whenComplete(() => {
addStreamFuture._asyncComplete(FutureOr.nil);
});
}
void complete() {
addStreamFuture._asyncComplete(FutureOr.nil);
}
}
class _StreamControllerAddStreamState<T> : _AddStreamState<T> {
// The subscription or pending data of a _StreamController.
// Stored here because we reuse the `_varData` field in the _StreamController
// to store this state object.
public object varData;
internal _StreamControllerAddStreamState(_StreamController<T> controller, object varData,
Stream<T> source, bool cancelOnError)
: base(controller, source, cancelOnError) {
if (controller.isPaused) {
addSubscription.pause();
}
}
}
}

3
com.unity.uiwidgets/Runtime/async/stream_controller.cs.meta


fileFormatVersion: 2
guid: 6a5b6c88a4f44674921771bc51facbfc
timeCreated: 1628753099
正在加载...
取消
保存