condition_variable條件變量可以用來實(shí)現(xiàn)線程同步诀姚,它必須與互斥量mutex配合使用响牛。
條件變量適用場景:一個線程先對某一條件進(jìn)行判斷, 如果條件不滿足則進(jìn)入等待, 條件滿足的時候, 該線程被通知條件滿足, 繼續(xù)執(zhí)行任務(wù)
在wait()之前,必須先lock相關(guān)聯(lián)的mutex, 因?yàn)榧偃缒繕?biāo)條件未滿足,wait()實(shí)際上會unlock該mutex, 然后block,在目標(biāo)條件滿足后再重新lock該mutex, 然后返回
使用條件變量實(shí)現(xiàn)生產(chǎn)者消費(fèi)者的簡單例子如下:
#include <iostream>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<queue>
using namespace std;
//任務(wù)隊(duì)列
queue<int>products ;
mutex m ;
condition_variable cond ;
bool notify = false ;
bool done = false ;
void producer() ;
void consumer(){
while(!done){
//上鎖保護(hù)共享資源,unique_lock一次實(shí)現(xiàn)上鎖和解鎖
unique_lock<mutex>lk(m);
//等待生產(chǎn)者者通知有資源
while(!notify){
cond.wait(lk);
}
//要是隊(duì)列不為空的話
while(!products.empty()){
cout<<"consumer..."<<products.front()<<endl;
products.pop();
//通知生產(chǎn)者倉庫容量不足,生產(chǎn)產(chǎn)品
notify = false ;
cond.notify_one();
}
}
}
void producer(){
int i ;
for(i=0;i<10;i++){
//主動讓出cpu玷禽,不參與cpu 的本次調(diào)度赫段,讓其他線程使用,等一秒后再參與調(diào)度
// this_thread::sleep_for(chrono::seconds(1));
unique_lock<mutex>lk(m);
cout<<"producer..."<<i<<endl;
//如果倉庫中有產(chǎn)品,就等待消費(fèi)者消費(fèi)完后在生產(chǎn)
while(notify||!products.empty()){
cond.wait(lk);
}
//當(dāng)前倉庫里面沒有東西了,就將產(chǎn)品裝入倉庫
products.push(i);
//設(shè)置有產(chǎn)品的通知
notify = true ;
//通知消費(fèi)者可以取產(chǎn)品了
cond.notify_one();
}
//通知消費(fèi)者端不生產(chǎn)了
done = true ;
cond.notify_one();
}
int main()
{
thread t1(producer);
thread t2(consumer);
t1.join();
t2.join();
return 0;
}
下面實(shí)現(xiàn)了維護(hù)了緩沖區(qū)的結(jié)構(gòu)體,并每次返回相應(yīng)的位置矢赁,可以循環(huán)寫入的生產(chǎn)者消費(fèi)者模型:
#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
static const int kItemRepositorySize = 10; // Item buffer size.
static const int kItemsToProduce = 1000; // How many items we plan to produce.
struct ItemRepository {
int item_buffer[kItemRepositorySize]; // 產(chǎn)品緩沖區(qū), 配合 read_position 和 write_position 模型環(huán)形隊(duì)列.
size_t read_position; // 消費(fèi)者讀取產(chǎn)品位置.
size_t write_position; // 生產(chǎn)者寫入產(chǎn)品位置.
std::mutex mtx; // 互斥量,保護(hù)產(chǎn)品緩沖區(qū)
std::condition_variable repo_not_full; // 條件變量, 指示產(chǎn)品緩沖區(qū)不為滿.
std::condition_variable repo_not_empty; // 條件變量, 指示產(chǎn)品緩沖區(qū)不為空.
} gItemRepository; // 產(chǎn)品庫全局變量, 生產(chǎn)者和消費(fèi)者操作該變量.
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository *ir, int item)
{
std::unique_lock<std::mutex> lock(ir->mtx);
while(((ir->write_position + 1) % kItemRepositorySize)
== ir->read_position) { // item buffer is full, just wait here.
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock); // 生產(chǎn)者等待"產(chǎn)品庫緩沖區(qū)不為滿"這一條件發(fā)生.
}
(ir->item_buffer)[ir->write_position] = item; // 寫入產(chǎn)品.
(ir->write_position)++; // 寫入位置后移.
if (ir->write_position == kItemRepositorySize) // 寫入位置若是在隊(duì)列最后則重新設(shè)置為初始位置.
ir->write_position = 0;
(ir->repo_not_empty).notify_all(); // 通知消費(fèi)者產(chǎn)品庫不為空.
lock.unlock(); // 解鎖.
}
int ConsumeItem(ItemRepository *ir)
{
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
// item buffer is empty, just wait here.
while(ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock); // 消費(fèi)者等待"產(chǎn)品庫緩沖區(qū)不為空"這一條件發(fā)生.
}
data = (ir->item_buffer)[ir->read_position]; // 讀取某一產(chǎn)品
(ir->read_position)++; // 讀取位置后移
if (ir->read_position >= kItemRepositorySize) // 讀取位置若移到最后糯笙,則重新置位.
ir->read_position = 0;
(ir->repo_not_full).notify_all(); // 通知消費(fèi)者產(chǎn)品庫不為滿.
lock.unlock(); // 解鎖.
return data; // 返回產(chǎn)品.
}
void ProducerTask() // 生產(chǎn)者任務(wù)
{
for (int i = 1; i <= kItemsToProduce; ++i) {
// sleep(1);
std::cout << "Produce the " << i << "^th item..." << std::endl;
ProduceItem(&gItemRepository, i); // 循環(huán)生產(chǎn) kItemsToProduce 個產(chǎn)品.
}
}
void ConsumerTask() // 消費(fèi)者任務(wù)
{
static int cnt = 0;
while(1) {
sleep(1);
int item = ConsumeItem(&gItemRepository); // 消費(fèi)一個產(chǎn)品.
std::cout << "Consume the " << item << "^th item" << std::endl;
if (++cnt == kItemsToProduce) break; // 如果產(chǎn)品消費(fèi)個數(shù)為 kItemsToProduce, 則退出.
}
}
void InitItemRepository(ItemRepository *ir)
{
ir->write_position = 0; // 初始化產(chǎn)品寫入位置.
ir->read_position = 0; // 初始化產(chǎn)品讀取位置.
}
int main()
{
InitItemRepository(&gItemRepository);
std::thread producer(ProducerTask); // 創(chuàng)建生產(chǎn)者線程.
std::thread consumer(ConsumerTask); // 創(chuàng)建消費(fèi)之線程.
producer.join();
consumer.join();
return 0;
}
condition_variable條件變量線程同步與mutex互斥變量配合使用
每個線程的同步互斥控制流程如下:
A. 進(jìn)入后加互斥鎖
unique_lock<mutex> lck(mtx);
B.判斷此時是否能進(jìn)行讀寫,能則立刻進(jìn)行生產(chǎn)或消費(fèi)撩银,如不能則等待且釋放互斥鎖给涕,等到能夠生產(chǎn)消費(fèi)時,再加鎖進(jìn)行生產(chǎn)消費(fèi)操作额获。操作結(jié)束后通知生產(chǎn)者或者消費(fèi)者够庙,然后進(jìn)入D。
while(q.size() == maxSize) {produce.wait(lck);} task();consume.notify_all();
D.釋放互斥鎖
lck.unlock()
- C++11中加入了新的atomic原子性抄邀,可以用來進(jìn)行互斥操作耘眨。