五個(gè)同步問(wèn)題的經(jīng)典模型之一:生產(chǎn)者/消費(fèi)者問(wèn)題

也叫緩存綁定問(wèn)題(bounded- buffer)东跪,是一個(gè)經(jīng)典的止喷、多進(jìn)程同步問(wèn)題呛凶。

單生產(chǎn)者和單消費(fèi)者

有兩個(gè)進(jìn)程:一組生產(chǎn)者進(jìn)程和一組消費(fèi)者進(jìn)程共享一個(gè)初始為空彪杉、固定大小為n的緩存(緩沖區(qū))谣光。生產(chǎn)者的工作是制造一段數(shù)據(jù)檩淋,只有緩沖區(qū)沒(méi)滿時(shí),生產(chǎn)者才能把消息放入到緩沖區(qū)萄金,否則必須等待蟀悦,如此反復(fù); 同時(shí),只有緩沖區(qū)不空時(shí)氧敢,消費(fèi)者才能從中取出消息日戈,一次消費(fèi)一段數(shù)據(jù)(即將其從緩存中移出),否則必須等待孙乖。由于緩沖區(qū)是臨界資源浙炼,它只允許一個(gè)生產(chǎn)者放入消息,或者一個(gè)消費(fèi)者從中取出消息唯袄。

問(wèn)題的核心是:
1.要保證不讓生產(chǎn)者在緩存還是滿的時(shí)候仍然要向內(nèi)寫(xiě)數(shù)據(jù);
2.不讓消費(fèi)者試圖從空的緩存中取出數(shù)據(jù)弯屈。

生產(chǎn)者和消費(fèi)者對(duì)緩沖區(qū)互斥訪問(wèn)是互斥關(guān)系,同時(shí)生產(chǎn)者和消費(fèi)者又是一個(gè)相互協(xié)作的關(guān)系恋拷,只有生產(chǎn)者生產(chǎn)之后资厉,消費(fèi)者才能消費(fèi),他們也是同步關(guān)系蔬顾。

  • 解決思路:對(duì)于生產(chǎn)者宴偿,如果緩存是滿的就去睡覺(jué)。消費(fèi)者從緩存中取走數(shù)據(jù)后就叫醒生產(chǎn)者阎抒,讓它再次將緩存填滿酪我。若消費(fèi)者發(fā)現(xiàn)緩存是空的,就去睡覺(jué)了且叁。下一輪中生產(chǎn)者將數(shù)據(jù)寫(xiě)入后就叫醒消費(fèi)者都哭。
    不完善的解決方案會(huì)造成“死鎖”,即兩個(gè)進(jìn)程都在“睡覺(jué)”等著對(duì)方來(lái)“喚醒”。

只有生產(chǎn)者和消費(fèi)者兩個(gè)進(jìn)程欺矫,正好是這兩個(gè)進(jìn)程存在著互斥關(guān)系和同步關(guān)系纱新。那么需要解決的是互斥和同步PV操作的位置。使用“進(jìn)程間通信”穆趴,“信號(hào)標(biāo)”semaphore就可以解決喚醒的問(wèn)題:

我們使用了兩個(gè)信號(hào)標(biāo):full 和 empty 脸爱。信號(hào)量mutex作為互斥信號(hào)量,它用于控制互斥訪問(wèn)緩沖池未妹,互斥信號(hào)量初值為 1簿废;信號(hào)量 full 用于記錄當(dāng)前緩沖池中“滿”緩沖區(qū)數(shù),初值為0络它。信號(hào)量 empty 用于記錄當(dāng)前緩沖池中“空”緩沖區(qū)數(shù)族檬,初值為n。新的數(shù)據(jù)添加到緩存中后化戳,full 在增加单料,而 empty 則減少。如果生產(chǎn)者試圖在 empty 為0時(shí)減少其值点楼,生產(chǎn)者就會(huì)被“催眠”扫尖。下一輪中有數(shù)據(jù)被消費(fèi)掉時(shí),empty就會(huì)增加掠廓,生產(chǎn)者就會(huì)被“喚醒”换怖。

偽代碼:

semaphore mutex=1; //臨界區(qū)互斥信號(hào)量
semaphore empty=n;  //空閑緩沖區(qū)
semaphore full=0;  //緩沖區(qū)初始化為空
producer ()//生產(chǎn)者進(jìn)程 
{
    while(1)
    {
        produce an item in nextp;  //生產(chǎn)數(shù)據(jù)
        P(empty);  //獲取空緩沖區(qū)單元
        P(mutex);  //進(jìn)入臨界區(qū).
        add nextp to buffer;  //將數(shù)據(jù)放入緩沖區(qū)
        V(mutex);  //離開(kāi)臨界區(qū),釋放互斥信號(hào)量
        V(full);  //滿緩沖區(qū)數(shù)加1
    }
}

consumer ()//消費(fèi)者進(jìn)程
{
    while(1)
    {
        P(full);  //獲取滿緩沖區(qū)單元
        P(mutex);  // 進(jìn)入臨界區(qū)
        remove an item from buffer;  //從緩沖區(qū)中取出數(shù)據(jù)
        V (mutex);  //離開(kāi)臨界區(qū),釋放互斥信號(hào)量
        V (empty) ;  //空緩沖區(qū)數(shù)加1
        consume the item;  //消費(fèi)數(shù)據(jù)
    }
}

該類(lèi)問(wèn)題要注意對(duì)緩沖區(qū)大小為n的處理蟀瞧,當(dāng)緩沖區(qū)中有空時(shí)便可對(duì)empty變量執(zhí)行P 操作狰域,一旦取走一個(gè)產(chǎn)品便要執(zhí)行V操作以釋放空閑區(qū)。對(duì)empty和full變量的P操作必須放在對(duì)mutex的P操作之前黄橘。

1兆览、若生產(chǎn)者進(jìn)程已經(jīng)將緩沖區(qū)放滿,消費(fèi)者進(jìn)程并沒(méi)有取產(chǎn)品塞关,即 empty = 0抬探,當(dāng)下次仍然是生產(chǎn)者進(jìn)程運(yùn)行時(shí),它先執(zhí)行 P(mutex)封鎖信號(hào)量帆赢,再執(zhí)行 P(empty)時(shí)將被阻塞小压,希望消費(fèi)者取出產(chǎn)品后將其喚醒。輪到消費(fèi)者進(jìn)程運(yùn)行時(shí)椰于,它先執(zhí)行 P(mutex)怠益,然而由于生產(chǎn)者進(jìn)程已經(jīng)封鎖 mutex 信號(hào)量,消費(fèi)者進(jìn)程也會(huì)被阻塞瘾婿,這樣一來(lái)生產(chǎn)者進(jìn)程與消費(fèi)者進(jìn)程都
將阻塞蜻牢,都指望對(duì)方喚醒自己烤咧,陷入了無(wú)休止的等待。
2抢呆、若消費(fèi)者進(jìn)程已經(jīng)將緩沖區(qū)取空煮嫌,即 full = 0,下次如果還是消費(fèi)者先運(yùn)行,也會(huì)出現(xiàn)類(lèi)似的死鎖抱虐。
不過(guò)生產(chǎn)者釋放信號(hào)量時(shí)昌阿,mutex、full 先釋放哪一個(gè)無(wú)所謂恳邀,消費(fèi)者先釋放 mutex 還是 empty 都可以懦冰。

多生產(chǎn)者多消費(fèi)者

在多個(gè)制造商和多個(gè)消費(fèi)者出現(xiàn)的情況下就會(huì)造成擁護(hù)不堪的情況,會(huì)導(dǎo)致兩個(gè)或多個(gè)進(jìn)程同時(shí)向一個(gè)磁道寫(xiě)入或讀出數(shù)據(jù)谣沸。要理解這種情況是如何出現(xiàn)的儿奶,我們可以借助于putItemIntoBuffer()函數(shù)。它包含兩個(gè)動(dòng)作:一個(gè)來(lái)判斷是否有可用磁道鳄抒,另一個(gè)則用來(lái)向其寫(xiě)入數(shù)據(jù)。如果進(jìn)程可以由多個(gè)制造商并發(fā)執(zhí)行椰弊,下面的情況則會(huì)出現(xiàn):
1许溅、 兩個(gè)制造商為emptyCount減值;
2秉版、 一個(gè)制造商判斷緩存中有可用磁道贤重;
3、 第二個(gè)制造商與第一個(gè)制造商一樣判斷緩存中有可用磁道清焕;
4并蝗、 兩個(gè)制造商同時(shí)向同一個(gè)磁道寫(xiě)入數(shù)據(jù)。

多個(gè)生產(chǎn)者向一個(gè)緩沖區(qū)中存入數(shù)據(jù)秸妥,多個(gè)生產(chǎn)者從緩沖區(qū)中取數(shù)據(jù)滚停。這是有界緩沖區(qū)問(wèn)題,隊(duì)列改寫(xiě)粥惧,生產(chǎn)者們之間键畴、消費(fèi)者們之間、生產(chǎn)者消費(fèi)者之間互相互斥突雪。
共享緩沖區(qū)作為一個(gè)環(huán)繞緩沖區(qū)起惕,存數(shù)據(jù)到尾時(shí)再?gòu)念^開(kāi)始。

  • 我們使用一個(gè)互斥量保護(hù)生產(chǎn)者向緩沖區(qū)中存入數(shù)據(jù)咏删。由于有多個(gè)生產(chǎn)者惹想,因此需要記住現(xiàn)在向緩沖區(qū)中存入的位置。
  • 使用一個(gè)互斥量保護(hù)緩沖區(qū)中消息的數(shù)目督函,這個(gè)生產(chǎn)的數(shù)據(jù)數(shù)目作為生產(chǎn)者和消費(fèi)者溝通的橋梁嘀粱。
  • 使用一個(gè)條件變量用于喚醒消費(fèi)者激挪。由于有多個(gè)消費(fèi)者,同樣消費(fèi)者也需要記住每次取的位置草穆。

在選項(xiàng)中選擇生產(chǎn)條目的數(shù)目灌灾,生產(chǎn)者的線程數(shù)目,消費(fèi)者的線程數(shù)目悲柱。生產(chǎn)者將條目數(shù)目循環(huán)放入緩沖區(qū)中锋喜,消費(fèi)者從緩沖區(qū)中循環(huán)取出并在屏幕上打印出來(lái)。
為了克服這個(gè)問(wèn)題豌鸡,我們需要一個(gè)方法嘿般,以確保一次只有一個(gè)制造商在執(zhí)行調(diào)用函數(shù)。換個(gè)說(shuō)法來(lái)講涯冠,我們需要一個(gè)有“互斥信號(hào)標(biāo)”(mutal exclusion)的“關(guān)鍵扇區(qū)”(critical section)炉奴。為了實(shí)現(xiàn)這一點(diǎn),我們使用一個(gè)叫mutex二位信號(hào)標(biāo)蛇更。因?yàn)橐粋€(gè)二位信號(hào)標(biāo)的值只能是1或0瞻赶,只有一個(gè)進(jìn)程能執(zhí)行down(mutex)或up(mutex)。

代碼:

#include "unp.h"
static const int NBUFF         = 10000;
static const int MAXNTHREADS = 100;
static int nitems;      //總共生產(chǎn)的條目數(shù)
static int buff[NBUFF]; //生產(chǎn)者向其中放數(shù)據(jù)派任,消費(fèi)者從中取數(shù)據(jù)

static struct put//生產(chǎn)者使用的結(jié)構(gòu)砸逊,向其中互斥的放數(shù)據(jù)
{
    pthread_mutex_t mutex;
    int nput;            //net position to put
    int nval;            //next value to store
} put  = 
{
PTHREAD_MUTEX_INITIALIZER
};

//記錄緩沖區(qū)的狀態(tài),準(zhǔn)備好的數(shù)目掌逛,消費(fèi)者唯一關(guān)注的結(jié)構(gòu)师逸,當(dāng)然生產(chǎn)者也會(huì)使用
static struct nready
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int nget;
    int nready;            //number ready for consumer
} nready = 
{
PTHREAD_MUTEX_INITIALIZER,  PTHREAD_COND_INITIALIZER
};//

void *produce(void*);
void *consume(void*);

int main(int argc, char **argv)
{
    if (argc != 4)
    {
        err_quit("Usage: a.out <#items> <#produce_nthreads>
                  <#consume_nthreads>");
    }
    nitems = atoi(argv[1]);
    int produce_nthreads = min(atoi(argv[2]), MAXNTHREADS);
    int consume_nthreads = min(atoi(argv[3]), MAXNTHREADS);

    //Solaris 2.6需要設(shè)置線程并發(fā)數(shù)
    //Set_concurrency(nthreads + 1);

    pthread_t tid_produce[MAXNTHREADS];
    for (int i = 0; i < produce_nthreads; ++i) 
    {
        Pthread_create(&tid_produce[i], NULL, produce, NULL);
    }
    pthread_t tid_consume[MAXNTHREADS];
    for (int i = 0; i < consume_nthreads; ++i)
    {
        Pthread_create(&tid_consume[i], NULL, consume, NULL);
    }

    //等待線程終止
    for (int i = 0; i < produce_nthreads; ++i)
    {
        Pthread_join(tid_produce[i], NULL);
    }
    for (int i = 0; i < consume_nthreads; ++i)
    {
        Pthread_join(tid_consume[i], NULL);
    }

    exit(0);
}

void *produce(void *arg)
{
    printf("producd\n");

    //多個(gè)生產(chǎn)者
    for ( ; ; )
    {
        Pthread_mutex_lock(&put.mutex);
        //已存了需要多的數(shù)
        if (put.nval >= nitems)
        {
            Pthread_mutex_unlock(&put.mutex);
            return NULL;
        }
        buff[put.nput] = put.nval;
        if (++put.nput >= NBUFF)
        {
            put.nput = 0;
        }
        ++put.nval;
        Pthread_mutex_unlock(&put.mutex);

        //當(dāng)生產(chǎn)了數(shù)據(jù)后通知條件變量,應(yīng)該使臨界區(qū)盡量短豆混,寧愿使用多個(gè)互斥量
        Pthread_mutex_lock(&nready.mutex);
        if (nready.nready == 0)
        {
            Pthread_cond_signal(&nready.cond);
        }
        ++nready.nready;
        Pthread_mutex_unlock(&nready.mutex);
    } //end for(;;)
    return NULL;
}

void *consume(void *argv)
{
    printf("consume\n");

    //多個(gè)消費(fèi)者
    //只生產(chǎn)nitems個(gè)選項(xiàng)
    for ( ; ; )
    {
        Pthread_mutex_lock(&nready.mutex);
        //while避免虛假喚醒
        while (nready.nready == 0)
        {
            Pthread_cond_wait(&nready.cond, &nready.mutex);
        }
        //int ival = buff[nready.nget];
        //if (++nready.nget == NBUFF) {
        //    nready.nget = 0;
        //}
        if (++nready.nget >= nitems)
        {
       //nget比較的取值為1..nitems篓像,當(dāng)為nitems時(shí)少操作了一次,總共操作nitems次
            if (nready.nget == nitems)
            {
                printf("buff[%d] = %d\n", nready.nget - 1, 
                        buff[(nready.nget - 1) % NBUFF]);
            }
            Pthread_cond_signal(&nready.cond);
            Pthread_mutex_unlock(&nready.mutex);
            return NULL;
        }
        --nready.nready;
        Pthread_mutex_unlock(&nready.mutex);

        //僅僅讀數(shù)據(jù)不許要互斥
        //if (buff[nready.nget - 1] != nready.nget - 1)
        {
            printf("buff[%d] = %d\n", nready.nget - 1, 
                    buff[(nready.nget - 1) % NBUFF]);
            //printf("buff[%d] = %d\n", nready.nget, 
                      buff[nready.nget]);
       //}
    } //end for(i:0..nitems)
    return NULL;
}

參考資料:
5個(gè)經(jīng)典的同步問(wèn)題
經(jīng)典進(jìn)程同步問(wèn)題1:生產(chǎn)者-消費(fèi)者問(wèn)題
多個(gè)生產(chǎn)者——多個(gè)消費(fèi)者模型(互斥量條件變量實(shí)現(xiàn))
生產(chǎn)者消費(fèi)者模型你知道多少
秒殺多線程第十篇 生產(chǎn)者消費(fèi)者問(wèn)題
C++11 并發(fā)指南九(綜合運(yùn)用: C++11 多線程下四種生產(chǎn)者消費(fèi)者模型詳解)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末皿伺,一起剝皮案震驚了整個(gè)濱河市员辩,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌鸵鸥,老刑警劉巖屈暗,帶你破解...
    沈念sama閱讀 222,183評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異脂男,居然都是意外死亡养叛,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)宰翅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)弃甥,“玉大人,你說(shuō)我怎么就攤上這事汁讼∠ィ” “怎么了阔墩?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,766評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)瓶珊。 經(jīng)常有香客問(wèn)我啸箫,道長(zhǎng),這世上最難降的妖魔是什么伞芹? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,854評(píng)論 1 299
  • 正文 為了忘掉前任忘苛,我火速辦了婚禮,結(jié)果婚禮上唱较,老公的妹妹穿的比我還像新娘扎唾。我一直安慰自己,他們只是感情好南缓,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,871評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布胸遇。 她就那樣靜靜地躺著,像睡著了一般汉形。 火紅的嫁衣襯著肌膚如雪纸镊。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,457評(píng)論 1 311
  • 那天概疆,我揣著相機(jī)與錄音逗威,去河邊找鬼。 笑死届案,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的罢艾。 我是一名探鬼主播楣颠,決...
    沈念sama閱讀 40,999評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼咐蚯!你這毒婦竟也來(lái)了童漩?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,914評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤春锋,失蹤者是張志新(化名)和其女友劉穎矫膨,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體期奔,經(jīng)...
    沈念sama閱讀 46,465評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡侧馅,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,543評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了呐萌。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片馁痴。...
    茶點(diǎn)故事閱讀 40,675評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖肺孤,靈堂內(nèi)的尸體忽然破棺而出罗晕,到底是詐尸還是另有隱情济欢,我是刑警寧澤,帶...
    沈念sama閱讀 36,354評(píng)論 5 351
  • 正文 年R本政府宣布小渊,位于F島的核電站法褥,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏酬屉。R本人自食惡果不足惜半等,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,029評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望梆惯。 院中可真熱鬧酱鸭,春花似錦、人聲如沸垛吗。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,514評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)怯屉。三九已至蔚舀,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間锨络,已是汗流浹背赌躺。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,616評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留羡儿,地道東北人礼患。 一個(gè)月前我還...
    沈念sama閱讀 49,091評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像掠归,于是被迫代替她去往敵國(guó)和親缅叠。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,685評(píng)論 2 360

推薦閱讀更多精彩內(nèi)容