淺談Java并發(fā)編程中的若干核心技術(shù)

作者: 一字馬胡
轉(zhuǎn)載標(biāo)志 【2017-11-01】

更新日志

日期 更新內(nèi)容 備注
2017-11-01 新建文章 V1
2018-05-21 新增無(wú)鎖并發(fā)設(shè)計(jì)須知 V2

本文主要內(nèi)容索引

1、Java線程
2舟铜、線程模型
3奔则、Java線程池
4摔桦、Future(各種Future)
5诊赊、Fork/Join框架
6栋盹、volatile
7笙各、CAS(原子操作)
8唱蒸、AQS(并發(fā)同步框架)
9邦鲫、synchronized(同步鎖)
10、并發(fā)隊(duì)列(阻塞隊(duì)列)
11油宜、無(wú)鎖并發(fā)設(shè)計(jì)須知

本文僅分析java并發(fā)編程中的若干核心問(wèn)題掂碱,對(duì)于上面沒(méi)有提到但是又和java并發(fā)編程有密切關(guān)系的技術(shù)將會(huì)不斷添加進(jìn)來(lái)完善文章,本文將長(zhǎng)期更新慎冤,不斷迭代疼燥。本文試圖從一個(gè)更高的視覺(jué)來(lái)總結(jié)Java語(yǔ)言中的并發(fā)編程內(nèi)容,希望閱讀完本文之后蚁堤,可以收獲一些內(nèi)容醉者,至少應(yīng)該知道在java中做并發(fā)編程實(shí)踐的時(shí)候應(yīng)該注意什么但狭,應(yīng)該關(guān)注什么,如何保證線程安全撬即,以及如何選擇合適的工具來(lái)滿足需求立磁。當(dāng)然,更深層次的內(nèi)容就會(huì)涉及到j(luò)vm層面的知識(shí)剥槐,包括底層對(duì)java內(nèi)存的管理唱歧,對(duì)線程的管理等較為核心的問(wèn)題,當(dāng)然粒竖,本文的定位在于抽象與總結(jié)颅崩,更為具體而深入的內(nèi)容就需要自己去實(shí)踐,考慮到可能篇幅過(guò)長(zhǎng)蕊苗、重復(fù)描述某些內(nèi)容沿后,以及自身技術(shù)深度等原因,本文將在深度和廣度上做一些權(quán)衡朽砰,某些內(nèi)容會(huì)做一些深入的分析尖滚,而有些內(nèi)容會(huì)一帶而過(guò),點(diǎn)到為止瞧柔,總之漆弄,本文就當(dāng)是對(duì)學(xué)習(xí)java并發(fā)編程內(nèi)容的一個(gè)總結(jié),以及給哪些希望快速了解java并發(fā)編程內(nèi)容的讀者拋磚引玉非剃,不足之處還望指正置逻。

Java線程

一般來(lái)說(shuō)推沸,在java中實(shí)現(xiàn)高并發(fā)是基于多線程編程的备绽,所謂并發(fā),也就是多個(gè)線程同時(shí)工作鬓催,來(lái)處理我們的業(yè)務(wù)肺素,在機(jī)器普遍多核心的今天,并發(fā)編程的意義極為重大宇驾,因?yàn)槲覀冇卸鄠€(gè)cpu供線程使用倍靡,如果我們的應(yīng)用依然只使用單線程模式來(lái)工作的話,對(duì)極度浪費(fèi)機(jī)器資源的课舍。所以塌西,學(xué)習(xí)java并發(fā)知識(shí)的首要問(wèn)題是:如何創(chuàng)建一個(gè)線程,并且讓這個(gè)線程做一些事情筝尾?這是java并發(fā)編程內(nèi)容的起點(diǎn)捡需,下面將分別介紹多個(gè)創(chuàng)建線程,并且讓線程做一些事情的方法筹淫。

繼承Thread類

繼承Thread類站辉,然后重寫(xiě)run方法,這是第一種創(chuàng)建線程的方法。run方法里面就是我們要做的事情饰剥,可以在run方法里面寫(xiě)我們想要在新的線程里面運(yùn)行的任務(wù)殊霞,下面是一個(gè)小例子,我們繼承了Thread類汰蓉,并且在run方法里面打印出了當(dāng)然線程的名字绷蹲,然后sleep1秒中之后就退出了:


/**
 * Created by hujian06 on 2017/10/31.
 * 
 * the demo of thread
 */
public class ThreadDemo {
    
    public static void main(String ... args) {
        
        AThread aThread = new AThread();
        
        //start the thread
        aThread.start();
        
    }
    
}

class AThread extends Thread {
    @Override
    public void run() {
        System.out.println("Current Thread Name:" +
                Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


如果我們想要啟動(dòng)這個(gè)線程,只需要像上面代碼中那樣顾孽,調(diào)用Thread類的start方法就可以了瘸右。

實(shí)現(xiàn)Runnable接口

啟動(dòng)一個(gè)線程的第二種方法是實(shí)現(xiàn)Runnable接口,然后實(shí)現(xiàn)其run方法岩齿,將你想要在新線程里面執(zhí)行的業(yè)務(wù)代碼寫(xiě)在run方法里面太颤,下面的例子展示了這種方法啟動(dòng)線程的示例,實(shí)現(xiàn)的功能和上面的第一種示例是一樣的:


/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of Runnable
 */
public class ARunnableaDemo {
    
    public static void main(String ... args) {
        
        ARunnanle aRunnanle = new ARunnanle();
        Thread thread = new Thread(aRunnanle);
        
        thread.start();
        
    }
    
}

class ARunnanle implements Runnable {

    @Override
    public void run() {
        System.out.println("Current Thread Name:" +
                Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


在啟動(dòng)線程的時(shí)候盹沈,依然還是使用了Thread這個(gè)類龄章,只是我們?cè)跇?gòu)造函數(shù)中將我們實(shí)現(xiàn)的Runnable對(duì)象傳遞進(jìn)去了,所以在我們執(zhí)行Thread類的start方法的時(shí)候乞封,實(shí)際執(zhí)行的內(nèi)容是我們的Runnable的run方法做裙。

使用FutureTask

啟動(dòng)一個(gè)新的線程的第三種方法是使用FutureTask,下面來(lái)看一下FutureTask的類圖肃晚,就可以明白為什么可以使用FutureTask來(lái)啟動(dòng)一個(gè)新的線程了:

FutureTask的類圖

從FutureTask的類圖中可以看出锚贱,F(xiàn)utureTask實(shí)現(xiàn)了Runnable接口和Future接口,所以它兼?zhèn)銻unnable和Future兩種特性关串,下面先來(lái)看看如何使用FutureTask來(lái)啟動(dòng)一個(gè)新的線程:


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of FutureTask
 */
public class FutureTaskDemo {

    public static void main(String ... args) {
        
        ACallAble callAble = new ACallAble();
        
        FutureTask<String> futureTask = new FutureTask<>(callAble);
        
        Thread thread = new Thread(futureTask);
        
        thread.start();
        
        do {
            
        }while (!futureTask.isDone());

        try {
            String result = futureTask.get();
            
            System.out.println("Result:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }

}

class ACallAble implements Callable<String> {

    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return "Thread-Name:" + 
                Thread.currentThread().getName();
    }
}


可以看到拧廊,使用FutureTask來(lái)啟動(dòng)一個(gè)線程之后,我們可以監(jiān)控這個(gè)線程是否完成晋修,上面的示例中主線程會(huì)一直等待這個(gè)新創(chuàng)建的線程直到它返回吧碾,其實(shí)只要是Future提供的接口,我們?cè)贔utureTask中都可以使用墓卦,這極大的方便了我們倦春,F(xiàn)uture在并發(fā)編程中的意義極為重要,F(xiàn)uture代表一個(gè)未來(lái)會(huì)發(fā)生的東西落剪,它是一種暗示睁本,一種占位符,它示意我們它可能不會(huì)立即得到結(jié)果忠怖,因?yàn)樗娜蝿?wù)還在運(yùn)行呢堰,但是我們可以得到一個(gè)對(duì)這個(gè)線程的監(jiān)控對(duì)象,我們可以對(duì)線程的執(zhí)行做一些判斷脑又,甚至是控制暮胧,比如锐借,如果我們覺(jué)得我們等了太久,并且我們覺(jué)得沒(méi)有必要再等待下去的時(shí)候往衷,就可以將這個(gè)Task取消钞翔,還有一點(diǎn)需要提到的是,F(xiàn)uture代表它可能正在運(yùn)行席舍,也可能已經(jīng)返回布轿,當(dāng)然Future更多的暗示你可以在等待這個(gè)結(jié)果的同時(shí)可以使用其他的線程做一些其他的事情,當(dāng)你真的需要這個(gè)結(jié)果的時(shí)候再來(lái)獲取就可以了来颤,這就是并發(fā)汰扭,理解這一點(diǎn)非常重要。

本小節(jié)通過(guò)介紹三種創(chuàng)建并啟動(dòng)一個(gè)新線程的方法福铅,為進(jìn)行并發(fā)編程開(kāi)了一個(gè)頭萝毛,目前,我們還只是在能創(chuàng)建多個(gè)線程滑黔,然后讓多個(gè)線程做不同個(gè)的事情的階段笆包,當(dāng)然,這是學(xué)習(xí)并發(fā)編程最為基礎(chǔ)的略荡,無(wú)論如何庵佣,現(xiàn)在,我們可以讓我們的應(yīng)用運(yùn)行多個(gè)線程了汛兜,下面的文章將會(huì)基于這個(gè)假設(shè)(一個(gè)應(yīng)用開(kāi)啟了多個(gè)線程)討論一些并發(fā)編程中值得關(guān)注的內(nèi)容巴粪。關(guān)于本小節(jié)更為詳細(xì)的內(nèi)容,可以參考文章Java CompletableFuture中的部分內(nèi)容粥谬。

線程模型

我們現(xiàn)在可以啟動(dòng)多個(gè)線程肛根,但是好像并沒(méi)有形成一種類似于模型的東西,非车畚耍混亂晶通,并且到目前為止我們的多個(gè)線程依然只是各自做各自的事情,互不相干哟玷,多個(gè)線程之間并沒(méi)有交互(通信),這是最簡(jiǎn)單的模型一也,也是最基礎(chǔ)的模型巢寡,本小節(jié)試圖介紹線程模型,一種指導(dǎo)我們的代碼組織的思想椰苟,線程模型確定了我們需要處理那些多線程的問(wèn)題抑月,在一個(gè)系統(tǒng)中,多個(gè)線程之間沒(méi)有通信是不太可能的舆蝴,更為一般的情況是谦絮,多個(gè)線程共享一些資源题诵,然后相互競(jìng)爭(zhēng)來(lái)獲取資源權(quán)限,多個(gè)線程相互配合层皱,來(lái)提高系統(tǒng)的處理能力性锭。正因?yàn)槎鄠€(gè)線程之間會(huì)有通信交互,所以本文接下來(lái)的討論才有了意義叫胖,如果我們的系統(tǒng)里面有幾百個(gè)線程在工作草冈,但是這些線程互不相干,那么這樣的系統(tǒng)要么實(shí)現(xiàn)的功能非常單一瓮增,要么毫無(wú)意義(當(dāng)然不是絕對(duì)的怎棱,比如Netty的線程模型)。

繼續(xù)來(lái)討論線程模型绷跑,上面說(shuō)到線程模型是一種指導(dǎo)代碼組織的思想拳恋,這是我自己的理解,不同的線程模型需要我們使用不同的代碼組織砸捏,好的線程模型可以提高系統(tǒng)的并發(fā)度诅岩,并且可以使得系統(tǒng)的復(fù)雜度降低,這里需要提一下Netty 4的線程模型带膜,Netty 4的線程模型使得我們可以很容易的理解Netty的事件處理機(jī)制吩谦,這種優(yōu)秀的設(shè)計(jì)基于Reactor線程模型,Reactor線程模型分為單線程模型膝藕、多線程模型以及主從多線程模型式廷,Netty的線程模型類似于Reactor主從多線程模型。

當(dāng)然線程模型是一種更高級(jí)別的并發(fā)編程內(nèi)容芭挽,它是一種編程指導(dǎo)思想滑废,尤其在我們進(jìn)行底層框架設(shè)計(jì)的時(shí)候特別需要注意線程模型,因?yàn)橐坏┚€程模型設(shè)計(jì)不合理袜爪,可能會(huì)導(dǎo)致后面框架代碼過(guò)于復(fù)雜蠕趁,并且可能因?yàn)榫€程同步等問(wèn)題造成問(wèn)題不可控,最終導(dǎo)致系統(tǒng)運(yùn)行失控辛馆。類似于Netty的線程模型是一種好的線程模型俺陋,下面展示了這種模型:

Netty線程模型

簡(jiǎn)單來(lái)說(shuō),Netty為每個(gè)新建立的Channel分配一個(gè)NioEventLoop昙篙,而每個(gè)NioEventLoop內(nèi)部?jī)H使用一個(gè)線程腊状,這就避免了多線程并發(fā)的同步問(wèn)題,因?yàn)闉槊總€(gè)Channel處理的線程僅有一個(gè)苔可,所以不需要使用鎖等線程同步手段來(lái)做線程同步缴挖,在我們的系統(tǒng)設(shè)計(jì)的時(shí)候應(yīng)該借鑒這種線程模型的設(shè)計(jì)思路,可以避免我們走很多彎路焚辅。關(guān)于線程池以及Netty線程池這部分的內(nèi)容映屋,可以參考文章Netty線程模型及EventLoop詳解苟鸯。

Java線程池

池化技術(shù)是一種非常有用的技術(shù),對(duì)于線程來(lái)說(shuō)棚点,創(chuàng)建一個(gè)線程的代價(jià)是很高的早处,如果我們?cè)趧?chuàng)建了一個(gè)線程,并且讓這個(gè)線程做一個(gè)任務(wù)之后就回收的話乙濒,那么下次要使用線程來(lái)執(zhí)行我們的任務(wù)的時(shí)候又需要?jiǎng)?chuàng)建一個(gè)新的線程陕赃,是否可以在創(chuàng)建完成一個(gè)線程之后一直緩沖,直到系統(tǒng)關(guān)閉的時(shí)候再進(jìn)行回收呢颁股?java線程池就是這樣的組件么库,使用線程池,就沒(méi)必要頻繁創(chuàng)建線程甘有,線程池會(huì)為我們管理線程诉儒,當(dāng)我們需要一個(gè)新的線程來(lái)執(zhí)行我們的任務(wù)的時(shí)候,就向線程池申請(qǐng)亏掀,而線程池會(huì)從池子里面找到一個(gè)空閑的線程返回給請(qǐng)求者忱反,如果池子里面沒(méi)有可用的線程,那么線程池會(huì)根據(jù)一些參數(shù)指標(biāo)來(lái)創(chuàng)建一個(gè)新的線程滤愕,或者將我們的任務(wù)提交到任務(wù)隊(duì)列中去温算,等待一個(gè)空閑的線程來(lái)執(zhí)行這個(gè)任務(wù)。細(xì)節(jié)內(nèi)容在下文中進(jìn)行分析间影,目前我們只需要明白注竿,線程池里面有很多線程,這些線程會(huì)一直到系統(tǒng)關(guān)系才會(huì)被回收魂贬,否則一直會(huì)處于處理任務(wù)或者等待處理任務(wù)的狀態(tài)巩割。

首先,如何使用線程池呢付燥?下面的代碼展示了如何使用java線程池的例子:


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of Executors
 */
public class ExecutorsDemo {

    public static void main(String ... args) {

        int cpuCoreCount = Runtime.getRuntime().availableProcessors();
        AThreadFactory threadFactory = new AThreadFactory();
        ARunnanle runnanle = new ARunnanle();
        
        ExecutorService fixedThreadPool=
                Executors.newFixedThreadPool(cpuCoreCount, threadFactory);

        ExecutorService cachedThreadPool = 
                Executors.newCachedThreadPool(threadFactory);

        ScheduledExecutorService newScheduledThreadPool = 
                Executors.newScheduledThreadPool(cpuCoreCount, threadFactory);

        ScheduledExecutorService singleThreadExecutor = 
                Executors.newSingleThreadScheduledExecutor(threadFactory);
        
        fixedThreadPool.submit(runnanle);
        cachedThreadPool.submit(runnanle);
        newScheduledThreadPool.scheduleAtFixedRate(runnanle, 0, 1, TimeUnit.SECONDS);
        singleThreadExecutor.scheduleWithFixedDelay(runnanle, 0, 100, TimeUnit.MILLISECONDS);

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        fixedThreadPool.shutdownNow();
        cachedThreadPool.shutdownNow();
        newScheduledThreadPool.shutdownNow();
        singleThreadExecutor.shutdownNow();
    }

}

class ARunnable implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("Current Thread Name:" + 
                Thread.currentThread().getName());
    }
}

/**
 * the thread factory
 */
class AThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread("aThread-" + threadNumber.incrementAndGet());
    }
}

更為豐富的應(yīng)用應(yīng)該自己去探索宣谈,結(jié)合自身的需求來(lái)借助線程池來(lái)實(shí)現(xiàn),下面來(lái)分析一下Java線程池實(shí)現(xiàn)中幾個(gè)較為重要的內(nèi)容键科。

ThreadPoolExecutor和ScheduledThreadPoolExecutor

ThreadPoolExecutor和ScheduledThreadPoolExecutor是java實(shí)現(xiàn)線程池的核心類闻丑,不同類型的線程池其實(shí)就是在使用不同的構(gòu)造函數(shù),以及不同的參數(shù)來(lái)構(gòu)造出ThreadPoolExecutor或者ScheduledThreadPoolExecutor萝嘁,所以梆掸,學(xué)習(xí)java線程池的重點(diǎn)也在于學(xué)習(xí)這兩個(gè)核心類。前者適用于構(gòu)造一般的線程池牙言,而后者繼承了前者,并且很多內(nèi)容是通用的怪得,但是ScheduledThreadPoolExecutor增加了schedule功能咱枉,也就是說(shuō)卑硫,ScheduledThreadPoolExecutor使用于構(gòu)造具有調(diào)度功能的線程池,在需要周期性調(diào)度執(zhí)行的場(chǎng)景下就可以使用ScheduledThreadPoolExecutor蚕断。關(guān)于ThreadPoolExecutor與ScheduledThreadPoolExecutor較為詳細(xì)深入的分析可以參考下面的文章:

下面展示了ThreadPoolExecutor和ScheduledThreadPoolExecutor的類圖欢伏,可以看出他們的關(guān)系,以及他們的繼承關(guān)系:

ThreadPoolExecutor類圖
ScheduledThreadPoolExecutor類圖

關(guān)于較為細(xì)節(jié)的內(nèi)容不再本文的敘述范圍之內(nèi)亿乳,如果想要了解這些內(nèi)容的詳細(xì)內(nèi)容硝拧,可以參考文章中給出的鏈接,這些文章較為深入的分析和總結(jié)了相關(guān)的內(nèi)容葛假。

上文中提到障陶,線程池會(huì)管理著一些線程,這些線程要么處于運(yùn)行狀態(tài)聊训,要么處于等待任務(wù)的狀態(tài)抱究,當(dāng)然這只是我們較為形象的描述,一個(gè)線程的狀態(tài)不僅有運(yùn)行態(tài)與等待狀態(tài)带斑,還有其他的狀態(tài)鼓寺,但是對(duì)我我們來(lái)說(shuō),線程池里面的線程確實(shí)是要么處于運(yùn)行狀態(tài)勋磕,要么處于等待任務(wù)的狀態(tài)妈候,這體現(xiàn)在,當(dāng)我們向一個(gè)線程池提交一個(gè)任務(wù)的時(shí)候挂滓,可能會(huì)被等待任務(wù)的線程立即執(zhí)行苦银,但是可能線程池里面的線程都處于忙碌狀態(tài),那么我們提交的任務(wù)就會(huì)被加入到等待運(yùn)行的任務(wù)隊(duì)列中去杂彭,當(dāng)有空閑線程了墓毒,或者隊(duì)列也滿了,那么線程池就會(huì)采用一些策略來(lái)執(zhí)行任務(wù)亲怠,并且在某些時(shí)刻會(huì)拒絕提交的任務(wù)所计,這些細(xì)節(jié)都可以在ThreadPoolExecutor的實(shí)現(xiàn)中找到。

在線程池的實(shí)現(xiàn)中团秽,有一個(gè)角色特別重要主胧,那就是任務(wù)隊(duì)列,當(dāng)線程池里面沒(méi)有空閑的線程來(lái)執(zhí)行我們的任務(wù)的時(shí)候习勤,我們的任務(wù)就會(huì)被添加到任務(wù)隊(duì)列中去等待執(zhí)行踪栋,而這個(gè)任務(wù)隊(duì)列可能會(huì)被多個(gè)線程并發(fā)讀寫(xiě),所以需要支持多線程安全訪問(wèn)图毕,java提供了一類支持并發(fā)環(huán)境的隊(duì)列夷都,稱為阻塞隊(duì)列,這是一類特殊的隊(duì)列予颤,他們的使用時(shí)非常廣泛的,特別是在jdk自身的類庫(kù)建設(shè)上,當(dāng)然在我們實(shí)際的工作中也是有很多使用場(chǎng)景的猪钮。

關(guān)于ThreadPoolExecutor是如何處理一個(gè)提交的任務(wù)的細(xì)節(jié),可以參考下面的代碼:


   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

下面來(lái)看一下java中借助ThreadPoolExecutor來(lái)構(gòu)造的幾個(gè)線程池的特性:

1肝陪、newFixedThreadPool

使用ThreadPoolExecutor構(gòu)造一個(gè)newCachedThreadPool的流程如下:


    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }    

在任意時(shí)刻,newFixedThreadPool構(gòu)造出來(lái)的線程池中最多只可能存活著nThreads個(gè)線程刑顺,如果所有的線程都在運(yùn)行任務(wù)氯窍,那么這個(gè)時(shí)候提交的任務(wù)將會(huì)被添加到任務(wù)隊(duì)列中去等待執(zhí)行。我們可以控制corePoolSize和maximumPoolSize來(lái)使得通過(guò)ThreadPoolExecutor構(gòu)造出來(lái)的線程池具有一些不一樣的特性蹲堂,但是需要注意的是狼讨,當(dāng)我們?cè)O(shè)置的maximumPoolSize大于corePoolSize的時(shí)候,如果當(dāng)前線程池里面的線程數(shù)量已經(jīng)達(dá)到了corePoolSize了贯城,并且當(dāng)前所以線程都處于運(yùn)行任務(wù)的狀態(tài)熊楼,那么在這個(gè)時(shí)候提交的任務(wù)會(huì)被添加到任務(wù)隊(duì)列中去,只有在任務(wù)隊(duì)列滿了的時(shí)候能犯,才會(huì)去創(chuàng)建新的線程鲫骗,如果線程數(shù)量已經(jīng)達(dá)到了maximumPoolSize了,那么到此就會(huì)拒絕提交的任務(wù)踩晶,這些流程可以參考上面展示出來(lái)的execute方法的實(shí)現(xiàn)执泰。該類型的線程池使用的任務(wù)隊(duì)列是LinkedBlockingQueue類型的阻塞隊(duì)列。

2渡蜻、newCachedThreadPool

通過(guò)ThreadPoolExecutor構(gòu)造一個(gè)newCachedThreadPool線程池的流程如下:


    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
        

newCachedThreadPool適合于類似秒殺系統(tǒng)中术吝,它可以按需創(chuàng)建線程。每個(gè)線程在空閑了一段時(shí)間之后會(huì)被回收茸苇,然后需要?jiǎng)?chuàng)建的時(shí)候再創(chuàng)建出來(lái)排苍,在使用的時(shí)候應(yīng)該使用合適的構(gòu)造參數(shù)。該類型使用的任務(wù)隊(duì)列是SynchronousQueue這種同步隊(duì)列学密,這是一種特別的隊(duì)列淘衙,每個(gè)線程都是有使命的,每個(gè)線程都會(huì)等待另外一個(gè)線程和自己交易腻暮,在交易完成之前都會(huì)阻塞住線程彤守,他們之間有一種傳遞關(guān)系,數(shù)據(jù)是從一個(gè)線程直接傳遞到例外一個(gè)線程中去的哭靖,SynchronousQueue這種隊(duì)列不存儲(chǔ)實(shí)際的數(shù)據(jù)具垫,而是存儲(chǔ)著一些線程的信息,而SynchronousQueue管理著這些線程之間的交易试幽,更為詳細(xì)的細(xì)節(jié)參考后面的文章筝蚕。

上面提到,ScheduleThreadPoolExecutor是繼承自ThreadPoolExecutor的,而且從類圖中也可以看出來(lái)這種關(guān)系饰及,所以其實(shí)ScheduleThreadPoolExecutor是對(duì)ThreadPoolExecutor的增強(qiáng)蔗坯,它增加了schedule功能康震,使用與那些需要周期性調(diào)度執(zhí)行燎含,或者是延時(shí)執(zhí)行的任務(wù),在ScheduleThreadPoolExecutor中使用了一種阻塞隊(duì)列稱為延時(shí)阻塞隊(duì)列腿短,這種隊(duì)列有能力持有一段時(shí)間數(shù)據(jù)屏箍,我們可以設(shè)定這種時(shí)間,時(shí)間沒(méi)到的時(shí)候嘗試獲取數(shù)據(jù)的線程會(huì)被阻塞橘忱,直到設(shè)定的時(shí)間到了赴魁,線程才會(huì)被喚醒來(lái)消費(fèi)數(shù)據(jù)。而關(guān)于ScheduleThreadPoolExecutor是如何運(yùn)作的钝诚,包括他的周期性任務(wù)調(diào)度是如何工作的颖御,可以參考上面提到的鏈接。

Future

Future代表一種未來(lái)某個(gè)時(shí)刻會(huì)發(fā)生的事情凝颇,在并發(fā)環(huán)境下使用Future是非常重要的潘拱,使用Future的前提是我們可以容許線程執(zhí)行一段時(shí)間來(lái)完成這個(gè)任務(wù),但是需要在我們提交了任務(wù)的時(shí)候就返回一個(gè)Future拧略,這樣在接下來(lái)的時(shí)間程序員可以根據(jù)實(shí)際情況來(lái)取消任務(wù)或者獲取任務(wù)芦岂,在多個(gè)任務(wù)沒(méi)有相互依賴關(guān)系的時(shí)候,使用Future可以實(shí)現(xiàn)多線程的并發(fā)執(zhí)行垫蛆,多個(gè)線程可以執(zhí)行在不同的處理器上禽最,然后在某個(gè)時(shí)間點(diǎn)來(lái)統(tǒng)一獲取結(jié)果就可以了。上文中已經(jīng)提到了FutureTask袱饭,F(xiàn)utureTask既是一種Runnable川无,也是一種Future,并且結(jié)合了兩種類型的特性虑乖。下面展示了Future提供的一些方法懦趋,使用這些方法可以很方便的進(jìn)行任務(wù)控制:


public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在java 8中增加了一個(gè)新的類CompletableFuture,這是對(duì)Future的極大增強(qiáng)决左,CompletableFuture提供了非常豐富的操作可以來(lái)控制我們的任務(wù)愕够,并且可以根據(jù)多種規(guī)則來(lái)關(guān)聯(lián)多個(gè)Future。關(guān)于CompletableFuture的詳細(xì)分析總結(jié)可以參考文章Java CompletableFuture佛猛。

Fork/Join框架

Fork/Join框架是一種并行框架惑芭,它可以將一個(gè)較大的任務(wù)切分成一些小任務(wù)來(lái)執(zhí)行,并且多個(gè)線程之間會(huì)相互配合继找,每個(gè)線程都會(huì)有一個(gè)任務(wù)隊(duì)列遂跟,對(duì)于某些線程來(lái)說(shuō)它們可能很快完成了自己的任務(wù)隊(duì)列中的任務(wù),但是其他的線程還沒(méi)有完成,那么這些線程就會(huì)去竊取那些還沒(méi)有完成任務(wù)執(zhí)行的線程的任務(wù)來(lái)執(zhí)行幻锁,這成為“工作竊取”算法凯亮,關(guān)于Fork/Join中的工作竊取,其實(shí)現(xiàn)還是較為復(fù)雜的哄尔,如果想對(duì)Fork/Join框架有一個(gè)大致的認(rèn)識(shí)假消,可以參考文章Java Fork/Join并行框架。下面展示了Fork/Join框架的工作模式:

Fork/Join工作模式

可以從上面的圖中看出岭接,一個(gè)較大的任務(wù)會(huì)被切分為一個(gè)小任務(wù)富拗,并且小任務(wù)還會(huì)繼續(xù)切分,直到符合我們?cè)O(shè)定的執(zhí)行閾值鸣戴,然后就會(huì)執(zhí)行啃沪,執(zhí)行完成之后會(huì)進(jìn)行join,也就是將小任務(wù)的結(jié)果組合起來(lái)窄锅,組裝出我們提交的整個(gè)任務(wù)的結(jié)果创千,這是一種非常先進(jìn)的工作模式,非常有借鑒意義入偷。當(dāng)然追驴,使用Fork/Join框架的前提是我們的任務(wù)時(shí)可以拆分成小任務(wù)來(lái)執(zhí)行的,并且小人物的結(jié)果可以組裝出整個(gè)大任務(wù)的結(jié)果盯串,歸并排序是一種可以借助Fork/Join框架來(lái)提供處理速度的算法氯檐,下面展示了使用Fork/Join框架來(lái)執(zhí)行歸并排序的代碼,可以試著調(diào)整參數(shù)來(lái)進(jìn)行性能測(cè)試:


import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * Created by hujian06 on 2017/10/23.
 *
 * merge sort by fork/join
 */
public class ForkJoinMergeSortDemo {

    public static void main(String ... args) {
        new Worker().runWork();
    }

}

class Worker {

    private static final boolean isDebug = false;

    public void runWork() {

        int[] array = mockArray(200000000, 1000000); // mock the data

        forkJoinCase(array);
        normalCase(array);

    }

    private void printArray(int[] arr) {

        if (isDebug == false) {
            return;
        }

        for (int i = 0; i < arr.length; i ++) {
            System.out.print(arr[i] + " ");
        }

        System.out.println();
    }

    private void forkJoinCase(int[] array) {
        ForkJoinPool pool = new ForkJoinPool();

        MergeSortTask mergeSortTask = new MergeSortTask(array, 0, array.length - 1);

        long start = System.currentTimeMillis();

        pool.invoke(mergeSortTask);

        long end = System.currentTimeMillis();

        printArray(array);

        System.out.println("[for/join mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
                array.length + " items' sort work.");
    }

    private void normalCase(int[] array) {

        long start = System.currentTimeMillis();

        new MergeSortWorker().sort(array, 0, array.length - 1);

        long end = System.currentTimeMillis();

        printArray(array);

        System.out.println("[normal mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
                array.length + " items' sort work.");
    }

    private static final  int[] mockArray(int length, int up) {
        if (length <= 0) {
            return null;
        }

        int[] array = new int[length];

        Random random = new Random(47);

        for (int i = 0; i < length; i ++) {
            array[i] = random.nextInt(up);
        }

        return array;
    }
}

class MergeSortTask extends RecursiveAction {

    private static final int threshold = 100000;
    private final MergeSortWorker mergeSortWorker = new MergeSortWorker();

    private int[] data;

    private int left;
    private int right;

    public MergeSortTask(int[] array, int l, int r) {
        this.data = array;
        this.left = l;
        this.right = r;
    }

    @Override
    protected void compute() {
        if (right - left < threshold) {
            mergeSortWorker.sort(data, left, right);
        } else {
            int mid = left + (right - left) / 2;
            MergeSortTask l = new MergeSortTask(data, left, mid);
            MergeSortTask r = new MergeSortTask(data, mid + 1, right);

            invokeAll(l, r);

            mergeSortWorker.merge(data, left, mid, right);
        }
    }
}

class MergeSortWorker {

    // Merges two subarrays of arr[].
    // First subarray is arr[l..m]
    // Second subarray is arr[m+1..r]
    void merge(int arr[], int l, int m, int r) {
        // Find sizes of two subarrays to be merged
        int n1 = m - l + 1;
        int n2 = r - m;

        /* Create temp arrays */
        int L[] = new int[n1];
        int R[] = new int[n2];

        /*Copy data to temp arrays*/
        for (int i = 0; i < n1; ++i)
            L[i] = arr[l + i];
        for (int j = 0; j < n2; ++j)
            R[j] = arr[m + 1 + j];

        /* Merge the temp arrays */

        // Initial indexes of first and second subarrays
        int i = 0, j = 0;

        // Initial index of merged subarry array
        int k = l;
        while (i < n1 && j < n2) {
            if (L[i] <= R[j]) {
                arr[k ++] = L[i ++];
            } else {
                arr[k ++] = R[j ++];
            }
        }

        /* Copy remaining elements of L[] if any */
        while (i < n1) {
            arr[k ++] = L[i ++];
        }

        /* Copy remaining elements of R[] if any */
        while (j < n2) {
            arr[k ++] = R[j ++];
        }
    }

    // Main function that sorts arr[l..r] using
    // merge()
    void sort(int arr[], int l, int r) {
        if (l < r) {
            // Find the middle point
            int m = l + (r - l) / 2;

            // Sort first and second halves
            sort(arr, l, m);
            sort(arr, m + 1, r);

            // Merge the sorted halves
            merge(arr, l, m, r);
        }
    }
}

在jdk中体捏,使用Fork/Join框架的一個(gè)典型案例是Streams API冠摄,Streams API試圖簡(jiǎn)化我們的并發(fā)編程,可以使用很簡(jiǎn)單的流式API來(lái)處理我們的數(shù)據(jù)流几缭,在我們無(wú)感知的狀態(tài)下河泳,其實(shí)Streams的實(shí)現(xiàn)上借助了Fork/Join框架來(lái)實(shí)現(xiàn)了并發(fā)計(jì)算,所以強(qiáng)烈建議使用Streams API來(lái)處理我們的流式數(shù)據(jù)年栓,這樣可以充分的利用機(jī)器的多核心資源拆挥,來(lái)提高數(shù)據(jù)處理的速度。關(guān)于java的Streams API的分析總結(jié)可以參考文章Java Streams API某抓,關(guān)于Stream API是如何使用Fork/Join框架來(lái)實(shí)現(xiàn)并行計(jì)算的內(nèi)容可以參考文章Java Stream的并行實(shí)現(xiàn)纸兔。鑒于Fork/Join框架的先進(jìn)思想,理解并且學(xué)會(huì)使用Fork/Join框架來(lái)處理我們的實(shí)際問(wèn)題是非常有必要的否副。

Java volatile關(guān)鍵字

volatile解決的問(wèn)題是多個(gè)線程的內(nèi)存可見(jiàn)性問(wèn)題汉矿,在并發(fā)環(huán)境下,每個(gè)線程都會(huì)有自己的工作空間备禀,每個(gè)線程只能訪問(wèn)各自的工作空間洲拇,而一些共享變量會(huì)被加載到每個(gè)線程的工作空間中奈揍,所以這里面就有一個(gè)問(wèn)題,內(nèi)存中的數(shù)據(jù)什么時(shí)候被加載到線程的工作緩存中赋续,而線程工作空間中的內(nèi)容什么時(shí)候會(huì)回寫(xiě)到內(nèi)存中去男翰。這兩個(gè)步驟處理不當(dāng)就會(huì)造成內(nèi)存可加性問(wèn)題,也就是數(shù)據(jù)的不一致纽乱,比如某個(gè)共享變量被線程A修改了蛾绎,但是沒(méi)有回寫(xiě)到內(nèi)存中去,而線程B在加載了內(nèi)存中的數(shù)據(jù)之后讀取到的共享變量是臟數(shù)據(jù)迫淹,正確的做法應(yīng)該是線程A的修改應(yīng)該對(duì)線程B是可見(jiàn)的秘通,更為通用一些,就是在并發(fā)環(huán)境下共享變量對(duì)多個(gè)線程是一致的敛熬。

對(duì)于內(nèi)存可見(jiàn)性的一點(diǎn)補(bǔ)充是,之所以會(huì)造成多個(gè)線程看到的共享變量的值不一樣第股,是因?yàn)榫€程在占用CPU時(shí)間的時(shí)候应民,cpu為了提高處理速度不會(huì)直接和內(nèi)存交互,而是會(huì)先將內(nèi)存中的共享內(nèi)容讀取到內(nèi)部緩存中(L1夕吻,L2)诲锹,然后cpu在處理的過(guò)程中就只會(huì)和內(nèi)部緩存交互,在多核心的機(jī)器中這樣的處理方式就會(huì)造成內(nèi)存可見(jiàn)性問(wèn)題涉馅。

volatile可以解決并發(fā)環(huán)境下的內(nèi)存可見(jiàn)性問(wèn)題归园,只需要在共享變量前面加上volatile關(guān)鍵字就可以解決,但是需要說(shuō)明的是稚矿,volatile僅僅是解決內(nèi)存可見(jiàn)性問(wèn)題庸诱,對(duì)于像i++這樣的問(wèn)題還是需要使用其他的方式來(lái)保證線程安全。使用volatile解決內(nèi)存可見(jiàn)性問(wèn)題的原理是晤揣,如果對(duì)被volatile修飾的共享變量執(zhí)行寫(xiě)操作的話桥爽,JVM就會(huì)向cpu發(fā)送一條Lock前綴的指令,cpu將會(huì)這個(gè)變量所在的緩存行(緩存中可以分配的最小緩存單位)寫(xiě)回到內(nèi)存中去昧识。但是在多處理器的情況下钠四,將某個(gè)cpu上的緩存行寫(xiě)回到系統(tǒng)內(nèi)存之后,其他cpu上該變量的緩存還是舊的跪楞,這樣再進(jìn)行后面的操作的時(shí)候就會(huì)出現(xiàn)問(wèn)題缀去,所以為了使得所有線程看到的內(nèi)容都是一致的,就需要實(shí)現(xiàn)緩存一致性協(xié)議甸祭,cpu將會(huì)通過(guò)監(jiān)控總線上傳遞過(guò)來(lái)的數(shù)據(jù)來(lái)判斷自己的緩存是否過(guò)期缕碎,如果過(guò)期,就需要使得緩存失效淋叶,如果cpu再來(lái)訪問(wèn)該緩存的時(shí)候阎曹,就會(huì)發(fā)現(xiàn)緩存失效了伪阶,這時(shí)候就會(huì)重新從內(nèi)存加載緩存。
總結(jié)一下处嫌,volatile的實(shí)現(xiàn)原則有兩條:
1栅贴、JVM的Lock前綴的指令將使得cpu緩存寫(xiě)回到系統(tǒng)內(nèi)存中去
2、為了保證緩存一致性原則熏迹,在多cpu的情景下檐薯,一個(gè)cpu的緩存回寫(xiě)內(nèi)存會(huì)導(dǎo)致其他的cpu上的緩存都失效,再次訪問(wèn)會(huì)重新從系統(tǒng)內(nèi)存加載新的緩存內(nèi)容注暗。

原子操作CAS

原子操作表達(dá)的意思是要么一個(gè)操作成功坛缕,要么失敗,中間過(guò)程不會(huì)被其他的線程中斷捆昏,這一點(diǎn)對(duì)于并發(fā)編程來(lái)說(shuō)非常重要赚楚,在java中使用了大量的CAS來(lái)做并發(fā)編程,包括jdk的ConcurrentHsahMap的實(shí)現(xiàn)骗卜,還有AtomicXXX的實(shí)現(xiàn)等其他一些并發(fā)工具的實(shí)現(xiàn)都使用了CAS這種技術(shù)宠页,CAS包括兩部分,也就是Compare and swap寇仓,首先是比較举户,然后再交互,這樣做的原因是遍烦,在并發(fā)環(huán)境下俭嘁,可能不止一個(gè)線程想要來(lái)改變某個(gè)共享變量的值,那么在進(jìn)行操作之前使用一個(gè)比較服猪,而這個(gè)比較的值是當(dāng)前線程認(rèn)為(知道)該共享變量最新的值供填,但是可能其他線程已經(jīng)改變了這個(gè)值,那么此時(shí)CAS操作就會(huì)失敗蔓姚,只有在共享變量的值等于線程提供的用于比較的值的時(shí)候才會(huì)進(jìn)行原子改變操作捕虽。

java中有一個(gè)類是專門用于提供CAS操作支持的,那就是Unsafe類坡脐,但是我們不能直接使用Unsafe類泄私,因?yàn)閁nsafe類提供的一些底層的操作,需要非常專業(yè)的人才能使用好备闲,并且Unsafe類可能會(huì)造成一些安全問(wèn)題晌端,所以不建議直接使用Unsafe類,但是如果想使用Unsafe類的話還是有方法的恬砂,那就是通過(guò)反射來(lái)獲取Unsafe實(shí)例咧纠,類似于下面的代碼:


class UnsafeHolder {

    private static Unsafe U = null;

    public static Unsafe getUnsafe() {
        if (U == null) {
            synchronized (UnsafeHolder.class) {
                if (U == null) {

                    List<Exception> exception = null;
                    try {
                        Field field = Unsafe.class.getDeclaredField("theUnsafe");

                        field.setAccessible(true);

                        try {
                            U = (Unsafe) field.get(null);
                        } catch (IllegalAccessException e) {

                            exception.add(e);
                        }
                    } catch (NoSuchFieldException e) {

                        exception.add(e);
                    } finally {

                        if (exception != null) {
                            reportException(exception);
                        }

                    }
                }
            }
        }

        return U;
    }

    /**
     * handler the exception in this method .
     * @param e The exception
     */
    private static void reportException(List<Exception> e) {
        e.forEach(System.out::println);
    }

}

如果想要了解Unsafe類到底提供了哪些較為底層的操作,可以直接參考Unsafe的源碼泻骤。CAS操作解決了原子操作問(wèn)題漆羔,只要進(jìn)行操作梧奢,CAS就會(huì)保證操作會(huì)成功,不會(huì)被中斷演痒,這是一種非常好非常強(qiáng)大的特性亲轨,下面就java 8中的ConcurrentHashMap的size實(shí)現(xiàn)來(lái)談?wù)凜AS操作在并發(fā)環(huán)境下的使用案例。

在java 7中鸟顺,ConcurrentHashMap的實(shí)現(xiàn)是基于分段鎖協(xié)議的實(shí)現(xiàn)惦蚊,本質(zhì)上還是使用了鎖,只是基于一種考慮讯嫂,就是多個(gè)線程訪問(wèn)哈希桶具有隨機(jī)性蹦锋,基于這種考慮來(lái)將數(shù)據(jù)存儲(chǔ)在不同的哈希段上面,然后每一個(gè)段配有一把鎖欧芽,在需要寫(xiě)某個(gè)段的時(shí)候需要加鎖莉掂,而在這個(gè)時(shí)候,其他訪問(wèn)其他段的線程是不需要阻塞的渐裸,但是對(duì)于該段的線程訪問(wèn)就需要等待巫湘,直到這個(gè)加鎖的線程釋放了鎖,其他線程才能進(jìn)行訪問(wèn)昏鹃。在java 8中,ConcurrentHashMap的實(shí)現(xiàn)拋棄了這種復(fù)雜的架構(gòu)設(shè)計(jì)诀诊,但是繼承了這種分散線程競(jìng)爭(zhēng)壓力的思想洞渤,其實(shí)就提高系統(tǒng)的并發(fā)度這一維度來(lái)說(shuō),分散競(jìng)爭(zhēng)壓力是一種最為直接明了的解決方案属瓣,而java 8在實(shí)現(xiàn)ConcurrentHashMap的時(shí)候大量使用了CAS操作载迄,減少了使用鎖的頻度來(lái)提高系統(tǒng)的響應(yīng)度,其實(shí)使用鎖和使用CAS來(lái)做并發(fā)在復(fù)雜度上不是一個(gè)數(shù)量級(jí)的抡蛙,使用鎖在很大程度上假設(shè)了多個(gè)線程的排斥性护昧,并且使用鎖會(huì)將線程阻塞等待,也就是說(shuō)使用鎖來(lái)做線程同步的時(shí)候粗截,線程的狀態(tài)是會(huì)改變的惋耙,但是使用CAS是不會(huì)改變線程的狀態(tài)的(不太嚴(yán)謹(jǐn)?shù)恼f(shuō)),所以使用CAS比起使用synchronized或者使用Lcok來(lái)說(shuō)更為輕量級(jí)熊昌。

現(xiàn)在就ConcurrentHashMap的size方法來(lái)分析一下如何將線程競(jìng)爭(zhēng)的壓力分散出去绽榛。在java 7的實(shí)現(xiàn)上,在調(diào)用size方法之后婿屹,ConcurrentHashMap會(huì)進(jìn)行兩次對(duì)哈希桶中的記錄累加的操作灭美,這兩次累加的操作是不加鎖的,然后判斷兩次結(jié)果是否一致昂利,如果一致就說(shuō)明目前的系統(tǒng)是讀多寫(xiě)少的場(chǎng)景届腐,并且可能目前沒(méi)有線程競(jìng)爭(zhēng)铁坎,所以直接返回就可以,這就避免了使用鎖犁苏,但是如果兩次累加結(jié)果不一致硬萍,那就說(shuō)明此時(shí)可能寫(xiě)的線程較多,或者線程競(jìng)爭(zhēng)較為嚴(yán)重傀顾,那么此時(shí)ConcurrentHashMap就會(huì)進(jìn)行一個(gè)重量級(jí)的操作襟铭,對(duì)所有段進(jìn)行加鎖,然后對(duì)每一個(gè)段進(jìn)行記錄計(jì)數(shù)短曾,然后求得最終的結(jié)果返回寒砖。在最有情況下,size方法需要做兩次累加計(jì)數(shù)嫉拐,最壞情況需要三次哩都,并且會(huì)涉及全局加鎖這種重量級(jí)的加鎖操作,性能肯定是不高的婉徘。而在java 8的實(shí)現(xiàn)上漠嵌,ConcurrentHashMap的size方法實(shí)際上是與ConcurrentHashMap是解耦的,size方法更像是接入了一個(gè)額外的并發(fā)計(jì)數(shù)系統(tǒng)盖呼,在進(jìn)行size方法調(diào)用的時(shí)候是不會(huì)影響數(shù)據(jù)的存取的儒鹿,這其實(shí)是一種非常先進(jìn)的思想,就是一個(gè)系統(tǒng)模塊化几晤,然后模塊可以進(jìn)行更新约炎,系統(tǒng)解耦,比如java 8中接入了并發(fā)計(jì)數(shù)組件Striped64來(lái)作為size方法的支撐蟹瘾,可能未來(lái)出現(xiàn)了比Striped64更為高效的算法來(lái)計(jì)數(shù)圾浅,那么只需要將Striped64模塊換成新的模塊就可以了,對(duì)原來(lái)的核心操作是不影響的憾朴,這種模塊化系統(tǒng)設(shè)定的思想應(yīng)該在我們的項(xiàng)目中具體實(shí)踐狸捕。

上面說(shuō)到j(luò)ava 8在進(jìn)行size方法的設(shè)計(jì)上引入了Striped64這種并發(fā)計(jì)數(shù)組件,這種組件的計(jì)數(shù)思想其實(shí)也是分散競(jìng)爭(zhēng)众雷,Striped64的實(shí)現(xiàn)上使用了volatile和CAS灸拍,在Striped64的實(shí)現(xiàn)中是看不到鎖的使用的,但是Striped64確實(shí)是一種高效的適用于并發(fā)環(huán)境下的計(jì)數(shù)組件报腔,它會(huì)基于請(qǐng)求計(jì)數(shù)的線程株搔,Striped64的計(jì)數(shù)會(huì)根據(jù)兩部分的內(nèi)容來(lái)得到最后的結(jié)果,類似于java 7中ConcurrentHashMap的size方法的實(shí)現(xiàn)纯蛾,在Striped64的實(shí)現(xiàn)上也是借鑒了這種思想的纤房,Striped64會(huì)首先嘗試將某個(gè)線程的計(jì)數(shù)請(qǐng)求累加到一個(gè)base共享變量上,如果成功了翻诉,那么說(shuō)明目前的競(jìng)爭(zhēng)不是很激烈炮姨,也就沒(méi)必要后面的操作了捌刮,但是很多情況下,并發(fā)環(huán)境下的線程競(jìng)爭(zhēng)是很激烈的舒岸,所以嘗試?yán)奂拥絙ase上的計(jì)數(shù)請(qǐng)求很大概率是會(huì)失敗的绅作,那么Striped64會(huì)維護(hù)一個(gè)Cell數(shù)組,每個(gè)Cell是一個(gè)計(jì)數(shù)組件蛾派,Striped64會(huì)為每個(gè)請(qǐng)求計(jì)數(shù)的線程計(jì)算一個(gè)哈希值俄认,然后哈希到Cell數(shù)組中的某個(gè)位置上,然后這個(gè)線程的計(jì)數(shù)就會(huì)累加到該Cell上面去洪乍。當(dāng)然眯杏,Striped64的實(shí)現(xiàn)細(xì)節(jié)是非常復(fù)雜的,想要了解Striped64的實(shí)現(xiàn)細(xì)節(jié)的讀者可以參考文章Java 并發(fā)計(jì)數(shù)組件Striped64詳解壳澳,配合Striped64的源碼效果更佳岂贩。

并發(fā)同步框架AQS

AQS是java中實(shí)現(xiàn)Lock的基礎(chǔ),也是實(shí)現(xiàn)線程同步的基礎(chǔ)巷波,AQS提供了鎖的語(yǔ)義萎津,并且支持獨(dú)占模式和共享模式,對(duì)應(yīng)于悲觀鎖和樂(lè)觀鎖抹镊,獨(dú)占模式的含義是說(shuō)同一時(shí)刻只能有一個(gè)線程獲取鎖锉屈,而其他試圖獲取鎖的線程都需要阻塞等待,而共享鎖的含義是說(shuō)可以有多個(gè)線程獲得鎖垮耳,兩種模式在不同的場(chǎng)景下使用部念。

而鎖在并發(fā)編程中的地位不言而喻,多個(gè)線程的同步很多時(shí)候是需要鎖來(lái)做同步的氨菇,比如對(duì)于某些資源,我們希望可以有多個(gè)線程獲得鎖來(lái)讀取妓湘,但是只允許有一個(gè)線程獲得鎖來(lái)執(zhí)行寫(xiě)操作查蓉,這種鎖稱為讀寫(xiě)鎖,它的實(shí)現(xiàn)上結(jié)合了AQS的共享模式和獨(dú)占模式榜贴,共享模式對(duì)應(yīng)于可以使得多個(gè)線程獲得鎖來(lái)進(jìn)行讀操作豌研,獨(dú)占模式對(duì)應(yīng)于只允許有一個(gè)線程獲得鎖來(lái)進(jìn)行寫(xiě)操作。關(guān)于java中多個(gè)Lock的實(shí)現(xiàn)細(xì)節(jié)唬党,以及是如何借助AQS來(lái)實(shí)現(xiàn)其具體邏輯的內(nèi)容鹃共,可以參考文章ava可重入鎖詳解。該文章詳細(xì)講述了多個(gè)Lock接口的實(shí)現(xiàn)類驶拱,以及他們是如何借助AQS來(lái)實(shí)現(xiàn)的具體細(xì)節(jié)霜浴。

某些時(shí)候,我們需要定制我們自己的線程同步策略蓝纲,個(gè)性化的線程同步借助AQS可以很容易的實(shí)現(xiàn)阴孟,比如我們的需求是允許限定個(gè)數(shù)的線程獲得鎖來(lái)進(jìn)行一些操作晌纫,想要實(shí)現(xiàn)這樣的語(yǔ)義,只需要實(shí)現(xiàn)一個(gè)類永丝,繼承AQS锹漱,然后重寫(xiě)方法下面兩個(gè)方法:


    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }    

關(guān)于AQS的具體分析,可以參考文章Java同步框架AbstractQueuedSynchronizer慕嚷。

還需要提到的一點(diǎn)是哥牍,鎖分為公平鎖和非公平鎖,java中大多數(shù)時(shí)候會(huì)使用隊(duì)列來(lái)實(shí)現(xiàn)公平鎖喝检,而使用棧來(lái)實(shí)現(xiàn)非公平鎖嗅辣,當(dāng)然這是基于隊(duì)列和棧這兩種數(shù)據(jù)結(jié)構(gòu)的特點(diǎn)來(lái)實(shí)現(xiàn)的,直觀的來(lái)說(shuō)蛇耀,使用隊(duì)列的FIFO的特性就可以實(shí)現(xiàn)類似排隊(duì)的效果辩诞,也就保證了公平性,而棧是一個(gè)后進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)纺涤,它的這種結(jié)構(gòu)造成的結(jié)果就是译暂,最新進(jìn)入的線程可能比那些等待過(guò)一段時(shí)間的線程更早的獲得鎖,更為具體的內(nèi)容可以參考上面的文章進(jìn)行了解撩炊。

synchronized(同步鎖)

相對(duì)于volatile外永,synchronized就顯得比較重量級(jí)了。
首先拧咳,我們應(yīng)該知道伯顶,在java中,所有的對(duì)象都可以作為鎖骆膝〖礼茫可以分為下面三種情況:
1、普通方法同步阅签,鎖是當(dāng)前對(duì)象
2掐暮、靜態(tài)方法同步,鎖是當(dāng)前類的Class對(duì)象
3政钟、普通塊同步路克,鎖是synchronize里面配置的對(duì)象
當(dāng)一個(gè)線程試圖訪問(wèn)同步代碼時(shí),必須要先獲得鎖养交,退出或者拋出異常時(shí)必須要釋放鎖精算。
JVM基于進(jìn)入和退出Monitor對(duì)象來(lái)實(shí)現(xiàn)方法同步和代碼塊同步,可以使用monitorenter和monitorexit指令實(shí)現(xiàn)碎连。monitorenter指令是在編譯后插入到同步代碼塊的開(kāi)始位置灰羽,而monitorexit指令則插入到方法結(jié)束和異常處,JVM保證每個(gè)monitorenter都有一個(gè)monitorexit閾值相對(duì)應(yīng)。線程執(zhí)行到monitorenter的時(shí)候谦趣,會(huì)嘗試獲得對(duì)象所對(duì)應(yīng)的monitor的鎖疲吸,然后才能獲得訪問(wèn)權(quán)限,synchronize使用的鎖保存在Java對(duì)象頭中前鹅。

并發(fā)隊(duì)列(阻塞隊(duì)列摘悴,同步隊(duì)列)

并發(fā)隊(duì)列,也就是可以在并發(fā)環(huán)境下使用的隊(duì)列舰绘,為什么一般的隊(duì)列不能再并發(fā)環(huán)境下使用呢蹂喻?因?yàn)樵诓l(fā)環(huán)境下,可能會(huì)有多個(gè)線程同時(shí)來(lái)訪問(wèn)一個(gè)隊(duì)列捂寿,這個(gè)時(shí)候因?yàn)樯舷挛那袚Q的原因可能會(huì)造成數(shù)據(jù)不一致的情況口四,并發(fā)隊(duì)列解決了這個(gè)問(wèn)題,并且java中的并發(fā)隊(duì)列的使用時(shí)非常廣泛的秦陋,比如在java的線程池的實(shí)現(xiàn)上使用了多種不同特性的阻塞隊(duì)列來(lái)做任務(wù)隊(duì)列飞袋,對(duì)于阻塞隊(duì)列來(lái)說(shuō)妇垢,它要解決的首要的兩個(gè)問(wèn)題是:

  1. 多線程環(huán)境支持砌们,多個(gè)線程可以安全的訪問(wèn)隊(duì)列
  2. 支持生產(chǎn)和消費(fèi)等待旅薄,多個(gè)線程之間互相配合,當(dāng)隊(duì)列為空的時(shí)候顺又,消費(fèi)線程會(huì)阻塞等待隊(duì)列不為空更卒;當(dāng)隊(duì)列滿了的時(shí)候,生產(chǎn)線程就會(huì)阻塞直到隊(duì)列不滿稚照。

java中提供了豐富的并發(fā)隊(duì)列實(shí)現(xiàn)蹂空,下面展示了這些并發(fā)隊(duì)列的概覽:

java并發(fā)隊(duì)列概覽

根據(jù)上面的圖可以將java中實(shí)現(xiàn)的并發(fā)隊(duì)列分為幾類:

  1. 一般的阻塞隊(duì)列
  2. 支持雙端存取的并發(fā)隊(duì)列
  3. 支持延時(shí)獲取數(shù)據(jù)的延時(shí)阻塞隊(duì)列
  4. 支持優(yōu)先級(jí)的阻塞隊(duì)列

這些隊(duì)列的區(qū)別就在于從隊(duì)列中存取數(shù)據(jù)時(shí)的具體表現(xiàn),比如對(duì)于延時(shí)隊(duì)列來(lái)說(shuō)果录,獲取數(shù)據(jù)的線程可能被阻塞等待一段時(shí)間上枕,也可能立刻返回,對(duì)于優(yōu)先級(jí)阻塞隊(duì)列弱恒,獲取的數(shù)據(jù)是根據(jù)一定的優(yōu)先級(jí)取到的姿骏。下面展示了一些隊(duì)列操作的具體表現(xiàn):

操作類型 Throws Exception Special Value Blocked Timed out
插入 add(o) offer(o) put(o) offer(o, timeout, unit)
取出(刪除) remove(o) poll() take() poll(timeout, unit)
  • Throws Exception 類型的插入和取出在不能立即被執(zhí)行的時(shí)候就會(huì)拋出異常。
  • Special Value 類型的插入和取出在不能被立即執(zhí)行的情況下會(huì)返回一個(gè)特殊的值(true 或者 false)
  • Blocked 類型的插入和取出操作在不能被立即執(zhí)行的時(shí)候會(huì)阻塞線程直到可以操作的時(shí)候會(huì)被其他線程喚醒
  • Timed out 類型的插入和取出操作在不能立即執(zhí)行的時(shí)候會(huì)被阻塞一定的時(shí)候斤彼,如果在指定的時(shí)間內(nèi)沒(méi)有被執(zhí)行,那么會(huì)返回一個(gè)特殊值

關(guān)于更為具體的分析總結(jié)可以參考文章Java 并發(fā)隊(duì)列詳解蘸泻。

無(wú)鎖并發(fā)設(shè)計(jì)須知

在并發(fā)系統(tǒng)設(shè)計(jì)的時(shí)候琉苇,為了數(shù)據(jù)安全等原因需要對(duì)共享數(shù)據(jù)進(jìn)行加鎖訪問(wèn),但是使用鎖必然會(huì)有開(kāi)銷悦施,在并并發(fā)系統(tǒng)較為繁忙的時(shí)候并扇,這個(gè)開(kāi)銷就變得很可觀了,為了規(guī)避這個(gè)問(wèn)題抡诞,無(wú)鎖并發(fā)系統(tǒng)設(shè)計(jì)成為一種趨勢(shì)穷蛹。

所謂無(wú)鎖并發(fā)土陪,即在進(jìn)行并發(fā)系統(tǒng)設(shè)計(jì)的時(shí)候不使用鎖,而使用一些其他的技術(shù)來(lái)達(dá)到鎖的效果肴熏,在java中鬼雀,CAS技術(shù)是無(wú)鎖并發(fā)系統(tǒng)設(shè)計(jì)的基礎(chǔ)技術(shù),結(jié)合CAS和spin即可實(shí)現(xiàn)無(wú)鎖并發(fā)系統(tǒng)的設(shè)計(jì)蛙吏,但是源哩,因?yàn)樯婕皊pin,也就是自旋鸦做,所以需要特別注意CPU 100%的問(wèn)題励烦,以及因?yàn)槭褂脽o(wú)鎖,如果沒(méi)有做好線程競(jìng)爭(zhēng)管理泼诱,就會(huì)出現(xiàn)線程饑餓的問(wèn)題坛掠,下面是在使用CAS進(jìn)行無(wú)鎖并發(fā)系統(tǒng)設(shè)計(jì)時(shí)需要注意的一些問(wèn)題:

  • (1)、使用隊(duì)列來(lái)管理競(jìng)爭(zhēng)線程治筒,解決線程饑餓的問(wèn)題
  • (2)屉栓、在多次CAS無(wú)果之后,請(qǐng)讓線程休息一會(huì)矢炼,讓出CPU使得一些其他的事情得到處理
  • (3)系瓢、避免出現(xiàn)CPU 100%的問(wèn)題

如果沒(méi)有足夠的CAS經(jīng)驗(yàn),不推薦使用CAS來(lái)進(jìn)行無(wú)鎖并發(fā)設(shè)計(jì)句灌,使用鎖可以方便快速的解決并發(fā)問(wèn)題夷陋,并且性能問(wèn)題逐漸得到解決,所以胰锌,如果對(duì)性能不是要求特別嚴(yán)苛骗绕,使用鎖即可,當(dāng)然资昧,鎖的使用也需要合理酬土,否則性能依然會(huì)成為一個(gè)很大的問(wèn)題。

總結(jié)

本文總結(jié)了java并發(fā)編程中的若干核心技術(shù)格带,并且對(duì)每一個(gè)核心技術(shù)都做了一些分析撤缴,并給出了參考鏈接,可以在參考鏈接中查找到更為具體深入的分析總結(jié)內(nèi)容叽唱。java并發(fā)編程需要解決一些問(wèn)題屈呕,比如線程間同步問(wèn)題,如何保證數(shù)據(jù)可見(jiàn)性問(wèn)題棺亭,以及如何高效的協(xié)調(diào)多個(gè)線程工作等內(nèi)容虎眨,本文在這些維度上都有所設(shè)計(jì),本文作為對(duì)閱讀java.util.Concurrent包的源碼閱讀的一個(gè)總結(jié),同時(shí)本文也作為一個(gè)起點(diǎn)嗽桩,一個(gè)開(kāi)始更高層次分析總結(jié)的起點(diǎn)岳守,之前的分析都是基于jdk源碼來(lái)進(jìn)行的,并且某些細(xì)節(jié)的內(nèi)容還沒(méi)有完全搞明白碌冶,其實(shí)在閱讀了一些源碼之后就會(huì)發(fā)現(xiàn)湿痢,如果想要深入分析某個(gè)方面的內(nèi)容,就需要一些底層的知識(shí)种樱,否則很難完整的分析總結(jié)出來(lái)蒙袍,但是這種不徹底的分析又是很有必要的,至少可以對(duì)這些內(nèi)容有一些大概的了解嫩挤,并且知道自己的不足害幅,以及未來(lái)需要了解的底層內(nèi)容。對(duì)于java并發(fā)包的分析研究岂昭,深入到底層就是對(duì)jvm如何管理內(nèi)容以现,如何管理線程的分析,在深入下去约啊,就是操作系統(tǒng)對(duì)內(nèi)存的管理邑遏,對(duì)線程的管理等內(nèi)容,從操作系統(tǒng)再深入下去恰矩,就是去理解cpu的指令系統(tǒng)记盒,學(xué)習(xí)磁盤知識(shí)等內(nèi)容,當(dāng)然外傅,知識(shí)的關(guān)聯(lián)是無(wú)止境的纪吮,學(xué)習(xí)也是無(wú)止境的,目前來(lái)說(shuō)萎胰,首要解決的問(wèn)題是可以熟練的使用java提供的并發(fā)包內(nèi)容來(lái)進(jìn)行并發(fā)編程碾盟,在業(yè)務(wù)上提高并發(fā)處理能力,在出現(xiàn)問(wèn)題的時(shí)候可以很快找到問(wèn)題并且解決問(wèn)題技竟,在達(dá)到這個(gè)要求之后冰肴,可以去了解一些jvm層次的內(nèi)容,比如jvm的內(nèi)存模型榔组,以及線程的實(shí)現(xiàn)熙尉,并且可以與學(xué)習(xí)操作系統(tǒng)的相關(guān)內(nèi)容并行進(jìn)行。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末搓扯,一起剝皮案震驚了整個(gè)濱河市骡尽,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌擅编,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異爱态,居然都是意外死亡谭贪,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門锦担,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)俭识,“玉大人,你說(shuō)我怎么就攤上這事洞渔√酌模” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵磁椒,是天一觀的道長(zhǎng)堤瘤。 經(jīng)常有香客問(wèn)我,道長(zhǎng)浆熔,這世上最難降的妖魔是什么本辐? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮医增,結(jié)果婚禮上慎皱,老公的妹妹穿的比我還像新娘。我一直安慰自己叶骨,他們只是感情好茫多,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著忽刽,像睡著了一般天揖。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上缔恳,一...
    開(kāi)封第一講書(shū)人閱讀 51,631評(píng)論 1 305
  • 那天宝剖,我揣著相機(jī)與錄音,去河邊找鬼歉甚。 笑死万细,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的纸泄。 我是一名探鬼主播赖钞,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼聘裁!你這毒婦竟也來(lái)了雪营?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤衡便,失蹤者是張志新(化名)和其女友劉穎献起,沒(méi)想到半個(gè)月后洋访,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡谴餐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年姻政,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片岂嗓。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡汁展,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出厌殉,到底是詐尸還是另有隱情食绿,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布公罕,位于F島的核電站器紧,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏熏兄。R本人自食惡果不足惜品洛,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望摩桶。 院中可真熱鬧桥状,春花似錦、人聲如沸硝清。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)芦拿。三九已至士飒,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蔗崎,已是汗流浹背酵幕。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留缓苛,地道東北人芳撒。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像未桥,于是被迫代替她去往敵國(guó)和親笔刹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容