Java8 Stream流操作總結

Java List操作1(分片 partition)
Java List操作2(分組group by)
Java List操作3(獲取list中bean對象中的某一列值—map)
Java List操作4(where,filter——過濾)
Java List操作5(sort—排序) Java List操作6(distinct—去重)

以前總結過使用java8 stream流操作處理List的方法,現在從stream流角度重新梳理一下stream流特性少孝。

1鲫售、介紹 從Java1.8開始提出了Stream流的概念溺忧,側重對于源數據計算能力的封裝微渠。

Stream 流操作可以分為 3 種類型:

  • 創(chuàng)建 Stream

  • Stream 中間處理

  • 終止 Steam

中間處理只是一種標記霎苗,只有終止操作才會觸發(fā)實際計算争剿。 中間操作又可以分為無狀態(tài)的(Stateless)和有狀態(tài)的(Stateful)已艰,無狀態(tài)中間操作是指元素的處理不受前面元素的影響,而有狀態(tài)的中間操作必須等到所有元素處理之后才知道最終結果蚕苇,比如排序是有狀態(tài)操作哩掺,在讀取所有元素之前并不能確定排序結果; 結束操作又可以分為短路操作和非短路操作涩笤,短路操作是指不用處理全部元素就可以返回結果嚼吞,比如 找到第一個滿足條件的元素。之所以要進行如此精細的劃分蹬碧,是因為底層對每一種情況的處理方式不同舱禽。

image.png

2、創(chuàng)建 Stream

  • stream() : 創(chuàng)建一個新的stream串行流對象

  • parallelStream():創(chuàng)建一個可并行執(zhí)行的stream流對象

  • Stream.of()/Stream.iterate()/Stream.generate():使用Stream的靜態(tài)方法創(chuàng)建一個新的stream串行流對象


//集合創(chuàng)建串行流
Stream<String> stream = Arrays.asList("a", "b", "c").stream();

//創(chuàng)建并行流
Stream<String> parallelStream = Arrays.asList("a", "b", "c").parallelStream();

//使用Stream的靜態(tài)方法
Stream<Integer> stream3 = Stream.of(1, 2, 3, 4, 5, 6);
Stream<Integer> stream4 = Stream.iterate(0, (x) -> x + 3).limit(4);
Stream<Double> stream5 = Stream.generate(Math::random).limit(3);
image

2.1 并行流

使用并行流可以有效利用計算機的多 CPU 硬件恩沽,提升邏輯的執(zhí)行速度誊稚。并行流通過將一整個 stream 劃分為多個片段,然后對各個分片流并行執(zhí)行處理邏輯罗心,最后將各個分片流的執(zhí)行結果匯總為一個整體流里伯。

圖片

并行流類似于多線程在并行處理,所以與多線程場景相關的一些問題同樣會存在渤闷,比如死鎖等問題疾瓮,所以在并行流終止執(zhí)行的函數邏輯,必須要保證線程安全飒箭。

3狼电、Stream 中間處理

負責對 Stream 進行處理操作,并返回一個新的 Stream 對象弦蹂,中間管道操作可以進行疊加漫萄。

image.png

4、終止 Steam

通過終止Stream操作之后盈匾,Stream 流將會結束腾务,最后可能會執(zhí)行某些邏輯處理,或者是按照要求返回某些執(zhí)行后的結果數據削饵。

一旦一個 Stream 被執(zhí)行了終止操作之后岩瘦,后續(xù)便不可以再讀這個流執(zhí)行其他的操作了未巫,否則會報錯。

image.png

5启昧、Stream收集器Collectors

Stream 主要用于對集合數據的處理場景叙凡,所以除了上面幾種獲取簡單結果的終止方法之外,更多的場景是獲取一個集合類的結果對象密末,比如 List握爷、Set 或者 HashMap 等。

這里就需要 collect 方法出場了严里,collect是Stream流的一個終止方法新啼,會使用傳入的收集器(入參)對結果執(zhí)行相關的操作,這個收集器必須是Collector接口的某個具體實現類刹碾,通常我們使用Collectors燥撞,Collectors是一個工具類,提供了很多的靜態(tài)工廠方法迷帜,提供了很多Collector接口的具體實現類物舒,是為了方便程序員使用而預置的一些較為通用的收集器(如果不使用Collectors類,而是自己去實現Collector接口戏锹,也可以)冠胯。

Stream結果收集操作的本質,其實就是將Stream中的元素通過收集器定義的函數處理邏輯進行加工锦针,然后輸出加工后的結果涵叮。

Collectors里常用搜集器如下:

[圖片上傳失敗...(image-f61f89-1675853638951)]

[圖片上傳失敗...(image-d0773e-1675853638951)]

初始案例:

List<String> servers = new ArrayList<>();
servers.add("Felordcn");
servers.add("Tomcat");
servers.add("Jetty");
servers.add("Undertow");
servers.add("Resin");

5.1 類型歸納

這是一個系列,作用是將元素分別歸納進可變容器 List伞插、MapSet盾碗、Collection 或者ConcurrentMap 媚污。

//Collectors.toList();
 //Collectors.toMap();
 //Collectors.toSet();
 //Collectors.toCollection();
 //Collectors.toConcurrentMap();

List<String> list = servers.stream().collect( Collectors.toList());</pre>

5.2 joining連接

將元素以某種規(guī)則連接起來。該方法有三種重載 joining(CharSequence delimiter)joining(CharSequence delimiter,CharSequence prefix,CharSequence suffix)

 //   輸出 FelordcnTomcatJettyUndertowResin
 servers.stream().collect(Collectors.joining());

 //   輸出 Felordcn,Tomcat,Jetty,Undertow,Resin
 servers.stream().collect(Collectors.joining("," ));

 //   輸出 [Felordcn,Tomcat,Jetty,Undertow,Resin]
 servers.stream().collect(Collectors.joining(",", "[", "]")); </pre>

用的比較多的是讀取 HttpServletRequest 中的 body

HttpServletRequest.getReader().lines().collect(Collectors.joining());

5.3 groupingBy 聚合

按照條件對元素進行分組廷雅,和 SQL 中的 group by 用法有異曲同工之妙耗美,通常也建議使用 Java 進行分組處理以減輕數據庫壓力。groupingBy 也有三個重載方法 我們將 servers 按照長度進行分組:

// 按照字符串長度進行分組,符合條件的元素將組成一個 List 映射到以條件長度為key 的 Map<Integer, List<String>> 中
 Map<Integer, List<String>> listMap = servers.stream().collect(Collectors.groupingBy(String::length));
// 生成Map<Integer, Set<String>> 
 Map<Integer, Set<String>> setMap = servers.stream().collect(Collectors.groupingBy(String::length, Collectors.toSet()));

我要考慮同步安全問題怎么辦航缀? 當然使用線程安全的同步容器啊商架,那前兩種都用不成了吧! 看源碼芥玉,其實第二種等同于下面的寫法:

Supplier<Map<Integer,Set<String>>> mapSupplier = HashMap::new;
 Map<Integer,Set<String>> collect = servers.stream.collect(Collectors.groupingBy(String::length, mapSupplier, Collectors.toSet()));

這就非常好辦了蛇摸,我們提供一個同步 Map 不就行了,于是問題解決了:

 Supplier<Map<Integer, Set<String>>> mapSupplier = () -> Collections.synchronizedMap(new HashMap<>());
 Map<Integer, Set<String>> collect = servers.stream.collect(Collectors.groupingBy(String::length, mapSupplier, Collectors.toSet()));

其實同步安全問題 Collectors 的另一個方法 groupingByConcurrent 給我們提供了解決方案灿巧。用法和 groupingBy 差不多赶袄。

5.4 collectingAndThen

該方法先執(zhí)行了一個歸納操作揽涮,然后再對歸納的結果進行 Function 函數處理輸出一個新的結果。

 // 比如我們將servers joining 然后轉成大寫饿肺,結果為: FELORDCN,TOMCAT,JETTY,UNDERTOW,RESIN 
 servers.stream.collect(Collectors.collectingAndThen(Collectors.joining(","), String::toUpperCase));

5.5 partitioningBy 分區(qū)

分區(qū)是分組的特殊情況,由一個謂詞(返回一個布爾值的函數)作為分類函數.所以返回的Map集合只有兩個key,一個true,一個false.

// 長度大于5  {false=[Jetty, Resin], true=[Felordcn, Tomcat, Undertow]
Map<Boolean, List<String>> map = servers.stream().collect(partitioningBy(e -> e.length() > 5));

// 長度大于5  且  按照長度分組 蒋困,即先分區(qū)再分組
//{false={5=[Jetty, Resin]}, true={6=[Tomcat], 8=[Felordcn, Undertow]}}
Map<Boolean, Map<Integer, List<String>>> map2 = servers.stream().collect(partitioningBy(e -> e.length() > 5, groupingBy(String::length)));

5.6 counting 統(tǒng)計元素的的數量

該方法歸納元素的的數量,非常簡單敬辣,不再舉例說明雪标。

// 5  
long size = servers.stream().collect(Collectors.counting ());

3.7 maxBy/minBy 查找大小元素

這兩個方法分別提供了查找大小元素的操作,它們基于比較器接口 Comparator 來比較 溉跃,返回的是一個 Optional 對象村刨。 我們來獲取 servers 中最小長度的元素:

// Jetty  
Optional<String> min = servers.stream().collect(Collectors.minBy(Comparator.comparingInt(String::length)));

這里其實 Resin 長度也是最小,這里遵循了 "先入為主" 的原則 喊积。當然 Stream.min() 可以很方便的獲取最小長度的元素烹困。maxBy 同樣的道理。

5.8 summingInt/Double/Long 累加計算

用來做累加計算乾吻。計算元素某個屬性的總和,類似 Mysqlsum 函數髓梅,比如計算各個項目的盈利總和、計算本月的全部工資總和等等绎签。我們這里就計算一下 servers 中字符串的長度之和 (為了舉例不考慮其它寫法)枯饿。

 // 總長度 32 
 servers.stream.collect(Collectors.summingInt(s -> s.length()));

5.9 summarizingInt/Double/Long 統(tǒng)計數據

如果我們對 5.6章節(jié)-5.8章節(jié) 的操作結果都要怎么辦?難不成我們搞5個 Stream 流嗎诡必? 所以就有了 summarizingInt奢方、summarizingDoublesummarizingLong 三個方法爸舒。 這三個方法通過對元素某個屬性的提取蟋字,會返回對元素該屬性的統(tǒng)計數據對象,分別對應 IntSummaryStatistics扭勉、DoubleSummaryStatistics鹊奖、LongSummaryStatistics。我們對 servers 中元素的長度進行統(tǒng)計:

DoubleSummaryStatistics doubleSummaryStatistics = servers.stream.collect(Collectors.summarizingDouble(String::length));
 // {count=5, sum=32.000000, min=5.000000, average=6.400000, max=8.000000}
 System.out.println("doubleSummaryStatistics.toString() = " + doubleSummaryStatistics.toString());

結果 DoubleSummaryStatistics 中包含了 總數涂炎,總和忠聚,最小值,最大值唱捣,平均值 五個指標两蟀。

5.10 mapping

該方法是先對元素使用 Function 進行再加工操作,然后用另一個Collector 歸納震缭。比如我們先去掉 servers 中元素的首字母赂毯,然后將它們裝入 List

 // [elordcn, omcat, etty, ndertow, esin]
 servers.stream.collect(Collectors.mapping(s -> s.substring(1), Collectors.toList()));

有點類似 Stream 先進行了 map 操作再進行 collect

 servers.stream.map(s -> s.substring(1)).collect(Collectors.toList());

5.11 reducing

這個方法非常有用!但是如果要了解這個就必須了解其參數 BinaryOperator<T> 欢瞪。 這是一個函數式接口活烙,是給兩個相同類型的量,返回一個跟這兩個量相同類型的一個結果遣鼓,偽表達式為 (T,T) -> T啸盏。默認給了兩個實現 maxByminBy ,根據比較器來比較大小并分別返回最大值或者最小值骑祟。當然你可以靈活定制回懦。然后 reducing 就很好理解了,元素兩兩之間進行比較根據策略淘汰一個次企,隨著輪次的進行元素個數就是 reduce 的怯晕。那這個有什么用處呢? Java 官方給了一個例子:統(tǒng)計每個城市個子最高的人缸棵。

 Comparator<Person> byHeight = Comparator.comparing(Person::getHeight);
 Map<String, Optional<Person>> tallestByCity = people.stream()
 .collect(Collectors.groupingBy(Person::getCity, Collectors.reducing(BinaryOperator.maxBy(byHeight))));

結合最開始給的例子你可以使用 reducing 找出最長的字符串試試舟茶。

上面這一層是根據 Height 屬性找最高的 Person ,而且如果這個屬性沒有初始化值或者沒有數據堵第,很有可能拿不到結果所以給出的是 Optional<Person>吧凉。 如果我們給出了 identity 作一個基準值,那么我們首先會跟這個基準值進行 BinaryOperator 操作踏志。 比如我們給出高于 2 米 的人作為 identity阀捅。 我們就可以統(tǒng)計每個城市不低于 2 米 而且最高的那個人,當然如果該城市沒有人高于 2 米則返回基準值identity

 Comparator<Person> byHeight = Comparator.comparing(Person::getHeight);
 Person identity= new Person();
 identity.setHeight(2.);
 identity.setName("identity");
 Map<String, Person> collect = persons.stream()
 .collect(Collectors.groupingBy(Person::getCity, Collectors.reducing(identity, BinaryOperator.maxBy(byHeight))));

這時候就確定一定會返回一個 Person 了针余,最起碼會是基準值identity 不再是 Optional 饲鄙。

還有些情況,我們想在 reducing 的時候把 Person 的身高先四舍五入一下圆雁。這就需要我們做一個映射處理忍级。定義一個 Function<? super T, ? extends U> mapper 來干這個活。那么上面的邏輯就可以變更為:

 Comparator<Person> byHeight = Comparator.comparing(Person::getHeight);
 Person identity = new Person();
 identity.setHeight(2.);
 identity.setName("identity");
 // 定義映射 處理 四舍五入
 Function<Person, Person> mapper = ps -> {
 Double height = ps.getHeight();

 BigDecimal decimal = new BigDecimal(height);
 Double d = decimal.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
 ps.setHeight(d);
 return ps;
 };
 Map<String, Person> collect = persons.stream()
 .collect(Collectors.groupingBy(Person::getCity, Collectors.reducing(identity, mapper, BinaryOperator.maxBy(byHeight))));

6 parallelStream并發(fā)流線程安全問題

parallelStream是以多線程的方式伪朽,執(zhí)行定義的代碼塊轴咱。parallelStream中使用的是ForkJobTask。Fork/Join的框架是通過把一個大任務不斷fork成許多子任務驱负,然后多線程執(zhí)行這些子任務,最后再Join這些子任務得到最終結果患雇。

因為是多線程跃脊,所以在代碼塊里操作線程不安全的Collection,就會引發(fā)Concurrency問題苛吱。比如

public class TestParallelStream {
    public static void main(String[] args) {
        List<String> alist = new ArrayList<String>(Arrays.asList("1","2","3","4","5","6","7"));
        for(int i=0;i<10000;i++) {
            Map<String, String> result = new HashMap<String, String>();
            alist.parallelStream().forEach(item->{
                result.put(item, item);
            });
            System.out.println("i="+i+",map大小:"+result.size());
        }
    }
}


結果如下
......
i=5678,map大小:7
i=5679,map大小:7
i=5680,map大小:6      //出現map只有6個元素的情況了
i=5681,map大小:7
......

從程序上看酪术,就是先將alist集合fork成多段,然后多線程添加到HashMap中,而HashMap的add方法并不能保證原子性绘雁。

方法一:將parallelStream改成stream橡疼、foreach,串行執(zhí)行

但是這樣其實就失去了優(yōu)化

方法二:給collection上鎖

//對要存儲元素的map要求線程安全
Map<String, String> result = new HashMap<String, String>();
修改為
Map<String, String> result = Collections.synchronizedMap(new HashMap());
Map<String, String> result = new ConcurrentHashMap<>(new HashMap());

方法三:使用java8中的收集器(使用parallelStream后使用collect )

我們使用外面的集合庐舟,無非是為了收集元素欣除。Java8 Stream的collect方法,就是收集Stream里的元素挪略,返回List历帚,Set或Map等,并且它是線程安全的杠娱。

List<String> results = Collections.synchronizedList(new ArrayList<>());
sources.parallelStream().forEach(source -> {  
  results.add(sigmaString(source));  
});    

使用用collect改寫上面的代碼:

List<String> results = sources.parallelStream()  
  .flatMap(source -> Stream.of(sigmaString(source)))  
  .collect(Collectors.toList());  
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末挽牢,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子摊求,更是在濱河造成了極大的恐慌禽拔,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,919評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件室叉,死亡現場離奇詭異睹栖,居然都是意外死亡,警方通過查閱死者的電腦和手機太惠,發(fā)現死者居然都...
    沈念sama閱讀 92,567評論 3 392
  • 文/潘曉璐 我一進店門磨淌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人凿渊,你說我怎么就攤上這事梁只。” “怎么了埃脏?”我有些...
    開封第一講書人閱讀 163,316評論 0 353
  • 文/不壞的土叔 我叫張陵搪锣,是天一觀的道長。 經常有香客問我彩掐,道長构舟,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,294評論 1 292
  • 正文 為了忘掉前任堵幽,我火速辦了婚禮狗超,結果婚禮上,老公的妹妹穿的比我還像新娘朴下。我一直安慰自己努咐,他們只是感情好,可當我...
    茶點故事閱讀 67,318評論 6 390
  • 文/花漫 我一把揭開白布殴胧。 她就那樣靜靜地躺著渗稍,像睡著了一般佩迟。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上竿屹,一...
    開封第一講書人閱讀 51,245評論 1 299
  • 那天报强,我揣著相機與錄音,去河邊找鬼拱燃。 笑死秉溉,一個胖子當著我的面吹牛,可吹牛的內容都是我干的扼雏。 我是一名探鬼主播坚嗜,決...
    沈念sama閱讀 40,120評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼诗充!你這毒婦竟也來了苍蔬?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,964評論 0 275
  • 序言:老撾萬榮一對情侶失蹤蝴蜓,失蹤者是張志新(化名)和其女友劉穎碟绑,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體茎匠,經...
    沈念sama閱讀 45,376評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡格仲,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,592評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了诵冒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片凯肋。...
    茶點故事閱讀 39,764評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖汽馋,靈堂內的尸體忽然破棺而出侮东,到底是詐尸還是另有隱情,我是刑警寧澤豹芯,帶...
    沈念sama閱讀 35,460評論 5 344
  • 正文 年R本政府宣布悄雅,位于F島的核電站,受9級特大地震影響铁蹈,放射性物質發(fā)生泄漏宽闲。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,070評論 3 327
  • 文/蒙蒙 一握牧、第九天 我趴在偏房一處隱蔽的房頂上張望容诬。 院中可真熱鬧,春花似錦沿腰、人聲如沸览徒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,697評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吱殉。三九已至,卻和暖如春厘托,著一層夾襖步出監(jiān)牢的瞬間友雳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,846評論 1 269
  • 我被黑心中介騙來泰國打工铅匹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留押赊,地道東北人。 一個月前我還...
    沈念sama閱讀 47,819評論 2 370
  • 正文 我出身青樓包斑,卻偏偏與公主長得像流礁,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子罗丰,可洞房花燭夜當晚...
    茶點故事閱讀 44,665評論 2 354