Java List操作1(分片 partition)
Java List操作2(分組group by)
Java List操作3(獲取list中bean對象中的某一列值—map)
Java List操作4(where,filter——過濾)
Java List操作5(sort—排序) Java List操作6(distinct—去重)
以前總結過使用java8 stream流操作處理List的方法,現在從stream流角度重新梳理一下stream流特性少孝。
1鲫售、介紹 從Java1.8開始提出了Stream流的概念溺忧,側重對于源數據計算能力的封裝微渠。
Stream 流操作可以分為 3 種類型:
創(chuàng)建 Stream
Stream 中間處理
終止 Steam
中間處理只是一種標記霎苗,只有終止操作才會觸發(fā)實際計算争剿。 中間操作又可以分為無狀態(tài)的(Stateless)和有狀態(tài)的(Stateful)已艰,無狀態(tài)中間操作是指元素的處理不受前面元素的影響,而有狀態(tài)的中間操作必須等到所有元素處理之后才知道最終結果蚕苇,比如排序是有狀態(tài)操作哩掺,在讀取所有元素之前并不能確定排序結果; 結束操作又可以分為短路操作和非短路操作涩笤,短路操作是指不用處理全部元素就可以返回結果嚼吞,比如 找到第一個滿足條件的元素。之所以要進行如此精細的劃分蹬碧,是因為底層對每一種情況的處理方式不同舱禽。
2、創(chuàng)建 Stream
stream() : 創(chuàng)建一個新的stream串行流對象
parallelStream():創(chuàng)建一個可并行執(zhí)行的stream流對象
Stream.of()/Stream.iterate()/Stream.generate():使用Stream的靜態(tài)方法創(chuàng)建一個新的stream串行流對象
//集合創(chuàng)建串行流
Stream<String> stream = Arrays.asList("a", "b", "c").stream();
//創(chuàng)建并行流
Stream<String> parallelStream = Arrays.asList("a", "b", "c").parallelStream();
//使用Stream的靜態(tài)方法
Stream<Integer> stream3 = Stream.of(1, 2, 3, 4, 5, 6);
Stream<Integer> stream4 = Stream.iterate(0, (x) -> x + 3).limit(4);
Stream<Double> stream5 = Stream.generate(Math::random).limit(3);
2.1 并行流
使用并行流可以有效利用計算機的多 CPU 硬件恩沽,提升邏輯的執(zhí)行速度誊稚。并行流通過將一整個 stream 劃分為多個片段,然后對各個分片流并行執(zhí)行處理邏輯罗心,最后將各個分片流的執(zhí)行結果匯總為一個整體流里伯。
并行流類似于多線程在并行處理,所以與多線程場景相關的一些問題同樣會存在渤闷,比如死鎖等問題疾瓮,所以在并行流終止執(zhí)行的函數邏輯,必須要保證線程安全飒箭。
3狼电、Stream 中間處理
負責對 Stream 進行處理操作,并返回一個新的 Stream 對象弦蹂,中間管道操作可以進行疊加漫萄。
4、終止 Steam
通過終止Stream操作之后盈匾,Stream 流將會結束腾务,最后可能會執(zhí)行某些邏輯處理,或者是按照要求返回某些執(zhí)行后的結果數據削饵。
一旦一個 Stream 被執(zhí)行了終止操作之后岩瘦,后續(xù)便不可以再讀這個流執(zhí)行其他的操作了未巫,否則會報錯。
5启昧、Stream收集器Collectors
Stream 主要用于對集合數據的處理場景叙凡,所以除了上面幾種獲取簡單結果的終止方法之外,更多的場景是獲取一個集合類的結果對象密末,比如 List握爷、Set 或者 HashMap 等。
這里就需要 collect 方法出場了严里,collect是Stream流的一個終止方法新啼,會使用傳入的收集器(入參)對結果執(zhí)行相關的操作,這個收集器必須是Collector接口的某個具體實現類刹碾,通常我們使用Collectors燥撞,Collectors是一個工具類,提供了很多的靜態(tài)工廠方法迷帜,提供了很多Collector接口的具體實現類物舒,是為了方便程序員使用而預置的一些較為通用的收集器(如果不使用Collectors類,而是自己去實現Collector接口戏锹,也可以)冠胯。
Stream結果收集操作的本質,其實就是將Stream中的元素通過收集器定義的函數處理邏輯進行加工锦针,然后輸出加工后的結果涵叮。
Collectors里常用搜集器如下:
[圖片上傳失敗...(image-f61f89-1675853638951)]
[圖片上傳失敗...(image-d0773e-1675853638951)]
初始案例:
List<String> servers = new ArrayList<>();
servers.add("Felordcn");
servers.add("Tomcat");
servers.add("Jetty");
servers.add("Undertow");
servers.add("Resin");
5.1 類型歸納
這是一個系列,作用是將元素分別歸納進可變容器 List
伞插、Map
、Set
盾碗、Collection
或者ConcurrentMap
媚污。
//Collectors.toList();
//Collectors.toMap();
//Collectors.toSet();
//Collectors.toCollection();
//Collectors.toConcurrentMap();
List<String> list = servers.stream().collect( Collectors.toList());</pre>
5.2 joining連接
將元素以某種規(guī)則連接起來。該方法有三種重載 joining(CharSequence delimiter)
和 joining(CharSequence delimiter,CharSequence prefix,CharSequence suffix)
// 輸出 FelordcnTomcatJettyUndertowResin
servers.stream().collect(Collectors.joining());
// 輸出 Felordcn,Tomcat,Jetty,Undertow,Resin
servers.stream().collect(Collectors.joining("," ));
// 輸出 [Felordcn,Tomcat,Jetty,Undertow,Resin]
servers.stream().collect(Collectors.joining(",", "[", "]")); </pre>
用的比較多的是讀取 HttpServletRequest
中的 body :
HttpServletRequest.getReader().lines().collect(Collectors.joining());
5.3 groupingBy 聚合
按照條件對元素進行分組廷雅,和 SQL 中的 group by
用法有異曲同工之妙耗美,通常也建議使用 Java 進行分組處理以減輕數據庫壓力。groupingBy
也有三個重載方法 我們將 servers
按照長度進行分組:
// 按照字符串長度進行分組,符合條件的元素將組成一個 List 映射到以條件長度為key 的 Map<Integer, List<String>> 中
Map<Integer, List<String>> listMap = servers.stream().collect(Collectors.groupingBy(String::length));
// 生成Map<Integer, Set<String>>
Map<Integer, Set<String>> setMap = servers.stream().collect(Collectors.groupingBy(String::length, Collectors.toSet()));
我要考慮同步安全問題怎么辦航缀? 當然使用線程安全的同步容器啊商架,那前兩種都用不成了吧! 看源碼芥玉,其實第二種等同于下面的寫法:
Supplier<Map<Integer,Set<String>>> mapSupplier = HashMap::new;
Map<Integer,Set<String>> collect = servers.stream.collect(Collectors.groupingBy(String::length, mapSupplier, Collectors.toSet()));
這就非常好辦了蛇摸,我們提供一個同步 Map
不就行了,于是問題解決了:
Supplier<Map<Integer, Set<String>>> mapSupplier = () -> Collections.synchronizedMap(new HashMap<>());
Map<Integer, Set<String>> collect = servers.stream.collect(Collectors.groupingBy(String::length, mapSupplier, Collectors.toSet()));
其實同步安全問題 Collectors
的另一個方法 groupingByConcurrent
給我們提供了解決方案灿巧。用法和 groupingBy
差不多赶袄。
5.4 collectingAndThen
該方法先執(zhí)行了一個歸納操作揽涮,然后再對歸納的結果進行 Function
函數處理輸出一個新的結果。
// 比如我們將servers joining 然后轉成大寫饿肺,結果為: FELORDCN,TOMCAT,JETTY,UNDERTOW,RESIN
servers.stream.collect(Collectors.collectingAndThen(Collectors.joining(","), String::toUpperCase));
5.5 partitioningBy 分區(qū)
分區(qū)是分組的特殊情況,由一個謂詞(返回一個布爾值的函數)作為分類函數.所以返回的Map集合只有兩個key,一個true,一個false.
// 長度大于5 {false=[Jetty, Resin], true=[Felordcn, Tomcat, Undertow]
Map<Boolean, List<String>> map = servers.stream().collect(partitioningBy(e -> e.length() > 5));
// 長度大于5 且 按照長度分組 蒋困,即先分區(qū)再分組
//{false={5=[Jetty, Resin]}, true={6=[Tomcat], 8=[Felordcn, Undertow]}}
Map<Boolean, Map<Integer, List<String>>> map2 = servers.stream().collect(partitioningBy(e -> e.length() > 5, groupingBy(String::length)));
5.6 counting 統(tǒng)計元素的的數量
該方法歸納元素的的數量,非常簡單敬辣,不再舉例說明雪标。
// 5
long size = servers.stream().collect(Collectors.counting ());
3.7 maxBy/minBy 查找大小元素
這兩個方法分別提供了查找大小元素的操作,它們基于比較器接口 Comparator
來比較 溉跃,返回的是一個 Optional
對象村刨。 我們來獲取 servers
中最小長度的元素:
// Jetty
Optional<String> min = servers.stream().collect(Collectors.minBy(Comparator.comparingInt(String::length)));
這里其實 Resin
長度也是最小,這里遵循了 "先入為主" 的原則 喊积。當然 Stream.min()
可以很方便的獲取最小長度的元素烹困。maxBy
同樣的道理。
5.8 summingInt/Double/Long 累加計算
用來做累加計算乾吻。計算元素某個屬性的總和,類似 Mysql 的 sum
函數髓梅,比如計算各個項目的盈利總和、計算本月的全部工資總和等等绎签。我們這里就計算一下 servers
中字符串的長度之和 (為了舉例不考慮其它寫法)枯饿。
// 總長度 32
servers.stream.collect(Collectors.summingInt(s -> s.length()));
5.9 summarizingInt/Double/Long 統(tǒng)計數據
如果我們對 5.6章節(jié)-5.8章節(jié) 的操作結果都要怎么辦?難不成我們搞5個 Stream
流嗎诡必? 所以就有了 summarizingInt
奢方、summarizingDouble
、summarizingLong
三個方法爸舒。 這三個方法通過對元素某個屬性的提取蟋字,會返回對元素該屬性的統(tǒng)計數據對象,分別對應 IntSummaryStatistics
扭勉、DoubleSummaryStatistics
鹊奖、LongSummaryStatistics
。我們對 servers
中元素的長度進行統(tǒng)計:
DoubleSummaryStatistics doubleSummaryStatistics = servers.stream.collect(Collectors.summarizingDouble(String::length));
// {count=5, sum=32.000000, min=5.000000, average=6.400000, max=8.000000}
System.out.println("doubleSummaryStatistics.toString() = " + doubleSummaryStatistics.toString());
結果 DoubleSummaryStatistics
中包含了 總數涂炎,總和忠聚,最小值,最大值唱捣,平均值 五個指標两蟀。
5.10 mapping
該方法是先對元素使用 Function
進行再加工操作,然后用另一個Collector
歸納震缭。比如我們先去掉 servers
中元素的首字母赂毯,然后將它們裝入 List
。
// [elordcn, omcat, etty, ndertow, esin]
servers.stream.collect(Collectors.mapping(s -> s.substring(1), Collectors.toList()));
有點類似 Stream
先進行了 map
操作再進行 collect
:
servers.stream.map(s -> s.substring(1)).collect(Collectors.toList());
5.11 reducing
這個方法非常有用!但是如果要了解這個就必須了解其參數 BinaryOperator<T>
欢瞪。 這是一個函數式接口活烙,是給兩個相同類型的量,返回一個跟這兩個量相同類型的一個結果遣鼓,偽表達式為 (T,T) -> T
啸盏。默認給了兩個實現 maxBy
和 minBy
,根據比較器來比較大小并分別返回最大值或者最小值骑祟。當然你可以靈活定制回懦。然后 reducing
就很好理解了,元素兩兩之間進行比較根據策略淘汰一個次企,隨著輪次的進行元素個數就是 reduce
的怯晕。那這個有什么用處呢? Java 官方給了一個例子:統(tǒng)計每個城市個子最高的人缸棵。
Comparator<Person> byHeight = Comparator.comparing(Person::getHeight);
Map<String, Optional<Person>> tallestByCity = people.stream()
.collect(Collectors.groupingBy(Person::getCity, Collectors.reducing(BinaryOperator.maxBy(byHeight))));
結合最開始給的例子你可以使用 reducing
找出最長的字符串試試舟茶。
上面這一層是根據 Height
屬性找最高的 Person
,而且如果這個屬性沒有初始化值或者沒有數據堵第,很有可能拿不到結果所以給出的是 Optional<Person>
吧凉。 如果我們給出了 identity
作一個基準值,那么我們首先會跟這個基準值進行 BinaryOperator
操作踏志。 比如我們給出高于 2 米 的人作為 identity
阀捅。 我們就可以統(tǒng)計每個城市不低于 2 米 而且最高的那個人,當然如果該城市沒有人高于 2 米則返回基準值identity
:
Comparator<Person> byHeight = Comparator.comparing(Person::getHeight);
Person identity= new Person();
identity.setHeight(2.);
identity.setName("identity");
Map<String, Person> collect = persons.stream()
.collect(Collectors.groupingBy(Person::getCity, Collectors.reducing(identity, BinaryOperator.maxBy(byHeight))));
這時候就確定一定會返回一個 Person
了针余,最起碼會是基準值identity
不再是 Optional
饲鄙。
還有些情況,我們想在 reducing
的時候把 Person
的身高先四舍五入一下圆雁。這就需要我們做一個映射處理忍级。定義一個 Function<? super T, ? extends U> mapper
來干這個活。那么上面的邏輯就可以變更為:
Comparator<Person> byHeight = Comparator.comparing(Person::getHeight);
Person identity = new Person();
identity.setHeight(2.);
identity.setName("identity");
// 定義映射 處理 四舍五入
Function<Person, Person> mapper = ps -> {
Double height = ps.getHeight();
BigDecimal decimal = new BigDecimal(height);
Double d = decimal.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
ps.setHeight(d);
return ps;
};
Map<String, Person> collect = persons.stream()
.collect(Collectors.groupingBy(Person::getCity, Collectors.reducing(identity, mapper, BinaryOperator.maxBy(byHeight))));
6 parallelStream并發(fā)流線程安全問題
parallelStream是以多線程的方式伪朽,執(zhí)行定義的代碼塊轴咱。parallelStream中使用的是ForkJobTask。Fork/Join的框架是通過把一個大任務不斷fork成許多子任務驱负,然后多線程執(zhí)行這些子任務,最后再Join這些子任務得到最終結果患雇。
因為是多線程跃脊,所以在代碼塊里操作線程不安全的Collection,就會引發(fā)Concurrency問題苛吱。比如
public class TestParallelStream {
public static void main(String[] args) {
List<String> alist = new ArrayList<String>(Arrays.asList("1","2","3","4","5","6","7"));
for(int i=0;i<10000;i++) {
Map<String, String> result = new HashMap<String, String>();
alist.parallelStream().forEach(item->{
result.put(item, item);
});
System.out.println("i="+i+",map大小:"+result.size());
}
}
}
結果如下
......
i=5678,map大小:7
i=5679,map大小:7
i=5680,map大小:6 //出現map只有6個元素的情況了
i=5681,map大小:7
......
從程序上看酪术,就是先將alist集合fork成多段,然后多線程添加到HashMap中,而HashMap的add方法并不能保證原子性绘雁。
方法一:將parallelStream改成stream橡疼、foreach,串行執(zhí)行
但是這樣其實就失去了優(yōu)化
方法二:給collection上鎖
//對要存儲元素的map要求線程安全
Map<String, String> result = new HashMap<String, String>();
修改為
Map<String, String> result = Collections.synchronizedMap(new HashMap());
Map<String, String> result = new ConcurrentHashMap<>(new HashMap());
方法三:使用java8中的收集器(使用parallelStream后使用collect )
我們使用外面的集合庐舟,無非是為了收集元素欣除。Java8 Stream的collect方法,就是收集Stream里的元素挪略,返回List历帚,Set或Map等,并且它是線程安全的杠娱。
List<String> results = Collections.synchronizedList(new ArrayList<>());
sources.parallelStream().forEach(source -> {
results.add(sigmaString(source));
});
使用用collect改寫上面的代碼:
List<String> results = sources.parallelStream()
.flatMap(source -> Stream.of(sigmaString(source)))
.collect(Collectors.toList());