1 year ago
#381981
Denis
How to cache last onNext signal but not onError in a Flux?
I want to convert a Flux into a hot source that caches its last onNext signal but not its error signals.
Suppose I have the following Flux:
Flux a = fluxThatSignalsSomeUserInput()
    .switchMap(input -> timeConsumingMono(input)
        .subscribeOn(Schedulers.boundedElastic())
        .timeout(MY_TIMEOUT, Mono.error(() -> new TimeoutException("took too long.")))
    )
    .cache(1);
Somewhere else I want to use the last emitted onNext signal from Flux a, like this:
a.next()
    .map(processedUserInput -> doSomething())
    ...
    .subscribe();
So far this works fine, and timeConsumingMono(input) is executed only once per user input, as intended.
But I also want to recover from errors in the original Flux a, so I added the following retry somewhere downstream of a:
// subscribe() returns a Disposable, not a Flux
Disposable b = a
    ...
    .doOnError(t -> logError(t))
    .retryWhen(Retry.maxInARow(NUM_RETRIES))
    .doOnError(t -> showFatalError(t))
    .subscribe();
Now, because .cache(1) also replays error signals, the subscription to b immediately runs into the showFatalError(t) path on the first error/timeout in a, without retrying timeConsumingMono(input) the specified number of times.
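The replay behaviour can be reproduced in isolation. The following is only a minimal sketch, assuming reactor-core is on the classpath; the class name, the "ok"/"boom" values, and the use of Retry.max instead of Retry.maxInARow are illustrative choices and not part of the real code. It counts how often the upstream is actually subscribed to when a retry sits downstream of cache(1).

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class CacheReplaysErrorDemo {

    /**
     * Subscribes through cache(1) with a downstream retry and returns how many
     * times the upstream (the deferred Flux) was really subscribed to.
     */
    static int upstreamSubscriptionsWithRetry(int retries) {
        AtomicInteger subscriptions = new AtomicInteger();

        // Hypothetical stand-in for the real source: one value, then an error.
        Flux<String> cached = Flux.<String>defer(() -> {
                subscriptions.incrementAndGet();
                return Flux.just("ok")
                    .concatWith(Flux.error(new IllegalStateException("boom")));
            })
            .cache(1);

        cached
            // Every retry re-subscribes to the cached Flux, which replays the
            // stored terminal error instead of re-subscribing to the upstream.
            .retryWhen(Retry.max(retries))
            // Swallow the final "retries exhausted" error so blockLast() returns.
            .onErrorResume(t -> Flux.empty())
            .blockLast();

        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println("upstream subscriptions: " + upstreamSubscriptionsWithRetry(3));
    }
}
```

If the upstream subscription count stays at one regardless of the retry budget, the retries are being answered from the cache, which matches what I see with showFatalError(t).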
I tried using Mono::cache inside the switchMap instead of Flux::cache, like this, so that error signals are not cached:
Flux a = fluxThatSignalsSomeUserInput()
    .switchMap(input -> timeConsumingMono(input)
        .subscribeOn(Schedulers.boundedElastic())
        .timeout(MY_TIMEOUT, Mono.error(() -> new TimeoutException("took too long.")))
        .cache(next -> Duration.ofMillis(Long.MAX_VALUE), t -> Duration.ZERO, () -> Duration.ZERO)
    );
but this way timeConsumingMono(input) gets called multiple times when subscribing late to a, as in the second code block above.
Is there an easy way to cache just onNext but not onError signals in a Flux so that late subscribers will just see the latest successful signal but the original subscriber still gets the errors?
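One possible direction, shown here only as a hedged sketch and not as a confirmed solution, is to keep the error-carrying Flux uncached and mirror its onNext signals into a separate replay sink that late subscribers read from. The class and variable names below are hypothetical, the source is a stand-in for the real switchMap pipeline, and the sketch assumes onNext signals arrive serialized and that at least one subscriber (for example the retrying one) keeps the source running.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class LatestSuccessCache {

    /** Returns "<latest cached value>:<error message seen by the original subscriber>". */
    static String demo() {
        // Holds only the most recent successful value; it never receives errors.
        Sinks.Many<Integer> latest = Sinks.many().replay().latest();

        // Hypothetical stand-in for the real pipeline: two values, then an error.
        Flux<Integer> source = Flux.just(1, 2)
            .concatWith(Flux.error(new IllegalStateException("boom")))
            // Mirror successful values into the sink; errors bypass it entirely.
            .doOnNext(latest::tryEmitNext);

        // The "original" subscriber still receives the error (and could retry here).
        AtomicReference<Throwable> seenError = new AtomicReference<>();
        source.subscribe(value -> { }, seenError::set);

        // A late subscriber only sees the latest successful value.
        Integer cached = latest.asFlux().next().block(Duration.ofSeconds(1));

        return cached + ":" + seenError.get().getMessage();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off in this sketch is that the sink is fed as a side effect, so nothing is cached until something subscribes to the source itself.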
project-reactor
0 Answers