github原文地址
原創(chuàng)翻譯意敛,轉(zhuǎn)載請保留或注明出處:http://www.reibang.com/p/01d26fbc9b80
共享的可變狀態(tài)和并發(fā)
協(xié)程可用多線程調(diào)度器(比如默認(rèn)的 CommonPool )并發(fā)執(zhí)行妈经。這樣就可以提出所有常見的并發(fā)問題凿跳。主要的問題是同步訪問共享的可變狀態(tài)秒裕。協(xié)程領(lǐng)域?qū)@個問題的一些解決方案類似于多線程領(lǐng)域中的解決方案烈评,但其他解決方案則是獨一無二的凉袱。
問題
我們啟動一千個協(xié)程逗柴,它們都做一千次相同的動作(總計100萬次執(zhí)行)。我們同時會測量它們的完成時間院塞,以便進(jìn)一步的比較:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
我們從一個非常簡單的動作開始:在多線程 CommonPool 上下文遞增一個共享的可變變量遮晚。
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
獲取完整代碼 here
這段代碼最后打印出什么結(jié)果?它不太可能打印出“Counter = 1000000”拦止,因為一千個協(xié)程從多個線程同時遞增計數(shù)器而且沒有做同步并發(fā)處理县遣。
注意:如果你的運行機(jī)器使用兩個或者更少的cpu,那么你總是會看到1000000汹族,因為
CommonPool
在這種情況下只會在一個線程中運行萧求。要重現(xiàn)這個問題,可以做如下的變動:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}
獲取完整代碼 here
沒有發(fā)揮作用的volatile
有一種常見的誤解:volatile
可以解決并發(fā)問題顶瞒。讓我們嘗試一下:
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
獲取完整代碼 here
這段代碼運行速度更慢了夸政,但我們?nèi)匀粵]有得到 “Counter = 1000000”,因為 volatile 變量保證可線性化(這是“原子”的技術(shù)術(shù)語)讀取和寫入變量榴徐,但在大量動作(在我們的示例中即“遞增”操作)發(fā)生時并不提供原子性守问。
線程安全的數(shù)據(jù)結(jié)構(gòu)
一種對線程、協(xié)程都有效的常規(guī)解決方法坑资,就是使用線程安全(也稱為同步的耗帕、可線性化、原子)的數(shù)據(jù)結(jié)構(gòu)袱贮,它為需要在共享狀態(tài)上執(zhí)行的相應(yīng)操作提供所有必需的同步處理仿便。在簡單的計數(shù)器場景中,我們可以使用具有 incrementAndGet
原子操作的AtomicInteger
類:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
獲取完整代碼 here
這是針對此類特定問題的最快解決方案。它適用于普通計數(shù)器嗽仪、集合荒勇、隊列和其他標(biāo)準(zhǔn)數(shù)據(jù)結(jié)構(gòu)以及它們的基本操作。然而钦幔,它并不容易擴(kuò)展為應(yīng)對復(fù)雜狀態(tài)枕屉、或復(fù)雜操作沒有現(xiàn)成的線程安全實現(xiàn)的情況。
以細(xì)粒度限制線程
限制線程是解決共享可變狀態(tài)問題的一種方案鲤氢,其中對特定共享狀態(tài)的所有訪問權(quán)都限制在單個線程中搀擂。它通常應(yīng)用于UI程序中:所有UI狀態(tài)都局限于單個事件分發(fā)線程或應(yīng)用主線程中。這在協(xié)程中很容易實現(xiàn)卷玉,通過使用一個單線程上下文:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // run each coroutine in CommonPool
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}
獲取完整代碼 here
這段代碼運行非常緩慢哨颂,因為它進(jìn)行了細(xì)粒度的線程限制。每個增量操作都得使用 withContext 塊從多線程 CommonPool
上下文切換到單線程上下文相种。
以粗粒度限制線程
在實踐中威恼,線程限制是在大段代碼中執(zhí)行的,例如:狀態(tài)更新類業(yè)務(wù)邏輯中大部分都是限于單線程中寝并。下面的示例演示了這種情況箫措,在單線程上下文中運行每個協(xié)程。
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
}
獲取完整代碼 here
這段代碼運行更快而且打印出了正確的結(jié)果衬潦。
互斥
該問題的互斥解決方案是使用永遠(yuǎn)不會同時執(zhí)行的關(guān)鍵代碼塊來保護(hù)共享狀態(tài)的所有修改斤蔓。在阻塞的世界中,你通常會使用 synchronized
或者 ReentrantLock
镀岛。在協(xié)程中的替代品叫做 Mutex 弦牡。它具有 lock 和 unlock 方法,可以隔離關(guān)鍵的部分漂羊。關(guān)鍵的區(qū)別在于 Mutex.lock()
是一個掛起函數(shù)驾锰,它不會阻塞線程。
還有 withLock 擴(kuò)展函數(shù)走越,可以方便的替代常用的 mutex.lock(); try { ... } finally { mutex.unlock() }
模式:
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
獲取完整代碼 here
此示例中鎖是細(xì)粒度的椭豫,因此會付出一些代價。但是對于某些必須定期修改共享狀態(tài)的場景旨指,它是一個不錯的選擇赏酥,但是沒有自然線程可以限制此狀態(tài)。
Actors
一個 actor 是由若干元素組成的一個實體:一個協(xié)程淤毛、它的狀態(tài)受限封裝在此協(xié)程中、以及一個與其他協(xié)程通信的 channel 算柳。一個簡單的 actor 可以簡單的寫成一個函數(shù)低淡,但是一個擁有復(fù)雜狀態(tài)的 actor 更適合由類來表示。
有一個 actor 協(xié)程構(gòu)建器,它可以方便地將 actor 的郵箱 channel 組合到其作用域中(用來接收消息)蔗蹋、組合發(fā)送 channel 與結(jié)果集對象何荚,這樣對 actor 的單個引用就可以作為其句柄持有。
使用 actor 的第一步是定一個 actor 要處理的消息類猪杭。Kotlin 的 sealed classes 密封類很適合這種場景餐塘。我們使用 IncCounter
消息(用來遞增計數(shù)器)和 GetCounter
消息(用來獲取值)來定義 CounterMsg
密封類。后者需要發(fā)送回復(fù)皂吮。CompletableDeferred 通信原語表示未來可知(傳達(dá))的單個值戒傻,此處用于此目的。
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
接下來我們定義一個函數(shù)蜂筹,使用 actor 協(xié)程構(gòu)建器來啟動一個 actor:
// This function launches a new counter actor
fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
主函數(shù)代碼很簡單:
fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor() // create the actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
獲取完整代碼 here
actor 本身執(zhí)行所處上下文的正確性無關(guān)緊要需纳。一個 actor 是一個協(xié)程,而一個協(xié)程是按順序執(zhí)行的艺挪,因此將狀態(tài)限制到特定協(xié)程可以解決共享可變狀態(tài)的問題不翩。實際上,actor 可以修改自己的私有狀態(tài)麻裳,但只能通過消息互相影響(避免任何鎖定)口蝠。
actor 在高負(fù)載下比鎖更有效,因為在這種情況下它總是有工作要做津坑,而且根本不需要切換到不同的上下文妙蔗。
注意, actor 協(xié)程構(gòu)建器是 produce 協(xié)程構(gòu)建器的雙重構(gòu)件国瓮。一個 actor 與它接收消息的 channel 相關(guān)聯(lián)灭必,而一個 producer 與它發(fā)送元素的 channel 相關(guān)聯(lián)。