Java中的并發(fā)包下常見類

前言

最近在看并發(fā)編程藝術(shù)這本書膛堤,對看書的一些筆記及個人工作中的總結(jié)奋构。

Callable和Future使用

Future模式非常適合在處理很耗時很長的業(yè)務(wù)邏輯時進(jìn)行使用壳影,可以有效的減小系統(tǒng)的響應(yīng)時間,提高系統(tǒng)的吞吐量弥臼。

//get方法的作用之前也講過了宴咧,阻塞直到異步執(zhí)行結(jié)果返回才不阻塞,繼續(xù)往下執(zhí)行
public class TestFuture implements Callable<String> {
    private String para;

    public TestFuture(String para){
        this.para = para;
    }

    /**
     * 這里是真實的業(yè)務(wù)邏輯径缅,其執(zhí)行可能很慢
     */
    @Override
    public String call() throws Exception {
        //模擬執(zhí)行耗時,模擬的是db操作掺栅,para就是查詢條件
        Thread.sleep(5000);
        String result = this.para + "處理完成";
        System.out.println("執(zhí)行成功了!D芍怼Q跷浴!氏堤!");
        return result;
    }

    //主控制函數(shù)
    public static void main(String[] args) throws Exception {
        String queryStr = "query";
        //構(gòu)造FutureTask沙绝,并且傳入需要真正進(jìn)行業(yè)務(wù)邏輯處理的類,該類一定是實現(xiàn)了Callable接口的類
        FutureTask<String> future = new FutureTask<>(new TestFuture(queryStr));

        FutureTask<String> future2 = new FutureTask<>(new TestFuture(queryStr));
        //創(chuàng)建一個固定線程的線程池且線程數(shù)為2,
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //這里提交任務(wù)future,則開啟線程執(zhí)行RealData的call()方法執(zhí)行
        //submit和execute的區(qū)別: 第一點是submit可以傳入實現(xiàn)Callable接口的實例對象, 第二點是submit方法有返回值
        Future f1 = executor.submit(future);        //單獨啟動一個線程去執(zhí)行的
        Future f2 = executor.submit(future2);
        //f1.get() == null的時候說明已經(jīng)取到了結(jié)果鼠锈,執(zhí)行到了最后
        System.out.println("請求完畢");

        try {
            //這里可以做額外的數(shù)據(jù)操作闪檬,也就是主程序執(zhí)行其他業(yè)務(wù)邏輯
            System.out.println("處理實際的業(yè)務(wù)邏輯...");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //調(diào)用獲取數(shù)據(jù)方法,如果call()方法沒有執(zhí)行完成,則依然會進(jìn)行等
        //System.out.println("數(shù)據(jù):" + future.get());
        //System.out.println("數(shù)據(jù):" + future2.get());

        executor.shutdown();
    }

}

CountDownLatch

//join用于讓當(dāng)前執(zhí)行線程等待join線程執(zhí)行結(jié)束。其實現(xiàn)原理是不停檢查,join線程是否存活购笆,如果join線程存活則讓當(dāng)前線程永遠(yuǎn)等待谬以。
public class JoinCountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(() -> {

        });

        Thread parser2 = new Thread(() -> System.out.println("parser2 finish"));

        parser1.start();
        parser2.start();
        parser1.join();
        parser2.join();
        System.out.println("all parser finish");
    }

}

join用于讓當(dāng)前執(zhí)行線程等待join線程執(zhí)行結(jié)束。其實現(xiàn)原理是不停檢查,join線程是否存活由桌,如果join線程存活則讓當(dāng)前線程永遠(yuǎn)等待。直到j(luò)oin線程中止后,線程的this.notifyAll()方法會被調(diào)用

使用CountDownLatch也可以實現(xiàn)

public class CountDownLatchTest {

    //參數(shù)是2表示對象執(zhí)行2次countDown方法才能釋放鎖
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            System.out.println(1);
            c.countDown();  
            System.out.println(2);
            c.countDown();
        }).start();

        c.await(); //線程等待
        System.out.println("3");
    }

}

同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)行您。它要做的事情是,讓一組線程到達(dá)一個屏障(也可以叫同步點)時被阻塞娃循,直到最后一個線程到達(dá)屏障時炕檩,屏障才會開門捌斧,所有被屏障攔截的線程才會繼續(xù)運行妇押。

public class CyclicBarrierTest {

    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                c.await();
            } catch (Exception e) {

            }
            System.out.println(1);
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }
}

CyclicBarrier還提供一個更高級的構(gòu)造函數(shù)CyclicBarrier(int parties,Runnable barrier-Action)肩杈,用于在線程到達(dá)屏障時解寝,優(yōu)先執(zhí)行barrierAction扩然,方便處理更復(fù)雜的業(yè)務(wù)場景

//先執(zhí)行線程A
public class CyclicBarrierTest2 {

    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                c.await();
            } catch (Exception e) {

            }
            System.out.println(1);
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }

    static class A implements Runnable {

        @Override
        public void run() {
            System.out.println(3);
        }

    }

}

執(zhí)行結(jié)果:

3
1
2

** CyclicBarrier的應(yīng)用場景 **
CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的場景聋伦。例如夫偶,用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個賬戶近一年的每筆銀行流水嘉抓,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水索守,先用多線程處理每個sheet里的銀行流水,都執(zhí)行完之后抑片,得到每個sheet的日均銀行流水卵佛,最后,再用barrierAction用這些線程的計算結(jié)果敞斋,計算出整個Excel的日均銀行流水截汪,

CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset()方法重置植捎。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景衙解。例如,如果計算發(fā)生錯誤焰枢,可以重置計數(shù)器蚓峦,并讓線程重新執(zhí)行一次舌剂。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的線程數(shù)量暑椰。isBroken()方法用來了解阻塞的線程是否被中斷霍转。

public class CyclicBarrierTest3 {

    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        Thread thread = new Thread(() -> {
            try {
                c.await();
            } catch (Exception e) {
            }
        });
        thread.start();
        thread.interrupt();
        try {
            c.await();
        } catch (Exception e) {
            System.out.println(c.isBroken());
        }
    }
}

Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程一汽,以保證合理的使用公共資源避消。
Semaphore可以控制系統(tǒng)的流量,拿到信號量的線程可以進(jìn)入召夹,否則就等待岩喷。通過acquire()和release()獲取和釋放訪問許可。

public class TestSemaphore {
    public static void main(String[] args) {
        // 線程池
        ExecutorService exec = Executors.newCachedThreadPool();
        // 只能5個線程同時訪問
        final Semaphore semp = new Semaphore(5);
        // 模擬20個客戶端訪問
        for (int index = 0; index < 20; index++) {
            final int NO = index;
            Runnable run = () -> {
                try {
                    // 獲取許可
                    semp.acquire();
                    System.out.println("Accessing: " + NO);
                    //模擬實際業(yè)務(wù)邏輯
                    Thread.sleep((long) (Math.random() * 10000));
                    // 訪問完后监憎,釋放
                    semp.release();
                } catch (InterruptedException e) {
                }
            };
            exec.execute(run);
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //System.out.println(semp.getQueueLength());
        // 退出線程池
        exec.shutdown();
    }
}

Semaphore可以用于做流量控制纱意,特別是公用資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連接枫虏。假如有一個需求淋叶,要讀取幾萬個文件的數(shù)據(jù)因惭,因為都是IO密集型任務(wù),我們可以啟動幾十個線程并發(fā)地讀取,但是如果讀到內(nèi)存后报亩,還需要存儲到數(shù)據(jù)庫中章蚣,而數(shù)據(jù)庫的連接數(shù)只有10個推励,這時我們必須控制只有10個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù)父晶,否則會報錯無法獲取數(shù)據(jù)庫連接。這個時候赞警,就可以使用Semaphore來做流量控制

//線程池中有30個線程妓忍,但是只有10個線程可以獲得許可
public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire(); //獲取許可證
                        System.out.println("save data");
                        Thread.sleep(1000);
                        s.release(); //釋放許可證
                    } catch (InterruptedException e) {
                    }
                }
            });
        }

        threadPool.shutdown();
    }
}

其他方法
Semaphore:還提供一些其他方法,具體如下愧旦。
intavailablePermits():返回此信號量中當(dāng)前可用的許可證數(shù)世剖。
intgetQueueLength():返回正在等待獲取許可證的線程數(shù)。
booleanhasQueuedThreads():是否有線程正在等待獲取許可證笤虫。
void reducePermits(int reduction):減少reduction個許可證旁瘫,是個protected方法。
Collection getQueuedThreads():返回所有等待獲取許可證的線程集合琼蚯,是個protected方法酬凳。

線程間交換數(shù)據(jù)的Exchanger

Exchanger(交換者)是一個用于線程間協(xié)作的工具類。Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換遭庶。它提供一個同步點宁仔,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)峦睡。這兩個線程通過exchange方法交換數(shù)據(jù)翎苫,如果第一個線程先執(zhí)行exchange()方法权埠,它會一直等待第二個線程也執(zhí)行exchange方法,當(dāng)兩個線程都到達(dá)同步點時拉队,這兩個線程就可以交換數(shù)據(jù)弊知,將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方。

public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<String>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "銀行流水A";// A錄入銀行流水?dāng)?shù)據(jù)
                    Thread.sleep(4000);
                    exgr.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "銀行流水B";// B錄入銀行流水?dāng)?shù)據(jù)
                    String A = exgr.exchange("B"); //得到A值
                    System.out.println("A和B數(shù)據(jù)是否一致:" + A.equals(B) + "粱快,A錄入的是:" + A + ",B錄入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.shutdown();

    }
}

如果兩個線程有一個沒有執(zhí)行exchange()方法叔扼,則會一直等待事哭,如果擔(dān)心有特殊情況發(fā)生,避免一直等待瓜富,可以使用exchange(V x鳍咱,longtimeout,TimeUnit unit)設(shè)置最大等待時長与柑。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末谤辜,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子价捧,更是在濱河造成了極大的恐慌丑念,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件结蟋,死亡現(xiàn)場離奇詭異脯倚,居然都是意外死亡,警方通過查閱死者的電腦和手機嵌屎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進(jìn)店門推正,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人宝惰,你說我怎么就攤上這事植榕。” “怎么了尼夺?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵尊残,是天一觀的道長。 經(jīng)常有香客問我汞斧,道長夜郁,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任粘勒,我火速辦了婚禮竞端,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘庙睡。我一直安慰自己事富,他們只是感情好技俐,可當(dāng)我...
    茶點故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著统台,像睡著了一般雕擂。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上贱勃,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天井赌,我揣著相機與錄音,去河邊找鬼贵扰。 笑死仇穗,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的戚绕。 我是一名探鬼主播纹坐,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼舞丛!你這毒婦竟也來了耘子?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤球切,失蹤者是張志新(化名)和其女友劉穎谷誓,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體欧聘,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡片林,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了怀骤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片费封。...
    茶點故事閱讀 40,498評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蒋伦,靈堂內(nèi)的尸體忽然破棺而出弓摘,到底是詐尸還是另有隱情,我是刑警寧澤痕届,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布韧献,位于F島的核電站,受9級特大地震影響研叫,放射性物質(zhì)發(fā)生泄漏锤窑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一嚷炉、第九天 我趴在偏房一處隱蔽的房頂上張望渊啰。 院中可真熱鬧,春花似錦、人聲如沸绘证。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽嚷那。三九已至胞枕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間魏宽,已是汗流浹背腐泻。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留队询,地道東北人贫悄。 一個月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像娘摔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子唤反,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內(nèi)容