Kotlin通道

通道

延期的值提供了?種便捷的?法使單個(gè)值在多個(gè)協(xié)程之間進(jìn)?相互傳輸。通道提供了?種在流中傳輸值的?法屹逛。

通道基礎(chǔ)

?個(gè) Channel 是?個(gè)和 BlockingQueue ?常相似的概念础废。其中?個(gè)不同是它代替了阻塞的 put 操作并提供了掛起的send,還替代了阻塞的 take 操作并提供了掛起的 `receive罕模。

val channel = Channel<Int>()
launch {
// 這?可能是消耗?量 CPU 運(yùn)算的異步邏輯评腺,我們將僅僅做 5 次整數(shù)的平?并發(fā)送
for (x in 1..5) channel.send(x * x)
}
// 這?我們打印了 5 次被接收的整數(shù):
repeat(5) { println(channel.receive()) }
println("Done!")

這段代碼的輸出如下:

1
4
9
16
25
Done!
關(guān)閉與迭代通道

和隊(duì)列不同,?個(gè)通道可以通過(guò)被關(guān)閉來(lái)表明沒有更多的元素將會(huì)進(jìn)?通道淑掌。在接收者中可以定期的使? for 循環(huán)
來(lái)從通道中接收元素蒿讥。
從概念上來(lái)說(shuō),?個(gè)close 操作就像向通道發(fā)送了?個(gè)特殊的關(guān)閉指令锋拖。這個(gè)迭代停?就說(shuō)明關(guān)閉指令已經(jīng)被接收了。
所以這?保證所有先前發(fā)送出去的元素都在通道關(guān)閉前被接收到祸轮。

val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 我們結(jié)束發(fā)送
}
// 這?我們使? `for` 循環(huán)來(lái)打印所有被接收到的元素(直到通道被關(guān)閉)
for (y in channel) println(y)
println("Done!")

構(gòu)建通道?產(chǎn)者

協(xié)程?成?系列元素的模式很常?兽埃。這是 ?產(chǎn)者?消費(fèi)者 模式的?部分,并且經(jīng)常能在并發(fā)的代碼中看到它适袜。你可
以將?產(chǎn)者抽象成?個(gè)函數(shù)柄错,并且使通道作為它的參數(shù),但這與必須從函數(shù)中返回結(jié)果的常識(shí)相違悖苦酱。
這?有?個(gè)名為 produce 的便捷的協(xié)程構(gòu)建器

val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
管道

管道是?種?個(gè)協(xié)程在流中開始?產(chǎn)可能?窮多個(gè)元素的模式:

fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 在流中開始從 1 ?產(chǎn)?窮多個(gè)整數(shù)
}

并且另?個(gè)或多個(gè)協(xié)程開始消費(fèi)這些流售貌,做?些操作,并?產(chǎn)了?些額外的結(jié)果疫萤。在下?的例?中颂跨,對(duì)這些數(shù)字僅僅做
了平?操作:

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}

主要的代碼啟動(dòng)并連接了整個(gè)管道:

val numbers = produceNumbers() // 從 1 開始?成整數(shù)
val squares = square(numbers) // 整數(shù)求平?
repeat(5) {
println(squares.receive()) // 輸出前五個(gè)
}
println("Done!") // ?此已完成
coroutineContext.cancelChildren() // 取消?協(xié)程

使?管道的素?cái)?shù)

讓我們來(lái)展??個(gè)極端的例??在協(xié)程中使??個(gè)管道來(lái)?成素?cái)?shù)。我們開啟了?個(gè)數(shù)字的?限序列扯饶。

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // 開啟了?個(gè)?限的整數(shù)流
}

下?的例?打印了前?個(gè)素?cái)?shù)恒削,在主線程的上下?中運(yùn)?整個(gè)管道。直到所有的協(xié)程在該主協(xié)程 runBlocking 的作?
域中被啟動(dòng)完成尾序。我們不必使??個(gè)顯式的列表來(lái)保存所有被我們已經(jīng)啟動(dòng)的協(xié)程钓丰。我們使? cancelChildren 擴(kuò)展
函數(shù)在我們打印了前?個(gè)素?cái)?shù)以后來(lái)取消所有的?協(xié)程。

var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // 取消所有的?協(xié)程來(lái)讓主協(xié)程結(jié)束

注意每币,你可以在標(biāo)準(zhǔn)庫(kù)中使? iterator 協(xié)程構(gòu)建器來(lái)構(gòu)建?個(gè)相似的管道携丁。使? iterator 替換
produceyield 替換 send 兰怠、next 替換 receive 梦鉴、Iterator 替換ReceiveChannel 來(lái)擺脫協(xié)程作?域李茫,
你將不再需要 runBlocking 。然?尚揣,如上所?涌矢,如果你在 Dispatchers.Default 上下?中運(yùn)?它,使?通道的管道的
好處在于它可以充分利?多核? CPU快骗。

扇出

多個(gè)協(xié)程也許會(huì)接收相同的管道娜庇,在它們之間進(jìn)?分布式?作。讓我們啟動(dòng)?個(gè)定期產(chǎn)?整數(shù)的?產(chǎn)者協(xié)程(每秒?
個(gè)數(shù)字):

fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // 從 1 開始
while (true) {
send(x++) // 產(chǎn)?下?個(gè)數(shù)字
delay(100) // 等待 0.1 秒
}
}

接下來(lái)我們可以得到?個(gè)處理器協(xié)程方篮。在這個(gè)?例中名秀,它們只是打印它們的 id 和接收到的數(shù)字:

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}

現(xiàn)在讓我們啟動(dòng)五個(gè)處理器協(xié)程并讓它們?作將近?秒∨航Γ看看發(fā)?了什么:

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 取消協(xié)程?產(chǎn)者從?將它們?nèi)繗⑺?

該輸出將類似于如下所?匕得,盡管接收每個(gè)特定整數(shù)的處理器 id 可能會(huì)不同:

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

注意,取消?產(chǎn)者協(xié)程將關(guān)閉它的通道巾表,從?最終終?處理器協(xié)程正在執(zhí)?的此通道上的迭代汁掠。
還有,注意我們?nèi)绾问?for 循環(huán)顯式迭代通道以在 launchProcessor 代碼中執(zhí)?扇出集币。與 consumeEach
同考阱,這個(gè)for循環(huán)是安全完美地使?多個(gè)協(xié)程的。如果其中?個(gè)處理器協(xié)程執(zhí)?失敗鞠苟,其它的處理器協(xié)程仍然會(huì)繼續(xù)
處理通道乞榨,?通過(guò) consumeEach 編寫的處理器始終在正常或?正常完成時(shí)消耗(取消)底層通道

扇?

多個(gè)協(xié)程可以發(fā)送到同?個(gè)通道当娱。?如說(shuō)吃既,讓我們創(chuàng)建?個(gè)字符串的通道,和?個(gè)在這個(gè)通道中以指定的延遲反復(fù)發(fā)
送?個(gè)指定字符串的掛起函數(shù):

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}

現(xiàn)在跨细,我們啟動(dòng)了?個(gè)發(fā)送字符串的協(xié)程鹦倚,讓我們看看會(huì)發(fā)?什么(在?例中,我們?cè)谥骶€程的上下?中作為主協(xié)程的
?協(xié)程來(lái)啟動(dòng)它們):

val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收前六個(gè)
println(channel.receive())
}
coroutineContext.cancelChildren() // 取消所有?協(xié)程來(lái)讓主協(xié)程結(jié)束

輸出如下:

foo
foo
BAR!
foo
foo
BAR!

帶緩沖的通道
到?前為?展?的通道都是沒有緩沖區(qū)的冀惭。?緩沖的通道在發(fā)送者和接收者相遇時(shí)傳輸元素(也稱“對(duì)接”)申鱼。如果發(fā)送
先被調(diào)?,則它將被掛起直到接收被調(diào)?云头,如果接收先被調(diào)?捐友,它將被掛起直到發(fā)送被調(diào)?。
Channel() ??函數(shù)與 produce 建造器通過(guò)?個(gè)可選的參數(shù) capacity 來(lái)指定 緩沖區(qū)?? 溃槐。緩沖允許發(fā)送者在被
掛起前發(fā)送多個(gè)元素匣砖,就像 BlockingQueue 有指定的容量?樣,當(dāng)緩沖區(qū)被占滿的時(shí)候?qū)?huì)引起阻塞。
看看如下代碼的表現(xiàn):

val channel = Channel<Int>(4) // 啟動(dòng)帶緩沖的通道
val sender = launch { // 啟動(dòng)發(fā)送者協(xié)程
repeat(10) {
println("Sending $it") // 在每?個(gè)元素發(fā)送前打印它們
channel.send(it) // 將在緩沖區(qū)被占滿時(shí)掛起
}
}
// 沒有接收到東西……只是等待……
delay(1000)
sender.cancel() // 取消發(fā)送者協(xié)程

使?緩沖通道并給 capacity 參數(shù)傳? 四 它將打印sending五 次:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

前四個(gè)元素被加?到了緩沖區(qū)并且發(fā)送者在試圖發(fā)送第五個(gè)元素的時(shí)候被掛起猴鲫。

通道是公平的

發(fā)送和接收操作是 公平的 并且尊重調(diào)?它們的多個(gè)協(xié)程对人。它們遵守先進(jìn)先出原則,可以看到第?個(gè)協(xié)程調(diào)?
receive 并得到了元素拂共。在下?的例?中兩個(gè)協(xié)程“乒”和“乓”都從共享的“桌?”通道接收到這個(gè)“球”元素牺弄。

data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // ?個(gè)共享的 table(桌?)
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // 乒乓球
delay(1000) // 延遲 1 秒鐘
coroutineContext.cancelChildren() // 游戲結(jié)束,取消它們
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // 在循環(huán)中接收球
ball.hits++
println("$name $ball")
delay(300) // 等待?段時(shí)間
table.send(ball) // 將球發(fā)送回去
}
}

“乒”協(xié)程?先被啟動(dòng)宜狐,所以它?先接收到了球势告。甚?雖然“乒”協(xié)程在將球發(fā)送會(huì)桌?以后?即開始接收,但是球還是
被“乓”協(xié)程接收了抚恒,因?yàn)樗?直在等待著接收球:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

注意咱台,有時(shí)候通道執(zhí)?時(shí)由于執(zhí)?者的性質(zhì)?看起來(lái)不那么公平

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市俭驮,隨后出現(xiàn)的幾起案子回溺,更是在濱河造成了極大的恐慌,老刑警劉巖混萝,帶你破解...
    沈念sama閱讀 219,366評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件遗遵,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡逸嘀,警方通過(guò)查閱死者的電腦和手機(jī)车要,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)厘熟,“玉大人屯蹦,你說(shuō)我怎么就攤上這事维哈∩蹋” “怎么了?”我有些...
    開封第一講書人閱讀 165,689評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵阔挠,是天一觀的道長(zhǎng)飘庄。 經(jīng)常有香客問(wèn)我,道長(zhǎng)购撼,這世上最難降的妖魔是什么跪削? 我笑而不...
    開封第一講書人閱讀 58,925評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮迂求,結(jié)果婚禮上碾盐,老公的妹妹穿的比我還像新娘。我一直安慰自己揩局,他們只是感情好毫玖,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般付枫。 火紅的嫁衣襯著肌膚如雪烹玉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,727評(píng)論 1 305
  • 那天阐滩,我揣著相機(jī)與錄音二打,去河邊找鬼。 笑死掂榔,一個(gè)胖子當(dāng)著我的面吹牛继效,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播衅疙,決...
    沈念sama閱讀 40,447評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼莲趣,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了饱溢?” 一聲冷哼從身側(cè)響起喧伞,我...
    開封第一講書人閱讀 39,349評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎绩郎,沒想到半個(gè)月后潘鲫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,820評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡肋杖,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評(píng)論 3 337
  • 正文 我和宋清朗相戀三年溉仑,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片状植。...
    茶點(diǎn)故事閱讀 40,127評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡浊竟,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出津畸,到底是詐尸還是另有隱情振定,我是刑警寧澤,帶...
    沈念sama閱讀 35,812評(píng)論 5 346
  • 正文 年R本政府宣布肉拓,位于F島的核電站后频,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏暖途。R本人自食惡果不足惜卑惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望驻售。 院中可真熱鬧露久,春花似錦、人聲如沸欺栗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至镇草,卻和暖如春眶痰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背梯啤。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工竖伯, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人因宇。 一個(gè)月前我還...
    沈念sama閱讀 48,388評(píng)論 3 373
  • 正文 我出身青樓七婴,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親察滑。 傳聞我的和親對(duì)象是個(gè)殘疾皇子打厘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容