1 year ago




How to cache last onNext signal but not onError in a Flux?

I am in the situation where I want a Flux to be converted to a hot source and cache its last onNext signal but not error signals.

Suppose I have the following Flux

Flux a = fluxThatSignalsSomeUserInput()
  .switchMap(input -> timeConsumingMono(input)
    .timeout(MY_TIMEOUT, Mono.error(() -> new TimeoutException("took too long.")))

Somewhere else I want to use the last emitted onNext signal from Flux a like this

  .map(processedUserInput -> doSomething())

So far this is working just fine and timeConsumingMono(input) is executed only once per user input as intended.

But I also want to recover from errors in the original Flux a. So I added somewhere downstream of a the following retry

Flux b = a
  .doOnError(t -> logError(t))
  .doOnError(t -> showFatalError(t))

Now, as .cache(1) also replays error signals, the subscription to b immediately runs into the showFatalError(t) path on the first error/timeout in a without retrying the timeConsumingMono(input) the specified amount of times.

I tried to use Mono::cache inside the switchMap instead of Flux::cache like this to not cache error signals

Flux a = fluxThatSignalsSomeUserInput()
  .switchMap(input -> timeConsumingMono(input)
    .timeout(MY_TIMEOUT, Mono.error(() -> new TimeoutException("took too long.")))
    .cache(next -> Duration.ofMillis(Long.MAX_VALUE), t -> Duration.ZERO, () -> Duration.ZERO)

but this way timeConsumingMono(input) gets called multiple times when late subscribing to a in the second code block above.

Is there an easy way to cache just onNext but not onError signals in a Flux so that late subscribers will just see the latest successful signal but the original subscriber still gets the errors?


0 Answers

Your Answer

Accepted video resources