作者: 一字馬胡
轉(zhuǎn)載標志1 【2017-11-01】
轉(zhuǎn)載標志2 【內(nèi)容來自 一字馬胡】
更新日志
日期 | 更新內(nèi)容 | 備注 |
---|---|---|
2017-11-01 | 新建文章 | V1 |
2018-05-21 | 新增無鎖并發(fā)設(shè)計須知 | V2 |
2020-08-23 | 重新發(fā)布 | V3 |
批注-2020-08-23
文章內(nèi)容較多可帽,且因為寫的比較早垮斯,有些內(nèi)容可能存在錯誤或者不再正確,文章會被持續(xù)迭代定嗓,整理下來只是為了后續(xù)能不斷回頭看看枯怖,不斷迭代注整,僅此而已。
本文主要內(nèi)容索引
1度硝、Java線程
2肿轨、線程模型
3、Java線程池
4蕊程、Future(各種Future)
5椒袍、Fork/Join框架
6、volatile
7藻茂、CAS(原子操作)
8驹暑、AQS(并發(fā)同步框架)
9、synchronized(同步鎖)
10辨赐、并發(fā)隊列(阻塞隊列)
11优俘、無鎖并發(fā)設(shè)計須知
本文僅分析java并發(fā)編程中的若干核心問題,對于上面沒有提到但是又和java并發(fā)編程有密切關(guān)系的技術(shù)將會不斷添加進來完善文章掀序,本文將長期更新帆焕,不斷迭代。本文試圖從一個更高的視覺來總結(jié)Java語言中的并發(fā)編程內(nèi)容不恭,希望閱讀完本文之后叶雹,可以收獲一些內(nèi)容,至少應(yīng)該知道在java中做并發(fā)編程實踐的時候應(yīng)該注意什么换吧,應(yīng)該關(guān)注什么折晦,如何保證線程安全,以及如何選擇合適的工具來滿足需求沾瓦。當然满着,更深層次的內(nèi)容就會涉及到j(luò)vm層面的知識打颤,包括底層對java內(nèi)存的管理,對線程的管理等較為核心的問題漓滔,當然,本文的定位在于抽象與總結(jié)乖篷,更為具體而深入的內(nèi)容就需要自己去實踐响驴,考慮到可能篇幅過長、重復(fù)描述某些內(nèi)容撕蔼,以及自身技術(shù)深度等原因豁鲤,本文將在深度和廣度上做一些權(quán)衡届宠,某些內(nèi)容會做一些深入的分析矮固,而有些內(nèi)容會一帶而過动遭,點到為止熊户,總之践惑,本文就當是對學(xué)習(xí)java并發(fā)編程內(nèi)容的一個總結(jié)吉懊,以及給哪些希望快速了解java并發(fā)編程內(nèi)容的讀者拋磚引玉刽脖,不足之處還望指正腕窥。
Java線程
一般來說怒坯,在java中實現(xiàn)高并發(fā)是基于多線程編程的炫狱,所謂并發(fā),也就是多個線程同時工作剔猿,來處理我們的業(yè)務(wù)视译,在機器普遍多核心的今天,并發(fā)編程的意義極為重大归敬,因為我們有多個cpu供線程使用酷含,如果我們的應(yīng)用依然只使用單線程模式來工作的話,對極度浪費機器資源的汪茧。所以椅亚,學(xué)習(xí)java并發(fā)知識的首要問題是:如何創(chuàng)建一個線程,并且讓這個線程做一些事情陆爽?這是java并發(fā)編程內(nèi)容的起點什往,下面將分別介紹多個創(chuàng)建線程,并且讓線程做一些事情的方法慌闭。
繼承Thread類
繼承Thread類别威,然后重寫run方法,這是第一種創(chuàng)建線程的方法驴剔。run方法里面就是我們要做的事情省古,可以在run方法里面寫我們想要在新的線程里面運行的任務(wù),下面是一個小例子丧失,我們繼承了Thread類豺妓,并且在run方法里面打印出了當然線程的名字,然后sleep1秒中之后就退出了:
/**
* Created by hujian06 on 2017/10/31.
*
* the demo of thread
*/
public class ThreadDemo {
public static void main(String ... args) {
AThread aThread = new AThread();
//start the thread
aThread.start();
}
}
class AThread extends Thread {
@Override
public void run() {
System.out.println("Current Thread Name:" +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如果我們想要啟動這個線程,只需要像上面代碼中那樣琳拭,調(diào)用Thread類的start方法就可以了训堆。
實現(xiàn)Runnable接口
啟動一個線程的第二種方法是實現(xiàn)Runnable接口,然后實現(xiàn)其run方法白嘁,將你想要在新線程里面執(zhí)行的業(yè)務(wù)代碼寫在run方法里面坑鱼,下面的例子展示了這種方法啟動線程的示例,實現(xiàn)的功能和上面的第一種示例是一樣的:
/**
* Created by hujian06 on 2017/10/31.
*
* the demo of Runnable
*/
public class ARunnableaDemo {
public static void main(String ... args) {
ARunnanle aRunnanle = new ARunnanle();
Thread thread = new Thread(aRunnanle);
thread.start();
}
}
class ARunnanle implements Runnable {
@Override
public void run() {
System.out.println("Current Thread Name:" +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在啟動線程的時候絮缅,依然還是使用了Thread這個類鲁沥,只是我們在構(gòu)造函數(shù)中將我們實現(xiàn)的Runnable對象傳遞進去了,所以在我們執(zhí)行Thread類的start方法的時候耕魄,實際執(zhí)行的內(nèi)容是我們的Runnable的run方法画恰。
使用FutureTask
啟動一個新的線程的第三種方法是使用FutureTask,下面來看一下FutureTask的類圖吸奴,就可以明白為什么可以使用FutureTask來啟動一個新的線程了:
從FutureTask的類圖中可以看出允扇,F(xiàn)utureTask實現(xiàn)了Runnable接口和Future接口,所以它兼?zhèn)銻unnable和Future兩種特性则奥,下面先來看看如何使用FutureTask來啟動一個新的線程:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* Created by hujian06 on 2017/10/31.
*
* the demo of FutureTask
*/
public class FutureTaskDemo {
public static void main(String ... args) {
ACallAble callAble = new ACallAble();
FutureTask<String> futureTask = new FutureTask<>(callAble);
Thread thread = new Thread(futureTask);
thread.start();
do {
}while (!futureTask.isDone());
try {
String result = futureTask.get();
System.out.println("Result:" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class ACallAble implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "Thread-Name:" +
Thread.currentThread().getName();
}
}
可以看到蔼两,使用FutureTask來啟動一個線程之后,我們可以監(jiān)控這個線程是否完成逞度,上面的示例中主線程會一直等待這個新創(chuàng)建的線程直到它返回额划,其實只要是Future提供的接口,我們在FutureTask中都可以使用档泽,這極大的方便了我們俊戳,F(xiàn)uture在并發(fā)編程中的意義極為重要,F(xiàn)uture代表一個未來會發(fā)生的東西馆匿,它是一種暗示抑胎,一種占位符,它示意我們它可能不會立即得到結(jié)果渐北,因為它的任務(wù)還在運行阿逃,但是我們可以得到一個對這個線程的監(jiān)控對象,我們可以對線程的執(zhí)行做一些判斷赃蛛,甚至是控制恃锉,比如,如果我們覺得我們等了太久呕臂,并且我們覺得沒有必要再等待下去的時候破托,就可以將這個Task取消,還有一點需要提到的是歧蒋,F(xiàn)uture代表它可能正在運行土砂,也可能已經(jīng)返回州既,當然Future更多的暗示你可以在等待這個結(jié)果的同時可以使用其他的線程做一些其他的事情,當你真的需要這個結(jié)果的時候再來獲取就可以了萝映,這就是并發(fā)吴叶,理解這一點非常重要。
本小節(jié)通過介紹三種創(chuàng)建并啟動一個新線程的方法序臂,為進行并發(fā)編程開了一個頭晤郑,目前,我們還只是在能創(chuàng)建多個線程贸宏,然后讓多個線程做不同個的事情的階段,當然磕洪,這是學(xué)習(xí)并發(fā)編程最為基礎(chǔ)的吭练,無論如何,現(xiàn)在析显,我們可以讓我們的應(yīng)用運行多個線程了鲫咽,下面的文章將會基于這個假設(shè)(一個應(yīng)用開啟了多個線程)討論一些并發(fā)編程中值得關(guān)注的內(nèi)容。關(guān)于本小節(jié)更為詳細的內(nèi)容谷异,可以參考文章Java CompletableFuture中的部分內(nèi)容分尸。
線程模型
我們現(xiàn)在可以啟動多個線程,但是好像并沒有形成一種類似于模型的東西歹嘹,非陈嵘埽混亂,并且到目前為止我們的多個線程依然只是各自做各自的事情尺上,互不相干材蛛,多個線程之間并沒有交互(通信),這是最簡單的模型怎抛,也是最基礎(chǔ)的模型卑吭,本小節(jié)試圖介紹線程模型,一種指導(dǎo)我們的代碼組織的思想马绝,線程模型確定了我們需要處理那些多線程的問題豆赏,在一個系統(tǒng)中,多個線程之間沒有通信是不太可能的富稻,更為一般的情況是掷邦,多個線程共享一些資源,然后相互競爭來獲取資源權(quán)限椭赋,多個線程相互配合耙饰,來提高系統(tǒng)的處理能力。正因為多個線程之間會有通信交互纹份,所以本文接下來的討論才有了意義苟跪,如果我們的系統(tǒng)里面有幾百個線程在工作廷痘,但是這些線程互不相干,那么這樣的系統(tǒng)要么實現(xiàn)的功能非常單一件已,要么毫無意義(當然不是絕對的笋额,比如Netty的線程模型)。
繼續(xù)來討論線程模型篷扩,上面說到線程模型是一種指導(dǎo)代碼組織的思想兄猩,這是我自己的理解,不同的線程模型需要我們使用不同的代碼組織鉴未,好的線程模型可以提高系統(tǒng)的并發(fā)度枢冤,并且可以使得系統(tǒng)的復(fù)雜度降低,這里需要提一下Netty 4的線程模型铜秆,Netty 4的線程模型使得我們可以很容易的理解Netty的事件處理機制淹真,這種優(yōu)秀的設(shè)計基于Reactor線程模型,Reactor線程模型分為單線程模型连茧、多線程模型以及主從多線程模型核蘸,Netty的線程模型類似于Reactor主從多線程模型。
當然線程模型是一種更高級別的并發(fā)編程內(nèi)容啸驯,它是一種編程指導(dǎo)思想客扎,尤其在我們進行底層框架設(shè)計的時候特別需要注意線程模型,因為一旦線程模型設(shè)計不合理罚斗,可能會導(dǎo)致后面框架代碼過于復(fù)雜徙鱼,并且可能因為線程同步等問題造成問題不可控,最終導(dǎo)致系統(tǒng)運行失控针姿。類似于Netty的線程模型是一種好的線程模型疆偿,下面展示了這種模型:
簡單來說,Netty為每個新建立的Channel分配一個NioEventLoop搓幌,而每個NioEventLoop內(nèi)部僅使用一個線程杆故,這就避免了多線程并發(fā)的同步問題,因為為每個Channel處理的線程僅有一個溉愁,所以不需要使用鎖等線程同步手段來做線程同步处铛,在我們的系統(tǒng)設(shè)計的時候應(yīng)該借鑒這種線程模型的設(shè)計思路,可以避免我們走很多彎路拐揭。關(guān)于線程池以及Netty線程池這部分的內(nèi)容撤蟆,可以參考文章Netty線程模型及EventLoop詳解。
Java線程池
池化技術(shù)是一種非常有用的技術(shù)堂污,對于線程來說家肯,創(chuàng)建一個線程的代價是很高的,如果我們在創(chuàng)建了一個線程盟猖,并且讓這個線程做一個任務(wù)之后就回收的話讨衣,那么下次要使用線程來執(zhí)行我們的任務(wù)的時候又需要創(chuàng)建一個新的線程换棚,是否可以在創(chuàng)建完成一個線程之后一直緩沖,直到系統(tǒng)關(guān)閉的時候再進行回收呢反镇?java線程池就是這樣的組件固蚤,使用線程池,就沒必要頻繁創(chuàng)建線程歹茶,線程池會為我們管理線程夕玩,當我們需要一個新的線程來執(zhí)行我們的任務(wù)的時候,就向線程池申請惊豺,而線程池會從池子里面找到一個空閑的線程返回給請求者燎孟,如果池子里面沒有可用的線程,那么線程池會根據(jù)一些參數(shù)指標來創(chuàng)建一個新的線程尸昧,或者將我們的任務(wù)提交到任務(wù)隊列中去揩页,等待一個空閑的線程來執(zhí)行這個任務(wù)。細節(jié)內(nèi)容在下文中進行分析彻磁,目前我們只需要明白,線程池里面有很多線程狸捅,這些線程會一直到系統(tǒng)關(guān)系才會被回收衷蜓,否則一直會處于處理任務(wù)或者等待處理任務(wù)的狀態(tài)。
首先尘喝,如何使用線程池呢磁浇?下面的代碼展示了如何使用java線程池的例子:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by hujian06 on 2017/10/31.
*
* the demo of Executors
*/
public class ExecutorsDemo {
public static void main(String ... args) {
int cpuCoreCount = Runtime.getRuntime().availableProcessors();
AThreadFactory threadFactory = new AThreadFactory();
ARunnanle runnanle = new ARunnanle();
ExecutorService fixedThreadPool=
Executors.newFixedThreadPool(cpuCoreCount, threadFactory);
ExecutorService cachedThreadPool =
Executors.newCachedThreadPool(threadFactory);
ScheduledExecutorService newScheduledThreadPool =
Executors.newScheduledThreadPool(cpuCoreCount, threadFactory);
ScheduledExecutorService singleThreadExecutor =
Executors.newSingleThreadScheduledExecutor(threadFactory);
fixedThreadPool.submit(runnanle);
cachedThreadPool.submit(runnanle);
newScheduledThreadPool.scheduleAtFixedRate(runnanle, 0, 1, TimeUnit.SECONDS);
singleThreadExecutor.scheduleWithFixedDelay(runnanle, 0, 100, TimeUnit.MILLISECONDS);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
fixedThreadPool.shutdownNow();
cachedThreadPool.shutdownNow();
newScheduledThreadPool.shutdownNow();
singleThreadExecutor.shutdownNow();
}
}
class ARunnable implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Current Thread Name:" +
Thread.currentThread().getName());
}
}
/**
* the thread factory
*/
class AThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread("aThread-" + threadNumber.incrementAndGet());
}
}
更為豐富的應(yīng)用應(yīng)該自己去探索,結(jié)合自身的需求來借助線程池來實現(xiàn)朽褪,下面來分析一下Java線程池實現(xiàn)中幾個較為重要的內(nèi)容置吓。
ThreadPoolExecutor和ScheduledThreadPoolExecutor
ThreadPoolExecutor和ScheduledThreadPoolExecutor是java實現(xiàn)線程池的核心類,不同類型的線程池其實就是在使用不同的構(gòu)造函數(shù)缔赠,以及不同的參數(shù)來構(gòu)造出ThreadPoolExecutor或者ScheduledThreadPoolExecutor衍锚,所以,學(xué)習(xí)java線程池的重點也在于學(xué)習(xí)這兩個核心類嗤堰。前者適用于構(gòu)造一般的線程池戴质,而后者繼承了前者,并且很多內(nèi)容是通用的踢匣,但是ScheduledThreadPoolExecutor增加了schedule功能告匠,也就是說,ScheduledThreadPoolExecutor使用于構(gòu)造具有調(diào)度功能的線程池离唬,在需要周期性調(diào)度執(zhí)行的場景下就可以使用ScheduledThreadPoolExecutor后专。關(guān)于ThreadPoolExecutor與ScheduledThreadPoolExecutor較為詳細深入的分析可以參考下面的文章:
- Java線程池詳解(一)
- Java線程池詳解(二)
- Java調(diào)度線程池ScheduleExecutorService
- Java調(diào)度線程池ScheduleExecutorService(續(xù))
下面展示了ThreadPoolExecutor和ScheduledThreadPoolExecutor的類圖,可以看出他們的關(guān)系输莺,以及他們的繼承關(guān)系:
關(guān)于較為細節(jié)的內(nèi)容不再本文的敘述范圍之內(nèi)戚哎,如果想要了解這些內(nèi)容的詳細內(nèi)容裸诽,可以參考文章中給出的鏈接,這些文章較為深入的分析和總結(jié)了相關(guān)的內(nèi)容建瘫。
上文中提到崭捍,線程池會管理著一些線程,這些線程要么處于運行狀態(tài)啰脚,要么處于等待任務(wù)的狀態(tài)殷蛇,當然這只是我們較為形象的描述,一個線程的狀態(tài)不僅有運行態(tài)與等待狀態(tài)橄浓,還有其他的狀態(tài)粒梦,但是對我我們來說,線程池里面的線程確實是要么處于運行狀態(tài)荸实,要么處于等待任務(wù)的狀態(tài)匀们,這體現(xiàn)在,當我們向一個線程池提交一個任務(wù)的時候准给,可能會被等待任務(wù)的線程立即執(zhí)行泄朴,但是可能線程池里面的線程都處于忙碌狀態(tài),那么我們提交的任務(wù)就會被加入到等待運行的任務(wù)隊列中去露氮,當有空閑線程了祖灰,或者隊列也滿了,那么線程池就會采用一些策略來執(zhí)行任務(wù)畔规,并且在某些時刻會拒絕提交的任務(wù)局扶,這些細節(jié)都可以在ThreadPoolExecutor的實現(xiàn)中找到。
在線程池的實現(xiàn)中叁扫,有一個角色特別重要三妈,那就是任務(wù)隊列,當線程池里面沒有空閑的線程來執(zhí)行我們的任務(wù)的時候莫绣,我們的任務(wù)就會被添加到任務(wù)隊列中去等待執(zhí)行畴蒲,而這個任務(wù)隊列可能會被多個線程并發(fā)讀寫,所以需要支持多線程安全訪問对室,java提供了一類支持并發(fā)環(huán)境的隊列饿凛,稱為阻塞隊列,這是一類特殊的隊列软驰,他們的使用時非常廣泛的涧窒,特別是在jdk自身的類庫建設(shè)上,當然在我們實際的工作中也是有很多使用場景的锭亏。
關(guān)于ThreadPoolExecutor是如何處理一個提交的任務(wù)的細節(jié)纠吴,可以參考下面的代碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
下面來看一下java中借助ThreadPoolExecutor來構(gòu)造的幾個線程池的特性:
1、newFixedThreadPool
使用ThreadPoolExecutor構(gòu)造一個newCachedThreadPool的流程如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
在任意時刻慧瘤,newFixedThreadPool構(gòu)造出來的線程池中最多只可能存活著nThreads個線程,如果所有的線程都在運行任務(wù),那么這個時候提交的任務(wù)將會被添加到任務(wù)隊列中去等待執(zhí)行点楼。我們可以控制corePoolSize和maximumPoolSize來使得通過ThreadPoolExecutor構(gòu)造出來的線程池具有一些不一樣的特性,但是需要注意的是伐坏,當我們設(shè)置的maximumPoolSize大于corePoolSize的時候,如果當前線程池里面的線程數(shù)量已經(jīng)達到了corePoolSize了握联,并且當前所以線程都處于運行任務(wù)的狀態(tài)桦沉,那么在這個時候提交的任務(wù)會被添加到任務(wù)隊列中去,只有在任務(wù)隊列滿了的時候金闽,才會去創(chuàng)建新的線程纯露,如果線程數(shù)量已經(jīng)達到了maximumPoolSize了,那么到此就會拒絕提交的任務(wù)代芜,這些流程可以參考上面展示出來的execute方法的實現(xiàn)埠褪。該類型的線程池使用的任務(wù)隊列是LinkedBlockingQueue類型的阻塞隊列。
2挤庇、newCachedThreadPool
通過ThreadPoolExecutor構(gòu)造一個newCachedThreadPool線程池的流程如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
newCachedThreadPool適合于類似秒殺系統(tǒng)中钞速,它可以按需創(chuàng)建線程。每個線程在空閑了一段時間之后會被回收嫡秕,然后需要創(chuàng)建的時候再創(chuàng)建出來渴语,在使用的時候應(yīng)該使用合適的構(gòu)造參數(shù)。該類型使用的任務(wù)隊列是SynchronousQueue這種同步隊列淘菩,這是一種特別的隊列遵班,每個線程都是有使命的屠升,每個線程都會等待另外一個線程和自己交易潮改,在交易完成之前都會阻塞住線程,他們之間有一種傳遞關(guān)系腹暖,數(shù)據(jù)是從一個線程直接傳遞到例外一個線程中去的汇在,SynchronousQueue這種隊列不存儲實際的數(shù)據(jù),而是存儲著一些線程的信息脏答,而SynchronousQueue管理著這些線程之間的交易糕殉,更為詳細的細節(jié)參考后面的文章。
上面提到殖告,ScheduleThreadPoolExecutor是繼承自ThreadPoolExecutor的阿蝶,而且從類圖中也可以看出來這種關(guān)系,所以其實ScheduleThreadPoolExecutor是對ThreadPoolExecutor的增強黄绩,它增加了schedule功能羡洁,使用與那些需要周期性調(diào)度執(zhí)行,或者是延時執(zhí)行的任務(wù)爽丹,在ScheduleThreadPoolExecutor中使用了一種阻塞隊列稱為延時阻塞隊列筑煮,這種隊列有能力持有一段時間數(shù)據(jù)辛蚊,我們可以設(shè)定這種時間,時間沒到的時候嘗試獲取數(shù)據(jù)的線程會被阻塞真仲,直到設(shè)定的時間到了袋马,線程才會被喚醒來消費數(shù)據(jù)。而關(guān)于ScheduleThreadPoolExecutor是如何運作的秸应,包括他的周期性任務(wù)調(diào)度是如何工作的虑凛,可以參考上面提到的鏈接。
Future
Future代表一種未來某個時刻會發(fā)生的事情灸眼,在并發(fā)環(huán)境下使用Future是非常重要的卧檐,使用Future的前提是我們可以容許線程執(zhí)行一段時間來完成這個任務(wù),但是需要在我們提交了任務(wù)的時候就返回一個Future焰宣,這樣在接下來的時間程序員可以根據(jù)實際情況來取消任務(wù)或者獲取任務(wù)霉囚,在多個任務(wù)沒有相互依賴關(guān)系的時候,使用Future可以實現(xiàn)多線程的并發(fā)執(zhí)行匕积,多個線程可以執(zhí)行在不同的處理器上盈罐,然后在某個時間點來統(tǒng)一獲取結(jié)果就可以了。上文中已經(jīng)提到了FutureTask闪唆,F(xiàn)utureTask既是一種Runnable盅粪,也是一種Future,并且結(jié)合了兩種類型的特性悄蕾。下面展示了Future提供的一些方法票顾,使用這些方法可以很方便的進行任務(wù)控制:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
在java 8中增加了一個新的類CompletableFuture,這是對Future的極大增強帆调,CompletableFuture提供了非常豐富的操作可以來控制我們的任務(wù)奠骄,并且可以根據(jù)多種規(guī)則來關(guān)聯(lián)多個Future。關(guān)于CompletableFuture的詳細分析總結(jié)可以參考文章Java CompletableFuture番刊。
Fork/Join框架
Fork/Join框架是一種并行框架含鳞,它可以將一個較大的任務(wù)切分成一些小任務(wù)來執(zhí)行,并且多個線程之間會相互配合芹务,每個線程都會有一個任務(wù)隊列蝉绷,對于某些線程來說它們可能很快完成了自己的任務(wù)隊列中的任務(wù),但是其他的線程還沒有完成枣抱,那么這些線程就會去竊取那些還沒有完成任務(wù)執(zhí)行的線程的任務(wù)來執(zhí)行熔吗,這成為“工作竊取”算法,關(guān)于Fork/Join中的工作竊取佳晶,其實現(xiàn)還是較為復(fù)雜的桅狠,如果想對Fork/Join框架有一個大致的認識,可以參考文章Java Fork/Join并行框架。下面展示了Fork/Join框架的工作模式:
可以從上面的圖中看出垂攘,一個較大的任務(wù)會被切分為一個小任務(wù)维雇,并且小任務(wù)還會繼續(xù)切分,直到符合我們設(shè)定的執(zhí)行閾值晒他,然后就會執(zhí)行吱型,執(zhí)行完成之后會進行join,也就是將小任務(wù)的結(jié)果組合起來陨仅,組裝出我們提交的整個任務(wù)的結(jié)果津滞,這是一種非常先進的工作模式,非常有借鑒意義灼伤。當然触徐,使用Fork/Join框架的前提是我們的任務(wù)時可以拆分成小任務(wù)來執(zhí)行的,并且小人物的結(jié)果可以組裝出整個大任務(wù)的結(jié)果狐赡,歸并排序是一種可以借助Fork/Join框架來提供處理速度的算法撞鹉,下面展示了使用Fork/Join框架來執(zhí)行歸并排序的代碼,可以試著調(diào)整參數(shù)來進行性能測試:
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
/**
* Created by hujian06 on 2017/10/23.
*
* merge sort by fork/join
*/
public class ForkJoinMergeSortDemo {
public static void main(String ... args) {
new Worker().runWork();
}
}
class Worker {
private static final boolean isDebug = false;
public void runWork() {
int[] array = mockArray(200000000, 1000000); // mock the data
forkJoinCase(array);
normalCase(array);
}
private void printArray(int[] arr) {
if (isDebug == false) {
return;
}
for (int i = 0; i < arr.length; i ++) {
System.out.print(arr[i] + " ");
}
System.out.println();
}
private void forkJoinCase(int[] array) {
ForkJoinPool pool = new ForkJoinPool();
MergeSortTask mergeSortTask = new MergeSortTask(array, 0, array.length - 1);
long start = System.currentTimeMillis();
pool.invoke(mergeSortTask);
long end = System.currentTimeMillis();
printArray(array);
System.out.println("[for/join mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
array.length + " items' sort work.");
}
private void normalCase(int[] array) {
long start = System.currentTimeMillis();
new MergeSortWorker().sort(array, 0, array.length - 1);
long end = System.currentTimeMillis();
printArray(array);
System.out.println("[normal mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
array.length + " items' sort work.");
}
private static final int[] mockArray(int length, int up) {
if (length <= 0) {
return null;
}
int[] array = new int[length];
Random random = new Random(47);
for (int i = 0; i < length; i ++) {
array[i] = random.nextInt(up);
}
return array;
}
}
class MergeSortTask extends RecursiveAction {
private static final int threshold = 100000;
private final MergeSortWorker mergeSortWorker = new MergeSortWorker();
private int[] data;
private int left;
private int right;
public MergeSortTask(int[] array, int l, int r) {
this.data = array;
this.left = l;
this.right = r;
}
@Override
protected void compute() {
if (right - left < threshold) {
mergeSortWorker.sort(data, left, right);
} else {
int mid = left + (right - left) / 2;
MergeSortTask l = new MergeSortTask(data, left, mid);
MergeSortTask r = new MergeSortTask(data, mid + 1, right);
invokeAll(l, r);
mergeSortWorker.merge(data, left, mid, right);
}
}
}
class MergeSortWorker {
// Merges two subarrays of arr[].
// First subarray is arr[l..m]
// Second subarray is arr[m+1..r]
void merge(int arr[], int l, int m, int r) {
// Find sizes of two subarrays to be merged
int n1 = m - l + 1;
int n2 = r - m;
/* Create temp arrays */
int L[] = new int[n1];
int R[] = new int[n2];
/*Copy data to temp arrays*/
for (int i = 0; i < n1; ++i)
L[i] = arr[l + i];
for (int j = 0; j < n2; ++j)
R[j] = arr[m + 1 + j];
/* Merge the temp arrays */
// Initial indexes of first and second subarrays
int i = 0, j = 0;
// Initial index of merged subarry array
int k = l;
while (i < n1 && j < n2) {
if (L[i] <= R[j]) {
arr[k ++] = L[i ++];
} else {
arr[k ++] = R[j ++];
}
}
/* Copy remaining elements of L[] if any */
while (i < n1) {
arr[k ++] = L[i ++];
}
/* Copy remaining elements of R[] if any */
while (j < n2) {
arr[k ++] = R[j ++];
}
}
// Main function that sorts arr[l..r] using
// merge()
void sort(int arr[], int l, int r) {
if (l < r) {
// Find the middle point
int m = l + (r - l) / 2;
// Sort first and second halves
sort(arr, l, m);
sort(arr, m + 1, r);
// Merge the sorted halves
merge(arr, l, m, r);
}
}
}
在jdk中颖侄,使用Fork/Join框架的一個典型案例是Streams API鸟雏,Streams API試圖簡化我們的并發(fā)編程,可以使用很簡單的流式API來處理我們的數(shù)據(jù)流览祖,在我們無感知的狀態(tài)下孝鹊,其實Streams的實現(xiàn)上借助了Fork/Join框架來實現(xiàn)了并發(fā)計算,所以強烈建議使用Streams API來處理我們的流式數(shù)據(jù)展蒂,這樣可以充分的利用機器的多核心資源又活,來提高數(shù)據(jù)處理的速度。關(guān)于java的Streams API的分析總結(jié)可以參考文章Java Streams API锰悼,關(guān)于Stream API是如何使用Fork/Join框架來實現(xiàn)并行計算的內(nèi)容可以參考文章Java Stream的并行實現(xiàn)柳骄。鑒于Fork/Join框架的先進思想,理解并且學(xué)會使用Fork/Join框架來處理我們的實際問題是非常有必要的松捉。
Java volatile關(guān)鍵字
volatile解決的問題是多個線程的內(nèi)存可見性問題夹界,在并發(fā)環(huán)境下馆里,每個線程都會有自己的工作空間隘世,每個線程只能訪問各自的工作空間,而一些共享變量會被加載到每個線程的工作空間中鸠踪,所以這里面就有一個問題丙者,內(nèi)存中的數(shù)據(jù)什么時候被加載到線程的工作緩存中,而線程工作空間中的內(nèi)容什么時候會回寫到內(nèi)存中去营密。這兩個步驟處理不當就會造成內(nèi)存可加性問題械媒,也就是數(shù)據(jù)的不一致,比如某個共享變量被線程A修改了,但是沒有回寫到內(nèi)存中去纷捞,而線程B在加載了內(nèi)存中的數(shù)據(jù)之后讀取到的共享變量是臟數(shù)據(jù)痢虹,正確的做法應(yīng)該是線程A的修改應(yīng)該對線程B是可見的,更為通用一些主儡,就是在并發(fā)環(huán)境下共享變量對多個線程是一致的奖唯。
對于內(nèi)存可見性的一點補充是,之所以會造成多個線程看到的共享變量的值不一樣糜值,是因為線程在占用CPU時間的時候丰捷,cpu為了提高處理速度不會直接和內(nèi)存交互,而是會先將內(nèi)存中的共享內(nèi)容讀取到內(nèi)部緩存中(L1寂汇,L2)病往,然后cpu在處理的過程中就只會和內(nèi)部緩存交互,在多核心的機器中這樣的處理方式就會造成內(nèi)存可見性問題骄瓣。
volatile可以解決并發(fā)環(huán)境下的內(nèi)存可見性問題停巷,只需要在共享變量前面加上volatile關(guān)鍵字就可以解決,但是需要說明的是榕栏,volatile僅僅是解決內(nèi)存可見性問題叠穆,對于像i++這樣的問題還是需要使用其他的方式來保證線程安全。使用volatile解決內(nèi)存可見性問題的原理是臼膏,如果對被volatile修飾的共享變量執(zhí)行寫操作的話硼被,JVM就會向cpu發(fā)送一條Lock前綴的指令,cpu將會這個變量所在的緩存行(緩存中可以分配的最小緩存單位)寫回到內(nèi)存中去渗磅。但是在多處理器的情況下嚷硫,將某個cpu上的緩存行寫回到系統(tǒng)內(nèi)存之后,其他cpu上該變量的緩存還是舊的始鱼,這樣再進行后面的操作的時候就會出現(xiàn)問題仔掸,所以為了使得所有線程看到的內(nèi)容都是一致的,就需要實現(xiàn)緩存一致性協(xié)議医清,cpu將會通過監(jiān)控總線上傳遞過來的數(shù)據(jù)來判斷自己的緩存是否過期起暮,如果過期,就需要使得緩存失效会烙,如果cpu再來訪問該緩存的時候负懦,就會發(fā)現(xiàn)緩存失效了,這時候就會重新從內(nèi)存加載緩存柏腻。
總結(jié)一下纸厉,volatile的實現(xiàn)原則有兩條:
1、JVM的Lock前綴的指令將使得cpu緩存寫回到系統(tǒng)內(nèi)存中去
2五嫂、為了保證緩存一致性原則颗品,在多cpu的情景下肯尺,一個cpu的緩存回寫內(nèi)存會導(dǎo)致其他的cpu上的緩存都失效,再次訪問會重新從系統(tǒng)內(nèi)存加載新的緩存內(nèi)容躯枢。
原子操作CAS
原子操作表達的意思是要么一個操作成功则吟,要么失敗,中間過程不會被其他的線程中斷锄蹂,這一點對于并發(fā)編程來說非常重要逾滥,在java中使用了大量的CAS來做并發(fā)編程,包括jdk的ConcurrentHsahMap的實現(xiàn)败匹,還有AtomicXXX的實現(xiàn)等其他一些并發(fā)工具的實現(xiàn)都使用了CAS這種技術(shù)寨昙,CAS包括兩部分,也就是Compare and swap掀亩,首先是比較舔哪,然后再交互,這樣做的原因是槽棍,在并發(fā)環(huán)境下捉蚤,可能不止一個線程想要來改變某個共享變量的值,那么在進行操作之前使用一個比較炼七,而這個比較的值是當前線程認為(知道)該共享變量最新的值缆巧,但是可能其他線程已經(jīng)改變了這個值,那么此時CAS操作就會失敗豌拙,只有在共享變量的值等于線程提供的用于比較的值的時候才會進行原子改變操作陕悬。
java中有一個類是專門用于提供CAS操作支持的,那就是Unsafe類按傅,但是我們不能直接使用Unsafe類捉超,因為Unsafe類提供的一些底層的操作,需要非常專業(yè)的人才能使用好唯绍,并且Unsafe類可能會造成一些安全問題拼岳,所以不建議直接使用Unsafe類,但是如果想使用Unsafe類的話還是有方法的况芒,那就是通過反射來獲取Unsafe實例惜纸,類似于下面的代碼:
class UnsafeHolder {
private static Unsafe U = null;
public static Unsafe getUnsafe() {
if (U == null) {
synchronized (UnsafeHolder.class) {
if (U == null) {
List<Exception> exception = null;
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
try {
U = (Unsafe) field.get(null);
} catch (IllegalAccessException e) {
exception.add(e);
}
} catch (NoSuchFieldException e) {
exception.add(e);
} finally {
if (exception != null) {
reportException(exception);
}
}
}
}
}
return U;
}
/**
* handler the exception in this method .
* @param e The exception
*/
private static void reportException(List<Exception> e) {
e.forEach(System.out::println);
}
}
如果想要了解Unsafe類到底提供了哪些較為底層的操作,可以直接參考Unsafe的源碼绝骚。CAS操作解決了原子操作問題耐版,只要進行操作,CAS就會保證操作會成功皮壁,不會被中斷椭更,這是一種非常好非常強大的特性哪审,下面就java 8中的ConcurrentHashMap的size實現(xiàn)來談?wù)凜AS操作在并發(fā)環(huán)境下的使用案例蛾魄。
在java 7中,ConcurrentHashMap的實現(xiàn)是基于分段鎖協(xié)議的實現(xiàn),本質(zhì)上還是使用了鎖滴须,只是基于一種考慮舌狗,就是多個線程訪問哈希桶具有隨機性,基于這種考慮來將數(shù)據(jù)存儲在不同的哈希段上面扔水,然后每一個段配有一把鎖痛侍,在需要寫某個段的時候需要加鎖,而在這個時候魔市,其他訪問其他段的線程是不需要阻塞的主届,但是對于該段的線程訪問就需要等待,直到這個加鎖的線程釋放了鎖待德,其他線程才能進行訪問君丁。在java 8中,ConcurrentHashMap的實現(xiàn)拋棄了這種復(fù)雜的架構(gòu)設(shè)計将宪,但是繼承了這種分散線程競爭壓力的思想绘闷,其實就提高系統(tǒng)的并發(fā)度這一維度來說,分散競爭壓力是一種最為直接明了的解決方案较坛,而java 8在實現(xiàn)ConcurrentHashMap的時候大量使用了CAS操作印蔗,減少了使用鎖的頻度來提高系統(tǒng)的響應(yīng)度,其實使用鎖和使用CAS來做并發(fā)在復(fù)雜度上不是一個數(shù)量級的丑勤,使用鎖在很大程度上假設(shè)了多個線程的排斥性华嘹,并且使用鎖會將線程阻塞等待,也就是說使用鎖來做線程同步的時候法竞,線程的狀態(tài)是會改變的除呵,但是使用CAS是不會改變線程的狀態(tài)的(不太嚴謹?shù)恼f),所以使用CAS比起使用synchronized或者使用Lcok來說更為輕量級爪喘。
現(xiàn)在就ConcurrentHashMap的size方法來分析一下如何將線程競爭的壓力分散出去颜曾。在java 7的實現(xiàn)上,在調(diào)用size方法之后秉剑,ConcurrentHashMap會進行兩次對哈希桶中的記錄累加的操作泛豪,這兩次累加的操作是不加鎖的侦鹏,然后判斷兩次結(jié)果是否一致诡曙,如果一致就說明目前的系統(tǒng)是讀多寫少的場景,并且可能目前沒有線程競爭略水,所以直接返回就可以价卤,這就避免了使用鎖,但是如果兩次累加結(jié)果不一致渊涝,那就說明此時可能寫的線程較多慎璧,或者線程競爭較為嚴重床嫌,那么此時ConcurrentHashMap就會進行一個重量級的操作,對所有段進行加鎖胸私,然后對每一個段進行記錄計數(shù)厌处,然后求得最終的結(jié)果返回。在最有情況下岁疼,size方法需要做兩次累加計數(shù)阔涉,最壞情況需要三次,并且會涉及全局加鎖這種重量級的加鎖操作捷绒,性能肯定是不高的瑰排。而在java 8的實現(xiàn)上,ConcurrentHashMap的size方法實際上是與ConcurrentHashMap是解耦的暖侨,size方法更像是接入了一個額外的并發(fā)計數(shù)系統(tǒng)凶伙,在進行size方法調(diào)用的時候是不會影響數(shù)據(jù)的存取的,這其實是一種非常先進的思想它碎,就是一個系統(tǒng)模塊化函荣,然后模塊可以進行更新,系統(tǒng)解耦扳肛,比如java 8中接入了并發(fā)計數(shù)組件Striped64來作為size方法的支撐傻挂,可能未來出現(xiàn)了比Striped64更為高效的算法來計數(shù),那么只需要將Striped64模塊換成新的模塊就可以了挖息,對原來的核心操作是不影響的金拒,這種模塊化系統(tǒng)設(shè)定的思想應(yīng)該在我們的項目中具體實踐。
上面說到j(luò)ava 8在進行size方法的設(shè)計上引入了Striped64這種并發(fā)計數(shù)組件套腹,這種組件的計數(shù)思想其實也是分散競爭绪抛,Striped64的實現(xiàn)上使用了volatile和CAS,在Striped64的實現(xiàn)中是看不到鎖的使用的电禀,但是Striped64確實是一種高效的適用于并發(fā)環(huán)境下的計數(shù)組件幢码,它會基于請求計數(shù)的線程,Striped64的計數(shù)會根據(jù)兩部分的內(nèi)容來得到最后的結(jié)果尖飞,類似于java 7中ConcurrentHashMap的size方法的實現(xiàn)症副,在Striped64的實現(xiàn)上也是借鑒了這種思想的,Striped64會首先嘗試將某個線程的計數(shù)請求累加到一個base共享變量上政基,如果成功了贞铣,那么說明目前的競爭不是很激烈,也就沒必要后面的操作了沮明,但是很多情況下辕坝,并發(fā)環(huán)境下的線程競爭是很激烈的,所以嘗試累加到base上的計數(shù)請求很大概率是會失敗的荐健,那么Striped64會維護一個Cell數(shù)組,每個Cell是一個計數(shù)組件,Striped64會為每個請求計數(shù)的線程計算一個哈希值灰蛙,然后哈希到Cell數(shù)組中的某個位置上,然后這個線程的計數(shù)就會累加到該Cell上面去挚歧。當然扛稽,Striped64的實現(xiàn)細節(jié)是非常復(fù)雜的吁峻,想要了解Striped64的實現(xiàn)細節(jié)的讀者可以參考文章Java 并發(fā)計數(shù)組件Striped64詳解,配合Striped64的源碼效果更佳在张。
并發(fā)同步框架AQS
AQS是java中實現(xiàn)Lock的基礎(chǔ)用含,也是實現(xiàn)線程同步的基礎(chǔ),AQS提供了鎖的語義帮匾,并且支持獨占模式和共享模式啄骇,對應(yīng)于悲觀鎖和樂觀鎖,獨占模式的含義是說同一時刻只能有一個線程獲取鎖瘟斜,而其他試圖獲取鎖的線程都需要阻塞等待缸夹,而共享鎖的含義是說可以有多個線程獲得鎖,兩種模式在不同的場景下使用螺句。
而鎖在并發(fā)編程中的地位不言而喻虽惭,多個線程的同步很多時候是需要鎖來做同步的,比如對于某些資源蛇尚,我們希望可以有多個線程獲得鎖來讀取芽唇,但是只允許有一個線程獲得鎖來執(zhí)行寫操作,這種鎖稱為讀寫鎖取劫,它的實現(xiàn)上結(jié)合了AQS的共享模式和獨占模式匆笤,共享模式對應(yīng)于可以使得多個線程獲得鎖來進行讀操作,獨占模式對應(yīng)于只允許有一個線程獲得鎖來進行寫操作谱邪。關(guān)于java中多個Lock的實現(xiàn)細節(jié)炮捧,以及是如何借助AQS來實現(xiàn)其具體邏輯的內(nèi)容,可以參考文章ava可重入鎖詳解惦银。該文章詳細講述了多個Lock接口的實現(xiàn)類寓盗,以及他們是如何借助AQS來實現(xiàn)的具體細節(jié)。
某些時候璧函,我們需要定制我們自己的線程同步策略傀蚌,個性化的線程同步借助AQS可以很容易的實現(xiàn),比如我們的需求是允許限定個數(shù)的線程獲得鎖來進行一些操作蘸吓,想要實現(xiàn)這樣的語義善炫,只需要實現(xiàn)一個類,繼承AQS库继,然后重寫方法下面兩個方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
關(guān)于AQS的具體分析箩艺,可以參考文章Java同步框架AbstractQueuedSynchronizer窜醉。
還需要提到的一點是,鎖分為公平鎖和非公平鎖艺谆,java中大多數(shù)時候會使用隊列來實現(xiàn)公平鎖榨惰,而使用棧來實現(xiàn)非公平鎖,當然這是基于隊列和棧這兩種數(shù)據(jù)結(jié)構(gòu)的特點來實現(xiàn)的静汤,直觀的來說琅催,使用隊列的FIFO的特性就可以實現(xiàn)類似排隊的效果,也就保證了公平性虫给,而棧是一個后進先出的數(shù)據(jù)結(jié)構(gòu)藤抡,它的這種結(jié)構(gòu)造成的結(jié)果就是,最新進入的線程可能比那些等待過一段時間的線程更早的獲得鎖抹估,更為具體的內(nèi)容可以參考上面的文章進行了解缠黍。
synchronized(同步鎖)
相對于volatile,synchronized就顯得比較重量級了药蜻。
首先瓷式,我們應(yīng)該知道,在java中语泽,所有的對象都可以作為鎖贸典。可以分為下面三種情況:
1湿弦、普通方法同步瓤漏,鎖是當前對象
2、靜態(tài)方法同步颊埃,鎖是當前類的Class對象
3蔬充、普通塊同步,鎖是synchronize里面配置的對象
當一個線程試圖訪問同步代碼時班利,必須要先獲得鎖饥漫,退出或者拋出異常時必須要釋放鎖。
JVM基于進入和退出Monitor對象來實現(xiàn)方法同步和代碼塊同步罗标,可以使用monitorenter和monitorexit指令實現(xiàn)庸队。monitorenter指令是在編譯后插入到同步代碼塊的開始位置,而monitorexit指令則插入到方法結(jié)束和異常處闯割,JVM保證每個monitorenter都有一個monitorexit閾值相對應(yīng)彻消。線程執(zhí)行到monitorenter的時候,會嘗試獲得對象所對應(yīng)的monitor的鎖宙拉,然后才能獲得訪問權(quán)限宾尚,synchronize使用的鎖保存在Java對象頭中。
并發(fā)隊列(阻塞隊列,同步隊列)
并發(fā)隊列煌贴,也就是可以在并發(fā)環(huán)境下使用的隊列御板,為什么一般的隊列不能再并發(fā)環(huán)境下使用呢?因為在并發(fā)環(huán)境下牛郑,可能會有多個線程同時來訪問一個隊列怠肋,這個時候因為上下文切換的原因可能會造成數(shù)據(jù)不一致的情況,并發(fā)隊列解決了這個問題淹朋,并且java中的并發(fā)隊列的使用時非常廣泛的笙各,比如在java的線程池的實現(xiàn)上使用了多種不同特性的阻塞隊列來做任務(wù)隊列,對于阻塞隊列來說瑞你,它要解決的首要的兩個問題是:
- 多線程環(huán)境支持酪惭,多個線程可以安全的訪問隊列
- 支持生產(chǎn)和消費等待希痴,多個線程之間互相配合者甲,當隊列為空的時候,消費線程會阻塞等待隊列不為空砌创;當隊列滿了的時候虏缸,生產(chǎn)線程就會阻塞直到隊列不滿。
java中提供了豐富的并發(fā)隊列實現(xiàn)嫩实,下面展示了這些并發(fā)隊列的概覽:
根據(jù)上面的圖可以將java中實現(xiàn)的并發(fā)隊列分為幾類:
- 一般的阻塞隊列
- 支持雙端存取的并發(fā)隊列
- 支持延時獲取數(shù)據(jù)的延時阻塞隊列
- 支持優(yōu)先級的阻塞隊列
這些隊列的區(qū)別就在于從隊列中存取數(shù)據(jù)時的具體表現(xiàn)刽辙,比如對于延時隊列來說,獲取數(shù)據(jù)的線程可能被阻塞等待一段時間甲献,也可能立刻返回宰缤,對于優(yōu)先級阻塞隊列,獲取的數(shù)據(jù)是根據(jù)一定的優(yōu)先級取到的晃洒。下面展示了一些隊列操作的具體表現(xiàn):
操作類型 | Throws Exception | Special Value | Blocked | Timed out |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, unit) |
取出(刪除) | remove(o) | poll() | take() | poll(timeout, unit) |
- Throws Exception 類型的插入和取出在不能立即被執(zhí)行的時候就會拋出異常慨灭。
- Special Value 類型的插入和取出在不能被立即執(zhí)行的情況下會返回一個特殊的值(true 或者 false)
- Blocked 類型的插入和取出操作在不能被立即執(zhí)行的時候會阻塞線程直到可以操作的時候會被其他線程喚醒
- Timed out 類型的插入和取出操作在不能立即執(zhí)行的時候會被阻塞一定的時候,如果在指定的時間內(nèi)沒有被執(zhí)行球及,那么會返回一個特殊值
關(guān)于更為具體的分析總結(jié)可以參考文章Java 并發(fā)隊列詳解氧骤。
無鎖并發(fā)設(shè)計須知
在并發(fā)系統(tǒng)設(shè)計的時候,為了數(shù)據(jù)安全等原因需要對共享數(shù)據(jù)進行加鎖訪問吃引,但是使用鎖必然會有開銷筹陵,在并并發(fā)系統(tǒng)較為繁忙的時候,這個開銷就變得很可觀了镊尺,為了規(guī)避這個問題朦佩,無鎖并發(fā)系統(tǒng)設(shè)計成為一種趨勢。
所謂無鎖并發(fā)庐氮,即在進行并發(fā)系統(tǒng)設(shè)計的時候不使用鎖语稠,而使用一些其他的技術(shù)來達到鎖的效果,在java中旭愧,CAS技術(shù)是無鎖并發(fā)系統(tǒng)設(shè)計的基礎(chǔ)技術(shù)颅筋,結(jié)合CAS和spin即可實現(xiàn)無鎖并發(fā)系統(tǒng)的設(shè)計宙暇,但是,因為涉及spin议泵,也就是自旋占贫,所以需要特別注意CPU 100%的問題,以及因為使用無鎖先口,如果沒有做好線程競爭管理型奥,就會出現(xiàn)線程饑餓的問題,下面是在使用CAS進行無鎖并發(fā)系統(tǒng)設(shè)計時需要注意的一些問題:
- (1)碉京、使用隊列來管理競爭線程厢汹,解決線程饑餓的問題
- (2)、在多次CAS無果之后谐宙,請讓線程休息一會烫葬,讓出CPU使得一些其他的事情得到處理
- (3)、避免出現(xiàn)CPU 100%的問題
如果沒有足夠的CAS經(jīng)驗凡蜻,不推薦使用CAS來進行無鎖并發(fā)設(shè)計搭综,使用鎖可以方便快速的解決并發(fā)問題,并且性能問題逐漸得到解決划栓,所以兑巾,如果對性能不是要求特別嚴苛,使用鎖即可忠荞,當然蒋歌,鎖的使用也需要合理,否則性能依然會成為一個很大的問題委煤。
總結(jié)
本文總結(jié)了java并發(fā)編程中的若干核心技術(shù)堂油,并且對每一個核心技術(shù)都做了一些分析,并給出了參考鏈接素标,可以在參考鏈接中查找到更為具體深入的分析總結(jié)內(nèi)容称诗。java并發(fā)編程需要解決一些問題,比如線程間同步問題头遭,如何保證數(shù)據(jù)可見性問題寓免,以及如何高效的協(xié)調(diào)多個線程工作等內(nèi)容,本文在這些維度上都有所設(shè)計计维,本文作為對閱讀java.util.Concurrent包的源碼閱讀的一個總結(jié)袜香,同時本文也作為一個起點,一個開始更高層次分析總結(jié)的起點鲫惶,之前的分析都是基于jdk源碼來進行的蜈首,并且某些細節(jié)的內(nèi)容還沒有完全搞明白,其實在閱讀了一些源碼之后就會發(fā)現(xiàn),如果想要深入分析某個方面的內(nèi)容欢策,就需要一些底層的知識吆寨,否則很難完整的分析總結(jié)出來,但是這種不徹底的分析又是很有必要的踩寇,至少可以對這些內(nèi)容有一些大概的了解啄清,并且知道自己的不足,以及未來需要了解的底層內(nèi)容俺孙。對于java并發(fā)包的分析研究辣卒,深入到底層就是對jvm如何管理內(nèi)容,如何管理線程的分析睛榄,在深入下去荣茫,就是操作系統(tǒng)對內(nèi)存的管理,對線程的管理等內(nèi)容场靴,從操作系統(tǒng)再深入下去啡莉,就是去理解cpu的指令系統(tǒng),學(xué)習(xí)磁盤知識等內(nèi)容憎乙,當然票罐,知識的關(guān)聯(lián)是無止境的叉趣,學(xué)習(xí)也是無止境的泞边,目前來說,首要解決的問題是可以熟練的使用java提供的并發(fā)包內(nèi)容來進行并發(fā)編程疗杉,在業(yè)務(wù)上提高并發(fā)處理能力阵谚,在出現(xiàn)問題的時候可以很快找到問題并且解決問題,在達到這個要求之后烟具,可以去了解一些jvm層次的內(nèi)容梢什,比如jvm的內(nèi)存模型,以及線程的實現(xiàn)朝聋,并且可以與學(xué)習(xí)操作系統(tǒng)的相關(guān)內(nèi)容并行進行嗡午。