Introduction to Kotlin Flows(Flow,MutableSharedFlow and SharedFlow)

Abhishek Srivastava
13 min readDec 24, 2022

--

Kotlin flow is one of the types that can be emitted with multiple values sequentially, As opposed to suspend functions it returns only a single value. The flows are built on top of the coroutines and it can provide the multiple values. It is conceptually a stream of data's that can be computed asynchronously. Some entities it involved in data streams used with these entities like producer produces data. That can be allowed to passing the stream similarly consumer consumes the data values from the stream line and intermediaries modify each value emitted to the stream.

Kotlin Flow is a new stream processing API developed by JetBrains, the company behind the Kotlin language. It’s an implementation of the Reactive Stream specification, an initiative whose goal is to provide a standard for asynchronous stream processing. Jetbrains built Kotlin Flow on top of Kotlin Coroutines.

By using Flow to handle streams of values, you can transform data in complex multi-threaded ways, by writing just a small bit of code!

We will learn more about flow with below points:

  • Data collections and streams.
  • Synchronous and asynchronous API calls.
  • Hot and cold data streams.
  • Exception handling during flow processing.
  • Testing Kotlin Flow

Syntax

In kotlin language has many default keywords, variables, and functions for implementing the application. The flow is the builder function that creates the new flow where you can manually emit the new values into the stream of data using the built-in function.

class name{
val varname;
val vars2:Flow<List<type>>= flowname{
---some logic codes---
}
}

The above code is the basic syntax for to create and utilize the flow type in the kotlin codes. Using the flow type, the function creates a new flow, and it can be manually emit the new values into the stream of data using the built-in function like emit.

Why Do You Need Kotlin Flows?

To make an application responsive and boost its performance, asynchronous programming plays a very important role. To make applications, asynchronous RxJava can be used, which is a Java-based extension of ReactiveX.

In Kotlin, the same functionality is given in the form of Kotlin Flow API. Not only does it have all the operators and functionalities required, but it also supports suspending functions, which helps to perform asynchronous tasks in a sequential manner.

How does Flow work in Kotlin?

The kotlin flow is one of the types like thread mechanisms; it is mainly due to the flow of data that can safely make the network request for to produce the next value without blocking the main programming thread. It is basically followed through three types of entities that are involved in the streams of data.

We can create the flows using the flow builder APIs and the function called flow for to create manually and emit the new values into the data stream using emit function. Basically, the data source fetches the datas automatically at the fixed interval as the suspend function that cannot return multiple consecutive values creates and returns the flow to fulfill the requirement. Sometimes the data-source will act as the producer, so the flow builder is executed within the coroutine thus will be benefited from the same asynchronous APIs, but some restrictions should be applied.

Flows are sequential as the producer is in coroutine type; when calling the suspend function, the producer suspends until the suspend function returns the value. The producer suspends until the network requests are complete, so the result is emitted to the stream. Also, sometimes using the flow builder, the producer cannot emit the values from the different Coroutine contexts.

Or in other way we can understand from new points i.e. When data is requested from the server, and asynchronous programming is used to handle that data, then the Flow manages that data in the background thread asynchronously because there might be a chance that some process might run longer to fetch data. Once the data is received and collected in the collector, then it is displayed using the recycler view.

A Flow is a sequence of values that uses suspend functions to produce and consume values asynchronously.

A flow consists of three entities:

  • Producer
  • Intermediary
  • Consumer

Producer: A producer is used to emit the data, which is added to the stream.

Intermediary: It can modify the value which is emitted into the stream.

Consumer: A consumer receives the value from the stream.

Now that you have understood How Kotlin Flows work, let’s go ahead and create a Kotlin Flow.

Creating a Kotlin Flow

To create a Flow, first, you need to create a producer.

Producer

A producer is the one that is used to emit the data. So to create one, open the Main.kt, and instead of the main, write suspend fun (name of the function) main. It is named suspended because it will call other suspending functions, and only a suspending function can do that.

After that, you will create a flow builder to create a flow producer. The flow will emit integer values one by one from 0 to 9 and a delay of 2000 milliseconds or 2 seconds to create a long-running task.

The emit() will help in sending values to the consumer.

Before moving on to the Consumer, you will understand the cold stream and hot stream.

The Cold stream doesn’t produce any value until or unless the Consumer is there to collect them, whereas the Hot stream produces the values even if there is no consumer.

Now Flow is an example of a cold stream, so we will create a Consumer to collect the values.

Consumer

In Consumer, first, you will launch a coroutine because we have to use a flow builder function, which is the collect function that needs to be kept inside the coroutine. The collect is used to collect the values emitted by it.

Here, the delay is used because there can be a condition of backpressure. The backpressure occurs when the Consumer is not done consuming the data, and the Producer is producing it, which can result in data loss.

So to avoid such a case, you use delay() inside the collect function. The delay (3000) will add a delay of 3 seconds.

Now, you can run the main function and check for the displayed values. That is how the Flow is created.

Creating Your First Flow

First, let’s run our first flow program.

fun getSequence(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}

fun main() = runBlocking {
getSequence()
.collect { value ->
println(value)
}
}

The above code will output the following.

1
2
3

flow() is a flow builder and will create a Flow<T>. Moreover, a flow is a reactive stream, as we mentioned at the beginning, flow is inspired by ReactiveX.

emit() will send a result to the flow. We know that flow can return multiple values, so you can call emit() multiple times to send multiple values. As in the example, it calls emit() three times, sending 1, 2, and 3 respectively.

collect() receives the value sent by emit(). Every time emit() is called to send a value, the block of collect() will be executed once to receive the value. Therefore, in the example, emit() is called three times, so the block of collect() will also be executed three times.

Flow Builders

fun <T> flow(
block: suspend FlowCollector<T>.() -> Unit
): Flow<T>

We introduced flow() at the beginning. It creates a cold flow. Cold flow means every time you call collect(), it will execute the block of flow(). Then, we use emit() in the block to send the value to upstream.

The following code shows that the block of flow() is executed after calling collect().

fun getSequence(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emit $i")
emit(i)
}
}

fun main() = runBlocking {
val f = getSequence()
println("Start to collect")
f.collect { value ->
println("Collected $value")
}
}

Its output is below

Start to collect
Emit 1
Collected 1
Emit 2
Collected 2
Emit 3
Collected 3

asFlow()

@FlowPreview fun <T> BroadcastChannel<T>.asFlow(): Flow<T>

asFlow() will create a hot flow. Hot flow means that the data is ready to be in the stream when the flow is created; while cold flow is when collect() is called, the flow() block will be executed, and emit() is called in the block to transfer the data to the stream. Therefore, hot flow is like a hot data source.

The following code shows how to create a flow with asFlow().

fun main() = runBlocking {
val f = (1..3).asFlow()
println("1. Start to collect")
f.collect { value ->
println("Collected $value")
}
println("2. Start to collect")
f.collect { value ->
println("Collected $value")
}
}

Its output is below.

1. Start to collect
Collected 1
Collected 2
Collected 3
2. Start to collect
Collected 1
Collected 2
Collected 3

for read full detail for Flow Builder, You can click here.

What is MutableSharedFlow?

The MutableSharedFlow() factory function has an optional replay parameter. This indicates how many objects should be cached by the flow and delivered to late subscribers. The default is 0, so nothing gets cached this way.

Here, we have the same random-numbers SharedFlow from before, except that replay is set to 2. Also, the "B" subscriber delays its collect() call by one second. The result is that our MutableSharedFlow will emit five objects before "B" subscribes. Due to the replay value, though, "B" will immediately get the last two of those five objects. After that, "B" gets the same objects as "A" and at approximately the same time.

The output will be:-

...and we're off!
Collector A: 88
Collector A: 28
Collector A: 88
Collector A: 87
Collector A: 18
Collector B: 87
Collector B: 18
Collector A: 83
Collector B: 83
Collector A: 60
Collector B: 60
Collector A: 94
Collector B: 94
Collector A: 90
Collector B: 90
Collector A: 20
Collector B: 20

Full overview, You can read from here or read the official documentation page from here.

MutableSharedFlow and MutableStateFlow

It defines the corresponding constructor functions to create a hot flow that can be directly updated.

MutableStateFlow :

The mutable state holder flow emits the current and initial value to its collector. MutableStateFlow is a hot flow. With StateFlow data is a state.

Example:

We’ll create MutableStateFlow of UiState to update our UI to show progress while making API call and show upcoming shows once we have shows from the API call.

MutableSharedFlow :
The mutable shared flow that shares all emitted values among all its collector. With SharedFlow data is basically an event.

Example:

Here, we’ve MutableSharedFlow with replay 1 which means it will cache the number of items specified in replay for its future collector. In our example, both job1 and job2 received the last emitted item User(name=User 2).

What is SharedFlow?

SharedFlow is a type of Flow that shares itself between multiple collectors, so it is only materialized once for every subscriber. What else it can do?

  1. SharedFlow in contrast to a normal Flow is hot, every collector uses the same SharedFlow, because it is shared.
  2. SharedFlow has its buffer called replay cache. It keeps a specific number of the most recent values in it. Every new subscriber gets the values from the replay cache and then gets new emitted values. You can set the maximum size of the replay cache in replay parameter in the constructor. A replay cache also provides buffer for emissions to the shared flow, allowing slow subscribers to get values from the buffer without suspending emitters. A SharedFlow with a buffer can be configured to avoid suspension of emitters on buffer overflow using the onBufferOverflow parameter, which is equal to one of the entries of the BufferOverflow enum. When a strategy other than SUSPENDED is configured, emissions to the shared flow never suspend.
  3. If you use the default SharedFlow constructor of MutableSharedFlow then the replay cache won’t be created.

You can also transform a normal Flow into a SharedFlow using this extension method:

fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> (source)

StateFlow and SharedFlow

StateFlow

StateFlow represents a value that changes and you can listen to those changes as a flow. StateFlow is usually used to represent the state of something in your app, like say the text that should be shown in a TextView. StateFlow is similar to LiveData in android or Subjects in reactive streams.

SharedFlow

SharedFlow represents a stream of values and it can be listened to multiple times just like StateFlow. But it doesn’t really have a “current” value (it can have a buffer though). SharedFlow has an arbitrary size buffer that you can configure. SharedFlow is similar to Processor in reactive streams.

You can also read the google official documentation from here.

StateFlow is a SharedFlow with a couple other things:

  1. When creating a StateFlow you have to provide its initialState.
  2. You can access StateFlow’s current state by .value property, just like in LiveData.
  3. If you add a new collector in the meantime then it will automatically receive current state. Also, it won’t get any info about previous states, but only the new ones that will be emitted.

You can also transform a normal Flow into a StateFlow using this extension method.

fun <T> Flow<T>.stateIn(    
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T> (source)

Why use it?

SharedFlow’s use is more general but it can be used for example to represent button touch events. Since since button touches don’t have a “current” value and are really just events in time, SharedFlow would be more appropriate. Another example is a stream of audio data going to a speaker.

Parameters

replay : This is number of events from the buffer the SharedFlow will emit to new subscribers.

extraBufferCapacity : This is the number of events in the buffer the SharedFlow will not emit to new subscribers.

The total buffer size is replay + extraBufferCapacity. Default is 0 for both. Typically you want one or the other but you can use both.

onBufferOverflow: represents what happens when the buffer is full and you try to use emit. There are three possible options for BufferOverflow:

  • SUSPEND: This will make emit suspend until there's space in the buffer. This is the default.
  • DROP_LATEST: This will drop the latest value in the buffer.
  • DROP_OLDEST: This will drop the oldest value in the buffer.

Since the default buffer is zero and overflow strategy is suspend, emit will always suspend with the default parameters.

Relationship with StateFlow

StateFlow is a SharedFlow but SharedFlow is more general than StateFlow. StateFlow is kind of like a SharedFlow with replay buffer size 1. But with StateFlow you can additionally read and write to that value easily with property accessor syntax like this:

stateFlow.value = "hello"
val text = stateFlow.value

Usage

You listen SharedFlow to the same way you do StateFlow albeit with some caveats when it comes to buffers. To send values to a SharedFlow you can use emit from a suspend function or the best effort tryEmit from a non suspend function.

Note that tryEmit will never emit a value with the default parameters since the buffer is zero. It will only really work properly in cases where you know the buffer is non zero and the overflow strategy is not SUSPEND.

A reasonable use case for shared flow is to send button presses to a view model like this:

class WeatherViewModel {
val refreshEvents = SharedFlow<Unit>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
}
fun onButtonPress() {
refreshEvents.tryEmit(Unit)
}

In practice it’s probably better to use an intermediate callbackFlow rather than do it this way.

For read about more, Click here.

--

--

Abhishek Srivastava

Senior Software Engineer | Android | Java | Kotlin |Xamarin Native Android |Flutter |Go