聊一聊線程池和Kotlin協(xié)程

目前很多開發(fā)組都用上協(xié)程來處理異步任務(wù)了筑辨,但是有的地方協(xié)程提供的原生API還是不足以應(yīng)付,比方說一些SDK提供了傳入Executor的接口(以便復(fù)用調(diào)用者的線程池來執(zhí)行異步任務(wù))扩然,這時(shí)候可以用這JDK提供的線程池,或者封裝一下協(xié)程也可以滿足需求。

協(xié)程提供了Dispatchers.DefaultDispatchers.IO 分別用于 計(jì)算密集型 任務(wù)和 IO密集型 任務(wù),類似于RxJava的 Schedulers.computation()Schedulers.io()渤刃。
但兩者有所差異,比如RxJava的 Schedulers.io() 不做并發(fā)限制贴膘,而 Dispatchers.io() 做了并發(fā)限制:

It defaults to the limit of 64 threads or the number of cores (whichever is larger)

考慮到當(dāng)前移動(dòng)設(shè)備的CPU核心數(shù)都不超過64卖子,所以可以認(rèn)為協(xié)程的 Dispatchers.IO 的最大并發(fā)為64。
Dispatchers.Default 的并發(fā)限制為:

By default, the maximal level of parallelism used by this dispatcher is equal to the number of CPU cores, but is at least two

考慮到目前Android設(shè)備核心數(shù)都在2個(gè)以上刑峡,所以可以認(rèn)為 Dispatchers.Default 的最大并發(fā)為 CPU cores洋闽。
Dispatchers.DefaultDispatchers.IO 是共享協(xié)程自己的線程池的,二者可以復(fù)用線程突梦。
不過目前這兩個(gè)Dispatchers 并未完全滿足項(xiàng)目中的需求诫舅,有時(shí)我們需要一些自定義的并發(fā)限制,其中最常見的是串行宫患。

RxJava有Schedulers.single() 刊懈,但這個(gè)Schedulers.single()和AsyncTask的SERAIL_EXECOTOR一樣,是全局串行,不同的任務(wù)處在同一個(gè)串行隊(duì)列虚汛,會(huì)相互堵塞匾浪,因而可能會(huì)引發(fā)問題。

或許也是因?yàn)檫@個(gè)原因卷哩,kotlin協(xié)程沒有定義“Dispatchers.Single"蛋辈。
對(duì)于需要串行的場(chǎng)景,可以這樣實(shí)現(xiàn):

val coroutineContext: CoroutineContext =
    Executors.newSingleThreadExecutor().asCoroutineDispatcher()

這樣可以實(shí)現(xiàn)局部的串行殉疼,但和協(xié)程的線程池是相互獨(dú)立的梯浪,不能復(fù)用線程。
線程池的好處:

  1. 提高響應(yīng)速度:任務(wù)到達(dá)時(shí)瓢娜,無需等待線程創(chuàng)建即可立即執(zhí)行挂洛。
  2. 降低資源消耗:通過池化技術(shù)重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗眠砾。
  3. 提高線程的可管理性:線程是稀缺資源虏劲,如果無限制創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源褒颈,還會(huì)因?yàn)榫€程的不合理分布導(dǎo)致資源調(diào)度失衡柒巫,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進(jìn)行統(tǒng)一的分配谷丸、調(diào)優(yōu)和監(jiān)控堡掏。

然彼此獨(dú)立創(chuàng)建線程池的話,會(huì)大打折扣刨疼。
如何既復(fù)用協(xié)程的線程池泉唁,又自主控制并發(fā)呢?
一個(gè)辦法就是套隊(duì)列來控制并發(fā)揩慕,然后還是任務(wù)還是執(zhí)行在線程池之上亭畜。
AsyncTask 就是這樣實(shí)現(xiàn)的:

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

用SerialExecutor的execute的任務(wù)會(huì)先進(jìn)入隊(duì)列,當(dāng)mActive為空時(shí)從隊(duì)列獲取任務(wù)賦值給mActive然后通過線程池 THREAD_POOL_EXECUTOR執(zhí)行迎卤。
當(dāng)然AsyncTask 的SerialExecutor是全局唯一的拴鸵,所以會(huì)有上面提到的各種任務(wù)相互堵塞的問題∥仙Γ可以通過創(chuàng)建不同是的SerialExecutor實(shí)例來達(dá)到各業(yè)務(wù)各自串行劲藐。

在Kotlin環(huán)境下,我們可以利用協(xié)程和Channel來實(shí)現(xiàn):

fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
    CoroutineScope(Dispatchers.Unconfined).launch {
        send(0)
        CoroutineScope(Dispatchers.IO).launch {
            block()
            receive()
        }
    }
}

// 使用方法
private val serialChannel = Channel<Any>(1)
serialChannel.runBlock {
    // do somthing
}

添加Log編寫測(cè)試如下:

private val a = AtomicInteger(0)
private val b = AtomicInteger(0)
fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
    CoroutineScope(Dispatchers.Unconfined).launch {
        Log.d("MyTag", "before send " + a.getAndIncrement() + getTime())
        send(0)
        Log.i("MyTag", "after send " + b.getAndIncrement() + getTime())
        CoroutineScope(Dispatchers.Default).launch {
            block()
            receive()
        }
    }
}

private fun test() {
    // 并發(fā)限制為1樟凄,串行執(zhí)行任務(wù)
    val channel = Channel<Any>(1)
    val t1 = System.currentTimeMillis()
    repeat(4) { x ->
        channel.runBlock {
            Thread.sleep(1000L)
            Log.w("MyTag", "$x done job" + getTime())
        }
    }

    CoroutineScope(Dispatchers.Default).launch {
        while (!channel.isEmpty) {
            delay(200)
        }
        val t2 = System.currentTimeMillis()
        Log.d("MyTag", "Jobs all done, use time:" + (t2 - t1))
    }
}

執(zhí)行結(jié)果:

第一個(gè)任務(wù)可以順利通過send(), 而隨后的任務(wù)被suspend, 直到前面的任務(wù)執(zhí)行完(執(zhí)行block)瘩燥,調(diào)用recevie(), 然后下一個(gè)任務(wù)通過send() ……依此類推。
最終不同,消耗4s完成任務(wù)厉膀。

如果Channel的參數(shù)改成2溶耘,則能有兩個(gè)任務(wù)可以通過send() :

最終,消耗2s完成任務(wù)服鹅。

關(guān)于參數(shù)可以參考Channel的構(gòu)造函數(shù):

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
        else -> ArrayChannel(capacity)
    }

在前面的實(shí)現(xiàn)中凳兵, 我們關(guān)注UNLIMITED, BUFFERED 以及 capacity > 0 的情況即可:

  • UNLIMITED: 不做限制企软;
  • BUFFERED: 并發(fā)數(shù)由 kotlin "kotlinx.coroutines.channels.defaultBuffer"決定庐扫,目前測(cè)試得到8;
  • capacity > 0, 則并發(fā)數(shù)由 capacity 決定仗哨;
  • 特別地形庭,當(dāng)capacity = 1,為串行調(diào)度厌漂。

不過萨醒,[Dispatchers.IO] 本身有并發(fā)限制(目前版本是64),
所有對(duì)于 Channel.UNLIMITED 和 capacity > 64 的情況苇倡,和capacity=64的情況是相同的富纸。
我們可以為不同的業(yè)務(wù)創(chuàng)建不同的Channel實(shí)例,從而各自控制并發(fā)且最終在協(xié)程的線程池上執(zhí)行任務(wù)旨椒。
簡(jiǎn)要示意圖如下:

為了簡(jiǎn)化晓褪,我們假設(shè)Dispatchers的并發(fā)限制為4。

  • 不同Channel有各自的buffer, 當(dāng)任務(wù)小于capacity時(shí)進(jìn)入buffer, 大于capacity時(shí)新任務(wù)被suspend综慎。
  • Dispatchers 不斷地執(zhí)行任務(wù)然后調(diào)用receive(), 上面的實(shí)現(xiàn)中涣仿,receive并非要取什么信息,僅僅是讓channel空出buffer, 好讓被suspend的任務(wù)可以通過send()然后進(jìn)入Dispatchers的調(diào)度示惊。
  • 極端情況下(進(jìn)入Disptachers的任務(wù)大于并發(fā)限制時(shí))好港,任務(wù)進(jìn)入Dispatchers也不會(huì)被立即執(zhí)行,這個(gè)設(shè)定可以避免開啟的線程太多而陷于線程上下文頻繁切換的困境涝涤。

通過Channel可以實(shí)現(xiàn)并發(fā)的控制,但是日常開發(fā)中有的地方并不是簡(jiǎn)單地執(zhí)行個(gè)任務(wù)岛杀,而是需要一個(gè)ExecutorService或者Executor阔拳。
我們可以通過Channel封裝一下:

fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
    CoroutineScope(Dispatchers.Unconfined).launch {
        send(0)
        CoroutineScope(Dispatchers.IO).launch {
            block()
            receive()
        }
    }
}


class ChannelExecutor(capacity: Int) : Executor {
    private val channel = Channel<Any>(capacity)

    override fun execute(command: Runnable) {
        channel.runBlock {
            command.run()
        }
    }
}


class ChannelExecutorService(capacity: Int) : AbstractExecutorService() {
    private val channel = Channel<Any>(capacity)

    override fun execute(command: Runnable) {
        channel.runBlock {
            command.run()
        }
    }

    fun isEmpty(): Boolean {
        return channel.isEmpty || channel.isClosedForReceive
    }

    override fun shutdown() {
        channel.close()
    }

    override fun shutdownNow(): MutableList<Runnable> {
        shutdown()
        return mutableListOf()
    }

    @ExperimentalCoroutinesApi
    override fun isShutdown(): Boolean {
        return channel.isClosedForSend
    }

    @ExperimentalCoroutinesApi
    override fun isTerminated(): Boolean {
        return channel.isClosedForReceive
    }

    override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
        var millis = unit.toMillis(timeout)
        while (!isTerminated && millis > 0) {
            try {
                Thread.sleep(200L)
                millis -= 200L
            } catch (ignore: Exception) {
            }
        }
        return isTerminated
    }
}

需要簡(jiǎn)單地控制并發(fā)的地方,直接定義Channel然后調(diào)用runBlock即可类嗤;
需要Executor的地方糊肠,可創(chuàng)建ChannelExecutor來執(zhí)行。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末遗锣,一起剝皮案震驚了整個(gè)濱河市货裹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌精偿,老刑警劉巖弧圆,帶你破解...
    沈念sama閱讀 219,366評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赋兵,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡搔预,警方通過查閱死者的電腦和手機(jī)霹期,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拯田,“玉大人历造,你說我怎么就攤上這事〈樱” “怎么了吭产?”我有些...
    開封第一講書人閱讀 165,689評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)鸭轮。 經(jīng)常有香客問我臣淤,道長(zhǎng),這世上最難降的妖魔是什么张弛? 我笑而不...
    開封第一講書人閱讀 58,925評(píng)論 1 295
  • 正文 為了忘掉前任荒典,我火速辦了婚禮,結(jié)果婚禮上吞鸭,老公的妹妹穿的比我還像新娘寺董。我一直安慰自己,他們只是感情好刻剥,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評(píng)論 6 392
  • 文/花漫 我一把揭開白布遮咖。 她就那樣靜靜地躺著,像睡著了一般造虏。 火紅的嫁衣襯著肌膚如雪御吞。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,727評(píng)論 1 305
  • 那天漓藕,我揣著相機(jī)與錄音陶珠,去河邊找鬼。 笑死享钞,一個(gè)胖子當(dāng)著我的面吹牛揍诽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播栗竖,決...
    沈念sama閱讀 40,447評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼暑脆,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了狐肢?” 一聲冷哼從身側(cè)響起添吗,我...
    開封第一講書人閱讀 39,349評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎份名,沒想到半個(gè)月后碟联,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體妓美,經(jīng)...
    沈念sama閱讀 45,820評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評(píng)論 3 337
  • 正文 我和宋清朗相戀三年玄帕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了部脚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,127評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡裤纹,死狀恐怖委刘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鹰椒,我是刑警寧澤锡移,帶...
    沈念sama閱讀 35,812評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站漆际,受9級(jí)特大地震影響淆珊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜奸汇,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評(píng)論 3 331
  • 文/蒙蒙 一施符、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧擂找,春花似錦戳吝、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至塘雳,卻和暖如春陆盘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背败明。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工隘马, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人妻顶。 一個(gè)月前我還...
    沈念sama閱讀 48,388評(píng)論 3 373
  • 正文 我出身青樓酸员,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親盈包。 傳聞我的和親對(duì)象是個(gè)殘疾皇子沸呐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容