模擬hadoop的計(jì)算單詞個(gè)數(shù)統(tǒng)計(jì)的helloword程序蠕啄,計(jì)算給定語句的單詞出現(xiàn)次數(shù)丧没。進(jìn)一步優(yōu)化:按照次數(shù)進(jìn)行排序后輸出们拙。
使用了parallel并行計(jì)算癞季,三個(gè)參數(shù)的reduce進(jìn)行縮減計(jì)算劫瞳。三個(gè)參數(shù)時(shí)是最難以理解的。先來看其定義:
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner)
分析下它的三個(gè)參數(shù):
- identity: 一個(gè)初始化的值绷柒;這個(gè)初始化的值其類型是泛型U志于,與Reduce方法返回的類型一致;注意此時(shí)Stream中元素的類型是T废睦,與U可以不一樣也可以一樣伺绽,這樣的話操作空間就大了;不管Stream中存儲(chǔ)的元素是什么類型,U都可以是任何類型奈应,如U可以是一些基本數(shù)據(jù)類型的包裝類型Integer澜掩、Long等;或者是String杖挣,又或者是一些集合類型ArrayList等肩榕;
- accumulator: 其類型是BiFunction,輸入是U與T兩個(gè)類型的數(shù)據(jù)惩妇,而返回的是U類型株汉;也就是說返回的類型與輸入的第一個(gè)參數(shù)類型是一樣的,而輸入的第二個(gè)參數(shù)類型與Stream中元素類型是一樣的歌殃。
- combiner: 其類型是BinaryOperator郎逃,支持的是對(duì)U類型的對(duì)象進(jìn)行操作;
代碼如下:
//1. 給定一段英文句子挺份,單詞用空格間隔
String words = " guo xiu zhi guo map reduce guo hadoop java stream stream parallel guo guo";
//2.1 由于是多線程要使用 ConcurrentHashMap褒翰,HashMap不支持并發(fā)操作
//2.2 給定語句通過空格分割成數(shù)組,轉(zhuǎn)換成普通Stream匀泊,再轉(zhuǎn)換并行流
//2.3 并行流進(jìn)行三參數(shù)的reduce計(jì)算优训。第一個(gè)參數(shù)是返回值,第二個(gè)參數(shù)是累加器各聘,第三個(gè)參數(shù)是各個(gè)線程返回值的合并操作揣非。
ConcurrentHashMap<String, Integer> reduceMap = Arrays.stream(words.split(" ")).parallel().reduce(new ConcurrentHashMap<String, Integer>(), (map, w) -> {
Integer num = map.get(w);
if (num == null) {//這個(gè)單詞如果不存在于map中
map.put(w, 1);
} else {//這個(gè)單詞如果存在于map中,數(shù)量+1躲因,新值放入map
map.put(w, num + 1);
}
return map;
}, (m1, m2) -> {
m1.putAll(m2);//把并行計(jì)算的各個(gè)結(jié)果歸并整合到m1中
return m1;
});
//根據(jù)單詞的出現(xiàn)次數(shù)倒序排列后輸出到List對(duì)象
List<Entry<String, Integer>> collect = reduceMap.entrySet().stream().sorted((o1, o2) -> o2.getValue() - o1.getValue()).collect(Collectors.toList());
//輸出排序后的list數(shù)據(jù)
System.out.println("reduceMap = " + collect);
輸出內(nèi)容:
reduceMap = [guo=4, stream=2, reduce=1, =1, xiu=1, java=1, parallel=1, zhi=1, hadoop=1, map=1]
但是我們發(fā)現(xiàn)數(shù)據(jù)偶爾會(huì)有丟失早敬。對(duì)于parrallel之后元素?cái)?shù)量不固定的原因,就是多線程有可能同時(shí)讀取到相同的下標(biāo)n然后同時(shí)賦值大脉,這樣就會(huì)出現(xiàn)元素缺失的問題搞监。無非就是多個(gè)線程賦值可能同時(shí)操作同一個(gè)地址,后賦值的把先賦值的給覆蓋掉了镰矿,才會(huì)出現(xiàn)這種問題琐驴。所以在使用paralleStream
時(shí)不使用foreach、map
操作秤标,完成collect后的新Stream再使用绝淡。
什么方法可以防止并行流出現(xiàn)線程不安全操作?
-
【推薦方法】Java8 Stream的collect方法苍姜,就是收集Stream里的元素牢酵,返回List,Set或Map等衙猪,并且它是線程安全的馍乙。那就是最后調(diào)用collect(Collectors.tolist())玉罐,這種收集起來所有元素到新集合是線程安全的。在采用并行流收集元素到集合中時(shí)潘拨,由于我們使用了并發(fā)流要使用
Collectors.toConcurrentMap
收集到ConcurrentMap
,而不是使用普通的toMap()饶号,toList()
方法進(jìn)行收集铁追。
ConcurrentMap<String, Integer> toConcurrentMap = Arrays.stream(words.split(" ")).parallel().collect(Collectors.toConcurrentMap(s -> s, s -> 1,
(integer, integer2) -> integer + integer2));
List<Entry<String, Integer>> orderedCollection = toConcurrentMap.entrySet().stream().sorted((o1, o2) -> o2.getValue() - o1.getValue()).collect(Collectors.toList());
System.out.println("reduceMap = " + orderedCollection);
沒有使用外部定義的集合,并且使用到了parallelStream并行處理的優(yōu)勢(shì)茫船。輸出的數(shù)據(jù)不再丟失
reduceMap = [stream=3, guo=2, zhi=2, reduce=1, java=1, xiu=1, parallel=1, hadoop=1, map=1]
- 給map操作加鎖
Lock lock = new ReentrantLock();
ConcurrentHashMap<String, Integer> reduceMap = Arrays.stream(words.split(" ")).parallel().reduce(new ConcurrentHashMap<String, Integer>(), (map, w) -> {
lock.lock();
Integer num = map.get(w);
if (num == null) {//這個(gè)單詞如果不存在于map中
map.put(w, 1);
} else {//這個(gè)單詞如果存在于map中琅束,數(shù)量+1,新值放入map
map.put(w, num + 1);
}
lock.unlock();
return map;
}, (m1, m2) -> {
//把并行計(jì)算的各個(gè)結(jié)果歸并整合到m1中算谈,m1和m2是一個(gè)對(duì)象涩禀,所以直接返回m1
System.out.println(Thread.currentThread().getName() + "," + (m1 == m2));
return m1;
});
- 抽取map操作片段為單獨(dú)方法
getStringIntegerConcurrentHashMap
,加上synchronized
ConcurrentHashMap<String, Integer> reduceMap = Arrays.stream(words.split(" ")).parallel().reduce(new ConcurrentHashMap<String, Integer>(), (map, w) -> {
return getStringIntegerConcurrentHashMap(map, w);
}, (m1, m2) -> {
//把并行計(jì)算的各個(gè)結(jié)果歸并整合到m1中然眼,m1和m2是一個(gè)對(duì)象艾船,所以直接返回m1
System.out.println(Thread.currentThread().getName() + "," + (m1 == m2));
return m1;
});
public synchronized static ConcurrentHashMap<String, Integer> getStringIntegerConcurrentHashMap(ConcurrentHashMap<String, Integer> map, String w) {
Integer num = map.get(w);
if (num == null) {//這個(gè)單詞如果不存在于map中
map.put(w, 1);
} else {//這個(gè)單詞如果存在于map中,數(shù)量+1高每,新值放入map
map.put(w, num + 1);
}
return map;
}