using System;
using Unity.UIWidgets.async;

// NOTE(review): the generic parameter lists in this file (<S, T>, <R>, <E>,
// <SS, ST, TS, TT>, Action<T>, Action<object, string>, Future<E>) were
// reconstructed from how S, T and E are used in the method bodies. Confirm them
// against the declarations of Stream, StreamSubscription, StreamTransformer and
// StreamTransformerBase, which are not part of this file.
namespace Unity.UIWidgets.async {
    /// <summary>
    /// A stream of <typeparamref name="T"/> backed by a stream of
    /// <typeparamref name="S"/>; every data event is cast to
    /// <typeparamref name="T"/> before it reaches the listener.
    /// </summary>
    public class CastStream<S, T> : Stream<T> {
        readonly Stream<S> _source;

        public CastStream(Stream<S> _source) {
            this._source = _source;
        }

        /// Mirrors the broadcast-ness of the wrapped stream.
        public override bool isBroadcast {
            get { return _source.isBroadcast; }
        }

        /// <summary>
        /// Subscribes to the wrapped stream and returns a subscription that casts
        /// each event before invoking <paramref name="onData"/>.
        /// </summary>
        /// <param name="onData">Handler for cast data events. May be null.</param>
        /// <param name="onError">Handler for errors. May be null.</param>
        /// <param name="onDone">Handler invoked by the source when it is done.</param>
        /// <param name="cancelOnError">Forwarded unchanged to the source.</param>
        public override StreamSubscription<T> listen(
            Action<T> onData,
            Action<object, string> onError = null,
            Action onDone = null,
            bool cancelOnError = false) {
            // The source is subscribed with a null data handler on purpose: the
            // wrapper's constructor installs its own casting handler, and the
            // user's handlers are attached through the wrapper afterwards.
            var result = new CastStreamSubscription<S, T>(
                _source.listen(null, onDone: onDone, cancelOnError: cancelOnError));
            result.onData(onData);
            result.onError(onError);
            return result;
        }

        /// Re-casts from the original source rather than stacking a second cast
        /// on top of this stream.
        public override Stream<R> cast<R>() => new CastStream<S, R>(_source);
    }

    /// <summary>
    /// Subscription wrapper that receives <typeparamref name="S"/> events from
    /// the wrapped subscription and delivers them as <typeparamref name="T"/>.
    /// </summary>
    class CastStreamSubscription<S, T> : StreamSubscription<T> {
        readonly StreamSubscription<S> _source;

        /// Zone where listen was called.
        readonly Zone _zone = Zone.current;

        /// User's data handler. May be null.
        ZoneUnaryCallback _handleData;

        /// Copy of _source's handleError so we can report errors in onData.
        /// May be null.
        ZoneBinaryCallback _handleError;

        public CastStreamSubscription(StreamSubscription<S> _source) {
            this._source = _source;
            // All data events from the source are routed through _onData.
            _source.onData(_onData);
        }

        public override Future cancel() => _source.cancel();

        /// Replaces the data handler. Passing null clears it, in which case
        /// incoming data events are ignored by _onData.
        public override void onData(Action<T> handleData) {
            _handleData = handleData == null
                ? null
                : _zone.registerUnaryCallback(data => {
                    handleData((T) data);
                    return null;
                });
        }

        /// Replaces the error handler on the source and keeps a zone-registered
        /// copy so that cast failures in _onData can be reported through it.
        public override void onError(Action<object, string> handleError) {
            _source.onError(handleError);
            if (handleError == null) {
                _handleError = null;
            }
            else {
                _handleError = _zone.registerBinaryCallback((a, b) => {
                    handleError(a, (string) b);
                    return null;
                });
            }
        }

        public override void onDone(Action handleDone) {
            _source.onDone(handleDone);
        }

        /// Casts one source event to T and hands it to the user's data handler.
        /// A failing cast is reported to the error handler if one is set,
        /// otherwise to the zone's uncaught-error handler.
        void _onData(S data) {
            if (_handleData == null) {
                return;
            }

            T targetData;
            try {
                // siyao: this might go wrong
                // The cast goes through object because S and T are unrelated
                // type parameters; it throws when the runtime types do not match.
                targetData = (T) (object) data;
            }
            catch (Exception error) {
                if (_handleError == null) {
                    _zone.handleUncaughtError(error);
                }
                else {
                    _zone.runBinaryGuarded(_handleError, error, error.StackTrace);
                }

                return;
            }

            _zone.runUnaryGuarded(_handleData, targetData);
        }

        public override void pause(Future resumeSignal = null) {
            _source.pause(resumeSignal);
        }

        public override void resume() {
            _source.resume();
        }

        public override bool isPaused {
            get { return _source.isPaused; }
        }

        // NOTE(review): return type Future<E> is reconstructed — confirm against
        // StreamSubscription.asFuture.
        public override Future<E> asFuture<E>(E futureValue) => _source.asFuture(futureValue);
    }

    /// <summary>
    /// Adapts a transformer from <typeparamref name="SS"/> to
    /// <typeparamref name="ST"/> so it can be bound to a stream of
    /// <typeparamref name="TS"/> and produce a stream of
    /// <typeparamref name="TT"/>, casting on the way in and on the way out.
    /// </summary>
    class CastStreamTransformer<SS, ST, TS, TT> : StreamTransformerBase<TS, TT> {
        public readonly StreamTransformer<SS, ST> _source;

        public CastStreamTransformer(StreamTransformer<SS, ST> _source) {
            this._source = _source;
        }

        /// Re-casts from the original transformer rather than wrapping this one.
        public override StreamTransformer<RS, RT> cast<RS, RT>() =>
            new CastStreamTransformer<SS, ST, RS, RT>(_source);

        /// Casts the input stream to the wrapped transformer's input type, binds
        /// it, then casts the result to this transformer's output type.
        public override Stream<TT> bind(Stream<TS> stream) =>
            _source.bind(stream.cast<SS>()).cast<TT>();
    }
}