目前很多開發(fā)組都用上協(xié)程來處理異步任務(wù)了筑辨,但是有的地方協(xié)程提供的原生API還是不足以應(yīng)付,比方說一些SDK提供了傳入Executor的接口(以便復(fù)用調(diào)用者的線程池來執(zhí)行異步任務(wù))扩然,這時(shí)候可以用這JDK提供的線程池,或者封裝一下協(xié)程也可以滿足需求。
協(xié)程提供了Dispatchers.Default 和 Dispatchers.IO 分別用于 計(jì)算密集型 任務(wù)和 IO密集型 任務(wù),類似于RxJava的 Schedulers.computation() 和 Schedulers.io()渤刃。
但兩者有所差異,比如RxJava的 Schedulers.io() 不做并發(fā)限制贴膘,而 Dispatchers.io() 做了并發(fā)限制:
It defaults to the limit of 64 threads or the number of cores (whichever is larger)
考慮到當(dāng)前移動(dòng)設(shè)備的CPU核心數(shù)都不超過64卖子,所以可以認(rèn)為協(xié)程的 Dispatchers.IO 的最大并發(fā)為64。
Dispatchers.Default 的并發(fā)限制為:
By default, the maximal level of parallelism used by this dispatcher is equal to the number of CPU cores, but is at least two
考慮到目前Android設(shè)備核心數(shù)都在2個(gè)以上刑峡,所以可以認(rèn)為 Dispatchers.Default 的最大并發(fā)為 CPU cores洋闽。
Dispatchers.Default 和 Dispatchers.IO 是共享協(xié)程自己的線程池的,二者可以復(fù)用線程突梦。
不過目前這兩個(gè)Dispatchers 并未完全滿足項(xiàng)目中的需求诫舅,有時(shí)我們需要一些自定義的并發(fā)限制,其中最常見的是串行宫患。
RxJava有Schedulers.single() 刊懈,但這個(gè)Schedulers.single()和AsyncTask的SERAIL_EXECOTOR一樣,是全局串行,不同的任務(wù)處在同一個(gè)串行隊(duì)列虚汛,會(huì)相互堵塞匾浪,因而可能會(huì)引發(fā)問題。
或許也是因?yàn)檫@個(gè)原因卷哩,kotlin協(xié)程沒有定義“Dispatchers.Single"蛋辈。
對(duì)于需要串行的場(chǎng)景,可以這樣實(shí)現(xiàn):
val coroutineContext: CoroutineContext =
Executors.newSingleThreadExecutor().asCoroutineDispatcher()
這樣可以實(shí)現(xiàn)局部的串行殉疼,但和協(xié)程的線程池是相互獨(dú)立的梯浪,不能復(fù)用線程。
線程池的好處:
- 提高響應(yīng)速度:任務(wù)到達(dá)時(shí)瓢娜,無需等待線程創(chuàng)建即可立即執(zhí)行挂洛。
- 降低資源消耗:通過池化技術(shù)重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗眠砾。
- 提高線程的可管理性:線程是稀缺資源虏劲,如果無限制創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源褒颈,還會(huì)因?yàn)榫€程的不合理分布導(dǎo)致資源調(diào)度失衡柒巫,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進(jìn)行統(tǒng)一的分配谷丸、調(diào)優(yōu)和監(jiān)控堡掏。
然彼此獨(dú)立創(chuàng)建線程池的話,會(huì)大打折扣刨疼。
如何既復(fù)用協(xié)程的線程池泉唁,又自主控制并發(fā)呢?
一個(gè)辦法就是套隊(duì)列來控制并發(fā)揩慕,然后還是任務(wù)還是執(zhí)行在線程池之上亭畜。
AsyncTask 就是這樣實(shí)現(xiàn)的:
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
用SerialExecutor的execute的任務(wù)會(huì)先進(jìn)入隊(duì)列,當(dāng)mActive為空時(shí)從隊(duì)列獲取任務(wù)賦值給mActive然后通過線程池 THREAD_POOL_EXECUTOR執(zhí)行迎卤。
當(dāng)然AsyncTask 的SerialExecutor是全局唯一的拴鸵,所以會(huì)有上面提到的各種任務(wù)相互堵塞的問題∥仙Γ可以通過創(chuàng)建不同是的SerialExecutor實(shí)例來達(dá)到各業(yè)務(wù)各自串行劲藐。
在Kotlin環(huán)境下,我們可以利用協(xié)程和Channel來實(shí)現(xiàn):
fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
CoroutineScope(Dispatchers.Unconfined).launch {
send(0)
CoroutineScope(Dispatchers.IO).launch {
block()
receive()
}
}
}
// 使用方法
private val serialChannel = Channel<Any>(1)
serialChannel.runBlock {
// do somthing
}
添加Log編寫測(cè)試如下:
private val a = AtomicInteger(0)
private val b = AtomicInteger(0)
fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
CoroutineScope(Dispatchers.Unconfined).launch {
Log.d("MyTag", "before send " + a.getAndIncrement() + getTime())
send(0)
Log.i("MyTag", "after send " + b.getAndIncrement() + getTime())
CoroutineScope(Dispatchers.Default).launch {
block()
receive()
}
}
}
private fun test() {
// 并發(fā)限制為1樟凄,串行執(zhí)行任務(wù)
val channel = Channel<Any>(1)
val t1 = System.currentTimeMillis()
repeat(4) { x ->
channel.runBlock {
Thread.sleep(1000L)
Log.w("MyTag", "$x done job" + getTime())
}
}
CoroutineScope(Dispatchers.Default).launch {
while (!channel.isEmpty) {
delay(200)
}
val t2 = System.currentTimeMillis()
Log.d("MyTag", "Jobs all done, use time:" + (t2 - t1))
}
}
執(zhí)行結(jié)果:
第一個(gè)任務(wù)可以順利通過send(), 而隨后的任務(wù)被suspend, 直到前面的任務(wù)執(zhí)行完(執(zhí)行block)瘩燥,調(diào)用recevie(), 然后下一個(gè)任務(wù)通過send() ……依此類推。
最終不同,消耗4s完成任務(wù)厉膀。
如果Channel的參數(shù)改成2溶耘,則能有兩個(gè)任務(wù)可以通過send() :
最終,消耗2s完成任務(wù)服鹅。
關(guān)于參數(shù)可以參考Channel的構(gòu)造函數(shù):
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
else -> ArrayChannel(capacity)
}
在前面的實(shí)現(xiàn)中凳兵, 我們關(guān)注UNLIMITED, BUFFERED 以及 capacity > 0 的情況即可:
- UNLIMITED: 不做限制企软;
- BUFFERED: 并發(fā)數(shù)由 kotlin "kotlinx.coroutines.channels.defaultBuffer"決定庐扫,目前測(cè)試得到8;
- capacity > 0, 則并發(fā)數(shù)由 capacity 決定仗哨;
- 特別地形庭,當(dāng)capacity = 1,為串行調(diào)度厌漂。
不過萨醒,[Dispatchers.IO] 本身有并發(fā)限制(目前版本是64),
所有對(duì)于 Channel.UNLIMITED 和 capacity > 64 的情況苇倡,和capacity=64的情況是相同的富纸。
我們可以為不同的業(yè)務(wù)創(chuàng)建不同的Channel實(shí)例,從而各自控制并發(fā)且最終在協(xié)程的線程池上執(zhí)行任務(wù)旨椒。
簡(jiǎn)要示意圖如下:
為了簡(jiǎn)化晓褪,我們假設(shè)Dispatchers的并發(fā)限制為4。
- 不同Channel有各自的buffer, 當(dāng)任務(wù)小于capacity時(shí)進(jìn)入buffer, 大于capacity時(shí)新任務(wù)被suspend综慎。
- Dispatchers 不斷地執(zhí)行任務(wù)然后調(diào)用receive(), 上面的實(shí)現(xiàn)中涣仿,receive并非要取什么信息,僅僅是讓channel空出buffer, 好讓被suspend的任務(wù)可以通過send()然后進(jìn)入Dispatchers的調(diào)度示惊。
- 極端情況下(進(jìn)入Disptachers的任務(wù)大于并發(fā)限制時(shí))好港,任務(wù)進(jìn)入Dispatchers也不會(huì)被立即執(zhí)行,這個(gè)設(shè)定可以避免開啟的線程太多而陷于線程上下文頻繁切換的困境涝涤。
通過Channel可以實(shí)現(xiàn)并發(fā)的控制,但是日常開發(fā)中有的地方并不是簡(jiǎn)單地執(zhí)行個(gè)任務(wù)岛杀,而是需要一個(gè)ExecutorService或者Executor阔拳。
我們可以通過Channel封裝一下:
fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
CoroutineScope(Dispatchers.Unconfined).launch {
send(0)
CoroutineScope(Dispatchers.IO).launch {
block()
receive()
}
}
}
class ChannelExecutor(capacity: Int) : Executor {
private val channel = Channel<Any>(capacity)
override fun execute(command: Runnable) {
channel.runBlock {
command.run()
}
}
}
class ChannelExecutorService(capacity: Int) : AbstractExecutorService() {
private val channel = Channel<Any>(capacity)
override fun execute(command: Runnable) {
channel.runBlock {
command.run()
}
}
fun isEmpty(): Boolean {
return channel.isEmpty || channel.isClosedForReceive
}
override fun shutdown() {
channel.close()
}
override fun shutdownNow(): MutableList<Runnable> {
shutdown()
return mutableListOf()
}
@ExperimentalCoroutinesApi
override fun isShutdown(): Boolean {
return channel.isClosedForSend
}
@ExperimentalCoroutinesApi
override fun isTerminated(): Boolean {
return channel.isClosedForReceive
}
override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
var millis = unit.toMillis(timeout)
while (!isTerminated && millis > 0) {
try {
Thread.sleep(200L)
millis -= 200L
} catch (ignore: Exception) {
}
}
return isTerminated
}
}
需要簡(jiǎn)單地控制并發(fā)的地方,直接定義Channel然后調(diào)用runBlock即可类嗤;
需要Executor的地方糊肠,可創(chuàng)建ChannelExecutor來執(zhí)行。