Java并發(fā)編程的總結(jié)與思考

編寫優(yōu)質(zhì)的并發(fā)代碼是一件難度極高的事情。Java語(yǔ)言從第一版本開始內(nèi)置了對(duì)多線程的支持剃根,這一點(diǎn)在當(dāng)年是非常了不起的傻盟,但是當(dāng)我們對(duì)并發(fā)編程有了更深刻的認(rèn)識(shí)和更多的實(shí)踐后,實(shí)現(xiàn)并發(fā)編程就有了更多的方案和更好的選擇途乃。本文是對(duì)并發(fā)編程的一點(diǎn)總結(jié)和思考际歼,同時(shí)也分享了Java 5以后的版本中如何編寫并發(fā)代碼的一點(diǎn)點(diǎn)經(jīng)驗(yàn)牲芋。

為什么需要并發(fā)

并發(fā)其實(shí)是一種解耦合的策略,它幫助我們把做什么(目標(biāo))和什么時(shí)候做(時(shí)機(jī))分開熬甫。這樣做可以明顯改進(jìn)應(yīng)用程序的吞吐量(獲得更多的CPU調(diào)度時(shí)間)和結(jié)構(gòu)(程序有多個(gè)部分在協(xié)同工作)猎塞。做過Java Web開發(fā)的人都知道试读,Java Web中的Servlet程序在Servlet容器的支持下采用單實(shí)例多線程的工作模式,Servlet容器幫助你處理了并發(fā)請(qǐng)求的問題邢享。

誤解和正解

最常見的對(duì)并發(fā)編程的誤解有以下這些:

A. 并發(fā)總能改進(jìn)性能鹏往。(真相:并發(fā)在CPU有很多空閑時(shí)間時(shí)能明顯改進(jìn)程序的性能,但當(dāng)線程數(shù)量較多的時(shí)候骇塘,線程間頻繁的調(diào)度切換反而會(huì)讓系統(tǒng)的性能下降)
B. 編寫并發(fā)程序無需修改原有的設(shè)計(jì)伊履。(真相:目的與時(shí)機(jī)的解耦往往會(huì)對(duì)系統(tǒng)結(jié)構(gòu)產(chǎn)生巨大的影響)
C. 在使用Web或EJB容器時(shí)不用關(guān)注并發(fā)問題。(真相:只有了解了容器在做什么款违,才能更好的使用容器)

下面的這些說法是對(duì)并發(fā)編程比較客觀的認(rèn)識(shí):

A. 編寫并發(fā)程序會(huì)在代碼上增加額外的開銷唐瀑。
B. 正確的并發(fā)是非常復(fù)雜的,即使對(duì)于很簡(jiǎn)單的問題插爹。
C. 并發(fā)中的缺陷因?yàn)椴灰字噩F(xiàn)也不容易被發(fā)現(xiàn)哄辣。
D. 并發(fā)往往需要對(duì)設(shè)計(jì)策略從根本上進(jìn)行修改。

并發(fā)編程的原則和技巧

  1. 單一職責(zé)原則:分離并發(fā)相關(guān)代碼和其他代碼(并發(fā)相關(guān)代碼有自己的開發(fā)赠尾、修改和調(diào)優(yōu)生命周期)力穗。
  2. 限制數(shù)據(jù)作用域:兩個(gè)線程修改共享對(duì)象的同一字段時(shí)可能會(huì)相互干擾,導(dǎo)致不可預(yù)期的行為气嫁,解決方案之一是構(gòu)造臨界區(qū)当窗,但是必須限制臨界區(qū)的數(shù)量。
  3. 使用數(shù)據(jù)副本:數(shù)據(jù)副本是避免共享數(shù)據(jù)的好方法寸宵,復(fù)制出來的對(duì)象只是以只讀的方式對(duì)待崖面。Java 5的java.util.concurrent包中增加一個(gè)名為CopyOnWriteArrayList的類,它是List接口的子類型梯影,所以你可以認(rèn)為它是ArrayList的線程安全的版本巫员,它使用了寫時(shí)復(fù)制的方式創(chuàng)建數(shù)據(jù)副本進(jìn)行操作來避免對(duì)共享數(shù)據(jù)并發(fā)訪問而引發(fā)的問題。
  4. 線程應(yīng)盡可能獨(dú)立:讓線程存在于自己的世界中甲棍,不與其他線程共享數(shù)據(jù)简识。有過Java Web開發(fā)經(jīng)驗(yàn)的人都知道,Servlet就是以單實(shí)例多線程的方式工作感猛,和每個(gè)請(qǐng)求相關(guān)的數(shù)據(jù)都是通過Servlet子類的service方法(或者是doGet或doPost方法)的參數(shù)傳入的七扰。只要Servlet中的代碼只使用局部變量,Servlet就不會(huì)導(dǎo)致同步問題唱遭。Spring MVC的控制器也是這么做的戳寸,從請(qǐng)求中獲得的對(duì)象都是以方法的參數(shù)傳入而不是作為類的成員,很明顯Struts 2的做法就正好相反拷泽,因此Struts 2中作為控制器的Action類都是每個(gè)請(qǐng)求對(duì)應(yīng)一個(gè)實(shí)例疫鹊。

Java 5以前的并發(fā)編程

Java的線程模型建立在搶占式線程調(diào)度的基礎(chǔ)上,也就是說:

  1. 所有線程可以很容易的共享同一進(jìn)程中的對(duì)象司致。
  2. 能夠引用這些對(duì)象的任何線程都可以修改這些對(duì)象拆吆。
  3. 為了保護(hù)數(shù)據(jù),對(duì)象可以被鎖住脂矫。

Java基于線程和鎖的并發(fā)過于底層枣耀,而且使用鎖很多時(shí)候都是很萬(wàn)惡的,因?yàn)樗喈?dāng)于讓所有的并發(fā)都變成了排隊(duì)等待庭再。

在Java 5以前捞奕,可以用synchronized關(guān)鍵字來實(shí)現(xiàn)鎖的功能牺堰,它可以用在代碼塊和方法上,表示在執(zhí)行整個(gè)代碼塊或方法之前線程必須取得合適的鎖颅围。對(duì)于類的非靜態(tài)方法(成員方法)而言伟葫,這意味這要取得對(duì)象實(shí)例的鎖,對(duì)于類的靜態(tài)方法(類方法)而言院促,要取得類的Class對(duì)象的鎖筏养,對(duì)于同步代碼塊,程序員可以指定要取得的是那個(gè)對(duì)象的鎖常拓。

不管是同步代碼塊還是同步方法渐溶,每次只有一個(gè)線程可以進(jìn)入,如果其他線程試圖進(jìn)入(不管是同一同步塊還是不同的同步塊)弄抬,JVM會(huì)將它們掛起(放入到等鎖池中)茎辐。這種結(jié)構(gòu)在并發(fā)理論中稱為臨界區(qū)(critical section)。這里我們可以對(duì)Java中用synchronized實(shí)現(xiàn)同步和鎖的功能做一個(gè)總結(jié):

  • 只能鎖定對(duì)象眉睹,不能鎖定基本數(shù)據(jù)類型荔茬。
  • 被鎖定的對(duì)象數(shù)組中的單個(gè)對(duì)象不會(huì)被鎖定。
  • 同步方法可以視為包含整個(gè)方法的synchronized(this) { ... }代碼塊竹海。
  • 靜態(tài)同步方法會(huì)鎖定它的Class對(duì)象慕蔚。
  • 內(nèi)部類的同步是獨(dú)立于外部類的。
  • synchronized修飾符并不是方法簽名的組成部分斋配,所以不能出現(xiàn)在接口的方法聲明中孔飒。
  • 非同步的方法不關(guān)心鎖的狀態(tài),它們?cè)谕椒椒ㄟ\(yùn)行時(shí)仍然可以得以運(yùn)行艰争。
  • synchronized實(shí)現(xiàn)的鎖是可重入的鎖坏瞄。

在JVM內(nèi)部,為了提高效率甩卓,同時(shí)運(yùn)行的每個(gè)線程都會(huì)有它正在處理的數(shù)據(jù)的緩存副本鸠匀,當(dāng)我們使用synchronzied進(jìn)行同步的時(shí)候,真正被同步的是在不同線程中表示被鎖定對(duì)象的內(nèi)存塊(副本數(shù)據(jù)會(huì)保持和主內(nèi)存的同步逾柿,現(xiàn)在知道為什么要用同步這個(gè)詞匯了吧)缀棍,簡(jiǎn)單的說就是在同步塊或同步方法執(zhí)行完后,對(duì)被鎖定的對(duì)象做的任何修改要在釋放鎖之前寫回到主內(nèi)存中机错;在進(jìn)入同步塊得到鎖之后爬范,被鎖定對(duì)象的數(shù)據(jù)是從主內(nèi)存中讀出來的,持有鎖的線程的數(shù)據(jù)副本一定和主內(nèi)存中的數(shù)據(jù)視圖是同步的 弱匪。

在Java最初的版本中青瀑,就有一個(gè)叫volatile的關(guān)鍵字,它是一種簡(jiǎn)單的同步的處理機(jī)制,因?yàn)楸籿olatile修飾的變量遵循以下規(guī)則:

  • 變量的值在使用之前總會(huì)從主內(nèi)存中再讀取出來斥难。
  • 對(duì)變量值的修改總會(huì)在完成之后寫回到主內(nèi)存中枝嘶。

使用volatile關(guān)鍵字可以在多線程環(huán)境下預(yù)防編譯器不正確的優(yōu)化假設(shè)(編譯器可能會(huì)將在一個(gè)線程中值不會(huì)發(fā)生改變的變量?jī)?yōu)化成常量),但只有修改時(shí)不依賴當(dāng)前狀態(tài)(讀取時(shí)的值)的變量才應(yīng)該聲明為volatile變量蘸炸。

不變模式也是并發(fā)編程時(shí)可以考慮的一種設(shè)計(jì)躬络。讓對(duì)象的狀態(tài)是不變的尖奔,如果希望修改對(duì)象的狀態(tài)搭儒,就會(huì)創(chuàng)建對(duì)象的副本并將改變寫入副本而不改變?cè)瓉淼膶?duì)象,這樣就不會(huì)出現(xiàn)狀態(tài)不一致的情況提茁,因此不變對(duì)象是線程安全的淹禾。Java中我們使用頻率極高的String類就采用了這樣的設(shè)計(jì)。如果對(duì)不變模式不熟悉茴扁,可以閱讀閻宏博士的《Java與模式》一書的第34章铃岔。說到這里你可能也體會(huì)到final關(guān)鍵字的重要意義了。

Java 5的并發(fā)編程

不管今后的Java向著何種方向發(fā)展或者滅忙峭火,Java 5絕對(duì)是Java發(fā)展史中一個(gè)極其重要的版本毁习,我們必須要感謝Doug Lea在Java 5中提供了他里程碑式的杰作java.util.concurrent包,它的出現(xiàn)讓Java的并發(fā)編程有了更多的選擇和更好的工作方式卖丸。Doug Lea的杰作主要包括以下內(nèi)容:

  • 更好的線程安全的容器
  • 線程池和相關(guān)的工具類
  • 可選的非阻塞解決方案
  • 顯示的鎖和信號(hào)量機(jī)制

原子類

Java 5中的java.util.concurrent包下面有一個(gè)atomic子包纺且,其中有幾個(gè)以Atomic打頭的類,例如AtomicInteger和AtomicLong稍浆。它們利用了現(xiàn)代處理器的特性载碌,可以用非阻塞的方式完成原子操作,代碼如下所示:

/**
 ID序列生成器
*/
public class IdGenerator {
    private final AtomicLong sequenceNumber = new AtomicLong(0);
    
    public long next() {
        return sequenceNumber.getAndIncrement(); 
    }
}

顯示鎖

基于synchronized關(guān)鍵字的鎖機(jī)制有以下問題:

  • 鎖只有一種類型衅枫,而且對(duì)所有同步操作都是一樣的作用
  • 鎖只能在代碼塊或方法開始的地方獲得嫁艇,在結(jié)束的地方釋放
  • 線程要么得到鎖,要么阻塞弦撩,沒有其他的可能性

Java 5對(duì)鎖機(jī)制進(jìn)行了重構(gòu)步咪,提供了顯示的鎖,這樣可以在以下幾個(gè)方面提升鎖機(jī)制:

  • 可以添加不同類型的鎖益楼,例如讀取鎖和寫入鎖猾漫。
  • 可以在一個(gè)方法中加鎖,在另一個(gè)方法中解鎖偏形。
  • 可以使用tryLock方式嘗試獲得鎖静袖,如果得不到鎖可以等待、回退或者干點(diǎn)別的事情俊扭,當(dāng)然也可以在超時(shí)之后放棄操作队橙。

示的鎖都實(shí)現(xiàn)了java.util.concurrent.Lock接口,主要有兩個(gè)實(shí)現(xiàn)類:

  • ReentrantLock - 比synchronized稍微靈活一些的重入鎖
  • ReentrantReadWriteLock - 在讀操作很多寫操作很少時(shí)性能更好的一種重入鎖

對(duì)于如何使用顯示鎖,可以參考我的Java面試系列文章《Java面試題集51-70》中第60題的代碼捐康。只有一點(diǎn)需要提醒仇矾,解鎖的方法unlock的調(diào)用最好能夠在finally塊中,因?yàn)檫@里是釋放外部資源最好的地方解总,當(dāng)然也是釋放鎖的最佳位置贮匕,因?yàn)椴还苷.惓?赡芏家尫诺翩i來給其他線程以運(yùn)行的機(jī)會(huì)花枫。

CountDownLatch

CountDownLatch是一種簡(jiǎn)單的同步模式刻盐,它讓一個(gè)線程可以等待一個(gè)或多個(gè)線程完成它們的工作從而避免對(duì)臨界資源并發(fā)訪問所引發(fā)的各種問題。下面借用別人的一段代碼(我對(duì)它做了一些重構(gòu))來演示CountDownLatch是如何工作的劳翰。

import java.util.concurrent.CountDownLatch;

/**
 * 工人類
 * @author 駱昊
 *
 */
class Worker {
    private String name;        // 名字
    private long workDuration;  // 工作持續(xù)時(shí)間
    
    /**
     * 構(gòu)造器
     */
    public Worker(String name, long workDuration) {
        this.name = name;
        this.workDuration = workDuration;
    }
    
    /**
     * 完成工作
     */
    public void doWork() {
        System.out.println(name + " begins to work...");
        try {
            Thread.sleep(workDuration); // 用休眠模擬工作執(zhí)行的時(shí)間
        } catch(InterruptedException ex) {
            ex.printStackTrace();
        }
        System.out.println(name + " has finished the job...");
    }
}

/**
 * 測(cè)試線程
 * @author 駱昊
 *
 */
class WorkerTestThread implements Runnable {
    private Worker worker;
    private CountDownLatch cdLatch;
    
    public WorkerTestThread(Worker worker, CountDownLatch cdLatch) {
        this.worker = worker;
        this.cdLatch = cdLatch;
    }
    
    @Override
    public void run() {
        worker.doWork();        // 讓工人開始工作
        cdLatch.countDown();    // 工作完成后倒計(jì)時(shí)次數(shù)減1
    }
}

class CountDownLatchTest {

    private static final int MAX_WORK_DURATION = 5000;  // 最大工作時(shí)間
    private static final int MIN_WORK_DURATION = 1000;  // 最小工作時(shí)間

    // 產(chǎn)生隨機(jī)的工作時(shí)間
    private static long getRandomWorkDuration(long min, long max) {
        return (long) (Math.random() * (max - min) + min);
    }
    
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);   // 創(chuàng)建倒計(jì)時(shí)閂并指定倒計(jì)時(shí)次數(shù)為2
        Worker w1 = new Worker("駱昊", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION));
        Worker w2 = new Worker("王大錘", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION));
        
        new Thread(new WorkerTestThread(w1, latch)).start();
        new Thread(new WorkerTestThread(w2, latch)).start();
        
        try {
            latch.await();  // 等待倒計(jì)時(shí)閂減到0
            System.out.println("All jobs have been finished!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ConcurrentHashMap

ConcurrentHashMap是HashMap在并發(fā)環(huán)境下的版本敦锌,大家可能要問,既然已經(jīng)可以通過Collections.synchronizedMap獲得線程安全的映射型容器佳簸,為什么還需要ConcurrentHashMap呢乙墙?因?yàn)橥ㄟ^Collections工具類獲得的線程安全的HashMap會(huì)在讀寫數(shù)據(jù)時(shí)對(duì)整個(gè)容器對(duì)象上鎖,這樣其他使用該容器的線程無論如何也無法再獲得該對(duì)象的鎖生均,也就意味著要一直等待前一個(gè)獲得鎖的線程離開同步代碼塊之后才有機(jī)會(huì)執(zhí)行听想。實(shí)際上,HashMap是通過哈希函數(shù)來確定存放鍵值對(duì)的桶(桶是為了解決哈希沖突而引入的)马胧,修改HashMap時(shí)并不需要將整個(gè)容器鎖住汉买,只需要鎖住即將修改的“桶”就可以了。HashMap的數(shù)據(jù)結(jié)構(gòu)如下圖所示漓雅。

圖1. HashMap的數(shù)據(jù)結(jié)構(gòu)

此外录别,ConcurrentHashMap還提供了原子操作的方法,如下所示:

  • putIfAbsent:如果還沒有對(duì)應(yīng)的鍵值對(duì)映射邻吞,就將其添加到HashMap中
  • remove:如果鍵存在而且值與當(dāng)前狀態(tài)相等(equals比較結(jié)果為true)组题,則用原子方式移除該鍵值對(duì)映射
  • replace:替換掉映射中元素的原子操作

CopyOnWriteArrayList

CopyOnWriteArrayList是ArrayList在并發(fā)環(huán)境下的替代品。CopyOnWriteArrayList通過增加寫時(shí)復(fù)制語(yǔ)義來避免并發(fā)訪問引起的問題抱冷,也就是說任何修改操作都會(huì)在底層創(chuàng)建一個(gè)列表的副本崔列,也就意味著之前已有的迭代器不會(huì)碰到意料之外的修改。這種方式對(duì)于不要嚴(yán)格讀寫同步的場(chǎng)景非常有用旺遮,因?yàn)樗峁┝烁玫男阅苷匝丁S涀。M量減少鎖的使用耿眉,因?yàn)槟莿?shì)必帶來性能的下降(對(duì)數(shù)據(jù)庫(kù)中數(shù)據(jù)的并發(fā)訪問不也是如此嗎边翼?如果可以的話就應(yīng)該放棄悲觀鎖而使用樂觀鎖),CopyOnWriteArrayList很明顯也是通過犧牲空間獲得了時(shí)間(在計(jì)算機(jī)的世界里鸣剪,時(shí)間和空間通常是不可調(diào)和的矛盾组底,可以犧牲空間來提升效率獲得時(shí)間丈积,當(dāng)然也可以通過犧牲時(shí)間來減少對(duì)空間的使用)。

圖1. CopyOnWriteArrayList原理示意圖

可以通過下面兩段代碼的運(yùn)行狀況來驗(yàn)證一下CopyOnWriteArrayList是不是線程安全的容器债鸡。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AddThread implements Runnable {
    private List<Double> list;
    
    public AddThread(List<Double> list) {
        this.list = list;
    }
    
    @Override
    public void run() {
        for(int i = 0; i < 10000; ++i) {
            list.add(Math.random());
        }
    }
}

public class Test05 {
    private static final int THREAD_POOL_SIZE = 2;

    public static void main(String[] args) {
        List<Double> list = new ArrayList<>();
        ExecutorService es = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        es.execute(new AddThread(list));
        es.execute(new AddThread(list));
        es.shutdown();
    }
}

上面的代碼會(huì)在運(yùn)行時(shí)產(chǎn)生ArrayIndexOutOfBoundsException江滨,試一試將上面代碼25行的ArrayList換成CopyOnWriteArrayList再重新運(yùn)行。

List<Double> list = new CopyOnWriteArrayList<>();

Queue

隊(duì)列是一個(gè)無處不在的美妙概念唬滑,它提供了一種簡(jiǎn)單又可靠的方式將資源分發(fā)給處理單元(也可以說是將工作單元分配給待處理的資源,這取決于你看待問題的方式)棺弊。實(shí)現(xiàn)中的并發(fā)編程模型很多都依賴隊(duì)列來實(shí)現(xiàn)晶密,因?yàn)樗梢栽诰€程之間傳遞工作單元。

Java 5中的BlockingQueue就是一個(gè)在并發(fā)環(huán)境下非常好用的工具镊屎,在調(diào)用put方法向隊(duì)列中插入元素時(shí)惹挟,如果隊(duì)列已滿,它會(huì)讓插入元素的線程等待隊(duì)列騰出空間缝驳;在調(diào)用take方法從隊(duì)列中取元素時(shí),如果隊(duì)列為空归苍,取出元素的線程就會(huì)阻塞用狱。

圖3. BlockingQueue示意圖

可以用BlockingQueue來實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者并發(fā)模型(下一節(jié)中有介紹),當(dāng)然在Java 5以前也可以通過wait和notify來實(shí)現(xiàn)線程調(diào)度拼弃,比較一下兩種代碼就知道基于已有的并發(fā)工具類來重構(gòu)并發(fā)代碼到底好在哪里了夏伊。

  • 基于wait和notify的實(shí)現(xiàn)
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 公共常量
 * @author 駱昊
 *
 */
class Constants {
    public static final int MAX_BUFFER_SIZE = 10;
    public static final int NUM_OF_PRODUCER = 2;
    public static final int NUM_OF_CONSUMER = 3;
}

/**
 * 工作任務(wù)
 * @author 駱昊
 *
 */
class Task {
    private String id;  // 任務(wù)的編號(hào)
    
    public Task() {
        id = UUID.randomUUID().toString();
    }
    
    @Override
    public String toString() {
        return "Task[" + id + "]";
    }
}

/**
 * 消費(fèi)者
 * @author 駱昊
 *
 */
class Consumer implements Runnable {
    private List<Task> buffer;

    public Consumer(List<Task> buffer) {
        this.buffer = buffer;
    }
    
    @Override
    public void run() {
        while(true) {
            synchronized(buffer) {
                while(buffer.isEmpty()) {
                    try {
                        buffer.wait();
                    } catch(InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Task task = buffer.remove(0);
                buffer.notifyAll();
                System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task);
            }
        }
    }
}

/**
 * 生產(chǎn)者
 * @author 駱昊
 *
 */
class Producer implements Runnable {
    private List<Task> buffer;
    
    public Producer(List<Task> buffer) {
        this.buffer = buffer;
    }
    
    @Override
    public void run() {
        while(true) {
            synchronized (buffer) {
                while(buffer.size() >= Constants.MAX_BUFFER_SIZE) {
                    try {
                        buffer.wait();
                    } catch(InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Task task = new Task();
                buffer.add(task);
                buffer.notifyAll();
                System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task);
            }
        }
    }
    
}

public class Test06 {

    public static void main(String[] args) {
        List<Task> buffer = new ArrayList<>(Constants.MAX_BUFFER_SIZE);
        ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER);
        for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) {
            es.execute(new Producer(buffer));
        }
        for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) {
            es.execute(new Consumer(buffer));
        }
    }
}

  • 基于BlockingQueue的實(shí)現(xiàn)
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 公共常量
 * @author 駱昊
 *
 */
class Constants {
    public static final int MAX_BUFFER_SIZE = 10;
    public static final int NUM_OF_PRODUCER = 2;
    public static final int NUM_OF_CONSUMER = 3;
}

/**
 * 工作任務(wù)
 * @author 駱昊
 *
 */
class Task {
    private String id;  // 任務(wù)的編號(hào)
    
    public Task() {
        id = UUID.randomUUID().toString();
    }
    
    @Override
    public String toString() {
        return "Task[" + id + "]";
    }
}

/**
 * 消費(fèi)者
 * @author 駱昊
 *
 */
class Consumer implements Runnable {
    private BlockingQueue<Task> buffer;

    public Consumer(BlockingQueue<Task> buffer) {
        this.buffer = buffer;
    }
    
    @Override
    public void run() {
        while(true) {
            try {
                Task task = buffer.take();
                System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 生產(chǎn)者
 * @author 駱昊
 *
 */
class Producer implements Runnable {
    private BlockingQueue<Task> buffer;
    
    public Producer(BlockingQueue<Task> buffer) {
        this.buffer = buffer;
    }
    
    @Override
    public void run() {
        while(true) {
            try {
                Task task = new Task();
                buffer.put(task);
                System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
    }
    
}

public class Test07 {

    public static void main(String[] args) {
        BlockingQueue<Task> buffer = new LinkedBlockingQueue<>(Constants.MAX_BUFFER_SIZE);
        ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER);
        for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) {
            es.execute(new Producer(buffer));
        }
        for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) {
            es.execute(new Consumer(buffer));
        }
    }
}

使用BlockingQueue后代碼優(yōu)雅了很多。

并發(fā)模型

在繼續(xù)下面的探討之前吻氧,我們還是重溫一下幾個(gè)概念:

概念 解釋
臨界資源 并發(fā)環(huán)境中有著固定數(shù)量的資源
互斥 對(duì)資源的訪問是排他式的
饑餓 一個(gè)或一組線程長(zhǎng)時(shí)間或永遠(yuǎn)無法取得進(jìn)展
死鎖 兩個(gè)或多個(gè)線程相互等待對(duì)方結(jié)束
活鎖 想要執(zhí)行的線程總是發(fā)現(xiàn)其他的線程正在執(zhí)行以至于長(zhǎng)時(shí)間或永遠(yuǎn)無法執(zhí)行

重溫了這幾個(gè)概念后溺忧,我們可以探討一下下面的幾種并發(fā)模型。

生產(chǎn)者-消費(fèi)者

一個(gè)或多個(gè)生產(chǎn)者創(chuàng)建某些工作并將其置于緩沖區(qū)或隊(duì)列中盯孙,一個(gè)或多個(gè)消費(fèi)者會(huì)從隊(duì)列中獲得這些工作并完成之鲁森。這里的緩沖區(qū)或隊(duì)列是臨界資源。當(dāng)緩沖區(qū)或隊(duì)列放滿的時(shí)候振惰,生產(chǎn)這會(huì)被阻塞歌溉;而緩沖區(qū)或隊(duì)列為空的時(shí)候,消費(fèi)者會(huì)被阻塞骑晶。生產(chǎn)者和消費(fèi)者的調(diào)度是通過二者相互交換信號(hào)完成的痛垛。

讀者-寫者

當(dāng)存在一個(gè)主要為讀者提供信息的共享資源,它偶爾會(huì)被寫者更新桶蛔,但是需要考慮系統(tǒng)的吞吐量匙头,又要防止饑餓和陳舊資源得不到更新的問題。在這種并發(fā)模型中仔雷,如何平衡讀者和寫者是最困難的蹂析,當(dāng)然這個(gè)問題至今還是一個(gè)被熱議的問題抖剿,恐怕必須根據(jù)具體的場(chǎng)景來提供合適的解決方案而沒有那種放之四海而皆準(zhǔn)的方法(不像我在國(guó)內(nèi)的科研文獻(xiàn)中看到的那樣)。

哲學(xué)家進(jìn)餐

1965年识窿,荷蘭計(jì)算機(jī)科學(xué)家圖靈獎(jiǎng)得主Edsger Wybe Dijkstra提出并解決了一個(gè)他稱之為哲學(xué)家進(jìn)餐的同步問題斩郎。這個(gè)問題可以簡(jiǎn)單地描述如下:五個(gè)哲學(xué)家圍坐在一張圓桌周圍,每個(gè)哲學(xué)家面前都有一盤通心粉喻频。由于通心粉很滑缩宜,所以需要兩把叉子才能夾住。相鄰兩個(gè)盤子之間放有一把叉子如下圖所示甥温。哲學(xué)家的生活中有兩種交替活動(dòng)時(shí)段:即吃飯和思考锻煌。當(dāng)一個(gè)哲學(xué)家覺得餓了時(shí),他就試圖分兩次去取其左邊和右邊的叉子姻蚓,每次拿一把宋梧,但不分次序。如果成功地得到了兩把叉子狰挡,就開始吃飯捂龄,吃完后放下叉子繼續(xù)思考。

把上面問題中的哲學(xué)家換成線程加叁,把叉子換成競(jìng)爭(zhēng)的臨界資源倦沧,上面的問題就是線程競(jìng)爭(zhēng)資源的問題。如果沒有經(jīng)過精心的設(shè)計(jì)它匕,系統(tǒng)就會(huì)出現(xiàn)死鎖展融、活鎖、吞吐量下降等問題豫柬。

圖4.哲學(xué)家進(jìn)餐模型

??
下面是用信號(hào)量原語(yǔ)來解決哲學(xué)家進(jìn)餐問題的代碼告希,使用了Java 5并發(fā)工具包中的Semaphore類(代碼不夠漂亮但是已經(jīng)足以說明問題了)。

//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 存放線程共享信號(hào)量的上下問
 * @author 駱昊
 *
 */
class AppContext {
    public static final int NUM_OF_FORKS = 5;   // 叉子數(shù)量(資源)
    public static final int NUM_OF_PHILO = 5;   // 哲學(xué)家數(shù)量(線程)

    public static Semaphore[] forks;    // 叉子的信號(hào)量
    public static Semaphore counter;    // 哲學(xué)家的信號(hào)量

    static {
        forks = new Semaphore[NUM_OF_FORKS];

        for (int i = 0, len = forks.length; i < len; ++i) {
            forks[i] = new Semaphore(1);    // 每個(gè)叉子的信號(hào)量為1
        }

        counter = new Semaphore(NUM_OF_PHILO - 1);  // 如果有N個(gè)哲學(xué)家烧给,最多只允許N-1人同時(shí)取叉子
    }
    
    /**
     * 取得叉子
     * @param index 第幾個(gè)哲學(xué)家
     * @param leftFirst 是否先取得左邊的叉子
     * @throws InterruptedException
     */
    public static void putOnFork(int index, boolean leftFirst) throws InterruptedException {
        if(leftFirst) {
            forks[index].acquire();
            forks[(index + 1) % NUM_OF_PHILO].acquire();
        }
        else {
            forks[(index + 1) % NUM_OF_PHILO].acquire();
            forks[index].acquire();
        }
    }
    
    /**
     * 放回叉子
     * @param index 第幾個(gè)哲學(xué)家
     * @param leftFirst 是否先放回左邊的叉子
     * @throws InterruptedException
     */
    public static void putDownFork(int index, boolean leftFirst) throws InterruptedException {
        if(leftFirst) {
            forks[index].release();
            forks[(index + 1) % NUM_OF_PHILO].release();
        }
        else {
            forks[(index + 1) % NUM_OF_PHILO].release();
            forks[index].release();
        }
    }
}

/**
 * 哲學(xué)家
 * @author 駱昊
 *
 */
class Philosopher implements Runnable {
    private int index;      // 編號(hào)
    private String name;    // 名字

    public Philosopher(int index, String name) {
        this.index = index;
        this.name = name;
    }

    @Override
    public void run() {
        while(true) {
            try {
                AppContext.counter.acquire();
                boolean leftFirst = index % 2 == 0;
                AppContext.putOnFork(index, leftFirst);
                System.out.println(name + "正在吃意大利面(通心粉)...");   // 取到兩個(gè)叉子就可以進(jìn)食
                AppContext.putDownFork(index, leftFirst);
                AppContext.counter.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class Test04 {
    
    public static void main(String[] args) {
        String[] names = { "駱昊", "王大錘", "張三豐", "楊過", "李莫愁" };   // 5位哲學(xué)家的名字
//      ExecutorService es = Executors.newFixedThreadPool(AppContext.NUM_OF_PHILO); // 創(chuàng)建固定大小的線程池
//      for(int i = 0, len = names.length; i < len; ++i) {
//          es.execute(new Philosopher(i, names[i]));   // 啟動(dòng)線程
//      }
//      es.shutdown();
        for(int i = 0, len = names.length; i < len; ++i) {
            new Thread(new Philosopher(i, names[i])).start();
        }
    }

}

現(xiàn)實(shí)中的并發(fā)問題基本上都是這三種模型或者是這三種模型的變體燕偶。

測(cè)試并發(fā)代碼

對(duì)并發(fā)代碼的測(cè)試也是非常棘手的事情,棘手到無需說明大家也很清楚的程度创夜,所以這里我們只是探討一下如何解決這個(gè)棘手的問題杭跪。我們建議大家編寫一些能夠發(fā)現(xiàn)問題的測(cè)試并經(jīng)常性的在不同的配置和不同的負(fù)載下運(yùn)行這些測(cè)試。不要忽略掉任何一次失敗的測(cè)試驰吓,線程代碼中的缺陷可能在上萬(wàn)次測(cè)試中僅僅出現(xiàn)一次涧尿。具體來說有這么幾個(gè)注意事項(xiàng):

  • 不要將系統(tǒng)的失效歸結(jié)于偶發(fā)事件,就像拉不出屎的時(shí)候不能怪地球沒有引力檬贰。
  • 先讓非并發(fā)代碼工作起來姑廉,不要試圖同時(shí)找到并發(fā)和非并發(fā)代碼中的缺陷。
  • 編寫可以在不同配置環(huán)境下運(yùn)行的線程代碼翁涤。
  • 編寫容易調(diào)整的線程代碼桥言,這樣可以調(diào)整線程使性能達(dá)到最優(yōu)萌踱。
  • 讓線程的數(shù)量多于CPU或CPU核心的數(shù)量,這樣CPU調(diào)度切換過程中潛在的問題才會(huì)暴露出來号阿。
  • 讓并發(fā)代碼在不同的平臺(tái)上運(yùn)行并鸵。
  • 通過自動(dòng)化或者硬編碼的方式向并發(fā)代碼中加入一些輔助測(cè)試的代碼。

Java 7的并發(fā)編程

Java 7中引入了TransferQueue扔涧,它比BlockingQueue多了一個(gè)叫transfer的方法园担,如果接收線程處于等待狀態(tài),該操作可以馬上將任務(wù)交給它枯夜,否則就會(huì)阻塞直至取走該任務(wù)的線程出現(xiàn)弯汰。可以用TransferQueue代替BlockingQueue湖雹,因?yàn)樗梢垣@得更好的性能咏闪。

剛才忘記了一件事情,Java 5中還引入了Callable接口摔吏、Future接口和FutureTask接口鸽嫂,通過他們也可以構(gòu)建并發(fā)應(yīng)用程序,代碼如下所示舔腾。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class Test07 {
    private static final int POOL_SIZE = 10;

    static class CalcThread implements Callable<Double> {
        private List<Double> dataList = new ArrayList<>();
        
        public CalcThread() {
            for(int i = 0; i < 10000; ++i) {
                dataList.add(Math.random());
            }
        }
        
        @Override
        public Double call() throws Exception {
            double total = 0;
            for(Double d : dataList) {
                total += d;
            }
            return total / dataList.size();
        }
    
    }
    
    public static void main(String[] args) {
        List<Future<Double>> fList = new ArrayList<>();
        ExecutorService es = Executors.newFixedThreadPool(POOL_SIZE);
        for(int i = 0; i < POOL_SIZE; ++i) {
            fList.add(es.submit(new CalcThread()));
        }
        
        for(Future<Double> f : fList) {
            try {
                System.out.println(f.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        es.shutdown();
    }
}

Callable接口也是一個(gè)單方法接口溪胶,顯然這是一個(gè)回調(diào)方法,類似于函數(shù)式編程中的回調(diào)函數(shù)稳诚,在Java 8 以前,Java中還不能使用Lambda表達(dá)式來簡(jiǎn)化這種函數(shù)式編程瀑踢。和Runnable接口不同的是Callable接口的回調(diào)方法call方法會(huì)返回一個(gè)對(duì)象扳还,這個(gè)對(duì)象可以用將來時(shí)的方式在線程執(zhí)行結(jié)束的時(shí)候獲得信息。上面代碼中的call方法就是將計(jì)算出的10000個(gè)0到1之間的隨機(jī)小數(shù)的平均值返回橱夭,我們通過一個(gè)Future接口的對(duì)象得到了這個(gè)返回值氨距。目前最新的Java版本中,Callable接口和Runnable接口都被打上了@FunctionalInterface的注解棘劣,也就是說它可以用函數(shù)式編程的方式(Lambda表達(dá)式)創(chuàng)建接口對(duì)象俏让。

下面是Future接口的主要方法:

  • get():獲取結(jié)果。如果結(jié)果還沒有準(zhǔn)備好茬暇,get方法會(huì)阻塞直到取得結(jié)果首昔;當(dāng)然也可以通過參數(shù)設(shè)置阻塞超時(shí)時(shí)間
  • cancel():在運(yùn)算結(jié)束前取消
  • isDone():可以用來判斷運(yùn)算是否結(jié)束

Java 7中還提供了分支/合并(fork/join)框架,它可以實(shí)現(xiàn)線程池中任務(wù)的自動(dòng)調(diào)度糙俗,并且這種調(diào)度對(duì)用戶來說是透明的勒奇。為了達(dá)到這種效果,必須按照用戶指定的方式對(duì)任務(wù)進(jìn)行分解巧骚,然后再將分解出的小型任務(wù)的執(zhí)行結(jié)果合并成原來任務(wù)的執(zhí)行結(jié)果赊颠。這顯然是運(yùn)用了分治法(divide-and-conquer)的思想格二。下面的代碼使用了分支/合并框架來計(jì)算1到10000的和,當(dāng)然對(duì)于如此簡(jiǎn)單的任務(wù)根本不需要分支/合并框架竣蹦,因?yàn)榉种Ш秃喜⒈旧硪矔?huì)帶來一定的開銷顶猜,但是這里我們只是探索一下在代碼中如何使用分支/合并框架,讓我們的代碼能夠充分利用現(xiàn)代多核CPU的強(qiáng)大運(yùn)算能力痘括。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

class Calculator extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 7333472779649130114L;

    private static final int THRESHOLD = 10;
    private int start;
    private int end;

    public Calculator(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public Integer compute() {
        int sum = 0;
        if ((end - start) < THRESHOLD) {    // 當(dāng)問題分解到可求解程度時(shí)直接計(jì)算結(jié)果
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            int middle = (start + end) >>> 1;
            // 將任務(wù)一分為二
            Calculator left = new Calculator(start, middle);
            Calculator right = new Calculator(middle + 1, end);
            left.fork();
            right.fork();
            // 注意:由于此處是遞歸式的任務(wù)分解长窄,也就意味著接下來會(huì)二分為四,四分為八...

            sum = left.join() + right.join();   // 合并兩個(gè)子任務(wù)的結(jié)果
        }
        return sum;
    }

}

public class Test08 {

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future<Integer> result = forkJoinPool.submit(new Calculator(1, 10000));
        System.out.println(result.get());
    }
}

伴隨著Java 7的到來远寸,Java中默認(rèn)的數(shù)組排序算法已經(jīng)不再是經(jīng)典的快速排序(雙樞軸快速排序)了抄淑,新的排序算法叫TimSort,它是歸并排序和插入排序的混合體驰后,TimSort可以通過分支合并框架充分利用現(xiàn)代處理器的多核特性肆资,從而獲得更好的性能(更短的排序時(shí)間)。

參考文獻(xiàn)

  1. Benjamin J. Evans, etc, The Well-Grounded Java Developer. Jul 21, 2012
  2. Robert Martin, Clean Code. Aug 11, 2008.
  3. Doug Lea, Concurrent Programming in Java: Design Principles and Patterns. 1999
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末灶芝,一起剝皮案震驚了整個(gè)濱河市郑原,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌夜涕,老刑警劉巖犯犁,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異女器,居然都是意外死亡酸役,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門驾胆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來涣澡,“玉大人,你說我怎么就攤上這事丧诺∪牍穑” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵驳阎,是天一觀的道長(zhǎng)抗愁。 經(jīng)常有香客問我,道長(zhǎng)呵晚,這世上最難降的妖魔是什么蜘腌? 我笑而不...
    開封第一講書人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮劣纲,結(jié)果婚禮上逢捺,老公的妹妹穿的比我還像新娘。我一直安慰自己癞季,他們只是感情好劫瞳,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開白布倘潜。 她就那樣靜靜地躺著,像睡著了一般志于。 火紅的嫁衣襯著肌膚如雪涮因。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評(píng)論 1 285
  • 那天伺绽,我揣著相機(jī)與錄音养泡,去河邊找鬼。 笑死奈应,一個(gè)胖子當(dāng)著我的面吹牛澜掩,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播杖挣,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼肩榕,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了惩妇?” 一聲冷哼從身側(cè)響起株汉,我...
    開封第一講書人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎歌殃,沒想到半個(gè)月后乔妈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡氓皱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年路召,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片波材。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡优训,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出各聘,到底是詐尸還是另有隱情,我是刑警寧澤抡医,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布躲因,位于F島的核電站,受9級(jí)特大地震影響忌傻,放射性物質(zhì)發(fā)生泄漏大脉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一水孩、第九天 我趴在偏房一處隱蔽的房頂上張望镰矿。 院中可真熱鬧,春花似錦俘种、人聲如沸秤标。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)苍姜。三九已至牢酵,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間衙猪,已是汗流浹背馍乙。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留垫释,地道東北人丝格。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像棵譬,于是被迫代替她去往敵國(guó)和親显蝌。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 作者: 一字馬胡 轉(zhuǎn)載標(biāo)志 【2017-11-01】 更新日志 日期更新內(nèi)容備注2017-11-01新建文章V1...
    一字馬胡閱讀 7,294評(píng)論 9 133
  • layout: posttitle: 《Java并發(fā)編程的藝術(shù)》筆記categories: Javaexcerpt...
    xiaogmail閱讀 5,798評(píng)論 1 19
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語(yǔ)法茫船,類相關(guān)的語(yǔ)法琅束,內(nèi)部類的語(yǔ)法,繼承相關(guān)的語(yǔ)法算谈,異常的語(yǔ)法涩禀,線程的語(yǔ)...
    子非魚_t_閱讀 31,581評(píng)論 18 399
  • 從三月份找實(shí)習(xí)到現(xiàn)在,面了一些公司然眼,掛了不少艾船,但最終還是拿到小米、百度高每、阿里屿岂、京東、新浪鲸匿、CVTE爷怀、樂視家的研發(fā)崗...
    時(shí)芥藍(lán)閱讀 42,184評(píng)論 11 349
  • 清末民出年間,天下大亂带欢,天地間一片殺戮运授,血腥味彌漫在整個(gè)華夏大地,清軍和洋軍整日的廝殺讓人民苦不堪言乔煞,終于…… ...
    SVIP閱讀 285評(píng)論 0 0