
Flows make it easy to write simple reactive code: code that declares an action to take when an event happens or new data becomes available. However, can flows deal with events happening in different threads? Can they handle errors as well as valid data? In other words, are they ready for real-world applications? Yes, they are! Let's figure out the details.

Context and dispatchers

It's a common pattern to load data in batches and process each batch as soon as it is ready. In such a case, loading usually happens in the IO thread pool. Tasks assigned to this pool are mostly sleeping, waiting for the next portion of data to arrive from the network or to be loaded from a hard drive. Processing, on the other hand, usually happens in a different thread pool, which is optimized for heavy CPU use and does not interfere with the UI thread (in case it's an application with a UI). The only way to change the execution context of the emitter is through the flowOn operator. Here is how it works:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// helper function to visualize the current thread
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking<Unit> {
    flow {
        log("Emitting") // this code will run in the context of the IO dispatcher
        emit("some value")
    }
    .map { log("Processing $it"); it } // still upstream of flowOn, so it also runs on IO
    .flowOn(Dispatchers.IO)   // everything below will stay in the parent context
    .collect { value ->
        log("Collected $value")
    }
}

It will print something like this:

[DefaultDispatcher-worker-1] Emitting
[DefaultDispatcher-worker-1] Processing some value
[main] Collected some value

The flowOn operator changes the context of the upstream flow, but it has no effect on the downstream. This is an important property because it allows us to fully hide the context of a flow from the consumer when needed. For example, our operation can only execute in an IO pool, and we don't want anyone to break it accidentally.

// fully encapsulated flow
val ioFlow = flow {
    log("Emitting") // this code will always run on the IO dispatcher
    emit("some value")
}.flowOn(Dispatchers.IO)

fun main() = runBlocking<Unit> {
    ioFlow
        .map { log("Processing $it"); it } // this code will run in the "1CPU" context set below
        .flowOn(newSingleThreadContext("1CPU")) // change upstream again, up to the previous flowOn
        .collect { value ->                // still collect in the parent context
            log("Collected $value")
        }
}

It prints:

[DefaultDispatcher-worker-1] Emitting
[1CPU] Processing some value
[main] Collected some value

In this example, ioFlow could have been passed in as a parameter, and even if we knew nothing about it, we could be sure our processing runs in the context we've specified. This property is called context preservation, and it makes reasoning about the execution context of our transformations trivial. The good news is that the flow builder enforces it: if we try to emit from a different context by mistake, the emission is rejected. Try emitting from within a withContext block inside the flow builder and see what happens.
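The experiment suggested above can be sketched as follows. The names brokenFlow and collectBroken are made up for this illustration; the sketch assumes the kotlinx.coroutines library is on the classpath. Collecting the flow fails with an IllegalStateException whose message mentions a violated flow invariant.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flow that breaks context preservation on purpose
val brokenFlow = flow {
    withContext(Dispatchers.IO) { // switches context inside the builder
        emit("some value")        // emission from the wrong context is rejected
    }
}

// Collects the flow and returns the failure message, or null if nothing failed
fun collectBroken(): String? = runBlocking {
    try {
        brokenFlow.collect { }
        null
    } catch (e: IllegalStateException) {
        e.message
    }
}

fun main() {
    println(collectBroken())
}
```

The fix is the one shown earlier: drop withContext and apply flowOn(Dispatchers.IO) to the flow instead.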

Error catching

An error can occur at different stages of a flow: the emitter may fail, and so may any operator after it. Just like with context, anything that goes wrong upstream can be caught downstream with the .catch operator, even if we know nothing about the internals of the upstream flow.

val flow = flow { emit("Value") }

fun main() = runBlocking {
    try {
        flow
            .map { println("Passed $it"); it }
            .map {
                throw Exception("Exception in Map-1")
                it
            }
            .catch { println("Catch 1: $it") } // Catch everything upstream
            .map {
                throw Exception("Exception in Map-2")
                it
            }
            .catch { println("Catch 2: $it") } // Catch everything upstream, past previous catch
            .collect {
                println("Collected $it")
            }
    } catch (e: Exception) {      // Catch all uncaught exceptions in the flow (past last .catch)
        println("Uncaught $e")
    }
}

You may be surprised at first to know that this code prints:

Passed Value
Catch 1: java.lang.Exception: Exception in Map-1

However, that's expected: when an exception occurs, the flow has to stop because it cannot proceed further. By default, the .catch operator simply allows us to stop the exception from propagating further, just like the regular catch in a try/catch block. And everything that is not caught by it is propagated further up the call stack. You can emit a value from the body of .catch, but it won't resume the failed flow.
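To illustrate emitting from .catch, here is a minimal sketch. The function parseAll and the fallback value -1 are assumptions chosen for the example. The fallback is delivered to the collector, but the elements after the failing one are never processed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: parses strings to numbers, substituting -1 on the first failure
fun parseAll(raw: List<String>): List<Int> = runBlocking {
    raw.asFlow()
        .map { it.toInt() }   // throws NumberFormatException on bad input
        .catch { emit(-1) }   // emit a fallback value; the upstream is not resumed
        .toList()
}

fun main() {
    println(parseAll(listOf("1", "oops", "3"))) // prints [1, -1]
}
```

Note that "3" never reaches the collector: once the upstream has failed, the flow ends right after the body of .catch completes.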

Handling abnormal flow completion

Like in the regular try, there is an option to always execute code when the flow finishes, whether terminated by an exception or not: there is an .onCompletion operator for that. It has a nullable parameter that allows us to tell if the completion was abnormal.

flow
    .map { println("Passed $it"); it }
    .onCompletion { exc ->
        println("Completed 1, Exception: $exc")
    }
    .collect {
        println("Collected $it")
    }

It prints:

Passed Value
Collected Value
Completed 1, Exception: null

Here, null means there was no exception and the flow completed normally.
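For contrast, here is a minimal sketch of abnormal completion. The function completionLog and the "boom" message are made up for the example; events are gathered in a list so that their order is easy to inspect. The onCompletion block receives the exception and then lets it continue downstream, where .catch handles it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records what each stage observes when the emitter fails
fun completionLog(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flow<String> { throw IllegalStateException("boom") }
        .onCompletion { cause -> events.add("Completed, cause: ${cause?.message}") }
        .catch { events.add("Caught: ${it.message}") } // the exception still arrives here
        .collect { events.add("Collected $it") }       // never called: nothing was emitted
    events
}

fun main() {
    completionLog().forEach(::println)
    // prints:
    // Completed, cause: boom
    // Caught: boom
}
```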

If it happens that onCompletion itself throws an exception while processing uncaught exceptions from the upstream, only one exception can propagate, and that will be the last one thrown – again, just like in the regular try/finally code.

Note that onCompletion is a special operator: it relates not to the upstream or the downstream alone, but to the whole flow, so it can be placed anywhere. However, it's better to keep it organized.

flow {
    emit("Value")
}
.onCompletion {
    println("Completed") // will be executed last, after .map and .collect
}
.map { println("Processed $it"); it }
.collect {
    println("Collected $it")
}

The most important requirement is that flows must be transparent to exceptions: it is a violation to emit values in the flow { ... } builder from inside a try/catch block. This guarantees that a collector throwing an exception can always catch it using try/catch as in the examples above.

flow {
    try {
        emit("Value")
    } catch (exc: Exception) { // intercept the exception that happens downstream
        println("Intercepted $exc")
    }
}
.collect {
    throw Exception()       // exception happens on collect
    println("Collected $it")
}
println("This code should not be executed. Where did the exception go?")

Code that violates the exception transparency property can be very confusing. Unlike context preservation, Kotlin cannot check it for us, so it's important never to wrap a call to emit in try/catch and to handle upstream failures with the .catch operator instead.
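A transparent alternative to the snippet above might look like the following sketch (the function transparentDemo and its messages are hypothetical). The .catch operator only intercepts upstream failures, so the exception thrown by the collector reaches the surrounding try/catch as expected.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: reports where the collector's exception ends up
fun transparentDemo(): String = runBlocking {
    try {
        flow { emit("Value") }                    // emit is called directly, no try/catch
            .catch { println("Intercepted $it") } // handles upstream failures only
            .collect { throw IllegalStateException("collector failed") }
        "no exception"
    } catch (e: IllegalStateException) {
        "collector saw: ${e.message}"
    }
}

fun main() {
    println(transparentDemo()) // prints "collector saw: collector failed"
}
```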

Conclusion

All implementations of the Flow interface must adhere to two key properties: context preservation and exception transparency. These properties enable us to perform local reasoning about flows and modularize the code in a way that upstream flow emitters can be developed separately from downstream flow collectors. The user of a flow does not have to know the implementation details of the flow it uses. When these requirements are met, flows provide a safe way to execute code that can potentially fail, and they can do that in a specified thread, if needed.
