注:本文中使用
runBlocking
是為了方便測試,業(yè)務開發(fā)中禁止使用
一扰她、協(xié)程基礎(chǔ)
1、創(chuàng)建協(xié)程的三種方式
(1) 使用 runBlocking
頂層函數(shù)(業(yè)務開發(fā)中不會用到這種方法喉磁,因為它是線程阻塞的碱茁,一般用于單元測試)。
val runBlock = runBlocking {}
(2) 使用 GlobalScope
單例對象創(chuàng)建(Android 開發(fā)中不推薦這種用法,因為它的生命周期只受整個應用程序的生命周期限制茁瘦,且不能取消)品抽。
val globalScopeLaunch = GlobalScope.launch {}
val globalScopeAsync = GlobalScope.async {}
(3) 自行通過 CoroutineScope
創(chuàng)建。
val coroutineScopeLaunch = CoroutineScope(SupervisorJob() + Dispatchers.Default).launch {}
val coroutineScopeAsync = CoroutineScope(SupervisorJob() + Dispatchers.Default).async {}
2甜熔、簡單示例
fun testCoroutine() = runBlocking {
launch {
delay(1000)
println("hello")
delay(2000)
println("world")
}
println("test1")
println("test2")
// test1
// test2
// hello
// world
}
fun testCoroutine()2 = runBlocking {
val job = launch {
delay(1000)
println("hello")
delay(2000)
println("world")
}
println("test1")
job.join() // 顯示地等待 job 執(zhí)行結(jié)束
println("test2")
// test1
// hello
// world
// test2
}
3圆恤、取消協(xié)程
fun cancelCoroutine() = runBlocking {
val job = launch {
repeat(1000) {
println("job: test $it ...")
delay(500)
}
}
delay(1300)
println("main: ready to cancel !")
job.cancel() // 取消作業(yè)
job.join() // 等待作業(yè)執(zhí)行結(jié)束
// job.cancelAndJoin()
println("main: Now cancel.")
// job: test 0 ...
// job: test 1 ...
// job: test 2 ...
// main: ready to cancel !
// main: Now cancel.
}
如果協(xié)程正在執(zhí)行計算任務,并且沒有檢查取消的話腔稀,那么它是不能被取消的盆昙。
fun canNotCancelCoroutine() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) {
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: hello ${i++} ...")
nextPrintTime += 500
}
}
}
delay(1300)
println("main: ready to cancel !")
job.cancelAndJoin()
println("main: now canceled.")
// job: hello 0 ...
// job: hello 1 ...
// job: hello 2 ...
// main: ready to cancel !
// job: hello 3 ...
// job: hello 4 ...
// main: now canceled.
}
加入狀態(tài)判斷 isActive
或者使用狀態(tài)檢查 ensureActive()
再或者使用 yield()
(內(nèi)部會先檢查狀態(tài))來確保取消協(xié)程。
fun ensureCancelCoroutine() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5 && isActive) {
// ensureActive()
// yield()
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: hello ${i++} ...")
nextPrintTime += 500
}
}
}
delay(1300)
println("main: ready to cancel !")
job.cancelAndJoin()
println("main: now canceled.")
// job: hello 0 ...
// job: hello 1 ...
// job: hello 2 ...
// main: ready to cancel !
// main: now canceled.
}
4焊虏、等待協(xié)程執(zhí)行結(jié)果
fun asyncCoroutine() = runBlocking {
val deferred = async {
delay(2000)
"async result"
}
val result = deferred.await()
println("deferred: $result")
// deferred: async result
}
5淡喜、異常處理
(1)try - catch - finally
fun catchExCoroutine() = runBlocking {
val job = launch {
try {
delay(200)
println("try...")
throw NullPointerException()
} catch (e: Exception) {
println("exception: ${e.message}")
} finally {
println("finally...")
}
}
delay(1000)
println("cancel")
job.cancel()
println("done")
// try...
// exception: null
// finally...
// cancel
// done
}
(2)CoroutineExceptionHandler
fun handleExCoroutine() = runBlocking {
val job = launch(CoroutineExceptionHandler { coroutineContext, throwable ->
println("exception: ${throwable.message}")
}) {
delay(200)
println("try...")
throw NullPointerException()
}
delay(1000)
println("cancel")
job.cancel()
println("done")
// try...
// Exception in thread "main" java.lang.NullPointerException
// at com.wf.kotlin.study.11協(xié)程._01_協(xié)程CoroutineKt$handleExCoroutine$1$job$2.invokeSuspend(01.協(xié)程Coroutine.kt:186)
// ......
}
6、協(xié)程的超時
fun timeoutCoroutine() = runBlocking {
withTimeout(300) {
println("start...")
delay(100)
println("progress 1...")
delay(100)
println("progress 2...")
delay(100)
println("progress 3...")
delay(100)
println("progress 4...")
delay(100)
println("progress 5...")
println("end")
}
// start...
// progress 1...
// progress 2...
// Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 300 ms
// at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
// ......
}
二炕淮、并發(fā)與掛起函數(shù)
1拆火、使用 async 并發(fā)
fun asyncConcurrent() = runBlocking {
val time = measureTimeMillis {
printlnWithThread("start")
val a = async(Dispatchers.IO) {
printlnWithThread()
delay(1000)
1
}
val b = async(Dispatchers.IO) {
printlnWithThread()
delay(2000)
2
}
printlnWithThread("a + b = ${a.await() + b.await()}")
printlnWithThread("end")
}
printlnWithThread("time: $time")
// Wed Feb 22 14:36:07 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 14:36:07 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1,
// Wed Feb 22 14:36:07 CST 2023 Thread -> id: 15, name: DefaultDispatcher-worker-3,
// Wed Feb 22 14:36:09 CST 2023 Thread -> id: 1, name: main, a + b = 3
// Wed Feb 22 14:36:09 CST 2023 Thread -> id: 1, name: main, end
// Wed Feb 22 14:36:09 CST 2023 Thread -> id: 1, name: main, time: 2065
}
2、惰性啟動 async
(1)通過將 start
參數(shù)設置為 CoroutineStart.LAZY
變成惰性的涂圆;
(2)在這個模式下们镜,調(diào)用 await
獲取協(xié)程執(zhí)行結(jié)果或者調(diào)用 Job
的 start
方法時,協(xié)程才會啟動润歉。
fun lazyAsyncConcurrent() = runBlocking {
val time = measureTimeMillis {
printlnWithThread("enter")
val a = async(Dispatchers.IO, CoroutineStart.LAZY) {
printlnWithThread()
delay(1000)
1
}
val b = async(Dispatchers.IO, CoroutineStart.LAZY) {
printlnWithThread()
delay(2000)
2
}
delay(1000)
printlnWithThread("start")
a.start()
b.start()
printlnWithThread("a + b = ${a.await() + b.await()}")
printlnWithThread("end")
}
printlnWithThread("time: $time")
// Wed Feb 22 14:24:32 CST 2023 Thread -> id: 1, name: main, enter
// Wed Feb 22 14:24:33 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 14:24:33 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1,
// Wed Feb 22 14:24:33 CST 2023 Thread -> id: 15, name: DefaultDispatcher-worker-3,
// Wed Feb 22 14:24:35 CST 2023 Thread -> id: 1, name: main, a + b = 3
// Wed Feb 22 14:24:35 CST 2023 Thread -> id: 1, name: main, end
// Wed Feb 22 14:24:35 CST 2023 Thread -> id: 1, name: main, time: 3071
}
3模狭、掛起函數(shù)
- 定義:
(1)使用suspend
關(guān)鍵字修飾的函數(shù)成為掛起函數(shù);
(2)掛起函數(shù)只能在另一個掛起函數(shù)踩衩,或者協(xié)程中被調(diào)用嚼鹉;
(3)在掛起函數(shù)中可以調(diào)用普通函數(shù)(非掛起函數(shù))。 - 實現(xiàn):實現(xiàn)掛起的的目的是讓程序脫離當前的線程驱富,也就是要 切線程
(1)給函數(shù)加上suspend
關(guān)鍵字锚赤;
(2)如果是耗時操作,則使用withContext(Dispatchers.IO)
切換線程去執(zhí)行耗時操作褐鸥;
(3)如果是延時操作线脚,則調(diào)用delay(100)
函數(shù)即可。
suspend fun callA(): Int {
printlnWithThread() // 調(diào)用普通函數(shù)
delay(1000) // 調(diào)用掛起函數(shù)
return 1
}
fun suspendFunConcurrent() = runBlocking {
val time = measureTimeMillis {
printlnWithThread("start")
val a = async(Dispatchers.IO) {
callA()
}
val b = async(Dispatchers.IO) {
printlnWithThread()
delay(2000)
2
}
printlnWithThread("a + b = ${a.await() + b.await()}")
printlnWithThread("end")
}
printlnWithThread("time: $time")
// Wed Feb 22 14:33:58 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 14:33:58 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1,
// Wed Feb 22 14:33:58 CST 2023 Thread -> id: 15, name: DefaultDispatcher-worker-3,
// Wed Feb 22 14:34:00 CST 2023 Thread -> id: 1, name: main, a + b = 3
// Wed Feb 22 14:34:00 CST 2023 Thread -> id: 1, name: main, end
// Wed Feb 22 14:34:00 CST 2023 Thread -> id: 1, name: main, time: 2074
}
三叫榕、協(xié)程上下文和作用域
1浑侥、協(xié)程上下文 CoroutineContext
(1)協(xié)程上下文包含當前協(xié)程 scope
的信息, 比如 Job
, Dispatcher
, ContinuationInterceptor
, CoroutineName
和 CoroutineId
晰绎;
(2)在 CoroutineContext
中寓落,是用 map
來存這些信息的, map
的鍵是這些類的 伴生對象荞下,值是這些類的一個實例伶选。
fun jobCoroutineContext() = runBlocking {
val job = launch(Dispatchers.Default + CoroutineName("test")) {
printlnWithThread("job: ${this.coroutineContext[Job]}, ${this.coroutineContext[CoroutineName]}")
}
printlnWithThread("job: $job")
printlnWithThread("job: ${job[Job]}")
// Wed Feb 22 15:29:56 CST 2023 Thread -> id: 1, name: main, job: StandaloneCoroutine{Active}@7d0587f1
// Wed Feb 22 15:29:56 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, job: StandaloneCoroutine{Active}@7d0587f1, CoroutineName(test)
// Wed Feb 22 15:29:56 CST 2023 Thread -> id: 1, name: main, job: StandaloneCoroutine{Active}@7d0587f1
}
2史飞、協(xié)程作用域 CoroutineScope
(1)CoroutineScope
的代碼很簡單,主要作用是提供 CoroutineContext
考蕾, 啟動協(xié)程需要 CoroutineContext
祸憋;
(2)作用域可以管理其域內(nèi)的所有協(xié)程,一個 CoroutineScope
可以有許多的子 scope
肖卧;
(3)協(xié)程內(nèi)部是通過 CoroutineScope.coroutineContext
自動繼承自父協(xié)程的上下文蚯窥;
(4)一個父協(xié)程總是等待所有的子協(xié)程執(zhí)行結(jié)束,取消父協(xié)程會取消所有的子協(xié)程塞帐;
(5)默認情況下拦赠,協(xié)程內(nèi),某個子協(xié)程拋出一個非 CancellationException
異常葵姥,未被捕獲荷鼠,會傳遞到父協(xié)程;
(6)任何一個子協(xié)程異常退出榔幸,那么整體都將退出允乐。
fun myCoroutineScope() = runBlocking {
val dispatcher = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
val myScope = CoroutineScope(dispatcher)
myScope.launch {
printlnWithThread()
}
// Wed Feb 22 15:30:48 CST 2023 Thread -> id: 13, name: pool-1-thread-1,
}
3、SupervisorJob
(1)SupervisorJob 與 Job 基本類似削咆,區(qū)別在于父協(xié)程和兄弟協(xié)程不會被此子協(xié)程的異常和取消所影響;
(2)適合一些獨立不相干的任務牍疏,任何一個任務出問題,并不會影響其他任務的工作拨齐。
fun supervisorJob() = runBlocking(CoroutineExceptionHandler { _, throwable ->
printlnWithThread(throwable.message ?: "throwable")
}) {
launch(Dispatchers.Default + SupervisorJob()) {
printlnWithThread("child job")
throw NullPointerException()
}
launch {
delay(1000)
printlnWithThread("brother job")
}
delay(3000)
printlnWithThread("end")
// Wed Feb 22 14:50:19 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, child job
// Wed Feb 22 14:50:19 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, ExceptionHandler
// Wed Feb 22 14:50:20 CST 2023 Thread -> id: 1, name: main, brother job
// Wed Feb 22 14:50:22 CST 2023 Thread -> id: 1, name: main, end
}
四鳞陨、協(xié)程并發(fā)和同步
1、Volatile
(1)保證此變量對所有的線程的可見性(跳過 cpu cache 讀取主存)瞻惋;
(2)禁止指令重排序優(yōu)化厦滤;
(3)不會阻塞線程;
(4)不能保證原子性(即不++歼狼、--等非一次原子操作中不能同步)掏导。
@Volatile
var flag1 = true
fun volatileFun() {
printlnWithThread("start")
Thread {
flag1 = false
printlnWithThread("flag1 = $flag1")
}.start()
while (flag1) {
printlnWithThread("flag1 = $flag1")
}
printlnWithThread("end")
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, flag1 = true
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, flag1 = true
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, flag1 = true
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, flag1 = true
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 13, name: Thread-0, flag1 = false
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, end
}
2、synchronized
var flag2 = true
fun synchronizedFun() {
printlnWithThread("start")
Thread {
flag2 = false
printlnWithThread("flag2 = $flag2")
}.start()
while (flag2) {
synchronized(flag2) {
printlnWithThread("flag2 = $flag2")
}
}
printlnWithThread("end")
// Wed Feb 22 15:08:10 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 15:08:10 CST 2023 Thread -> id: 1, name: main, flag2 = true
// Wed Feb 22 15:08:10 CST 2023 Thread -> id: 13, name: Thread-0, flag2 = false
// Wed Feb 22 15:08:10 CST 2023 Thread -> id: 1, name: main, end
}
3羽峰、使用線程安全的數(shù)據(jù)結(jié)構(gòu) Atomicxxxx碘菜、ReentrantLock等
var count = AtomicInteger()
fun atomicFun() = runBlocking(Dispatchers.IO) {
printlnWithThread("start")
repeat(100) {
launch {
repeat(1000) {
count.incrementAndGet()
}
}
}
launch {
delay(3000)
printlnWithThread("count = ${count.get()}")
printlnWithThread("end")
}
// Wed Feb 22 15:14:40 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, start
// Wed Feb 22 15:14:43 CST 2023 Thread -> id: 24, name: DefaultDispatcher-worker-12, count = 100000
// Wed Feb 22 15:14:43 CST 2023 Thread -> id: 24, name: DefaultDispatcher-worker-12, end
}
var lock = ReentrantLock()
var count2 = 0
fun lockFun() = runBlocking(Dispatchers.IO) {
printlnWithThread("start")
repeat(100) {
launch {
repeat(1000) {
lock.lock()
try {
count2++
} finally {
lock.unlock()
}
}
}
}
launch {
delay(3000)
printlnWithThread("count = $count2")
printlnWithThread("end")
}
// Wed Feb 22 15:17:45 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, start
// Wed Feb 22 15:17:48 CST 2023 Thread -> id: 30, name: DefaultDispatcher-worker-18, count = 100000
// Wed Feb 22 15:17:48 CST 2023 Thread -> id: 30, name: DefaultDispatcher-worker-18, end
}
4、協(xié)程專屬鎖 Mutex
(1)它具有 lock
和 unlock
方法限寞,關(guān)鍵的區(qū)別在于, Mutex.lock()
是一個掛起函數(shù)仰坦,它不會阻塞當前線程履植;
(2)還有 withLock
擴展函數(shù),可以方便的替代常用的 mutex.lock()
悄晃、try { …… } finally { mutex.unlock() }
模式玫霎。
var mutex = Mutex()
var count3 = 0
fun mutexFun() = runBlocking(Dispatchers.IO) {
printlnWithThread("start")
repeat(100) {
launch {
repeat(1000) {
mutex.withLock {
count3++
}
}
}
}
launch {
delay(3000)
printlnWithThread("count = $count3")
printlnWithThread("end")
}
// Wed Feb 22 15:19:44 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, start
// Wed Feb 22 15:19:47 CST 2023 Thread -> id: 79, name: DefaultDispatcher-worker-67, count = 100000
// Wed Feb 22 15:19:47 CST 2023 Thread -> id: 79, name: DefaultDispatcher-worker-67, end
}
5凿滤、限制線程(單線程)
val countContext = newSingleThreadContext("countContext")
var count4 = 0
fun singleThreadFun() = runBlocking {
printlnWithThread("start")
withContext(countContext) {
repeat(100) {
launch {
repeat(1000) {
count4++
}
}
}
launch {
delay(3000)
printlnWithThread("count = $count4")
printlnWithThread("end")
}
}
// Wed Feb 22 15:21:27 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 15:21:30 CST 2023 Thread -> id: 13, name: countContext, count = 100000
// Wed Feb 22 15:21:30 CST 2023 Thread -> id: 13, name: countContext, end
}
6、使用 Actors(后續(xù)介紹的熱流 Channel庶近,類似于 java 的 BlockingQueue)
(1)一個 actor
是由 協(xié)程翁脆、 被限制并封裝到該協(xié)程中的 狀態(tài),以及一個與其它協(xié)程通信的 通道 組合而成的一個實體鼻种;
(2)CoroutineScope.actor()
方法返回的是一個 SendChannel
對象反番。
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
fun actorsFun() = runBlocking(Dispatchers.IO) {
// Channel 管道接收消息并處理
val counterActor = actor<CounterMsg> {
var counter = 0
for (msg in channel) {
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
printlnWithThread("start")
// 發(fā)送消息到 Channel 增加計數(shù)
repeat(100) {
launch {
repeat(1000) {
counterActor.send(IncCounter)
}
}
}
// 延時3s后,發(fā)送消息到 Channel 獲取計數(shù)結(jié)果
launch {
delay(3000)
// 發(fā)送一條獲取值的消息
val resp = CompletableDeferred<Int>()
counterActor.send(GetCounter(resp))
printlnWithThread("counter = ${resp.await()}")
counterActor.close()
printlnWithThread("end")
}
// Wed Feb 22 15:25:07 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, start
// Wed Feb 22 15:25:10 CST 2023 Thread -> id: 77, name: DefaultDispatcher-worker-64, counter = 100000
// Wed Feb 22 15:25:10 CST 2023 Thread -> id: 77, name: DefaultDispatcher-worker-64, end
}
注:自定義線程信息打印函數(shù)如下:
fun printlnWithThread(msg: String = "") {
val thread = Thread.currentThread()
println("${Date()} Thread -> id: ${thread.id}, name: ${thread.name}, $msg")
}