(1)基本用法
Channel實(shí)際上是一個(gè)并發(fā)安全的隊(duì)列
谨读,它可以用來(lái)連接協(xié)程信柿,實(shí)現(xiàn)不同協(xié)程的通信。
生產(chǎn)者/消費(fèi)者模式 (send - channel - receive)
Channel的基本用法如下:
runBlocking {
val channel = Channel<Int>()
// 生產(chǎn)者
val producer = GlobalScope.launch {
var i = 0
while(true) {
delay(1000)
channel.send(++i)
println("send $i")
}
}
// 消費(fèi)者
val consumer = GlobalScope.launch {
while(true) {
val element = channel.receive()
println("receive $element")
}
}
joinAll(producer, consumer)
}
(2)Channel的容量
Channel實(shí)際上就是一個(gè)隊(duì)列,隊(duì)列中一定存在緩沖區(qū)胸蛛,那么一旦這個(gè)緩沖區(qū)滿了习绢,
并且也一直沒(méi)有人調(diào)用receive取走數(shù)據(jù)渠抹,send就需要掛起。
故意讓接收端的節(jié)奏放慢闪萄,發(fā)現(xiàn)send總是會(huì)掛起梧却,直到receive之后才會(huì)繼續(xù)往下執(zhí)行。
Channel的默認(rèn)大小為0败去。
(3)迭代Channel
Channel本身確實(shí)像序列放航,所以我們?cè)谧x取的時(shí)候可以直接獲取一個(gè)Channel的iterator。
runBlocking {
val channel = Channel<Int>(Channel.UNLIMITED)
// 生產(chǎn)者
val producer = GlobalScope.launch {
for (x in 1..5) {
println("send ${x * x}")
channel.send(x * x)
}
}
// 消費(fèi)者
val consumer = GlobalScope.launch {
val iterator = channel.iterator()
while(iterator.hasNext()) {
val element = iterator.next()
println("receive $element")
delay(1000)
}
}
joinAll(producer, consumer)
}
消費(fèi)者代碼也可以改成:
// 消費(fèi)者
val consumer = GlobalScope.launch {
for (element in channel) {
println("receive $element")
delay(1000)
}
}
(4)produce與actor
構(gòu)造生產(chǎn)者與消費(fèi)者的便捷方法
圆裕。
我們可以通過(guò)produce方法啟動(dòng)一個(gè)生產(chǎn)者協(xié)程广鳍,并返回一個(gè)ReceiveChannel,其他協(xié)程就可以用這個(gè)Channel來(lái)接收數(shù)據(jù)了吓妆。
反過(guò)來(lái)搜锰,我們可以用actor啟動(dòng)一個(gè)消費(fèi)者協(xié)程。
produce演示:
runBlocking {
val receiveChannel = GlobalScope.produce<Int> {
repeat(100) {
delay(1000)
send(it)
}
}
// 消費(fèi)者
val consumer = GlobalScope.launch {
for (element in receiveChannel) {
println("receive $element")
}
}
consumer.join()
}
actor演示:
runBlocking {
// 消費(fèi)者
val sendChannel = GlobalScope.actor<Int> {
while (true) {
val element = receive()
println("receive $element")
}
}
// 生產(chǎn)者
val producer = GlobalScope.launch {
repeat(100) {
sendChannel.send(it)
delay(1000)
}
}
producer.join()
}
(5)Channel關(guān)閉
produce和actor返回的Channel都會(huì)隨著對(duì)應(yīng)的協(xié)程執(zhí)行完畢而關(guān)閉耿战,也正是這樣蛋叼,Channel才被稱為 熱數(shù)據(jù)流
。
對(duì)于一個(gè)Channel剂陡,如果我們調(diào)用了它的close方法狈涮,它會(huì)立即停止接收新元素,也就是說(shuō)這時(shí)它的 isClosedForSend
會(huì)立即返回true鸭栖,而由于Channel緩沖區(qū)的存在歌馍,這時(shí)可能還有一些元素沒(méi)有處理完,因此要等所有的元素都讀取之后 isClosedForReceive
才會(huì)返回true晕鹊。
Channel的生命周期最好由主導(dǎo)方來(lái)維護(hù)松却,建議 由主導(dǎo)的一方實(shí)現(xiàn)關(guān)閉
。
runBlocking {
val channel = Channel<Int>(3)
// 生產(chǎn)者
val producer = GlobalScope.launch {
List(3) {
channel.send(it)
println("send $it")
}
channel.close()
println("producer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
}
// 消費(fèi)者
val consumer = GlobalScope.launch {
for (element in channel) {
println("receive:$element")
delay(1000)
}
println("consumer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
}
joinAll(producer, consumer)
}
(6)BroadcastChannel
正常情況下溅话,一個(gè) 發(fā)送者
對(duì)應(yīng)著一個(gè) 接收者
晓锻。
使用 BroadcastChannel
可以存在多個(gè)接收者。
runBlocking {
val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
// 生產(chǎn)者
val producer = GlobalScope.launch {
List(3) {
delay(1000)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
List(3) { index->
GlobalScope.launch {
val receiveChannel = broadcastChannel.openSubscription()
for (element in receiveChannel) {
delay(1000)
println("[#${index}] receive:$element")
}
}
}.joinAll()
producer.join()
}
BroadcastChannel<Int>(Channel.BUFFERED)
可以改成 Channel<Int>().broadcast(Channel.BUFFERED)
飞几。
(7)await多路復(fù)用
什么是多路復(fù)用砚哆?
數(shù)據(jù)通信系統(tǒng)或計(jì)算機(jī)網(wǎng)絡(luò)系統(tǒng)中,傳輸媒體的帶寬或容量往往會(huì)大于傳輸單一信號(hào)的需求屑墨,為了有效地利用通信線路躁锁,希望 一個(gè)信道同時(shí)傳輸多路信號(hào)
纷铣,這就是所謂多路復(fù)用技術(shù)。
復(fù)用多個(gè)await战转?
兩個(gè)API分別從網(wǎng)絡(luò)和本地緩存 獲取數(shù)據(jù)搜立,期望哪個(gè)先返回就先用哪個(gè)做展示。
request->server->response--
----Select -> Response
request->server->response--
data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
delay(1000)
User(name)
}
suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
delay(100)
User(name)
}
fun main() {
runBlocking {
GlobalScope.launch {
val localRequest = getUserForLocal("xxx")
val remoteRequest = getUserFromRemote("yyy")
// select 選擇執(zhí)行
val userResponse = select<Response<User>> {
localRequest.onAwait { Response(it, true) }
remoteRequest.onAwait { Response(it, false) }
}
println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
}.join()
}
}
select:誰(shuí)先返回槐秧,就選擇誰(shuí)儒拂。
(8)復(fù)用多個(gè)Channel
fun main() {
runBlocking {
val channels = listOf(Channel<Int>(), Channel<Int>())
GlobalScope.launch {
delay(100)
channels[0].send(200)
}
GlobalScope.launch {
delay(50)
channels[0].send(100)
}
val result = select<Int?> {
channels.forEach { channel ->
channel.onReceive { it }
}
}
println(result)
}
}
(9)SelectClause
我們?cè)趺粗滥男┦录梢员籹elect呢? 其實(shí)所有能夠被select的事件都是selectClauseN 類型色鸳,包括:
- selectClause0:對(duì)應(yīng)事件沒(méi)有返回值社痛,例如join沒(méi)有返回值,那么onJoin就是SelectClauseN類型命雀。使用時(shí)蒜哀,onJoin的參數(shù)是一個(gè)無(wú)參函數(shù)。
- selectClause1:對(duì)應(yīng)事件有返回值吏砂,例如:onAwait和onReceive都是此類情況撵儿。
- selectClause2:對(duì)應(yīng)事件有返回值,此外還需要一個(gè)額外的參數(shù)狐血,例如Channel.onSend有兩個(gè)參數(shù)淀歇,第一個(gè)是具體的數(shù)據(jù),第二個(gè)參數(shù)是發(fā)送成功的回調(diào)參數(shù)匈织。
-> 如果我們想要確認(rèn)掛起函數(shù)是否支持select浪默,只需要查看其是否存在對(duì)應(yīng)的SelectClauseN類型可回調(diào)即可。
selectClause0舉例:
fun main() {
runBlocking {
val job1 = GlobalScope.launch {
delay(100)
println("job 1")
}
val job2 = GlobalScope.launch {
delay(10)
println("job 2")
}
select<Unit> {
job1.onJoin { println("job1 onJoin") }
job2.onJoin { println("job2 onJoin") }
}
}
}
selectClause1舉例:
data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
delay(1000)
User(name)
}
suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
delay(100)
User(name)
}
fun main() {
runBlocking {
GlobalScope.launch {
val localRequest = getUserForLocal("xxx")
val remoteRequest = getUserFromRemote("yyy")
// select 選擇執(zhí)行
val userResponse = select<Response<User>> {
localRequest.onAwait { Response(it, true) }
remoteRequest.onAwait { Response(it, false) }
}
println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
}.join()
}
}
fun main() {
runBlocking {
val channels = listOf(Channel<Int>(), Channel<Int>())
GlobalScope.launch {
delay(100)
channels[0].send(200)
}
GlobalScope.launch {
delay(50)
channels[0].send(100)
}
val result = select<Int?> {
channels.forEach { channel ->
channel.onReceive { it }
}
}
println(result)
}
}
selectClause2舉例:
fun main() {
runBlocking {
val channels = listOf(Channel<Int>(), Channel<Int>())
println(channels)
launch(Dispatchers.IO) {
select<Unit> {
launch {
delay(10)
channels[1].onSend(200) { sentChannel->
println("sent on $sentChannel")
}
}
launch {
delay(100)
channels[0].onSend(100) { sentChannel->
println("sent on $sentChannel")
}
}
}
}
GlobalScope.launch {
println(channels[0].receive())
}
GlobalScope.launch {
println(channels[1].receive())
}
}
}
[完...]