C++基于消息隊(duì)列的線程池實(shí)現(xiàn)

實(shí)現(xiàn)消息隊(duì)列的關(guān)鍵因素是考量不同線程訪問消息隊(duì)列的同步問題吏垮。本實(shí)現(xiàn)涉及到幾個(gè)知識(shí)點(diǎn)

std::lock_guard 介紹

std::lock_gurad 是 C++11 中定義的模板類。定義如下:
template <class Mutex> class lock_guard;
lock_guard 對(duì)象通常用于管理某個(gè)鎖(Lock)對(duì)象罐旗,因此與 Mutex RAII 相關(guān)膳汪,方便線程對(duì)互斥量上鎖,即在某個(gè) lock_guard 對(duì)象的聲明周期內(nèi)九秀,它所管理的鎖對(duì)象會(huì)一直保持上鎖狀態(tài)遗嗽;而 lock_guard 的生命周期結(jié)束之后,它所管理的鎖對(duì)象會(huì)被解鎖(注:類似 shared_ptr 等智能指針管理動(dòng)態(tài)分配的內(nèi)存資源 )鼓蜒。
模板參數(shù) Mutex 代表互斥量類型痹换,例如 std::mutex 類型,它應(yīng)該是一個(gè)基本的 BasicLockable 類型都弹,標(biāo)準(zhǔn)庫(kù)中定義幾種基本的 BasicLockable 類型娇豫,分別 std::mutex, std::recursive_mutex, std::timed_mutex,std::recursive_timed_mutex 以及 std::unique_lock

std::unique_lock 介紹

lock_guard 最大的缺點(diǎn)也是簡(jiǎn)單畅厢,沒有給程序員提供足夠的靈活度冯痢,因此,C++11 標(biāo)準(zhǔn)中定義了另外一個(gè)與 Mutex RAII 相關(guān)類 unique_lock,該類與 lock_guard 類相似系羞,也很方便線程對(duì)互斥量上鎖郭计,但它提供了更好的上鎖和解鎖控制。
顧名思義椒振,unique_lock 對(duì)象以獨(dú)占所有權(quán)的方式( unique owership)管理 mutex 對(duì)象的上鎖和解鎖操作昭伸,所謂獨(dú)占所有權(quán),就是沒有其他的 unique_lock 對(duì)象同時(shí)擁有某個(gè) mutex 對(duì)象的所有權(quán)澎迎。
新創(chuàng)建的 unique_lock 對(duì)象管理 Mutex 對(duì)象 m庐杨,并嘗試調(diào)用 m.lock() 對(duì) Mutex 對(duì)象進(jìn)行上鎖,如果此時(shí)另外某個(gè) unique_lock 對(duì)象已經(jīng)管理了該 Mutex 對(duì)象 m夹供,則當(dāng)前線程將會(huì)被阻塞灵份。

std::condition介紹

當(dāng) std::condition_variable 對(duì)象的某個(gè) wait 函數(shù)被調(diào)用的時(shí)候,它使用 std::unique_lock(通過 std::mutex) 來鎖住當(dāng)前線程哮洽。當(dāng)前線程會(huì)一直被阻塞填渠,直到另外一個(gè)線程在相同的 std::condition_variable 對(duì)象上調(diào)用了 notification 函數(shù)來喚醒當(dāng)前線程。
std::condition_variable 提供了兩種 wait() 函數(shù)鸟辅。當(dāng)前線程調(diào)用 wait() 后將被阻塞(此時(shí)當(dāng)前線程應(yīng)該獲得了鎖(mutex)氛什,不妨設(shè)獲得鎖 lck),直到另外某個(gè)線程調(diào)用 notify_* 喚醒了當(dāng)前線程匪凉。
在線程被阻塞時(shí)枪眉,該函數(shù)會(huì)自動(dòng)調(diào)用 lck.unlock() 釋放鎖,使得其他被阻塞在鎖競(jìng)爭(zhēng)上的線程得以繼續(xù)執(zhí)行再层。另外贸铜,一旦當(dāng)前線程獲得通知(notified,通常是另外某個(gè)線程調(diào)用 notify_* 喚醒了當(dāng)前線程)聂受,wait() 函數(shù)也是自動(dòng)調(diào)用 lck.lock()蒿秦,使得 lck 的狀態(tài)和 wait 函數(shù)被調(diào)用時(shí)相同。
在第二種情況下(即設(shè)置了 Predicate)饺饭,只有當(dāng) pred 條件為 false 時(shí)調(diào)用 wait() 才會(huì)阻塞當(dāng)前線程渤早,并且在收到其他線程的通知后只有當(dāng) pred 為 true 時(shí)才會(huì)被解除阻塞职车。因此第二種情況類似以下代碼:
while (!pred()) wait(lck);

std::function介紹

使用std::function可以將普通函數(shù)瘫俊,lambda表達(dá)式和函數(shù)對(duì)象類統(tǒng)一起來。它們并不是相同的類型悴灵,然而通過function模板類扛芽,可以轉(zhuǎn)化為相同類型的對(duì)象(function對(duì)象),從而放入一個(gè)vector或其他容器里积瞒,方便回調(diào)川尖。

代碼實(shí)現(xiàn):

#pragma once

#ifndef MESSAGE_QUEUE_H
#define MESSAGE_QUEUE_H

#include <queue>
#include <mutex>
#include <condition_variable>

template<class Type>
class CMessageQueue
{
public:
    CMessageQueue& operator = (const CMessageQueue&) = delete;
    CMessageQueue(const CMessageQueue& mq) = delete;


    CMessageQueue() :_queue(), _mutex(), _condition(){}
    virtual ~CMessageQueue(){}

    void Push(Type msg){
        std::lock_guard <std::mutex> lock(_mutex);
        _queue.push(msg);
         //當(dāng)使用阻塞模式從消息隊(duì)列中獲取消息時(shí),由condition在新消息到達(dá)時(shí)提醒等待線程
        _condition.notify_one();
    }
        //blocked定義訪問方式是同步阻塞或者非阻塞模式
    bool Pop(Type& msg, bool isBlocked = true){
        if (isBlocked)
        {
            std::unique_lock <std::mutex> lock(_mutex);
            while (_queue.empty())
            {
                _condition.wait(lock);
                
            }
            //注意這一段必須放在if語句中茫孔,因?yàn)閘ock的生命域僅僅在if大括號(hào)內(nèi)
            msg = std::move(_queue.front());
            _queue.pop();
            return true;
            
        }
        else
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (_queue.empty())
                return false;


            msg = std::move(_queue.front());
            _queue.pop();
            return true;
        }

    }

    int32_t Size(){
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.size();
    }

    bool Empty(){
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }
private:
    std::queue<Type> _queue;//存儲(chǔ)消息的隊(duì)列
    mutable std::mutex _mutex;//同步鎖
    std::condition_variable _condition;//實(shí)現(xiàn)同步式獲取消息
};

#endif//MESSAGE_QUEUE_H

線程池可以直接在構(gòu)造函數(shù)中構(gòu)造線程叮喳,并傳入回調(diào)函數(shù)被芳,也可以寫一個(gè)Run函數(shù)顯示調(diào)用。這里我們選擇了第二種馍悟,對(duì)比:

  1. 在handler函數(shù)外部做循環(huán)接受消息畔濒,當(dāng)消息到達(dá)后調(diào)用hanlder處理。這種實(shí)現(xiàn)在上層做封裝锣咒,但是會(huì)在線程中頻繁的切換調(diào)用函數(shù)侵状。這種設(shè)計(jì)無法復(fù)用一些資源,如當(dāng)在handler中做數(shù)據(jù)庫(kù)操作時(shí)毅整,需要頻繁的連接和斷開連接趣兄,可以通過定義兩個(gè)虛函數(shù)Prehandler和AfterHandler來實(shí)現(xiàn)。
    5考怠M丁!構(gòu)造函數(shù)中調(diào)用虛函數(shù)并不會(huì)能真正的調(diào)用子類的實(shí)現(xiàn)O访铩1┣!
    雖然可以對(duì)虛函數(shù)進(jìn)行實(shí)調(diào)用辛臊,但程序員編寫虛函數(shù)的本意應(yīng)該是實(shí)現(xiàn)動(dòng)態(tài)聯(lián)編仙粱。在構(gòu)造函數(shù)中調(diào)用虛函數(shù),函數(shù)的入口地址是在編譯時(shí)靜態(tài)確定的彻舰,并未實(shí)現(xiàn)虛調(diào)用

  2. 寫一個(gè)Run函數(shù)伐割,將這一部分實(shí)現(xiàn)放在run函數(shù)中,顯示調(diào)用刃唤。
    《Effective C++ 》條款9:永遠(yuǎn)不要在構(gòu)造函數(shù)或析構(gòu)函數(shù)中調(diào)用虛函數(shù)

#ifndef THREAD_POOL_H
#define THREAD_POOL_H


#include <functional>
#include <vector>
#include <thread>

#include "MessageQueue.h"

#define MIN_THREADS 1

template<class Type>
class CThreadPool
{
    CThreadPool& operator = (const CThreadPool&) = delete;
    CThreadPool(const CThreadPool& other) = delete;

public:
    CThreadPool(int32_t threads, 
       std::function<void(Type& record, CThreadPool<Type>* pSub)> handler);
    virtual ~CThreadPool();

    void Run();
    virtual void PreHandler(){}
    virtual void AfterHandler(){}
    void Submit(Type record);


private:
    bool _shutdown;
    int32_t _threads;
    std::function<void(Type& record, CThreadPool<Type>* pSub)> _handler;
    std::vector<std::thread> _workers;
    CMessageQueue<Type> _tasks;

};



template<class Type>
CThreadPool<Type>::CThreadPool(int32_t threads, 
    std::function<void(Type& record,  CThreadPool<Type>* pSub)> handler)
    :_shutdown(false),
    _threads(threads),
    _handler(handler),
    _workers(),
    _tasks()
{

    //第一種實(shí)現(xiàn)方案隔心,注意這里的虛函數(shù)調(diào)用不正確
    /*if (_threads < MIN_THREADS)
        _threads = MIN_THREADS;
    for (int32_t i = 0; i < _threads; i++)
    {
        
        _workers.emplace_back(
            [this]{
            PreHandler();
            while (!_shutdown){
                Type record;
                _tasks.Pop(record, true);
                _handler(record, this);
            }
            AfterHandler();
        }
        );
    }*/

}

//第二種實(shí)現(xiàn)方案
template<class Type>
void CThreadPool<Type>::Run()
{
    if (_threads < MIN_THREADS)
        _threads = MIN_THREADS;
    for (int32_t i = 0; i < _threads; i++)
    {
        _workers.emplace_back(
            [this]{
            PreHandler();
            while (!_shutdown){
                Type record;
                _tasks.Pop(record, true);
                _handler(record, this);
            }
            AfterHandler();
        }
        );
    }
}




template<class Type>
CThreadPool<Type>::~CThreadPool()
{
    for (std::thread& worker : _workers)
        worker.join();
}


template<class Type>
void CThreadPool<Type>::Submit(Type record)
{
    _tasks.Push(record);
}
#endif // !THREAD_POOL_H
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市尚胞,隨后出現(xiàn)的幾起案子硬霍,更是在濱河造成了極大的恐慌,老刑警劉巖笼裳,帶你破解...
    沈念sama閱讀 219,589評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件唯卖,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡躬柬,警方通過查閱死者的電腦和手機(jī)拜轨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來允青,“玉大人橄碾,你說我怎么就攤上這事。” “怎么了法牲?”我有些...
    開封第一講書人閱讀 165,933評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵史汗,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我拒垃,道長(zhǎng)淹办,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,976評(píng)論 1 295
  • 正文 為了忘掉前任恶复,我火速辦了婚禮怜森,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘谤牡。我一直安慰自己副硅,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,999評(píng)論 6 393
  • 文/花漫 我一把揭開白布翅萤。 她就那樣靜靜地躺著恐疲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪套么。 梳的紋絲不亂的頭發(fā)上培己,一...
    開封第一講書人閱讀 51,775評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音胚泌,去河邊找鬼省咨。 笑死,一個(gè)胖子當(dāng)著我的面吹牛玷室,可吹牛的內(nèi)容都是我干的零蓉。 我是一名探鬼主播,決...
    沈念sama閱讀 40,474評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼穷缤,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼敌蜂!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起津肛,我...
    開封第一講書人閱讀 39,359評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤章喉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后身坐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體秸脱,經(jīng)...
    沈念sama閱讀 45,854評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,007評(píng)論 3 338
  • 正文 我和宋清朗相戀三年掀亥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了撞反。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片妥色。...
    茶點(diǎn)故事閱讀 40,146評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡搪花,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情撮竿,我是刑警寧澤吮便,帶...
    沈念sama閱讀 35,826評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站幢踏,受9級(jí)特大地震影響髓需,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜房蝉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,484評(píng)論 3 331
  • 文/蒙蒙 一僚匆、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧搭幻,春花似錦咧擂、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至俯逾,卻和暖如春贸桶,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背桌肴。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工皇筛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人坠七。 一個(gè)月前我還...
    沈念sama閱讀 48,420評(píng)論 3 373
  • 正文 我出身青樓设联,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親灼捂。 傳聞我的和親對(duì)象是個(gè)殘疾皇子离例,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,107評(píng)論 2 356

推薦閱讀更多精彩內(nèi)容

  • 接著上上節(jié) thread ,本節(jié)主要介紹mutex的內(nèi)容悉稠,練習(xí)代碼地址宫蛆。<mutex>:該頭文件主要聲明了與互斥量...
    jorion閱讀 12,502評(píng)論 2 4
  • 接著上節(jié) atomic,本節(jié)主要介紹condition_varible的內(nèi)容的猛,練習(xí)代碼地址耀盗。本文參考http://...
    jorion閱讀 8,495評(píng)論 0 7
  • <condition_variable > 頭文件主要包含了與條件變量相關(guān)的類和函數(shù)。相關(guān)的類包括 std::co...
    張霸天閱讀 3,775評(píng)論 1 0
  • 本文根據(jù)眾多互聯(lián)網(wǎng)博客內(nèi)容整理后形成卦尊,引用內(nèi)容的版權(quán)歸原始作者所有叛拷,僅限于學(xué)習(xí)研究使用,不得用于任何商業(yè)用途岂却。 互...
    深紅的眼眸閱讀 1,103評(píng)論 0 0
  • 參考cplusplus參考cppreference 1.mutex 用于保護(hù)臨界區(qū)(critical sectio...
    王偵閱讀 4,276評(píng)論 0 0