并發(fā)編程之 Fork-Join 分而治之框架

前言

“分而治之” 一直是一個有效的處理大量數(shù)據(jù)的方法牵寺。著名的 MapReduce 也是采取了分而治之的思想绰更。簡單來說惰帽,就是如果你要處理1000個數(shù)據(jù)屉凯,但是你并不具備處理1000個數(shù)據(jù)的能力,那么你可以只處理其中的10個单山,然后碍现,分階段處理100次,將100次的結果進行合成米奸,那就是最終想要的對原始的1000個數(shù)據(jù)的處理結果昼接。

Fork & Join 的具體含義

Fork 一詞的原始含義是吃飯用的叉子,也有分叉的意思悴晰。在Linux 平臺中慢睡,函數(shù) fork()用來創(chuàng)建子進程,使得系統(tǒng)進程可以多一個執(zhí)行分支铡溪。在 Java 中也沿用了類似的命名方式漂辐。

而 Join() 的含義和 Thread 類的 join 類似,表示等待棕硫。也就是使用 fork() 后系統(tǒng)多了一個執(zhí)行分支(線程)髓涯,所以需要等待這個執(zhí)行分支執(zhí)行完畢,才有可能得到最終的結果哈扮,因此 join 就是表示等待纬纪。

在實際使用中,如果毫無顧忌的使用 fork 開啟線程進行處理滑肉,那么很有可能導致系統(tǒng)開啟過多的線程而嚴重影響性能育八。所以,在JDK中赦邻,給出一個 ForkJoinPool 線程池髓棋,對于 fork() 方法并不急著開啟線程,而是提交給 ForkJoiinPool 線程池進行處理,以節(jié)省系統(tǒng)資源按声。

由于線程池的優(yōu)化膳犹,提交的任務和線程數(shù)量并不是一對一的關系。在絕大多數(shù)情況下签则,一個物理線程實際上是需要處理多個邏輯任務的须床。因此,每個線程必然需要擁有一個任務隊列渐裂。因此豺旬,在實際執(zhí)行過程中,可能遇到這么一種情況:線程A已經(jīng)把自己的任務都處理完了柒凉,而線程B還有一堆任務等著處理族阅,此時,線程A就會“幫助” 線程B膝捞,從線程 B的任務隊列中拿一個任務來處理坦刀,盡可能的達到平衡。值得注意的是:當線程試圖幫助別人時蔬咬,總是從任務隊列的底部開始拿數(shù)據(jù)鲤遥,而線程試圖執(zhí)行自己的任務時,則從相反的頂部開始拿林艘。因此這種行為也十分有利于避免數(shù)據(jù)競爭盖奈。

我們看看線程池 ForkJoinPool 的一個接口:

    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }

你可以向 ForkJoinPool 線程池提交一個 ForkJoinTask 任務。所謂 ForkJoinTask 任務就是支持 fork () 分解以及 join()等待的任務狐援。 ForkJoinTask 有兩個重要的子類钢坦,RecursiveAction 和 RecursiveTask。他們粉筆表示沒有返回值的任務和可以攜帶返回值的任務咕村。有點像 Rannable 和 Callable。

下面來要給簡單的例子展示 Fork/Join 框架的使用蚊俺。這里用來計算求和。

/**
 *  Fork/Join 核心思想:分而治之
 *
 * 著名的 MapReduce 也是這個思想。將任務進行分解捶惜,然后合并所有的結果盗冷。
 *
 */
public class CountTask extends RecursiveTask<Long> {

  /**
   * 閥值
   */
  static final int THRESHOLD = 10000;
  long start;
  long end;

  public CountTask(long start, long end) {
    this.start = start;
    this.end = end;
  }

  /**
   * 有返回值的
   * @return
   */
  @Override
  protected Long compute() {

    long sum = 0;
    // 當閥值小于10000則不分解了
    boolean canCompute = (end - start) < THRESHOLD;
    if (canCompute) {
      for (long i = start; i <= end; i++) {
        sum += i;
      }
    } else {
      // 2000
      long step = (start + end) / 100;
      ArrayList<CountTask> subTasks = new ArrayList<>();
      long pos = start;
      for (int i = 0; i < 100; i++) {
        long lastOne = pos + step;
        if (lastOne > end) {
          lastOne = end;
        }
        //0-2000 個計算任務 * 100
        CountTask subTask = new CountTask(pos, lastOne);
        pos += step + 1;
        subTasks.add(subTask);
        subTask.fork();// fork
      }

      for (CountTask t : subTasks) {
        sum += t.join();
      }
    }
    return sum;

  }

  public static void main(String[] args) {

    ForkJoinPool forkJoinPool = new ForkJoinPool();
    CountTask task = new CountTask(0, 200000L);
    // 將一個大的任務提交到池中
    ForkJoinTask<Long> result = forkJoinPool.submit(task);
    long res = 0;
    try {
      // 等待運算結果
      res = result.get();
      System.out.println("sum = " + res);
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }

  }
}

由于計算求和必須需要返回值,因此我們選擇了 RecursiveTask 作為任務的模型得封。首先我們構造了一個大任務埋心,提交給線程池,線程池會返回一個攜帶結果的任務忙上,通過 get 方法可以得到最終結果拷呆。如果執(zhí)行 get 方法時任務沒有結束,那么主線程就會在 get 方法等待。

再看看 CountTask 的實現(xiàn)茬斧,首先 CountTask 繼承自 RecursiveTask 腰懂,可以攜帶返回值,這里的返回值類型設置為 long项秉,定義一個 THRESHOLD 設置了任務分解的規(guī)模绣溜,也就是如果需要求和的總數(shù)大于 THRESHOLD 個,那么任務就需要再次分解娄蔼,否則就直接執(zhí)行怖喻。 每次分解時,簡單的將原有任務劃分成100個規(guī)模相等的小任務岁诉,并使用 fork() 提交子任務锚沸。之后,等待所有的子任務結束唉侄,并將結果再次求和咒吐。

再使用 ForkJoin的時候注意:如果任務的劃分層次很深,一直得不到返回属划,那么可能出現(xiàn)兩種情況: 第一恬叹,系統(tǒng)內的線程數(shù)量越來越多,導致性能嚴重下降同眯。第二绽昼,函數(shù)的調用層次變的很深,最終導致棧溢出须蜗。

此外硅确,F(xiàn)orkJoin 線程池使用一個無鎖的棧來管理空閑線程,如果一個工作線程暫時取不到可用的任務明肮,則可能會被掛起菱农,掛起的線程將會被壓入由線程池維護的棧中,待將來有任務可用時柿估,再從棧中喚醒這些線程循未。

總結

本文來源自 《Java 高并發(fā)程序設計》,沒有什么自己的見解秫舌。因為使用場景太少了的妖。不過還是可以看看源碼來漲漲姿勢的。嘿嘿足陨。

good luck I┧凇!D怠星虹!

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末零抬,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子搁凸,更是在濱河造成了極大的恐慌媚值,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件护糖,死亡現(xiàn)場離奇詭異褥芒,居然都是意外死亡,警方通過查閱死者的電腦和手機嫡良,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門锰扶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人寝受,你說我怎么就攤上這事坷牛。” “怎么了很澄?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵京闰,是天一觀的道長。 經(jīng)常有香客問我甩苛,道長蹂楣,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任讯蒲,我火速辦了婚禮痊土,結果婚禮上,老公的妹妹穿的比我還像新娘墨林。我一直安慰自己赁酝,他們只是感情好,可當我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布旭等。 她就那樣靜靜地躺著酌呆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪搔耕。 梳的紋絲不亂的頭發(fā)上隙袁,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天,我揣著相機與錄音度迂,去河邊找鬼藤乙。 笑死猜揪,一個胖子當著我的面吹牛惭墓,可吹牛的內容都是我干的。 我是一名探鬼主播而姐,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼腊凶,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側響起钧萍,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤褐缠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后风瘦,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體队魏,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年万搔,在試婚紗的時候發(fā)現(xiàn)自己被綠了胡桨。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡瞬雹,死狀恐怖昧谊,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情酗捌,我是刑警寧澤呢诬,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站胖缤,受9級特大地震影響尚镰,放射性物質發(fā)生泄漏。R本人自食惡果不足惜草姻,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一钓猬、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧撩独,春花似錦敞曹、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至剧劝,卻和暖如春橄登,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背讥此。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工拢锹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人萄喳。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓卒稳,卻偏偏與公主長得像,于是被迫代替她去往敵國和親他巨。 傳聞我的和親對象是個殘疾皇子充坑,可洞房花燭夜當晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內容