簡介
本文是本系列的最后一篇辟汰,討論兩個(gè)主題:關(guān)于實(shí)現(xiàn)基于互斥鎖的并發(fā)鏈表的設(shè)計(jì)方法和設(shè)計(jì)不使用互斥鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)。對于后一個(gè)主題,我選擇實(shí)現(xiàn)一個(gè)并發(fā)堆棧并解釋設(shè)計(jì)這種數(shù)據(jù)結(jié)構(gòu)涉及的一些問題。用 C++
設(shè)計(jì)獨(dú)立于平臺的不使用互斥鎖的數(shù)據(jù)結(jié)構(gòu)目前還不可行,所以我選用 GCC version 4.3.4 作為編譯器并在代碼中使用 GCC 特有的 _sync*
函數(shù)休偶。如果您是 WIndows? C++
開發(fā)人員,可以考慮使用 Interlocked* 系列函數(shù)實(shí)現(xiàn)相似的功能辜羊。
并發(fā)單向鏈表的設(shè)計(jì)方法
清單 1 給出最基本的并發(fā)單向鏈表接口踏兜。是不是缺少什么東西?
清單 1. 并發(fā)單向鏈表接口
template <typename T>
class SList {
private:
typedef struct Node {
T data;
Node *next;
Node(T& data) : value(data), next(NULL) { }
} Node;
pthread_mutex_t _lock;
Node *head, *tail;
public:
void push_back(T& value);
void insert_after(T& previous, T& value); // insert data after previous
void remove(const T& value);
bool find(const T& value); // return true on success
SList( );
~SList( );
};
清單 2 給出 push_back
方法定義八秃。
清單 2. 把數(shù)據(jù)添加到并發(fā)鏈表中
void SList<T>::push_back(T& data)
{
pthread_mutex_lock(&_lock);
if (head == NULL) {
head = new Node(data);
tail = head;
} else {
tail->next = new Node(data);
tail = tail->next;
}
pthread_mutex_unlock(&_lock);
}
現(xiàn)在碱妆,假設(shè)一個(gè)線程試圖通過調(diào)用 push_back
把 n 個(gè)整數(shù)連續(xù)地添加到這個(gè)鏈表中。這個(gè)接口本身要求獲取并釋放互斥鎖 n 次喜德,即使在第一次獲取鎖之前已經(jīng)知道要插入的所有數(shù)據(jù)山橄。更好的做法是定義另一個(gè)方法,它接收一系列整數(shù),只獲取并釋放互斥鎖一次航棱。清單 3 給出方法定義睡雇。
清單 3. 在鏈表中插入數(shù)據(jù)的更好方法
void SList<T>::push_back(T* data, int count) // or use C++ iterators
{
Node *begin = new Node(data[0]);
Node *temp = begin;
for (int i=1; i<count; ++i) {
temp->next = new Node(data[i]);
temp = temp->next;
}
pthread_mutex_lock(&_lock);
if (head == NULL) {
head = begin;
tail = head;
} else {
tail->next = begin;
tail = temp;
}
pthread_mutex_unlock(&_lock);
}
優(yōu)化搜索元素
現(xiàn)在,我們來優(yōu)化鏈表中的搜索元素 — 即 find
方法饮醇。下面是幾種可能出現(xiàn)的情況:
- 當(dāng)一些線程正在迭代鏈表時(shí)它抱,出現(xiàn)插入或刪除請求。
- 當(dāng)一些線程正在迭代鏈表時(shí)朴艰,出現(xiàn)迭代請求观蓄。
- 當(dāng)一些線程正在插入或刪除數(shù)據(jù)時(shí),出現(xiàn)迭代請求祠墅。
顯然侮穿,應(yīng)該能夠同時(shí)處理多個(gè)迭代請求。如果系統(tǒng)中的插入/刪除操作很少毁嗦,主要活動是搜索亲茅,那么基于單一鎖的方法性能會很差。在這種情況下狗准,應(yīng)該考慮使用讀寫鎖克锣,即 pthread_rwlock_t
。在本文的示例中腔长,將在 SList
中使用 pthread_rwlock_t
而不是 pthread_mutex_t
袭祟。這么做就允許多個(gè)線程同時(shí)搜索鏈表。插入和刪除操作仍然會鎖住整個(gè)鏈表捞附,這是合適的巾乳。清單 4 給出使用 pthread_rwlock_t
的鏈表實(shí)現(xiàn)。
清單 4. 使用讀寫鎖的并發(fā)單向鏈表
template <typename T>
class SList {
private:
typedef struct Node {
// … same as before
} Node;
pthread_rwlock_t _rwlock; // Not pthread_mutex_t any more!
Node *head, *tail;
public:
// … other API remain as-is
SList( ) : head(NULL), tail(NULL) {
pthread_rwlock_init(&_rwlock, NULL);
}
~SList( ) {
pthread_rwlock_destroy(&_rwlock);
// … now cleanup nodes
}
};
清單 5 給出鏈表搜索代碼故俐。
清單 5. 使用讀寫鎖搜索鏈表
bool SList<T>::find(const T& value)
{
pthread_rwlock_rdlock (&_rwlock);
Node* temp = head;
while (temp) {
if (temp->value == data) {
status = true;
break;
}
temp = temp->next;
}
pthread_rwlock_unlock(&_rwlock);
return status;
}
清單 6 給出使用讀寫鎖的 push_back
方法想鹰。
清單 6. 使用讀寫鎖在并發(fā)單向鏈表中添加數(shù)據(jù)
void SList<T>::push_back(T& data)
{
pthread_setschedprio(pthread_self( ), SCHED_FIFO);
pthread_rwlock_wrlock(&_rwlock);
// … All the code here is same as Listing 2
pthread_rwlock_unlock(&_rwlock);
}
我們來分析一下紊婉。這里使用兩個(gè)鎖定函數(shù)調(diào)用(pthread_rwlock_rdlock 和 pthread_rwlock_wrlock)管理同步药版,使用 pthread_setschedprio 調(diào)用設(shè)置寫線程的優(yōu)先級。如果沒有寫線程在這個(gè)鎖上阻塞(換句話說喻犁,沒有插入/刪除請求)槽片,那么多個(gè)請求鏈表搜索的讀線程可以同時(shí)操作,因?yàn)樵谶@種情況下一個(gè)讀線程不會阻塞另一個(gè)讀線程肢础。如果有寫線程等待這個(gè)鎖还栓,當(dāng)然不允許新的讀線程獲得鎖,寫線程等待传轰,到現(xiàn)有的讀線程完成操作時(shí)寫線程開始操作剩盒。如果不按這種方式使用 pthread_setschedprio 設(shè)置寫線程的優(yōu)先級,根據(jù)讀寫鎖的性質(zhì)慨蛙,很容易看出寫線程可能會餓死辽聊。
下面是對于這種方式要記住的幾點(diǎn):
- 如果超過了最大讀鎖數(shù)量(由實(shí)現(xiàn)定義)纪挎,pthread_rwlock_rdlock 可能會失敗。
- 如果有 n 個(gè)并發(fā)的讀鎖跟匆,一定要調(diào)用 pthread_rwlock_unlock n 次异袄。
允許并發(fā)插入
應(yīng)該了解的最后一個(gè)方法是 insert_after。同樣玛臂,預(yù)期的使用模式要求調(diào)整數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)烤蜕。如果一個(gè)應(yīng)用程序使用前面提供的鏈表,它執(zhí)行的插入和搜索操作數(shù)量差不多相同迹冤,但是刪除操作很少讽营,那么在插入期間鎖住整個(gè)鏈表是不合適的。在這種情況下泡徙,最好允許在鏈表中的分離點(diǎn)(disjoint point)上執(zhí)行并發(fā)插入斑匪,同樣使用基于讀寫鎖的方式。下面是構(gòu)造鏈表的方法:
在兩個(gè)級別上執(zhí)行鎖定(見 清單 7):鏈表有一個(gè)讀寫鎖锋勺,* 各個(gè)節(jié)點(diǎn)包含一個(gè)互斥鎖蚀瘸。如果想節(jié)省空間,可以考慮共享互斥鎖 — 可以維護(hù)節(jié)點(diǎn)與互斥鎖的映射庶橱。
- 在插入期間贮勃,寫線程在鏈表上建立讀鎖,然后繼續(xù)處理苏章。在插入數(shù)據(jù)之前寂嘉,鎖住要在其后添加新數(shù)據(jù)的節(jié)點(diǎn),插入之后釋放此節(jié)點(diǎn)枫绅,然后釋放讀寫鎖泉孩。
- 刪除操作在鏈表上建立寫鎖。不需要獲得與節(jié)點(diǎn)相關(guān)的鎖并淋。
- 與前面一樣寓搬,可以并發(fā)地執(zhí)行搜索。
清單 7. 使用兩級鎖定的并發(fā)單向鏈表
template <typename T>
class SList {
private:
typedef struct Node {
pthread_mutex_lock lock;
T data;
Node *next;
Node(T& data) : value(data), next(NULL) {
pthread_mutex_init(&lock, NULL);
}
~Node( ) {
pthread_mutex_destroy(&lock);
}
} Node;
pthread_rwlock_t _rwlock; // 2 level locking
Node *head, *tail;
public:
// … all external API remain as-is
}
};
清單 8 給出在鏈表中插入數(shù)據(jù)的代碼县耽。
清單 8. 使用雙重鎖定在鏈表中插入數(shù)據(jù)
void SList<T>:: insert_after(T& previous, T& value)
{
pthread_rwlock_rdlock (&_rwlock);
Node* temp = head;
while (temp) {
if (temp->value == previous) {
break;
}
temp = temp->next;
}
Node* newNode = new Node(value);
pthread_mutex_lock(&temp->lock);
newNode->next = temp->next;
temp->next = newNode;
pthread_mutex_unlock(&temp->lock);
pthread_rwlock_unlock(&_rwlock);
return status;
}
基于互斥鎖的方法的問題
到目前為止句喷,都是在數(shù)據(jù)結(jié)構(gòu)中使用一個(gè)或多個(gè)互斥鎖管理同步。但是兔毙,這種方法并非沒有問題唾琼。請考慮以下情況:
- 等待互斥鎖會消耗寶貴的時(shí)間 — 有時(shí)候是很多時(shí)間。這種延遲會損害系統(tǒng)的可伸縮性澎剥。
- 低優(yōu)先級的線程可以獲得互斥鎖锡溯,因此阻礙需要同一互斥鎖的高優(yōu)先級線程。這個(gè)問題稱為優(yōu)先級倒置(priority inversion ) (更多信息見 參考資料 中的鏈接)。
- 可能因?yàn)榉峙涞臅r(shí)間片結(jié)束祭饭,持有互斥鎖的線程被取消調(diào)度涌乳。這對于等待同一互斥鎖的其他線程有不利影響,因?yàn)榈却龝r(shí)間現(xiàn)在會更長甜癞。這個(gè)問題稱為鎖護(hù)送(lock convoying)(更多信息見 參考資料中的鏈接)夕晓。
互斥鎖的問題還不只這些。最近悠咱,出現(xiàn)了一些不使用互斥鎖的解決方案蒸辆。話雖如此,盡管使用互斥鎖需要謹(jǐn)慎析既,但是如果希望提高性能躬贡,肯定應(yīng)該研究互斥鎖。
比較并交換指令
在研究不使用互斥鎖的解決方案之前眼坏,先討論一下從 80486 開始在所有 Intel? 處理器上都支持的 CMPXCHG 匯編指令拂玻。清單 9 從概念角度說明這個(gè)指令的作用。
清單 9. 比較并交換指令的行為
int compare_and_swap ( int *memory_location, int expected_value, int new_value)
{
int old_value = *memory_location;
if (old_value == expected_value)
*memory_location = new_value;
return old_value;
}
這里發(fā)生的操作是:指令檢查一個(gè)內(nèi)存位置是否包含預(yù)期的值宰译;如果是這樣檐蚜,就把新的值復(fù)制到這個(gè)位置。清單 10 提供匯編語言偽代碼沿侈。
清單 10. 比較并交換指令的匯編語言偽代碼
CMPXCHG OP1, OP2
if ({AL or AX or EAX} = OP1)
zero = 1 ;Set the zero flag in the flag register
OP1 = OP2
else
zero := 0 ;Clear the zero flag in the flag register
{AL or AX or EAX}= OP1
CPU 根據(jù)操作數(shù)的寬度(8闯第、16 或 32)選擇 AL、AX 或 EAX 寄存器缀拭。如果 AL/AX/EAX 寄存器的內(nèi)容與操作數(shù) 1 的內(nèi)容匹配咳短,就把操作數(shù) 2 的內(nèi)容復(fù)制到操作數(shù) 1;否則蛛淋,用操作數(shù) 2 的值更新 AL/AX/EAX 寄存器咙好。Intel Pentium? 64 位處理器有相似的指令 CMPXCHG8B,它支持 64 位的比較并交換褐荷。注意勾效,CMPXCHG 指令是原子性的,這意味著在這個(gè)指令結(jié)束之前沒有可見的中間狀態(tài)诚卸。它要么完整地執(zhí)行葵第,要么根本沒有開始。在其他平臺上有等效的指令合溺,例如 Motorola MC68030 處理器的 compare and swap (CAS) 指令有相似的語義。
我們?yōu)槭裁磳?CMPXCHG 感興趣缀台?這意味著要使用匯編語言編寫代碼嗎棠赛?
需要了解 CMPXCHG 和 CMPXCHG8B 等相關(guān)指令是因?yàn)樗鼈儤?gòu)成了無鎖解決方案的核心。但是,不必使用匯編語言編寫代碼睛约。GCC (GNU Compiler Collection鼎俘,4.1 和更高版本)提供幾個(gè)原子性的內(nèi)置函數(shù)(見 參考資料),可以使用它們?yōu)?x86 和 x86-64 平臺實(shí)現(xiàn) CAS 操作辩涝。實(shí)現(xiàn)這一支持不需要包含頭文件贸伐。在本文中,我們要在無鎖數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn)中使用 GCC 內(nèi)置函數(shù)怔揩∽叫希看一下這些內(nèi)置函數(shù):
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)
__sync_bool_compare_and_swap
內(nèi)置函數(shù)比較 oldval
和 ptr
。如果它們匹配商膊,就把 newval
復(fù)制到ptr
伏伐。如果 oldval
和 *ptr
匹配,返回值是 True晕拆,否則是 False藐翎。__sync_val_compare_and_swap
內(nèi)置函數(shù)的行為是相似的,只是它總是返回舊值实幕。清單 11 提供一個(gè)使用示例吝镣。
清單 11. GCC CAS 內(nèi)置函數(shù)的使用示例
#include <iostream>
using namespace std;
int main()
{
bool lock(false);
bool old_value = __sync_val_compare_and_swap( &lock, false, true);
cout >> lock >> endl; // prints 0x1
cout >> old_value >> endl; // prints 0x0
}
設(shè)計(jì)無鎖并發(fā)堆棧
既然了解了 CAS,現(xiàn)在就來設(shè)計(jì)一個(gè)并發(fā)堆棧昆庇。這個(gè)堆棧沒有鎖赤惊;這種無鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)也稱為非阻塞數(shù)據(jù)結(jié)構(gòu)。清單 12 給出代碼接口凰锡。
清單 12. 基于鏈表的非阻塞堆棧實(shí)現(xiàn)
template <typename T>
class Stack {
typedef struct Node {
T data;
Node* next;
Node(const T& d) : data(d), next(0) { }
} Node;
Node *top;
public:
Stack( ) : top(0) { }
void push(const T& data);
T pop( ) throw (…);
};
清單 13 給出壓入操作的定義未舟。
清單 13. 在非阻塞堆棧中壓入數(shù)據(jù)
void Stack<T>::push(const T& data)
{
Node *n = new Node(data);
while (1) {
n->next = top;
if (__sync_bool_compare_and_swap(&top, n->next, n)) { // CAS
break;
}
}
}
壓入(Push)操作做了什么?從單一線程的角度來看掂为,創(chuàng)建了一個(gè)新節(jié)點(diǎn)裕膀,它的 next 指針指向堆棧的頂部。接下來勇哗,調(diào)用 CAS 內(nèi)置函數(shù)昼扛,把新的節(jié)點(diǎn)復(fù)制到 top 位置。
從多個(gè)線程的角度來看欲诺,完全可能有兩個(gè)或更多線程同時(shí)試圖把數(shù)據(jù)壓入堆棧抄谐。假設(shè)線程 A 試圖把 20 壓入堆棧,線程 B 試圖壓入 30扰法,而線程 A 先獲得了時(shí)間片蛹含。但是,在 n->next = top
指令結(jié)束之后塞颁,調(diào)度程序暫停了線程 A∑窒洌現(xiàn)在吸耿,線程 B 獲得了時(shí)間片(它很幸運(yùn)),它能夠完成 CAS酷窥,把 30 壓入堆棧后結(jié)束咽安。接下來,線程 A 恢復(fù)執(zhí)行蓬推,顯然對于這個(gè)線程 *top
和 n->next
不匹配妆棒,因?yàn)榫€程 B 修改了 top 位置的內(nèi)容。因此沸伏,代碼回到循環(huán)的開頭糕珊,指向正確的 top 指針(線程 B 修改后的),調(diào)用 CAS馋评,把 20 壓入堆棧后結(jié)束放接。整個(gè)過程沒有使用任何鎖。
彈出操作
清單 14 給出從堆棧彈出數(shù)據(jù)的代碼留特。
清單 14. 從非阻塞堆棧彈出數(shù)據(jù)
T Stack<T>::pop( )
{
if (top == NULL)
throw std::string(“Cannot pop from empty stack”);
while (1) {
Node* next = top->next;
if (__sync_bool_compare_and_swap(&top, top, next)) { // CAS
return top->data;
}
}
}
用與 push
相似的代碼定義彈出操作語義纠脾。堆棧的頂存儲在 result
中,使用 CAS 把 top 位置更新為top->next
并返回適當(dāng)?shù)臄?shù)據(jù)蜕青。如果恰在執(zhí)行 CAS 之前線程失去執(zhí)行權(quán)苟蹈,那么在線程恢復(fù)執(zhí)行之后,CAS 會失敗右核,繼續(xù)循環(huán)慧脱,直到有有效的數(shù)據(jù)可用為止。
結(jié)果好就一切都好
不幸的是贺喝,這種堆棧彈出實(shí)現(xiàn)有問題 — 包括明顯的問題和不太明顯的問題菱鸥。明顯的問題是 NULL 檢查必須放在 while
循環(huán)中。如果線程 P 和線程 Q 都試圖從只剩一個(gè)元素的堆棧彈出數(shù)據(jù)躏鱼,而線程 P 恰在執(zhí)行 CAS 之前失去執(zhí)行權(quán)氮采,那么當(dāng)它重新獲得執(zhí)行權(quán)時(shí),堆棧中已經(jīng)沒有可彈出的數(shù)據(jù)了染苛。因?yàn)?top
是 NULL鹊漠,訪問 &top
肯定會導(dǎo)致崩潰 — 這顯然是可以避免的 bug。這個(gè)問題也突顯了并發(fā)數(shù)據(jù)結(jié)構(gòu)的基本設(shè)計(jì)原則之一:決不要假設(shè)任何代碼會連續(xù)執(zhí)行茶行。
清單 15 給出解決了此問題的代碼躯概。
清單 15. 從非阻塞堆棧彈出數(shù)據(jù)
T Stack<T>::pop( )
{
while (1) {
if (top == NULL)
throw std::string(“Cannot pop from empty stack”);
Node* next = top->next;
if (top && __sync_bool_compare_and_swap(&top, top, next)) { // CAS
return top->data;
}
}
}
下一個(gè)問題比較復(fù)雜,但是如果您了解內(nèi)存管理程序的工作方式(更多信息見 參考資料 中的鏈接)畔师,應(yīng)該不太難理解娶靡。清單 16 展示這個(gè)問題。
清單 16. 內(nèi)存的回收利用會導(dǎo)致 CAS 出現(xiàn)嚴(yán)重的問題
T* ptr1 = new T(8, 18);
T* old = ptr1;
// .. do stuff with ptr1
delete ptr1;
T* ptr2 = new T(0, 1);
// We can't guarantee that the operating system will not recycle memory
// Custom memory managers recycle memory often
if (old1 == ptr2) {
…
}
在此代碼中,無法保證 old
和 ptr2
有不同的值宽气。根據(jù)操作系統(tǒng)和定制的應(yīng)用程序內(nèi)存管理系統(tǒng)的具體情況,完全可能回收利用已刪除的內(nèi)存 — 也就是說,刪除的內(nèi)存放在應(yīng)用程序?qū)S玫某刂邪稍谛枰獣r(shí)重用,而不返回給系統(tǒng)懂傀。這顯然會改進(jìn)性能趾诗,因?yàn)椴恍枰ㄟ^系統(tǒng)調(diào)用請求更多內(nèi)存。盡管在一般情況下這是有利的蹬蚁,但是對于非阻塞堆棧不好∈牙幔現(xiàn)在我們來看看這是為什么。
假設(shè)有兩個(gè)線程 — A 和 B犀斋。A 調(diào)用 pop
并恰在執(zhí)行 CAS 之前失去執(zhí)行權(quán)贝乎。然后 B 調(diào)用 pop
,然后壓入數(shù)據(jù)叽粹,數(shù)據(jù)的一部分來自從前面的彈出操作回收的內(nèi)存览效。清單 17 給出偽代碼。
清單 17. 序列圖
Thread A tries to pop
Stack Contents: 5 10 14 9 100 2
result = pointer to node containing 5
Thread A now de-scheduled
Thread B gains control
Stack Contents: 5 10 14 9 100 2
Thread B pops 5
Thread B pushes 8 16 24 of which 8 was from the same memory that earlier stored 5
Stack Contents: 8 16 24 10 14 9 100 2
Thread A gains control
At this time, result is still a valid pointer and *result = 8
But next points to 10, skipping 16 and 24!!!
糾正方法相當(dāng)簡單:不存儲下一個(gè)節(jié)點(diǎn)虫几。清單 18 給出代碼锤灿。
清單 18. 從非阻塞堆棧彈出數(shù)據(jù)
T Stack<T>::pop( )
{
while (1) {
Node* result = top;
if (result == NULL)
throw std::string(“Cannot pop from empty stack”);
if (top && __sync_bool_compare_and_swap(&top, result, result->next)) { // CAS
return top->data;
}
}
}
這樣,即使線程 B 在線程 A 試圖彈出數(shù)據(jù)的同時(shí)修改了堆棧頂辆脸,也可以確保不會跳過堆棧中的元素但校。
結(jié)束語
本系列討論了如何設(shè)計(jì)支持并發(fā)訪問的數(shù)據(jù)結(jié)構(gòu)。您已經(jīng)看到啡氢,設(shè)計(jì)可以基于互斥鎖状囱,也可以是無鎖的。無論采用哪種方式倘是,要考慮的問題不僅僅是這些數(shù)據(jù)結(jié)構(gòu)的基本功能 — 具體來說亭枷,必須一直記住線程會爭奪執(zhí)行權(quán),要考慮線程重新執(zhí)行時(shí)如何恢復(fù)操作辨绊。目前奶栖,解決方案(尤其是無鎖解決方案)與平臺/編譯器緊密相關(guān)。請研究用于實(shí)現(xiàn)線程和鎖的 Boost 庫门坷,閱讀 John Valois 關(guān)于無鎖鏈表的文章(見參考資料 中的鏈接)宣鄙。C++0x
標(biāo)準(zhǔn)提供了 std::thread
類,但是目前大多數(shù)編譯器對它的支持很有限默蚌,甚至不支持它冻晤。
轉(zhuǎn)自(https://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/index.html)