How To Combine Flows In Kotlin

Coroutines provides combine, zip and flattenMerge operators is used to combine emissions from multiple flows

Combine

Combine operator takes the latest emission from two flows and gives result

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
   // gives 1..3 in every 300ms 
    val f1 = flowOf(1,2,3).onEach { delay(300) } 
    val f2 = flowOf("x", "y", "z").onEach { delay(400) }
    val startTime = System.currentTimeMillis() 
    f1.combine(f2) { num, str -> "$num -> $str" }
    .collect { result ->
        println("$result at ${System.currentTimeMillis() - startTime} ms from start")
    }
}
/**
1 -> x at 424 ms from start
2 -> x at 627 ms from start
2 -> y at 826 ms from start
3 -> y at 927 ms from start
3 -> z at 1226 ms from start
*/

Zip -

Let's take an example as above. Each time emission occurs, zip operators waits for emission from other flow , when it occurs zip provide results in lambda expression as numbers and letters

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val f1 = flowOf(1,2,3).onEach { delay(300) } 
    val f2 = flowOf("x", "y", "z").onEach { delay(400) }
    val startTime = System.currentTimeMillis() 
    f1.zip(f2) { num, str -> "$num -> $str" }
    .collect { result ->
        println("$result at ${System.currentTimeMillis() - startTime} ms from start")
    }
}
/*
1 -> x at 437 ms from start
2 -> y at 837 ms from start
3 -> z at 1239 ms from start
*/

it will stop execution when one of the flow is completed

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit>{
    val f1 = (1..4).asFlow()
    val f2 = flowOf("Hi", "Hello", )
    f1.zip(f2){ a,b -> "$a -> $b"}
    .collect{
        println(it)
    }
}
/*
1 -> Hi
2 -> Hello
*/

flattenMerge

It executes them as single flow ,it doesn't combines , it will not stop execution when one of the flow is completed (But zip operator does)

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val f1 = flowOf(1,2,3,4).onEach{ delay(200)}
    val f2 = flowOf("H","O","L","A").onEach{ delay(400)}
     val startTime = System.currentTimeMillis() 
    flowOf(f1,f2).flattenMerge().collect { 
    println("$it at ${System.currentTimeMillis() - startTime} ms from start")
    }
}
/**
 * 1 at 238 ms from start
H at 438 ms from start
2 at 438 ms from start
3 at 639 ms from start
O at 838 ms from start
4 at 839 ms from start
L at 1239 ms from start
A at 1640 ms from start
 */

Did you find this article valuable?

Support Sanjay Prajapat's Blog by becoming a sponsor. Any amount is appreciated!