bind method
- Stream<T> stream
Transforms the provided stream.
Returns a new stream with events that are computed from events of the provided stream.
The StreamTransformer interface is completely generic, so it cannot say what subclasses do. Each StreamTransformer should document clearly how it transforms the stream (on the class or variable used to access the transformer), as well as any differences from the following typical behavior:
- When the returned stream is listened to, it starts listening to the input stream.
- Subscriptions of the returned stream forward (in a reasonable time) a StreamSubscription.pause call to the subscription of the input stream.
- Similarly, canceling a subscription of the returned stream eventually (in reasonable time) cancels the subscription of the input stream.
"Reasonable time" depends on the transformer and stream. Some transformers, like a "timeout" transformer, might make these operations depend on a duration. Others might not delay them at all, or just by a microtask.
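The typical behavior listed above can be sketched with a small transformer built only on dart:async. This is a minimal illustration, not the implementation documented on this page: the class name `Doubler` and its doubling logic are hypothetical, and it forwards pause, resume, and cancel without any delay.

```dart
import 'dart:async';

/// Hypothetical transformer (not part of any library) that doubles each
/// integer event and follows the typical listen/pause/cancel behavior.
class Doubler extends StreamTransformerBase<int, int> {
  const Doubler();

  @override
  Stream<int> bind(Stream<int> stream) {
    late StreamSubscription<int> subscription;
    final controller = StreamController<int>(sync: true);

    // Start listening to the input only when the output is listened to.
    controller.onListen = () {
      subscription = stream.listen(
        (value) => controller.add(value * 2),
        onError: controller.addError,
        onDone: controller.close,
      );
    };

    // Forward pause, resume, and cancel to the input subscription.
    controller.onPause = () => subscription.pause();
    controller.onResume = () => subscription.resume();
    controller.onCancel = () => subscription.cancel();

    return controller.stream;
  }
}

Future<void> main() async {
  final doubled =
      await Stream.fromIterable([1, 2, 3]).transform(const Doubler()).toList();
  print(doubled); // [2, 4, 6]
}
```

Because the controller is single-subscription, its callbacks run only after `onListen` has assigned `subscription`, so the `late` variable is always initialized when they fire.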
Transformers are free to handle errors in any way. A transformer implementation may choose to propagate errors, or convert them to other events, or ignore them completely, but if errors are ignored, it should be documented explicitly.
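The three error-handling choices described above (propagate, convert, ignore) can be illustrated with `StreamTransformer.fromHandlers` from dart:async. This is only a sketch: the `source` function, the variable names, and the sentinel value `-1` are assumptions made for the example.

```dart
import 'dart:async';

/// Hypothetical input: one data event followed by an error.
Stream<int> source() async* {
  yield 1;
  throw StateError('boom');
}

/// Propagates errors unchanged (the default when no handler is given).
final propagate = StreamTransformer<int, int>.fromHandlers();

/// Converts each error into a data event (-1 is an arbitrary sentinel).
final convert = StreamTransformer<int, int>.fromHandlers(
  handleError: (error, stackTrace, sink) => sink.add(-1),
);

/// Ignores errors completely; a real transformer should document this.
final ignore = StreamTransformer<int, int>.fromHandlers(
  handleError: (error, stackTrace, sink) {},
);

Future<void> main() async {
  try {
    await source().transform(propagate).toList();
  } on StateError catch (e) {
    print('propagated: ${e.message}'); // propagated: boom
  }
  print(await source().transform(convert).toList()); // [1, -1]
  print(await source().transform(ignore).toList()); // [1]
}
```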
Implementation
@override
Stream<T> bind(Stream<T> stream) {
  var controller = stream.isBroadcast
      ? StreamController<T>.broadcast(sync: true)
      : StreamController<T>(sync: true);

  controller.onListen = () {
    if (isClosed) {
      // Ignore errors here, because otherwise there would be no way for the
      // user to handle them gracefully.
      stream.listen(null).cancel().catchError((_) {});
      return;
    }

    var subscription =
        stream.listen(controller.add, onError: controller.addError);
    subscription.onDone(() {
      _subscriptions.remove(subscription);
      _controllers.remove(controller);
      controller.close();
    });
    _subscriptions.add(subscription);

    if (!stream.isBroadcast) {
      controller.onPause = subscription.pause;
      controller.onResume = subscription.resume;
    }

    controller.onCancel = () {
      _controllers.remove(controller);

      // If the subscription has already been removed, that indicates that the
      // underlying stream has been cancelled by [close] and its cancellation
      // future has been handled there. In that case, we shouldn't forward it
      // here as well.
      if (_subscriptions.remove(subscription)) return subscription.cancel();
      return null;
    };
  };

  if (isClosed) {
    controller.close();
  } else {
    _controllers.add(controller);
  }

  return controller.stream;
}