using System; using Unity.UIWidgets.async; namespace Unity.UIWidgets.async { class _EventSinkWrapper : EventSink { _EventSink _sink; internal _EventSinkWrapper(_EventSink _sink) { this._sink = _sink; } public override void add(T data) { _sink._add(data); } public override void addError(object error, string stackTrace) { _sink._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error)); } public override Future close() { _sink._close(); return Future._nullFuture; } } class _SinkTransformerStreamSubscription : _BufferingStreamSubscription { /// The transformer's input sink. EventSink _transformerSink; /// The subscription to the input stream. StreamSubscription _subscription; internal _SinkTransformerStreamSubscription(Stream source, _async._SinkMapper mapper, Action onData, Action onError, Action onDone, bool cancelOnError) // We set the adapter's target only when the user is allowed to send data. : base(onData, onError, onDone, cancelOnError) { _EventSinkWrapper eventSink = new _EventSinkWrapper((_EventSink) this); _transformerSink = mapper(eventSink); _subscription = source.listen(_handleData, onError: _handleError, onDone: _handleDone); } /** Whether this subscription is still subscribed to its source. */ bool _isSubscribed { get { return _subscription != null; } } // _EventSink interface. public override void _add(T data) { if (_isClosed) { throw new Exception("Stream is already closed"); } base._add(data); } public override void _addError(object error, string stackTrace) { if (_isClosed) { throw new Exception("Stream is already closed"); } base._addError(error, stackTrace); } public override void _close() { if (_isClosed) { throw new Exception("Stream is already closed"); } base._close(); } // _BufferingStreamSubscription hooks. protected override void _onPause() { if (_isSubscribed) _subscription.pause(); } protected override void _onResume() { if (_isSubscribed) _subscription.resume(); } protected override Future _onCancel() { if (_isSubscribed) { StreamSubscription subscription = _subscription; _subscription = null; return subscription.cancel(); } return null; } void _handleData(S data) { try { _transformerSink.add(data); } catch (Exception e) { _addError(e, e.StackTrace); } } void _handleError(object error, string stackTrace) { try { _transformerSink.addError(error, stackTrace); } catch (Exception e) { if (Equals(e, error)) { _addError(error, stackTrace); } else { _addError(e, e.StackTrace); } } } void _handleDone() { try { _subscription = null; _transformerSink.close(); } catch (Exception e) { _addError(e, e.StackTrace); } } } class _StreamSinkTransformer : StreamTransformerBase where T : class { readonly _async._SinkMapper _sinkMapper; public _StreamSinkTransformer(_async._SinkMapper _sinkMapper) { this._sinkMapper = _sinkMapper; } public override Stream bind(Stream stream) => new _BoundSinkStream(stream, _sinkMapper); } class _BoundSinkStream : Stream { readonly _async._SinkMapper _sinkMapper; readonly Stream _stream; public override bool isBroadcast { get { return _stream.isBroadcast; } } internal _BoundSinkStream(Stream _stream, _async._SinkMapper _sinkMapper) { this._stream = _stream; this._sinkMapper = _sinkMapper; } public override StreamSubscription listen(Action onData, Action onError = null, Action onDone = null, bool cancelOnError = default) { StreamSubscription subscription = new _SinkTransformerStreamSubscription( _stream, _sinkMapper, onData, onError, onDone, cancelOnError); return subscription; } } static partial class _stream { public delegate void _TransformDataHandler(S data, EventSink sink); /// Error-handler coming from [StreamTransformer.fromHandlers]. public delegate void _TransformErrorHandler( object error, string stackTrace, EventSink sink); /// Done-handler coming from [StreamTransformer.fromHandlers]. public delegate void _TransformDoneHandler(EventSink sink); } class _HandlerEventSink : EventSink where T : class { readonly _stream._TransformDataHandler _handleData; readonly _stream._TransformErrorHandler _handleError; readonly _stream._TransformDoneHandler _handleDone; /// The output sink where the handlers should send their data into. EventSink _sink; internal _HandlerEventSink( _stream._TransformDataHandler _handleData, _stream._TransformErrorHandler _handleError, _stream._TransformDoneHandler _handleDone, EventSink _sink) { this._handleData = _handleData; this._handleError = _handleError; this._handleDone = _handleDone; this._sink = _sink; if (_sink == null) { throw new Exception("The provided sink must not be null."); } } bool _isClosed { get { return _sink == null; } } public override void add(S data) { if (_isClosed) { throw new Exception("Sink is closed"); } if (_handleData != null) { _handleData(data, _sink); } else { _sink.add(data as T); } } public override void addError(object error, string stackTrace) { // ArgumentError.checkNotNull(error, "error"); if (_isClosed) { throw new Exception("Sink is closed"); } if (_handleError != null) { stackTrace = stackTrace ?? AsyncError.defaultStackTrace(error); _handleError(error, stackTrace, _sink); } else { _sink.addError(error, stackTrace); } } public override Future close() { if (_isClosed) return Future._nullFuture; var sink = _sink; _sink = null; if (_handleDone != null) { _handleDone(sink); } else { sink.close(); } return Future._nullFuture; } } class _StreamHandlerTransformer : _StreamSinkTransformer where T : class { internal _StreamHandlerTransformer( _stream._TransformDataHandler handleData = null, _stream._TransformErrorHandler handleError = null, _stream._TransformDoneHandler handleDone = null) : base((EventSink outputSink) => { return new _HandlerEventSink( handleData, handleError, handleDone, outputSink); }) { } public override Stream bind(Stream stream) { return base.bind(stream); } } class _StreamBindTransformer : StreamTransformerBase where T : class { readonly Func, Stream> _bind; internal _StreamBindTransformer(Func, Stream> _bind) { this._bind = _bind; } public override Stream bind(Stream stream) => _bind(stream); } public partial class _async { public delegate EventSink _SinkMapper(EventSink output); public delegate StreamSubscription _SubscriptionTransformer(Stream stream, bool cancelOnError); } class _StreamSubscriptionTransformer : StreamTransformerBase where T : class { readonly _async._SubscriptionTransformer _onListen; internal _StreamSubscriptionTransformer(_async._SubscriptionTransformer _onListen) { this._onListen = _onListen; } public override Stream bind(Stream stream) => new _BoundSubscriptionStream(stream, _onListen); } class _BoundSubscriptionStream : Stream { internal _BoundSubscriptionStream(Stream _stream, _async._SubscriptionTransformer _onListen) { this._stream = _stream; this._onListen = _onListen; } readonly _async._SubscriptionTransformer _onListen; readonly Stream _stream; public override bool isBroadcast { get { return _stream.isBroadcast; } } public override StreamSubscription listen(Action onData, Action onError = null, Action onDone = null, bool cancelOnError = false) { //cancelOnError = cancelOnError; StreamSubscription result = _onListen(_stream, cancelOnError); result.onData(onData); result.onError(onError); result.onDone(onDone); return result; } } }