并行數(shù)據(jù)處理與性能

  • 并行流parallelStream
    并行流就是一個(gè)把內(nèi)容分成多個(gè)數(shù)據(jù)塊肄扎,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流嫌松。這樣就可以自動(dòng)把給定操作的工作負(fù)荷分配給多核處理器的所有內(nèi)核,讓這些處理器都忙起來(lái)恶耽。
    例如:下面的方法,接收數(shù)字n作為參數(shù)衅鹿,返回從1到給定參數(shù)的所有數(shù)字的和

    public static long sequentialSum(long n) {
         return Stream.iterate(1L, i -> i + 1)//生成自然數(shù)無(wú)限流
                      .limit(n)//限制前n個(gè)數(shù)
                      .reduce(0L, Long::sum);//對(duì)所有數(shù)字求和
    

  • iterate,Stream API提供由兩個(gè)靜態(tài)方法操你個(gè)函數(shù)生成流(常見(jiàn)的還有三種方法構(gòu)建流过咬,由值構(gòu)建大渤、由數(shù)組構(gòu)建、由文件構(gòu)建)掸绞,Stream.iterateStream.generate可以創(chuàng)建無(wú)限流泵三,會(huì)根據(jù)給定的函數(shù)按需創(chuàng)建。注意與generate區(qū)別:
    generate不是一次對(duì)每個(gè)新生成的值應(yīng)用函數(shù)衔掸,例如以下使用的供應(yīng)源是無(wú)狀態(tài)的烫幕,即它不會(huì)在任何地方記錄任何值,以備以后計(jì)算使用敞映。

    iterate供應(yīng)源不一定是無(wú)狀態(tài)的较曼。可以創(chuàng)建存儲(chǔ)狀態(tài)的供應(yīng)源驱显,它可以修改狀態(tài)诗芜,并在為流生成下一個(gè)值時(shí)使用

     Stream.generate(Math::random)
           .limit(5)
           .forEach(System.out::println);
    

這時(shí),若n特別大的時(shí)候埃疫,就可以做并行處理伏恐,將順序流轉(zhuǎn)換為并行流:對(duì)順序流調(diào)用parallel即可:在處理時(shí),會(huì)把Stream在內(nèi)部分成幾塊栓霜,在不同的塊獨(dú)立并行歸納操作翠桦,最后,同一個(gè)歸納操作會(huì)將各個(gè)子流的部分歸納結(jié)果合并起來(lái)胳蛮,得到整個(gè)原始流的歸納結(jié)果销凑。

  public static long parallelSum(long n) {
       return Stream.iterate(1L, i -> i + 1)
                    .limit(n)
                    .parallel()//將流轉(zhuǎn)換為并行流
                    .reduce(0L, Long::sum);
  } 
image.png

但是,在實(shí)際操作時(shí)仅炊,對(duì)順序流調(diào)用parallel方法并不意味著對(duì)流有任何實(shí)際的變化斗幼。它內(nèi)部實(shí)際上就是設(shè)了一個(gè)boolean標(biāo)志,表示你想讓調(diào)用parallel之后進(jìn)行的所有操作都并行執(zhí)行抚垄。對(duì)并行流調(diào)用sequential方法就可以把它變成順序流蜕窿。將兩個(gè)方法結(jié)合起來(lái),就可以更細(xì)化地控制在遍歷流時(shí)哪些操作要并行執(zhí)行呆馁,哪些要順序執(zhí)行桐经。例如:

  stream.parallel()
        .filter(...)
        .sequential()
        .map(...)
        .parallel()//最后一次parallel或sequential調(diào)用會(huì)影響整個(gè)流水線
        .reduce(); 

并行流默認(rèn)使用的線程使用了默認(rèn)的ForkJoinPool,它默認(rèn)的線程數(shù)量就是你的處理器數(shù)量浙滤。

  • 性能
    實(shí)際上阴挣,上面利用并行版本會(huì)比順序版本滿很多,主要有兩方面原因:

    1. iterate生成的是裝箱的對(duì)象纺腊,必須拆箱成數(shù)字才能求和畔咧;
    2. 很難把iterate分成多個(gè)獨(dú)立塊來(lái)并行執(zhí)行茎芭,由于某些流操作會(huì)比其他車(chē)操作更容易并行化,之所以iterate很難被分成獨(dú)立執(zhí)行的小塊盒卸,是因?yàn)槊看螒?yīng)用這個(gè)函數(shù)都要依賴前一次應(yīng)用的結(jié)果骗爆,也就說(shuō),在這種情況下蔽介,歸納進(jìn)程不像圖7.1那樣進(jìn)行摘投,整張數(shù)字列表在歸納過(guò)程開(kāi)始時(shí)沒(méi)有準(zhǔn)備好,因而無(wú)法有效地把流劃分為小塊來(lái)并行處理虹蓄。把流標(biāo)記成并行犀呼,其實(shí)是給順序處理增加了開(kāi)銷(xiāo),它還要把每次求和操作分到一個(gè)不同的線程上薇组。

    所以說(shuō)外臂,并行編程可能很復(fù)雜,如果用的不對(duì)(iterate)律胀,會(huì)讓整體的性能變差宋光。

  • 解決方法
    使用LongStream.rangeClosed方法,和iterate相比有兩個(gè)優(yōu)點(diǎn):

    1. LongStream.rangeClosed直接產(chǎn)生原始類(lèi)型的long數(shù)字炭菌,沒(méi)有裝箱拆箱的開(kāi)銷(xiāo)罪佳。

    2. LongStream.rangeClosed會(huì)生成數(shù)字范圍,很容易拆分為獨(dú)立的小塊黑低。

      public static long parallelRangedSum(long n) {
           return LongStream.rangeClosed(1, n)
                            .parallel()
                            .reduce(0L, Long::sum);
       } 
      

并行化是需要付出代價(jià)的赘艳,并行化過(guò)程本身需要對(duì)流做遞歸劃分,把每個(gè)子流的歸納操作分配到不同的線程克握,然后把這些操作的結(jié)果合并成一個(gè)值蕾管。但在多個(gè)內(nèi)核之間移動(dòng)數(shù)據(jù)的代價(jià)也可能比你想的要大,所以很重要的一點(diǎn)是要保證在內(nèi)核中并行執(zhí)行工作的時(shí)間比在內(nèi)核之間傳輸數(shù)據(jù)的時(shí)間長(zhǎng)菩暗。


  • 正確使用并行流
    上面直接使用并行化的錯(cuò)誤原因就是掰曾,使用的算法改變了某些共享狀態(tài)。
    1. 留意裝箱停团。自動(dòng)裝箱和拆箱操作會(huì)大大降低性能旷坦。Java 8中有原始類(lèi)型流(IntStreamLongStream客蹋、DoubleStream)來(lái)避免這種操作,
    2. 有些操作本身在并行流上的性能就比順序流差孽江。特別是limitfindFirst等依賴于元素順序的操作讶坯,它們?cè)诓⑿辛魃蠄?zhí)行的代價(jià)非常大。例如岗屏,findAny會(huì)比findFirst性能好辆琅,因?yàn)樗灰欢ㄒ错樞騺?lái)執(zhí)行漱办。
    3. 對(duì)于較小的數(shù)據(jù)量,選擇并行流幾乎從來(lái)都不是一個(gè)好的決定婉烟。并行處理少數(shù)幾個(gè)元素的好處還抵不上并行化造成的額外開(kāi)銷(xiāo)娩井。
    4. 要考慮流背后的數(shù)據(jù)結(jié)構(gòu)是否易于分解。例如似袁,ArrayList的拆分效率比LinkedList高得多洞辣,因?yàn)榍罢哂貌恢闅v就可以平均拆分,而后者則必須遍歷昙衅。另外扬霜,用range工廠方法創(chuàng)建的原始類(lèi)型流也可以快速分解。
    5. 還要考慮終端操作中合并步驟的代價(jià)是大是卸妗(例如Collector中的combiner方法)著瓶。如果這一步代價(jià)很大,那么組合每個(gè)子流產(chǎn)生的部分結(jié)果所付出的代價(jià)就可能會(huì)超出通過(guò)并行流得到的性能提升啼县。

根據(jù)可分解性總結(jié)的一些流數(shù)據(jù)源適不適合并行

可分解性
ArrayList 極佳
LinkedList
IntStream.range 極差
Stream.iterate
HashSet
TreeSet

分支/合并框架

分支/合并框架的目的是以遞歸方式將可以并行的任務(wù)拆分成更小的任務(wù)材原,然后將每個(gè)子任務(wù)的結(jié)果合并起來(lái)生成整體結(jié)果。它是ExecutorService接口的一個(gè)實(shí)現(xiàn)季眷,它把子任務(wù)分配給線程池(稱為ForkJoinPool)中的工作線程余蟹。

  1. 使用RecursiveTask
    把任務(wù)提交到這個(gè)池,必須創(chuàng)建RecursiveTask<R>的一個(gè)子類(lèi)瘟裸,其中R是并行化任務(wù)(以及所有子任務(wù))產(chǎn)生的結(jié)果類(lèi)型客叉,或者如果任務(wù)不返回結(jié)果,則是RecursiveAction類(lèi)型,則需要實(shí)現(xiàn)它的抽象方法compute:
    protected abstract R compute();
    這個(gè)方法同時(shí)定義了將任務(wù)拆分成子任務(wù)的邏輯话告,以及無(wú)法再拆分或不方便再拆分時(shí)兼搏,生成單個(gè)子任務(wù)結(jié)果的邏輯。

     if (任務(wù)足夠小或不可分) {
         順序計(jì)算該任務(wù)
     } else {
          將任務(wù)分成兩個(gè)子任務(wù)
          遞歸調(diào)用本方法沙郭,拆分每個(gè)子任務(wù)佛呻,等待所有子任務(wù)完成
          合并每個(gè)子任務(wù)的結(jié)果
     } 
    

例如:利用分支/合并框架執(zhí)行并行求和

public class ForkJoinSumCalculator
             extends java.util.concurrent.RecursiveTask<Long> {

     private final long[] numbers;
     private final int start;
     private final int end;

     public static final long THRESHOLD = 10_000;//不再將任務(wù)分成子任務(wù)的大小

     public ForkJoinSumCalculator(long[] numbers) {
           this(numbers, 0, numbers.length);
     }

     private ForkJoinSumCalculator(long[] numbers, int start, int end) {
           this.numbers = numbers;
           this.start = start;
           this.end = end;
     }

   @Override
   protected Long compute() {
         int length = end - start;
         if (length <= THRESHOLD) {//如果大小小于或等于閾值,順序計(jì)算結(jié)果
               return computeSequentially();
         }
         ForkJoinSumCalculator leftTask =
                   new ForkJoinSumCalculator(numbers, start, start + length/2);
         leftTask.fork();//利用另一個(gè)ForkJoinPool線程異步執(zhí)行新創(chuàng)建的子任務(wù)
         ForkJoinSumCalculator rightTask =
                   new ForkJoinSumCalculator(numbers, start + length/2, end);//創(chuàng)建一個(gè)任務(wù)為數(shù)組的后一半數(shù)組求和
         Long rightResult = rightTask.compute();//同步執(zhí)行第二個(gè)子任務(wù)病线,有可能允許進(jìn)一步遞歸劃分
         Long leftResult = leftTask.join();//讀取第一個(gè)子任務(wù)的結(jié)果吓著,如果尚未完成就等待
         return leftResult + rightResult;
   }

    //在子任務(wù)不再可分時(shí)計(jì)算結(jié)果的簡(jiǎn)單算法
   private long computeSequentially() {
         long sum = 0;
         for (int i = start; i < end; i++) {{
             sum += numbers[i];
         }
       return sum;
   }
} 

把數(shù)字?jǐn)?shù)組傳給ForkJoinSumCalculator的構(gòu)造函數(shù),即可實(shí)現(xiàn)并行對(duì)前n個(gè)自然數(shù)求和:

public static long forkJoinSum(long n) {
     long[] numbers = LongStream.rangeClosed(1, n).toArray();
     //ForkJoinTask為RecursiveTask的父類(lèi)
     ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
     return new ForkJoinPool().invoke(task);//invoke用來(lái)執(zhí)行某個(gè)對(duì)象的目標(biāo)方法
} 
  • 工作竊人吞簟:
    分支/合并框架工程用一種稱為工作竊劝筝骸(work stealing)用于在線程池中的工作線程之間重新分配和平衡任務(wù)。在實(shí)際應(yīng)用中惕耕,這意味著這些任務(wù)差不多被平均分配到ForkJoinPool中的所有線程上纺裁。每個(gè)線程都為分配給它的任務(wù)保存一個(gè)雙向鏈?zhǔn)疥?duì)列,每完成一個(gè)任務(wù),就會(huì)從隊(duì)列頭上取出下一個(gè)任務(wù)開(kāi)始執(zhí)行欺缘《霸ィ基于前面所述的原因,某個(gè)線程可能早早完成了分配給它的所有任務(wù)谚殊,也就是它的隊(duì)列已經(jīng)空了丧鸯,而其他的線程還很忙。這時(shí)嫩絮,這個(gè)線程并沒(méi)有閑下來(lái)丛肢,而是隨機(jī)選了一個(gè)別的線程,從隊(duì)列的尾巴上“偷走”一個(gè)任務(wù)絮记。這個(gè)過(guò)程一直繼續(xù)下去摔踱,直到所有的任務(wù)都執(zhí)行完畢,所有的隊(duì)列都清空怨愤,有助于更好地在工作線程之間平衡負(fù)載派敷。

    image.png

  • Spliterator接口
    Iterator一樣,Spliterator也用于遍歷數(shù)據(jù)源中的元素撰洗,但它是為了并行執(zhí)行而設(shè)計(jì)的篮愉。

    public interface Spliterator<T> {
         boolean tryAdvance(Consumer<? super T> action);//類(lèi)似于Iterator,按順序一個(gè)一個(gè)使用Spliterator中的元素差导,如果有其他元素需要遍歷就返回true
         Spliterator<T> trySplit();//可以把一些元素劃出去分給第二個(gè)Spliterator试躏,讓他們并行處理
         long estimateSize();//估計(jì)剩下還有多少元素需要遍歷
         int characteristics();//代表Spliterator本身特征集的編碼∩韬郑可以用這些特征來(lái)更好地控制和優(yōu)化它的使用颠蕴。
    } 
    

    Stream拆分成多個(gè)部分的算法是一個(gè)遞歸過(guò)程,如圖所示助析。第一步是對(duì)第一個(gè)Spliterator調(diào)用trySplit犀被,生成第二個(gè)Spliterator。第二步對(duì)這兩個(gè)Spliterator調(diào)用trysplit外冀,這樣總共就有了四個(gè)Spliterator寡键。這個(gè)框架不斷對(duì)Spliterator調(diào)用trySplit直到它返回null,表明它處理的數(shù)據(jù)結(jié)構(gòu)不能再分割雪隧,如第三步所示西轩。最后,這個(gè)遞歸拆分過(guò)程到第四步就終止了脑沿,這時(shí)所有的Spliterator在調(diào)用trySplit時(shí)都返回了null藕畔。

image.png

//7.3.2自定義Spliterator

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市庄拇,隨后出現(xiàn)的幾起案子注服,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件祠汇,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡熄诡,警方通過(guò)查閱死者的電腦和手機(jī)可很,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)凰浮,“玉大人我抠,你說(shuō)我怎么就攤上這事⊥嗉耄” “怎么了菜拓?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)笛厦。 經(jīng)常有香客問(wèn)我纳鼎,道長(zhǎng),這世上最難降的妖魔是什么裳凸? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任贱鄙,我火速辦了婚禮,結(jié)果婚禮上姨谷,老公的妹妹穿的比我還像新娘逗宁。我一直安慰自己,他們只是感情好梦湘,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布瞎颗。 她就那樣靜靜地躺著,像睡著了一般捌议。 火紅的嫁衣襯著肌膚如雪哼拔。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,763評(píng)論 1 307
  • 那天禁灼,我揣著相機(jī)與錄音管挟,去河邊找鬼。 笑死弄捕,一個(gè)胖子當(dāng)著我的面吹牛僻孝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播守谓,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼穿铆,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了斋荞?” 一聲冷哼從身側(cè)響起荞雏,我...
    開(kāi)封第一講書(shū)人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后凤优,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體悦陋,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年筑辨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了俺驶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡棍辕,死狀恐怖暮现,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情楚昭,我是刑警寧澤栖袋,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站抚太,受9級(jí)特大地震影響塘幅,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜尿贫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一晌块、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧帅霜,春花似錦匆背、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至搂根,卻和暖如春珍促,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背剩愧。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工猪叙, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人仁卷。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓穴翩,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親锦积。 傳聞我的和親對(duì)象是個(gè)殘疾皇子芒帕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 并行流:把一個(gè)內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流丰介。 先做一個(gè)簡(jiǎn)單的測(cè)試背蟆,測(cè)試傳統(tǒng)for循環(huán)鉴分,...
    墻角的牽牛花閱讀 267評(píng)論 0 0
  • 并行流 parallel() 如果每次應(yīng)用函數(shù)都要依賴前一次應(yīng)用的結(jié)果带膀,并行只會(huì)比順序處理增加開(kāi)銷(xiāo)志珍。錯(cuò)用并行流的首...
    上海馬超23閱讀 565評(píng)論 0 0
  • 緒論 之前的幾章中,我們已經(jīng)看到了新的Stream接口可以讓你以聲明性方式處理數(shù)據(jù)集垛叨。我們還解釋了將外部迭代換為內(nèi)...
    潯它芉咟渡閱讀 3,229評(píng)論 0 2
  • Java8 in action 沒(méi)有共享的可變數(shù)據(jù)碴裙,將方法和函數(shù)即代碼傳遞給其他方法的能力就是我們平常所說(shuō)的函數(shù)式...
    鐵牛很鐵閱讀 1,233評(píng)論 1 2
  • 素弦管笛起秋風(fēng)。 夢(mèng)巫山点额,仙子青蔥。 舒袖展柔情莺琳, 飛天瞬若輕鴻还棱。 腸聲斷,蹙淚微紅惭等。 離別去珍手,非是簫郎弄曲, 月...
    斷紅塵閱讀 154評(píng)論 0 0