最近在閱讀《多處理器編程藝術(shù)》一書是尔,掌握了很多Java多線程的底層知識憨闰,現(xiàn)在就做一下書中鏈表-鎖的作用一章的總結(jié)。
?為了節(jié)約你的時間礁阁,本文主要內(nèi)容如下:
- 帶鎖的鏈表隊(duì)列
- 細(xì)粒度同步
- 樂觀同步
- 惰性同步
- 非阻塞同步
粗粒度同步
?所謂粗粒度同步其實(shí)很簡單护桦,就是在List的add
,remove
,contains
函數(shù)的開始就直接使用Lock加鎖含衔,然后在函數(shù)結(jié)尾釋放。
?add
函數(shù)的代碼如下所示嘶炭,函數(shù)的主體就是鏈表的遍歷添加邏輯抱慌,只不過在開始和結(jié)束進(jìn)行了鎖的獲取和釋放逊桦。
private Node head;
private Lock lock = new ReentrantLock();
public boolean add(T item) {
Node pred, curr;
int key = item.hashCode();
lock.lock();
try {
pred = head;
curr = pred.next;
while(curr.key < key) {
pred = curr;
curr = pred.next;
}
if (key == curr.key) {
return false;
} else {
Node node = new Node(item);
node.next = curr;
pred.next = node;
return true;
}
} finally {
lock.unlock();
}
}
?大家看到這里就會想到眨猎,這不就是類似于Hashtable
的實(shí)現(xiàn)方式嗎?把可能出現(xiàn)多線程問題的函數(shù)都用一個重入鎖鎖住强经。但是這個方法的缺點(diǎn)很明顯睡陪,如果競爭激烈的話,對鏈表的操作效率會很低,因?yàn)?code>add,remove
,contains
三個函數(shù)都需要獲取鎖兰迫,也都需要等待鎖的釋放信殊。至于如何優(yōu)化,我們可以一步一步往下看
細(xì)粒度同步
?我們可以通過鎖定單個節(jié)點(diǎn)而不是整個鏈表來提高并發(fā)汁果。給每個節(jié)點(diǎn)增加一個Lock變量以及相關(guān)的lock()和unlock()函數(shù),當(dāng)線程遍歷鏈表的時候涡拘,若它是第一個訪問節(jié)點(diǎn)的線程,則鎖住被訪問的節(jié)點(diǎn)据德,在隨后的某個時刻釋放鎖鳄乏。這種細(xì)粒度的鎖機(jī)制允許并發(fā)線程以流水線的方式遍歷鏈表。
?使用這種方式來遍歷鏈表棘利,必須同時獲取兩個相鄰節(jié)點(diǎn)的鎖橱野,通過“交叉手”的方式來獲取鎖:除了初始的head哨兵節(jié)點(diǎn)外,只有在已經(jīng)獲取pred的鎖時善玫,才能獲取curr的鎖水援。
//每個Node對象中都有一個Lock對象,可以進(jìn)行l(wèi)ock()和unlock()操作
public boolean add(T item) {
int key = item.hashCode();
head.lock();
Node pred = head;
try {
Node curr = pred.next;
curr.lock();
try {
while (curr.key < key) {
pred.unlock();
pred = curr;
curr = pred.next;
curr.lock();
}
if (curr.key == key) {
return false;
}
Node newNode = new Node(item);
newNode.next = curr;
pred.next = newNode;
return true;
} finally {
curr.unlock();
}
} finally {
pred.unlock();
}
}
樂觀同步
?雖然細(xì)粒度鎖是對單一粒度鎖的一種改進(jìn)茅郎,但它仍然出現(xiàn)很長的獲取鎖和釋放鎖的序列蜗元。而且,訪問鏈表中不同部分的線程仍然可能相互阻塞只洒。例如许帐,一個正在刪除鏈表中第二個元素的線程將會阻塞所有試圖查找后繼節(jié)點(diǎn)的線程。
?減少同步代價(jià)的一種方式就是樂觀:不需要獲取鎖就可以查找毕谴,對找到的節(jié)點(diǎn)進(jìn)行加鎖成畦,然后確認(rèn)鎖住的節(jié)點(diǎn)是正確的;如果一個同步?jīng)_突導(dǎo)致節(jié)點(diǎn)被錯誤的鎖定涝开,則釋放這些鎖重新開始循帐。
public boolean add(T item) {
int key = item.hashCode();
while (true) { //如果不成功,就進(jìn)行重試
Node pred = head;
Node curr = pred.next;
while (curr.key < key) {
pred = curr;
curr = pred.next;
}
//找到目標(biāo)相關(guān)的pred和curr之后再將二者鎖住
pred.lock();
curr.lock();
try {
//鎖住二者之后再進(jìn)行判斷舀武,是否存在并發(fā)沖突
if (validate(pred, curr)) {
//如果不存在拄养,那么就直接進(jìn)行正常操作
if (curr.key == key) {
return false;
} else {
Node node = new Node(item);
node.next = curr;
pred.next = node;
}
}
} finally {
pred.unlock();
curr.unlock();
}
}
}
public boolean validate(Node pred, Node curr) {
//從隊(duì)列頭開始查找pred和curr,判斷是否存在并發(fā)沖突
Node node = head;
while (node.key <= pred.key) {
if (node == pred) {
return pred.next == curr;
}
node = node.next;
}
return false;
}
?由于不再使用能保護(hù)并發(fā)修改的鎖,所以每個方法調(diào)用都可能遍歷那些已經(jīng)被刪除的節(jié)點(diǎn)银舱,所以在進(jìn)行添加瘪匿,刪除獲取判斷是否存在的之前必須再次進(jìn)行驗(yàn)證。
惰性同步
?當(dāng)不用鎖遍歷兩次鏈表的代價(jià)比使用鎖遍歷一次鏈表的代價(jià)小很多時寻馏,樂觀同步的實(shí)現(xiàn)效果非常好棋弥。但是這種算法的缺點(diǎn)之一就是contains()方法在遍歷時需要鎖,這一點(diǎn)并不令人滿意诚欠,其原因在于對contains()的調(diào)用要比其他方法的調(diào)用頻繁得多顽染。
?使用惰性同步的方法漾岳,使得contains()調(diào)用是無等待的,同時add()和remove()方法即使在被阻塞的情況下也只需要遍歷一次鏈表粉寞。
?對每個節(jié)點(diǎn)增加一個布爾類型的marked域尼荆,用于說明該節(jié)點(diǎn)是否在節(jié)點(diǎn)集合中。現(xiàn)在唧垦,遍歷不再需要鎖定目標(biāo)結(jié)點(diǎn)捅儒,也沒有必須通過重新遍歷整個鏈表來驗(yàn)證結(jié)點(diǎn)是否可達(dá)。所有未被標(biāo)記的節(jié)點(diǎn)必然是可達(dá)的振亮。
//add方法和樂觀同步的方法一致野芒,只有檢驗(yàn)方法做了修改。
//只需要檢測節(jié)點(diǎn)的marked變量就可以双炕,并且查看pred的next是否還是指向curr狞悲,需要注意的是marked變量一定是voliate的。
private boolean validate(Node pred, Node curr) {
return !pred.marked && !curr.marked && pred.next == curr;
}
?惰性同步的優(yōu)點(diǎn)之一就是能夠?qū)㈩愃朴谠O(shè)置一個flag這樣的邏輯操作與類似于刪除結(jié)點(diǎn)的鏈接這種對結(jié)構(gòu)的物理改變分開妇斤。通常情況下摇锋,延遲操作可以是批量處理方式進(jìn)行,且在某個方便的時候再懶惰地進(jìn)行處理站超,從而降低了對結(jié)構(gòu)進(jìn)行物理修改的整體破裂性荸恕。惰性同步的主要缺點(diǎn)是add()和remove()調(diào)用是阻塞的:如果一個線程延遲,那么其他線程也將延遲死相。
非阻塞同步
?使用惰性同步的思維是非常有益處的融求。我們可以進(jìn)一步將add(),remove()和contains()這三個方法都變成非阻塞的。前兩個方法是無鎖的算撮,最后一個方法是無等待的生宛。我們無法直接使用compareAndSet()來改變next域來實(shí)現(xiàn),因?yàn)檫@樣會出現(xiàn)問題肮柜。但是我們可以將結(jié)點(diǎn)的next域和marked域看作是單個的原子單位:當(dāng)marked域?yàn)閠rue時陷舅,對next域的任何修改都將失敗。
?我們可以使用AtomicMarkableReference<T>對象將指向類型T的對象引用next和布爾值marked封裝在一起审洞。這些域可以一起或單個地原子更新莱睁。可以讓每個結(jié)點(diǎn)的next域?yàn)橐粋€AtomicMarkableReference<Node>芒澜。線程可以通過設(shè)置結(jié)點(diǎn)next域中的標(biāo)記位來邏輯地刪除curr,和其他正在執(zhí)行add()和remove()的線程共享物理刪除:當(dāng)每個線程遍歷鏈表時仰剿,通過物理刪除所有被標(biāo)記的節(jié)點(diǎn)來清理鏈表。
public Window find(Node head, int key) {
Node pred = null, curr = null, succ = null;
boolean[] marked = {false};
boolean snip;
retry: while(true) {
pred = head;
curr = curr.next.get(marked);
while(true) {
succ = curr.next.get(marked); //獲取succ,并且查看是被被標(biāo)記
while (marked[0]) {//如果被標(biāo)記了痴晦,說明curr被邏輯刪除了南吮,需要繼續(xù)物理刪除
snip = pred.next.compareAndSet(curr, succ, false, false);//
if (!snip) continue retry;
curr = succ;
succ = curr.next.get(marked);
}
//當(dāng)不需要刪除后,才繼續(xù)遍歷
if (curr.key >= key) {
return new Window(pred, curr);
}
pred = curr;
curr = succ;
}
}
}
public boolean add(T item) {
int key = item.hashCode();
while(true) {
Window window = find(head, key);
Node pred = window.pred, curr = window.curr;
if (curr.key == key) {
return false;
} else {
Node node = new Node(item);
node.next = new AtomicMarkableReference<>(curr, false);
if (pred.next.compareAndSet(curr, node, false, false)) {
return true;
}
}
}
}
public boolean remove(T item) {
int key = item.hashCode();
boolean sinp;
while(true) {
Window window = find(head, key);
Node pred = window.pred, curr = window.curr;
if (curr.key != key) {
return false;
} else {
Node succ = curr.next.getReference();
//要進(jìn)行刪除了阅酪,那么就直接將curr.next設(shè)置為false,然后在進(jìn)行真正的物理刪除旨袒。
sinp = curr.next.compareAndSet(curr, succ, false, true);
if (!sinp) {
continue;
}
pred.next.compareAndSet(curr, succ, false, false);
return true;
}
}
}
class Node {
AtomicMarkableReference<Node> next;
}
后記
?文中的代碼在我的github的這個repo中都可以找到。