生產(chǎn)者消費(fèi)者
生產(chǎn)者消費(fèi)者是一種常見(jiàn)的并發(fā)編程模型宿崭,用于解決多線(xiàn)程或多進(jìn)程之間的數(shù)據(jù)共享和協(xié)作問(wèn)題具钥。在該模型中污它,生產(chǎn)者負(fù)責(zé)生成數(shù)據(jù)或任務(wù),并將其放入一個(gè)共享的緩沖區(qū)中驼鞭,而消費(fèi)者則負(fù)責(zé)從緩沖區(qū)中獲取數(shù)據(jù)或任務(wù)并進(jìn)行處理秦驯。
生產(chǎn)者和消費(fèi)者之間通過(guò)共享的緩沖區(qū)進(jìn)行通信,這個(gè)緩沖區(qū)可以是一個(gè)隊(duì)列挣棕、一個(gè)緩存區(qū)或者其他數(shù)據(jù)結(jié)構(gòu)译隘。生產(chǎn)者將數(shù)據(jù)放入緩沖區(qū)后亲桥,消費(fèi)者可以從緩沖區(qū)中取出數(shù)據(jù)進(jìn)行處理。如果緩沖區(qū)為空固耘,消費(fèi)者就需要等待题篷,直到有新的數(shù)據(jù)被生產(chǎn)者放入緩沖區(qū)。同樣地厅目,如果緩沖區(qū)已滿(mǎn)番枚,生產(chǎn)者就需要等待,直到有空閑的空間可以放入新的數(shù)據(jù)损敷。
這種模型的一個(gè)重要特點(diǎn)是解耦了生產(chǎn)者和消費(fèi)者之間的耦合關(guān)系葫笼。生產(chǎn)者和消費(fèi)者可以獨(dú)立地進(jìn)行操作,不需要彼此直接通信拗馒,而通過(guò)共享的緩沖區(qū)進(jìn)行間接通信路星。這種解耦可以提高系統(tǒng)的靈活性和可擴(kuò)展性,并允許生產(chǎn)者和消費(fèi)者的處理速度不一致诱桂。
生產(chǎn)者消費(fèi)者模型可以應(yīng)用于多種場(chǎng)景奥额,例如生產(chǎn)者可以是一個(gè)數(shù)據(jù)生成器,而消費(fèi)者可以是一個(gè)數(shù)據(jù)處理器访诱;生產(chǎn)者可以是一個(gè)任務(wù)調(diào)度器,而消費(fèi)者可以是執(zhí)行任務(wù)的線(xiàn)程或進(jìn)程等韩肝。通過(guò)合理設(shè)計(jì)和實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型触菜,可以有效地控制并發(fā)訪(fǎng)問(wèn)共享資源,避免競(jìng)態(tài)條件和資源爭(zhēng)用問(wèn)題哀峻,提高系統(tǒng)的性能和可靠性
代碼解析
對(duì)源代碼做了部分修改涡相、刪減、添加注釋
#ifndef __CPM_HPP__
#define __CPM_HPP__
// Comsumer Producer Model
#include <algorithm>
#include <condition_variable>
#include <future>
#include <memory>
#include <queue>
#include <thread>
namespace cpm {
// 三個(gè)模板分別為結(jié)果類(lèi)型剩蟀、輸入類(lèi)型催蝗、處理數(shù)據(jù)的模型類(lèi)型
// 在深度學(xué)習(xí)模型識(shí)別中,Model可以為模型識(shí)別類(lèi)的結(jié)果或指針
template <typename Result, typename Input, typename Model>
class Instance {
protected:
struct Item {
Input input;
std::shared_ptr<std::promise<Result>> pro;
};
// 每次放入隊(duì)列的數(shù)據(jù)結(jié)構(gòu)
// input 存儲(chǔ)需要處理的數(shù)據(jù)
// pro 存儲(chǔ)對(duì)數(shù)據(jù)的處理結(jié)果
// 條件變量
std::condition_variable cond_;
// 緩沖隊(duì)列育特,所有commit的數(shù)據(jù)都存儲(chǔ)在這個(gè)隊(duì)列中
std::queue<Item> input_queue_;
// 隊(duì)列鎖丙号,避免同時(shí)讀取修改隊(duì)列數(shù)據(jù)
std::mutex queue_lock_;
// 消費(fèi)者線(xiàn)程,不斷消耗隊(duì)列中的數(shù)據(jù)
std::shared_ptr<std::thread> worker_;
// 是否運(yùn)行標(biāo)識(shí)
volatile bool run_ = false;
public:
virtual ~Instance() { stop(); }
// 結(jié)束函數(shù)
void stop() {
// 運(yùn)行狀態(tài)置為false再通知工作線(xiàn)程結(jié)束缰冤,跳出循環(huán)
run_ = false;
cond_.notify_one();
{
// 清空隊(duì)列
std::unique_lock<std::mutex> l(queue_lock_);
while (!input_queue_.empty()) {
auto &item = input_queue_.front();
if (item.pro) item.pro->set_value(Result());
input_queue_.pop();
}
};
if (worker_) {
worker_->join();
worker_.reset();
}
}
// 生產(chǎn)者函數(shù)犬缨,不斷向隊(duì)列中commit數(shù)據(jù)
virtual std::shared_future<Result> commit(const Input &input) {
Item item;
item.input = input;
item.pro.reset(new std::promise<Result>());
{
// 隊(duì)列加鎖,避免同時(shí)讀取修改隊(duì)列
std::unique_lock<std::mutex> __lock_(queue_lock_);
input_queue_.push(item);
}
// 通知生產(chǎn)者線(xiàn)程干活
cond_.notify_one();
// 返回結(jié)果的shared_future
return item.pro->get_future();
}
// 開(kāi)始函數(shù)
// 1. 初始化模型棉浸,通過(guò)傳入的loadmethod方法來(lái)初始化模型
// 2. 先stop一下怀薛,保證不會(huì)重復(fù)啟動(dòng),釋放資源
// 3. 開(kāi)啟工作線(xiàn)程
template <typename LoadMethod>
bool start(const LoadMethod &loadmethod) {
stop();
std::promise<bool> status;
worker_ = std::make_shared<std::thread>(&Instance::worker<LoadMethod>, this,
std::ref(loadmethod), std::ref(status));
return status.get_future().get();
}
private:
// 工作線(xiàn)程迷郑,在start方法中傳入loadmethod方法枝恋,對(duì)于深度學(xué)習(xí)模型部署中一般用于初始化模型
template <typename LoadMethod>
void worker(const LoadMethod &loadmethod, std::promise<bool> &status) {
std::shared_ptr<Model> model = loadmethod();
if (model == nullptr) {
status.set_value(false);
return;
}
run_ = true;
status.set_value(true);
Item fetch_item;
Input input;
// 等待從隊(duì)列中獲取數(shù)據(jù)
// 隊(duì)列為空則一直阻塞在這里创倔,當(dāng)隊(duì)列中塞入數(shù)據(jù)時(shí)cond_會(huì)通知該線(xiàn)程開(kāi)始干活
while (get_item_and_wait(fetch_item)) {
input = fetect_item.input;
// 初始化的模型中的forward方法用于處理數(shù)據(jù)
auto ret = model->forward(input);
// pro設(shè)置返回結(jié)果,設(shè)置返回結(jié)果后future在get的時(shí)候就不會(huì)一直阻塞了
fetch_item.pro->set_value(ret);
}
model.reset();
run_ = false;
}
// 封裝的等待獲取數(shù)據(jù)的方法
virtual bool get_item_and_wait(Item &fetch_item) {
std::unique_lock<std::mutex> l(queue_lock_);
cond_.wait(l, [&]() { return !run_ || !input_queue_.empty(); });
if (!run_) return false;
// 獲取數(shù)據(jù)并從隊(duì)列彈出
// std::move避免數(shù)據(jù)拷貝
fetch_item = std::move(input_queue_.front());
input_queue_.pop();
return true;
}
};
}; // namespace cpm
#endif // __CPM_HPP__
使用例子
ps : 在不用深度學(xué)習(xí)模型部署的情況也可以使用該代碼焚碌,十分通用畦攘、好用
太長(zhǎng)不放了,有時(shí)間繼續(xù)寫(xiě)??♂?