浏览代码

_AsBroadcastStream, _AsBroadcastStreamController

/main
siyao 3 年前
当前提交
cf41ad6b
共有 5 个文件被更改,包括 748 次插入602 次删除
  1. 9
      com.unity.uiwidgets/Runtime/async/async_cast.cs
  2. 131
      com.unity.uiwidgets/Runtime/async/broadcast_stream_controller.cs
  3. 998
      com.unity.uiwidgets/Runtime/async/stream.cs
  4. 19
      com.unity.uiwidgets/Runtime/async/stream_controller.cs
  5. 193
      com.unity.uiwidgets/Runtime/async/stream_impl.cs

9
com.unity.uiwidgets/Runtime/async/async_cast.cs


using Unity.UIWidgets.async;
public class CastStream<S, T> : Stream<T> where T : class {
public class CastStream<S, T> : Stream<T> {
readonly Stream<S> _source;
public CastStream(Stream<S> _source) {

}
class CastStreamSubscription<S, T> : StreamSubscription<T> where T : class {
class CastStreamSubscription<S, T> : StreamSubscription<T> {
readonly StreamSubscription<S> _source;
/// Zone where listen was called.

if (_handleData == null) return;
T targetData;
try {
targetData = data as T;
// siyao: this might go wrong
targetData = (T)(object) data ;
} catch (Exception error) {
if (_handleError == null) {
_zone.handleUncaughtError(error);

class CastStreamTransformer<SS, ST, TS, TT>
: StreamTransformerBase<TS, TT> where TT : class where ST : class where SS : class {
: StreamTransformerBase<TS, TT> where TT : class where ST : class {
public readonly StreamTransformer<SS, ST> _source;
public CastStreamTransformer(StreamTransformer<SS, ST> _source) {

131
com.unity.uiwidgets/Runtime/async/broadcast_stream_controller.cs


const int _STATE_INITIAL = 0;
const int _STATE_EVENT_ID = 1;
internal const int _STATE_FIRING = 2;
const int _STATE_CLOSED = 4;
protected const int _STATE_CLOSED = 4;
const int _STATE_ADDSTREAM = 8;
_stream.ControllerCallback onListen;

// _StreamControllerLifecycle interface.
StreamSubscription<T> _subscribe(
public override StreamSubscription<T> _subscribe(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
if (isClosed) {
onDone ??= _stream._nullDoneHandler;

class _SyncBroadcastStreamController<T> : _BroadcastStreamController<T>
, SynchronousStreamController<T> {
internal _SyncBroadcastStreamController(
_stream.ControllerCallback onListen, _stream.ControllerCancelCallback onCancel)
: base(onListen, onCancel) {
_stream.ControllerCallback onListen, Action onCancel)
: base(onListen, ()=> {
onCancel();
return Future._nullFuture;
}) {
}

// * an "asBroadcastStream" stream are always initiated by events
// * on another stream, and it is fine to forward them synchronously.
// */
// class _AsBroadcastStreamController<T> : _SyncBroadcastStreamController<T>
// , _EventDispatch<T> {
// _StreamImplEvents<T> _pending;
//
// _AsBroadcastStreamController(Action onListen, Action onCancel)
// : base(()=>onListen(), onCancel);
//
// bool get _hasPending => _pending != null && !_pending.isEmpty;
//
// void _addPendingEvent(_DelayedEvent event) {
// _pending ??= new _StreamImplEvents<T>();
// _pending.add(event);
// }
//
// void add(T data) {
// if (!isClosed && _isFiring) {
// _addPendingEvent(new _DelayedData<T>(data));
// return;
// }
// base.add(data);
// while (_hasPending) {
// _pending.handleNext(this);
// }
// }
//
// void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
// stackTrace ??= AsyncError.defaultStackTrace(error);
// if (!isClosed && _isFiring) {
// _addPendingEvent(new _DelayedError(error, stackTrace));
// return;
// }
// if (!_mayAddEvent) throw _addEventError();
// _sendError(error, stackTrace);
// while (_hasPending) {
// _pending.handleNext(this);
// }
// }
//
// Future close() {
// if (!isClosed && _isFiring) {
// _addPendingEvent(const _DelayedDone());
// _state |= _BroadcastStreamController._STATE_CLOSED;
// return base.done;
// }
// Future result = base.close();
// D.assert(!_hasPending);
// return result;
// }
//
// void _callOnCancel() {
// if (_hasPending) {
// _pending.clear();
// _pending = null;
// }
// base._callOnCancel();
// }
// }
class _AsBroadcastStreamController<T> : _SyncBroadcastStreamController<T>
, _EventDispatch<T> {
_StreamImplEvents<T> _pending;
internal _AsBroadcastStreamController(Action onListen, Action onCancel)
: base(() => onListen(), onCancel) {
}
bool _hasPending {
get { return _pending != null && !_pending.isEmpty; }
}
void _addPendingEvent(_DelayedEvent<T> evt) {
_pending ??= new _StreamImplEvents<T>();
_pending.add(evt);
}
void add(T data) {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedData<T>(data));
return;
}
base.add(data);
while (_hasPending) {
_pending.handleNext(this);
}
}
void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
stackTrace ??= AsyncError.defaultStackTrace(error);
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedError<T>((Exception) error, stackTrace));
return;
}
if (!_mayAddEvent) throw _addEventError();
_sendError(error, stackTrace);
while (_hasPending) {
_pending.handleNext(this);
}
}
Future close() {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedDone<T>());
_state |= _BroadcastStreamController<T>._STATE_CLOSED;
return base.done;
}
Future result = base.close();
D.assert(!_hasPending);
return result;
}
void _callOnCancel() {
if (_hasPending) {
_pending.clear();
_pending = null;
}
base._callOnCancel();
}
}
}

998
com.unity.uiwidgets/Runtime/async/stream.cs
文件差异内容过多而无法显示
查看文件

19
com.unity.uiwidgets/Runtime/async/stream_controller.cs


public static StreamController<T> broadcast(
Action onListen = null, Action onCancel = null, bool sync = false) {
return sync
? (StreamController<T>) new _SyncBroadcastStreamController<T>(() => onListen(), () => {
onCancel();
return Future._nullFuture;
})
? (StreamController<T>) new _SyncBroadcastStreamController<T>(() => onListen(), onCancel)
: new _AsyncBroadcastStreamController<T>(() => onListen(), () => {
onCancel();
return Future._nullFuture;

interface _StreamControllerLifecycle<T> {
StreamSubscription<T> _subscribe(
Action<T> onData, Action onError, Action onDone, bool cancelOnError);
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError);
void _recordPause(StreamSubscription<T> subscription);
void _recordResume(StreamSubscription<T> subscription);

_StreamControllerLifecycle<T>,
_EventSink<T>,
_EventDispatch<T> {
public StreamSubscription<T> _subscribe(Action<T> onData, Action onError, Action onDone, bool cancelOnError) {
throw new NotImplementedException();
}
public abstract StreamSubscription<T> _subscribe(Action<T> onData, Action<object, string> onError,
Action onDone, bool cancelOnError);
public void _recordPause(StreamSubscription<T> subscription) {
}

// _StreamControllerLifeCycle interface
StreamSubscription<T> _subscribe(
public override StreamSubscription<T> _subscribe(
Action<T> onData,
Action<object, string> onError,
Action onDone, bool cancelOnError) {

}
StreamSubscription<T> _createSubscription(
Action<T> onData, Action onError, Action onDone, bool cancelOnError) =>
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) =>
_controller._subscribe(onData, onError, onDone, cancelOnError);
// Override == and hashCode so that new streams returned by the same

193
com.unity.uiwidgets/Runtime/async/stream_impl.cs


}
}
// class _AsBroadcastStream<T> : Stream<T> {
// readonly Stream<T> _source;
// readonly _stream._BroadcastCallback<T> _onListenHandler;
// readonly _stream._BroadcastCallback<T> _onCancelHandler;
// readonly Zone _zone;
//
// _AsBroadcastStreamController<T> _controller;
// StreamSubscription<T> _subscription;
//
// _AsBroadcastStream(
// this._source,
// void onListenHandler(StreamSubscription<T> subscription),
// void onCancelHandler(StreamSubscription<T> subscription))
// // TODO(floitsch): the return type should be void and should be
// // inferred.
// : _onListenHandler = Zone.current
// .registerUnaryCallback<dynamic, StreamSubscription<T>>(
// onListenHandler),
// _onCancelHandler = Zone.current
// .registerUnaryCallback<dynamic, StreamSubscription<T>>(
// onCancelHandler),
// _zone = Zone.current {
// _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
// }
//
// bool get isBroadcast => true;
//
// StreamSubscription<T> listen(void onData(T data),
// {Function onError, void onDone(), bool cancelOnError}) {
// if (_controller == null || _controller.isClosed) {
// // Return a dummy subscription backed by nothing, since
// // it will only ever send one done event.
// return new _DoneStreamSubscription<T>(onDone);
// }
// _subscription ??= _source.listen(_controller.add,
// onError: _controller.addError, onDone: _controller.close);
// cancelOnError = identical(true, cancelOnError);
// return _controller._subscribe(onData, onError, onDone, cancelOnError);
// }
//
// void _onCancel() {
// bool shutdown = (_controller == null) || _controller.isClosed;
// if (_onCancelHandler != null) {
// _zone.runUnary(
// _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this));
// }
// if (shutdown) {
// if (_subscription != null) {
// _subscription.cancel();
// _subscription = null;
// }
// }
// }
//
// void _onListen() {
// if (_onListenHandler != null) {
// _zone.runUnary(
// _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this));
// }
// }
//
// // Methods called from _BroadcastSubscriptionWrapper.
// void _cancelSubscription() {
// if (_subscription == null) return;
// // Called by [_controller] when it has no subscribers left.
// StreamSubscription subscription = _subscription;
// _subscription = null;
// _controller = null; // Marks the stream as no longer listenable.
// subscription.cancel();
// }
//
// void _pauseSubscription(Future resumeSignal) {
// if (_subscription == null) return;
// _subscription.pause(resumeSignal);
// }
//
// void _resumeSubscription() {
// if (_subscription == null) return;
// _subscription.resume();
// }
//
// bool get _isSubscriptionPaused {
// if (_subscription == null) return false;
// return _subscription.isPaused;
// }
// }
class _AsBroadcastStream<T> : Stream<T> {
readonly Stream<T> _source;
readonly _stream._BroadcastCallback<T> _onListenHandler;
readonly _stream._BroadcastCallback<T> _onCancelHandler;
readonly Zone _zone;
_AsBroadcastStreamController<T> _controller;
StreamSubscription<T> _subscription;
internal _AsBroadcastStream(
Stream<T> _source,
Action<StreamSubscription<T>> onListenHandler,
Action<StreamSubscription<T>> onCancelHandler)
// TODO(floitsch): the return type should be void and should be
// inferred.
{
this._source = _source;
_onListenHandler = a=>Zone.current
.registerUnaryCallback(
b => {
onListenHandler((StreamSubscription<T>) b);
return default;
}
)(a);
_onCancelHandler = d=> Zone.current
.registerUnaryCallback(
c=> {
onCancelHandler((StreamSubscription<T>) c);
return default;
})(d);
_zone = Zone.current;
_controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
}
bool isBroadcast {
get { return true; }
}
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) {
if (_controller == null || _controller.isClosed) {
// Return a dummy subscription backed by nothing, since
// it will only ever send one done event.
return new _DoneStreamSubscription<T>(()=>onDone());
}
_subscription ??= _source.listen(_controller.add,
onError: _controller.addError, onDone: ()=>_controller.close());
cancelOnError = Equals(true, cancelOnError);
return _controller._subscribe(onData, onError, onDone, cancelOnError);
}
void _onCancel() {
bool shutdown = (_controller == null) || _controller.isClosed;
if (_onCancelHandler != null) {
_zone.runUnary(
a=> {
_onCancelHandler((StreamSubscription<T>) a);
return default;
}, new _BroadcastSubscriptionWrapper<T>(this));
}
if (shutdown) {
if (_subscription != null) {
_subscription.cancel();
_subscription = null;
}
}
}
void _onListen() {
if (_onListenHandler != null) {
_zone.runUnary(
a => {
_onListenHandler((StreamSubscription<T>) a);
return default;
}, new _BroadcastSubscriptionWrapper<T>(this));
}
}
// Methods called from _BroadcastSubscriptionWrapper.
void _cancelSubscription() {
if (_subscription == null) return;
// Called by [_controller] when it has no subscribers left.
StreamSubscription<T> subscription = _subscription;
_subscription = null;
_controller = null; // Marks the stream as no longer listenable.
subscription.cancel();
}
void _pauseSubscription(Future resumeSignal) {
if (_subscription == null) return;
_subscription.pause(resumeSignal);
}
void _resumeSubscription() {
if (_subscription == null) return;
_subscription.resume();
}
bool _isSubscriptionPaused{get {
if (_subscription == null) return false;
return _subscription.isPaused;
}}
}
internal class _StreamIterator<T> : StreamIterator<T> {

正在加载...
取消
保存