siyao
3 年前
当前提交
433d5a95
共有 12 个文件被更改,包括 2046 次插入 和 1153 次删除
-
2com.unity.uiwidgets/Runtime/async/async_cast.cs
-
4com.unity.uiwidgets/Runtime/async/sink.cs
-
1001com.unity.uiwidgets/Runtime/async/stream.cs
-
994com.unity.uiwidgets/Runtime/async/stream_impl.cs
-
445com.unity.uiwidgets/Runtime/async/stream_transformers.cs
-
8com.unity.uiwidgets/Runtime/widgets/async.cs
-
7com.unity.uiwidgets/Runtime/async/async.cs
-
3com.unity.uiwidgets/Runtime/async/async.cs.meta
-
65com.unity.uiwidgets/Runtime/async/stopwatch.cs
-
3com.unity.uiwidgets/Runtime/async/stopwatch.cs.meta
-
664com.unity.uiwidgets/Runtime/async/stream_controller.cs
-
3com.unity.uiwidgets/Runtime/async/stream_controller.cs.meta
|
|||
using Unity.UIWidgets.async; |
|||
|
|||
public abstract void close(); |
|||
public abstract Future close(); |
|||
} |
|||
} |
1001
com.unity.uiwidgets/Runtime/async/stream.cs
文件差异内容过多而无法显示
查看文件
文件差异内容过多而无法显示
查看文件
994
com.unity.uiwidgets/Runtime/async/stream_impl.cs
文件差异内容过多而无法显示
查看文件
文件差异内容过多而无法显示
查看文件
|
|||
using System; |
|||
using Unity.UIWidgets.async; |
|||
|
|||
class _EventSinkWrapper<T> : EventSink<T> { |
|||
_EventSink<object> _sink; |
|||
namespace Unity.UIWidgets.async { |
|||
class _EventSinkWrapper<T> : EventSink<T> { |
|||
_EventSink<object> _sink; |
|||
internal _EventSinkWrapper(_EventSink<object> _sink) { |
|||
this._sink = _sink; |
|||
} |
|||
internal _EventSinkWrapper(_EventSink<object> _sink) { |
|||
this._sink = _sink; |
|||
} |
|||
public override void add(T data) { |
|||
_sink._add(data); |
|||
} |
|||
public override void add(T data) { |
|||
_sink._add(data); |
|||
} |
|||
public override void addError(object error, string stackTrace) { |
|||
_sink._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error)); |
|||
} |
|||
public override void addError(object error, string stackTrace) { |
|||
_sink._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error)); |
|||
} |
|||
public override void close() { |
|||
_sink._close(); |
|||
public override Future close() { |
|||
_sink._close(); |
|||
return Future._nullFuture; |
|||
} |
|||
} |
|||
class _SinkTransformerStreamSubscription<S, T> |
|||
: _BufferingStreamSubscription<T> { |
|||
/// The transformer's input sink.
|
|||
EventSink<S> _transformerSink; |
|||
class _SinkTransformerStreamSubscription<S, T> |
|||
: _BufferingStreamSubscription<T> { |
|||
/// The transformer's input sink.
|
|||
EventSink<S> _transformerSink; |
|||
/// The subscription to the input stream.
|
|||
StreamSubscription<S> _subscription; |
|||
/// The subscription to the input stream.
|
|||
StreamSubscription<S> _subscription; |
|||
internal _SinkTransformerStreamSubscription(Stream<S> source, _async._SinkMapper<S, T> mapper, |
|||
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) |
|||
// We set the adapter's target only when the user is allowed to send data.
|
|||
: base(onData, onError, onDone, cancelOnError) { |
|||
_EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>((_EventSink<object>) this); |
|||
_transformerSink = mapper(eventSink); |
|||
_subscription = |
|||
source.listen(_handleData, onError: _handleError, onDone: _handleDone); |
|||
} |
|||
internal _SinkTransformerStreamSubscription(Stream<S> source, _async._SinkMapper<S, T> mapper, |
|||
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) |
|||
// We set the adapter's target only when the user is allowed to send data.
|
|||
: base(onData, onError, onDone, cancelOnError) { |
|||
_EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>((_EventSink<object>) this); |
|||
_transformerSink = mapper(eventSink); |
|||
_subscription = |
|||
source.listen(_handleData, onError: _handleError, onDone: _handleDone); |
|||
} |
|||
/** Whether this subscription is still subscribed to its source. */ |
|||
bool _isSubscribed { |
|||
get { return _subscription != null; } |
|||
} |
|||
/** Whether this subscription is still subscribed to its source. */ |
|||
bool _isSubscribed { |
|||
get { return _subscription != null; } |
|||
} |
|||
// _EventSink interface.
|
|||
// _EventSink interface.
|
|||
void _add(T data) { |
|||
if (_isClosed) { |
|||
throw new Exception("Stream is already closed"); |
|||
void _add(T data) { |
|||
if (_isClosed) { |
|||
throw new Exception("Stream is already closed"); |
|||
} |
|||
|
|||
base._add(data); |
|||
base._add(data); |
|||
} |
|||
void _addError(object error, string stackTrace) { |
|||
if (_isClosed) { |
|||
throw new Exception("Stream is already closed"); |
|||
} |
|||
void _addError(Object error, string stackTrace) { |
|||
if (_isClosed) { |
|||
throw new Exception("Stream is already closed"); |
|||
base._addError(error, stackTrace); |
|||
base._addError(error, stackTrace); |
|||
} |
|||
void _close() { |
|||
if (_isClosed) { |
|||
throw new Exception("Stream is already closed"); |
|||
} |
|||
void _close() { |
|||
if (_isClosed) { |
|||
throw new Exception("Stream is already closed"); |
|||
base._close(); |
|||
base._close(); |
|||
} |
|||
// _BufferingStreamSubscription hooks.
|
|||
// _BufferingStreamSubscription hooks.
|
|||
void _onPause() { |
|||
if (_isSubscribed) _subscription.pause(); |
|||
} |
|||
void _onPause() { |
|||
if (_isSubscribed) _subscription.pause(); |
|||
} |
|||
void _onResume() { |
|||
if (_isSubscribed) _subscription.resume(); |
|||
} |
|||
void _onResume() { |
|||
if (_isSubscribed) _subscription.resume(); |
|||
} |
|||
Future _onCancel() { |
|||
if (_isSubscribed) { |
|||
StreamSubscription<S> subscription = _subscription; |
|||
_subscription = null; |
|||
return subscription.cancel(); |
|||
} |
|||
Future _onCancel() { |
|||
if (_isSubscribed) { |
|||
StreamSubscription<S> subscription = _subscription; |
|||
_subscription = null; |
|||
return subscription.cancel(); |
|||
return null; |
|||
return null; |
|||
} |
|||
|
|||
void _handleData(S data) { |
|||
try { |
|||
_transformerSink.add(data); |
|||
} |
|||
catch (Exception e) { |
|||
_addError(e, e.StackTrace); |
|||
void _handleData(S data) { |
|||
try { |
|||
_transformerSink.add(data); |
|||
} |
|||
catch (Exception e) { |
|||
_addError(e, e.StackTrace); |
|||
} |
|||
} |
|||
void _handleError(object error, string stackTrace) { |
|||
try { |
|||
_transformerSink.addError(error, stackTrace); |
|||
void _handleError(object error, string stackTrace) { |
|||
try { |
|||
_transformerSink.addError(error, stackTrace); |
|||
} |
|||
catch (Exception e) { |
|||
if (Equals(e, error)) { |
|||
_addError(error, stackTrace); |
|||
} |
|||
else { |
|||
_addError(e, e.StackTrace); |
|||
} |
|||
} |
|||
catch (Exception e) { |
|||
if (Equals(e, error)) { |
|||
_addError(error, stackTrace); |
|||
|
|||
void _handleDone() { |
|||
try { |
|||
_subscription = null; |
|||
_transformerSink.close(); |
|||
else { |
|||
catch (Exception e) { |
|||
void _handleDone() { |
|||
try { |
|||
_subscription = null; |
|||
_transformerSink.close(); |
|||
} |
|||
catch (Exception e) { |
|||
_addError(e, e.StackTrace); |
|||
class _StreamSinkTransformer<S, T> : StreamTransformerBase<S, T> where T : class { |
|||
readonly _async._SinkMapper<S, T> _sinkMapper; |
|||
|
|||
public _StreamSinkTransformer(_async._SinkMapper<S, T> _sinkMapper) { |
|||
this._sinkMapper = _sinkMapper; |
|||
|
|||
public override Stream<T> bind(Stream<S> stream) => |
|||
new _BoundSinkStream<S, T>(stream, _sinkMapper); |
|||
} |
|||
class _StreamSinkTransformer<S, T> : StreamTransformerBase<S, T> { |
|||
readonly _async._SinkMapper<S, T> _sinkMapper; |
|||
class _BoundSinkStream<S, T> : Stream<T> { |
|||
readonly _async._SinkMapper<S, T> _sinkMapper; |
|||
readonly Stream<S> _stream; |
|||
public _StreamSinkTransformer(_async._SinkMapper<S, T> _sinkMapper) { |
|||
this._sinkMapper = _sinkMapper; |
|||
} |
|||
bool isBroadcast { |
|||
get { return _stream.isBroadcast; } |
|||
} |
|||
public override Stream<T> bind(Stream<S> stream) => |
|||
new _BoundSinkStream<S, T>(stream, _sinkMapper); |
|||
} |
|||
internal _BoundSinkStream(Stream<S> _stream, _async._SinkMapper<S, T> _sinkMapper) { |
|||
this._stream = _stream; |
|||
this._sinkMapper = _sinkMapper; |
|||
} |
|||
class _BoundSinkStream<S, T> : Stream<T> { |
|||
readonly _async._SinkMapper<S, T> _sinkMapper; |
|||
readonly Stream<S> _stream; |
|||
public override StreamSubscription<T> listen(Action<T> onData, |
|||
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = default) { |
|||
StreamSubscription<T> subscription = |
|||
new _SinkTransformerStreamSubscription<S, T>( |
|||
_stream, _sinkMapper, onData, onError, onDone, cancelOnError); |
|||
return subscription; |
|||
} |
|||
} |
|||
bool isBroadcast { |
|||
get { return _stream.isBroadcast; } |
|||
} |
|||
static partial class _stream { |
|||
public delegate void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
|||
internal _BoundSinkStream(Stream<S> _stream, _async._SinkMapper<S, T> _sinkMapper) { |
|||
this._stream = _stream; |
|||
this._sinkMapper = _sinkMapper; |
|||
} |
|||
/// Error-handler coming from [StreamTransformer.fromHandlers].
|
|||
public delegate void _TransformErrorHandler<T>( |
|||
object error, string stackTrace, EventSink<T> sink); |
|||
StreamSubscription<T> listen(Action<T> onData, |
|||
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = default) { |
|||
StreamSubscription<T> subscription = |
|||
new _SinkTransformerStreamSubscription<S, T>( |
|||
_stream, _sinkMapper, onData, onError, onDone, cancelOnError); |
|||
return subscription; |
|||
/// Done-handler coming from [StreamTransformer.fromHandlers].
|
|||
public delegate void _TransformDoneHandler<T>(EventSink<T> sink); |
|||
} |
|||
static partial class _stream { |
|||
internal delegate void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
|||
class _HandlerEventSink<S, T> : EventSink<S> where T : class { |
|||
readonly _stream._TransformDataHandler<S, T> _handleData; |
|||
readonly _stream._TransformErrorHandler<T> _handleError; |
|||
readonly _stream._TransformDoneHandler<T> _handleDone; |
|||
/// Error-handler coming from [StreamTransformer.fromHandlers].
|
|||
internal delegate void _TransformErrorHandler<T>( |
|||
object error, string stackTrace, EventSink<T> sink); |
|||
/// The output sink where the handlers should send their data into.
|
|||
EventSink<T> _sink; |
|||
/// Done-handler coming from [StreamTransformer.fromHandlers].
|
|||
internal delegate void _TransformDoneHandler<T>(EventSink<T> sink); |
|||
} |
|||
internal _HandlerEventSink( |
|||
_stream._TransformDataHandler<S, T> _handleData, _stream._TransformErrorHandler<T> _handleError, |
|||
_stream._TransformDoneHandler<T> _handleDone, EventSink<T> _sink) { |
|||
this._handleData = _handleData; |
|||
this._handleError = _handleError; |
|||
this._handleDone = _handleDone; |
|||
this._sink = _sink; |
|||
if (_sink == null) { |
|||
throw new Exception("The provided sink must not be null."); |
|||
} |
|||
} |
|||
class _HandlerEventSink<S, T> : EventSink<S> where T : class { |
|||
readonly _stream._TransformDataHandler<S, T> _handleData; |
|||
readonly _stream._TransformErrorHandler<T> _handleError; |
|||
readonly _stream._TransformDoneHandler<T> _handleDone; |
|||
bool _isClosed { |
|||
get { return _sink == null; } |
|||
} |
|||
/// The output sink where the handlers should send their data into.
|
|||
EventSink<T> _sink; |
|||
public override void add(S data) { |
|||
if (_isClosed) { |
|||
throw new Exception("Sink is closed"); |
|||
} |
|||
internal _HandlerEventSink( |
|||
_stream._TransformDataHandler<S, T> _handleData, _stream._TransformErrorHandler<T> _handleError, |
|||
_stream._TransformDoneHandler<T> _handleDone, EventSink<T> _sink) { |
|||
this._handleData = _handleData; |
|||
this._handleError = _handleError; |
|||
this._handleDone = _handleDone; |
|||
this._sink = _sink; |
|||
if (_sink == null) { |
|||
throw new Exception("The provided sink must not be null."); |
|||
if (_handleData != null) { |
|||
_handleData(data, _sink); |
|||
} |
|||
else { |
|||
_sink.add(data as T); |
|||
} |
|||
} |
|||
bool _isClosed { |
|||
get { return _sink == null; } |
|||
} |
|||
public override void addError(object error, string stackTrace) { |
|||
// ArgumentError.checkNotNull(error, "error");
|
|||
if (_isClosed) { |
|||
throw new Exception("Sink is closed"); |
|||
} |
|||
public override void add(S data) { |
|||
if (_isClosed) { |
|||
throw new Exception("Sink is closed"); |
|||
if (_handleError != null) { |
|||
stackTrace ??= AsyncError.defaultStackTrace(error); |
|||
_handleError(error, stackTrace, _sink); |
|||
} |
|||
else { |
|||
_sink.addError(error, stackTrace); |
|||
} |
|||
if (_handleData != null) { |
|||
_handleData(data, _sink); |
|||
} |
|||
else { |
|||
_sink.add(data as T); |
|||
public override Future close() { |
|||
if (_isClosed) return Future._nullFuture; |
|||
var sink = _sink; |
|||
_sink = null; |
|||
if (_handleDone != null) { |
|||
_handleDone(sink); |
|||
} |
|||
else { |
|||
sink.close(); |
|||
} |
|||
return Future._nullFuture; |
|||
public override void addError(object error, string stackTrace) { |
|||
// ArgumentError.checkNotNull(error, "error");
|
|||
if (_isClosed) { |
|||
throw new Exception("Sink is closed"); |
|||
class _StreamHandlerTransformer<S, T> : _StreamSinkTransformer<S, T> where T : class { |
|||
internal _StreamHandlerTransformer( |
|||
_stream._TransformDataHandler<S, T> handleData = null, |
|||
_stream._TransformErrorHandler<T> handleError = null, |
|||
_stream._TransformDoneHandler<T> handleDone = null) |
|||
: base((EventSink<T> outputSink) => { |
|||
return new _HandlerEventSink<S, T>( |
|||
handleData, handleError, handleDone, outputSink); |
|||
}) { |
|||
if (_handleError != null) { |
|||
stackTrace ??= AsyncError.defaultStackTrace(error); |
|||
_handleError(error, stackTrace, _sink); |
|||
} |
|||
else { |
|||
_sink.addError(error, stackTrace); |
|||
Stream<T> bind(Stream<S> stream) { |
|||
return base.bind(stream); |
|||
public override void close() { |
|||
if (_isClosed) return; |
|||
var sink = _sink; |
|||
_sink = null; |
|||
if (_handleDone != null) { |
|||
_handleDone(sink); |
|||
} |
|||
else { |
|||
sink.close(); |
|||
} |
|||
} |
|||
} |
|||
class _StreamBindTransformer<S, T> : StreamTransformerBase<S, T> where T : class { |
|||
readonly Func<Stream<S>, Stream<T>> _bind; |
|||
class _StreamHandlerTransformer<S, T> : _StreamSinkTransformer<S, T> where T : class { |
|||
_StreamHandlerTransformer( |
|||
_stream._TransformDataHandler<S, T> handleData = null, |
|||
_stream._TransformErrorHandler<T> handleError = null, |
|||
_stream._TransformDoneHandler<T> handleDone = null) |
|||
: base((EventSink<T> outputSink) => { |
|||
return new _HandlerEventSink<S, T>( |
|||
handleData, handleError, handleDone, outputSink); |
|||
}) { |
|||
} |
|||
internal _StreamBindTransformer(Func<Stream<S>, Stream<T>> _bind) { |
|||
this._bind = _bind; |
|||
} |
|||
Stream<T> bind(Stream<S> stream) { |
|||
return base.bind(stream); |
|||
public override Stream<T> bind(Stream<S> stream) => _bind(stream); |
|||
} |
|||
class _StreamBindTransformer<S, T> : StreamTransformerBase<S, T> { |
|||
readonly Func<Stream<S>, Stream<T>> _bind; |
|||
public partial class _async { |
|||
public delegate EventSink<S> _SinkMapper<S, T>(EventSink<T> output); |
|||
_StreamBindTransformer(Func<Stream<S>, Stream<T>> _bind) { |
|||
this._bind = _bind; |
|||
public delegate StreamSubscription<T> _SubscriptionTransformer<S, T>(Stream<S> stream, bool cancelOnError); |
|||
public override Stream<T> bind(Stream<S> stream) => _bind(stream); |
|||
} |
|||
class _StreamSubscriptionTransformer<S, T> : StreamTransformerBase<S, T> where T : class { |
|||
readonly _async._SubscriptionTransformer<S, T> _onListen; |
|||
public partial class _async { |
|||
public delegate EventSink<S> _SinkMapper<S, T>(EventSink<T> output); |
|||
|
|||
public delegate StreamSubscription<T> _SubscriptionTransformer<S, T>(Stream<S> stream, bool cancelOnError); |
|||
} |
|||
|
|||
class _StreamSubscriptionTransformer<S, T> : StreamTransformerBase<S, T> { |
|||
readonly _async._SubscriptionTransformer<S, T> _onListen; |
|||
internal _StreamSubscriptionTransformer(_async._SubscriptionTransformer<S, T> _onListen) { |
|||
this._onListen = _onListen; |
|||
} |
|||
internal _StreamSubscriptionTransformer(_async._SubscriptionTransformer<S, T> _onListen) { |
|||
this._onListen = _onListen; |
|||
public override Stream<T> bind(Stream<S> stream) => |
|||
new _BoundSubscriptionStream<S, T>(stream, _onListen); |
|||
public override Stream<T> bind(Stream<S> stream) => |
|||
new _BoundSubscriptionStream<S, T>(stream, _onListen); |
|||
} |
|||
class _BoundSubscriptionStream<S, T> : Stream<T> { |
|||
|
|||
_BoundSubscriptionStream(Stream<S> _stream, _async._SubscriptionTransformer<S, T> _onListen) { |
|||
this._stream = _stream; |
|||
this._onListen = _onListen; |
|||
} |
|||
|
|||
readonly _async._SubscriptionTransformer<S, T> _onListen; |
|||
readonly Stream<S> _stream; |
|||
class _BoundSubscriptionStream<S, T> : Stream<T> { |
|||
readonly _async._SubscriptionTransformer<S, T> _onListen; |
|||
readonly Stream<S> _stream; |
|||
bool isBroadcast { |
|||
get { return _stream.isBroadcast; } |
|||
} |
|||
bool isBroadcast { |
|||
get { return _stream.isBroadcast; } |
|||
} |
|||
|
|||
internal _BoundSubscriptionStream(Stream<S> _stream, _async._SubscriptionTransformer<S, T> _onListen) { |
|||
this._stream = _stream; |
|||
this._onListen = _onListen; |
|||
} |
|||
|
|||
StreamSubscription<T> listen(Action<T> onData, |
|||
Action onError = null, Action onDone = null, bool cancelOnError = false) { |
|||
//cancelOnError = cancelOnError;
|
|||
StreamSubscription<T> result = _onListen(_stream, cancelOnError); |
|||
result.onData(onData); |
|||
result.onError(onError); |
|||
result.onDone(onDone); |
|||
return result; |
|||
public override StreamSubscription<T> listen(Action<T> onData, |
|||
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) { |
|||
//cancelOnError = cancelOnError;
|
|||
StreamSubscription<T> result = _onListen(_stream, cancelOnError); |
|||
result.onData(onData); |
|||
result.onError(onError); |
|||
result.onDone(onDone); |
|||
return result; |
|||
} |
|||
} |
|||
} |
|
|||
using System; |
|||
|
|||
namespace Unity.UIWidgets.async { |
|||
public static partial class _async { |
|||
public static object _nonNullError(object error) => error ?? new NullReferenceException(); |
|||
} |
|||
} |
|
|||
fileFormatVersion: 2 |
|||
guid: 2682c09c8f8c447cbd5a400f4db9852a |
|||
timeCreated: 1629171016 |
|
|||
using System; |
|||
|
|||
namespace Unity.UIWidgets.core { |
|||
public class Stopwatch { |
|||
static int _frequency; |
|||
|
|||
// The _start and _stop fields capture the time when [start] and [stop]
|
|||
// are called respectively.
|
|||
// If _stop is null, the stopwatch is running.
|
|||
int? _start = 0; |
|||
int? _stop = 0; |
|||
|
|||
public Stopwatch() { |
|||
if (_frequency == null) _initTicker(); |
|||
} |
|||
|
|||
public int frequency { |
|||
get { return _frequency; } |
|||
} |
|||
|
|||
public void start() { |
|||
if (_stop != null) { |
|||
// (Re)start this stopwatch.
|
|||
// Don't count the time while the stopwatch has been stopped.
|
|||
_start += _now() - _stop; |
|||
_stop = null; |
|||
} |
|||
} |
|||
|
|||
public void stop() { |
|||
_stop ??= _now(); |
|||
} |
|||
|
|||
public void reset() { |
|||
_start = _stop ?? _now(); |
|||
} |
|||
|
|||
public int? elapsedTicks { |
|||
get { return (_stop ?? _now()) - _start; } |
|||
} |
|||
|
|||
public TimeSpan elapsed { |
|||
get { return TimeSpan.FromMilliseconds(elapsedMicroseconds); } |
|||
} |
|||
|
|||
// This is external, we might need to reimplement it
|
|||
int elapsedMicroseconds { get; } |
|||
|
|||
// This is external, we might need to reimplement it
|
|||
int elapsedMilliseconds { get; } |
|||
|
|||
bool isRunning { |
|||
get { return _stop == null; } |
|||
} |
|||
|
|||
// This is external, we might need to reimplement it
|
|||
static void _initTicker() { |
|||
} |
|||
|
|||
// This is external, we might need to reimplement it
|
|||
static int _now() { |
|||
return DateTime.Now.Millisecond; |
|||
} |
|||
} |
|||
} |
|
|||
fileFormatVersion: 2 |
|||
guid: 63c86df370684414b7b148702eacd440 |
|||
timeCreated: 1629184886 |
|
|||
using System; |
|||
using System.Diagnostics; |
|||
using Unity.UIWidgets.async; |
|||
using Unity.UIWidgets.foundation; |
|||
using UnityEditor.PackageManager; |
|||
|
|||
namespace Unity.UIWidgets.async { |
|||
static partial class _stream { |
|||
public delegate void ControllerCallback(); |
|||
|
|||
public delegate Future ControllerCancelCallback(); |
|||
public delegate void _NotificationHandler(); |
|||
|
|||
|
|||
public static void _runGuarded(_NotificationHandler notificationHandler) { |
|||
if (notificationHandler == null) return; |
|||
try { |
|||
notificationHandler(); |
|||
} catch (Exception e) { |
|||
Zone.current.handleUncaughtError(e); |
|||
} |
|||
} |
|||
|
|||
} |
|||
|
|||
public abstract class StreamController<T> : StreamSink<T> { |
|||
/** The stream that this controller is controlling. */ |
|||
internal Stream<T> stream { get; } |
|||
|
|||
public StreamController( |
|||
Action onListen = null, |
|||
Action onPause = null, |
|||
Action onResume = null, |
|||
Action onCancel = null, |
|||
bool sync = false) { |
|||
return sync |
|||
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
|||
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
|||
} |
|||
|
|||
public static StreamController<T> create( |
|||
Action onListen = null, |
|||
Action onPause = null, |
|||
Action onResume = null, |
|||
Action onCancel = null, |
|||
bool sync = false) { |
|||
return sync |
|||
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
|||
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
|||
} |
|||
|
|||
public static StreamController<T> broadcast( |
|||
Action onListen = null, Action onCancel = null, bool sync = false) { |
|||
return sync |
|||
? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
|||
: new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
|||
} |
|||
|
|||
public _stream.ControllerCallback onListen { |
|||
get; |
|||
set; |
|||
} |
|||
|
|||
// void onListen(void onListenHandler());
|
|||
|
|||
public _stream.ControllerCallback onPause { get; set; } |
|||
|
|||
// void set onPause(void onPauseHandler());
|
|||
|
|||
public _stream.ControllerCallback onResume { get; set; } |
|||
|
|||
// void set onResume(void onResumeHandler());
|
|||
|
|||
public _stream.ControllerCancelCallback onCancel { get; set; } |
|||
|
|||
// void set onCancel(onCancelHandler());
|
|||
|
|||
StreamSink<T> sink { get; } |
|||
|
|||
public bool isClosed{get;} |
|||
|
|||
public bool isPaused{get;} |
|||
|
|||
/** Whether there is a subscriber on the [Stream]. */ |
|||
public bool hasListener{get;} |
|||
|
|||
// public abstract void add(T evt);
|
|||
//
|
|||
// public abstract void addError(object error, string stackTrace);
|
|||
|
|||
public abstract Future closeConsumer(); |
|||
|
|||
public abstract Future addStream(Stream<T> source, bool cancelOnError = false); |
|||
} |
|||
|
|||
public interface SynchronousStreamController<T> {//: StreamController<T> {
|
|||
// public abstract void add(T data);
|
|||
|
|||
// public abstract void addError(object error, string stackTrace);
|
|||
|
|||
// public abstract Future close();
|
|||
} |
|||
|
|||
interface _StreamControllerLifecycle<T> { |
|||
StreamSubscription<T> _subscribe( |
|||
Action<T> onData, Action onError, Action onDone, bool cancelOnError); |
|||
|
|||
void _recordPause(StreamSubscription<T> subscription); |
|||
void _recordResume(StreamSubscription<T> subscription); |
|||
Future _recordCancel(StreamSubscription<T> subscription); |
|||
} |
|||
//
|
|||
// // Base type for implementations of stream controllers.
|
|||
abstract class _StreamControllerBase<T> |
|||
: |
|||
StreamController<T>, |
|||
_StreamControllerLifecycle<T>, |
|||
_EventSink<T>, |
|||
_EventDispatch<T> { |
|||
public StreamSubscription<T> _subscribe(Action<T> onData, Action onError, Action onDone, bool cancelOnError) { |
|||
throw new NotImplementedException(); |
|||
} |
|||
|
|||
public void _recordPause(StreamSubscription<T> subscription) {} |
|||
public void _recordResume(StreamSubscription<T> subscription) {} |
|||
public Future _recordCancel(StreamSubscription<T> subscription) => null; |
|||
|
|||
public abstract void _add(T data); |
|||
|
|||
public abstract void _addError(object error, string stackTrace); |
|||
|
|||
public abstract void _close(); |
|||
|
|||
public abstract void _sendData(T data); |
|||
|
|||
public abstract void _sendError(object error, string stackTrace); |
|||
|
|||
public abstract void _sendDone(); |
|||
} |
|||
|
|||
abstract class _StreamController<T> : _StreamControllerBase<T> { |
|||
|
|||
/** The controller is in its initial state with no subscription. */ |
|||
const int _STATE_INITIAL = 0; |
|||
const int _STATE_SUBSCRIBED = 1; |
|||
/** The subscription is canceled. */ |
|||
const int _STATE_CANCELED = 2; |
|||
/** Mask for the subscription state. */ |
|||
const int _STATE_SUBSCRIPTION_MASK = 3; |
|||
|
|||
// The following state relate to the controller, not the subscription.
|
|||
// If closed, adding more events is not allowed.
|
|||
// If executing an [addStream], new events are not allowed either, but will
|
|||
// be added by the stream.
|
|||
|
|||
const int _STATE_CLOSED = 4; |
|||
const int _STATE_ADDSTREAM = 8; |
|||
|
|||
// @pragma("vm:entry-point")
|
|||
object _varData; |
|||
|
|||
/** Current state of the controller. */ |
|||
// @pragma("vm:entry-point")
|
|||
int _state = _STATE_INITIAL; |
|||
|
|||
// TODO(lrn): Could this be stored in the varData field too, if it's not
|
|||
// accessed until the call to "close"? Then we need to special case if it's
|
|||
// accessed earlier, or if close is called before subscribing.
|
|||
_Future _doneFuture; |
|||
|
|||
_stream.ControllerCallback onListen; |
|||
_stream.ControllerCallback onPause; |
|||
_stream.ControllerCallback onResume; |
|||
_stream.ControllerCancelCallback onCancel; |
|||
|
|||
internal _StreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause, |
|||
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) { |
|||
this.onListen = onListen; |
|||
this.onPause = onPause; |
|||
this.onResume = onResume; |
|||
this.onCancel = onCancel; |
|||
} |
|||
|
|||
// Return a new stream every time. The streams are equal, but not identical.
|
|||
public Stream<T> stream{get => new _ControllerStream<T>(this);} |
|||
|
|||
StreamSink<T> sink{get => new _StreamSinkWrapper<T>(this);} |
|||
|
|||
bool _isCanceled{get => (_state & _STATE_CANCELED) != 0;} |
|||
|
|||
/** Whether there is an active listener. */ |
|||
bool hasListener{get => (_state & _STATE_SUBSCRIBED) != 0;} |
|||
|
|||
/** Whether there has not been a listener yet. */ |
|||
bool _isInitialState { |
|||
get => |
|||
(_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; |
|||
} |
|||
|
|||
bool isClosed{get => (_state & _STATE_CLOSED) != 0;} |
|||
|
|||
bool isPaused { |
|||
get => |
|||
hasListener ? _subscription._isInputPaused : !_isCanceled; |
|||
} |
|||
|
|||
bool _isAddingStream{get => (_state & _STATE_ADDSTREAM) != 0;} |
|||
|
|||
/** New events may not be added after close, or during addStream. */ |
|||
bool _mayAddEvent{get => (_state < _STATE_CLOSED);} |
|||
|
|||
// Returns the pending events.
|
|||
// Pending events are events added before a subscription exists.
|
|||
// They are added to the subscription when it is created.
|
|||
// Pending events, if any, are kept in the _varData field until the
|
|||
// stream is listened to.
|
|||
// While adding a stream, pending events are moved into the
|
|||
// state object to allow the state object to use the _varData field.
|
|||
_PendingEvents<T> _pendingEvents { |
|||
get { |
|||
D.assert(_isInitialState); |
|||
if (!_isAddingStream) { |
|||
return (_PendingEvents<T>) _varData; |
|||
} |
|||
|
|||
_StreamControllerAddStreamState<T> state = (_StreamControllerAddStreamState<T>) _varData; |
|||
return (_PendingEvents<T>) state.varData; |
|||
} |
|||
} |
|||
|
|||
// Returns the pending events, and creates the object if necessary.
|
|||
_StreamImplEvents<T> _ensurePendingEvents() { |
|||
D.assert(_isInitialState); |
|||
if (!_isAddingStream) { |
|||
_varData ??= new _StreamImplEvents<T>(); |
|||
return (_StreamImplEvents<T>) _varData; |
|||
} |
|||
_StreamControllerAddStreamState<T> state = (_StreamControllerAddStreamState<T>) _varData; |
|||
if (state.varData == null) state.varData = new _StreamImplEvents<T>(); |
|||
return (_StreamImplEvents<T>) state.varData; |
|||
} |
|||
|
|||
// Get the current subscription.
|
|||
// If we are adding a stream, the subscription is moved into the state
|
|||
// object to allow the state object to use the _varData field.
|
|||
protected _ControllerSubscription<T> _subscription { |
|||
get { |
|||
D.assert(hasListener); |
|||
if (_isAddingStream) { |
|||
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData; |
|||
return addState.varData; |
|||
} |
|||
|
|||
return _varData; |
|||
} |
|||
} |
|||
|
|||
Exception _badEventState() { |
|||
if (isClosed) { |
|||
return new Exception("Cannot add event after closing"); |
|||
} |
|||
D.assert(_isAddingStream); |
|||
return new Exception("Cannot add event while adding a stream"); |
|||
} |
|||
|
|||
// StreamSink interface.
|
|||
Future addStream(Stream<T> source, bool? cancelOnError = false) { |
|||
if (!_mayAddEvent) throw _badEventState(); |
|||
if (_isCanceled) return _Future.immediate(FutureOr.nil); |
|||
_StreamControllerAddStreamState<T> addState = |
|||
new _StreamControllerAddStreamState<T>( |
|||
this, _varData, source, cancelOnError ?? false); |
|||
_varData = addState; |
|||
_state |= _STATE_ADDSTREAM; |
|||
return addState.addStreamFuture; |
|||
} |
|||
|
|||
Future done { |
|||
get { return _ensureDoneFuture(); } |
|||
} |
|||
|
|||
Future _ensureDoneFuture() { |
|||
_doneFuture ??= _isCanceled ? Future._nullFuture : new _Future(); |
|||
return _doneFuture; |
|||
} |
|||
|
|||
public override void add(T value) { |
|||
if (!_mayAddEvent) throw _badEventState(); |
|||
_add(value); |
|||
} |
|||
|
|||
public override void addError(object error, string stackTrace) { |
|||
// ArgumentError.checkNotNull(error, "error");
|
|||
if (!_mayAddEvent) throw _badEventState(); |
|||
error = _async._nonNullError(error); |
|||
AsyncError replacement = Zone.current.errorCallback((Exception) error); |
|||
if (replacement != null) { |
|||
error = _async._nonNullError(replacement); |
|||
// stackTrace = replacement.stackTrace;
|
|||
} |
|||
stackTrace ??= AsyncError.defaultStackTrace(error); |
|||
_addError(error, stackTrace); |
|||
} |
|||
|
|||
public override Future closeConsumer() { |
|||
if (isClosed) { |
|||
return _ensureDoneFuture(); |
|||
} |
|||
if (!_mayAddEvent) throw _badEventState(); |
|||
_closeUnchecked(); |
|||
return _ensureDoneFuture(); |
|||
} |
|||
|
|||
internal void _closeUnchecked() { |
|||
_state |= _STATE_CLOSED; |
|||
if (hasListener) { |
|||
_sendDone(); |
|||
} else if (_isInitialState) { |
|||
_ensurePendingEvents().add(new _DelayedDone()); |
|||
} |
|||
} |
|||
|
|||
// EventSink interface. Used by the [addStream] events.
|
|||
|
|||
// Add data event, used both by the [addStream] events and by [add].
|
|||
void _add(T value) { |
|||
if (hasListener) { |
|||
_sendData(value); |
|||
} else if (_isInitialState) { |
|||
_ensurePendingEvents().add(new _DelayedData<object>(value)); |
|||
} |
|||
} |
|||
|
|||
void _addError(object error, string stackTrace) { |
|||
if (hasListener) { |
|||
_sendError(error, stackTrace); |
|||
} else if (_isInitialState) { |
|||
_ensurePendingEvents().add(new _DelayedError((Exception) error, stackTrace)); |
|||
} |
|||
} |
|||
|
|||
void _close() { |
|||
// End of addStream stream.
|
|||
D.assert(_isAddingStream); |
|||
_StreamControllerAddStreamState<T> addState = _varData; |
|||
_varData = addState.varData; |
|||
_state &= ~_STATE_ADDSTREAM; |
|||
addState.complete(); |
|||
} |
|||
|
|||
// _StreamControllerLifeCycle interface
|
|||
|
|||
StreamSubscription<T> _subscribe(void onData(T data), Function onError, |
|||
void onDone(), bool cancelOnError) { |
|||
if (!_isInitialState) { |
|||
throw new StateError("Stream has already been listened to."); |
|||
} |
|||
_ControllerSubscription<T> subscription = new _ControllerSubscription<T>( |
|||
this, onData, onError, onDone, cancelOnError); |
|||
|
|||
_PendingEvents<T> pendingEvents = _pendingEvents; |
|||
_state |= _STATE_SUBSCRIBED; |
|||
if (_isAddingStream) { |
|||
_StreamControllerAddStreamState<T> addState = _varData; |
|||
addState.varData = subscription; |
|||
addState.resume(); |
|||
} else { |
|||
_varData = subscription; |
|||
} |
|||
subscription._setPendingEvents(pendingEvents); |
|||
subscription._guardCallback(() { |
|||
_runGuarded(onListen); |
|||
}); |
|||
|
|||
return subscription; |
|||
} |
|||
|
|||
Future _recordCancel(StreamSubscription<T> subscription) { |
|||
// When we cancel, we first cancel any stream being added,
|
|||
// Then we call `onCancel`, and finally the _doneFuture is completed.
|
|||
// If either of addStream's cancel or `onCancel` returns a future,
|
|||
// we wait for it before continuing.
|
|||
// Any error during this process ends up in the returned future.
|
|||
// If more errors happen, we act as if it happens inside nested try/finallys
|
|||
// or whenComplete calls, and only the last error ends up in the
|
|||
// returned future.
|
|||
Future result; |
|||
if (_isAddingStream) { |
|||
_StreamControllerAddStreamState<T> addState = _varData; |
|||
result = addState.cancel(); |
|||
} |
|||
_varData = null; |
|||
_state = |
|||
(_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
|||
|
|||
if (onCancel != null) { |
|||
if (result == null) { |
|||
// Only introduce a future if one is needed.
|
|||
// If _onCancel returns null, no future is needed.
|
|||
try { |
|||
result = onCancel(); |
|||
} catch (e, s) { |
|||
// Return the error in the returned future.
|
|||
// Complete it asynchronously, so there is time for a listener
|
|||
// to handle the error.
|
|||
result = new _Future().._asyncCompleteError(e, s); |
|||
} |
|||
} else { |
|||
// Simpler case when we already know that we will return a future.
|
|||
result = result.whenComplete(onCancel); |
|||
} |
|||
} |
|||
|
|||
void complete() { |
|||
if (_doneFuture != null && _doneFuture._mayComplete) { |
|||
_doneFuture._asyncComplete(null); |
|||
} |
|||
} |
|||
|
|||
if (result != null) { |
|||
result = result.whenComplete(complete); |
|||
} else { |
|||
complete(); |
|||
} |
|||
|
|||
return result; |
|||
} |
|||
|
|||
void _recordPause(StreamSubscription<T> subscription) { |
|||
if (_isAddingStream) { |
|||
_StreamControllerAddStreamState<T> addState = _varData; |
|||
addState.pause(); |
|||
} |
|||
_runGuarded(onPause); |
|||
} |
|||
|
|||
void _recordResume(StreamSubscription<T> subscription) { |
|||
if (_isAddingStream) { |
|||
_StreamControllerAddStreamState<T> addState = _varData; |
|||
addState.resume(); |
|||
} |
|||
_runGuarded(onResume); |
|||
} |
|||
} |
|||
//
|
|||
abstract class _SyncStreamControllerDispatch<T> |
|||
: _StreamController<T>, SynchronousStreamController<T> { |
|||
int _state { get; set; } |
|||
|
|||
void _sendData(T data) { |
|||
_subscription._add(data); |
|||
} |
|||
|
|||
void _sendError(object error, string stackTrace) { |
|||
_subscription._addError(error, stackTrace); |
|||
} |
|||
|
|||
void _sendDone() { |
|||
_subscription._close(); |
|||
} |
|||
|
|||
protected _SyncStreamControllerDispatch(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause, _stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause, onResume, onCancel) |
|||
{ |
|||
} |
|||
} |
|||
|
|||
abstract class _AsyncStreamControllerDispatch<T> |
|||
: _StreamController<T> { |
|||
void _sendData(T data) { |
|||
_subscription._addPending(new _DelayedData<T>(data)); |
|||
} |
|||
|
|||
void _sendError(object error, string stackTrace) { |
|||
_subscription._addPending(new _DelayedError((Exception) error, stackTrace)); |
|||
} |
|||
|
|||
void _sendDone() { |
|||
_subscription._addPending(new _DelayedDone()); |
|||
} |
|||
|
|||
protected _AsyncStreamControllerDispatch(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause, _stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause, onResume, onCancel) |
|||
{ |
|||
} |
|||
} |
|||
|
|||
// TODO(lrn): Use common superclass for callback-controllers when VM supports
|
|||
// constructors in mixin superclasses.
|
|||
|
|||
class _AsyncStreamController<T> : _AsyncStreamControllerDispatch<T> { |
|||
// public override void close() {
|
|||
// throw new NotImplementedException();
|
|||
// }
|
|||
} |
|||
|
|||
class _SyncStreamController<T> : _SyncStreamControllerDispatch<T> { |
|||
|
|||
} |
|||
|
|||
|
|||
class _ControllerStream<T> : _StreamImpl<T>, IEquatable<_ControllerStream<T>> { |
|||
_StreamControllerLifecycle<T> _controller; |
|||
|
|||
internal _ControllerStream( _StreamControllerLifecycle<T> _controller) { |
|||
this._controller = _controller; |
|||
} |
|||
|
|||
StreamSubscription<T> _createSubscription(void onData(T data), |
|||
Function onError, void onDone(), bool cancelOnError) => |
|||
_controller._subscribe(onData, onError, onDone, cancelOnError); |
|||
|
|||
// Override == and hashCode so that new streams returned by the same
|
|||
// controller are considered equal. The controller returns a new stream
|
|||
// each time it's queried, but doesn't have to cache the result.
|
|||
|
|||
// int hashCode {
|
|||
// get { return _controller.GetHashCode() ^ 0x35323532; }
|
|||
// }
|
|||
|
|||
// bool operator ==(object other) {
|
|||
// if (identical(this, other)) return true;
|
|||
// return other is _ControllerStream &&
|
|||
// identical(other._controller, this._controller);
|
|||
// }
|
|||
|
|||
public bool Equals(_ControllerStream<T> other) |
|||
{ |
|||
if (ReferenceEquals(null, other)) { |
|||
return false; |
|||
} |
|||
|
|||
if (ReferenceEquals(this, other)) { |
|||
return true; |
|||
} |
|||
|
|||
return Equals(_controller, other._controller); |
|||
} |
|||
|
|||
public override bool Equals(object obj) |
|||
{ |
|||
if (ReferenceEquals(null, obj)) { |
|||
return false; |
|||
} |
|||
|
|||
if (ReferenceEquals(this, obj)) { |
|||
return true; |
|||
} |
|||
|
|||
if (obj.GetType() != GetType()) { |
|||
return false; |
|||
} |
|||
|
|||
return Equals((_ControllerStream<T>) obj); |
|||
} |
|||
|
|||
public override int GetHashCode() |
|||
{ |
|||
return _controller.GetHashCode() ^ 0x35323532; |
|||
} |
|||
} |
|||
//
|
|||
// class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
|
|||
// final _StreamControllerLifecycle<T> _controller;
|
|||
//
|
|||
// _ControllerSubscription(this._controller, void onData(T data),
|
|||
// Function onError, void onDone(), bool cancelOnError)
|
|||
// : super(onData, onError, onDone, cancelOnError);
|
|||
//
|
|||
// Future _onCancel() {
|
|||
// return _controller._recordCancel(this);
|
|||
// }
|
|||
//
|
|||
// void _onPause() {
|
|||
// _controller._recordPause(this);
|
|||
// }
|
|||
//
|
|||
// void _onResume() {
|
|||
// _controller._recordResume(this);
|
|||
// }
|
|||
// }
|
|||
//
|
|||
// /** A class that exposes only the [StreamSink] interface of an object. */
|
|||
// class _StreamSinkWrapper<T> implements StreamSink<T> {
|
|||
// final StreamController _target;
|
|||
// _StreamSinkWrapper(this._target);
|
|||
// void add(T data) {
|
|||
// _target.add(data);
|
|||
// }
|
|||
//
|
|||
// void addError(object error, [string stackTrace]) {
|
|||
// _target.addError(error, stackTrace);
|
|||
// }
|
|||
//
|
|||
// Future close() => _target.close();
|
|||
//
|
|||
// Future addStream(Stream<T> source) => _target.addStream(source);
|
|||
//
|
|||
// Future get done => _target.done;
|
|||
// }
|
|||
//
|
|||
class _AddStreamState<T> { |
|||
// [_Future] returned by call to addStream.
|
|||
internal readonly _Future addStreamFuture; |
|||
|
|||
// Subscription on stream argument to addStream.
|
|||
internal readonly StreamSubscription<T> addSubscription; |
|||
|
|||
internal _AddStreamState( |
|||
_EventSink<T> controller, Stream<T> source, bool cancelOnError) { |
|||
addStreamFuture = new _Future(); |
|||
addSubscription = source.listen(controller._add, |
|||
onError: cancelOnError |
|||
? makeErrorHandler(controller) |
|||
: controller._addError, |
|||
onDone: controller._close, |
|||
cancelOnError: cancelOnError); |
|||
} |
|||
|
|||
public static Action<object, string> makeErrorHandler(_EventSink<T> controller) { |
|||
return (object e, string s) => { |
|||
controller._addError(e, s); |
|||
controller._close(); |
|||
}; |
|||
} |
|||
|
|||
void pause() { |
|||
addSubscription.pause(); |
|||
} |
|||
|
|||
void resume() { |
|||
addSubscription.resume(); |
|||
} |
|||
|
|||
Future cancel() { |
|||
var cancel = addSubscription.cancel(); |
|||
if (cancel == null) { |
|||
addStreamFuture._asyncComplete(FutureOr.nil); |
|||
return null; |
|||
} |
|||
return cancel.whenComplete(() => { |
|||
addStreamFuture._asyncComplete(FutureOr.nil); |
|||
}); |
|||
} |
|||
|
|||
void complete() { |
|||
addStreamFuture._asyncComplete(FutureOr.nil); |
|||
} |
|||
} |
|||
|
|||
class _StreamControllerAddStreamState<T> : _AddStreamState<T> { |
|||
// The subscription or pending data of a _StreamController.
|
|||
// Stored here because we reuse the `_varData` field in the _StreamController
|
|||
// to store this state object.
|
|||
public object varData; |
|||
|
|||
internal _StreamControllerAddStreamState(_StreamController<T> controller, object varData, |
|||
Stream<T> source, bool cancelOnError) |
|||
: base(controller, source, cancelOnError) { |
|||
if (controller.isPaused) { |
|||
addSubscription.pause(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
} |
|
|||
fileFormatVersion: 2 |
|||
guid: 6a5b6c88a4f44674921771bc51facbfc |
|||
timeCreated: 1628753099 |
撰写
预览
正在加载...
取消
保存
Reference in new issue