前面我們介紹了線程(std::thread)和互斥量(std::mutex),互斥量是多線程間同時(shí)訪問(wèn)某一共享變量時(shí),保證變量可被安全訪問(wèn)的手段。在多線程編程中柴钻,還有另一種十分常見(jiàn)的行為:線程同步。線程同步是指線程間需要按照預(yù)定的先后次序順序進(jìn)行的行為垢粮。C++11對(duì)這種行為也提供了有力的支持贴届,這就是條件變量。條件變量位于頭文件condition_variable下。本章我們將簡(jiǎn)要介紹一下該類(lèi)毫蚓,在文章的最后我們會(huì)綜合運(yùn)用std::mutex和std::condition_variable占键,實(shí)現(xiàn)一個(gè)chan類(lèi),該類(lèi)可在多線程間安全的通信元潘,具有廣泛的應(yīng)用場(chǎng)景畔乙。
1. std::condition_variable
條件變量提供了兩類(lèi)操作:wait和notify。這兩類(lèi)操作構(gòu)成了多線程同步的基礎(chǔ)翩概。
1.1 wait
wait是線程的等待動(dòng)作牲距,直到其它線程將其喚醒后,才會(huì)繼續(xù)往下執(zhí)行钥庇。下面通過(guò)偽代碼來(lái)說(shuō)明其用法:
std::mutex mutex;
std::condition_variable cv;
// 條件變量與臨界區(qū)有關(guān)牍鞠,用來(lái)獲取和釋放一個(gè)鎖,因此通常會(huì)和mutex聯(lián)用上沐。
std::unique_lock lock(mutex);
// 此處會(huì)釋放lock皮服,然后在cv上等待,直到其它線程通過(guò)cv.notify_xxx來(lái)喚醒當(dāng)前線程参咙,cv被喚醒后會(huì)再次對(duì)lock進(jìn)行上鎖,然后wait函數(shù)才會(huì)返回硫眯。
// wait返回后可以安全的使用mutex保護(hù)的臨界區(qū)內(nèi)的數(shù)據(jù)蕴侧。此時(shí)mutex仍為上鎖狀態(tài)
cv.wait(lock)
需要注意的一點(diǎn)是, wait有時(shí)會(huì)在沒(méi)有任何線程調(diào)用notify的情況下返回,這種情況就是有名的spurious wakeup两入。因此當(dāng)wait返回時(shí)净宵,你需要再次檢查wait的前置條件是否滿足,如果不滿足則需要再次wait裹纳。wait提供了重載的版本择葡,用于提供前置檢查。
template <typename Predicate>
void wait(unique_lock<mutex> &lock, Predicate pred) {
while(!pred()) {
wait(lock);
}
}
除wait外, 條件變量還提供了wait_for和wait_until剃氧,這兩個(gè)名稱(chēng)是不是看著有點(diǎn)兒眼熟敏储,std::mutex也提供了_for和_until操作。在C++11多線程編程中朋鞍,需要等待一段時(shí)間的操作已添,一般情況下都會(huì)有xxx_for和xxx_until版本。前者用于等待指定時(shí)長(zhǎng)滥酥,后者用于等待到指定的時(shí)間更舞。
1.2 notify
了解了wait,notify就簡(jiǎn)單多了:?jiǎn)拘褀ait在該條件變量上的線程坎吻。notify有兩個(gè)版本:notify_one和notify_all缆蝉。
- notify_one 喚醒等待的一個(gè)線程,注意只喚醒一個(gè)。
- notify_all 喚醒所有等待的線程刊头。使用該函數(shù)時(shí)應(yīng)避免出現(xiàn)驚群效應(yīng)贝搁。
其使用方式見(jiàn)下例:
std::mutex mutex;
std::condition_variable cv;
std::unique_lock lock(mutex);
// 所有等待在cv變量上的線程都會(huì)被喚醒。但直到lock釋放了mutex芽偏,被喚醒的線程才會(huì)從wait返回雷逆。
cv.notify_all(lock)
2. 線程間通信 - chan的實(shí)現(xiàn)
有了上面的基礎(chǔ)我們就可以設(shè)計(jì)我們的線程間通訊工具"chan"了。我們的設(shè)計(jì)目標(biāo):
- 在線程間安全的傳遞數(shù)據(jù)污尉。golang社區(qū)有一句經(jīng)典的話:不要通過(guò)共享內(nèi)存來(lái)通信膀哲,要通過(guò)通信來(lái)共享內(nèi)存。
- 消除線程線程同步帶來(lái)的復(fù)雜性被碗。
我們先來(lái)看一下chan的實(shí)際使用效果, 生產(chǎn)者-消費(fèi)者(一個(gè)生產(chǎn)者某宪,多個(gè)消費(fèi)者)
#include <stdio.h>
#include <thread>
#include "chan.h" // chan的頭文件
using namespace std::chrono;
// 消費(fèi)數(shù)據(jù)
void consume(chan<int> ch, int thread_id) {
int n;
while(ch >> n) {
printf("[%d] %d\n", thread_id, n);
std::this_thread::sleep_for(milliseconds(100));
}
}
int main() {
chan<int> chInt(3);
// 消費(fèi)者
std::thread consumers[5];
for (int i = 0; i < 5; i++) {
consumers[i] = std::thread(consume, chInt, i+1);
}
// 生產(chǎn)數(shù)據(jù)
for (int i = 0; i < 16; i++) {
chInt << i;
}
chInt.close(); // 數(shù)據(jù)生產(chǎn)完畢
for (std::thread &thr: consumers) {
thr.join();
}
return 0;
}
附: 源碼(可在github上下載到)
下面附上chan.simple.h的實(shí)現(xiàn),是chan的較為簡(jiǎn)單的實(shí)現(xiàn)锐朴,完整實(shí)現(xiàn)請(qǐng)去github下載兴喂。該代碼在g++和vc 2015下均編譯通過(guò),其它平臺(tái)未驗(yàn)證焚志。
// chan.simple.h
#pragma once
#include <condition_variable> // std::condition_variable
#include <list> // std::list
#include <mutex> // std::mutex
template <typename T>
class chan {
class queue_t {
mutable std::mutex mutex_;
std::condition_variable cv_;
std::list<T> data_;
const size_t capacity_; // data_容量
const bool enable_overflow_;
bool closed_ = false; // 隊(duì)列是否已關(guān)閉
size_t pop_count_ = 0; // 計(jì)數(shù)衣迷,累計(jì)pop的數(shù)量
public:
queue_t(size_t capacity) :
capacity_(capacity == 0 ? 1 : capacity),
enable_overflow_(capacity == 0) {
}
bool is_empty() const {
return data_.empty();
}
size_t free_count() const {
// capacity_為0時(shí),允許放入一個(gè)酱酬,但_queue會(huì)處于overflow狀態(tài)
return capacity_ - data_.size();
}
bool is_overflow() const {
return enable_overflow_ && data_.size() >= capacity_;
}
bool is_closed() const {
std::unique_lock<std::mutex> lock(this->mutex_);
return this->closed_;
}
// close以后的入chan操作會(huì)返回false, 而出chan則在隊(duì)列為空后才返回false
void close() {
std::unique_lock<std::mutex> lock(this->mutex_);
this->closed_ = true;
if (this->is_overflow()) {
// 消除溢出
this->data_.pop_back();
}
this->cv_.notify_all();
}
template <typename TR>
bool pop(TR &data) {
std::unique_lock<std::mutex> lock(this->mutex_);
this->cv_.wait(lock, [&]() { return !is_empty() || closed_; });
if (this->is_empty()) {
return false; // 已關(guān)閉
}
data = this->data_.front();
this->data_.pop_front();
this->pop_count_++;
if (this->free_count() == 1) {
// 說(shuō)明以前是full或溢出狀態(tài)
this->cv_.notify_all();
}
return true;
}
template <typename TR>
bool push(TR &&data) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return free_count() > 0 || closed_; });
if (closed_) {
return false;
}
data_.push_back(std::forward<TR>(data));
if (data_.size() == 1) {
cv_.notify_all();
}
// 當(dāng)queue溢出,需等待queue回復(fù)正常
if (is_overflow()) {
const size_t old = this->pop_count_;
cv_.wait(lock, [&]() { return old != pop_count_ || closed_; });
}
return !this->closed_;
}
};
std::shared_ptr<queue_t> queue_;
public:
explicit chan(size_t capacity = 0) {
queue_ = std::make_shared<queue_t>(capacity);
}
// 支持拷貝
chan(const chan &) = default;
chan &operator=(const chan &) = default;
// 支持move
chan(chan &&) = default;
chan &operator=(chan &&) = default;
// 入chan壶谒,支持move語(yǔ)義
template <typename TR>
bool operator<<(TR &&data) {
return queue_->push(std::forward<TR>(data));
}
// 出chan(支持兼容類(lèi)型的出chan)
template <typename TR>
bool operator>>(TR &data) {
return queue_->pop(data);
}
// close以后的入chan操作返回false, 而出chan則在隊(duì)列為空后才返回false
void close() {
queue_->close();
}
bool is_closed() const {
return queue_->is_closed();
}
};
上一篇 C++11多線程-mutex(2) |
目錄 | 下一篇 C++11多線程-promise |
---|