今天來更新一下第三章的下半部分內(nèi)容,喜歡的可以關(guān)注和轉(zhuǎn)發(fā)哦~
小編會努力更新噠!
3. JDK的并發(fā)容器
- 并發(fā)集合
ConcurrentHashMap:這是一個(gè)高效的并發(fā)HashMap.你可以把它理解為一個(gè)線程安全的HashMap研铆。
CopyOnWriteArrayList:這是一個(gè)List抒倚,從名字看就知道它和ArrayList是一族的夭拌。在讀多寫少的場合小作,這個(gè)List的性能非常好,遠(yuǎn)遠(yuǎn)優(yōu)于Vector即纲。
ConcurrentLinkedQueue:高效的并發(fā)隊(duì)列,使用鏈表實(shí)現(xiàn)博肋〉驼可以看作一個(gè)線程安全的LinkedList.
BlockingQueue:這是一個(gè)接口,JDK內(nèi)部通過鏈表匪凡、數(shù)組等方式實(shí)現(xiàn)了這個(gè)接口膊畴。表示阻塞隊(duì)列,非常適合作為數(shù)據(jù)共享的通道病游。
ConcurrentSkipListMap:跳表的實(shí)現(xiàn)唇跨。這是一個(gè)Map,使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行快速查找衬衬。
- 線程安全的HashMap
可用Collections類來使普通HashMap轉(zhuǎn)為線程安全的map
Collections.synchronizedMap(new HashMap())
private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private static final long serialVersionUID = 1978198479659022715L;
private final Map<K,V> m; // 傳入的map
final Object mutex; // 鎖資源對象,對map的任何操作都會鎖該對象
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}
SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
public void putAll(Map<? extends K, ? extends V> map) {
synchronized (mutex) {m.putAll(map);}
}
public void clear() {
synchronized (mutex) {m.clear();}
}
....... //省略
}
-
List的線程安全
Collections.synchronizedList(new LinkedList<String>())
- 高效讀寫隊(duì)列ConcurrentLinkedQueue類
高并發(fā)環(huán)境中性能最好的隊(duì)列,主要是利用CAS進(jìn)行無鎖操作,非阻塞隊(duì)列买猖。首先我們來看下它的Node節(jié)點(diǎn):
private static class Node<E> {
volatile E item; //當(dāng)前對象
volatile Node<E> next; //下一個(gè)對象,以此來構(gòu)建鏈表
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) { //(期望值,設(shè)置目標(biāo)值),cas操作
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
ConcurrentLinkedQueue類內(nèi)部的tail指針更新并不是實(shí)時(shí)的,可能存在拖延現(xiàn)象,每次更新跳躍兩個(gè)元素,如下圖:
然后再看一下新增節(jié)點(diǎn)offer()方法:
public boolean offer(E e) {
checkNotNull(e); //非空校驗(yàn)
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { //for循環(huán) 無出口,知道設(shè)置成功
Node<E> q = p.next; //獲取tail節(jié)點(diǎn)的next對象
if (q == null) { //第一次插入,p.next對象為空
// p 為最后一個(gè)節(jié)點(diǎn)
if (p.casNext(null, newNode)) { //插入新元素,此時(shí)p=t
//每兩次更新tail
if (p != t)
casTail(t, newNode);
return true;
}
// cas競爭失敗,再次循環(huán)
}
else if (p == q) //遇到哨兵
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q; //t!=(t=tail) !=并不是原子操作,先取左邊t的值,再取右邊t=tail
}
}
- 高效讀取:不變模式下的CopyOnWriteArrayList類
使用場景: 讀操作遠(yuǎn)遠(yuǎn)大于寫操作,讀操作越快越好,寫操作慢一些也沒事
特點(diǎn): 讀取不用加鎖,寫入不會阻塞讀取操作,只有寫入和寫入需要同步等待,讀性能大幅提升
原理: 寫入時(shí)進(jìn)行一次自我復(fù)制,修改內(nèi)容寫入副本中,寫完后,再用副本內(nèi)容替代原來的數(shù)據(jù)
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); //進(jìn)行復(fù)制
newElements[len] = e; //新數(shù)組代替老數(shù)組
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
- 數(shù)據(jù)共享通道:BlockingQueue
BlockingQueue是接口,實(shí)現(xiàn)類有ArrayBlockingQueue以及LinkedBlockingQueue.當(dāng)BlockingQueue為空時(shí),會等待,當(dāng)有消息進(jìn)入隊(duì)列后,自動喚醒線程,Condition.await()和Condition.signal(),祥見上一篇 Condition重入鎖
注意:
一般生產(chǎn)者消費(fèi)者模型中,往往采用BlockingQueue而不是ConcurrentLinkedQueeu,因?yàn)锽lockingQueue帶有阻塞功能,可以控制生產(chǎn)消費(fèi)者的速率(await和signal)
- 隨機(jī)數(shù)據(jù)結(jié)構(gòu):跳表
使用環(huán)境:高并發(fā)環(huán)境
特點(diǎn):快速查找,類似平衡樹,平衡樹插入和刪除往往會引起一次全局調(diào)整,而跳表只需局部調(diào)整,且在高并發(fā)環(huán)境下,平衡樹需要全局鎖,而跳表只需要局部;隨機(jī)算法,跳表的本質(zhì)是維護(hù)多個(gè)鏈表;有序性
原理:如下
4. JMH性能測試
用于測試方法的執(zhí)行效率,精度達(dá)毫秒級.
maven:
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.18</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.18</version>
<scope>provided</scope>
</dependency>
基本概念
1.模式(Model):
model表示JMH的測量方式和角度,共四種
Throughput:整體吞吐量,一秒可執(zhí)行多少次
AverageTime:調(diào)用平均時(shí)間
SampleTime:隨機(jī)取樣,最后輸出取樣結(jié)果,如"99%的調(diào)用在xxx毫秒內(nèi)"
SingleShotTime:只運(yùn)行一次,無warmup(熱身),用于測試啟動時(shí)的性能
2.迭代(Iteration)
迭代表示一次測試單位,一般為1秒
3.預(yù)熱(warmup)
預(yù)熱是為了測試在JIT編譯后的性能
4.狀態(tài)(State)
指測試范圍,一種是線程范,一個(gè)線程一個(gè)對象.另外一種是基準(zhǔn)測試范圍(Benchmark),多個(gè)線程共享一個(gè)實(shí)例
5.配置類(Options)
指定一些參數(shù),如指定測試類(include),使用進(jìn)程個(gè)數(shù)(fork),預(yù)熱迭代次數(shù)(warmuoIterations)
代碼
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
public class ListTest {
CopyOnWriteArrayList smallCopyOnWriteList = new CopyOnWriteArrayList();
ConcurrentLinkedQueue smallConcurrentList = new ConcurrentLinkedQueue();
CopyOnWriteArrayList bigCopyOnWriteList = new CopyOnWriteArrayList();
ConcurrentLinkedQueue bigConcurrentList = new ConcurrentLinkedQueue();
@Setup
public void setup() {
for (int i = 0; i < 10; i++) {
smallCopyOnWriteList.add(new Object());
smallConcurrentList.add(new Object());
}
for (int i = 0; i < 1000; i++) {
bigCopyOnWriteList.add(new Object());
bigCopyOnWriteList.add(new Object());
}
}
@Benchmark
public void copyOnWriteGet() {
smallCopyOnWriteList.get(0);
}
@Benchmark
public void copyOnWriteSize() {
smallCopyOnWriteList.size();
}
@Benchmark
public void concurrentListGet() {
smallConcurrentList.peek();
}
@Benchmark
public void concurrentListSize() {
smallConcurrentList.size();
}
@Benchmark
public void smallCopyOnWriteWrite() {
smallCopyOnWriteList.add(new Object());
smallCopyOnWriteList.remove(0);
}
@Benchmark
public void smallConcurrentListWrite() {
smallConcurrentList.add(new Object());
smallConcurrentList.remove(0);
}
@Benchmark
public void bigCopyOnWriteWrite() {
bigCopyOnWriteList.add(new Object());
bigCopyOnWriteList.remove(0);
}
@Benchmark
public void bigConcurrentListWrite() {
bigConcurrentList.offer(new Object());
bigConcurrentList.remove(0);
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder().include(ListTest.class.getSimpleName()).forks(1).warmupIterations(5)
.measurementIterations(5).threads(4).build();
new Runner(opt).run();
}
}
- 性能思考
hashmap和concurrenthashmap的對比
單線程下,hashmap的get方法比concurrenthashmap略慢,size()方法卻快得多,同步hashmap,size方法僅比concurrenthashmap略快一點(diǎn)
CopyOnWriteArrayList類與ConcurrentLinkedQueue類
當(dāng)元素總量不大時(shí),絕大部分場景中CopyOnWriteArrayList性能要優(yōu)于ConcurrentLinkedQueue