使用Stream實(shí)現(xiàn)多線程計(jì)算

模擬hadoop的計(jì)算單詞個(gè)數(shù)統(tǒng)計(jì)的helloword程序蠕啄,計(jì)算給定語句的單詞出現(xiàn)次數(shù)丧没。進(jìn)一步優(yōu)化:按照次數(shù)進(jìn)行排序后輸出们拙。

使用了parallel并行計(jì)算癞季,三個(gè)參數(shù)的reduce進(jìn)行縮減計(jì)算劫瞳。三個(gè)參數(shù)時(shí)是最難以理解的。先來看其定義:

<U> U reduce(U identity,
        BiFunction<U, ? super T, U> accumulator,
        BinaryOperator<U> combiner)

分析下它的三個(gè)參數(shù):

  1. identity: 一個(gè)初始化的值绷柒;這個(gè)初始化的值其類型是泛型U志于,與Reduce方法返回的類型一致;注意此時(shí)Stream中元素的類型是T废睦,與U可以不一樣也可以一樣伺绽,這樣的話操作空間就大了;不管Stream中存儲(chǔ)的元素是什么類型,U都可以是任何類型奈应,如U可以是一些基本數(shù)據(jù)類型的包裝類型Integer澜掩、Long等;或者是String杖挣,又或者是一些集合類型ArrayList等肩榕;
  2. accumulator: 其類型是BiFunction,輸入是U與T兩個(gè)類型的數(shù)據(jù)惩妇,而返回的是U類型株汉;也就是說返回的類型與輸入的第一個(gè)參數(shù)類型是一樣的,而輸入的第二個(gè)參數(shù)類型與Stream中元素類型是一樣的歌殃。
  3. combiner: 其類型是BinaryOperator郎逃,支持的是對(duì)U類型的對(duì)象進(jìn)行操作;

代碼如下:

        //1. 給定一段英文句子挺份,單詞用空格間隔
        String words = " guo xiu zhi guo map reduce guo hadoop java stream stream parallel guo guo";
        //2.1  由于是多線程要使用 ConcurrentHashMap褒翰,HashMap不支持并發(fā)操作
        //2.2  給定語句通過空格分割成數(shù)組,轉(zhuǎn)換成普通Stream匀泊,再轉(zhuǎn)換并行流
        //2.3  并行流進(jìn)行三參數(shù)的reduce計(jì)算优训。第一個(gè)參數(shù)是返回值,第二個(gè)參數(shù)是累加器各聘,第三個(gè)參數(shù)是各個(gè)線程返回值的合并操作揣非。
        ConcurrentHashMap<String, Integer> reduceMap = Arrays.stream(words.split(" ")).parallel().reduce(new ConcurrentHashMap<String, Integer>(), (map, w) -> {
            Integer num = map.get(w);
            if (num == null) {//這個(gè)單詞如果不存在于map中
                map.put(w, 1);
            } else {//這個(gè)單詞如果存在于map中,數(shù)量+1躲因,新值放入map
                map.put(w, num + 1);
            }
            return map;
        }, (m1, m2) -> {
            m1.putAll(m2);//把并行計(jì)算的各個(gè)結(jié)果歸并整合到m1中
            return m1;
        });

        //根據(jù)單詞的出現(xiàn)次數(shù)倒序排列后輸出到List對(duì)象
        List<Entry<String, Integer>> collect = reduceMap.entrySet().stream().sorted((o1, o2) -> o2.getValue() - o1.getValue()).collect(Collectors.toList());
        //輸出排序后的list數(shù)據(jù)
        System.out.println("reduceMap = " + collect);

輸出內(nèi)容:
reduceMap = [guo=4, stream=2, reduce=1, =1, xiu=1, java=1, parallel=1, zhi=1, hadoop=1, map=1]

統(tǒng)計(jì)單詞數(shù)量結(jié)果

但是我們發(fā)現(xiàn)數(shù)據(jù)偶爾會(huì)有丟失早敬。對(duì)于parrallel之后元素?cái)?shù)量不固定的原因,就是多線程有可能同時(shí)讀取到相同的下標(biāo)n然后同時(shí)賦值大脉,這樣就會(huì)出現(xiàn)元素缺失的問題搞监。無非就是多個(gè)線程賦值可能同時(shí)操作同一個(gè)地址,后賦值的把先賦值的給覆蓋掉了镰矿,才會(huì)出現(xiàn)這種問題琐驴。所以在使用paralleStream時(shí)不使用foreach、map操作秤标,完成collect后的新Stream再使用绝淡。

什么方法可以防止并行流出現(xiàn)線程不安全操作?

  1. 【推薦方法】Java8 Stream的collect方法苍姜,就是收集Stream里的元素牢酵,返回List,Set或Map等衙猪,并且它是線程安全的馍乙。那就是最后調(diào)用collect(Collectors.tolist())玉罐,這種收集起來所有元素到新集合是線程安全的。在采用并行流收集元素到集合中時(shí)潘拨,由于我們使用了并發(fā)流要使用Collectors.toConcurrentMap收集到ConcurrentMap,而不是使用普通的toMap()饶号,toList()方法進(jìn)行收集铁追。
ConcurrentMap<String, Integer> toConcurrentMap = Arrays.stream(words.split(" ")).parallel().collect(Collectors.toConcurrentMap(s -> s, s -> 1,
                (integer, integer2) -> integer + integer2));
List<Entry<String, Integer>> orderedCollection = toConcurrentMap.entrySet().stream().sorted((o1, o2) -> o2.getValue() - o1.getValue()).collect(Collectors.toList());
System.out.println("reduceMap = " + orderedCollection);

沒有使用外部定義的集合,并且使用到了parallelStream并行處理的優(yōu)勢(shì)茫船。輸出的數(shù)據(jù)不再丟失
reduceMap = [stream=3, guo=2, zhi=2, reduce=1, java=1, xiu=1, parallel=1, hadoop=1, map=1]

  1. 給map操作加鎖

        Lock lock = new ReentrantLock();
        ConcurrentHashMap<String, Integer> reduceMap = Arrays.stream(words.split(" ")).parallel().reduce(new ConcurrentHashMap<String, Integer>(), (map, w) -> {
            lock.lock();
            Integer num = map.get(w);
            if (num == null) {//這個(gè)單詞如果不存在于map中
                map.put(w, 1);
            } else {//這個(gè)單詞如果存在于map中琅束,數(shù)量+1,新值放入map
                map.put(w, num + 1);
            }
            lock.unlock();
            return map;
        }, (m1, m2) -> {
            //把并行計(jì)算的各個(gè)結(jié)果歸并整合到m1中算谈,m1和m2是一個(gè)對(duì)象涩禀,所以直接返回m1
            System.out.println(Thread.currentThread().getName() + "," + (m1 == m2));
            return m1;
        });
  1. 抽取map操作片段為單獨(dú)方法getStringIntegerConcurrentHashMap,加上synchronized
        ConcurrentHashMap<String, Integer> reduceMap = Arrays.stream(words.split(" ")).parallel().reduce(new ConcurrentHashMap<String, Integer>(), (map, w) -> {
            return getStringIntegerConcurrentHashMap(map, w);
        }, (m1, m2) -> {
            //把并行計(jì)算的各個(gè)結(jié)果歸并整合到m1中然眼,m1和m2是一個(gè)對(duì)象艾船,所以直接返回m1
            System.out.println(Thread.currentThread().getName() + "," + (m1 == m2));
            return m1;
        });

    public synchronized static ConcurrentHashMap<String, Integer> getStringIntegerConcurrentHashMap(ConcurrentHashMap<String, Integer> map, String w) {
        Integer num = map.get(w);
        if (num == null) {//這個(gè)單詞如果不存在于map中
            map.put(w, 1);
        } else {//這個(gè)單詞如果存在于map中,數(shù)量+1高每,新值放入map
            map.put(w, num + 1);
        }
        return map;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末屿岂,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子鲸匿,更是在濱河造成了極大的恐慌爷怀,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件带欢,死亡現(xiàn)場離奇詭異运授,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)乔煞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門吁朦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人渡贾,你說我怎么就攤上這事喇完。” “怎么了剥啤?”我有些...
    開封第一講書人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵锦溪,是天一觀的道長。 經(jīng)常有香客問我府怯,道長刻诊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任牺丙,我火速辦了婚禮则涯,結(jié)果婚禮上复局,老公的妹妹穿的比我還像新娘。我一直安慰自己粟判,他們只是感情好亿昏,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著档礁,像睡著了一般角钩。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上呻澜,一...
    開封第一講書人閱讀 51,679評(píng)論 1 305
  • 那天递礼,我揣著相機(jī)與錄音,去河邊找鬼羹幸。 笑死脊髓,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的栅受。 我是一名探鬼主播将硝,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼屏镊!你這毒婦竟也來了袋哼?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤闸衫,失蹤者是張志新(化名)和其女友劉穎涛贯,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蔚出,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡弟翘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了骄酗。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稀余。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖趋翻,靈堂內(nèi)的尸體忽然破棺而出睛琳,到底是詐尸還是另有隱情,我是刑警寧澤踏烙,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布师骗,位于F島的核電站,受9級(jí)特大地震影響讨惩,放射性物質(zhì)發(fā)生泄漏辟癌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一荐捻、第九天 我趴在偏房一處隱蔽的房頂上張望黍少。 院中可真熱鬧寡夹,春花似錦、人聲如沸厂置。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽昵济。三九已至智绸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間砸紊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來泰國打工囱挑, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留醉顽,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓平挑,卻偏偏與公主長得像游添,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子通熄,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355