入隊列
EnQueue(x) //進(jìn)隊列
{
//準(zhǔn)備新加入的結(jié)點數(shù)據(jù)
q = new record();
q->value = x;
q->next = NULL;
do {
p = tail; //取鏈表尾指針的快照
} while( CAS(p->next, NULL, q) != TRUE); //如果沒有把結(jié)點鏈在尾指針上风罩,再試
CAS(tail, p, q); //置尾結(jié)點
}
我們可以看到,程序中的那個 do- while 的 Re-Try-Loop舵稠。就是說超升,很有可能我在準(zhǔn)備在隊列尾加入結(jié)點時,別的線程已經(jīng)加成功了哺徊,于是tail指針就變了廓俭,于是我的CAS返回了false,于是程序再試唉工,直到試成功為止。這個很像我們的搶電話熱線的不停重播的情況汹忠。
這里有一個潛在的問題——如果T1線程在用CAS更新tail指針的之前淋硝,線程停掉或是掛掉了,那么其它線程就進(jìn)入死循環(huán)了宽菜。下面是改良版的EnQueue()
EnQueue(x) //進(jìn)隊列改良版
{
q = new record();
q->value = x;
q->next = NULL;
p = tail;
oldp = p
do {
while (p->next != NULL)
p = p->next;
} while( CAS(p->next, NULL, q) != TRUE); //如果沒有把結(jié)點鏈在尾上谣膳,再試
CAS(tail, oldp, q); //置尾結(jié)點
}
DeQueue() //出隊列
{
do{
p = head;
if (p->next == NULL){
return ERR_EMPTY_QUEUE;
}
while( CAS(head, p, p->next) != TRUE );
return p->next->value;
}
ABA問題
所謂ABA(見維基百科的ABA詞條),問題基本是這個樣子:
- 1 進(jìn)程P1在共享變量中讀到值為A
- 2 P1被搶占了铅乡,進(jìn)程P2執(zhí)行
- 3 P2把共享變量里的值從A改成了B继谚,再改回到A,此時被P1搶占阵幸。
- 4 P1回來看到共享變量里的值沒有被改變花履,于是繼續(xù)執(zhí)行。
雖然P1以為變量值沒有改變挚赊,繼續(xù)執(zhí)行了诡壁,但是這個會引發(fā)一些潛在的問題。ABA問題最容易發(fā)生在lock free 的算法中的荠割,CAS首當(dāng)其沖妹卿,因為CAS判斷的是指針的地址旺矾。如果這個地址被重用了呢,問題就很大了夺克。(地址被重用是很經(jīng)常發(fā)生的箕宙,一個內(nèi)存分配后釋放了,再分配铺纽,很有可能還是原來的地址)
比如上述的DeQueue()函數(shù)柬帕,因為我們要讓head和tail分開,所以我們引入了一個dummy指針給head室囊,當(dāng)我們做CAS的之前雕崩,如果head的那塊內(nèi)存被回收并被重用了,而重用的內(nèi)存又被EnQueue()進(jìn)來了融撞,這會有很大的問題盼铁。(內(nèi)存管理中重用內(nèi)存基本上是一種很常見的行為)
簡單的實現(xiàn)
下面實現(xiàn)一個簡單的無鎖的隊列,這個隊列使用鏈表數(shù)據(jù)結(jié)構(gòu)尝偎,
并且沒有考慮ABA問題饶火。
/*lock_free.h*/
#ifndef _LOCK_FREE_H_
#define _LOCK_FREE_H_
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
typedef struct node_s node_t;
struct node_s {
node_t *next;
void *data;
};
node_t *create_queue();
node_t *enqueue(void * d);
void* dequeue();
#endif
/*lock_free.c*/
#include "lock_free.h"
node_t *head = NULL;
node_t *tail = NULL;
node_t *create_queue()
{
if (head != NULL)
return head;
head = (node_t*)malloc(sizeof(node_t));
if (!head) {
fprintf(stderr, "malloc error\n");
return NULL;
}
head->next = NULL;
head->data = NULL;
tail = head;
return head;
}
node_t* enqueue(void *d)
{
node_t *p = (node_t*)malloc(sizeof(node_t));
if (!p) {
fprintf(stderr, "malloc error\n");
return NULL;
}
p->next = NULL;
p->data = d;
node_t *q;
do {
q = tail;
} while (!__sync_bool_compare_and_swap(&(q->next), NULL, p));
__sync_bool_compare_and_swap(&tail, q, p);
return p;
}
void* dequeue()
{
node_t *p;
void *res;
do {
p = head;
if (p->next == NULL)
return NULL;
res = p->next->data;
} while(!__sync_bool_compare_and_swap(&head, p, p->next));
/*
在釋放頭節(jié)點之前保存返回的結(jié)果,
如果沒有保存這個值致扯,當(dāng)有線程1都更新了head值后并且還沒有返回肤寝,
此時另一個線程卻刪除這個值,會導(dǎo)致線程1返回的結(jié)果為一個無效指針
*/
if (p) {
free(p);
p = NULL;
}
return res;
}
/*main.c*/
#include "lock_free.h"
#include <pthread.h>
#include <string.h>
void *entry(void *data);
int main()
{
if (!create_queue()) {
fprintf(stderr, "create queue error\n");
exit(1);
}
int i;
pthread_t pid;
for (i = 0; i < 4; i ++) {
if (0 != pthread_create(&pid, NULL, entry, NULL)) {
perror("pthread create error\n");
}
}
getchar();
return 0;
}
void *entry(void *data)
{
char str[64] = {0};
char *dst = NULL;
sprintf(str, "%lu", pthread_self());
int i;
for (i = 0; i < 4; i ++) {
dst = (char*)malloc(128);
memset(dst, 0, 128);
sprintf(dst, "%s-%c", str, i + 65);
if (!enqueue((void *)(dst))) {
fprintf(stderr, "enqueue error\n");
break;
}
}
void *res = dequeue();
for (; res ;) {
fprintf(stdout, "%s\n", (char*)(res));
free(res);
res = NULL;
res = dequeue();
}
return NULL;
}