2017-08-10 Java并發(fā)框架——Fork/Join

1 資料集合

Fork/Join框架(一)引言
Fork/Join框架(二)創(chuàng)建一個Fork/Join池
Fork/Join框架(三)加入任務的結(jié)果
Fork/Join框架(四)異步運行任務
Fork/Join框架(五)在任務中拋出異常
Fork/Join框架(六)取消任務

全部摘自并發(fā)編程網(wǎng)莫其,特別好的翻譯和校對蔽挠,可以看特別多的內(nèi)容毛俏,感謝方騰飛,感謝并發(fā)編程網(wǎng) 希望你們越做越好溢十。

2 為什么要用?——業(yè)務需求

有200萬左右的歷史用戶需要贈送會員并根據(jù)用戶行為給予30天或365天的期限瓢喉。
因為有業(yè)務需求摹蘑,所以無法直接走DB變更。
因為用戶量并沒有特別大扰法,也沒用zk或者其他分布式服務均分用戶去做(服務器同時啟動對DB壓力會很大)蛹含。

最后準備用預發(fā)單節(jié)點去刷200萬的用戶。很適合用多線程去處理塞颁。

3 Fork/Join 框架

多線程經(jīng)驗并不豐富浦箱,但是知道用線程池來管理線程比較好,查詢一些Java并發(fā)的類祠锣,很多都是集中在concurrent包中酷窥,F(xiàn)ork/Join框架也在其中(Duog Lea 大神威武)。

既然是一個框架锤岸,自然已經(jīng)處理了線程池和待任務竖幔。使用過程主要依賴ForkJoinPool 和 ForkJoinTask的兩個子類RecursiveTask和RecursiveAction;ForkJoinPool繼承自AbstractExecutorService是偷,F(xiàn)orkJoinTask實現(xiàn)了Future接口拳氢。

典型ForkJoinPool的使用:

// 聲明pool
ForkJoinPool pool = new ForkJoinPool();
// 執(zhí)行task
pool.execute(task);
// 觀察執(zhí)行中
do {
    System.out.printf("******************************************\n");
    System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
    System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
    System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
    System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
    System.out.printf("******************************************\n");
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
} while (!task.isDone());
// 關閉池
pool.shutdown();
// 等待關閉
try {
    pool.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

典型ForkJoinTask的設計使用:

If (problem size < default size){
  tasks=divide(task);
  execute(tasks);
} else {
  resolve problem using another algorithm;
}

ForkJoinTask簡單實例(重寫compute方法):

@Override
protected Integer compute() {
    int result = 0;
    if (end - start < 10) {
        result = processLines(document, start, end, word);
    } else {
        int mid = (start + end) / 2;
        DocumentTask task1 = new DocumentTask(document, start, mid, word);
        DocumentTask task2 = new DocumentTask(document, mid, end, word);
        invokeAll(task1, task2);
        try {
            result = groupResults(task1.get(), task2.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    return result;
}

4 結(jié)合業(yè)務場景使用 Fork/Join

Main類使用ForkJoinPool:

RefreshAgentToMemberTask task = new RefreshAgentToMemberTask(startId, endId, applicationContext);
ForkJoinPool pool = new ForkJoinPool();
pool.execute(task);

do {
    logger.info("Thread Count: {}", pool.getActiveThreadCount());
    logger.info("Thread Steal: {}", pool.getStealCount());
    logger.info("Parallelism: {}", pool.getParallelism());
    try {
        TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
} while (!task.isDone());

pool.shutdown();

if (task.isCompletedNormally()) {
    logger.info("The process has completed normally.");
}

是不是很類似于之前的例子募逞?其實幾乎所有對ForkJoinPool使用方式都類似。

ForkJoinTask 類設計:

    public class RefreshAgentToMemberTask extends RecursiveAction {
        private static final long serialVersionUID = -7434939182574934204L;
        private Logger logger = LoggerFactory.getLogger(RefreshAgentToMemberTask.class);

        private long startId;
        private long endId;
        private ApplicationContext applicationContext; // 用于獲取spring bean
        private static final int THRESHOLD = 100;//10000; TODO test=100, prod=10000

        private Map<String, Long> times = new HashMap<>();

        RefreshAgentToMemberTask(long startId, long endId, ApplicationContext applicationContext) {
            this.startId = startId;
            this.endId = endId;
            this.applicationContext = applicationContext;
        }

        @Override
        protected void compute() {
            if (endId - startId <= THRESHOLD) {
                doAgentToMember();
            } else {
                long middleId = (startId + endId) / 2;
                RefreshAgentToMemberTask task1 = new RefreshAgentToMemberTask(startId, middleId, applicationContext);
                RefreshAgentToMemberTask task2 = new RefreshAgentToMemberTask(middleId, endId, applicationContext);
                invokeAll(task1, task2);
            }
        }

        /**
         * 到達閥值以內(nèi)的處理方法
         */
        private void doAgentToMember() {
            logger.info("task doAgentToMember range: startId={}, endId={}.", startId, endId);
            long start = System.currentTimeMillis();
            ......
            logger.info("startId={}, endId={}. total cost={}", startId, endId, (System.currentTimeMillis() - start));
        }
    }

5 測試結(jié)果

很遺憾馋评,沒有在線上環(huán)境使用放接,因為功能被大老板砍了。沒想到技術沒干成的事最后是老板砍了留特,估計產(chǎn)品也沒想到纠脾。

不過在測試環(huán)境還是做了嘗試,利用本機(Mac JVM 能拿到的CPU核數(shù)是4)蜕青,數(shù)據(jù)量1.8萬苟蹈,兩次執(zhí)行遍歷所有用時分別為330s,360s右核。全部遍歷大概需要 38,888s ~ 10小時慧脱。挺好的,預發(fā)機器估計比我性能好贺喝,上班點一下菱鸥,下班了就可以驗證。

6 感想

業(yè)務有需要躏鱼,加上一點點平時的積累氮采,沒準就能學到一些有用的新技術。為了技術點贊~染苛。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鹊漠,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子殖侵,更是在濱河造成了極大的恐慌贸呢,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拢军,死亡現(xiàn)場離奇詭異楞陷,居然都是意外死亡,警方通過查閱死者的電腦和手機茉唉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門固蛾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人度陆,你說我怎么就攤上這事艾凯。” “怎么了懂傀?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵趾诗,是天一觀的道長。 經(jīng)常有香客問我,道長恃泪,這世上最難降的妖魔是什么郑兴? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮贝乎,結(jié)果婚禮上情连,老公的妹妹穿的比我還像新娘。我一直安慰自己览效,他們只是感情好却舀,可當我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著锤灿,像睡著了一般挽拔。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上但校,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天篱昔,我揣著相機與錄音,去河邊找鬼始腾。 笑死,一個胖子當著我的面吹牛空执,可吹牛的內(nèi)容都是我干的浪箭。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼辨绊,長吁一口氣:“原來是場噩夢啊……” “哼奶栖!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起门坷,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤宣鄙,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后默蚌,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冻晤,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年绸吸,在試婚紗的時候發(fā)現(xiàn)自己被綠了鼻弧。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡锦茁,死狀恐怖攘轩,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情码俩,我是刑警寧澤度帮,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站稿存,受9級特大地震影響笨篷,放射性物質(zhì)發(fā)生泄漏瞳秽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一冕屯、第九天 我趴在偏房一處隱蔽的房頂上張望寂诱。 院中可真熱鬧,春花似錦安聘、人聲如沸痰洒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽丘喻。三九已至,卻和暖如春念颈,著一層夾襖步出監(jiān)牢的瞬間泉粉,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工榴芳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留嗡靡,地道東北人。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓窟感,卻偏偏與公主長得像讨彼,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子柿祈,可洞房花燭夜當晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容