Kotlin Flows with Select Expression

Abhishek Srivastava
8 min read · May 28, 2023

Coroutines provide the select function, which lets us await multiple suspending functions simultaneously and selects the result of whichever completes first. It can also send to the first channel that has space in its buffer, or receive from the first channel that has an available element. This lets us race coroutines against each other or join results from multiple sources.

Select expressions are an experimental feature of kotlinx.coroutines. Their API is expected to evolve in the upcoming updates of the kotlinx.coroutines library with potentially breaking changes.

Select expressions are useful in several situations during application development. For example:

  • You want to process “X” as soon as either data “A” or data “B” becomes available, whichever comes first.
  • You are running a few operations concurrently and want to proceed with the result of whichever finishes (or returns a result) first, i.e. a race between the operations.

Official source: Select

Selecting deferred values

Suppose two coroutines are running in parallel, for example a DB operation and a network operation, and we want whichever response arrives first. One existing approach is to use channelFlow: it launches a coroutine for each Deferred and emits their results in completion order, so you can filter them and pick the first one with first().

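// asyncStringsList() is not shown here; it is assumed to return a List<Deferred<String?>>.
fun main() = runBlocking {
    val list = asyncStringsList()
    val result = channelFlow {
        // Launch one coroutine per Deferred; results are sent in completion order.
        list.forEach { launch { send(it.await()) } }
    }
        .filterNotNull()
        .first()
    println(result)
}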

But this approach is more complex than it needs to be.

The easiest way to achieve this is to start the requests with async, then use the select function as an expression and await the different values inside it. Inside select, we can call onAwait on a Deferred value, which registers it as a possible result of the select expression. Inside its lambda expression, you can transform the value. In the example below, we just return the async result as is, so the select expression completes as soon as the first async task completes and returns its result.

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

fun main() = runBlocking<Unit> {
    val winner = select<String> {
        data1().onAwait { it }
        data2().onAwait { it }
    }

    println("The winner = $winner") // prints "The winner = Hello"
}

fun data1() = GlobalScope.async {
    delay(1000)
    "Hello"
}

fun data2() = GlobalScope.async {
    delay(2000)
    "World"
}

In this example:

  • data1() and data2() are functions that return a Deferred<String>; data1 takes 1000ms to load its data and data2 takes 2000ms.
  • In the “select” builder, the onAwait clause is used to wait for each result.
  • Since data1() completes first, its value ("Hello") is selected. Thus it is the winner.
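One thing the example leaves open is what happens to the slower coroutine: launched in GlobalScope, it simply keeps running after the winner is known. As a minimal sketch, assuming we are free to introduce our own helper (raceOf is a hypothetical name, not part of kotlinx.coroutines), the race could be scoped so that the losing tasks are cancelled:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical helper (not a library function): runs all tasks concurrently,
// returns the first result and cancels the tasks that are still running.
suspend fun <T> raceOf(vararg tasks: suspend CoroutineScope.() -> T): T = coroutineScope {
    val deferreds = tasks.map { task -> async { task() } }
    val winner = select<T> {
        deferreds.forEach { deferred -> deferred.onAwait { it } }
    }
    // Without this, coroutineScope would wait for the slower tasks to finish.
    deferreds.forEach { it.cancel() }
    winner
}

fun main() = runBlocking {
    val winner = raceOf(
        { delay(1000); "Hello" },
        { delay(2000); "World" }
    )
    println("The winner = $winner") // prints "The winner = Hello"
}
```

Because the tasks are children of coroutineScope, they are also cancelled if the caller itself is cancelled, which is not the case with GlobalScope.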

Note: When multiple clauses can be selected at the same time, the first of them takes priority, i.e. the select function is biased toward the first clause. Use selectUnbiased for an unbiased selection, which randomly shuffles the clauses before selecting.
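To make the bias visible, here is a small sketch in which both values are already available when select runs. The helper names pickBiased and pickUnbiased are made up for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.selects.selectUnbiased

// Hypothetical helper: biased selection, the first ready clause wins.
suspend fun pickBiased(a: Deferred<String>, b: Deferred<String>): String =
    select<String> {
        a.onAwait { it }
        b.onAwait { it }
    }

// Hypothetical helper: clauses are shuffled, so either ready clause may win.
suspend fun pickUnbiased(a: Deferred<String>, b: Deferred<String>): String =
    selectUnbiased<String> {
        a.onAwait { it }
        b.onAwait { it }
    }

fun main() = runBlocking {
    // CompletableDeferred(value) creates an already completed Deferred.
    val first = CompletableDeferred("first")
    val second = CompletableDeferred("second")
    println(pickBiased(first, second))   // prints "first"
    println(pickUnbiased(first, second)) // may print "first" or "second"
}
```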

Selecting from channels

Suppose we have two coroutines that send values to a Channel. Producer 1 delays for 1 second and then sends a value, while Producer 2 delays for 2 seconds before sending a value. These coroutines are created with the produce builder, which provides a Channel for you to send values to and gives you back a ReceiveChannel instance to read them from.

fun CoroutineScope.producer1() = produce<String> {
    while (true) {
        delay(1000)
        send("Producer Item 1")
    }
}

fun CoroutineScope.producer2() = produce<String> {
    while (true) {
        delay(2000)
        send("Producer Item 2")
    }
}

We could receive values from either of these producers as shown below.

val channel1: ReceiveChannel<String> = producer1()
val channel2: ReceiveChannel<String> = producer2()
val item1: String = channel1.receive()
val item2: String = channel2.receive()

But suppose I want to listen to both producers at the same time and receive a value from whichever has one ready first. To do this, we need to use a select expression.

A Select expression takes a lambda block which allows you to use clauses inside of it to listen for values from multiple Channels.

inline suspend fun <R> select(
    crossinline builder: SelectBuilder<R>.() -> Unit
): R

How do we create a select expression for our example?

val channel1: ReceiveChannel<String> = producer1()
val channel2: ReceiveChannel<String> = producer2()
select<Unit> {
    channel1.onReceive {
        println(it)
    }
    channel2.onReceive {
        println(it)
    }
}

Inside the select expression’s lambda, I am calling the onReceive clause on both Channels. This clause can only be used from inside a select expression. Therefore, I am listening to emissions on both of these Channels simultaneously. The first value received from either Channel will be printed to the console.

Here is a full example that you could play with.

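import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select

fun main() = runBlocking {
    val channel1 = producer1()
    val channel2 = producer2()
    // Each select call receives exactly one value from whichever channel is ready first.
    repeat(10) {
        select<Unit> {
            channel1.onReceive {
                println(it)
            }
            channel2.onReceive {
                println(it)
            }
        }
    }
    // The producers loop forever, so cancel them to let runBlocking finish.
    coroutineContext.cancelChildren()
}

fun CoroutineScope.producer1() = produce<String> {
    while (true) {
        delay(1000)
        send("Producer Item 1")
    }
}

fun CoroutineScope.producer2() = produce<String> {
    while (true) {
        delay(2000)
        send("Producer Item 2")
    }
}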

If you run this example, you will see that each execution of the select expression produces a new value that could be from either Channel.

Use of Select expression to combine multiple flows

Under the hood, the combine operator uses a select expression to listen to emissions from multiple Flows. Let’s look closely at the source to understand the implementation. Here is a diagram showing how it works.

Suppose we want to combine Flow A and Flow B.

  1. Two producer coroutines are created for Flow A and Flow B.
  2. A select expression is created to listen to the emission of both producers. The select expression runs until a value is received from both producers’ channels. These values emitted from both Flows are stored locally in variables.
  3. When emissions are received from both producers, they are combined with the transformation that you specify.
  4. Finally, the transformed value is given back to you in a new Flow that you could collect from.

These are the steps that are taken when you use the combine operator. Here is the source for these steps.

suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
    first: Flow<T1>, second: Flow<T2>,
    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
) {
    coroutineScope {
        val firstChannel = asFairChannel(first)
        val secondChannel = asFairChannel(second)
        var firstValue: Any? = null
        var secondValue: Any? = null
        var firstIsClosed = false
        var secondIsClosed = false
        while (!firstIsClosed || !secondIsClosed) {
            select<Unit> {
                onReceive(
                    firstIsClosed,
                    firstChannel,
                    { firstIsClosed = true }
                ) { value ->
                    firstValue = value
                    if (secondValue !== null) {
                        transform(...)
                    }
                }

                onReceive(
                    secondIsClosed,
                    secondChannel,
                    { secondIsClosed = true }
                ) { value ->
                    secondValue = value
                    if (firstValue !== null) {
                        transform(...)
                    }
                }
            }
        }
    }
}

Let’s break down this source to see the steps in action.

suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
    first: Flow<T1>, second: Flow<T2>,
    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
) {

}

This function is internal to the library. It takes in two Flows and a transformation that you want to apply.

  • Two producer coroutines are created for Flow A and Flow B.
suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
    first: Flow<T1>, second: Flow<T2>,
    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
) {
    coroutineScope {
        val firstChannel = asFairChannel(first)
        val secondChannel = asFairChannel(second)
    }
}

fun CoroutineScope.asFairChannel(
    flow: Flow<*>
): ReceiveChannel<Any> = produce {
    val channel = channel as ChannelCoroutine<Any>
    flow.collect { value ->
        return@collect channel.sendFair(value ?: NULL)
    }
}

A coroutine is created using the produce coroutine builder. It provides you with a Channel to which you can send the values emitted by a Flow. This is what the asFairChannel method does.
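The same idea can be tried out with public APIs only. The sketch below is a simplified stand-in, not the library code: asChannel is a hypothetical name, and it uses a plain send instead of the internal sendFair and NULL marker.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical simplified counterpart of asFairChannel: collects the flow
// in a new coroutine and forwards every value into the produced channel.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> CoroutineScope.asChannel(flow: Flow<T>): ReceiveChannel<T> = produce {
    flow.collect { value -> send(value) }
    // The channel is closed automatically when this block completes.
}

fun main() = runBlocking {
    val channel = asChannel(flowOf(1, 2, 3))
    // Iteration ends once the producer has closed the channel.
    for (value in channel) println(value) // prints 1, 2 and 3 on separate lines
}
```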

  • A select expression is created to listen to the emission of both producers.
suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
    first: Flow<T1>, second: Flow<T2>,
    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
) {
    coroutineScope {
        val firstChannel = asFairChannel(first)
        val secondChannel = asFairChannel(second)
        var firstValue: Any? = null
        var secondValue: Any? = null
        var firstIsClosed = false
        var secondIsClosed = false
    }
}

For each channel we store its latest emission in a variable and track whether it is closed with a flag.

suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
    first: Flow<T1>, second: Flow<T2>,
    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
) {
    coroutineScope {
        val firstChannel = asFairChannel(first)
        val secondChannel = asFairChannel(second)
        var firstValue: Any? = null
        var secondValue: Any? = null
        var firstIsClosed = false
        var secondIsClosed = false
        while (!firstIsClosed || !secondIsClosed) {
            select<Unit> {
                ...
            }
        }
    }
}

Why is the select expression above in a while loop?

So, let’s recall the simple example we looked at for a select expression. It completes as soon as one of the Channels you are listening to emits a value. Depending on the rate of emissions, you may get a value from either the first or the second Channel, in any order. Therefore, we have the variables firstValue and secondValue to store the latest emitted value from each. Since we need at least one emission from both Channels before combining, and want to keep combining afterwards, the select expression runs in a loop until both Channels are closed.

  • When items are received from both producers, they are combined with the transformation that you specify.
while (!firstIsClosed || !secondIsClosed) {
    select<Unit> {
        onReceive(...) { value ->
            firstValue = value
            if (secondValue !== null) {
                transform(...)
            }
        }

        onReceive(...) { value ->
            secondValue = value
            if (firstValue !== null) {
                transform(...)
            }
        }
    }
}

fun SelectBuilder<Unit>.onReceive(
    ...,
    channel: ReceiveChannel<Any>,
    ...,
    onReceive: suspend (value: Any) -> Unit
) {
    if (isClosed) return
    channel.onReceiveOrNull {
        if (it === null) onClosed()
        else onReceive(it)
    }
}

How are values read in from the Channels in the select expression?

onReceiveOrNull is a select clause on ReceiveChannel. The onReceive helper above uses it to read each value sent to the channel. When the Flow has no more emissions, the channel is closed, the clause yields null, and onClosed is called to set the corresponding flag. The emitted values are stored in the variables firstValue and secondValue, which are then used to apply the transformation you specified.
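In more recent versions of kotlinx.coroutines, onReceiveOrNull is deprecated in favor of onReceiveCatching, which hands the lambda a ChannelResult instead of a nullable value. As a rough sketch of the same "receive until closed" pattern with that clause (receiveAll is a hypothetical helper written for this article, not library code):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select

// Hypothetical helper: receives from both channels until both are closed.
suspend fun receiveAll(
    first: ReceiveChannel<String>,
    second: ReceiveChannel<String>
): List<String> {
    val received = mutableListOf<String>()
    var firstIsClosed = false
    var secondIsClosed = false
    while (!firstIsClosed || !secondIsClosed) {
        select<Unit> {
            // A closed channel is skipped, mirroring `if (isClosed) return` above.
            if (!firstIsClosed) first.onReceiveCatching { result ->
                val value = result.getOrNull() // null means the channel is closed
                if (value == null) { firstIsClosed = true } else { received += value }
            }
            if (!secondIsClosed) second.onReceiveCatching { result ->
                val value = result.getOrNull()
                if (value == null) { secondIsClosed = true } else { received += value }
            }
        }
    }
    return received
}

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val letters = produce<String> { send("a"); send("b") }
    val digits = produce<String> { send("1") }
    // Contains "a", "b" and "1"; how the two channels interleave may vary.
    println(receiveAll(letters, digits))
}
```

Skipping the clause of a closed channel matters: a closed channel is always ready to be selected, so leaving it registered would let it win every round.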

  • Finally, the transformed value is given back to you in a new Flow that you could collect from.
fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (a: T1, b: T2) -> R
): Flow<R> = flow {
    combineTransformInternal(this@combine, flow) { a, b ->
        emit(transform(a, b))
    }
}

As you can see in the code snippet above, the combine operator calls the internal method shown earlier. It creates a new Flow and emits the transformed values.

This is how the combine operator uses select expressions internally. Although they are experimental, when you look under the hood, they are used in many places. Furthermore, the combine operator is very useful in many use cases such as implementing an MVI architecture. Understanding how combine works internally is valuable when debugging and testing.
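To experiment with the four steps outside the library, they can be approximated with public APIs. The following is only a sketch under simplifying assumptions: simpleCombine is a hypothetical name, values must be non-null, and it uses channelFlow and produceIn rather than the internal helpers, so it is not how the library itself is written.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.select

// Hypothetical, simplified combine for non-null values. NOT the library code.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <A : Any, B : Any, R> simpleCombine(
    first: Flow<A>,
    second: Flow<B>,
    transform: suspend (A, B) -> R
): Flow<R> = channelFlow {
    // Step 1: one producer coroutine per flow.
    val firstChannel = first.produceIn(this)
    val secondChannel = second.produceIn(this)
    var firstValue: A? = null
    var secondValue: B? = null
    var firstIsClosed = false
    var secondIsClosed = false
    // Step 2: keep selecting until both channels are closed.
    while (!firstIsClosed || !secondIsClosed) {
        select<Unit> {
            if (!firstIsClosed) firstChannel.onReceiveCatching { result ->
                val value = result.getOrNull()
                if (value == null) {
                    firstIsClosed = true
                } else {
                    firstValue = value
                    val other = secondValue
                    // Steps 3 and 4: combine once both sides have a value.
                    if (other != null) send(transform(value, other))
                }
            }
            if (!secondIsClosed) secondChannel.onReceiveCatching { result ->
                val value = result.getOrNull()
                if (value == null) {
                    secondIsClosed = true
                } else {
                    secondValue = value
                    val other = firstValue
                    if (other != null) send(transform(other, value))
                }
            }
        }
    }
}

fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3).onEach { delay(100) }
    val letters = flowOf("a", "b").onEach { delay(250) }
    simpleCombine(numbers, letters) { n, l -> "$n$l" }
        .collect { println(it) } // the last line printed is "3b"
}
```

Which intermediate pairs appear depends on timing, but the final pair always holds the last value of each flow.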

List of supported select methods

These are APIs from coroutines that are supported for select expression clauses.
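Besides onAwait and onReceive, which were used above, one of these clauses is onTimeout, which is selected if nothing else becomes ready within the given time. A small sketch, where awaitOrFallback is a hypothetical helper name chosen for this example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*

// Hypothetical helper: returns the task's result, or the fallback
// if the task is not ready within timeoutMillis.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun awaitOrFallback(
    task: Deferred<String>,
    timeoutMillis: Long,
    fallback: String
): String = select<String> {
    task.onAwait { it }
    onTimeout(timeoutMillis) { fallback }
}

fun main() = runBlocking {
    val slow = async { delay(2000); "World" }
    println(awaitOrFallback(slow, 500, "timed out")) // prints "timed out"
    slow.cancel() // otherwise runBlocking would wait for the slow task
}
```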

You can visit the official documentation to explore more of the APIs available around the select expression.

Note: This is an experimental API, so remember that the design of the corresponding declarations has open issues which may (or may not) lead to changes in the future.

To read the full details on select expressions, you can check the link below.

Thanks for reading…
