Java多線程--JDK并發(fā)包(2)
線程池
在使用線程池后版保,創(chuàng)建線程變成了從線程池里獲得空閑線程盐须,關(guān)閉線程變成了將線程歸壞給線程池。
JDK有一套Executor框架茬贵,大概包括Executor郊霎、ExecutorService沼头、AbstractExeccutorService、ThreadPoolExecutor歹篓、Executors等成員瘫证,位于java.util.concurrent
包下揉阎。它們之間的關(guān)系如下:
Executor是頂層的接口庄撮,ExecutorService接口繼承了它,AbstrctExecutorService繼承了ExecutorService毙籽,ThreadPoolExecutor繼承了AbstrctExecutorService洞斯。如果用<——
表示繼承,<--
表示實(shí)現(xiàn)接口坑赡,它們的關(guān)系可表示如下:
Executor(接口) <—— ExecutorService(接口) <-- AbstrctExecutorService(抽象類) <—— ThreadPoolExecutor(類)
Executors是單獨(dú)的一個(gè)類烙如,可以看成是“線程池工廠”,它有很多靜態(tài)方法毅否,比如:
- newFixedThreadPool(int nThread)
- newSingleThreadExecutor()
- newCachedThreadPool()
- newSingleThreadScheduledExecutor()
- newScheduledThreadPool(int corePoolSize)
newFixedThreadPool該方法返回一個(gè)固定線程數(shù)的線程池亚铁。當(dāng)有新任務(wù)提交時(shí),如果線程池中有空閑線程就立即執(zhí)行螟加,否則會(huì)進(jìn)入任務(wù)隊(duì)列中徘溢,等到有空閑線程了才能執(zhí)行。
newSingleThreadExecutor捆探,該方法返回只有一個(gè)線程的線程池然爆,處理策略和上面一樣。實(shí)際上就是上面的參數(shù)指定為1而已黍图。
newCachedThreadPool該方法返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程數(shù)的線程池曾雕,任務(wù)提交后,如果有空閑線程可以復(fù)用助被,則優(yōu)先復(fù)用剖张。若線程池中的線程全部在工作,而此時(shí)有新任務(wù)揩环,則會(huì)創(chuàng)建新的線程來(lái)處理任務(wù)修械,所有線程執(zhí)行完后會(huì)將線程歸還給線程池。
newScheduledThreadPool返回一個(gè)ScheduledExecutorService對(duì)象检盼,可以有計(jì)劃地執(zhí)行任務(wù)肯污,比如在某個(gè)延時(shí)之后開(kāi)始執(zhí)行,或者周期性地執(zhí)行某個(gè)任務(wù)。可以指定線程數(shù)量。
newSingleThreadScheduledExecutor實(shí)現(xiàn)了和上面一樣的功能疆液,不過(guò)線程池的大小為1答倡。
ScheduledExecutorService有三個(gè)方法可以有計(jì)劃地執(zhí)行任務(wù)。如:
-
schedule(Runnable command, long delay, TimeUnit unit);
該方法可以在給定的延時(shí)后畸陡,執(zhí)行一個(gè)任務(wù); -
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
該方法以任務(wù)開(kāi)始執(zhí)行的時(shí)間為initialDelay,加上周期period失晴,就是下一個(gè)任務(wù)開(kāi)始執(zhí)行的時(shí)間,以此類推拘央,因此這個(gè)方法任務(wù)調(diào)度的頻率是一定的涂屁; -
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
該方法表示每執(zhí)行完一個(gè)任務(wù),延遲delay
的時(shí)間后灰伟,開(kāi)始執(zhí)行下一個(gè)任務(wù)拆又,initialDelay
還是表示任務(wù)開(kāi)始的初始時(shí)延,上一個(gè)任務(wù)結(jié)束的時(shí)間點(diǎn)與下一個(gè)任務(wù)開(kāi)始的時(shí)間點(diǎn)之差是固定的栏账,固定為delay帖族。
即使單個(gè)任務(wù)的執(zhí)行時(shí)間超過(guò)調(diào)度周期,scheduleAtFixedRate也不會(huì)讓多個(gè)任務(wù)堆疊挡爵,比如任務(wù)執(zhí)行需要8s竖般,而調(diào)度周期是2s,調(diào)度第二個(gè)任務(wù)時(shí)茶鹃,第一個(gè)還沒(méi)執(zhí)行完涣雕,因此為了避免任務(wù)堆疊,此時(shí)調(diào)度周期會(huì)變成8s前计;而采用scheduleWithFixedDelay胞谭,兩個(gè)任務(wù)之間的實(shí)際間隔會(huì)變成10s,8s的執(zhí)行+2s的delay男杈。
線程池的內(nèi)部實(shí)現(xiàn)
- newFixedThreadPool(int nThread)
- newSingleThreadExecutor()
- newCachedThreadPool()
這三個(gè)內(nèi)部都是通過(guò)返回ThreadPoolExecutor產(chǎn)生線程池的丈屹。所以我們來(lái)重點(diǎn)關(guān)注它的構(gòu)造方法。
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize表示線程池中的線程數(shù)伶棒;
- maximumPoolSize表示線程池中的最大線程數(shù)旺垒;
- keepAliveTime表示當(dāng)線程數(shù)超過(guò)corePoolSize時(shí),多余的空閑線程的存活時(shí)間肤无;
- unit是keepAliveTime的單位
- workQueue任務(wù)隊(duì)列先蒋,保存那些已經(jīng)提交但還沒(méi)有開(kāi)始執(zhí)行的任務(wù)(在等待空閑線程);
- threadFactory宛渐,線程工廠竞漾,可自定義眯搭,一般默認(rèn);
- handler拒絕策略业岁,當(dāng)任務(wù)多得來(lái)不及處理時(shí)鳞仙,如何拒絕任務(wù)。
workQueue是一個(gè)BlockingQueue接口的對(duì)象笔时,存放的是Runnable對(duì)象棍好。根據(jù)功能的不同,ThreadPoolExecutor中可以使用以下幾種BlockingQueue
- 直接提交的隊(duì)列:對(duì)應(yīng)SynchronousQueue對(duì)象允耿,它沒(méi)有容量借笙,每一個(gè)插入都要等待一個(gè)相應(yīng)的刪除操作;每一個(gè)刪除操作都要等待對(duì)應(yīng)的插入操作较锡。使用該對(duì)象业稼,提交的任務(wù)不會(huì)被真實(shí)保存,而總是將任務(wù)交給線程執(zhí)行念链。如果沒(méi)有空閑線程就創(chuàng)建新線程盼忌,如果線程數(shù)已經(jīng)達(dá)到最大值积糯,就執(zhí)行拒絕策略掂墓。
- 有界的任務(wù)隊(duì)列:使用ArrayBlockingQueue實(shí)現(xiàn)。當(dāng)有任務(wù)提交時(shí)看成,判斷線程池中當(dāng)前的實(shí)際線程數(shù)君编,如果小于corePoolSize,則優(yōu)先創(chuàng)建新線程川慌;若大于corePoolSize吃嘿,就將任務(wù)加入到等待隊(duì)列中;若此時(shí)等待隊(duì)列也滿梦重,創(chuàng)建新線程兑燥;若實(shí)際線程已經(jīng)達(dá)到maxPoolSize,就開(kāi)始執(zhí)行拒絕策略琴拧〗低可以看出有界的任務(wù)隊(duì)列只有在任務(wù)隊(duì)列滿時(shí),才會(huì)創(chuàng)建新線程蚓胸,通常情況下實(shí)際線程數(shù)可以穩(wěn)定在corePoolSize挣饥。
- 無(wú)界的任務(wù)隊(duì)列:使用LinkedBlockingQueue實(shí)現(xiàn)。和上面ArrayBlockingQueue相比沛膳,區(qū)別在于扔枫,任務(wù)隊(duì)列沒(méi)有大小限制,當(dāng)實(shí)際線程數(shù)超過(guò)corePoolSize時(shí)锹安,直接進(jìn)入任務(wù)隊(duì)列短荐。
- 優(yōu)先任務(wù)隊(duì)列:使用PriorityBlockingQueue實(shí)現(xiàn)倚舀。前面的幾種都是按照先進(jìn)先出的順序來(lái)處理任務(wù),而該對(duì)象實(shí)現(xiàn)的任務(wù)隊(duì)列可根據(jù)任務(wù)自身的優(yōu)先級(jí)順序執(zhí)行忍宋。
newFixedThreadPool因?yàn)樗腸orePoolSize和maxPoolSize大小一樣瞄桨,固定大小的線程不存在當(dāng)實(shí)際線程數(shù)超過(guò)corePoolSize時(shí)要新增線程的可能,所以它使用了LinkedBlockingQueue讶踪,當(dāng)有新任務(wù)且實(shí)際線程數(shù)已經(jīng)達(dá)到最大時(shí)芯侥,會(huì)直接進(jìn)入等待隊(duì)列。
newSingleThreadExecutor是newFixedThreadPool的一種特殊情況乳讥,即取corePoolSize和maxPoolSize都為1
而newCachedThreadPool的corePoolSize為0柱查,maxPoolSize為Integer.MAX_VALUE
,任務(wù)隊(duì)列使用SynchronousQueue直接提交云石,新任務(wù)提交后唉工,若有空閑線程就直接用,若沒(méi)有就進(jìn)入等待隊(duì)列——但是這是個(gè)直接提交的隊(duì)列汹忠,所有會(huì)新增線程執(zhí)行該任務(wù)淋硝!由于corePoolSize為0,所以任務(wù)執(zhí)行完畢后60s(構(gòu)造函數(shù)指定)就會(huì)被回收宽菜。
拒絕策略
當(dāng)實(shí)際線程數(shù)超過(guò)maxPoolSize時(shí)谣膳,該采取什么樣的策略?
- AbortPolicy:丟棄任務(wù)并拋出異常铅乡;
- CallerRunPolicy:該任務(wù)被線程池拒絕继谚,由調(diào)用execute方法的線程執(zhí)行該任務(wù);
- DiscardOldestPolicy:丟棄最老的一個(gè)阵幸,也就是馬上要執(zhí)行的一個(gè)任務(wù)花履;
- DiscardPolicy:默默丟棄被拒絕的任務(wù),體現(xiàn)在代碼中就是什么也不做挚赊。
下面看看CallerRunPolicy怎么拒絕的
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
DiscardOldestPolicy是這樣做的
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // 最老的一個(gè)請(qǐng)求在隊(duì)列頭部
e.execute(r);
}
}
線程的創(chuàng)建--線程工廠
ThreadFactory只有一個(gè)方法Thread newThread(Runnable r);
诡壁,線程池中的線程就是由它創(chuàng)建的。
Fork/Join框架
fork也就是分支荠割、分叉的意思妹卿,可以將大任務(wù)分解成小任務(wù);join表示等待的意思涨共,必須等待fork后的小任務(wù)執(zhí)行完畢纽帖,得到執(zhí)行后的部分結(jié)果,才能將部分結(jié)果合并成最終結(jié)果举反。
比如計(jì)算1到10000的和懊直,就可以分成10個(gè)分支,每個(gè)分支計(jì)算一千個(gè)數(shù)的和火鼻,得到一個(gè)部分和室囊,等待這10個(gè)部分和的結(jié)果都計(jì)算完畢雕崩,最后將其全部合并,得到最終的結(jié)果融撞。
通常一個(gè)物理線程需要處理多個(gè)邏輯任務(wù)盼铁,所以每一個(gè)線程都有一個(gè)任務(wù)隊(duì)列。若線程A的任務(wù)都執(zhí)行完了尝偎,B還有很多任務(wù)沒(méi)執(zhí)行饶火,此時(shí)A會(huì)“幫助”B執(zhí)行它的任務(wù),A幫助B執(zhí)行B的任務(wù)時(shí)致扯,從隊(duì)列的尾部拿數(shù)據(jù)肤寝;而B(niǎo)自己執(zhí)行任務(wù)時(shí)從隊(duì)列頭部拿數(shù)據(jù),這就像是兩個(gè)指針一個(gè)往左移動(dòng)一個(gè)往右移動(dòng)抖僵,避免了A鲤看、B之間對(duì)數(shù)據(jù)的競(jìng)爭(zhēng)。
JDK中有ForkJoinPool耍群,該接口有個(gè)方法public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
ForkJoinTask支持fork()
和join()
方法义桂,它有兩個(gè)重要的子類,沒(méi)有返回值的RecursiveAction和有返回值的RecursiveTask蹈垢,它們都有個(gè)方法compute()
慷吊,在這個(gè)方法中進(jìn)行主要的計(jì)算。對(duì)于RecursiveAction來(lái)說(shuō)簽名是void耘婚,而對(duì)于RecursiveTask來(lái)說(shuō)有返回值所以簽名是<T>
JDK并發(fā)容器
- ConcurrentHashMap:高效的并發(fā)HashMap罢浇,可看作線程安全的HashMap陆赋;
- CopyOnWriteArrayList:讀-讀沐祷,讀-寫不會(huì)阻塞,只有在寫-寫下會(huì)進(jìn)行同步攒岛。在讀多寫少的場(chǎng)合赖临,性能很好;
- ConcurrentLinkedQueue:高效的并發(fā)隊(duì)列灾锯,鏈表實(shí)現(xiàn)兢榨,使用了CAS操作(Compare and Swap),可看作線程安全的LinkedList顺饮;
- BlockingQueue:接口吵聪,實(shí)現(xiàn)了Queue;數(shù)組實(shí)現(xiàn)的ArrayBlockingQueue和鏈表實(shí)現(xiàn)的LinkedBlockingQueue實(shí)現(xiàn)了這個(gè)接口兼雄。
- ConcurrentSkipListMap:使用跳表的數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)的Map吟逝。
CopyOnWriteArrayList原理
CopyOnWriteArrayList的原理主要是:讀的時(shí)候正常讀,寫-寫需要同步赦肋,所以在寫之前要使用Lock块攒,然后為了讀-寫不阻塞励稳,CopyOnWriteArrayList在寫入操作時(shí),先將原數(shù)組復(fù)制一份囱井,然后在新數(shù)組末尾追加要添加的值驹尼,添加成功后再用新數(shù)組覆蓋舊數(shù)組。
JDK中的該類的add方法是這樣實(shí)現(xiàn)的:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 保證寫-寫阻塞庞呕,故進(jìn)行同步
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 關(guān)鍵新翎!寫入之前先賦值一個(gè)副本
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 新數(shù)組的末尾添加
newElements[len] = e;
// 新數(shù)組覆蓋舊數(shù)組
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
而數(shù)組的定義是這樣的:
private transient volatile Object[] array;
注意有volatile關(guān)鍵字,說(shuō)明當(dāng)寫數(shù)據(jù)的線程修改數(shù)組后住练,其他讀取線程能立即“察覺(jué)”到料祠。
BlockingQueue原理
BlockingQueue可以在并發(fā)環(huán)境下高效傳輸數(shù)據(jù),本質(zhì)上還是一個(gè)隊(duì)列澎羞,數(shù)據(jù)從隊(duì)列尾部入髓绽,從隊(duì)列頭部出。隊(duì)列都有的offer()
和pull()
就不說(shuō)了妆绞,沒(méi)什么特別的顺呕。BlockingQueue還有put()
和take()
方法,正是這兩個(gè)方法實(shí)現(xiàn)了阻塞括饶。
以ArrayBlockingQueue來(lái)說(shuō):當(dāng)隊(duì)列為空時(shí)株茶,take()方法會(huì)等待,直到隊(duì)列不為空图焰;當(dāng)隊(duì)列滿時(shí)启盛,put()方法會(huì)等待,直到隊(duì)列有空閑位置技羔。這是怎么實(shí)現(xiàn)的呢僵闯?來(lái)看代碼
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
首先讀和寫都用的同一個(gè)鎖lock,因此任何時(shí)候讀和寫只能有一個(gè)在執(zhí)行藤滥。然后是條件notNull鳖粟,等待非滿,以便put拙绊;notEmpty等待非空向图,以便take。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 關(guān)鍵标沪,若隊(duì)列滿了榄攀,就等待
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 關(guān)鍵!一旦插入了數(shù)據(jù)金句,隊(duì)列就不是非空了檩赢,于是喚醒在notEmpty上等待的線程(通知其他線程可以進(jìn)行take啦)
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 關(guān)鍵!若隊(duì)列為空趴梢,等待
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 關(guān)鍵漠畜!有元素出列了币他,等待在notFull上的線程可以被喚醒,可以進(jìn)行put操作了
notFull.signal();
return x;
}
LinkedBlockingQueue和ArrayBlockingQueue原理大同小異憔狞,不過(guò)LinkedBlockingQueue讀和寫分別用一把鎖蝴悉,因此讀和寫可以同時(shí)進(jìn)行。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
跳表
ConcurrentSkipMap使用跳表實(shí)現(xiàn)瘾敢。是一種可以進(jìn)行快速查找的數(shù)據(jù)結(jié)構(gòu)拍冠,時(shí)間復(fù)雜度是$O(lg n)$
跳表形象點(diǎn)說(shuō)像個(gè)“直角三角形一樣的金字塔”,每一層都是一條鏈表簇抵,最底層的鏈表包含了Map中的所有數(shù)據(jù)庆杜,每上一層都是下面一層的子集,越到上面結(jié)點(diǎn)越少碟摆。層與層之間通過(guò)值相同的元素鏈接起來(lái)晃财,因此結(jié)點(diǎn)除了有指向本層的下一個(gè)結(jié)點(diǎn)的right,還有指向下層中具有相同值的元素的down(實(shí)際上通過(guò)數(shù)據(jù)結(jié)構(gòu)Index表示)典蜕。另外断盛,跳表中所有鏈表的元素都是排序的。
查找時(shí)愉舔,先從頂層開(kāi)始查找钢猛,如果找到就結(jié)束了;否則當(dāng)發(fā)現(xiàn)查找的值大于當(dāng)前層的最大值(鏈表末尾)轩缤,就會(huì)“跳到”下一層鏈表接著向前查找命迈,查找朝著下面和右面兩個(gè)方向進(jìn)行,有點(diǎn)像“下臺(tái)階”...
by @sunhaiyu
2108.4.26