前言
最近在看并發(fā)編程藝術(shù)這本書膛堤,對看書的一些筆記及個人工作中的總結(jié)奋构。
Callable和Future使用
Future模式非常適合在處理很耗時很長的業(yè)務(wù)邏輯時進(jìn)行使用壳影,可以有效的減小系統(tǒng)的響應(yīng)時間,提高系統(tǒng)的吞吐量弥臼。
//get方法的作用之前也講過了宴咧,阻塞直到異步執(zhí)行結(jié)果返回才不阻塞,繼續(xù)往下執(zhí)行
public class TestFuture implements Callable<String> {
private String para;
public TestFuture(String para){
this.para = para;
}
/**
* 這里是真實的業(yè)務(wù)邏輯径缅,其執(zhí)行可能很慢
*/
@Override
public String call() throws Exception {
//模擬執(zhí)行耗時,模擬的是db操作掺栅,para就是查詢條件
Thread.sleep(5000);
String result = this.para + "處理完成";
System.out.println("執(zhí)行成功了!D芍怼Q跷浴!氏堤!");
return result;
}
//主控制函數(shù)
public static void main(String[] args) throws Exception {
String queryStr = "query";
//構(gòu)造FutureTask沙绝,并且傳入需要真正進(jìn)行業(yè)務(wù)邏輯處理的類,該類一定是實現(xiàn)了Callable接口的類
FutureTask<String> future = new FutureTask<>(new TestFuture(queryStr));
FutureTask<String> future2 = new FutureTask<>(new TestFuture(queryStr));
//創(chuàng)建一個固定線程的線程池且線程數(shù)為2,
ExecutorService executor = Executors.newFixedThreadPool(2);
//這里提交任務(wù)future,則開啟線程執(zhí)行RealData的call()方法執(zhí)行
//submit和execute的區(qū)別: 第一點是submit可以傳入實現(xiàn)Callable接口的實例對象, 第二點是submit方法有返回值
Future f1 = executor.submit(future); //單獨啟動一個線程去執(zhí)行的
Future f2 = executor.submit(future2);
//f1.get() == null的時候說明已經(jīng)取到了結(jié)果鼠锈,執(zhí)行到了最后
System.out.println("請求完畢");
try {
//這里可以做額外的數(shù)據(jù)操作闪檬,也就是主程序執(zhí)行其他業(yè)務(wù)邏輯
System.out.println("處理實際的業(yè)務(wù)邏輯...");
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
//調(diào)用獲取數(shù)據(jù)方法,如果call()方法沒有執(zhí)行完成,則依然會進(jìn)行等
//System.out.println("數(shù)據(jù):" + future.get());
//System.out.println("數(shù)據(jù):" + future2.get());
executor.shutdown();
}
}
CountDownLatch
//join用于讓當(dāng)前執(zhí)行線程等待join線程執(zhí)行結(jié)束。其實現(xiàn)原理是不停檢查,join線程是否存活购笆,如果join線程存活則讓當(dāng)前線程永遠(yuǎn)等待谬以。
public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(() -> {
});
Thread parser2 = new Thread(() -> System.out.println("parser2 finish"));
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}
}
join用于讓當(dāng)前執(zhí)行線程等待join線程執(zhí)行結(jié)束。其實現(xiàn)原理是不停檢查,join線程是否存活由桌,如果join線程存活則讓當(dāng)前線程永遠(yuǎn)等待。直到j(luò)oin線程中止后,線程的this.notifyAll()方法會被調(diào)用
使用CountDownLatch也可以實現(xiàn)
public class CountDownLatchTest {
//參數(shù)是2表示對象執(zhí)行2次countDown方法才能釋放鎖
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}).start();
c.await(); //線程等待
System.out.println("3");
}
}
同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)行您。它要做的事情是,讓一組線程到達(dá)一個屏障(也可以叫同步點)時被阻塞娃循,直到最后一個線程到達(dá)屏障時炕檩,屏障才會開門捌斧,所有被屏障攔截的線程才會繼續(xù)運行妇押。
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(() -> {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
}
CyclicBarrier還提供一個更高級的構(gòu)造函數(shù)CyclicBarrier(int parties,Runnable barrier-Action)肩杈,用于在線程到達(dá)屏障時解寝,優(yōu)先執(zhí)行barrierAction扩然,方便處理更復(fù)雜的業(yè)務(wù)場景
//先執(zhí)行線程A
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(() -> {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
執(zhí)行結(jié)果:
3
1
2
** CyclicBarrier的應(yīng)用場景 **
CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的場景聋伦。例如夫偶,用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個賬戶近一年的每筆銀行流水嘉抓,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水索守,先用多線程處理每個sheet里的銀行流水,都執(zhí)行完之后抑片,得到每個sheet的日均銀行流水卵佛,最后,再用barrierAction用這些線程的計算結(jié)果敞斋,計算出整個Excel的日均銀行流水截汪,
CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset()方法重置植捎。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景衙解。例如,如果計算發(fā)生錯誤焰枢,可以重置計數(shù)器蚓峦,并讓線程重新執(zhí)行一次舌剂。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的線程數(shù)量暑椰。isBroken()方法用來了解阻塞的線程是否被中斷霍转。
public class CyclicBarrierTest3 {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
Thread thread = new Thread(() -> {
try {
c.await();
} catch (Exception e) {
}
});
thread.start();
thread.interrupt();
try {
c.await();
} catch (Exception e) {
System.out.println(c.isBroken());
}
}
}
Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程一汽,以保證合理的使用公共資源避消。
Semaphore可以控制系統(tǒng)的流量,拿到信號量的線程可以進(jìn)入召夹,否則就等待岩喷。通過acquire()和release()獲取和釋放訪問許可。
public class TestSemaphore {
public static void main(String[] args) {
// 線程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5個線程同時訪問
final Semaphore semp = new Semaphore(5);
// 模擬20個客戶端訪問
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = () -> {
try {
// 獲取許可
semp.acquire();
System.out.println("Accessing: " + NO);
//模擬實際業(yè)務(wù)邏輯
Thread.sleep((long) (Math.random() * 10000));
// 訪問完后监憎,釋放
semp.release();
} catch (InterruptedException e) {
}
};
exec.execute(run);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(semp.getQueueLength());
// 退出線程池
exec.shutdown();
}
}
Semaphore可以用于做流量控制纱意,特別是公用資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連接枫虏。假如有一個需求淋叶,要讀取幾萬個文件的數(shù)據(jù)因惭,因為都是IO密集型任務(wù),我們可以啟動幾十個線程并發(fā)地讀取,但是如果讀到內(nèi)存后报亩,還需要存儲到數(shù)據(jù)庫中章蚣,而數(shù)據(jù)庫的連接數(shù)只有10個推励,這時我們必須控制只有10個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù)父晶,否則會報錯無法獲取數(shù)據(jù)庫連接。這個時候赞警,就可以使用Semaphore來做流量控制
//線程池中有30個線程妓忍,但是只有10個線程可以獲得許可
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire(); //獲取許可證
System.out.println("save data");
Thread.sleep(1000);
s.release(); //釋放許可證
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
其他方法
Semaphore:還提供一些其他方法,具體如下愧旦。
intavailablePermits():返回此信號量中當(dāng)前可用的許可證數(shù)世剖。
intgetQueueLength():返回正在等待獲取許可證的線程數(shù)。
booleanhasQueuedThreads():是否有線程正在等待獲取許可證笤虫。
void reducePermits(int reduction):減少reduction個許可證旁瘫,是個protected方法。
Collection getQueuedThreads():返回所有等待獲取許可證的線程集合琼蚯,是個protected方法酬凳。
線程間交換數(shù)據(jù)的Exchanger
Exchanger(交換者)是一個用于線程間協(xié)作的工具類。Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換遭庶。它提供一個同步點宁仔,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)峦睡。這兩個線程通過exchange方法交換數(shù)據(jù)翎苫,如果第一個線程先執(zhí)行exchange()方法权埠,它會一直等待第二個線程也執(zhí)行exchange方法,當(dāng)兩個線程都到達(dá)同步點時拉队,這兩個線程就可以交換數(shù)據(jù)弊知,將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方。
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "銀行流水A";// A錄入銀行流水?dāng)?shù)據(jù)
Thread.sleep(4000);
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "銀行流水B";// B錄入銀行流水?dāng)?shù)據(jù)
String A = exgr.exchange("B"); //得到A值
System.out.println("A和B數(shù)據(jù)是否一致:" + A.equals(B) + "粱快,A錄入的是:" + A + ",B錄入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
如果兩個線程有一個沒有執(zhí)行exchange()方法叔扼,則會一直等待事哭,如果擔(dān)心有特殊情況發(fā)生,避免一直等待瓜富,可以使用exchange(V x鳍咱,longtimeout,TimeUnit unit)設(shè)置最大等待時長与柑。