using System;
using System.Collections.Generic;
using System.Text;
using Unity.UIWidgets.core;
using Unity.UIWidgets.foundation;
using Stopwatch = Unity.UIWidgets.core.Stopwatch;

namespace Unity.UIWidgets.async {
    public static partial class _stream {
        /** Callback taking no arguments and returning nothing; Stream.timeout uses it for its timeout action. */
        public delegate void _TimerCallback();
    }

    /**
     * Base class for streams: a source of data, error and done events that
     * callers subscribe to through [listen].
     *
     * Besides [listen], which subclasses implement, this class offers static
     * factories (empty, value, error, fromFuture, ...) and combinators that are
     * all built on top of [listen].
     *
     * The future-returning operations (first, last, fold, toList, ...) cast the
     * error object they receive to Exception before completing their future.
     */
    public abstract class Stream {
        public Stream() {
        }

        // const Stream._internal();

        /** Returns a new _EmptyStream. */
        public static Stream empty() => new _EmptyStream();

        // @Since("2.5")
        /**
         * Returns the stream of a new _AsyncStreamController to which [value]
         * has been added and which has then been closed.
         */
        public static Stream value(T value) {
            var result = new _AsyncStreamController(null, null, null, null);
            result._add(value);
            result._closeUnchecked();
            return result.stream;
        }

        // @Since("2.5")
        /**
         * Returns the stream of a new _AsyncStreamController to which [error]
         * has been added and which has then been closed.
         *
         * When [stackTrace] is null, AsyncError.defaultStackTrace(error) is used.
         */
        public static Stream error(object error, string stackTrace = null) {
            // ArgumentError.checkNotNull(error, "error");
            var result = new _AsyncStreamController(null, null, null, null);
            result._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error));
            result._closeUnchecked();
            return result.stream;
        }

        /**
         * Returns a stream that receives the outcome of [future]: its value
         * (cast to T) or its error, followed by closing the controller.
         */
        public static Stream fromFuture(Future future) {
            // Use the controller's buffering to fill in the value even before
            // the stream has a listener. For a single value, it's not worth it
            // to wait for a listener before doing the `then` on the future.
            _StreamController controller =
                new _SyncStreamController(null, null, null, null);
            future.then((value) => {
                    controller._add((T) value);
                    controller._closeUnchecked();
                },
                onError: (error) => {
                    controller._addError(error, null);
                    controller._closeUnchecked();
                    return FutureOr.nil;
                });
            return controller.stream;
        }

        /**
         * Returns a stream that receives the value or error of every future in
         * [futures] and is closed once the last of them has reported.
         *
         * When [futures] yields nothing, closing is scheduled as a microtask.
         */
        public static Stream fromFutures(IEnumerable> futures) {
            _StreamController controller =
                new _SyncStreamController(null, null, null, null);
            int count = 0;
            // Declare these as variables holding closures instead of as
            // function declarations.
            // This avoids creating a new closure from the functions for each future.
            var onValue = new Action((object value) => {
                if (!controller.isClosed) {
                    controller._add((T) value);
                    if (--count == 0) controller._closeUnchecked();
                }
            });
            var onError = new Func((error) => {
                if (!controller.isClosed) {
                    controller._addError(error, null);
                    if (--count == 0) controller._closeUnchecked();
                }

                return FutureOr.nil;
            });
            // The futures are already running, so start listening to them immediately
            // (instead of waiting for the stream to be listened on).
            // If we wait, we might not catch errors in the futures in time.
            foreach (var future in futures) {
                count++;
                future.then(onValue, onError: onError);
            }

            // Use schedule microtask since controller is sync.
            if (count == 0) async_.scheduleMicrotask(controller.close);
            return controller.stream;
        }

        /**
         * Returns a _GeneratedStreamImpl whose pending events are produced by an
         * _IterablePendingEvents over [elements].
         */
        public static Stream fromIterable(IEnumerable elements) {
            return new _GeneratedStreamImpl(
                () => (_PendingEvents) new _IterablePendingEvents(elements));
        }

        /**
         * Returns a stream that emits one event per [period].
         *
         * When [computation] is given, it is called with a counter that starts at
         * 0 and increases by one per event, and its result is emitted; an
         * exception it throws is added as an error event instead. Without
         * [computation], default(T) is emitted.
         *
         * Pausing cancels the timer and stops the stopwatch. Resuming first waits
         * for `period - elapsed` with a one-shot timer, then restarts the periodic
         * timer and sends an event.
         */
        public static Stream periodic(TimeSpan period, Func computation = null) {
            Timer timer = default;
            int computationCount = 0;
            StreamController controller = null;
            // Counts the time that the Stream was running (and not paused).
            Stopwatch watch = new Stopwatch();

            Action sendEvent = () => {
                watch.reset();
                T data = default;
                if (computation != null) {
                    try {
                        data = computation(computationCount++);
                    }
                    catch (Exception e) {
                        controller.addError(e, e.StackTrace);
                        return;
                    }
                }

                controller.add(data);
            };

            Action startPeriodicTimer = () => {
                D.assert(timer == null);
                timer = Timer.periodic(period, (object timer1) => {
                    sendEvent();
                    return null;
                });
            };

            // The original (Dart) code instantiates an abstract class here.
            controller = StreamController.create(
                sync: true,
                onListen: () => {
                    watch.start();
                    startPeriodicTimer();
                },
                onPause: () => {
                    timer.cancel();
                    timer = null;
                    watch.stop();
                },
                onResume: () => {
                    D.assert(timer == null);
                    TimeSpan elapsed = watch.elapsed;
                    watch.start();
                    timer = Timer.create(period - elapsed, () => {
                        timer = null;
                        startPeriodicTimer();
                        sendEvent();
                    });
                },
                onCancel: () => {
                    if (timer != null) timer.cancel();
                    timer = null;
                    return Future._nullFuture;
                });
            return controller.stream;
        }

        /** Returns a _BoundSinkStream over [source] using [mapSink]. */
        public static Stream eventTransformed(
            Stream source, _async._SinkMapper mapSink) {
            return new _BoundSinkStream(source, mapSink);
        }

        /** Returns a CastStream wrapping [source]. */
        static Stream castFrom(Stream source) =>
            new CastStream(source);

        /** Whether this stream is a broadcast stream; false in this base class. */
        public virtual bool isBroadcast {
            get { return false; }
        }

        /** Returns an _AsBroadcastStream over this stream with the given callbacks. */
        public virtual Stream asBroadcastStream(
            Action> onListen = null,
            Action> onCancel = null) {
            return new _AsBroadcastStream(this, onListen, onCancel);
        }

        /**
         * Subscribes to this stream. Implemented by subclasses.
         *
         * [onData], [onError] and [onDone] are the event callbacks;
         * [cancelOnError] defaults to false.
         */
        public abstract StreamSubscription listen(
            Action onData, Action onError = null, Action onDone = null, bool cancelOnError = false);

        /** Returns a _WhereStream over this stream using [test]. */
        public Stream where(Func test) {
            return new _WhereStream(this, test);
        }

        /** Returns a _MapStream over this stream using [convert]. */
        public Stream map(Func convert) {
            return new _MapStream(this, convert);
        }

        /**
         * Returns a stream with each event of this stream passed through
         * [convert], whose result holds either a value or a Future.
         *
         * While a returned Future is pending, the source subscription is paused;
         * it is resumed when that future completes. An exception thrown by
         * [convert] is added as an error event. The result is backed by a
         * broadcast controller when this stream is broadcast.
         */
        public Stream asyncMap(Func convert) {
            _StreamControllerBase controller = null;
            StreamSubscription subscription = null;

            void onListen() {
                var add = new Action(controller.add);
                D.assert(controller is _StreamController ||
                         controller is _BroadcastStreamController);
                var addError = new Action(controller._addError);
                subscription = listen((T evt) => {
                        FutureOr newValue;
                        try {
                            newValue = convert(evt);
                        }
                        catch (Exception e) {
                            controller.addError(e, e.StackTrace);
                            return;
                        }

                        if (newValue.f is Future newFuture) {
                            // siyao: this is different from Dart
                            subscription.pause();
                            newFuture
                                .then(d => add((E) d), onError: (e) => {
                                    addError(e, e.StackTrace);
                                    return FutureOr.nil;
                                })
                                .whenComplete(subscription.resume);
                        }
                        else {
                            // Siyao: this is the plain-value case in the C# port
                            controller.add((E) newValue.v);
                        }
                    },
                    onError: addError,
                    onDone: () => controller.close());
            }

            if (isBroadcast) {
                controller = (_StreamControllerBase) StreamController.broadcast(
                    onListen: () => onListen(),
                    onCancel: () => { subscription.cancel(); },
                    sync: true);
            }
            else {
                controller = (_StreamControllerBase) StreamController.create(
                    onListen: onListen,
                    onPause: () => { subscription.pause(); },
                    onResume: () => { subscription.resume(); },
                    onCancel: () => subscription.cancel(),
                    sync: true);
            }

            return controller.stream;
        }

        /**
         * Returns a stream in which each event of this stream is replaced by the
         * events of the stream that [convert] returns for it.
         *
         * The source subscription is paused while a returned stream is being
         * added and resumed when that completes. A null result is skipped, and an
         * exception thrown by [convert] is added as an error event.
         */
        Stream asyncExpand(Func> convert) {
            _StreamControllerBase controller = null;
            StreamSubscription subscription = null;

            void onListen() {
                D.assert(controller is _StreamController ||
                         controller is _BroadcastStreamController);
                subscription = listen((T evt) => {
                        Stream newStream;
                        try {
                            newStream = convert(evt);
                        }
                        catch (Exception e) {
                            controller.addError(e, e.StackTrace);
                            return;
                        }

                        if (newStream != null) {
                            subscription.pause();
                            controller.addStream(newStream).whenComplete(subscription.resume);
                        }
                    },
                    onError: controller._addError, // Avoid Zone error replacement.
                    onDone: () => controller.close());
            }

            if (isBroadcast) {
                controller = (_StreamControllerBase) StreamController.broadcast(
                    onListen: () => onListen(),
                    onCancel: () => { subscription.cancel(); },
                    sync: true);
            }
            else {
                controller = (_StreamControllerBase) StreamController.create(
                    onListen: () => onListen(),
                    onPause: () => { subscription.pause(); },
                    onResume: () => { subscription.resume(); },
                    onCancel: () => subscription.cancel(),
                    sync: true);
            }

            return controller.stream;
        }

        /** Returns a _HandleErrorStream over this stream using [onError] and [test]. */
        Stream handleError(ZoneBinaryCallback onError, _stream._ErrorTest test = null) {
            return new _HandleErrorStream(this, onError, test);
        }

        /** Returns an _ExpandStream over this stream using [convert]. */
        Stream expand(_stream._Transformation> convert) {
            return new _ExpandStream(this, convert);
        }

        /**
         * Adds this stream to [streamConsumer] and then closes the consumer.
         * The second callback passed to then() returns FutureOr.nil
         * (presumably the error handler; confirm against Future.then).
         */
        Future pipe(StreamConsumer streamConsumer) {
            return streamConsumer.addStream(this).then((_) => streamConsumer.close(), (_) => FutureOr.nil);
        }

        /** Returns the result of binding [streamTransformer] to this stream. */
        public Stream transform(StreamTransformer streamTransformer) {
            return streamTransformer.bind(this);
        }

        /**
         * Combines the events of this stream with [combine], starting from the
         * first event, and completes with the final value.
         *
         * Completes with an error when the stream ends without any event.
         */
        Future reduce(Func combine) {
            _Future result = new _Future();
            bool seenFirst = false;
            T value = default;
            StreamSubscription subscription = null;
            subscription = listen(
                (T element) => {
                    if (seenFirst) {
                        _stream._runUserCode(() => combine(value, element), (T newValue) => { value = newValue; },
                            onError: (e) => _stream._cancelAndErrorClosure(subscription, result)(e));
                    }
                    else {
                        value = element;
                        seenFirst = true;
                    }
                },
                onError: (e, s) => result._completeError((Exception) e),
                onDone: () => {
                    if (!seenFirst) {
                        try {
                            // Throw and recatch, instead of just doing
                            //  _completeWithErrorCallback, e, theError, StackTrace.current),
                            // to ensure that the stackTrace is set on the error.
                            throw new Exception("IterableElementError.noElement()");
                        }
                        catch (Exception e) {
                            async_._completeWithErrorCallback(result, e);
                        }
                    }
                    else {
                        // TODO: need check
                        result._complete(FutureOr.value(value));
                    }
                },
                cancelOnError: true);
            return result.to();
        }

        /**
         * Folds the events of this stream into a single value, starting from
         * [initialValue] and applying [combine] per event.
         */
        Future fold(S initialValue, Func combine) {
            _Future result = new _Future();
            S value = initialValue;
            StreamSubscription subscription = null;
            subscription = listen(
                (T element) => {
                    _stream._runUserCode(() => combine(value, element), (S newValue) => { value = newValue; },
                        e => _stream._cancelAndErrorClosure(subscription, result)(e));
                },
                onError: (e, s) => result._completeError((Exception) e),
                onDone: () => { result._complete(FutureOr.value(value)); },
                cancelOnError: true);
            return result.to();
        }

        /**
         * Appends every event to a StringBuilder, with [separator] between
         * consecutive events, and completes with the resulting string.
         */
        Future join(string separator = "") {
            _Future result = new _Future();
            StringBuilder buffer = new StringBuilder();
            StreamSubscription subscription = null;
            bool first = true;
            subscription = listen(
                (T element) => {
                    if (!first) {
                        buffer.Append(separator);
                    }

                    first = false;
                    try {
                        buffer.Append(element);
                    }
                    catch (Exception e) {
                        _stream._cancelAndErrorWithReplacement(subscription, result, e);
                    }
                },
                onError: (e, _) => result._completeError((Exception) e),
                onDone: () => { result._complete(buffer.ToString()); },
                cancelOnError: true);
            return result.to();
        }

        /**
         * Completes with true as soon as an event equal to [needle] (per
         * Equals) arrives, or with false when the stream ends first.
         */
        Future contains(object needle) {
            _Future future = new _Future();
            StreamSubscription subscription = null;
            subscription = listen(
                (T element) => {
                    _stream._runUserCode(() => (Equals(element, needle)), (bool isMatch) => {
                        if (isMatch) {
                            _stream._cancelAndValue(subscription, future, true);
                        }
                    }, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
                },
                onError: (e, _) => future._completeError((Exception) e),
                onDone: () => { future._complete(false); },
                cancelOnError: true);
            return future.to();
        }

        /**
         * Calls [action] for every event and completes with FutureOr.nil when
         * the stream is done.
         */
        public Future forEach(Action action) {
            _Future future = new _Future();
            StreamSubscription subscription = null;
            subscription = listen(
                (T element) => {
                    // TODO(floitsch): the type should be 'void' and inferred.
                    _stream._runUserCode(() => {
                            action(element);
                            return default;
                        }, (_) => { },
                        (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
                },
                onError: (e, _) => future._completeError((Exception) e),
                onDone: () => { future._complete(FutureOr.nil); },
                cancelOnError: true);
            return future;
        }

        /**
         * Completes with false as soon as [test] rejects an event, or with true
         * when the stream ends without a rejection.
         */
        Future every(Func test) {
            _Future future = new _Future();
            StreamSubscription subscription = null;
            subscription = listen(
                (T element) => {
                    _stream._runUserCode(() => test(element), (bool isMatch) => {
                        if (!isMatch) {
                            _stream._cancelAndValue(subscription, future, false);
                        }
                    }, ex => _stream._cancelAndErrorClosure(subscription, future)(ex));
                },
                onError: (ex, s) => future._completeError((Exception) ex),
                onDone: () => { future._complete(true); },
                cancelOnError: true);
            return future.to();
        }

        /**
         * Completes with true as soon as [test] accepts an event, or with false
         * when the stream ends without a match.
         */
        Future any(Func test) {
            _Future future = new _Future();
            StreamSubscription subscription = null;
            subscription = listen(
                (T element) => {
                    _stream._runUserCode(() => test(element), (bool isMatch) => {
                        if (isMatch) {
                            _stream._cancelAndValue(subscription, future, true);
                        }
                    }, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
                },
                onError: (e, _) => future._completeError((Exception) e),
                onDone: () => { future._complete(false); },
                cancelOnError: true);
            return future.to();
        }

        /** Completes with the number of data events once the stream is done. */
        Future length {
            get {
                _Future future = new _Future();
                int count = 0;
                listen(
                    (_) => { count++; },
                    onError: (e, _) => future._completeError((Exception) e),
                    onDone: () => { future._complete(count); },
                    cancelOnError: true);
                return future.to();
            }
        }

        /**
         * Completes with false on the first data event (cancelling the
         * subscription), or with true when the stream ends without one.
         */
        Future isEmpty {
            get {
                _Future future = new _Future();
                StreamSubscription subscription = null;
                subscription = listen(
                    (_) => { _stream._cancelAndValue(subscription, future, false); },
                    onError: (e, _) => future._completeError((Exception) e),
                    onDone: () => { future._complete(true); },
                    cancelOnError: true);
                return future.to();
            }
        }

        /** Returns this stream wrapped by Stream.castFrom. */
        public Stream cast() => Stream.castFrom(this);

        /** Collects all data events into a List and completes with it on done. */
        public Future> toList() {
            List result = new List();
            _Future future = new _Future();
            listen(
                (T data) => { result.Add(data); },
                onError: (e, _) => future._completeError((Exception) e),
                onDone: () => { future._complete(FutureOr.value(result)); },
                cancelOnError: true);
            return future.to>();
        }

        /** Collects all data events into a HashSet and completes with it on done. */
        public Future> toSet() {
            HashSet result = new HashSet();
            _Future future = new _Future();
            listen(
                (T data) => { result.Add(data); },
                onError: (e, _) => future._completeError((Exception) e),
                onDone: () => { future._complete(FutureOr.value(result)); },
                cancelOnError: true);
            return future.to>();
        }

        /**
         * Listens with no data callback and returns the subscription's
         * asFuture([futureValue]).
         */
        Future drain(E futureValue) =>
            listen(null, cancelOnError: true).asFuture(futureValue);

        /** Returns a _TakeStream over this stream limited by [count]. */
        public Stream take(int count) {
            return new _TakeStream(this, count);
        }

        /** Returns a _TakeWhileStream over this stream using [test]. */
        Stream takeWhile(Func test) {
            return new _TakeWhileStream(this, d => test(d));
        }

        /** Returns a _SkipStream over this stream using [count]. */
        Stream skip(int count) {
            return new _SkipStream(this, count);
        }

        /** Returns a _SkipWhileStream over this stream using [test]. */
        Stream skipWhile(Func test) {
            return new _SkipWhileStream(this, d => test(d));
        }

        /** Returns a _DistinctStream over this stream using [equals] as the comparison. */
        public Stream distinct(Func equals) {
            return new _DistinctStream(this, (d1, d2) => equals(d1, d2));
        }

        /**
         * Completes with the first data event (cancelling the subscription), or
         * with an error when the stream ends without one.
         */
        Future first {
            get {
                _Future future = new _Future();
                StreamSubscription subscription = null;
                subscription = listen(
                    (T value) => { _stream._cancelAndValue(subscription, future, value); },
                    onError: (e, _) => future._completeError((Exception) e),
                    onDone: () => {
                        try {
                            throw new Exception("IterableElementError.noElement()");
                        }
                        catch (Exception e) {
                            async_._completeWithErrorCallback(future, e);
                        }
                    },
                    cancelOnError: true);
                return future.to();
            }
        }

        /**
         * Completes with the last data event once the stream is done, or with an
         * error when there was none.
         */
        Future last {
            get {
                _Future future = new _Future();
                T result = default;
                bool foundResult = false;
                listen(
                    (T value) => {
                        foundResult = true;
                        result = value;
                    },
                    onError: (e, _) => future._completeError((Exception) e),
                    onDone: () => {
                        if (foundResult) {
                            future._complete(FutureOr.value(result));
                            return;
                        }

                        try {
                            throw new Exception("IterableElementError.noElement()");
                        }
                        catch (Exception e) {
                            async_._completeWithErrorCallback(future, e);
                        }
                    },
                    cancelOnError: true);
                return future.to();
            }
        }

        /**
         * Completes with the only data event of the stream. Completes with an
         * error when a second event arrives or when the stream ends empty.
         */
        Future single {
            get {
                _Future future = new _Future();
                T result = default;
                bool foundResult = false;
                StreamSubscription subscription = null;
                subscription = listen(
                    (T value) => {
                        if (foundResult) {
                            // This is the second element we get.
                            try {
                                throw new Exception("IterableElementError.tooMany()");
                            }
                            catch (Exception e) {
                                _stream._cancelAndErrorWithReplacement(subscription, future, e);
                            }

                            return;
                        }

                        foundResult = true;
                        result = value;
                    },
                    onError: (e, _) => future._completeError((Exception) e),
                    onDone: () => {
                        if (foundResult) {
                            future._complete(FutureOr.value(result));
                            return;
                        }

                        try {
                            throw new Exception("IterableElementError.noElement()");
                        }
                        catch (Exception e) {
                            async_._completeWithErrorCallback(future, e);
                        }
                    },
                    cancelOnError: true);
                return future.to();
            }
        }

        /**
         * Completes with the first event accepted by [test]. When the stream ends
         * without a match, completes with the result of [orElse] if given,
         * otherwise with an error.
         */
        Future firstWhere(Func test, Func orElse = null) {
            _Future future = new _Future();
            StreamSubscription subscription = null;
            subscription = listen(
                (T value) => {
                    _stream._runUserCode(() => test(value), (bool isMatch) => {
                        if (isMatch) {
                            _stream._cancelAndValue(subscription, future, value);
                        }
                    }, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
                },
                onError: (e, _) => future._completeError((Exception) e),
                onDone: () => {
                    if (orElse != null) {
                        _stream._runUserCode(orElse, v => future._complete(FutureOr.value(v)), future._completeError);
                        return;
                    }

                    try {
                        throw new Exception("IterableElementError.noElement()");
                    }
                    catch (Exception e) {
                        async_._completeWithErrorCallback(future, e);
                    }
                },
                cancelOnError: true);
            return future.to();
        }

        /**
         * Completes, once the stream is done, with the last event accepted by
         * [test]. Without a match, completes with the result of [orElse] if
         * given, otherwise with an error.
         */
        Future lastWhere(Func test, Func orElse = null) {
            _Future future = new _Future();
            T result = default;
            bool foundResult = false;
            StreamSubscription subscription = null;
            subscription = listen(
                (T value) => {
                    _stream._runUserCode(() => true == test(value), (bool isMatch) => {
                        if (isMatch) {
                            foundResult = true;
                            result = value;
                        }
                    }, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
                },
                onError: (e, _) => future._completeError((Exception) e),
                onDone: () => {
                    if (foundResult) {
                        future._complete(FutureOr.value(result));
                        return;
                    }

                    if (orElse != null) {
                        _stream._runUserCode(orElse, v => future._complete(FutureOr.value(v)), future._completeError);
                        return;
                    }

                    try {
                        throw new Exception("IterableElementError.noElement()");
                    }
                    catch (Exception e) {
                        async_._completeWithErrorCallback(future, e);
                    }
                },
                cancelOnError: true);
            return future.to();
        }

        /**
         * Completes with the only event accepted by [test]. A second match
         * completes with an error; no match completes with the result of
         * [orElse] if given, otherwise with an error.
         */
        Future singleWhere(Func test, Func orElse = null) {
            _Future future = new _Future();
            T result = default;
            bool foundResult = false;
            StreamSubscription subscription = null;
            subscription = listen(
                (T value) => {
                    _stream._runUserCode(() => true == test(value), (bool isMatch) => {
                        if (isMatch) {
                            if (foundResult) {
                                try {
                                    throw new Exception("IterableElementError.tooMany()");
                                }
                                catch (Exception e) {
                                    _stream._cancelAndErrorWithReplacement(subscription, future, e);
                                }

                                return;
                            }

                            foundResult = true;
                            result = value;
                        }
                    }, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
                },
                onError: (e, _) => future._completeError((Exception) e),
                onDone: () => {
                    if (foundResult) {
                        future._complete(FutureOr.value(result));
                        return;
                    }

                    try {
                        if (orElse != null) {
                            _stream._runUserCode(orElse, v => future._complete(FutureOr.value(v)), future._completeError);
                            return;
                        }

                        throw new Exception("IterableElementError.noElement()");
                    }
                    catch (Exception e) {
                        async_._completeWithErrorCallback(future, e);
                    }
                },
                cancelOnError: true);
            return future.to();
        }

        /**
         * Completes with the event at zero-based position [index] (cancelling the
         * subscription), or with an error when the stream ends before reaching it.
         */
        Future elementAt(int index) {
            // ArgumentError.checkNotNull(index, "index");
            // RangeError.checkNotNegative(index, "index");
            _Future future = new _Future();
            StreamSubscription subscription = null;
            int elementIndex = 0;
            subscription = listen(
                (T value) => {
                    if (index == elementIndex) {
                        _stream._cancelAndValue(subscription, future, value);
                        return;
                    }

                    elementIndex += 1;
                },
                onError: (e, _) => future._completeError((Exception) e),
                onDone: () => {
                    future._completeError(
                        new Exception($"exception {index} null, {elementIndex}")
                        // new RangeError.index(index, this, "index", null, elementIndex)
                    );
                },
                cancelOnError: true);
            return future.to();
        }

        /**
         * Returns a stream that forwards this stream's events and runs a timeout
         * action whenever [timeLimit] passes without an event.
         *
         * The timer is (re)created on listen, after every data or error event,
         * and on resume; it is cancelled on pause, done and cancel. When
         * [onTimeout] is null, the timeout action adds a TimeoutException as an
         * error event; otherwise [onTimeout] is called with a sink wrapper that
         * points at the controller only for the duration of the call.
         */
        public Stream timeout(TimeSpan timeLimit, Action> onTimeout) {
            _StreamControllerBase controller = null;
            // The following variables are set on listen.
            StreamSubscription subscription = null;
            Timer timer = null;
            Zone zone = null;
            _stream._TimerCallback timeout = null;

            Action onData = (T evt) => {
                timer.cancel();
                timer = zone.createTimer(timeLimit, () => {
                    timeout();
                    return default;
                });
                // It might close the stream and cancel timer, so create recurring Timer
                // before calling into add();
                // issue: https://github.com/dart-lang/sdk/issues/37565
                controller.add(evt);
            };

            Action onError = (object error, string stack) => {
                timer.cancel();
                D.assert(controller is _StreamController ||
                         controller is _BroadcastStreamController);
                // Forward the error object unchanged: it is not guaranteed to be an
                // Exception, so it must not be dereferenced through an `as` cast.
                // Only fall back to the exception's own trace when none was supplied.
                controller._addError(error, stack ?? (error as Exception)?.StackTrace); // Avoid Zone error replacement.
                timer = zone.createTimer(timeLimit, () => {
                    timeout();
                    return default;
                });
            };

            Action onDone = () => {
                timer.cancel();
                controller.close();
            };

            Action onListen = () => {
                // This is the onListen callback of the controller.
                // It runs in the same zone that the subscription was created in.
                // Use that zone for creating timers and running the onTimeout
                // callback.
                zone = Zone.current;
                if (onTimeout == null) {
                    timeout = () => {
                        controller.addError(
                            new TimeoutException("No stream event", timeLimit), null);
                    };
                }
                else {
                    // TODO(floitsch): the return type should be 'void', and the type
                    // should be inferred.
                    var registeredOnTimeout =
                        zone.registerUnaryCallback((o) => {
                            onTimeout((EventSink) o);
                            return default;
                        });
                    var wrapper = new _ControllerEventSinkWrapper(null);
                    timeout = () => {
                        wrapper._sink = controller; // Only valid during call.
                        zone.runUnaryGuarded(registeredOnTimeout, wrapper);
                        wrapper._sink = null;
                    };
                }

                subscription = listen(onData, onError: onError, onDone: onDone);
                timer = zone.createTimer(timeLimit, () => {
                    timeout();
                    return default;
                });
            };

            Future onCancel() {
                timer.cancel();
                Future result = subscription.cancel();
                subscription = null;
                return result;
            }

            controller = isBroadcast
                ? (_StreamControllerBase) new _SyncBroadcastStreamController(() => onListen(), () => onCancel())
                : new _SyncStreamController(() => onListen(), () => {
                    // Don't null the timer, onCancel may call cancel again.
                    timer.cancel();
                    subscription.pause();
                }, () => {
                    subscription.resume();
                    timer = zone.createTimer(timeLimit, () => {
                        timeout();
                        return default;
                    });
                }, onCancel);
            return controller.stream;
        }
    }

    /**
     * Handle on a stream subscription: lets the holder cancel it, replace its
     * callbacks, pause and resume it, or turn it into a future.
     */
    public abstract class StreamSubscription {
        public abstract Future cancel();

        public abstract void onData(Action handleData);

        public abstract void onError(Action action);

        public abstract void onDone(Action handleDone);

        public abstract void pause(Future resumeSignal = null);

        public abstract void resume();

        /** Get-only auto-property in this base class; subclasses may override it. */
        public virtual bool isPaused { get; }

        public abstract Future asFuture(E futureValue);
    }

    /** A Sink that can additionally receive error events. */
    public abstract class EventSink : Sink {
        // public abstract void add(T evt);

        public abstract void addError(object error, string stackTrace);

        // void close();
    }

    /** [Stream] wrapper that only exposes the [Stream] interface. */
    public class StreamView : Stream {
        readonly Stream _stream;

        public StreamView(Stream stream) : base() {
            _stream = stream;
        }

        /** Forwards to the wrapped stream. */
        public override bool isBroadcast {
            get { return _stream.isBroadcast; }
        }

        /** Forwards to the wrapped stream. */
        public override Stream asBroadcastStream(Action> onListen = null,
            Action> onCancel = null) =>
            _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);

        /** Forwards to the wrapped stream. */
        public override StreamSubscription listen(Action onData, Action onError = null, Action onDone = null,
            bool cancelOnError = false) {
            return _stream.listen(onData,
                onError: onError, onDone: onDone, cancelOnError: cancelOnError);
        }
    }

    /** Something that can accept whole streams and be closed afterwards. */
    public interface StreamConsumer {
        Future addStream(Stream stream);

        Future close();
    }

    /** An EventSink that is also a StreamConsumer. */
    public abstract class StreamSink : EventSink, StreamConsumer {
        // Future close();
        public virtual Future done { get; }

        /** Not implemented in this base class; always throws NotImplementedException. */
        public virtual Future addStream(Stream stream) {
            throw new System.NotImplementedException();
        }

        // public Future closeConsumer() {
        //     throw new System.NotImplementedException();
        // }
    }

    /**
     * Transforms one stream into another via [bind]. The static methods stand in
     * for the constructors of the original (Dart) class.
     */
    public abstract class StreamTransformer {
        // C# does not support changing the type constructed by a constructor,
        // so static factory methods are used instead.
        /** Returns a _StreamSubscriptionTransformer built from [onListen]. */
        public static StreamTransformer create(_async._SubscriptionTransformer onListen) {
            return new _StreamSubscriptionTransformer(onListen);
        }

        /** Returns a _StreamHandlerTransformer built from the three handlers. */
        public static StreamTransformer fromHandlers(
            _stream._TransformDataHandler handleData = null,
            _stream._TransformErrorHandler handleError = null,
            _stream._TransformDoneHandler handleDone = null) {
            return new _StreamHandlerTransformer(handleData, handleError, handleDone);
        }

        // @Since("2.1")
        /** Returns a _StreamBindTransformer built from [bind]. */
        public static StreamTransformer fromBind(Func, Stream> bind) {
            return new _StreamBindTransformer(bind);
        }

        /** Returns a CastStreamTransformer wrapping [source]. */
        public static StreamTransformer castFrom(
            StreamTransformer source) {
            return new CastStreamTransformer(source);
        }

        public abstract Stream bind(Stream stream);

        public abstract StreamTransformer cast();
    }

    /** StreamTransformer base that implements [cast] through StreamTransformer.castFrom. */
    public abstract class StreamTransformerBase : StreamTransformer {
        public StreamTransformerBase() {
        }

        public override StreamTransformer cast() =>
            StreamTransformer.castFrom(this);
    }

    /** Pull-style access to a stream: advance with [moveNext], stop with [cancel]. */
    public abstract class StreamIterator {
        /** Create a [StreamIterator] on [stream]. */
        public static StreamIterator Create(Stream stream)
            // TODO(lrn): use redirecting factory constructor when type
            // arguments are supported.
            =>
                new _StreamIterator(stream);

        public abstract Future moveNext();

        T current { get; }

        public abstract Future cancel();
    }

    /**
     * EventSink that forwards every call to a replaceable inner sink.
     * Stream.timeout assigns the inner sink only for the duration of the
     * onTimeout call.
     */
    internal class _ControllerEventSinkWrapper : EventSink {
        internal EventSink _sink;

        internal _ControllerEventSinkWrapper(EventSink _sink) {
            this._sink = _sink;
        }

        public override void add(T data) {
            _sink.add(data);
        }

        public override void addError(object error, string stackTrace) {
            _sink.addError(error, stackTrace);
        }

        /** Closes the inner sink and returns Future._nullFuture. */
        public override Future close() {
            _sink.close();
            return Future._nullFuture;
        }
    }
}