5. JDK8的并行數(shù)據(jù)處理

一贵少、簡(jiǎn)述

JDK8為了讓處理大數(shù)據(jù)量集合更快速组砚,使用了并行的形式來進(jìn)行處理峻呕。在上面的例子中我們也看到了利职,如果我們需要一個(gè)并行流的話,只要對(duì)一個(gè)集合打開parallelStream即可瘦癌。在JDK7以前猪贪,想要對(duì)一個(gè)集合進(jìn)行并行處理似乎是一件困難的事情。所以這一篇文章我們可以看看JDK8是怎么實(shí)現(xiàn)并行處理的讯私。

二热押、將順序流轉(zhuǎn)換為并行流

有一個(gè)需求是,求從1開始到n的所有數(shù)字的和斤寇。
JDK7的做法是:

/**
 * JDK7的做法
 */
@Test
public void test01(){
    long l = iterativeSum(100);
    System.out.println(l);
}
public long iterativeSum(long n){
    long res = 0;
    for(long i = 0; i <= n; i++){
        res += i;
    }
    return res;
}

JDK8有兩種方式桶癣,一種是順序流,另外一種就是并行流了娘锁。
順序流的方式:

/**
 * JDK8的順序流做法
 */
@Test
public void test02(){
    long l = sequentialSum(100);
    System.out.println(l);
}
/** 順序流的方式 */
public long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
            .limit(n)
            .reduce(0L, Long::sum);
}

然后我們只要對(duì)以上的代碼加以修改就可以變成并行流的方式了:

/**
 * JDK8的并行流做法
 */
@Test
public void test03(){
    long l = parallelSum(100);
    System.out.println(l);
}
/** 并行流的方式 */
public long parallelSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
            .limit(n)
            .parallel()// 只需要對(duì)一個(gè)順序流調(diào)用此方法牙寞,就可以實(shí)現(xiàn)順序流到并行流的轉(zhuǎn)換
            .reduce(0L, Long::sum);
}

注意

這里有個(gè)需要注意的地方就是,可能有人會(huì)以為我們可以通過調(diào)用不同的轉(zhuǎn)換(parallel轉(zhuǎn)換為并行莫秆,sequential轉(zhuǎn)換為順序流)來控制到每一步操作是否通過順序流還是并行流间雀,但是其實(shí)并不是這樣的,最后一次調(diào)用會(huì)影響到整個(gè)流的操作镊屎,例如:
Stream.parallel().filter(...). sequential().map(...).parallel().reduce(..)
這個(gè)語句其實(shí)整個(gè)操作都是并行操作的惹挟,因?yàn)樵谶@段代碼中,最后一次調(diào)用的方法是parallel

使用的核心數(shù)

并行流使用的核心數(shù)是默認(rèn)的處理器的數(shù)量杯道,如果需要更改匪煌,我們可以通過java.util.concurrent.ForkJoinPool.common.paramllelism來修改默認(rèn)的核心數(shù)量责蝠。比如通過調(diào)用的System.setProperty來更改党巾。但是默認(rèn)核心數(shù)量是一個(gè)很好的設(shè)置萎庭,如果沒有特別的需求的話并不建議修改。

/**
 * 查看電腦核心數(shù)量
 */
@Test
public void test04(){
    System.out.println(Runtime.getRuntime().availableProcessors());
}

正確使用并行

在使用并行的時(shí)候我們應(yīng)該盡量去避免對(duì)一個(gè)值進(jìn)行修改齿拂,因?yàn)樵诙嗑€程情況下驳规,這個(gè)值的修改會(huì)被競(jìng)爭(zhēng),這樣就會(huì)出現(xiàn)計(jì)算結(jié)果錯(cuò)誤的后果署海,但是當(dāng)我們?nèi)ソo這個(gè)值加上鎖的時(shí)候吗购,這個(gè)操作又失去了并行的意義。

/**
 * 并行流錯(cuò)誤使用示范
 */
@Test
public void test05() {
    /** 順序流的方式 */
    Accmulator accmulator = new Accmulator();
    LongStream.rangeClosed(1, 100).forEach(accmulator::add);
    System.out.println(accmulator.total);// 5050

    /** 并行流的方式 */
    for (int i = 0; i < 10; i++) {
        accmulator = new Accmulator();
        LongStream.rangeClosed(1, 100).parallel().forEach(accmulator::add);
        System.out.println(accmulator.total);
    }
    /*
    4004
    4872
    4952
    5050
    4874
    4719
    4935
    4798
    5050
    4392
     */
}
/**
 * 創(chuàng)建一個(gè)類用于修改這個(gè)類里的變量
 */
class Accmulator {
    long total = 0;

    public void add(long v) {
        total += v;
    }
}

當(dāng)輸出來的值只有一次是正確的時(shí)候砸狞,性能似乎就變得不太那么重要了捻勉。

三、使用分支/合并框架進(jìn)行求和

JDK8并行流的使用我們需要考慮一定的因素刀森,并不是所有的操作都是用并行流就是好的踱启。如果數(shù)據(jù)量較小,使用并行流并不比使用順序流的速度要快研底,因?yàn)閯?chuàng)建線程是需要耗費(fèi)資源的埠偿。

接下來就來體驗(yàn)一下JDK7的ForkJoin框架。(說真的榜晦,我寫代碼這么久以來還沒用過這個(gè)框架...)

如果使用過線程池的同學(xué)就知道冠蒋,ExecutorService是用來配置系統(tǒng)中線程池信息的,而ForkJoin就是對(duì)該接口的一個(gè)實(shí)現(xiàn)乾胶,稱為ForkJoinPool抖剿,顧名思義就是一個(gè)分割線程的池。而這些任務(wù)就在這個(gè)池里面獲取線程資源進(jìn)行執(zhí)行识窿。

需求:定義一個(gè)函數(shù)牙躺,對(duì)Long類型的前n個(gè)數(shù)進(jìn)行求和。

package cn.liweidan.forkandjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 * <p>Desciption:并行求和Long數(shù)組</p>
 * CreateTime : 2017/7/17 上午11:25
 * Author : Weidan
 * Version : V1.0
 */
public class ForkJoinCaculator extends RecursiveTask<Long> {// 需要繼承RecursiveTask腕扶,傳遞并行處理后返回值的類型孽拷;如果執(zhí)行的沒有返回值,則繼承RecursiveAction

    /** 需要進(jìn)行計(jì)算的long數(shù)組 */
    private final long[] numbers;
    /** 子任務(wù)處理數(shù)組的其實(shí)和終止位置 */
    private final int start;
    private final int end;

    /** 進(jìn)行數(shù)組分割的閾值 */
    public static final long THRESHOLD = 10_000;

    public ForkJoinCaculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinCaculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    /**
     * JDK會(huì)通過線程池中的一個(gè)線程調(diào)用該方法半抱,這里需要我們確定一個(gè)條件判斷脓恕,判斷這個(gè)任務(wù)是否需要繼續(xù)進(jìn)行切割
     * 在這里我們運(yùn)行的時(shí)候,定義一個(gè)閾值窿侈,如果數(shù)組小于這個(gè)閾值炼幔,則直接進(jìn)行計(jì)算,如果大于這個(gè)閾值史简,則對(duì)這個(gè)數(shù)組進(jìn)行分割
     * @return
     */
    @Override
    protected Long compute() {
        /** 計(jì)算長(zhǎng)度 */
        int length = end - start;
        /** 如果長(zhǎng)度小于閾值則直接進(jìn)行計(jì)算 */
        if(length <= THRESHOLD){
            return computeSequentially();
        }

        /** 創(chuàng)建一個(gè)從開始到開始+長(zhǎng)度的一半的數(shù)組計(jì)算線程 */
        ForkJoinCaculator leftTask =
                new ForkJoinCaculator(numbers, start, start + length / 2);
        /** 加入線程池 */
        leftTask.fork();

        /** 創(chuàng)建一個(gè)任務(wù)為后面剩余部分求和 */
        ForkJoinCaculator rightTask =
                new ForkJoinCaculator(numbers, start + length / 2, end);
        /** 同步執(zhí)行 */
        Long rightRes = rightTask.compute();

        /** 讀取剛剛創(chuàng)建的第一個(gè)任務(wù)的結(jié)果乃秀,join方法會(huì)等待該線程返回結(jié)果,未完成線程會(huì)阻塞 */
        Long leftRes = leftTask.join();

        /** 返回左右兩個(gè)任務(wù)計(jì)算出來的結(jié)果 */
        return leftRes + rightRes;
    }

    /**
     * 用于順序計(jì)算
     * @return
     */
    private Long computeSequentially() {
        long sum = 0;
        for (long number : numbers) {
            sum += number;
        }
        return sum;
    }

    public static long forkjoinSum(long n) {
        long numbers[] = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinCaculator(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

注意代碼中的join方法是會(huì)進(jìn)行線程阻塞的,在那里需要等待每個(gè)任務(wù)的返回結(jié)果再進(jìn)行下去跺讯。

接下來就是運(yùn)行了枢贿。

public class ForkJoinCaculatorTest {
    @Test
    public void forkjoinSum() throws Exception {
        long startTime = System.nanoTime();
        long l = ForkJoinCaculator.forkjoinSum(10_000_000);
        long endTime = System.nanoTime();
        System.out.println("共耗時(shí):" + ((endTime -startTime)/1000000) + "毫秒, 結(jié)果是:" + l);
    }

}

運(yùn)行完成以后,輸出共耗時(shí):3400毫秒, 結(jié)果是:51200005120000000

運(yùn)行以上程序的時(shí)候刀脏,Java會(huì)把長(zhǎng)度為10_000_000的long數(shù)組傳遞給了ForkJoinCaculator局荚,期間通過compute進(jìn)行分割,將數(shù)組分割到足夠小的長(zhǎng)度的時(shí)候進(jìn)行計(jì)算愈污,最后耀态,程序把每個(gè)線程計(jì)算出來的結(jié)果進(jìn)行合并,得出以上的結(jié)果暂雹。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末首装,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子杭跪,更是在濱河造成了極大的恐慌仙逻,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,907評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件揍魂,死亡現(xiàn)場(chǎng)離奇詭異桨醋,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)现斋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門喜最,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人庄蹋,你說我怎么就攤上這事瞬内。” “怎么了限书?”我有些...
    開封第一講書人閱讀 164,298評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵虫蝶,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我倦西,道長(zhǎng)能真,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評(píng)論 1 293
  • 正文 為了忘掉前任扰柠,我火速辦了婚禮粉铐,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘卤档。我一直安慰自己蝙泼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評(píng)論 6 392
  • 文/花漫 我一把揭開白布劝枣。 她就那樣靜靜地躺著汤踏,像睡著了一般织鲸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上溪胶,一...
    開封第一講書人閱讀 51,488評(píng)論 1 302
  • 那天搂擦,我揣著相機(jī)與錄音,去河邊找鬼载荔。 笑死盾饮,一個(gè)胖子當(dāng)著我的面吹牛采桃,可吹牛的內(nèi)容都是我干的懒熙。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼普办,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼工扎!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起衔蹲,我...
    開封第一講書人閱讀 39,176評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤肢娘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后舆驶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體橱健,經(jīng)...
    沈念sama閱讀 45,619評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評(píng)論 3 336
  • 正文 我和宋清朗相戀三年沙廉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了拘荡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,932評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡撬陵,死狀恐怖珊皿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情巨税,我是刑警寧澤蟋定,帶...
    沈念sama閱讀 35,655評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站草添,受9級(jí)特大地震影響驶兜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜远寸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評(píng)論 3 329
  • 文/蒙蒙 一抄淑、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧而晒,春花似錦蝇狼、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)贱枣。三九已至,卻和暖如春颤专,著一層夾襖步出監(jiān)牢的瞬間纽哥,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工栖秕, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留春塌,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,095評(píng)論 3 370
  • 正文 我出身青樓簇捍,卻偏偏與公主長(zhǎng)得像只壳,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子暑塑,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容