簡(jiǎn)介
無(wú)鎖隊(duì)列是lock-free中最基本的數(shù)據(jù)結(jié)構(gòu)劈愚,一般應(yīng)用場(chǎng)景是資源分配,比如TimerId的分配,WorkerId的分配猾蒂,上電內(nèi)存初始?jí)K數(shù)的申請(qǐng)等等。
對(duì)于多線程用戶來(lái)說(shuō)蚊逢,無(wú)鎖隊(duì)列的入隊(duì)和出隊(duì)操作是線程安全的烙荷,不用再加鎖控制。
API
ErrCode initQueue(void** queue, U32 unitSize, U32 maxUnitNum);
ErrCode enQueue(void* queue, void* unit);
ErrCode deQueue(void* queue, void* unit);
U32 getQueueSize(void* queue);
BOOL isQueueEmpty(void* queue);
initQueue
初始化隊(duì)列:根據(jù)unitSize和maxUnitNum申請(qǐng)內(nèi)存,并對(duì)內(nèi)存進(jìn)行初始化亩码。
enQueue
入隊(duì):從隊(duì)尾增加元素
dequeue
出隊(duì):從隊(duì)頭刪除元素
getQueueSize
獲取隊(duì)列大小:返回隊(duì)列中的元素?cái)?shù)
isQueueEmpty
隊(duì)列是否為空:true表示隊(duì)列為空吏廉,false表示隊(duì)列非空
API使用示例
我們以定時(shí)器Id的管理為例汹买,了解一下無(wú)鎖隊(duì)列主要API的使用生巡。
初始化:主線程調(diào)用
ErrCode ret = initQueue(&queue, sizeof(U32), MAX_TMCB_NUM);
if (ret != ERR_TIMER_SUCC)
{
ERR_LOG("lock free init_queue error: %u\n", ret);
return;
}
for (U32 timerId = 0; timerId < MAX_TMCB_NUM; timerId++)
{
ret = enQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
ERR_LOG("lock free enqueue error: %u\n", ret);
return;
}
}
timerId分配:多線程調(diào)用
U32 timerId;
ErrCode ret = deQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
ERR_LOG("deQueue failed!");
return INVALID_TIMER_ID;
}
timerId回收:多線程調(diào)用
ErrCode ret = enQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
ERR_LOG("enQueue failed!");
}
核心實(shí)現(xiàn)
顯然须揣,隊(duì)列操作的核心實(shí)現(xiàn)為入隊(duì)和出隊(duì)操作遂庄。
入隊(duì)
入隊(duì)的關(guān)鍵點(diǎn)有下面幾點(diǎn):
- 通過(guò)寫(xiě)次數(shù)確保隊(duì)列元素?cái)?shù)小于最大元素?cái)?shù)
- 獲取next tail的位置
- 將新元素插入到隊(duì)尾
- 尾指針偏移
- 讀次數(shù)加1
最大元素?cái)?shù)校驗(yàn)
do
在入隊(duì)操作開(kāi)始凛澎,就判斷寫(xiě)次數(shù)是否超過(guò)隊(duì)列元素的最大值沫换,如果已超過(guò)垮兑,則反饋隊(duì)列已滿的錯(cuò)誤碼系枪,否則通過(guò)CAS操作將寫(xiě)次數(shù)加1。如果CAS操作失敗衬浑,說(shuō)明有多個(gè)線程同時(shí)判斷了寫(xiě)次數(shù)都小于隊(duì)列最大元素?cái)?shù),那么只有一個(gè)線程CAS操作成功,其他線程則需要重新做循環(huán)操作奠支。
獲取next tail的位置
do
{
do
{
nextTail = queueHead->nextTail;
} while (!__sync_bool_compare_and_swap(&queueHead->nextTail, nextTail, (nextTail + 1) % (queueHead->maxUnitNum + 1)));
unitHead = UNIT_HEAD(queue, nextTail);
} while (unitHead->hasUsed);
當(dāng)前next tail的位置就是即將入隊(duì)的元素的目標(biāo)位置叉抡,并通過(guò)CAS操作更新隊(duì)列頭中nextTail的值季春。如果CAS操作失敗,則說(shuō)明其他線程也正在進(jìn)行入隊(duì)操作宇攻,并比本線程快,則需要進(jìn)行重新嘗試仑最,從而更新next tail的值,確保該入隊(duì)元素的位置正確损敷。
然而事情并沒(méi)有這么簡(jiǎn)單!
由于多線程的搶占洋丐,導(dǎo)致隊(duì)列并不是按下標(biāo)大小依次鏈接起來(lái)的肪笋,所以要判斷一下next tail的位置是否正被占用墙懂。如果next tail的位置正被占用辞槐,則需要重新競(jìng)爭(zhēng)next tail掷漱,直到next tail的位置是空閑的。
將新元素插入到隊(duì)尾
初始化新元素:
unitHead->next = LIST_END;
memcpy(UNIT_DATA(queue, nextTail), unit, queueHead->unitSize);
插入到隊(duì)尾:
do
{
listTail = queueHead->listTail;
oldListTail = listTail;
unitHead = UNIT_HEAD(queue, listTail);
if ((++tryTimes) >= 3)
{
while (unitHead->next != LIST_END)
{
listTail = unitHead->next;
unitHead = UNIT_HEAD(queue, listTail);
}
}
} while (!__sync_bool_compare_and_swap(&unitHead->next, LIST_END, nextTail));
通過(guò)CAS操作判斷當(dāng)前指針是否到達(dá)隊(duì)尾,如果到達(dá)隊(duì)尾喳魏,則將新元素連接到隊(duì)尾元素之后(next),否則進(jìn)行追趕嗡害。
在這里霸妹,我們做了優(yōu)化台盯,當(dāng)CAS操作連續(xù)失敗3次后,那么就直接通過(guò)next的遞推找到隊(duì)尾,這樣比CAS操作的效率高很多。我們?cè)跍y(cè)試多線程的隊(duì)列操作時(shí)魂务,出現(xiàn)過(guò)一個(gè)線程插入到tail為400多的時(shí)候粘姜,已有線程插入到tail為1000多的場(chǎng)景豺裆。
尾指針偏移
do
{
__sync_val_compare_and_swap(&queueHead->listTail, oldListTail, nextTail);
oldListTail = queueHead->listTail;
unitHead = UNIT_HEAD(queue, oldListTail);
nextTail = unitHead->next;
} while (nextTail != LIST_END);
在多線程場(chǎng)景下,隊(duì)尾指針是動(dòng)態(tài)變化的次屠,當(dāng)前尾可能不是新尾了,所以通過(guò)CAS操作更新隊(duì)尾裸违。當(dāng)CAS操作失敗時(shí)供汛,說(shuō)明隊(duì)尾已經(jīng)被其他線程更新朱监,此時(shí)不能將nextTail賦值給隊(duì)尾。
讀次數(shù)加1
__sync_fetch_and_add(&queueHead->readCount, 1);
寫(xiě)次數(shù)加1是為了保證隊(duì)列元素的數(shù)不能超過(guò)最大元素?cái)?shù),而讀次數(shù)加1是為了確保不能從空隊(duì)列出隊(duì)。
出隊(duì)
出隊(duì)的關(guān)鍵點(diǎn)有下面幾點(diǎn):
- 通過(guò)讀次數(shù)確保不能從空隊(duì)列出隊(duì)
- 頭指針偏移
- dummy頭指針
- 寫(xiě)次數(shù)減1
空隊(duì)列校驗(yàn)
do
{
readCount = queueHead->readCount;
if (readCount == 0) return ERR_QUEUE_HAS_EMPTY;
} while (!__sync_bool_compare_and_swap(&queueHead->readCount, readCount, readCount - 1));
讀次數(shù)為0翘县,說(shuō)明隊(duì)列為空,否則通過(guò)CAS操作將讀次數(shù)減1也糊。如果CAS操作失敗炼蹦,說(shuō)明其他線程已更新讀次數(shù)成功,必須重試狸剃,直到成功掐隐。
頭指針偏移
U32 readCount;
do
{
listHead = queueHead->listHead;
unitHead = UNIT_HEAD(queue, listHead);
} while (!__sync_bool_compare_and_swap(&queueHead->listHead, listHead, unitHead->next));
如果CAS操作失敗,說(shuō)明隊(duì)頭指針已經(jīng)在其他線程的操作下進(jìn)行了偏移钞馁,所以要重試虑省,直到成功。
dummy頭指針
memcpy(unit, UNIT_DATA(queue, unitHead->next), queueHead->unitSize);
我們可以看出僧凰,頭元素為head->next探颈,這就是說(shuō)隊(duì)列的第一個(gè)元素都是基于head->next而不是head。
這樣設(shè)計(jì)是有原因的训措。
考慮一個(gè)邊界條件:在隊(duì)列只有一個(gè)元素條件下伪节,如果head和tail指針指向同一個(gè)結(jié)點(diǎn),這樣入隊(duì)操作和出隊(duì)操作本身就需要互斥了绩鸣。
通過(guò)引入一個(gè)dummy頭指針來(lái)解決這個(gè)問(wèn)題怀大,如下圖所示。
寫(xiě)次數(shù)減1
__sync_fetch_and_sub(&queueHead->writeCount, 1);
出隊(duì)操作結(jié)束前呀闻,要將寫(xiě)次數(shù)減1化借,以便入隊(duì)操作能成功。
無(wú)鎖隊(duì)列的ABA問(wèn)題分析
我們?cè)倏匆幌骂^指針偏移的代碼:
do
{
listHead = queueHead->listHead;
unitHead = UNIT_HEAD(queue, listHead);
} while (!__sync_bool_compare_and_swap(&queueHead->listHead, listHead, unitHead->next));
假設(shè)隊(duì)列中只有兩個(gè)元素A和B捡多,那么
- 線程T1 執(zhí)行出隊(duì)操作蓖康,當(dāng)執(zhí)行到while循環(huán)時(shí)被線程T2 搶占,線程T1 等待
- 線程T2 成功執(zhí)行了兩次出隊(duì)操作垒手,并free了A和B結(jié)點(diǎn)的內(nèi)存
- 線程T3 進(jìn)行入隊(duì)操作蒜焊,malloc的內(nèi)存地址和A相同,入隊(duì)操作成功
- 線程T1 重新獲得CPU淫奔,執(zhí)行while中的CAS操作山涡,發(fā)現(xiàn)A的地址沒(méi)有變,其實(shí)內(nèi)容已經(jīng)今非昔比了唆迁,而線程T1 并不知情鸭丛,繼續(xù)更新頭指針,使得頭指針指向B所在結(jié)點(diǎn)唐责,但是B所在結(jié)點(diǎn)的內(nèi)存早已釋放鳞溉。
這就是無(wú)鎖隊(duì)列的ABA問(wèn)題。
為解決ABA問(wèn)題鼠哥,我們可以采用具有原子性的內(nèi)存引用計(jì)數(shù)等辦法熟菲,而利用循環(huán)數(shù)組實(shí)現(xiàn)無(wú)鎖隊(duì)列也可以解決ABA問(wèn)題看政,因?yàn)檠h(huán)數(shù)組不涉及內(nèi)存的動(dòng)態(tài)分配和回收,在線程T2 成功執(zhí)行兩次出隊(duì)操作后抄罕,隊(duì)列的head指針已經(jīng)變化(指向到了下標(biāo)head+2)允蚣,線程T3 進(jìn)行入隊(duì)操作不會(huì)改變隊(duì)列的head指針,當(dāng)線程T1 重新獲得CPU進(jìn)行CAS操作時(shí)呆贿,會(huì)因失敗重新do while嚷兔,這時(shí)臨時(shí)head更新成了隊(duì)列head,所以規(guī)避了ABA問(wèn)題做入。
小結(jié)
我們?cè)谏弦黄?jiǎn)要介紹了無(wú)鎖編程基礎(chǔ)冒晰,這一篇通過(guò)深度剖析無(wú)鎖隊(duì)列,對(duì)CAS原子操作的使用有了感性的認(rèn)識(shí)竟块,我們了解了無(wú)鎖隊(duì)列的API和核心實(shí)現(xiàn)壶运,可以看出,要實(shí)現(xiàn)一個(gè)沒(méi)有問(wèn)題且高效的無(wú)鎖隊(duì)列是非常困難的浪秘,最后對(duì)無(wú)鎖隊(duì)列的ABA問(wèn)題進(jìn)行了實(shí)例化蒋情,筆者在無(wú)鎖隊(duì)列的編程實(shí)踐中通過(guò)循環(huán)數(shù)組的方法規(guī)避了ABA問(wèn)題。
你是否已感到有些燒腦耸携?
我們將在下一篇文章中深度剖析無(wú)鎖雙向鏈表恕出,是否會(huì)燒的更厲害?讓我們拭目以待违帆。