
SharedFlow


In previous topics, we explored asynchronous and reactive programming with Kotlin Flows and discussed how to manage shared reactive state. Now it's time to apply these concepts to the asynchronous and reactive handling of data streams using SharedFlow.

SharedFlow and MutableSharedFlow

A SharedFlow is a hot flow that broadcasts emitted values to all its collectors, ensuring that every collector receives all the emitted values. The term "hot" indicates that an active instance of a SharedFlow exists independently of the presence of collectors. This contrasts with a regular Flow, defined by the flow { ... } function, which is considered cold and starts anew for each collector.

A SharedFlow supports multiple collectors and can emit values independently of them, allowing multiple collectors to receive the same values from the flow. It is particularly useful for broadcasting a value to multiple collectors or when you need multiple subscribers to the same data stream. Unlike a regular Flow, a SharedFlow does not have an initial value. Additionally, its replay cache can be configured to store a specific number of previously emitted values for new collectors.
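
To make the hot/cold distinction concrete, here is a minimal sketch (the names coldNumbers and hotNumbers are ours): the builder block of a cold flow runs once per collector, while a SharedFlow instance exists, and can emit, with no collectors at all.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Cold: the builder block runs again for every collector
    var builderRuns = 0
    val coldNumbers = flow {
        builderRuns++
        emit(1)
        emit(2)
    }
    coldNumbers.collect { } // first collector: the block runs
    coldNumbers.collect { } // second collector: the block runs again
    println("Builder ran $builderRuns times") // Builder ran 2 times

    // Hot: the instance exists and accepts emissions with zero collectors
    val hotNumbers = MutableSharedFlow<Int>()
    hotNumbers.emit(42) // nobody is listening; nothing is stored (replay = 0)
    println("Subscribers: ${hotNumbers.subscriptionCount.value}") // Subscribers: 0
}
```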

To create a mutable shared flow, you can use the MutableSharedFlow(...) constructor. The state of a mutable shared flow can be updated by emitting values to it and performing other operations.

SharedFlow is particularly useful for broadcasting events within an application to subscribers who may join or leave at any time. For instance, the following class encapsulates an event bus that distributes events to all subscribers in a rendezvous manner, suspending until all subscribers have received the emitted event.

Use SharedFlow when you need to broadcast values to multiple collectors or when you want to have multiple subscribers to the same stream of data.

Here, we have an example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow

// Event Bus
class EventBus(private val coroutineScope: CoroutineScope) {
    private val eventFlow = MutableSharedFlow<String>()

    // Make postEvent a suspending function
    suspend fun postEvent(event: String) {
        // Emit the event to all subscribers
        eventFlow.emit(event)
    }

    fun subscribe(): SharedFlow<String> {
        // Return the shared flow for collecting events
        return eventFlow.asSharedFlow()
    }
}

// Client
class Client(private val name: String, private val coroutineScope: CoroutineScope) {
    fun startListening(eventBus: EventBus) {
        // Collect events from the event bus
        coroutineScope.launch {
            eventBus.subscribe().collect { event ->
                println("$name received event: $event")
            }
        }
    }
}

suspend fun main() = coroutineScope {
    val eventBus = EventBus(this)

    val client1 = Client("Client 1", this)
    val client2 = Client("Client 2", this)

    // Start listening for events
    client1.startListening(eventBus)
    client2.startListening(eventBus)

    // Post some events
    launch {
        eventBus.postEvent("Event 1")
        eventBus.postEvent("Event 2")
        eventBus.postEvent("Event 3")
    }

    // Give the events time to be delivered, then cancel the collector
    // coroutines: their collect calls never complete on their own, so
    // coroutineScope would otherwise wait for them forever
    delay(1000)
    coroutineContext.cancelChildren()
}
Client 1 received event: Event 1
Client 2 received event: Event 1
Client 1 received event: Event 2
Client 2 received event: Event 2
Client 1 received event: Event 3
Client 2 received event: Event 3
  • EventBus is a class that encapsulates the logic for an event bus. It contains a MutableSharedFlow<String>, which is used to emit events to subscribers. SharedFlow is a type of flow that supports multiple collectors and is designed for scenarios where you want to emit values to an unknown number of subscribers.

  • The postEvent function is a suspending function that takes an event as a string and emits it to the eventFlow. It needs to be a suspending function because emit is a suspending operation that may suspend the coroutine if the buffer is full and it is waiting for collectors to retrieve some values.

  • The subscribe function returns a SharedFlow<String> that subscribers can collect from. The asSharedFlow method is used to prevent external mutation of the MutableSharedFlow by providing a read-only SharedFlow view.

  • Client represents a subscriber that listens for events from the EventBus. It has a startListening function that starts a new coroutine within the provided coroutineScope. Inside the coroutine, it subscribes to the event bus and collects events, printing them to the console along with the client's name.

  • The main function is defined as a suspending function using coroutineScope, which creates a scope for coroutines launched within the function. Inside the main function, an EventBus instance is created with the current coroutine scope.

  • Two Client instances are created, each with a unique name, using the same coroutine scope.

  • Both clients start listening for events from the EventBus.

  • A new coroutine is launched within the same scope to post three events to the event bus.

  • The delay(1000) function is used to keep the program running for a short time, ensuring that there is enough time for the events to be emitted and collected. Without this delay, the program would terminate immediately, potentially before any events are collected.

This code demonstrates a simple publish-subscribe (pub-sub) pattern, where EventBus allows multiple Client instances to subscribe and react to events asynchronously.

Replay cache and buffer

For a SharedFlow, you can configure both the buffer capacity and the replay capacity. To prevent backpressure issues, select an appropriate buffer capacity. The replay capacity should be set based on the specific requirements of your use case.

A SharedFlow maintains a replay cache that stores a certain number of the most recent values. Every new subscriber initially receives the values from the replay cache before receiving newly emitted values. The maximum size of the replay cache is defined at the creation of the SharedFlow using the replay parameter. You can access a snapshot of the current replay cache via the replayCache property, and it can be cleared using the MutableSharedFlow.resetReplayCache() function.

The replay cache also acts as a buffer for emissions to the SharedFlow, enabling slower subscribers to receive values from the buffer without causing the emitters to suspend. The buffer space determines the extent to which slower subscribers can fall behind faster ones. When creating a SharedFlow, you can reserve additional buffer capacity beyond the replay cache using the extraBufferCapacity parameter.

A SharedFlow with a buffer can be configured to prevent emitters from suspending on buffer overflow by using the onBufferOverflow parameter. This parameter accepts values from the BufferOverflow enum. When a strategy other than SUSPEND is used, emissions to the SharedFlow never suspend.

A strategy for buffer overflow handling controls what is sacrificed when the buffer overflows:

  • SUSPEND: The emitter of a value is suspended while the buffer is full.

  • DROP_OLDEST: The oldest value in the buffer is dropped on overflow, the new value is added to the buffer, and the emitter is not suspended.

  • DROP_LATEST: The latest value being added to the buffer is dropped on overflow (keeping the buffer contents unchanged), and the emitter is not suspended.
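
The DROP_OLDEST strategy has no dedicated example below, so here is a brief sketch: with a non-suspending overflow strategy, tryEmit always succeeds, which makes it possible to update the flow from non-suspending code.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    val flow = MutableSharedFlow<Int>(
        replay = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    // With DROP_OLDEST, tryEmit never fails: old values are evicted instead
    repeat(5) { i ->
        println("tryEmit($i) succeeded: ${flow.tryEmit(i)}")
    }

    // Only the two newest values remain for future subscribers
    println(flow.replayCache) // [3, 4]
}
```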

Buffer overflow can only occur when there is at least one subscriber that is not ready to accept a new value. Without any subscribers, only the most recent replay values are stored, and buffer overflow behavior is not triggered, making it ineffective. In particular, without subscribers, the emitter never suspends, even with the BufferOverflow.SUSPEND option, and the BufferOverflow.DROP_LATEST option is also ineffective. Essentially, in the absence of subscribers, the behavior is always akin to BufferOverflow.DROP_OLDEST, but the buffer is limited to the size of the replay cache (without any extraBufferCapacity).
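
A short sketch of this rule: even with BufferOverflow.SUSPEND and zero extra buffer, emit returns immediately while there are no subscribers, and only the replay cache retains anything.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val flow = MutableSharedFlow<Int>(
        replay = 1,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.SUSPEND
    )

    // No subscribers: none of these calls suspends, despite SUSPEND
    flow.emit(1)
    flow.emit(2)
    flow.emit(3)

    // Only the replay cache (one value) survives
    println(flow.replayCache) // [3]
}
```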

Example 1. SharedFlow with No Replay and No Extra Buffer

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Define an EventBus with no replay and no extra buffer
class EventBus {
    private val eventFlow = MutableSharedFlow<String>(replay = 0)

    suspend fun postEvent(event: String) {
        eventFlow.emit(event)
    }

    fun subscribe(): SharedFlow<String> = eventFlow.asSharedFlow()
}

fun main() = runBlocking {
    val eventBus = EventBus()

    // Subscribe to the EventBus in a child coroutine: collecting a
    // SharedFlow never completes, so collecting directly here would
    // block runBlocking before any events could be posted
    val job = launch {
        eventBus.subscribe().collect { event ->
            println("Subscriber received: $event")
        }
    }

    // Post events
    launch {
        eventBus.postEvent("Event 1")
        eventBus.postEvent("Event 2")
        eventBus.postEvent("Event 3")
    }

    // Give the events time to be delivered, then stop the subscriber
    delay(100)
    job.cancel()
}
Subscriber received: Event 1
Subscriber received: Event 2
Subscriber received: Event 3

Example 2. SharedFlow with Replay Capacity

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Define an EventBus with a replay capacity of 2
class EventBus {
    private val eventFlow = MutableSharedFlow<String>(replay = 2)

    suspend fun postEvent(event: String) {
        eventFlow.emit(event)
    }

    fun subscribe(): SharedFlow<String> = eventFlow.asSharedFlow()
}

fun main() = runBlocking {
    val eventBus = EventBus()

    // Post events before subscription
    launch {
        eventBus.postEvent("Event 1")
        eventBus.postEvent("Event 2")
        eventBus.postEvent("Event 3")
    }

    // Delay to ensure events are posted before subscribing
    delay(100)

    // Subscribe to the EventBus; collect never completes on its own,
    // so run it in a child coroutine and cancel it when we are done
    val job = launch {
        eventBus.subscribe().collect { event ->
            println("Subscriber received: $event")
        }
    }
    delay(100)
    job.cancel()
}
Subscriber received: Event 2
Subscriber received: Event 3

Example 3. SharedFlow with Buffer Capacity

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Define an EventBus with a buffer capacity of 2
class EventBus {
    private val eventFlow = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 2)

    suspend fun postEvent(event: String) {
        eventFlow.emit(event)
    }

    fun subscribe(): SharedFlow<String> = eventFlow.asSharedFlow()
}

fun main() = runBlocking {
    val eventBus = EventBus()

    // Subscribe to the EventBus
    val job = launch {
        eventBus.subscribe().collect { event ->
            println("Subscriber received: $event")
            delay(500) // Simulate processing delay
        }
    }

    // Post events
    launch {
        eventBus.postEvent("Event 1")
        eventBus.postEvent("Event 2")
        eventBus.postEvent("Event 3")
        eventBus.postEvent("Event 4")
    }

    // Wait for a while before canceling the subscriber to see some output
    delay(2000)
    job.cancel()
}
Subscriber received: Event 1
Subscriber received: Event 2
Subscriber received: Event 3
Subscriber received: Event 4

Example 4. BufferOverflow.SUSPEND option

Using BufferOverflow.SUSPEND will suspend the producer when the buffer is full until space becomes available. Here's an example:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.SUSPEND
    )

    launch {
        repeat(10) { i ->
            println("Emitting $i")
            sharedFlow.emit(i) // This will suspend when the buffer is full
            delay(100) // Just to simulate some work
        }
    }

    delay(500) // Wait for some time before collecting

    // Collecting a SharedFlow never completes, so collect in a child
    // coroutine and cancel it once the producer has finished
    val job = launch {
        sharedFlow.collect { value ->
            println("Collected $value")
        }
    }
    delay(1000)
    job.cancel()
}

In this example, the values emitted before the collector starts are simply dropped: with replay = 0 and no subscribers, emit never suspends, even with the BufferOverflow.SUSPEND option. Once the collector is active, each emit suspends until the collector is ready to accept the value, because the buffer capacity is zero.

Example 5. BufferOverflow.DROP_LATEST option

Using BufferOverflow.DROP_LATEST drops the value that is currently being emitted when the buffer is full, leaving the buffer contents unchanged. Here's an example:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 1,
        extraBufferCapacity = 1, // Set extra buffer capacity
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )

    launch {
        repeat(10) { i ->
            println("Emitting $i")
            sharedFlow.emit(i) // Will drop the latest value if buffer is full
            delay(100) // Just to simulate some work
        }
    }

    delay(500) // Wait for some time before collecting

    // Collect in a child coroutine and cancel it when we are done,
    // since collecting a SharedFlow never completes on its own
    val job = launch {
        sharedFlow.collect { value ->
            println("Collected $value")
        }
    }
    delay(1000)
    job.cancel()
}

In this scenario, the sharedFlow has a replay cache of 1 and an extra buffer capacity of 1. If the producer emits values faster than the collector can process them, the value being emitted is dropped whenever the buffer is full, while the values already buffered are kept. Thanks to the replay cache, a new collector still receives the most recent value that was retained.

Remember that BufferOverflow.DROP_LATEST and BufferOverflow.SUSPEND have no effect without subscribers: in that case the buffer is limited to the size of the replay cache (the extraBufferCapacity is unused), and each new emission simply evicts the oldest replayed value. In other words, in the absence of subscribers every strategy behaves like BufferOverflow.DROP_OLDEST.

SharedFlow vs StateFlow

StateFlow is a specialized, high-performance, and efficient implementation of SharedFlow designed for the common yet specific use case of state sharing.

A StateFlow always carries an initial value, replays the most recent value to new subscribers, does not buffer additional values beyond the last emitted one, and lacks support for resetReplayCache. A StateFlow behaves similarly to a SharedFlow with specific parameters and when the distinctUntilChanged operator is applied. Use SharedFlow when you need a StateFlow with modified behavior, such as extra buffering, replaying more values, or omitting the initial value.

// MutableStateFlow(initialValue) is equivalent to a shared flow with these parameters:
val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue) // Emit the initial value
val state = shared.distinctUntilChanged() // Achieve StateFlow-like behavior

SharedFlow vs BroadcastChannel

Conceptually, SharedFlow is a replacement for BroadcastChannel and is intended to fully supersede it. The key distinctions include:

  1. SharedFlow is simpler in design, as it does not need to implement the comprehensive Channel APIs, enabling a more straightforward and efficient implementation.

  2. SharedFlow offers configurable replay capabilities and strategies for handling buffer overflow.

  3. SharedFlow provides a clear division between the read-only SharedFlow interface and its mutable counterpart, MutableSharedFlow.

  4. Unlike BroadcastChannel, SharedFlow cannot be closed and does not inherently signal failure. Any errors or completion indications must be explicitly handled externally when necessary.

To transition from BroadcastChannel to SharedFlow, begin by replacing instances of the BroadcastChannel(capacity) constructor with MutableSharedFlow(replay = 0, extraBufferCapacity = capacity), noting that BroadcastChannel does not replay values to new subscribers. Replace send and trySend with emit and tryEmit, respectively, and adapt subscriber code to utilize flow operators.
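
A hedged sketch of such a migration (the capacity of 16 and the String event type are illustrative):

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.runBlocking

// Before: val bus = BroadcastChannel<String>(capacity = 16)
// After: same buffer size, replay = 0, because BroadcastChannel
// never replays values to new subscribers
val bus = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 16)

suspend fun post(event: String) = bus.emit(event)           // replaces send
fun postOrDrop(event: String): Boolean = bus.tryEmit(event) // replaces trySend

// Subscribers collect the flow instead of calling openSubscription
fun events(): SharedFlow<String> = bus.asSharedFlow()

fun main() = runBlocking {
    println(postOrDrop("ping")) // true: buffer space is available
}
```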

Conclusion

Throughout this topic, we've gained a comprehensive understanding of asynchrony and reactivity within the Kotlin ecosystem. Our journey has led us to explore managing shared reactive states, a cornerstone for developing responsive and resilient applications. By utilizing SharedFlows, we can handle data streams in a way that is both asynchronous and reactive, allowing for a more efficient and effective approach to data management. This knowledge is a vital tool in our toolkit for building modern, interactive, and highly scalable applications.

Now, let's put what we've learned into practice with some tasks. Let's get started!
