場景:對賬系統(tǒng)最近越來越慢寿弱,老板讓優(yōu)化犯眠,用戶通過在線商城下單,會生成電子訂單症革,保存在訂單庫筐咧;之后物流會生成派送單給用戶發(fā)貨,派送單保存在派送單庫地沮。為了防止漏派送或者重復派送嗜浮,對賬系統(tǒng)每天還會校驗是否存在異常訂單羡亩。
對賬系統(tǒng)代碼抽象:
while(存在未對賬訂單){
// 查詢未對賬訂單
pos = getPOrders();
// 查詢派送單
dos = getDOrders();
// 執(zhí)行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
思路:
單線程改成多線程并發(fā)執(zhí)行。
我們創(chuàng)建了兩個線程T1和T2危融,并行執(zhí)行查詢未對賬訂單getPOrders()和查詢派送單getDOrders()這兩個操作畏铆。在主線程中執(zhí)行對賬操作check()和差異寫入save()兩個操作。
主線程需要等待線程T1和T2執(zhí)行完才能執(zhí)行check()和save()這兩個操作吉殃,為此我們通過調(diào)用T1.join()和T2.join()來實現(xiàn)等待辞居,當T1和T2線程退出時,調(diào)用T1.join()和T2.join()的主線程就會從阻塞態(tài)被喚醒蛋勺,從而執(zhí)行之后的check()和save()瓦灶。
while(存在未對賬訂單){
// 查詢未對賬訂單
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查詢派送單
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待T1、T2結束
T1.join();
T2.join();
// 執(zhí)行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
CountDownLatch實現(xiàn)線程等待
在while循環(huán)里面抱完,我們首先創(chuàng)建了一個CountDownLatch贼陶,計數(shù)器的初始值等于2,之后在pos = getPOrders();和dos = getDOrders();兩條語句的后面對計數(shù)器執(zhí)行減1操作巧娱,這個對計數(shù)器減1的操作是通過調(diào)用 latch.countDown(); 來實現(xiàn)的碉怔。在主線程中,我們通過調(diào)用 latch.await() 來實現(xiàn)對計數(shù)器等于0的等待禁添。
// 創(chuàng)建2個線程的線程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
// 計數(shù)器初始化為2
CountDownLatch latch =
new CountDownLatch(2);
// 查詢未對賬訂單
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查詢派送單
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待兩個查詢操作結束
latch.await();
// 執(zhí)行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
再次思考:
兩次查詢操作能夠和對賬操作并行撮胧,對賬操作還依賴查詢操作的結果,這明顯有點生產(chǎn)者-消費者的意思老翘,兩次查詢操作是生產(chǎn)者芹啥,對賬操作是消費者。既然是生產(chǎn)者-消費者模型铺峭,那就需要有個隊列墓怀,來保存生產(chǎn)者生產(chǎn)的數(shù)據(jù),而消費者則從這個隊列消費數(shù)據(jù)逛薇。
一個線程T1執(zhí)行訂單的查詢工作捺疼,一個線程T2執(zhí)行派送單的查詢工作,當線程T1和T2都各自生產(chǎn)完1條數(shù)據(jù)的時候永罚,通知線程T3執(zhí)行對賬操作啤呼。
隱藏條件,就是線程T1和線程T2的工作要步調(diào)一致呢袱,不能一個跑得太快官扣,一個跑得太慢,只有這樣才能做到各自生產(chǎn)完1條數(shù)據(jù)的時候羞福,通知線程T3惕蹄。
CyclicBarrier實現(xiàn)線程同步
創(chuàng)建了一個計數(shù)器初始值為2的CyclicBarrier,創(chuàng)建CyclicBarrier的時候,我們還傳入了一個回調(diào)函數(shù)卖陵,當計數(shù)器減到0的時候遭顶,會調(diào)用這個回調(diào)函數(shù)。
線程T1負責查詢訂單泪蔫,當查出一條時棒旗,調(diào)用 barrier.await() 來將計數(shù)器減1,同時等待計數(shù)器變成0撩荣;線程T2負責查詢派送單铣揉,當查出一條時,也調(diào)用 barrier.await() 來將計數(shù)器減1餐曹,同時等待計數(shù)器變成0逛拱;當T1和T2都調(diào)用 barrier.await() 的時候,計數(shù)器會減到0台猴,此時T1和T2就可以執(zhí)行下一條語句了朽合,同時會調(diào)用barrier的回調(diào)函數(shù)來執(zhí)行對賬操作。
CyclicBarrier的計數(shù)器有自動重置的功能卿吐,當減到0的時候旁舰,會自動重置你設置的初始值。
// 訂單隊列
Vector<P> pos;
// 派送單隊列
Vector<D> dos;
// 執(zhí)行回調(diào)的線程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 執(zhí)行對賬操作
diff = check(p, d);
// 差異寫入差異庫
save(diff);
}
void checkAll(){
// 循環(huán)查詢訂單庫
Thread T1 = new Thread(()->{
while(存在未對賬訂單){
// 查詢訂單庫
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循環(huán)查詢運單庫
Thread T2 = new Thread(()->{
while(存在未對賬訂單){
// 查詢運單庫
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
總結:
CountDownLatch主要用來解決一個線程等待多個線程的場景嗡官,可以類比旅游團團長要等待所有的游客到齊才能去下一個景點;而CyclicBarrier是一組線程之間互相等待毯焕,更像是幾個驢友之間不離不棄衍腥。除此之外CountDownLatch的計數(shù)器是不能循環(huán)利用的,也就是說一旦計數(shù)器減到0纳猫,再有線程調(diào)用await()婆咸,該線程會直接通過。但CyclicBarrier的計數(shù)器是可以循環(huán)利用的芜辕,而且具備自動重置的功能尚骄,一旦計數(shù)器減到0會自動重置到你設置的初始值。除此之外侵续,CyclicBarrier還可以設置回調(diào)函數(shù)倔丈。