浏览代码

resolve all error

/main
siyao 3 年前
当前提交
4300c6f6
共有 10 个文件被更改,包括 1357 次插入729 次删除
  1. 2
      com.unity.uiwidgets/Runtime/async/async.cs
  2. 44
      com.unity.uiwidgets/Runtime/async/async_cast.cs
  3. 292
      com.unity.uiwidgets/Runtime/async/broadcast_stream_controller.cs
  4. 998
      com.unity.uiwidgets/Runtime/async/stream.cs
  5. 4
      com.unity.uiwidgets/Runtime/async/stream_controller.cs
  6. 214
      com.unity.uiwidgets/Runtime/async/stream_impl.cs
  7. 515
      com.unity.uiwidgets/Runtime/async/stream_pipe.cs
  8. 2
      com.unity.uiwidgets/Runtime/async/stream_transformers.cs
  9. 12
      com.unity.uiwidgets/Runtime/async/async_error.cs
  10. 3
      com.unity.uiwidgets/Runtime/async/async_error.cs.meta

2
com.unity.uiwidgets/Runtime/async/async.cs


namespace Unity.UIWidgets.async {
public static partial class _async {
public static object _nonNullError(object error) => error ?? new NullReferenceException();
public static object _nonNullError(object error) => error ?? new NullReferenceException();
}
}

44
com.unity.uiwidgets/Runtime/async/async_cast.cs


public CastStream(Stream<S> _source) {
this._source = _source;
}
bool isBroadcast {
bool isBroadcast {
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) {
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null,
Action onDone = null, bool cancelOnError = false) {
result.onData(onData);
result.onError(onError);
result.onData(onData);
result.onError(onError);
return result;
}

/// May be null.
ZoneBinaryCallback _handleError;
public CastStreamSubscription( StreamSubscription<S> _source) {
public CastStreamSubscription(StreamSubscription<S> _source) {
this._source = _source;
_source.onData(_onData);
}

_handleData = handleData == null
? null
: _zone.registerUnaryCallback(data => {
handleData((T) data);
return null;
handleData((T) data);
return null;
});
}

_handleError = null;
} else {
}
else {
.registerBinaryCallback((a, b)=> {
handleError(a, (string) b);
return null;
.registerBinaryCallback((a, b) => {
handleError(a, (string) b);
return null;
});
}
}

}
targetData = (T)(object) data ;
} catch (Exception error) {
targetData = (T) (object) data;
}
catch (Exception error) {
} else {
}
else {
}
}
_zone.runUnaryGuarded(_handleData, targetData);
}

_source.resume();
}
bool isPaused {
bool isPaused {
get { return _source.isPaused; }
}

public override StreamTransformer<RS, RT> cast<RS, RT>() =>
new CastStreamTransformer<SS, ST, RS, RT>(_source);
public override Stream<TT> bind(Stream<TS> stream) =>
_source.bind(stream.cast<SS>()).cast<TT>();
}

292
com.unity.uiwidgets/Runtime/async/broadcast_stream_controller.cs


internal _Future _doneFuture;
internal _BroadcastStreamController(_stream.ControllerCallback onListen, _stream.ControllerCancelCallback onCancel) {
internal _BroadcastStreamController(_stream.ControllerCallback onListen,
_stream.ControllerCancelCallback onCancel) {
this.onListen = onListen;
this.onCancel = onCancel;
_state = _STATE_INITIAL;

Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
if (isClosed) {
onDone ??= _stream._nullDoneHandler;
return new _DoneStreamSubscription<T>(()=>onDone());
return new _DoneStreamSubscription<T>(() => onDone());
}
StreamSubscription<T> subscription = new _BroadcastSubscription<T>(

}
}
class _SyncBroadcastStreamController<T> : _BroadcastStreamController<T>
, SynchronousStreamController<T> {
internal _SyncBroadcastStreamController(
_stream.ControllerCallback onListen, Action onCancel)
: base(onListen, ()=> {
onCancel();
return Future._nullFuture;
}) {
}
class _SyncBroadcastStreamController<T> : _BroadcastStreamController<T>
, SynchronousStreamController<T> {
internal _SyncBroadcastStreamController(
_stream.ControllerCallback onListen, Action onCancel)
: base(onListen, () => {
onCancel();
return Future._nullFuture;
}) {
}
// EventDispatch interface.
// EventDispatch interface.
bool _mayAddEvent {
get { return base._mayAddEvent && !_isFiring; }
}
bool _mayAddEvent {
get { return base._mayAddEvent && !_isFiring; }
}
internal override Exception _addEventError() {
if (_isFiring) {
return new Exception(
"Cannot fire new event. Controller is already firing an event");
}
return base._addEventError();
}
internal override Exception _addEventError() {
if (_isFiring) {
return new Exception(
"Cannot fire new event. Controller is already firing an event");
}
public override void _sendData(T data) {
if (_isEmpty) return;
if (_hasOneListener) {
_state |= _BroadcastStreamController<T>._STATE_FIRING;
_BroadcastSubscription<T> subscription = _firstSubscription;
subscription._add(data);
_state &= ~_BroadcastStreamController<T>._STATE_FIRING;
if (_isEmpty) {
_callOnCancel();
}
return;
}
_forEachListener((_BufferingStreamSubscription<T> subscription) =>{
subscription._add(data);
});
}
return base._addEventError();
}
public override void _sendError(object error, string stackTrace) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) => {
subscription._addError(error, stackTrace);
});
}
public override void _sendData(T data) {
if (_isEmpty) return;
if (_hasOneListener) {
_state |= _BroadcastStreamController<T>._STATE_FIRING;
_BroadcastSubscription<T> subscription = _firstSubscription;
subscription._add(data);
_state &= ~_BroadcastStreamController<T>._STATE_FIRING;
if (_isEmpty) {
_callOnCancel();
}
public override void _sendDone() {
if (!_isEmpty) {
_forEachListener((_BufferingStreamSubscription<T> subscription) => {
subscription._close();
});
} else {
D.assert(_doneFuture != null);
D.assert(_doneFuture._mayComplete);
_doneFuture._asyncComplete(FutureOr.nil);
return;
}
_forEachListener((_BufferingStreamSubscription<T> subscription) => { subscription._add(data); });
}
public override void _sendError(object error, string stackTrace) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) => {
subscription._addError(error, stackTrace);
});
}
public override void _sendDone() {
if (!_isEmpty) {
_forEachListener((_BufferingStreamSubscription<T> subscription) => { subscription._close(); });
}
else {
D.assert(_doneFuture != null);
D.assert(_doneFuture._mayComplete);
_doneFuture._asyncComplete(FutureOr.nil);
}
}
}
}
class _AsyncBroadcastStreamController<T> : _BroadcastStreamController<T> {
internal _AsyncBroadcastStreamController(_stream.ControllerCallback onListen, _stream.ControllerCancelCallback onCancel)
: base(onListen, onCancel) {
}
class _AsyncBroadcastStreamController<T> : _BroadcastStreamController<T> {
internal _AsyncBroadcastStreamController(_stream.ControllerCallback onListen,
_stream.ControllerCancelCallback onCancel)
: base(onListen, onCancel) {
}
// EventDispatch interface.
// EventDispatch interface.
public override void _sendData(T data) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedData<T>(data));
}
}
public override void _sendData(T data) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedData<T>(data));
}
}
public override void _sendError(object error, string stackTrace) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedError<T>((Exception) error, stackTrace));
}
}
public override void _sendError(object error, string stackTrace) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedError<T>((Exception) error, stackTrace));
}
}
public override void _sendDone() {
if (!_isEmpty) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedDone<T>());
}
} else {
D.assert(_doneFuture != null);
D.assert(_doneFuture._mayComplete);
_doneFuture._asyncComplete(FutureOr.nil);
public override void _sendDone() {
if (!_isEmpty) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedDone<T>());
}
}
else {
D.assert(_doneFuture != null);
D.assert(_doneFuture._mayComplete);
_doneFuture._asyncComplete(FutureOr.nil);
}
}
}
}
//
// /**
// * Stream controller that is used by [Stream.asBroadcastStream].

// * an "asBroadcastStream" stream are always initiated by events
// * on another stream, and it is fine to forward them synchronously.
// */
class _AsBroadcastStreamController<T> : _SyncBroadcastStreamController<T>
, _EventDispatch<T> {
_StreamImplEvents<T> _pending;
class _AsBroadcastStreamController<T> : _SyncBroadcastStreamController<T>
, _EventDispatch<T> {
_StreamImplEvents<T> _pending;
internal _AsBroadcastStreamController(Action onListen, Action onCancel)
: base(() => onListen(), onCancel) {
}
internal _AsBroadcastStreamController(Action onListen, Action onCancel)
: base(() => onListen(), onCancel) {
}
bool _hasPending {
get { return _pending != null && !_pending.isEmpty; }
}
bool _hasPending {
get { return _pending != null && !_pending.isEmpty; }
}
void _addPendingEvent(_DelayedEvent<T> evt) {
_pending ??= new _StreamImplEvents<T>();
_pending.add(evt);
}
void _addPendingEvent(_DelayedEvent<T> evt) {
_pending ??= new _StreamImplEvents<T>();
_pending.add(evt);
}
void add(T data) {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedData<T>(data));
return;
}
base.add(data);
while (_hasPending) {
_pending.handleNext(this);
}
}
void add(T data) {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedData<T>(data));
return;
}
void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
stackTrace ??= AsyncError.defaultStackTrace(error);
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedError<T>((Exception) error, stackTrace));
return;
}
if (!_mayAddEvent) throw _addEventError();
_sendError(error, stackTrace);
while (_hasPending) {
_pending.handleNext(this);
}
}
base.add(data);
while (_hasPending) {
_pending.handleNext(this);
}
}
void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
stackTrace ??= AsyncError.defaultStackTrace(error);
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedError<T>((Exception) error, stackTrace));
return;
}
if (!_mayAddEvent) throw _addEventError();
_sendError(error, stackTrace);
while (_hasPending) {
_pending.handleNext(this);
}
}
Future close() {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedDone<T>());
_state |= _BroadcastStreamController<T>._STATE_CLOSED;
return base.done;
}
Future result = base.close();
D.assert(!_hasPending);
return result;
}
Future close() {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedDone<T>());
_state |= _BroadcastStreamController<T>._STATE_CLOSED;
return base.done;
}
Future result = base.close();
D.assert(!_hasPending);
return result;
}
void _callOnCancel() {
if (_hasPending) {
_pending.clear();
_pending = null;
}
void _callOnCancel() {
if (_hasPending) {
_pending.clear();
_pending = null;
base._callOnCancel();
}
base._callOnCancel();
}
}
}

998
com.unity.uiwidgets/Runtime/async/stream.cs
文件差异内容过多而无法显示
查看文件

4
com.unity.uiwidgets/Runtime/async/stream_controller.cs


_EventDispatch<T> {
public abstract StreamSubscription<T> _subscribe(Action<T> onData, Action<object, string> onError,
Action onDone, bool cancelOnError);
public void _recordPause(StreamSubscription<T> subscription) {
}

214
com.unity.uiwidgets/Runtime/async/stream_impl.cs


internal static void _nullDoneHandler() {
}
internal delegate _PendingEvents<T> _EventGenerator<T>();
internal delegate void _BroadcastCallback<T>(StreamSubscription<T> subscription);

void _onListen(StreamSubscription<T> subscription) {
}
}
class _GeneratedStreamImpl<T> : _StreamImpl<T> {
readonly _stream._EventGenerator<T> _pending;
bool _isUsed = false;

}
StreamSubscription<T> _createSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError ) {
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
result._setPendingEvents(_pending());
result._setPendingEvents(_pending());
class _IterablePendingEvents<T> : _PendingEvents<T> {
IEnumerator<T> _iterator;

bool isEmpty {
bool isEmpty {
get { return _iterator == null; }
}

}
} else {
}
else {
} catch (Exception e) {
}
catch (Exception e) {
_iterator =Enumerable.Empty<T>().GetEnumerator();// new EmptyIterator<Null>();
_iterator = Enumerable.Empty<T>().GetEnumerator(); // new EmptyIterator<Null>();
} else {
}
else {
// Threw in .current.
dispatch._sendError(e, e.StackTrace);
}

StreamSubscription<T> _subscription;
internal _AsBroadcastStream(
Stream<T> _source,
Action<StreamSubscription<T>> onListenHandler,
Action<StreamSubscription<T>> onCancelHandler)
Stream<T> _source,
Action<StreamSubscription<T>> onListenHandler,
Action<StreamSubscription<T>> onCancelHandler)
_onListenHandler = a=>Zone.current
_onListenHandler = a => Zone.current
onListenHandler((StreamSubscription<T>) b);
return default;
onListenHandler((StreamSubscription<T>) b);
return default;
)(a);
_onCancelHandler = d=> Zone.current
)(a);
_onCancelHandler = d => Zone.current
c=> {
onCancelHandler((StreamSubscription<T>) c);
return default;
c => {
onCancelHandler((StreamSubscription<T>) c);
return default;
bool isBroadcast {
bool isBroadcast {
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) {
if (_controller == null || _controller.isClosed) {
// Return a dummy subscription backed by nothing, since
// it will only ever send one done event.
return new _DoneStreamSubscription<T>(()=>onDone());
}
_subscription ??= _source.listen(_controller.add,
onError: _controller.addError, onDone: ()=>_controller.close());
cancelOnError = Equals(true, cancelOnError);
return _controller._subscribe(onData, onError, onDone, cancelOnError);
}
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null,
Action onDone = null, bool cancelOnError = false) {
if (_controller == null || _controller.isClosed) {
// Return a dummy subscription backed by nothing, since
// it will only ever send one done event.
return new _DoneStreamSubscription<T>(() => onDone());
}
_subscription ??= _source.listen(_controller.add,
onError: _controller.addError, onDone: () => _controller.close());
cancelOnError = Equals(true, cancelOnError);
return _controller._subscribe(onData, onError, onDone, cancelOnError);
}
void _onCancel() {
bool shutdown = (_controller == null) || _controller.isClosed;
if (_onCancelHandler != null) {
_zone.runUnary(
a => {
_onCancelHandler((StreamSubscription<T>) a);
return default;
}, new _BroadcastSubscriptionWrapper<T>(this));
}
void _onCancel() {
bool shutdown = (_controller == null) || _controller.isClosed;
if (_onCancelHandler != null) {
_zone.runUnary(
a=> {
_onCancelHandler((StreamSubscription<T>) a);
return default;
}, new _BroadcastSubscriptionWrapper<T>(this));
}
if (shutdown) {
if (_subscription != null) {
_subscription.cancel();
_subscription = null;
}
}
}
if (shutdown) {
if (_subscription != null) {
_subscription.cancel();
_subscription = null;
}
}
}
void _onListen() {
if (_onListenHandler != null) {
_zone.runUnary(
a => {
_onListenHandler((StreamSubscription<T>) a);
return default;
}, new _BroadcastSubscriptionWrapper<T>(this));
void _onListen() {
if (_onListenHandler != null) {
_zone.runUnary(
a => {
_onListenHandler((StreamSubscription<T>) a);
return default;
}, new _BroadcastSubscriptionWrapper<T>(this));
}
}
// Methods called from _BroadcastSubscriptionWrapper.
internal void _cancelSubscription() {
if (_subscription == null) return;
// Called by [_controller] when it has no subscribers left.
StreamSubscription<T> subscription = _subscription;
_subscription = null;
_controller = null; // Marks the stream as no longer listenable.
subscription.cancel();
}
internal void _pauseSubscription(Future resumeSignal) {
if (_subscription == null) return;
_subscription.pause(resumeSignal);
}
internal void _resumeSubscription() {
if (_subscription == null) return;
_subscription.resume();
}
internal bool _isSubscriptionPaused {
get {
if (_subscription == null) return false;
return _subscription.isPaused;
}
}
}
class _BroadcastSubscriptionWrapper<T> : StreamSubscription<T> {
readonly _AsBroadcastStream<T> _stream;
internal _BroadcastSubscriptionWrapper(_AsBroadcastStream<T> _stream) {
this._stream = _stream;
}
public override void onData(Action<T> handleData) {
throw new Exception(
"Cannot change handlers of asBroadcastStream source subscription.");
}
public override void onError(Action<object, string> action) {
throw new Exception(
"Cannot change handlers of asBroadcastStream source subscription.");
}
// Methods called from _BroadcastSubscriptionWrapper.
void _cancelSubscription() {
if (_subscription == null) return;
// Called by [_controller] when it has no subscribers left.
StreamSubscription<T> subscription = _subscription;
_subscription = null;
_controller = null; // Marks the stream as no longer listenable.
subscription.cancel();
}
public override void onDone(Action handleDone) {
throw new Exception(
"Cannot change handlers of asBroadcastStream source subscription.");
}
void _pauseSubscription(Future resumeSignal) {
if (_subscription == null) return;
_subscription.pause(resumeSignal);
}
public override void pause(Future resumeSignal = null) {
_stream._pauseSubscription(resumeSignal);
}
public override void resume() {
_stream._resumeSubscription();
}
public override Future cancel() {
_stream._cancelSubscription();
return Future._nullFuture;
}
void _resumeSubscription() {
if (_subscription == null) return;
_subscription.resume();
}
bool isPaused {
get { return _stream._isSubscriptionPaused; }
}
bool _isSubscriptionPaused{get {
if (_subscription == null) return false;
return _subscription.isPaused;
}}
}
public override Future<E> asFuture<E>(E futureValue) {
throw new Exception(
"Cannot change handlers of asBroadcastStream source subscription.");
}
}
internal class _StreamIterator<T> : StreamIterator<T> {

515
com.unity.uiwidgets/Runtime/async/stream_pipe.cs


using System;
using System.Collections;
using System.Collections.Generic;
namespace Unity.UIWidgets.async {
namespace Unity.UIWidgets.async {
Func<T> userCode, Action<T> onSuccess, Action<Exception> onError){
Func<T> userCode, Action<T> onSuccess, Action<Exception> onError) {
} catch (Exception e) {
}
catch (Exception e) {
} else {
}
else {
internal static void _cancelAndErrorWithReplacement<T>(StreamSubscription<T> subscription,
_Future future, Exception error) {
AsyncError replacement = Zone.current.errorCallback(error);

return (error) => {
_cancelAndError(subscription, future, error);
};
return (error) => { _cancelAndError(subscription, future, error); };
} else {
}
else {
) {
) {
} else {
}
else {
future._completeError(error);
}
}

if (cancelFuture != null && !Equals(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._complete(value));
} else {
}
else {
internal delegate bool _Predicate<T>(T value);
//
internal static void _addErrorWithReplacement<T>(_EventSink<T> sink, Exception error, string stackTrace) {
AsyncError replacement = Zone.current.errorCallback(error);
if (replacement != null) {
error = async_._nonNullError(replacement);
stackTrace = replacement.StackTrace;
}
sink._addError(error, stackTrace);
}
internal delegate T _Transformation<S, T>(S value);
internal delegate bool _ErrorTest(Exception error);
internal delegate bool _Equality<T>(T a, T b);
}
abstract class _ForwardingStream<S, T> : Stream<T> {
internal readonly Stream<S> _source;
internal _ForwardingStream(Stream<S> _source) {
this._source = _source;
}
bool isBroadcast {
get { return _source.isBroadcast; }
}
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null,
Action onDone = null, bool cancelOnError = false) {
cancelOnError = Equals(true, cancelOnError);
return _createSubscription(onData, onError, onDone, cancelOnError);
}
internal virtual StreamSubscription<T> _createSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
return new _ForwardingStreamSubscription<S, T>(
this, onData, onError, onDone, cancelOnError);
}
// Override the following methods in subclasses to change the behavior.
internal virtual void _handleData(S data, _EventSink<T> sink) {
sink._add((T) (object) data);
}
internal virtual void _handleError(Exception error, _EventSink<T> sink) {
sink._addError(error, error.StackTrace);
}
internal virtual void _handleDone(_EventSink<T> sink) {
sink._close();
}
}
//
// /**
// * Abstract superclass for subscriptions that forward to other subscriptions.
// */
class _ForwardingStreamSubscription<S, T>
: _BufferingStreamSubscription<T> {
readonly _ForwardingStream<S, T> _stream;
StreamSubscription<S> _subscription;
internal _ForwardingStreamSubscription(_ForwardingStream<S, T> _stream,
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError
)
: base(onData, onError, onDone, cancelOnError) {
_subscription = _stream._source
.listen(_handleData, onError: _handleError, onDone: _handleDone);
}
// _StreamSink interface.
// Transformers sending more than one event have no way to know if the stream
// is canceled or closed after the first, so we just ignore remaining events.
void _add(T data) {
if (_isClosed) return;
base._add(data);
}
void _addError(object error, string stackTrace) {
if (_isClosed) return;
base._addError(error, stackTrace);
}
// StreamSubscription callbacks.
void _onPause() {
if (_subscription == null) return;
_subscription.pause();
}
void _onResume() {
if (_subscription == null) return;
_subscription.resume();
}
Future _onCancel() {
if (_subscription != null) {
StreamSubscription<S> subscription = _subscription;
_subscription = null;
return subscription.cancel();
}
return null;
}
// Methods used as listener on source subscription.
void _handleData(S data) {
_stream._handleData(data, this);
}
void _handleError(object error, string stackTrace) {
_stream._handleError((Exception) error, this);
}
void _handleDone() {
_stream._handleDone(this);
}
}
//
// // -------------------------------------------------------------------
// // Stream transformers used by the default Stream implementation.
// // -------------------------------------------------------------------
//
//
class _WhereStream<T> : _ForwardingStream<T, T> {
readonly _stream._Predicate<T> _test;
internal _WhereStream(Stream<T> source, Func<T, bool> test) : base(source) {
_test = d => test(d);
}
internal override void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
return;
}
if (satisfies) {
sink._add(inputEvent);
}
}
}
//
//
// /**
// * A stream pipe that converts data events before passing them on.
// */
class _MapStream<S, T> : _ForwardingStream<S, T> {
readonly _stream._Transformation<S, T> _transform;
internal _MapStream(Stream<S> source, Func<S, T> transform) : base(source) {
_transform = d => transform(d);
}
internal override void _handleData(S inputEvent, _EventSink<T> sink) {
T outputEvent;
try {
outputEvent = _transform(inputEvent);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
return;
}
sink._add(outputEvent);
}
}
//
// /**
// * A stream pipe that converts data events before passing them on.
// */
class _ExpandStream<S, T> : _ForwardingStream<S, T> {
readonly _stream._Transformation<S, IEnumerable<T>> _expand;
internal _ExpandStream(Stream<S> source, _stream._Transformation<S, IEnumerable<T>> expand) : base(source) {
_expand = expand;
}
internal override void _handleData(S inputEvent, _EventSink<T> sink) {
try {
foreach (T value in _expand(inputEvent)) {
sink._add(value);
}
}
catch (Exception e) {
// If either _expand or iterating the generated iterator throws,
// we abort the iteration.
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
}
}
}
//
//
// /**
// * A stream pipe that converts or disposes error events
// * before passing them on.
// */
class _HandleErrorStream<T> : _ForwardingStream<T, T> {
readonly ZoneBinaryCallback _transform;
readonly _stream._ErrorTest _test;
internal _HandleErrorStream(Stream<T> source, ZoneBinaryCallback onError, _stream._ErrorTest test) :
base(source) {
_transform = onError;
_test = test;
}
void _handleError(object error, string stackTrace, _EventSink<T> sink) {
bool matches = true;
if (_test != null) {
try {
matches = _test((Exception) error);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
return;
}
}
if (matches) {
try {
_async._invokeErrorHandler(_transform, error, stackTrace);
}
catch (Exception e) {
if (Equals(e, error)) {
sink._addError(error, stackTrace);
}
else {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
}
return;
}
}
else {
sink._addError(error, stackTrace);
}
}
}
//
class _TakeStream<T> : _ForwardingStream<T, T> {
readonly int _count;
internal _TakeStream(Stream<T> source, int count) : base(source) {
_count = count;
// This test is done early to avoid handling an async error
// in the _handleData method.
// ArgumentError.checkNotNull(count, "count");
}
internal override StreamSubscription<T> _createSubscription(Action<T> onData, Action<object, string> onError,
Action onDone, bool cancelOnError) {
if (_count == 0) {
_source.listen(null).cancel();
return new _DoneStreamSubscription<T>(() => onDone());
}
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, _count);
}
void _handleData(T inputEvent, _EventSink<T> sink) {
_StateStreamSubscription<T> subscription = (_StateStreamSubscription<T>) sink;
int count = subscription._count;
if (count > 0) {
sink._add(inputEvent);
count -= 1;
subscription._count = count;
if (count == 0) {
// Closing also unsubscribes all subscribers, which unsubscribes
// this from source.
sink._close();
}
}
}
}
//
// /**
// * A [_ForwardingStreamSubscription] with one extra state field.
// *
// * Use by several different classes, storing an integer, bool or general.
// */
class _StateStreamSubscription<T> : _ForwardingStreamSubscription<T, T> {
// Raw state field. Typed access provided by getters and setters below.
// siyao: this is used as bool and int, if it was used at the same time, everything would be fxxked up.
object _sharedState;
internal _StateStreamSubscription(
_ForwardingStream<T, T> stream,
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError, object _sharedState
)
: base(stream, onData, onError, onDone, cancelOnError) {
this._sharedState = _sharedState;
}
internal bool _flag {
get => (bool) _sharedState;
set => _sharedState = value;
}
internal int _count {
get => (int) _sharedState;
set => _sharedState = value;
}
internal object _value {
get => _sharedState;
set => _sharedState = value;
}
}
class _TakeWhileStream<T> : _ForwardingStream<T, T> {
readonly _stream._Predicate<T> _test;
internal _TakeWhileStream(Stream<T> source, _stream._Predicate<T> test)
: base(source) {
_test = test;
}
internal override void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
// The test didn't say true. Didn't say false either, but we stop anyway.
sink._close();
return;
}
if (satisfies) {
sink._add(inputEvent);
}
else {
sink._close();
}
}
}
//
class _SkipStream<T> : _ForwardingStream<T, T> {
readonly int _count;
internal _SkipStream(Stream<T> source, int count)
: base(source) {
_count = count;
// This test is done early to avoid handling an async error
// in the _handleData method.
// ArgumentError.checkNotNull(count, "count");
// RangeError.checkNotNegative(count, "count");
}
internal override StreamSubscription<T> _createSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, _count);
}
internal void _handleDone(T inputEvent, _EventSink<T> sink) {
_StateStreamSubscription<T> subscription = (_StateStreamSubscription<T>) sink;
int count = subscription._count;
if (count > 0) {
subscription._count = count - 1;
return;
}
sink._add(inputEvent);
}
}
class _SkipWhileStream<T> : _ForwardingStream<T, T> {
readonly _stream._Predicate<T> _test;
internal _SkipWhileStream(Stream<T> source, _stream._Predicate<T> test) : base(source) {
_test = test;
}
internal override StreamSubscription<T> _createSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError
) {
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, false);
}
internal override void _handleData(T inputEvent, _EventSink<T> sink) {
_StateStreamSubscription<T> subscription = (_StateStreamSubscription<T>) sink;
bool hasFailed = subscription._flag;
if (hasFailed) {
sink._add(inputEvent);
return;
}
bool satisfies;
try {
satisfies = _test(inputEvent);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
// A failure to return a boolean is considered "not matching".
subscription._flag = true;
return;
}
if (!satisfies) {
subscription._flag = true;
sink._add(inputEvent);
}
}
}
class _DistinctStream<T> : _ForwardingStream<T, T> {
static readonly object _SENTINEL = new object();
readonly _stream._Equality<T> _equals;
internal _DistinctStream(Stream<T> source, _stream._Equality<T> equals) : base(source) {
_equals = equals;
}
internal override StreamSubscription<T> _createSubscription(Action<T> onData, Action<object, string> onError,
Action onDone, bool cancelOnError) {
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, _SENTINEL);
}
void _handleData(T inputEvent, _EventSink<T> sink) {
_StateStreamSubscription<T> subscription = (_StateStreamSubscription<T>) sink;
var previous = subscription._value;
if (Equals(previous, _SENTINEL)) {
// First event.
subscription._value = inputEvent;
sink._add(inputEvent);
}
else {
T previousEvent = (T) previous;
bool isEqual;
try {
if (_equals == null) {
isEqual = Equals(previousEvent, inputEvent);
}
else {
isEqual = _equals(previousEvent, inputEvent);
}
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
return;
}
if (!isEqual) {
sink._add(inputEvent);
subscription._value = inputEvent;
}
}
}
}
}

2
com.unity.uiwidgets/Runtime/async/stream_transformers.cs


this._stream = _stream;
this._onListen = _onListen;
}
readonly _async._SubscriptionTransformer<S, T> _onListen;
readonly Stream<S> _stream;

12
com.unity.uiwidgets/Runtime/async/async_error.cs


namespace Unity.UIWidgets.async {
public partial class _async {
internal static object _invokeErrorHandler(
ZoneBinaryCallback errorHandler, object error, string stackTrace) {
// Dynamic invocation because we don't know the actual type of the
// first argument or the error object, but we should successfully call
// the handler if they match up.
// TODO(lrn): Should we? Why not the same below for the unary case?
return errorHandler(error, stackTrace);
}
}
}

3
com.unity.uiwidgets/Runtime/async/async_error.cs.meta


fileFormatVersion: 2
guid: 0e0283a717c7486596da66a72eb9231e
timeCreated: 1629344015
正在加载...
取消
保存