浏览代码

add multiStream

/main
Xingwei Zhu 3 年前
当前提交
82cbe604
共有 3 个文件被更改,包括 79 次插入8 次删除
  1. 12
      com.unity.uiwidgets/Runtime/async/stream.cs
  2. 16
      com.unity.uiwidgets/Runtime/async/stream_controller.cs
  3. 59
      com.unity.uiwidgets/Runtime/async/stream_impl.cs

12
com.unity.uiwidgets/Runtime/async/stream.cs


() => (_PendingEvents<T>) new _IterablePendingEvents<T>(elements));
}
public static Stream<T> multi(Action<MultiStreamController<T>> onListen, bool isBroadcast = false) {
return new _MultiStream<T>(onListen, isBroadcast);
}
public static Stream<T> periodic(TimeSpan period,
Func<int, T> computation = null) {
Timer timer = default;

_sink.close();
return Future._nullFuture;
}
}
public interface MultiStreamController<T> : IStreamController<T> {
void addSync(T value);
void addErrorSync(object error, string trackStack);
void closeSync();
}
}

16
com.unity.uiwidgets/Runtime/async/stream_controller.cs


abstract class _StreamController<T> : _StreamControllerBase<T> {
/** The controller is in its initial state with no subscription. */
const int _STATE_INITIAL = 0;
internal const int _STATE_INITIAL = 0;
const int _STATE_SUBSCRIBED = 1;
internal const int _STATE_SUBSCRIBED = 1;
const int _STATE_CANCELED = 2;
internal const int _STATE_CANCELED = 2;
const int _STATE_SUBSCRIPTION_MASK = 3;
internal const int _STATE_SUBSCRIPTION_MASK = 3;
// The following state relate to the controller, not the subscription.
// If closed, adding more events is not allowed.

const int _STATE_CLOSED = 4;
const int _STATE_ADDSTREAM = 8;
internal const int _STATE_CLOSED = 4;
internal const int _STATE_ADDSTREAM = 8;
// @pragma("vm:entry-point")
object _varData;

int _state = _STATE_INITIAL;
protected int _state = _STATE_INITIAL;
// TODO(lrn): Could this be stored in the varData field too, if it's not
// accessed until the call to "close"? Then we need to special case if it's

}
}
Exception _badEventState() {
protected Exception _badEventState() {
if (isClosed) {
return new Exception("Cannot add event after closing");
}

59
com.unity.uiwidgets/Runtime/async/stream_impl.cs


return new _DoneStreamSubscription<T>(() => onDone());
}
}
class _MultiStream<T> : Stream<T> {
public override bool isBroadcast {
get {
return _isBroadcast;
}
}
bool _isBroadcast;
/// The callback called for each listen.
public readonly Action<MultiStreamController<T>> _onListen;
public _MultiStream(Action<MultiStreamController<T>> _onListen, bool isBroadcast) {
_isBroadcast = isBroadcast;
this._onListen = _onListen;
}
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null,
Action onDone = null, bool cancelOnError = false) {
var controller = new _MultiStreamController<T>();
controller.onListen = () => {
_onListen(controller);
};
return controller._subscribe(
onData, onError, onDone, cancelOnError);
}
}
class _MultiStreamController<T> : _AsyncStreamController<T>, MultiStreamController<T> {
public _MultiStreamController() : base(null, null, null, null)
{
}
public void addSync(T value) {
if (!_mayAddEvent) throw _badEventState();
if (hasListener) _subscription._add(value);
}
public void addErrorSync(object error, string trackStack) {
if (!_mayAddEvent) throw _badEventState();
if (hasListener) {
_subscription._addError(error, trackStack ?? "");
}
}
public void closeSync() {
if (isClosed) return;
if (!_mayAddEvent) throw _badEventState();
_state |= _StreamController<T>._STATE_CLOSED;
if (hasListener) _subscription._close();
}
public override Stream<T> stream {
get {
throw new Exception("Not available");
}
}
}
}
正在加载...
取消
保存