-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Description
Using RxJava 3.1.10.
The javadoc of Flowable.fromFuture
suggests to use a .doOnCancel(() -> future.cancel(true))
operator to cancel the future when the Flowable gets cancelled. However, this can result in RxJava outputting undeliverable exceptions:
Caused by: java.util.concurrent.CancellationException: Task was cancelled.
at com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:1574)
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:594)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:575)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromFuture.subscribeActual(FlowableFromFuture.java:43)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:16149)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableDoOnLifecycle.subscribeActual(FlowableDoOnLifecycle.java:39)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:16149)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:16149)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:16095)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:80)
at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:71)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Corresponding RxJava code for reference:
Flowable.fromFuture(future)
.doOnCancel(() -> future.cancel(true))
.doOnNext(results -> LOG.debug("...")
.subscribeOn(Schedulers.io())
If I understand correctly, this is because the FlowableDoOnLifecycle
(used by .doOnCancel
) executes its action first (i.e. cancelling the Future), before it cancels the subscription. This causes the FlowableFromFuture#subscribeActual
to do an onError
, which can't be forwarded by FlowableDoOnLifecycle#onError
because it already set its upstream
to SubscriptionHelper.CANCELLED
.
As a workaround, I added a .onErrorComplete(t -> t instanceof CancellationException)
operator.
I'm guessing the ordering in FlowableDoOnLifecycle logic is chosen for a reason, and can't be changed. I'd suggest then instead to add support for cancelling a Future into FlowableFromFuture
.