這是本教程的最后一篇.我們還是用到了上一篇中提到的那個工具類和其中的兩個方法.請看上篇文章,來獲取此代碼.
AutomicInteger
** java.concurrent.atomic**包中,提供了大量的有用的類,來執(zhí)行原子性的操作.原子性的意思是,所有的操作要不就都執(zhí)行成功,要不就都不執(zhí)行成功.
這些原子性的類內(nèi)部,都使用了著名的** CAS指令.現(xiàn)代處理器都支持這條指令.相對于使用鎖的 synchronized**關(guān)鍵字來說,這條指令操作起來更快.所以,如果我們只想并發(fā)的對一個共享變量進行操作,那使用這些原子性的類,更為方便和快捷.
我們首先看一下** AutomicInteger**這個類如何使用:
通過使用** AutomicInteger類型,而不是 Integer類型,我們能夠以線程安全的方式,來并發(fā)的修改一個數(shù)值.而不需要跟以前一樣,使用 synchronized關(guān)鍵字或者顯式鎖. AutomicInteger類型提供的 incrementAndGet()**方法,是線程安全的,所以我們可以放心的在多個線程中使用這個方法,而不需要擔(dān)心那些預(yù)料之外的事情發(fā)生.
** AutomicInteger類型還提供了很多其他的原子性的方法.其中updateAndGet()**方法允許我們傳入一個lambda表達(dá)式,來對此數(shù)值就行操作.
** accumulateAndGet()方法,接受一種 IntBinaryOperator**類型的lambda表達(dá)式.在下面這個例子中,我們使用這個方法,來并發(fā)的計算0到1000的和.
** java.concurrent.atomic包中,還提供了很多其他的原子類.如 AutomicBoolean, AutomicLong, AutomicReference**等.
LongAddr
** LongAddr和 AtomicLong**相似,用于連續(xù)的為數(shù)值增加一個值.
** LongAddr還提供了其他的原子性的,線程安全的函數(shù),比如 add(),和 increment().但是,這些方法并不只是單純的計算一個結(jié)果,它還在內(nèi)部維護了很多變量,用于減少線程之間的沖突.我們可以通過 sum()方法和 sumThenRest()**方法,來獲取計算的結(jié)果.
這個類,在執(zhí)行寫操作的線程多于執(zhí)行讀操作的線程這種情景中,比較常用.通常用在那些需要獲取統(tǒng)計數(shù)據(jù)的情況中.比如,你想要計算服務(wù)器接受的請求數(shù).** LongAddr**的缺點是,由于要在內(nèi)存中維護大量的變量,所以它比較耗內(nèi)存.
LongAccumulator
** LongAccumulator和 LongAddr相似,但是更加常用.它執(zhí)行的不是簡單的操作,它接受的是 LongBinaryOperator**類型的lambda表達(dá)式.如下例所示:
我們通過函數(shù)** 2 * x + y和初始值1,創(chuàng)建了一個 LongAccumulator.每次調(diào)用 accumulate(i)**函數(shù),當(dāng)前值會作為lambda表達(dá)式的x值, i會作為lambda表達(dá)式的y值,傳遞到lambda表達(dá)式中.
** LongAccumulator就像 LongAddr**一樣,內(nèi)部也維護了大量的變量,來減少線程之間的沖突.
ConcurrentMap
** ConcurrentMap接口,擴充了 Map**接口,成為了并發(fā)編程中,最有用的一個接口.
我們想創(chuàng)建一個包含四對數(shù)據(jù)的** CouncurrentMap**.在后面我們將用它來實驗?zāi)切┖瘮?shù).
** forEach()方法接受一個類型為 BiConsumer的lambda表達(dá)式作為參數(shù),其中此lambda表達(dá)式的參數(shù)為map中的鍵值對.我們可以用 forEach()**這個方法來在當(dāng)前線程內(nèi),串行的迭代這個map.
** putIfAbsent()方法,會在給定的key沒有value時,為其添加一個value.至少在 ConcurrentHashMap**中,其實現(xiàn)是線程安全的.
** getOrDefault()**方法,會嘗試獲取給定key的value,如果不存在,則返回我們指定的默認(rèn)值.
** replaceAll()**方法,用于替換此Map中,滿足條件的項的value.
** compute()**方法,允許我們對特定的項進行轉(zhuǎn)換.
除了** compute()方法,還有兩個變體, computeIfAbsent()和 computeIfPresent()**,分別在給定的key不存在時和存在時進行操作.
** merge()**方法,用于對給定key的value進行操作,生成一個新的值.
ConcurrentHashMap
上面介紹的函數(shù),都是** ConcurrentMap這個接口提供的.這些函數(shù)可以被任何實現(xiàn)了 ConcurrentMap的類使用.除此之外, ConcurrentHashMap**還提供了很多其他用于并發(fā)操作的函數(shù).
就像parallel streams一樣,這些函數(shù)內(nèi)部都使用** ForkJoinPool,在Java8中,我們可以通過 ForkJoinPool.commonPool()函數(shù)來獲得一個 ForkJoinPool**.這個線程池,默認(rèn)可以使用的線程數(shù),取決于你的機器上的CPU上,有幾個核心.在我的四核的機器上,其為3.
我們可以通過設(shè)置JVM的參數(shù),來修改這個數(shù)值.
我們還是使用上面的那個包含四條數(shù)據(jù)的map.但是這里我們不使用** ConcurrentMap這個接口了,而是使用 ConcurrentHashMap這個具體實現(xiàn)類,來使用 ConcurrentHashMap**中特有的函數(shù).
Java8中,提供了三種并行操作的函數(shù):** forEach(), ** search()和** reduce()**.這些函數(shù)的第一個參數(shù),都是如果要啟動并發(fā)執(zhí)行的話,Collection的最小閾值.比如,如果我們設(shè)置了這個閾值為500,而map的大小為499,那就會在一個線程中,串行的執(zhí)行.而如果map的大小大于500,就會開啟多個線程,并行的執(zhí)行.在后面的例子中,我們將這個閾值設(shè)置為1,這就意味著,總是并行的執(zhí)行操作.
ForEach
** forEach()方法,用于并行的執(zhí)行迭代map中的key/value對的操作.因為在我的機器上, ForkJoinPool**的最大尺寸為3,所以在下面的例子中,你會看到,最多啟動了三個線程.
Search
** search()**函數(shù)用于并行的查找map中給定的key的值,如果找到,就返回其value,如果找不到,就返回null.如果會找到多個,則其返回值不確定.也要注意,ConcurrentHashMap中的元素是無序的.
還有一個只用于搜索map的value的方法,如下圖所示:
Reduce
** reduce()函數(shù),接受兩個類型為 BigFunction的lambda表達(dá)式.第一個lambda表達(dá)式,會將map中的每一個key/value對轉(zhuǎn)換成一個值,然后第二個lambda表達(dá)式,會將這些轉(zhuǎn)換后的值,拼接成一個單一的結(jié)果.它會忽略 null**.