本文根據(jù)眾多互聯(lián)網(wǎng)博客內(nèi)容整理后形成瓤湘,引用內(nèi)容的版權(quán)歸原始作者所有,僅限于學(xué)習(xí)研究使用团搞,不得用于任何商業(yè)用途艰猬。
互斥
互斥是多線程系統(tǒng)中用于控制訪問的一個(gè)原對(duì)象(primitive object)。下面的例子給出了它最基本的用法:
std::mutex m;
int sh; //共享數(shù)據(jù)
// …
m.lock();
// 對(duì)共享數(shù)據(jù)進(jìn)行操作:
sh += 1;
m.unlock();
在任何時(shí)刻斧账,最多只能有一個(gè)線程執(zhí)行到lock()和unlock()之間的區(qū)域(通常稱為臨界區(qū))。當(dāng)?shù)谝粋€(gè)線程正在臨界區(qū)執(zhí)行時(shí)煞肾,后續(xù)執(zhí)行到m.lock()的線程將會(huì)被阻塞直到第一個(gè)進(jìn)程執(zhí)行到m.unlock()咧织。這個(gè)過程比較簡單,但是如何正確使用互斥并不簡單籍救。錯(cuò)誤地使用互斥將會(huì)導(dǎo)致一系列嚴(yán)重后果习绢。大家可以設(shè)想以下情形所導(dǎo)致的后果:一個(gè)線程只進(jìn)行了lock()而沒有執(zhí)行相應(yīng)unlock(); 一個(gè)線程對(duì)同一個(gè)mutex對(duì)象執(zhí)行了兩次lock()操作蝙昙;一個(gè)線程在等待unlock()操作時(shí)被阻塞了很久闪萄;一個(gè)線程需要對(duì)兩個(gè)mutex對(duì)象執(zhí)行l(wèi)ock()操作后才能執(zhí)行后續(xù)任務(wù)∑娴撸可以在很多書(譯者注:通常操作系統(tǒng)相關(guān)書籍中會(huì)講到)中找到這些問題的答案败去。在這里(包括Locks section一節(jié))所給出的都是一些入門級(jí)別的。
除了lock()烈拒,mutex還提供了try_lock()操作圆裕。線程可以借助該操作來嘗試進(jìn)入臨界區(qū)广鳍,這樣一來該線程不會(huì)在失敗的情況下被阻塞。下面例子給出了try_lock()的用法:
std::mutex m;
int sh; //共享數(shù)據(jù)
// …
if (m.try_lock()) {
//操作共享數(shù)據(jù)
sh += 1;
m.unlock();
}
else {
//可能在試圖進(jìn)入臨界區(qū)失敗后執(zhí)行其它代碼
}
recursive_mutex是一種能夠被同一線程連續(xù)鎖定多次的mutex吓妆。下面是recursive_mutex的一個(gè)實(shí)例:
std::recursive_mutex m;
int sh; //共享數(shù)據(jù)
//..
void f(int i)
{
//…
m.lock();
//對(duì)共享數(shù)據(jù)進(jìn)行操作
sh += 1;
if (–i>0) f(i); //注意:這里對(duì)f(i)進(jìn)行了遞歸調(diào)用赊时,
//將導(dǎo)致在m.unlock()之前多次執(zhí)行m.lock()
m.unlock();
//…
}
對(duì)于這點(diǎn),我曾經(jīng)夸耀過并且用f()調(diào)用它自身行拢。一般地祖秒,代碼會(huì)更加微妙。這是因?yàn)榇a中經(jīng)常會(huì)有間接遞歸調(diào)用舟奠。比如f()調(diào)用g()竭缝,而g()又調(diào)用了h(),最后h()又調(diào)用了f()鸭栖,這樣就形成了一個(gè)間接遞歸歌馍。
如果我想在未來的10秒內(nèi)進(jìn)入到一個(gè)mutex所劃定的臨界區(qū),該如果實(shí)現(xiàn)晕鹊? timed_mutex類可以解決這個(gè)問題松却。事實(shí)上,關(guān)于它的使用可以被看做是關(guān)聯(lián)了時(shí)間限制的try_lock()的一個(gè)特例溅话。
std::timed_mutex m;
int sh; //共享數(shù)據(jù)
//…
if ( m.try_lock_for(std::chrono::seconds(10))) {
//對(duì)共享數(shù)據(jù)進(jìn)行操作
sh += 1晓锻;
m.unlock();
}
else {
//進(jìn)入臨界區(qū)失敗,在此執(zhí)行其它代碼
}
try_lock_for()的參數(shù)是一個(gè)用相對(duì)時(shí)間表示的duration飞几。如果你不想這么做而是想等到一個(gè)固定的時(shí)間點(diǎn):一個(gè)time_point砚哆,你可以使用try_lock_until():
std::timed_mutex m;
int sh; //共享數(shù)據(jù)
// …
if ( m.try_lock_until(midnight)) {
//對(duì)共享數(shù)據(jù)進(jìn)行操作
sh += 1;
m.unlock();
}
else {
//進(jìn)入臨界區(qū)失敗,在此執(zhí)行其它代碼
}
這里使用midnight是一個(gè)冷笑話:對(duì)于mutex級(jí)別的操作屑墨,相應(yīng)的時(shí)間是毫秒級(jí)別的而不是小時(shí)躁锁。
當(dāng)然地,C++0x中也有recursive_timed_mutex卵史。
mutex可以被看做是一個(gè)資源(因?yàn)樗?jīng)常被用來代表一種真實(shí)的資源)战转,并且當(dāng)它對(duì)至少兩個(gè)線程可見時(shí)它才是有用的。必然地以躯,mutex不能被復(fù)制或者移動(dòng)(正如你不能復(fù)制一個(gè)硬件的輸入寄存器)槐秧。
令人驚訝地,實(shí)際中經(jīng)常很難做到lock()s與unlock()s的匹配忧设。設(shè)想一下那些復(fù)雜的控制結(jié)構(gòu)刁标,錯(cuò)誤以及異常,要做到匹配的確比較困難址晕。如果你可以選擇使用locks去管理你的互斥膀懈,這將為你和你的用戶節(jié)省大量的時(shí)間,再也不用熬夜通宵徹夜無眠了谨垃。(that will save you and your users a lot of sleep吏砂?撵儿?)。
std::future和std::promise
并行開發(fā)挺復(fù)雜的狐血,特別是在試圖用好線程和鎖的過程中淀歇。如果要用到條件變量或std-atomics(一種無鎖開發(fā)方式),那就更復(fù)雜了匈织。C++0x提供了future和promise來簡化任務(wù)線程間的返回值操作浪默;同時(shí)為啟動(dòng)任務(wù)線程提供了packaged_task以方便操作。其中的關(guān)鍵點(diǎn)是允許2個(gè)任務(wù)間使用無(顯式)鎖的方式進(jìn)行值傳遞缀匕;標(biāo)準(zhǔn)庫幫你高效的做好這些了纳决。基本思路很簡單:當(dāng)一個(gè)任務(wù)需要向父線程(啟動(dòng)它的線程)返回值時(shí)乡小,它把這個(gè)值放到promise中阔加。之后,這個(gè)返回值會(huì)出現(xiàn)在和此promise關(guān)聯(lián)的future中满钟。于是父線程就能讀到返回值胜榔。更簡單點(diǎn)的方法,參看async()湃番。
標(biāo)準(zhǔn)庫中提供了3種future:普通future和為復(fù)雜場(chǎng)合使用的shared_future和atomic_future夭织。在本主題中,只展示了普通future吠撮,它已經(jīng)完全夠用了尊惰。如果我們有一個(gè)future
f,通過get()可以獲得它的值:
X v = f.get(); // if necessary wait for the value to get computed
如果它的返回值還沒有到達(dá)泥兰,調(diào)用線程會(huì)進(jìn)行阻塞等待弄屡。要是等啊等啊,等到花兒也謝了的話鞋诗,get()會(huì)拋出異常的(從標(biāo)準(zhǔn)庫或等待的線程那個(gè)線程中拋出)琢岩。
如果我們不需要等待返回值(非阻塞方式),可以簡單詢問一下future师脂,看返回值是否已經(jīng)到達(dá):
if (f.wait_for(0))
{
// there is a value to get()
// do something
}
else
{
// do something else
}
但是,future最主要的目的還是提供一個(gè)簡單的獲取返回值的方法:get()江锨。
promise的主要目的是提供一個(gè)”put”(或”get”吃警,隨你)操作,以和future的get()對(duì)應(yīng)啄育。future和promise的名字是有歷史來歷的酌心,是一個(gè)雙關(guān)語。感覺有點(diǎn)別扭挑豌?請(qǐng)別怪我安券。
promise為future傳遞的結(jié)果類型有2種:傳一個(gè)普通值或者拋出一個(gè)異常
try {
X res;
// compute a value for res
p.set_value(res);
}
catch (…) { // oops: couldn’t compute res
p.set_exception(std::current_exception());
}
到目前為止還不錯(cuò)墩崩,不過我們?nèi)绾纹ヅ鋐uture/promise對(duì)呢?一個(gè)在我的線程侯勉,另一個(gè)在別的啥線程中嗎鹦筹?是這樣:既然future和promise可以被到處移動(dòng)(不是拷貝),那么可能性就挺多的址貌。最普遍的情況是父子線程配對(duì)形式铐拐,父線程用future獲取子線程promise返回的值。在這種情況下练对,使用async()是很優(yōu)雅的方法遍蟋。
packaged_task提供了啟動(dòng)任務(wù)線程的簡單方法。特別是它處理好了future和promise的關(guān)聯(lián)關(guān)系螟凭,同時(shí)提供了包裝代碼以保證返回值/異承榍啵可以放到promise中,示例代碼:
void comp(vector& v)
{
// package the tasks:
// (the task here is the standard
// accumulate() for an array of doubles):
packaged_task pt0{std::accumulate};
packaged_task pt1{std::accumulate};
auto f0 = pt0.get_future(); // get hold of the futures
auto f1 = pt1.get_future();
pt0(&v[0],&v[v.size()/2],0); // start the threads
pt1(&[v.size()/2],&v[size()],0);
return f0.get()+f1.get(); // get the results
}
async()
async()函數(shù)是一個(gè)簡單任務(wù)的”啟動(dòng)”(launcher)函數(shù)螺男。
下邊是一種優(yōu)于傳統(tǒng)的線程+鎖的并發(fā)編程方法示例(譯注:山寨map-reduce哦):
template<class T,class V> struct Accum { // 簡單的積函數(shù)對(duì)象
T* b;
T* e;
V val;
Accum(T* bb, T* ee, const V& v) : b{bb}, e{ee}, val{vv} {}
V operator() ()
{ return std::accumulate(b,e,val); }
};
void comp(vector<double>& v)
// 如果v夠大棒厘,則產(chǎn)生很多任務(wù) {
if (v.size()<10000)
return std::accumulate(v.begin(),v.end(),0.0);
auto f0 {async(Accum{&v[0],&v[v.size()/4],0.0})};
auto f1 {async(Accum{&v[v.size()/4],&v[v.size()/2],0.0})};
auto f2 {async(Accum{&v[v.size()/2],&v[v.size()*3/4],0.0})};
auto f3 {async(Accum{&v[v.size()*3/4],&v[v.size()],0.0})};
return f0.get()+f1.get()+f2.get()+f3.get();
}
盡管這只是一個(gè)簡單的并發(fā)編程示例(留意其中的”magic number“),不過我們可沒有使用線程烟号,鎖绊谭,緩沖區(qū)等概念。f*變量的類型(即async()的返回值)是”std::future”類型汪拥。future.get()表示如果有必要的話則等待相應(yīng)的線程(std::thread)運(yùn)行結(jié)束达传。async的工作是根據(jù)需要來啟動(dòng)新線程,而future的工作則是等待新線程運(yùn)行結(jié)束迫筑∠芨希”簡單性”是async/future設(shè)計(jì)中最重視的一個(gè)方面;future一般也可以和線程一起使用脯燃,不過不要使用async()來啟動(dòng)類似I/O操作搂妻,操作互斥體(mutex),多任務(wù)交互操作等復(fù)雜任務(wù)辕棚。async()背后的理念和range-for statement很類似:簡單事兒簡單做欲主,把復(fù)雜的事情留給一般的通用機(jī)制來搞定吧。
async()可以啟動(dòng)一個(gè)新線程或者復(fù)用一個(gè)它認(rèn)為合適的已有線程(非調(diào)用線程即可)(譯注:語義上并發(fā)即可逝嚎,不關(guān)心具體的調(diào)度策略扁瓢。和go語義中的goroutines有點(diǎn)像)。后者從用戶視角看更有效一些(只對(duì)簡單任務(wù)而言)补君。
線程(thread)
線程(譯注:大約是C++11中最激動(dòng)人心的特性了)是一種對(duì)程序中的執(zhí)行或者計(jì)算的表述引几。跟許多現(xiàn)代計(jì)算一樣,C++11中的線程之間能夠共享地址空間挽铁。從這點(diǎn)上來看伟桅,它不同于進(jìn)程:進(jìn)程一般不會(huì)直接跟其它進(jìn)程共享數(shù)據(jù)敞掘。在過去,C++針對(duì)不同的硬件和操作系統(tǒng)有著不同的線程實(shí)現(xiàn)版本楣铁。如今玖雁,C++將線程加入到了標(biāo)準(zhǔn)件庫中:一個(gè)標(biāo)準(zhǔn)線程ABI。
許多大部頭書籍以及成千上萬的論文都曾涉及到并發(fā)民褂、并行以及線程茄菊。在這一條FAQ里幾乎不涉及這些內(nèi)容。事實(shí)上赊堪,要做到清楚地思考并發(fā)非常難面殖。如果你想編寫并發(fā)程序,請(qǐng)至少看一本書哭廉。不要依賴于一本手冊(cè)脊僚、一個(gè)標(biāo)準(zhǔn)或者一條FAQ。
在用一個(gè)函數(shù)或者函數(shù)對(duì)象(包括lambda)構(gòu)造std::thread時(shí)遵绰,一個(gè)線程便啟動(dòng)了辽幌。
#include <thread>
void f();
struct F {
void operator()();
};
int main()
{
std::thread t1{f}; // f() 在一個(gè)單獨(dú)的線程中執(zhí)行
std::thread t2{F()}; // F()() 在一個(gè)單獨(dú)的線程中執(zhí)行
}
然而,無論f()和F()執(zhí)行任何功能椿访,都不能給出有用的結(jié)果乌企。這是因?yàn)槌绦蚩赡軙?huì)在t1執(zhí)行f()之前或之后以及t2執(zhí)行F()之前或之后終結(jié)。我們所期望的是能夠等到兩個(gè)任務(wù)都完成成玫,這可以通過下述方法來實(shí)現(xiàn):
int main()
{
std::thread t1{f}; // f() 在一個(gè)單獨(dú)的線程中執(zhí)行
std::thread t2{F()}; // F()()在一個(gè)單獨(dú)的線程中執(zhí)行
t1.join(); // 等待t1
t2.join(); // 等待t2
}
上面例子中的join()保證了在t1和t2完成后程序才會(huì)終結(jié)加酵。這里”join”的意思是等待線程返回后再終結(jié)。
通常我們需要傳遞一些參數(shù)給要執(zhí)行的任務(wù)哭当。例如:
void f(vector<double>&);
struct F {
vector<double>& v;
F(vector<double>& vv) :v{vv} { }
void operator()();
};
int main(){
// f(some_vec) 在一個(gè)單獨(dú)的線程中執(zhí)行
std::thread t1{std::bind(f,some_vec)};
// F(some_vec)() 在一個(gè)單獨(dú)的線程中執(zhí)行
std::thread t2{F(some_vec)};
t1.join();
t2.join();
}
上例中的標(biāo)準(zhǔn)庫函數(shù)bind會(huì)將一個(gè)函數(shù)對(duì)象作為它的參數(shù)猪腕。
通常我們需要在執(zhí)行完一個(gè)任務(wù)后得到返回的結(jié)果。對(duì)于那些簡單的對(duì)返回值沒有概念的钦勘,我建議使用std::future陋葡。另一種方法是,我們可以給任務(wù)傳遞一個(gè)參數(shù)彻采,從而這個(gè)任務(wù)可以把結(jié)果存在這個(gè)參數(shù)中腐缤。例如:
void f(vector<double>&, double* res); // 將結(jié)果存在res中
struct F {
vector<double>& v;
double* res;
F(vector<double>& vv, double* p) :v{vv}, res{p} { }
void operator()(); //將結(jié)果存在res中
};
int main()
{
double res1;
double res2;
// f(some_vec,&res1) 在一個(gè)單獨(dú)的線程中執(zhí)行
std::thread t1{std::bind(f,some_vec,&res1)};
// F(some_vec,&res2)() 在一個(gè)單獨(dú)的線程中執(zhí)行
std::thread t2{F(some_vec,&res2)};
t1.join();
t2.join();
std::cout << res1 << " " << res2 << ‘\n’;
}
但是關(guān)于錯(cuò)誤呢?如果一個(gè)任務(wù)拋出了異常應(yīng)該怎么辦肛响?如果一個(gè)任務(wù)拋出一個(gè)異常并且它沒有捕獲到這個(gè)異常岭粤,這個(gè)任務(wù)將會(huì)調(diào)用std::terminate()。調(diào)用這個(gè)函數(shù)一般意味著程序的結(jié)束终惑。我們常常會(huì)為避免這個(gè)問題做諸多嘗試。std::future可以將異常傳送給父線程(這正是我喜歡future的原因之一)门扇。否則雹有,返回錯(cuò)誤代碼偿渡。
除非一個(gè)線程的任務(wù)已經(jīng)完成了,當(dāng)一個(gè)線程超出所在的域的時(shí)候霸奕,程序會(huì)結(jié)束溜宽。很明顯,我們應(yīng)該避免這一點(diǎn)质帅。
沒有辦法來請(qǐng)求(也就是說盡量文雅地請(qǐng)求它盡可能早的退出)一個(gè)線程結(jié)束或者是強(qiáng)制(也就是說殺死這個(gè)線程)它結(jié)束适揉。下面是可供我們選擇的操作:
- 設(shè)計(jì)我們自己的協(xié)作的中斷機(jī)制(通過使用共享數(shù)據(jù)來實(shí)現(xiàn)。父線程設(shè)置這個(gè)數(shù)據(jù)煤惩,子線程檢查這個(gè)數(shù)據(jù)(子線程將會(huì)在該數(shù)據(jù)被設(shè)置后很快退出))嫉嘀。
- 使用thread::native_handle()來訪問線程在操作系統(tǒng)中的符號(hào)
- 殺死進(jìn)程(std::quick_exit())
- 殺死程序(std::terminate())
這些是委員會(huì)能夠統(tǒng)一的所有的規(guī)則。特別地魄揉,來自POSIX的代表強(qiáng)烈地反對(duì)任何形式的“線程取消”剪侮。然而許多C++的資源模型都依賴于析構(gòu)器。對(duì)于每種系統(tǒng)和每種可能的應(yīng)有并沒有完美的解決方案洛退。
線程中的一個(gè)基本問題是數(shù)據(jù)競爭瓣俯。也就是當(dāng)在統(tǒng)一地址空間的兩個(gè)線程獨(dú)立訪問一個(gè)對(duì)象時(shí)將會(huì)導(dǎo)致沒有定義的結(jié)果。如果一個(gè)(或者兩個(gè))對(duì)對(duì)象執(zhí)行寫操作兵怯,而另一個(gè)(或者兩個(gè))對(duì)該對(duì)象執(zhí)行讀操作彩匕,兩個(gè)線程將在誰先完成操作方面進(jìn)行競爭。這樣得到的結(jié)果不僅僅是沒定義的媒区,而且常常無法預(yù)測(cè)最后的結(jié)果驼仪。為解決這個(gè)問題,C++0x提供了一些規(guī)則和保證從而能夠讓程序員避免數(shù)據(jù)競爭驻仅。
- C++標(biāo)準(zhǔn)庫函數(shù)不能直接或間接地訪問正在被其它線程訪問的對(duì)象谅畅。一種例外是該函數(shù)通過參數(shù)(包括this)來直接或間接訪問這個(gè)對(duì)象。
- C++標(biāo)準(zhǔn)庫函數(shù)不能直接或間接修改正在被其它線程訪問的對(duì)象噪服。一種例外是該函數(shù)通過非const參數(shù)(包括this)來直接或間接訪問這個(gè)對(duì)象毡泻。
- C++標(biāo)準(zhǔn)函數(shù)庫的實(shí)現(xiàn)需要避免在同時(shí)修改統(tǒng)一序列中的不同成員時(shí)的數(shù)據(jù)競爭。
除非已使用別的方式做了聲明粘优,多個(gè)線程同時(shí)訪問一個(gè)流對(duì)象仇味、流緩沖區(qū)對(duì)象,或者C庫中的流可能會(huì)導(dǎo)致數(shù)據(jù)競爭雹顺。因此除非你能夠控制丹墨,絕不要讓兩個(gè)線程來共享一個(gè)輸出流。
你可以
- 等待一個(gè)線程一定的時(shí)間
- 通過互斥來控制對(duì)數(shù)據(jù)的訪問
- 通過鎖來控制對(duì)數(shù)據(jù)的訪問
- 使用條件變量來等待另一個(gè)線程的行為
- 通過future來從線程中返回值
std::condition_variable 類介紹
std::condition_variable 是條件變量嬉愧,更多有關(guān)條件變量的定義參考維基百科贩挣。Linux 下使用 Pthread 庫中的 pthread_cond_*() 函數(shù)提供了與條件變量相關(guān)的功能, Windows 則參考 MSDN。
當(dāng) std::condition_variable 對(duì)象的某個(gè) wait 函數(shù)被調(diào)用的時(shí)候王财,它使用 std::unique_lock(封裝 std::mutex) 來鎖住當(dāng)前線程卵迂。當(dāng)前線程會(huì)一直被阻塞,直到另外一個(gè)線程在相同的 std::condition_variable 對(duì)象上調(diào)用了 notification 函數(shù)來喚醒當(dāng)前線程绒净。
std::condition_variable 對(duì)象通常使用 std::unique_lock<std::mutex> 來等待见咒,如果需要使用另外的 lockable 類型,可以使用 std::condition_variable_any 類挂疆,本文后面會(huì)講到 std::condition_variable_any 的用法改览。
首先我們來看一個(gè)簡單的例子:
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx; // 全局互斥鎖.
std::condition_variable cv; // 全局條件變量.
bool ready = false; // 全局標(biāo)志位.
void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx);
while (!ready) // 如果標(biāo)志位不為 true, 則等待...
cv.wait(lck); // 當(dāng)前線程被阻塞, 當(dāng)全局標(biāo)志位變?yōu)?true 之后,
// 線程被喚醒, 繼續(xù)往下執(zhí)行打印線程編號(hào)id.
std::cout << "thread " << id << '\n';
}
void go()
{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 設(shè)置全局標(biāo)志位為 true.
cv.notify_all(); // 喚醒所有線程.
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto & th:threads)
th.join();
return 0;
}
執(zhí)行結(jié)果如下:
concurrency ) ./ConditionVariable-basic1
10 threads ready to race...
thread 1
thread 0
thread 2
thread 3
thread 4
thread 5
thread 6
thread 7
thread 8
thread 9
好了,對(duì)條件變量有了一個(gè)基本的了解之后缤言,我們來看看 std::condition_variable 的各個(gè)成員函數(shù)宝当。
std::condition_variable 構(gòu)造函數(shù)
default (1) | condition_variable(); |
---|---|
copy [deleted] (2) | condition_variable (const condition_variable&) = delete; |
std::condition_variable 的拷貝構(gòu)造函數(shù)被禁用,只提供了默認(rèn)構(gòu)造函數(shù)墨闲。
std::condition_variable::wait()介紹
unconditional (1) | void wait (unique_lock<mutex>& lck); |
---|---|
predicate (2) | template <class Predicate>void wait (unique_lock<mutex>& lck, Predicate pred); |
std::condition_variable提供了兩種 wait()函數(shù)今妄。當(dāng)前線程調(diào)用 wait()后將被阻塞(此時(shí)當(dāng)前線程應(yīng)g該獲得了鎖(mutex),不妨設(shè)獲得鎖 lck)鸳碧,直到另外某個(gè)線程調(diào)用 notify_*喚醒了當(dāng)前線程盾鳞。
在線程被阻塞時(shí),該函數(shù)會(huì)自動(dòng)調(diào)用 lck.unlock()釋放鎖瞻离,使得其他被阻塞在鎖競爭上的線程得以繼續(xù)執(zhí)行腾仅。另外,一旦當(dāng)前線程獲得通知(notified套利,通常是另外某個(gè)線程調(diào)用 notify_*喚醒了當(dāng)前線程)推励,wait()函數(shù)也是自動(dòng)調(diào)用 lck.lock(),使得 lck的狀態(tài)和 wait函數(shù)被調(diào)用時(shí)相同肉迫。
在第二種情況下(即設(shè)置了 Predicate)验辞,只有當(dāng) pred條件為 false時(shí)調(diào)用 wait()才會(huì)阻塞當(dāng)前線程,并且在收到其他線程的通知后只有當(dāng) pred為 true時(shí)才會(huì)被解除阻塞喊衫。因此第二種情況類似以下代碼:
while (!pred()) wait(lck);
請(qǐng)看下面例子:
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::yield
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
int cargo = 0;
bool shipment_available()
{
return cargo != 0;
}
// 消費(fèi)者線程.
void consume(int n)
{
for (int i = 0; i < n; ++i) {
std::unique_lock <std::mutex> lck(mtx);
cv.wait(lck, shipment_available);
std::cout << cargo << '\n';
cargo = 0;
}
}
int main()
{
std::thread consumer_thread(consume, 10); // 消費(fèi)者線程.
// 主線程為生產(chǎn)者線程, 生產(chǎn) 10 個(gè)物品.
for (int i = 0; i < 10; ++i) {
while (shipment_available())
std::this_thread::yield();
std::unique_lock <std::mutex> lck(mtx);
cargo = i + 1;
cv.notify_one();
}
consumer_thread.join();
return 0;
}
程序執(zhí)行結(jié)果如下:
concurrency ) ./ConditionVariable-wait
1
2
3
4
5
6
7
8
9
10
std::condition_variable::wait_for()介紹
unconditional (1) | template <class Rep, class Period>cv_status wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time); |
---|---|
predicate (2) | template <class Rep, class Period, class Predicate>bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep, Period>& rel_time, Predicate pred); |
與 std::condition_variable::wait()類似跌造,不過 wait_for可以指定一個(gè)時(shí)間段,在當(dāng)前線程收到通知或者指定的時(shí)間 rel_time超時(shí)之前族购,該線程都會(huì)處于阻塞狀態(tài)壳贪。而一旦超時(shí)或者收到了其他線程的通知,wait_for返回寝杖,剩下的處理步驟和 wait()類似违施。
另外,wait_for的重載版本(predicte(2))的最后一個(gè)參數(shù) pred 表示 wait_for的預(yù)測(cè)條件瑟幕,只有當(dāng) pred條件為 false時(shí)調(diào)用 wait()才會(huì)阻塞當(dāng)前線程磕蒲,并且在收到其他線程的通知后只有當(dāng) pred
為 true時(shí)才會(huì)被解除阻塞留潦,因此相當(dāng)于如下代碼:
return wait_until (lck, chrono::steady_clock::now() + rel_time, std::move(pred));
請(qǐng)看下面的例子,下面的例子中辣往,主線程等待 th線程輸入一個(gè)值愤兵,然后將 th線程從終端接收的值打印出來,在 th線程接受到值之前排吴,主線程一直等待,每個(gè)一秒超時(shí)一次懦鼠,并打印一個(gè) ".":
#include <iostream> // std::cout
#include <thread> // std::thread
#include <chrono> // std::chrono::seconds
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status
std::condition_variable cv;
int value;
void do_read_value()
{
std::cin >> value;
cv.notify_one();
}
int main ()
{
std::cout << "Please, enter an integer (I'll be printing dots): \n";
std::thread th(do_read_value);
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
while (cv.wait_for(lck,std::chrono::seconds(1)) == std::cv_status::timeout) {
std::cout << '.';
std::cout.flush();
}
std::cout << "You entered: " << value << '\n';
th.join();
return 0;
}
std::condition_variable::wait_until 介紹
unconditional (1) | template <class Clock, class Duration>cv_status wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time); |
---|---|
predicate (2) | template <class Clock, class Duration, class Predicate>bool wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time,Predicate pred); |
與 std::condition_variable::wait_for 類似钻哩,但是 wait_until 可以指定一個(gè)時(shí)間點(diǎn),在當(dāng)前線程收到通知或者指定的時(shí)間點(diǎn) abs_time 超時(shí)之前肛冶,該線程都會(huì)處于阻塞狀態(tài)街氢。而一旦超時(shí)或者收到了其他線程的通知,wait_until 返回睦袖,剩下的處理步驟和 wait_until() 類似珊肃。
另外,wait_until 的重載版本(predicte(2))的最后一個(gè)參數(shù) pred 表示 wait_until 的預(yù)測(cè)條件馅笙,只有當(dāng) pred 條件為 false 時(shí)調(diào)用 wait() 才會(huì)阻塞當(dāng)前線程伦乔,并且在收到其他線程的通知后只有當(dāng) pred 為 true 時(shí)才會(huì)被解除阻塞,因此相當(dāng)于如下代碼:
while (!pred())
if ( wait_until(lck,abs_time) == cv_status::timeout)
return pred();
return true;
std::condition_variable::notify_one() 介紹
喚醒某個(gè)等待(wait)線程董习。如果當(dāng)前沒有等待線程烈和,則該函數(shù)什么也不做,如果同時(shí)存在多個(gè)等待線程皿淋,則喚醒某個(gè)線程是不確定的(unspecified)招刹。
請(qǐng)看下例:
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
int cargo = 0; // shared value by producers and consumers
void consumer()
{
std::unique_lock < std::mutex > lck(mtx);
while (cargo == 0)
cv.wait(lck);
std::cout << cargo << '\n';
cargo = 0;
}
void producer(int id)
{
std::unique_lock < std::mutex > lck(mtx);
cargo = id;
cv.notify_one();
}
int main()
{
std::thread consumers[10], producers[10];
// spawn 10 consumers and 10 producers:
for (int i = 0; i < 10; ++i) {
consumers[i] = std::thread(consumer);
producers[i] = std::thread(producer, i + 1);
}
// join them back:
for (int i = 0; i < 10; ++i) {
producers[i].join();
consumers[i].join();
}
return 0;
}
std::condition_variable::notify_all() 介紹
喚醒所有的等待(wait)線程。如果當(dāng)前沒有等待線程窝趣,則該函數(shù)什么也不做疯暑。請(qǐng)看下面的例子:
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx; // 全局互斥鎖.
std::condition_variable cv; // 全局條件變量.
bool ready = false; // 全局標(biāo)志位.
void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx);
while (!ready) // 如果標(biāo)志位不為 true, 則等待...
cv.wait(lck); // 當(dāng)前線程被阻塞, 當(dāng)全局標(biāo)志位變?yōu)?true 之后,
// 線程被喚醒, 繼續(xù)往下執(zhí)行打印線程編號(hào)id.
std::cout << "thread " << id << '\n';
}
void go()
{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 設(shè)置全局標(biāo)志位為 true.
cv.notify_all(); // 喚醒所有線程.
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto & th:threads)
th.join();
return 0;
}
std::condition_variable_any介紹
與 std::condition_variable類似,只不過 std::condition_variable_any的 wait函數(shù)可以接受任何 lockable參數(shù)哑舒,而 std::condition_variable只能接受 std::unique_lock<std::mutex>類型的參數(shù)妇拯,除此以外,和 std::condition_variable幾乎完全一樣散址。
std::cv_status枚舉類型介紹
cv_status::no_timeout | wait_for 或者 wait_until 沒有超時(shí)乖阵,即在規(guī)定的時(shí)間段內(nèi)線程收到了通知。 |
---|---|
cv_status::timeout | wait_for 或者 wait_until 超時(shí)预麸。 |
參考資料
【c++11FAQ】互斥
【c++11FAQ】std::future和std::promise
【c++11FAQ】async()
【c++11FAQ】線程(thread)
C++11 多線程——KingsLanding
第五章 條件變量與線程同步
5.2 條件變量詳解
4.3 鎖類型詳解