To work with multiple items of the same type, Kotlin offers various collections: they store the complete set of elements, all kept in memory at the same time. To generate items dynamically, one by one, we can use a sequence; it can save memory when we only need one item at a time. However, a sequence builder can't call other suspending functions, so if getting an item takes a long time, the thread producing it stays blocked. Getting a potentially unlimited series of items in a suspending way is a common task: for example, fetching new messages from a chat API or scanning all folders on a disk and calculating the size of each one. For these tasks, we can use flows. Let's see how they work!
Flow basics
Consider the folder example. With a sequence, we would first define the sequence of results:
val folderSizes = sequence { // sequence builder lambda begins
    for (folder in folderNames) {
        val size = calculateSize(folder) // long-running operation that can block a thread
        yield(size)
    }
}
Then we consume it like any other sequence: folderSizes.forEach { doSomethingWith(it) }.
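To see the on-demand behavior concretely, here is a small runnable sketch of the sequence version. folderNames, fakeSizes, and calculateSize are hypothetical stand-ins for a real disk scan, and the measured list only records which folders have been processed so far.

```kotlin
// Hypothetical inputs: a real program would scan the disk instead.
val folderNames = listOf("docs", "music", "photos")
val fakeSizes = mapOf("docs" to 120L, "music" to 4_500L, "photos" to 9_800L)

// Records which folders have been measured so far.
val measured = mutableListOf<String>()

// Hypothetical stand-in for a slow, blocking size calculation.
fun calculateSize(folder: String): Long {
    measured += folder
    return fakeSizes.getValue(folder)
}

fun folderSizes(): Sequence<Long> = sequence {
    for (folder in folderNames) {
        yield(calculateSize(folder)) // runs only when the consumer requests the next item
    }
}

fun main() {
    val sizes = folderSizes()
    println(measured)      // []: creating the sequence measured nothing
    println(sizes.first()) // 120
    println(measured)      // [docs]: only the first folder was measured
}
```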
To make calculateSize a suspending function instead, we can use a very similar builder, flow:
val folderSizes = flow { // flow builder lambda begins
    for (folder in folderNames) {
        val size = calculateSize(folder) // now it can be a suspending function
        emit(size)
    }
}
Collecting data is very similar, too: folderSizes.collect { doSomethingWith(it) }.
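The same idea as a runnable sketch with flow, assuming kotlinx-coroutines-core is on the classpath. Here the hypothetical calculateSize is a suspending function that uses delay to simulate slow work; calling delay inside a sequence builder would not compile.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical inputs standing in for a real disk scan.
val folderNames = listOf("docs", "music", "photos")
val fakeSizes = mapOf("docs" to 120L, "music" to 4_500L, "photos" to 9_800L)

// Hypothetical suspending size calculation: delay() suspends the coroutine
// instead of blocking the thread.
suspend fun calculateSize(folder: String): Long {
    delay(100)
    return fakeSizes.getValue(folder)
}

val folderSizes: Flow<Long> = flow {
    for (folder in folderNames) {
        val size = calculateSize(folder) // allowed: the flow builder lambda is suspending
        emit(size)
    }
}

fun main() = runBlocking {
    folderSizes.collect { println("Folder size: $it") }
}
```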
A flow can also be constructed from an existing collection, which may be helpful if we already know the result (e.g., when the folder size is pre-cached) or want to have dummy values (e.g., in special cases or tests). Other collection-like objects, such as sequences, ranges, and channels, can be converted to flows, too.
flowOf(1, 2, 3, 4) // flow of known values
listOf(1, 2, 3, 4).asFlow() // list converted to flow
(1..4).asFlow() // range converted to flow
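A sketch of these conversions, assuming kotlinx-coroutines-core is on the classpath; every source yields the same four values. Channels are converted with consumeAsFlow() (or receiveAsFlow()) rather than asFlow(); the unlimited channel here is only an illustrative setup, and collectAllSources is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Builds the same four values from several sources and collects each into a list.
fun collectAllSources(): List<List<Int>> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    for (i in 1..4) channel.trySend(i) // never fails on an unlimited buffer
    channel.close()                    // lets the flow complete after the last element

    listOf(
        flowOf(1, 2, 3, 4).toList(),              // known values
        listOf(1, 2, 3, 4).asFlow().toList(),     // list
        (1..4).asFlow().toList(),                 // range
        sequenceOf(1, 2, 3, 4).asFlow().toList(), // sequence
        channel.consumeAsFlow().toList(),         // channel
    )
}

fun main() {
    collectAllSources().forEach { println(it) } // each line prints [1, 2, 3, 4]
}
```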
Just like with a sequence, the flow builder returns an object right away, but it won't execute the builder's lambda until a terminal operation is called on it. forEach is a terminal operation for a sequence, and collect is a terminal operation for a flow: it starts the execution and runs until the last item is emitted or an exception is thrown.
Before collect is called, the flow object is "cold": it doesn't do any computation and can be safely passed as a parameter anywhere. That's why the flow builder is not a suspending function, while collect is.
val numbersFlow = flowOf(1, 2, 3, 4) // create flow, the object is still cold
numbersFlow.collect { print(it) } // collect flow, it will print 1234
numbersFlow.collect { print(it) } // collect same flow, it will print 1234 again
// can repeat as many times as needed with the same result
Flow transformations
Similar to sequences, we can transform an upstream flow object and get a new downstream flow object by applying intermediate operators.
To convert each element into another one, use .map:
val anotherFlow = flowOf(1, 2, 3, 4) // creates a flow that emits 1, 2, 3, 4
    .map { it * 2 } // creates a new flow that emits 2, 4, 6, 8
To filter the elements, use .filter:
val anotherFlow = flowOf(1, 2, 3, 4)
    .filter { it % 2 == 0 } // emits 2, 4
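To check the two operators above, this sketch (assuming kotlinx-coroutines-core on the classpath) collects each resulting flow with toList(), a terminal operation covered later. The names doubled and evens are illustrative.

```kotlin
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Each operator returns a new cold flow; nothing runs until toList() collects it.
val doubled = flowOf(1, 2, 3, 4).map { it * 2 }
val evens = flowOf(1, 2, 3, 4).filter { it % 2 == 0 }

fun main() = runBlocking {
    println(doubled.toList()) // [2, 4, 6, 8]
    println(evens.toList())   // [2, 4]
}
```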
The kotlinx.coroutines library provides other transformations, too: take to limit the number of emitted items, zip to stitch two flows together, and many more, including transform, which lets you write your own custom transformations:
flowOf(1, 2, 3, 4)
    .transform { value ->
        if (value % 2 == 0) { // Emit only even values, but twice
            emit(value)
            emit(value)
        } // Do nothing if odd
    } // emits 2, 2, 4, 4
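A sketch of take and zip, assuming kotlinx-coroutines-core on the classpath. The numbers flow and its produced counter are hypothetical helpers that make it visible how take(3) stops the upstream flow as soon as it has three items.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

var produced = 0 // counts how many values the upstream actually produced

// An upstream flow of 1..10 that records each value it produces.
val numbers: Flow<Int> = flow {
    for (i in 1..10) {
        produced++
        emit(i)
    }
}

// take(3) passes on three values and then cancels the upstream.
fun firstThree(): List<Int> = runBlocking { numbers.take(3).toList() }

// zip pairs elements by position and stops when the shorter flow ends.
fun zipped(): List<String> = runBlocking {
    flowOf(1, 2, 3).zip(flowOf("a", "b")) { n, s -> "$n$s" }.toList()
}

fun main() {
    println(firstThree()) // [1, 2, 3]
    println(produced)     // 3
    println(zipped())     // [1a, 2b]
}
```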
Multiple transformations can be applied on top of each other:
flowOf(1, 2, 3, 4)
    .take(3)
    .filter { it > 1 }
    .map { it * 2 } // emits 4, 6
Terminal operators
There are multiple terminal operations, too. collect is the most generic one: it can be called with a lambda to be executed for each element or without parameters to simply let the flow run for its side effect:
flow.collect { println(it) } // prints each element that flow emits
flow.collect() // runs the flow the same way but ignores the emitted elements
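A sketch of both forms, assuming kotlinx-coroutines-core 1.6 or later, where the no-argument collect() is an extension imported from kotlinx.coroutines.flow. The hypothetical work flow writes to a log, so its side effects stay observable even when collect() drops the values.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

val log = mutableListOf<String>() // records the flow's side effects

val work: Flow<Int> = flow {
    for (i in 1..3) {
        log += "produced $i" // side effect inside the flow
        emit(i)
    }
}

fun main() = runBlocking {
    work.collect { println(it) } // handles each element
    work.collect()               // runs the flow again; elements are dropped, side effects remain
    println(log.size)            // 6
}
```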
Moreover, we can collect data from the same flow as many times as we want, and it will run the same operations each time:
folderSizes.collect { doSomethingWith(it) } // check folder sizes once
// ...
folderSizes.collect { doSomethingWith(it) } // and then check if something changes
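The following sketch (assuming kotlinx-coroutines-core on the classpath) counts how often the builder lambda starts: creating the flow runs nothing, and each terminal call runs the whole flow again from the start. The runs counter is a hypothetical helper.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

var runs = 0 // how many times the builder lambda has started

val numbersFlow: Flow<Int> = flow {
    runs++ // executes once per collection, never at creation time
    for (i in 1..4) emit(i)
}

fun main() = runBlocking {
    println(runs)                 // 0
    println(numbersFlow.toList()) // [1, 2, 3, 4]
    println(numbersFlow.toList()) // [1, 2, 3, 4]
    println(runs)                 // 2
}
```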
If we need to aggregate the emitted data into a collection, we can use toList and toSet. Note that these operations don't return until the flow completes, so on an infinite flow they never return.
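A short sketch of both collectors, assuming kotlinx-coroutines-core on the classpath; toSet keeps the first occurrence of each value in emission order. The repeated flow is illustrative.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.runBlocking

val repeated = flowOf(3, 1, 3, 2, 1)

fun main() = runBlocking {
    println(repeated.toList()) // [3, 1, 3, 2, 1]: keeps order and duplicates
    println(repeated.toSet())  // [3, 1, 2]: duplicates removed
}
```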
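To do the opposite and get just one element, use .first: it waits for the first element, returns it, and cancels the rest of the flow. A related operation, .single, expects the flow to emit exactly one element: it returns that element and throws an exception if the flow is empty or emits more than one.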
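A sketch contrasting first and single, assuming kotlinx-coroutines-core on the classpath. The counting flow and its emitted counter are hypothetical helpers that show first() stopping the flow after one element.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

var emitted = 0 // how many values the counting flow has produced

// Hypothetical flow that counts its own emissions.
val counting: Flow<Int> = flow {
    for (i in 1..4) {
        emitted++
        emit(i)
    }
}

fun main() = runBlocking {
    println(counting.first())    // 1
    println(emitted)             // 1: first() cancelled the flow after one element
    println(flowOf(42).single()) // 42: exactly one element
    // single() fails when the flow emits more than one element (or none).
    println(runCatching { flowOf(1, 2).single() }.isFailure) // true
}
```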
And to combine all items into a single aggregate value, there are .fold and .reduce:
flowOf(1, 2, 3, 4).reduce { a, b -> a + b } // sum all and return 10
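A sketch comparing the two, assuming kotlinx-coroutines-core on the classpath: fold starts from an explicit initial value, so it also works on an empty flow, while reduce starts from the first element and throws on an empty flow. sumAll and productAll are hypothetical helper names.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// reduce: the first element is the starting accumulator.
fun sumAll(values: Flow<Int>): Int = runBlocking { values.reduce { a, b -> a + b } }

// fold: starts from the given initial value (1 for a product).
fun productAll(values: Flow<Int>): Int = runBlocking { values.fold(1) { acc, x -> acc * x } }

fun main() {
    println(sumAll(flowOf(1, 2, 3, 4)))     // 10
    println(productAll(flowOf(1, 2, 3, 4))) // 24
    println(productAll(emptyFlow()))        // 1: fold just returns the initial value
    println(runCatching { sumAll(emptyFlow()) }.isFailure) // true: reduce throws on an empty flow
}
```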
All aforementioned terminal operations are suspending because the flow itself may contain suspending code that emits elements (but doesn't have to), so they have to be called from a coroutine. If you need a new coroutine only to collect data from a flow, there is a handy launchIn operator, which launches a new coroutine in a given scope and runs .collect() inside.
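A sketch of launchIn, assuming kotlinx-coroutines-core on the classpath. Since launchIn takes no lambda, per-element handling is attached with onEach before it; the returned Job lets the caller wait for the collection to finish. collectInBackground is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun collectInBackground(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val job = flowOf(1, 2, 3)
        .onEach { received += it } // per-element handling goes here
        .launchIn(this)            // same as launch { collect() } in this scope
    // The current coroutine keeps running; wait for the collecting one to finish.
    job.join()
    received
}

fun main() {
    println(collectInBackground()) // [1, 2, 3]
}
```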
flow.launchIn(scope) // same as: scope.launch { flow.collect() }
Conclusion
A flow is an object that can generate a sequence of data by running a (potentially) suspending function. The flow object is "cold" and doesn't do anything before a terminal operation is called on it. Transformations can be applied to a flow to modify the data it emits; each transformation creates a new "cold" flow object. Terminal operations are suspending functions that trigger flow execution, and the same flow can be collected as many times as needed, running from the start each time.