?編寫優(yōu)質(zhì)的并發(fā)代碼是一件難度極高的事情。Java語言從第一版本開始內(nèi)置了對多線程的支持童漩,這一點(diǎn)在當(dāng)年是非常了不起的弄贿,但是當(dāng)我們對并發(fā)編程有了更深刻的認(rèn)識和更多的實(shí)踐后驰徊,實(shí)現(xiàn)并發(fā)編程就有了更多的方案和更好的選擇嘹吨。本文是對并發(fā)編程的一點(diǎn)總結(jié)和思考,同時也分享了Java 5以后的版本中如何編寫并發(fā)代碼的一點(diǎn)點(diǎn)經(jīng)驗(yàn)厅须。
為什么需要并發(fā)
??并發(fā)其實(shí)是一種解耦合的策略,它幫助我們把做什么(目標(biāo))和什么時候做(時機(jī))分開危尿。這樣做可以明顯改進(jìn)應(yīng)用程序的吞吐量(獲得更多的CPU調(diào)度時間)和結(jié)構(gòu)(程序有多個部分在協(xié)同工作)能庆。做過Java Web開發(fā)的人都知道,Java Web中的Servlet程序在Servlet容器的支持下采用單實(shí)例多線程的工作模式脚线,Servlet容器為你處理了并發(fā)問題。
誤解和正解
??最常見的對并發(fā)編程的誤解有以下這些:
-并發(fā)總能改進(jìn)性能(并發(fā)在CPU有很多空閑時間時能明顯改進(jìn)程序的性能弥搞,但當(dāng)線程數(shù)量較多的時候邮绿,線程間頻繁的調(diào)度切換反而會讓系統(tǒng)的性能下降)
-編寫并發(fā)程序無需修改原有的設(shè)計(目的與時機(jī)的解耦往往會對系統(tǒng)結(jié)構(gòu)產(chǎn)生巨大的影響)
-在使用Web或EJB容器時不用關(guān)注并發(fā)問題(只有了解了容器在做什么,才能更好的使用容器)
??下面的這些說法才是對并發(fā)客觀的認(rèn)識:
-編寫并發(fā)程序會在代碼上增加額外的開銷
-正確的并發(fā)是非常復(fù)雜的攀例,即使對于很簡單的問題
-并發(fā)中的缺陷因?yàn)椴灰字噩F(xiàn)也不容易被發(fā)現(xiàn)
-并發(fā)往往需要對設(shè)計策略從根本上進(jìn)行修改
并發(fā)編程的原則和技巧
單一職責(zé)原則
分離并發(fā)相關(guān)代碼和其他代碼(并發(fā)相關(guān)代碼有自己的開發(fā)船逮、修改和調(diào)優(yōu)生命周期)。
限制數(shù)據(jù)作用域
兩個線程修改共享對象的同一字段時可能會相互干擾粤铭,導(dǎo)致不可預(yù)期的行為挖胃,解決方案之一是構(gòu)造臨界區(qū),但是必須限制臨界區(qū)的數(shù)量梆惯。
使用數(shù)據(jù)副本
數(shù)據(jù)副本是避免共享數(shù)據(jù)的好方法酱鸭,復(fù)制出來的對象只是以只讀的方式對待。Java 5的java.util.concurrent包中增加一個名為CopyOnWriteArrayList的類垛吗,它是List接口的子類型凹髓,所以你可以認(rèn)為它是ArrayList的線程安全的版本,它使用了寫時復(fù)制的方式創(chuàng)建數(shù)據(jù)副本進(jìn)行操作來避免對共享數(shù)據(jù)并發(fā)訪問而引發(fā)的問題怯屉。
線程應(yīng)盡可能獨(dú)立
讓線程存在于自己的世界中蔚舀,不與其他線程共享數(shù)據(jù)。有過Java Web開發(fā)經(jīng)驗(yàn)的人都知道锨络,Servlet就是以單實(shí)例多線程的方式工作赌躺,和每個請求相關(guān)的數(shù)據(jù)都是通過Servlet子類的service方法(或者是doGet或doPost方法)的參數(shù)傳入的。只要Servlet中的代碼只使用局部變量羡儿,Servlet就不會導(dǎo)致同步問題礼患。Spring MVC的控制器也是這么做的,從請求中獲得的對象都是以方法的參數(shù)傳入而不是作為類的成員失受,很明顯Struts 2的做法就正好相反讶泰,因此Struts 2中作為控制器的Action類都是每個請求對應(yīng)一個實(shí)例。
Java 5以前的并發(fā)編程
Java的線程模型建立在搶占式線程調(diào)度的基礎(chǔ)上拂到,也就是說:
所有線程可以很容易的共享同一進(jìn)程中的對象痪署。
能夠引用這些對象的任何線程都可以修改這些對象。
為了保護(hù)數(shù)據(jù)兄旬,對象可以被鎖住狼犯。
??Java基于線程和鎖的并發(fā)過于底層余寥,而且使用鎖很多時候都是很萬惡的,因?yàn)樗喈?dāng)于讓所有的并發(fā)都變成了排隊(duì)等待悯森。
??在Java 5以前宋舷,可以用synchronized關(guān)鍵字來實(shí)現(xiàn)鎖的功能,它可以用在代碼塊和方法上瓢姻,表示在執(zhí)行整個代碼塊或方法之前線程必須取得合適的鎖祝蝠。對于類的非靜態(tài)方法(成員方法)而言,這意味這要取得對象實(shí)例的鎖幻碱,對于類的靜態(tài)方法(類方法)而言绎狭,要取得類的Class對象的鎖,對于同步代碼塊褥傍,程序員可以指定要取得的是那個對象的鎖儡嘶。
??不管是同步代碼塊還是同步方法,每次只有一個線程可以進(jìn)入恍风,如果其他線程試圖進(jìn)入(不管是同一同步塊還是不同的同步塊)蹦狂,JVM會將它們掛起(放入到等鎖池中)。這種結(jié)構(gòu)在并發(fā)理論中稱為臨界區(qū)(critical section)朋贬。這里我們可以對Java中用synchronized實(shí)現(xiàn)同步和鎖的功能做一個總結(jié):
只能鎖定對象凯楔,不能鎖定基本數(shù)據(jù)類型
被鎖定的對象數(shù)組中的單個對象不會被鎖定
同步方法可以視為包含整個方法的synchronized(this) { … }代碼塊
靜態(tài)同步方法會鎖定它的Class對象
內(nèi)部類的同步是獨(dú)立于外部類的
synchronized修飾符并不是方法簽名的組成部分,所以不能出現(xiàn)在接口的方法聲明中
非同步的方法不關(guān)心鎖的狀態(tài)锦募,它們在同步方法運(yùn)行時仍然可以得以運(yùn)行
synchronized實(shí)現(xiàn)的鎖是可重入的鎖啼辣。
??在JVM內(nèi)部,為了提高效率御滩,同時運(yùn)行的每個線程都會有它正在處理的數(shù)據(jù)的緩存副本鸥拧,當(dāng)我們使用synchronzied進(jìn)行同步的時候,真正被同步的是在不同線程中表示被鎖定對象的內(nèi)存塊(副本數(shù)據(jù)會保持和主內(nèi)存的同步削解,現(xiàn)在知道為什么要用同步這個詞匯了吧)富弦,簡單的說就是在同步塊或同步方法執(zhí)行完后,對被鎖定的對象做的任何修改要在釋放鎖之前寫回到主內(nèi)存中氛驮;在進(jìn)入同步塊得到鎖之后腕柜,被鎖定對象的數(shù)據(jù)是從主內(nèi)存中讀出來的,持有鎖的線程的數(shù)據(jù)副本一定和主內(nèi)存中的數(shù)據(jù)視圖是同步的 矫废。
??在Java最初的版本中盏缤,就有一個叫volatile的關(guān)鍵字,它是一種簡單的同步的處理機(jī)制蓖扑,因?yàn)楸籿olatile修飾的變量遵循以下規(guī)則:
變量的值在使用之前總會從主內(nèi)存中再讀取出來唉铜。
對變量值的修改總會在完成之后寫回到主內(nèi)存中。
??使用volatile關(guān)鍵字可以在多線程環(huán)境下預(yù)防編譯器不正確的優(yōu)化假設(shè)(編譯器可能會將在一個線程中值不會發(fā)生改變的變量優(yōu)化成常量)律杠,但只有修改時不依賴當(dāng)前狀態(tài)(讀取時的值)的變量才應(yīng)該聲明為volatile變量潭流。
??不變模式也是并發(fā)編程時可以考慮的一種設(shè)計竞惋。讓對象的狀態(tài)是不變的,如果希望修改對象的狀態(tài)灰嫉,就會創(chuàng)建對象的副本并將改變寫入副本而不改變原來的對象拆宛,這樣就不會出現(xiàn)狀態(tài)不一致的情況,因此不變對象是線程安全的讼撒。Java中我們使用頻率極高的String類就采用了這樣的設(shè)計浑厚。如果對不變模式不熟悉,可以閱讀閻宏博士的《Java與模式》一書的第34章根盒。說到這里你可能也體會到final關(guān)鍵字的重要意義了瞻颂。
Java 5的并發(fā)編程
不管今后的Java向著何種方向發(fā)展或者滅忙,Java 5絕對是Java發(fā)展史中一個極其重要的版本郑象,這個版本提供的各種語言特性我們不在這里討論(有興趣的可以閱讀我的另一篇文章《Java的第20年:從Java版本演進(jìn)看編程技術(shù)的發(fā)展》),但是我們必須要感謝Doug Lea在Java 5中提供了他里程碑式的杰作java.util.concurrent包茬末,它的出現(xiàn)讓Java的并發(fā)編程有了更多的選擇和更好的工作方式厂榛。Doug Lea的杰作主要包括以下內(nèi)容:
更好的線程安全的容器
線程池和相關(guān)的工具類
可選的非阻塞解決方案
顯示的鎖和信號量機(jī)制
下面我們對這些東西進(jìn)行一一解讀。
原子類
Java 5中的java.util.concurrent包下面有一個atomic子包丽惭,其中有幾個以Atomic打頭的類击奶,例如AtomicInteger和AtomicLong。它們利用了現(xiàn)代處理器的特性责掏,可以用非阻塞的方式完成原子操作柜砾,代碼如下所示:
顯示鎖
基于synchronized關(guān)鍵字的鎖機(jī)制有以下問題:
鎖只有一種類型,而且對所有同步操作都是一樣的作用
鎖只能在代碼塊或方法開始的地方獲得换衬,在結(jié)束的地方釋放
線程要么得到鎖痰驱,要么阻塞,沒有其他的可能性
Java 5對鎖機(jī)制進(jìn)行了重構(gòu)瞳浦,提供了顯示的鎖担映,這樣可以在以下幾個方面提升鎖機(jī)制:
可以添加不同類型的鎖,例如讀取鎖和寫入鎖
可以在一個方法中加鎖叫潦,在另一個方法中解鎖
可以使用tryLock方式嘗試獲得鎖蝇完,如果得不到鎖可以等待、回退或者干點(diǎn)別的事情矗蕊,當(dāng)然也可以在超時之后放棄操作
顯示的鎖都實(shí)現(xiàn)了java.util.concurrent.Lock接口短蜕,主要有兩個實(shí)現(xiàn)類:
ReentrantLock - 比synchronized稍微靈活一些的重入鎖
ReentrantReadWriteLock - 在讀操作很多寫操作很少時性能更好的一種重入鎖
對于如何使用顯示鎖,可以參考我的Java面試系列文章《Java面試題集51-70》中第60題的代碼傻咖。只有一點(diǎn)需要提醒朋魔,解鎖的方法unlock的調(diào)用最好能夠在finally塊中,因?yàn)檫@里是釋放外部資源最好的地方卿操,當(dāng)然也是釋放鎖的最佳位置铺厨,因?yàn)椴还苷.惓缎玫?赡芏家尫诺翩i來給其他線程以運(yùn)行的機(jī)會。
CountDownLatch
CountDownLatch是一種簡單的同步模式解滓,它讓一個線程可以等待一個或多個線程完成它們的工作從而避免對臨界資源并發(fā)訪問所引發(fā)的各種問題赃磨。下面借用別人的一段代碼(我對它做了一些重構(gòu))來演示CountDownLatch是如何工作的。
ConcurrentHashMap
??ConcurrentHashMap是HashMap在并發(fā)環(huán)境下的版本洼裤,大家可能要問邻辉,既然已經(jīng)可以通過Collections.synchronizedMap獲得線程安全的映射型容器,為什么還需要ConcurrentHashMap呢腮鞍?因?yàn)橥ㄟ^Collections工具類獲得的線程安全的HashMap會在讀寫數(shù)據(jù)時對整個容器對象上鎖值骇,這樣其他使用該容器的線程無論如何也無法再獲得該對象的鎖,也就意味著要一直等待前一個獲得鎖的線程離開同步代碼塊之后才有機(jī)會執(zhí)行移国。實(shí)際上吱瘩,HashMap是通過哈希函數(shù)來確定存放鍵值對的桶(桶是為了解決哈希沖突而引入的),修改HashMap時并不需要將整個容器鎖住迹缀,只需要鎖住即將修改的“桶”就可以了使碾。HashMap的數(shù)據(jù)結(jié)構(gòu)如下圖所示。
??此外祝懂,ConcurrentHashMap還提供了原子操作的方法票摇,如下所示:
putIfAbsent:如果還沒有對應(yīng)的鍵值對映射,就將其添加到HashMap中砚蓬。
remove:如果鍵存在而且值與當(dāng)前狀態(tài)相等(equals比較結(jié)果為true)矢门,則用原子方式移除該鍵值對映射
replace:替換掉映射中元素的原子操作
CopyOnWriteArrayList
??CopyOnWriteArrayList是ArrayList在并發(fā)環(huán)境下的替代品。CopyOnWriteArrayList通過增加寫時復(fù)制語義來避免并發(fā)訪問引起的問題灰蛙,也就是說任何修改操作都會在底層創(chuàng)建一個列表的副本祟剔,也就意味著之前已有的迭代器不會碰到意料之外的修改。這種方式對于不要嚴(yán)格讀寫同步的場景非常有用摩梧,因?yàn)樗峁┝烁玫男阅芟坷S涀。M量減少鎖的使用障本,因?yàn)槟莿荼貛硇阅艿南陆担▽?shù)據(jù)庫中數(shù)據(jù)的并發(fā)訪問不也是如此嗎教届?如果可以的話就應(yīng)該放棄悲觀鎖而使用樂觀鎖),CopyOnWriteArrayList很明顯也是通過犧牲空間獲得了時間(在計算機(jī)的世界里驾霜,時間和空間通常是不可調(diào)和的矛盾案训,可以犧牲空間來提升效率獲得時間,當(dāng)然也可以通過犧牲時間來減少對空間的使用)粪糙。
??可以通過下面兩段代碼的運(yùn)行狀況來驗(yàn)證一下CopyOnWriteArrayList是不是線程安全的容器强霎。
上面的代碼會在運(yùn)行時產(chǎn)生ArrayIndexOutOfBoundsException,試一試將上面代碼25行的ArrayList換成CopyOnWriteArrayList再重新運(yùn)行蓉冈。
List<Double> list = new CopyOnWriteArrayList<>();
1
Queue
??隊(duì)列是一個無處不在的美妙概念城舞,它提供了一種簡單又可靠的方式將資源分發(fā)給處理單元(也可以說是將工作單元分配給待處理的資源轩触,這取決于你看待問題的方式)。實(shí)現(xiàn)中的并發(fā)編程模型很多都依賴隊(duì)列來實(shí)現(xiàn)家夺,因?yàn)樗梢栽诰€程之間傳遞工作單元脱柱。
??Java 5中的BlockingQueue就是一個在并發(fā)環(huán)境下非常好用的工具,在調(diào)用put方法向隊(duì)列中插入元素時拉馋,如果隊(duì)列已滿榨为,它會讓插入元素的線程等待隊(duì)列騰出空間;在調(diào)用take方法從隊(duì)列中取元素時煌茴,如果隊(duì)列為空随闺,取出元素的線程就會阻塞。
??可以用BlockingQueue來實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者并發(fā)模型(下一節(jié)中有介紹)蔓腐,當(dāng)然在Java 5以前也可以通過wait和notify來實(shí)現(xiàn)線程調(diào)度矩乐,比較一下兩種代碼就知道基于已有的并發(fā)工具類來重構(gòu)并發(fā)代碼到底好在哪里了。
基于wait和notify的實(shí)現(xiàn)
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 公共常量
* @author 駱昊
*
*/
class Constants {
? ? public static final int MAX_BUFFER_SIZE = 10;
? ? public static final int NUM_OF_PRODUCER = 2;
? ? public static final int NUM_OF_CONSUMER = 3;
}
/**
* 工作任務(wù)
* @author 駱昊
*
*/
class Task {
? ? private String id;? // 任務(wù)的編號
? ? public Task() {
? ? ? ? id = UUID.randomUUID().toString();
? ? }
? ? @Override
? ? public String toString() {
? ? ? ? return "Task[" + id + "]";
? ? }
}
/**
* 消費(fèi)者
* @author 駱昊
*
*/
class Consumer implements Runnable {
? ? private List<Task> buffer;
? ? public Consumer(List<Task> buffer) {
? ? ? ? this.buffer = buffer;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? while(true) {
? ? ? ? ? ? synchronized(buffer) {
? ? ? ? ? ? ? ? while(buffer.isEmpty()) {
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? buffer.wait();
? ? ? ? ? ? ? ? ? ? } catch(InterruptedException e) {
? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Task task = buffer.remove(0);
? ? ? ? ? ? ? ? buffer.notifyAll();
? ? ? ? ? ? ? ? System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task);
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
/**
* 生產(chǎn)者
* @author 駱昊
*
*/
class Producer implements Runnable {
? ? private List<Task> buffer;
? ? public Producer(List<Task> buffer) {
? ? ? ? this.buffer = buffer;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? while(true) {
? ? ? ? ? ? synchronized (buffer) {
? ? ? ? ? ? ? ? while(buffer.size() >= Constants.MAX_BUFFER_SIZE) {
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? buffer.wait();
? ? ? ? ? ? ? ? ? ? } catch(InterruptedException e) {
? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Task task = new Task();
? ? ? ? ? ? ? ? buffer.add(task);
? ? ? ? ? ? ? ? buffer.notifyAll();
? ? ? ? ? ? ? ? System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task);
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
public class Test06 {
? ? public static void main(String[] args) {
? ? ? ? List<Task> buffer = new ArrayList<>(Constants.MAX_BUFFER_SIZE);
? ? ? ? ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER);
? ? ? ? for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) {
? ? ? ? ? ? es.execute(new Producer(buffer));
? ? ? ? }
? ? ? ? for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) {
? ? ? ? ? ? es.execute(new Consumer(buffer));
? ? ? ? }
? ? }
}
基于BlockingQueue的實(shí)現(xiàn)
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 公共常量
* @author 駱昊
*
*/
class Constants {
? ? public static final int MAX_BUFFER_SIZE = 10;
? ? public static final int NUM_OF_PRODUCER = 2;
? ? public static final int NUM_OF_CONSUMER = 3;
}
/**
* 工作任務(wù)
* @author 駱昊
*
*/
class Task {
? ? private String id;? // 任務(wù)的編號
? ? public Task() {
? ? ? ? id = UUID.randomUUID().toString();
? ? }
? ? @Override
? ? public String toString() {
? ? ? ? return "Task[" + id + "]";
? ? }
}
/**
* 消費(fèi)者
* @author 駱昊
*
*/
class Consumer implements Runnable {
? ? private BlockingQueue<Task> buffer;
? ? public Consumer(BlockingQueue<Task> buffer) {
? ? ? ? this.buffer = buffer;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? while(true) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Task task = buffer.take();
? ? ? ? ? ? ? ? System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
/**
* 生產(chǎn)者
* @author 駱昊
*
*/
class Producer implements Runnable {
? ? private BlockingQueue<Task> buffer;
? ? public Producer(BlockingQueue<Task> buffer) {
? ? ? ? this.buffer = buffer;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? while(true) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Task task = new Task();
? ? ? ? ? ? ? ? buffer.put(task);
? ? ? ? ? ? ? ? System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
public class Test07 {
? ? public static void main(String[] args) {
? ? ? ? BlockingQueue<Task> buffer = new LinkedBlockingQueue<>(Constants.MAX_BUFFER_SIZE);
? ? ? ? ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER);
? ? ? ? for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) {
? ? ? ? ? ? es.execute(new Producer(buffer));
? ? ? ? }
? ? ? ? for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) {
? ? ? ? ? ? es.execute(new Consumer(buffer));
? ? ? ? }
? ? }
}
??使用BlockingQueue后代碼優(yōu)雅了很多回论。
??在繼續(xù)下面的探討之前散罕,我們還是重溫一下幾個概念:
生產(chǎn)者-消費(fèi)者
??一個或多個生產(chǎn)者創(chuàng)建某些工作并將其置于緩沖區(qū)或隊(duì)列中,一個或多個消費(fèi)者會從隊(duì)列中獲得這些工作并完成之透葛。這里的緩沖區(qū)或隊(duì)列是臨界資源。當(dāng)緩沖區(qū)或隊(duì)列放滿的時候卿樱,生產(chǎn)這會被阻塞僚害;而緩沖區(qū)或隊(duì)列為空的時候,消費(fèi)者會被阻塞繁调。生產(chǎn)者和消費(fèi)者的調(diào)度是通過二者相互交換信號完成的萨蚕。
讀者-寫者
??當(dāng)存在一個主要為讀者提供信息的共享資源,它偶爾會被寫者更新蹄胰,但是需要考慮系統(tǒng)的吞吐量岳遥,又要防止饑餓和陳舊資源得不到更新的問題。在這種并發(fā)模型中裕寨,如何平衡讀者和寫者是最困難的浩蓉,當(dāng)然這個問題至今還是一個被熱議的問題,恐怕必須根據(jù)具體的場景來提供合適的解決方案而沒有那種放之四海而皆準(zhǔn)的方法(不像我在國內(nèi)的科研文獻(xiàn)中看到的那樣)宾袜。
哲學(xué)家進(jìn)餐
??1965年捻艳,荷蘭計算機(jī)科學(xué)家圖靈獎得主Edsger Wybe Dijkstra提出并解決了一個他稱之為哲學(xué)家進(jìn)餐的同步問題。這個問題可以簡單地描述如下:五個哲學(xué)家圍坐在一張圓桌周圍庆猫,每個哲學(xué)家面前都有一盤通心粉认轨。由于通心粉很滑,所以需要兩把叉子才能夾住月培。相鄰兩個盤子之間放有一把叉子如下圖所示嘁字。哲學(xué)家的生活中有兩種交替活動時段:即吃飯和思考恩急。當(dāng)一個哲學(xué)家覺得餓了時,他就試圖分兩次去取其左邊和右邊的叉子纪蜒,每次拿一把衷恭,但不分次序。如果成功地得到了兩把叉子霍掺,就開始吃飯匾荆,吃完后放下叉子繼續(xù)思考。
??把上面問題中的哲學(xué)家換成線程杆烁,把叉子換成競爭的臨界資源牙丽,上面的問題就是線程競爭資源的問題。如果沒有經(jīng)過精心的設(shè)計兔魂,系統(tǒng)就會出現(xiàn)死鎖烤芦、活鎖、吞吐量下降等問題析校。
??下面是用信號量原語來解決哲學(xué)家進(jìn)餐問題的代碼构罗,使用了Java 5并發(fā)工具包中的Semaphore類(代碼不夠漂亮但是已經(jīng)足以說明問題了)。
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* 存放線程共享信號量的上下問
* @author 駱昊
*
*/
class AppContext {
? ? public static final int NUM_OF_FORKS = 5;? // 叉子數(shù)量(資源)
? ? public static final int NUM_OF_PHILO = 5;? // 哲學(xué)家數(shù)量(線程)
? ? public static Semaphore[] forks;? ? // 叉子的信號量
? ? public static Semaphore counter;? ? // 哲學(xué)家的信號量
? ? static {
? ? ? ? forks = new Semaphore[NUM_OF_FORKS];
? ? ? ? for (int i = 0, len = forks.length; i < len; ++i) {
? ? ? ? ? ? forks[i] = new Semaphore(1);? ? // 每個叉子的信號量為1
? ? ? ? }
? ? ? ? counter = new Semaphore(NUM_OF_PHILO - 1);? // 如果有N個哲學(xué)家智玻,最多只允許N-1人同時取叉子
? ? }
? ? /**
? ? * 取得叉子
? ? * @param index 第幾個哲學(xué)家
? ? * @param leftFirst 是否先取得左邊的叉子
? ? * @throws InterruptedException
? ? */
? ? public static void putOnFork(int index, boolean leftFirst) throws InterruptedException {
? ? ? ? if(leftFirst) {
? ? ? ? ? ? forks[index].acquire();
? ? ? ? ? ? forks[(index + 1) % NUM_OF_PHILO].acquire();
? ? ? ? }
? ? ? ? else {
? ? ? ? ? ? forks[(index + 1) % NUM_OF_PHILO].acquire();
? ? ? ? ? ? forks[index].acquire();
? ? ? ? }
? ? }
? ? /**
? ? * 放回叉子
? ? * @param index 第幾個哲學(xué)家
? ? * @param leftFirst 是否先放回左邊的叉子
? ? * @throws InterruptedException
? ? */
? ? public static void putDownFork(int index, boolean leftFirst) throws InterruptedException {
? ? ? ? if(leftFirst) {
? ? ? ? ? ? forks[index].release();
? ? ? ? ? ? forks[(index + 1) % NUM_OF_PHILO].release();
? ? ? ? }
? ? ? ? else {
? ? ? ? ? ? forks[(index + 1) % NUM_OF_PHILO].release();
? ? ? ? ? ? forks[index].release();
? ? ? ? }
? ? }
}
/**
* 哲學(xué)家
* @author 駱昊
*
*/
class Philosopher implements Runnable {
? ? private int index;? ? ? // 編號
? ? private String name;? ? // 名字
? ? public Philosopher(int index, String name) {
? ? ? ? this.index = index;
? ? ? ? this.name = name;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? while(true) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? AppContext.counter.acquire();
? ? ? ? ? ? ? ? boolean leftFirst = index % 2 == 0;
? ? ? ? ? ? ? ? AppContext.putOnFork(index, leftFirst);
? ? ? ? ? ? ? ? System.out.println(name + "正在吃意大利面(通心粉)...");? // 取到兩個叉子就可以進(jìn)食
? ? ? ? ? ? ? ? AppContext.putDownFork(index, leftFirst);
? ? ? ? ? ? ? ? AppContext.counter.release();
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
public class Test04 {
? ? public static void main(String[] args) {
? ? ? ? String[] names = { "駱昊", "王大錘", "張三豐", "楊過", "李莫愁" };? // 5位哲學(xué)家的名字
//? ? ? ExecutorService es = Executors.newFixedThreadPool(AppContext.NUM_OF_PHILO); // 創(chuàng)建固定大小的線程池
//? ? ? for(int i = 0, len = names.length; i < len; ++i) {
//? ? ? ? ? es.execute(new Philosopher(i, names[i]));? // 啟動線程
//? ? ? }
//? ? ? es.shutdown();
? ? ? ? for(int i = 0, len = names.length; i < len; ++i) {
? ? ? ? ? ? new Thread(new Philosopher(i, names[i])).start();
? ? ? ? }
? ? }
}
?現(xiàn)實(shí)中的并發(fā)問題基本上都是這三種模型或者是這三種模型的變體遂唧。
測試并發(fā)代碼
對并發(fā)代碼的測試也是非常棘手的事情,棘手到無需說明大家也很清楚的程度吊奢,所以這里我們只是探討一下如何解決這個棘手的問題盖彭。我們建議大家編寫一些能夠發(fā)現(xiàn)問題的測試并經(jīng)常性的在不同的配置和不同的負(fù)載下運(yùn)行這些測試。不要忽略掉任何一次失敗的測試页滚,線程代碼中的缺陷可能在上萬次測試中僅僅出現(xiàn)一次召边。具體來說有這么幾個注意事項(xiàng):
不要將系統(tǒng)的失效歸結(jié)于偶發(fā)事件,就像拉不出屎的時候不能怪地球沒有引力裹驰。
先讓非并發(fā)代碼工作起來隧熙,不要試圖同時找到并發(fā)和非并發(fā)代碼中的缺陷。
編寫可以在不同配置環(huán)境下運(yùn)行的線程代碼幻林。
編寫容易調(diào)整的線程代碼贞盯,這樣可以調(diào)整線程使性能達(dá)到最優(yōu)。
讓線程的數(shù)量多于CPU或CPU核心的數(shù)量沪饺,這樣CPU調(diào)度切換過程中潛在的問題才會暴露出來邻悬。
讓并發(fā)代碼在不同的平臺上運(yùn)行。
通過自動化或者硬編碼的方式向并發(fā)代碼中加入一些輔助測試的代碼随闽。
Java 7的并發(fā)編程
??Java 7中引入了TransferQueue父丰,它比BlockingQueue多了一個叫transfer的方法,如果接收線程處于等待狀態(tài),該操作可以馬上將任務(wù)交給它蛾扇,否則就會阻塞直至取走該任務(wù)的線程出現(xiàn)攘烛。可以用TransferQueue代替BlockingQueue镀首,因?yàn)樗梢垣@得更好的性能坟漱。
??剛才忘記了一件事情,Java 5中還引入了Callable接口更哄、Future接口和FutureTask接口芋齿,通過他們也可以構(gòu)建并發(fā)應(yīng)用程序,代碼如下所示成翩。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Test07 {
? ? private static final int POOL_SIZE = 10;
? ? static class CalcThread implements Callable<Double> {
? ? ? ? private List<Double> dataList = new ArrayList<>();
? ? ? ? public CalcThread() {
? ? ? ? ? ? for(int i = 0; i < 10000; ++i) {
? ? ? ? ? ? ? ? dataList.add(Math.random());
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? @Override
? ? ? ? public Double call() throws Exception {
? ? ? ? ? ? double total = 0;
? ? ? ? ? ? for(Double d : dataList) {
? ? ? ? ? ? ? ? total += d;
? ? ? ? ? ? }
? ? ? ? ? ? return total / dataList.size();
? ? ? ? }
? ? }
? ? public static void main(String[] args) {
? ? ? ? List<Future<Double>> fList = new ArrayList<>();
? ? ? ? ExecutorService es = Executors.newFixedThreadPool(POOL_SIZE);
? ? ? ? for(int i = 0; i < POOL_SIZE; ++i) {
? ? ? ? ? ? fList.add(es.submit(new CalcThread()));
? ? ? ? }
? ? ? ? for(Future<Double> f : fList) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? System.out.println(f.get());
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? es.shutdown();
? ? }
Callable接口也是一個單方法接口觅捆,顯然這是一個回調(diào)方法,類似于函數(shù)式編程中的回調(diào)函數(shù)麻敌,在Java 8 以前栅炒,Java中還不能使用Lambda表達(dá)式來簡化這種函數(shù)式編程。和Runnable接口不同的是Callable接口的回調(diào)方法call方法會返回一個對象术羔,這個對象可以用將來時的方式在線程執(zhí)行結(jié)束的時候獲得信息赢赊。上面代碼中的call方法就是將計算出的10000個0到1之間的隨機(jī)小數(shù)的平均值返回,我們通過一個Future接口的對象得到了這個返回值级历。目前最新的Java版本中释移,Callable接口和Runnable接口都被打上了@FunctionalInterface的注解,也就是說它可以用函數(shù)式編程的方式(Lambda表達(dá)式)創(chuàng)建接口對象寥殖。
??下面是Future接口的主要方法:
get():獲取結(jié)果玩讳。如果結(jié)果還沒有準(zhǔn)備好,get方法會阻塞直到取得結(jié)果扛禽;當(dāng)然也可以通過參數(shù)設(shè)置阻塞超時時間锋边。
cancel():在運(yùn)算結(jié)束前取消皱坛。
isDone():可以用來判斷運(yùn)算是否結(jié)束编曼。
??Java 7中還提供了分支/合并(fork/join)框架,它可以實(shí)現(xiàn)線程池中任務(wù)的自動調(diào)度剩辟,并且這種調(diào)度對用戶來說是透明的掐场。為了達(dá)到這種效果,必須按照用戶指定的方式對任務(wù)進(jìn)行分解贩猎,然后再將分解出的小型任務(wù)的執(zhí)行結(jié)果合并成原來任務(wù)的執(zhí)行結(jié)果熊户。這顯然是運(yùn)用了分治法(divide-and-conquer)的思想。下面的代碼使用了分支/合并框架來計算1到10000的和吭服,當(dāng)然對于如此簡單的任務(wù)根本不需要分支/合并框架嚷堡,因?yàn)榉种Ш秃喜⒈旧硪矔硪欢ǖ拈_銷,但是這里我們只是探索一下在代碼中如何使用分支/合并框架,讓我們的代碼能夠充分利用現(xiàn)代多核CPU的強(qiáng)大運(yùn)算能力蝌戒。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
class Calculator extends RecursiveTask<Integer> {
? ? private static final long serialVersionUID = 7333472779649130114L;
? ? private static final int THRESHOLD = 10;
? ? private int start;
? ? private int end;
? ? public Calculator(int start, int end) {
? ? ? ? this.start = start;
? ? ? ? this.end = end;
? ? }
? ? @Override
? ? public Integer compute() {
? ? ? ? int sum = 0;
? ? ? ? if ((end - start) < THRESHOLD) {? ? // 當(dāng)問題分解到可求解程度時直接計算結(jié)果
? ? ? ? ? ? for (int i = start; i <= end; i++) {
? ? ? ? ? ? ? ? sum += i;
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? int middle = (start + end) >>> 1;
? ? ? ? ? ? // 將任務(wù)一分為二
? ? ? ? ? ? Calculator left = new Calculator(start, middle);
? ? ? ? ? ? Calculator right = new Calculator(middle + 1, end);
? ? ? ? ? ? left.fork();
? ? ? ? ? ? right.fork();
? ? ? ? ? ? // 注意:由于此處是遞歸式的任務(wù)分解串塑,也就意味著接下來會二分為四,四分為八...
? ? ? ? ? ? sum = left.join() + right.join();? // 合并兩個子任務(wù)的結(jié)果
? ? ? ? }
? ? ? ? return sum;
? ? }
}
public class Test08 {
? ? public static void main(String[] args) throws Exception {
? ? ? ? ForkJoinPool forkJoinPool = new ForkJoinPool();
? ? ? ? Future<Integer> result = forkJoinPool.submit(new Calculator(1, 10000));
? ? ? ? System.out.println(result.get());
? ? }
}
伴隨著Java 7的到來北苟,Java中默認(rèn)的數(shù)組排序算法已經(jīng)不再是經(jīng)典的快速排序(雙樞軸快速排序)了桩匪,新的排序算法叫TimSort,它是歸并排序和插入排序的混合體友鼻,TimSort可以通過分支合并框架充分利用現(xiàn)代處理器的多核特性傻昙,從而獲得更好的性能(更短的排序時間)。