一琅豆、背景-對(duì)賬系統(tǒng)處理邏輯
用戶通過在線商城下單榔幸,會(huì)生成電子訂單允乐,保存在訂單庫;之后物流會(huì)生成派送單給用戶發(fā)貨削咆,派送單保存在派送單庫牍疏。為了防止漏派送或者重復(fù)派送,對(duì)賬系統(tǒng)每天還會(huì)校驗(yàn)是否存在異常訂單拨齐。
- 查詢訂單
- 查詢派送單
- 對(duì)比訂單和派送單
- 將差異寫入差異庫
while(存在未對(duì)賬訂單){
// 查詢未對(duì)賬訂單
pos = getPOrders();
// 查詢派送單
dos = getDOrders();
// 執(zhí)行對(duì)賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
二鳞陨、利用并行優(yōu)化對(duì)賬系統(tǒng)
查詢未對(duì)賬訂單 getPOrders() 和查詢派送單 getDOrders() 采用并行處理;因?yàn)檫@兩個(gè)操作并沒有先后順序的依賴瞻惋。這兩個(gè)最耗時(shí)的操作并行之后厦滤,執(zhí)行過程如下圖所示。對(duì)比單線程的執(zhí)行熟史,同等時(shí)間里馁害,并行執(zhí)行的吞吐量近乎單線程的 2 倍。
while(存在未對(duì)賬訂單){
// 查詢未對(duì)賬訂單
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查詢派送單
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待T1蹂匹、T2結(jié)束
T1.join();
T2.join();
// 執(zhí)行對(duì)賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
創(chuàng)建了兩個(gè)線程 T1 和 T2碘菜,并行執(zhí)行查詢未對(duì)賬訂單 getPOrders() 和查詢派送單 getDOrders() 這兩個(gè)操作。在主線程中執(zhí)行對(duì)賬操作 check() 和差異寫入 save() 兩個(gè)操作限寞。
主線程需要等待線程 T1 和 T2 執(zhí)行完才能執(zhí)行 check() 和 save() 這兩個(gè)操作忍啸,通過調(diào)用 T1.join() 和 T2.join() 來實(shí)現(xiàn)等待,當(dāng) T1 和 T2 線程退出時(shí)履植,調(diào)用 T1.join() 和 T2.join() 的主線程就會(huì)從阻塞態(tài)被喚醒计雌,從而執(zhí)行之后的 check() 和 save()。
三玫霎、CountDownLatch 實(shí)現(xiàn)線程等待
定義
CountDownLatch可以設(shè)置一個(gè)計(jì)數(shù)器凿滤,通過countDown()方法進(jìn)行減1操作妈橄,使用await()方法等待計(jì)數(shù)器不大于0,再繼續(xù)執(zhí)行await()方法之后的語句翁脆。
并行執(zhí)行的方案中眷蚓,while 循環(huán)里面每次都會(huì)創(chuàng)建新的線程,而創(chuàng)建線程可是個(gè)耗時(shí)的操作反番。所以最好是創(chuàng)建出來的線程能夠循環(huán)利用沙热,所以可以使用線程池就能解決這個(gè)問題。
創(chuàng)建一個(gè)固定大小為 2 的線程池罢缸,之后在 while 循環(huán)里重復(fù)利用篙贸。
主線程如何知道 getPOrders() 和 getDOrders() 這兩個(gè)操作什么時(shí)候執(zhí)行完。前面主線程通過調(diào)用線程 T1 和 T2 的 join() 方法來等待線程 T1 和 T2 退出枫疆,但是在線程池的方案里爵川,線程根本就不會(huì)退出,所以 join() 方法已經(jīng)失效了养铸。最直接的辦法是弄一個(gè)計(jì)數(shù)器雁芙,初始值設(shè)置成 2,當(dāng)執(zhí)行完pos = getPOrders();這個(gè)操作之后將計(jì)數(shù)器減 1钞螟,執(zhí)行完dos = getDOrders();之后也將計(jì)數(shù)器減 1,在主線程里谎碍,等待計(jì)數(shù)器等于 0鳞滨;當(dāng)計(jì)數(shù)器等于 0 時(shí),說明這兩個(gè)查詢操作執(zhí)行完了蟆淀。等待計(jì)數(shù)器等于 0 其實(shí)就是一個(gè)條件變量拯啦,用管程實(shí)現(xiàn)起來也很簡單。
使用CountDownLatch實(shí)現(xiàn):
// 創(chuàng)建2個(gè)線程的線程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未對(duì)賬訂單){
// 計(jì)數(shù)器初始化為2
CountDownLatch latch = new CountDownLatch(2);
// 查詢未對(duì)賬訂單
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查詢派送單
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待兩個(gè)查詢操作結(jié)束
latch.await();
// 執(zhí)行對(duì)賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
在 while 循環(huán)里面熔任,創(chuàng)建了一個(gè) CountDownLatch褒链,計(jì)數(shù)器的初始值等于 2,之后在pos = getPOrders();和dos = getDOrders();兩條語句的后面對(duì)計(jì)數(shù)器執(zhí)行減 1 操作疑苔,這個(gè)對(duì)計(jì)數(shù)器減 1 的操作是通過調(diào)用 latch.countDown(); 來實(shí)現(xiàn)的甫匹。在主線程中,調(diào)用 latch.await() 來實(shí)現(xiàn)對(duì)計(jì)數(shù)器等于 0 的等待惦费。
基本使用:
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException, ExecutionException {
test();
}
private static void test() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
log.debug("begin...");
sleep(1);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();
new Thread(() -> {
log.debug("begin...");
sleep(2);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();
new Thread(() -> {
log.debug("begin...");
sleep(1.5);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();
log.debug("waiting...");
latch.await();
log.debug("wait end...");
}
}
配合線程池:
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException, ExecutionException {
test();
}
private static void test() {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService service = Executors.newFixedThreadPool(4);
service.submit(() -> {
log.debug("begin...");
sleep(1);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
sleep(1.5);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
sleep(2);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(()->{
try {
log.debug("waiting...");
latch.await();
log.debug("wait end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
17:33:09.312 [pool-1-thread-3] DEBUG juc.cdl.TestCountDownLatch - begin...
17:33:09.312 [pool-1-thread-1] DEBUG juc.cdl.TestCountDownLatch - begin...
17:33:09.312 [pool-1-thread-2] DEBUG juc.cdl.TestCountDownLatch - begin...
17:33:09.312 [pool-1-thread-4] DEBUG juc.cdl.TestCountDownLatch - waiting...
17:33:10.316 [pool-1-thread-1] DEBUG juc.cdl.TestCountDownLatch - end...2
17:33:10.818 [pool-1-thread-2] DEBUG juc.cdl.TestCountDownLatch - end...1
17:33:11.319 [pool-1-thread-3] DEBUG juc.cdl.TestCountDownLatch - end...0
17:33:11.319 [pool-1-thread-4] DEBUG juc.cdl.TestCountDownLatch - wait end...
如果想要獲取任務(wù)的結(jié)果兵迅,需要使用future。
四薪贫、CyclicBarrier 實(shí)現(xiàn)線程同步
定義
CyclicBarrier構(gòu)造方法第一個(gè)參數(shù)是目標(biāo)障礙數(shù)恍箭,如果達(dá)到了目標(biāo)障礙數(shù),才會(huì)執(zhí)行cyclicBarrier.await()之后的語句瞧省〕敦玻可以將CyclicBarrier理解為+1操作鳍贾。
demo1:
@Slf4j
public class TestCyclicBarrier {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(3);
CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
log.debug("task1, task2 finish...");
});
for (int i = 0; i < 3; i++) { // task1 task2 task1
service.submit(() -> {
log.debug("task1 begin...");
sleep(1);
try {
barrier.await(); // 2-1=1
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
service.submit(() -> {
log.debug("task2 begin...");
sleep(2);
try {
barrier.await(); // 1-1=0
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
private static void test1() {
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 3; i++) {
CountDownLatch latch = new CountDownLatch(2);
service.submit(() -> {
log.debug("task1 start...");
sleep(1);
latch.countDown();
});
service.submit(() -> {
log.debug("task2 start...");
sleep(2);
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("task1 task2 finish...");
}
service.shutdown();
}
}
demo2:
前面將 getPOrders() 和 getDOrders() 這兩個(gè)查詢操作并行了,但這兩個(gè)查詢操作和對(duì)賬操作 check()交洗、save() 之間還是串行的骑科。很顯然,這兩個(gè)查詢操作和對(duì)賬操作也是可以并行的藕筋,也就是說纵散,在執(zhí)行對(duì)賬操作的時(shí)候,可以同時(shí)去執(zhí)行下一輪的查詢操作:
兩次查詢操作能夠和對(duì)賬操作并行隐圾,對(duì)賬操作還依賴查詢操作的結(jié)果伍掀,類似生產(chǎn)者 - 消費(fèi)者,兩次查詢操作是生產(chǎn)者暇藏,對(duì)賬操作是消費(fèi)者蜜笤。既然是生產(chǎn)者 - 消費(fèi)者模型,那就需要有個(gè)隊(duì)列盐碱,來保存生產(chǎn)者生產(chǎn)的數(shù)據(jù)把兔,而消費(fèi)者則從這個(gè)隊(duì)列消費(fèi)數(shù)據(jù)。
訂單查詢操作將訂單查詢結(jié)果插入訂單隊(duì)列瓮顽,派送單查詢操作將派送單插入派送單隊(duì)列县好,這兩個(gè)隊(duì)列的元素之間是有一一對(duì)應(yīng)的關(guān)系的。兩個(gè)隊(duì)列的好處是暖混,對(duì)賬操作可以每次從訂單隊(duì)列出一個(gè)元素缕贡,從派送單隊(duì)列出一個(gè)元素,然后對(duì)這兩個(gè)元素執(zhí)行對(duì)賬操作拣播,這樣數(shù)據(jù)一定不會(huì)亂掉晾咪。
用雙隊(duì)列來實(shí)現(xiàn)完全的并行:
一個(gè)線程 T1 執(zhí)行訂單的查詢工作,一個(gè)線程 T2 執(zhí)行派送單的查詢工作贮配,當(dāng)線程 T1 和 T2 都各自生產(chǎn)完 1 條數(shù)據(jù)的時(shí)候谍倦,通知線程 T3 執(zhí)行對(duì)賬操作。線程 T1 和線程 T2 的工作要步調(diào)一致泪勒,不能一個(gè)跑得太快昼蛀,一個(gè)跑得太慢,只有這樣才能做到各自生產(chǎn)完 1 條數(shù)據(jù)的時(shí)候酣藻,通知線程 T3曹洽。
線程 T1 和線程 T2 只有都生產(chǎn)完 1 條數(shù)據(jù)的時(shí)候,才能一起向下執(zhí)行辽剧,也就是說送淆,線程 T1 和線程 T2 要互相等待,步調(diào)要一致怕轿;同時(shí)當(dāng)線程 T1 和 T2 都生產(chǎn)完一條數(shù)據(jù)的時(shí)候偷崩,還要能夠通知線程 T3 執(zhí)行對(duì)賬操作辟拷。
實(shí)現(xiàn)方案
- 線程 T1 和 T2 要做到步調(diào)一致
- 要能夠通知到線程 T3
利用一個(gè)計(jì)數(shù)器來解決這兩個(gè)難點(diǎn),計(jì)數(shù)器初始化為 2阐斜,線程 T1 和 T2 生產(chǎn)完一條數(shù)據(jù)都將計(jì)數(shù)器減 1衫冻,如果計(jì)數(shù)器大于 0 則線程 T1 或者 T2 等待。如果計(jì)數(shù)器等于 0谒出,則通知線程 T3隅俘,并喚醒等待的線程 T1 或者 T2,與此同時(shí)笤喳,將計(jì)數(shù)器重置為 2为居,這樣線程 T1 和線程 T2 生產(chǎn)下一條數(shù)據(jù)的時(shí)候就可以繼續(xù)使用這個(gè)計(jì)數(shù)器了。
使用CyclicBarrier實(shí)現(xiàn)
// 訂單隊(duì)列
Vector<P> pos;
// 派送單隊(duì)列
Vector<D> dos;
// 執(zhí)行回調(diào)的線程池
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 執(zhí)行對(duì)賬操作
diff = check(p, d);
// 差異寫入差異庫
save(diff);
}
void checkAll(){
// 循環(huán)查詢訂單庫
Thread T1 = new Thread(()->{
while(存在未對(duì)賬訂單){
// 查詢訂單庫
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循環(huán)查詢運(yùn)單庫
Thread T2 = new Thread(()->{
while(存在未對(duì)賬訂單){
// 查詢運(yùn)單庫
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
創(chuàng)建一個(gè)計(jì)數(shù)器初始值為 2 的 CyclicBarrier杀狡,創(chuàng)建 CyclicBarrier 的時(shí)候蒙畴,傳入了一個(gè)回調(diào)函數(shù),當(dāng)計(jì)數(shù)器減到 0 的時(shí)候呜象,會(huì)調(diào)用這個(gè)回調(diào)函數(shù)膳凝。
線程 T1 負(fù)責(zé)查詢訂單,當(dāng)查出一條時(shí)恭陡,調(diào)用 barrier.await() 來將計(jì)數(shù)器減 1蹬音,同時(shí)等待計(jì)數(shù)器變成 0;線程 T2 負(fù)責(zé)查詢派送單休玩,當(dāng)查出一條時(shí)祟绊,也調(diào)用 barrier.await() 來將計(jì)數(shù)器減 1,同時(shí)等待計(jì)數(shù)器變成 0哥捕;當(dāng) T1 和 T2 都調(diào)用 barrier.await() 的時(shí)候,計(jì)數(shù)器會(huì)減到 0嘉熊,此時(shí) T1 和 T2 就可以執(zhí)行下一條語句了遥赚,同時(shí)會(huì)調(diào)用 barrier 的回調(diào)函數(shù)來執(zhí)行對(duì)賬操作。
CyclicBarrier 的計(jì)數(shù)器有自動(dòng)重置的功能阐肤,當(dāng)減到 0 的時(shí)候凫佛,會(huì)自動(dòng)重置設(shè)置的初始值.
五、總結(jié)
CountDownLatch 和 CyclicBarrier 是 Java 并發(fā)包提供的兩個(gè)非常易用的線程同步工具類:
- CountDownLatch 主要用來解決一個(gè)線程等待多個(gè)線程的場(chǎng)景孕惜,可以類比旅游團(tuán)團(tuán)長要等待所有的游客到齊才能去下一個(gè)景點(diǎn)愧薛;
- CountDownLatch 的計(jì)數(shù)器不能循環(huán)利用的,一旦計(jì)數(shù)器減到 0衫画,再有線程調(diào)用 await()毫炉,該線程會(huì)直接通過。
- CyclicBarrier 是一組線程之間互相等待削罩,更像是幾個(gè)驢友之間不離不棄瞄勾。
- CyclicBarrier 的計(jì)數(shù)器是可以循環(huán)利用的费奸,而且具備自動(dòng)重置的功能,一旦計(jì)數(shù)器減到 0 會(huì)自動(dòng)重置到設(shè)置的初始值进陡。CyclicBarrier 還可以設(shè)置回調(diào)函數(shù)愿阐。