浏览代码

stream

/main
siyao 3 年前
当前提交
bd6cbfa6
共有 14 个文件被更改,包括 2303 次插入2 次删除
  1. 4
      com.unity.uiwidgets/Runtime/async/future.cs
  2. 8
      com.unity.uiwidgets/Runtime/async/zone.cs
  3. 16
      com.unity.uiwidgets/Runtime/async/async_cast.cs
  4. 3
      com.unity.uiwidgets/Runtime/async/async_cast.cs.meta
  5. 7
      com.unity.uiwidgets/Runtime/async/sink.cs
  6. 3
      com.unity.uiwidgets/Runtime/async/sink.cs.meta
  7. 946
      com.unity.uiwidgets/Runtime/async/stream.cs
  8. 3
      com.unity.uiwidgets/Runtime/async/stream.cs.meta
  9. 652
      com.unity.uiwidgets/Runtime/async/stream_impl.cs
  10. 3
      com.unity.uiwidgets/Runtime/async/stream_impl.cs.meta
  11. 301
      com.unity.uiwidgets/Runtime/async/stream_transformers.cs
  12. 3
      com.unity.uiwidgets/Runtime/async/stream_transformers.cs.meta
  13. 353
      com.unity.uiwidgets/Runtime/widgets/async.cs
  14. 3
      com.unity.uiwidgets/Runtime/widgets/async.cs.meta

4
com.unity.uiwidgets/Runtime/async/future.cs


}
public abstract class Future {
static readonly _Future _nullFuture = _Future.zoneValue(null, Zone.root);
internal static readonly _Future _nullFuture = _Future.zoneValue(null, Zone.root);
static readonly _Future _falseFuture = _Future.zoneValue(false, Zone.root);
internal static readonly _Future _falseFuture = _Future.zoneValue(false, Zone.root);
public static Future create(Func<FutureOr> computation) {
_Future result = new _Future();

8
com.unity.uiwidgets/Runtime/async/zone.cs


public AsyncError(Exception innerException) : base(null, innerException) {
}
public static string defaultStackTrace(object error) {
if (error is Exception ex) {
return ex.StackTrace;
}
return "";
}
}
struct _ZoneFunction<T> where T : Delegate {

16
com.unity.uiwidgets/Runtime/async/async_cast.cs


using System.Runtime.Versioning;
using Unity.UIWidgets.async;
class CastStreamTransformer<SS, ST, TS, TT>
: StreamTransformerBase<TS, TT> {
public readonly StreamTransformer<SS, ST> _source;
public CastStreamTransformer(StreamTransformer<SS, ST> _source) {
this._source = _source;
}
public override StreamTransformer<RS, RT> cast<RS, RT>() =>
new CastStreamTransformer<SS, ST, RS, RT>(_source);
public override Stream<TT> bind(Stream<TS> stream) =>
_source.bind(stream.cast<SS>()).cast<TT>();
}

3
com.unity.uiwidgets/Runtime/async/async_cast.cs.meta


fileFormatVersion: 2
guid: 42f5ce67e0cb4ef18d4e4f51a08fb08c
timeCreated: 1628682203

7
com.unity.uiwidgets/Runtime/async/sink.cs


namespace Unity.UIWidgets.core {
public abstract class Sink<T> {
public abstract void add(T data);
public abstract void close();
}
}

3
com.unity.uiwidgets/Runtime/async/sink.cs.meta


fileFormatVersion: 2
guid: 8b152784a6234bc493708702199a316d
timeCreated: 1628676429

946
com.unity.uiwidgets/Runtime/async/stream.cs


//part of dart.async;
using System;
using Unity.UIWidgets.core;
namespace Unity.UIWidgets.async {
public static class _stream{
public delegate void _TimerCallback();
}
// abstract class Stream<T> {
// Stream();
// const Stream._internal();
// const factory Stream.empty() = _EmptyStream<T>;
// // @Since("2.5")
// factory Stream.value(T value) =>
// (_AsyncStreamController<T>(null, null, null, null)
// .._add(value)
// .._closeUnchecked())
// .stream;
// // @Since("2.5")
// factory Stream.error(Object error, [StackTrace stackTrace]) {
// ArgumentError.checkNotNull(error, "error");
// return (_AsyncStreamController<T>(null, null, null, null)
// .._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error))
// .._closeUnchecked())
// .stream;
// }
// factory Stream.fromFuture(Future<T> future) {
// // Use the controller's buffering to fill in the value even before
// // the stream has a listener. For a single value, it's not worth it
// // to wait for a listener before doing the `then` on the future.
// _StreamController<T> controller =
// new _SyncStreamController<T>(null, null, null, null);
// future.then((value) {
// controller._add(value);
// controller._closeUnchecked();
// }, onError: (error, stackTrace) {
// controller._addError(error, stackTrace);
// controller._closeUnchecked();
// });
// return controller.stream;
// }
// factory Stream.fromFutures(Iterable<Future<T>> futures) {
// _StreamController<T> controller =
// new _SyncStreamController<T>(null, null, null, null);
// int count = 0;
// // Declare these as variables holding closures instead of as
// // function declarations.
// // This avoids creating a new closure from the functions for each future.
// var onValue = (T value) {
// if (!controller.isClosed) {
// controller._add(value);
// if (--count == 0) controller._closeUnchecked();
// }
// };
// var onError = (error, StackTrace stack) {
// if (!controller.isClosed) {
// controller._addError(error, stack);
// if (--count == 0) controller._closeUnchecked();
// }
// };
// // The futures are already running, so start listening to them immediately
// // (instead of waiting for the stream to be listened on).
// // If we wait, we might not catch errors in the futures in time.
// for (var future in futures) {
// count++;
// future.then(onValue, onError: onError);
// }
// // Use schedule microtask since controller is sync.
// if (count == 0) scheduleMicrotask(controller.close);
// return controller.stream;
// }
// factory Stream.fromIterable(Iterable<T> elements) {
// return new _GeneratedStreamImpl<T>(
// () => new _IterablePendingEvents<T>(elements));
// }
// factory Stream.periodic(Duration period,
// [T computation(int computationCount)]) {
// Timer timer;
// int computationCount = 0;
// StreamController<T> controller;
// // Counts the time that the Stream was running (and not paused).
// Stopwatch watch = new Stopwatch();
// void sendEvent() {
// watch.reset();
// T data;
// if (computation != null) {
// try {
// data = computation(computationCount++);
// } catch (e, s) {
// controller.addError(e, s);
// return;
// }
// }
// controller.add(data);
// }
// void startPeriodicTimer() {
// assert(timer == null);
// timer = new Timer.periodic(period, (Timer timer) {
// sendEvent();
// });
// }
// controller = new StreamController<T>(
// sync: true,
// onListen: () {
// watch.start();
// startPeriodicTimer();
// },
// onPause: () {
// timer.cancel();
// timer = null;
// watch.stop();
// },
// onResume: () {
// assert(timer == null);
// Duration elapsed = watch.elapsed;
// watch.start();
// timer = new Timer(period - elapsed, () {
// timer = null;
// startPeriodicTimer();
// sendEvent();
// });
// },
// onCancel: () {
// if (timer != null) timer.cancel();
// timer = null;
// return Future._nullFuture;
// });
// return controller.stream;
// }
// factory Stream.eventTransformed(
// Stream source, EventSink mapSink(EventSink<T> sink)) {
// return new _BoundSinkStream(source, mapSink);
// }
// static Stream<T> castFrom<S, T>(Stream<S> source) =>
// new CastStream<S, T>(source);
// bool get isBroadcast => false;
// Stream<T> asBroadcastStream(
// {void onListen(StreamSubscription<T> subscription),
// void onCancel(StreamSubscription<T> subscription)}) {
// return new _AsBroadcastStream<T>(this, onListen, onCancel);
// }
// StreamSubscription<T> listen(void onData(T event),
// {Function onError, void onDone(), bool cancelOnError});
// Stream<T> where(bool test(T event)) {
// return new _WhereStream<T>(this, test);
// }
// Stream<S> map<S>(S convert(T event)) {
// return new _MapStream<T, S>(this, convert);
// }
// Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
// _StreamControllerBase<E> controller;
// StreamSubscription<T> subscription;
// void onListen() {
// final add = controller.add;
// assert(controller is _StreamController<E> ||
// controller is _BroadcastStreamController);
// final addError = controller._addError;
// subscription = this.listen((T event) {
// FutureOr<E> newValue;
// try {
// newValue = convert(event);
// } catch (e, s) {
// controller.addError(e, s);
// return;
// }
// if (newValue is Future<E>) {
// subscription.pause();
// newValue
// .then(add, onError: addError)
// .whenComplete(subscription.resume);
// } else {
// controller.add(newValue);
// }
// }, onError: addError, onDone: controller.close);
// }
// if (this.isBroadcast) {
// controller = new StreamController<E>.broadcast(
// onListen: onListen,
// onCancel: () {
// subscription.cancel();
// },
// sync: true);
// } else {
// controller = new StreamController<E>(
// onListen: onListen,
// onPause: () {
// subscription.pause();
// },
// onResume: () {
// subscription.resume();
// },
// onCancel: () => subscription.cancel(),
// sync: true);
// }
// return controller.stream;
// }
// Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
// _StreamControllerBase<E> controller;
// StreamSubscription<T> subscription;
// void onListen() {
// assert(controller is _StreamController ||
// controller is _BroadcastStreamController);
// subscription = this.listen((T event) {
// Stream<E> newStream;
// try {
// newStream = convert(event);
// } catch (e, s) {
// controller.addError(e, s);
// return;
// }
// if (newStream != null) {
// subscription.pause();
// controller.addStream(newStream).whenComplete(subscription.resume);
// }
// },
// onError: controller._addError, // Avoid Zone error replacement.
// onDone: controller.close);
// }
// if (this.isBroadcast) {
// controller = new StreamController<E>.broadcast(
// onListen: onListen,
// onCancel: () {
// subscription.cancel();
// },
// sync: true);
// } else {
// controller = new StreamController<E>(
// onListen: onListen,
// onPause: () {
// subscription.pause();
// },
// onResume: () {
// subscription.resume();
// },
// onCancel: () => subscription.cancel(),
// sync: true);
// }
// return controller.stream;
// }
// Stream<T> handleError(Function onError, {bool test(error)}) {
// return new _HandleErrorStream<T>(this, onError, test);
// }
// Stream<S> expand<S>(Iterable<S> convert(T element)) {
// return new _ExpandStream<T, S>(this, convert);
// }
// Future pipe(StreamConsumer<T> streamConsumer) {
// return streamConsumer.addStream(this).then((_) => streamConsumer.close());
// }
// Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
// return streamTransformer.bind(this);
// }
// Future<T> reduce(T combine(T previous, T element)) {
// _Future<T> result = new _Future<T>();
// bool seenFirst = false;
// T value;
// StreamSubscription subscription;
// subscription = this.listen(
// (T element) {
// if (seenFirst) {
// _runUserCode(() => combine(value, element), (T newValue) {
// value = newValue;
// }, _cancelAndErrorClosure(subscription, result));
// } else {
// value = element;
// seenFirst = true;
// }
// },
// onError: result._completeError,
// onDone: () {
// if (!seenFirst) {
// try {
// // Throw and recatch, instead of just doing
// // _completeWithErrorCallback, e, theError, StackTrace.current),
// // to ensure that the stackTrace is set on the error.
// throw IterableElementError.noElement();
// } catch (e, s) {
// _completeWithErrorCallback(result, e, s);
// }
// } else {
// result._complete(value);
// }
// },
// cancelOnError: true);
// return result;
// }
// Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
// _Future<S> result = new _Future<S>();
// S value = initialValue;
// StreamSubscription subscription;
// subscription = this.listen(
// (T element) {
// _runUserCode(() => combine(value, element), (S newValue) {
// value = newValue;
// }, _cancelAndErrorClosure(subscription, result));
// },
// onError: result._completeError,
// onDone: () {
// result._complete(value);
// },
// cancelOnError: true);
// return result;
// }
// Future<String> join([String separator = ""]) {
// _Future<String> result = new _Future<String>();
// StringBuffer buffer = new StringBuffer();
// StreamSubscription subscription;
// bool first = true;
// subscription = this.listen(
// (T element) {
// if (!first) {
// buffer.write(separator);
// }
// first = false;
// try {
// buffer.write(element);
// } catch (e, s) {
// _cancelAndErrorWithReplacement(subscription, result, e, s);
// }
// },
// onError: result._completeError,
// onDone: () {
// result._complete(buffer.toString());
// },
// cancelOnError: true);
// return result;
// }
// Future<bool> contains(Object needle) {
// _Future<bool> future = new _Future<bool>();
// StreamSubscription subscription;
// subscription = this.listen(
// (T element) {
// _runUserCode(() => (element == needle), (bool isMatch) {
// if (isMatch) {
// _cancelAndValue(subscription, future, true);
// }
// }, _cancelAndErrorClosure(subscription, future));
// },
// onError: future._completeError,
// onDone: () {
// future._complete(false);
// },
// cancelOnError: true);
// return future;
// }
// Future forEach(void action(T element)) {
// _Future future = new _Future();
// StreamSubscription subscription;
// subscription = this.listen(
// (T element) {
// // TODO(floitsch): the type should be 'void' and inferred.
// _runUserCode<dynamic>(() => action(element), (_) {},
// _cancelAndErrorClosure(subscription, future));
// },
// onError: future._completeError,
// onDone: () {
// future._complete(null);
// },
// cancelOnError: true);
// return future;
// }
// Future<bool> every(bool test(T element)) {
// _Future<bool> future = new _Future<bool>();
// StreamSubscription subscription;
// subscription = this.listen(
// (T element) {
// _runUserCode(() => test(element), (bool isMatch) {
// if (!isMatch) {
// _cancelAndValue(subscription, future, false);
// }
// }, _cancelAndErrorClosure(subscription, future));
// },
// onError: future._completeError,
// onDone: () {
// future._complete(true);
// },
// cancelOnError: true);
// return future;
// }
// Future<bool> any(bool test(T element)) {
// _Future<bool> future = new _Future<bool>();
// StreamSubscription subscription;
// subscription = this.listen(
// (T element) {
// _runUserCode(() => test(element), (bool isMatch) {
// if (isMatch) {
// _cancelAndValue(subscription, future, true);
// }
// }, _cancelAndErrorClosure(subscription, future));
// },
// onError: future._completeError,
// onDone: () {
// future._complete(false);
// },
// cancelOnError: true);
// return future;
// }
// Future<int> get length {
// _Future<int> future = new _Future<int>();
// int count = 0;
// this.listen(
// (_) {
// count++;
// },
// onError: future._completeError,
// onDone: () {
// future._complete(count);
// },
// cancelOnError: true);
// return future;
// }
// Future<bool> get isEmpty {
// _Future<bool> future = new _Future<bool>();
// StreamSubscription subscription;
// subscription = this.listen(
// (_) {
// _cancelAndValue(subscription, future, false);
// },
// onError: future._completeError,
// onDone: () {
// future._complete(true);
// },
// cancelOnError: true);
// return future;
// }
// Stream<R> cast<R>() => Stream.castFrom<T, R>(this);
// Future<List<T>> toList() {
// List<T> result = <T>[];
// _Future<List<T>> future = new _Future<List<T>>();
// this.listen(
// (T data) {
// result.add(data);
// },
// onError: future._completeError,
// onDone: () {
// future._complete(result);
// },
// cancelOnError: true);
// return future;
// }
// Future<Set<T>> toSet() {
// Set<T> result = new Set<T>();
// _Future<Set<T>> future = new _Future<Set<T>>();
// this.listen(
// (T data) {
// result.add(data);
// },
// onError: future._completeError,
// onDone: () {
// future._complete(result);
// },
// cancelOnError: true);
// return future;
// }
// Future<E> drain<E>([E futureValue]) =>
// listen(null, cancelOnError: true).asFuture<E>(futureValue);
// Stream<T> take(int count) {
// return new _TakeStream<T>(this, count);
// }
// Stream<T> takeWhile(bool test(T element)) {
// return new _TakeWhileStream<T>(this, test);
// }
// Stream<T> skip(int count) {
// return new _SkipStream<T>(this, count);
// }
// Stream<T> skipWhile(bool test(T element)) {
// return new _SkipWhileStream<T>(this, test);
// }
// Stream<T> distinct([bool equals(T previous, T next)]) {
// return new _DistinctStream<T>(this, equals);
// }
// Future<T> get first {
// _Future<T> future = new _Future<T>();
// StreamSubscription subscription;
// subscription = this.listen(
// (T value) {
// _cancelAndValue(subscription, future, value);
// },
// onError: future._completeError,
// onDone: () {
// try {
// throw IterableElementError.noElement();
// } catch (e, s) {
// _completeWithErrorCallback(future, e, s);
// }
// },
// cancelOnError: true);
// return future;
// }
// Future<T> get last {
// _Future<T> future = new _Future<T>();
// T result;
// bool foundResult = false;
// listen(
// (T value) {
// foundResult = true;
// result = value;
// },
// onError: future._completeError,
// onDone: () {
// if (foundResult) {
// future._complete(result);
// return;
// }
// try {
// throw IterableElementError.noElement();
// } catch (e, s) {
// _completeWithErrorCallback(future, e, s);
// }
// },
// cancelOnError: true);
// return future;
// }
// Future<T> get single {
// _Future<T> future = new _Future<T>();
// T result;
// bool foundResult = false;
// StreamSubscription subscription;
// subscription = this.listen(
// (T value) {
// if (foundResult) {
// // This is the second element we get.
// try {
// throw IterableElementError.tooMany();
// } catch (e, s) {
// _cancelAndErrorWithReplacement(subscription, future, e, s);
// }
// return;
// }
// foundResult = true;
// result = value;
// },
// onError: future._completeError,
// onDone: () {
// if (foundResult) {
// future._complete(result);
// return;
// }
// try {
// throw IterableElementError.noElement();
// } catch (e, s) {
// _completeWithErrorCallback(future, e, s);
// }
// },
// cancelOnError: true);
// return future;
// }
// Future<T> firstWhere(bool test(T element), {T orElse()}) {
// _Future<T> future = new _Future();
// StreamSubscription subscription;
// subscription = this.listen(
// (T value) {
// _runUserCode(() => test(value), (bool isMatch) {
// if (isMatch) {
// _cancelAndValue(subscription, future, value);
// }
// }, _cancelAndErrorClosure(subscription, future));
// },
// onError: future._completeError,
// onDone: () {
// if (orElse != null) {
// _runUserCode(orElse, future._complete, future._completeError);
// return;
// }
// try {
// throw IterableElementError.noElement();
// } catch (e, s) {
// _completeWithErrorCallback(future, e, s);
// }
// },
// cancelOnError: true);
// return future;
// }
// Future<T> lastWhere(bool test(T element), {T orElse()}) {
// _Future<T> future = new _Future();
// T result;
// bool foundResult = false;
// StreamSubscription subscription;
// subscription = this.listen(
// (T value) {
// _runUserCode(() => true == test(value), (bool isMatch) {
// if (isMatch) {
// foundResult = true;
// result = value;
// }
// }, _cancelAndErrorClosure(subscription, future));
// },
// onError: future._completeError,
// onDone: () {
// if (foundResult) {
// future._complete(result);
// return;
// }
// if (orElse != null) {
// _runUserCode(orElse, future._complete, future._completeError);
// return;
// }
// try {
// throw IterableElementError.noElement();
// } catch (e, s) {
// _completeWithErrorCallback(future, e, s);
// }
// },
// cancelOnError: true);
// return future;
// }
// Future<T> singleWhere(bool test(T element), {T orElse()}) {
// _Future<T> future = new _Future<T>();
// T result;
// bool foundResult = false;
// StreamSubscription subscription;
// subscription = this.listen(
// (T value) {
// _runUserCode(() => true == test(value), (bool isMatch) {
// if (isMatch) {
// if (foundResult) {
// try {
// throw IterableElementError.tooMany();
// } catch (e, s) {
// _cancelAndErrorWithReplacement(subscription, future, e, s);
// }
// return;
// }
// foundResult = true;
// result = value;
// }
// }, _cancelAndErrorClosure(subscription, future));
// },
// onError: future._completeError,
// onDone: () {
// if (foundResult) {
// future._complete(result);
// return;
// }
// try {
// if (orElse != null) {
// _runUserCode(orElse, future._complete, future._completeError);
// return;
// }
// throw IterableElementError.noElement();
// } catch (e, s) {
// _completeWithErrorCallback(future, e, s);
// }
// },
// cancelOnError: true);
// return future;
// }
// Future<T> elementAt(int index) {
// ArgumentError.checkNotNull(index, "index");
// RangeError.checkNotNegative(index, "index");
// _Future<T> future = new _Future<T>();
// StreamSubscription subscription;
// int elementIndex = 0;
// subscription = this.listen(
// (T value) {
// if (index == elementIndex) {
// _cancelAndValue(subscription, future, value);
// return;
// }
// elementIndex += 1;
// },
// onError: future._completeError,
// onDone: () {
// future._completeError(
// new RangeError.index(index, this, "index", null, elementIndex));
// },
// cancelOnError: true);
// return future;
// }
// Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) {
// _StreamControllerBase<T> controller;
// // The following variables are set on listen.
// StreamSubscription<T> subscription;
// Timer timer;
// Zone zone;
// _TimerCallback timeout;
// void onData(T event) {
// timer.cancel();
// timer = zone.createTimer(timeLimit, timeout);
// // It might close the stream and cancel timer, so create recuring Timer
// // before calling into add();
// // issue: https://github.com/dart-lang/sdk/issues/37565
// controller.add(event);
// }
// void onError(error, StackTrace stackTrace) {
// timer.cancel();
// assert(controller is _StreamController ||
// controller is _BroadcastStreamController);
// controller._addError(error, stackTrace); // Avoid Zone error replacement.
// timer = zone.createTimer(timeLimit, timeout);
// }
// void onDone() {
// timer.cancel();
// controller.close();
// }
// void onListen() {
// // This is the onListen callback for of controller.
// // It runs in the same zone that the subscription was created in.
// // Use that zone for creating timers and running the onTimeout
// // callback.
// zone = Zone.current;
// if (onTimeout == null) {
// timeout = () {
// controller.addError(
// new TimeoutException("No stream event", timeLimit), null);
// };
// } else {
// // TODO(floitsch): the return type should be 'void', and the type
// // should be inferred.
// var registeredOnTimeout =
// zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout);
// var wrapper = new _ControllerEventSinkWrapper<T>(null);
// timeout = () {
// wrapper._sink = controller; // Only valid during call.
// zone.runUnaryGuarded(registeredOnTimeout, wrapper);
// wrapper._sink = null;
// };
// }
// subscription = this.listen(onData, onError: onError, onDone: onDone);
// timer = zone.createTimer(timeLimit, timeout);
// }
// Future onCancel() {
// timer.cancel();
// Future result = subscription.cancel();
// subscription = null;
// return result;
// }
// controller = isBroadcast
// ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
// : new _SyncStreamController<T>(onListen, () {
// // Don't null the timer, onCancel may call cancel again.
// timer.cancel();
// subscription.pause();
// }, () {
// subscription.resume();
// timer = zone.createTimer(timeLimit, timeout);
// }, onCancel);
// return controller.stream;
// }
// }
public abstract class StreamSubscription<T> {
public abstract Future cancel();
public abstract void onData(Action<T> handleData);
public abstract void onError(Action handleError);
public abstract void onDone(Action handleDone);
public abstract void pause(Future resumeSignal);
public abstract void resume();
public bool isPaused { get; }
public abstract Future<E> asFuture<E>(E futureValue);
}
public abstract class EventSink<T> : Sink<T> {
// public abstract void add(T evt);
public abstract void addError(object error, string stackTrace);
// void close();
}
// /** [Stream] wrapper that only exposes the [Stream] interface. */
// class StreamView<T> extends Stream<T> {
// final Stream<T> _stream;
// const StreamView(Stream<T> stream)
// : _stream = stream,
// super._internal();
// bool get isBroadcast => _stream.isBroadcast;
// Stream<T> asBroadcastStream(
// {void onListen(StreamSubscription<T> subscription),
// void onCancel(StreamSubscription<T> subscription)}) =>
// _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
// StreamSubscription<T> listen(void onData(T value),
// {Function onError, void onDone(), bool cancelOnError}) {
// return _stream.listen(onData,
// onError: onError, onDone: onDone, cancelOnError: cancelOnError);
// }
// }
public interface StreamConsumer<S> {
Future addStream(Stream<S> stream);
// cannot define function with same name
Future closeConsumer();
}
public abstract class StreamSink<S> : EventSink<S>, StreamConsumer<S> {
// Future close();
public Future done { get; }
public Future addStream(Stream<S> stream) {
throw new System.NotImplementedException();
}
public Future closeConsumer() {
throw new System.NotImplementedException();
}
}
public abstract class StreamTransformer<S, T> {
// c# does not support change constructor
public static StreamTransformer<S, T> create<S, T>(_async._SubscriptionTransformer<S, T> onListen) {
return new _StreamSubscriptionTransformer<S, T>(onListen);
}
public static StreamTransformer<S, T> fromHandlers(
{void handleData(S data, EventSink<T> sink),
void handleError(Object error, string stackTrace, EventSink<T> sink),
void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>;
// @Since("2.1")
factory StreamTransformer.fromBind(Stream<T> Function(Stream<S>) bind) =
_StreamBindTransformer<S, T>;
public static StreamTransformer<TS, TT> castFrom<SS, ST, TS, TT>(
StreamTransformer<SS, ST> source) {
return new CastStreamTransformer<SS, ST, TS, TT>(source);
}
public abstract Stream<T> bind(Stream<S> stream);
public abstract StreamTransformer<RS, RT> cast<RS, RT>();
}
public abstract class StreamTransformerBase<S, T> : StreamTransformer<S, T> {
public StreamTransformerBase() {}
public override StreamTransformer<RS, RT> cast<RS, RT>() =>
StreamTransformer<RS, RT>.castFrom<S, T, RS, RT>(this);
}
public abstract class StreamIterator<T> {
/** Create a [StreamIterator] on [stream]. */
public static StreamIterator<T> Create(Stream<T> stream)
// TODO(lrn): use redirecting factory constructor when type
// arguments are supported.
=>
new _StreamIterator<T>(stream);
public abstract Future<bool> moveNext();
T current { get; }
public abstract Future cancel();
}
internal class _ControllerEventSinkWrapper<T> : EventSink<T> {
EventSink<T> _sink;
_ControllerEventSinkWrapper(EventSink<T> _sink) {
this._sink = _sink;
}
public override void add(T data) {
_sink.add(data);
}
public override void addError(object error, string stackTrace) {
_sink.addError(error, stackTrace);
}
public override void close() {
_sink.close();
}
}
}

3
com.unity.uiwidgets/Runtime/async/stream.cs.meta


fileFormatVersion: 2
guid: b09d9a1e8bd34f36ba6ed51a870f4bef
timeCreated: 1628672859

652
com.unity.uiwidgets/Runtime/async/stream_impl.cs


using System;
using System.Diagnostics;
using Unity.UIWidgets.async;
using Unity.UIWidgets.foundation;
static partial class _stream {
public delegate void _DataHandler<T>(T value);
public delegate void _DoneHandler();
public static void _nullDataHandler<T>(T obj) {}
public static void _nullErrorHandler(Exception error) {
Zone.current.handleUncaughtError(error);
}
public static void _nullDoneHandler() {}
}
abstract class _DelayedEvent<T> {
/** Added as a linked list on the [StreamController]. */
internal _DelayedEvent<T> next;
/** Execute the delayed event on the [StreamController]. */
public abstract void perform(_EventDispatch<T> dispatch);
}
class _DelayedData<T> : _DelayedEvent<T> {
readonly T value;
internal _DelayedData(T value) {
this.value = value;
}
public override void perform(_EventDispatch<T> dispatch) {
dispatch._sendData(value);
}
}
/** A delayed error event. */
class _DelayedError : _DelayedEvent<object> {
readonly Exception error;
readonly string stackTrace;
internal _DelayedError(Exception error, string stackTrace) {
this.error = error;
this.stackTrace = stackTrace;
}
public override void perform(_EventDispatch<object> dispatch) {
dispatch._sendError(error, stackTrace);
}
}
class _DelayedDone : _DelayedEvent<object> {
internal _DelayedDone(){}
public override void perform(_EventDispatch<object> dispatch) {
dispatch._sendDone();
}
_DelayedEvent<object> next {
get { return null; }
set { throw new Exception("No events after a done.");}
}
}
interface _EventSink<T> {
void _add(T data);
void _addError(object error, string stackTrace);
void _close();
}
interface _EventDispatch<T> {
void _sendData(T data);
void _sendError(Object error, string stackTrace);
void _sendDone();
}
class _BufferingStreamSubscription<T>
: StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> {
/** The `cancelOnError` flag from the `listen` call. */
const int _STATE_CANCEL_ON_ERROR = 1;
const int _STATE_CLOSED = 2;
const int _STATE_INPUT_PAUSED = 4;
const int _STATE_CANCELED = 8;
const int _STATE_WAIT_FOR_CANCEL = 16;
const int _STATE_IN_CALLBACK = 32;
const int _STATE_HAS_PENDING = 64;
const int _STATE_PAUSE_COUNT = 128;
//@pagma("vm:entry-point")
_stream._DataHandler<T> _onData;
Action<Exception, string> _onError;
_stream._DoneHandler _onDone;
readonly Zone _zone = Zone.current;
/** Bit vector based on state-constants above. */
int _state;
// TODO(floitsch): reuse another field
/** The future [_onCancel] may return. */
Future _cancelFuture;
_PendingEvents<T> _pending;
internal _BufferingStreamSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
_state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0);
this.onData(onData);
this.onError(onError);
this.onDone(onDone);
}
void _setPendingEvents(_PendingEvents<T> pendingEvents) {
D.assert(_pending == null);
if (pendingEvents == null) return;
_pending = pendingEvents;
if (!pendingEvents.isEmpty) {
_state |= _STATE_HAS_PENDING;
_pending.schedule(this);
}
}
// StreamSubscription interface.
void onData(Action<T> handleData) {
handleData ??= _stream._nullDataHandler;
// TODO(floitsch): the return type should be 'void', and the type
// should be inferred.
_onData = d => _zone.registerUnaryCallback(data => {
handleData((T) data);
return default;
});
}
// Siyao: c# does not support convert action
void onError(Action<object, string> handleError) {
handleError ??= (input1, input2) =>_stream._nullErrorHandler(null);
_onError = (_, __)=> _zone
.registerBinaryCallback((in1, in2)=> {
handleError(in1, (string) in2);
return null;
});
}
void onDone(Action handleDone) {
handleDone ??= _stream._nullDoneHandler;
_onDone = ()=>_zone.registerCallback(()=> {
handleDone();
return null;
});
}
void pause(Future resumeSignal) {
if (_isCanceled) return;
bool wasPaused = _isPaused;
bool wasInputPaused = _isInputPaused;
// Increment pause count and mark input paused (if it isn't already).
_state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
if (resumeSignal != null) resumeSignal.whenComplete(resume);
if (!wasPaused && _pending != null) _pending.cancelSchedule();
if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
}
void resume() {
if (_isCanceled) return;
if (_isPaused) {
_decrementPauseCount();
if (!_isPaused) {
if (_hasPending && !_pending.isEmpty) {
// Input is still paused.
_pending.schedule(this);
} else {
D.assert(_mayResumeInput);
_state &= ~_STATE_INPUT_PAUSED;
if (!_inCallback) _guardCallback(_onResume);
}
}
}
}
Future cancel() {
// The user doesn't want to receive any further events. If there is an
// error or done event pending (waiting for the cancel to be done) discard
// that event.
_state &= ~_STATE_WAIT_FOR_CANCEL;
if (!_isCanceled) {
_cancel();
}
return _cancelFuture ?? Future._nullFuture;
}
Future<E> asFuture<E>(E futureValue) {
_Future result = new _Future();
// Overwrite the onDone and onError handlers.
_onDone = ()=> {
result._complete(FutureOr.value(futureValue));
};
_onError = (error, stackTrace) => {
Future cancelFuture = cancel();
if (!Equals(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() =>{
result._completeError(error);
});
} else {
result._completeError(error);
}
};
return result.to<E>();
}
// State management.
internal bool _isInputPaused{get => (_state & _STATE_INPUT_PAUSED) != 0;}
internal bool _isClosed{get => (_state & _STATE_CLOSED) != 0;}
internal bool _isCanceled{get => (_state & _STATE_CANCELED) != 0;}
internal bool _waitsForCancel{get => (_state & _STATE_WAIT_FOR_CANCEL) != 0;}
internal bool _inCallback{get => (_state & _STATE_IN_CALLBACK) != 0;}
internal bool _hasPending{get => (_state & _STATE_HAS_PENDING) != 0;}
internal bool _isPaused{get => _state >= _STATE_PAUSE_COUNT;}
internal bool _canFire{get => _state < _STATE_IN_CALLBACK;}
internal bool _mayResumeInput {
get =>
!_isPaused && (_pending == null || _pending.isEmpty);
}
internal bool _cancelOnError{get => (_state & _STATE_CANCEL_ON_ERROR) != 0;}
internal bool isPaused{get => _isPaused;}
void _cancel() {
_state |= _STATE_CANCELED;
if (_hasPending) {
_pending.cancelSchedule();
}
if (!_inCallback) _pending = null;
_cancelFuture = _onCancel();
}
void _decrementPauseCount() {
D.assert(_isPaused);
_state -= _STATE_PAUSE_COUNT;
}
// _EventSink interface.
public void _add(T data) {
D.assert(!_isClosed);
if (_isCanceled) return;
if (_canFire) {
_sendData(data);
} else {
_addPending(new _DelayedData<object>(data));
}
}
public void _addError(object error, string stackTrace) {
if (_isCanceled) return;
if (_canFire) {
_sendError(error, stackTrace); // Reports cancel after sending.
} else {
_addPending(new _DelayedError((Exception)error, stackTrace));
}
}
public void _close() {
D.assert(!_isClosed);
if (_isCanceled) return;
_state |= _STATE_CLOSED;
if (_canFire) {
_sendDone();
} else {
_addPending(new _DelayedDone());
}
}
// Hooks called when the input is paused, unpaused or canceled.
// These must not throw. If overwritten to call user code, include suitable
// try/catch wrapping and send any errors to
// [_Zone.current.handleUncaughtError].
void _onPause() {
D.assert(_isInputPaused);
}
void _onResume() {
D.assert(!_isInputPaused);
}
Future _onCancel() {
D.assert(_isCanceled);
return null;
}
// Handle pending events.
void _addPending(_DelayedEvent<object> evt) {
_StreamImplEvents<T> pending = _pending as _StreamImplEvents<T>;
if (_pending == null) {
pending = (_StreamImplEvents<T>) (_pending = new _StreamImplEvents<T>());
}
pending.add(evt);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
if (!_isPaused) {
_pending.schedule(this);
}
}
}
public void _sendData(T data) {
D.assert(!_isCanceled);
D.assert(!_isPaused);
D.assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(data=> {
_onData((T) data);
return null;
}, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
public void _sendError(object error, string stackTrace) {
D.assert(!_isCanceled);
D.assert(!_isPaused);
D.assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
void sendError() {
// If the subscription has been canceled while waiting for the cancel
// future to finish we must not report the error.
if (_isCanceled && !_waitsForCancel) return;
_state |= _STATE_IN_CALLBACK;
// TODO(floitsch): this dynamic should be 'void'.
var onError = _onError;
if (onError != null) {
_zone.runBinaryGuarded((error, stack)=> {
onError((Exception) error, (string) stack);
return null;
}, error, stackTrace);
} else {
// Siyao: c# could not cast Action
D.assert(_onError != null);
// _zone.runUnaryGuarded(error => _onError, error);
}
_state &= ~_STATE_IN_CALLBACK;
}
if (_cancelOnError) {
_state |= _STATE_WAIT_FOR_CANCEL;
_cancel();
if (_cancelFuture != null &&
!Equals(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendError);
} else {
sendError();
}
} else {
sendError();
// Only check state if not cancelOnError.
_checkState(wasInputPaused);
}
}
public void _sendDone() {
D.assert(!_isCanceled);
D.assert(!_isPaused);
D.assert(!_inCallback);
void sendDone() {
// If the subscription has been canceled while waiting for the cancel
// future to finish we must not report the done event.
if (!_waitsForCancel) return;
_state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
_zone.runGuarded(()=> {
_onDone();
return null;
});
_state &= ~_STATE_IN_CALLBACK;
}
_cancel();
_state |= _STATE_WAIT_FOR_CANCEL;
if (_cancelFuture != null &&
!Equals(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendDone);
} else {
sendDone();
}
}
void _guardCallback(Action callback) {
D.assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
callback();
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
void _checkState(bool wasInputPaused) {
D.assert(!_inCallback);
if (_hasPending && _pending.isEmpty) {
_state &= ~_STATE_HAS_PENDING;
if (_isInputPaused && _mayResumeInput) {
_state &= ~_STATE_INPUT_PAUSED;
}
}
// If the state changes during a callback, we immediately
// make a new state-change callback. Loop until the state didn't change.
while (true) {
if (_isCanceled) {
_pending = null;
return;
}
bool isInputPaused = _isInputPaused;
if (wasInputPaused == isInputPaused) break;
_state ^= _STATE_IN_CALLBACK;
if (isInputPaused) {
_onPause();
} else {
_onResume();
}
_state &= ~_STATE_IN_CALLBACK;
wasInputPaused = isInputPaused;
}
if (_hasPending && !_isPaused) {
_pending.schedule(this);
}
}
}
abstract class _PendingEvents<T> {
// No async event has been scheduled.
const int _STATE_UNSCHEDULED = 0;
// An async event has been scheduled to run a function.
const int _STATE_SCHEDULED = 1;
// An async event has been scheduled, but it will do nothing when it runs.
// Async events can't be preempted.
const int _STATE_CANCELED = 3;
/**
* State of being scheduled.
*
* Set to [_STATE_SCHEDULED] when pending events are scheduled for
* async dispatch. Since we can't cancel a [scheduleMicrotask] call, if
* scheduling is "canceled", the _state is simply set to [_STATE_CANCELED]
* which will make the async code do nothing except resetting [_state].
*
* If events are scheduled while the state is [_STATE_CANCELED], it is
* merely switched back to [_STATE_SCHEDULED], but no new call to
* [scheduleMicrotask] is performed.
*/
int _state = _STATE_UNSCHEDULED;
public bool isEmpty{get;}
public bool isScheduled{get => _state == _STATE_SCHEDULED;}
public bool _eventScheduled{get => _state >= _STATE_SCHEDULED;}
/**
* Schedule an event to run later.
*
* If called more than once, it should be called with the same dispatch as
* argument each time. It may reuse an earlier argument in some cases.
*/
public void schedule(_EventDispatch<T> dispatch) {
if (isScheduled) return;
D.assert(!isEmpty);
if (_eventScheduled) {
D.assert(_state == _STATE_CANCELED);
_state = _STATE_SCHEDULED;
return;
}
async_.scheduleMicrotask(() => {
int oldState = _state;
_state = _STATE_UNSCHEDULED;
if (oldState == _STATE_CANCELED) return null;
handleNext(dispatch);
return null;
});
_state = _STATE_SCHEDULED;
}
public void cancelSchedule() {
if (isScheduled) _state = _STATE_CANCELED;
}
public abstract void handleNext(_EventDispatch<T> dispatch);
/** Throw away any pending events and cancel scheduled events. */
public abstract void clear();
}
class _StreamImplEvents<T> : _PendingEvents<T> {
/// Single linked list of [_DelayedEvent] objects.
_DelayedEvent<object> firstPendingEvent;
/// Last element in the list of pending events. New events are added after it.
_DelayedEvent<object> lastPendingEvent;
bool isEmpty {
get { return lastPendingEvent == null; }
}
internal void add(_DelayedEvent<object> evt) {
if (lastPendingEvent == null) {
firstPendingEvent = lastPendingEvent = evt;
} else {
lastPendingEvent = lastPendingEvent.next = evt;
}
}
public override void handleNext(_EventDispatch<T> dispatch) {
D.assert(!isScheduled);
_DelayedEvent<object> evt = firstPendingEvent;
firstPendingEvent = evt.next;
if (firstPendingEvent == null) {
lastPendingEvent = null;
}
evt.perform((_EventDispatch<object>) dispatch);
}
public override void clear() {
if (isScheduled) cancelSchedule();
firstPendingEvent = lastPendingEvent = null;
}
}
internal class _StreamIterator<T> : StreamIterator<T> {
// The stream iterator is always in one of four states.
// The value of the [_stateData] field depends on the state.
//
// When `_subscription == null` and `_stateData != null`:
// The stream iterator has been created, but [moveNext] has not been called
// yet. The [_stateData] field contains the stream to listen to on the first
// call to [moveNext] and [current] returns `null`.
//
// When `_subscription != null` and `!_isPaused`:
// The user has called [moveNext] and the iterator is waiting for the next
// event. The [_stateData] field contains the [_Future] returned by the
// [_moveNext] call and [current] returns `null.`
//
// When `_subscription != null` and `_isPaused`:
// The most recent call to [moveNext] has completed with a `true` value
// and [current] provides the value of the data event.
// The [_stateData] field contains the [current] value.
//
// When `_subscription == null` and `_stateData == null`:
// The stream has completed or been canceled using [cancel].
// The stream completes on either a done event or an error event.
// The last call to [moveNext] has completed with `false` and [current]
// returns `null`.
StreamSubscription<object> _subscription;
//@pragma("vm:entry-point")
object _stateData;
bool _isPaused = false;
internal _StreamIterator(Stream<T> stream) {
if (stream != null) {
_stateData = stream;
}
else {
throw new ArgumentException("not null","stream");
}
// _stateData = stream ?? (throw ArgumentError.notNull("stream"));
}
object current {
get {
if (_subscription != null && _isPaused) {
return _stateData;
}
return default;
}
}
public override Future<bool> moveNext() {
if (_subscription != null) {
if (_isPaused) {
var future = new _Future();
_stateData = future;
_isPaused = false;
_subscription.resume();
return future.to<bool>();
}
throw new Exception("Already waiting for next.");
}
return _initializeOrDone();
}
Future<bool> _initializeOrDone() {
D.assert(_subscription == null);
var stateData = _stateData;
if (stateData != null) {
Stream<T> stream = (Stream<T>) stateData;
_subscription = stream.listen(_onData,
onError: _onError, onDone: _onDone, cancelOnError: true);
var future = new _Future();
_stateData = future;
return future.to<bool>();
}
return Future._falseFuture.to<bool>();
}
public override Future cancel() {
StreamSubscription<object> subscription = _subscription;
object stateData = _stateData;
_stateData = null;
if (subscription != null) {
_subscription = null;
if (!_isPaused) {
_Future future = (_Future) stateData;
future._asyncComplete(false);
}
return subscription.cancel();
}
return Future._nullFuture;
}
void _onData(T data) {
D.assert(_subscription != null && !_isPaused);
_Future moveNextFuture = (_Future) _stateData;
_stateData = data;
_isPaused = true;
moveNextFuture._complete(true);
if (_subscription != null && _isPaused) _subscription.pause();
}
void _onError(object error, string stackTrace) {
D.assert(_subscription != null && !_isPaused);
_Future moveNextFuture = (_Future) _stateData;
_subscription = null;
_stateData = null;
moveNextFuture._completeError((Exception) error);
}
void _onDone() {
D.assert(_subscription != null && !_isPaused);
_Future moveNextFuture = (_Future) _stateData;
_subscription = null;
_stateData = null;
moveNextFuture._complete(false);
}
}

3
com.unity.uiwidgets/Runtime/async/stream_impl.cs.meta


fileFormatVersion: 2
guid: 8da42c1ef952401abf68f552618101a4
timeCreated: 1628681636

301
com.unity.uiwidgets/Runtime/async/stream_transformers.cs


using System;
using Unity.UIWidgets.async;
class _EventSinkWrapper<T> : EventSink<T> {
_EventSink<object> _sink;
internal _EventSinkWrapper(_EventSink<object> _sink) {
this._sink = _sink;
}
public override void add(T data) {
_sink._add(data);
}
public override void addError(object error, string stackTrace) {
_sink._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error));
}
public override void close() {
_sink._close();
}
}
class _SinkTransformerStreamSubscription<S, T>
: _BufferingStreamSubscription<T> {
/// The transformer's input sink.
EventSink<S> _transformerSink;
/// The subscription to the input stream.
StreamSubscription<S> _subscription;
internal _SinkTransformerStreamSubscription(Stream<S> source, _async._SinkMapper<S, T> mapper,
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError)
// We set the adapter's target only when the user is allowed to send data.
: base(onData, onError, onDone, cancelOnError) {
_EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>((_EventSink<object>) this);
_transformerSink = mapper(eventSink);
_subscription =
source.listen(_handleData, onError: _handleError, onDone: _handleDone);
}
/** Whether this subscription is still subscribed to its source. */
bool _isSubscribed {
get { return _subscription != null; }
}
// _EventSink interface.
void _add(T data) {
if (_isClosed) {
throw new Exception("Stream is already closed");
}
base._add(data);
}
void _addError(Object error, string stackTrace) {
if (_isClosed) {
throw new Exception("Stream is already closed");
}
base._addError(error, stackTrace);
}
void _close() {
if (_isClosed) {
throw new Exception("Stream is already closed");
}
base._close();
}
// _BufferingStreamSubscription hooks.
void _onPause() {
if (_isSubscribed) _subscription.pause();
}
void _onResume() {
if (_isSubscribed) _subscription.resume();
}
Future _onCancel() {
if (_isSubscribed) {
StreamSubscription<S> subscription = _subscription;
_subscription = null;
return subscription.cancel();
}
return null;
}
void _handleData(S data) {
try {
_transformerSink.add(data);
}
catch (Exception e) {
_addError(e, e.StackTrace);
}
}
void _handleError(object error, string stackTrace) {
try {
_transformerSink.addError(error, stackTrace);
}
catch (Exception e) {
if (Equals(e, error)) {
_addError(error, stackTrace);
}
else {
_addError(e, e.StackTrace);
}
}
}
void _handleDone() {
try {
_subscription = null;
_transformerSink.close();
}
catch (Exception e) {
_addError(e, e.StackTrace);
}
}
}
class _StreamSinkTransformer<S, T> : StreamTransformerBase<S, T> {
readonly _async._SinkMapper<S, T> _sinkMapper;
public _StreamSinkTransformer(_async._SinkMapper<S, T> _sinkMapper) {
this._sinkMapper = _sinkMapper;
}
public override Stream<T> bind(Stream<S> stream) =>
new _BoundSinkStream<S, T>(stream, _sinkMapper);
}
class _BoundSinkStream<S, T> : Stream<T> {
readonly _async._SinkMapper<S, T> _sinkMapper;
readonly Stream<S> _stream;
bool isBroadcast {
get { return _stream.isBroadcast; }
}
internal _BoundSinkStream(Stream<S> _stream, _async._SinkMapper<S, T> _sinkMapper) {
this._stream = _stream;
this._sinkMapper = _sinkMapper;
}
StreamSubscription<T> listen(Action<T> onData,
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = default) {
StreamSubscription<T> subscription =
new _SinkTransformerStreamSubscription<S, T>(
_stream, _sinkMapper, onData, onError, onDone, cancelOnError);
return subscription;
}
}
static partial class _stream {
internal delegate void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
/// Error-handler coming from [StreamTransformer.fromHandlers].
internal delegate void _TransformErrorHandler<T>(
object error, string stackTrace, EventSink<T> sink);
/// Done-handler coming from [StreamTransformer.fromHandlers].
internal delegate void _TransformDoneHandler<T>(EventSink<T> sink);
}
class _HandlerEventSink<S, T> : EventSink<S> where T : class {
readonly _stream._TransformDataHandler<S, T> _handleData;
readonly _stream._TransformErrorHandler<T> _handleError;
readonly _stream._TransformDoneHandler<T> _handleDone;
/// The output sink where the handlers should send their data into.
EventSink<T> _sink;
internal _HandlerEventSink(
_stream._TransformDataHandler<S, T> _handleData, _stream._TransformErrorHandler<T> _handleError,
_stream._TransformDoneHandler<T> _handleDone, EventSink<T> _sink) {
this._handleData = _handleData;
this._handleError = _handleError;
this._handleDone = _handleDone;
this._sink = _sink;
if (_sink == null) {
throw new Exception("The provided sink must not be null.");
}
}
bool _isClosed {
get { return _sink == null; }
}
public override void add(S data) {
if (_isClosed) {
throw new Exception("Sink is closed");
}
if (_handleData != null) {
_handleData(data, _sink);
}
else {
_sink.add(data as T);
}
}
public override void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
if (_isClosed) {
throw new Exception("Sink is closed");
}
if (_handleError != null) {
stackTrace ??= AsyncError.defaultStackTrace(error);
_handleError(error, stackTrace, _sink);
}
else {
_sink.addError(error, stackTrace);
}
}
public override void close() {
if (_isClosed) return;
var sink = _sink;
_sink = null;
if (_handleDone != null) {
_handleDone(sink);
}
else {
sink.close();
}
}
}
class _StreamHandlerTransformer<S, T> : _StreamSinkTransformer<S, T> where T : class {
_StreamHandlerTransformer(
_stream._TransformDataHandler<S, T> handleData = null,
_stream._TransformErrorHandler<T> handleError = null,
_stream._TransformDoneHandler<T> handleDone = null)
: base((EventSink<T> outputSink) => {
return new _HandlerEventSink<S, T>(
handleData, handleError, handleDone, outputSink);
}) {
}
Stream<T> bind(Stream<S> stream) {
return base.bind(stream);
}
}
class _StreamBindTransformer<S, T> : StreamTransformerBase<S, T> {
readonly Func<Stream<S>, Stream<T>> _bind;
_StreamBindTransformer(Func<Stream<S>, Stream<T>> _bind) {
this._bind = _bind;
}
public override Stream<T> bind(Stream<S> stream) => _bind(stream);
}
public partial class _async {
public delegate EventSink<S> _SinkMapper<S, T>(EventSink<T> output);
public delegate StreamSubscription<T> _SubscriptionTransformer<S, T>(Stream<S> stream, bool cancelOnError);
}
class _StreamSubscriptionTransformer<S, T> : StreamTransformerBase<S, T> {
readonly _async._SubscriptionTransformer<S, T> _onListen;
internal _StreamSubscriptionTransformer(_async._SubscriptionTransformer<S, T> _onListen) {
this._onListen = _onListen;
}
public override Stream<T> bind(Stream<S> stream) =>
new _BoundSubscriptionStream<S, T>(stream, _onListen);
}
class _BoundSubscriptionStream<S, T> : Stream<T> {
readonly _async._SubscriptionTransformer<S, T> _onListen;
readonly Stream<S> _stream;
bool isBroadcast {
get { return _stream.isBroadcast; }
}
internal _BoundSubscriptionStream(Stream<S> _stream, _async._SubscriptionTransformer<S, T> _onListen) {
this._stream = _stream;
this._onListen = _onListen;
}
StreamSubscription<T> listen(Action<T> onData,
Action onError = null, Action onDone = null, bool cancelOnError = false) {
//cancelOnError = cancelOnError;
StreamSubscription<T> result = _onListen(_stream, cancelOnError);
result.onData(onData);
result.onError(onError);
result.onDone(onDone);
return result;
}
}

3
com.unity.uiwidgets/Runtime/async/stream_transformers.cs.meta


fileFormatVersion: 2
guid: 4be593ce960f459482dbeb617dfcb4e0
timeCreated: 1628682407

353
com.unity.uiwidgets/Runtime/widgets/async.cs


// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// import 'dart:async' show Future, Stream, StreamSubscription;
//
// import 'package:flutter/foundation.dart';
//
// import 'framework.dart';
// Examples can assume:
// dynamic _lot;
// Future<String> _calculation;
using System;
using System.Collections.Generic;
using Unity.UIWidgets.async;
using Unity.UIWidgets.foundation;
using Unity.UIWidgets.widgets;
public class Stream<T> {
}
public class StreamSubscription<T> {
}
public abstract class StreamBuilderBase<T, S> : StatefulWidget {
public StreamBuilderBase(Key key = null, Stream<T> stream = null) : base(key: key) {
this.stream = stream;
}
public readonly Stream<T> stream;
public abstract S initial();
public virtual S afterConnected(S current) => current;
public abstract S afterData(S current, T data);
public virtual S afterError(S current, object error) => current;
public virtual S afterDone(S current) => current;
public virtual S afterDisconnected(S current) => current;
public abstract Widget build(BuildContext context, S currentSummary);
public override State createState() => new _StreamBuilderBaseState<T, S>();
}
class _StreamBuilderBaseState<T, S> : State<StreamBuilderBase<T, S>> {
StreamSubscription<T> _subscription;
S _summary;
public override void initState() {
base.initState();
_summary = widget.initial();
_subscribe();
}
public override void didUpdateWidget(StatefulWidget statefulWidget) {
StreamBuilderBase<T, S> oldWidget = statefulWidget as StreamBuilderBase<T, S>;
if (oldWidget == null) {
return;
}
base.didUpdateWidget(statefulWidget);
if (oldWidget != null) {
if (oldWidget.stream != widget.stream) {
if (_subscription != null) {
_unsubscribe();
_summary = widget.afterDisconnected(_summary);
}
_subscribe();
}
}
}
public override Widget build(BuildContext context) => widget.build(context, _summary);
public override void dispose() {
_unsubscribe();
base.dispose();
}
void _subscribe() {
if (widget.stream != null) {
_subscription = widget.stream.listen(
(T data) => { setState(() => { _summary = widget.afterData(_summary, data); }); },
onError: (object error) => { setState(() => { _summary = widget.afterError(_summary, error); }); },
onDone: () => { setState(() => { _summary = widget.afterDone(_summary); }); });
_summary = widget.afterConnected(_summary);
}
}
void _unsubscribe() {
if (_subscription != null) {
_subscription.cancel();
_subscription = null;
}
}
}
public enum ConnectionState {
none,
waiting,
active,
done,
}
//@immutable
public class AsyncSnapshot<T> : IEquatable<AsyncSnapshot<T>> {
AsyncSnapshot(ConnectionState connectionState, object data, object error) {
D.assert(connectionState != null);
D.assert(!(data != null && error != null));
this.connectionState = connectionState;
this.data = (T) data;
this.error = error;
}
public static AsyncSnapshot<object> nothing() {
return new AsyncSnapshot<object>(ConnectionState.none, null, null);
}
public static AsyncSnapshot<T> withData(ConnectionState state, T data) {
return new AsyncSnapshot<T>(state, data, null);
}
public static AsyncSnapshot<T> withError(ConnectionState state, object error) {
return new AsyncSnapshot<T>(state, null, error);
}
public readonly ConnectionState connectionState;
public readonly T data;
public T requireData {
get {
if (hasData)
return data;
if (hasError)
//TODO: not sure if cast works
throw (Exception) error;
throw new Exception("Snapshot has neither data nor error");
}
}
public readonly object error;
public AsyncSnapshot<T> inState(ConnectionState state) {
return new AsyncSnapshot<T>(state, data, error);
}
public bool hasData {
get => data != null;
}
public bool hasError {
get => error != null;
}
public override string ToString() =>
$"{foundation_.objectRuntimeType(this, "AsyncSnapshot")}({connectionState}, {data}, {error})";
public static bool operator ==(AsyncSnapshot<T> left, AsyncSnapshot<T> right) {
return Equals(left, right);
}
public static bool operator !=(AsyncSnapshot<T> left, AsyncSnapshot<T> right) {
return !Equals(left, right);
}
public bool Equals(AsyncSnapshot<T> other) {
if (ReferenceEquals(null, other)) {
return false;
}
if (ReferenceEquals(this, other)) {
return true;
}
return connectionState == other.connectionState && EqualityComparer<T>.Default.Equals(data, other.data) &&
Equals(error, other.error);
}
public override bool Equals(object obj) {
if (ReferenceEquals(null, obj)) {
return false;
}
if (ReferenceEquals(this, obj)) {
return true;
}
if (obj.GetType() != GetType()) {
return false;
}
return Equals((AsyncSnapshot<T>) obj);
}
public override int GetHashCode() {
unchecked {
var hashCode = (int) connectionState;
hashCode = (hashCode * 397) ^ EqualityComparer<T>.Default.GetHashCode(data);
hashCode = (hashCode * 397) ^ (error != null ? error.GetHashCode() : 0);
return hashCode;
}
}
}
public static partial class _async {
public delegate Widget AsyncWidgetBuilder<T>(BuildContext context, AsyncSnapshot<T> snapshot);
}
// TODO(ianh): remove unreachable code above once https://github.com/dart-lang/linter/issues/1139 is fixed
public class StreamBuilder<T> : StreamBuilderBase<T, AsyncSnapshot<T>> {
public StreamBuilder(
_async.AsyncWidgetBuilder<T> builder,
Key key = null,
T initialData = default,
Stream<T> stream = null
) : base(key: key, stream: stream) {
D.assert(builder != null);
this.builder = builder;
this.initialData = initialData;
}
public readonly _async.AsyncWidgetBuilder<T> builder;
public readonly T initialData;
public override
AsyncSnapshot<T> initial() => global::AsyncSnapshot<T>.withData(ConnectionState.none, initialData);
public override
AsyncSnapshot<T> afterConnected(AsyncSnapshot<T> current) => current.inState(ConnectionState.waiting);
public override
AsyncSnapshot<T> afterData(AsyncSnapshot<T> current, T data) {
return global::AsyncSnapshot<T>.withData(ConnectionState.active, data);
}
public override
AsyncSnapshot<T> afterError(AsyncSnapshot<T> current, object error) {
return AsyncSnapshot<T>.withError(ConnectionState.active, error);
}
public override
AsyncSnapshot<T> afterDone(AsyncSnapshot<T> current) => current.inState(ConnectionState.done);
public override
AsyncSnapshot<T> afterDisconnected(AsyncSnapshot<T> current) => current.inState(ConnectionState.none);
public override
Widget build(BuildContext context, AsyncSnapshot<T> currentSummary) => builder(context, currentSummary);
}
// TODO(ianh): remove unreachable code above once https://github.com/dart-lang/linter/issues/1141 is fixed
class FutureBuilder<T> : StatefulWidget {
public FutureBuilder(
_async.AsyncWidgetBuilder<T> builder,
Key key = null,
Future<T> future = null,
T initialData = default
) :
base(key: key) {
D.assert(builder != null);
this.builder = builder;
this.future = future;
this.initialData = initialData;
}
public readonly Future<T> future;
public readonly _async.AsyncWidgetBuilder<T> builder;
public readonly T initialData;
public override
State createState() => new _FutureBuilderState<T>();
}
class _FutureBuilderState<T> : State<FutureBuilder<T>> {
object _activeCallbackIdentity;
AsyncSnapshot<T> _snapshot;
public override
void initState() {
base.initState();
_snapshot = AsyncSnapshot<T>.withData(ConnectionState.none, widget.initialData);
_subscribe();
}
public override
void didUpdateWidget(StatefulWidget statefulWidget) {
var oldWidget = statefulWidget as FutureBuilder<T>;
if (oldWidget == null) {
return;
}
base.didUpdateWidget(oldWidget);
if (oldWidget.future != widget.future) {
if (_activeCallbackIdentity != null) {
_unsubscribe();
_snapshot = _snapshot.inState(ConnectionState.none);
}
_subscribe();
}
}
public override
Widget build(BuildContext context) => widget.builder(context, _snapshot);
public override
void dispose() {
_unsubscribe();
base.dispose();
}
void _subscribe() {
if (widget.future != null) {
object callbackIdentity = new object();
_activeCallbackIdentity = callbackIdentity;
widget.future.then((object dataIn) => {
var data = (T) dataIn;
if (_activeCallbackIdentity == callbackIdentity) {
setState(() => { _snapshot = AsyncSnapshot<T>.withData(ConnectionState.done, data); });
}
}, onError: (Exception error) => {
if (_activeCallbackIdentity == callbackIdentity) {
setState(() => { _snapshot = AsyncSnapshot<T>.withError(ConnectionState.done, error); });
}
return FutureOr.nil;
});
_snapshot = _snapshot.inState(ConnectionState.waiting);
}
}
void _unsubscribe() {
_activeCallbackIdentity = null;
}
}

3
com.unity.uiwidgets/Runtime/widgets/async.cs.meta


fileFormatVersion: 2
guid: 459525b5a4954fd3b46b3462cc408fbd
timeCreated: 1628671862
正在加载...
取消
保存