簡體字豆茫、馮|java11出來了侨歉,java8里邊最好用的并發(fā)框架 fork-join怎么玩匙睹,會了嗎饲宿?


title: 2018-8-3 聊聊 fork-join框架
tags: java骂倘,并發(fā)娶耍,線程池
grammar_cjkRuby: true


what闭树?

Fork/Join框架是Java7提供了的一個用于并行執(zhí)行任務(wù)的框架挥萌, 是一個把大任務(wù)分割成若干個小任務(wù)悍引,最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架家肯。

why?

同樣是線程池倦挂,為什么我們要用Fork/Join不用別的棋枕。

工作竊取算法(work-stealing):

先執(zhí)行完當前線程的任務(wù),直接去從其他為執(zhí)行完任務(wù)的線程中竊取任務(wù)執(zhí)行妒峦;

圖:

任務(wù)竊取

如圖線程1先執(zhí)行完自己的4個任務(wù)重斑,線程2還有沒執(zhí)行的任務(wù),線程1接著從線程2中獲取未執(zhí)行的任務(wù)肯骇。實現(xiàn)是基于 雙向隊列

How窥浪?

RecursiveAction

執(zhí)行沒有返回結(jié)果的任務(wù)

  • 新建TestRecursiveAction 對任務(wù)進行分割的主線程類
package com.fengxg.test.recursive.action;

import java.util.concurrent.RecursiveAction;

/**
 * @Desccription 用于沒有返回結(jié)果的任務(wù),對任務(wù)進行分割的主線程
 * @auther Fengxg
 * @create 2018/7/25
 */
public class TestRecursiveAction extends RecursiveAction {

    private static final int ONE_TASK_DEAL_NUMBER = 10;

    /**
     * 計數(shù)器笛丙,計算器執(zhí)行完漾脂,表示最后一個任務(wù)也跑完
     */

    String[] data = {};

    public TestRecursiveAction(String[] data) {
        super();
        this.data = data;
    }

    @Override
    protected void compute() {
        //需要拆分成多少個任務(wù)單數(shù)
        int needSplitCount = 1;
        //剩余多少任務(wù)單
        int leftCount = data.length % ONE_TASK_DEAL_NUMBER;
        needSplitCount = (leftCount == 0?data.length/ONE_TASK_DEAL_NUMBER:data.length/ONE_TASK_DEAL_NUMBER+1);
        //初始化計數(shù)器
//        countDownLatch = new CountDownLatch(needSplitCount);

        //拆分任務(wù)單
        for(int i=0; i<needSplitCount; i++)
        {
            ActionWorkTask workTask = null;
            if(i == needSplitCount-1 && leftCount!=0)
            {
                workTask = new ActionWorkTask(i*ONE_TASK_DEAL_NUMBER, i*ONE_TASK_DEAL_NUMBER+leftCount, data,i);
            }else {
                workTask = new ActionWorkTask(i*ONE_TASK_DEAL_NUMBER, (i+1)*ONE_TASK_DEAL_NUMBER, data,i);
            }
            //提交到線程池隊列中
            workTask.fork();
        }
    }

    public String[] getData() {
        return data;
    }

    public void setData(String[] data) {
        this.data = data;
    }
}

  • 新建ActionWorkTask 分片后執(zhí)行具體業(yè)務(wù)任務(wù)的類
package com.fengxg.test.recursive.action;

import java.util.concurrent.RecursiveAction;

/**
 * @Desccription 分片后執(zhí)行業(yè)務(wù)任務(wù)的具體類
 * @auther Fengxg
 * @create 2018/7/25
 */
public class ActionWorkTask  extends RecursiveAction{

    public ActionWorkTask(int startIndex, int endIndex, String[] datas,int workNum) {
        super();
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.datas = datas;
        this.workNum = workNum;
    }

    private int workNum;
    private int startIndex;
    private int endIndex;
//    private CountDownLatch countDownLatch;
    private String[] datas;

    @Override
    protected void compute() {
        Thread t = Thread.currentThread();
        for (int i = startIndex; i < endIndex; i++) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("workNum" + workNum  +",第" + datas[i] + "任務(wù)," + "執(zhí)行線程:" + t.getName());
        }
//        countDownLatch.countDown();
    }

    public int getStartIndex() {
        return startIndex;
    }

    public void setStartIndex(int startIndex) {
        this.startIndex = startIndex;
    }

    public int getEndIndex() {
        return endIndex;
    }

    public void setEndIndex(int endIndex) {
        this.endIndex = endIndex;
    }

    public String[] getDatas() {
        return datas;
    }

    public void setDatas(String[] datas) {
        this.datas = datas;
    }
}

RecursiveTask

執(zhí)行沒有返回結(jié)果的任務(wù)

  • 新建 TestRecursiveTask 對任務(wù)進行分割的主線程類

    package com.fengxg.test.recursive.task;
      import com.fengxg.test.recursive.dto.TaskResultDTO;
    
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.RecursiveTask;
    
      /**
       * @Desccription 返回有結(jié)果任務(wù)
       * @auther Fengxg
       * @create 2018/7/25
       */
      public class TestRecursiveTask extends RecursiveTask<TaskResultDTO> {
    
    
      private static final int ONE_TASK_DEAL_NUMBER = 10;
      private List<TaskWorkTask> taskWorkTasks = new ArrayList<TaskWorkTask>();
      /**
       * 計數(shù)器胚鸯,計算器執(zhí)行完骨稿,表示最后一個任務(wù)也跑完
       */
      private CountDownLatch countDownLatch = null;
      /**
       * 計數(shù)器,計算器執(zhí)行完姜钳,表示最后一個任務(wù)也跑完
       */
    
      String[] data = {};
    
      public TestRecursiveTask(String[] data) {
          super();
          this.data = data;
      }
    
      @Override
      protected TaskResultDTO compute() {
          //需要拆分成多少個任務(wù)單數(shù)
          int needSplitCount = 1;
          //剩余多少任務(wù)單
          int leftCount = data.length % ONE_TASK_DEAL_NUMBER;
          needSplitCount = (leftCount == 0?data.length/ONE_TASK_DEAL_NUMBER:data.length/ONE_TASK_DEAL_NUMBER+1);
          //初始化計數(shù)器
          countDownLatch = new CountDownLatch(needSplitCount);
    
          //拆分任務(wù)單
          for(int i=0; i<needSplitCount; i++)
          {
              TaskWorkTask workTask = null;
              if(i == needSplitCount-1 && leftCount!=0)
              {
                  workTask = new TaskWorkTask(i*ONE_TASK_DEAL_NUMBER, i*ONE_TASK_DEAL_NUMBER+leftCount,countDownLatch, data,i);
              }else {
                  workTask = new TaskWorkTask(i*ONE_TASK_DEAL_NUMBER, (i+1)*ONE_TASK_DEAL_NUMBER,countDownLatch, data,i);
              }
    
              taskWorkTasks.add(workTask);
              //提交到線程池隊列中
              workTask.fork();
          }
    
          //整個任務(wù)直接結(jié)果
          TaskResultDTO taskResultDTO = new TaskResultDTO();
    
          for(TaskWorkTask taskWorkTask: taskWorkTasks){
              //遍歷子任務(wù)返回的結(jié)果集坦冠,統(tǒng)計總結(jié)果
              TaskResultDTO taskWorkTaskResultDTO    = taskWorkTask.join();
              taskResultDTO.setErrorCount(taskResultDTO.getErrorCount() + taskWorkTaskResultDTO.getErrorCount());
              taskResultDTO.setSuccessCount(taskResultDTO.getSuccessCount() + taskWorkTaskResultDTO.getSuccessCount());
              taskResultDTO.setTotalCount(taskResultDTO.getTotalCount() + taskWorkTaskResultDTO.getTotalCount());
          }
          
          return taskResultDTO;
      }
    
    
      public String[] getData() {
          return data;
      }
    
      public void setData(String[] data) {
          this.data = data;
      }
      }
    
  • 新建 TaskWorkTask 分片后執(zhí)行具體業(yè)務(wù)任務(wù)的類

package com.fengxg.test.recursive.task;

import com.fengxg.test.recursive.dto.TaskResultDTO;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RecursiveTask;

/**
* @Desccription 分片后執(zhí)行業(yè)務(wù)任務(wù)的具體類
* @auther Fengxg
* @create 2018/7/25
*/
public class TaskWorkTask extends RecursiveTask<TaskResultDTO>{

  public TaskWorkTask(int startIndex, int endIndex, CountDownLatch countDownLatch, String[] datas, int workNum) {
      super();
      this.startIndex = startIndex;
      this.endIndex = endIndex;
      this.datas = datas;
      this.workNum = workNum;
      this.countDownLatch = countDownLatch;
  }

  private int workNum;
  private int startIndex;
  private int endIndex;
  private CountDownLatch countDownLatch;
  private String[] datas;

  @Override
  protected TaskResultDTO compute() {
      TaskResultDTO taskResultDTO = new TaskResultDTO();
      Long errorCount = 0L;
      Long successCount = 0L;
      Long totalCount = 0L;
      taskResultDTO.setErrorCount(errorCount);
      taskResultDTO.setSuccessCount(successCount);
      taskResultDTO.setTotalCount(totalCount);
      Thread t = Thread.currentThread();
      for (int i = startIndex; i < endIndex; i++) {
          try {
              Thread.sleep(100L);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          System.out.println("workNum" + workNum  +",第" + datas[i] + "任務(wù)," + "執(zhí)行線程:" + t.getName());
          
          if (0 == i%3) {
              taskResultDTO.setErrorCount(taskResultDTO.getErrorCount() + 1);
          } else {
              taskResultDTO.setSuccessCount(taskResultDTO.getSuccessCount() + 1);
          }

          taskResultDTO.setTotalCount(taskResultDTO.getTotalCount() + 1);
      }
      countDownLatch.countDown();
      return taskResultDTO;
  }

  public int getStartIndex() {
      return startIndex;
  }

  public void setStartIndex(int startIndex) {
      this.startIndex = startIndex;
  }

  public int getEndIndex() {
      return endIndex;
  }

  public void setEndIndex(int endIndex) {
      this.endIndex = endIndex;
  }

  public String[] getDatas() {
      return datas;
  }

  public void setDatas(String[] datas) {
      this.datas = datas;
  }
}

調(diào)用 ,測試類

package com.fengxg.test;

import com.fengxg.test.recursive.action.TestRecursiveAction;
import com.fengxg.test.recursive.dto.TaskResultDTO;
import com.fengxg.test.recursive.task.TestRecursiveTask;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/**
 * @Desccription
 * @auther Fengxg
 * @create 2018/7/25
 */
public class TestForkJoinMain {
    public static void main(String[] args) throws InterruptedException {
        int count = 50;
        String[] datas= new String[count];
        for (int i = 0; i < count; i++) {
            datas[i] = String.valueOf(i);
        }

        Long start = System.currentTimeMillis();
        /**
         * 有返回值
         */
//        testRecursiveTask(datas);
        /**
         * 無返回值
         */
        testRecursiveAction(datas);
        System.out.println(System.currentTimeMillis() - start);
        Thread.sleep(10000L);
    }

    public static void testRecursiveTask(String[] datas){
        TestRecursiveTask action = new TestRecursiveTask(datas);
        ForkJoinPool fork = new ForkJoinPool();
        Future<TaskResultDTO> future = fork.submit(action);
        try {
           TaskResultDTO taskResultDTO =  future.get();
            System.out.println(taskResultDTO.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }


    /**
     * @author fengxg
     * @description  測試沒有返回結(jié)果的任務(wù)
     * @params []
     * @return void
     * @data 10:15 2018/7/25
     */
    public static void testRecursiveAction(String[] datas){
        TestRecursiveAction action = new TestRecursiveAction(datas);
        ForkJoinPool fork = new ForkJoinPool();
        Future future = fork.submit(action);
        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}

So?

? ? 本示例只做fork-join應(yīng)用展示哥桥,后續(xù)會有專門文章對其原理進行探討辙浑。
? ? 對于<font color="red">批量</font>操作<font color="red">獨立任務(wù)</font>的需求,fork/join 架構(gòu)有著明顯優(yōu)勢拟糕,尤其是在任務(wù)量大的時候判呕。 但需要注意的是倦踢,多線程能發(fā)揮優(yōu)勢的環(huán)境是<font color="red">多核CPU(運行環(huán)境)</font>。除此以外侠草,需要使用者 針對需求自行調(diào)試出最佳分片(影響因素主要有CPU核數(shù)辱挥,任務(wù)量規(guī)模)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末边涕,一起剝皮案震驚了整個濱河市般贼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌奥吩,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蕊梧,死亡現(xiàn)場離奇詭異霞赫,居然都是意外死亡,警方通過查閱死者的電腦和手機肥矢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進店門端衰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人甘改,你說我怎么就攤上這事旅东。” “怎么了十艾?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵抵代,是天一觀的道長。 經(jīng)常有香客問我忘嫉,道長荤牍,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任庆冕,我火速辦了婚禮康吵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘访递。我一直安慰自己晦嵌,他們只是感情好,可當我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布拷姿。 她就那樣靜靜地躺著惭载,像睡著了一般。 火紅的嫁衣襯著肌膚如雪响巢。 梳的紋絲不亂的頭發(fā)上棕兼,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天,我揣著相機與錄音抵乓,去河邊找鬼伴挚。 笑死靶衍,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的茎芋。 我是一名探鬼主播颅眶,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼田弥!你這毒婦竟也來了涛酗?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤偷厦,失蹤者是張志新(化名)和其女友劉穎商叹,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體只泼,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡剖笙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了请唱。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片弥咪。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖十绑,靈堂內(nèi)的尸體忽然破棺而出聚至,到底是詐尸還是另有隱情,我是刑警寧澤本橙,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布扳躬,位于F島的核電站,受9級特大地震影響甚亭,放射性物質(zhì)發(fā)生泄漏坦报。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一狂鞋、第九天 我趴在偏房一處隱蔽的房頂上張望片择。 院中可真熱鬧,春花似錦骚揍、人聲如沸字管。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽嘲叔。三九已至,卻和暖如春抽活,著一層夾襖步出監(jiān)牢的瞬間硫戈,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工下硕, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留丁逝,地道東北人汁胆。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像霜幼,于是被迫代替她去往敵國和親嫩码。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容