您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
305 行
10 KiB
305 行
10 KiB
using System;
|
|
using Unity.UIWidgets.async;
|
|
|
|
namespace Unity.UIWidgets.async {
|
|
class _EventSinkWrapper<T> : EventSink<T> {
|
|
_EventSink<object> _sink;
|
|
|
|
internal _EventSinkWrapper(_EventSink<object> _sink) {
|
|
this._sink = _sink;
|
|
}
|
|
|
|
public override void add(T data) {
|
|
_sink._add(data);
|
|
}
|
|
|
|
public override void addError(object error, string stackTrace) {
|
|
_sink._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error));
|
|
}
|
|
|
|
public override Future close() {
|
|
_sink._close();
|
|
return Future._nullFuture;
|
|
}
|
|
}
|
|
|
|
class _SinkTransformerStreamSubscription<S, T>
|
|
: _BufferingStreamSubscription<T> {
|
|
/// The transformer's input sink.
|
|
EventSink<S> _transformerSink;
|
|
|
|
/// The subscription to the input stream.
|
|
StreamSubscription<S> _subscription;
|
|
|
|
internal _SinkTransformerStreamSubscription(Stream<S> source, _async._SinkMapper<S, T> mapper,
|
|
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError)
|
|
// We set the adapter's target only when the user is allowed to send data.
|
|
: base(onData, onError, onDone, cancelOnError) {
|
|
_EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>((_EventSink<object>) this);
|
|
_transformerSink = mapper(eventSink);
|
|
_subscription =
|
|
source.listen(_handleData, onError: _handleError, onDone: _handleDone);
|
|
}
|
|
|
|
/** Whether this subscription is still subscribed to its source. */
|
|
bool _isSubscribed {
|
|
get { return _subscription != null; }
|
|
}
|
|
|
|
// _EventSink interface.
|
|
|
|
public override void _add(T data) {
|
|
if (_isClosed) {
|
|
throw new Exception("Stream is already closed");
|
|
}
|
|
|
|
base._add(data);
|
|
}
|
|
|
|
public override void _addError(object error, string stackTrace) {
|
|
if (_isClosed) {
|
|
throw new Exception("Stream is already closed");
|
|
}
|
|
|
|
base._addError(error, stackTrace);
|
|
}
|
|
|
|
public override void _close() {
|
|
if (_isClosed) {
|
|
throw new Exception("Stream is already closed");
|
|
}
|
|
|
|
base._close();
|
|
}
|
|
|
|
// _BufferingStreamSubscription hooks.
|
|
|
|
protected override void _onPause() {
|
|
if (_isSubscribed) _subscription.pause();
|
|
}
|
|
|
|
protected override void _onResume() {
|
|
if (_isSubscribed) _subscription.resume();
|
|
}
|
|
|
|
protected override Future _onCancel() {
|
|
if (_isSubscribed) {
|
|
StreamSubscription<S> subscription = _subscription;
|
|
_subscription = null;
|
|
return subscription.cancel();
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
void _handleData(S data) {
|
|
try {
|
|
_transformerSink.add(data);
|
|
}
|
|
catch (Exception e) {
|
|
_addError(e, e.StackTrace);
|
|
}
|
|
}
|
|
|
|
void _handleError(object error, string stackTrace) {
|
|
try {
|
|
_transformerSink.addError(error, stackTrace);
|
|
}
|
|
catch (Exception e) {
|
|
if (Equals(e, error)) {
|
|
_addError(error, stackTrace);
|
|
}
|
|
else {
|
|
_addError(e, e.StackTrace);
|
|
}
|
|
}
|
|
}
|
|
|
|
void _handleDone() {
|
|
try {
|
|
_subscription = null;
|
|
_transformerSink.close();
|
|
}
|
|
catch (Exception e) {
|
|
_addError(e, e.StackTrace);
|
|
}
|
|
}
|
|
}
|
|
|
|
class _StreamSinkTransformer<S, T> : StreamTransformerBase<S, T> where T : class {
|
|
readonly _async._SinkMapper<S, T> _sinkMapper;
|
|
|
|
public _StreamSinkTransformer(_async._SinkMapper<S, T> _sinkMapper) {
|
|
this._sinkMapper = _sinkMapper;
|
|
}
|
|
|
|
public override Stream<T> bind(Stream<S> stream) =>
|
|
new _BoundSinkStream<S, T>(stream, _sinkMapper);
|
|
}
|
|
|
|
class _BoundSinkStream<S, T> : Stream<T> {
|
|
readonly _async._SinkMapper<S, T> _sinkMapper;
|
|
readonly Stream<S> _stream;
|
|
|
|
public override bool isBroadcast {
|
|
get { return _stream.isBroadcast; }
|
|
}
|
|
|
|
internal _BoundSinkStream(Stream<S> _stream, _async._SinkMapper<S, T> _sinkMapper) {
|
|
this._stream = _stream;
|
|
this._sinkMapper = _sinkMapper;
|
|
}
|
|
|
|
public override StreamSubscription<T> listen(Action<T> onData,
|
|
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = default) {
|
|
StreamSubscription<T> subscription =
|
|
new _SinkTransformerStreamSubscription<S, T>(
|
|
_stream, _sinkMapper, onData, onError, onDone, cancelOnError);
|
|
return subscription;
|
|
}
|
|
}
|
|
|
|
static partial class _stream {
|
|
public delegate void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
|
|
|
|
/// Error-handler coming from [StreamTransformer.fromHandlers].
|
|
public delegate void _TransformErrorHandler<T>(
|
|
object error, string stackTrace, EventSink<T> sink);
|
|
|
|
/// Done-handler coming from [StreamTransformer.fromHandlers].
|
|
public delegate void _TransformDoneHandler<T>(EventSink<T> sink);
|
|
}
|
|
|
|
class _HandlerEventSink<S, T> : EventSink<S> where T : class {
|
|
readonly _stream._TransformDataHandler<S, T> _handleData;
|
|
readonly _stream._TransformErrorHandler<T> _handleError;
|
|
readonly _stream._TransformDoneHandler<T> _handleDone;
|
|
|
|
/// The output sink where the handlers should send their data into.
|
|
EventSink<T> _sink;
|
|
|
|
internal _HandlerEventSink(
|
|
_stream._TransformDataHandler<S, T> _handleData, _stream._TransformErrorHandler<T> _handleError,
|
|
_stream._TransformDoneHandler<T> _handleDone, EventSink<T> _sink) {
|
|
this._handleData = _handleData;
|
|
this._handleError = _handleError;
|
|
this._handleDone = _handleDone;
|
|
this._sink = _sink;
|
|
if (_sink == null) {
|
|
throw new Exception("The provided sink must not be null.");
|
|
}
|
|
}
|
|
|
|
bool _isClosed {
|
|
get { return _sink == null; }
|
|
}
|
|
|
|
public override void add(S data) {
|
|
if (_isClosed) {
|
|
throw new Exception("Sink is closed");
|
|
}
|
|
|
|
if (_handleData != null) {
|
|
_handleData(data, _sink);
|
|
}
|
|
else {
|
|
_sink.add(data as T);
|
|
}
|
|
}
|
|
|
|
public override void addError(object error, string stackTrace) {
|
|
// ArgumentError.checkNotNull(error, "error");
|
|
if (_isClosed) {
|
|
throw new Exception("Sink is closed");
|
|
}
|
|
|
|
if (_handleError != null) {
|
|
stackTrace = stackTrace ?? AsyncError.defaultStackTrace(error);
|
|
_handleError(error, stackTrace, _sink);
|
|
}
|
|
else {
|
|
_sink.addError(error, stackTrace);
|
|
}
|
|
}
|
|
|
|
public override Future close() {
|
|
if (_isClosed) return Future._nullFuture;
|
|
var sink = _sink;
|
|
_sink = null;
|
|
if (_handleDone != null) {
|
|
_handleDone(sink);
|
|
}
|
|
else {
|
|
sink.close();
|
|
}
|
|
return Future._nullFuture;
|
|
}
|
|
}
|
|
|
|
class _StreamHandlerTransformer<S, T> : _StreamSinkTransformer<S, T> where T : class {
|
|
internal _StreamHandlerTransformer(
|
|
_stream._TransformDataHandler<S, T> handleData = null,
|
|
_stream._TransformErrorHandler<T> handleError = null,
|
|
_stream._TransformDoneHandler<T> handleDone = null)
|
|
: base((EventSink<T> outputSink) => {
|
|
return new _HandlerEventSink<S, T>(
|
|
handleData, handleError, handleDone, outputSink);
|
|
}) {
|
|
}
|
|
|
|
public override Stream<T> bind(Stream<S> stream) {
|
|
return base.bind(stream);
|
|
}
|
|
}
|
|
|
|
class _StreamBindTransformer<S, T> : StreamTransformerBase<S, T> where T : class {
|
|
readonly Func<Stream<S>, Stream<T>> _bind;
|
|
|
|
internal _StreamBindTransformer(Func<Stream<S>, Stream<T>> _bind) {
|
|
this._bind = _bind;
|
|
}
|
|
|
|
public override Stream<T> bind(Stream<S> stream) => _bind(stream);
|
|
}
|
|
|
|
public partial class _async {
|
|
public delegate EventSink<S> _SinkMapper<S, T>(EventSink<T> output);
|
|
|
|
public delegate StreamSubscription<T> _SubscriptionTransformer<S, T>(Stream<S> stream, bool cancelOnError);
|
|
}
|
|
|
|
class _StreamSubscriptionTransformer<S, T> : StreamTransformerBase<S, T> where T : class {
|
|
readonly _async._SubscriptionTransformer<S, T> _onListen;
|
|
|
|
internal _StreamSubscriptionTransformer(_async._SubscriptionTransformer<S, T> _onListen) {
|
|
this._onListen = _onListen;
|
|
}
|
|
|
|
public override Stream<T> bind(Stream<S> stream) =>
|
|
new _BoundSubscriptionStream<S, T>(stream, _onListen);
|
|
}
|
|
|
|
class _BoundSubscriptionStream<S, T> : Stream<T> {
|
|
internal _BoundSubscriptionStream(Stream<S> _stream, _async._SubscriptionTransformer<S, T> _onListen) {
|
|
this._stream = _stream;
|
|
this._onListen = _onListen;
|
|
}
|
|
|
|
readonly _async._SubscriptionTransformer<S, T> _onListen;
|
|
readonly Stream<S> _stream;
|
|
|
|
public override bool isBroadcast {
|
|
get { return _stream.isBroadcast; }
|
|
}
|
|
|
|
public override StreamSubscription<T> listen(Action<T> onData,
|
|
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) {
|
|
//cancelOnError = cancelOnError;
|
|
StreamSubscription<T> result = _onListen(_stream, cancelOnError);
|
|
result.onData(onData);
|
|
result.onError(onError);
|
|
result.onDone(onDone);
|
|
return result;
|
|
}
|
|
}
|
|
}
|