注:本文中使用 runBlocking 是為了方便測試首尼,業(yè)務開發(fā)中禁止使用
一软能、Channel 基礎
(1)Channel 翻譯過來為通道或者管道查排,實際上就是個隊列, 是一個面向多協(xié)程之間數(shù)據(jù)傳輸?shù)?BlockQueue
跋核,用于協(xié)程間通信;
(2)Channel 使用 send
和 receive
兩個方法往管道里面寫入和讀取數(shù)據(jù)蹋订,這兩個方法是非阻塞的掛起函數(shù)刻伊;
(3)Channel 是熱流,不管有沒有訂閱者智什,上游都會發(fā)射數(shù)據(jù)丁屎。
1、簡單使用
fun channelFun() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
}
repeat(5) {
println(channel.receive())
}
println("Done!")
// 1
// 4
// 9
// 16
// 25
// Done!
}
2、Channel 的迭代
(1)我們發(fā)現(xiàn)甫贯,這種方式叫搁,實際上是我們一直在等待讀取 Channel 中的數(shù)據(jù)渴逻,只要有數(shù)據(jù)到了惨奕,就會被讀取到梨撞;
(2)最后一行 Done! 沒有打印出來卧波,表示程序沒有結束,一直處于等待讀取數(shù)據(jù)的狀態(tài)螃成。
fun channelIteratorFun() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) {
channel.send(x * x)
}
}
/*val iterator = channel.iterator()
while (iterator.hasNext()) {
println(iterator.next())
}*/
for (y in channel) {
println(y)
}
println("Done")
// 1
// 4
// 9
// 16
// 25
}
3、關閉 Channel
(1)調用 close
方法就像向通道發(fā)送了一個特殊的關閉指令查坪,這個迭代停止,說明關閉指令已經被接收了偿曙;
(2)這里能夠保證所有先前發(fā)送出去的元素都能在通道關閉前被接收到;
(3)調用了 close
會立即停止接受新元素望忆,isClosedForSend
會立即返回 true
,而由于 Channel
緩沖區(qū)的存在炭臭,這時候可能還有一些元素沒有被處理完,所以要等所有的元素都被讀取之后 isClosedForReceive
才會返回 true
袍辞。
fun channelCloseFun() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) {
channel.send(x * x)
}
channel.close() //結束發(fā)送數(shù)據(jù)
}
for (y in channel) {
println(y)
}
println("Done!")
// 1
// 4
// 9
// 16
// 25
// Done!
}
4鞋仍、Channel 的類型
(1)Channel
是一個接口,它繼承了 SendChannel
和 ReceiveChannel
兩個接口
(2)SendChannel
提供了發(fā)射數(shù)據(jù)的功能威创,有如下重點接口:
??- send
是一個掛起函數(shù)谎懦,將指定的元素發(fā)送到此通道,在該通道的緩沖區(qū)已滿或不存在時掛起調用者享甸。如果通道已經關閉,調用發(fā)送時會拋出異常蛉威;
??- trySend
如果不違反其容量限制哲虾,則立即將指定元素添加到此通道丙躏,并返回成功。否則束凑,返回失敗或關閉晒旅;
??- close
關閉通道;
??- isClosedForSend
判斷通道是否已經關閉湘今,如果關閉敢朱,調用 send 會引發(fā)異常。
(3)ReceiveChannel
提供了接收數(shù)據(jù)的功能摩瞎,有如下重點接口:
?? - receive
如果此通道不為空拴签,則從中檢索并刪除元素;如果通道為空旗们,則掛起調用者蚓哩;如果通道未接收而關閉,則引發(fā) ClosedReceiveChannel 異常上渴;
?? - tryReceive
如果此通道不為空岸梨,則從中檢索并刪除元素,返回成功結果稠氮;如果通道為空曹阔,則返回失敗結果;如果通道關閉隔披,則返回關閉結果赃份;
?? - receiveCatching
如果此通道不為空,則從中檢索并刪除元素奢米,返回成功結果抓韩;如果通道為空,則返回失敗結果鬓长;如果通道關閉谒拴,則返回關閉的原因;
?? - isEmpty
判斷通道是否為空涉波;
?? - isClosedForReceive
判斷通道是否已經關閉英上,如果關閉,調用 receive 會引發(fā)異常啤覆;
?? - cancel(cause: CancellationException? = null)
以可選原因取消接收此頻道的剩余元素善延,此函數(shù)用于關閉通道并從中刪除所有緩沖發(fā)送的元素;
?? - iterator()
返回通道的迭代器城侧。
(4)創(chuàng)建不同類型的 Channel
?? - Rendezvous channel
0尺寸 buffer (默認類型)
?? - Unlimited channel
無限元素, send 不被掛起
?? - Buffered channel
指定大小, 滿了之后 send 掛起
?? - Conflated channel
新元素會覆蓋舊元素, receiver 只會得到最新元素, send 永不掛起
fun channelCreateFun() = runBlocking {
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(Channel.CONFLATED)
val unlimitedChannel = Channel<String>(Channel.UNLIMITED)
}
二易遣、Channel實現(xiàn)協(xié)程間通信
1、多個協(xié)程訪問同一個 Channel
fun multipleCoroutineFun() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..3) {
channel.send(x)
}
}
launch {
delay(10)
for (y in channel) {
println("1 --> $y")
}
}
launch {
delay(20)
for (y in channel) {
println("2 --> $y")
}
}
launch {
delay(30)
for (x in 90..93) {
channel.send(x)
}
channel.close()
}
delay(1000)
println("Done!")
// 1 --> 1
// 2 --> 3
// 1 --> 2
// 2 --> 90
// 2 --> 92
// 1 --> 91
// 2 --> 93
// Done!
}
2嫌佑、produce 和 actor
(1)通過 produce
這個方法啟動一個生產者協(xié)程豆茫,并返回一個 ReceiveChannel
侨歉,其他協(xié)程就可以拿著這個 Channel 來接收數(shù)據(jù)了;
(2)通過 actor
可以用來構建一個消費者協(xié)程揩魂,并返回一個 SendChannel
幽邓,其他協(xié)程就可以拿著這個 Channel 來發(fā)送數(shù)據(jù)了。
fun produceFun() = runBlocking {
val receiveChannel = produce {
for (x in 1..3) {
delay(500)
send(x)
}
}
for (x in receiveChannel) {
println(x)
}
delay(3000)
receiveChannel.cancel()
println("Done!")
// 1
// 2
// 3
// Done!
}
fun actorFun() = runBlocking {
val sendChannel = actor<Int> {
for (e in channel) {
println(e)
}
}
sendChannel.send(100)
delay(2000)
sendChannel.close()
println("Done!")
// 100
// Done!
}
3火脉、BroadcastChannel
(1)BroadcastChannel
被標記為過時了牵舵,請使用 SharedFlow
和 StateFlow
替代它;
(2)1中例子提到一對多的情形倦挂,從數(shù)據(jù)處理本身來講畸颅,有多個接收端的時候,同一個元素只會被一個接收端讀到方援;而 BroadcastChannel
則不然没炒,多個接收端不存在互斥現(xiàn)象。
fun broadcastChannelFun() = runBlocking {
val broadcastChannel = BroadcastChannel<Int>(5)
val receiveChannel1 = broadcastChannel.openSubscription()
val receiveChannel2 = broadcastChannel.openSubscription()
launch {
for (x in 1..3) {
broadcastChannel.send(x)
}
}
launch {
for (e in receiveChannel1) {
println("1 --> $e")
}
}
launch {
for (e in receiveChannel2) {
println("2 --> $e")
}
}
delay(1000)
broadcastChannel.close()
println("Done!")
// 1 --> 1
// 1 --> 2
// 1 --> 3
// 2 --> 1
// 2 --> 2
// 2 --> 3
// Done!
}
使用 broadcast()
擴展函數(shù)可以將 Channel
轉換成 BroadcastChannel
fun broadcastChannelFun2() = runBlocking {
val channel = Channel<Int>()
val broadcastChannel = channel.broadcast(3)
val receiveChannel1 = broadcastChannel.openSubscription()
val receiveChannel2 = broadcastChannel.openSubscription()
launch {
for (x in 1..3) {
channel.send(x)
}
}
launch {
for (e in receiveChannel1) {
println("1 --> $e")
}
}
launch {
for (e in receiveChannel2) {
println("2 --> $e")
}
}
delay(1000)
channel.close()
println("Done!")
// 1 --> 1
// 1 --> 2
// 1 --> 3
// 2 --> 1
// 2 --> 2
// 2 --> 3
// Done!
}