多線程知識(shí)梳理(1) - 并發(fā)編程的藝術(shù)筆記

第三章 Java內(nèi)存模型

3.1 Java內(nèi)存模型的基礎(chǔ)

  • 通信
    在共享內(nèi)存的模型里,通過寫-讀內(nèi)存中的公共狀態(tài)進(jìn)行隱式通信干签;在消息傳遞的并發(fā)模型里,線程之間必須通過發(fā)送消息來進(jìn)行顯示的通信恭理。
  • 同步
    在共享內(nèi)存并發(fā)模型里殉挽,同步是顯示進(jìn)行的,程序員必須顯示指定某個(gè)方法或者某段代碼需要在線程之間互斥執(zhí)行裳擎;在消息傳遞的并發(fā)模型里涎永,由于消息的發(fā)送必須在接收之前,因此同步是隱式進(jìn)行的。

Java中羡微,所有實(shí)例域谷饿、靜態(tài)域和數(shù)組元素都存儲(chǔ)在堆內(nèi)存中,堆內(nèi)存在線程之間共享妈倔;局部變量博投、方法定義參數(shù)和異常處理器參數(shù)不會(huì)在線程之間共享。
從抽象角度來看启涯,JMM定義了線程和主內(nèi)存之間的抽象關(guān)系:線程之間的共享變量存儲(chǔ)在主內(nèi)存中贬堵,每個(gè)線程都有一個(gè)私有的本地內(nèi)存,本地內(nèi)存涵蓋了緩存结洼、寫緩沖區(qū)黎做、寄存器以及其它的硬件和編譯器優(yōu)化。
JMM通過控制主內(nèi)存與每個(gè)線程的本地內(nèi)存之間的交互松忍,來為Java程序員提供內(nèi)存可見性保證蒸殿。

重排序

指編譯器和處理器為了優(yōu)化程序性能而對(duì)指令序列進(jìn)行重新排序的一種手段:

  • 編譯器優(yōu)化的重排序:編譯器在不改變單線程程序語義的前提下,重新安排語句的執(zhí)行順序鸣峭。
  • 處理器的指令級(jí)并行的重排序:如果不存在數(shù)據(jù)依賴性宏所,處理器可以改變語句對(duì)應(yīng)機(jī)器指令的執(zhí)行順序。
  • 內(nèi)存系統(tǒng)的重排序:由于處理器使用緩存和讀/寫緩沖區(qū)摊溶,這使得加載和存儲(chǔ)操作看上去可能是在亂序執(zhí)行爬骤。

JMM的編譯器重新排序規(guī)則會(huì)禁止特定類型的編譯器重排序,對(duì)于處理器重排序莫换,JMM的處理器重排序規(guī)則會(huì)要求Java編譯器在生成指令時(shí)霞玄,插入特定類型的內(nèi)存屏障。
現(xiàn)代的處理器使用寫緩沖區(qū)臨時(shí)保存向內(nèi)存寫入的數(shù)據(jù)拉岁,但每個(gè)處理器上的寫緩沖區(qū)坷剧,僅僅對(duì)它所在的處理器可見
由于寫緩沖區(qū)僅對(duì)自己的處理器可見喊暖,它會(huì)導(dǎo)致處理器執(zhí)行內(nèi)存操作的順序可能會(huì)與內(nèi)存實(shí)際的操作執(zhí)行順序不一致惫企,由于現(xiàn)代的處理器都會(huì)使用寫緩沖區(qū),因此現(xiàn)代的處理器都會(huì)允許對(duì)寫-讀操作進(jìn)行重排序陵叽,但不允許對(duì)存在數(shù)據(jù)依賴的操作做重排序狞尔。

happens-before簡(jiǎn)介

用來闡述操作之間的內(nèi)存可見性,如果一個(gè)操作執(zhí)行的結(jié)果需要對(duì)另一個(gè)操作可見巩掺,那么這兩個(gè)操作必須要存在happens-before關(guān)系沪么,這兩個(gè)操作既可以在一個(gè)線程之內(nèi),也可以在不同線程之間锌半,但并不等于前一個(gè)操作必須要在后一個(gè)操作之前執(zhí)行

數(shù)據(jù)依賴性

編譯器和處理器不會(huì)改變存在數(shù)據(jù)依賴關(guān)系的兩個(gè)操作的執(zhí)行順序,但是僅針對(duì)單個(gè)處理器中執(zhí)行的指令序列和單個(gè)線程中執(zhí)行的操作刊殉。

as-if-serial

無論怎么重排序殉摔,單線程程序的執(zhí)行結(jié)果不能改變。

在單線程中记焊,對(duì)存在控制依賴的操作重排序逸月,不會(huì)改變執(zhí)行結(jié)果;但在多線程程序中遍膜,對(duì)存在控制依賴的操作重排序碗硬,可能會(huì)改變程序的執(zhí)行結(jié)果。

順序一致性

順序一致性是一個(gè)理論參考模型瓢颅,在設(shè)計(jì)的時(shí)候恩尾,處理器的內(nèi)存模型和編程語言的內(nèi)存模型都會(huì)以順序一致性內(nèi)存作為參照。
如果程序是正確同步的挽懦,程序的執(zhí)行將具有順序一致性:即程序的執(zhí)行結(jié)果與該程序在順序一致性內(nèi)存模型中的執(zhí)行結(jié)果相同翰意。
如果程序是正確同步的,程序的執(zhí)行將具有順序一致性:即程序的執(zhí)行結(jié)果該程序在順序一致性內(nèi)存模型中的執(zhí)行結(jié)果相同信柿。
順序一致模型有兩大特性:

  • 一個(gè)線程中的所有操作必須按照程序的順序來執(zhí)行冀偶。
  • 所有線程都只能看到一個(gè)單一的操作執(zhí)行順序,在順序一致性內(nèi)存模型中渔嚷,每個(gè)操作都必須原子執(zhí)行且立刻對(duì)所有線程可見进鸠。

對(duì)于未同步或未正確同步的多線程程序,JMM只提供最小安全性:線程執(zhí)行時(shí)讀取到的值形病,要么是之前某個(gè)線程寫入的值客年,要么是默認(rèn)值。
JMM不保證未同步程序的執(zhí)行結(jié)果與該程序在順序一致性模型中的執(zhí)行結(jié)果一致窒朋。
未同步程序在兩個(gè)模型中的執(zhí)行特征有如下差異:

  • 順序一致性模型保證單線程內(nèi)的操作會(huì)按程序的順序執(zhí)行搀罢,而JMM不保證單線程內(nèi)的操作會(huì)按程序的順序執(zhí)行。
  • 順序一致性模型保證所有線程只能看到一致的操作執(zhí)行順序侥猩,而JMM不保證所有線程能看到一致的操作執(zhí)行順序榔至。
  • JMM不保證對(duì)64位的long/double型變量的寫操作具有原子性,而順序一致性模型保證對(duì)所有內(nèi)存讀/寫操作都具有原子性欺劳。

第四章 Java并發(fā)編程基礎(chǔ)

  • 現(xiàn)代操作系統(tǒng)調(diào)度的最小單元是線程唧取,也叫輕量級(jí)進(jìn)程,在一個(gè)進(jìn)程里可以創(chuàng)建多個(gè)線程划提,這些線程都擁有各自的計(jì)數(shù)器枫弟、堆棧和局部變量等特性,并且能夠訪問共享的內(nèi)存變量鹏往。
  • 設(shè)置線程優(yōu)先級(jí)時(shí)淡诗,針對(duì)頻繁阻塞(休眠或者I/O操作)的線程需要設(shè)置較高優(yōu)先級(jí),而偏重計(jì)算(需要較多CPU時(shí)間或者偏運(yùn)算)的線程則設(shè)置較低的優(yōu)先級(jí),確保處理器不會(huì)被獨(dú)占韩容。
  • 線程在運(yùn)行的生命周期中可能處于以下6種不同的狀態(tài):
  • New:初始狀態(tài)款违,線程被創(chuàng)建,但是沒有調(diào)用start()方法群凶。
  • Runnable:運(yùn)行狀態(tài)插爹,Java線程將操作系統(tǒng)中的就緒和運(yùn)行兩種狀態(tài)統(tǒng)稱為“運(yùn)行中”。
  • Blocked:阻塞狀態(tài)请梢,表示線程阻塞于鎖赠尾。
  • Waiting:等待狀態(tài),表示線程進(jìn)入等待狀態(tài)毅弧,進(jìn)入該狀態(tài)表示當(dāng)前線程需要等待其它線程做出一些指定動(dòng)作(通知或中斷)气嫁。
  • Time_Waiting:超時(shí)等待狀態(tài),可以在指定的時(shí)間自行返回形真。
  • Terminated:終止?fàn)顟B(tài)杉编,表示當(dāng)前線程已經(jīng)執(zhí)行完畢。
  • 中斷可以理解為線程的一個(gè)標(biāo)識(shí)位屬性咆霜,它標(biāo)識(shí)一個(gè)運(yùn)行中的線程是否被其它線程進(jìn)行了中斷操作邓馒。中斷好比其他線程對(duì)該線程打了一個(gè)招呼,其他線程通過調(diào)用該線程的interrupt()方法對(duì)其進(jìn)行中斷操作蛾坯。
  • 線程通過檢查自身是否被中斷來進(jìn)行響應(yīng)光酣,線程通過方法isInterrupt來進(jìn)行判斷是否被中斷,也可以調(diào)用靜態(tài)方法Thread.interrupt對(duì)當(dāng)前線程的中斷標(biāo)識(shí)位進(jìn)行復(fù)位脉课,如果該線程已經(jīng)處于終止?fàn)顟B(tài)救军,即使該線程被中斷過,在調(diào)用該線程對(duì)象的isInterrupt時(shí)依舊返回false倘零。
  • 在拋出InterruptedException異常之前唱遭,Java虛擬機(jī)會(huì)先將該線程的中斷標(biāo)識(shí)位清除。
  • 中斷狀態(tài)是線程的一個(gè)標(biāo)識(shí)位呈驶,而中斷操作是一種簡(jiǎn)便的線程間交互方式拷泽,而這種交互方式最適合用來取消或停止任務(wù),除了中斷之外袖瞻,還可以利用一個(gè)boolean變量來控制是否需要停止任務(wù)并終止該線程司致。
  • Java支持多個(gè)線程同時(shí)訪問一個(gè)對(duì)象或者對(duì)象的成員變量,由于每個(gè)線程可以擁有這個(gè)變量的拷貝聋迎,所以在程序的執(zhí)行過程中脂矫,一個(gè)線程看到的變量并不一定是最新的。
  • volatile可以用來修飾字段霉晕,就是告知程序任何對(duì)該變量的訪問需要從共享內(nèi)存中獲取庭再,而對(duì)它的改變必須同步刷新回共享內(nèi)存捞奕,它能保證所有線程對(duì)變量訪問的可見性。
  • synchronized可以修飾方法或者以同步塊的形式來進(jìn)行使用佩微,它主要確保多個(gè)線程在同一時(shí)刻缝彬,只能有一個(gè)線程處于方法或者同步塊中,它保證了線程對(duì)變量訪問的可見性和排他性哺眯。
  • 任意線程對(duì)ObjectObjectsynchronized保護(hù))的訪問,首先要獲得Object的監(jiān)視器扒俯,如果獲取失敗奶卓,線程進(jìn)入同步隊(duì)列,線程狀態(tài)變?yōu)?code>Blocked撼玄,當(dāng)訪問Object的前驅(qū)(獲得了鎖的線程)釋放了鎖夺姑,則該釋放操作喚醒阻塞在同步隊(duì)列中的線程,使其重新嘗試對(duì)監(jiān)視器的獲取掌猛。
  • 等待/通知的相關(guān)方法:
  • notify():通知一個(gè)在對(duì)象上等待的線程盏浙,使其從wait()方法返回,而返回的前提是該線程獲取到了對(duì)象上的鎖荔茬。
  • notifyAll():通知所有等待在該對(duì)象上的鎖废膘。
  • wait():調(diào)用該方法的線程進(jìn)入Waiting狀態(tài),只有等待另外線程的通知或被中斷才會(huì)返回慕蔚,調(diào)用wait()方法后丐黄,會(huì)釋放對(duì)象的鎖。
  • wait(long):超時(shí)等待一段時(shí)間孔飒,如果沒有通知就返回灌闺。
  • wait(long, int):對(duì)于超時(shí)時(shí)間更精細(xì)粒度的控制,可以達(dá)到納秒坏瞄。
  • 兩個(gè)線程通過對(duì)象來完成交互桂对,而對(duì)象上的waitnotify/notifyAll()的關(guān)系就如同開關(guān)信號(hào)一樣,用來完成等待方和通知方之間的交互工作鸠匀。
  • 等待/通知的經(jīng)典范式:
  • 等待方
    (1) 獲取對(duì)象的鎖蕉斜。
    (2) 如果條件不滿足,那么調(diào)用對(duì)象的wait()方法狮崩,被通知后仍要檢查條件蛛勉。
    (3) 條件滿足則執(zhí)行對(duì)應(yīng)的邏輯。
synchronized(對(duì)象) {
        while(條件不滿足) {
            對(duì)象.wait();
        }
        對(duì)應(yīng)的處理邏輯;
}
  • 通知方
    (1) 獲得對(duì)象的鎖
    (2) 改變條件
    (3) 通知所有等待在該對(duì)象上的線程睦柴。
synchronized(對(duì)象) {
        改變條件;
        對(duì)象.notifyAll();
}
  • 管道輸入/輸出流用于線程之間的數(shù)據(jù)傳輸诽凌,而傳輸?shù)拿浇闉閮?nèi)存,主要包括了以下4種實(shí)現(xiàn):PipedOutputStream坦敌、PipeInputStream侣诵、PipedReader痢法、PipedWriter,前兩種面向字節(jié)杜顺,后兩種面向字符财搁。
  • 如果一個(gè)線程A執(zhí)行了Thread.join(),其含義是:當(dāng)前線程A等待Thread線程終止后躬络,才從Thread.join返回尖奔,線程Thread除了提供join()方法外桦卒,還提供了join(long millis)join(long millis, int nanos)兩個(gè)具備超時(shí)特性的方法筒严,如果在給定的超時(shí)時(shí)間內(nèi)沒有終止,那么將會(huì)從超時(shí)方法中返回殿如。
  • ThreadLocal馁菜,即線程變量茴扁,是一個(gè)以ThreadLocal對(duì)象為鍵、任意對(duì)象為值的存儲(chǔ)結(jié)構(gòu)汪疮,這個(gè)結(jié)構(gòu)被附帶在線程上峭火,也就是說一個(gè)線程可以根據(jù)一個(gè)ThreadLocal對(duì)象查詢到綁定在這個(gè)線程上的一個(gè)值,可以通過set(T)方法來設(shè)置一個(gè)值智嚷,在當(dāng)前線程下再通過get()方法獲取到原先設(shè)置的值卖丸。

第五章 Java中的鎖

5.1 Lock接口

  • 鎖是用來控制多個(gè)線程訪問共享資源的方式,雖然它缺少了隱式獲取釋放鎖的便捷性纤勒,但是卻擁有了鎖獲取與釋放的可操作性坯苹、可中斷地獲取鎖以及超時(shí)獲取鎖等多種synchronized關(guān)鍵字不具備的同步特性。

  • finally塊中釋放鎖摇天,目的是保證在獲取到鎖之后粹湃,最終能夠被釋放。

  • Lock接口提供的synchronized關(guān)鍵字不具備的主要特性

  • 嘗試非阻塞地獲取鎖:當(dāng)前線程嘗試獲取鎖泉坐,如果這一時(shí)刻沒有被其它線程獲取到为鳄,則成功獲取并持有鎖。

  • 能被中斷地獲取鎖:與synchronized不同腕让,獲取到鎖的線程能夠響應(yīng)中斷孤钦,當(dāng)獲取到鎖的線程被中斷時(shí),中斷異常將會(huì)被拋出纯丸,同時(shí)鎖會(huì)被釋放偏形。

  • 在指定的截止時(shí)間之前獲取鎖:如果截止時(shí)間到了仍舊無法獲取鎖,則返回觉鼻。

  • LockAPI

  • void lock():獲取鎖俊扭,調(diào)用該方法當(dāng)前線程將會(huì)獲取鎖,當(dāng)鎖獲得后坠陈,從該方法返回萨惑。

  • void lockInterruptibly():可中斷地獲取鎖捐康,該方法會(huì)響應(yīng)中斷,即在鎖的獲取中可以中斷當(dāng)前線程庸蔼。

  • boolean tryLock():嘗試非阻塞地獲取鎖解总,調(diào)用該方法后立刻返回,如果能夠獲取則返回true姐仅,否則返回false花枫。

  • boolean tryLock(long time, TimeUnit unit) throws InterruptedException:當(dāng)前線程在超時(shí)時(shí)間內(nèi)獲得了鎖;當(dāng)前線程在超時(shí)時(shí)間內(nèi)被中斷萍嬉;超時(shí)時(shí)間結(jié)束乌昔,返回false

  • void unlock():釋放鎖壤追。

  • Condition newCondition():獲取等待/通知組件,該組件和當(dāng)前的鎖綁定供屉,當(dāng)前線程只有獲得了鎖行冰,才能調(diào)用該組件的wait()方法,而調(diào)用后伶丐,當(dāng)前線程將釋放鎖悼做。

5.2 隊(duì)列同步器

5.2.1 隊(duì)列同步器接口

  • 隊(duì)列同步器AbstractQueuedSynchronizer,是用來構(gòu)建鎖或者其它同步組件的基礎(chǔ)框架哗魂,它使用了一個(gè)int成員變量表示同步狀態(tài)肛走,通過內(nèi)置的FIFO隊(duì)列來完成資源獲取線程的排隊(duì)工作。
  • 同步器是實(shí)現(xiàn)鎖的關(guān)鍵录别,在鎖的實(shí)現(xiàn)中聚合同步器朽色,利用同步器實(shí)現(xiàn)鎖的語義∽樘猓可以理解二者之間的關(guān)系:鎖是面向使用者的葫男,它定義了使用者與鎖交互的接口,隱藏了實(shí)現(xiàn)細(xì)節(jié)崔列;同步器面向鎖的實(shí)現(xiàn)者梢褐,它簡(jiǎn)化了鎖的實(shí)現(xiàn)方式,屏蔽了同步狀態(tài)管理赵讯、線程的排隊(duì)盈咳、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實(shí)現(xiàn)者所需關(guān)注地領(lǐng)域边翼。
  • 同步器的設(shè)計(jì)是基于模板方法模式鱼响,使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合自定義同步組件的實(shí)現(xiàn)中讯私,并調(diào)用同步器的模板方法热押,而這些模板方法將會(huì)調(diào)用使用者重載的方法西傀。
  • 重寫同步器指定的方法時(shí),需要使用同步器提供的3個(gè)方法來訪問或者修改同步狀態(tài):
  • getState():獲取當(dāng)前同步狀態(tài)桶癣。
  • setState(int newState):設(shè)置當(dāng)前同步狀態(tài)拥褂。
  • compareAndSetState(int except, int update):使用CAS設(shè)置當(dāng)前狀態(tài),該方法能夠保證狀態(tài)設(shè)置的原始性牙寞。
  • 同步器提供的模板方法基本上分為以下3類:
  • 獨(dú)占式獲取與釋放同步狀態(tài)
  • 共享式獲取與釋放同步狀態(tài)
  • 查詢同步隊(duì)列中的等待線程情況饺鹃。

5.2.2 隊(duì)列同步器的實(shí)現(xiàn)分析

5.2.2.1 同步隊(duì)列

  • 同步器依賴內(nèi)部的同步隊(duì)列來完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時(shí)间雀,同步器會(huì)將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造稱為一個(gè)節(jié)點(diǎn)悔详,并將其加入同步隊(duì)列,同時(shí)會(huì)阻塞當(dāng)前線程惹挟,當(dāng)同步狀態(tài)釋放時(shí)茄螃,會(huì)把首節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)连锯。
  • 同步器中包含了兩個(gè)節(jié)點(diǎn)類型的引用归苍,一個(gè)指向頭節(jié)點(diǎn),而另一個(gè)指向尾節(jié)點(diǎn)运怖。
  • 當(dāng)一個(gè)線程成功地獲取了同步狀態(tài)拼弃,其他線程將無法獲取到同步狀態(tài),轉(zhuǎn)而被構(gòu)造成為節(jié)點(diǎn)并加入到同步隊(duì)列當(dāng)中摇展,而這個(gè)加入到隊(duì)列地過程必須要保證線程安全吻氧,因此同步器提供了一個(gè)基于CAS的設(shè)置尾節(jié)點(diǎn)的方法。
  • 同步隊(duì)列遵循FIFO咏连,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn)盯孙,首節(jié)點(diǎn)的線程在釋放同步狀態(tài)時(shí),將會(huì)喚醒后繼節(jié)點(diǎn)捻勉,而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn)镀梭。

5.2.2.2 獨(dú)占式同步狀態(tài)獲取與釋放

  • 通過調(diào)用同步器的acquire(int arg)方法可以獲取同步狀態(tài),該方法對(duì)中斷不敏感踱启,即由于線程獲取同步狀態(tài)失敗而進(jìn)入同步隊(duì)列后报账,后續(xù)對(duì)線程進(jìn)行中斷操作時(shí),線程不會(huì)從同步隊(duì)列中移除埠偿。
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

它的主要邏輯是:

  • (1)調(diào)用自定義同步器實(shí)現(xiàn)的tryAcquire方法透罢,該方法保證線程安全的獲取同步狀態(tài),這個(gè)方法需要隊(duì)列同步器的實(shí)現(xiàn)者來重寫冠蒋。
  • (2)如果同步狀態(tài)獲取失敗羽圃,則構(gòu)造同步節(jié)點(diǎn)(獨(dú)占式Node.EXCLUSIVE)并通過addWaiter(Node node)方法將該節(jié)點(diǎn)加入到同步隊(duì)列的尾部。
   private Node addWaiter(Node mode) {
       Node node = new Node(Thread.currentThread(), mode);
       // Try the fast path of enq; backup to full enq on failure
       Node pred = tail;
       if (pred != null) {
           node.prev = pred;
           //1.確保節(jié)點(diǎn)能夠線程安全地被添加
           if (compareAndSetTail(pred, node)) {
               pred.next = node;
               return node;
           }
       }
       //2.通過死循環(huán)來確保節(jié)點(diǎn)的正確添加抖剿,在"死循環(huán)"中只有通過`CAS`將節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)之后朽寞,當(dāng)前線程才能從該方法返回识窿,否則當(dāng)前線程不斷地進(jìn)行嘗試。
       enq(node);
       return node;
   }

   private Node enq(final Node node) {
       for (;;) {
           Node t = tail;
           if (t == null) { // Must initialize
               if (compareAndSetHead(new Node()))
                   tail = head;
           } else {
               node.prev = t;
               if (compareAndSetTail(t, node)) {
                   t.next = node;
                   return t;
               }
           }
       }
   }
  • (3)最后調(diào)用acquireQueued(Node node, int arg)方法脑融,使得該節(jié)點(diǎn)以死循環(huán)的方式獲取同步狀態(tài)喻频。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //1.得到當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
                final Node p = node.predecessor();
                //2.如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),只有在這種情況下獲取同步狀態(tài)成功
                if (p == head && tryAcquire(arg)) {
                    //3.將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • 可以看到肘迎,當(dāng)前線程在“死循環(huán)”中嘗試獲取同步狀態(tài)甥温,而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)才能夠嘗試獲取同步狀態(tài),這是由于:

    • 頭節(jié)點(diǎn)是成功獲取到同步狀態(tài)的節(jié)點(diǎn)妓布,而頭節(jié)點(diǎn)的線程釋放了同步狀態(tài)后姻蚓,將會(huì)喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn)匣沼。
    • 維護(hù)同步隊(duì)列的FIFO原則狰挡,通過簡(jiǎn)單地判斷自己的前驅(qū)是否為頭節(jié)點(diǎn),這樣就使得節(jié)點(diǎn)的釋放規(guī)則符合FIFO释涛,并且也便于對(duì)過早通知的處理(過早通知是指前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)的線程由于中斷而被喚醒
  • 當(dāng)同步狀態(tài)獲取成功之后圆兵,當(dāng)前線程從acquire(int arg)方法返回,如果對(duì)于鎖這種并發(fā)組件而言枢贿,代表著當(dāng)前線程獲取了鎖。

  • 通過調(diào)用同步器的release(int arg)方法可以釋放同步狀態(tài)刀脏,該方法執(zhí)行時(shí)局荚,會(huì)喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程,unparkSuccessor(Node node)方法使用LockSupport來喚醒處于等待狀態(tài)的線程愈污。

  public final boolean release(int arg) {
      if (tryRelease(arg)) {
          Node h = head;
          if (h != null && h.waitStatus != 0)
              unparkSuccessor(h);
          return true;
      }
      return false;
  }
  • (4)如果獲取不到耀态,則阻塞節(jié)點(diǎn)中的線程,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點(diǎn)的出隊(duì)或阻塞線程被中斷來實(shí)現(xiàn)暂雹。

總結(jié):
1.在獲取同步狀態(tài)時(shí)首装,同步器維護(hù)一個(gè)同步隊(duì)列,獲取狀態(tài)失敗的線程都會(huì)被加入到隊(duì)列中進(jìn)行自旋杭跪;
2.移出隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)仙逻。
3.在釋放同步狀態(tài)時(shí),同步器調(diào)用tryRelease(int arg)方法來釋放同步狀態(tài)涧尿,然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)系奉。

5.2.2.3 共享式同步狀態(tài)獲取與釋放

  • 共享式獲取和獨(dú)占式獲取最主要的區(qū)別在于同一時(shí)刻能夠有多個(gè)線程同時(shí)獲取到同步狀態(tài)
  • 通過調(diào)用同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態(tài):
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireShared返回int類型姑廉,如果同步狀態(tài)獲取成功缺亮,那么返回值大于等于0,否則進(jìn)入自旋狀態(tài)桥言;成功獲取到同步狀態(tài)并退出自旋狀態(tài)的條件是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)萌踱,并且返回值大于等于0.

  • 共享式獲取葵礼,通過調(diào)用releaseShared(int arg)方法釋放同步狀態(tài),tryReleaseShared必須要確保同步狀態(tài)線程安全釋放并鸵,一般是通過循環(huán)或CAS來保證的鸳粉,因?yàn)獒尫磐綘顟B(tài)的操作會(huì)同時(shí)來自多個(gè)線程。
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

5.2.2.4 獨(dú)占式超時(shí)獲取同步狀態(tài)

  • 通過調(diào)用同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以超時(shí)獲取同步狀態(tài)能真,即在指定的時(shí)間段內(nèi)獲取同步狀態(tài)赁严。
  • 在此之前,一個(gè)線程如果獲取不到鎖而被阻塞在synchronized之外粉铐,對(duì)該線程進(jìn)行中斷操作疼约,此時(shí)線程中斷的標(biāo)志位會(huì)被修改,但線程依舊會(huì)阻塞在synchronized上蝙泼;如果通過acquireInterruptibly(int arg)方法獲取程剥,如果在等待過程中被中斷,會(huì)立刻返回汤踏,并拋出InterruptedException異常织鲸。
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //1.計(jì)算出截止時(shí)間.
        final long deadline = System.nanoTime() + nanosTimeout;
       //2.加入節(jié)點(diǎn)
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                //3.取出前驅(qū)節(jié)點(diǎn)
                final Node p = node.predecessor();
                //4.如果獲取成功則直接返回
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //5.如果到了超時(shí)時(shí)間,則直接返回
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //6.如果在自旋過程中被中斷溪胶,那么拋出異常返回
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

通過上面的代碼可以知道搂擦,它和獨(dú)占式獲取的區(qū)別在于未獲取到同步狀態(tài)時(shí)的處理邏輯:獨(dú)占式獲取在獲取不到是會(huì)一直自旋等待;而超時(shí)獲取則會(huì)使當(dāng)前線程等待nanosTimeout納秒哗脖,如果當(dāng)前線程在這個(gè)時(shí)間內(nèi)沒有獲取到同步狀態(tài)瀑踢,將會(huì)從等待邏輯中自動(dòng)返回。

5.2.2.5 自定義同步組件 - TwinsLock

TwinsLock只允許至多兩個(gè)線程同時(shí)訪問才避,超過兩個(gè)線程的訪問將會(huì)被阻塞橱夭。

public class TwinsLock implements Lock {
    
    private final Sync sync = new Sync(2);
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        
        Sync(int count) {
            //初始值為2.
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for(;;) {
                //1.獲得當(dāng)前的狀態(tài).
                int current = getState();
                //2.newCount表示剩余可獲取同步狀態(tài)的線程數(shù)
                int newCount = current - arg;
                //3.如果小于0,那么返回獲取同步狀態(tài)失敗;否則通過CAS確保設(shè)置的正確性.
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    //4.當(dāng)返回值大于等于0表示獲取同步狀態(tài)成功.
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int current = getState();
                //將可獲取同步狀態(tài)的線程數(shù)加1.
                int newCount = current + current;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, @NonNull TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        
    }

    @NonNull
    @Override
    public Condition newCondition() {
        return null;
    }
}

測(cè)試用例:

    public static void createTwinsLock() {
        final Lock lock = new TwinsLock();
        class TwinsLockThread extends Thread {

            @Override
            public void run() {
                Log.d(TAG, "TwinsLockThread, run=" + Thread.currentThread().getName());
                while (true) {
                    lock.lock();
                    try {
                        Thread.sleep(1000);
                        Log.d(TAG, "TwinsLockThread, name=" + Thread.currentThread().getName());
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        Log.d(TAG, "TwinsLockThread, unlock=" + Thread.currentThread().getName());
                        lock.unlock();
                    }
                }
            }
        }
        for (int i = 0; i < 10; i++) {
            Thread thread = new TwinsLockThread();
            thread.start();
        }
    }

5.3 重入鎖

  • 重入鎖ReentrantLock表示該鎖能夠支持一個(gè)線程對(duì)資源的重復(fù)加鎖。
  • 如果在絕對(duì)時(shí)間上桑逝,先對(duì)鎖獲取的請(qǐng)求一定先被滿足棘劣,那么這個(gè)鎖是公平的,公平地獲取鎖楞遏,也就是等待時(shí)間最長(zhǎng)的線程最優(yōu)先地獲取鎖茬暇。

5.3.1 實(shí)現(xiàn)重進(jìn)入

重進(jìn)入需要解決兩個(gè)問題:

  • 線程再次獲取鎖,鎖需要去識(shí)別獲取鎖地線程是否為當(dāng)前占據(jù)鎖的線程橱健,如果是而钞,則再次獲取成功。
  • 鎖的最終釋放拘荡,線程重復(fù)n次獲取了鎖臼节,隨后在第n次釋放該鎖后,其它線程能夠獲取到該鎖。

5.3.2 公平與非公平鎖的區(qū)別

  • 公平與否是針對(duì)獲取鎖而言的网缝,如果一個(gè)鎖是公平的巨税,那么鎖的獲取順序就應(yīng)該符合請(qǐng)求的絕對(duì)時(shí)間順序,即FIFO粉臊。
  • 公平鎖的區(qū)別在于加入了同步隊(duì)列中當(dāng)前節(jié)點(diǎn)是否有前驅(qū)節(jié)點(diǎn)的判斷草添,如果該方法返回true,表示有線程比當(dāng)前線程更早地請(qǐng)求獲取鎖扼仲,因此需要等待前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖远寸;而對(duì)于非公平鎖,只要CAS設(shè)置同步狀態(tài)成功即可屠凶。
  • 因此驰后,公平鎖每次都是從同步隊(duì)列中的第一個(gè)節(jié)點(diǎn)獲取到鎖,而非公平鎖出現(xiàn)了一個(gè)線程連續(xù)獲取鎖的情況矗愧。
  • 非公平鎖可能使線程饑餓灶芝,但其極少的線程切換,保證了更大的吞吐量唉韭。

5.4 讀寫鎖

  • 之前提到的鎖都是排它鎖夜涕,這些鎖在同一時(shí)刻只允許一個(gè)線程進(jìn)行訪問,而讀寫鎖在同一時(shí)刻可以允許多個(gè)讀線程訪問属愤,但是在寫線程訪問時(shí)女器,所有的讀線程和其他寫線程均被阻塞。讀寫鎖維護(hù)了一對(duì)鎖住诸,一個(gè)讀鎖和一個(gè)寫鎖晓避,通過分離讀鎖和寫鎖,使得并發(fā)性有很大提升只壳。
  • 并發(fā)包提供的讀寫鎖的實(shí)現(xiàn)是ReentrantReadWrireLock,它支持公平性選擇暑塑、重進(jìn)入吼句、鎖降級(jí)(寫鎖能夠降級(jí)為讀鎖)

ReadWriteLock僅定義了獲取讀鎖和寫鎖的兩個(gè)方法事格,即readLockwriteLock惕艳,而其實(shí)現(xiàn)ReentrantReadWriteLock

  • getReadLockCount:返回當(dāng)前讀鎖被獲取的次數(shù)。
  • getReadHoldCount:返回當(dāng)前線程獲取讀鎖的次數(shù)驹愚。
  • isWriteLocked:判斷寫鎖是否被獲取远搪。
  • getWriteHoldCount:返回當(dāng)前線程獲取寫鎖的次數(shù)。

下面是一個(gè)讀寫鎖的簡(jiǎn)單用例:

public class ReadWriteCache {
    
    static Map<String, Object> map = new HashMap<>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock r = rwl.readLock();
    static Lock w = rwl.writeLock();
    
    public static Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }
    
    public static Object put(String key, Object value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }
    
    public static void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}

5.4.2 讀寫鎖的實(shí)現(xiàn)分析

  • 讀寫狀態(tài)的設(shè)計(jì)
    讀寫鎖需要在同步狀態(tài)(一個(gè)整形變量逢捺,高16表示讀谁鳍,低16表示寫)上維護(hù)多個(gè)讀線程和一個(gè)寫線程的狀態(tài),使得該狀態(tài)的設(shè)計(jì)成為讀寫鎖實(shí)現(xiàn)的關(guān)鍵。
  • 寫鎖的獲取與釋放
    寫鎖是一個(gè)支持重進(jìn)入的排它鎖倘潜,如果當(dāng)前線程已經(jīng)獲取了寫鎖绷柒,則增加寫狀態(tài)。如果當(dāng)前線程在獲取寫鎖時(shí)涮因,讀鎖已經(jīng)被獲取废睦,則當(dāng)前線程進(jìn)入等待狀態(tài)。
    原因在于:讀寫鎖要確保寫鎖的操作對(duì)讀鎖可見养泡,如果允許讀鎖在已經(jīng)被獲取的情況下對(duì)寫鎖的獲取嗜湃,那么正在運(yùn)行的其它讀線程就無法感知到當(dāng)前寫線程的操作。
  • 讀鎖的獲取與釋放
    讀鎖是一個(gè)支持重進(jìn)入的共享鎖澜掩,它能被多個(gè)線程同時(shí)獲取购披,在沒有其它寫線程訪問(或者寫狀態(tài)為0)時(shí),讀鎖總是被成功地獲取输硝,而所做的也只是(線程安全)增加讀狀態(tài)今瀑。
  • 鎖降級(jí)
    鎖降級(jí)是指把持住(當(dāng)前擁有的)寫鎖点把,再獲取到讀鎖橘荠,隨后釋放(先前擁有的)寫鎖的過程。

5.6 Condition接口

Condition定義了等待/通知兩種類型的方法郎逃,當(dāng)前線程調(diào)用這些方法時(shí)哥童,需要提前獲取到Condition對(duì)象關(guān)聯(lián)的鎖,Condition是依賴Lock對(duì)象的褒翰。
當(dāng)調(diào)用await()方法后贮懈,當(dāng)前線程會(huì)釋放鎖并在此等待,而其他線程調(diào)用Condition對(duì)象的signal方法优训,通知當(dāng)前線程后朵你,當(dāng)前線程才從await方法返回,并且在返回前已經(jīng)獲取了鎖揣非。
獲取一個(gè)Condition必須通過LocknewCondition方法抡医,下面是一個(gè)有界隊(duì)列的示例:

public class BoundedQueue<T> {

    private Object[] items;
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) { //如果當(dāng)前隊(duì)列內(nèi)的個(gè)數(shù)等于最大長(zhǎng)度,那么釋放鎖.
                notFull.await();
            }
            if (++addIndex == items.length) { //如果已經(jīng)到了尾部,那么從頭開始.
                addIndex = 0;
            }
            ++count;
            notEmpty.signal(); //通知阻塞在"空"條件上的線程.
        } finally {
            lock.unlock();
        }
    }

    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); //如果當(dāng)前隊(duì)列的個(gè)數(shù)等于0,那么釋放鎖.
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.signal(); //通知阻塞在"滿"條件上的線程.
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}

Condition的方法:

  • await():當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知signal或中斷,當(dāng)前線程進(jìn)入運(yùn)行狀態(tài)且從await返回的情況:

  • 其他線程調(diào)用該ConditionsignalsignalAll方法早敬。

  • 其它線程中斷當(dāng)前線程(interrupt)忌傻。

  • 如果當(dāng)前等待線程從await方法返回,那么表明當(dāng)前線程已經(jīng)獲取了Condition對(duì)象所對(duì)應(yīng)的鎖搞监。

  • awaitUninerruptibly:對(duì)中斷不敏感

  • long await Nanos(long):加入了超時(shí)的判斷水孩,返回值是(nanosTimeout - 實(shí)際耗時(shí)),如果返回值是0或者負(fù)數(shù)琐驴,那么可以認(rèn)定為超時(shí)俘种。

  • boolean awaitUntil(Data):直到某個(gè)固定時(shí)間秤标。

  • signal:?jiǎn)拘岩粋€(gè)等待在Condition上的線程。

  • signalAll:?jiǎn)拘阉械却?code>Condition上的線程安疗。

5.6.2 Condition的實(shí)現(xiàn)

ConditionObjectAbstractQueuedSynchronizer的內(nèi)部類抛杨,每個(gè)Condition對(duì)象都包含著一個(gè)隊(duì)列。

1.等待隊(duì)列

在隊(duì)列中的每個(gè)節(jié)點(diǎn)都包含了一個(gè)線程的引用荐类,該線程就是在Condition對(duì)象上等待的線程怖现,同步隊(duì)列和等待隊(duì)列中節(jié)點(diǎn)的類型都是同步器的靜態(tài)內(nèi)部類AbstractQueuedSynchronizer.Node
由于Condition的實(shí)現(xiàn)是同步器的內(nèi)部類玉罐,因此每個(gè)Condition實(shí)例都能夠訪問同步器提供的方法屈嗤,相當(dāng)于每個(gè)Condition都擁有所屬同步器的引用。
當(dāng)調(diào)用await方法時(shí)吊输,將會(huì)以當(dāng)前線程構(gòu)造節(jié)點(diǎn)饶号,并將節(jié)點(diǎn)從尾部加入到等待隊(duì)列,也就是將同步隊(duì)列移動(dòng)到Condition隊(duì)列當(dāng)中季蚂。

2.等待

調(diào)用該方法的前提是當(dāng)前線程必須獲取了鎖茫船,也就是同步隊(duì)列中的首節(jié)點(diǎn),它不是直接加入到等待隊(duì)列當(dāng)中扭屁,而是通過addConditionWaiter()方法把當(dāng)前線程構(gòu)造成一個(gè)新的節(jié)點(diǎn)并將其加入到等待隊(duì)列當(dāng)中算谈。

3.通知

調(diào)用該方法的前提是當(dāng)前線程必須獲取了鎖,接著獲取等待隊(duì)列的首節(jié)點(diǎn)料滥,將其移動(dòng)到同步隊(duì)列并使用LockSupport喚醒節(jié)點(diǎn)中的線程然眼。
被喚醒的線程,將從await方法中的while中返回葵腹,進(jìn)而調(diào)用同步器的acquireQueued方法加入到獲取同步狀態(tài)的競(jìng)爭(zhēng)中高每。
ConditionsignalAll方法,相當(dāng)于對(duì)等待隊(duì)列中的每個(gè)節(jié)點(diǎn)均執(zhí)行一次signal方法践宴,效果就是將等待隊(duì)列中所有節(jié)點(diǎn)全部移動(dòng)到同步隊(duì)列中鲸匿,并喚醒每個(gè)節(jié)點(diǎn)。

六阻肩、Java并發(fā)容器和框架

6.1 ConcurrentHashMap

ConcurrentHashMap是線程安全并且高效的HashMap晒骇,其它的類似容器有以下缺點(diǎn):

  • HashMap在并發(fā)執(zhí)行put操作時(shí),會(huì)導(dǎo)致Entry鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu)磺浙,就會(huì)產(chǎn)生死循環(huán)獲取Entry
  • HashTable使用synchronized來保證線程安全徒坡,但在線程競(jìng)爭(zhēng)激烈的情況下HashTable的效率非常低下撕氧。
    ConcurrentHashMap高效的原因在于它采用鎖分段技術(shù),首先將數(shù)據(jù)分成一段一段地存儲(chǔ)喇完,然后給每段數(shù)據(jù)配一把鎖伦泥,當(dāng)一個(gè)線程占用鎖并且訪問一段數(shù)據(jù)的時(shí)候,其他段的數(shù)據(jù)也能被其他線程訪問。

6.1.2 ConcurrentHashMap的結(jié)構(gòu)

ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成:

  • Segment是一種可重入鎖不脯,在ConcurrentHashMap里面扮演鎖的角色府怯;
  • HashEntry則用于存儲(chǔ)鍵值對(duì)數(shù)據(jù)。

一個(gè)ConcurrentHashMap里包含一個(gè)Segment數(shù)組防楷,它的結(jié)構(gòu)和HashMap類似牺丙,是一種數(shù)組和鏈表結(jié)構(gòu)。
一個(gè)Segment里包含一個(gè)HashEntry數(shù)組复局,每個(gè)HashEntry是一個(gè)鏈表結(jié)構(gòu)的元素冲簿,每個(gè)Segment守護(hù)著一個(gè)HashEntry里的元素,當(dāng)對(duì)HashEntry數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí)亿昏,必須首先獲得與它對(duì)應(yīng)的Segment鎖峦剔。

6.1.5 ConcurrentHashMap的操作

get
get的高效在于整個(gè)get過程中不需要加鎖,除非讀到的值是空才會(huì)加鎖重讀角钩。原因是它的get方法將要使用的共享變量都設(shè)為volatile吝沫,能夠在線程間保持可見性,能夠被多線程同時(shí)讀递礼,并且不會(huì)讀到過期的值惨险,例如用于統(tǒng)計(jì)當(dāng)前Segment大小的count字段和用于存儲(chǔ)值的HashEntryvalue
put
put方法里需要對(duì)共享變量進(jìn)行寫入操作宰衙,所以為了線程安全平道,在操作共享變量之前必須加鎖,put首先定位到Segment供炼,然后在Segment里進(jìn)行插入操作一屋。
size
先嘗試2次通過不鎖住Segment的方式來統(tǒng)計(jì)各個(gè)Segment的大小,如果統(tǒng)計(jì)的過程中袋哼,容器的count發(fā)生了變化冀墨,則再用加鎖的方式來統(tǒng)計(jì)所有Segment的大小。

6.2 ConcurrentLinkedQueue

ConcurrentLinkedQueue是一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列涛贯,它采用先進(jìn)先出的規(guī)則對(duì)節(jié)點(diǎn)進(jìn)行排序诽嘉,它采用CAS算法來實(shí)現(xiàn)。

6.2.1 入隊(duì)列

入隊(duì)主要做兩件事情:

  • 將入隊(duì)節(jié)點(diǎn)設(shè)置成當(dāng)前隊(duì)列尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)弟翘。
  • 更新tail節(jié)點(diǎn)虫腋,如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)不為空,則將入隊(duì)節(jié)點(diǎn)設(shè)置成tail節(jié)點(diǎn)稀余;如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)為空悦冀,則將入隊(duì)節(jié)點(diǎn)設(shè)置成tailnext節(jié)點(diǎn)。

在多線程情況下睛琳,如果有一個(gè)線程正在入隊(duì)盒蟆,那么它必須先獲取尾節(jié)點(diǎn)踏烙,然后設(shè)置尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)為入隊(duì)節(jié)點(diǎn),但這時(shí)可能有另外一個(gè)線程插隊(duì)了历等,那么隊(duì)列的尾節(jié)點(diǎn)就會(huì)發(fā)生變化讨惩,這時(shí)第一個(gè)線程要暫停入隊(duì)操作,然后重新獲取尾節(jié)點(diǎn)寒屯。
整個(gè)入隊(duì)操作主要做兩件事:

  • 定位出尾節(jié)點(diǎn)荐捻。
  • 使用CAS算法將入隊(duì)節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn)的next節(jié)點(diǎn),如不成功則重試浩螺。

6.3 阻塞隊(duì)列

6.3.1 阻塞隊(duì)列

阻塞隊(duì)列是一個(gè)支持兩個(gè)附加操作的隊(duì)列靴患,這兩個(gè)附加的操作支持阻塞的插入和移除方法:

  • 當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程要出,直到隊(duì)列不滿鸳君。
  • 當(dāng)隊(duì)列空時(shí),獲取元素的線程會(huì)等待隊(duì)列為空患蹂。

在阻塞隊(duì)列不可用時(shí)或颊,附加操作提供了4種處理方式:拋出異常、返回特殊值传于、一直阻塞囱挑、超時(shí)退出。每種方式通過調(diào)用不同的方法來實(shí)現(xiàn)沼溜。
Java里面提供了7種阻塞隊(duì)列平挑。

6.4 Fork/Join框架

用于并行執(zhí)行任務(wù)的框架,是把一個(gè)大任務(wù)分割成若干個(gè)小任務(wù)系草,最終匯總每個(gè)小任務(wù)結(jié)果后得到大人物結(jié)果的框架通熄。
Fork/Join使用兩個(gè)類來完成事情:

  • ForkJoinTask:它提供了fork()join()操作的機(jī)制,通常情況下找都,我們繼承它的子類:有返回結(jié)果的RecursiveTask和沒有返回結(jié)果的RecursiveAction唇辨。
  • ForkJoinPoolForkJoinTask需要通過ForkJoinPool來添加。
    ForkJoinTask在執(zhí)行的時(shí)候可能會(huì)拋出異常能耻,但是我們沒有辦法在主線程里直接捕獲異常赏枚,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務(wù)是否已經(jīng)拋出異常或已經(jīng)取消了晓猛。
    ForkJoinPoolForkJoinTask數(shù)組和ForkJoinWorkerThread數(shù)組組成饿幅,ForkJoinTask數(shù)組負(fù)責(zé)將存放程序提交給ForkJoinPool的任務(wù),而ForkJoinWorkerThread數(shù)組負(fù)責(zé)執(zhí)行這些任務(wù)戒职。

七栗恩、Java中的13個(gè)原子操作類

Atomic包里提供了:原子更新基本類型、原子更新數(shù)組帕涌、原子更新引用和原子更新屬性摄凡。

7.1 原子更新基本類型:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

基本方法:

  • int addAndGet(int delta):以原子方式將輸入的值與當(dāng)前的值相加,并返回結(jié)果蚓曼。
  • boolean compareAndSet(int expect, int update):如果當(dāng)前的數(shù)值等于預(yù)期值亲澡,則以原子方式將該值設(shè)置為輸入的值。
  • int getAndIncrement():以原子方式加1纫版,并返回自增前的值床绪。
  • void lazySet(int newValue):最終會(huì)設(shè)置成newValue,可能會(huì)導(dǎo)致其他線程在之后的一小段時(shí)間內(nèi)還是讀到舊值其弊。
  • int getAndSet(int newValue):以原子方式設(shè)置為newValue的值癞己,并返回舊值。

7.2 原子更新引用類型

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

基本方法:

  • int addAndGet(int i, int delta):以原子方式將輸入值和索引i的元素相加梭伐。
  • boolean compareAndSet(int i, int expect, int update):如果當(dāng)前值等于預(yù)期值痹雅,則以原子方式將數(shù)組位置i的元素設(shè)置成update值。

7.3 原子更新引用類型

用于原子更新多個(gè)變量糊识,提供了3種類型:

  • AtomicReference:原子更新引用類型绩社。
  • AtomicReferenceFieldUpdater:原子更新引用類型里的字段。
  • AtomicMarkableReference:原子更新帶有標(biāo)記位的引用類型赂苗。

7.4 原子更新字段類

  • AtomicIntegerFieldUpdater:原子更新整形的字段的更新器愉耙。
  • AtomicLongFieldUpdater:原子更新長(zhǎng)整形字段的更新器舌镶。
  • AtomicStampedReference:原子更新帶有版本號(hào)的引用類型眶俩。

原子地更新字段需要兩步:

  • 因?yàn)樵痈伦侄晤惗际浅橄箢悾看问褂玫臅r(shí)候必須使用靜態(tài)方法newUpdater創(chuàng)建一個(gè)更新器授艰,并且需要設(shè)置想要更新的類和屬性败砂。
  • 更新類的字段必須使用public volatile來修飾赌渣。

八、Java中的并發(fā)工具類

九吠卷、Java中的線程池

線程池的優(yōu)點(diǎn):降低資源消耗锡垄,提高響應(yīng)速度,提高線程的可管理性祭隔。

9.1 線程池的實(shí)現(xiàn)原理

線程池的處理流程如下:

  • 判斷核心線程池是否已滿货岭,如果不是,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù)疾渴;如果已滿千贯,則進(jìn)入下個(gè)流程。
  • 判斷工作隊(duì)列是否已滿搞坝,如果不是搔谴,則將提交的任務(wù)存儲(chǔ)在工作隊(duì)列里;如果已滿桩撮,則進(jìn)入下個(gè)流程敦第。
  • 判斷線程池的線程是否都處于工作狀態(tài)峰弹,如果沒有,則創(chuàng)建一個(gè)新的工作線程芜果;如果已滿鞠呈,則交給飽和策略來處理。
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //1.添加進(jìn)入核心線程.
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //2.添加進(jìn)入隊(duì)列.
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //3.添加進(jìn)入非核心線程.
            reject(command);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

在以上的三步中右钾,除了加入隊(duì)列不用獲取全局鎖以外蚁吝,其它兩種情況都需要獲取,為了盡可能地避免獲取全局鎖舀射,在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)大于corePoolSize)窘茁,幾乎所有的execute方法調(diào)用都是加入到隊(duì)列當(dāng)中。

9.2 線程池的使用

9.2.1 線程池的創(chuàng)建

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:當(dāng)提交一個(gè)任務(wù)到線程池時(shí)脆烟,線程池會(huì)創(chuàng)建一個(gè)線程來執(zhí)行任務(wù)山林,即使其它空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建。
  • runnableTaskQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列浩淘,可以選擇:
  • ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列捌朴。
  • LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,吞吐量高于前者张抄。
  • SynchronousQueue:不存儲(chǔ)元素的阻塞隊(duì)列砂蔽,每個(gè)插入操作必須等待另一個(gè)線程調(diào)用了移除操作,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列署惯。
  • PriorityBlockingQueue:一個(gè)具有優(yōu)先級(jí)的無限阻塞隊(duì)列左驾。
  • maxPoolSize:允許創(chuàng)建的最大線程數(shù)。
  • ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠极谊。
  • RejectExecutionHandler:飽和策略诡右。
  • keepAliveTime:線程池的工作線程空閑后,保持存活的時(shí)間轻猖。
  • TimeUnit:線程保持活動(dòng)的單位帆吻。

9.2.2 向線程池提交任務(wù)

  • execute(Runnable runnable):提交不需要返回值的任務(wù)。
  • Future<Object> future = executor.submit(haveReturnValuetask):用于提交需要返回值的任務(wù)咙边,線程池會(huì)返回一個(gè)future類型任務(wù)猜煮,可以用它來判斷任務(wù)是否執(zhí)行成功,并且可以通過get方法來獲取返回值败许,get方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成王带。

9.2.3 關(guān)閉線程池

  • shutdownNow:首先將線程池的狀態(tài)設(shè)為STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程市殷,并返回等待執(zhí)行任務(wù)的列表愕撰。
  • shutdown:將線程池的狀態(tài)置為SHUTDOWN,然后中斷所有沒有正在執(zhí)行任務(wù)的線程。

十搞挣、Executor框架

(1)在上層带迟,Java多線程程序通常把應(yīng)用分解為若干個(gè)任務(wù),然后使用用戶級(jí)的調(diào)度器(Executor框架)將這些任務(wù)映射為固定數(shù)量的線程囱桨。
(2)在HotSpot VM的線程模型中邮旷,Java線程再被一對(duì)一映射為本地操作系統(tǒng)線程,Java線程啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)本地操作系統(tǒng)線程蝇摸,當(dāng)該線程終止時(shí),這個(gè)操作系統(tǒng)線程也會(huì)被回收办陷。
(3)操作系統(tǒng)會(huì)調(diào)度所有線程并將它們分配給可用的CPU貌夕。

Executor框架

由三個(gè)部分組成:

  • 任務(wù),即Runnable接口或Callable接口民镜。
  • 任務(wù)的執(zhí)行啡专,包括核心接口Executor,以及繼承自ExecutorExecutorService制圈,還有它的兩個(gè)關(guān)鍵類ThreadPoolExecutor(用來執(zhí)行任務(wù))和ScheduledThreadPoolExecutor(可以在給定的延遲后運(yùn)行命令们童,或者定期執(zhí)行命令)。
  • 異步計(jì)算的結(jié)果鲸鹦,包括接口Future和實(shí)現(xiàn)類FutureTask慧库。

10.2 ThreadPoolExecutor詳解

通過工具類Executors,可以創(chuàng)建以下三種類型的ThreadPoolExecutor馋嗜,調(diào)用靜態(tài)創(chuàng)建方法之后齐板,會(huì)返回ExecutorService

  • FixedThreadPool
    可重用固定線程數(shù)的線程池;如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize葛菇,則創(chuàng)建新線程來執(zhí)行任務(wù)甘磨;如果等于corePoolSize,將任務(wù)加入到無界隊(duì)列LinkedBlockingQueue當(dāng)中眯停;多余的空閑線程將會(huì)被立即終止济舆。
  • SingleThreadPool
    單個(gè)woker線程的executorcorePoolSizemaximumPoolSize為1莺债;采用無界隊(duì)列作為工作隊(duì)列滋觉。
  • CacheThreadPool
    采用沒有容量的SynchronousQueue作為線程池的工作隊(duì)列,其corePoolSize為0九府,maximumPool是無界的椎瘟;其中的空閑線程最多等待60s。
    如果主線程提交任務(wù)的速度高于maximumPool中線程處理任務(wù)的速度時(shí)侄旬,CacheThreadPool會(huì)不斷創(chuàng)建新線程肺蔚,極端情況下,CacheThreadPool會(huì)因?yàn)閯?chuàng)建過多線程而耗盡CPU資源儡羔。

10.3 ScheduledThreadPoolExecutor詳解

用來在給定的延遲之后執(zhí)行任務(wù)宣羊,或者定期執(zhí)行任務(wù)璧诵,并且可以在指定的構(gòu)造函數(shù)中指定多個(gè)對(duì)應(yīng)的后臺(tái)線程數(shù)。
它采用DelayQueue這個(gè)無界隊(duì)列作為工作隊(duì)列仇冯,其執(zhí)行分為兩個(gè)部分:

  • 當(dāng)調(diào)用ScheduledThreadPoolExecutorscheduleAtFixedRate()或者scheduleWithFIxedDelay之宿,它會(huì)向DelayQueue中添加ScheduledFutureTask
  • 線程池中的線程從DelayQueue中獲取ScheduledFutureTask苛坚。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末比被,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子泼舱,更是在濱河造成了極大的恐慌等缀,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,324評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件娇昙,死亡現(xiàn)場(chǎng)離奇詭異尺迂,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)冒掌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門噪裕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人股毫,你說我怎么就攤上這事膳音。” “怎么了铃诬?”我有些...
    開封第一講書人閱讀 162,328評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵严蓖,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我氧急,道長(zhǎng)颗胡,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,147評(píng)論 1 292
  • 正文 為了忘掉前任吩坝,我火速辦了婚禮毒姨,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘钉寝。我一直安慰自己弧呐,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評(píng)論 6 388
  • 文/花漫 我一把揭開白布嵌纲。 她就那樣靜靜地躺著俘枫,像睡著了一般。 火紅的嫁衣襯著肌膚如雪逮走。 梳的紋絲不亂的頭發(fā)上鸠蚪,一...
    開封第一講書人閱讀 51,115評(píng)論 1 296
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼茅信。 笑死盾舌,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蘸鲸。 我是一名探鬼主播妖谴,決...
    沈念sama閱讀 40,025評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼酌摇!你這毒婦竟也來了膝舅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,867評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤窑多,失蹤者是張志新(化名)和其女友劉穎铸史,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體怯伊,經(jīng)...
    沈念sama閱讀 45,307評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評(píng)論 2 332
  • 正文 我和宋清朗相戀三年判沟,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了耿芹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片挪哄。...
    茶點(diǎn)故事閱讀 39,688評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡迹炼,死狀恐怖砸彬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情砂碉,我是刑警寧澤刻两,帶...
    沈念sama閱讀 35,409評(píng)論 5 343
  • 正文 年R本政府宣布增蹭,位于F島的核電站磅摹,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏饼灿。R本人自食惡果不足惜碍彭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望窗骑。 院中可真熱鬧创译,春花似錦软族、人聲如沸残制。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽恼布。三九已至折汞,卻和暖如春爽待,著一層夾襖步出監(jiān)牢的瞬間鸟款,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評(píng)論 1 268
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留禁炒,地道東北人霍比。 一個(gè)月前我還...
    沈念sama閱讀 47,685評(píng)論 2 368
  • 正文 我出身青樓们豌,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親障癌。 傳聞我的和親對(duì)象是個(gè)殘疾皇子涛浙,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容