|
|
|
|
|
|
// StreamSubscription interface.
|
|
|
|
|
|
|
|
public override void onData(Action<T> handleData) { |
|
|
|
handleData ??= _stream._nullDataHandler; |
|
|
|
handleData = handleData ?? _stream._nullDataHandler; |
|
|
|
// TODO(floitsch): the return type should be 'void', and the type
|
|
|
|
// should be inferred.
|
|
|
|
_onData = d => _zone.registerUnaryCallback(data => { |
|
|
|
|
|
|
|
|
|
|
// Siyao: c# does not support convert action
|
|
|
|
public override void onError(Action<object, string> handleError) { |
|
|
|
handleError ??= (input1, input2) => _stream._nullErrorHandler(null); |
|
|
|
handleError = handleError ?? ((input1, input2) => _stream._nullErrorHandler(null)); |
|
|
|
|
|
|
|
_onError = (_, __) => _zone |
|
|
|
.registerBinaryCallback((in1, in2) => { |
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
public override void onDone(Action handleDone) { |
|
|
|
handleDone ??= _stream._nullDoneHandler; |
|
|
|
handleDone = handleDone ?? _stream._nullDoneHandler; |
|
|
|
_onDone = () => _zone.registerCallback(() => { |
|
|
|
handleDone(); |
|
|
|
return null; |
|
|
|
|
|
|
D.assert(!_inCallback); |
|
|
|
bool wasInputPaused = _isInputPaused; |
|
|
|
_state |= _STATE_IN_CALLBACK; |
|
|
|
_zone.runUnaryGuarded(data => { |
|
|
|
_onData((T) data); |
|
|
|
_zone.runUnaryGuarded(data1 => { |
|
|
|
_onData((T) data1); |
|
|
|
return null; |
|
|
|
}, data); |
|
|
|
_state &= ~_STATE_IN_CALLBACK; |
|
|
|
|
|
|
// TODO(floitsch): this dynamic should be 'void'.
|
|
|
|
var onError = _onError; |
|
|
|
if (onError != null) { |
|
|
|
_zone.runBinaryGuarded((error, stack) => { |
|
|
|
onError((Exception) error, (string) stack); |
|
|
|
_zone.runBinaryGuarded((error1, stack) => { |
|
|
|
onError((Exception) error1, (string) stack); |
|
|
|
return null; |
|
|
|
}, error, stackTrace); |
|
|
|
} |
|
|
|
|
|
|
return new _DoneStreamSubscription<T>(() => onDone()); |
|
|
|
} |
|
|
|
|
|
|
|
_subscription ??= _source.listen(_controller.add, |
|
|
|
_subscription = _subscription ?? _source.listen(_controller.add, |
|
|
|
onError: _controller.addError, onDone: () => _controller.close()); |
|
|
|
cancelOnError = Equals(true, cancelOnError); |
|
|
|
return _controller._subscribe(onData, onError, onDone, cancelOnError); |
|
|
|