Java—并發(fā)編程指南

并發(fā)編程是提高程序運(yùn)行效率與響應(yīng)速度的重要手段,在多CPU條件下字管,并發(fā)編程可以使硬件得到更大程度的運(yùn)用黄橘。由于在并發(fā)環(huán)境下CPU隨時(shí)會(huì)對(duì)多線程的運(yùn)行進(jìn)行調(diào)度,因此線程中各指令執(zhí)行的順序是不確定的悼沈,出現(xiàn)問題時(shí)也難以復(fù)現(xiàn)和定位贱迟。如果開發(fā)人員了解并發(fā)的原理,就能在有并發(fā)問題隱患的地方妥善處理來規(guī)避風(fēng)險(xiǎn)絮供。

并發(fā)的知識(shí)體系很龐大衣吠,涉及到內(nèi)存模型、并發(fā)容器杯缺、線程池等一系列知識(shí)點(diǎn)蒸播,優(yōu)秀并發(fā)程序?qū)π阅芘c活躍性也有較高的要求,因此想要吃透并發(fā)并不是一件容易的事萍肆。想要寫好并發(fā)程序袍榆,不僅需要對(duì)并發(fā)的原理有所了解,更需要工程上的實(shí)踐塘揣,萬丈高樓平地起包雀,下面讓我們來一起探索并發(fā)編程。

一亲铡、原子性才写、可見性、順序性問題

1.1 原子性

提到并發(fā)奖蔓,有一個(gè)很經(jīng)典的例子就是同時(shí)啟動(dòng)2個(gè)線程對(duì)一個(gè)數(shù)循環(huán)+1赞草,程序如下所示。線程t1和t2都會(huì)循環(huán)1萬次a++吆鹤,理想狀態(tài)下a最終的值應(yīng)該是20000厨疙,但是運(yùn)行之后a的值總是小于20000,并且每次的結(jié)果都不盡相同疑务,這是為什么呢沾凄?

public class Test {
    public static int a = 0;
    public static void main(String[] args) {
        Runnable r = () -> {
            for (int i = 0; i < 10000; i++) {
                a++;
            }
        };
        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);
        t1.start();
        t2.start();
        try {
            // 程序本身運(yùn)行在主線程, 這里需要讓主線程等待t1和t2運(yùn)行完再獲取a的值
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("a: " + a);
    }
}

這里涉及到2個(gè)問題:

  • 一是原子性問題。先明確原子操作的定義:如果某個(gè)操作要么不執(zhí)行知允,要么完全執(zhí)行且不會(huì)被打斷撒蟀,則將其稱為原子操作。上述程序中我們將a++當(dāng)成了原子操作温鸽,而它實(shí)際由多個(gè)原子操作組成保屯,本身并不是原子操作。
  • 二是可見性問題。線程t1修改了a的值后配椭,t2可能感知不到虫溜。

可見性問題1.2中會(huì)講,這里我們先討論原子性問題股缸。a++看起來是一個(gè)原子操作衡楞,而實(shí)際上a++需要3條CPU指令完成,這里的CPU指令才具備原子性敦姻。

指令1: 將變量a從內(nèi)存加載到CPU寄存器
指令2: 寄存器修改a的值為a+1
指令3: 將結(jié)果寫入內(nèi)存

并發(fā)情況下瘾境,線程切換可能發(fā)生在任一CPU指令執(zhí)行完的時(shí)候。例如當(dāng)t1和t2一起運(yùn)行時(shí)镰惦,可能出現(xiàn)下面的情況迷守。

t1線程讀到a的值為0
t1線程對(duì)寄存器中的值+1得到1
----------線程切換----------
t2線程從內(nèi)存中讀到a的值為0
t2線程對(duì)寄存器的值+1得到1
t2線程將1寫入內(nèi)存
----------線程切換----------
t1線程將1寫入內(nèi)存

可以發(fā)現(xiàn)在并發(fā)環(huán)境下,某個(gè)線程從內(nèi)存中讀取到的值可能不是最新數(shù)據(jù)旺入,這也解釋了為什么程序的結(jié)果總是小于20000兑凿。要解決這個(gè)問題,只需要將a++變?yōu)樵硬僮骷纯梢瘃ㄟ^synchronized關(guān)鍵字即可使某塊代碼具備原子性礼华,該代碼塊也被稱為同步代碼塊,如下所示拗秘。

synchronized (Test.class) {
    a++;
}

該同步代碼塊以Test.class作為互斥鎖圣絮,任何線程在進(jìn)入該代碼塊之前需要先獲取該鎖,如果獲取不到則需等待當(dāng)前持有該鎖的線程釋放鎖雕旨“缃常可以理解為,鎖是用來保護(hù)共享變量的凡涩,這里的Test.class就是用來保護(hù)共享變量a同一時(shí)刻只能被一個(gè)線程訪問的鎖棒搜。

那么如果新增一個(gè)與a無關(guān)的共享變量b,是否也可以使用Test.class來保護(hù)呢活箕?
答案是否定的帮非,如果用同一個(gè)鎖來保護(hù)它們,那么不管修改a或b之前都要得到Test.class這個(gè)鎖讹蘑,導(dǎo)致訪問a的同時(shí)不能訪問b,但這兩個(gè)變量并無關(guān)聯(lián)筑舅,反而降低了運(yùn)行效率座慰。

不過可以通過另一個(gè)鎖來保護(hù)b,由于Java中的任何對(duì)象都可以作為鎖來使用翠拣,所以可以直接新建一個(gè)final對(duì)象來保護(hù)b版仔,如下所示。

public class Test {
    public static int b = 0;
    public static final Object lock = new Object();

    public static void main(String[] args) {
        Runnable r = () -> {
            for (int i = 0; i < 10000; i++) {
                synchronized (lock) {
                    b++;
                }
            }
        };
        ......
    }
}

1.2 可見性與CPU緩存

可見性問題是指在并發(fā)條件下,一個(gè)線程修改共享變量后蛮粮,另一個(gè)線程無法感知到共享變量的變化益缎。該問題是由CPU緩存引起的,由于CPU與內(nèi)存的運(yùn)行速度相差太多然想,為了平衡這個(gè)差距莺奔,CPU引入了高速緩存cache作為中間層,當(dāng)CPU從內(nèi)存中讀取數(shù)據(jù)時(shí)會(huì)將其讀入cache中变泄,之后CPU不用每次讀取內(nèi)存令哟,而是直接操作cache中的數(shù)據(jù),最后在合適的時(shí)機(jī)際將cache中的數(shù)據(jù)寫入內(nèi)存妨蛹,當(dāng)然這個(gè)時(shí)機(jī)對(duì)開發(fā)人員來說是不可控的屏富。

在單核CPU條件下,CPU整體只有一個(gè)cache蛙卤,因此多線程操作的也是同一個(gè)cache狠半,不會(huì)出現(xiàn)可見性問題。而在多核CPU條件下颤难,每核CPU都對(duì)應(yīng)一個(gè)cache神年,很有可能兩個(gè)線程讀寫的是各自的cache,由此產(chǎn)生了可見性問題乐严。

下面用一個(gè)例子說明可見性問題瘤袖,子線程t1在flag為true時(shí)會(huì)一直運(yùn)行,但是主線程會(huì)在500ms后將flag改為false昂验,看起來子線程只會(huì)運(yùn)行500ms捂敌,但事實(shí)是子線程會(huì)一直運(yùn)行下去。換句話說既琴,主線程修改flag的行為對(duì)子線程來說不可見占婉。

public class Test {
    private static boolean flag = true;
    private static int i = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable r1 = () -> {
            while (flag) {
                i++;
            }
            System.out.println("子線程結(jié)束");
        };
        Thread t1 = new Thread(r1);
        t1.start();
        Thread.sleep(500);
        flag = false;
        System.out.println("Main Thread結(jié)束");
    }
}

要解決這個(gè)問題,使用volatile關(guān)鍵字修飾flag變量即可甫恩。

volatile關(guān)鍵字表示一個(gè)變量是易變的逆济,從抽象的角度明確了該變量的可見性。作為高級(jí)語言關(guān)鍵字磺箕,volatile屏蔽了底層硬件的不一致性奖慌,在不同的環(huán)境下,volatile關(guān)鍵字在底層可能具有不同的實(shí)現(xiàn)松靡,但是作為高級(jí)語言的開發(fā)者简僧,我們只需要理解volatile在抽象層面的定義即可。

之前提到雕欺,CPU高速緩存中的數(shù)據(jù)會(huì)在合適的時(shí)機(jī)被寫入內(nèi)存岛马,那么上面程序中的flag變量即使不加volatile棉姐,主線程對(duì)flag變量的修改也會(huì)在一段時(shí)間后寫入內(nèi)存。那為什么子線程一直感知不到flag的變化呢啦逆?

我們有理由猜測伞矩,子線程在while(flag)循環(huán)中讀取的一直是CPU緩存中的flag變量,而沒有從內(nèi)存中重新獲取夏志。為了驗(yàn)證這個(gè)猜測乃坤,嘗試在while(flag)循環(huán)中加上一些代碼,如下所示盲镶。

public class Test {
    private static boolean flag = true;
    private static int i = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable r1 = () -> {
            while (flag) {
                i++;
                try {
                    Thread.sleep(50);
                    System.out.println("......");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("子線程結(jié)束");
        };
        Thread t1 = new Thread(r1);
        t1.start();
        Thread.sleep(500);
        flag = false;
        System.out.println("Main Thread結(jié)束");
    }
}

運(yùn)行后發(fā)現(xiàn)子線程運(yùn)行約500ms后停止侥袜,表示子線程在某個(gè)時(shí)機(jī)獲取到了內(nèi)存中flag的值,印證了我們的猜測溉贿。這表明如果不加volatile關(guān)鍵字枫吧,雖然cache的值與內(nèi)存的值也會(huì)同步,但同步的時(shí)機(jī)是不確定的宇色,這也是很多并發(fā)bug難以溯源的原因九杂。

這時(shí)候你可能會(huì)對(duì)1.1的例子產(chǎn)生疑惑,雖然1.1中使用synchronized關(guān)鍵字使a++具備了原子性宣蠕,不過并沒有使用volatile修飾變量a例隆,如果a不具備可見性的話,最終的結(jié)果也應(yīng)該小于預(yù)期值才對(duì)抢蚀。

但實(shí)際情況是镀层,該程序最終的結(jié)果都是正確的,這與Java內(nèi)存模型(JMM)有關(guān)皿曲,內(nèi)存模型保證原子操作中的變量具備可見性唱逢,第2節(jié)會(huì)闡述JMM相關(guān)內(nèi)容。

1.3 順序性與指令重排

指令重排是指編譯器為了提高程序的運(yùn)行效率屋休,在不影響運(yùn)行結(jié)果的前提下對(duì)CPU的指令重新排序坞古。即使指令之間存在依賴關(guān)系居凶,編譯器也會(huì)保證運(yùn)行結(jié)果不受影響揩局。

在單線程運(yùn)行環(huán)境下,指令重排確實(shí)不會(huì)影響運(yùn)行結(jié)果凹髓,但在多線程環(huán)境下它是存在一定隱患的叠艳。拿雙重判空的單例模式來舉例奶陈,其代碼如下。假設(shè)A線程和B線程同時(shí)運(yùn)行到同步代碼塊附较,A線程成功獲取到鎖并新建Singleton尿瞭,A線程釋放鎖后B線程搶占到鎖進(jìn)入同步代碼塊,隨后B線程發(fā)現(xiàn)單例已經(jīng)初始化就退出翅睛。如果沒有指令重排声搁,這一段代碼確實(shí)沒有任何問題。

public class Singleton {
    private static Singleton sInstance;
    public static Singleton getInstance() {
        if (sInstance == null) {
            synchronized(Singleton.class) {
                if (sInstance == null)
                    sInstance = new Singleton();
                }
            }
        return instance;
    }
}

但是sInstance= new Singleton()是可能會(huì)被指令重排的捕发,正常來說這句代碼的指令的執(zhí)行順序是這樣的疏旨。

指令1: 分配一塊內(nèi)存
指令2: 在內(nèi)存上初始化Singleton對(duì)象
指令3: 將內(nèi)存地址賦值給sInstance變量

指令重排后的執(zhí)行順序可能是這樣的。

指令1: 分配一塊內(nèi)存
指令2: 將內(nèi)存的地址賦值給sInstance變量
指令3: 在內(nèi)存上初始化Singleton對(duì)象

假設(shè)A線程執(zhí)行到指令2處時(shí)被掛起扎酷,而B線程進(jìn)入getInstance()方法檐涝,在第一個(gè)判空處發(fā)現(xiàn)sInstance不為空就會(huì)直接返回,但此時(shí)sInstance指向的內(nèi)存并沒有初始化法挨,訪問sInstance時(shí)很可能出現(xiàn)問題谁榜。因此需要使用volatile修飾sInstance來禁止相關(guān)的指令重排。

那么有沒有一種更簡單的實(shí)現(xiàn)單例的方法呢凡纳?自然是有的窃植,如果一個(gè)單例對(duì)象在系統(tǒng)運(yùn)行中肯定會(huì)被使用,在類初始化的時(shí)候直接新建單例的實(shí)例對(duì)象即可荐糜,不用考慮并發(fā)情況巷怜,其實(shí)現(xiàn)如下。

public class Singleton {
    private static class Holder {
        private static Singleton INSTANCE = new Singleton();
    }

    private Singleton() {
    }

    public static Singleton getInstance() {
        return Holder.INSTANCE;
    }

二暴氏、Java內(nèi)存模型

并發(fā)場景下的原子性延塑、可見性和順序性問題會(huì)導(dǎo)致程序的運(yùn)行結(jié)果不可預(yù)測,為了解決這些問題答渔,Java內(nèi)存模型(JMM)提供了volatile和synchronized關(guān)鍵字关带,其中volatile關(guān)鍵字用于實(shí)現(xiàn)共享變量的可見性與限制重排序,synchronized關(guān)鍵字用于實(shí)現(xiàn)代碼塊的原子性沼撕。JMM提供了6條Happens-Before規(guī)則來描述這兩個(gè)關(guān)鍵字的作用宋雏,以便給開發(fā)人員提供指導(dǎo)。

Happens-Before是指端朵,如果操作A Happens-Before 操作B好芭,則表示在內(nèi)存順序上,A的結(jié)果對(duì)B是可見的冲呢。Happens-Before的具體規(guī)則如下舍败。

① 對(duì)synchronized互斥鎖的解鎖Happens-Before對(duì)該互斥鎖的后續(xù)加鎖
② 對(duì)volatile變量的寫操作Happens-Before對(duì)該變量的后續(xù)讀操作
③ 主線程調(diào)用子線程start()方法前的操作Happens-Before該子線程的start()方法
④ 一個(gè)線程內(nèi)的所有操作Happens-Before其他線程成功join()該線程
⑤ 任意對(duì)象的默認(rèn)初始化Happens-Before程序的其他操作
Happens-Before具有可傳遞性,如果A Happens-Before B敬拓,B Happens-Before C邻薯,則A Happens-Before C

回頭看1.1,代碼中使用synchronized關(guān)鍵字使a++具備了原子性乘凸,同時(shí)a變量也具備了可見性厕诡。這是因?yàn)榛コ怄i的解鎖Happens-Before后續(xù)加鎖,因此之后拿到互斥鎖的線程都知道上個(gè)線程對(duì)a的操作結(jié)果营勤。

三灵嫌、互斥鎖

互斥鎖用于保證代碼塊的原子性壹罚,實(shí)際是用于保護(hù)一個(gè)或一系列共享變量同一時(shí)刻只能被一個(gè)線程訪問。當(dāng)某個(gè)線程進(jìn)入同步代碼塊時(shí)寿羞,該線程需要先獲取同步代碼塊的鎖猖凛,退出同步代碼塊時(shí)則釋放鎖。如果線程嘗試獲取鎖時(shí)發(fā)現(xiàn)鎖已被占用绪穆,則該線程被掛起辨泳,直到獲取鎖之后再進(jìn)入運(yùn)行態(tài)。

Java提供了synchronized和Lock兩種互斥鎖的實(shí)現(xiàn)玖院。synchronized是Java關(guān)鍵字菠红,屬于語言特性,但是在Java6之前它的效率并不高难菌。Lock系列由并發(fā)大師Doug Lea編寫试溯,在Java5時(shí)加入Java并發(fā)包。

3.1 synchronized

3.1.1 基本使用

synchronized用于修飾方法或代碼塊扔傅,它隱式地實(shí)現(xiàn)了加鎖/解鎖操作耍共,線程進(jìn)入方法或代碼塊時(shí)會(huì)自動(dòng)加鎖,退出時(shí)會(huì)自動(dòng)解鎖猎塞。其使用示例如下试读。

public class Sample {
    private static final Object lock = new Object();

    // 1. 修飾代碼塊
    public void fun1() {
        synchronized(lock) {
            // ......
        }
    }

    // 2. 修飾非靜態(tài)方法
    public synchronized void fun2() {
      // ......
    }

    // 3. 修飾靜態(tài)方法
    public synchronized static void fun3() {
        // ......
    }
}  

可以發(fā)現(xiàn)synchronized只有在修飾代碼塊時(shí)才需要指定互斥鎖,而修飾方法時(shí)的互斥鎖是Java隱性添加的荠耽。其規(guī)則為:修飾static方法時(shí)钩骇,互斥鎖為當(dāng)前類的class對(duì)象,也就是例子中的Sample.class铝量;而修飾非static方法時(shí)倘屹,互斥鎖為當(dāng)前的對(duì)象實(shí)例。

3.1.2 鎖的選擇

首先需要明確什么樣的對(duì)象適合作為互斥鎖慢叨,由于互斥鎖是用于保護(hù)共享變量的纽匙,那么互斥鎖的生命周期應(yīng)該與它保護(hù)的共享變量一致,且在運(yùn)行過程中不可再被賦值拍谐。

synchronized修飾方法時(shí)隱式添加的鎖就遵循了這樣的規(guī)則烛缔,當(dāng)synchronized修飾靜態(tài)方法時(shí),它的鎖為當(dāng)前類的class對(duì)象轩拨,該對(duì)象在程序開始運(yùn)行時(shí)就被創(chuàng)建践瓷,且不可更改。

以下面的程序?yàn)槔鋈兀瑂taticList作為靜態(tài)變量晕翠,它的生命周期是整個(gè)程序,與互斥鎖Sample.class的生命周期相同砍濒。

public class Sample {
    private static List<String> staticList = new ArrayList<>();

    public synchronized static void addString(String s) {
        staticList.add(s);
    }
}  

當(dāng)synchronized修飾非靜態(tài)方法時(shí)淋肾,它的鎖為當(dāng)前對(duì)象this硫麻。因?yàn)榉庆o態(tài)方法用于操作非靜態(tài)成員,例如下面示例中的mList樊卓,而非靜態(tài)成員的生命周期就是當(dāng)前對(duì)象this的生命周期庶香。

public class Sample {
    private List<String> mList;

    public Sample() {
        mList = new ArrayList<>();
    }

    public synchronized void addString(String s) {
        mList.add(s);
    }
}  

需要注意的是,使用synchronized修飾static方法時(shí)简识,該類所有static方法的互斥鎖都是同一個(gè)對(duì)象,所以該類所有static方法都是互斥的感猛。如果各個(gè)static方法中需要保護(hù)的資源不一樣七扰,這樣只會(huì)影響運(yùn)行效率,使用synchronized修飾非靜態(tài)方法時(shí)同理陪白。

下方程序就使用了Sample.class這把鎖保護(hù)了兩個(gè)不同的資源颈走,那么該如何修改呢?

public class Sample {
    private static List<String> staticList1 = new ArrayList<>();
    private static List<String> staticList2 = new ArrayList<>();
    
    public synchronized static void addString1(String s) {
        staticList1.add(s);
    }

    public synchronized static void addString2(String s) {
        staticList2.add(s);
    }
}  

一般來說咱士,互斥鎖應(yīng)與被保護(hù)的資源一一對(duì)應(yīng)立由,最簡單的方法就是將被保護(hù)的對(duì)象本身作為互斥鎖,如下所示序厉。

public class Sample {
    private static final List<String> staticList1 = new ArrayList<>();
    private static final List<String> staticList2 = new ArrayList<>();

    public static void addString1(String s) {
        synchronized (staticList1) {
            staticList1.add(s);
        }
    }

    public static void addString2(String s) {
        synchronized (staticList2) {
            staticList2.add(s);
        }
    }
}

可以發(fā)現(xiàn)新的示例中锐膜,我們使用final修飾了staticList1和staticList2,這是為什么呢弛房?
這涉及到對(duì)線程加鎖的原理道盏,當(dāng)程序?qū)€程加鎖時(shí),實(shí)際是在鎖對(duì)象中寫入了該線程的id文捶,因此鎖對(duì)象在運(yùn)行期間不該被重新賦值荷逞。如果在運(yùn)行期間鎖被重新賦值(例如上方的staticList1指向了一個(gè)新的對(duì)象),就相當(dāng)于使用多個(gè)鎖保護(hù)一個(gè)資源粹排,無法達(dá)到互斥的目的种远。而被final修飾的對(duì)象是必須被初始化的,且無法被重新賦值顽耳,滿足作為鎖的條件坠敷。

3.1.3 線程"等待-通知"機(jī)制

考慮如下程序,當(dāng)fun()方法中canExecute()這個(gè)運(yùn)行條件不滿足時(shí)斧抱,我們可能會(huì)通過循環(huán)等待的方式常拓,直到canExecute()方法返回true后再運(yùn)行接下來的邏輯。

public class Sample {
    private boolean execute = false;
    
    public void fun() throws InterruptedException {
        while (!canExecute()) {
            Thread.sleep(10);
        }
        // ......
    }

    private synchronized boolean canExecute() {
        return execute;
    }

    public synchronized void setExecute() {
        execute = true;
    }
}

問題在于辉浦,線程睡眠的時(shí)間是一個(gè)固定的值弄抬。如果該值太大,線程無法在運(yùn)行條件滿足后的第一時(shí)間開始運(yùn)行宪郊;如果該值太小掂恕,線程會(huì)不斷地調(diào)用同步方法canExecute()判斷當(dāng)前的狀態(tài)拖陆,浪費(fèi)了運(yùn)行資源。那么有沒有一種方法懊亡,能夠使不滿足運(yùn)行條件的線程進(jìn)入休眠狀態(tài)(該狀態(tài)下線程不消耗運(yùn)行資源)依啰,而在滿足條件后的第一時(shí)間開始運(yùn)行呢?這就是線程的"等待-通知"機(jī)制店枣。

Java所有的對(duì)象都有wait()速警、notify()notifyAll()這3個(gè)方法,這是對(duì)象作為互斥鎖時(shí)所使用的方法鸯两,它們與synchronized共同實(shí)現(xiàn)了線程的"等待-通知"機(jī)制闷旧,需要注意的是,這3個(gè)方法必須在synchronized代碼塊中執(zhí)行钧唐。下面介紹這3個(gè)方法的作用忙灼。

  1. wait(): 用于使當(dāng)前線程進(jìn)入休眠狀態(tài)。如果當(dāng)前的運(yùn)行條件未滿足钝侠,可以調(diào)用互斥鎖的wait()方法主動(dòng)放棄互斥鎖并進(jìn)入休眠態(tài)该园,隨后該線程會(huì)進(jìn)入互斥鎖的等待隊(duì)列中,直到被其他線程喚醒帅韧。
  2. notify(): 用于喚醒互斥鎖等待隊(duì)列中的一個(gè)線程里初。當(dāng)?shù)却?duì)列中的線程的運(yùn)行條件被滿足時(shí),可以調(diào)用notify()方法隨機(jī)喚醒等待隊(duì)列中的一個(gè)線程弱匪,隨后被喚醒的線程去爭奪互斥鎖青瀑。
  3. notifyAll(): 喚醒互斥鎖等待隊(duì)列中的所有線程。由于notify()方法喚醒線程時(shí)是隨機(jī)的萧诫,可能喚醒的并不是真正滿足運(yùn)行條件的線程斥难,因此實(shí)際開發(fā)中基本只用notifyAll(),讓等待隊(duì)列中的所有線程去爭奪互斥鎖帘饶。

舉個(gè)簡單的例子哑诊,代碼如下所示。
線程t1運(yùn)行tryExecute()方法及刻,發(fā)現(xiàn)mShouldExecute為false镀裤,調(diào)用wait()主動(dòng)進(jìn)入休眠態(tài)。線程t2睡眠3秒后將mShouldExecute設(shè)置為true并調(diào)用notifyAll()通知等待隊(duì)列中的線程缴饭,t1被喚醒后發(fā)現(xiàn)運(yùn)行條件滿足暑劝,繼續(xù)執(zhí)行。

public class Sample {
    private boolean mShouldExecute = false;

    public void tryExecute() throws InterruptedException {
        synchronized (this) {
            while (!mShouldExecute) {
                System.out.println("tryExecute wait...");
                wait();
            }
            realExecute();
        }
    }

    private void realExecute() {
        System.out.println("realExecute");
    }

    public void setExecute() {
        synchronized (this) {
            mShouldExecute = true;
            notifyAll();
        }
    }

    public static void main(String[] args) {
        Sample sample = new Sample();
        Thread t1 = new Thread(() -> {
            try {
                sample.tryExecute();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(3000);
                sample.setExecute();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t1.start();
        t2.start();
    }
}

上述程序中的互斥鎖是當(dāng)前對(duì)象this颗搂,因此在調(diào)用wait()notifyAll()方法時(shí)省略了this担猛,寫完整應(yīng)該是this.wait()this.notifyAll()

在判斷程序是否滿足運(yùn)行條件時(shí),tryExecute()方法中使用了循環(huán)while (!mShouldExecute)傅联,這是因?yàn)榫€程t2調(diào)用notifyAll()后先改,線程t1并不會(huì)馬上執(zhí)行,而是要去爭奪互斥鎖蒸走。有可能爭奪到互斥鎖時(shí)仇奶,運(yùn)行條件又不滿足了,因此需要重新判斷比驻。

線程"等待-通知"機(jī)制可以用來實(shí)現(xiàn)阻塞隊(duì)列(BlockingQueue)该溯。阻塞隊(duì)列是一種"生產(chǎn)者-消費(fèi)者"模型的數(shù)據(jù)結(jié)構(gòu),當(dāng)隊(duì)列為空時(shí)别惦,所有嘗試獲取數(shù)據(jù)的線程都會(huì)休眠朗伶,直到別的線程成功添加數(shù)據(jù)后喚醒它們;當(dāng)隊(duì)列已滿時(shí)步咪,所有嘗試添加數(shù)據(jù)的線程都會(huì)休眠,直到獲取數(shù)據(jù)成功的線程喚醒它們益楼。

public class BlockingQueue<T> {

    private Queue<T> mQueue;
    private int mCapacity;

    public BlockingQueue(int capacity) {
        mQueue = new ArrayDeque<>(capacity);
        mCapacity = capacity;
    }

    /**
     * 嘗試向隊(duì)列添加數(shù)據(jù), 如果隊(duì)列已滿則休眠當(dāng)前線程
     */
    public void offer(T t) throws InterruptedException {
        synchronized (this) {
            while (isQueueFull()) {
                wait();
            }
            mQueue.offer(t);
            notifyAll();
        }
    }

    /**
     * 嘗試獲取隊(duì)頭數(shù)據(jù), 如果隊(duì)列為空則休眠當(dāng)前線程
     */
    public T take() throws InterruptedException {
        synchronized (this) {
            while (isQueueEmpty()) {
                wait();
            }
            T result = mQueue.poll();
            notifyAll();
            return result;
        }
    }

    /**
     * 判斷隊(duì)列是否為空
     */
    private boolean isQueueEmpty() {
        return mQueue.isEmpty();
    }

    /**
     * 判斷隊(duì)列是否已滿
     */
    private boolean isQueueFull() {
        return mQueue.size() == mCapacity;
    }
}

3.2 Lock

Lock用于解決synchronized在某些場景下的缺陷猾漫,在以下場景中synchronized無法實(shí)現(xiàn)最佳效果,但是Lock可以輕松解決感凤。

  1. 當(dāng)線程進(jìn)入synchronized修飾的方法或代碼塊中悯周,只有等線程執(zhí)行完或者調(diào)用wait()方法才會(huì)釋放鎖。如果一個(gè)線程在進(jìn)行耗時(shí)任務(wù)陪竿,那么其他線程都必須等待它運(yùn)行完畢禽翼,無法中斷。
  2. 使用synchronized保護(hù)共享變量時(shí)族跛,在同一時(shí)刻最多只有一個(gè)線程進(jìn)行讀寫闰挡。但是多個(gè)讀線程并不沖突,如果讀線程也互斥的話會(huì)影響程序效率礁哄。針對(duì)這種情況长酗,Lock提供了讀寫鎖ReadWriteLock。
  3. 使用synchronized實(shí)現(xiàn)"等待-通知"機(jī)制時(shí)桐绒,調(diào)用notifyAll()會(huì)喚醒所有阻塞的線程夺脾,無法喚醒特定的線程。例如在3.1.3的阻塞隊(duì)列中茉继,如果某個(gè)線程執(zhí)行poll()方法取出了隊(duì)列中的最后一個(gè)數(shù)據(jù)咧叭,隨后只需要喚醒那些調(diào)用offer(T t)的線程即可,而notifyAll()會(huì)喚醒所有線程烁竭,如果調(diào)用poll()方法的線程搶占到互斥鎖菲茬,也會(huì)馬上發(fā)現(xiàn)條件不滿足,然后繼續(xù)休眠。

Lock本身是一個(gè)接口生均,針對(duì)不同的場景Lock提供了不同的實(shí)現(xiàn)類听想,接口如下所示。

    /**
     * 獲取鎖
     *
     * Lock的實(shí)現(xiàn)應(yīng)該能夠監(jiān)測到鎖的錯(cuò)誤使用, 例如可能產(chǎn)生死鎖或拋出異常
     * Lock的實(shí)現(xiàn)必須記錄鎖的情況和異常類型
     */
    void lock();

    /**
     * 獲取鎖, 除非線程被中斷
     *
     * 如果當(dāng)前線程在進(jìn)入方法前或者在獲取鎖的過程中被設(shè)置為interrupted
     * 那么會(huì)拋出InterruptedException并且當(dāng)前線程的中斷狀態(tài)會(huì)被清除
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * 如果當(dāng)前鎖可用, 則獲取鎖, 否則直接返回false
     *
     * 一個(gè)典型的用法如下所示: 
     * Lock lock = ...;
     * if (lock.tryLock()) {
     *   try {
     *     // manipulate protected state
     *   } finally {
     *     lock.unlock();
     *   }
     * } else {
     *   // 備用邏輯
     * }}
     */
    boolean tryLock();

    /**
     * 如果在給定時(shí)間內(nèi)線程沒有被中斷且獲取到鎖則返回true
     * 如果在給定時(shí)間之后線程還未獲取到鎖, 則返回false
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 釋放鎖
     */
    void unlock();

    /**
     * 返回一個(gè)綁定了當(dāng)前Lock的條件變量Condition, 用于實(shí)現(xiàn)線程的"等待-通知"機(jī)制
     * 在某個(gè)Condition上被阻塞的線程可以被單獨(dú)喚醒
     */
    Condition newCondition();

Lock作為并發(fā)包的實(shí)現(xiàn)马胧,在使用上與synchronized關(guān)鍵字有所不同汉买。synchronized是自動(dòng)加鎖/解鎖的,即使在同步代碼塊中出現(xiàn)異常佩脊,JVM也能保證鎖正常釋放蛙粘。但是Lock無法做到,在使用Lock時(shí)威彰,需要遵守以下的范式來保證遇到異常時(shí)也能正常釋放鎖出牧,否則其他線程永遠(yuǎn)得不到運(yùn)行的機(jī)會(huì)。

public void fun() {
    lock.lock();
    try {
        // ......
    } finally {
        rtl.unlock();
    }
}
3.2.1 可重入鎖ReentrantLock

可重入鎖指的是線程可以重復(fù)獲取同一把鎖歇盼,示例如下所示舔痕,一個(gè)線程運(yùn)行到fun1()時(shí)獲取到了鎖,在未釋放鎖的情況下調(diào)用fun2()是可以正常運(yùn)行的豹缀。
synchronized關(guān)鍵字也是可重入鎖伯复,因?yàn)樗募渔i操作本質(zhì)上就是在鎖這個(gè)Java對(duì)象的對(duì)象頭寫入當(dāng)前線程的id,表示鎖被當(dāng)前線程占有了邢笙,自然是可重入的啸如。

public class Sample {
    private Lock lock = new ReentrantLock();

    public void fun1() {
        lock.lock();
        try {
            fun2(); // fun2()也需要獲取鎖
        } finally {
            lock.unlock();
        }
    }

    public void fun2() {
        lock.lock();
        try {
            // ......
        } finally {
            lock.unlock();
        }
    }

使用synchronized關(guān)鍵字時(shí),可以通過wait()notifyAll()實(shí)現(xiàn)線程的"等待-通知"機(jī)制氮惯,在3.1.3中通過它們實(shí)現(xiàn)了一個(gè)阻塞隊(duì)列叮雳。但是使用wait()notifyAll()存在的問題是,只能喚醒所有休眠的線程妇汗,而無法根據(jù)當(dāng)前的條件喚醒特定的線程去執(zhí)行帘不。

但是Lock解決了這個(gè)問題,Lock接口有個(gè)方法Condition newCondition()可以新建一個(gè)與當(dāng)前Lock綁定的條件變量杨箭⊙峋可以通過Condition.await()方法使某個(gè)線程休眠,當(dāng)該條件變量滿足后告唆,可以通過Condition.signalAll()喚醒該條件變量下休眠的線程棺弊。

下面用ReentrantLock和Condition實(shí)現(xiàn)阻塞隊(duì)列如下,此時(shí)可以創(chuàng)建兩個(gè)Condition擒悬,一個(gè)Condition表示隊(duì)列不滿模她,如果線程添加數(shù)據(jù)時(shí)發(fā)現(xiàn)隊(duì)列已滿,那么阻塞在該Condition上懂牧,直到有數(shù)據(jù)出隊(duì)時(shí)喚醒阻塞在該條件上的線程侈净;另一個(gè)Condition表示隊(duì)列不空尊勿,如果線程獲取數(shù)據(jù)時(shí)發(fā)現(xiàn)隊(duì)列為空,則阻塞在該Condition上畜侦,直到有數(shù)據(jù)入隊(duì)時(shí)喚醒阻塞在該條件上的線程元扔。

public class BlockingQueue<T> {
    final Lock lock = new ReentrantLock();

    final Condition notFull = lock.newCondition(); // 條件: 隊(duì)列不滿
    final Condition notEmpty = lock.newCondition(); // 條件: 隊(duì)列不空
    
    private Queue<T> mQueue;
    private int mCapacity;
    
    public BlockingQueue(int capacity) {
        mQueue = new ArrayDeque<>(capacity);
        mCapacity = capacity;
    }
    
    public void offer(T t) {
        lock.lock();
        try {
            while (isQueueFull()) {
                notFull.await();
            }
            mQueue.offer(t);
            // 入隊(duì)后, 通知可以出隊(duì)了
            notEmpty.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private boolean isQueueFull() {
        return mQueue.size() == mCapacity;
    }

    public T take() {
        T result = null;
        lock.lock();
        try {
            while (isQueueEmpty()) {
                notEmpty.await();
            }
            result = mQueue.poll();
            // 出隊(duì)后, 通知可以入隊(duì)了
            notFull.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return result;
    }

    private boolean isQueueEmpty() {
        return mQueue.size() == 0;
    }
}
3.2.2 讀寫鎖ReadWriteLock

使用synchronized關(guān)鍵字保護(hù)共享變量時(shí),線程的讀操作也會(huì)互斥旋膳,但是多個(gè)線程的讀操作并不會(huì)產(chǎn)生并發(fā)問題澎语。針對(duì)讀寫場景,Java并發(fā)包提供了讀寫鎖ReadWriteLock验懊,它是一個(gè)接口该肴,如下所示攀痊。

public interface ReadWriteLock {
    Lock readLock(); // 返回讀鎖

    Lock writeLock(); // 返回寫鎖
}

ReadWriteLock的實(shí)現(xiàn)為ReentrantReadWriteLock夹纫,從命名可以看出蝌戒,這是一個(gè)可重入鎖。當(dāng)調(diào)用readLock()writeLock()獲取讀鎖或?qū)戞i時(shí)碱工,返回的是ReentrantReadWriteLock中的內(nèi)部變量readerLock和writerLock娃承,它們都是Lock接口的實(shí)現(xiàn)類。

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;

    // 默認(rèn)新建非公平鎖
    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

    ......
}

在使用之前先來看一下ReadWriteLock的特性:

  1. 讀和讀之間不互斥怕篷,讀和寫之間互斥草慧,寫和寫之間互斥。這意味著沒有寫鎖時(shí)匙头,讀鎖可以被多個(gè)線程持有。
  2. 讀寫鎖只適用于讀多寫少的場景仔雷,例如某個(gè)很少被修改但是經(jīng)常被搜索的數(shù)據(jù)(如條目)就適合使用讀寫鎖蹂析。如果寫操作較為頻繁,那么數(shù)據(jù)大部分時(shí)間都被獨(dú)占鎖占據(jù)碟婆,并不會(huì)提升并發(fā)性能电抚。
  3. 持有讀鎖的線程無法直接獲取寫鎖,但是持有寫鎖的線程可以獲取讀鎖竖共,其他線程無法獲取讀鎖蝙叛。換句話說,讀鎖無法升級(jí)為寫鎖公给,寫鎖可以降級(jí)為讀鎖借帘。
  4. 讀鎖和寫鎖都實(shí)現(xiàn)了Lock接口,因此它們都支持tryLock()lockInterruptibly()等方法淌铐。但是只有寫鎖支持Condition肺然,讀鎖不支持使用條件變量。

使用ReadWriteLock的一個(gè)簡單示例如下腿准。

public class Sample<T> {
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private T mData;

    public T read() {
        readWriteLock.readLock().lock();
        try {
            return mData;
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
    
    public void write(T t) {
        readWriteLock.writeLock().lock();
        try {
            mData = t;
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
}
3.2.3 支持樂觀讀的StampedLock

StampedLock具有三種支持讀/寫功能的模式际起,它的狀態(tài)由版本(stamp)和當(dāng)前的模式組成。StampedLock獲取鎖的方法會(huì)返回一個(gè)stamp,該值用于表示當(dāng)前鎖的狀態(tài)街望,在釋放鎖和轉(zhuǎn)換鎖時(shí)需要stamp作為參數(shù)校翔,如果它與鎖的狀態(tài)不匹配就會(huì)失敗。StampedLock支持的三種模式為寫灾前、讀和樂觀讀防症。

  1. 寫模式,類似讀寫鎖的寫鎖豫柬。線程調(diào)用StampedLock.writeLock()后獨(dú)占訪問數(shù)據(jù)告希,該方法返回一個(gè)stamp表示鎖的狀態(tài),調(diào)用StampedLock.unlockWrite(stamp)解鎖時(shí)需要該stamp烧给。
    tryWriteLock()tryWriteLock()方法也會(huì)返回stamp燕偶,這2個(gè)方法獲取鎖失敗時(shí)會(huì)返回0。當(dāng)鎖處于寫模式時(shí)無法獲得讀鎖础嫡,所有樂觀讀的驗(yàn)證都將失敗指么。
  2. 讀模式,類似讀寫鎖的讀鎖榴鼎。線程調(diào)用StampedLock.readLock()后進(jìn)行讀操作伯诬,多個(gè)線程的讀操作可同時(shí)進(jìn)行。讀鎖加鎖時(shí)也會(huì)返回stamp巫财,用于解鎖時(shí)調(diào)用StampedLock.unlockRead(stamp)使用盗似。
  3. 樂觀讀模式,該模式是讀鎖的弱化版本平项,使用時(shí)不需要獲取鎖赫舒。樂觀讀的思想是,假設(shè)讀的過程中數(shù)據(jù)并未被修改闽瓢。樂觀讀避免了鎖的爭用并提高了吞吐量接癌,但是它的正確性無法保證,因此讀到結(jié)果后需要驗(yàn)證在樂觀讀時(shí)是否有線程獲取了寫鎖扣讼。
    調(diào)用StampedLock.tryooptimisticread()后可進(jìn)行樂觀讀缺猛,如果當(dāng)前不處于寫入模式則返回非零stamp,樂觀讀結(jié)束后調(diào)用validate(stamp)判斷結(jié)果是否有效椭符。

StampedLock的使用依賴于對(duì)所保護(hù)的數(shù)據(jù)荔燎、對(duì)象和方法的了解,如果樂觀讀的結(jié)果未經(jīng)驗(yàn)證销钝,不應(yīng)該用于無法忍受潛在不一致的方法湖雹。StampedLock是不可重入的,因此獲取了鎖的線程不應(yīng)該調(diào)用其他嘗試獲取鎖的方法曙搬。

來看官方提供的例子摔吏,Point類有3個(gè)方法鸽嫂,move(...)方法描述了寫模式的一般流程;distanceFromOrigin()描述了樂觀讀模式的流程征讲,如果樂觀讀的數(shù)據(jù)失效据某,則獲取讀鎖進(jìn)行讀操作;moveIfAtOrigin(...)描述了StampedLock的鎖升級(jí)诗箍,在平時(shí)開發(fā)中癣籽,鎖的升級(jí)要慎重使用。

public class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    // 寫鎖
    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    // 樂觀讀
    double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!sl.validate(stamp)) {
            // 如果數(shù)據(jù)已過時(shí), 則獲取讀鎖進(jìn)行讀操作
            stamp = sl.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    // 鎖的升級(jí)
    void moveIfAtOrigin(double newX, double newY) {
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = sl.tryConvertToWriteLock(stamp);
                if (ws != 0L) {
                    // 需要將升級(jí)后的stamp值更新
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

四滤祖、線程安全的變量

互斥鎖是用于保護(hù)共享變量的筷狼,但如果一個(gè)變量不會(huì)被多個(gè)線程同時(shí)訪問也就沒有保護(hù)的必要了,例如方法內(nèi)的局部變量匠童。根據(jù)Java虛擬機(jī)規(guī)范埂材,每個(gè)線程的內(nèi)存中都包含程序計(jì)數(shù)器、虛擬機(jī)棧以及本地方法區(qū)這3塊內(nèi)存汤求,其中虛擬機(jī)棧用于存儲(chǔ)線程的方法調(diào)用俏险,當(dāng)調(diào)用某個(gè)方法時(shí),線程會(huì)在虛擬機(jī)棧中壓入一個(gè)棧幀扬绪;等該方法調(diào)用完畢時(shí)竖独,對(duì)應(yīng)的棧幀就會(huì)從虛擬機(jī)棧中彈出,而局部變量表就保存在棧幀中挤牛。

虛擬機(jī)棧.png

在大部分Java虛擬機(jī)的實(shí)現(xiàn)中莹痢,虛擬機(jī)棧的大小是固定的,只有少數(shù)JVM的虛擬機(jī)棧是可以擴(kuò)展的墓赴。虛擬機(jī)棧大小固定的情況下竞膳,如果進(jìn)行一個(gè)深度過大的遞歸調(diào)用時(shí),就可能產(chǎn)生StackOverflow異常竣蹦,就是因?yàn)樘摂M機(jī)棧沒有足夠的空間去存儲(chǔ)一個(gè)棧幀了。

當(dāng)然共享變量也不一定是線程不安全的沧奴,下面介紹3種線程安全的變量痘括,它們實(shí)現(xiàn)線程安全的方式各不相同。

4.1 不可變變量

并發(fā)環(huán)境下對(duì)共享變量的讀寫會(huì)出現(xiàn)問題滔吠,但是只讀不寫就沒有并發(fā)問題纲菌。如果一個(gè)共享變量在初始化后不會(huì)再改變,它就是線程安全變量疮绷。我們知道被final關(guān)鍵字修飾的變量是必須被初始化且初始化后不再改變的翰舌,那么被final修飾的共享變量是否就是線程安全的呢?我們以兩個(gè)例子來說明冬骚。

先來看這個(gè)例子椅贱,類中有2個(gè)共享變量分別是String和int類型的懂算,它們在構(gòu)造方法中會(huì)被初始化,并且不可更改庇麦,因此是線程安全的计技。

public class Sample {
    private final String mString;
    private final int mInt;
    
    public Sample(String s, int i) {
        mString = s;
        mInt = i;
    }
    
    public String getString() {
        return mString;
    }
    
    public int getInt() {
        return mInt;
    }
}

來看第二個(gè)例子,雖然Sample中將mTest修飾為final變量山橄,但是這里的final只是表示mTest對(duì)象的內(nèi)存地址不可變垮媒,對(duì)象中的值是可變的,所以通過getTest()獲取到mTest變量后航棱,是可以對(duì)mTest中的String和int重新賦值的睡雇。而Test類中的String和int變量不是final變量,所以Sample不是線程安全的饮醇。

public class Sample {
    private final Test mTest;
    
    public Sample(Test test) {
        this.mTest = test;
    }
    
    public Test getTest() {
        return mTest;
    }

    static class Test {
        public String s;
        public int i;
    }
}

如果想要讓類的對(duì)象是不可變的它抱,那么類的屬性應(yīng)該全部修飾為final,如果某個(gè)屬性不是基本類型驳阎,那么該類也應(yīng)將所有屬性修飾為final抗愁。

需要注意的是,String類型是不可變的呵晚,當(dāng)調(diào)用String的subString()replace()方法時(shí)不是修改原來的String蜘腌,而是生成一個(gè)新的String對(duì)象。但是StringBuilder和StringBuffer是可變的饵隙,調(diào)用append()方法時(shí)是在原對(duì)象上操作的撮珠。

4.2 原子類變量

在1.1中我們解決a++原子性問題的方法是使用synchronized,但是加鎖/解鎖操作以及線程切換都是比較消耗性能的金矛。但是有一種變量叫原子類變量芯急,它的方法本身就是原子操作,這是通過CPU指令支持的驶俊,因此具有很好的效率娶耍。

4.2.1 基本變量原子類

原子類中的AtomicBoolean, AtomicInteger, AtomicLong對(duì)應(yīng)基本類型的Boolean, Integer, Long,這3個(gè)原子類共有的方法如下饼酿。

// 只有當(dāng)前原子變量的值是expect才更新為update, 成功返回true, 失敗返回false
boolean compareAndSet(expect, update);
// 將原子變量的值更新為newValue并返回更新前的值
boolean/int/long getAndSet(newValue);
// 將原子變量的值更新為newValue, 但是對(duì)其他線程并不一定立即可見
void lazySet(newValue);

當(dāng)然AtomicInteger和AtomicLong還有另外的方法榕酒,這些方法通常成對(duì)出現(xiàn),例如getAndIncrement()incrementAndGet()故俐,它們的功能都是自增想鹰,唯一的不同就在于返回的是更新前的值還是更新后的值。如果1.1的例子使用原子類來實(shí)現(xiàn)药版,只需要將a改為原子變量a = new new AtomicInteger(0)并將a++改為a.getAndIncrement()就能得到正確的結(jié)果辑舷。

// 增加delta, 返回更新后的值, getAndAdd()與其作用相同但是返回更新前的值
int/long addAndGet(int delta);
// 通過IntBinaryOperator對(duì)prev和x計(jì)算得到新的值, IntBinaryOperator的方法必須無副作用
int/long accumulateAndGet(x, IntBinaryOperator);
// 通過IntUnaryOperator對(duì)prev計(jì)算得到新的值, IntUnaryOperator的方法必須無副作用
int/long updateAndGet(IntUnaryOperator updateFunction);

原子類通過CPU提供的CAS指令實(shí)現(xiàn)了原子性,該指令本身具有原子性槽片。CAS指Compare And Swap何缓,即比較并交換肢础,該指令先比較共享變量的值與期望值,只有這兩個(gè)值相等才將共享變量的值更新歌殃。原子類的compareAndSet(expect, update)方法就是通過native的compareAndSwapXXX(...)方法實(shí)現(xiàn)的乔妈。

CAS+自旋是并發(fā)編程的一大利器,在Java并發(fā)包中得到了廣泛的應(yīng)用氓皱,"自旋"實(shí)際上就是通過循環(huán)不斷嘗試CAS操作直到成功路召。例如AtomicInteger的getAndAdd(delta)方法,它實(shí)際調(diào)用Unsafe中的getAndAddInt(Object var1, long var2, int var4)方法波材,如下所示股淡。由于在進(jìn)行CAS指令之前,原子類的值可能被其他線程修改廷区,因此需要循環(huán)獲取原子變量當(dāng)前的值并調(diào)用compareAndSwapInt(...)直到成功唯灵。

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}
4.2.2 原子數(shù)組類

數(shù)組原子類包括AtomicIntegerArray, AtomicLongArray和AtomicReferenceArray,它們的方法都大同小異隙轻。以AtomicIntegerArray為例埠帕,它有2個(gè)構(gòu)造方法,一個(gè)傳入數(shù)組長度敛瓷,數(shù)組中所有的值被初始化為0呐籽,另一個(gè)構(gòu)造方法直接傳入一個(gè)int數(shù)組蚀瘸。

public AtomicIntegerArray(int length) {
    array = new int[length];
}

public AtomicIntegerArray(int[] array) {
    // Visibility guaranteed by final field guarantees
    this.array = array.clone();
}

相比于AtomicInteger,AtomicIntegerArray的方法在使用時(shí)只是多傳入了數(shù)組下標(biāo)參數(shù)i贮勃,看看下面的方法寂嘉,是不是似曾相識(shí)丝格?AtomicLongArray同理。
AtomicReferenceArray是原子引用的數(shù)組订咸,接下來的4.2.3會(huì)講解原子引用AtomicReference骆撇,使用原子數(shù)組類時(shí)只需加上下標(biāo)即可神郊。

boolean compareAndSet(int i, int expect, int update);
int getAndAdd(int i, int delta);
int getAndIncrement(int i);
......
4.2.3 原子引用類

引用原子類為AtomicReference,那么它是將改變引用這個(gè)操作封裝成了原子操作嗎夕晓?當(dāng)然不是!因?yàn)楦淖円眠@個(gè)操作本身就是原子的躬贡,對(duì)引用賦值只需要一條指令逗宜,它不會(huì)被CPU打斷。
這點(diǎn)也可以從AtomicReference的set(V newValue)方法看出來熬甚,該方法只是對(duì)引用賦值。

public final void set(V newValue) {
    value = newValue;
}

因此AtomicReference的價(jià)值就在于它的CAS指令诲泌,用于防止其他線程隨意篡改引用的值敷扫。

boolean compareAndSet(V expect, V update);

使用AtomicReference要注意是否存在ABA問題绘迁,先來解釋一下什么是ABA問題。
之前使用AtomicInteger時(shí)膛腐,我們將原子類的值當(dāng)成了它的狀態(tài)依疼,在調(diào)用compareAndSet(A, update)時(shí)只要原子類的值與A相等就認(rèn)為它沒有被修改過,但這樣的判斷是存在隱患的误辑。如果T1線程在調(diào)用compareAndSet(A, update)之前,T2線程將值從A改為B砰苍,再從B改為A赚导,雖然原子類的值沒變吼旧,但是它的狀態(tài)已經(jīng)發(fā)生了變化。隨后T1線程執(zhí)行CAS時(shí)發(fā)現(xiàn)值未變就會(huì)繼續(xù)執(zhí)行员串,但是原子類狀態(tài)上的變化會(huì)引起一些意想不到的錯(cuò)誤。

維基百科的CAS詞條[參考2]描述了一個(gè)無鎖椃梅蓿可能出現(xiàn)的ABA問題。先解釋一下無鎖棧卧斟,如果需要實(shí)現(xiàn)一個(gè)支持并發(fā)的棧,有哪些方式可以實(shí)現(xiàn)呢板乙?
顯而易見的實(shí)現(xiàn)方式是在pop()push()方法上加鎖,這種實(shí)現(xiàn)簡單粗暴放接,不會(huì)出錯(cuò),但是效率較低。更好的方式是使用CAS+自旋汉操,該實(shí)現(xiàn)不需要加鎖,通過CAS+自旋實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)也被稱為無鎖結(jié)構(gòu)采缚。

以無鎖棧為例,其數(shù)據(jù)結(jié)構(gòu)為鏈表贸呢,通過一個(gè)指針head指向棧頂怔鳖,棧的push和pop操作都依賴head完成,當(dāng)head指向null時(shí)棧為空献幔。

無鎖棧.png

為了保證線程對(duì)head操作時(shí)的原子性,需要將head定義為原子引用AtomicReference<StackNode>铸敏,無鎖棧代碼如下所示。

下面以push操作為例,介紹CAS+自旋是如何工作的禁筏。如下方代碼的push()方法所示,新建pushNode后州刽,將pushNode.next指向head,并通過CAS將head指向pushNode门坷。但是執(zhí)行CAS之前head可能會(huì)被修改,因此如果CAS執(zhí)行失敗绸吸,就需要重新為pushNode賦值并再次嘗試CAS直到成功温数。

public class ConcurrentStack {
    private AtomicReference<StackNode> headReference;

    public ConcurrentStack() {
        headReference = new AtomicReference<>(null);
    }

    public void push(Integer i) {
        StackNode pushNode = new StackNode(i);
        while (true) {
            StackNode head = headReference.get();
            pushNode.next = head;
            // 其他線程可能在此處修改了head的值
            if (headReference.compareAndSet(head, pushNode)) {
                break;
            }
        }
    }

    public StackNode pop()  {
        while (true) {
            StackNode head = headReference.get();
            if (head == null) {
                return null;
            }
            StackNode newHead = head.next;
            // 其他線程可能在此處修改了head的值
            if (headReference.compareAndSet(head, newHead)) {
                return head;
            }
        }
    }
    
    private static class StackNode {
        public int value;
        public StackNode next;
        
        public StackNode(Integer i) {
            value = i;
        }
    }
}

那么在什么情況下鹉胖,無鎖棧會(huì)發(fā)生ABA問題呢?假設(shè)當(dāng)前棧如下所示。

ABA-初始狀態(tài).png

此時(shí)T1線程調(diào)用pop(),但是在執(zhí)行CAS之前該線程被掛起丘喻,也就是停在了上述pop()方法的注釋處。此時(shí)head指向StackNode4,newHead指向StackNode3叽躯。

但是這時(shí)T2線程調(diào)用了2次pop()使得StackNode4和StackNode3出棧酣难,隨后新建另一個(gè)值為4的節(jié)點(diǎn)StackNode4*并push入棧。可以看到此時(shí)StackNode4和StackNode3是游離在棧外的尾膊,但是StackNode3.next還是指向StackNode2的。T1線程中的臨時(shí)變量head指向StackNode4抓谴,臨時(shí)變量newHead指向StackNode3。

ABA-T2操作后.png

如果由于某種原因,StackNode4*的地址與T1線程中的head(也就是StackNode4)相等,那么T1線程獲得時(shí)間片后會(huì)認(rèn)為head并未發(fā)生變化并執(zhí)行CAS券犁,即使此時(shí)棧的結(jié)構(gòu)和T1掛起前已經(jīng)不一樣了。T1執(zhí)行完之后棧如下所示,該結(jié)果與預(yù)期的并不一致褂删。

ABA-T1操作后.png

問題在于,StackNode4*的地址可能與StackNode4一樣嗎钦无?換個(gè)問法,新的對(duì)象可能與之前的對(duì)象地址相同嗎?當(dāng)然有可能决记!主要是以下2種情況:

① 該類使用享元模式按价,所有值一樣的對(duì)象實(shí)際都是一個(gè)。但是StackNode不可能使用享元模式凄杯,因?yàn)楣?jié)點(diǎn)的next值都是不同的。
② 類使用了對(duì)象緩存池,以StackNode為例隔崎,被pop的節(jié)點(diǎn)會(huì)被緩存,需要新節(jié)點(diǎn)時(shí)先從緩存池中獲取实牡。

了解了ABA問題產(chǎn)生的原因后,我們通過代碼來模擬。由于該場景難以模擬携栋,因此選擇在棧的pop()方法中進(jìn)行延時(shí),保證其他線程在CAS之前能夠修改head,修改后的ConcurrentStack如下。

public class ConcurrentStack {

    省略其他代碼......

    public StackNode pop(long t)  {
        while (true) {
            StackNode head = headReference.get();
            if (head == null) {
                return null;
            }
            StackNode newHead = head.next;
            try {
                Thread.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (headReference.compareAndSet(head, newHead)) {
                return head;
            }
        }
    }
}

然后修改StackNode類,通過ConcurrentHashMap緩存被回收的StackNode對(duì)象蒜危,獲取StackNode對(duì)象統(tǒng)一通過obtain(Integer i)方法。

public class StackNode {

    public int value;
    public StackNode next;
    private static ConcurrentHashMap<Integer, StackNode> mCache = new ConcurrentHashMap<>();

    public static StackNode obtain(Integer i) {
        StackNode stackNode = mCache.remove(i);
        if (stackNode == null) {
            stackNode = new StackNode(i);
        }
        return stackNode;
    }

    public static void recycle(StackNode stackNode) {
        mCache.putIfAbsent(stackNode.value, stackNode);
    }

    private StackNode(Integer i) {
        value = i;
    }
}

隨后運(yùn)行以下代碼測試,就會(huì)發(fā)現(xiàn)無鎖棧最后的結(jié)果與預(yù)期不符。代碼中的注釋詳細(xì)描述了每一步的運(yùn)行流程,不再贅述蛔翅。

public class Main {

    public static void main(String[] args) throws InterruptedException {
        ConcurrentStack concurrentStack = new ConcurrentStack();
        // 初始化棧為 StackNode4->StackNode3->StackNode2->StackNode1->StackNode0
        for (int i = 0; i < 5; i++) {
            concurrentStack.push(i);
        }
        // 開啟 popThread, 在 CAS 前 sleep 一段時(shí)間, 讓另一個(gè)線程去修改棧
        Thread popThread = new Thread(() -> {
            concurrentStack.pop(1000);
        });
        // 開啟 abaThread 先 pop StackNode4 和 StackNode3, 并回收 StackNode4
        // 再 push StackNode4, 正常情況下棧應(yīng)為 4->2->1->0
        // 但是 popThread 在休眠后繼續(xù)執(zhí)行 CAS 時(shí)發(fā)現(xiàn)棧頭還是 StackNode4, 誤以為棧未發(fā)生變化
        // 因此 popThread 將 headReference 賦值給已經(jīng)被 pop 的 StackNode3
        // 而 StackNode3.next 指向 StackNode2, 最后棧為 3->2->1->0
        Thread abaThread = new Thread(() -> {
            StackNode.recycle(concurrentStack.pop(0));
            concurrentStack.pop(0);
            concurrentStack.push(4);
        });
        popThread.start();
        Thread.sleep(500);
        abaThread.start();
        popThread.join();
        // 運(yùn)行完, 查看棧的結(jié)果
        StackNode s;
        System.out.print("當(dāng)前棧為: ");
        while ((s = concurrentStack.pop(0)) != null) {
            System.out.print(s.value + " -> ");
        }
    }
}

針對(duì)這個(gè)問題,并發(fā)包中自然也提供了解決方案,那就是AtomicStampedReference,該類通過stamp記錄原子類當(dāng)前的版本洁灵,每次修改都會(huì)更新版本。

4.3 ThreadLocal

ThreadLocal也被稱為線程本地變量,它為各個(gè)線程都存儲(chǔ)了一個(gè)變量琅翻。每個(gè)線程調(diào)用ThreadLocal.set(value)ThreadLocal.get()時(shí)操作的都是當(dāng)前線程對(duì)應(yīng)的變量聂抢,因此ThreadLocal是線程安全的。

4.3.1 基本使用

Java提供了一個(gè)例子,通過ThreadLocal為每個(gè)線程設(shè)置一個(gè)ID,代碼如下所示篱瞎。

public class ThreadId {
    // 通過原子變量為線程設(shè)置ID
    private static final AtomicInteger nextId = new AtomicInteger(0);
    // 重寫ThreadLocal的initialValue()方法, 用于初始化各線程對(duì)應(yīng)的變量
    private static final ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return nextId.getAndIncrement();
        }
    };

    // Returns the current thread's unique ID, assigning it if necessary
    public static int get() {
        return threadId.get();
    }
}

在新建ThreadLocal時(shí)重寫了它的initialValue()方法,該方法的作用是,如果某個(gè)線程調(diào)用ThreadLocal.get()時(shí)發(fā)現(xiàn)值為null腰奋,就會(huì)調(diào)用initialValue()初始化該線程對(duì)應(yīng)的變量。

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

    private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
    }

ThreadLocal還提供了一個(gè)靜態(tài)的構(gòu)造方法withInitial(Supplier<? extends S> supplier),也用于為ThreadLocal的每個(gè)線程設(shè)置初始值。

ThreadLocal<Integer> threadId = ThreadLocal.withInitial(new Supplier<Integer>() {
    @Override
    public Integer get() {
        return nextId.getAndIncrement();
    }
});

ThreadLocal.withInitial(...)返回的是SuppliedThreadLocal對(duì)象,它繼承了ThreadLocal并重寫了initialValue()方法,其實(shí)與第一種初始化方法沒什么區(qū)別蔓挖。

    static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
        private final Supplier<? extends T> supplier;

        SuppliedThreadLocal(Supplier<? extends T> supplier) {
            this.supplier = Objects.requireNonNull(supplier);
        }

        @Override
        protected T initialValue() {
            return supplier.get();
        }
    }
4.3.2 ThreadLocal實(shí)現(xiàn)原理

在ThreadLocal中角溃,線程和數(shù)據(jù)是一對(duì)一的關(guān)系,這種對(duì)應(yīng)關(guān)系很容易讓我們聯(lián)想到Map。直觀上來看燕鸽,似乎是ThreadLocal中維護(hù)了一個(gè)Key為線程九昧,Value為數(shù)據(jù)的Map。這樣當(dāng)然也能實(shí)現(xiàn)ThreadLocal,不過在這種實(shí)現(xiàn)下,ThreadLocal成為了線程數(shù)據(jù)的持有者,并且持有Thread的引用胶滋,這種實(shí)現(xiàn)很容易造成內(nèi)存泄漏镀迂。

而在Java的實(shí)現(xiàn)中,Thread才是數(shù)據(jù)的持有者,ThreadLocal是線程本地變量的管理者。Thread中持有一個(gè)ThreadLocalMap類型的Map用于保存該線程的本地變量赚瘦。

public class Thread {
    ......
    ThreadLocal.ThreadLocalMap threadLocals = null;
    ......
}

ThreadLocalMap是ThreadLocal的內(nèi)部類,它是一個(gè)專門設(shè)計(jì)出來,用于維護(hù)線程本地變量的Map亲善,它的Key為ThreadLocal的弱引用戏溺,Value則是數(shù)據(jù)袍睡。問題在于肋僧,為什么要專門實(shí)現(xiàn)一個(gè)Map來維護(hù)線程本地變量斑胜,難道原生的HashMap不滿足需求嗎?下面讓我們帶著問題去分析ThreadLocalMap的源碼嫌吠。

①. 成員變量和構(gòu)造方法
ThreadLocalMap通過Entry[] table保存所有的實(shí)體。實(shí)體Entry繼承自ThreadLocal的弱引用辫诅,Entry中的value保存當(dāng)前線程與ThreadLocal關(guān)聯(lián)的變量凭戴。我們知道,當(dāng)一個(gè)對(duì)象只有弱引用指向它時(shí)炕矮,它就會(huì)被GC回收么夫。因此ThreadLocal本身不會(huì)內(nèi)存泄漏。

    static class ThreadLocalMap {

        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

        private static final int INITIAL_CAPACITY = 16; // 初始容量, 容量必須是2的倍數(shù)

        private Entry[] table; // Hash表

        private int size = 0; // Hash表當(dāng)前Entry的數(shù)量

        private int threshold; // size達(dá)到這個(gè)數(shù)量時(shí)擴(kuò)容

        // 根據(jù)容量計(jì)算下一個(gè)索引
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        // 根據(jù)容量計(jì)算上一個(gè)索引
        private static int prevIndex(int i, int len) {
            return ((i - 1 >= 0) ? i - 1 : len - 1);
        }

        // ThreadLocalMap是懶加載的肤视,只有當(dāng)線程需要保存本地變量時(shí)
        // 才會(huì)新建ThreadLocalMap并在構(gòu)造方法傳入第一個(gè)變量的key和value档痪。
        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }
        ......
    }

②. 保存數(shù)據(jù)
ThreadLocl.set(value)方法如下,首先獲取當(dāng)前線程的ThreadLocalMap邢滑,如果存在直接調(diào)用ThreadLocalMap的set(key, value)方法腐螟,如果不存在就新建。

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

下面來看ThreadLocalMap是如何保存數(shù)據(jù)的,它使用線性探測法解決Hash沖突乐纸。保存Entry時(shí)先計(jì)算key的hash值衬廷,再通過index = hash & (len - 1)得到Entry應(yīng)保存的位置。如果table[index]為空則在該位置保存Entry汽绢,如果不為空說明出現(xiàn)hash沖突泵督,對(duì)index向后順移直到table[index+n] (n=1,2,3...)為空。使用線性探測法時(shí)庶喜,如果Hash沖突較多小腊,它的效率會(huì)大幅下降。

線性探測法.png

ThreadLocalMap的set(ThreadLocal<?> key, Object value)方法如下久窟,該方法描述了使用線性探測法時(shí)添加數(shù)據(jù)的整體邏輯秩冈。

    private void set(ThreadLocal<?> key, Object value) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
        // 從位置i開始向后遍歷, 直到table[i]為空
        for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();
            // 當(dāng)前key已經(jīng)存在, 則保存新的value即可
            if (k == key) {
                e.value = value;
                return;
            }
            // key為null說明ThreadLocal已經(jīng)被GC回收了
            // 需要用新的數(shù)據(jù)替代之前已經(jīng)過期的
            if (k == null) {
                // replaceStaleEntry()方法做了大量的工作, 下面會(huì)詳細(xì)分析
                replaceStaleEntry(key, value, i);
                return;
            }
        }
        // 遍歷到table[i]為空也沒有發(fā)現(xiàn)相同的key或過期的key
        // 因此需要新建一個(gè)Entry放置在當(dāng)前位置
        tab[i] = new Entry(key, value);
        int sz = ++size;
        // 如果數(shù)量達(dá)到了threshold則進(jìn)行擴(kuò)容并重新hash
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }

接著來看replaceStaleEntry(key, value, staleSlot)方法,該方法用于將要插入的數(shù)據(jù)放到合適的位置斥扛,并清除Hash表中過期的key和value入问。

    private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        Entry e;
        // 向前遍歷, 找到最前面的Entry不為空但是key被回收的位置
        int slotToExpunge = staleSlot;
        for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
            if (e.get() == null)
                slotToExpunge = i;

        // 從staleSlot下一個(gè)位置開始向后遍歷
        for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
            // 如果找到了與當(dāng)前key相同的Entry
            // 需要將其與staleSlot位置上的Entry對(duì)換, 這樣能保證Hash表的順序
            if (k == key) {
                e.value = value;
                tab[i] = tab[staleSlot];
                tab[staleSlot] = e;
                // slotToExpunge == staleSlot表示向前遍歷時(shí)沒有找到過期的key
                if (slotToExpunge == staleSlot)
                    slotToExpunge = i;
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                return;
            }

            // slotToExpunge == staleSlot表示向前遍歷時(shí)沒有找到過期的key
            // 這里找到的是staleSlot后的第1個(gè)過期的位置, 將其賦值給slotToExpunge
            // staleSlot位置本身不用清除, 因?yàn)橐谠撐恢蒙戏胖眯虏迦氲臄?shù)據(jù)
            if (k == null && slotToExpunge == staleSlot)
                slotToExpunge = i;
        }

        // 如果當(dāng)前要插入的key不存在, 那么新建一個(gè)Entry放到staleSlot位置上
        tab[staleSlot].value = null;
        tab[staleSlot] = new Entry(key, value);

        // slotToExpunge != staleSlot表示有過期的key, 需要將它們清除
        if (slotToExpunge != staleSlot)
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }

這里通過一個(gè)例子來分析replaceStaleEntry(key, value, staleSlot)方法的運(yùn)行過程,為了便于分析稀颁,假設(shè)使用index = key % 10來計(jì)算數(shù)據(jù)存放的位置芬失。初始時(shí)情況如下圖,假設(shè)當(dāng)前調(diào)用ThreadLocalMap.set(key, value)方法插入一個(gè)key為13的數(shù)據(jù)匾灶,該數(shù)據(jù)的index應(yīng)為3棱烂,發(fā)現(xiàn)該位置上的key已經(jīng)過時(shí),則調(diào)用replaceStaleEntry(key, value, staleSlot)方法阶女。

首先向前遍歷颊糜,找到最前面的Entry不為空但是key已經(jīng)過時(shí)的位置,示例中就是index為1的位置秃踩,因此slotToExpunge為1衬鱼。

replaceStaleEntry1.png

隨后向后遍歷,嘗試尋找key與13相等的位置憔杨,發(fā)現(xiàn)table[4]符合條件鸟赫,隨后將table[4]與staleSlot位置的數(shù)據(jù)table[3]交換,得到結(jié)果如下消别。
這里交換的意義在于維持線性探測法的特性抛蚤,如果不交換的話,table[3]上的數(shù)據(jù)之后會(huì)被清除妖啥,就不滿足線性探測法的規(guī)律了霉颠。因?yàn)橹蟛檎襨ey為13的數(shù)據(jù)的話,定位到table[3]會(huì)發(fā)現(xiàn)為空荆虱,而實(shí)際上key為13的數(shù)據(jù)在table[4]上。

replaceStaleEntry2.png

如果在replaceStaleEntry(key, value, staleSlot)方法中找不到key與要插入數(shù)據(jù)相等的位置,就在staleSlot位置存放要插入的數(shù)據(jù)怀读。將數(shù)據(jù)存放到合適的位置后诉位,最后調(diào)用expungeStaleEntry(slotToExpunge)將Hash表中過期的數(shù)據(jù)清除。

    private int expungeStaleEntry(int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        // 清除staleSlot位置上的數(shù)據(jù)
        tab[staleSlot].value = null;
        tab[staleSlot] = null;
        size--;

        // 向后遍歷, 對(duì)各個(gè)位置上的數(shù)據(jù)重新hash, 直到Entry為null
        Entry e;
        int i;
        for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null;
                tab[i] = null;
                size--;
            } else {
                int h = k.threadLocalHashCode & (len - 1);
                if (h != i) {
                    tab[i] = null;
                    while (tab[h] != null)
                        h = nextIndex(h, len);
                    tab[h] = e;
                }
            }
        }
        return i;
    }

③. 獲取數(shù)據(jù)
ThreadLocal獲取數(shù)據(jù)的邏輯比較簡單菜枷,直接調(diào)用了ThreadLocalMap的getEntry(key)方法苍糠。

    private Entry getEntry(ThreadLocal<?> key) {
        int i = key.threadLocalHashCode & (table.length - 1);
        Entry e = table[i];
        if (e != null && e.get() == key)
            return e;
        else
            return getEntryAfterMiss(key, i, e);
    }

如果直接key直接命中第一個(gè)數(shù)據(jù),那么直接返回啤誊,否則調(diào)用getEntryAfterMiss()方法岳瞭。方法內(nèi)遵循線性探測法去尋找對(duì)應(yīng)的Entry,在發(fā)現(xiàn)過時(shí)的數(shù)據(jù)時(shí)調(diào)用expungeStaleEntry()方法進(jìn)行清理蚊锹,直到找到對(duì)應(yīng)key的Entry或遍歷到空數(shù)據(jù)瞳筏。

    private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
        Entry[] tab = table;
        int len = tab.length;
        while (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == key)
                return e;
            if (k == null)
                expungeStaleEntry(i);
            else
                i = nextIndex(i, len);
            e = tab[i];
        }
        return null;
    }
4.3.3 預(yù)防ThreadLocal內(nèi)存泄漏

ThreadLocalMap內(nèi)部使用弱引用指向ThreadLocal,當(dāng)ThreadLocal被回收時(shí)牡昆,map中對(duì)應(yīng)的數(shù)據(jù)過時(shí)姚炕,Entry和value會(huì)殘留在map中造成內(nèi)存泄露。雖然在ThreadLocalMap下一次添加或刪除數(shù)據(jù)時(shí)就可能被清理丢烘,但也可能一直留在map中柱宦。

因此當(dāng)線程不需要某個(gè)ThreadLocal時(shí)需要手動(dòng)調(diào)用一下ThreadLocal.remove()方法,該方法最終會(huì)調(diào)用ThreadLocalMap的remove(ThreadLocal<?> key)方法去清理對(duì)應(yīng)的Entry播瞳。

    private void remove(ThreadLocal<?> key) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
        for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
            if (e.get() == key) {
                e.clear();
                expungeStaleEntry(i);
                return;
            }
        }
    }

五掸刊、并發(fā)容器

由于ArrayList、HashMap等容器不支持并發(fā)赢乓,早期JDK提供了Vector痒给、Hashtable等一系列同步容器,它們實(shí)現(xiàn)的方法是對(duì)所有公有方法進(jìn)行同步骏全,使得同一時(shí)刻只有一個(gè)線程能訪問容器的狀態(tài)苍柏。通過Collections.synchronizedXxx()方法,可以創(chuàng)建各個(gè)基礎(chǔ)容器的同步容器姜贡。

即使Vector這樣的容器對(duì)所有方法都進(jìn)行了同步试吁,但它還是不安全的,這種情況出現(xiàn)在對(duì)容器進(jìn)行復(fù)合操作時(shí)楼咳。以下面的程序?yàn)槔ê矗僭O(shè)A線程執(zhí)行getLast()方法,在step1處掛起母怜,隨后B線程執(zhí)行完了整個(gè)deleteLast()方法余耽,A線程繼續(xù)執(zhí)行時(shí)size的值已經(jīng)過時(shí)了,再執(zhí)行vector.get(size - 1)就會(huì)出錯(cuò)苹熏。

    public Object getLast(Vector vector) {
        int size = vector.size();
        // step1
        return vector.get(size - 1);
    }

    public void deleteLast(Vector vector) {
        int size = vector.size();
        vector.remove(size - 1);
    }

這種“先判斷后執(zhí)行”的操作稱為競態(tài)條件碟贾,再以Hashtable為例币喧,如果有2個(gè)線程執(zhí)行如下putIfNotExist(Object key, Object value)方法,可能會(huì)導(dǎo)致覆蓋插入袱耽。要想解決這個(gè)問題杀餐,需要將整個(gè)復(fù)合操作進(jìn)行同步。

    public void putIfNotExist(Object key, Object value) {
        if (!hashtable.contains(key)) {
            hashtable.put(key, value);
        }
    }

由于同步容器的效率不高且具備競態(tài)條件這樣的隱患朱巨,Java推出了并發(fā)容器來改進(jìn)同步容器的性能史翘,不同的并發(fā)容器對(duì)應(yīng)不同的場景,并且ConcurrentHashMap這樣的并發(fā)容器還封裝了常見的復(fù)合操作冀续。

5.1 CopyOnWriteArrayList

當(dāng)通過Iterator對(duì)容器進(jìn)行迭代時(shí)琼讽,如果有別的線程修改了容器,那么正在迭代的線程會(huì)拋出ConcurrentModificationException洪唐,這是一種“及時(shí)失敗”的機(jī)制钻蹬,用于通知用戶該處代碼存在隱患,想要避免該異常桐罕,需要在迭代過程中加鎖脉让。但是如果容器數(shù)量很大或者每個(gè)元素的操作時(shí)間很長,那么其余線程就會(huì)等待很久功炮。一種解決方案是在容器發(fā)生修改時(shí)克隆該容器溅潜,并在副本上進(jìn)行寫操作,期間的迭代操作都在原容器上進(jìn)行薪伏。

上述方法被稱為寫時(shí)拷貝CopyOnWrite(COW)滚澜,CopyOnWriteArrayList中通過變量array保存實(shí)際的數(shù)組,當(dāng)容器即將發(fā)生變化(add, remove...)時(shí)克隆當(dāng)前容器嫁怀,并在副本上進(jìn)行增刪操作设捐,操作之后再將結(jié)果賦值給array,以add(E e)方法為例塘淑,相關(guān)代碼如下所示萝招。

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

    private transient volatile Object[] array;

    final void setArray(Object[] a) {
        array = a;
    }

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

    ......
}

當(dāng)對(duì)CopyOnWriteArrayList迭代時(shí),通過iterator()方法新建迭代器存捺,該迭代器遍歷的是當(dāng)前array的快照槐沼。迭代器中已經(jīng)保存了引用,即使別的線程通過setArray(newElements)方法修改了array的引用也不會(huì)出現(xiàn)異常捌治。

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

    public Iterator<E> iterator() {
        return new COWIterator<E>(getArray(), 0);
    }

    static final class COWIterator<E> implements ListIterator<E> {
        /** Snapshot of the array */
        private final Object[] snapshot;
        /** Index of element to be returned by subsequent call to next.  */
        private int cursor;

        private COWIterator(Object[] elements, int initialCursor) {
            cursor = initialCursor;
            snapshot = elements;
        }
        ......
    }

    ......
}

根據(jù)CopyOnWriteArrayList的實(shí)現(xiàn)岗钩,可以發(fā)現(xiàn)該容器具有“弱一致性”,因?yàn)榈鷷r(shí)容器的內(nèi)容可能會(huì)發(fā)生變化肖油,如果一個(gè)場景能容忍短暫的數(shù)據(jù)不一致性兼吓,才適合使用該容器。例如通過CopyOnWriteArrayList保存監(jiān)聽器森枪,該場景對(duì)時(shí)效性的要求不高视搏,而且遍歷監(jiān)聽器并執(zhí)行是一個(gè)非常耗時(shí)的操作审孽。

5.2 ConcurrentHashMap

ConcurrentHashMap與HashMap一樣是一個(gè)基于散列的Map,JDK1.8中它采用CAS+Node鎖的同步方式來提供更高的并發(fā)性與伸縮性凶朗。ConcurrentHashMap的使用方法與HashMap相同瓷胧,并且封裝了一些常見的復(fù)合操作显拳,如下所示棚愤。

// 當(dāng)前key沒有對(duì)應(yīng)值時(shí)插入
public V putIfAbsent(K key, V value)

// 僅當(dāng)key對(duì)應(yīng)的值為value時(shí)才移除
public boolean remove(Object key, Object value)

// 僅當(dāng)key對(duì)應(yīng)的值為oldValue時(shí)才替換為newValue
public boolean replace(K key, V oldValue, V newValue)

// 僅當(dāng)key對(duì)應(yīng)到某個(gè)值時(shí)才替換為value
public V replace(K key, V value)

ConcurrentHashMap中的檢索操作一般不阻塞,因此可能與更新操作重疊杂数,檢索操作反映的是最近的更新操作完成的結(jié)果宛畦,換句話說,更新操作對(duì)檢索操作來說是可見的(happen-before)揍移。以get()方法為例次和,代碼中并沒有進(jìn)行同步的地方,這是因?yàn)镹ode節(jié)點(diǎn)中的val和next字段都是volatile修飾的那伐,因此一個(gè)線程的更新操作是對(duì)其他線程可見的踏施。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    ......

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode()); // 得到hash值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                // 槽中的首節(jié)點(diǎn)就是要尋找的節(jié)點(diǎn)
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // hash < 0表示正在擴(kuò)容
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        ......
    }
}

ConcurrentHashMap中的更新操作是通過CAS+Node鎖的方式進(jìn)行同步的,以putVal(K key, V value, boolean onlyIfAbsent)方法為例罕邀,其主體邏輯如下畅形。
① 判斷table是否已經(jīng)初始化,如果沒有則調(diào)用initTable()方法
② 如果當(dāng)前槽為空诉探,則調(diào)用casTabAt(...)插入數(shù)據(jù)
③ 如果當(dāng)前正在擴(kuò)容日熬,當(dāng)前線程也去參與擴(kuò)容(ConcurrentHashMap支持并發(fā)擴(kuò)容)
④ 根據(jù)當(dāng)前是鏈表還是樹插入對(duì)應(yīng)節(jié)點(diǎn)

可以發(fā)現(xiàn)putVal(...)方法的主體邏輯是位于一個(gè)循環(huán)中的,這是一種CAS+自旋的思路肾胯。假設(shè)2個(gè)線程同時(shí)執(zhí)行到第②步竖席,成功的線程會(huì)退出循環(huán),失敗的線程會(huì)開始自旋直到插入成功或失敗敬肚。

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0) // 如果table為空則進(jìn)行初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 當(dāng)前槽為空毕荐,通過cas插入節(jié)點(diǎn),成功則退出循環(huán)
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 有hash值為MOVED的節(jié)點(diǎn)表示正在擴(kuò)容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) { // 當(dāng)前槽上是鏈表
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) { // 當(dāng)前槽上是樹節(jié)點(diǎn)
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

擴(kuò)容Hash表是一個(gè)相對(duì)緩慢的操作艳馒,最好在新建時(shí)提供一個(gè)預(yù)估的大小initialCapacity來構(gòu)造憎亚。

六、線程生命周期

Java中線程的生命周期與操作系統(tǒng)的有所不同鹰溜,在操作系統(tǒng)中虽填,線程只有在真正獲取到CPU使用權(quán)時(shí)才屬于運(yùn)行狀態(tài),如果一個(gè)線程可以執(zhí)行但是沒獲取到CPU曹动,它則屬于就緒狀態(tài)斋日。而在Java中,如果一個(gè)線程在廣義上能夠運(yùn)行墓陈,它就是運(yùn)行狀態(tài)恶守。

Java將線程分為以下6種狀態(tài):

  1. NEW(初始化狀態(tài)):新建了一個(gè)線程對(duì)象第献,但是還沒有調(diào)用線程的start()方法,此時(shí)線程只存在于JVM兔港,在操作系統(tǒng)中它還沒有被創(chuàng)建庸毫。

  2. RUNNABLE(運(yùn)行狀態(tài)):Java線程的RUNNABLE狀態(tài)表達(dá)的含義比較寬泛,它對(duì)應(yīng)操作系統(tǒng)中線程的就緒狀態(tài)衫樊、運(yùn)行狀態(tài)以及部分休眠狀態(tài)飒赃。
    當(dāng)線程在搶占CPU的使用權(quán)時(shí),其對(duì)應(yīng)操作系統(tǒng)中的就緒狀態(tài)科侈;當(dāng)線程調(diào)用操作系統(tǒng)層面的阻塞式API時(shí)(例如I/O)载佳,其對(duì)應(yīng)操作系統(tǒng)中的休眠狀態(tài)。在Java中臀栈,這些狀態(tài)統(tǒng)稱為RUNNABLE蔫慧,JVM并不關(guān)心線程是否真的在運(yùn)行,只要當(dāng)前線程不在等待鎖权薯,也沒有被其他線程阻塞姑躲,那么它就是運(yùn)行狀態(tài)。

  3. BLOCKED(阻塞狀態(tài)):當(dāng)線程正在等待進(jìn)入synchronized代碼塊時(shí)盟蚣,該線程會(huì)從RUNNABLE狀態(tài)變?yōu)锽LOCKED狀態(tài)黍析;當(dāng)搶占到鎖時(shí),再從BLOCKED狀態(tài)轉(zhuǎn)換到RUNNABLE狀態(tài)刁俭。

  4. WAITING(無時(shí)限等待):當(dāng)獲得鎖的線程主動(dòng)調(diào)用Object.wait()時(shí)橄仍,線程會(huì)從RUNNABLE狀態(tài)進(jìn)入WAITING狀態(tài)直到被喚醒。還有一種情況是線程調(diào)用LockSupport.park()從RUNNABLE狀態(tài)進(jìn)入WAITING狀態(tài)牍戚,并發(fā)包中的Lock就是依賴LockSupport實(shí)現(xiàn)的侮繁,如果要將線程恢復(fù)到RUNNABLE狀態(tài),則需調(diào)用LockSupport.unpark(Thread thread)方法如孝。

  5. TIMED_WAITING(有時(shí)限等待):TIMED_WAITING狀態(tài)與WAITING狀態(tài)唯一的區(qū)別是宪哩,線程調(diào)用方法進(jìn)入該狀態(tài)時(shí)多了時(shí)間參數(shù)。例如Thread.sleep(long millis), Object.wait(long timeout), LockSupport.parkNanos(Object blocker, long deadline)等方法第晰。

  6. TERMINATED(終止?fàn)顟B(tài)):線程的run()方法執(zhí)行完后锁孟,或者出現(xiàn)未捕獲的異常就會(huì)進(jìn)入TERMINATED狀態(tài)。如果想要主動(dòng)終止一個(gè)線程茁瘦,可以調(diào)用Thread.interrupt()方法通知線程停止品抽,其他類似Thread.stop()這樣的方法因?yàn)榘踩詥栴}已經(jīng)被棄用了。

如何正確地停止線程甜熔?
上面提到通過Thread.interrupt()方法通知線程結(jié)束運(yùn)行圆恤,該方法并不會(huì)強(qiáng)行停止線程,它只是為線程添加了一個(gè)標(biāo)志位表示該線程應(yīng)該結(jié)束了腔稀。為什么這么做呢盆昙?因?yàn)槲覀兏M€程收到通知后進(jìn)行收尾工作再停止(例如釋放Lock)羽历,這樣可以保證共享資源狀態(tài)的一致性,更加安全淡喜,而不是像已被棄用的Thread.stop()方法那樣強(qiáng)制停止線程秕磷。

下面來看如何處理中斷異常(InterruptedException)和停止線程,當(dāng)對(duì)線程調(diào)用Thread.interrupt()方法后炼团,RUNNABLE狀態(tài)下的線程可以通過Thread.currentThread().isInterrupted()檢查自己的中斷標(biāo)志位澎嚣,如果發(fā)現(xiàn)自己被中斷,則進(jìn)行退出工作们镜。但是如果線程處于WAITING狀態(tài)币叹,那么當(dāng)前線程會(huì)拋出異常并清除中斷標(biāo)志位润歉,以下方法可以響應(yīng)中斷并拋出中斷異常模狭。

Object.wait() / Object.wait(long) / Object.wait(long,int)
Thread.sleep(long) / Thread.sleep(long,int)
Thread.join() / Thread.join(long) / Thread.join(long,int)
java.util.concurrent.BlockingQueue.take() / put(E)
java.util.concurrent.locks.Lock.lockInterruptibly()
java.util.concurrent.CountDownLatch.await()
java.util.concurrent.CyclicBarrier.await()
java.util.concurrent.Rxchanger.exchange(V)
java.nio.channels.InterruptibleChannel相關(guān)方法
java.nio.channels.Selector的相關(guān)方法

對(duì)于RUNNABLE狀態(tài)下的線程,可以在循環(huán)中檢查自己的狀態(tài)踩衩,發(fā)現(xiàn)被中斷后進(jìn)行收尾工作嚼鹉。

Thread thread = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        // 業(yè)務(wù)邏輯
    }
    // 收尾工作
});

如果線程處于WAITING狀態(tài)并響應(yīng)了中斷異常,在方法內(nèi)的話一般選擇拋出驱富,讓上層處理锚赤;如果必須處理的話,則需要重置線程的中斷標(biāo)志褐鸥,以便讓線程正常退出线脚,如下所示。

Thread thread = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        // 業(yè)務(wù)邏輯
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // 捕獲中斷異常會(huì)清除中斷標(biāo)志, 需要重新設(shè)置
            Thread.currentThread().interrupt();
            // 收尾工作...
        }
    }
});

七叫榕、死鎖

7.1 死鎖的產(chǎn)生條件

死鎖是指兩個(gè)線程都需要多個(gè)資源浑侥,但是每個(gè)線程只占有了其中一部分并請求另外的資源,此時(shí)就會(huì)產(chǎn)生循環(huán)等待晰绎,導(dǎo)致兩個(gè)線程都無法運(yùn)行寓落。在《操作系統(tǒng)》這門課中,歸納了4個(gè)產(chǎn)生死鎖的必要條件:
① 資源互斥:共享資源在同一時(shí)刻只能被一個(gè)線程占有荞下。
② 占有資源并等待:占有資源的線程不會(huì)釋放伶选,并請求其余資源。
③ 不可搶占:線程無法搶奪其余線程占有的資源尖昏。
④ 循環(huán)等待:每個(gè)線程都在等待其余線程占有的資源仰税。

死鎖示意.png

7.2 死鎖的解決方式

7.2.1 使用Lock

使用synchronized關(guān)鍵字進(jìn)行同步時(shí),線程占有資源后不會(huì)釋放抽诉,如果在申請其他資源時(shí)阻塞陨簇,就有出現(xiàn)死鎖的風(fēng)險(xiǎn)。而使用Lock.tryLock()方法掸鹅,線程嘗試獲取資源失敗時(shí)會(huì)直接返回false塞帐,開發(fā)人員可以使線程釋放之前占有的資源拦赠。也可以使用Lock.tryLock(long time, TimeUnit unit)方法,讓線程嘗試在指定時(shí)間內(nèi)獲取某個(gè)資源葵姥,如果失敗則返回false荷鼠,開發(fā)人員可以根據(jù)返回值進(jìn)一步處理。

7.2.2 統(tǒng)一資源獲取順序

出現(xiàn)死鎖的代碼榔幸,一定是兩個(gè)線程循環(huán)等待允乐,這是因?yàn)閮蓚€(gè)線程獲取資源的順序不一樣,如下所示削咆。線程t1先嘗試獲取a資源牍疏,再嘗試獲取b資源;而線程t2先嘗試獲取b資源拨齐,再嘗試獲取a資源鳞陨,這就有死鎖的風(fēng)險(xiǎn)。如果這兩個(gè)線程獲取資源的順序相等瞻惋,就不可能發(fā)生死鎖了厦滤。

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (a) {
                    ......
                    synchronized (b) {
                        ......
                    }
                }
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (b) {
                    ......
                    synchronized (a) {
                        ......
                    }
                }
            }
        });

八、線程池

線程池遵循典型的"生產(chǎn)者-消費(fèi)者"模型歼狼,生產(chǎn)者是線程池的使用者掏导,消費(fèi)者是線程池本身。提到"生產(chǎn)者-消費(fèi)者"模型羽峰,我們很容易聯(lián)想到阻塞隊(duì)列趟咆,實(shí)際上線程池就是依賴阻塞隊(duì)列實(shí)現(xiàn)的。在線程池中梅屉,生產(chǎn)者提交的任務(wù)會(huì)被添加到阻塞隊(duì)列中值纱,如果阻塞隊(duì)列已滿,則生產(chǎn)者阻塞履植;消費(fèi)者會(huì)不斷從阻塞隊(duì)列中取出任務(wù)執(zhí)行计雌,如果阻塞隊(duì)列為空,則消費(fèi)者阻塞玫霎。

利用3.2.1中實(shí)現(xiàn)的BlockQueue可以實(shí)現(xiàn)一個(gè)簡單的線程池凿滤,代碼如下所示。在測試時(shí)庶近,新建了一個(gè)具有5個(gè)工作線程的線程池翁脆,運(yùn)行后發(fā)現(xiàn)各任務(wù)執(zhí)行的順序是不確定的,因此線程池一般不用于執(zhí)行一系列耦合的任務(wù)鼻种。不過如果將工作線程數(shù)量設(shè)為1個(gè)的話反番,任務(wù)的執(zhí)行順序就是確定的,當(dāng)需要執(zhí)行一連串耦合的任務(wù)時(shí),可以新建只有1個(gè)工作線程的線程池進(jìn)行處理罢缸。

public class SampleTreadPool {

    private BlockingQueue<Runnable> mRunnableQueue;
    private List<Worker> mWorkers = new ArrayList<>();

    public SampleTreadPool(int threadSize, BlockingQueue<Runnable> queue) {
        mRunnableQueue = queue;
        for (int i = 0; i < threadSize; i++) {
            Worker worker = new Worker();
            worker.start();
            mWorkers.add(worker);
        }
    }

    public void execute(Runnable r) {
        mRunnableQueue.offer(r);
    }

    private class Worker extends Thread {
        @Override
        public void run() {
            while (true) {
                Runnable r = mRunnableQueue.take();
                r.run();
            }
        }
    }

    // 測試
    public static void main(String[] args) {
        BlockingQueue<Runnable> blockingQueue = new BlockingQueue<>(20);
        SampleTreadPool treadPool = new SampleTreadPool(5, blockingQueue);
        for (int i = 0; i < 50; i++) {
            int t = i;
            treadPool.execute(() -> {
                System.out.println("execute: " + t);
            });
        }
    }
}

上述線程池顯然無法投入實(shí)際使用篙贸,因?yàn)樗嬖诤芏鄦栴}:
① 在阻塞隊(duì)列已滿的情況下提交任務(wù),該線程池會(huì)阻塞主線程枫疆。雖然可以使用無界的阻塞隊(duì)列爵川,但是當(dāng)任務(wù)數(shù)量過多時(shí),可能會(huì)造成OOM息楔。
② 該線程池中工作線程的數(shù)量是固定的寝贡,我們希望能設(shè)置工作線程數(shù)量的最大值和最小值,讓它能夠隨系統(tǒng)的運(yùn)行情況靈活地增加或減少值依。
③ 當(dāng)線程運(yùn)行出現(xiàn)異常時(shí)圃泡,線程池沒有容錯(cuò)機(jī)制。
④ 該線程池只能提交Runnable這種無返回值的任務(wù)愿险,無法處理Future<T>這類有返回值的任務(wù)颇蜡。

8.1 Java線程池的使用

Java線程池中最核心的實(shí)現(xiàn)是ThreadPoolExecutor,它定義了線程池的5種狀態(tài)如下拯啦。

  • RUNNING: 線程池正在運(yùn)行澡匪,接受新任務(wù)并處理已經(jīng)提交的任務(wù)。
  • SHUTDOWN: 線程池即將停止褒链,此時(shí)不接受新任務(wù),但是會(huì)處理已經(jīng)提交的任務(wù)疑苔。調(diào)用線程池的shutdown()方法會(huì)使其狀態(tài)從RUNNING變?yōu)镾HUTDOWN甫匹。
  • STOP: 線程池即將停止,此時(shí)不接受新任務(wù)惦费,也不處理已經(jīng)提交的任務(wù)兵迅,并會(huì)中斷正在處理的任務(wù)。調(diào)用線程池的shutdownNow()方法會(huì)使其狀態(tài)從(RUNNING/SHUTDOWN)變?yōu)镾TOP薪贫。
  • TIDYING: 線程池已經(jīng)停止運(yùn)行恍箭,工作線程為0,此時(shí)會(huì)執(zhí)行用戶重載的terminated()函數(shù)扎唾。
  • TERMINATED: terminated()函數(shù)運(yùn)行完畢新翎,線程池徹底停止拓型。
8.1.1 新建線程池

下面來看如何新建一個(gè)線程池,ThreadPoolExecutor中重載了4個(gè)構(gòu)造函數(shù)交洗,最完整的構(gòu)造函數(shù)有7個(gè)參數(shù),如下所示橡淑。

    /**
     * @param corePoolSize: 核心線程數(shù)量构拳,指線程池中最少的線程數(shù)量,即使這些線程空閑
     *        但如果設(shè)置了 allowCoreThreadTimeOut, 核心線程也可以回收
     * @param maximumPoolSize: 線程池中允許存在的最大線程數(shù)
     * @param keepAliveTime: 當(dāng)前線程數(shù)量大于核心線程數(shù)量時(shí)
     *        如果等待該時(shí)間后仍然沒有新任務(wù), 則回收空閑線程
     * @param unit: keepAliveTime參數(shù)的單位
     * @param workQueue: 任務(wù)隊(duì)列, 這是一個(gè)阻塞隊(duì)列
     *        用于保存用戶提交但尚未執(zhí)行的 Runnable
     * @param threadFactory: 線程池新建線程時(shí)使用的 ThreadFactory
     * @param handler: 任務(wù)拒絕策略, 用于在線程數(shù)量和任務(wù)隊(duì)列都已滿時(shí)拒絕新任務(wù)
     *        線程池提供了以下四種策略: 
     *        CallerRunsPolicy: 提交任務(wù)的線程自己執(zhí)行
     *        AbortPolicy: 默認(rèn)策略, 拋出RejectedExecutionException
     *        DiscardPolicy: 靜默丟棄任務(wù)
     *        DiscardOldestPolicy: 丟棄任務(wù)隊(duì)列中最老的任務(wù)并添加新任務(wù)
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        ......
    }

為了避免OOM,在新建線程池時(shí)置森,強(qiáng)烈建議指定線程池的maximumPoolSize斗埂,否則在任務(wù)繁忙時(shí)會(huì)出現(xiàn)線程數(shù)暴增的情況;同時(shí)也建議使用有界的阻塞隊(duì)列凫海,不然阻塞隊(duì)列的無限制增長也會(huì)增加OOM的風(fēng)險(xiǎn)蜜笤。

8.1.2 提交任務(wù)

ExecutorService接口中定義了線程池提交任務(wù)的方法,如下所示盐碱。

// 提交一個(gè)Runnable, 該任務(wù)無返回值, 也無法查詢它的運(yùn)行情況
void execute(Runnable command);
// 提交一個(gè)Callable, 該任務(wù)有返回值, 并且可以查詢它的運(yùn)行情況
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
// 提交一個(gè)Runnable任務(wù)把兔,可以通過返回的Future查詢其運(yùn)行情況,返回null時(shí)表示運(yùn)行完畢
Future<?> submit(Runnable task);
// 執(zhí)行給定的任務(wù), 當(dāng)所有任務(wù)完成(或超時(shí)), 返回一個(gè)保存其狀態(tài)和結(jié)果的Future列表
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit);
// 執(zhí)行給定的任務(wù)瓮顽,當(dāng)任一任務(wù)完成(或超時(shí)), 返回對(duì)應(yīng)結(jié)果, 其余任務(wù)會(huì)被取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit);

需要注意的是县好,如果調(diào)用Future.get()方法,調(diào)用線程會(huì)被阻塞暖混,直到Future任務(wù)返回結(jié)果缕贡,因此可以選擇Future.get(long timeout, TimeUnit unit)設(shè)置最大等待時(shí)間,也可以在獲取結(jié)果前先調(diào)用Future.isDone()查詢其運(yùn)行情況拣播。

8.1.3 關(guān)閉線程池

在介紹線程池的狀態(tài)時(shí)我們提到了shutdown()shutdownNow()這兩個(gè)方法晾咪,這兩個(gè)方法都用于關(guān)閉線程池,不過處理上有所不同:shutdown()方法會(huì)讓線程池不再接受新的任務(wù)贮配,但是會(huì)繼續(xù)處理已經(jīng)開始運(yùn)行的任務(wù)谍倦;而shutdownNow()方法不僅會(huì)讓線程池不再接受新的任務(wù),也會(huì)去中斷當(dāng)前正在執(zhí)行任務(wù)的線程泪勒。

關(guān)閉線程池可以使用如下的兩段式關(guān)閉法昼蛀,調(diào)用shutdown()后等待尚未完成的任務(wù)繼續(xù)運(yùn)行一段時(shí)間,如果等待后還沒運(yùn)行完圆存,則調(diào)用shutdownNow()中斷這些線程叼旋。

這里用到了awaitTermination()方法,它會(huì)阻塞一段時(shí)間沦辙,直到線程池中的任務(wù)全部運(yùn)行完或者等待超時(shí)夫植,如果線程池的所有任務(wù)運(yùn)行完則返回true,否則返回false油讯。需要注意的是详民,該方法會(huì)阻塞調(diào)用線程

    void shutdownAndAwaitTermination(ExecutorService pool) {
        // 拒絕新任務(wù)的提交
        pool.shutdown();
        try {
            // 等待一段時(shí)間, 讓當(dāng)前任務(wù)繼續(xù)執(zhí)行
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                // 嘗試中斷正在運(yùn)行的線程
                pool.shutdownNow();
                // 等待一段時(shí)間, 讓尚未結(jié)束任務(wù)的線程響應(yīng)中斷
                if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // 如果當(dāng)前線程被中斷了, 再次嘗試關(guān)閉線程池
            pool.shutdownNow();
            // 重新設(shè)置中斷標(biāo)志位
            Thread.currentThread().interrupt();
        }
    }

8.2 如何確定線程池的參數(shù)

新建ThreadPoolExecutor時(shí)需要傳入多個(gè)參數(shù)撞羽,包括核心線程數(shù)阐斜、最大線程數(shù)、空閑線程保留時(shí)間以及阻塞隊(duì)列等诀紊,實(shí)際開發(fā)中應(yīng)該根據(jù)具體的業(yè)務(wù)類型來確定這一系列的參數(shù)谒出。

在項(xiàng)目中并不建議將所有的任務(wù)都放到一個(gè)線程池中去執(zhí)行隅俘,可以根據(jù)任務(wù)場景新建CPU線程池、IO線程池等笤喳,CPU線程池的大小可以固定為(CPU核心數(shù)+1)为居,而IO線程池的大小可以預(yù)設(shè)為(2 * CPU核心數(shù)+1),之后再根據(jù)IO的吞吐量進(jìn)行調(diào)整杀狡。美團(tuán)的技術(shù)博客Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐介紹了動(dòng)態(tài)修改線程池參數(shù)的實(shí)現(xiàn)蒙畴,可以根據(jù)線程池的運(yùn)行情況不斷調(diào)整參數(shù),保證系統(tǒng)的吞吐量呜象。

參考

  1. 并發(fā)編程實(shí)戰(zhàn)
  2. 維基百科-CAS
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末膳凝,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子恭陡,更是在濱河造成了極大的恐慌蹬音,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件休玩,死亡現(xiàn)場離奇詭異著淆,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)拴疤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門永部,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人呐矾,你說我怎么就攤上這事苔埋。” “怎么了凫佛?”我有些...
    開封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵讲坎,是天一觀的道長。 經(jīng)常有香客問我愧薛,道長,這世上最難降的妖魔是什么衫画? 我笑而不...
    開封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任毫炉,我火速辦了婚禮,結(jié)果婚禮上削罩,老公的妹妹穿的比我還像新娘瞄勾。我一直安慰自己,他們只是感情好弥激,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開白布进陡。 她就那樣靜靜地躺著,像睡著了一般微服。 火紅的嫁衣襯著肌膚如雪趾疚。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音糙麦,去河邊找鬼辛孵。 笑死,一個(gè)胖子當(dāng)著我的面吹牛赡磅,可吹牛的內(nèi)容都是我干的魄缚。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼焚廊,長吁一口氣:“原來是場噩夢啊……” “哼冶匹!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起咆瘟,我...
    開封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤嚼隘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后搞疗,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嗓蘑,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年匿乃,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了桩皿。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡幢炸,死狀恐怖泄隔,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情宛徊,我是刑警寧澤佛嬉,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站闸天,受9級(jí)特大地震影響暖呕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜苞氮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一湾揽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧笼吟,春花似錦库物、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至撵枢,卻和暖如春民晒,著一層夾襖步出監(jiān)牢的瞬間精居,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來泰國打工镀虐, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留箱蟆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓刮便,卻偏偏與公主長得像空猜,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子恨旱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容