浏览代码

stream pip, stream

/main
siyao 3 年前
当前提交
9a90b11c
共有 5 个文件被更改,包括 781 次插入652 次删除
  1. 360
      com.unity.uiwidgets/Runtime/async/stream.cs
  2. 998
      com.unity.uiwidgets/Runtime/async/stream_controller.cs
  3. 2
      com.unity.uiwidgets/Runtime/async/stream_impl.cs
  4. 70
      com.unity.uiwidgets/Runtime/async/stream_pipe.cs
  5. 3
      com.unity.uiwidgets/Runtime/async/stream_pipe.cs.meta

360
com.unity.uiwidgets/Runtime/async/stream.cs


using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using Unity.UIWidgets.async;
using Unity.UIWidgets.core;
using Unity.UIWidgets.foundation;

}
public Stream<E> asyncMap<E>(Func<T,FutureOr> convert) {
_StreamControllerBase<E> controller;
StreamSubscription<T> subscription;
_StreamControllerBase<E> controller = null;
StreamSubscription<T> subscription = null;
void onListen() {
var add = new Action<E>(controller.add);

subscription = this.listen((T evt) => {
subscription = listen((T evt) => {
FutureOr newValue;
try {
newValue = convert(evt);

}
if (newValue is Future<E>) {
if (newValue is Future<E> newFuture) {
newValue
.then(add, onError: addError)
newFuture
.then(d =>add((E) d), onError: (e)=> {
addError(e, e.StackTrace);
return FutureOr.nil;
})
controller.add(newValue);
// Siyao: This works as if this is csharpt
controller.add((E) newValue.v);
}, onError: addError, onDone: controller.close);
}, onError: addError, onDone: ()=>controller.close());
if (this.isBroadcast) {
controller = new StreamController<E>.broadcast(
onListen: onListen,
onCancel: () {
subscription.cancel();
if (isBroadcast) {
controller = (_StreamControllerBase<E>) StreamController<E>.broadcast(
onListen: ()=> onListen(),
onCancel: () => {
subscription.cancel();
controller = new StreamController<E>(
controller = (_StreamControllerBase<E>) StreamController<E>.create(
onPause: () {
subscription.pause();
onPause: () => {
subscription.pause();
onResume: () {
subscription.resume();
onResume: () => {
subscription.resume();
},
onCancel: () => subscription.cancel(),
sync: true);

Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
_StreamControllerBase<E> controller;
StreamSubscription<T> subscription;
Stream<E> asyncExpand<E>(Func<T,Stream<E>> convert) {
_StreamControllerBase<E> controller = null;
StreamSubscription<T> subscription = null;
D.assert(controller is _StreamController ||
controller is _BroadcastStreamController);
subscription = this.listen((T event) {
D.assert(controller is _StreamController<E> ||
controller is _BroadcastStreamController<E>);
subscription = listen((T evt) => {
newStream = convert(event);
} catch (e, s) {
controller.addError(e, s);
newStream = convert(evt);
} catch (Exception e) {
controller.addError(e, e.StackTrace);
return;
}
if (newStream != null) {

},
onError: controller._addError, // Avoid Zone error replacement.
onDone: controller.close);
onDone: ()=>controller.close());
if (this.isBroadcast) {
controller = new StreamController<E>.broadcast(
onListen: onListen,
onCancel: () {
subscription.cancel();
if (isBroadcast) {
controller = (_StreamControllerBase<E>) StreamController<E>.broadcast(
onListen:()=>onListen(),
onCancel: () =>{
subscription.cancel();
controller = new StreamController<E>(
onListen: onListen,
onPause: () {
subscription.pause();
controller = (_StreamControllerBase<E>) StreamController<E>.create(
onListen:()=> onListen(),
onPause: () =>{
subscription.pause();
onResume: () {
subscription.resume();
onResume: () =>{
subscription.resume();
},
onCancel: () => subscription.cancel(),
sync: true);

}
Future pipe(StreamConsumer<T> streamConsumer) {
return streamConsumer.addStream(this).then((_) => streamConsumer.close());
return streamConsumer.addStream(this).then((_) => streamConsumer.close(), (_)=>FutureOr.nil);
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) where S : class {
Future<T> reduce(T combine(T previous, T element)) {
_Future<T> result = new _Future<T>();
Future<T> reduce(Func<T,T,T> combine) {
_Future result = new _Future();
T value;
StreamSubscription subscription;
subscription = this.listen(
(T element) {
T value = default;
StreamSubscription<T> subscription = null;
subscription = listen(
(T element)=> {
_runUserCode(() => combine(value, element), (T newValue) {
_stream._runUserCode(() => combine(value, element), (T newValue) =>{
}, _cancelAndErrorClosure(subscription, result));
},onError: (e) => _stream._cancelAndErrorClosure(subscription, result)(e));
onError: result._completeError,
onDone: () {
onError: (e, s)=>result._completeError((Exception) e),
onDone: () => {
throw IterableElementError.noElement();
} catch (e, s) {
_completeWithErrorCallback(result, e, s);
throw new Exception("IterableElementError.noElement()");
} catch (Exception e) {
async_._completeWithErrorCallback(result, e);
result._complete(value);
// TODO: need check
result._complete(FutureOr.value(value));
return result;
return result.to<T>();
Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
_Future<S> result = new _Future<S>();
Future<S> fold<S>(S initialValue, Func<S,T,S> combine) {
_Future result = new _Future();
StreamSubscription subscription;
subscription = this.listen(
(T element) {
_runUserCode(() => combine(value, element), (S newValue) {
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) => {
_stream._runUserCode(() => combine(value, element), (S newValue) => {
}, _cancelAndErrorClosure(subscription, result));
},e=> _stream._cancelAndErrorClosure(subscription, result)(e));
onError: result._completeError,
onDone: () {
result._complete(value);
onError: (e, s)=>result._completeError((Exception) e),
onDone: () =>{
result._complete(FutureOr.value(value));
return result;
return result.to<S>();
Future<String> join([String separator = ""]) {
_Future<String> result = new _Future<String>();
StringBuffer buffer = new StringBuffer();
StreamSubscription subscription;
Future<string> join(string separator = "") {
_Future result = new _Future();
StringBuilder buffer = new StringBuilder();
StreamSubscription<T> subscription = null;
subscription = this.listen(
(T element) {
subscription = listen(
(T element) => {
buffer.write(separator);
buffer.Append(separator);
buffer.write(element);
} catch (e, s) {
_cancelAndErrorWithReplacement(subscription, result, e, s);
buffer.Append(element);
} catch (Exception e) {
_stream._cancelAndErrorWithReplacement(subscription, result, e);
onError: result._completeError,
onDone: () {
result._complete(buffer.toString());
onError: (e,_)=>result._completeError((Exception) e),
onDone: () => {
result._complete(buffer.ToString());
return result;
return result.to<string>();
_Future<bool> future = new _Future<bool>();
StreamSubscription subscription;
subscription = this.listen(
(T element) {
_runUserCode(() => (element == needle), (bool isMatch) {
_Future future = new _Future();
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) => {
_stream._runUserCode(() => (Equals(element, needle)), (bool isMatch) => {
_cancelAndValue(subscription, future, true);
_stream._cancelAndValue(subscription, future, true);
}, _cancelAndErrorClosure(subscription, future));
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e));
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
return future;
return future.to<bool>();
Future forEach(void action(T element)) {
Future forEach(Action<T> action) {
StreamSubscription subscription;
subscription = this.listen(
(T element) {
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) => {
_runUserCode<dynamic>(() => action(element), (_) {},
_cancelAndErrorClosure(subscription, future));
_stream._runUserCode<object>(() => {
action(element);
return default;
}, (_) =>{},
(e)=> _stream._cancelAndErrorClosure(subscription, future)(e));
onError: future._completeError,
onDone: () {
future._complete(null);
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
future._complete(FutureOr.nil);
Future<bool> every(bool test(T element)) {
_Future<bool> future = new _Future<bool>();
StreamSubscription subscription;
subscription = this.listen(
(T element) {
_runUserCode(() => test(element), (bool isMatch) {
Future<bool> every(Func<T,bool> test) {
_Future future = new _Future();
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) =>{
_stream._runUserCode(() => test(element), (bool isMatch) =>{
_cancelAndValue(subscription, future, false);
_stream._cancelAndValue(subscription, future, false);
}, _cancelAndErrorClosure(subscription, future));
},ex => _stream._cancelAndErrorClosure(subscription, future)(ex));
onError: future._completeError,
onDone: () {
onError: (ex, s)=> future._completeError((Exception) ex),
onDone: () => {
return future;
return future.to<bool>();
Future<bool> any(bool test(T element)) {
_Future<bool> future = new _Future<bool>();
StreamSubscription subscription;
subscription = this.listen(
(T element) {
_runUserCode(() => test(element), (bool isMatch) {
Future<bool> any(Func<T, bool> test) {
_Future future = new _Future();
StreamSubscription<T> subscription;
subscription = listen(
(T element)=> {
_stream._runUserCode(() => test(element), (bool isMatch) =>{
_cancelAndValue(subscription, future, true);
_stream._cancelAndValue(subscription, future, true);
}, _cancelAndErrorClosure(subscription, future));
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e));
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
future._complete(false);
},
cancelOnError: true);

Future<int> get length {
_Future<int> future = new _Future<int>();
int count = 0;
this.listen(
listen(
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
future._complete(count);
},
cancelOnError: true);

Future<bool> get isEmpty {
_Future<bool> future = new _Future<bool>();
StreamSubscription subscription;
subscription = this.listen(
StreamSubscription<T> subscription = null;
subscription = listen(
_cancelAndValue(subscription, future, false);
_stream._cancelAndValue(subscription, future, false);
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
future._complete(true);
},
cancelOnError: true);

Future<List<T>> toList() {
List<T> result = <T>[];
_Future<List<T>> future = new _Future<List<T>>();
this.listen(
listen(
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
future._complete(result);
},
cancelOnError: true);

Future<Set<T>> toSet() {
Set<T> result = new Set<T>();
_Future<Set<T>> future = new _Future<Set<T>>();
this.listen(
listen(
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
future._complete(result);
},
cancelOnError: true);

Future<T> get first {
_Future<T> future = new _Future<T>();
StreamSubscription subscription;
subscription = this.listen(
StreamSubscription<T> subscription = null;
subscription = listen(
_cancelAndValue(subscription, future, value);
_stream._cancelAndValue(subscription, future, value);
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
try {
throw IterableElementError.noElement();
} catch (e, s) {

foundResult = true;
result = value;
},
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
if (foundResult) {
future._complete(result);
return;

_Future<T> future = new _Future<T>();
T result;
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
StreamSubscription<T> subscription = null;
subscription = listen(
(T value) {
if (foundResult) {
// This is the second element we get.

foundResult = true;
result = value;
},
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
if (foundResult) {
future._complete(result);
return;

Future<T> firstWhere(bool test(T element), {T orElse()}) {
_Future<T> future = new _Future();
StreamSubscription subscription;
subscription = this.listen(
StreamSubscription<T> subscription = null;
subscription = listen(
_runUserCode(() => test(value), (bool isMatch) {
_stream._runUserCode(() => test(value), (bool isMatch) => {
_cancelAndValue(subscription, future, value);
_stream._cancelAndValue(subscription, future, value);
}, _cancelAndErrorClosure(subscription, future));
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e));
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
_runUserCode(orElse, future._complete, future._completeError);
_stream._runUserCode(orElse, future._complete, future._completeError);
return;
}
try {

_Future<T> future = new _Future();
T result;
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
StreamSubscription<T> subscription = null;
subscription = listen(
_runUserCode(() => true == test(value), (bool isMatch) {
_stream._runUserCode(() => true == test(value), (bool isMatch) => {
}, _cancelAndErrorClosure(subscription, future));
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e));
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
_runUserCode(orElse, future._complete, future._completeError);
_stream._runUserCode(orElse, future._complete, future._completeError);
return;
}
try {

_Future<T> future = new _Future<T>();
T result;
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
StreamSubscription<T> subscription = null;
subscription = listen(
_runUserCode(() => true == test(value), (bool isMatch) {
_stream._runUserCode(() => true == test(value), (bool isMatch) => {
if (isMatch) {
if (foundResult) {
try {

foundResult = true;
result = value;
}
}, _cancelAndErrorClosure(subscription, future));
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e));
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
if (foundResult) {
future._complete(result);
return;

_runUserCode(orElse, future._complete, future._completeError);
_stream._runUserCode(orElse, future._complete, future._completeError);
return;
}
throw IterableElementError.noElement();

ArgumentError.checkNotNull(index, "index");
RangeError.checkNotNegative(index, "index");
_Future<T> future = new _Future<T>();
StreamSubscription subscription;
StreamSubscription<T> subscription = null;
subscription = this.listen(
subscription = listen(
_cancelAndValue(subscription, future, value);
_stream._cancelAndValue(subscription, future, value);
onError: future._completeError,
onDone: () {
onError: (e,_)=>future._completeError((Exception)e),
onDone: () => {
future._completeError(
new RangeError.index(index, this, "index", null, elementIndex));
},

};
}
subscription = this.listen(onData, onError: onError, onDone: onDone);
subscription = listen(onData, onError: onError, onDone: onDone);
timer = zone.createTimer(timeLimit, timeout);
}

public interface StreamConsumer<S> {
Future addStream(Stream<S> stream);
// cannot define function with same name
// Future closeConsumer();
Future close();
}
public abstract class StreamSink<S> : EventSink<S>, StreamConsumer<S> {

998
com.unity.uiwidgets/Runtime/async/stream_controller.cs
文件差异内容过多而无法显示
查看文件

2
com.unity.uiwidgets/Runtime/async/stream_impl.cs


}
}
void _guardCallback(Action callback) {
internal void _guardCallback(Action callback) {
D.assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;

70
com.unity.uiwidgets/Runtime/async/stream_pipe.cs


using System;
using System.Diagnostics;
namespace Unity.UIWidgets.async {
public static partial class _stream {
/** Runs user code and takes actions depending on success or failure. */
internal static void _runUserCode<T>(
Func<T> userCode, Action<T> onSuccess, Action<Exception> onError){
try {
onSuccess(userCode());
} catch (Exception e) {
AsyncError replacement = Zone.current.errorCallback(e);
if (replacement == null) {
onError(e);
} else {
var error = async_._nonNullError(replacement);
onError(error);
}
}
}
internal static void _cancelAndErrorWithReplacement<T>(StreamSubscription<T> subscription,
_Future future, Exception error) {
AsyncError replacement = Zone.current.errorCallback(error);
if (replacement != null) {
error = (Exception) _async._nonNullError(replacement);
}
_cancelAndError(subscription, future, error);
}
internal delegate void _ErrorCallback(Exception error);
internal static _ErrorCallback _cancelAndErrorClosure<T>(
StreamSubscription<T> subscription, _Future future) {
return (error) => {
_cancelAndError(subscription, future, error);
};
}
internal static void _cancelAndValue<T>(StreamSubscription<T> subscription, _Future future, object value) {
var cancelFuture = subscription.cancel();
if (cancelFuture != null && !Equals(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._complete(FutureOr.value(value)));
} else {
future._complete(FutureOr.value(value));
}
}
static void _cancelAndError<T>(StreamSubscription<T> subscription, _Future future, Exception error
) {
var cancelFuture = subscription.cancel();
if (cancelFuture != null && !Equals(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._completeError(error));
} else {
future._completeError(error);
}
}
internal static void _cancelAndValue<T>(StreamSubscription<T> subscription, _Future future, FutureOr value) {
var cancelFuture = subscription.cancel();
if (cancelFuture != null && !Equals(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._complete(value));
} else {
future._complete(value);
}
}
}
}

3
com.unity.uiwidgets/Runtime/async/stream_pipe.cs.meta


fileFormatVersion: 2
guid: ddf391c6af5c41bb8e27a4a5f0149c65
timeCreated: 1629257330
正在加载...
取消
保存