通過前幾篇的學(xué)習(xí),相信大家對Akka應(yīng)該有所了解了甫窟,都說解決并發(fā)哪家強(qiáng)密浑,JVM上面找Akka,那么Akka到底在解決并發(fā)問題上幫我們做了什么呢蕴坪?
共享內(nèi)存
眾所周知肴掷,在處理并發(fā)問題上面敬锐,最核心的一部分就是如何處理共享內(nèi)存背传,很多時(shí)候我們都需要花費(fèi)很多時(shí)間和精力在共享內(nèi)存上,那么在學(xué)習(xí)Akka對共享內(nèi)存是如何管理之前台夺,我們先來看看Java中是怎么處理這個(gè)問題的径玖。
Java共享內(nèi)存
相信對Java并發(fā)有所了解的同學(xué)都應(yīng)該知道在Java5推出JSR 133后,Java對內(nèi)存管理有了更高標(biāo)準(zhǔn)的規(guī)范了颤介,這使我們開發(fā)并發(fā)程序也有更好的標(biāo)準(zhǔn)了梳星,不會有一些模糊的定義導(dǎo)致的無法確定的錯(cuò)誤赞赖。
首先來看看一下Java內(nèi)存模型的簡單構(gòu)圖:
從圖中我們可以看到我們線程都有自己的一個(gè)工作內(nèi)存,這就好比高速緩存冤灾,它是對主內(nèi)存部分?jǐn)?shù)據(jù)的拷貝前域,線程對自己工作內(nèi)存的操作速度遠(yuǎn)遠(yuǎn)快于對主內(nèi)存的操作,但這也往往會引起共享變量不一致的問題韵吨,比如以下一個(gè)場景:
int a = 0;
public void setA() {
a = a + 1;
}
上面是一個(gè)很簡單的例子匿垄,a是一個(gè)全局變量,然后我們有一個(gè)方法去修改這個(gè)值归粉,每次增加一椿疗,假如我們用100個(gè)線程去運(yùn)行這段代碼,那a最終的結(jié)果會是多少呢糠悼?
100届榄?顯然不一定,它可能是80倔喂,90铝条,或者其他數(shù),這就造成共享變量不一致的問題席噩,那么為什么會導(dǎo)致這個(gè)問題呢攻晒,就是我們上面所說的,線程去修改a的時(shí)候可能就只是修改了自己工作內(nèi)存中a的副本班挖,但并沒有將a的值及時(shí)的刷新到主內(nèi)存中鲁捏,這便會導(dǎo)致其他線程可能讀到未被修改a的值,最終出現(xiàn)變量不一致問題萧芙。
那么Java中是怎么處理這種問題给梅,如何保證共享變量的一致性的呢?
同步機(jī)制
大體上Java中有3類同步機(jī)制双揪,但它們所解決的問題并不相同动羽,我們先來看一看這三種機(jī)制:
- final關(guān)鍵詞
- volatile關(guān)鍵詞
- synchronized關(guān)鍵詞(這里代表了所有類似監(jiān)視鎖的機(jī)制)
1.final關(guān)鍵詞
寫過Java程序的同學(xué)對這個(gè)關(guān)鍵詞應(yīng)該再熟悉不過了,其基本含義就是不可變渔期,不可變變量运吓,比如:
final int a = 10;
final String b = "hello";
不可變的含義在于當(dāng)你對這些變量或者對象賦初值后,不能再重新去賦值疯趟,但對于對象來說拘哨,我們不能修改的是它的引用,但是對象內(nèi)的內(nèi)容還是可以修改的信峻。下面是一個(gè)簡單的例子:
final User u = new User(1,"a");
u.id = 2; //可以修改
u = new User(2,"b"); //不可修改
所以在利用final關(guān)鍵詞用來保證共享變量的一致性時(shí)一定要了解清楚自己的需求倦青,選擇合適的方法。
2.volatile關(guān)鍵詞
很多同學(xué)在遇到共享變量不一致的問題后盹舞,都會說我在聲明變量前加一個(gè)volatile就好了产镐,但事實(shí)真是這樣嘛隘庄?答案顯然不是。那我們來看看volatile到底為我們做了什么癣亚。
前面我們說過每個(gè)線程都有自己的工作內(nèi)存丑掺,很多時(shí)候線程去修改一個(gè)變量的值只是修改了自己工作內(nèi)存中副本的值,這便會導(dǎo)致主內(nèi)存的值并不是最新的述雾,其他線程讀取到的變量便會出現(xiàn)問題吼鱼。volatile幫我們解決了這個(gè)問題,它有兩個(gè)特點(diǎn):
- 線程每次都會去主內(nèi)存中讀取變量
- 線程每次修改變量后的值都會及時(shí)更新到主內(nèi)存中去
舉個(gè)例子:
volatile int a = 0;
public void setA() {
a = a + 1;
}
現(xiàn)在線程在執(zhí)行這段代碼時(shí)绰咽,都會強(qiáng)制去主內(nèi)存中讀取變量的值菇肃,修改后也會馬上更新到主內(nèi)存中去,但是這真的能解決共享變量不一致的問題嘛取募,其實(shí)不然琐谤,比如我們有這么一個(gè)場景:兩個(gè)線程同時(shí)讀取了主內(nèi)存中變量最新的值,這是我們兩個(gè)線程都去執(zhí)行修改操作玩敏,最后結(jié)果會是什么呢斗忌?這里就留給大家自己去思考了,其實(shí)也很簡單的旺聚。
那么volatile在什么場景下能保證線程安全织阳,按照官方來說,有以下兩個(gè)條件:
- 對變量的寫操作不依賴于當(dāng)前值
- 該變量沒有包含在具有其他變量的不變式中
多的方面這里我就不展開了砰粹,推薦兩篇我覺得寫的還不錯(cuò)的文章:volatile的使用及其原理volatile的適用場景
3.synchronized關(guān)鍵詞
很多同學(xué)在學(xué)習(xí)Java并發(fā)過程中最先接觸的就是synchronized關(guān)鍵詞了唧躲,它確實(shí)能解決我們上述的并發(fā)問題,那它到時(shí)如何幫我們保證共享變量的一致性的呢碱璃?
簡而言之的說弄痹,線程在訪問請求用synchronized關(guān)鍵詞修飾的方法,代碼塊都會要求獲得一個(gè)監(jiān)視器鎖嵌器,當(dāng)線程獲得了監(jiān)視器鎖后肛真,它才有權(quán)限去執(zhí)行相應(yīng)的方法或代碼塊,并在執(zhí)行結(jié)束后釋放監(jiān)視器鎖爽航,這便能保證共享內(nèi)存的一致性了蚓让,因?yàn)楸疚闹饕侵vAkka的共享內(nèi)存,過多的篇幅就不展開了讥珍,這里推薦一篇解析synchronized原理很不錯(cuò)的文章历极,有興趣的同學(xué)可以去看看:Synchronized及其實(shí)現(xiàn)原理
Akka共享內(nèi)存
Akka中的共享內(nèi)存是基于Actor模型的,Actor模型提倡的是:通過通訊來實(shí)現(xiàn)共享內(nèi)存串述,而不是用共享內(nèi)存來實(shí)現(xiàn)通訊执解,這點(diǎn)是跟Java解決共享內(nèi)存最大的區(qū)別,舉個(gè)例子:
在Java中我們要去操作共享內(nèi)存中數(shù)據(jù)時(shí)纲酗,每個(gè)線程都需要不斷的獲取共享內(nèi)存的監(jiān)視器鎖衰腌,然后將操作后的數(shù)據(jù)暴露給其他線程訪問使用,用共享內(nèi)存來實(shí)現(xiàn)各個(gè)線程之間的通訊觅赊,而在Akka中我們可以將共享可變的變量作為一個(gè)Actor內(nèi)部的狀態(tài)右蕊,利用Actor模型本身串行處理消息的機(jī)制來保證變量的一致性。
當(dāng)然要使用Akka中的機(jī)制也必須滿足一下兩條原則:
- 消息的發(fā)送必須先于消息的接收
- 同一個(gè)Actor對一條消息的處理先于下一條消息處理
第二個(gè)原則很好理解吮螺,就是上面我們說的Actor內(nèi)部是串行處理消息饶囚,那我們來看看第一個(gè)原則,為什么要保證消息的發(fā)送先于消息的接收鸠补,是為了防止我們在創(chuàng)建消息的時(shí)候發(fā)生了不確定的錯(cuò)誤萝风,接收者將可能接收到不正確的消息,導(dǎo)致發(fā)生奇怪的異常紫岩。
通過前面的學(xué)習(xí)我們知道Actor是一種比線程更輕量級规惰,抽象程度更高的一種結(jié)構(gòu),它幫我們規(guī)避了我們自己去操作線程泉蝌,那么Akka底層到底是怎么幫我們?nèi)ケWC共享內(nèi)存的一致性的呢歇万?
一個(gè)Actor它可能會有很多線程同時(shí)向它發(fā)送消息,之前我們也說到Actor本身是串行處理的消息的勋陪,那它是如何保障這種機(jī)制的呢贪磺?
Mailbox
Mailbox在Actor模型是一個(gè)很重要的概念,我們都知道向一個(gè)Actor發(fā)送的消息首先都會被存儲到它所對應(yīng)的Mailbox中诅愚,那么我們先來看看MailBox的定義結(jié)構(gòu)(本文所引用的代碼都在akka.dispatch.Mailbox.scala中寒锚,有興趣的同學(xué)也可以去研究一下):
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {}
很清晰Mailbox內(nèi)部維護(hù)了一個(gè)messageQueue這樣的消息隊(duì)列,并繼承了Scala自身定義的ForkJoinTask任務(wù)執(zhí)行類和我們很熟悉的Runnable接口违孝,由此可以看出壕曼,Mailbox底層還是利用Java中的線程進(jìn)行處理的。那么我們先來看看它的run方法:
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
為了配合理解等浊,我們這里先來看一下定義:
@inline
final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline
final def isClosed: Boolean = currentStatus == Closed
這里我們可以看出Mailbox本身會維護(hù)一個(gè)狀態(tài)Mailbox.Status腮郊,是一個(gè)Int變量,而且是可變的,并且用到volatile來保證了它的可見性:
@volatile
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
現(xiàn)在我們在回去看上面的代碼筹燕,run方法的執(zhí)行過程轧飞,首先它會去讀取MailBox此時(shí)的狀態(tài),因?yàn)槭且粋€(gè)Volatile read撒踪,所以能保證讀取到的是最新的值过咬,然后它會先處理任何的系統(tǒng)消息,這部分不需要我們太過關(guān)心制妄,之后便是執(zhí)行我們發(fā)送的消息掸绞,這里我們需要詳細(xì)看一下processMailbox()的實(shí)現(xiàn):
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
if (shouldProcessMessage) {
val next = dequeue() //去出下一條消息
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs) //遞歸處理下一條消息
}
}
從上述代碼中我們可以清晰的看到,當(dāng)滿足消息處理的情況下就會進(jìn)行消息處理,從消息隊(duì)列列取出下一條消息就是上面的dequeue()
,然后將消息發(fā)給具體的Actor進(jìn)行處理衔掸,接下去又是處理系統(tǒng)消息烫幕,然后判斷是否還有滿足情況需要下一條消息,若有則再次進(jìn)行處理敞映,可以看成一個(gè)遞歸操作,@tailrec
也說明了這一點(diǎn)较曼,它表示的是讓編譯器進(jìn)行尾遞歸優(yōu)化。
現(xiàn)在我們來看一下一條消息從發(fā)送到最終處理在Akka中到底是怎么執(zhí)行的振愿,下面的內(nèi)容是我通過閱讀Akka源碼加自身理解得出的捷犹,這里先畫了一張流程圖:
消息的大致流程我都在圖中給出,還有一些細(xì)節(jié)冕末,必須序列化消息萍歉,獲取狀態(tài)等就沒有具體說明了,有興趣的同學(xué)可以自己去閱讀以下Akka的源碼档桃,個(gè)人覺得Akka的源碼閱讀性還是很好的枪孩,比如:
- 基本沒有方法超過20行
- 不會有過多的注釋,但關(guān)鍵部分會給出胳蛮,更能加深自己的理解
當(dāng)然也有一些困擾销凑,我們在不了解各個(gè)類,接口之間的關(guān)系時(shí)仅炊,閱讀體驗(yàn)就會變得很糟糕斗幼,當(dāng)然我用IDEA很快就解決了這個(gè)問題。
我們這里來看看關(guān)鍵的部分:Actor是如何保證串行處理消息的抚垄?
上圖中有一根判定蜕窿,是否已有線程在執(zhí)行任務(wù)?我們來看看這個(gè)判定的具體邏輯:
@tailrec
final def setAsScheduled(): Boolean = { //是否有線程正在調(diào)度執(zhí)行該MailBox的任務(wù)
val s = currentStatus
/*
* Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
* Scheduled bit already set.
*/
if ((s & shouldScheduleMask) != Open) false
else updateStatus(s, s | Scheduled) || setAsScheduled()
}
從注釋和代碼的邏輯上我們可以看出當(dāng)已有線程在執(zhí)行返回false呆馁,若沒有則去更改狀態(tài)為以調(diào)度桐经,直到被其他線程搶占或者更改成功,其中updateStatus()是線程安全的浙滤,我們可以看一下它的實(shí)現(xiàn),是一個(gè)CAS操作:
@inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)
到這里我們應(yīng)該可以大致清楚Actor內(nèi)部是如何保證共享內(nèi)存的一致性了阴挣,Actor接收消息是多線程的,但處理消息是單線程的纺腊,利用MailBox中的Status來保障這一機(jī)制畔咧。
總結(jié)
通過上面的內(nèi)容我們可以總結(jié)出以下幾點(diǎn):
- Akka并不是說用了什么特殊魔法來保證并發(fā)的,底層使用的還是Java和JVM的同步機(jī)制
- Akka并沒有使用任何的鎖機(jī)制揖膜,這就避免了死鎖的可能性
- Akka并發(fā)執(zhí)行的處理并沒有使用線程切換誓沸,不僅提高了線程的使用效率,也大大減少了線程切換消耗
- Akka為我們提供了更高層次的并發(fā)抽象模型壹粟,讓我們不必關(guān)心底層的實(shí)現(xiàn)拜隧,只需著重實(shí)現(xiàn)業(yè)務(wù)邏輯就行,遵循它的規(guī)范,讓框架幫我們處理一切難點(diǎn)吧