Java線程協(xié)作
[TOC]
線程協(xié)作的基本方法:wait/notify/notifyAll和await/signal/signalAll。
一爆侣、wait(long timeout)/notify/notifyAll
除了用于鎖的鎖池(entry set)恩闻,每個對象(Object)還有另一個等待池(wait set)用于線程間協(xié)作艺糜,表示等待一個條件,這個條件自己改變不了幢尚,需要等待其他線程改變條件后喚醒當前線程破停。
調(diào)用Object的wait方法會將當前線程放入wait set并阻塞。
使用wait的注意事項:
- wait/notify/notifyAll必須包裹在 synchronized 同步塊中尉剩。
- 多線程 synchronized 同步的必須是一個相同的對象真慢。
wait/notify的兩個問題
- wait 應(yīng)該包裹在while(true){}循環(huán)中,而不是if(true)理茎。
因為:如果if用不能重復(fù)判斷條件黑界,while則會重新判斷條件執(zhí)行。
- 多線程競爭的情況下應(yīng)該使用notifyAll而不是notify否則可能造成死循環(huán)皂林。
因為:wait/notify的同步代碼塊 synchronized 是一個非公平鎖實現(xiàn)朗鸠。
如果一次只喚醒一個線程,可能造成達不到條件的兩個線程相互調(diào)用的死循環(huán)础倍。
比如在消費者/生產(chǎn)者場景下烛占,如果存在多個消費者和生產(chǎn)者,需要喚醒生產(chǎn)者的時候,可能會造成兩個消費者互相喚醒的死循環(huán)忆家。
生產(chǎn)者消費者場景的實現(xiàn)犹菇,可以通過在生產(chǎn)者消費者synchronized同一個容器來實現(xiàn),在生產(chǎn)者消費者內(nèi)部來使用wait/notify實現(xiàn)協(xié)作芽卿。
也可以通過一個阻塞隊列來實現(xiàn)揭芍,阻塞隊列的底層實現(xiàn)一般是采用另外一種機制:await/signal/signalAll
二、await/signal/signalAll
wait/notify屬于Object的方法卸例,配合 synchronized(JVM方法)使用沼沈。
而await/signal/signalAll是Condition的方法,配合Lock的子(jdk方法)類(如ReentrantLock)使用。
另外一個可以讓線程阻塞的方法是sleep(Thread的方法)方法币厕。調(diào)用wait方法需要先獲得鎖列另,而調(diào)用sleep方法是不需要的。
如果已經(jīng)持有鎖定旦装,sleep不會釋放鎖页衙,而wait會杀怠。
這三者的異同簡單對比如下:
方法 | 誰的方法 | 配合使用 | 持有鎖時是否釋放鎖 | wait set |
---|---|---|---|---|
wait/notify | Object | synchronized | Y | unfair |
await/signal | Condition | Lock接口的子類 | Y | 當Lock是fair則wait set是fair |
sleep | Thread | 任何地方 | N | - |
await和wait的作用的區(qū)別有一小點:wait只能在synchronized同步的對象中使用一個隊列俊庇。
而await對應(yīng)的Lock子類以newCondition多個,也就是一個鎖有多個等待隊列撇寞,相當于一個房子安裝了多個門呻袭。
Condition接口支持把在condition queue上等待的線程 設(shè)置為fair或unfair, 當且僅當Lock是fair的眨八,它生成的Condition對象才是fair的。
synchronized 塊或方法對于一個對象只有一個condition queue左电。這樣如果在這個queue上如果有多個condition predicate, 比如isFull(),isEmpty() ,
就必須用notifyAll()方法廉侧, 會有context switch及獲取鎖的性能損失。
三篓足、協(xié)作場景
主要有五個場景:
1 生產(chǎn)者/消費者模式
生產(chǎn)者與消費者通過共享隊列協(xié)作段誊,生產(chǎn)者生產(chǎn)產(chǎn)品放入隊列中,消費者從隊列中消費產(chǎn)品栈拖。
如果隊列長度有限连舍,隊列滿的時候生產(chǎn)者需要等待,隊列空的時候消費者需要等待涩哟。
2 同時開始
類似于運動員比賽索赏,同時開始,常見于模擬仿真程序中贴彼。
3 等待結(jié)束(主從協(xié)作)
主從協(xié)作模式的一種情況:主線程將任務(wù)分解為若干子任務(wù)潜腻,為每個子任務(wù)創(chuàng)建線程,主線程繼續(xù)需要等待所有子線程執(zhí)行完畢锻弓。
4 異步結(jié)果(主從協(xié)作)
主從協(xié)作模式的一種情況:主線程將子線程的調(diào)用封裝成異步調(diào)用砾赔,調(diào)用后馬上返回Future對象蝌箍,后續(xù)通過Future獲取最終結(jié)果青灼。
5 集合點
類似于團隊旅游暴心,過程中設(shè)置若干集合點,集合點集合完畢后進行后續(xù)活動杂拨。
比如在并行迭代計算中专普,每個線程負責(zé)一部分計算,然后在集合點等待其他線程完成弹沽,所有線程到齊后交換數(shù)據(jù)計算結(jié)果后進行下一次迭代檀夹。
3.1 生產(chǎn)者/消費者模式
有兩種方法。
3.1.2 第一種方法:在每個線程中使用協(xié)作策橘。
在生產(chǎn)者/消費者的線程中通過判斷隊列的狀態(tài)炸渡,自主的使用wait/notify.
這里的生產(chǎn)/消費過程while包裹在synchronized內(nèi),所以生產(chǎn)的時候由一個生產(chǎn)者生產(chǎn)到滿丽已,由一個消費者消費到0這種比較規(guī)律的現(xiàn)象蚌堵。
第二種實現(xiàn)方法(共享阻塞隊列)實現(xiàn)的方式,生產(chǎn)/消費過程synchronized被while包裹沛婴,因而每次生產(chǎn)消費后都會釋放鎖吼畏,所以出現(xiàn)了比較頻繁隨機的競爭現(xiàn)象。
消費者和生產(chǎn)者的核心代碼:
synchronized (linkedList){//消費者線程核心代碼
try {
while (true) {
if (linkedList.isEmpty()) {
OS.print(String.format("%s:緩存隊列為空嘁灯,通知生產(chǎn)者生產(chǎn)(notifyAll),暫停消費(wait)",getName()));
linkedList.notifyAll();
linkedList.wait();
} else {
String rs = linkedList.poll();
OS.print(String.format("%s:消費產(chǎn)品:%s,消費后緩存隊列size:%s",getName(),rs,linkedList.size()));
Thread.sleep(200);
}
}
} catch(InterruptedException e){
e.printStackTrace();
}
}
try {//生產(chǎn)者線程核心代碼
synchronized (linkedList){
int i = 0;
while(true){
if(linkedList.size()==max){
OS.print(String.format("%s:緩存隊列滿泻蚊,通知消費者啟動(notifyAll),暫停生產(chǎn)(wait)丑婿。", getName()));
linkedList.notifyAll();
linkedList.wait();
}else{
linkedList.offer(getName()+(++i));
OS.print(String.format("%s:繼續(xù)生產(chǎn):%s性雄,隊列當前長度(%s)", getName(),getName()+i, linkedList.size()));
Thread.sleep(500);
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
這里的例子使用的是await/notify,也可以用await/signal來實現(xiàn),具體實現(xiàn)大同小異羹奉。
4.1.2 在共享阻塞隊列中實現(xiàn)協(xié)作
即消費者線程和生產(chǎn)者線程通過阻塞隊列來實現(xiàn)協(xié)作毅贮。java提供的阻塞隊列如 ArrayBlockingQueue 等底層都是通過await機制實現(xiàn)的。
第一種實現(xiàn)便于理解wait的原理尘奏,但是第二種實現(xiàn)代碼更整潔一些滩褥,后續(xù)的場景只舉例使用第二種實現(xiàn)來實現(xiàn)。
4.2 同時開始(實現(xiàn)最大的并行性):閉鎖CountDownLatch也可以實現(xiàn)
主線程充當裁判員炫加,每個子線程充當一個運動員瑰煎。就位后,運動員子線程等待協(xié)作的共享變量是做為起跑信號俗孝。同樣有兩種實現(xiàn)酒甸。這里舉例使用第二種簡潔的實現(xiàn):
一個起跑信號類FireFlag。
public class FireFlag {
private volatile boolean fireFlag = false;
public synchronized void wait4Fire() throws InterruptedException {
while (!fireFlag){ wait(); }
}
public synchronized void fire(){
this.fireFlag = true;
notifyAll();
}
}
運動員類赋铝,start后調(diào)用wait4Fire準備就緒:
public class Racer extends Thread{
private FireFlag fireFlag;
Racer(FireFlag fireFlag) {
this.fireFlag = fireFlag;
}
@Override
public void run() {
try {
fireFlag.wait4Fire();
System.out.println(String.format("%s:開跑(%s)",getName(),System.nanoTime()+""));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
然后裁判員主線程插勤,通過調(diào)用fire方法開槍,運動員線程開始同時起跑。
這里的FireFlag使用閉鎖CountDownLatch也可以實現(xiàn)农尖,初始值設(shè)為1析恋,所有運動員線程調(diào)用CountDownLatch的await()
方法,主線程調(diào)用CountDownLatch的countDown()
盛卡,計數(shù)器變成0助隧,相當于裁判員開槍。
4.3 等待結(jié)束(主從協(xié)作) 及同步協(xié)作類:CountDownLatch
主從協(xié)作的一種情況滑沧,Thread的join方法就是一種等待結(jié)束并村,底層是實現(xiàn)利用了wait():
while(isAlive){
wait(0);
}
join的等待是線性的,需要逐一等待每個子線程滓技。
public class Run {
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[100];
for (int i = 0;i<100;i++){
threads[i] = new JoinThread("Thread"+i,(int)(Math.random()*1000));
threads[i].start();
}
//join必須單獨循環(huán)一次哩牍,否則100個子線程就變成線性執(zhí)行了
//但是主線程的等待還是逐個等待的。一些join可能是在線程執(zhí)行完畢后調(diào)用的令漂。
for (int i=0;i<100;i++){
threads[i].join();
}
System.out.println("其他線程都執(zhí)行完畢了");
}
}
這里舉個并發(fā)執(zhí)行的例子姐叁,主線程與子線程寫作共享一個數(shù)count(WaitCount對象)來表示未完成的線程個數(shù)。
每執(zhí)行完畢一個減一洗显,當count==0時調(diào)用notify(此時只剩下當前線程和主線程外潜,可以使用notify)通知主線程停止wait。
WaitCountDownLatch對象代碼如下:
public class WaitCountDownLatch {
private int count;
WaitCountDownLatch(int count) {
this.count = count;
}
/**
* 阻塞等待計數(shù)器==0
* @throws InterruptedException
*/
public synchronized void await() throws InterruptedException {
while (count>0){
wait();
}
System.out.println(String.format("%s:count==0了挠唆,所有線程執(zhí)行完畢了",Thread.currentThread().getName()));
}
/**
* 任務(wù)執(zhí)行完畢后計數(shù)器減一
*/
public synchronized void countDown(){
count--;
System.out.println(String.format("%s:結(jié)束处窥,count(%s)數(shù)減一",Thread.currentThread().getName(),count));
if(count == 0){
notify();
}
}
}
WaitCountDownLatch 是一個用于同步協(xié)作的工具類,用于演示原理玄组。
jdk中提供了一個專門的同步類
CountDownLatch
,實際開發(fā)中應(yīng)該使用這個類滔驾。
使用用法和這里的WaitCountDownLatch類似,也提供了await()
和countDown()
這兩個方法俄讹。
100個子線程共用一個WaitCountDownLatch對象哆致,在run方法中調(diào)用中調(diào)用countDown()
讓計數(shù)器減一,主線程中啟動所有線程后調(diào)用await()
患膛。
4.4 異步結(jié)果(主從協(xié)作):Executor Future
主從協(xié)作的情況下摊阀,手工創(chuàng)建線程往往比較麻煩,一種常見的模式是異步調(diào)用踪蹬。
異步調(diào)用一般返回一個Future對象胞此,通過它可以獲得最終的結(jié)果。
在java中表示子任務(wù)的接口是Callable跃捣。
異步結(jié)果的主要邏輯在Executor的execute()
方法中:
創(chuàng)建callable線程漱牵,返回Future對象。Future的get()
方法的邏輯是疚漆,阻塞等待任務(wù)線程結(jié)束酣胀,結(jié)束后返回結(jié)果刁赦。
這里有一個自己實現(xiàn)的Executor的execute()
方法,包含F(xiàn)uture的get()
的實現(xiàn)的源碼,供參考原理:
public class MyExecutor {
public <V> MyFuture<V> execute(final Callable<V> task){
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<>(task,lock);
thread.start();
return () -> {
synchronized (lock){
while (!thread.isDone()){
lock.wait();
}
}
if(thread.getException() != null){
throw thread.getException();
}
return thread.getResult();
};
}
}
4.5 集合點:柵欄 CyclicBarrier
各個線程分頭行動各自到達一個集合點闻镶,集合點等待所有線程到齊甚脉,交換數(shù)據(jù)后進行下一步動作。
這里舉一個具體的例子:公司部門10人約定去爬山儒溉,各自從家中出發(fā),到山底集合发钝,所有人到齊開始爬山顿涣。
每個員工對應(yīng)一個線程:
public class ClimberThread extends Thread {
private CyclicBarrier cyclicBarrier;
private int sleep;
public ClimberThread(String threadName, int sleep, CyclicBarrier cyclicBarrier) {
setName(threadName);
this.cyclicBarrier = cyclicBarrier;
this.sleep = sleep;
}
@Override
public void run() {
System.out.println(String.format("%s:我出發(fā)了",getName()));
try {
sleep(sleep);
System.out.println(String.format("%s:我已經(jīng)到達山底了,只到了(%s)人酝豪,我先等一下涛碑。",getName(),cyclicBarrier.getNumberWaiting()));
cyclicBarrier.await();
System.out.println(String.format("%s:所有人到齊了,開始爬山",getName()));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
主線程模擬10個員工從家中出發(fā):
public class Run {
public static void main(String[] args) {
int climberCount = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(climberCount);
for (int i = 0; i < climberCount; i++){
new ClimberThread("climber"+i,(int)(Math.random()*1000),cyclicBarrier).start();
}
System.out.println("攀登者們陸續(xù)從家里出發(fā)了……");
}
}