1. 簡介
??本教程志在細致入微、深入底層,你將體驗從Stream的創(chuàng)建開始(creation)到并行執(zhí)行(parallel execution)的完整過程抠蚣,以此體會Stream API的實際用處。
??為了理解下面的文章泵三,讀者需要掌握Java 7基礎知識(Lambda表達式遂跟、Optional、方法引用)以及熟悉Stream API透揣,如果你并不熟悉它們甚至一無所知,建議你先閱讀我們之前的文章-Java8 新特性 以及 Java 8 Streams 介紹川抡。
2. 創(chuàng)建Stream
??創(chuàng)建一個Stream實例有多種方式辐真,每種創(chuàng)建方式對應Stream的一個來源。但單個Stream實例每次創(chuàng)建之后崖堤,其來源將無法修改侍咱,這意味著Stream實例具備源頭不可變性,不過我們卻可以從單個源創(chuàng)建多個Stream實例密幔。
2.1 Empty Stream - 空Stream
??方法empty()被用于創(chuàng)建一個Empty Stream:
Stream<String> streamEmpty = Stream.empty;
??上述代碼段創(chuàng)建的Empty Stream通常被用于避免null對象或零元素對象的streams(streams with no element)返回結(jié)果為null:
public Stream<String> streamOf(List<String> list){
return lsit == null || list.isEmpty() ? Stream.empty() : list.streams();
}
2.2 Stream of Collection - 集合Steram
??我們可以創(chuàng)建任意Collection接口衍生類(Collection->List楔脯、Set、Queue)的Streams:
Collections<String> collection = Arrays.asList("a", "b", "c");
Stream<Stirng> streamOfCollection = collection.stream();
2.3 Stream of Array - 數(shù)組Stream
??接下來的這段代碼展示的是數(shù)組Stream:
Stream<String> streamOfArray = Stream.of("a", "b", "c");
??當然我們可以先創(chuàng)建熟悉的數(shù)組類型胯甩,再以它為源創(chuàng)建Stream昧廷,而且我們可以選擇Stream中包含的元素數(shù)量:
String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3);
2.4 Stream.builder() - 構(gòu)建器
??當builder被用于指定參數(shù)類型時,應被額外標識在聲明右側(cè)偎箫,否則方法build()將創(chuàng)建一個Stream(Object)實例:
Stream<String> streamBuilder = Stream.<String>builder().add("a").add("b").add("c").build();
2.5 Stream.generator() - 生成器
??方法generator()接受一個供應器Supplier<T>用于元素生成木柬。由于生產(chǎn)流(resulting stream)被定義之后屬于無限流(即無止境地不斷生產(chǎn)),開發(fā)者必須指定stream擁有流的目標大小淹办,否則方法generator()將持續(xù)生產(chǎn)直到jvm內(nèi)存到達頂值(memory limit):
Stream<String> streamOfGenerated = Stream.generate( () -> "element").limit(10);
??上述代碼將創(chuàng)建十個內(nèi)容為“element”的生成流眉枕。
2.6 Stream.iterate() - 迭代器
??另一種創(chuàng)建無限流的方法是通過調(diào)用方法iterate(),同樣的它也需要使用方法limit()對目標流的元素大小進行限制:
Stream<Integer> streamItreated = Stream.iterate(40, n -> n + 2).limit(20);
??迭代流即采用迭代的方法作為元素生產(chǎn)方式怜森,類似于高中數(shù)學中的f(x)速挑,f(f(x)),etc副硅。上述例子中姥宝,生成流的第一個元素是迭代器iterate()中的第一個元素40,從第二個元素開始的每個新元素都與上個元素有關(guān)想许,在此例中伶授,生成流中的元素為:40、42流纹、44糜烹、...78、80漱凝。
2.7 Stream of Primitives - 基元流
??Java8提供了創(chuàng)建三大基礎數(shù)據(jù)類型(int疮蹦、long、double)stream的方式茸炒。由于Stream<T>是一個類接口愕乎,我們無法采用泛型傳參的方式聲明基礎數(shù)據(jù)類型的stream阵苇,因此三個特殊的接口就被創(chuàng)造出來了:IntStream、LongStream感论、DoubleStream绅项。
使用它們能夠避免不必要的自動裝箱1以提高生產(chǎn)效率。
IntStream intStream = IntStream.range(1, 3);
LongStream longStream = LongStream.rangeClosed(1, 3);
??方法range(int startInclusive, int endInclusive)創(chuàng)建了一個有序流(從startInclusive到endInclusive)比肄。它使后面的值每個增加1快耿,但卻不包括最后一個參數(shù),即此方法的結(jié)果是具備上限的芳绩。方法rangeClosed(int startInclusive, int endInclusive)與range()大致相同掀亥,但它卻包含了最后一個值。
這兩個方法用于生成三大基本數(shù)據(jù)類型的stream妥色。
??此外搪花,Java8之后,類Random也提供了拓展方法用于生成基礎數(shù)據(jù)類型的stream嘹害。例如撮竿,下述代碼創(chuàng)建了一個含有三個隨機值的DoubleStream:
Random random = new Random();
DoubleStream doubleStream = random.doubles(3);
2.8 Stream of String - 字符串流
??String類型也可以作為生成stream的源,這得益于方法chars()的幫助吼拥,此外由于JDK中沒有CharStream接口倚聚,IntStream也被用來表示字符流(stream of chars)
IntStream streamOfChars = "abc".chars();
??下例中通過特征的正則表達式將一個字符串割裂成(break into)其子串。
Stream<String> streamOfString =
Pattern.compile(", ").spitAsStream("a", "b", "c");
2.9 Stream of File - 文件流
??Java NIO2類文件允許通過方法lines()生成文本文件的Stream<String>凿可。文本的每一行都會變成stream的一個元素:
Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfString = Files.lines(path);
Stream<String> streamWithCharset = Files.lines(path, Charset.forName("utf-8"));
ps:在方法lines()中也可以通過Charset設置文件編碼惑折。
3. Referencing a Stream - 引用stream
??只要調(diào)用生成操作(中間操作)就會實例化一個stream并生成一個可獲取的引用,但執(zhí)行終端操作會使得stream無法訪問枯跑。為了證明這一點惨驶,我們不妨先忘記它,畢竟實踐是檢驗真理的唯一標準敛助。
以下代碼如果不考慮冗長的話將是有效的:
Stream<String> stream = Stream.of("a", "b", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny();
??但是倘若我們在執(zhí)行終端操作后重新使用相同的引用粗卜,則會不可避免的觸發(fā)IllegalStateException。
Optional<String> firstElement = stream.findFirst();
??IllegalStateException是一個運行時異常(RuntimeException)纳击,即編譯器將不會提示此錯誤续扔。因此必須記得,JAVA8 不允許重復使用stream
這一設計是合乎邏輯的焕数,因為stream從設計上旨在提供一個將有限操作(指函數(shù)體中元素的相關(guān)操作)的序列纱昧,而不是存儲元素。
因此想讓以前的代碼正常工作我們得先改一改:
List<String> elements =
Stream.of("a", "b", "c").filter(element -> element.contains("b"))
.collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();
4. Stream Pipeline - 流的管道
??想要執(zhí)行源數(shù)據(jù)集的操作集并聚合它們堡赔,你需要以下三個部分——源(Source)识脆、中間操作(Intermediate operations)和終結(jié)操作(terminal operation)。
中間操作返回的是一個新的可操作stream。舉個例子灼捂,為了在一個包含少量元素Stream的基礎之上新建Stream离例,我們可以調(diào)用方法skip():
Stream<String> oneModifiedStream = Stream.of("abcd", "bbcd", "cbcd").skip(1);
??如果需要多次修改,則可以采用多次中間操作悉稠。假如我們還需要將Stream<String>中每個字符串替換為其子串subString(0, 3)宫蛆,則可以使用skip()和map()相連的方式完成:
Stream<String> twiceModifiedStream = stream.skip(1).map(element -> element.subString(0, 3));
??正如你所見,上例中map()使用Lambda表達式作為其參數(shù)對stream中的各元素進行處理的猛。
stream本身是毫無價值的洒扎,編程人員最感興趣的其實是終結(jié)操作(terminal operation),它可以是一個元素也可以是一個行為衰絮。只有在終結(jié)操作里才能對每個stream進行使用。正確的且最方便的stream操作方式就是Stream Pipeline磷醋,即stream源->中間操作->終結(jié)操作猫牡。如例:
List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1)
.map(element -> element.substring(0, 3)).sorted().count();
5. Lazy Invocation - 懶式調(diào)用
??中間操作是懶式調(diào)用的,這意味著只有在終結(jié)操作需要它們的時候中間操作才會被喚醒邓线。
為了證明這個事實淌友,假象我們有個方法wasCalled(),每當它被喚醒時使內(nèi)部變量counter自增骇陈。
private long counter;
private void wasCalled() {
counter++;
}
??接下來讓我們在filter()操作中喚起wasCalled():
List<String> list = Arrays.asList(“abc1”, “abc2”, “abc3”);
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
wasCalled();
return element.contains("2");
});
??由于有三個變量震庭,想象中filter()中的代碼塊將被執(zhí)行三次,wasCalled()執(zhí)行三次之后counter的值應為3你雌,但是執(zhí)行之后counter并未發(fā)生改變器联,仍然為0,也就是說filter()一次也沒有被喚醒婿崭,這個原因就是缺失了終結(jié)操作(terminal operation)拨拓。
那接下來我們不妨再上述代碼的基礎之上添加一次map()操作和一個終結(jié)操作——findFirst(),并采用打日志的方式幫助我們了解方法調(diào)用時機及順序氓栈。
Optional<String> stream = list.stream().filter( element -> {
log.info("filter() was called!");
return element.contains("2");
}).map(element -> {
log.info("map() was called!");
return element.toUpperCase();
}).findFirst();
??日志結(jié)果顯示filter()被喚醒了兩次渣磷,而map()僅僅被調(diào)用一次,這是由于管道流是垂直執(zhí)行的授瘦。在此例中第一個元素不滿足filter()的要求醋界,因此filter()被調(diào)用第二次以查找合適的結(jié)果,通過之后即進行map()操作提完,此時就沒有第三次機會執(zhí)行filter()操作了形纺。findFirst()就能找出源數(shù)據(jù)集中第一個含有“2”的字符串的全大寫字符串了。因此氯葬,懶調(diào)用使得不必相繼調(diào)用兩個中間操作(filter()和map())才能完成任務了挡篓。
6. Order of Execution - 執(zhí)行順序
??從性能的角度考慮,正確的執(zhí)行順序是采用上文提到的流式管道(Stream Pipeline):
long size = list.stream().map(element -> {
wasCalled();
return element.substring(0, 3);
}).skip(2).count();
??執(zhí)行這段代碼將使counter自增長3次,這意味著stream的方法map()將被調(diào)用3次官研,但最終size的值為1秽澳。這意味著結(jié)果流(resulting stream)中僅僅只有一個元素,毫無疑問在三次消息處理中程序跳過了兩次處理戏羽。
如果我們改變skip()和map()的執(zhí)行順序担神,counter將只自增長一次措嵌。也即是map()只被調(diào)用一次:
long size = list.stream().skip(2).map(element -> {
wasCalled();
return element.substring(0, 3);
}).count();
??以上示例告訴我們一個規(guī)則:用于減少流中元素數(shù)量的中間操作能真,應當放置在處理操作之前。因此鸟顺,保證在你的Stream Pipeline規(guī)則中按照這樣的順序編碼:skip() --> filter() --> distinct()
7. Stream Reduction - 流的聚合
??API提供了大量的終端操作用以聚合一個stream為一種數(shù)據(jù)類型或變量酷宵。比如:count()亥贸、max()、min()浇垦、sum()炕置,但是這些方法都是預定義的。但如果用戶需要自定義一個stream的聚合操作呢男韧?官方提供了兩個方法用以實現(xiàn)此類需求:reduce() 和 collect()朴摊。
7.1 reduce()方法
??此方法提供了三種變種,不同之處是它們的簽名以及返回類型此虑。reduce()方法具有下列參數(shù):
identify(標識器) - 累積器的初始值或當stream為空時的默認值甚纲。
accumulator(累積器) - 提供設定聚合元素之邏輯的功能,每次規(guī)約(reducing)累積器都會創(chuàng)建一個新的值朦前,新值的大小等于stream的大小介杆,并且只有上一個值是可用的。這非常有助于提升性能韭寸。
combiner(組合器) - 提供聚合accumulator(累積器)中元素的功能这溅,combiner是唯一一個能從不同線程以并行模式聚合累積器中結(jié)果的方法。
好棒仍,讓我們來實戰(zhàn)一下吧:
OptionalInt reduced =
IntStream.range(1, 4).reduce((a, b) -> a + b);
reduced = 6 = 1 + 2 + 3悲靴。
int reducedTwoParams =
IntStream.range(1, 4).reduce(10, (a, b) -> a + b);
reducedTwoParams = 16 = 10 + 1 + 2 + 3。
int reducedParams = Stream.of(1, 2, 3)
.reduce(10, (a, b) -> a + b, (a, b) -> {
log.info("combiner was called");
return a + b;
});
??這一結(jié)果與上文中的16一樣莫其,并且不會打出日志癞尚,因為combiner沒有被喚起。為了喚醒combiner乱陡,stream應當是并行的:
int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
.reduce(10, (a, b) -> a + b, (a, b) -> {
log.info("combiner was called");
return a + b;
});
??此時浇揩,結(jié)果變?yōu)?6,并且combiner被喚起了兩次憨颠。規(guī)約(reduce)運轉(zhuǎn)的算法為:每當stream中的元素通過identify(標識器)時accumulator(累積器)均被調(diào)用胳徽,最終累積器調(diào)用了3次积锅。上述行為是并行完成的,因此造成了(10+1=11; 10+2=12; 10+3=13;)养盗。最終combiner(組合器)混合了三次的結(jié)果缚陷,通過兩次迭代完成運算(12+13=25; 25+11=36;)。
7.2 collect()方法
??stream的規(guī)約也可以被其他的終結(jié)方法執(zhí)行——collect()往核。它接收了一個名為collector的參數(shù)箫爷,此參數(shù)注明規(guī)約的流程。官方已經(jīng)創(chuàng)建了預定義的收集器聂儒,我們可以在這些收集器的幫助下訪問它們虎锚。
下面我們將看到使用List作為所有stream的來源:
List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
new Product(14, "orange"), new Product(13, "lemon"),
new Product(23, "bread"), new Product(13, "sugar"));
轉(zhuǎn)換一個stream為Collection集合(Collection、List衩婚、Set窜护、Queue、etc)非春。
List<String> collectorCollection =
productList.stream().map(Product::getName).collect(Collectors.toList());
規(guī)約為String類型:
String listToString = productList.stream().map(Product::getName)
.collect(Collectors.joining(", ", "[", "]"));
??join()方法擁有三個參數(shù)(delimiter, prefix, suffix)柄慰,使用join()最便捷之處在于程序員不需要考慮stream的起始與結(jié)束甚至界定符,Collector會考慮到這些的税娜。
計算stream中所有數(shù)字元素的平均值
double averagePrice = productList.stream()
.collect(Collectors.averagingInt(Product::getPrice));
計算stream中所有數(shù)字元素的和
int summingPrice = productList.stream()
.collect(Collectors.summingInt(Product::getPrice));
??方法averagingXX()、summingXX()和summarizingXX()適用于基礎數(shù)據(jù)類型(int,long,double)藏研,也適用于它們的封裝類( Integer,Long,Double)敬矩。一個很有效的功能技術(shù)提供映射,因此開發(fā)者也不是一定需要在collect()方法之后使用map()操作才能完成映射的蠢挡。
收集stream元素集的統(tǒng)計信息:
IntSummaryStatistics statistics = productList.stream()
.collect(Collectors.summarizingInt(Product::getPrice));
??通過使用IntSummaryStatistics的生成實例弧岳,開發(fā)者能夠通過請求toString()方法創(chuàng)建一個統(tǒng)計報告,結(jié)果將是一系列顯而易見的結(jié)果:IntSummaryStatistics{count=5, sum=86, min=13, average=17,200000, max=23}业踏。通過調(diào)用上述方法getCount()禽炬、getSum()、getMin()勤家、getAverage()腹尖、getMax(),我們也很容易從對象中提取出count、sum伐脖、min热幔、average的值,這是因為所有的值均可以從單個管道中獲取讼庇。
采用指定方法組合stream中的元素:
Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
.collect(Collectors.groupingBy(Product::getPrice));
??此例中stream將根據(jù)group規(guī)則將所有元素規(guī)約成一個map绎巨。
根據(jù)一些描述對stream進行分組:
Set<Product> unmodifiableSet = productList.stream()
.collect(Collectors.collectingAndThen(Collectors.toSet(),
Collections::unmodifiableSet));
??這種相對特殊的情況里,collection將stream轉(zhuǎn)化為一個Set蠕啄,之后在此基礎上創(chuàng)建了一個不可變的Set场勤。
Custome collector(自定義收集器):
??假若我們因為一些特定的原因需要創(chuàng)建自定義的收集器戈锻,那更簡介輕快的方法是采用Collection的of()方法:
Collector<Product, ?, LinkedList<Product>> toLinkedList =
Collector.of(LinkedList::new, LinkedList::add,
(first, second) -> {
first.addAll(second);
return first;
});
LinkedList<Product> linkedListOfPersons =
productList.stream().collect(toLinkedList);
??在上例中,Collection的實例被規(guī)約成了一個LinkedList<Person>和媳。
Parallel Streams - 并行流
??在Java8之前格遭,并行化十分復雜。ExecutorService和FornJoin的出現(xiàn)大大降低了并行開發(fā)的復雜度窗价,但它們都無不避免的關(guān)注在如何創(chuàng)建一個特征鮮明的executor如庭,以及如何去運行它等等。Java8提倡了一種新的方式用于在函數(shù)類型中實現(xiàn)并行化撼港。
??API提供并行流用以并行化執(zhí)行操作坪它。當stream的源是一個數(shù)組或者Collection時,在parallelStream()方法的幫助下可以實現(xiàn)并行化:
Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel();
boolean bigPrice = streamOfCollection
.map(product -> product.getPrice() * 12)
.anyMatch(price -> price > 200);
但如果stream的源不是數(shù)組或者集合類型時帝牡,parallel()方法就應該被使用了:
IntStream intStreamParallel = IntStream.range(1, 150).parallel();
boolean isParallel = intStreamParallel.isParallel();
上例中往毡,Stream API自動使用了ForkJoin框架去完成并行操作。默認情況下靶溜,公共線程池將被使用开瞭,不會(至少暫時不會)給它單獨分配線程。當stream處于并行狀態(tài)時罩息,應當注意可能產(chǎn)生阻塞的操作嗤详,當對時間效率有所追求且操作可并行時應當轉(zhuǎn)換為并行stream(理由是假如某個任務大小遠遠多于其他任務,那它將更加耗時)瓷炮。當然啦葱色,并行模式也可以轉(zhuǎn)換回串行模式,只要使用sequential()方法就能做到這點:
IntStream intStreamSequential = intStreamParallel.sequential();
boolean isParallel = intStreamSequential.isParallel();
Conclusions - 結(jié)論
??Stream API在對鏈式數(shù)據(jù)進行操作時體現(xiàn)了其強大性娘香,但也易于理解苍狰。它通過引用的方法規(guī)約大容量的數(shù)據(jù),構(gòu)建了更健壯的程序烘绽,最主要的是提升了項目開發(fā)的生產(chǎn)力淋昭。
??在本文中stream均是未被關(guān)閉的(我們沒有調(diào)用close()方法或者其他的終結(jié)操作),但在實際項目中安接,不要這樣無節(jié)制的放縱stream的存在翔忽,這將逐步耗盡你的內(nèi)存,造成內(nèi)存泄漏程序崩潰的風險盏檐。
最后呀打,本文所對應的示例代碼你可以在github-core-java-8上獲取到。祝福你身體健康糯笙,編碼順利贬丛!
附錄
- 自動裝箱: 編譯器自動為語句進行語法解析,如類型補充等给涕。詳見Java 自動裝箱與拆箱(Autoboxing and unboxing)
- JAVA NIO: New I/O的簡稱豺憔,與舊式的基于流的I/O方法相對额获,從名字看,它表示新的一套Java I/O標準恭应。詳見NIO與AIO