[toc]
1 Future與Callable
使用Runnable接口有很大的局限性,他不能夠返回一個值或者一個受檢查的異常替劈。這種情況下寄雀,可以使用Callable<V>接口,其中V就是返回的結果陨献。
Future<V>用來接收callable結束后返回的結果盒犹。ExecuteService 的submit方法都是返回一個Future,可以利用Future獲取執(zhí)行的結果,同時可以利用Future取消任務湿故。任務生命周期 創(chuàng)建阿趁,提交膜蛔,執(zhí)行坛猪,結束。如果任務提交皂股,但是沒有執(zhí)行墅茉,可以使用Future 取消。
使用線程池(ThreadPoolExecute)的時候,ExecuteService的submit方法實際上使用的是AbstractExecutorService的submit就斤。查看源碼可以看到提交任務后返回的是 FutureTask<T>
示例:
package com.fun.concurrent;
import java.util.concurrent.*;
/**
* callable示例
*
* @author fun
* @date 2017-04-01 10:59
*/
public class FutureCallableDemo {
public static void main(String[] args) {
FutureCallableDemo test = new FutureCallableDemo();
ExecutorService executorService = Executors.newFixedThreadPool(2);
//do test
// test.testCallableTask(executorService);
test.testRunableTask(executorService);
executorService.shutdown();
}
public void testCallableTask(ExecutorService executorService) {
Callable<String> task = new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("invoke method call, " + System.currentTimeMillis());
Thread.sleep(3000);
System.out.println("method call is going to return, " + System.currentTimeMillis());
return "SUCCESS";
}
};
Future<String> future = executorService.submit(task);
System.out.println("main is going to get callable future result, " + System.currentTimeMillis());
try {
System.out.println("callable result = " + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("main got the future result, " + System.currentTimeMillis());
}
public void testRunableTask(ExecutorService executorService) {
Runnable runTask = new Runnable() {
@Override
public void run() {
System.out.println("invoke method run, " + System.currentTimeMillis());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("method run is going to end, " + System.currentTimeMillis());
}
};
Future<Integer> runFuture = executorService.submit(runTask,new Integer(100));
// Future<?> runFuture = executorService.submit(runTask); // 這兩種方式區(qū)別悍募,打開注釋觀察
try {
System.out.println("main is going to get runnable future result, " + System.currentTimeMillis());
System.out.println("runnable result = " + runFuture.get());
System.out.println("main got the future result, " + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
執(zhí)行會發(fā)現future.get() 方法是一個阻塞的方法,一直等到任務執(zhí)行完成得到結果洋机。
思考:看上面例子可以看到一個問題坠宴,runnable的任務和callable的任務都是可以返回Futrue的,那么他們有什么區(qū)別呢 绷旗?
觀察不難發(fā)現喜鼓,Callable<V> 返回結果是在call方法執(zhí)行完成后返回的,他返回什么結果可以是call里面的計算得到的衔肢,類型為V即可庄岖。他的結果是可變的,程序運行返回的是什么就是什么角骤。
但是Runable的返回結果只是提前定義的一個結果隅忿,可預期正確執(zhí)行后的一個結果。他的結果在任務提交的時候已經決定了具體的值邦尊。
2 原子類 atomic
原子類是如何保證原子操作的背桐?
回答這個問題之前,先一起來看一個原子類的源碼
// AtomicInteger部分源碼
// AtomicInteegr 加法操作
public final int getAndAdd(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return current;
}
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
這段代碼很關鍵的一個地方就是compareAndSwap (CAS) 蝉揍,每次操作(寫)之前牢撼,先比較一下值,確認沒有被改過疑苫,才寫數據熏版。在compareAndSet的注釋上面也很清楚的可以看到,只有當cuurent value==expect value的時候捍掺,才把value更新成update value撼短。整個執(zhí)行的原則就是: 先檢查后執(zhí)行
而compareAndSwapXXX是一個native的方法,是虛擬機底層的實現挺勿。
測試示例:
package com.fun.concurrent;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* 原子類操作
*
* @author fun
* @date 2017-04-01 12:07
*/
public class AtomicClassDemo {
public static void main(String[] args) {
AtomicInteger aint = new AtomicInteger(100);
boolean b = aint.compareAndSet(100,200); // 修改expect與當前值不同測試
if (b) {
System.out.println(aint.get());
}
System.out.println(aint.getAndAdd(100));
System.out.println(aint.get());
System.out.println("-------------------");
AtomicClassDemo ref1 = new AtomicClassDemo();
AtomicClassDemo ref2 = new AtomicClassDemo();
System.out.println("ref1=" + ref1);
System.out.println("ref2=" + ref2);
AtomicReference<AtomicClassDemo> ref3 = new AtomicReference<>(ref1);
System.out.println("ref3 before set=" + ref3);
boolean b2 = ref3.compareAndSet(ref2,ref1); // 修改expect為ref1測試
System.out.println(b2);
System.out.println("ref3 after set =" + ref3);
}
}
/**
output:
200
200
300
-------------------
ref1=com.fun.concurrent.AtomicClassDemo@74a14482
ref2=com.fun.concurrent.AtomicClassDemo@1540e19d
ref3 before set=com.fun.concurrent.AtomicClassDemo@74a14482
false
ref3 after set =com.fun.concurrent.AtomicClassDemo@74a14482
*/
3 lock與ReentrantLock
此小節(jié)重點學習下ReentrantLock,區(qū)別于內置鎖曲横,ReentrantLock是一個顯示鎖。他那有那些特性呢不瓶?
3.1 輪詢鎖和定時鎖
使用tryLock() 方法可以在不能獲取到鎖的情況下禾嫉,使用定時或者輪詢的方式獲取所,執(zhí)行時間內沒有完成就釋放鎖蚊丐,平滑的退出任務熙参。而內置鎖會阻塞在獲取鎖的地方,一旦操作不當就可能發(fā)生死鎖麦备,如果出現死鎖了孽椰,唯一的解決辦法就是重啟程序昭娩。使用定時或者輪詢鎖可以有效的避免死鎖的問題。
轉賬示例:
package com.fun.concurrent;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by fun
* @date 2017/4/12.
*/
public class ReentrantLockDemo {
public static void main(String[] args) {
ReentrantLockDemo test = new ReentrantLockDemo();
Account fromAcct = test.newAccount(4000);
Account toAcct = test.newAccount(1000);
test.transferMoney(fromAcct,toAcct,1000L,20000,TimeUnit.NANOSECONDS);
System.out.println("fromAccount balance=" + fromAcct.getBalance() + "\ntoAccount balance=" + toAcct.getBalance());
}
// 轉賬示例
public boolean transferMoney(Account fromAcct,
Account toAcct,
long amount,
long timeout,
TimeUnit unit) {
long stopTime = System.nanoTime() + unit.toNanos(timeout); // 超時時間
while (true) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
boolean rs = false;
if (fromAcct.debit(amount) ){
rs = toAcct.credit(amount);
}
return rs;
} finally {
toAcct.lock.unlock();
}
}
} finally {
fromAcct.lock.unlock();
}
}
if (System.nanoTime() > stopTime) { // 如果已經超時了黍匾,就直接返回栏渺,提前結束任務
return false;
}
try {
long x = new Random().nextInt(50) + 10;
Thread.sleep(timeout/x ); // 過一會兒再嘗試下一次獲取鎖
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
protected class Account {
private Lock lock = new ReentrantLock();
private long balance;
public boolean debit(long amount) {
if (balance < amount) {
return false;
}
balance = balance - amount;
return true;
}
public boolean credit(long amount) {
balance = balance + amount;
return true;
}
/*getter and setter*/
}
public Account newAccount(long balance) {
Account account = new Account();
if (balance > 0) {
account.setBalance(balance);
} else {
account.setBalance(0);
}
return account;
}
}
在幾次獲得鎖的地方,都是trylock, 在多線程的情況下锐涯,如果沒有獲得鎖的時候磕诊,線程并不會阻塞,而是之后往后面運行纹腌。進入判斷是否超時的語句秀仲。如果超時就退出,可以避免等待加鎖可能出現的死鎖問題壶笼。
注意:但是使用顯示鎖人的時候神僵,很容易在編寫程序的時候忘記了釋放鎖,切記覆劈,在使用顯示鎖的時候一定要在try-finally 的finally里面對鎖進行釋放保礼。
3.2 可中斷的鎖操作
lockInterruptibly()方法,可中斷的獲取鎖的方式,在獲取鎖的過程中可能被中斷责语,方法本身是可以拋出InterruptException的
// 此方法源碼
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
他的使用和不同lock和tryLock一樣炮障,只是需要在外面處理lockInterruptibly的InterruptException.
3.3 非塊結構加鎖
對鏈表上的每個節(jié)點單獨建立鎖,使不同的線程可以獨立的對鏈表的不同部分進行操作坤候。所得很模糊胁赢,需要結合ConcurrentHashMap理解
3.3 公平性
公平性是在競爭資源時候的一種策略,大部分情況都是使用公平原則來獲取鎖白筹,例如:FIFO 隊列智末。但是,有時候前面的操作比較耗時的時候徒河,會拖慢整個處理速率系馆,這個時候不公平原則可以提前獲得鎖。例如顽照,線程A獲得一個對象的鎖由蘑,現線程B和C都在等待這個鎖,當A釋放鎖的時候代兵,如果B喚醒的時間比較的久尼酿,此時C先獲得鎖,使用了并釋放了植影,B剛好喚醒裳擎,獲得鎖。這個過程B的時間沒有耽誤何乎。同時C也處理了句惯,增加了吞吐量土辩。但是支救,我還要說但是抢野。使用不公平性的時候同樣會有問題,不公平的比較極端的情況就是找出一個線程一直拿不到鎖各墨,一直等待指孤。所以使用時候需要權衡和控制。
ReentrantLock 可以設置不保證公平性贬堵。
4 CountDownLatch & Semaphore
4.1 CountDownLatch
CountDownLatch 有什么作用呢 恃轩?它就是一個同步助手,它能夠讓一個或者多個線程等到另外的線程完成一系列的操作之后再執(zhí)行黎做。
內部有一個倒數計數器叉跛,當倒數計數器減到0的時候釋放鎖。
先看示例:
package com.fun.concurrent;
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch 使用和測試
*
* @author fun
* @date 2017-04-13 17:50
*/
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatchDemo test = new CountDownLatchDemo();
int N = 10;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
new Thread(test.newWorker(startSignal,doneSignal)).start();
}
try {
System.out.println("do something else 1");
startSignal.countDown();
System.out.println("do something else 2");
doneSignal.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Worker newWorker(CountDownLatch startSignal, CountDownLatch doneSignal){
return new Worker(startSignal,doneSignal);
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("do something...");
}
}
}
此示例中蒸殿,所有Runnable共享一個startSignal 和 一個 doneSignal筷厘。CountDownLatch的 await() 方法等待計數器變?yōu)?在喚醒。而在執(zhí)行countDown() 方法的時候宏所,每次countDown
就會是計數器減一酥艳,知道減少為0才釋放。
查看源碼中countDown 的過程
// countDown
public void countDown() {
sync.releaseShared(1);
}
// sync.releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// tryReleaseShared 在countDownLatch中的Sync中有重寫父類的方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
很顯然爬骤,只有當count=0的時候才會釋放鎖充石。在回頭看上面的示例程序,分析如下:
- startSignal.countDown(); Runnable任務線程創(chuàng)建并start了霞玄,但是出于wait狀態(tài)骤铃,等待計數器變?yōu)?,次代碼操作把計數器減一變?yōu)?坷剧,所有任務開始工作劲厌。
- System.out.println("do something else 2");主線程忙其他事情
- doneSignal.await();所有任務完成之前,主線程從此處開始阻塞(掛起等待)听隐。doneSignal計數器不變?yōu)?补鼻,主線程一直掛起。完成一個任務雅任,count-1,直到所有任務完成风范,count=0,主線程醒過來并完成后面的動作。
利用CountDownLacth 能做很多事沪么,例如進項大數據的一個累加硼婿,可以分成多個線程處理,然后在主線程中合并(累加)多個任務的結果禽车。增加處理速率(這個有點像MapReduce的思想)寇漫。
思考: 其實看了CountDownLatch 之后刊殉,發(fā)現和volatile+synchronized效果很像。完全可以控制一個volatile的count變量等于任務數州胳,完成一個任務记焊,count-1,主線程wait,等到count=0。 效果差不多栓撞。但是代碼實現上就較CountDownLatch 復雜點遍膜。所有有類似這樣的功能,應該優(yōu)先想到CountDownLatch
4.2 Semaphore
使用信號量的時候瓤湘,一個線程想要獲得一個item,必須要先從Semaphore那里獲得許可(permit)瓢颅,保證item是可用的。當線程完成任務的時候弛说,在向pool歸還item同時還需要向Semaphore歸還許可挽懦,以便其他線程可以使用item。需要注意的是木人,當調用acquire的時候信柿,不需要額外加鎖限制,因為這樣將會阻止item被歸還到pool虎囚。 實際上Semaphore已經封裝了同步鎖來保證item的獲取角塑,并且pool對每個item有單獨的維護。
當Semaphore被初始化成一個并且只有之多一個許可的時候淘讥,他就表現成了一個互斥鎖圃伶。這個更像一個Binary Semaphore一樣,因為他只有兩個狀態(tài): 有一個可用許可蒲列,沒有可用許可窒朋。當以這種方式使用的時候,semaphore 和其他Lock的實現不同蝗岖,他能夠允許鎖被線程本身釋放侥猩,而不是鎖的所有者。
Semaphore 構造方法有個釋放使用公平鎖的方式抵赢。當使用不公平鎖的時候欺劳,不保證獲取許可的先后順序。公平性在前文有說過铅鲤,有時候可以提高吞吐量划提,避免一直等待。但是也同樣也會出現一直獲取不到鎖也進去一直等待邢享。
Semaphore可以設定一個閾值鹏往,基于此,多個線程競爭獲取許可信號骇塘,做完自己的申請后歸還伊履,超過閾值后韩容,線程申請許可信號將會被阻塞。Semaphore可以用來構建一些對象池唐瀑,資源池之類的群凶,比如數據庫連接池
實現分析:
Semaphore 實現和CountDownLatch有幾分相似:
- CountDownLatch里面有個count計數器,每次操作countDown 則count = count-1 ,當count==0 的時候才釋放所
- Semaphore 則是內部維護一個available的數量介褥,每次減去獲取permits的數量座掘,得到剩余的數量递惋,釋放鎖的時候available加上歸還的permit的數量柔滔。獲取的鎖的過程就是對available減操作,release則是加回對available的操作萍虽。
Semaphore默認實現是不公平性的睛廊,就以不公平性的實現來看嗎:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
選取源碼中關鍵的兩個方法嗎,加鎖和解鎖杉编。
- 獲取permit: 先使用getState后去當前可用的permit的數量超全,剩余數量=當前數量-申請的數量,然后再執(zhí)行CAS設置狀態(tài),并返回剩余可用數量邓馒。
- 釋放鎖:釋放鎖的過程時間就是歸還permit可用數量的過程嘶朱。當前可用數量+歸還數量<當前數量的是時候,或者已經溢出光酣,歸還數量為負數了疏遏,如果current + releases >=cuurent,執(zhí)行CAS 設置狀態(tài)值。
5 ConcurrentHashMap
使用分段鎖(Lock striping)的方式救军,使鎖的粒度更細來實現更大程度的共享财异,提高并發(fā)性和伸縮性。
鎖分段(Lock striping):在某些情況下唱遭,可以將鎖分解技術進一步擴展為對一組獨立對象上的鎖進行分解戳寸。這種情況被稱為鎖分段。
例如: 在concurrentHashMap 中使用一個包含16個鎖的數組拷泽,每個鎖保護一個散列桶疫鹊,其中第N個散列桶由第(N mod 16)個鎖來寫入。假設散列函數具有合理性司致,并且關鍵字分布均勻拆吆,那么這大約能都把對于鎖的請求減少到原來1/16。正是這項技術使得ConcurrentHashMap能夠支持多大16個并發(fā)的寫入器蚌吸。
鎖分段劣勢: 與采用單個鎖來實現獨占訪問相比锈拨,要獲取多個鎖來實現獨占訪問將更加困難并且開銷更高。例如:ConcurrentHashMap在擴容的時候羹唠,以及重新計算Hash并且重新散列分布時候奕枢,都需要獲取所有鎖娄昆,實現整個Map的獨占訪問。