Skip to content

FromFuture with cancelling leads to undeliverable exceptions #7884

@DieterDP-ng

Description

@DieterDP-ng

Using RxJava 3.1.10.

The javadoc of Flowable.fromFuture suggests to use a .doOnCancel(() -> future.cancel(true)) operator to cancel the future when the Flowable gets cancelled. However, this can result in RxJava outputting undeliverable exceptions:

Caused by: java.util.concurrent.CancellationException: Task was cancelled.
	at com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:1574)
	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:594)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:575)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromFuture.subscribeActual(FlowableFromFuture.java:43)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:16149)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableDoOnLifecycle.subscribeActual(FlowableDoOnLifecycle.java:39)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:16149)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:16149)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:16095)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:80)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:71)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Corresponding RxJava code for reference:

Flowable.fromFuture(future)
                    .doOnCancel(() -> future.cancel(true))
                    .doOnNext(results -> LOG.debug("...")
                    .subscribeOn(Schedulers.io())

If I understand correctly, this is because the FlowableDoOnLifecycle (used by .doOnCancel) executes its action first (i.e. cancelling the Future), before it cancels the subscription. This causes the FlowableFromFuture#subscribeActual to do an onError, which can't be forwarded by FlowableDoOnLifecycle#onError because it already set its upstream to SubscriptionHelper.CANCELLED.

As a workaround, I added a .onErrorComplete(t -> t instanceof CancellationException) operator.

I'm guessing the ordering in FlowableDoOnLifecycle logic is chosen for a reason, and can't be changed. I'd suggest then instead to add support for cancelling a Future into FlowableFromFuture.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions