C++: 多線程同步機(jī)制

C++生產(chǎn)者消費(fèi)者

  • 基于鎖和條件變量的同步機(jī)制
  • 基于pipe的同步機(jī)制

基于變量的同步機(jī)制挟裂,在面對(duì)select/poll等場(chǎng)景時(shí),無(wú)法做到loop線程的喚醒遣鼓。因此刻撒,面對(duì)讀寫事件時(shí)骨田,需要使用pipe來(lái)進(jìn)行同步。

  1. 基于鎖和條件變量的同步機(jī)制
#include <iostream>
#include <mutex>
#include <thread>
#include <memory>
#include <vector>
#include <string>

class context{

public:
    virtual int run()=0;
};

class TaskManager{
private:
    std::mutex mtx;
    std::condition_variable cond;

    std::thread m_handle;

    std::thread::id thread_id;
    bool stopped;
    std::string thread_name;

private:
    virtual void entry();
    std::vector<std::shared_ptr<context>> task;

    void _process(std::vector<std::shared_ptr<context>>&);

public:
    void create(const std::string& name){
        thread_name = name;
        m_handle = std::thread(&TaskManager::entry,this);
        thread_id = m_handle.get_id();
    }

    void join(){
        if (m_handle.joinable())
            m_handle.join();
    }
    void detach(){
        m_handle.detach();
    }
    int add_task(std::shared_ptr<context>);
    void stop(){
        std::lock_guard<std::mutex> lock{mtx};
        stopped = true;
        cond.notify_all();
    }
};

void TaskManager::entry(){
    std::unique_lock<std::mutex> lock{mtx};
    while (!stopped || !task.empty()){
        cond.wait(lock,[this](){return (!task.empty() || stopped);});
        std::vector<std::shared_ptr<context>> tmp;
        tmp.swap(task);
        lock.unlock();
        _process(tmp);
        lock.lock();
    }
}

void TaskManager::_process(std::vector<std::shared_ptr<context>>& tsk){
    for (const auto& tk:tsk){
        tk->run();
    }
}

int TaskManager::add_task(std::shared_ptr<context> ts){
    if (stopped)
        return -1;
    {
        std::lock_guard<std::mutex> lock{mtx};
        task.push_back(ts);
        cond.notify_all();
    }
    return 0;
}


class test: public context{
private:
    std::string name;
public:
    test(const std::string& name):name(name){}
    int run() override {
        std::cout<<"Task Run: "<<name<<std::endl;
        return 0;
    }
 };

int main(int argc,char* argv[]){
    std::shared_ptr<TaskManager> tm = std::make_shared<TaskManager>();
    tm->create("TaskManager");
    tm->add_task(std::make_shared<test>("TaskA"));
    tm->add_task(std::make_shared<test>("TaskB"));
    tm->add_task(std::make_shared<test>(std::string("TaskC")));
    tm->stop();
    tm->join();
}
  1. 基于pipe的同步機(jī)制
    基于鎖和條件變量的同步機(jī)制声怔,無(wú)法適用于讀寫事件的同步态贤,如select/poll/epoll。主線程在select中等待時(shí)醋火,則需要其它喚醒方式悠汽。
  • 可以使用pipe進(jìn)行同步
  • 將pipe的讀端加入select監(jiān)聽
  • 需要喚醒時(shí),對(duì)pipe寫端進(jìn)行寫入芥驳,此時(shí)喚醒等待線程

實(shí)現(xiàn)如下柿冲,注冊(cè)監(jiān)聽一組fd,利用管道實(shí)現(xiàn)對(duì)監(jiān)聽fd的添加和刪除:

#include <iostream>
#include <mutex>
#include <thread>
#include <memory>
#include <map>
#include <string>
#include <functional>
#include <array>
#include <string_view>
#include <algorithm>

#include <fcntl.h>
#include <unistd.h>
#include <sys/select.h>

int pipe_ocloexc(int pipefd[2]){
     /*
     ** pipe init
     */
    if (pipe2(pipefd, O_CLOEXEC) < 0){
        return -1;
    }   
    if (::fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0){
         ::close(pipefd[1]);
         ::close(pipefd[0]);
         return -1;
    }
    return 0;
}

struct io_handler{
    io_handler(const std::string user):user(user){
        int r = pipe_ocloexc(pipefd);
        if (r < 0){
            exit(-1);
        }
    }
    std::string user;
    int pipefd[2];
    std::function<int(void*)> callback;
};

int read(int fd){
    std::array<char, 256> buf;
    int ret = ::read(fd, (void *)buf.data(), buf.size());
    if (ret < 0) { 
        return -1;
    }
    std::cout<<"Read Data: "<<std::string_view(buf.data(),ret)<<std::endl;
    return 0;
}

int process(void* arg){
    io_handler* ptr = (io_handler*)arg;
    //std::cout<<"IO: "<<ptr->user<<std::endl;
    int ret = read(ptr->pipefd[0]);
    if (ret < 0 )
        return -1;
    return 0;
}

class TaskManager{
private:
    std::mutex mtx;
    int thread_pipe[2];

    std::thread m_handle;

    std::thread::id thread_id;
    std::string thread_name;

    bool stopped;

private:
    virtual void entry();
    using mio = std::map<int,std::shared_ptr<io_handler>>;
    mio io_callback;

    int signal_thread();
    int clear_signal(int);

public:
    int create(const std::string& name);

    void join();
    void detach();
    int register_fd(std::shared_ptr<io_handler>);
    void unregister_fd(std::shared_ptr<io_handler>);
    void stop();
    virtual ~TaskManager(){
        ::close(thread_pipe[1]);
        ::close(thread_pipe[0]);
    }
};

int TaskManager::create(const std::string& name){
    thread_name = name;

    if (pipe_ocloexc(thread_pipe)<0)
        return -1;

    m_handle = std::thread(&TaskManager::entry,this);
    thread_id = m_handle.get_id();
    return 0;
}

int TaskManager::clear_signal(int fd)
{
    std::array<char, 256> buf;
    int ret = ::read(fd, (void *)buf.data(), buf.size());
    if (ret < 0) { 
        return -1;
    }
    return 0;
}

void TaskManager::join(){
    if (m_handle.joinable())
        m_handle.join();
}

void TaskManager::detach(){
    m_handle.detach();
}

void TaskManager::stop(){
    if (stopped)
        return;
    stopped = true;
    signal_thread();
}

void TaskManager::entry(){
    fd_set fdread;
    fd_set fdwrite;
    fd_set fdexcep;
    while (!stopped || !io_callback.empty()){
        int maxfd = -1;
        FD_ZERO(&fdread);
        FD_ZERO(&fdwrite);
        FD_ZERO(&fdexcep);

        FD_SET(thread_pipe[0],&fdread);
        maxfd = thread_pipe[0]+1;

        std::unique_lock<std::mutex> lock{mtx};
        for (const auto& io:io_callback){
            FD_SET(io.first,&fdread);
            if (io.first >= maxfd){
                maxfd = io.first+1;
            }
        }
        lock.unlock();
        int ret = select(maxfd,&fdread,&fdwrite,&fdexcep,NULL);
        if ( ret < 0)
            return;
        lock.lock();
        for (const auto& io:io_callback){
            if (FD_ISSET(io.first,&fdread)){
                //clear_signal(io.first);
                (io.second)->callback(io.second.get());
            }
        }
        if (FD_ISSET(thread_pipe[0],&fdread)){
            clear_signal(thread_pipe[0]);
        }
        lock.unlock();
    }
}

int TaskManager::signal_thread(){
    uint32_t buf = 0;
    int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
    if (ret < 0) {
        return -1;
    }
    return 0;
}

int TaskManager::register_fd(std::shared_ptr<io_handler> handler){
    if (stopped)
        return -1;
    {
        std::lock_guard<std::mutex> lock{mtx};
        io_callback.insert(std::pair<int,std::shared_ptr<io_handler>>(handler->pipefd[0],handler));
        signal_thread();
    }
    return 0;
}

void TaskManager::unregister_fd(std::shared_ptr<io_handler> io){
    {
        std::lock_guard<std::mutex> lock{mtx};
        io_callback.erase(io->pipefd[0]);
        signal_thread();
    }
}

int test(std::shared_ptr<io_handler> io){
    std::array<char,256> buf;
    std::copy(io->user.cbegin(),io->user.cend(),buf.data());
    int ret = write(io->pipefd[1], (void *)&buf, io->user.size());
    if (ret < 0) {
        return -1;
    }
    return 0;
}

int main(int argc,char* argv[]){
    std::shared_ptr<TaskManager> tm = std::make_shared<TaskManager>();
    tm->create("TaskManager");
    std::shared_ptr<io_handler> io1 = std::make_shared<io_handler>("io_1");
    io1->callback = process;
    std::shared_ptr<io_handler> io2 = std::make_shared<io_handler>("io_2");
    io2->callback = process;
    std::shared_ptr<io_handler> io3 = std::make_shared<io_handler>("io_3");
    io3->callback = process;
    tm->register_fd(io1);
    tm->register_fd(io2);
    tm->register_fd(io3);

    test(io1);
    test(io3);
    test(io2);

    sleep(3);
    
    tm->unregister_fd(io1);
    tm->unregister_fd(io2);
    tm->unregister_fd(io3);
    tm->stop();
    tm->join();
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末晚树,一起剝皮案震驚了整個(gè)濱河市姻采,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌爵憎,老刑警劉巖慨亲,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異宝鼓,居然都是意外死亡刑棵,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門愚铡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蛉签,“玉大人,你說(shuō)我怎么就攤上這事沥寥“幔” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵邑雅,是天一觀的道長(zhǎng)片橡。 經(jīng)常有香客問(wèn)我,道長(zhǎng)淮野,這世上最難降的妖魔是什么捧书? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任吹泡,我火速辦了婚禮,結(jié)果婚禮上经瓷,老公的妹妹穿的比我還像新娘爆哑。我一直安慰自己,他們只是感情好舆吮,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布揭朝。 她就那樣靜靜地躺著,像睡著了一般色冀。 火紅的嫁衣襯著肌膚如雪萝勤。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天呐伞,我揣著相機(jī)與錄音,去河邊找鬼慎式。 笑死伶氢,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的瘪吏。 我是一名探鬼主播癣防,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼掌眠!你這毒婦竟也來(lái)了蕾盯?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蓝丙,失蹤者是張志新(化名)和其女友劉穎级遭,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體渺尘,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡挫鸽,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鸥跟。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片丢郊。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖医咨,靈堂內(nèi)的尸體忽然破棺而出枫匾,到底是詐尸還是另有隱情,我是刑警寧澤拟淮,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布干茉,位于F島的核電站,受9級(jí)特大地震影響惩歉,放射性物質(zhì)發(fā)生泄漏等脂。R本人自食惡果不足惜俏蛮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望上遥。 院中可真熱鬧搏屑,春花似錦、人聲如沸粉楚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)模软。三九已至伟骨,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間燃异,已是汗流浹背携狭。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留回俐,地道東北人逛腿。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像仅颇,于是被迫代替她去往敵國(guó)和親单默。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355