生產(chǎn)者消費(fèi)者C++代碼

生產(chǎn)者消費(fèi)者

生產(chǎn)者消費(fèi)者是一種常見(jiàn)的并發(fā)編程模型宿崭,用于解決多線(xiàn)程或多進(jìn)程之間的數(shù)據(jù)共享和協(xié)作問(wèn)題具钥。在該模型中污它,生產(chǎn)者負(fù)責(zé)生成數(shù)據(jù)或任務(wù),并將其放入一個(gè)共享的緩沖區(qū)中驼鞭,而消費(fèi)者則負(fù)責(zé)從緩沖區(qū)中獲取數(shù)據(jù)或任務(wù)并進(jìn)行處理秦驯。

生產(chǎn)者和消費(fèi)者之間通過(guò)共享的緩沖區(qū)進(jìn)行通信,這個(gè)緩沖區(qū)可以是一個(gè)隊(duì)列挣棕、一個(gè)緩存區(qū)或者其他數(shù)據(jù)結(jié)構(gòu)译隘。生產(chǎn)者將數(shù)據(jù)放入緩沖區(qū)后亲桥,消費(fèi)者可以從緩沖區(qū)中取出數(shù)據(jù)進(jìn)行處理。如果緩沖區(qū)為空固耘,消費(fèi)者就需要等待题篷,直到有新的數(shù)據(jù)被生產(chǎn)者放入緩沖區(qū)。同樣地厅目,如果緩沖區(qū)已滿(mǎn)番枚,生產(chǎn)者就需要等待,直到有空閑的空間可以放入新的數(shù)據(jù)损敷。

這種模型的一個(gè)重要特點(diǎn)是解耦了生產(chǎn)者和消費(fèi)者之間的耦合關(guān)系葫笼。生產(chǎn)者和消費(fèi)者可以獨(dú)立地進(jìn)行操作,不需要彼此直接通信拗馒,而通過(guò)共享的緩沖區(qū)進(jìn)行間接通信路星。這種解耦可以提高系統(tǒng)的靈活性和可擴(kuò)展性,并允許生產(chǎn)者和消費(fèi)者的處理速度不一致诱桂。

生產(chǎn)者消費(fèi)者模型可以應(yīng)用于多種場(chǎng)景奥额,例如生產(chǎn)者可以是一個(gè)數(shù)據(jù)生成器,而消費(fèi)者可以是一個(gè)數(shù)據(jù)處理器访诱;生產(chǎn)者可以是一個(gè)任務(wù)調(diào)度器,而消費(fèi)者可以是執(zhí)行任務(wù)的線(xiàn)程或進(jìn)程等韩肝。通過(guò)合理設(shè)計(jì)和實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型触菜,可以有效地控制并發(fā)訪(fǎng)問(wèn)共享資源,避免競(jìng)態(tài)條件和資源爭(zhēng)用問(wèn)題哀峻,提高系統(tǒng)的性能和可靠性

代碼解析

對(duì)源代碼做了部分修改涡相、刪減、添加注釋

#ifndef __CPM_HPP__
#define __CPM_HPP__

// Comsumer Producer Model

#include <algorithm>
#include <condition_variable>
#include <future>
#include <memory>
#include <queue>
#include <thread>

namespace cpm {

// 三個(gè)模板分別為結(jié)果類(lèi)型剩蟀、輸入類(lèi)型催蝗、處理數(shù)據(jù)的模型類(lèi)型
// 在深度學(xué)習(xí)模型識(shí)別中,Model可以為模型識(shí)別類(lèi)的結(jié)果或指針
template <typename Result, typename Input, typename Model>
class Instance {
 protected:
  struct Item {
    Input input;
    std::shared_ptr<std::promise<Result>> pro;
  };
  // 每次放入隊(duì)列的數(shù)據(jù)結(jié)構(gòu)
  // input 存儲(chǔ)需要處理的數(shù)據(jù)
  // pro 存儲(chǔ)對(duì)數(shù)據(jù)的處理結(jié)果
 
   // 條件變量
  std::condition_variable cond_;
  // 緩沖隊(duì)列育特,所有commit的數(shù)據(jù)都存儲(chǔ)在這個(gè)隊(duì)列中
  std::queue<Item> input_queue_;
  // 隊(duì)列鎖丙号,避免同時(shí)讀取修改隊(duì)列數(shù)據(jù)
  std::mutex queue_lock_;
  // 消費(fèi)者線(xiàn)程,不斷消耗隊(duì)列中的數(shù)據(jù)
  std::shared_ptr<std::thread> worker_;
  //  是否運(yùn)行標(biāo)識(shí)
  volatile bool run_ = false;

 public:
  virtual ~Instance() { stop(); }
  
  // 結(jié)束函數(shù)
  void stop() {
    // 運(yùn)行狀態(tài)置為false再通知工作線(xiàn)程結(jié)束缰冤,跳出循環(huán)
    run_ = false;
    cond_.notify_one();
    {
      // 清空隊(duì)列
      std::unique_lock<std::mutex> l(queue_lock_);
      while (!input_queue_.empty()) {
        auto &item = input_queue_.front();
        if (item.pro) item.pro->set_value(Result());
        input_queue_.pop();
      }
    };
    
    if (worker_) {
      worker_->join();
      worker_.reset();
    }
  }

  // 生產(chǎn)者函數(shù)犬缨,不斷向隊(duì)列中commit數(shù)據(jù)
  virtual std::shared_future<Result> commit(const Input &input) {
    Item item;
    item.input = input;
    item.pro.reset(new std::promise<Result>());
    {
      // 隊(duì)列加鎖,避免同時(shí)讀取修改隊(duì)列
      std::unique_lock<std::mutex> __lock_(queue_lock_);
      input_queue_.push(item);
    }
    // 通知生產(chǎn)者線(xiàn)程干活
    cond_.notify_one();
    // 返回結(jié)果的shared_future
    return item.pro->get_future();
  }

  // 開(kāi)始函數(shù)
  // 1. 初始化模型棉浸,通過(guò)傳入的loadmethod方法來(lái)初始化模型
  // 2. 先stop一下怀薛,保證不會(huì)重復(fù)啟動(dòng),釋放資源
  // 3. 開(kāi)啟工作線(xiàn)程
  template <typename LoadMethod>
  bool start(const LoadMethod &loadmethod) {
    stop();
    std::promise<bool> status;
    worker_ = std::make_shared<std::thread>(&Instance::worker<LoadMethod>, this,
                                            std::ref(loadmethod), std::ref(status));
    return status.get_future().get();
  }

 private:
  // 工作線(xiàn)程迷郑,在start方法中傳入loadmethod方法枝恋,對(duì)于深度學(xué)習(xí)模型部署中一般用于初始化模型
  template <typename LoadMethod>
  void worker(const LoadMethod &loadmethod, std::promise<bool> &status) {
    std::shared_ptr<Model> model = loadmethod();
    if (model == nullptr) {
      status.set_value(false);
      return;
    }

    run_ = true;
    status.set_value(true);

    Item fetch_item;
    Input input;
    // 等待從隊(duì)列中獲取數(shù)據(jù)
    // 隊(duì)列為空則一直阻塞在這里创倔,當(dāng)隊(duì)列中塞入數(shù)據(jù)時(shí)cond_會(huì)通知該線(xiàn)程開(kāi)始干活
    while (get_item_and_wait(fetch_item)) {
      input = fetect_item.input;
      // 初始化的模型中的forward方法用于處理數(shù)據(jù)
      auto ret = model->forward(input);
      // pro設(shè)置返回結(jié)果,設(shè)置返回結(jié)果后future在get的時(shí)候就不會(huì)一直阻塞了
      fetch_item.pro->set_value(ret);
    }
    model.reset();
    run_ = false;
  }
  
  // 封裝的等待獲取數(shù)據(jù)的方法
  virtual bool get_item_and_wait(Item &fetch_item) {
    std::unique_lock<std::mutex> l(queue_lock_);
    cond_.wait(l, [&]() { return !run_ || !input_queue_.empty(); });

    if (!run_) return false;
    // 獲取數(shù)據(jù)并從隊(duì)列彈出
    // std::move避免數(shù)據(jù)拷貝
    fetch_item = std::move(input_queue_.front());
    input_queue_.pop();
    return true;
  }
};
};  // namespace cpm

#endif  // __CPM_HPP__

使用例子

ps : 在不用深度學(xué)習(xí)模型部署的情況也可以使用該代碼焚碌,十分通用畦攘、好用

太長(zhǎng)不放了,有時(shí)間繼續(xù)寫(xiě)??♂?

代碼來(lái)源

infer/src/cpm.hpp at main · shouxieai/infer · GitHub

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末呐能,一起剝皮案震驚了整個(gè)濱河市念搬,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌摆出,老刑警劉巖朗徊,帶你破解...
    沈念sama閱讀 219,110評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異偎漫,居然都是意外死亡爷恳,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)象踊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)温亲,“玉大人,你說(shuō)我怎么就攤上這事杯矩≌恍椋” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,474評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵史隆,是天一觀(guān)的道長(zhǎng)魂务。 經(jīng)常有香客問(wèn)我,道長(zhǎng)泌射,這世上最難降的妖魔是什么粘姜? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,881評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮熔酷,結(jié)果婚禮上孤紧,老公的妹妹穿的比我還像新娘。我一直安慰自己拒秘,他們只是感情好号显,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著躺酒,像睡著了一般咙轩。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上阴颖,一...
    開(kāi)封第一講書(shū)人閱讀 51,698評(píng)論 1 305
  • 那天活喊,我揣著相機(jī)與錄音,去河邊找鬼。 笑死钾菊,一個(gè)胖子當(dāng)著我的面吹牛帅矗,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播煞烫,決...
    沈念sama閱讀 40,418評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼浑此,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了滞详?” 一聲冷哼從身側(cè)響起凛俱,我...
    開(kāi)封第一講書(shū)人閱讀 39,332評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎料饥,沒(méi)想到半個(gè)月后蒲犬,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,796評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡岸啡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評(píng)論 3 337
  • 正文 我和宋清朗相戀三年原叮,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片巡蘸。...
    茶點(diǎn)故事閱讀 40,110評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡奋隶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出悦荒,到底是詐尸還是另有隱情唯欣,我是刑警寧澤,帶...
    沈念sama閱讀 35,792評(píng)論 5 346
  • 正文 年R本政府宣布搬味,位于F島的核電站境氢,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏身腻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評(píng)論 3 331
  • 文/蒙蒙 一匹厘、第九天 我趴在偏房一處隱蔽的房頂上張望嘀趟。 院中可真熱鬧,春花似錦愈诚、人聲如沸她按。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,003評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)酌泰。三九已至,卻和暖如春匕累,著一層夾襖步出監(jiān)牢的瞬間陵刹,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,130評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工欢嘿, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留衰琐,地道東北人也糊。 一個(gè)月前我還...
    沈念sama閱讀 48,348評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像羡宙,于是被迫代替她去往敵國(guó)和親狸剃。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容