由于多核的出現(xiàn),使用多線程能夠顯著提高性能鲜侥。
C++11之前褂始,C++并沒有對并發(fā)提供語言層面的支持,C++標(biāo)準(zhǔn)庫也沒有描函。C++11之后:
- 語言層面崎苗,定義一個內(nèi)存模型,保證在兩個不同線程中對兩個對象的操作相互獨立舀寓,增加了
thread_local
關(guān)鍵字胆数。 - 標(biāo)準(zhǔn)庫提供了對開啟多線程、同步多線程的支持互墓。
The High-Level Interface: async() and Futures
- async()
提供了接口讓一個可調(diào)用的對象(如某個函數(shù))在獨立的線程中運行幅慌。 - future<> 類
允許等待某個線程完成,并訪問其結(jié)果轰豆。
一個使用 async() 以及 Future 的例子
計算 func1() + func2()
:
如果是單線程,只能依次運行齿诞,并把結(jié)果相加酸休。總時間是兩者時間之和祷杈。
多核多線程情況下斑司,如果兩者獨立,可以分別運行再相加但汞∷薰危總時間是兩者時間的最大值。
#include <future>
#include <iostream>
#include <random> // for default_random_engine, uniform_int_distribution
using namespace std;
int doSomething (char c) {
// random-number generator(use c as seed to get different sequences)
std::default_random_engine dre(c);
std::uniform_int_distribution<int> id(10,1000);
for (int i=0; i<10; ++i) {
this_thread::sleep_for(chrono::milliseconds(id(dre)));
cout.put(c).flush(); // output immediately
}
return c;
}
int func1() {
return doSomething('.');
}
int func2() {
return doSomething('+');
}
int main() {
cout << "start func1() in background, and func2() in foreground: " << endl;
future<int> result1(std::async(func1));
int result2 = func2();
int result = result1.get() + result2;
cout << "\nresult of func1()+func2(): " << result << endl;
}
注意到在主函數(shù)中私蕾,我們使用了如下步驟:
// instead of:
// int result = func1() + func2();
future<int> result1(std::async(func1));
int result2 = func2();
int result = result1.get() + result2;
我們使用 async() 使 func1() 在別的線程運行僵缺,并將結(jié)果賦值給 future。func2() 繼續(xù)在主線程運行踩叭,最后綜合結(jié)果磕潮。
future 對象的作用體現(xiàn)在:
- 允許訪問 func1 產(chǎn)生的結(jié)果翠胰,可能是一個返回值也可能是一個異常。注意 future 是一個模板類自脯,我們指定為 func1 的返回值類型 int之景。對于無返回值的類型我們可以聲明一個
std::future<void>
。 - 保證 func1 的執(zhí)行膏潮。async() 僅僅是嘗試開始運行傳入的 functionality锻狗,如果并沒有運行,那在需要結(jié)果的時候焕参,future 對象強制開始運行轻纪。
最后,我們需要使用到異步執(zhí)行的函數(shù)的結(jié)果的時候龟糕,會使用 get()
桐磁。如:
int result = result1.get() + result2;
當(dāng)我們使用 get()
的時候,可能發(fā)生以下三種情況:
- 如果 func1() 是使用 async() 在別的線程啟動的讲岁,而且已經(jīng)運行結(jié)束我擂,可以立即得到結(jié)果。
- 如果 func1() 啟動了但是還未結(jié)束缓艳,則 get() 會阻塞直到拿到結(jié)果校摩。
- 如果 func1() 還未啟動,則會強制啟動阶淘,這時候就表現(xiàn)得像一個同步執(zhí)行的程序衙吩。
可以看出,綜合使用
std::future<int> result1(std::async(func1));
result1.get()
使得無論是否允許多線程溪窒,程序都能順利完成坤塞。
為達到最佳使用效果,我們需要記住一條準(zhǔn)則“call early and return late”澈蚌,給予異步線程足夠的執(zhí)行時間摹芙。
async()
的參數(shù)可以是任何可調(diào)用對象:函數(shù),成員函數(shù)宛瞄,函數(shù)對象浮禾,或者 lambda。注意 lambda 后面不要習(xí)慣性的加上小括號份汗。
std::async([]{ ... }) // try to perform ... asynchronously
Using Launch Policies
有時候我們希望子線程立刻開始盈电,而不要等待調(diào)度。那么我們就需要使用 lauch policy 來顯式指定杯活,如果開始失敗银还,會拋出系統(tǒng)錯誤異常娇昙。
// force func1() to start asynchronously now or throw std::system_error
std::future<long> result1 = std::async(std::launch::async, func1);
使用 std::launch::async 我們就不必再使用 get() 了凌简,因為如果 result1 的生命周期結(jié)束了,程序會等待 func1 完成荡澎。因此,如果我沒有調(diào)用 get()晤锹,在退出 result1 的作用域時一樣會等待 func1 結(jié)束摩幔。然而,出于代碼的可讀性鞭铆,推薦還是加上 get()或衡。
同理,我們也可以指定子線程在 get() 時再運行车遂。
auto f1 = std::async(std::launch::deferred, func1);
Waiting and Polling
future 的 get() 方法只能被調(diào)用一次封断,在使用 get() 之后,future 實例就變?yōu)?invalid 的了舶担。而 future 也提供了 wait() 方法坡疼,可以被調(diào)用多次,并且可以加上時限衣陶。其調(diào)用形式為:
std::future<...> f(std::async(func));
f.wait_for(std::chrono::seconds(10)); // wait at most 10 seconds
std::future<...> f(std::async(func));
f.wait_until(std::system_lock::now() + std::chrono::minites(1)); // wait until a specific timepoint has reached
這兩個函數(shù)的返回值一樣柄瑰,有三種:
- std::future_status::deferred
func 還沒有開始執(zhí)行。 - std::future_status::timeout
func 開始執(zhí)行但是還沒完成 - std::future_status::ready
func 執(zhí)行完畢
綜合使用 launch policy 和 wait 方法的例子如下剪况,我們可以讓兩個線程分別進行計算教沾,然后等待一定時間后輸出結(jié)果。
#include <cstdio>
#include <future>
#include <iostream>
// defined here, lifetime of accurateComputation() may be longer than
// bestResultInTime()
std::future<double> f_slow;
double accurateComputation() {
std::cout << "Begin accurate computation..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "Yield accurate answer..." << std::endl;
return 3.1415;
}
double quickComputation() {
std::cout << "Begin quick computation..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Yield quick answer..." << std::endl;
return 3.14;
}
double bestResultInTime(int seconds) {
auto tp = std::chrono::system_clock::now() + std::chrono::seconds(seconds);
// 立即開始
f_slow = std::async(std::launch::async, accurateComputation);
// 這兩句順序不可交換
double quick_result = quickComputation();
std::future_status f_status = f_slow.wait_until(tp);
if (f_status == std::future_status::ready) {
return f_slow.get();
} else {
return quick_result;
}
}
int main() {
using namespace std::chrono;
int timeLimit;
printf("Input execute time (in seconds):\n");
std::cin >> timeLimit;
printf("Execute for %d seconds\n", timeLimit);
auto start = steady_clock::now();
std::cout << "Result: " << bestResultInTime(timeLimit) << std::endl;
std::cout
<< "time elapsed: "
<< duration_cast<duration<double>>(steady_clock::now() - start).count()
<< std::endl;
}
// g++ -o async2_test async2.cpp -std=c++0x -lpthread
在上面程序中尤為值得注意有兩點译断。
我們沒有把 future 放在 bestResultInTime() 中授翻。這是因為如果 future 是局部變量,退出 bestResultInTime() 時孙咪,future 的析構(gòu)函數(shù)會阻塞直到產(chǎn)生結(jié)果堪唐。
wait 方法會阻塞等待,也就是說如果順序不對翎蹈,就會變成順序執(zhí)行羔杨,而非并行。
// 這兩句順序不可交換
double quick_result = quickComputation();
std::future_status f_status = f_slow.wait_until(tp);
如果交換杨蛋,則會先等待直到 timepoint,然后再執(zhí)行 quickComputation()理澎,變成串行程序逞力。
以上程序輸出結(jié)果為:
Input execute time (in seconds):
3
Execute for 3 seconds
Begin quick computation...
Begin accurate computation...
Yield quick answer...
Result: 3.14
time elapsed: 3.00111
Yield accurate answer...
非常值得注意的是 main 函數(shù)結(jié)束后并沒有立刻退出,而是等待 func 執(zhí)行完畢后 future 析構(gòu)糠爬。
給 wait_for() 方法傳入 0寇荧,相當(dāng)于立即獲取 future 的狀態(tài)≈此恚可以利用這點來得知某個任務(wù) 現(xiàn)在 是否開始了揩抡,或者是否還在運行户侥。
The Low-Level Interface: Threads and Promises
C++ 標(biāo)準(zhǔn)庫也提供了底層接口來啟動和處理線程,我們可以先定義一個線程對象峦嗤,以一個可調(diào)用對象來初始化蕊唐,然后等待或者detach。
void doSomething();
std::thread t(doSomething); // start doSomething in the background
t.join(); // wait for t to finish (block until doSomething() ends)
如同 async() 一樣烁设,我們可以用任何可調(diào)用對象來初始化替梨。但是作為一個底層的接口,一些在 async() 中的特性是不能使用的装黑。
- thread 沒有 launch policy副瀑。它總是立即開啟新線程運行 func。類似于使用了 std::launch::async恋谭。
- 沒有用于處理結(jié)果的接口糠睡。我們能獲得的只有 thread ID(利用
get_id()
方法)。 - 如果出現(xiàn)異常疚颊,程序會立即停止狈孔。
- 我們需要聲明我們是需要等待 thread 運行結(jié)束(使用
join()
),或者讓它自己在別的線程運行串稀。(使用detach()
) - 如果 main() 函數(shù)結(jié)束后除抛,線程還在運行,則所有線程都會強制地結(jié)束母截。(future 會等待結(jié)束再析構(gòu))
以下例子顯示了 join 和 detach 的區(qū)別到忽。
我們新建了一個線程輸出 +
,另外5個線程 detach 輸出字母清寇。按任意鍵將輸出 +
的線程 join喘漏。程序在等待 +
打印結(jié)束后,就會停止华烟,不管 detach 線程是否完成了任務(wù)翩迈。
#include <exception>
#include <iostream>
#include <random>
#include <thread>
void doSomething(int num, char c) {
try {
std::default_random_engine dre(c);
std::uniform_int_distribution<int> distribution(10, 1000);
for (int i = 0; i < num; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(distribution(dre)));
std::cout.put(c).flush();
}
} catch (const std::exception &e) {
std::cerr << "THREAD-EXCEPTION (thread " << std::this_thread::get_id()
<< e.what() << std::endl;
} catch (...) {
std::cerr << "THREAD-EXCEPTION (thread " << std::this_thread::get_id()
<< ")" << std::endl;
}
}
int main() {
try {
std::thread t1(doSomething, 5, '+');
std::cout << "- started fg thread " << t1.get_id() << std::endl;
for (int i = 0; i < 5; i++) {
std::thread t(doSomething, 10, 'a' + i);
std::cout << "- detach started bg thread " << t.get_id() << std::endl;
t.detach();
}
std::cin.get();
std::cout << "- join fg thread " << t1.get_id() << std::endl;
t1.join();
} catch (const std::exception &e) {
std::cerr << "EXCEPTION: " << e.what() << std::endl;
}
}
使用 detached 線程的時候,一定要注意盡量避免使用非局部變量盔夜。也就是說负饲,它所使用的變量都要和它的生命周期相同。因為我們 detach 之后就丟失了對它的控制權(quán)喂链,不能保證它會對其他線程中的數(shù)據(jù)做什么更改返十。盡量使用傳值,而不要傳引用椭微。
對于 static 和 global 變量洞坑,我們無法阻止 detached 線程使用他們。如果我們已經(jīng)銷毀了某個 static 變量和 global 變量蝇率,detached 線程仍然在訪問迟杂,那就會出現(xiàn) undefined behavior刽沾。
在我們上面的程序中,detached 線程訪問了 std::cin
, std::cout
, std::cerr
這些全局的流排拷,但是這些訪問時安全的侧漓,因為這些流會持續(xù)直到程序結(jié)束。然而攻泼,其他的全局變量不一定能保證火架。
Promises
現(xiàn)在我們需要考慮一個問題:如何才能在線程之間傳遞參數(shù),以及處理異常忙菠。這也是上層的接口何鸡,例如 async()
的實現(xiàn)需要考慮的。當(dāng)然牛欢,我們可以簡單地進行處理骡男,需要參數(shù)則傳入?yún)?shù),需要返回值則傳入一個引用傍睹。
但是隔盛,如果我們需要獲取函數(shù)的返回值或者異常,那我們就需要用到 std::promise
了拾稳。它就是對應(yīng)于 future
的底層實現(xiàn)吮炕。我們可以利用 set_value()
以及set_exception()
方法來設(shè)置 promise 的值。
#include <future>
#include <iostream>
#include <thread>
void doSomething(std::promise<std::string> &p) {
try {
std::cout << "read char ('x' for exception): ";
char c = std::cin.get();
if (c == 'x') {
throw std::runtime_error(std::string("char ") + c + " read");
}
std::string s = std::string("char ") + c + " processed";
p.set_value(std::move(s)); // use move to avoid copying
} catch (...) {
p.set_exception(std::current_exception());
}
}
int main() {
try {
std::promise<std::string> p;
std::thread t(doSomething, std::ref(p));
t.detach();
std::future<std::string> f(p.get_future());
std::cout << "result: " << f.get() << std::endl;
} catch (const std::exception &e) {
std::cerr << "EXCEPTION: " << e.what() << std::endl;
} catch (...) {
std::cerr << "EXCEPTION " << std::endl;
}
}
以上程序定義了一個 promise访得,用這個 promise 初始化了一個 future龙亲,并在一個 detached 的線程之中為其賦值(可能返回 string 也可能是個 exception)。賦值過后悍抑,future 的狀態(tài)會變成 ready鳄炉。然后調(diào)用 get() 獲取。
Synchronizing Threads
使用多線程的時候搜骡,往往都伴隨著并發(fā)的數(shù)據(jù)訪問拂盯,很少有線程相互獨立的情況。
The only safe way to concurrently access the same data by multiple threads without synchronization is when ALL threads only READ the data.
然而记靡,當(dāng)多個線程訪問同一個變量谈竿,并且至少一個線程會對它作出更改時,就必須同步了摸吠。這就叫做 data race空凸。定義為“不同線程中的兩個沖突的動作,其中至少一個動作是非原子的蜕便,兩個動作同時發(fā)生”。
編程語言贩幻,例如C++轿腺,抽象化了不同的硬件和平臺两嘴。因此,會有一個標(biāo)準(zhǔn)來指定語句和操作的作用族壳,而不是每個語句具體生成什么匯編指令憔辫。也就是說,這些標(biāo)準(zhǔn)指定了 what仿荆,而不是 how贰您。
例如,函數(shù)參數(shù)的 evaluation 順序就是未指明的拢操。編譯器可以按照任何順序?qū)Σ僮鲾?shù)求值锦亦,也可以在多次求值同一個表達式時選擇不同的順序。
因此令境,編譯器幾乎是一個黑箱杠园,我們得到的只是外表看起來一致的程序。編譯器可能會展開循環(huán)舔庶,整理表達式抛蚁,去掉 dead code 等它認(rèn)為的“優(yōu)化”。
C++ 為了給編譯器和硬件預(yù)留了足夠的優(yōu)化空間惕橙,并沒有給出一些你期望的保證瞧甩。我們可能會遇到如下的問題:
-
Unsynchronized data access
如果兩個線程并行讀寫同樣的數(shù)據(jù),并不保證哪條語句先執(zhí)行弥鹦。
例如肚逸,下面的程序在單線程中確保了使用 val 的絕對值:
if (val >= 0) {
f(val);
} else {
f(-val);
}
但是在多線程中,就不一定能正常工作惶凝,val 的值可能在判斷后改變吼虎。
-
Half-written data
如果一個線程讀取數(shù)據(jù),另一個線程修改苍鲜,讀取的線程可能會在寫入的同時讀取思灰,讀到的既不是新數(shù)據(jù),也不是老數(shù)據(jù)混滔,而是不完整的修改中的數(shù)據(jù)洒疚。
例如,我們定義如下變量:
long long x = 0;
新開一個線程 t1 更改變量的值:
x = -1;
在其他線程 t2 讀扰饔臁:
std::cout << x;
那么我們可能得到:
- 0(舊值)油湖,如果 t1 還沒有賦值
- -1(新值),如果 t1 完成了賦值
- 其他值领跛,如果 t2 在 t1 賦值的時候讀取
這里解釋一下第三類情況乏德,假設(shè)已在一個 32 位機器上,存儲需要 2 個單位,假設(shè)第一個單位已經(jīng)被更改喊括,第二個單位還沒有更改胧瓜,然后 t2 開始讀取,就會出現(xiàn)其他值郑什。
這類情況不止發(fā)生在 long long
類型上府喳,即使是基礎(chǔ)類型,C++ 標(biāo)準(zhǔn)也沒有保證讀寫是原子操作蘑拯。
-
Reordered statements
表達式和操作有可能會被改變順序钝满,因此單線程運行可能沒問題,但是多線程運行申窘,就會出錯弯蚜。
假設(shè)我們需要在兩個線程之中共享一個 long,使用一個 bool 標(biāo)志數(shù)據(jù)是否準(zhǔn)備好偶洋。
// 定義
long data;
bool readyFlag = false;
// 線程 A
data = 42; // 1
readyFlag = true; // 2
// 線程 B
while (!readyFlag) {
;
}
foo(data);
粗一看似乎沒有什么問題熟吏,整個程序只有在線程 A 給 data 賦值后,readyFlag 才會變 true玄窝。
以上代碼的問題在于牵寺,如果編譯器改變了 1,2 的語句順序(這是允許的恩脂,因為編譯器只保證在 一個 線程中的執(zhí)行是符合預(yù)期的)帽氓,那么就會出現(xiàn)錯誤。
The Features to Solve the Problems
為了解決上述問題俩块,我們需要下面的幾個概念黎休。
- **Atomicity: ** 不被中斷地、獨占地讀寫變量玉凯。其他進程無法讀取到中間態(tài)势腮。
- **Order: ** 確保某些語句的順序不被改變。
C++ 標(biāo)準(zhǔn)庫提供了不同的方案來處理漫仆。
可以使用
future
以及promise
來同時保證 atomicity 以及 order捎拯,它保證了先設(shè)置 outcome,再處理 outcome盲厌,表明了讀寫肯定是不同步的署照。使用
mutex
和lock
來處理臨界區(qū)(critical section)。只有得到鎖的線程才能執(zhí)行代碼吗浩。使用條件變量建芙,使進程等待其他進程控制的斷言變?yōu)?true。
Mutexes and Locks
互斥量是通過提供獨占的訪問來控制資源的并發(fā)訪問的對象懂扼。為了實現(xiàn)獨占訪問禁荸,對應(yīng)的進程 “鎖住” 互斥量,阻止別的進程訪問直到 “解鎖”。
一般使用 mutex 的時候赶熟,可能會這么使用:
int val
std::mutex valMutex;
// Thread A
valMutex.lock();
if (val >= 0) {
f(val);
} else {
f(-val);
}
valMutex.unlock();
// Thread B
valMutex.lock();
++val;
valMutex.unlock();
看起來似乎能夠正常運行品嚣,但是如果 f(val) 中出現(xiàn) exception,那么 unlock() 就不會執(zhí)行钧大,資源會被一直鎖住。
lock_guard
為解決這個問題罩旋,C++ 標(biāo)準(zhǔn)庫提供了一個在析構(gòu)時能夠釋放鎖的類型:std::lock_guard
啊央。實現(xiàn)了類似于 Golang 中的 defer mu.unlock()
的功能。上面的例子可以改進為:
// Thread A
...
{ //新的scope
std::lock_guard<std::mutex> lg(valMutex);
if (val >= 0) {
f(val);
} else {
f(-val);
}
}
// Thread B
{
std::lock_guard<std::mutex> lg(valMutex);
++val;
}
需要注意新開了一個作用域涨醋。確保 lg 能在合適的地方析構(gòu)瓜饥。
再看一個完整的例子。
#include <future>
#include <mutex>
#include <iostream>
#include <string>
std::mutex printMutex;
void print(const std::string& str) {
std::lock_guard<std::mutex> lg(printMutex); // lg 初始化時自動鎖定
for (char c : str) {
std::cout.put(c);
}
std::cout << std::endl;
} // lg析構(gòu)時自動解鎖
int main() {
auto f1 = std::async(std::launch::async, print, "Hello from first thread");
auto f2 = std::async(std::launch::async, print, "Hello from second thread");
print("Hello from main thread");
}
如果不加鎖會亂序打印浴骂。
unique_lock
有時候乓土,我們并不希望在鎖初始化的同時就上鎖。C++ 還提供了 unique_lock<> 類溯警,它與 lock_guard<> 接口一致趣苏,但是它允許程序顯式地決定什么時候、怎樣上鎖和解鎖梯轻。它還提供了 owns_lock()
方法來查詢是否上鎖食磕。使用更佳靈活,最常用的場景就是配合條件變量使用喳挑。具體例子在后面介紹 condition_variable 的部分彬伦。
處理多個鎖
這個的處理多個鎖并不是說挨個上鎖,而是假設(shè)一個線程執(zhí)行同時需要用到多個資源伊诵,應(yīng)該要么一起鎖上单绑,要么全都不鎖。否則很容易出現(xiàn)死鎖曹宴。
例如線程 A 和 B 都需要鎖 m1 和 m2搂橙,而線程 A 獲得了 m1,在請求 m2浙炼,而線程 B 獲得了 m2份氧,在請求 m1,這時候就會相互等待弯屈,發(fā)生死鎖蜗帜。
C++ 標(biāo)準(zhǔn)庫提供了 lock()
函數(shù)以解決上述問題。它的功能簡單來講就是要么都鎖资厉,要么都不鎖厅缺。
以下實現(xiàn)了一個簡單的銀行轉(zhuǎn)帳的例程。
#include <mutex>
#include <thread>
#include <iostream>
struct Bank_account {
explicit Bank_account(int Balance) : balance(Balance) {}
int balance;
std::mutex mtx;
};
void transfer(Bank_account& from, Bank_account& to, int amount) {
/*
// std::adopt_lock 假設(shè)已經(jīng)上過鎖,在初始化時不會再加鎖湘捎,但是保留了析構(gòu)釋放鎖的功能
std::lock(from.mtx, to.mtx);
std::lock_guard<std::mutex> lg1(from.mtx, std::adopt_lock);
std::lock_guard<std::mutex> lg2(to.mtx, std::adopt_lock);
*/
// equivalent approach:
std::unique_lock<std::mutex> ulock1(from.mtx, std::defer_lock);
std::unique_lock<std::mutex> ulock2(to.mtx, std::defer_lock);
std::lock(ulock1, ulock2); // 這里鎖的是封裝過后的 mutex
from.balance -= amount;
to.balance += amount;
}
int main() {
Bank_account a(100);
Bank_account b(30);
// 注意使用 std::ref
std::thread t1(transfer, std::ref(a), std::ref(b), 20);
std::thread t2(transfer, std::ref(b), std::ref(a), 10);
t1.join();
t2.join();
std::cout << " a now has " << a.balance << ", b now has " << b.balance << std::endl;
}
以上例程有三個重點诀豁。已經(jīng)在程序中中文標(biāo)注。
重點講一下 std::ref窥妇,這里如果不加 std::ref 會報錯舷胜。說一下個人理解。
雖然這里已經(jīng)在函數(shù)中說明了是引用:
void transfer(Bank_account& from, Bank_account& to, int amount);
但是我們實際是傳參數(shù)給了 std::thread 類的構(gòu)造函數(shù)活翩。它是一個模板類烹骨,默認(rèn)肯定是進行值傳遞的。因此我們有必要在這里聲明是引用傳遞材泄,這樣可以將模板改為引用沮焕。例如
template <typename T>
void foo (T val);
如果我們使用
int x;
foo (std::ref(x));
則模板類自動以 int& 為參數(shù)類型。
具體可以參見 std::reference_wrapper<> 類拉宗。
Condition Variables
有時峦树,不同線程運行的任務(wù)可能需要互相等待。因此旦事,除了訪問同一個數(shù)據(jù)魁巩,我們還有其他使用同步的場景,即邏輯上的依賴關(guān)系姐浮。
你可能會認(rèn)為我們已經(jīng)介紹過了這種機制:Futures 允許我們阻塞直到另一個線程執(zhí)行結(jié)束歪赢。但是,F(xiàn)uture 實際上是設(shè)計來處理返回值的单料,在這個場景下使用并不方便埋凯。
這里我們將介紹條件變量,它可以用于同步線程間的邏輯依賴扫尖。
在引入條件變量之前白对,為了實現(xiàn)這個功能,只能采用輪詢的方法换怖,設(shè)置一個時間間隔甩恼,不斷去檢查。例如:
bool readyFlag;
std::mutex readyFlagMutex; // wait until readyFlag is true:
{
std::unique_lock<std::mutex> ul(readyFlagMutex);
while (!readyFlag) {
ul.unlock();
std::this_thread::yield(); // hint to reschedule to the next thread
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ul.lock();
}
} // release lock
這顯然不是一個好的方案沉颂,因為時間間隔設(shè)置太短和太長都不行条摸。但是它表現(xiàn)了條件變量的基本思想。就是在未滿足條件時铸屉,放棄對鎖和 cpu 資源的占有钉蒲,讓別的線程運行,等待條件滿足跳出 while彻坛。但是條件變量沒有采取輪詢而是采用一個信號通知顷啼。
一個典型應(yīng)用是消費者-生產(chǎn)者模型踏枣。
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
#include <future> // for async
std::queue<int> q;
std::mutex mu;
std::condition_variable condVar;
void provider(int val) {
for (int i=0; i<6; i++) {
{
std::lock_guard<std::mutex> lg(mu);
q.push(val + i);
}
condVar.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(val));
}
}
void consumer(int id) {
while (true) {
int val;
{
std::unique_lock<std::mutex> ul(mu);
condVar.wait(ul, [](){return !q.empty();});
val = q.front();
q.pop();
}
std::cout << "consumer " << id << ": " << val << std::endl;
}
}
int main() {
// 3 個生產(chǎn)者
auto p1 = std::async(std::launch::async, provider, 100);
auto p2 = std::async(std::launch::async, provider, 300);
auto p3 = std::async(std::launch::async, provider, 500);
// 2 個消費者
auto c1 = std::async(std::launch::async, consumer, 1);
auto c2 = std::async(std::launch::async, consumer, 2);
}
Atomic
這是實現(xiàn) lock-free 的重要類型。非常值得深入了解钙蒙。
atomic 的效率比鎖要快很多茵瀑,在 linux 下大概快 6 倍。
具體應(yīng)用可以看我的 github 項目:使用mmap實現(xiàn)文件極速無鎖并行寫入
書中有一處錯誤:
經(jīng)過實驗躬厌,實際上返回的并不是 new value 而是 previous value马昨。具體可以參考cppreference。
而且還能用于實現(xiàn) spinlock扛施。
附錄
補充:C++11 中的隨機數(shù)生成方法
關(guān)于為什么要引入新的隨機數(shù)生成方法偏陪,參考這里。
標(biāo)準(zhǔn)流程如下所示煮嫌。
std::random_device rd; // 隨機數(shù)種子 generator
std::default_random_engine e(rd()); // 原始隨機數(shù) generator
std::uniform_int_distribution<> u(5,20); // 在 [5,20] 上的均勻分布
for ( size_t i = 0 ; i < 10 ; i ++ ) {
cout << u ( e ) << endl ; // 迫使原始隨機數(shù)服從規(guī)定的分布
}
一個簡單的例子,觀察其轉(zhuǎn)化關(guān)系抱虐。
#include <iostream>
#include <random> // for default_random_engine, uniform_int_distribution
using namespace std;
void randomPrint () {
// random-number generator(use c as seed to get different sequences)
std::random_device rd;
std::default_random_engine dre(rd());
std::uniform_int_distribution<int> id(10,100);
for (int i=0; i<10; i++) {
// random test
cout << dre() << " => " <<id(dre) << endl;
}
}
int main() {
randomPrint();
}
輸出為:
1337774351 => 49
354763686 => 93
1223972804 => 56
827960471 => 33
54361696 => 94
540202105 => 51
90156615 => 84
1553865488 => 64
780656749 => 21
134206787 => 74
更加詳細(xì)的可參考這里昌阿。