Kotlin 是⼀⻔仅在标准库中提供最基本底层 API 以便各种其他库能够利⽤协程的语⾔。与许多其他具有类似功能的语⾔不同,async 与 await 在 Kotlin 中并不是关键字,甚⾄都不是标准库的⼀部分。此外,Kotlin 的 挂起函数概念为异步操作提供了⽐ future 与 promise 更安全、更不易出错的抽象。
kotlinx.coroutines 是由 JetBrains 开发的功能丰富的协程库。它包含本指南中涵盖的很多启⽤⾼级协程的原语,包括 launch 、async 等等。
一.基础:
本质上,协程是轻量级的线程。它们在某些 CoroutineScope 上下⽂中与 launch 协程构建器 ⼀起启动。这⾥我们在 GlobalScope 中启动了⼀个新的协程,这意味着新协程的⽣命周期只受整个应⽤程序的⽣命周期限制。
import kotlinx.coroutines.*
fun main() {
GlobalScope.launch { // 在后台启动⼀个新的协程并继续
delay(1000L) // ⾮阻塞的等待 1 秒钟
println("World!") // 在延迟后打印输出
}
// thread {
// Suspend function 'delay' should be called only
// from a coroutine or another suspend function
// delay(1000)
// }
println("Hello,") // 协程已在等待时主线程还在继续
Thread.sleep(2000L) // 阻塞主线程 2 秒钟来保证 JVM 存活
}
fun main() = runBlocking<Unit> {
GlobalScope.launch {
delay(1000)
println("World")
}
println("Hello,")
delay(2000)
}
等待一个作业:
延迟⼀段时间来等待另⼀个协程运⾏并不是⼀个好的选择。显式(以⾮阻塞⽅式)等待所启动的后台job执⾏结束。
fun main() = runBlocking<Unit> {
val job = GlobalScope.launch {
delay(1000)
println("World")
}
println("Hello,")
job.join()
}
结构化的并发:
在执⾏操作所在的指定作⽤域内启动协程,⽽不是像通常使⽤线程(线程总是全局的)那样在GlobalScope中启动。
fun main() = runBlocking<Unit> {
//启动launch协程,无需显式join。外部协程runBlocking直到其
//作用域中启动的所有协程都执行完毕后才会结束。
launch {
delay(1000)
println("World")
}
println("Hello,")
}
作用域构建器:
除了由不同的构建器提供协程作⽤域之外,还可以使⽤coroutineScope构建器声明自己的作用域。它会创建⼀个协程作⽤域并且在所有已启动⼦协程执⾏完毕之前不会结束。runBlocking与coroutineScope很类似,都是等待其协程体以及所有子协程结束。主要区别在于runBlocking方法会阻塞当前线程来等待,而coroutineScope只是挂起,会释放底层线程用于其他用途。runBlocking是常规函数,coroutineScope是挂起函数。
fun main() = runBlocking<Unit> {
launch {
delay(200)
println("Task from runBlocking")
}
coroutineScope {
launch {
delay(500)
println("Task from nested launch")
}
delay(100)
println("Task from coroutine scope")
}
println("Coroutine scope is over")
}
提取函数重构:
将launch{}内部的代码块提取到独立的函数中。提取函数时需加上suspend修饰符及成为挂起函数。
fun main() = runBlocking<Unit> {
launch {
doWorld()
println("Hello,")
}
}
suspend fun doWorld() { //挂函数
delay(1000)
println("World!")
}
全局协程像守护线程:
启动的活动协程并不会使进程保活。它们就像守护线程。
fun main() = runBlocking {
GlobalScope.launch {
//大于主线程中的1.3s,只会执行到2次,
// 第三次随着主线程中的结束而结束循环。
repeat(1000) { i ->
delay(500)
println("I'm sleeping $i")
}
}
delay(1300)
}
二.取消与超时:
取消协程的执行:
在⼀个⻓时间运⾏的应⽤程序中,也许需要对后台协程进⾏细粒度的控制
fun main() = runBlocking {
val job = launch {
repeat(10) { i ->
delay(500)
println("I'm sleeping $i")
}
}
delay(1300) //延迟一段时间
println("main: I`m tred of wating!")
//⼀旦 main 函数调⽤了 job.cancel ,我们在其它的协程中就看不到任何输出,因为它被取消了
job.cancel() //如cancel,主线程的1.3s过后job会结束执行
job.join() //如只join的话,主线程的1.3s过后job会在一直执行到结束
// job.cancelAndJoin()
println("main: Now I can quit!")
}
取消时协作的:
协程的取消是协作的。⼀段协程代码必须协作才能被取消。所有 kotlinx.coroutines 中的挂起函数都是可被取消的 。如果协程正在执⾏计算任务,并且没有检查取消的话,那么它是不能被取消的。
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { //直到打印5次后才结束
if (System.currentTimeMillis() >= nextPrintTime) {
println("Job: I`m sleeping ${i++} ...")
nextPrintTime += 1000
}
}
}
delay(1300)
println("main: I`m tred of wating!")
job.cancelAndJoin()
println("main: Now I can quit!")
}
使计算代码可取消:
有两种⽅法来使执⾏计算的代码可以被取消,第⼀种⽅法是定期调⽤挂起函数来检查取消。对于这种目的的yield是个好选择。
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { //isActive使计算可取消
if (System.currentTimeMillis() >= nextPrintTime) {
println("Job: I`m sleeping ${i++} ...")
nextPrintTime += 1000
}
}
}
delay(1300)
println("main: I`m tred of wating!")
job.cancelAndJoin()
println("main: Now I can quit!")
}
在finally中释放资源:
fun main() = runBlocking {
val job = launch {
try {
repeat(10) { i ->
delay(500)
println("Job: I`m sleeping $i ...")
}
} catch (e: Exception) {
//在被取消的协程中 CancellationException 被认为
//是协程执⾏结束的正常原因
//会在跟随着主线程结束而抛出CancellationException异常
println("Exception:" + e)
} finally {
println("job: I'm running finally")
}
}
delay(1300) //延迟一段时间
println("main: I`m tred of wating!")
job.cancelAndJoin()
println("main: Now I can quit!")
}
运行不能取消的代码块:
任何尝试在 finally 块中调⽤挂起函数的⾏为都会抛出CancellationException,因为这里持续运行的代码是可以被取消的。通常,这并不是一个问题,所有良好的关闭操作(关闭一个文件、取消一个作业、或是关闭任何一种通信通道)通常都是非阻塞的,并且不会调用任何挂起函数。当需要挂起一个被取消的协程,可以把代码包装在withConext(NonCancellable){}中。
fun main() = runBlocking {
val job = launch {
try {
repeat(10) { i ->
delay(500)
println("Job: I`m sleeping $i ...")
}
} catch (e: Exception) {
println("Exception:" + e)
} finally {
//包装挂起函数,避免抛出异常
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000) //包装withContext中能执行挂起函数
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300) //延迟一段时间
println("main: I`m tred of wating!")
job.cancelAndJoin()
println("main: Now I can quit!")
}
超时:
绝⼤多数取消⼀个协程的理由是它有可能超时。
fun main() = runBlocking {
//使用withTimeout会使超时抛出异常
withTimeout(1300) {
repeat(10) { i ->
delay(500)
println("Job: I`m sleeping $i ...")
}
}
}
fun main() = runBlocking {
//使用withTimeoutOrNull不会使超时抛出异常,超时会返回null
val result = withTimeoutOrNull(1300) {
repeat(10) { i ->
delay(500)
println("Job: I`m sleeping $i ...")
}
"Done"
}
println("Result is $result")
}
超时和资源共享的异步:
在withtTmeout时异步的,要特别关注的是此代码块内的资源释放及流关闭是否成功的问题。
var acquired = 0
class Resource {
init {
acquired++
}
fun close() {
acquired--
}
}
fun main() = runBlocking {
repeat(100_0000) { i ->
launch {
val resource = withTimeout(60) {
delay(59) //withtTmeout是异步的超时后,导致内存泄漏acquired不一定为0
Resource()
}
resource.close()
}
}
//非0,有内存泄漏
//0代表资源都释放了,没有内存泄漏
println(acquired)
}
fun main() = runBlocking {
repeat(100_0000) { i ->
launch {
val resource: Resource? = null
try {
withTimeout(60) {
delay(59) //withtTmeout是异步的超时后,导致内存泄漏acquired不一定为0
Resource()
}
} finally {
resource?.close() //在finally中确保资源的释放
}
}
}
println(acquired)
}
三.组合挂起函数:
默认顺序调用:
使⽤普通的顺序来进⾏调⽤,因为这些代码是运⾏在协程中的,只要像常规的代码⼀样顺序 都是默认的。
suspend fun doSomethingUsefullOne(): Int {
delay(1000)
return 13
}
suspend fun doSomethingUsefullTwo(): Int {
delay(1000)
return 42
}
fun main() = runBlocking {
//顺序调用并计算时间
val time = measureTimeMillis {
val one = doSomethingUsefullOne()
val two = doSomethingUsefullTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}
使用async并发:
在概念上,async类似于launch。启动了一个单独的协程,这是一个轻量级的线程并与其他所有的协程一起并发工作。不同之处在于launch返回一个job并不附带任何的结果值,而async返回一个Deferred——一个轻量级的非阻塞future,这代表一个将会稍后提供结果的promise。可以使⽤ .await() 在⼀个延期的值上得到它的最终结果,但是 Deferred 也是⼀个 Job ,所以如果需要的话,你可以取消它。
fun main() = runBlocking {
//异步调用并计算时间
val time = measureTimeMillis {
val one = async { doSomethingUsefullOne() }
val two = async { doSomethingUsefullTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
惰性启动的async:
可以通过将start参数设置为CoroutineStart.LAZY而变成惰性的。在这种模式下,只有结果通过await获取的时候协程才会启动,或者在Job的start函数调用。
fun main() = runBlocking {
val time = measureTimeMillis {
//这⾥定义的两个协程没有执⾏
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefullOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefullTwo() }
//调用start开始执行
one.start()
two.start()
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
async风格函数:
用显示的GlobalScope引用,并不是suspend挂起函数。可以在任何地方使用。然而调⽤它们的代码中意味着异步。
fun main() {
fun oneAsync() = GlobalScope.async {
doSomethingUsefullOne()
}
fun twoAsync() = GlobalScope.async {
doSomethingUsefullTwo()
}
val time = measureTimeMillis {
val one = oneAsync()
val two = twoAsync()
runBlocking{
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
使用async的结构化并发:
fun main() = runBlocking<Unit> {
try {
failedConcurrentSum()
} catch (e: ArithmeticException) {
println("Computation failed with ArithmeticException")
}
}
suspend fun failedConcurrentSum(): Int = coroutineScope {
val one = async<Int> {
try {
delay(Long.MAX_VALUE) // 模拟⼀个⻓时间的运算
42
} finally {
println("First child was cancelled")
}
}
//子协程失败,则在coroutineScope作用域内的子协程
// 及failedConcurrentSum的父协程都会被取消
val two = async<Int> {
println("Second child throws an exception")
throw ArithmeticException()
}
one.await() + two.await()
}
四.协程上下文与调度器:
协程总是运⾏在⼀些以coroutineConext类型为代表的上下⽂中,它们被定义在了 Kotlin 的标准库⾥。
调度器与线程:
协程上下⽂包含⼀个 协程调度器(CoroutineDispatcher)它确定了相关的协程在哪个线程或哪些线程上 执⾏。协程调度器可以将协程限制在⼀个特定的线程执⾏,或将它分派到⼀个线程池,亦或是让它不受限地运⾏。
fun main() = runBlocking<Unit> {
// 运⾏在⽗协程的上下⽂中,即 runBlocking 主协程
launch {
println("main runBlocking " +
": I'm working in thread ${Thread.currentThread().name}")
}
// 不受限的——将⼯作在主线程中
launch(Dispatchers.Unconfined) {
println("Unconfined " +
": I'm working in thread ${Thread.currentThread().name}")
}
// 将会获取默认调度器,默认调度器使⽤共享的后台线程池
launch(Dispatchers.Default) {
println("Default " +
": I'm working in thread ${Thread.currentThread().name}")
}
// 将使它获得⼀个新的线程⼀个专⽤的线程是⼀种⾮常昂贵的资源。在真实的
//应⽤程序中两者都必须被释放,当不再需要的时候,使⽤ close 函数
// ,或存储在⼀个顶层变量中使它在整个应⽤程序中被重⽤。
launch(newSingleThreadContext("MyOwnThread")) {
println("newSingleThreadContext" +
": I'm working in thread ${Thread.currentThread().name}")
}
}
⾮受限调度器 vs 受限调度器:
Dispatchers.Unconfinded协程调度器在调⽤它的线程启动了⼀个协程,但它仅仅只是运⾏到第⼀个挂起点。挂起后,它恢复线程中的协程,⽽这完全由被调⽤的挂起函数来决定。⾮受限的调度器⾮常适⽤于执⾏不消耗 CPU时间的任务,以及不更新局限于特定线程的任何共享数据(如UI)的协程。该调度器默认继承了外部的CorontineScope,runBlocking协程的默认调度器,特别是,当它被限 制在了调⽤者线程时,继承⾃它将会有效地限制协程在该线程运⾏并且具有可预测的 FIFO 调度。
fun main() = runBlocking<Unit> {
// ⾮受限的——将和主线程⼀起⼯作
launch(Dispatchers.Unconfined) {
println("Unconfined " +
": I'm working in thread ${Thread.currentThread().name}")
delay(500)
println("Unconfined : After delay in thread ${Thread.currentThread().name}")
}
// ⽗协程的上下⽂,主 runBlocking 协程
launch {
println("main runBlocking" +
": I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("main runBlocking" +
": After delay in thread ${Thread.currentThread().name}")
}
}
- 调试协程与线程:
- 在不同线程间跳转:
- 上下文的作业:协程的job是上下文的一部分,并且可以使用coroutineContext [Job] 表达式在上下⽂中检索它;
- 子协程:当⼀个协程被其它协程在CorutineScope中启动的时候,它通过CorutineScope。coroutineContext来承袭上下文,并且在这个新的job将会成为⽗协程作业的子作业。当⼀个⽗协程被取消的时候,所有它的⼦协程也会被递归的取消。然而,当使用GlobalScope来启动一个协程时,则新协程的作业没有父作业。因此它与这个启动的作用域无关且独立运作。
- 父协程的职责:⼀个⽗协程总是等待所有的⼦协程执⾏结束。⽗协程并不显式的跟踪所有⼦协程的启动,并且不必使⽤Job.join在最后的时候等它们。
- 命名协程以用于调试:当协程经常打印⽇志并且你只需要关联来⾃同⼀个协程的⽇志记录时,
则⾃动分配的 id 是⾮常好的。然⽽,当⼀个协程与特定请求的处理相关联时或做⼀些特定的后台任务,最好将其明确命名以⽤于调试⽬的。
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1")
252
}
- 组合上下文的元素:需要在协程上下⽂中定义多个元素。可以使⽤ + 操作符来实现。
launch (Dispatchers.Default + CoroutineName("myCoroutine")){
println("I'm working in thread ${Thread.currentThread().name}")
}
- 协程的作用域:通过创建一个CoroutineScope实例来管理协程的生命周期,并使它与activity的生命周期相关联。CoroutineScope可以通过CoroutineScope()创建或通过MainScope工厂函数。前者创建一个通用作用域,而后者为使用Dispatchers.Main作为默认调度器的UI应用程序创建作用域:
fun main() {
val mainScop = MainScope()
mainScop.launch(Dispatchers.Main) {
// TODO:协程的作用域,可添加多个子协程
}
mainScop.cancel()
}
- 线程局部数据:创建了额外的上下⽂元素,且保留给定ThreadLocal 的值,并在每次协程切换其上下⽂时恢复它。
fun main() = runBlocking{
val threadLocal = ThreadLocal<String>();
threadLocal.set("main")
println(
"Pre-main, current thread: ${Thread.currentThread()}," +
" thread local value:'${threadLocal.get()}'")
val job = launch (Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
println(
"Launch start, current thread: ${Thread.currentThread()}, " +
"thread local value:'${threadLocal.get()}'")
yield ()
println ("After yield, current thread: ${Thread.currentThread()}, " +
"thread local value:'${threadLocal.get()}'")
}
job.join()
println(
"Post-main, current thread: ${Thread.currentThread()}, " +
"thread local value:'${threadLocal.get()}'")
}
五.异步流:
挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢?这正是 Kotlin 流(
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
@RequiresApi(Build.VERSION_CODES.R)
fun main() = runBlocking<Unit>{
launch {
for (k in 1 ..3) {
println("I`m not blocked $k")
delay(100)
}
}
simple().collect()
}
流收集状态过程:
Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。
fun simple(): Flow<Int> = flow {
println("Flow started")// flow builder
for (i in 1..3) {
delay(1000) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit>{
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
流取消基础:
流采⽤与协程同样的协作取消。像往常⼀样,流的收集可以在当流在⼀个可取消的挂起函数中挂起的时候取消。
fun simple(): Flow<Int> = flow {
println("Flow started")// flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit>{
withTimeoutOrNull(250) { // 在 250 毫秒后超时
simple().collect { value -> println(value) }
}
println("Done")
}
流构建器:
除了flow{..}还有其他流构建器。flowOf发构建器定义了一个发射固定值集的流,使用.asFlow()扩展函数,可以将各种集合与序列转换为流。
fun main() = runBlocking<Unit>{
//发射固定值集的流
val flowOf = flowOf(1, 3, 2)
flowOf.collect{
value -> println(value)
}
println("flowOf======")
// 将⼀个整数区间转化为流
(1..3).asFlow().collect { value -> println(value) }
println("Done")
}
过渡流操作符:
可以使⽤操作符转换流,就像使⽤集合与序列⼀样。过渡操作符应⽤于上游流,并返回下游流。这些操作符也是冷操作符,就像流⼀样。这类操作符本⾝不是挂起函数。它运⾏的速度很快,返回新的转换流的定义。流与序列的主要区别在于这些操作符中的代码可以调⽤挂起函数。
suspend fun performRequest(request: Int): String {
delay(1000)
return "response $request"
}
suspend fun filterRequest(request: Int): Boolean {
delay(1000)
return request <= 3
}
fun main() = runBlocking<Unit> {
(1..3).asFlow()
.map { value -> performRequest(value) }
.collect { value -> println(value) }
println("filter==========")
(1..5).asFlow()
.filter { value -> filterRequest(value) }
.collect { value -> println(value) }
}
转换操作符:
在流转换操作符中,最通⽤的⼀种称为transform。
suspend fun performRequest(request: Int): String {
delay(1000)
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}.collect { response ->
println(response)
}
}
限长操作符:
限⻓过渡操作符在流触及相应限制的时候会将它的执⾏取消。协程中的取消操作总是通过抛出异常来执⾏,这样所有的资源管理函数(如 try {...} finally {...} 块)会在取消的情况下正常运⾏;
fun numbers():Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2)
.collect {
value -> println(value)
}
}
末端流操作符:
末端操作符是在流上⽤于启动流收集的挂起函数。如collect、toList、toSet、first、single、reduce、fold等
fun main() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it } // 数字 1 ⾄ 5 的平⽅
.reduce { a, b -> a + b } // 求和(末端操作符)
println(sum)
}
流式连续的:
流的每次单独收集都是按顺序执⾏的,除⾮进⾏特殊操作的操作符使⽤多个流。该收集过程直接在协程中运⾏,该协程调⽤末端操作符。默认情况下不启动新协程。从上游到下游每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
流的上下文:
流的收集总是在调⽤协程的上下⽂中发⽣,所以默认的,flow { ... } 构建器中的代码运⾏在相应流的收集器提供的上下⽂中。flowOn函数用于更改流发射的上下文。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假装我们以消耗 CPU 的⽅式进⾏计算
log("Emitting $i")
emit(i) // 发射下⼀个值
}
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下⽂的正确⽅式
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
缓冲:
从收集流所花费的时间来看,将流的不同部分运⾏在不同的协程中将会很有帮助,特别是当涉及到⻓时间运⾏的异步操作时。可以在流上使用buffer操作符来并发运行流中的代码以及收集的代码,而不是顺序运行。
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.buffer() // 缓冲发射项,⽆需等待
.collect { value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")
}
fun simple(): Flow<Int> = flow {
println("Flow started")// flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
合并:
当流代表部分操作结果或操作状态更新时,可能没有必要处理每个值,⽽是只处理最新的那个。
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.conflate() // 合并发射项,不对每个值进⾏处理
.collect { value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")
}
fun simple(): Flow<Int> = flow {
println("Flow started")// flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
处理最新值:
当发射器和收集器都很慢的时候,合并是加快处理速度的⼀种⽅式。它通过删除发射值来实现。另⼀种⽅式是取消缓慢的收集器,并在每次发射新值的时候重新启动它。
fun simple(): Flow<Int> = flow {
println("Flow started")// flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // 取消并重新发射最后⼀个值
println("Collecting $value")
delay(300) // 假装我们花费 300 毫秒来处理它
println("Done $value")
}
}
println("Collected in $time ms")
}
组合多个流:
fun main() = runBlocking <Unit>{
val nums = (1..3).asFlow() // 数字 1..3
val strs = flowOf("one", "two", "three") // 字符串
nums.zip(strs) { a, b -> "$a -> $b" } // 组合单个字符串
.collect { println(it) } // 收集并打印
val nums2 = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs2 = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射⼀次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums2.combine(strs2) { a, b -> "$a -> $b" } // 使⽤“combine”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
展平流:
流表⽰异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另⼀个值序列的请求。
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3)
.asFlow()
.onEach { delay(100) } // 每 100 毫秒发射⼀个数字
.flatMapConcat { requestFlow(it) }//在等待内部流完成之前开始收集下⼀个值
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
println("flatMapMerge==========")
(1..6)
.asFlow()
.onEach { delay(100) } // 每 100 毫秒发射⼀个数字
.flatMapMerge (2){ requestFlow(it) }//接收可选的⽤于限制并发收集的流的个数的concurrency 参数
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
println("flatMapLatest==========")
(1..6)
.asFlow()
.onEach { delay(100) } // 每 100 毫秒发射⼀个数字
.flatMapLatest { requestFlow(it) }//在发出新流后⽴即取消先前流的收集。
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
流异常:
当运算符中的发射器或代码抛出异常时,流收集可以带有异常的完成。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 发射下⼀个值
}
}
fun simple2(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 发射下⼀个值
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
println("simple2======================")
try {
simple2().collect { value ->
println(value)
}
} catch (e: Throwable) {
println("Caught $e")
}
}
异常透明度:
catch操作符来保留此异常的透明性并允许封装它的异常处理。catch 操作符的代码块可以分析异常并根据捕获到的异常以不同的⽅式对其做出反应。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 发射下⼀个值
}
}
fun main() = runBlocking<Unit> {
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
}
流完成:
当流收集完成时(普通情况或异常情况),它可能需要执⾏⼀个动作。你可能已经注意到,它可以通过两种⽅式完成:命令式或声明式。
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
}
} finally {
println("Done")
}
println("onCompletion================")
simple()
.onCompletion {
println("Done")
}
.collect { value ->
println(value)
}
}
启动流:
⼀个过渡操作符。也需要⼀个末端操作符来收集流。仅调⽤ onEach 是⽆效的
fun events(): Flow<Int> = (1..3).asFlow().onEach {
delay(100)
}
fun main() = runBlocking<Unit> {
events().onEach { event ->
println("Event: $event")
}.collect() // <--- 等待流收集
println("Done")
println("launchIn===============")
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- 在单独的协程中执⾏流
println("Done")
}
流取消检测:
为⽅便起⻅,流构建器对每个发射值执⾏附加的ensureActive检测以进⾏取消。这意味着从 flow { ... }发出的繁忙循环是可以取消的。
六.通道:
延期的值提供了⼀种便捷的⽅法使单个值在多个协程之间进⾏相互传输。通道提供了⼀种在流中传输值的⽅法。
通道基础:
一个Channel是一个和BlockingQueue⾮常相似的概念。其中⼀个不同是它代替了阻塞的 put 操作并提供了挂起的send,还替代了阻塞的take操作并提供了挂起的receive。
fun main() = runBlocking<Unit> {
val channel = Channel<Int> ()
launch {
// 这⾥可能是消耗⼤量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平⽅并发送
for (x in 1..5) channel.send(x * x)
}
// 这⾥我们打印了 5 次被接收的整数:
repeat(5) { println(channel.receive()) }
println("Done!")
}
关闭与迭代通道:
和队列不同,⼀个通道可以通过被关闭来表明没有更多的元素将会进⼊通道。在接收者中可以定期的使⽤ for循环来从通道中接收元素。
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 我们结束发送
}
// 这⾥我们使⽤ `for` 循环来打印所有被接收到的元素(直到通道被关闭)
for (y in channel) println(y)
println("Done!")
}
构建通道生产者:
produce的便捷的协程构建器,可以很容易的在⽣产者端正确⼯作。
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
管道:
管道是⼀种⼀个协程在流中开始⽣产可能⽆穷多个元素的模式。并且另⼀个或多个协程开始消费这些流,做⼀些操作,并⽣产了⼀些额外的结果。
fun main() = runBlocking {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
repeat(5) {
println(squares.receive()) // print first five
}
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
扇出:
多个协程也许会接收相同的管道,在它们之间进⾏分布式⼯作
fun main() = runBlocking {
val numbers = produceNumbers() // produces integers from 1 and on
repeat(5) {
println(launchProcessor(it, numbers)) // print first five
}
delay(950)
numbers.cancel()
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
扇入:
多个协程可以发送到同⼀个通道。
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收前六个
println(channel.receive())
}
coroutineContext.cancelChildren() // 取消所有⼦协程来让主协程结束
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
带缓冲的通道:
⽆缓冲的通道在发送者和接收者相遇时传输元素(也称“对接”)。如果发送先被调⽤,则它将被挂起直到接收被调⽤,如果接收先被调⽤,它将被挂起直到发送被调⽤。
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(4) // 启动带缓冲的通道
val sender = launch { // 启动发送者协程
repeat(10) {
println("Sending $it") // 在每⼀个元素发送前打印它们
channel.send(it) // 将在缓冲区被占满时挂起
}
}
// 没有接收到东西……只是等待……
delay(1000)
sender.cancel() // 取消发送者协程
}
通道是公平的:
发送和接收操作是 公平的 并且尊重调⽤它们的多个协程。它们遵守先进先出原则。
计时器通道:
是⼀种特别的会合通道,每次经过特定的延迟都会从该通道进⾏消费并产⽣ Unit
fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //创建计时器通道
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // no initial delay
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements
//have 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// 模拟⼤量消费延迟
println("Consumer pauses for 150ms")
delay(150)
// 下⼀个元素⽴即可⽤
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// 请注意,`receive` 调⽤之间的暂停被考虑在内,下⼀个元素的到达速度更快
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // 表明不再需要更多的元素
}
七.异常处理与监督:
异常的传播:
协程构建器有两种形式:⾃动传播异常或向⽤⼾暴露异常。当这些构建
fun main() = runBlocking {
val job = GlobalScope.launch { // launch 根协程
println("Throwing exception from launch")
// 我们将在控制台打印Thread.defaultUncaughtExceptionHandler
throw IndexOutOfBoundsException()
}
job.join()
println("Joined failed job")
val deferred = GlobalScope.async { // async 根协程
println("Throwing exception from async")
throw ArithmeticException() // 没有打印任何东西,依赖⽤⼾去调⽤等待
}
try {
deferred.await()
println("Unreached")
} catch (e: ArithmeticException) {
println("Caught ArithmeticException")
}
}
CoroutineExceptionHandler:
将未捕获异常打印到控制台的默认⾏为是可⾃定义的。根协程CoroutineExceptionHandler上下⽂元素 可以被⽤于这个根协程通⽤的 catch 块,及其所有可能⾃定义了异常处理的⼦协程。⽆法从 CoroutineExceptionHandler 的异常中恢复。当调⽤处理者的时候,协程已经完成并带有相应的异常。通常,该处理者⽤于记录异常,显⽰某种错误消息,终⽌和(或)重新启动应⽤程序。
取消与异常:
取消与异常紧密相关。协程内部使⽤ CancellationException 来进⾏取消,这个异常会被所有的处理者忽略,所以那些可以被 catch 代码块捕获的异常仅仅应该被⽤来作为额外调试信息的资源。当⼀个协程使⽤Job.cancel取消的时候,它会被终⽌,但是它不会取消它的⽗协程。
异常聚合:
当协程的多个⼦协程因异常⽽失败时,⼀般规则是“取第⼀个异常”,因此将处理第⼀个异常。在第⼀个异常之后发⽣的所有其他异常都作为被抑制的异常绑定⾄第⼀个异常。
监督:
取消是在协程的整个层次结构中传播的双向关系,监督作业SupervisorJob 的取消只会向下传播。
fun main() = runBlocking {
try {
supervisorScope {
val child = launch {
try {
println("The child is sleeping")
delay(Long.MAX_VALUE)
} finally {
println("The child is cancelled")
}
}
// 使⽤ yield 来给我们的⼦作业⼀个机会来执⾏打印
yield()
println("Throwing an exception from the scope")
throw AssertionError()
}
} catch(e: AssertionError) {
println("Caught an assertion error")
}
}
八.共享的可变状态与并发:
协程可⽤多线程调度器并发执⾏。这样就可以提出所有常⻅的并发问题。主 要的问题是同步访问共享的可变状态。协程领域对这个问题的⼀些解决⽅案类似于多线程领域中的解决⽅案, 但其它解决⽅案则是独⼀⽆⼆的。
线程安全的数据结构:
⼀种对线程、协程都有效的常规解决⽅法,就是使⽤线程安全(也称为同步的、可线性化、原⼦)的数据结构,它为需要在共享状态上执⾏的相应操作提供所有必需的同步处理。
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执⾏同⼀动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作⽤域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
以细粒度限制线程:
限制线程 是解决共享可变状态问题的⼀种⽅案:对特定共享状态的所有访问权都限制在单个线程中。
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执⾏同⼀动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作⽤域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 将每次⾃增限制在单线程上下⽂中
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
以粗粒度限制线程:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
互斥:
互斥解决⽅案:使⽤永远不会同时执⾏的 关键代码块 来保护共享状态的所有修改。在阻塞的世界中,你通常会为此⽬的使⽤ synchronized 或者 ReentrantLock 。在协程中的替代品叫做Mutex。它具有lock 和unlock ⽅法,可以隔离关键的部分。关键的区别在Mutex.lock() 是⼀个挂起函数,它不会阻塞线程。
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// ⽤锁保护每次⾃增
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
Actors:
是由协程、被限制并封装到该协程中的状态以及⼀个与其它协程通信的 通道 组合⽽成的⼀个实体。 ⼀个简单的 actor 可以简单的写成⼀个函数,但是⼀个拥有复杂状态的 actor 更适合由类来表⽰。actor协程构建器,它可以⽅便地将 actor 的邮箱通道组合到其作⽤域中(⽤来接收消息)、组合发送channel 与结果集对象,这样对 actor 的单个引⽤就可以作为其句柄持有。
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking<Unit> {
val counter = counterActor() // create the actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
九.select表达式:
select 表达式可以同时等待多个挂起函数,并选择 第⼀个可⽤的。
fun CoroutineScope.fizz() = produce<String> {
while (true) { // 每 300 毫秒发送⼀个 "Fizz"
delay(300)
send("Fizz")
}
}
fun CoroutineScope.buzz() = produce<String> {
while (true) { // 每 500 毫秒发送⼀个"Buzz!"
delay(500)
send("Buzz!")
}
}
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // <Unit> 意味着该 select 表达式不返回任何结果
fizz.onReceive { value -> // 这是第⼀个 select ⼦句
println("fizz -> '$value'")
}
buzz.onReceive { value -> // 这是第⼆个 select ⼦句
println("buzz -> '$value'")
}
}
}
fun main() = runBlocking<Unit> {
val fizz = fizz()
val buzz = buzz()
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // 取消 fizz 和 buzz 协程
}
通道关闭时select:
select 中onReceive⼦句在已经关闭的通道执⾏会发⽣失败,并导致相应的 select 抛出异常。我们可以使⽤onReceiveOrNull⼦句在关闭通道时执⾏特定操作。
suspend fun selectAorB(a: ReceiveChannel<String>
, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveCatching { it ->
val value = it.getOrNull()
if (value != null) {
"a -> '$value'"
} else {
"Channel 'a' is closed"
}
}
b.onReceiveCatching { it ->
val value = it.getOrNull()
if (value != null) {
"b -> '$value'"
} else {
"Channel 'b' is closed"
}
}
}
fun main() = runBlocking<Unit> {
val a = produce<String> {
repeat(4) { send("Hello $it") }
}
val b = produce<String> {
repeat(4) { send("World $it") }
}
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
Select以发送:
Select 表达式具有onSend⼦句,可以很好的与选择的偏向特性结合使⽤。
fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
for (num in 1..10) { // produce 10 numbers from 1 to 10
delay(100) // every 100 ms
select<Unit> {
onSend(num) {} // Send to the primary channel
side.onSend(num) {} // or to the side channel
}
}
}
fun main() = runBlocking<Unit> {
val side = Channel<Int>() // allocate side channel
launch { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
}
produceNumbers(side).consumeEach {
println("Consuming $it")
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
coroutineContext.cancelChildren()
}
Select延迟值:
延迟值可以使⽤onAwait⼦句查询。
fun CoroutineScope.asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
}
fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}
fun main() = runBlocking<Unit> {
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
}
在延迟值通道上切换:
它消费⼀个产⽣延迟字符串的通道,并等待每个接收的延迟值,但它只在下⼀个延迟值到达或者通道关闭之前处于运⾏状态。
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
// return next deferred value from this select or null
val next = select<Deferred<String>?> {
input.onReceiveCatching { update ->
update.getOrNull()
}
current.onAwait { value ->
send(value) // send value that current deferred has produced
// and use the next deferred from the input channel
input.receiveCatching().getOrNull()
}
}
if (next == null) {
println("Channel was closed")
break // out of loop
} else {
current = next
}
}
}
fun CoroutineScope.asyncString(str: String, time: Long) = async {
delay(time)
str
}
fun main() = runBlocking<Unit> {
val chan = Channel<Deferred<String>>() // the channel for test
launch { // launch printing coroutine
for (s in switchMapDeferreds(chan))
println(s) // print each received string
}
chan.send(asyncString("BEGIN", 100))
delay(200) // enough time for "BEGIN" to be produced
chan.send(asyncString("Slow", 500))
delay(100) // not enough time to produce slow
chan.send(asyncString("Replace", 100))
delay(500) // give it time before the last one
chan.send(asyncString("END", 500))
delay(1000) // give it time to process
chan.close() // close the channel ...
delay(500) // and wait some time to let it finish
}