在聊聊高并發(fā)(六)實現(xiàn)幾種自旋鎖(一) 這篇中實現(xiàn)了兩種基本的自旋鎖:TASLock和TTASLock,它們的問題是會進行頻繁的CAS操作阳液,引發(fā)大量的緩存一致性流量誓酒,導(dǎo)致鎖的性能不好诗轻。
對TTASLock的一種改進是BackoffLock,它會在鎖高爭用的情況下對線程進行回退屏歹,減少競爭隐砸,減少緩存一致性流量。但是BackoffLock有三個主要的問題:
1. 還是有大量的緩存一致性流量蝙眶,因為所有線程在同一個共享變量上旋轉(zhuǎn)季希,每一次成功的獲取鎖都會產(chǎn)生緩存一致性流量
2. 因為回退的存在,不能及時獲取鎖釋放的信息幽纷,存在一個時間差式塌,導(dǎo)致獲取鎖的時間變長
3. 不能保證無饑餓,有的線程可能一直無法獲取鎖
這篇會實現(xiàn)2種基于隊列的鎖友浸,來解決上面提到的三個問題峰尝。主要的思路是將線程組織成一個隊列,有4個優(yōu)點:
1. 每個線程只需要檢查它的前驅(qū)線程的狀態(tài)收恢,將自旋的變量從一個分散到多個武学,減少緩存一致性流量
2. 可以即使獲取鎖釋放的通知
3. 隊列提供了先來先服務(wù)的公平性
4. 無饑餓,隊列中的每個線程都能保證被執(zhí)行到
隊列鎖分為兩類伦意,一類是基于有界隊列火窒,一類是基于無界隊列。
先看一下基于有界隊列的隊列鎖默赂。 ArrayLock有3個特點:
1. 基于一個volatile數(shù)組來組織線程
2. 通過一個原子變量tail來表示對尾線程
3. 通過一個ThreadLocal變量給每個線程一個索引號暇赤,表示它位于隊列的哪個位置田藐。
package com.test.lock;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 有界隊列鎖,使用一個volatile數(shù)組來組織線程
* 缺點是得預(yù)先知道線程的規(guī)模n蛔琅,所有線程獲取同一個鎖的次數(shù)不能超過n
* 假設(shè)L把鎖疾捍,那么鎖的空間復(fù)雜度為O(Ln)
* **/
public class ArrayLock implements Lock{
// 使用volatile數(shù)組來存放鎖標(biāo)志奈辰, flags[i] = true表示可以獲得鎖
private volatile boolean[] flags;
// 指向新加入的節(jié)點的后一個位置
private AtomicInteger tail;
// 總?cè)萘? private final int capacity;
private ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>(){
protected Integer initialValue() {
return 0;
}
};
public ArrayLock(int capacity){
this.capacity = capacity;
flags = new boolean[capacity];
tail = new AtomicInteger(0);
// 默認(rèn)第一個位置可獲得鎖
flags[0] = true;
}
@Override
public void lock() {
int slot = tail.getAndIncrement() % capacity;
mySlotIndex.set(slot);
// flags[slot] == true 表示獲得了鎖, volatile變量保證鎖釋放及時通知
while(!flags[slot]){
}
}
@Override
public void unlock() {
int slot = mySlotIndex.get();
flags[slot] = false;
flags[(slot + 1) % capacity] = true;
}
<pre name="code" class="java">
public String toString(){
return "ArrayLock";
}
}
我們可以看到有界隊列鎖的缺點是:
1. 它必須知道線程的規(guī)模數(shù)乱豆,對于同一把鎖如果線程獲取的次數(shù)超過了n會出現(xiàn)線程狀態(tài)被覆蓋的問題
2. 空間復(fù)雜度是O(Ln)
3. 對于共享的volatile數(shù)組來保存線程獲取鎖的狀態(tài)奖恰,仍然可能存在緩存一致性。我們知道CPU讀取一次內(nèi)存時宛裕,會讀滿數(shù)據(jù)總線的位長瑟啃,比如64位總線,一次讀取64位長度的數(shù)據(jù)揩尸。那么對于boolean類型的數(shù)組蛹屿,boolean長度是1個字節(jié),那么一次讀取能讀到8個boolean變量岩榆,而高速緩存的一個緩存塊的長度也是64位错负,也就是說一個緩存塊上可以保存8個boolean變量坟瓢,所以如果一次CAS操作修改了一個變量導(dǎo)致一個緩存塊無效,它實際上可能導(dǎo)致8個變量失效犹撒。
解決辦法是把變量以8個長度為單位分散折联,比如flag[0] = thread1 flag[8] = thread2。這樣的問題是消耗的空間更大识颊。
無界隊列鎖可以克服有界隊列鎖的幾個問題诚镰。
1. 它使用鏈表來代替數(shù)組,實現(xiàn)無界隊列
2. 使用兩個ThreadLocal變量表示指針谊囚,一個指向自己的節(jié)點怕享,一個指向前一個節(jié)點
3. 使用一個原子引用變量指向隊尾
4. 空間復(fù)雜度降低,如果有L把鎖镰踏,n個線程函筋,每個線程只獲取一把鎖,那么空間復(fù)雜度為O(L + n)
5. 對同一個鎖奠伪,一個線程可以多次獲取而不增加空間復(fù)雜度
6. 當(dāng)線程結(jié)束后谨敛,GC會自動回收內(nèi)存
package com.test.lock;
import java.util.concurrent.atomic.AtomicReference;
/**
* 無界隊列鎖,使用一個鏈表來組織線程
* 假設(shè)L把鎖炊甲,n個線程,那么鎖的空間復(fù)雜度為O(L+n)
* **/
public class CLHLock implements Lock{
// 原子變量指向隊尾
private AtomicReference<QNode> tail;
// 兩個指針,一個指向自己的Node,一個指向前一個Node
ThreadLocal<QNode> myNode;
ThreadLocal<QNode> myPreNode;
public CLHLock(){
tail = new AtomicReference<QNode>(new QNode());
myNode = new ThreadLocal<QNode>(){
protected QNode initialValue(){
return new QNode();
}
};
myPreNode = new ThreadLocal<QNode>(){
protected QNode initialValue(){
return null;
}
};
}
@Override
public void lock() {
QNode node = myNode.get();
node.lock = true;
// CAS原子操作,保證原子性
QNode preNode = tail.getAndSet(node);
myPreNode.set(preNode);
// volatile變量野崇,能保證鎖釋放及時通知
// 只對前一個節(jié)點的狀態(tài)自旋,減少緩存一致性流量
while(preNode.lock){
}
}
@Override
public void unlock() {
QNode node = myNode.get();
node.lock = false;
// 把myNode指向preNode蕴侣,目的是保證同一個線程下次還能使用這個鎖蝠筑,因為myNode原來指向的節(jié)點有它的后一個節(jié)點的preNode引用
// 防止這個線程下次lock時myNode.get獲得原來的節(jié)點
myNode.set(myPreNode.get());
}
public static class QNode {
volatile boolean lock;
}
public String toString(){
return "CLHLock";
}
}
下面我們從正確性和平均獲取鎖的時間上來測試這兩種鎖挽封。
我們設(shè)計一個測試用例來驗證正確性: 使用50個線程對一個volatile變量++操作,由于volatile變量++操作不是原子的,在不加鎖的情況下,可能同時有多個線程同時對voaltile變量++, 最終的結(jié)果是無法預(yù)測的苗踪。然后使用這兩種鎖瓦呼,先獲取鎖再volatile變量++央串,由于volatile變量會防止重排序质和,并能保證可見性厦酬,我們可以確定如果鎖是正確獲取的昌讲,也就是說同一時刻只有一個線程對volatile變量++,那么結(jié)果肯定是順序的1到50筹裕。
先看不加鎖的情況
package com.test.lock;
public class Main {
//private static Lock lock = new ArrayLock(150);
private static Lock lock = new CLHLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
//lock.lock();
System.out.println("Value: " + ++value);
//lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
運行結(jié)果: 我們可以看到確實是發(fā)生的線程同時對volatile變量++的操作扎运,結(jié)果是無法預(yù)料的
Value: 1Value: 1Value: 2Value: 3Value: 4Value: 5Value: 6Value: 7Value: 8Value: 9Value: 10Value: 11Value: 13Value: 12Value: 14Value: 15Value: 16Value: 17Value: 18Value: 19Value: 20Value: 21Value: 22Value: 23Value: 24Value: 25Value: 26Value: 27Value: 28Value: 29Value: 30Value: 31Value: 32Value: 33Value: 34Value: 35Value: 36Value: 37Value: 38Value: 37Value: 39Value: 40Value: 41Value: 42Value: 43Value: 44Value: 45Value: 46Value: 47Value: 48Value: 50
使用有界隊列鎖:
package com.test.lock;
public class Main {
private static Lock lock = new ArrayLock(100);
//private static Lock lock = new CLHLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
lock.lock();
System.out.println("Value: " + ++value);
lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
運行結(jié)果是1到50的順序自增瑟曲,說明鎖保證了同一時刻只有一個線程在對volatile變量++,是正確的
Value: 1Value: 2Value: 3Value: 4Value: 5Value: 6Value: 7Value: 8Value: 9Value: 10Value: 11Value: 12Value: 13Value: 14Value: 15Value: 16Value: 17Value: 18Value: 19Value: 20Value: 21Value: 22Value: 23Value: 24Value: 25Value: 26Value: 27Value: 28Value: 29Value: 30Value: 31Value: 32Value: 33Value: 34Value: 35Value: 36Value: 37Value: 38Value: 39Value: 40Value: 41Value: 42Value: 43Value: 44Value: 45Value: 46Value: 47Value: 48Value: 49Value: 50
使用無界隊列鎖的情況也是正確的豪治,由于篇幅原因這里就不帖代碼了洞拨。
再看平均獲取鎖的時間。
package com.test.lock;
public class Main {
private static Lock lock = new TimeCost(new CLHLock());
//private static Lock lock = new CLHLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
lock.lock();
//System.out.println("Value: " + ++value);
lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 100; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
在100個線程并發(fā)的情況下负拟,
ArrayLock獲取鎖的平均時間是: 719550 ns
CLHLock獲取鎖的平均時間是: 488577 ns
可以看到烦衣,隊列鎖在使用多個共享變量自旋的情況下,減少了一致性流量掩浙,比TASLock和TTASLock 提高了程序的性能花吟。而CLHLock比ArrayLock有更好的擴展性和性能,是一種很好的自旋鎖實現(xiàn)厨姚。