1 year ago

#381981

test-img

Denis

How to cache last onNext signal but not onError in a Flux?

I am in the situation where I want a Flux to be converted to a hot source and cache its last onNext signal but not error signals.

Suppose I have the following Flux

Flux a = fluxThatSignalsSomeUserInput()
  .switchMap(input -> timeConsumingMono(input)
    .subscribeOn(Schedulers.boundedElastic())
    .timeout(MY_TIMEOUT, Mono.error(() -> new TimeoutException("took too long.")))
  )
  .cache(1)

Somewhere else I want to use the last emitted onNext signal from Flux a like this

  a.next()
  .map(processedUserInput -> doSomething())
  ...
  .subscribe()

So far this is working just fine and timeConsumingMono(input) is executed only once per user input as intended.

But I also want to recover from errors in the original Flux a. So I added somewhere downstream of a the following retry

Flux b = a
  ...
  .doOnError(t -> logError(t))
  .retryWhen(Retry.maxInARow(NUM_RETRIES))
  .doOnError(t -> showFatalError(t))
  .subscribe()

Now, as .cache(1) also replays error signals, the subscription to b immediately runs into the showFatalError(t) path on the first error/timeout in a without retrying the timeConsumingMono(input) the specified amount of times.

I tried to use Mono::cache inside the switchMap instead of Flux::cache like this to not cache error signals

Flux a = fluxThatSignalsSomeUserInput()
  .switchMap(input -> timeConsumingMono(input)
    .subscribeOn(Schedulers.boundedElastic())
    .timeout(MY_TIMEOUT, Mono.error(() -> new TimeoutException("took too long.")))
    .cache(next -> Duration.ofMillis(Long.MAX_VALUE), t -> Duration.ZERO, () -> Duration.ZERO)
  )

but this way timeConsumingMono(input) gets called multiple times when late subscribing to a in the second code block above.

Is there an easy way to cache just onNext but not onError signals in a Flux so that late subscribers will just see the latest successful signal but the original subscriber still gets the errors?

project-reactor

0 Answers

Your Answer

Accepted video resources