- 主要內(nèi)容
- 條件變量
- future
- async/packeged_task/promise
- shared_future
條件變量
std::mutex _mutex;
std::condition_variable _cv;
std::deque<std::string> _data;
void thread_process_data()
{
while(1){
std::unique_lock<std::mutex> lk(_mutex);
_cv.wait(lk, [](){return !_data.empty();});
std::string str = _data.front();
_data.pop();
lk.unlock();
do_something(str);
}
}
void thread_produce_data()
{
while(1){
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
{
std::lock_guard<std::mutex> lk(_mutex);
_data.push("hello cv");
}
_cv.notify_one();
}
}
等待條件的線程處理流程:
- 獲取一個
std::unique_lock<std::mutex>
用來保護(hù)共享變量 - 執(zhí)行
wait
(wait_for
或wait_until
)并级,這些函數(shù)的調(diào)用將以原子方式釋放互斥鎖并暫停線程執(zhí)行- 在釋放互斥鎖之前彻舰,這里會第一次檢查lambda返回值巧骚;
- 如果條件真,則線程從wait返回并繼續(xù)執(zhí)行画髓;
- 如果條件假系吩,則線程阻塞并同時釋放互斥鎖;
- 當(dāng)條件變量被通知、超時過期或出現(xiàn)虛假喚醒時線程被喚醒液肌,互斥鎖被原子地重新獲取。然后鸥滨,如果喚醒是假的嗦哆,線程應(yīng)該檢查條件并繼續(xù)等待。
- 喚醒時重新獲取互斥鎖
- 再次檢查lambda返回值
- 如果條件真婿滓,則線程繼續(xù)執(zhí)行老速;
- 如果條件假,則線程阻塞并同時釋放互斥鎖凸主;
使用std::unique_lock而不是std::lock_guard原因有二:
1橘券、等待的線程必須在等待時解鎖互斥鎖,并在等待之后再次鎖定它卿吐,而std::lock_guard不提供這種靈活性.
2旁舰、wait()的第一個參數(shù)要求的就是std::unique_lock的引用。
喚醒等待條件的線程:
- 獲取
std::lock_gurad<std::mutex>
- 執(zhí)行修改
- 執(zhí)行
notify_one
或notify_all
通知條件發(fā)生但两,等待條件的線程將被喚醒(通知不需要持有鎖)
condition variable適用于某個事件會反復(fù)的發(fā)生的情景鬓梅,在某些情景下線程只想等待一次事件為真,之后它將永遠(yuǎn)不會再等待這個事件發(fā)生谨湘。
future
async與future
async相比thread的不同:
- std::thread是一個類模板绽快,而std::async只是一個函數(shù)模板
- std::async返回std::future對象芥丧,讓異步操作創(chuàng)建者訪問異步結(jié)果.
- 調(diào)用std::thread總啟動一個新線程,且在新線程中立即執(zhí)行f
- 調(diào)用std::async不一定會啟動一個新線程坊罢,并且可以決定是否立即執(zhí)行f续担、延遲執(zhí)行、不執(zhí)行
async的policy參數(shù)
- 設(shè)置為launch::async, 將在新線程中立即執(zhí)行f
- 設(shè)置為launch::deferred活孩,async的f將延期執(zhí)行直到future對象被查詢(get/wait)物遇,并且f將會在查詢future對象的線程中執(zhí)行,這個線程甚至不一定是調(diào)用async的線程憾儒;如果future對象從未被查詢询兴,f將永遠(yuǎn)不會執(zhí)行。
- 設(shè)置為launch::deferred |std::launch::async,與實(shí)現(xiàn)有關(guān)起趾,可能立即異步執(zhí)行诗舰,也可能延遲執(zhí)行。
- launch::deferred和std::launch::async都沒設(shè)置训裆,C++14中為未定義
packaged_task與future
- 類模板std::packaged_task包裝任何Callable(函數(shù)眶根、lambda、bind表達(dá)式或其他函數(shù)對象)边琉。
- 與std::function不同的是属百,std::packaged_task提供了std::future.
- 其威力在于只要能夠訪問std::future,無論std::packaged_task作為對象被傳輸?shù)侥膫€線程中執(zhí)行变姨,都可以通過std::future獲取其結(jié)果族扰。
- 實(shí)例化的std::packaged_task本身也是一個Callable。它可以包裝在std::function對象中钳恕,也可以作為線程函數(shù)傳遞給std::thread别伏,或者傳遞給另一個需要Callable的函數(shù),甚至可以被直接調(diào)用忧额。
- std::packaged_task可以用作線程池或者任務(wù)隊列的構(gòu)建塊,作為“消息”在線程之間傳遞愧口。
包裝lambda
void task_lambda()
{
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();
//在當(dāng)前線程執(zhí)行std::pow(2,9)
task(2, 9);
std::cout << "task_lambda:\t" << result.get() << '\n';
}
包裝bind
int f(int x, int y) { return std::pow(x,y); }
void task_bind()
{
std::packaged_task<int(int,int)> task(bind(f,1,2));
std::future<int> result = task.get_future();
//在當(dāng)前線程執(zhí)行std::pow(2,9)
task(2, 9);
std::cout << "task_bind:\t" << result.get() << '\n';
}
作為線程函數(shù)
int f(int x, int y) { return std::pow(x,y); }
void task_thread()
{
std::packaged_task<int(int,int)> task(f);
std::future<int> result = task.get_future();
//在新線程執(zhí)行std::pow(2,9)
std::thread t(std::move(task), 2, 9);
std::cout << "task_thread:\t" << result.get() << '\n';
}
等同于:
void equal_to_task_thread()
{
std::future<int> result = std::async(std::launch::async, f, 2, 9);
std::cout << "equal_to_async:\t" << result.get() << '\n';
}
promise與future
std::promise<T>
提供了一種設(shè)置值(類型為T)的方法睦番,稍后可以通過關(guān)聯(lián)的std::future
對象讀取該值。配對使用future
和promise
耍属,等待結(jié)果的線程可能會阻塞在future::wait()
或者future::get()
托嚣,而提供數(shù)據(jù)的線程可以使用配對的promise
來設(shè)置相關(guān)的值并使future
就緒。
一般的范式是:
想要獲取結(jié)果類型為T的線程構(gòu)造std::promise< T >厚骗,并通過promise::get_future獲取future對象
將promise作為參數(shù)傳遞給新的線程函數(shù)示启,在新的線程函數(shù)中通過promise::set_value設(shè)置結(jié)果
-
等待的線程通過future::wait等待結(jié)果誕生,通過future::get獲取結(jié)果领舰。
void do_work(arg, std::promise<T> promise_arg) { do_something(); promise_arg.set_value(T); } int main() { //使用promise<T>在線程之間傳遞結(jié)果 std::promise<T> one_promise; //構(gòu)造future對象 std::future<T> one_future = one_promise.get_future(); //promise作為參數(shù)傳遞給新的線程函數(shù) std::thread work_thread(do_work, some_arg, std::move(one_promise)); //等待結(jié)果 one_future.wait(); //獲取結(jié)果 std::cout << "result=" << one_future.get() << '\n'; work_thread.join(); }
promise不可拷貝夫嗓,不能直接將one_promise作為參數(shù)傳遞給新的線程函數(shù)迟螺。
異常與future
當(dāng)使用std::async、std::packeged_task時舍咖,如果線程函數(shù)函數(shù)矩父、被包裝的函數(shù)拋出異常,與future關(guān)聯(lián)的數(shù)據(jù)中將 儲存異常 而非結(jié)果排霉。
-
對于std::promise有點(diǎn)不一樣窍株,如果想存儲異常而非正常的結(jié)果,使用set_exception()替代set_value攻柠。
extern std::promise<double> some_promise; try { some_promise.set_value(calculate_value()); } catch(...) { //1使用current_exception提取calculate_value()中拋出的異常 //2使用set_exception()儲存異常 some_promise.set_exception(std::current_exception()); //或者 //some_promise.set_exception(std::copy_exception(std::logic_error("foo "))); }
當(dāng)異常被儲存球订,future將就緒,通過調(diào)用future::get()將重新拋出存儲的異常瑰钮。
待續(xù)
std::shared_future
-
std::future處理了線程之間傳輸數(shù)據(jù)所需的所有同步辙售,但是對std::future特定實(shí)例的成員函數(shù)調(diào)用不是線程安全的——本身如果從多個線程訪問單個std::future對象而不進(jìn)行額外的同步,那么將面臨數(shù)據(jù)競爭和未定義的行為飞涂。
std::future 模型對異步結(jié)果有唯一所有權(quán)旦部,只有一個線程可以提取到異步結(jié)果(通過get),在第一次調(diào)用get()之后较店,就已經(jīng)沒有要提取的值了士八。
-
std::shared_future可以讓多個線程可以等待相同的事件
std::future是可以moveable,對異步結(jié)果的唯一所有權(quán)可以在future實(shí)例之間轉(zhuǎn)移(通過移動構(gòu)造語義)梁呈,但是每次只有一個實(shí)例引用特定的異步結(jié)果婚度。但是std::shared_future實(shí)例是copyable,所以可以有多個shared_future對象引用相同的關(guān)聯(lián)狀態(tài).
-
單個shared_future對象上的成員函數(shù)仍然不同步官卡,從多個線程訪問單個對象時蝗茁,為了避免數(shù)據(jù)沖突,必須使用鎖來保護(hù)訪問
使用它的首選方法是獲取對象的副本寻咒,并讓每個線程訪問自己的副本哮翘。如果每個線程通過自己的std::shared_future對象訪問共享異步狀態(tài),那么從多個線程訪問該狀態(tài)是安全的毛秘。
-
通過future實(shí)例構(gòu)造shared_future實(shí)例饭寺,必須轉(zhuǎn)移所有權(quán)的方式進(jìn)行
std::promise<bool> p; std::future<bool> f = p.get_future(); //構(gòu)造shared_future方式一: std::shared_future<bool> sf(f); //error std::shared_future<bool> sf(std::move(f)); //move觸發(fā)移動構(gòu)造 //方式二: std::shared_future<bool> sf = f.share(); //方式三: std::shared_future<bool> sf(std::move(p.get_future())); assert(!f.valid()); assert(sf.valid());