ForkJoin 學(xué)習(xí)使用筆記

ForkJoin 學(xué)習(xí)使用筆記

Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架勺馆, 是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù)戏售,最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架

背景

在日常的業(yè)務(wù)需求中,經(jīng)常出現(xiàn)的批量查詢草穆,批量寫入等接口的提供灌灾,一般來(lái)說(shuō),最簡(jiǎn)單最low的方式就是寫一個(gè)for循環(huán)來(lái)一次執(zhí)行悲柱,但是當(dāng)業(yè)務(wù)方對(duì)接口的性能要求較高時(shí)锋喜,就比較尷尬了

通常可以想到的方式是采用并發(fā)操作豌鸡,首先想到可以實(shí)現(xiàn)的方式就是利用線程池來(lái)做

通常實(shí)現(xiàn)方式如下

// 1. 創(chuàng)建線程池

ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("biz-exec"),
      new ThreadPoolExecutor.CallerRunsPolicy());

// 2. 創(chuàng)建執(zhí)行任務(wù)
List<Future<Object>> futureList = new ArrayList<>();
for(Object arg : list) {
        futureList.add(executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
              // xxx
            }
        }));
}

// 3. 結(jié)果獲取
for(Future f: futureList) {
    Object obj = f.get();
}

用上面的這種方式并沒(méi)有什么問(wèn)題嘿般,我們接下來(lái)考慮的是如何使用ForkJoin框架來(lái)實(shí)現(xiàn)類似的功能

ForkJoin 基本知識(shí)

Fork: 將大任務(wù)拆分成若干個(gè)可以并發(fā)執(zhí)行的小任務(wù)

Join: 合并所有小任務(wù)的執(zhí)行結(jié)果

forkjoin

任務(wù)分割

ForkJoinTask : 基本任務(wù),使用forkjoin框架必須創(chuàng)建的對(duì)象直颅,提供fork,join操作博个,常用的兩個(gè)子類

  • RecursiveAction : 無(wú)結(jié)果返回的任務(wù)
  • RecursiveTask : 有返回結(jié)果的任務(wù)

說(shuō)明:

  1. fork : 讓task異步執(zhí)行
  2. join : 讓task同步執(zhí)行,可以獲取返回值
  3. ForkJoinTask 在不顯示使用ForkJoinPool.execute/invoke/submit()方法進(jìn)行執(zhí)行的情況下功偿,也可以使用自己的fork/invoke方法進(jìn)行執(zhí)行

結(jié)果合并

ForkJoinPool 執(zhí)行 ForkJoinTask盆佣,

  • 任務(wù)分割出的子任務(wù)會(huì)添加到當(dāng)前工作線程所維護(hù)的雙端隊(duì)列中,進(jìn)入隊(duì)列的頭部械荷。
  • 當(dāng)一個(gè)工作線程的隊(duì)列里暫時(shí)沒(méi)有任務(wù)時(shí)共耍,它會(huì)隨機(jī)從其他工作線程的隊(duì)列的尾部獲取一個(gè)任務(wù)

三中提交方式:

  1. execute 異步,無(wú)返回結(jié)果
  2. submit 異步吨瞎,有返回結(jié)果 (返回Future<T>
  3. invoke 同步痹兜,有返回結(jié)果 (會(huì)阻塞)

使用說(shuō)明

結(jié)合兩個(gè)場(chǎng)景,給出使用姿勢(shì)

1. 累加

實(shí)現(xiàn)從 start - end 的累加求和

首先是定義一個(gè)CountTask 來(lái)實(shí)現(xiàn)求和

首先是確定任務(wù)分割的閥值颤诀,當(dāng) end-start 的差值大于閥值時(shí)字旭,將任務(wù)一分為二

public class CountTask extends RecursiveTask<Integer> {

    private int start;
    private int end;

    private static final int THRED_HOLD = 30;


    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRED_HOLD;
        if (canCompute) { // 不需要拆分
            for (int i = start; i <= end; i++) {
                sum += i;
            }

            System.out.println("thread: " + Thread.currentThread() + " start: " + start + " end: " + end);
        } else {
            int mid = (end + start) / 2;
            CountTask left = new CountTask(start, mid);
            CountTask right = new CountTask(mid + 1, end);
            left.fork();
            right.fork();

            sum = left.join() + right.join();
        }
        return sum;
    }
}

調(diào)用case

@Test
public void testFork() throws ExecutionException, InterruptedException {
    int start = 0;
    int end = 200;

    CountTask task = new CountTask(start, end);
    ForkJoinPool pool = ForkJoinPool.commonPool();
    Future<Integer> ans = pool.submit(task);
    int sum = ans.get();
    System.out.println(sum);
}

輸出結(jié)果:

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 51 end: 75
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 101 end: 125
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 0 end: 25
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 126 end: 150
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 76 end: 100
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 151 end: 175
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 26 end: 50
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 176 end: 200
20100

2. 排序

int 數(shù)組進(jìn)行排序

同樣先定義一個(gè)SortTask, 主要是為了演示ForkJoin的使用姿勢(shì),具體的排序和合并的邏輯比較簡(jiǎn)陋的實(shí)現(xiàn)了一下(這塊不是重點(diǎn))

public class SortTask extends RecursiveTask<List<Integer>> {

    private List<Integer> list;

    private final static int THRESHOLD = 5;

    public SortTask(List<Integer> list) {
        this.list = list;
    }

    @Override
    protected List<Integer> compute() {
        if (list.size() < THRESHOLD) {
            Collections.sort(list);

            System.out.println("thread: " + Thread.currentThread() + " sort: " + list);
            return list;
        }


        int mid = list.size() >> 1;


        SortTask l = new SortTask(list.subList(0,  mid));
        SortTask r = new SortTask(list.subList(mid, list.size()));

        l.fork();
        r.fork();

        List<Integer> left = l.join();
        List<Integer> right = r.join();

        return merge(left, right);
    }

    private List<Integer> merge(List<Integer> left, List<Integer> right) {
        List<Integer> result = new ArrayList<>(left.size() + right.size());

        int rightIndex = 0;
        for (int i = 0; i < left.size(); i++) {
            if (rightIndex >= right.size() || left.get(i) <= right.get(rightIndex)) {
                result.add(left.get(i));
            } else {
                result.add(right.get(rightIndex++));
                i -= 1;
            }
        }

        if (rightIndex < right.size()) {
            result.addAll(right.subList(rightIndex, right.size()));
        }

        return result;
    }
}

測(cè)試case和上面基本一樣崖叫,我們改用 invoke 替換上面的 submit

@Test
public void testMerge() throws ExecutionException, InterruptedException {
    List<Integer> list = Arrays.asList(100, 200, 150, 123, 4512, 3414, 3123, 34, 5412, 34, 1234, 893, 213, 455, 6, 123, 23);
    SortTask sortTask = new SortTask(list);
    ForkJoinPool pool = ForkJoinPool.commonPool();
    List<Integer> ans = pool.invoke(sortTask);
    System.out.println(ans);
}

輸出結(jié)果

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [34, 3123, 3414, 4512]
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] sort: [100, 123, 150, 200]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [34, 893, 1234, 5412]
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [213, 455]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [6, 23, 123]
[6, 23, 34, 34, 100, 123, 123, 150, 200, 213, 455, 893, 1234, 3123, 3414, 4512, 5412]

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末遗淳,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子心傀,更是在濱河造成了極大的恐慌屈暗,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,198評(píng)論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異养叛,居然都是意外死亡种呐,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門弃甥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)爽室,“玉大人,你說(shuō)我怎么就攤上這事淆攻“怪” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 167,643評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵卜录,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我眶明,道長(zhǎng)艰毒,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 59,495評(píng)論 1 296
  • 正文 為了忘掉前任搜囱,我火速辦了婚禮丑瞧,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蜀肘。我一直安慰自己绊汹,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布扮宠。 她就那樣靜靜地躺著西乖,像睡著了一般。 火紅的嫁衣襯著肌膚如雪坛增。 梳的紋絲不亂的頭發(fā)上获雕,一...
    開(kāi)封第一講書人閱讀 52,156評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音收捣,去河邊找鬼届案。 笑死,一個(gè)胖子當(dāng)著我的面吹牛罢艾,可吹牛的內(nèi)容都是我干的楣颠。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼咐蚯,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼童漩!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起仓蛆,我...
    開(kāi)封第一講書人閱讀 39,659評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤睁冬,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體豆拨,經(jīng)...
    沈念sama閱讀 46,200評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡直奋,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了施禾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片脚线。...
    茶點(diǎn)故事閱讀 40,424評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖弥搞,靈堂內(nèi)的尸體忽然破棺而出邮绿,到底是詐尸還是另有隱情,我是刑警寧澤攀例,帶...
    沈念sama閱讀 36,107評(píng)論 5 349
  • 正文 年R本政府宣布船逮,位于F島的核電站,受9級(jí)特大地震影響粤铭,放射性物質(zhì)發(fā)生泄漏挖胃。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評(píng)論 3 333
  • 文/蒙蒙 一梆惯、第九天 我趴在偏房一處隱蔽的房頂上張望酱鸭。 院中可真熱鬧,春花似錦垛吗、人聲如沸凹髓。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,264評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蔚舀。三九已至,卻和暖如春锨络,著一層夾襖步出監(jiān)牢的瞬間蝗敢,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,390評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工足删, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留寿谴,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,798評(píng)論 3 376
  • 正文 我出身青樓失受,卻偏偏與公主長(zhǎng)得像讶泰,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子拂到,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • 一痪署、多線程 說(shuō)明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)。 NEW:這種情況指的是兄旬,通過(guò) New 關(guān)鍵字創(chuàng)...
    Java旅行者閱讀 4,683評(píng)論 0 44
  • 摘要 這篇論文描述了Fork/Join框架的設(shè)計(jì)狼犯、實(shí)現(xiàn)以及性能余寥。這個(gè)框架通過(guò)(遞歸的)把問(wèn)題劃分為子任務(wù),然后并行...
    itonyli閱讀 1,165評(píng)論 0 5
  • 作者: 一字馬胡 轉(zhuǎn)載標(biāo)志 【2017-11-01】 更新日志 日期更新內(nèi)容備注2017-11-01新建文章V1...
    一字馬胡閱讀 7,369評(píng)論 9 134
  • 導(dǎo)讀目錄 線程組(ThreadGroup) 線程池(Thread Pool) Fork/Join框架和Execut...
    ql2012jz閱讀 1,459評(píng)論 0 0
  • 最近《長(zhǎng)城》電影火熱上映悯森, 雖然上映5天票房達(dá)到5億多宋舷,但對(duì)于這部電影的議論卻從來(lái)沒(méi)有停止。 長(zhǎng)城從古至今就是一個(gè)...
    一顆流心閱讀 484評(píng)論 0 2