How To Combine Flows In Kotlin
Coroutines provides combine, zip and flattenMerge operators is used to combine emissions from multiple flows
Combine
Combine operator takes the latest emission from two flows and gives result
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
// gives 1..3 in every 300ms
val f1 = flowOf(1,2,3).onEach { delay(300) }
val f2 = flowOf("x", "y", "z").onEach { delay(400) }
val startTime = System.currentTimeMillis()
f1.combine(f2) { num, str -> "$num -> $str" }
.collect { result ->
println("$result at ${System.currentTimeMillis() - startTime} ms from start")
}
}
/**
1 -> x at 424 ms from start
2 -> x at 627 ms from start
2 -> y at 826 ms from start
3 -> y at 927 ms from start
3 -> z at 1226 ms from start
*/
Zip -
Let's take an example as above. Each time emission occurs, zip operators waits for emission from other flow , when it occurs zip provide results in lambda expression as numbers and letters
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val f1 = flowOf(1,2,3).onEach { delay(300) }
val f2 = flowOf("x", "y", "z").onEach { delay(400) }
val startTime = System.currentTimeMillis()
f1.zip(f2) { num, str -> "$num -> $str" }
.collect { result ->
println("$result at ${System.currentTimeMillis() - startTime} ms from start")
}
}
/*
1 -> x at 437 ms from start
2 -> y at 837 ms from start
3 -> z at 1239 ms from start
*/
it will stop execution when one of the flow is completed
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit>{
val f1 = (1..4).asFlow()
val f2 = flowOf("Hi", "Hello", )
f1.zip(f2){ a,b -> "$a -> $b"}
.collect{
println(it)
}
}
/*
1 -> Hi
2 -> Hello
*/
flattenMerge
It executes them as single flow ,it doesn't combines , it will not stop execution when one of the flow is completed (But zip operator does)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val f1 = flowOf(1,2,3,4).onEach{ delay(200)}
val f2 = flowOf("H","O","L","A").onEach{ delay(400)}
val startTime = System.currentTimeMillis()
flowOf(f1,f2).flattenMerge().collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
/**
* 1 at 238 ms from start
H at 438 ms from start
2 at 438 ms from start
3 at 639 ms from start
O at 838 ms from start
4 at 839 ms from start
L at 1239 ms from start
A at 1640 ms from start
*/