CountDownLatch和CyclicBarrier

一琅豆、背景-對(duì)賬系統(tǒng)處理邏輯

用戶通過在線商城下單榔幸,會(huì)生成電子訂單允乐,保存在訂單庫;之后物流會(huì)生成派送單給用戶發(fā)貨削咆,派送單保存在派送單庫牍疏。為了防止漏派送或者重復(fù)派送,對(duì)賬系統(tǒng)每天還會(huì)校驗(yàn)是否存在異常訂單拨齐。

對(duì)賬系統(tǒng)流程.png
  • 查詢訂單
  • 查詢派送單
  • 對(duì)比訂單和派送單
  • 將差異寫入差異庫
while(存在未對(duì)賬訂單){
  // 查詢未對(duì)賬訂單
  pos = getPOrders();
  // 查詢派送單
  dos = getDOrders();
  // 執(zhí)行對(duì)賬操作
  diff = check(pos, dos);
  // 差異寫入差異庫
  save(diff);
} 

二鳞陨、利用并行優(yōu)化對(duì)賬系統(tǒng)

查詢未對(duì)賬訂單 getPOrders() 和查詢派送單 getDOrders() 采用并行處理;因?yàn)檫@兩個(gè)操作并沒有先后順序的依賴瞻惋。這兩個(gè)最耗時(shí)的操作并行之后厦滤,執(zhí)行過程如下圖所示。對(duì)比單線程的執(zhí)行熟史,同等時(shí)間里馁害,并行執(zhí)行的吞吐量近乎單線程的 2 倍。


并行優(yōu)化.png
while(存在未對(duì)賬訂單){
  // 查詢未對(duì)賬訂單
  Thread T1 = new Thread(()->{
    pos = getPOrders();
  });
  T1.start();
  // 查詢派送單
  Thread T2 = new Thread(()->{
    dos = getDOrders();
  });
  T2.start();
  // 等待T1蹂匹、T2結(jié)束
  T1.join();
  T2.join();
  // 執(zhí)行對(duì)賬操作
  diff = check(pos, dos);
  // 差異寫入差異庫
  save(diff);
} 

創(chuàng)建了兩個(gè)線程 T1 和 T2碘菜,并行執(zhí)行查詢未對(duì)賬訂單 getPOrders() 和查詢派送單 getDOrders() 這兩個(gè)操作。在主線程中執(zhí)行對(duì)賬操作 check() 和差異寫入 save() 兩個(gè)操作限寞。
主線程需要等待線程 T1 和 T2 執(zhí)行完才能執(zhí)行 check() 和 save() 這兩個(gè)操作忍啸,通過調(diào)用 T1.join() 和 T2.join() 來實(shí)現(xiàn)等待,當(dāng) T1 和 T2 線程退出時(shí)履植,調(diào)用 T1.join() 和 T2.join() 的主線程就會(huì)從阻塞態(tài)被喚醒计雌,從而執(zhí)行之后的 check() 和 save()。

三玫霎、CountDownLatch 實(shí)現(xiàn)線程等待

定義

CountDownLatch可以設(shè)置一個(gè)計(jì)數(shù)器凿滤,通過countDown()方法進(jìn)行減1操作妈橄,使用await()方法等待計(jì)數(shù)器不大于0,再繼續(xù)執(zhí)行await()方法之后的語句翁脆。

并行執(zhí)行的方案中眷蚓,while 循環(huán)里面每次都會(huì)創(chuàng)建新的線程,而創(chuàng)建線程可是個(gè)耗時(shí)的操作反番。所以最好是創(chuàng)建出來的線程能夠循環(huán)利用沙热,所以可以使用線程池就能解決這個(gè)問題。

創(chuàng)建一個(gè)固定大小為 2 的線程池罢缸,之后在 while 循環(huán)里重復(fù)利用篙贸。
主線程如何知道 getPOrders() 和 getDOrders() 這兩個(gè)操作什么時(shí)候執(zhí)行完。前面主線程通過調(diào)用線程 T1 和 T2 的 join() 方法來等待線程 T1 和 T2 退出枫疆,但是在線程池的方案里爵川,線程根本就不會(huì)退出,所以 join() 方法已經(jīng)失效了养铸。最直接的辦法是弄一個(gè)計(jì)數(shù)器雁芙,初始值設(shè)置成 2,當(dāng)執(zhí)行完pos = getPOrders();這個(gè)操作之后將計(jì)數(shù)器減 1钞螟,執(zhí)行完dos = getDOrders();之后也將計(jì)數(shù)器減 1,在主線程里谎碍,等待計(jì)數(shù)器等于 0鳞滨;當(dāng)計(jì)數(shù)器等于 0 時(shí),說明這兩個(gè)查詢操作執(zhí)行完了蟆淀。等待計(jì)數(shù)器等于 0 其實(shí)就是一個(gè)條件變量拯啦,用管程實(shí)現(xiàn)起來也很簡單。

使用CountDownLatch實(shí)現(xiàn):

// 創(chuàng)建2個(gè)線程的線程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未對(duì)賬訂單){
  // 計(jì)數(shù)器初始化為2
  CountDownLatch latch = new CountDownLatch(2);
  // 查詢未對(duì)賬訂單
  executor.execute(()-> {
    pos = getPOrders();
    latch.countDown();
  });
  // 查詢派送單
  executor.execute(()-> {
    dos = getDOrders();
    latch.countDown();
  });
  
  // 等待兩個(gè)查詢操作結(jié)束
  latch.await();
  
  // 執(zhí)行對(duì)賬操作
  diff = check(pos, dos);
  // 差異寫入差異庫
  save(diff);
}

在 while 循環(huán)里面熔任,創(chuàng)建了一個(gè) CountDownLatch褒链,計(jì)數(shù)器的初始值等于 2,之后在pos = getPOrders();和dos = getDOrders();兩條語句的后面對(duì)計(jì)數(shù)器執(zhí)行減 1 操作疑苔,這個(gè)對(duì)計(jì)數(shù)器減 1 的操作是通過調(diào)用 latch.countDown(); 來實(shí)現(xiàn)的甫匹。在主線程中,調(diào)用 latch.await() 來實(shí)現(xiàn)對(duì)計(jì)數(shù)器等于 0 的等待惦费。

基本使用:

public class TestCountDownLatch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        test();
    }
    private static void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        new Thread(() -> {
            log.debug("begin...");
            sleep(1);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();

        new Thread(() -> {
            log.debug("begin...");
            sleep(2);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();

        new Thread(() -> {
            log.debug("begin...");
            sleep(1.5);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();

        log.debug("waiting...");
        latch.await();
        log.debug("wait end...");
    }
}

配合線程池:

public class TestCountDownLatch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        test();
    }
    private static void test() {
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService service = Executors.newFixedThreadPool(4);
        service.submit(() -> {
            log.debug("begin...");
            sleep(1);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        });
        service.submit(() -> {
            log.debug("begin...");
            sleep(1.5);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        });
        service.submit(() -> {
            log.debug("begin...");
            sleep(2);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        });
        service.submit(()->{
            try {
                log.debug("waiting...");
                latch.await();
                log.debug("wait end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
17:33:09.312 [pool-1-thread-3] DEBUG juc.cdl.TestCountDownLatch - begin...
17:33:09.312 [pool-1-thread-1] DEBUG juc.cdl.TestCountDownLatch - begin...
17:33:09.312 [pool-1-thread-2] DEBUG juc.cdl.TestCountDownLatch - begin...
17:33:09.312 [pool-1-thread-4] DEBUG juc.cdl.TestCountDownLatch - waiting...
17:33:10.316 [pool-1-thread-1] DEBUG juc.cdl.TestCountDownLatch - end...2
17:33:10.818 [pool-1-thread-2] DEBUG juc.cdl.TestCountDownLatch - end...1
17:33:11.319 [pool-1-thread-3] DEBUG juc.cdl.TestCountDownLatch - end...0
17:33:11.319 [pool-1-thread-4] DEBUG juc.cdl.TestCountDownLatch - wait end...

如果想要獲取任務(wù)的結(jié)果兵迅,需要使用future。

四薪贫、CyclicBarrier 實(shí)現(xiàn)線程同步

定義

CyclicBarrier構(gòu)造方法第一個(gè)參數(shù)是目標(biāo)障礙數(shù)恍箭,如果達(dá)到了目標(biāo)障礙數(shù),才會(huì)執(zhí)行cyclicBarrier.await()之后的語句瞧省〕敦玻可以將CyclicBarrier理解為+1操作鳍贾。

demo1:

@Slf4j
public class TestCyclicBarrier {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
            log.debug("task1, task2 finish...");
        });
        for (int i = 0; i < 3; i++) { // task1  task2  task1
            service.submit(() -> {
                log.debug("task1 begin...");
                sleep(1);
                try {
                    barrier.await(); // 2-1=1
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            service.submit(() -> {
                log.debug("task2 begin...");
                sleep(2);
                try {
                    barrier.await(); // 1-1=0
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();

    }

    private static void test1() {
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 3; i++) {
            CountDownLatch latch = new CountDownLatch(2);
            service.submit(() -> {
                log.debug("task1 start...");
                sleep(1);
                latch.countDown();
            });
            service.submit(() -> {
                log.debug("task2 start...");
                sleep(2);
                latch.countDown();
            });
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task1 task2 finish...");
        }
        service.shutdown();
    }
}

demo2:
前面將 getPOrders() 和 getDOrders() 這兩個(gè)查詢操作并行了,但這兩個(gè)查詢操作和對(duì)賬操作 check()交洗、save() 之間還是串行的骑科。很顯然,這兩個(gè)查詢操作和對(duì)賬操作也是可以并行的藕筋,也就是說纵散,在執(zhí)行對(duì)賬操作的時(shí)候,可以同時(shí)去執(zhí)行下一輪的查詢操作:


完全并行執(zhí)行.png

兩次查詢操作能夠和對(duì)賬操作并行隐圾,對(duì)賬操作還依賴查詢操作的結(jié)果伍掀,類似生產(chǎn)者 - 消費(fèi)者,兩次查詢操作是生產(chǎn)者暇藏,對(duì)賬操作是消費(fèi)者蜜笤。既然是生產(chǎn)者 - 消費(fèi)者模型,那就需要有個(gè)隊(duì)列盐碱,來保存生產(chǎn)者生產(chǎn)的數(shù)據(jù)把兔,而消費(fèi)者則從這個(gè)隊(duì)列消費(fèi)數(shù)據(jù)。

雙隊(duì)列.png

訂單查詢操作將訂單查詢結(jié)果插入訂單隊(duì)列瓮顽,派送單查詢操作將派送單插入派送單隊(duì)列县好,這兩個(gè)隊(duì)列的元素之間是有一一對(duì)應(yīng)的關(guān)系的。兩個(gè)隊(duì)列的好處是暖混,對(duì)賬操作可以每次從訂單隊(duì)列出一個(gè)元素缕贡,從派送單隊(duì)列出一個(gè)元素,然后對(duì)這兩個(gè)元素執(zhí)行對(duì)賬操作拣播,這樣數(shù)據(jù)一定不會(huì)亂掉晾咪。

用雙隊(duì)列來實(shí)現(xiàn)完全的并行:
一個(gè)線程 T1 執(zhí)行訂單的查詢工作,一個(gè)線程 T2 執(zhí)行派送單的查詢工作贮配,當(dāng)線程 T1 和 T2 都各自生產(chǎn)完 1 條數(shù)據(jù)的時(shí)候谍倦,通知線程 T3 執(zhí)行對(duì)賬操作。線程 T1 和線程 T2 的工作要步調(diào)一致泪勒,不能一個(gè)跑得太快昼蛀,一個(gè)跑得太慢,只有這樣才能做到各自生產(chǎn)完 1 條數(shù)據(jù)的時(shí)候酣藻,通知線程 T3曹洽。

同步執(zhí)行.png

線程 T1 和線程 T2 只有都生產(chǎn)完 1 條數(shù)據(jù)的時(shí)候,才能一起向下執(zhí)行辽剧,也就是說送淆,線程 T1 和線程 T2 要互相等待,步調(diào)要一致怕轿;同時(shí)當(dāng)線程 T1 和 T2 都生產(chǎn)完一條數(shù)據(jù)的時(shí)候偷崩,還要能夠通知線程 T3 執(zhí)行對(duì)賬操作辟拷。

實(shí)現(xiàn)方案

  • 線程 T1 和 T2 要做到步調(diào)一致
  • 要能夠通知到線程 T3
    利用一個(gè)計(jì)數(shù)器來解決這兩個(gè)難點(diǎn),計(jì)數(shù)器初始化為 2阐斜,線程 T1 和 T2 生產(chǎn)完一條數(shù)據(jù)都將計(jì)數(shù)器減 1衫冻,如果計(jì)數(shù)器大于 0 則線程 T1 或者 T2 等待。如果計(jì)數(shù)器等于 0谒出,則通知線程 T3隅俘,并喚醒等待的線程 T1 或者 T2,與此同時(shí)笤喳,將計(jì)數(shù)器重置為 2为居,這樣線程 T1 和線程 T2 生產(chǎn)下一條數(shù)據(jù)的時(shí)候就可以繼續(xù)使用這個(gè)計(jì)數(shù)器了。

使用CyclicBarrier實(shí)現(xiàn)


// 訂單隊(duì)列
Vector<P> pos;
// 派送單隊(duì)列
Vector<D> dos;
// 執(zhí)行回調(diào)的線程池 
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
    executor.execute(()->check());
  });
  
void check(){
  P p = pos.remove(0);
  D d = dos.remove(0);
  // 執(zhí)行對(duì)賬操作
  diff = check(p, d);
  // 差異寫入差異庫
  save(diff);
}
  
void checkAll(){
  // 循環(huán)查詢訂單庫
  Thread T1 = new Thread(()->{
    while(存在未對(duì)賬訂單){
      // 查詢訂單庫
      pos.add(getPOrders());
      // 等待
      barrier.await();
    }
  });
  T1.start();  
  // 循環(huán)查詢運(yùn)單庫
  Thread T2 = new Thread(()->{
    while(存在未對(duì)賬訂單){
      // 查詢運(yùn)單庫
      dos.add(getDOrders());
      // 等待
      barrier.await();
    }
  });
  T2.start();
}

創(chuàng)建一個(gè)計(jì)數(shù)器初始值為 2 的 CyclicBarrier杀狡,創(chuàng)建 CyclicBarrier 的時(shí)候蒙畴,傳入了一個(gè)回調(diào)函數(shù),當(dāng)計(jì)數(shù)器減到 0 的時(shí)候呜象,會(huì)調(diào)用這個(gè)回調(diào)函數(shù)膳凝。

線程 T1 負(fù)責(zé)查詢訂單,當(dāng)查出一條時(shí)恭陡,調(diào)用 barrier.await() 來將計(jì)數(shù)器減 1蹬音,同時(shí)等待計(jì)數(shù)器變成 0;線程 T2 負(fù)責(zé)查詢派送單休玩,當(dāng)查出一條時(shí)祟绊,也調(diào)用 barrier.await() 來將計(jì)數(shù)器減 1,同時(shí)等待計(jì)數(shù)器變成 0哥捕;當(dāng) T1 和 T2 都調(diào)用 barrier.await() 的時(shí)候,計(jì)數(shù)器會(huì)減到 0嘉熊,此時(shí) T1 和 T2 就可以執(zhí)行下一條語句了遥赚,同時(shí)會(huì)調(diào)用 barrier 的回調(diào)函數(shù)來執(zhí)行對(duì)賬操作。

CyclicBarrier 的計(jì)數(shù)器有自動(dòng)重置的功能阐肤,當(dāng)減到 0 的時(shí)候凫佛,會(huì)自動(dòng)重置設(shè)置的初始值.

五、總結(jié)

CountDownLatch 和 CyclicBarrier 是 Java 并發(fā)包提供的兩個(gè)非常易用的線程同步工具類:

  • CountDownLatch 主要用來解決一個(gè)線程等待多個(gè)線程的場(chǎng)景孕惜,可以類比旅游團(tuán)團(tuán)長要等待所有的游客到齊才能去下一個(gè)景點(diǎn)愧薛;
  • CountDownLatch 的計(jì)數(shù)器不能循環(huán)利用的,一旦計(jì)數(shù)器減到 0衫画,再有線程調(diào)用 await()毫炉,該線程會(huì)直接通過。
  • CyclicBarrier 是一組線程之間互相等待削罩,更像是幾個(gè)驢友之間不離不棄瞄勾。
  • CyclicBarrier 的計(jì)數(shù)器是可以循環(huán)利用的费奸,而且具備自動(dòng)重置的功能,一旦計(jì)數(shù)器減到 0 會(huì)自動(dòng)重置到設(shè)置的初始值进陡。CyclicBarrier 還可以設(shè)置回調(diào)函數(shù)愿阐。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市趾疚,隨后出現(xiàn)的幾起案子缨历,更是在濱河造成了極大的恐慌,老刑警劉巖糙麦,帶你破解...
    沈念sama閱讀 218,546評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件辛孵,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡喳资,警方通過查閱死者的電腦和手機(jī)觉吭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來仆邓,“玉大人鲜滩,你說我怎么就攤上這事〗谥担” “怎么了徙硅?”我有些...
    開封第一講書人閱讀 164,911評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長搞疗。 經(jīng)常有香客問我嗓蘑,道長,這世上最難降的妖魔是什么匿乃? 我笑而不...
    開封第一講書人閱讀 58,737評(píng)論 1 294
  • 正文 為了忘掉前任桩皿,我火速辦了婚禮,結(jié)果婚禮上幢炸,老公的妹妹穿的比我還像新娘泄隔。我一直安慰自己,他們只是感情好宛徊,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評(píng)論 6 392
  • 文/花漫 我一把揭開白布佛嬉。 她就那樣靜靜地躺著,像睡著了一般闸天。 火紅的嫁衣襯著肌膚如雪暖呕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評(píng)論 1 305
  • 那天苞氮,我揣著相機(jī)與錄音湾揽,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛钝腺,可吹牛的內(nèi)容都是我干的抛姑。 我是一名探鬼主播,決...
    沈念sama閱讀 40,338評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼艳狐,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼定硝!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起毫目,我...
    開封第一講書人閱讀 39,249評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤蔬啡,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后镀虐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體箱蟆,經(jīng)...
    沈念sama閱讀 45,696評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評(píng)論 3 336
  • 正文 我和宋清朗相戀三年刮便,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了空猜。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,013評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡恨旱,死狀恐怖辈毯,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情搜贤,我是刑警寧澤谆沃,帶...
    沈念sama閱讀 35,731評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站仪芒,受9級(jí)特大地震影響唁影,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜掂名,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評(píng)論 3 330
  • 文/蒙蒙 一据沈、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧饺蔑,春花似錦卓舵、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽裹虫。三九已至肿嘲,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間筑公,已是汗流浹背雳窟。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評(píng)論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人封救。 一個(gè)月前我還...
    沈念sama閱讀 48,203評(píng)論 3 370
  • 正文 我出身青樓拇涤,卻偏偏與公主長得像,于是被迫代替她去往敵國和親誉结。 傳聞我的和親對(duì)象是個(gè)殘疾皇子鹅士,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容