定義
協(xié)程基于線程秉颗,是輕量級(jí)的線程
作用
- 處理耗時(shí)任務(wù),這種任務(wù)常常會(huì)阻塞主線程
- 保證主線程安全送矩,即確保安全地從主線程調(diào)用任何suspend函數(shù)
特點(diǎn)
- 讓異步邏輯同步化
- 最核心的點(diǎn)就是站宗,函數(shù)或者一段程序能夠被掛起,稍后再在掛起得位置恢復(fù)
掛起函數(shù)
- 使用suspend關(guān)鍵字修飾的函數(shù)
- 掛起函數(shù)只能在協(xié)程體內(nèi)或其他掛起函數(shù)內(nèi)調(diào)用
掛起和阻塞的區(qū)別
- 掛起不會(huì)阻塞主線程益愈,主線程可以正常刷新UI梢灭,但阻塞就會(huì)導(dǎo)致主線程ANR
協(xié)程調(diào)度器
- Dispatchers.Main:主線程上處理UI交互相關(guān),更新LiveData
- Dispatchers.IO:非主線程蒸其,磁盤讀寫和網(wǎng)絡(luò)IO
- Dispatchers.Default:非主線程敏释,CPU密集型任務(wù),排序摸袁,JSON數(shù)據(jù)解析等
任務(wù)泄漏
- 當(dāng)某個(gè)協(xié)程任務(wù)丟失钥顽,無(wú)法追蹤,會(huì)導(dǎo)致內(nèi)存靠汁、CPU蜂大、磁盤等資源浪費(fèi),甚至發(fā)送一個(gè)無(wú)用的網(wǎng)絡(luò)請(qǐng)求蝶怔,這種稱為任務(wù)泄漏
- 為了避免奶浦,引入了結(jié)構(gòu)化并發(fā)機(jī)制
結(jié)構(gòu)化并發(fā)
- 可以取消任務(wù)、追蹤任務(wù)踢星、協(xié)程失敗時(shí)發(fā)出錯(cuò)誤信號(hào)
協(xié)程作用域CoroutineScope
- 可以追蹤所有協(xié)程澳叉,也可以取消協(xié)程
- GlobalScope:生命周期是Process級(jí)別,即使Activity或Fragment已經(jīng)被銷毀沐悦,協(xié)程仍然運(yùn)行
- MainScope:在activity中使用成洗,可以在onDestroy中取消協(xié)程
- ViewModelScope:只能在ViewModel中使用,綁定ViewModel生命周期
- lifecycleScope:只能在Activity藏否、Fragment中使用瓶殃,會(huì)綁定Activity、Fragment的生命周期
協(xié)程構(gòu)建器
launch和async構(gòu)建器都用來(lái)啟動(dòng)新協(xié)程
- launch副签,返回一個(gè)Job并且不附帶任何結(jié)果
- async遥椿,返回一個(gè)Deferred基矮,Deferred也是一個(gè)Job,可以使用.await()在一個(gè)延期的值上得到最終的結(jié)果
- launch 是非阻塞的 而 runBlocking 是阻塞的修壕。多個(gè) withContext 任務(wù)是串行的愈捅, 且withContext 可直接返回耗時(shí)任務(wù)的結(jié)果遏考。 多個(gè) async 任務(wù)是并行的慈鸠,async 返回的是一個(gè)Deferred<T>,需要調(diào)用其await()方法獲取結(jié)果
- runBlocking一般用在測(cè)試中灌具,會(huì)阻塞當(dāng)前線程青团,會(huì)等到包裹的子協(xié)程都執(zhí)行完畢才退出
- 事實(shí)上await()也不一定導(dǎo)致協(xié)程會(huì)被掛起,await() 只有在 async 未執(zhí)行完成返回結(jié)果時(shí)咖楣,才會(huì)掛起協(xié)程督笆。若 async 已經(jīng)有結(jié)果了,await() 則直接獲取其結(jié)果并賦值給變量诱贿,此時(shí)不會(huì)掛起協(xié)程
構(gòu)建器 | 是否立即啟動(dòng)娃肿? | 串行?并行珠十? | 是否阻塞當(dāng)前線程料扰? | 返回結(jié)果 |
---|---|---|---|---|
launch | 是 | 根據(jù)包裹的子協(xié)程類型而定 | 否 | Job對(duì)象 |
async | 是 | 任務(wù)之間是并行 | 否 | Deferred,可以用await()方法獲取結(jié)果 |
runBlocking | 是 | 根據(jù)包裹的子協(xié)程類型而定 | 阻塞 | 子協(xié)程都執(zhí)行完畢后才退出 |
withContext | 不是 | 任務(wù)之間是串行 | 否 | 可以直接返回耗時(shí)任務(wù)結(jié)果焙蹭,協(xié)程體最后一行內(nèi)容 |
doAsync和async
- doAsync 的源碼它的實(shí)現(xiàn)都是基于Java的 Future 類進(jìn)行異步處理和通過Handler進(jìn)行線程切換 晒杈,從而封裝的一個(gè)擴(kuò)展函數(shù)方便線程切換。
- 與 async 關(guān)系不大孔厉,因?yàn)?doAsync并沒有用到協(xié)程庫(kù)中的東西
- 可以通過 uiThread { } 來(lái)切換會(huì)主線程
btn.setOnClickListener {
doAsync {
Log.e("TAG", " doAsync... [當(dāng)前線程為:${Thread.currentThread().name}]")
uiThread {
Log.e("TAG", " uiThread.... [當(dāng)前線程為:${Thread.currentThread().name}]")
}
}
}
Job對(duì)象的生命周期
- 每一個(gè)通過launch或者async創(chuàng)建的協(xié)程拯钻,都會(huì)返回一個(gè)Job實(shí)例,該實(shí)例時(shí)協(xié)程的唯一標(biāo)識(shí)撰豺,負(fù)責(zé)管理協(xié)程的生命周期
- 一個(gè)任務(wù)包含一系列狀態(tài):新創(chuàng)建(New)粪般、活躍(Active)、完成中(Completing)污桦、已完成(Completed)刊驴、取消中(Canceling)和已取消(Cancelled)。我們無(wú)法直接訪問這些狀態(tài)寡润,可以通過訪問Job的屬性:isActive捆憎、isCancelled和isCompleted
- 如果協(xié)程處于活躍狀態(tài),協(xié)程運(yùn)行出錯(cuò)或是調(diào)用job.cancel()梭纹,都會(huì)將當(dāng)前任務(wù)置為取消中(Cancelling)狀態(tài)(isActive=false躲惰,isCancelled=true)。當(dāng)所有子協(xié)程都完成后变抽,協(xié)程會(huì)進(jìn)入已取消(Cancelled)狀態(tài)础拨,此時(shí)isCompleted=true
- 協(xié)程完成氮块,可能是正常完成,也可能是被取消了
等待一個(gè)作業(yè)
由launch啟動(dòng)的協(xié)程用join()方法诡宗;用async啟動(dòng)的協(xié)程用await()
@Test
fun `test coroutine join`() = runBlocking {
val job1 = launch {
delay(200)
println("job1 finished")
}
//這樣可以確保job1執(zhí)行完再執(zhí)行后面的job2和job3
job1.join()
val job2 = launch {
delay(200)
println("job2 finished")
//返回結(jié)果
"job2 result"
}
val job3 = launch {
delay(200)
println("job3 finished")
//返回結(jié)果
"job2 result"
}
}
組合并發(fā)
@Test
fun `test async`() = runBlocking {
val time = measureTimeMillis {
val one = doOne()
val two = doTwo()
//輸出是30
println("result: ${one + two}")
}
//輸出是2秒多滔蝉,也就是是串行的
println(time)
}
//并發(fā)
@Test
fun `test combine async`() = runBlocking {
val time = measureTimeMillis {
val one = async { doOne() }
val two = async { doTwo() }
//輸出是30
println("result: ${one.await() + two.await()}")
}
//輸出是1秒多,也就是是并行的
println(time)
}
private suspend fun doOne(): Int{
delay(1000)
return 10
}
private suspend fun doTwo(): Int{
delay(1000)
return 20
}
注意async的寫法不能是:
val one = async { doOne() }.await()
val two = async { doTwo() }.await()
這樣起不到并發(fā)效果塔沃,而是等到one執(zhí)行完蝠引,再執(zhí)行two
協(xié)程的啟動(dòng)模式
- DEFAULT:協(xié)程創(chuàng)建后,立即開始調(diào)度蛀柴,在調(diào)度前如果協(xié)程被取消螃概,其將直接進(jìn)去取消響應(yīng)狀態(tài)
- ATOMIC:協(xié)程創(chuàng)建后,立即開始調(diào)度鸽疾,協(xié)程執(zhí)行到第一個(gè)掛起點(diǎn)之前不響應(yīng)取消
需要注意的是吊洼,立即調(diào)度不等于立即執(zhí)行
- LAZY:只有協(xié)程被需要時(shí),包括主動(dòng)調(diào)用協(xié)程的start制肮、join或者await等函數(shù)時(shí)才會(huì)開始調(diào)度冒窍,如果調(diào)度前就被取消,那么該協(xié)程將直接進(jìn)入異常結(jié)束狀態(tài)
@Test
fun `test start mode`() = runBlocking {
val job = async(start = CoroutineStart.LAZY) {
//
}
//...其他代碼
//啟動(dòng)協(xié)程
job.await()
}
- UNDISPATCHED:協(xié)程創(chuàng)建后立即在當(dāng)前函數(shù)調(diào)用棧中執(zhí)行豺鼻,直到遇到第一個(gè)真正的掛起點(diǎn)
@Test
fun `test start mode`() = runBlocking {
val job = async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
println("thread:"+ Thread.currentThread().name)
}
}
//上面輸出的線程名字是主線程综液,因?yàn)閁NDISPATCHED會(huì)立即在當(dāng)前線程中執(zhí)行,而runBlocking是在主線程中
協(xié)程作用域構(gòu)建器 coroutineScope拘领、runBlocking意乓、supervisorScope
- runBlocking是常規(guī)函數(shù),會(huì)阻塞當(dāng)前線程约素;coroutineScope是掛起函數(shù)届良,不會(huì)阻塞當(dāng)前線程
- 它們都會(huì)等待協(xié)程體以及所有子協(xié)程結(jié)束,一個(gè)是阻塞線程等待圣猎,一個(gè)是掛起等待
協(xié)程作用域構(gòu)建器 coroutineScope士葫、supervisorScope
- coroutineScope,一個(gè)協(xié)程失敗了送悔,所有其他兄弟協(xié)程也會(huì)被取消
- supervisorScope慢显,一個(gè)子協(xié)程失敗了,不會(huì)影響其他兄弟協(xié)程欠啤,但如果是作用域有異常失敗了荚藻,則所有的子協(xié)程都會(huì)失敗退出
coroutineScope和CoroutineScope
- coroutineScope是一個(gè)掛起函數(shù),是協(xié)程作用域構(gòu)建器洁段,CoroutineScope()是一個(gè)普通函數(shù)
- coroutineScope后面的協(xié)程作用域的協(xié)程上下文是繼承父協(xié)程作用域的上下文
- CoroutineScope()有自己的作用域上下文
- 都能夠進(jìn)行解構(gòu)化并發(fā)应狱,可以很好的管理多個(gè)子協(xié)程
協(xié)程的取消
- 取消作用域會(huì)取消它的子協(xié)程
- 被取消的子協(xié)程不會(huì)影響其余兄弟協(xié)程
- 協(xié)程通過拋出一個(gè)特殊的異常CancellationException來(lái)處理取消操作
- 所有kotlinx.coroutines中的掛起函數(shù)(withContext、delay等)都是可取消的
- CPU密集型任務(wù)無(wú)法直接用cancel來(lái)取消
CPU密集型任務(wù)的取消
- 通過isActive來(lái)判斷取消祠丝,因?yàn)槿∠娜蝿?wù)isActive為false
- 通過ensureActive()來(lái)取消疾呻,如果被取消除嘹,任務(wù)isActive為false,會(huì)拋一個(gè)異常
- yield函數(shù)會(huì)檢查所在協(xié)程的狀態(tài)岸蜗,如果已經(jīng)取消尉咕,則拋出CancellationException予以響應(yīng)。此外璃岳,它還會(huì)嘗試出讓線程的執(zhí)行權(quán)年缎,給其他協(xié)程提供執(zhí)行的機(jī)會(huì)
協(xié)程取消的副作用
- 在finally中釋放資源
@Test
fun `test release resources`() = runBlocking {
var br = BufferedReader(FileReader("xxx"))
with(br){
var line:String?
try {
while (true){
line = readLine() ?: break
println(line)
}
}finally {
//關(guān)閉資源
close()
}
}
}
- 用use函數(shù):該函數(shù)只能被實(shí)現(xiàn)了Closeable的對(duì)象使用,程序結(jié)束的時(shí)候會(huì)自動(dòng)調(diào)用close方法矾睦,適合文件對(duì)象
//use函數(shù)在文件使用完畢后會(huì)自動(dòng)調(diào)用close函數(shù)
BufferedReader(FileReader("xxx")).use {
var line:String?
while (true){
line = readLine() ?: break
println(line)
}
}
不能取消的任務(wù)
協(xié)程被取消后晦款,finally里面還有掛起函數(shù)炎功,可以用withContext(NonCancellable)
@Test
fun `test cancel with noncancellable`() = runBlocking {
val job = launch {
try {
repeat(1000){
println("job: i'm sleeping $it")
delay(500L)
}
}finally {
//不用withContext(NonCancellable)枚冗,delay后面的打印不會(huì)執(zhí)行
withContext(NonCancellable){
println("running finally")
delay(1000L)
println("job: noncancellable")
}
}
}
delay(1300)
println("main: waiting")
job.cancelAndJoin()
println("main: i can quit")
}
超時(shí)任務(wù)
withTimeout()方法可以開啟超時(shí)任務(wù),默認(rèn)超時(shí)會(huì)拋出異常
/*
* 超時(shí)任務(wù)
* */
@Test
fun `test deal with timeout`() = runBlocking {
withTimeout(1300){
repeat(1000){
println("job: sleeping $it")
delay(500L)
}
}
}
如果不想拋出異常蛇损,可以用withTimeoutOrNull
/*
* 超時(shí)任務(wù)赁温,超時(shí)會(huì)返回null,不超時(shí)返回最后的done
* */
@Test
fun `test deal with timeout ornull`() = runBlocking {
val result = withTimeoutOrNull(1300){
repeat(1000){
println("job: sleeping $it")
delay(500L)
}
"done"
}
println("result: $result")
}