|
|
|
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Diagnostics; |
|
|
|
using System.IO; |
|
|
|
using System.Text; |
|
|
|
using Unity.UIWidgets.async; |
|
|
|
using Unity.UIWidgets.core; |
|
|
|
using Unity.UIWidgets.foundation; |
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
public Stream<E> asyncMap<E>(Func<T,FutureOr> convert) { |
|
|
|
_StreamControllerBase<E> controller; |
|
|
|
StreamSubscription<T> subscription; |
|
|
|
_StreamControllerBase<E> controller = null; |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
|
|
|
|
void onListen() { |
|
|
|
var add = new Action<E>(controller.add); |
|
|
|
|
|
|
subscription = this.listen((T evt) => { |
|
|
|
subscription = listen((T evt) => { |
|
|
|
FutureOr newValue; |
|
|
|
try { |
|
|
|
newValue = convert(evt); |
|
|
|
|
|
|
} |
|
|
|
if (newValue is Future<E>) { |
|
|
|
if (newValue is Future<E> newFuture) { |
|
|
|
newValue |
|
|
|
.then(add, onError: addError) |
|
|
|
newFuture |
|
|
|
.then(d =>add((E) d), onError: (e)=> { |
|
|
|
addError(e, e.StackTrace); |
|
|
|
return FutureOr.nil; |
|
|
|
}) |
|
|
|
controller.add(newValue); |
|
|
|
// Siyao: This works as if this is csharpt
|
|
|
|
controller.add((E) newValue.v); |
|
|
|
}, onError: addError, onDone: controller.close); |
|
|
|
}, onError: addError, onDone: ()=>controller.close()); |
|
|
|
if (this.isBroadcast) { |
|
|
|
controller = new StreamController<E>.broadcast( |
|
|
|
onListen: onListen, |
|
|
|
onCancel: () { |
|
|
|
subscription.cancel(); |
|
|
|
if (isBroadcast) { |
|
|
|
controller = (_StreamControllerBase<E>) StreamController<E>.broadcast( |
|
|
|
onListen: ()=> onListen(), |
|
|
|
onCancel: () => { |
|
|
|
subscription.cancel(); |
|
|
|
controller = new StreamController<E>( |
|
|
|
controller = (_StreamControllerBase<E>) StreamController<E>.create( |
|
|
|
onPause: () { |
|
|
|
subscription.pause(); |
|
|
|
onPause: () => { |
|
|
|
subscription.pause(); |
|
|
|
onResume: () { |
|
|
|
subscription.resume(); |
|
|
|
onResume: () => { |
|
|
|
subscription.resume(); |
|
|
|
}, |
|
|
|
onCancel: () => subscription.cancel(), |
|
|
|
sync: true); |
|
|
|
|
|
|
|
|
|
|
Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { |
|
|
|
_StreamControllerBase<E> controller; |
|
|
|
StreamSubscription<T> subscription; |
|
|
|
Stream<E> asyncExpand<E>(Func<T,Stream<E>> convert) { |
|
|
|
_StreamControllerBase<E> controller = null; |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
D.assert(controller is _StreamController || |
|
|
|
controller is _BroadcastStreamController); |
|
|
|
subscription = this.listen((T event) { |
|
|
|
D.assert(controller is _StreamController<E> || |
|
|
|
controller is _BroadcastStreamController<E>); |
|
|
|
subscription = listen((T evt) => { |
|
|
|
newStream = convert(event); |
|
|
|
} catch (e, s) { |
|
|
|
controller.addError(e, s); |
|
|
|
newStream = convert(evt); |
|
|
|
} catch (Exception e) { |
|
|
|
controller.addError(e, e.StackTrace); |
|
|
|
return; |
|
|
|
} |
|
|
|
if (newStream != null) { |
|
|
|
|
|
|
}, |
|
|
|
onError: controller._addError, // Avoid Zone error replacement.
|
|
|
|
onDone: controller.close); |
|
|
|
onDone: ()=>controller.close()); |
|
|
|
if (this.isBroadcast) { |
|
|
|
controller = new StreamController<E>.broadcast( |
|
|
|
onListen: onListen, |
|
|
|
onCancel: () { |
|
|
|
subscription.cancel(); |
|
|
|
if (isBroadcast) { |
|
|
|
controller = (_StreamControllerBase<E>) StreamController<E>.broadcast( |
|
|
|
onListen:()=>onListen(), |
|
|
|
onCancel: () =>{ |
|
|
|
subscription.cancel(); |
|
|
|
controller = new StreamController<E>( |
|
|
|
onListen: onListen, |
|
|
|
onPause: () { |
|
|
|
subscription.pause(); |
|
|
|
controller = (_StreamControllerBase<E>) StreamController<E>.create( |
|
|
|
onListen:()=> onListen(), |
|
|
|
onPause: () =>{ |
|
|
|
subscription.pause(); |
|
|
|
onResume: () { |
|
|
|
subscription.resume(); |
|
|
|
onResume: () =>{ |
|
|
|
subscription.resume(); |
|
|
|
}, |
|
|
|
onCancel: () => subscription.cancel(), |
|
|
|
sync: true); |
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
Future pipe(StreamConsumer<T> streamConsumer) { |
|
|
|
return streamConsumer.addStream(this).then((_) => streamConsumer.close()); |
|
|
|
return streamConsumer.addStream(this).then((_) => streamConsumer.close(), (_)=>FutureOr.nil); |
|
|
|
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { |
|
|
|
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) where S : class { |
|
|
|
Future<T> reduce(T combine(T previous, T element)) { |
|
|
|
_Future<T> result = new _Future<T>(); |
|
|
|
Future<T> reduce(Func<T,T,T> combine) { |
|
|
|
_Future result = new _Future(); |
|
|
|
T value; |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
(T element) { |
|
|
|
T value = default; |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
(T element)=> { |
|
|
|
_runUserCode(() => combine(value, element), (T newValue) { |
|
|
|
_stream._runUserCode(() => combine(value, element), (T newValue) =>{ |
|
|
|
}, _cancelAndErrorClosure(subscription, result)); |
|
|
|
},onError: (e) => _stream._cancelAndErrorClosure(subscription, result)(e)); |
|
|
|
onError: result._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e, s)=>result._completeError((Exception) e), |
|
|
|
onDone: () => { |
|
|
|
throw IterableElementError.noElement(); |
|
|
|
} catch (e, s) { |
|
|
|
_completeWithErrorCallback(result, e, s); |
|
|
|
throw new Exception("IterableElementError.noElement()"); |
|
|
|
} catch (Exception e) { |
|
|
|
async_._completeWithErrorCallback(result, e); |
|
|
|
result._complete(value); |
|
|
|
// TODO: need check
|
|
|
|
result._complete(FutureOr.value(value)); |
|
|
|
return result; |
|
|
|
return result.to<T>(); |
|
|
|
Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { |
|
|
|
_Future<S> result = new _Future<S>(); |
|
|
|
Future<S> fold<S>(S initialValue, Func<S,T,S> combine) { |
|
|
|
_Future result = new _Future(); |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
(T element) { |
|
|
|
_runUserCode(() => combine(value, element), (S newValue) { |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
(T element) => { |
|
|
|
_stream._runUserCode(() => combine(value, element), (S newValue) => { |
|
|
|
}, _cancelAndErrorClosure(subscription, result)); |
|
|
|
},e=> _stream._cancelAndErrorClosure(subscription, result)(e)); |
|
|
|
onError: result._completeError, |
|
|
|
onDone: () { |
|
|
|
result._complete(value); |
|
|
|
onError: (e, s)=>result._completeError((Exception) e), |
|
|
|
onDone: () =>{ |
|
|
|
result._complete(FutureOr.value(value)); |
|
|
|
return result; |
|
|
|
return result.to<S>(); |
|
|
|
Future<String> join([String separator = ""]) { |
|
|
|
_Future<String> result = new _Future<String>(); |
|
|
|
StringBuffer buffer = new StringBuffer(); |
|
|
|
StreamSubscription subscription; |
|
|
|
Future<string> join(string separator = "") { |
|
|
|
_Future result = new _Future(); |
|
|
|
StringBuilder buffer = new StringBuilder(); |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = this.listen( |
|
|
|
(T element) { |
|
|
|
subscription = listen( |
|
|
|
(T element) => { |
|
|
|
buffer.write(separator); |
|
|
|
buffer.Append(separator); |
|
|
|
buffer.write(element); |
|
|
|
} catch (e, s) { |
|
|
|
_cancelAndErrorWithReplacement(subscription, result, e, s); |
|
|
|
buffer.Append(element); |
|
|
|
} catch (Exception e) { |
|
|
|
_stream._cancelAndErrorWithReplacement(subscription, result, e); |
|
|
|
onError: result._completeError, |
|
|
|
onDone: () { |
|
|
|
result._complete(buffer.toString()); |
|
|
|
onError: (e,_)=>result._completeError((Exception) e), |
|
|
|
onDone: () => { |
|
|
|
result._complete(buffer.ToString()); |
|
|
|
return result; |
|
|
|
return result.to<string>(); |
|
|
|
_Future<bool> future = new _Future<bool>(); |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
(T element) { |
|
|
|
_runUserCode(() => (element == needle), (bool isMatch) { |
|
|
|
_Future future = new _Future(); |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
(T element) => { |
|
|
|
_stream._runUserCode(() => (Equals(element, needle)), (bool isMatch) => { |
|
|
|
_cancelAndValue(subscription, future, true); |
|
|
|
_stream._cancelAndValue(subscription, future, true); |
|
|
|
}, _cancelAndErrorClosure(subscription, future)); |
|
|
|
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e)); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
return future; |
|
|
|
return future.to<bool>(); |
|
|
|
Future forEach(void action(T element)) { |
|
|
|
Future forEach(Action<T> action) { |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
(T element) { |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
(T element) => { |
|
|
|
_runUserCode<dynamic>(() => action(element), (_) {}, |
|
|
|
_cancelAndErrorClosure(subscription, future)); |
|
|
|
_stream._runUserCode<object>(() => { |
|
|
|
action(element); |
|
|
|
return default; |
|
|
|
}, (_) =>{}, |
|
|
|
(e)=> _stream._cancelAndErrorClosure(subscription, future)(e)); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
future._complete(null); |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
future._complete(FutureOr.nil); |
|
|
|
Future<bool> every(bool test(T element)) { |
|
|
|
_Future<bool> future = new _Future<bool>(); |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
(T element) { |
|
|
|
_runUserCode(() => test(element), (bool isMatch) { |
|
|
|
Future<bool> every(Func<T,bool> test) { |
|
|
|
_Future future = new _Future(); |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
(T element) =>{ |
|
|
|
_stream._runUserCode(() => test(element), (bool isMatch) =>{ |
|
|
|
_cancelAndValue(subscription, future, false); |
|
|
|
_stream._cancelAndValue(subscription, future, false); |
|
|
|
}, _cancelAndErrorClosure(subscription, future)); |
|
|
|
},ex => _stream._cancelAndErrorClosure(subscription, future)(ex)); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (ex, s)=> future._completeError((Exception) ex), |
|
|
|
onDone: () => { |
|
|
|
return future; |
|
|
|
return future.to<bool>(); |
|
|
|
Future<bool> any(bool test(T element)) { |
|
|
|
_Future<bool> future = new _Future<bool>(); |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
(T element) { |
|
|
|
_runUserCode(() => test(element), (bool isMatch) { |
|
|
|
Future<bool> any(Func<T, bool> test) { |
|
|
|
_Future future = new _Future(); |
|
|
|
StreamSubscription<T> subscription; |
|
|
|
subscription = listen( |
|
|
|
(T element)=> { |
|
|
|
_stream._runUserCode(() => test(element), (bool isMatch) =>{ |
|
|
|
_cancelAndValue(subscription, future, true); |
|
|
|
_stream._cancelAndValue(subscription, future, true); |
|
|
|
}, _cancelAndErrorClosure(subscription, future)); |
|
|
|
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e)); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
future._complete(false); |
|
|
|
}, |
|
|
|
cancelOnError: true); |
|
|
|
|
|
|
Future<int> get length { |
|
|
|
_Future<int> future = new _Future<int>(); |
|
|
|
int count = 0; |
|
|
|
this.listen( |
|
|
|
listen( |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
future._complete(count); |
|
|
|
}, |
|
|
|
cancelOnError: true); |
|
|
|
|
|
|
Future<bool> get isEmpty { |
|
|
|
_Future<bool> future = new _Future<bool>(); |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
_cancelAndValue(subscription, future, false); |
|
|
|
_stream._cancelAndValue(subscription, future, false); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
future._complete(true); |
|
|
|
}, |
|
|
|
cancelOnError: true); |
|
|
|
|
|
|
Future<List<T>> toList() { |
|
|
|
List<T> result = <T>[]; |
|
|
|
_Future<List<T>> future = new _Future<List<T>>(); |
|
|
|
this.listen( |
|
|
|
listen( |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
future._complete(result); |
|
|
|
}, |
|
|
|
cancelOnError: true); |
|
|
|
|
|
|
Future<Set<T>> toSet() { |
|
|
|
Set<T> result = new Set<T>(); |
|
|
|
_Future<Set<T>> future = new _Future<Set<T>>(); |
|
|
|
this.listen( |
|
|
|
listen( |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
future._complete(result); |
|
|
|
}, |
|
|
|
cancelOnError: true); |
|
|
|
|
|
|
|
|
|
|
Future<T> get first { |
|
|
|
_Future<T> future = new _Future<T>(); |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
_cancelAndValue(subscription, future, value); |
|
|
|
_stream._cancelAndValue(subscription, future, value); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
try { |
|
|
|
throw IterableElementError.noElement(); |
|
|
|
} catch (e, s) { |
|
|
|
|
|
|
foundResult = true; |
|
|
|
result = value; |
|
|
|
}, |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
if (foundResult) { |
|
|
|
future._complete(result); |
|
|
|
return; |
|
|
|
|
|
|
_Future<T> future = new _Future<T>(); |
|
|
|
T result; |
|
|
|
bool foundResult = false; |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
(T value) { |
|
|
|
if (foundResult) { |
|
|
|
// This is the second element we get.
|
|
|
|
|
|
|
foundResult = true; |
|
|
|
result = value; |
|
|
|
}, |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
if (foundResult) { |
|
|
|
future._complete(result); |
|
|
|
return; |
|
|
|
|
|
|
|
|
|
|
Future<T> firstWhere(bool test(T element), {T orElse()}) { |
|
|
|
_Future<T> future = new _Future(); |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
_runUserCode(() => test(value), (bool isMatch) { |
|
|
|
_stream._runUserCode(() => test(value), (bool isMatch) => { |
|
|
|
_cancelAndValue(subscription, future, value); |
|
|
|
_stream._cancelAndValue(subscription, future, value); |
|
|
|
}, _cancelAndErrorClosure(subscription, future)); |
|
|
|
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e)); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
_runUserCode(orElse, future._complete, future._completeError); |
|
|
|
_stream._runUserCode(orElse, future._complete, future._completeError); |
|
|
|
return; |
|
|
|
} |
|
|
|
try { |
|
|
|
|
|
|
_Future<T> future = new _Future(); |
|
|
|
T result; |
|
|
|
bool foundResult = false; |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
_runUserCode(() => true == test(value), (bool isMatch) { |
|
|
|
_stream._runUserCode(() => true == test(value), (bool isMatch) => { |
|
|
|
}, _cancelAndErrorClosure(subscription, future)); |
|
|
|
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e)); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
_runUserCode(orElse, future._complete, future._completeError); |
|
|
|
_stream._runUserCode(orElse, future._complete, future._completeError); |
|
|
|
return; |
|
|
|
} |
|
|
|
try { |
|
|
|
|
|
|
_Future<T> future = new _Future<T>(); |
|
|
|
T result; |
|
|
|
bool foundResult = false; |
|
|
|
StreamSubscription subscription; |
|
|
|
subscription = this.listen( |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = listen( |
|
|
|
_runUserCode(() => true == test(value), (bool isMatch) { |
|
|
|
_stream._runUserCode(() => true == test(value), (bool isMatch) => { |
|
|
|
if (isMatch) { |
|
|
|
if (foundResult) { |
|
|
|
try { |
|
|
|
|
|
|
foundResult = true; |
|
|
|
result = value; |
|
|
|
} |
|
|
|
}, _cancelAndErrorClosure(subscription, future)); |
|
|
|
},(e)=> _stream._cancelAndErrorClosure(subscription, future)(e)); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
if (foundResult) { |
|
|
|
future._complete(result); |
|
|
|
return; |
|
|
|
|
|
|
_runUserCode(orElse, future._complete, future._completeError); |
|
|
|
_stream._runUserCode(orElse, future._complete, future._completeError); |
|
|
|
return; |
|
|
|
} |
|
|
|
throw IterableElementError.noElement(); |
|
|
|
|
|
|
ArgumentError.checkNotNull(index, "index"); |
|
|
|
RangeError.checkNotNegative(index, "index"); |
|
|
|
_Future<T> future = new _Future<T>(); |
|
|
|
StreamSubscription subscription; |
|
|
|
StreamSubscription<T> subscription = null; |
|
|
|
subscription = this.listen( |
|
|
|
subscription = listen( |
|
|
|
_cancelAndValue(subscription, future, value); |
|
|
|
_stream._cancelAndValue(subscription, future, value); |
|
|
|
onError: future._completeError, |
|
|
|
onDone: () { |
|
|
|
onError: (e,_)=>future._completeError((Exception)e), |
|
|
|
onDone: () => { |
|
|
|
future._completeError( |
|
|
|
new RangeError.index(index, this, "index", null, elementIndex)); |
|
|
|
}, |
|
|
|
|
|
|
}; |
|
|
|
} |
|
|
|
|
|
|
|
subscription = this.listen(onData, onError: onError, onDone: onDone); |
|
|
|
subscription = listen(onData, onError: onError, onDone: onDone); |
|
|
|
timer = zone.createTimer(timeLimit, timeout); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
public interface StreamConsumer<S> { |
|
|
|
Future addStream(Stream<S> stream); |
|
|
|
|
|
|
|
// cannot define function with same name
|
|
|
|
// Future closeConsumer();
|
|
|
|
Future close(); |
|
|
|
} |
|
|
|
|
|
|
|
public abstract class StreamSink<S> : EventSink<S>, StreamConsumer<S> { |
|
|
|