Java8 新特性 Stream 流教程

由于最近一直在用scala寫spark程序,發(fā)現java8的流新特性跟scala幾乎一個思維蚜点,所以整理出來互相學習下


image

Stream 流可以說是 Java8 新特性中用起來最爽的一個功能了帝璧,有了它先誉,從此操作集合告別繁瑣的 for 循環(huán)。但是還有很多小伙伴對 Stream 流不是很了解的烁。今天就通過這篇 @Winterbe 的譯文褐耳,一起深入了解下如何使用它吧。

目錄

一渴庆、Stream 流是如何工作的铃芦?

二、不同類型的 Stream 流

三襟雷、Stream 流的處理順序

四刃滓、中間操作順序這么重要?

五耸弄、數據流復用問題

六咧虎、高級操作

  • 6.1 Collect
  • 6.2 FlatMap
  • 6.3 Reduce

七、并行流

八计呈、結語


當我第一次閱讀 Java8 中的 Stream API 時砰诵,說實話征唬,我非常困惑,因為它的名字聽起來與 Java I0 框架中的 InputStreamOutputStream 非常類似茁彭。但是實際上总寒,它們完全是不同的東西。

Java8 Stream 使用的是函數式編程模式理肺,如同它的名字一樣摄闸,它可以被用來對集合進行鏈狀流式的操作。

本文就將帶著你如何使用 Java 8 不同類型的 Stream 操作妹萨。同時您還將了解流的處理順序贪薪,以及不同順序的流操作是如何影響運行時性能的。

我們還將學習終端操作 API reduce眠副,collect 以及flatMap的詳細介紹,最后我們再來深入的探討一下 Java8 并行流竣稽。

注意:如果您還不熟悉 Java 8 lambda 表達式囱怕,函數式接口以及方法引用,您可以先閱讀一下小哈的另一篇譯文 《Java8 新特性教程》

接下來毫别,就讓我們進入正題吧娃弓!

一、Stream 流是如何工作的岛宦?

流表示包含著一系列元素的集合台丛,我們可以對其做不同類型的操作,用來對這些元素執(zhí)行計算砾肺。聽上去可能有點拗口挽霉,讓我們用代碼說話:

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream() // 創(chuàng)建流
    .filter(s -> s.startsWith("c")) // 執(zhí)行過濾,過濾出以 c 為前綴的字符串
    .map(String::toUpperCase) // 轉換成大寫
    .sorted() // 排序
    .forEach(System.out::println); // for 循環(huán)打印

// C1
// C2

我們可以對流進行中間操作或者終端操作变汪。小伙伴們可能會疑問侠坎?什么是中間操作?什么又是終端操作裙盾?

image
  • :中間操作會再次返回一個流实胸,所以,我們可以鏈接多個中間操作番官,注意這里是不用加分號的庐完。上圖中的filter 過濾,map 對象轉換徘熔,sorted 排序门躯,就屬于中間操作。
  • :終端操作是對流操作的一個結束動作近顷,一般返回 void 或者一個非流的結果生音。上圖中的 forEach循環(huán) 就是一個終止操作宁否。

看完上面的操作,感覺是不是很像一個流水線式操作呢缀遍。

實際上慕匠,大部分流操作都支持 lambda 表達式作為參數,正確理解域醇,應該說是接受一個函數式接口的實現作為參數台谊。

二、不同類型的 Stream 流

我們可以從各種數據源中創(chuàng)建 Stream 流譬挚,其中以 Collection 集合最為常見锅铅。如 ListSet 均支持 stream() 方法來創(chuàng)建順序流或者是并行流。

并行流是通過多線程的方式來執(zhí)行的减宣,它能夠充分發(fā)揮多核 CPU 的優(yōu)勢來提升性能盐须。本文在最后再來介紹并行流,我們先討論順序流:

Arrays.asList("a1", "a2", "a3")
    .stream() // 創(chuàng)建流
    .findFirst() // 找到第一個元素
    .ifPresent(System.out::println);  // 如果存在漆腌,即輸出

// a1

在集合上調用stream()方法會返回一個普通的 Stream 流贼邓。但是, 您大可不必刻意地創(chuàng)建一個集合,再通過集合來獲取 Stream 流闷尿,您還可以通過如下這種方式:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

例如上面這樣塑径,我們可以通過 Stream.of() 從一堆對象中創(chuàng)建 Stream 流。

除了常規(guī)對象流之外填具,Java 8還附帶了一些特殊類型的流统舀,用于處理原始數據類型intlong以及double劳景。說道這里誉简,你可能已經猜到了它們就是IntStreamLongStream還有DoubleStream盟广。

其中描融,IntStreams.range()方法還可以被用來取代常規(guī)的 for 循環(huán), 如下所示:

IntStream.range(1, 4)
    .forEach(System.out::println); // 相當于 for (int i = 1; i < 4; i++) {}

// 1
// 2
// 3

上面這些原始類型流的工作方式與常規(guī)對象流基本是一樣的,但還是略微存在一些區(qū)別:

  • 原始類型流使用其獨有的函數式接口衡蚂,例如IntFunction代替Function窿克,IntPredicate代替Predicate

  • 原始類型流支持額外的終端聚合操作毛甲,sum()以及average()年叮,如下所示:

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1) // 對數值中的每個對象執(zhí)行 2*n + 1 操作
    .average() // 求平均值
    .ifPresent(System.out::println);  // 如果值不為空,則輸出
// 5.0

但是玻募,偶爾我們也有這種需求只损,需要將常規(guī)對象流轉換為原始類型流,這個時候,中間操作 mapToInt()跃惫,mapToLong() 以及mapToDouble就派上用場了:

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1)) // 對每個字符串元素從下標1位置開始截取
    .mapToInt(Integer::parseInt) // 轉成 int 基礎類型類型流
    .max() // 取最大值
    .ifPresent(System.out::println);  // 不為空則輸出

// 3

如果說叮叹,您需要將原始類型流裝換成對象流,您可以使用 mapToObj()來達到目的:

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i) // for 循環(huán) 1->4, 拼接前綴 a
    .forEach(System.out::println); // for 循環(huán)打印

// a1
// a2
// a3

下面是一個組合示例爆存,我們將雙精度流首先轉換成 int 類型流蛉顽,然后再將其裝換成對象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue) // double 類型轉 int
    .mapToObj(i -> "a" + i) // 對值拼接前綴 a
    .forEach(System.out::println); // for 循環(huán)打印

// a1
// a2
// a3

三、Stream 流的處理順序

上小節(jié)中先较,我們已經學會了如何創(chuàng)建不同類型的 Stream 流携冤,接下來我們再深入了解下數據流的執(zhí)行順序。

在討論處理順序之前闲勺,您需要明確一點曾棕,那就是中間操作的有個重要特性 —— 延遲性。觀察下面這個沒有終端操作的示例代碼:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

執(zhí)行此代碼段時菜循,您可能會認為翘地,將依次打印 "d2", "a2", "b1", "b3", "c" 元素。然而當你實際去執(zhí)行的時候癌幕,它不會打印任何內容子眶。

為什么呢?

原因是:當且僅當存在終端操作時序芦,中間操作操作才會被執(zhí)行。

是不是不信粤咪?接下來谚中,對上面的代碼添加 forEach終端操作:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

再次執(zhí)行,我們會看到輸出如下:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

輸出的順序可能會讓你很驚訝寥枝!你腦海里肯定會想宪塔,應該是先將所有 filter 前綴的字符串打印出來,接著才會打印 forEach 前綴的字符串囊拜。

事實上某筐,輸出的結果卻是隨著鏈條垂直移動的。比如說冠跷,當 Stream 開始處理 d2 元素時南誊,它實際上會在執(zhí)行完 filter 操作后,再執(zhí)行 forEach 操作蜜托,接著才會處理第二個元素抄囚。

是不是很神奇?為什么要設計成這樣呢橄务?

原因是出于性能的考慮幔托。這樣設計可以減少對每個元素的實際操作數,看完下面代碼你就明白了:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 轉大寫
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A"); // 過濾出以 A 為前綴的元素
    });

// map:      d2
// anyMatch: D2
// map:      a2
// anyMatch: A2

終端操作 anyMatch()表示任何一個元素以 A 為前綴,返回為 true重挑,就停止循環(huán)嗓化。所以它會從 d2 開始匹配,接著循環(huán)到 a2 的時候谬哀,返回為 true 刺覆,于是停止循環(huán)。

由于數據流的鏈式調用是垂直執(zhí)行的玻粪,map這里只需要執(zhí)行兩次隅津。相對于水平執(zhí)行來說,map會執(zhí)行盡可能少的次數劲室,而不是把所有元素都 map 轉換一遍伦仍。

四、中間操作順序這么重要很洋?

下面的例子由兩個中間操作mapfilter充蓝,以及一個終端操作forEach組成。讓我們再來看看這些操作是如何執(zhí)行的:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 轉大寫
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A"); // 過濾出以 A 為前綴的元素
    })
    .forEach(s -> System.out.println("forEach: " + s)); // for 循環(huán)輸出

// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

學習了上面一小節(jié)喉磁,您應該已經知道了谓苟,mapfilter會對集合中的每個字符串調用五次,而forEach卻只會調用一次协怒,因為只有 "a2" 滿足過濾條件涝焙。

如果我們改變中間操作的順序,將filter移動到鏈頭的最開始孕暇,就可以大大減少實際的執(zhí)行次數:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s)
        return s.startsWith("a"); // 過濾出以 a 為前綴的元素
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 轉大寫
    })
    .forEach(s -> System.out.println("forEach: " + s)); // for 循環(huán)輸出

// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1
// filter:  b3
// filter:  c

現在仑撞,map僅僅只需調用一次,性能得到了提升妖滔,這種小技巧對于流中存在大量元素來說隧哮,是非常很有用的。

接下來座舍,讓我們對上面的代碼再添加一個中間操作sorted

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2); // 排序
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a"); // 過濾出以 a 為前綴的元素
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 轉大寫
    })
    .forEach(s -> System.out.println("forEach: " + s)); // for 循環(huán)輸出

sorted 是一個有狀態(tài)的操作沮翔,因為它需要在處理的過程中,保存狀態(tài)以對集合中的元素進行排序曲秉。

執(zhí)行上面代碼采蚀,輸出如下:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

咦咦咦?這次怎么又不是垂直執(zhí)行了承二。你需要知道的是搏存,sorted是水平執(zhí)行的。因此矢洲,在這種情況下璧眠,sorted會對集合中的元素組合調用八次。這里,我們也可以利用上面說道的優(yōu)化技巧责静,將 filter 過濾中間操作移動到開頭部分:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2

從上面的輸出中袁滥,我們看到了 sorted從未被調用過,因為經過filter過后的元素已經減少到只有一個灾螃,這種情況下题翻,是不用執(zhí)行排序操作的。因此性能被大大提高了腰鬼。

五嵌赠、數據流復用問題

Java8 Stream 流是不能被復用的翰萨,一旦你調用任何終端操作混巧,流就會關閉:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

當我們對 stream 調用了 anyMatch 終端操作以后,流即關閉了段直,再調用 noneMatch 就會拋出異常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

為了克服這個限制彼硫,我們必須為我們想要執(zhí)行的每個終端操作創(chuàng)建一個新的流鏈炊豪,例如,我們可以通過 Supplier 來包裝一下流拧篮,通過 get() 方法來構建一個新的 Stream流词渤,如下所示:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

通過構造一個新的流,來避開流不能被復用的限制, 這也是取巧的一種方式串绩。

六缺虐、高級操作

Streams 支持的操作很豐富,除了上面介紹的這些比較常用的中間操作礁凡,如filtermap(參見Stream Javadoc)外高氮。還有一些更復雜的操作,如collect把篓,flatMap以及reduce。接下來腰涧,就讓我們學習一下:

本小節(jié)中的大多數代碼示例均會使用以下 List<Person>進行演示:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

// 構建一個 Person 集合
List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

6.1 Collect

collect 是一個非常有用的終端操作韧掩,它可以將流中的元素轉變成另外一個不同的對象,例如一個List窖铡,SetMap疗锐。collect 接受入參為Collector(收集器),它由四個不同的操作組成:供應器(supplier)费彼、累加器(accumulator)滑臊、組合器(combiner)和終止器(finisher)。

這些都是個啥箍铲?別慌雇卷,看上去非常復雜的樣子,但好在大多數情況下,您并不需要自己去實現收集器关划。因為 Java 8通過Collectors類內置了各種常用的收集器小染,你直接拿來用就行了。

讓我們先從一個非常常見的用例開始:

List<Person> filtered =
    persons
        .stream() // 構建流
        .filter(p -> p.name.startsWith("P")) // 過濾出名字以 P 開頭的
        .collect(Collectors.toList()); // 生成一個新的 List

System.out.println(filtered);    // [Peter, Pamela]

你也看到了贮折,從流中構造一個 List 異常簡單裤翩。如果說你需要構造一個 Set 集合,只需要使用Collectors.toSet()就可以了调榄。

接下來這個示例踊赠,將會按年齡對所有人進行分組:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age)); // 以年齡為 key,進行分組

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

除了上面這些操作。您還可以在流上執(zhí)行聚合操作每庆,例如筐带,計算所有人的平均年齡:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age)); // 聚合出平均年齡

System.out.println(averageAge);     // 19.0

如果您還想得到一個更全面的統(tǒng)計信息,摘要收集器可以返回一個特殊的內置統(tǒng)計對象扣孟。通過它烫堤,我們可以簡單地計算出最小年齡、最大年齡凤价、平均年齡鸽斟、總和以及總數量。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age)); // 生成摘要統(tǒng)計

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下一個這個示例利诺,可以將所有人名連接成一個字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18) // 過濾出年齡大于等于18的
    .map(p -> p.name) // 提取名字
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); // 以 In Germany 開頭富蓄,and 連接各元素,再以 are of legal age. 結束

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

連接收集器的入參接受分隔符慢逾,以及可選的前綴以及后綴立倍。

對于如何將流轉換為 Map集合,我們必須指定 Map 的鍵和值侣滩。這里需要注意口注,Map 的鍵必須是唯一的,否則會拋出IllegalStateException 異常君珠。

你可以選擇傳遞一個合并函數作為額外的參數來避免發(fā)生這個異常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2)); // 對于同樣 key 的寝志,將值拼接

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

既然我們已經知道了這些強大的內置收集器,接下來就讓我們嘗試構建自定義收集器吧策添。

比如說材部,我們希望將流中的所有人轉換成一個字符串,包含所有大寫的名稱唯竹,并以|分割乐导。為了達到這種效果,我們需要通過Collector.of()創(chuàng)建一個新的收集器浸颓。同時物臂,我們還需要傳入收集器的四個組成部分:供應器旺拉、累加器、組合器和終止器鹦聪。

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier 供應器
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator 累加器
        (j1, j2) -> j1.merge(j2),               // combiner 組合器
        StringJoiner::toString);                // finisher 終止器

String names = persons
    .stream()
    .collect(personNameCollector); // 傳入自定義的收集器

System.out.println(names);  // MAX | PETER | PAMELA | DAVID

由于Java 中的字符串是 final 類型的账阻,我們需要借助輔助類StringJoiner,來幫我們構造字符串泽本。

最開始供應器使用分隔符構造了一個StringJointer淘太。

累加器用于將每個人的人名轉大寫,然后加到StringJointer中规丽。

組合器將兩個StringJointer合并為一個蒲牧。

最終,終結器從StringJointer構造出預期的字符串赌莺。

6.2 FlatMap

上面我們已經學會了如通過map操作, 將流中的對象轉換為另一種類型冰抢。但是,Map只能將每個對象映射到另一個對象艘狭。

如果說挎扰,我們想要將一個對象轉換為多個其他對象或者根本不做轉換操作呢?這個時候巢音,flatMap就派上用場了遵倦。

FlatMap 能夠將流的每個元素, 轉換為其他對象的流。因此官撼,每個對象可以被轉換為零個梧躺,一個或多個其他對象,并以流的方式返回傲绣。之后掠哥,這些流的內容會被放入flatMap返回的流中。

在學習如何實際操作flatMap之前秃诵,我們先新建兩個類续搀,用來測試:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}

接下來,通過我們上面學習到的流知識菠净,來實例化一些對象:

List<Foo> foos = new ArrayList<>();

// 創(chuàng)建 foos 集合
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// 創(chuàng)建 bars 集合
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

我們創(chuàng)建了包含三個foo的集合禁舷,每個foo中又包含三個 bar

flatMap 的入參接受一個返回對象流的函數嗤练。為了處理每個foo中的bar榛了,我們需要傳入相應 stream 流:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

如上所示在讶,我們已成功將三個 foo對象的流轉換為九個bar對象的流煞抬。

最后,上面的這段代碼可以簡化為單一的流式操作:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

flatMap也可用于Java8引入的Optional類构哺。OptionalflatMap操作返回一個Optional或其他類型的對象革答。所以它可以用于避免繁瑣的null檢查战坤。

接下來,讓我們創(chuàng)建層次更深的對象:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}

為了處理從 Outer 對象中獲取最底層的 foo 字符串残拐,你需要添加多個null檢查來避免可能發(fā)生的NullPointerException途茫,如下所示:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

我們還可以使用OptionalflatMap操作,來完成上述相同功能的判斷溪食,且更加優(yōu)雅:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

如果不為空的話囊卜,每個flatMap的調用都會返回預期對象的Optional包裝,否則返回為nullOptional包裝類错沃。

筆者補充:關于 Optional 可參見我另一篇譯文《Java8 新特性如何防止空指針異痴ぷ椋》

6.3 Reduce

規(guī)約操作可以將流的所有元素組合成一個結果。Java 8 支持三種不同的reduce方法枢析。第一種將流中的元素規(guī)約成流中的一個元素玉掸。

讓我們看看如何使用這種方法,來篩選出年齡最大的那個人:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

reduce方法接受BinaryOperator積累函數醒叁。該函數實際上是兩個操作數類型相同的BiFunction司浪。BiFunction功能和Function一樣,但是它接受兩個參數把沼。示例代碼中啊易,我們比較兩個人的年齡,來返回年齡較大的人智政。

第二種reduce方法接受標識值和BinaryOperator累加器。此方法可用于構造一個新的 Person续捂,其中包含來自流中所有其他人的聚合名稱和年齡:

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

第三種reduce方法接受三個參數:標識值垦垂,BiFunction累加器和類型的組合器函數BinaryOperator。由于初始值的類型不一定為Person牙瓢,我們可以使用這個歸約函數來計算所有人的年齡總和:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76

結果為76劫拗,但是內部究竟發(fā)生了什么呢?讓我們再打印一些調試日志:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

你可以看到矾克,累加器函數完成了所有工作页慷。它首先使用初始值0和第一個人年齡相加。接下來的三步中sum會持續(xù)增加胁附,直到76酒繁。

等等?好像哪里不太對控妻!組合器從來都沒有調用過爸萏弧?

我們以并行流的方式運行上面的代碼弓候,看看日志輸出:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

并行流的執(zhí)行方式完全不同郎哭。這里組合器被調用了他匪。實際上,由于累加器被并行調用夸研,組合器需要被用于計算部分累加值的總和邦蜜。

讓我們在下一章深入探討并行流。

七亥至、并行流

流是可以并行執(zhí)行的悼沈,當流中存在大量元素時,可以顯著提升性能姐扮。并行流底層使用的ForkJoinPool, 它由ForkJoinPool.commonPool()方法提供井辆。底層線程池的大小最多為五個 - 具體取決于 CPU 可用核心數:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

在我的機器上,公共池初始化默認值為 3溶握。你也可以通過設置以下JVM參數可以減小或增加此值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

集合支持parallelStream()方法來創(chuàng)建元素的并行流杯缺。或者你可以在已存在的數據流上調用中間方法parallel()睡榆,將串行流轉換為并行流萍肆,這也是可以的。

為了詳細了解并行流的執(zhí)行行為胀屿,我們在下面的示例代碼中塘揣,打印當前線程的信息:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

通過日志輸出,我們可以對哪個線程被用于執(zhí)行流式操作宿崭,有個更深入的理解:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

如您所見亲铡,并行流使用了所有的ForkJoinPool中的可用線程來執(zhí)行流式操作。在持續(xù)的運行中葡兑,輸出結果可能有所不同奖蔓,因為所使用的特定線程是非特定的。

讓我們通過添加中間操作sort來擴展上面示例:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

運行代碼讹堤,輸出結果看上去有些奇怪:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

貌似sort只在主線程上串行執(zhí)行吆鹤。但是實際上,并行流中的sort在底層使用了Java8中新的方法Arrays.parallelSort()洲守。如 javadoc官方文檔解釋的疑务,這個方法會按照數據長度來決定以串行方式,或者以并行的方式來執(zhí)行梗醇。

如果指定數據的長度小于最小數值知允,它則使用相應的Arrays.sort方法來進行排序。

回到上小節(jié) reduce的例子叙谨。我們已經發(fā)現了組合器函數只在并行流中調用温鸽,而不不會在串行流中被調用。

讓我們來實際觀察一下涉及到哪個線程:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

通過控制臺日志輸出唉俗,累加器和組合器均在所有可用的線程上并行執(zhí)行:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

總之嗤朴,你需要記住的是,并行流對含有大量元素的數據流提升性能極大虫溜。但是你也需要記住并行流的一些操作雹姊,例如reducecollect操作,需要額外的計算(如組合操作)衡楞,這在串行執(zhí)行時是并不需要吱雏。

此外,我們也了解了瘾境,所有并行流操作都共享相同的 JVM 相關的公共ForkJoinPool歧杏。所以你可能需要避免寫出一些又慢又卡的流式操作,這很有可能會拖慢你應用中迷守,嚴重依賴并行流的其它部分代碼的性能犬绒。

八、結語

Java8 Stream 流編程指南到這里就結束了兑凿。如果您有興趣了解更多有關 Java 8 Stream 流的相關信息凯力,我建議您使用 Stream Javadoc 閱讀官方文檔。如果您想了解有關底層機制的更多信息礼华,您也可以閱讀 Martin Fowlers 關于 Collection Pipelines 的文章咐鹤。

最后,祝您學習愉快圣絮!

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末祈惶,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子扮匠,更是在濱河造成了極大的恐慌捧请,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件棒搜,死亡現場離奇詭異血久,居然都是意外死亡,警方通過查閱死者的電腦和手機帮非,發(fā)現死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門氧吐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人末盔,你說我怎么就攤上這事筑舅。” “怎么了陨舱?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵翠拣,是天一觀的道長。 經常有香客問我游盲,道長误墓,這世上最難降的妖魔是什么蛮粮? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮谜慌,結果婚禮上然想,老公的妹妹穿的比我還像新娘。我一直安慰自己欣范,他們只是感情好变泄,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著恼琼,像睡著了一般妨蛹。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上晴竞,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天蛙卤,我揣著相機與錄音,去河邊找鬼噩死。 笑死表窘,一個胖子當著我的面吹牛,可吹牛的內容都是我干的甜滨。 我是一名探鬼主播乐严,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼衣摩!你這毒婦竟也來了昂验?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤艾扮,失蹤者是張志新(化名)和其女友劉穎既琴,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體泡嘴,經...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡甫恩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了酌予。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片磺箕。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖抛虫,靈堂內的尸體忽然破棺而出松靡,到底是詐尸還是另有隱情,我是刑警寧澤建椰,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布雕欺,位于F島的核電站,受9級特大地震影響,放射性物質發(fā)生泄漏屠列。R本人自食惡果不足惜啦逆,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望笛洛。 院中可真熱鬧夏志,春花似錦、人聲如沸撞蜂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蝌诡。三九已至,卻和暖如春枫吧,著一層夾襖步出監(jiān)牢的瞬間浦旱,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工九杂, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留颁湖,地道東北人。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓例隆,卻偏偏與公主長得像甥捺,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子镀层,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內容