kotlin<第十一篇>:Channel-通道

(1)基本用法

Channel實(shí)際上是一個(gè)并發(fā)安全的隊(duì)列谨读,它可以用來(lái)連接協(xié)程信柿,實(shí)現(xiàn)不同協(xié)程的通信。
生產(chǎn)者/消費(fèi)者模式 (send - channel - receive)

Channel的基本用法如下:

runBlocking {

    val channel = Channel<Int>()
    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        var i = 0
        while(true) {
            delay(1000)
            channel.send(++i)
            println("send $i")
        }
    }

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        while(true) {
            val element = channel.receive()
            println("receive $element")
        }
    }

    joinAll(producer, consumer)

}
(2)Channel的容量

Channel實(shí)際上就是一個(gè)隊(duì)列,隊(duì)列中一定存在緩沖區(qū)胸蛛,那么一旦這個(gè)緩沖區(qū)滿了习绢,
并且也一直沒(méi)有人調(diào)用receive取走數(shù)據(jù)渠抹,send就需要掛起。
故意讓接收端的節(jié)奏放慢闪萄,發(fā)現(xiàn)send總是會(huì)掛起梧却,直到receive之后才會(huì)繼續(xù)往下執(zhí)行。

Channel的默認(rèn)大小為0败去。

(3)迭代Channel

Channel本身確實(shí)像序列放航,所以我們?cè)谧x取的時(shí)候可以直接獲取一個(gè)Channel的iterator。

runBlocking {

    val channel = Channel<Int>(Channel.UNLIMITED)
    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        for (x in 1..5) {
            println("send ${x * x}")
            channel.send(x * x)
        }
    }

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        val iterator = channel.iterator()
        while(iterator.hasNext()) {
            val element = iterator.next()
            println("receive $element")
            delay(1000)
        }
    }

    joinAll(producer, consumer)

}

消費(fèi)者代碼也可以改成:

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        for (element in channel) {
            println("receive $element")
            delay(1000)
        }
    }
(4)produce與actor

構(gòu)造生產(chǎn)者與消費(fèi)者的便捷方法圆裕。
我們可以通過(guò)produce方法啟動(dòng)一個(gè)生產(chǎn)者協(xié)程广鳍,并返回一個(gè)ReceiveChannel,其他協(xié)程就可以用這個(gè)Channel來(lái)接收數(shù)據(jù)了吓妆。
反過(guò)來(lái)搜锰,我們可以用actor啟動(dòng)一個(gè)消費(fèi)者協(xié)程。

produce演示:

runBlocking {

    val receiveChannel = GlobalScope.produce<Int> {
        repeat(100) {
            delay(1000)
            send(it)
        }
    }

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        for (element in receiveChannel) {
            println("receive $element")
        }
    }
    consumer.join()
}

actor演示:

runBlocking {

    // 消費(fèi)者
    val sendChannel = GlobalScope.actor<Int> {
        while (true) {
            val element = receive()
            println("receive $element")
        }
    }

    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        repeat(100) {
            sendChannel.send(it)
            delay(1000)
        }
    }
    producer.join()
}
(5)Channel關(guān)閉

produce和actor返回的Channel都會(huì)隨著對(duì)應(yīng)的協(xié)程執(zhí)行完畢而關(guān)閉耿战,也正是這樣蛋叼,Channel才被稱為 熱數(shù)據(jù)流
對(duì)于一個(gè)Channel剂陡,如果我們調(diào)用了它的close方法狈涮,它會(huì)立即停止接收新元素,也就是說(shuō)這時(shí)它的 isClosedForSend 會(huì)立即返回true鸭栖,而由于Channel緩沖區(qū)的存在歌馍,這時(shí)可能還有一些元素沒(méi)有處理完,因此要等所有的元素都讀取之后 isClosedForReceive 才會(huì)返回true晕鹊。

Channel的生命周期最好由主導(dǎo)方來(lái)維護(hù)松却,建議 由主導(dǎo)的一方實(shí)現(xiàn)關(guān)閉

runBlocking {

    val channel = Channel<Int>(3)
    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        List(3) {
           channel.send(it)
           println("send $it")
        }
        channel.close()
        println("producer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
    }

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        for (element in channel) {
            println("receive:$element")
            delay(1000)
        }
        println("consumer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
    }
    joinAll(producer, consumer)
}
(6)BroadcastChannel

正常情況下溅话,一個(gè) 發(fā)送者 對(duì)應(yīng)著一個(gè) 接收者晓锻。
使用 BroadcastChannel 可以存在多個(gè)接收者。

runBlocking {

    val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        List(3) {
            delay(1000)
            broadcastChannel.send(it)
        }
        broadcastChannel.close()
    }

    List(3) { index->
        GlobalScope.launch {
            val receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                delay(1000)
                println("[#${index}] receive:$element")
            }
        }
    }.joinAll()

    producer.join()
}

BroadcastChannel<Int>(Channel.BUFFERED) 可以改成 Channel<Int>().broadcast(Channel.BUFFERED)飞几。

(7)await多路復(fù)用

什么是多路復(fù)用砚哆?
數(shù)據(jù)通信系統(tǒng)或計(jì)算機(jī)網(wǎng)絡(luò)系統(tǒng)中,傳輸媒體的帶寬或容量往往會(huì)大于傳輸單一信號(hào)的需求屑墨,為了有效地利用通信線路躁锁,希望 一個(gè)信道同時(shí)傳輸多路信號(hào)纷铣,這就是所謂多路復(fù)用技術(shù)。

復(fù)用多個(gè)await战转?
兩個(gè)API分別從網(wǎng)絡(luò)和本地緩存 獲取數(shù)據(jù)搜立,期望哪個(gè)先返回就先用哪個(gè)做展示。

request->server->response--
                         ----Select -> Response
request->server->response--
data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
    delay(1000)
    User(name)
}

suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    delay(100)
    User(name)
}

fun main() {
    runBlocking {

        GlobalScope.launch {
            val localRequest = getUserForLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            // select 選擇執(zhí)行
            val userResponse = select<Response<User>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait { Response(it, false) }
            }
            println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
        }.join()

    }
}

select:誰(shuí)先返回槐秧,就選擇誰(shuí)儒拂。

(8)復(fù)用多個(gè)Channel
fun main() {
    runBlocking {

        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }
        GlobalScope.launch {
            delay(50)
            channels[0].send(100)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)

    }
}
(9)SelectClause

我們?cè)趺粗滥男┦录梢员籹elect呢? 其實(shí)所有能夠被select的事件都是selectClauseN 類型色鸳,包括:

  • selectClause0:對(duì)應(yīng)事件沒(méi)有返回值社痛,例如join沒(méi)有返回值,那么onJoin就是SelectClauseN類型命雀。使用時(shí)蒜哀,onJoin的參數(shù)是一個(gè)無(wú)參函數(shù)。
  • selectClause1:對(duì)應(yīng)事件有返回值吏砂,例如:onAwait和onReceive都是此類情況撵儿。
  • selectClause2:對(duì)應(yīng)事件有返回值,此外還需要一個(gè)額外的參數(shù)狐血,例如Channel.onSend有兩個(gè)參數(shù)淀歇,第一個(gè)是具體的數(shù)據(jù),第二個(gè)參數(shù)是發(fā)送成功的回調(diào)參數(shù)匈织。

-> 如果我們想要確認(rèn)掛起函數(shù)是否支持select浪默,只需要查看其是否存在對(duì)應(yīng)的SelectClauseN類型可回調(diào)即可。

selectClause0舉例:

fun main() {
    runBlocking {
        val job1 = GlobalScope.launch {
            delay(100)
            println("job 1")
        }
        val job2 = GlobalScope.launch {
            delay(10)
            println("job 2")
        }
        select<Unit> {
            job1.onJoin { println("job1 onJoin") }
            job2.onJoin { println("job2 onJoin") }
        }
    }
}

selectClause1舉例:

data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
    delay(1000)
    User(name)
}

suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    delay(100)
    User(name)
}

fun main() {
    runBlocking {

        GlobalScope.launch {
            val localRequest = getUserForLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            // select 選擇執(zhí)行
            val userResponse = select<Response<User>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait { Response(it, false) }
            }
            println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
        }.join()

    }
}
fun main() {
    runBlocking {

        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }
        GlobalScope.launch {
            delay(50)
            channels[0].send(100)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)

    }
}

selectClause2舉例:

fun main() {
    runBlocking {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        println(channels)
        launch(Dispatchers.IO) {
            select<Unit> {
                launch {
                    delay(10)
                    channels[1].onSend(200) { sentChannel->
                        println("sent on $sentChannel")
                    }
                }
                launch {
                    delay(100)
                    channels[0].onSend(100) { sentChannel->
                        println("sent on $sentChannel")
                    }
                }
            }
        }
        GlobalScope.launch {
            println(channels[0].receive())
        }
        GlobalScope.launch {
            println(channels[1].receive())
        }
    }
}

[完...]

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末缀匕,一起剝皮案震驚了整個(gè)濱河市纳决,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌乡小,老刑警劉巖阔加,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異满钟,居然都是意外死亡胜榔,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門湃番,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)夭织,“玉大人,你說(shuō)我怎么就攤上這事牵辣∷ぱⅲ” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵纬向,是天一觀的道長(zhǎng)择浊。 經(jīng)常有香客問(wèn)我,道長(zhǎng)逾条,這世上最難降的妖魔是什么琢岩? 我笑而不...
    開(kāi)封第一講書人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮师脂,結(jié)果婚禮上担孔,老公的妹妹穿的比我還像新娘。我一直安慰自己吃警,他們只是感情好糕篇,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著酌心,像睡著了一般拌消。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上安券,一...
    開(kāi)封第一講書人閱讀 49,111評(píng)論 1 285
  • 那天墩崩,我揣著相機(jī)與錄音,去河邊找鬼侯勉。 笑死鹦筹,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的址貌。 我是一名探鬼主播铐拐,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼练对!你這毒婦竟也來(lái)了余舶?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤锹淌,失蹤者是張志新(化名)和其女友劉穎匿值,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體赂摆,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡挟憔,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了烟号。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片绊谭。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖汪拥,靈堂內(nèi)的尸體忽然破棺而出达传,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布宪赶,位于F島的核電站宗弯,受9級(jí)特大地震影響隘截,放射性物質(zhì)發(fā)生泄漏畅哑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一奴曙、第九天 我趴在偏房一處隱蔽的房頂上張望欲主。 院中可真熱鬧邓厕,春花似錦、人聲如沸扁瓢。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)引几。三九已至昧互,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間她紫,已是汗流浹背硅堆。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留贿讹,地道東北人渐逃。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像民褂,于是被迫代替她去往敵國(guó)和親茄菊。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容