C++生产者消费者
- 基于锁和条件变量的同步机制
- 基于pipe的同步机制
基于锁和条件变量的同步机制,在面对select/poll等场景时,无法做到loop线程的唤醒。因此,面对读写事件时,需要使用pipe来进行同步。
- 基于锁和条件变量的同步机制
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
/// Abstract unit of work: anything that can be run once and report an int status.
class context{
public:
    /// Virtual so that destroying a derived task through a `context*`
    /// runs the derived destructor instead of invoking undefined behaviour.
    virtual ~context() = default;

    /// Executes the task.
    /// @return an implementation-defined status code.
    virtual int run()=0;
};
/// Runs queued `context` tasks on a thread started by create().
///
/// `mtx` protects `task` and `stopped`; `cond` is notified whenever either
/// of them changes (see stop()).
class TaskManager{
private:
    std::mutex mtx;
    std::condition_variable cond;
    std::thread m_handle;
    std::thread::id thread_id;
    // Explicit initializer: the flag is read by the worker and by add_task(),
    // so it must never start out indeterminate.
    bool stopped{false};
    std::string thread_name;
private:
    /// Thread body; started by create().
    virtual void entry();
    std::vector<std::shared_ptr<context>> task;
    void _process(std::vector<std::shared_ptr<context>>&);
public:
    /// Requests shutdown and waits for the thread (if it is still joinable).
    /// Without this, destroying a manager whose thread is joinable would make
    /// the std::thread destructor call std::terminate.
    virtual ~TaskManager(){
        stop();
        join();
    }
    /// Records `name` and starts the thread running entry().
    void create(const std::string& name){
        thread_name = name;
        m_handle = std::thread(&TaskManager::entry,this);
        thread_id = m_handle.get_id();
    }
    /// Waits for the thread to finish; a no-op if there is nothing to join.
    void join(){
        if (m_handle.joinable())
            m_handle.join();
    }
    /// Detaches the thread; a no-op if it is not joinable
    /// (std::thread::detach would throw std::system_error in that case).
    void detach(){
        if (m_handle.joinable())
            m_handle.detach();
    }
    /// Queues a task for the thread started by create().
    int add_task(std::shared_ptr<context>);
    /// Sets the stop flag under the mutex and wakes every waiter on `cond`.
    void stop(){
        std::lock_guard<std::mutex> lock{mtx};
        stopped = true;
        cond.notify_all();
    }
};
void TaskManager::entry(){
std::unique_lock<std::mutex> lock{mtx};
while (!stopped || !task.empty()){
cond.wait(lock,[this](){return (!task.empty() || stopped);});
std::vector<std::shared_ptr<context>> tmp;
tmp.swap(task);
lock.unlock();
_process(tmp);
lock.lock();
}
}
void TaskManager::_process(std::vector<std::shared_ptr<context>>& tsk){
for (const auto& tk:tsk){
tk->run();
}
}
/// Queues `ts` and notifies waiters on `cond`.
/// @param ts task to run; must not be null.
/// @return 0 when the task was queued, -1 when `ts` is null or the manager
///         has already been stopped.
int TaskManager::add_task(std::shared_ptr<context> ts){
    // A null task would be dereferenced later when the queue is processed.
    if (!ts)
        return -1;

    // `stopped` is written under `mtx`, so it must be read under `mtx` too.
    // Checking it inside the critical section also closes the window in which
    // a task could be queued after the stop flag was set.
    std::lock_guard<std::mutex> lock{mtx};
    if (stopped)
        return -1;
    task.push_back(ts);
    cond.notify_all();
    return 0;
}
/// Sample task: prints its own name when run.
class test: public context{
public:
    test(const std::string& name):name(name){}

    /// Writes "Task Run: <name>" followed by a flushed newline to stdout.
    /// @return always 0.
    int run() override {
        std::cout
            <<"Task Run: "
            <<this->name
            <<std::endl;
        return 0;
    }

private:
    std::string name;
};
int main(int argc,char* argv[]){
std::shared_ptr<TaskManager> tm = std::make_shared<TaskManager>();
tm->create("TaskManager");
tm->add_task(std::make_shared<test>("TaskA"));
tm->add_task(std::make_shared<test>("TaskB"));
tm->add_task(std::make_shared<test>(std::string("TaskC")));
tm->stop();
tm->join();
}
- 基于pipe的同步机制
基于锁和条件变量的同步机制,无法适用于读写事件的同步,如select/poll/epoll。主线程在select中等待时,则需要其它唤醒方式。
- 可以使用pipe进行同步
- 将pipe的读端加入select监听
- 需要唤醒时,对pipe写端进行写入,此时唤醒等待线程
实现如下,注册监听一组fd,利用管道实现对监听fd的添加和删除:
#include <iostream>
#include <mutex>
#include <thread>
#include <memory>
#include <map>
#include <string>
#include <functional>
#include <array>
#include <string_view>
#include <algorithm>
#include <fcntl.h>
#include <unistd.h>
#include <sys/select.h>
/// Creates a pipe whose two ends are close-on-exec and whose READ end is
/// additionally non-blocking.
/// @param pipefd out: [0] = read end, [1] = write end.
/// @return 0 on success, -1 on failure. When the pipe was created but could
///         not be made non-blocking, both ends are closed and both entries
///         are reset to -1 so the caller cannot close them a second time.
int pipe_ocloexc(int pipefd[2]){
    if (::pipe2(pipefd, O_CLOEXEC) < 0){
        return -1;
    }
    // F_SETFL replaces the whole set of status flags, so start from the
    // current ones instead of overwriting them with O_NONBLOCK alone.
    const int flags = ::fcntl(pipefd[0], F_GETFL);
    if (flags < 0 || ::fcntl(pipefd[0], F_SETFL, flags | O_NONBLOCK) < 0){
        ::close(pipefd[1]);
        ::close(pipefd[0]);
        pipefd[0] = -1;
        pipefd[1] = -1;
        return -1;
    }
    return 0;
}
/// A named pipe pair plus the callback to invoke with a pointer to this object.
/// The object owns both descriptors and closes them on destruction.
struct io_handler{
    /// Creates the pipe via pipe_ocloexc(); terminates the process with
    /// exit(-1) if the pipe cannot be created.
    /// @param user label stored in `user`.
    io_handler(const std::string& user):user(user){
        int r = pipe_ocloexc(pipefd);
        if (r < 0){
            exit(-1);
        }
    }
    /// Releases both pipe ends; without this every handler leaked two descriptors.
    ~io_handler(){
        if (pipefd[1] >= 0)
            ::close(pipefd[1]);
        if (pipefd[0] >= 0)
            ::close(pipefd[0]);
    }
    // The descriptors have a single owner: a copy would close them twice.
    io_handler(const io_handler&) = delete;
    io_handler& operator=(const io_handler&) = delete;

    std::string user;
    int pipefd[2]{-1,-1};                 // [0] read end, [1] write end
    std::function<int(void*)> callback;   // called with a pointer to this handler
};
/// Performs a single ::read of at most 256 bytes from `fd` and prints what
/// was received as "Read Data: <bytes>".
/// @return 0 when ::read succeeded (including a zero-byte read), -1 otherwise.
int read(int fd){
    std::array<char, 256> chunk;
    const int got = ::read(fd, static_cast<void*>(chunk.data()), chunk.size());
    if (got >= 0) {
        std::cout<<"Read Data: "<<std::string_view(chunk.data(),got)<<std::endl;
        return 0;
    }
    return -1;
}
int process(void* arg){
io_handler* ptr = (io_handler*)arg;
//std::cout<<"IO: "<<ptr->user<<std::endl;
int ret = read(ptr->pipefd[0]);
if (ret < 0 )
return -1;
return 0;
}
/// Registry of io_handler objects keyed by descriptor, served by a thread
/// started in create(). `thread_pipe` is the manager's own pipe, created
/// in create() and written by signal_thread().
///
/// The destructor only releases the pipe; it does not stop or join the
/// thread. Callers must stop() and join() (or detach()) first, because
/// destroying a joinable std::thread terminates the program.
class TaskManager{
private:
    std::mutex mtx;
    // -1 marks "not open" so the destructor never closes a descriptor the
    // manager does not own (e.g. when create() was never called or failed).
    int thread_pipe[2]{-1,-1};
    std::thread m_handle;
    std::thread::id thread_id;
    std::string thread_name;
    // Explicit initializer so the flag never starts out indeterminate.
    bool stopped{false};
private:
    /// Thread body; started by create().
    virtual void entry();
    using mio = std::map<int,std::shared_ptr<io_handler>>;
    mio io_callback;
    int signal_thread();
    int clear_signal(int);
public:
    int create(const std::string& name);
    void join();
    void detach();
    int register_fd(std::shared_ptr<io_handler>);
    void unregister_fd(std::shared_ptr<io_handler>);
    void stop();
    /// Closes whichever ends of `thread_pipe` are open.
    virtual ~TaskManager(){
        if (thread_pipe[1] >= 0)
            ::close(thread_pipe[1]);
        if (thread_pipe[0] >= 0)
            ::close(thread_pipe[0]);
    }
};
/// Stores `name`, creates the manager's pipe and starts the thread running entry().
/// @return 0 on success, -1 when the pipe could not be created (no thread is started).
int TaskManager::create(const std::string& name){
    thread_name = name;
    const int rc = pipe_ocloexc(thread_pipe);
    if (rc >= 0){
        m_handle = std::thread(&TaskManager::entry,this);
        thread_id = m_handle.get_id();
        return 0;
    }
    return -1;
}
/// Consumes up to 256 bytes from `fd` with a single ::read and discards them.
/// @return 0 when ::read succeeded, -1 otherwise.
int TaskManager::clear_signal(int fd)
{
    std::array<char, 256> scratch;
    const int n = ::read(fd, static_cast<void*>(scratch.data()), scratch.size());
    return n < 0 ? -1 : 0;
}
/// Waits for the thread to finish; does nothing when there is no joinable thread.
void TaskManager::join(){
    if (!m_handle.joinable()){
        return;
    }
    m_handle.join();
}
/// Detaches the thread; does nothing when there is no joinable thread.
/// std::thread::detach throws std::system_error on a non-joinable thread,
/// so this applies the same guard as join().
void TaskManager::detach(){
    if (m_handle.joinable())
        m_handle.detach();
}
void TaskManager::stop(){
if (stopped)
return;
stopped = true;
signal_thread();
}
/// Event loop: waits in select() on the manager's own pipe plus every
/// registered descriptor, invokes the callback of each readable handler,
/// and drains the manager's pipe when it was written to.
///
/// Runs until stop was requested and no handlers remain registered, or
/// until select() reports an error.
void TaskManager::entry(){
    // select() can only describe descriptors in [0, FD_SETSIZE).
    if (thread_pipe[0] < 0 || thread_pipe[0] >= FD_SETSIZE)
        return;

    fd_set fdread;
    fd_set fdwrite;
    fd_set fdexcep;

    // `stopped` and `io_callback` are shared with other threads, so the loop
    // condition is evaluated with the mutex held; the mutex is released only
    // around the blocking select() call.
    std::unique_lock<std::mutex> lock{mtx};
    while (!stopped || !io_callback.empty()){
        FD_ZERO(&fdread);
        FD_ZERO(&fdwrite);
        FD_ZERO(&fdexcep);
        FD_SET(thread_pipe[0],&fdread);
        int maxfd = thread_pipe[0]+1;
        for (const auto& io:io_callback){
            // FD_SET on an out-of-range descriptor is undefined; skip it.
            if (io.first < 0 || io.first >= FD_SETSIZE)
                continue;
            FD_SET(io.first,&fdread);
            if (io.first >= maxfd){
                maxfd = io.first+1;
            }
        }

        lock.unlock();
        int ret = select(maxfd,&fdread,&fdwrite,&fdexcep,nullptr);
        lock.lock();

        // NOTE(review): any select() failure, including an interrupted call,
        // ends the loop; consider retrying on EINTR.
        if (ret < 0)
            return;

        // Callbacks run with `mtx` held, so the handler map cannot change
        // while it is being iterated.
        for (const auto& io:io_callback){
            if (io.first < 0 || io.first >= FD_SETSIZE)
                continue;
            if (!FD_ISSET(io.first,&fdread))
                continue;
            // Calling an empty std::function would throw on this thread.
            if (io.second && io.second->callback){
                (io.second)->callback(io.second.get());
            }
        }
        if (FD_ISSET(thread_pipe[0],&fdread)){
            clear_signal(thread_pipe[0]);
        }
    }
}
/// Writes a 4-byte zero value to the write end of the manager's pipe.
/// @return 0 when the write succeeded, -1 otherwise.
int TaskManager::signal_thread(){
    uint32_t token = 0;
    const int written = write(thread_pipe[1], static_cast<void*>(&token), sizeof(token));
    return written < 0 ? -1 : 0;
}
/// Adds `handler` to the registry, keyed by the read end of its pipe, and
/// writes to the manager's pipe via signal_thread().
/// If the descriptor is already registered the existing entry is kept.
/// @return 0 on success; -1 when `handler` is null, its read descriptor is
///         outside [0, FD_SETSIZE) and so cannot be used with select(), or
///         the manager has been stopped.
int TaskManager::register_fd(std::shared_ptr<io_handler> handler){
    if (!handler)
        return -1;
    const int fd = handler->pipefd[0];
    if (fd < 0 || fd >= FD_SETSIZE)
        return -1;

    // `stopped` is shared between threads: check it under the same mutex
    // that protects the handler map.
    std::lock_guard<std::mutex> lock{mtx};
    if (stopped)
        return -1;
    io_callback.emplace(fd, handler);
    signal_thread();
    return 0;
}
/// Removes the entry keyed by the read end of `io`'s pipe. When an entry was
/// actually removed, writes to the manager's pipe via signal_thread().
/// A null `io` is ignored.
void TaskManager::unregister_fd(std::shared_ptr<io_handler> io){
    if (!io)
        return;
    std::lock_guard<std::mutex> lock{mtx};
    // Only signal when the registry really changed.
    if (io_callback.erase(io->pipefd[0]) > 0){
        signal_thread();
    }
}
/// Writes the handler's `user` string into the write end of its pipe.
/// @param io handler to write to; must not be null.
/// @return 0 when the write succeeded, -1 when `io` is null or write() failed.
int test(std::shared_ptr<io_handler> io){
    if (!io)
        return -1;
    // Write straight from the string: staging it in a fixed 256-byte buffer
    // overflowed for names longer than the buffer.
    const std::string& payload = io->user;
    if (::write(io->pipefd[1], payload.data(), payload.size()) < 0) {
        return -1;
    }
    return 0;
}
int main(int argc,char* argv[]){
std::shared_ptr<TaskManager> tm = std::make_shared<TaskManager>();
tm->create("TaskManager");
std::shared_ptr<io_handler> io1 = std::make_shared<io_handler>("io_1");
io1->callback = process;
std::shared_ptr<io_handler> io2 = std::make_shared<io_handler>("io_2");
io2->callback = process;
std::shared_ptr<io_handler> io3 = std::make_shared<io_handler>("io_3");
io3->callback = process;
tm->register_fd(io1);
tm->register_fd(io2);
tm->register_fd(io3);
test(io1);
test(io3);
test(io2);
sleep(3);
tm->unregister_fd(io1);
tm->unregister_fd(io2);
tm->unregister_fd(io3);
tm->stop();
tm->join();
}