1 year ago

#389267


Julian A.

Why does collecting this Flow not terminate?

As part of practicing coroutines and Flow, I tried implementing RxJava's amb/race operator without allowing myself to use channels directly. Instead, I use shareIn (which produces a SharedFlow).

It runs fine, except that collecting the resulting flow suspends indefinitely, even though the jobs used to collect the losing flows have completed and the winning flow has emitted all its items.

Why?

I spent a couple of days trying to figure out what is going on, without success. Here's a gist of my work, including the code I'm using to test it and the output it generates.

You'll see from the output that the flow correctly emits the exception thrown by the first flow, cancels the third flow because it emits last (and so loses the race), and emits all the items from the winning flow.

This is a learning exercise, so I'm interested in the shortest path to getting this implementation to work. I'm not looking for a completely different implementation that works.

I'm also happy to create a project on GitHub with dependencies if anyone's interested in running the code directly. Let me know.

(I use Arrow's Either data type, which is basically a sealed class that represents the result of an operation. Either has two subtypes, Left and Right: Left holds an error if there is one, and Right holds the happy-path result of the operation.)
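For readers who haven't used Arrow, here is a minimal sketch of the idea. It is a simplified stand-in written for illustration only, not Arrow's actual definition, and the `describe` function is a hypothetical helper.

```kotlin
// Simplified stand-in for illustration; Arrow's real Either has a much larger API.
sealed class Either<out L, out R> {
    data class Left<out L>(val value: L) : Either<L, Nothing>()
    data class Right<out R>(val value: R) : Either<Nothing, R>()

    // Runs exactly one of the two functions, depending on which side this is.
    fun <T> fold(ifLeft: (L) -> T, ifRight: (R) -> T): T = when (this) {
        is Left -> ifLeft(value)
        is Right -> ifRight(value)
    }
}

// Hypothetical helper showing how a caller handles both outcomes.
fun describe(result: Either<Throwable, Int>): String =
    result.fold(
        ifLeft = { "failed: ${it.message}" },
        ifRight = { "ok: $it" }
    )

fun main() {
    println(describe(Either.Right(1)))
    println(describe(Either.Left(Exception("Bang!"))))
}
```

The `fold` call has the same shape as the `item.fold(ifLeft = ..., ifRight = ...)` call in my implementation.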

Thank you!

PS

Since the code seems to be difficult to grok, I'll summarize what it does.

  1. Within a flow builder
  2. Get a reference to the flow's job. This will be our parent job.
  3. Combine all the flows provided to the operator in a single list.
  4. Map each emission of each flow to an Either.Right
  5. Catch each exception of each flow and emit it as an Either.Left
  6. Share each resulting flow in a CoroutineScope whose job's parent is the job in step 2. Each flow gets its own job.
  7. Transform the list of flows into a single flow that emits the emissions of each of the flows, i.e. flat-map the constituent flows. Do this with concurrency set to the number of flows in the list, so that the constituent flows are collected in parallel.
  8. Collect the merged flow. Emissions will be the Lefts or Rights produced in steps 4 and 5.
  9. When collecting, emit Lefts and Rights to the outer flow of step 1.
  10. If a Left is collected, cancel the job (created in step 6) of the flow that emitted the Left, so that the flow stops emitting.
  11. If a Right is collected, cancel the jobs of any flows other than the flow that emitted the Right. Only cancel those flows that haven't already been cancelled.
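
Steps 4 and 5 can be looked at in isolation. The following is a minimal sketch, assuming Arrow's `left()` and `right()` extension functions; the helper name `toEither` is hypothetical and does not appear in my implementation.

```kotlin
import arrow.core.Either
import arrow.core.left
import arrow.core.right
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: wraps each emission in a Right (step 4) and turns
// an exception thrown by the upstream flow into a final Left (step 5).
fun <A> Flow<A>.toEither(): Flow<Either<Throwable, A>> =
    map<A, Either<Throwable, A>> { it.right() }
        .catch { emit(it.left()) }

fun main() = runBlocking {
    val failing = flow<Int> {
        emit(1)
        throw Exception("Bang!")
    }
    // Prints one Right for the emitted item, then one Left for the exception.
    failing.toEither().collect { println(it) }
}
```

Giving `map` explicit type arguments avoids the `as Either<Throwable, A>` casts. The full implementation follows.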
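
import arrow.core.Either
import arrow.core.left
import arrow.core.right
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(FlowPreview::class)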
fun <A> Flow<A>.ambUsingJobsToCancelRacersAndEmittingFailures(
    vararg others: Flow<A>
): Flow<Either<Throwable, A>> = flow {
    val parentJob = currentCoroutineContext()[Job]!!
    (listOf(this@ambUsingJobsToCancelRacersAndEmittingFailures) + others).foldIndexed(
        Pair(emptyList<Job>(), emptyList<Flow<Pair<Int, Either<Throwable, A>>>>())
    ) { index, acc, curr ->
        val job = Job(parentJob)
        val flow = curr.map { Pair(index, it.right() as Either<Throwable, A>) }
            .catch { emit(Pair(index, it.left() as Either<Throwable, A>)) }
            .onCompletion { println("completion $index $it") }
            .shareIn(CoroutineScope(job), SharingStarted.WhileSubscribed())
        Pair(acc.first + job, acc.second + flow)
    }.let { (jobs, flows) ->
        var winnerIndex: Int? = null
        flows.asFlow()
            .flatMapMerge(concurrency = flows.size) { it }
            .collect { (index, item) ->
                if (winnerIndex == null) {
                    item.fold(ifLeft = {
                        println("cancelling $index")
                        jobs[index].cancelAndJoin()
                    }, ifRight = {
                        winnerIndex = index
                        println("winnerIndex $winnerIndex")
                        jobs.forEachIndexed { i, job ->
                            if (!job.isCancelled && i != winnerIndex) {
                                println("cancelling $i")
                                job.cancelAndJoin()
                            }
                        }
                    })
                }
                emit(item)
                jobs.forEachIndexed { i, job ->
                    println("job cancelled $i ${job.isCancelled}")
                    println("job completed $i ${job.isCompleted}")
                }
            }
        println("done!")
    }
}

fun main() = runBlocking {
    val e1 = Exception("Bang!")
    val f1 = flow<Int> { throw e1 }.onStart { delay(50) }
    val f2 = flowOf(1, 2, 3).onStart { delay(100) }
    val f3 = flowOf(10, 20, 30).onStart { delay(150) }
    val f4 = f1.ambUsingJobsToCancelRacersAndEmittingFailures(f2, f3)
    f4.collect { // Emits expected items, then suspends indefinitely.
        println(it)
    }
}

Output

completion 0 null
cancelling 0
Either.Left(java.lang.Exception: Bang!)
job cancelled 0 true
job completed 0 true
job cancelled 1 false
job completed 1 false
job cancelled 2 false
job completed 2 false
completion 1 null
winnerIndex 1
cancelling 2
completion 2 kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@97269b4
Either.Right(1)
job cancelled 0 true
job completed 0 true
job cancelled 1 false
job completed 1 false
job cancelled 2 true
job completed 2 true
Either.Right(2)
job cancelled 0 true
job completed 0 true
job cancelled 1 false
job completed 1 false
job cancelled 2 true
job completed 2 true
Either.Right(3)
job cancelled 0 true
job completed 0 true
job cancelled 1 false
job completed 1 false
job cancelled 2 true
job completed 2 true
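
Note that "done!" never appears in the output above. One detail that may be relevant, offered as a sketch rather than a diagnosis of the code: the kotlinx.coroutines documentation states that a SharedFlow never completes, so a call to collect on the result of shareIn does not return normally, even after the upstream flow has finished or the sharing scope's job has been cancelled. The snippet below probes that behavior on its own; `completesWithin` is a hypothetical helper, and the 500 ms timeout is an arbitrary choice.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns true only if collecting `flow` finishes
// within `timeoutMs`; returns false if the timeout fires first.
suspend fun <T> completesWithin(flow: Flow<T>, timeoutMs: Long): Boolean =
    withTimeoutOrNull(timeoutMs) {
        flow.collect { println("got $it") }
        true
    } ?: false

fun main() = runBlocking {
    val sharingJob = Job()
    val cold: Flow<Int> = flowOf(1, 2, 3)
    val shared: Flow<Int> =
        cold.shareIn(CoroutineScope(sharingJob), SharingStarted.WhileSubscribed())

    // A cold flow completes once its items have been emitted.
    println("cold completes: ${completesWithin(cold, 500)}")

    // The shared flow delivers the items, but its collector keeps waiting.
    println("shared completes: ${completesWithin(shared, 500)}")

    // Cancelling the sharing job stops the upstream collection;
    // it does not complete subscribers of the shared flow.
    sharingJob.cancelAndJoin()
    println("shared completes after cancel: ${completesWithin(shared, 500)}")
}
```

If the same holds inside flatMapMerge, the inner collections of the shared flows would never finish, which would keep the merged flow, and therefore the outer collect, from completing.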

kotlin

kotlin-coroutines

kotlin-flow
