通道
延期的值提供了?種便捷的?法使單個(gè)值在多個(gè)協(xié)程之間進(jìn)?相互傳輸。通道提供了?種在流中傳輸值的?法屹逛。
通道基礎(chǔ)
?個(gè) Channel
是?個(gè)和 BlockingQueue
?常相似的概念础废。其中?個(gè)不同是它代替了阻塞的 put
操作并提供了掛起的send
,還替代了阻塞的 take
操作并提供了掛起的 `receive罕模。
val channel = Channel<Int>()
launch {
// 這?可能是消耗?量 CPU 運(yùn)算的異步邏輯评腺,我們將僅僅做 5 次整數(shù)的平?并發(fā)送
for (x in 1..5) channel.send(x * x)
}
// 這?我們打印了 5 次被接收的整數(shù):
repeat(5) { println(channel.receive()) }
println("Done!")
這段代碼的輸出如下:
1
4
9
16
25
Done!
關(guān)閉與迭代通道
和隊(duì)列不同,?個(gè)通道可以通過(guò)被關(guān)閉來(lái)表明沒有更多的元素將會(huì)進(jìn)?通道淑掌。在接收者中可以定期的使? for
循環(huán)
來(lái)從通道中接收元素蒿讥。
從概念上來(lái)說(shuō),?個(gè)close
操作就像向通道發(fā)送了?個(gè)特殊的關(guān)閉指令锋拖。這個(gè)迭代停?就說(shuō)明關(guān)閉指令已經(jīng)被接收了。
所以這?保證所有先前發(fā)送出去的元素都在通道關(guān)閉前被接收到祸轮。
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 我們結(jié)束發(fā)送
}
// 這?我們使? `for` 循環(huán)來(lái)打印所有被接收到的元素(直到通道被關(guān)閉)
for (y in channel) println(y)
println("Done!")
構(gòu)建通道?產(chǎn)者
協(xié)程?成?系列元素的模式很常?兽埃。這是 ?產(chǎn)者?消費(fèi)者 模式的?部分,并且經(jīng)常能在并發(fā)的代碼中看到它适袜。你可
以將?產(chǎn)者抽象成?個(gè)函數(shù)柄错,并且使通道作為它的參數(shù),但這與必須從函數(shù)中返回結(jié)果的常識(shí)相違悖苦酱。
這?有?個(gè)名為 produce
的便捷的協(xié)程構(gòu)建器
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
管道
管道是?種?個(gè)協(xié)程在流中開始?產(chǎn)可能?窮多個(gè)元素的模式:
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 在流中開始從 1 ?產(chǎn)?窮多個(gè)整數(shù)
}
并且另?個(gè)或多個(gè)協(xié)程開始消費(fèi)這些流售貌,做?些操作,并?產(chǎn)了?些額外的結(jié)果疫萤。在下?的例?中颂跨,對(duì)這些數(shù)字僅僅做
了平?操作:
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
主要的代碼啟動(dòng)并連接了整個(gè)管道:
val numbers = produceNumbers() // 從 1 開始?成整數(shù)
val squares = square(numbers) // 整數(shù)求平?
repeat(5) {
println(squares.receive()) // 輸出前五個(gè)
}
println("Done!") // ?此已完成
coroutineContext.cancelChildren() // 取消?協(xié)程
使?管道的素?cái)?shù)
讓我們來(lái)展??個(gè)極端的例??在協(xié)程中使??個(gè)管道來(lái)?成素?cái)?shù)。我們開啟了?個(gè)數(shù)字的?限序列扯饶。
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // 開啟了?個(gè)?限的整數(shù)流
}
下?的例?打印了前?個(gè)素?cái)?shù)恒削,在主線程的上下?中運(yùn)?整個(gè)管道。直到所有的協(xié)程在該主協(xié)程 runBlocking
的作?
域中被啟動(dòng)完成尾序。我們不必使??個(gè)顯式的列表來(lái)保存所有被我們已經(jīng)啟動(dòng)的協(xié)程钓丰。我們使? cancelChildren
擴(kuò)展
函數(shù)在我們打印了前?個(gè)素?cái)?shù)以后來(lái)取消所有的?協(xié)程。
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // 取消所有的?協(xié)程來(lái)讓主協(xié)程結(jié)束
注意每币,你可以在標(biāo)準(zhǔn)庫(kù)中使? iterator
協(xié)程構(gòu)建器來(lái)構(gòu)建?個(gè)相似的管道携丁。使? iterator
替換
produce
、yield
替換 send
兰怠、next 替換 receive
梦鉴、Iterator
替換ReceiveChannel
來(lái)擺脫協(xié)程作?域李茫,
你將不再需要 runBlocking
。然?尚揣,如上所?涌矢,如果你在 Dispatchers.Default
上下?中運(yùn)?它,使?通道的管道的
好處在于它可以充分利?多核? CPU快骗。
扇出
多個(gè)協(xié)程也許會(huì)接收相同的管道娜庇,在它們之間進(jìn)?分布式?作。讓我們啟動(dòng)?個(gè)定期產(chǎn)?整數(shù)的?產(chǎn)者協(xié)程(每秒?
個(gè)數(shù)字):
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // 從 1 開始
while (true) {
send(x++) // 產(chǎn)?下?個(gè)數(shù)字
delay(100) // 等待 0.1 秒
}
}
接下來(lái)我們可以得到?個(gè)處理器協(xié)程方篮。在這個(gè)?例中名秀,它們只是打印它們的 id 和接收到的數(shù)字:
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
現(xiàn)在讓我們啟動(dòng)五個(gè)處理器協(xié)程并讓它們?作將近?秒∨航Γ看看發(fā)?了什么:
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 取消協(xié)程?產(chǎn)者從?將它們?nèi)繗⑺?
該輸出將類似于如下所?匕得,盡管接收每個(gè)特定整數(shù)的處理器 id 可能會(huì)不同:
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
注意,取消?產(chǎn)者協(xié)程將關(guān)閉它的通道巾表,從?最終終?處理器協(xié)程正在執(zhí)?的此通道上的迭代汁掠。
還有,注意我們?nèi)绾问?for
循環(huán)顯式迭代通道以在 launchProcessor
代碼中執(zhí)?扇出集币。與 consumeEach
不
同考阱,這個(gè)for
循環(huán)是安全完美地使?多個(gè)協(xié)程的。如果其中?個(gè)處理器協(xié)程執(zhí)?失敗鞠苟,其它的處理器協(xié)程仍然會(huì)繼續(xù)
處理通道乞榨,?通過(guò) consumeEach
編寫的處理器始終在正常或?正常完成時(shí)消耗(取消)底層通道
扇?
多個(gè)協(xié)程可以發(fā)送到同?個(gè)通道当娱。?如說(shuō)吃既,讓我們創(chuàng)建?個(gè)字符串的通道,和?個(gè)在這個(gè)通道中以指定的延遲反復(fù)發(fā)
送?個(gè)指定字符串的掛起函數(shù):
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
現(xiàn)在跨细,我們啟動(dòng)了?個(gè)發(fā)送字符串的協(xié)程鹦倚,讓我們看看會(huì)發(fā)?什么(在?例中,我們?cè)谥骶€程的上下?中作為主協(xié)程的
?協(xié)程來(lái)啟動(dòng)它們):
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收前六個(gè)
println(channel.receive())
}
coroutineContext.cancelChildren() // 取消所有?協(xié)程來(lái)讓主協(xié)程結(jié)束
輸出如下:
foo
foo
BAR!
foo
foo
BAR!
帶緩沖的通道
到?前為?展?的通道都是沒有緩沖區(qū)的冀惭。?緩沖的通道在發(fā)送者和接收者相遇時(shí)傳輸元素(也稱“對(duì)接”)申鱼。如果發(fā)送
先被調(diào)?,則它將被掛起直到接收被調(diào)?云头,如果接收先被調(diào)?捐友,它將被掛起直到發(fā)送被調(diào)?。
Channel()
??函數(shù)與 produce
建造器通過(guò)?個(gè)可選的參數(shù) capacity
來(lái)指定 緩沖區(qū)?? 溃槐。緩沖允許發(fā)送者在被
掛起前發(fā)送多個(gè)元素匣砖,就像 BlockingQueue
有指定的容量?樣,當(dāng)緩沖區(qū)被占滿的時(shí)候?qū)?huì)引起阻塞。
看看如下代碼的表現(xiàn):
val channel = Channel<Int>(4) // 啟動(dòng)帶緩沖的通道
val sender = launch { // 啟動(dòng)發(fā)送者協(xié)程
repeat(10) {
println("Sending $it") // 在每?個(gè)元素發(fā)送前打印它們
channel.send(it) // 將在緩沖區(qū)被占滿時(shí)掛起
}
}
// 沒有接收到東西……只是等待……
delay(1000)
sender.cancel() // 取消發(fā)送者協(xié)程
使?緩沖通道并給 capacity
參數(shù)傳? 四 它將打印sending
五 次:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
前四個(gè)元素被加?到了緩沖區(qū)并且發(fā)送者在試圖發(fā)送第五個(gè)元素的時(shí)候被掛起猴鲫。
通道是公平的
發(fā)送和接收操作是 公平的 并且尊重調(diào)?它們的多個(gè)協(xié)程对人。它們遵守先進(jìn)先出原則,可以看到第?個(gè)協(xié)程調(diào)?
receive
并得到了元素拂共。在下?的例?中兩個(gè)協(xié)程“乒”和“乓”都從共享的“桌?”通道接收到這個(gè)“球”元素牺弄。
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // ?個(gè)共享的 table(桌?)
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // 乒乓球
delay(1000) // 延遲 1 秒鐘
coroutineContext.cancelChildren() // 游戲結(jié)束,取消它們
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // 在循環(huán)中接收球
ball.hits++
println("$name $ball")
delay(300) // 等待?段時(shí)間
table.send(ball) // 將球發(fā)送回去
}
}
“乒”協(xié)程?先被啟動(dòng)宜狐,所以它?先接收到了球势告。甚?雖然“乒”協(xié)程在將球發(fā)送會(huì)桌?以后?即開始接收,但是球還是
被“乓”協(xié)程接收了抚恒,因?yàn)樗?直在等待著接收球:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
注意咱台,有時(shí)候通道執(zhí)?時(shí)由于執(zhí)?者的性質(zhì)?看起來(lái)不那么公平