kotlin channel 入門
前言
最近項目中對 kotlin 的使用比較多忽刽。不得不說 kotlin 確實可以極大的提高 android 的開發(fā)效率,有許多之前得用 java 寫非常多矗晃、非常啰嗦的樣板代碼的 case往枣,用 kotlin 卻可以幾行搞定肛根,四兩撥千斤,同時邏輯表達(dá)也更加清晰活鹰。而 kotlin 對于 java 而言哈恰,最大的不同莫過于協(xié)程了。習(xí)慣了 kotlin 的協(xié)程志群,可能再也不想使用 java 的 handler + postDelay 了着绷。因此,在這里锌云,本人準(zhǔn)備對 kotlin 協(xié)程中一些比較難以上手的點荠医,進(jìn)行說明和分析。這篇文章桑涎,將會帶大家一起學(xué)習(xí)一下 kotlin 協(xié)程中 channel 的使用彬向。
channel 概述
kotlin 中,我們常用 defer 來進(jìn)行協(xié)助之間單個值的傳遞攻冷。比如娃胆,我們可能會寫如下代碼:
val deferred = GlobalScope.async {
// do something,
"this is a result"
}
deferred.await()
用來等待一個異步協(xié)程的結(jié)果。在結(jié)果返回之前等曼,當(dāng)前協(xié)程掛起里烦。那么,如果我們想獲取一系列的結(jié)果禁谦,應(yīng)該怎么辦呢胁黑?注意,這里的一系列的結(jié)果州泊,不是說我們需要一個 list丧蘸,而是說,我們想第一次 await()
的時候遥皂,得到一個值力喷,然后再次 await()
的時候,還能獲取到值渴肉。就像從一個隊列里面不斷的取出新的元素一樣冗懦。
這個時候我們就可以使用 channel
了爽冕。channel
非常類似于一個 java 中非常常見的概念 BlockingQueue
仇祭。只不過,BlockingQueue
使用可以阻塞的 put
方法颈畸,而 channel
使用可以掛起的 send
方法乌奇;BlockingQueue
使用可以阻塞的 take
方法没讲,而 channel
使用可以掛起的 receive
方法。所以礁苗,如果什么時候我們對于 channel
的理解產(chǎn)生了困惑爬凑,可以簡單的把相關(guān)的內(nèi)容類比到 BlockingQueue
中,來幫助我們進(jìn)行理解试伙。
channel 的用法
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
}
repeat(5) { println(channel.receive()) }
println("Done!")
簡單說明一下上面的代碼:我們有一個 channel 嘁信,我們會從這個 channel 中 receive 5 次。這五次一次獲取到從 1 到 5 一共五個數(shù)字疏叨。
這個簡單的代碼片段潘靖,其實蘊(yùn)含了非常重要的程序執(zhí)行流程:
我們假設(shè),根據(jù)代碼的書寫順序蚤蔓,先執(zhí)行到了 channel.send(1)
卦溢。根據(jù)上面闡述的內(nèi)容,send
因為是一個掛起的方法秀又,第一次只會執(zhí)行 1桐绒,并把 1 放入到 channel
中震缭。然后,receive
方法獲取到 1。這個時候在 repeat(5) 的循環(huán)中婉弹,再次執(zhí)行到 receive
的時候,因為 channel 中已經(jīng)沒有數(shù)了面睛,所以 receive
會掛起棚点。之后,協(xié)程會通過調(diào)度算法捷雕,讓 channel.send(2 * 2) 執(zhí)行椒丧,并讓 channel.send(3 * 3)
掛起。再之后救巷,channel.receive()
在經(jīng)過調(diào)度之后壶熏,得到執(zhí)行,獲取到剛才 channel.send(2 * 2)
的結(jié)果浦译,也就是 4 棒假。以此類推。
- channel.send(1)
- 發(fā)送方掛起
- channel.receive(1)
- 接收方掛起
- channel.send(4)
- 發(fā)送方掛起
- channel.receive(4)
- 接收方掛起
精盅。帽哑。。
channel 的關(guān)閉和遍歷
channel
跟 queue
的一個不同的點就是叹俏,channel
是可以關(guān)閉的妻枕。close
這個動作,底層其實是給 channel
發(fā)送了一個消息。官方管這個東西叫 close token
屡谐。因為 channel 在接收到 close
消息的時候述么,會立刻停止在這個 channel
上的遍歷的工作,所以 kotlin
會保證在 close
被調(diào)用之前已經(jīng)在 channel
中的消息被 received
愕掏。
kotlin 為我們提供了一個簡單的 channel 的遍歷方法度秘,也就是 for 循環(huán),使用方法如下:
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
channel 的流水線模式
流水線模式的使用場景如下:一個協(xié)程不斷的生產(chǎn)新的消息饵撑,其他協(xié)程不斷的處理這些消息剑梳,并且在這個過程中可能返回新的結(jié)果。跟我們說的函數(shù)式編程中的 map(映射) 非常類似滑潘。
這個模式可以讓我們很輕松的寫出一些簡潔而邏輯清晰的代碼阻荒,比如,下面代碼展示了如何生成素數(shù)的邏輯:
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
fun main() {
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
這樣众羡,執(zhí)行 main()
方法之后侨赡,就會輸出前 10 個素數(shù)。這里的原理也很簡單粱侣。如果我們不考慮 kotlin
的語法問題羊壹,但從計算機(jī)的角度解決生成素數(shù)的問題,一種解法是齐婴,我們需要用一個 list 來存儲已經(jīng)找到的素數(shù)油猫,然后,在對 n
進(jìn)行自增的過程中柠偶,遍歷所有已經(jīng)找到的素數(shù) list
情妖,如果所有的素數(shù)都不能整除 n
,那么這個 n
就是新的素數(shù)诱担。
用 pipeline
模式寫出的代碼毡证,原理跟上面闡述的一樣。只是上文所說的 list
被封裝在了一層一層的 filter
中蔫仙,最終執(zhí)行的過程中料睛,對于一個 n ,需要通過所有的 filter
摇邦,這跟上文說的遍歷所有已經(jīng)找到的素數(shù)列表的效果是一致的恤煞。
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
扇入和扇出
扇入:多對一,多個 channel 作為生產(chǎn)者施籍,一個 channel 作為消費者居扒。
扇出:一對多,一個 channel 作為生產(chǎn)者丑慎,多個 channel 作為消費者喜喂。
雖然概念有不同瓤摧,但是,寫法上夜惭,跟一對一的 channel 是一樣的。
緩沖 channel
channel 默認(rèn)的 capacity 是 1铛绰。這也就是我們上文說的诈茧,send 方法在第二次會掛起,因為中間沒有 receive 來消費這個消息捂掰。直到有 receive 消費了上一個消息之后敢会,剛才掛起的 send 才能恢復(fù)執(zhí)行。當(dāng)然这嚣,我們可以通過設(shè)置參數(shù)讓這個 capacity 的值不為 1鸥昏,比如4。那么姐帚,跟上面的分析是一樣的吏垮,send 會執(zhí)行四次,然后在第五次的時候掛起罐旗,直到有 receive 把消息給消費掉了之后膳汪,之前掛起的 send 才能繼續(xù)恢復(fù)執(zhí)行。
channel 的公平性
channel 是公平的九秀。所以遗嗽,他會嚴(yán)格的按照 first-in first-out 的順序來執(zhí)行。一個比較好的例子鼓蜒,就是模擬打乒乓球:
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
// prints
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
這里的公平性就體現(xiàn)在痹换,ping
協(xié)程是先啟動的,所以理應(yīng)獲得到 ball
都弹。但是 ping
在通過調(diào)用 send
把 ball
送還給 channel
之后娇豫,又在循環(huán)的下一輪立刻請求獲取 ball
。但是畅厢,因為 pong
的 receive
是比 ping
的下一個 receive
先調(diào)用的锤躁,(是在上一個 ping
的后面調(diào)用的),所以是 pong
得到 ball
而不是 ping
或详。
Ticker channels
channel
還有一種比較常用的用法系羞,就是用來實現(xiàn)令牌系統(tǒng)。比如霸琴,我們現(xiàn)在的需求椒振,是每 100 ms 產(chǎn)生一個令牌,那么我在 51ms 來取梧乘,肯定是獲取不到的澎迎。但是我在 101ms 的時候來取庐杨,是可以獲取到的〖泄考慮一種情況灵份,令牌沒有得到及時的消費,比如哮洽,就是前 150ms 都沒有消費填渠,那么第 151ms 來的消費者是可以立刻獲取到令牌的。但是鸟辅,第 152ms 來的消費者是不能獲取到令牌的氛什。但是,第 201ms 過來的消費者是可以獲取到令牌的匪凉。