也叫緩存綁定問(wèn)題(bounded- buffer)东跪,是一個(gè)經(jīng)典的止喷、多進(jìn)程同步問(wèn)題呛凶。
單生產(chǎn)者和單消費(fèi)者
有兩個(gè)進(jìn)程:一組生產(chǎn)者進(jìn)程和一組消費(fèi)者進(jìn)程共享一個(gè)初始為空彪杉、固定大小為n的緩存(緩沖區(qū))谣光。生產(chǎn)者的工作是制造一段數(shù)據(jù)檩淋,只有緩沖區(qū)沒(méi)滿時(shí),生產(chǎn)者才能把消息放入到緩沖區(qū)萄金,否則必須等待蟀悦,如此反復(fù); 同時(shí),只有緩沖區(qū)不空時(shí)氧敢,消費(fèi)者才能從中取出消息日戈,一次消費(fèi)一段數(shù)據(jù)(即將其從緩存中移出),否則必須等待孙乖。由于緩沖區(qū)是臨界資源浙炼,它只允許一個(gè)生產(chǎn)者放入消息,或者一個(gè)消費(fèi)者從中取出消息唯袄。
問(wèn)題的核心是:
1.要保證不讓生產(chǎn)者在緩存還是滿的時(shí)候仍然要向內(nèi)寫(xiě)數(shù)據(jù);
2.不讓消費(fèi)者試圖從空的緩存中取出數(shù)據(jù)弯屈。
生產(chǎn)者和消費(fèi)者對(duì)緩沖區(qū)互斥訪問(wèn)是互斥關(guān)系,同時(shí)生產(chǎn)者和消費(fèi)者又是一個(gè)相互協(xié)作的關(guān)系恋拷,只有生產(chǎn)者生產(chǎn)之后资厉,消費(fèi)者才能消費(fèi),他們也是同步關(guān)系蔬顾。
- 解決思路:對(duì)于生產(chǎn)者宴偿,如果緩存是滿的就去睡覺(jué)。消費(fèi)者從緩存中取走數(shù)據(jù)后就叫醒生產(chǎn)者阎抒,讓它再次將緩存填滿酪我。若消費(fèi)者發(fā)現(xiàn)緩存是空的,就去睡覺(jué)了且叁。下一輪中生產(chǎn)者將數(shù)據(jù)寫(xiě)入后就叫醒消費(fèi)者都哭。
不完善的解決方案會(huì)造成“死鎖”,即兩個(gè)進(jìn)程都在“睡覺(jué)”等著對(duì)方來(lái)“喚醒”。
只有生產(chǎn)者和消費(fèi)者兩個(gè)進(jìn)程欺矫,正好是這兩個(gè)進(jìn)程存在著互斥關(guān)系和同步關(guān)系纱新。那么需要解決的是互斥和同步PV操作的位置。使用“進(jìn)程間通信”穆趴,“信號(hào)標(biāo)”semaphore就可以解決喚醒的問(wèn)題:
我們使用了兩個(gè)信號(hào)標(biāo):full 和 empty 脸爱。信號(hào)量mutex作為互斥信號(hào)量,它用于控制互斥訪問(wèn)緩沖池未妹,互斥信號(hào)量初值為 1簿废;信號(hào)量 full 用于記錄當(dāng)前緩沖池中“滿”緩沖區(qū)數(shù),初值為0络它。信號(hào)量 empty 用于記錄當(dāng)前緩沖池中“空”緩沖區(qū)數(shù)族檬,初值為n。新的數(shù)據(jù)添加到緩存中后化戳,full 在增加单料,而 empty 則減少。如果生產(chǎn)者試圖在 empty 為0時(shí)減少其值点楼,生產(chǎn)者就會(huì)被“催眠”扫尖。下一輪中有數(shù)據(jù)被消費(fèi)掉時(shí),empty就會(huì)增加掠廓,生產(chǎn)者就會(huì)被“喚醒”换怖。
偽代碼:
semaphore mutex=1; //臨界區(qū)互斥信號(hào)量
semaphore empty=n; //空閑緩沖區(qū)
semaphore full=0; //緩沖區(qū)初始化為空
producer ()//生產(chǎn)者進(jìn)程
{
while(1)
{
produce an item in nextp; //生產(chǎn)數(shù)據(jù)
P(empty); //獲取空緩沖區(qū)單元
P(mutex); //進(jìn)入臨界區(qū).
add nextp to buffer; //將數(shù)據(jù)放入緩沖區(qū)
V(mutex); //離開(kāi)臨界區(qū),釋放互斥信號(hào)量
V(full); //滿緩沖區(qū)數(shù)加1
}
}
consumer ()//消費(fèi)者進(jìn)程
{
while(1)
{
P(full); //獲取滿緩沖區(qū)單元
P(mutex); // 進(jìn)入臨界區(qū)
remove an item from buffer; //從緩沖區(qū)中取出數(shù)據(jù)
V (mutex); //離開(kāi)臨界區(qū),釋放互斥信號(hào)量
V (empty) ; //空緩沖區(qū)數(shù)加1
consume the item; //消費(fèi)數(shù)據(jù)
}
}
該類(lèi)問(wèn)題要注意對(duì)緩沖區(qū)大小為n的處理蟀瞧,當(dāng)緩沖區(qū)中有空時(shí)便可對(duì)empty變量執(zhí)行P 操作狰域,一旦取走一個(gè)產(chǎn)品便要執(zhí)行V操作以釋放空閑區(qū)。對(duì)empty和full變量的P操作必須放在對(duì)mutex的P操作之前黄橘。
1兆览、若生產(chǎn)者進(jìn)程已經(jīng)將緩沖區(qū)放滿,消費(fèi)者進(jìn)程并沒(méi)有取產(chǎn)品塞关,即 empty = 0抬探,當(dāng)下次仍然是生產(chǎn)者進(jìn)程運(yùn)行時(shí),它先執(zhí)行 P(mutex)封鎖信號(hào)量帆赢,再執(zhí)行 P(empty)時(shí)將被阻塞小压,希望消費(fèi)者取出產(chǎn)品后將其喚醒。輪到消費(fèi)者進(jìn)程運(yùn)行時(shí)椰于,它先執(zhí)行 P(mutex)怠益,然而由于生產(chǎn)者進(jìn)程已經(jīng)封鎖 mutex 信號(hào)量,消費(fèi)者進(jìn)程也會(huì)被阻塞瘾婿,這樣一來(lái)生產(chǎn)者進(jìn)程與消費(fèi)者進(jìn)程都
將阻塞蜻牢,都指望對(duì)方喚醒自己烤咧,陷入了無(wú)休止的等待。
2抢呆、若消費(fèi)者進(jìn)程已經(jīng)將緩沖區(qū)取空煮嫌,即 full = 0,下次如果還是消費(fèi)者先運(yùn)行,也會(huì)出現(xiàn)類(lèi)似的死鎖抱虐。
不過(guò)生產(chǎn)者釋放信號(hào)量時(shí)昌阿,mutex、full 先釋放哪一個(gè)無(wú)所謂恳邀,消費(fèi)者先釋放 mutex 還是 empty 都可以懦冰。
多生產(chǎn)者多消費(fèi)者
在多個(gè)制造商和多個(gè)消費(fèi)者出現(xiàn)的情況下就會(huì)造成擁護(hù)不堪的情況,會(huì)導(dǎo)致兩個(gè)或多個(gè)進(jìn)程同時(shí)向一個(gè)磁道寫(xiě)入或讀出數(shù)據(jù)谣沸。要理解這種情況是如何出現(xiàn)的儿奶,我們可以借助于putItemIntoBuffer()函數(shù)。它包含兩個(gè)動(dòng)作:一個(gè)來(lái)判斷是否有可用磁道鳄抒,另一個(gè)則用來(lái)向其寫(xiě)入數(shù)據(jù)。如果進(jìn)程可以由多個(gè)制造商并發(fā)執(zhí)行椰弊,下面的情況則會(huì)出現(xiàn):
1许溅、 兩個(gè)制造商為emptyCount減值;
2秉版、 一個(gè)制造商判斷緩存中有可用磁道贤重;
3、 第二個(gè)制造商與第一個(gè)制造商一樣判斷緩存中有可用磁道清焕;
4并蝗、 兩個(gè)制造商同時(shí)向同一個(gè)磁道寫(xiě)入數(shù)據(jù)。
多個(gè)生產(chǎn)者向一個(gè)緩沖區(qū)中存入數(shù)據(jù)秸妥,多個(gè)生產(chǎn)者從緩沖區(qū)中取數(shù)據(jù)滚停。這是有界緩沖區(qū)問(wèn)題,隊(duì)列改寫(xiě)粥惧,生產(chǎn)者們之間键畴、消費(fèi)者們之間、生產(chǎn)者消費(fèi)者之間互相互斥突雪。
共享緩沖區(qū)作為一個(gè)環(huán)繞緩沖區(qū)起惕,存數(shù)據(jù)到尾時(shí)再?gòu)念^開(kāi)始。
- 我們使用一個(gè)互斥量保護(hù)生產(chǎn)者向緩沖區(qū)中存入數(shù)據(jù)咏删。由于有多個(gè)生產(chǎn)者惹想,因此需要記住現(xiàn)在向緩沖區(qū)中存入的位置。
- 使用一個(gè)互斥量保護(hù)緩沖區(qū)中消息的數(shù)目督函,這個(gè)生產(chǎn)的數(shù)據(jù)數(shù)目作為生產(chǎn)者和消費(fèi)者溝通的橋梁嘀粱。
- 使用一個(gè)條件變量用于喚醒消費(fèi)者激挪。由于有多個(gè)消費(fèi)者,同樣消費(fèi)者也需要記住每次取的位置草穆。
在選項(xiàng)中選擇生產(chǎn)條目的數(shù)目灌灾,生產(chǎn)者的線程數(shù)目,消費(fèi)者的線程數(shù)目悲柱。生產(chǎn)者將條目數(shù)目循環(huán)放入緩沖區(qū)中锋喜,消費(fèi)者從緩沖區(qū)中循環(huán)取出并在屏幕上打印出來(lái)。
為了克服這個(gè)問(wèn)題豌鸡,我們需要一個(gè)方法嘿般,以確保一次只有一個(gè)制造商在執(zhí)行調(diào)用函數(shù)。換個(gè)說(shuō)法來(lái)講涯冠,我們需要一個(gè)有“互斥信號(hào)標(biāo)”(mutal exclusion)的“關(guān)鍵扇區(qū)”(critical section)炉奴。為了實(shí)現(xiàn)這一點(diǎn),我們使用一個(gè)叫mutex二位信號(hào)標(biāo)蛇更。因?yàn)橐粋€(gè)二位信號(hào)標(biāo)的值只能是1或0瞻赶,只有一個(gè)進(jìn)程能執(zhí)行down(mutex)或up(mutex)。
代碼:
#include "unp.h"
static const int NBUFF = 10000;
static const int MAXNTHREADS = 100;
static int nitems; //總共生產(chǎn)的條目數(shù)
static int buff[NBUFF]; //生產(chǎn)者向其中放數(shù)據(jù)派任,消費(fèi)者從中取數(shù)據(jù)
static struct put//生產(chǎn)者使用的結(jié)構(gòu)砸逊,向其中互斥的放數(shù)據(jù)
{
pthread_mutex_t mutex;
int nput; //net position to put
int nval; //next value to store
} put =
{
PTHREAD_MUTEX_INITIALIZER
};
//記錄緩沖區(qū)的狀態(tài),準(zhǔn)備好的數(shù)目掌逛,消費(fèi)者唯一關(guān)注的結(jié)構(gòu)师逸,當(dāng)然生產(chǎn)者也會(huì)使用
static struct nready
{
pthread_mutex_t mutex;
pthread_cond_t cond;
int nget;
int nready; //number ready for consumer
} nready =
{
PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER
};//
void *produce(void*);
void *consume(void*);
int main(int argc, char **argv)
{
if (argc != 4)
{
err_quit("Usage: a.out <#items> <#produce_nthreads>
<#consume_nthreads>");
}
nitems = atoi(argv[1]);
int produce_nthreads = min(atoi(argv[2]), MAXNTHREADS);
int consume_nthreads = min(atoi(argv[3]), MAXNTHREADS);
//Solaris 2.6需要設(shè)置線程并發(fā)數(shù)
//Set_concurrency(nthreads + 1);
pthread_t tid_produce[MAXNTHREADS];
for (int i = 0; i < produce_nthreads; ++i)
{
Pthread_create(&tid_produce[i], NULL, produce, NULL);
}
pthread_t tid_consume[MAXNTHREADS];
for (int i = 0; i < consume_nthreads; ++i)
{
Pthread_create(&tid_consume[i], NULL, consume, NULL);
}
//等待線程終止
for (int i = 0; i < produce_nthreads; ++i)
{
Pthread_join(tid_produce[i], NULL);
}
for (int i = 0; i < consume_nthreads; ++i)
{
Pthread_join(tid_consume[i], NULL);
}
exit(0);
}
void *produce(void *arg)
{
printf("producd\n");
//多個(gè)生產(chǎn)者
for ( ; ; )
{
Pthread_mutex_lock(&put.mutex);
//已存了需要多的數(shù)
if (put.nval >= nitems)
{
Pthread_mutex_unlock(&put.mutex);
return NULL;
}
buff[put.nput] = put.nval;
if (++put.nput >= NBUFF)
{
put.nput = 0;
}
++put.nval;
Pthread_mutex_unlock(&put.mutex);
//當(dāng)生產(chǎn)了數(shù)據(jù)后通知條件變量,應(yīng)該使臨界區(qū)盡量短豆混,寧愿使用多個(gè)互斥量
Pthread_mutex_lock(&nready.mutex);
if (nready.nready == 0)
{
Pthread_cond_signal(&nready.cond);
}
++nready.nready;
Pthread_mutex_unlock(&nready.mutex);
} //end for(;;)
return NULL;
}
void *consume(void *argv)
{
printf("consume\n");
//多個(gè)消費(fèi)者
//只生產(chǎn)nitems個(gè)選項(xiàng)
for ( ; ; )
{
Pthread_mutex_lock(&nready.mutex);
//while避免虛假喚醒
while (nready.nready == 0)
{
Pthread_cond_wait(&nready.cond, &nready.mutex);
}
//int ival = buff[nready.nget];
//if (++nready.nget == NBUFF) {
// nready.nget = 0;
//}
if (++nready.nget >= nitems)
{
//nget比較的取值為1..nitems篓像,當(dāng)為nitems時(shí)少操作了一次,總共操作nitems次
if (nready.nget == nitems)
{
printf("buff[%d] = %d\n", nready.nget - 1,
buff[(nready.nget - 1) % NBUFF]);
}
Pthread_cond_signal(&nready.cond);
Pthread_mutex_unlock(&nready.mutex);
return NULL;
}
--nready.nready;
Pthread_mutex_unlock(&nready.mutex);
//僅僅讀數(shù)據(jù)不許要互斥
//if (buff[nready.nget - 1] != nready.nget - 1)
{
printf("buff[%d] = %d\n", nready.nget - 1,
buff[(nready.nget - 1) % NBUFF]);
//printf("buff[%d] = %d\n", nready.nget,
buff[nready.nget]);
//}
} //end for(i:0..nitems)
return NULL;
}
參考資料:
5個(gè)經(jīng)典的同步問(wèn)題
經(jīng)典進(jìn)程同步問(wèn)題1:生產(chǎn)者-消費(fèi)者問(wèn)題
多個(gè)生產(chǎn)者——多個(gè)消費(fèi)者模型(互斥量條件變量實(shí)現(xiàn))
生產(chǎn)者消費(fèi)者模型你知道多少
秒殺多線程第十篇 生產(chǎn)者消費(fèi)者問(wèn)題
C++11 并發(fā)指南九(綜合運(yùn)用: C++11 多線程下四種生產(chǎn)者消費(fèi)者模型詳解)