協(xié)程
大家如果已經(jīng)使用Kotlin語(yǔ)言進(jìn)行開(kāi)發(fā)液走,對(duì)協(xié)程這個(gè)概念應(yīng)該不會(huì)很陌生烂完。雖然在網(wǎng)上有很多Kotlin協(xié)程相關(guān)的文章设哗,但當(dāng)我開(kāi)始準(zhǔn)備使用的時(shí)候,還是有如下幾個(gè)疑慮舶胀。
- 協(xié)程到底能夠解決什么樣的問(wèn)題概说?
- 協(xié)程和我們常用的Executor、RxJava有什么區(qū)別嚣伐?
- 項(xiàng)目上使用有什么風(fēng)險(xiǎn)嗎糖赔?
接下來(lái)就帶著這幾個(gè)問(wèn)題一起來(lái)揭開(kāi)協(xié)程神秘的面紗。
如何使用
關(guān)于協(xié)程轩端,我在網(wǎng)上看到最多的說(shuō)法是協(xié)程是輕量級(jí)的線程放典。那么協(xié)程首先應(yīng)該解決的問(wèn)題就是程序中我們常常遇到的 “異步” 的問(wèn)題。我們看看官網(wǎng)介紹的幾個(gè)使用例子。
依賴(lài)
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3'
入門(mén)
import kotlinx.coroutines.*
fun main() {
GlobalScope.launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程并繼續(xù)
delay(1000L)
println("World!")
}
println("Hello,") // 主線程中的代碼會(huì)立即執(zhí)行
runBlocking { // 但是這個(gè)表達(dá)式阻塞了主線程
delay(2000L) // ……我們延遲 2 秒來(lái)保證 JVM 的存活
}
}
掛起函數(shù)
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // 假設(shè)我們?cè)谶@里做了一些有用的事
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // 假設(shè)我們?cè)谶@里也做了一些有用的事
return 29
}
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
結(jié)果:
The answer is 42
Completed in 2015 ms
使用 async 并發(fā)
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
結(jié)果:
The answer is 42
Completed in 1017 ms
單元測(cè)試
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// 這里我們可以使用任何喜歡的斷言風(fēng)格來(lái)使用掛起函數(shù)
}
}
更新詳細(xì)的使用可參考官網(wǎng)示例
為何使用
既然已經(jīng)有這么多異步處理的框架奋构,那我們?yōu)楹芜€要使用協(xié)程壳影。這里舉個(gè)例子,看看對(duì)同個(gè)需求弥臼,不同異步框架的處理方式宴咧。
現(xiàn)在有一個(gè)產(chǎn)品需求,生成一個(gè)二維碼在頁(yè)面展示給用戶(hù)径缅。我們來(lái)對(duì)比看看不同的做法掺栅。
Thread
Thread(Runnable {
try {
val qrCode: Bitmap =
CodeCreator.createQRCode(this@ShareActivity, SHARE_QR_CODE)
runOnUiThread {
img_qr_code.setImageBitmap(qrCode)
}
} catch (e: WriterException) {
e.printStackTrace()
}
}).start()
}
Executors
Executors.newSingleThreadExecutor().execute {
try {
val qrCode: Bitmap =
CodeCreator.createQRCode(this@ShareActivity, SHARE_QR_CODE)
runOnUiThread {
img_qr_code.setImageBitmap(qrCode)
}
} catch (e: WriterException) {
e.printStackTrace()
}
}
RxJava
Observable.just(SHARE_QR_CODE)
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
return CodeCreator.createQRCode(ShareActivity.this, s);
}
})
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
img_qr_code.setImageBitmap(bitmap);
}
});
Koroutine
val job = GlobalScope.launch(Dispatchers.IO) {
val bitmap = CodeCreator.createQRCode(ShareActivity.this, SHARE_QR_CODE)
launch(Dispatchers.Main) {
img_qr_code.setImageBitmap(bitmap)
}
}
}
通過(guò)這個(gè)例子,可以看出使用協(xié)程的非常方便解決 "異步回調(diào)" 問(wèn)題纳猪。
相比傳統(tǒng)的Thread及Excutors氧卧,RxJava將嵌套回調(diào)轉(zhuǎn)換成鏈?zhǔn)秸{(diào)用的形式,提高了代碼可讀性兆旬。協(xié)程直接將鏈?zhǔn)秸{(diào)用轉(zhuǎn)換成了協(xié)程內(nèi)的順序調(diào)用假抄,"代碼更加精簡(jiǎn)"。
性能
官網(wǎng)上對(duì)于協(xié)程的有一句介紹丽猬。
本質(zhì)上宿饱,協(xié)程是輕量級(jí)的線程
那么協(xié)程的執(zhí)行效率到底怎么樣呢?下面我們采用官網(wǎng)的示例在相同的環(huán)境和設(shè)備下做下對(duì)比脚祟。
啟動(dòng)了 1000個(gè)協(xié)程谬以,并且為每個(gè)協(xié)程都輸出一個(gè)點(diǎn)
Coroutine
var startTime = System.currentTimeMillis()
repeat(times) { i -> // 啟動(dòng)大量的協(xié)程
GlobalScope.launch(Dispatchers.IO) {
Log.d(this@MainActivity.toString(), "$i=.")
}
}
var endTime = System.currentTimeMillis() - startTime;
Log.d(this@MainActivity.toString(), "endTime=$endTime")
執(zhí)行結(jié)果:endTime=239 ms
Thread
var startTime = System.currentTimeMillis()
repeat(times) { i ->// 啟動(dòng)大量的線程
Thread(Runnable {
Log.d(this@MainActivity.toString(), "$i=.")
}).start()
}
var endTime = System.currentTimeMillis() - startTime;
執(zhí)行結(jié)果:endTime=3161 ms
Excutors
var startTime = System.currentTimeMillis()
var executors = Executors.newCachedThreadPool()
repeat(times) { i -> // 使用線程池
executors.execute {
Log.d(this@MainActivity.toString(), "$i=.")
}
}
var endTime = System.currentTimeMillis() - startTime;
Log.d(this@MainActivity.toString(), "endTime=$endTime")
執(zhí)行結(jié)果:endTime=143 ms
rxjava
var startTime = System.currentTimeMillis()
repeat(times) { i -> // 啟動(dòng)Rxjava
Observable.just("").subscribeOn(Schedulers.io())
.subscribe {
Log.d(this@MainActivity.toString(), "$i=.")
}
}
var endTime = System.currentTimeMillis() - startTime;
Log.d(this@MainActivity.toString(), "endTime=$endTime")
執(zhí)行結(jié)果:endTime=241 ms
源碼工程:CorountineTest
Profiler
利用AS自帶的Profiler對(duì)運(yùn)行時(shí)的CPU狀態(tài)進(jìn)行檢測(cè),我們可以看到Thread對(duì)CPU的消耗比較大由桌,Koroutine为黎、Executor、RxJava的消耗基本差不多行您。
總結(jié)
從執(zhí)行時(shí)間和Profiler上看铭乾,Coroutine比使用Thread性能提升了一個(gè)量級(jí),但與Excutor和RxJava性能是在一個(gè)量級(jí)上娃循。
注意這里的例子為了簡(jiǎn)便炕檩,因?yàn)楫惒綀?zhí)行的時(shí)間基本和repeat的時(shí)間差不多,我們沒(méi)有等所有異步執(zhí)行完再打印時(shí)間捌斧,這里們不追求精確的時(shí)間笛质,只為做量級(jí)上的對(duì)比。
實(shí)現(xiàn)機(jī)制
協(xié)程底層異步實(shí)現(xiàn)機(jī)制
我們先來(lái)看一段簡(jiǎn)單的Kotlin程序捞蚂。
GlobalScope.launch(Dispatchers.IO) {
print("hello world")
}
我們接著看下launch的實(shí)現(xiàn)代碼妇押。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
這里注意,我們通過(guò)追蹤最后的繼承關(guān)系發(fā)現(xiàn)姓迅,DefaultScheduler.IO最后也是一個(gè)CoroutineContext敲霍。
接著發(fā)現(xiàn)繼續(xù)看coroutine.start的實(shí)現(xiàn)俊马,如下:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
接著繼續(xù)看CoroutineStart的start策略,如下:
@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
CoroutineStart.ATOMIC -> block.startCoroutine(completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
繼續(xù)看startCoroutineCancellable方法肩杈,如下:
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
繼續(xù)看resumeCancellableWith方法實(shí)現(xiàn):
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}
最后發(fā)現(xiàn)調(diào)用的resumeCancellableWith方法實(shí)現(xiàn)如下:
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWith(result)
}
}
}
}
這里關(guān)鍵的觸發(fā)方法在這個(gè)地方
dispatcher.dispatch(context, this)
我們看 DefaultScheduler.IO最后的dispatch方法:
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
這里我們最終發(fā)現(xiàn)是調(diào)用了CoroutineScheduler的dispatch方法潭袱,繼續(xù)看CoroutineScheduler的實(shí)現(xiàn)發(fā)現(xiàn),CoroutineScheduler繼承了Executor锋恬。
通過(guò)dispatch的調(diào)用最后可以發(fā)現(xiàn)CoroutineScheduler其實(shí)就是對(duì)Worker的調(diào)度屯换,我們看看Worker的定義。
internal inner class Worker private constructor() : Thread()
通過(guò)這里我們發(fā)現(xiàn)另外一個(gè)老朋友Thread与学,所以到這里也符合上面性能驗(yàn)證的測(cè)試結(jié)果彤悔。
到這里我們也有結(jié)論了,協(xié)程異步實(shí)現(xiàn)機(jī)制本質(zhì)也就是自定義的線程池索守。
非阻塞式掛起 suspend
suspend有什么作用晕窑,如何做到異步不用回調(diào)?下面先定義一個(gè)最簡(jiǎn)單的suspend方法卵佛。
suspend fun hello(){
delay(100)
print("hello world")
}
通過(guò)Kotlin Bytecode轉(zhuǎn)換為java 代碼如下:
@Nullable
public final Object hello(@NotNull Continuation $completion) {
Object $continuation;
label20: {
if ($completion instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)$completion;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new ContinuationImpl($completion) {
// $FF: synthetic field
Object result;
int label;
Object L$0;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Test.this.hello(this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).L$0 = this;
((<undefinedtype>)$continuation).label = 1;
if (DelayKt.delay(100L, (Continuation)$continuation) == var6) {
return var6;
}
break;
case 1:
Test var7 = (Test)((<undefinedtype>)$continuation).L$0;
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
String var2 = "hello world";
boolean var3 = false;
System.out.print(var2);
return Unit.INSTANCE;
}
這里首先我們發(fā)現(xiàn)方法的參數(shù)多了一個(gè)Continuation completion并且內(nèi)部回定義一個(gè) Object continuation杨赤,看看Continuation的定義。
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
這是一個(gè)回調(diào)接口截汪,里面有一個(gè)關(guān)鍵的方法為resumeWith疾牲。 這個(gè)方法的具體調(diào)用通過(guò)上面的協(xié)程調(diào)用流程可以知道 ,在DispatchedContinuation的resumeCancellableWith會(huì)觸發(fā)衙解。
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}
那么resumeWith里面做了那些事情阳柔?我們看下具體的實(shí)現(xiàn)在ContinuationImpl的父類(lèi)BaseContinuationImpl中。
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
首先我們發(fā)現(xiàn)這里其實(shí)是一個(gè)遞歸的循環(huán)蚓峦,并且會(huì)調(diào)用invokeSuspend方法觸發(fā)實(shí)際的調(diào)用舌剂,等待返回結(jié)果。通過(guò)上面的分析可以看出2點(diǎn)暑椰。
- 非阻塞是因?yàn)楸旧韱?dòng)一個(gè)協(xié)程也是使用線程池異步執(zhí)行霍转,所以不會(huì)阻塞
- 協(xié)程并不是沒(méi)有回調(diào),而是將回調(diào)的接口(Continuation)及調(diào)度代碼在編譯器生成一汽,不用自己編寫(xiě)避消。
- resumeWith是一個(gè)循環(huán)及遞歸,所以會(huì)將協(xié)程內(nèi)定義的表達(dá)式順序串聯(lián)調(diào)用角虫。達(dá)到掛起及恢復(fù)的鏈?zhǔn)秸{(diào)用沾谓。
總結(jié)
- 協(xié)程到底能夠解決什么樣的問(wèn)題委造?
- 解決異步回調(diào)嵌套
- 解決異步任務(wù)之間協(xié)作
- 協(xié)程和我們常用的Executor戳鹅、RxJava有什么區(qū)別?
- 從任務(wù)調(diào)度上看昏兆,本質(zhì)都是線程池的封裝
- 項(xiàng)目上使用有什么風(fēng)險(xiǎn)嗎枫虏?
- 從性能上看與線程池與RxJava在一個(gè)量級(jí)
- 目前已是穩(wěn)定版本1.3.3,開(kāi)源項(xiàng)目使用多
- 代碼使用簡(jiǎn)便,可維護(hù)性高
- 開(kāi)源生態(tài)支持良好隶债,方便使用(Retrofit腾它、Jitpack已支持)
- 團(tuán)隊(duì)學(xué)習(xí)及舊項(xiàng)目改造需要投入一定成本
參考資料
關(guān)于
歡迎關(guān)注我的個(gè)人公眾號(hào)
微信搜索:一碼一浮生,或者搜索公眾號(hào)ID:life2code