More Kotlin (Racing Ambiguous Coroutines)
18 Mar 2023Shows basic examples where two or more suspended functions can be executed in parallel, the first result to complete successfully will be used and the rest will be cancelled, ending the nearest coroutine scope.
Useful for Happy Eyeballs or another fast fallback algorithm is required.
The main entry point suspend
begins with an implicit Default Dispatcher (backed by a shared pool of threads on JVM).
Wrapping with coroutineScope
will wait for child coroutines until completed or cancelled (when an exception is raised all children in the scope are cancelled).
Example 1 using select:
Use select
to wait for the result of multiple suspending functions simultaneously. If any fail, then select
produces the exception.
The awaitOn
is called when a deferred value is resolved then emits the result to the enclosing select
clause.
Call coroutineContext.cancelChildren()
once the select
has produced a result, all coroutines in the scope are cancelled.
The main
function will complete after the quickest task completes - in this case task1
.
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select
import kotlin.system.measureTimeMillis
suspend fun task1(): String {
delay(1000)
return "Task 1 completed"
}
suspend fun task2(): String {
delay(12000)
return "Task 2 completed"
}
suspend fun task3(): String {
delay(13000)
return "Task 3 completed"
}
suspend fun main() = coroutineScope {
val tasks = listOf(::task3, ::task2, ::task1)
val ms = measureTimeMillis {
val first =
select {
tasks.forEach { task ->
async { task() }.onAwait { it }
}
}
coroutineContext.cancelChildren()
println(first) // "Task 1 completed"
}
println("in $ms milliseconds)")
}
Example 2 using Channels:
Where channelFlow
is a cold flow.
The first
operator returns the first value or exception emitted by the flow and then cancels flow’s collection.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlin.system.measureTimeMillis
suspend fun task1(): String {
delay(1000)
return "Task 1 completed"
}
suspend fun task2(): String {
delay(12000)
return "Task 2 completed"
}
suspend fun task3(): String {
delay(13000)
return "Task 3 completed"
}
suspend fun main() = coroutineScope {
val tasks = listOf(::task3, ::task2, ::task1)
val ms = measureTimeMillis {
val first =
channelFlow {
tasks.forEach { task ->
launch { send(task()) }
}
}.first()
println(first)
//"Task 1 completed"
}
println("in $ms milliseconds)")
}
Example 3 using Flows:
Similar to channelFlow
, concurrently merge
without limit on the number of simultaneously collected flows.
The first
operator returns the first element emitted by the flow and then cancels flow’s collection
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis
suspend fun task1(): String {
delay(1000)
return "Task 1 completed"
}
suspend fun task2(): String {
delay(12000)
return "Task 2 completed"
}
suspend fun task3(): String {
delay(13000)
return "Task 3 completed"
}
@OptIn(FlowPreview::class)
suspend fun main() = coroutineScope {
val tasks = listOf(::task3, ::task2, ::task1)
val ms = measureTimeMillis {
val first = tasks.map { it.asFlow() }.merge().first()
println(first)
//"Task 1 completed"
}
println("in $ms milliseconds)")
}
Example 4 Exceptions:
Wrapping with supervisorScope
adds a SupervisorJob to the Coroutine Context,
allowing any Exception thrown by an async
job to be handled by the User.
The first job that fails, select
produces the corresponding exception as the result.
Unless cancelled, the remaining jobs will continue to run and prevent the coroutine scope from completing.
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select
import kotlin.system.measureTimeMillis
suspend fun task1(): String {
delay(1000)
error("Task 1 failed")
}
suspend fun task2(): String {
delay(12000)
return "Task 2 completed"
}
suspend fun task3(): String {
delay(13000)
return "Task 3 completed"
}
suspend fun main() = supervisorScope {
val tasks = listOf(::task3, ::task2, ::task1)
val ms = measureTimeMillis {
try {
val first =
select {
tasks.forEach { task ->
async() { task() }.onAwait { it }
}
}
println(first)
} catch (e: Exception) {
println(e.message)
// "Task 1 failed"
} finally {
coroutineContext.cancelChildren()
}
}
println("in $ms milliseconds)")
}
Example 5 Happy Eyeballs:
Hostnames on the Internet often resolve to multiple IP addresses, each of which may have different performance and connectivity characteristics. Address families (IPv4 or IPv6) may be blocked, broken, or sub-optimal on a network, clients that attempt multiple connections in parallel have a chance of establishing a connection more quickly reducing the overall client delay.
Pseudocode for the Happy Eyeballs algorithm of racing connections returning the quickest resolved ip address for e.g. fastly.com. A real example would attempt to connect with a socket and close any sockets that are not used.
The ordering of ip addresses is expected to be interleaved by family type.
Each suspend function is decorated with a staggered 250ms connection delay using onEach by order of input. The first ip in the list is started without delay, subsequent ips are delayed (e.g. 250ms, 500ms …) before starting.
What is missing from this naive example - tasks should be staggered by a delay or run immediately if the previous task fails. The exception handling required is yet to be discovered as this is quite tricky. (see gist.github.com/griffio/073e5f440971e7e19dbc3e2011c9ec07 for a possible implementation)
Once again, the flows merge
concurrently starting after their respective delay, the first ip to “resolve” is returned
and the rest of the tasks are cancelled.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.system.measureTimeMillis
suspend fun ip4a(): String {
delay(1200)
return "151.101.1.57"
}
suspend fun ip6a(): String {
delay(1100)
return "2a04:4e42:400::313"
}
suspend fun ip4b(): String {
delay(100)
return "151.101.193.57"
}
suspend fun ip6b(): String {
delay(100)
return "2a04:4e42:600::313"
}
suspend fun main(): Unit = coroutineScope {
val ms = measureTimeMillis {
val fastly = listOf(::ip6a, ::ip4a, ::ip6b, ::ip4b)
val result = happyEyeballs(fastly, 250.milliseconds)
println(result)
// 2a04:4e42:600::313
}
println("in $ms milliseconds)")
}
@OptIn(FlowPreview::class)
suspend fun <T> happyEyeballs(tasks: List<suspend () -> T>, delayBy: Duration): T = coroutineScope {
val flows = tasks.mapIndexed { ix, it ->
it.asFlow().onEach {
delay(delayBy * ix)
}
}
flows.merge().first()
}