C++11 最重要的新特性: 多線程
2 個(gè)好處
[1] 可寫(xiě) `平臺(tái)無(wú)關(guān)的 多線程程序, 移植性 提高`
[2] `并發(fā)` 提高性能
1 概述
1 并發(fā)
(1) 含義
`任務(wù)切換`
(2) 2 種方式
[1] `多進(jìn)程`
`進(jìn)程間通信: 信號(hào) / 套接字 / 文件 / 管道`
[2] `多線程`
2 并發(fā) 優(yōu)勢(shì)
(1) 關(guān)注點(diǎn)分離 ( SOC )`
線程間 交互
(2) ·算法/數(shù)據(jù) 并行 -> 提高性能`
[1] 數(shù)據(jù)相同 / 算法拆分
[2] 計(jì)算相同 / 數(shù)據(jù)拆分
3 C++11 多線程 特點(diǎn)
(1) `線程感知` 的 `內(nèi)存模型`
(2) 直接支持 `原子操作`
(3) `C++ 線程庫(kù) 性能` `接近 直接使用底層 API 時(shí)`
4 多線程入門(mén)
(2) 多線程
#include <iostream>
#include <thread> //(1)
void hello() //(2)
{
std::cout<<"Hello Concurrent World\n";
}
int main()
{
std::thread t(hello); //(3)
t.join(); //(4)
}
1) 頭文件 <thread>
2) `線程 初始函數(shù)`: 新線程執(zhí)行入口
[1] `主 線程: main()`
[2] `其他 線程: std::thread 對(duì)象 ctor 中 指定`
3) std:thread 對(duì)象創(chuàng)建 完成 -> 開(kāi)始執(zhí)行新線程
4) join(): 讓 caller 等待 新線程 完成
2 管理線程: thread/join/detach/RAAI/std::ref/std::bind/move ownership
image.png
線索
1 啟動(dòng)線程, 等待它結(jié)束 or 放后臺(tái)運(yùn)行
2 給 線程函數(shù) 傳參
3 transfer ownership of thread
from current associated std::thread object to another
4 確定線程數(shù), 識(shí)別特殊線程
#1 基本 線程管理
1.1 啟動(dòng)線程
`線程 在 std::thread 對(duì)象創(chuàng)建 時(shí) 啟動(dòng)`
1 thread 對(duì)象
(1) 一旦構(gòu)造, 立即開(kāi)始執(zhí)行 `線程函數(shù)`
(2) 不可 copy, 可 move
copy/賦值 deleted
原因
線程對(duì)象 `copy/賦值` 可能 `弄丟已 join 的線程`
(3) 可能 `不 表示/關(guān)聯(lián) 任何線程` => safe to destroy & not joinable
after 4 場(chǎng)景之一
————————————————————————————————————————————————
[1] 默認(rèn)構(gòu)造
=> 無(wú) 線程 ( 函數(shù) ) 執(zhí)行
————————————————————————————————————————————————
[2] move from
=> 線程 move 給 other thread 對(duì)象去管理
————————————————————————————————————————————————
[3] detach
=> 線程 可能仍在執(zhí)行
————————————————————————————————————————————————
[4] join
=> 線程 執(zhí)行完成
————————————————————————————————————————————————
2 線程函數(shù)
[1] 不與 caller 通信時(shí), 若 拋出異常
std::terminate 被調(diào)用
以終止線程函數(shù)
[2] return value 或 異常 可 傳給 caller
2種方式
——————————————————————————
1] std::promise
2] 共享變量 (可能需 同步)
——————————————————————————
3 std::thread ctor 的 args: 可調(diào)用對(duì)象
(1) 入口點(diǎn)為 function
void f();
std::thread my_thread(f)
(2) 入口點(diǎn)為 `可調(diào)用對(duì)象`
可調(diào)用對(duì)象 copied into thread's storage
并 `invoked` from there
原始可調(diào)用對(duì)象 可被立即銷(xiāo)毀
問(wèn)題: `函數(shù)對(duì)象 臨時(shí)無(wú)名對(duì)象 作 std::thread ctor's arg`
會(huì)被 C++ 編譯器 解釋為 `函數(shù)聲明`
std::thread my_thread( F() );
3 種 解決
————————————————————————————————————————————————————
[1] 函數(shù)對(duì)象對(duì)象 外再加一層小括號(hào)
std::thread my_thread( (F() ) );
————————————————————————————————————————————————————
[2] 花括號(hào)
std::thread my_thread{ F() };
————————————————————————————————————————————————————
[3] lambda 表達(dá)式 啟動(dòng)線程
std::thread my_thread(
[]{ do_something(); }
);
————————————————————————————————————————————————————
4 join 還是 detach ?
(1) `必須用 + 何時(shí)用`
原因
`std::thread 對(duì)象 銷(xiāo)毀` 前線程 `沒(méi)被 joined 或 detached`
線程對(duì)象
——————————————————————————————————————
[1] joinable
——————————————————————————————————————
[2] dtor 調(diào) std::terminate() 結(jié)束程序
——————————————————————————————————————
(2) `detached 線程` 要在 `程序結(jié)束前 結(jié)束`
原因
main return 時(shí), 正在執(zhí)行的 detached 線程
——————————————————————————————————————
[1] 被暫停
——————————————————————————————————————
[2] 其 thread-local objects 銷(xiāo)毀
——————————————————————————————————————
(3) 用 join 還是 detach ?
————————————————————————————————————————————
[1] join
————————————————————————————————————————————
[2] 除非 你想 `更靈活`
并用 `同步` 機(jī)制 去 `等待` 線程完成
此時(shí) `detach`
————————————————————————————————————————————
5 std::terminate() in <exception>
被 C++ runtime 調(diào)用
——————————————————————————————————————————————
3 種 case
——————————————————————————————————————————
[1] std::thread 初始函數(shù) 拋出異常
——————————————————————————————————————————
[2] joinable 線程對(duì)象 `被銷(xiāo)毀` 或 `被賦值`
——————————————————————————————————————————
[3] `異常` 被拋出卻未被捕獲
——————————————————————————————————————————————
6 lifetime 問(wèn)題: 多(比單)線程 更易發(fā)生
`正在運(yùn)行的 detached 子線程`
access `已被 destroyed 的 object`
=> undefined behavior
|
| 如
|/
caller
——————————————————————————————————————————————————
[1] local 變量 ptr/ref -> 傳給 callee 線程 以 訪問(wèn)
——————————————————————————————————————————————————
[2] 已 return
——————————————————————————————————————————————————
[3] 相比 caller 線程, `callee 線程 lifetime 更長(zhǎng)`
=> `ptr/ref 懸掛` ( dangling )
|
|/
潛在的 訪問(wèn) 隱患
——————————————————————————————————————————————————
|
| 解決
|/
[1] 線程函數(shù) `自包含`
+
[2] `copy data into thread`
1.2 `等待 線程完成`
1 怎樣 `等待 線程完成`
關(guān)聯(lián)的 std::thread object 上調(diào) join()
2 join() 做了啥
清理 所關(guān)聯(lián)線程的 內(nèi)存
調(diào) 1 次 join 后
線程對(duì)象
[1] 不再關(guān)聯(lián) (已完成的)線程
[2] not joinable <=> joinable() == false
3 等待過(guò)程中 控制線程
——————————————————————
[1] 檢查線程是否完成
1] cv
2] futures 機(jī)制
——————————————————————
[2] 等待特定時(shí)間
——————————————————————
1.3 異常下的 等待
1 `call join() 的位置 要精挑細(xì)選`
(0) 問(wèn)題
call join() 前 拋出異常
join() 調(diào)用 被跳過(guò)
`lifetime problem`
(1) 解決 1
讓 join() 不被跳過(guò)
use try/catch, `catch 中 也調(diào) join()`
缺點(diǎn)
[1] try/catch 冗長(zhǎng)
[2] scope 易出錯(cuò)
(2) 解決 2
RAII
本質(zhì)含義
`讓 資源管理對(duì)象 own 資源, 在 析構(gòu)時(shí) 釋放資源
=> `資源的 lifetime 管理` 就和 `資源管理對(duì)象 lifetime 管理` 統(tǒng)一起來(lái)
=> 只要 `不泄漏 資源管理對(duì)象, 就能保證 不泄漏資源`
至于是 `讓 ctor 自己 創(chuàng)建資源`,
`還是 把資源創(chuàng)建好 再交給 ctor 保管`,
沒(méi)有優(yōu)劣之分
可用 `static Factory Method 創(chuàng)建資源對(duì)象`
`client 就不用管 里邊怎么實(shí)現(xiàn)了`
實(shí)現(xiàn)
線程對(duì)象
|
| 設(shè)
|/
保護(hù)類(lèi) thread_guard
[1] explicit ctor: 左值引用 para = std::thread& t_
|
| `線程對(duì)象` 作 arg 引用傳遞 構(gòu)造 `線程保護(hù)對(duì)象`
|/
[2] init. list 初始化 / 綁定
|
|/
[3] 左值引用 成員 = std::thread& t
=> `線程 的 ownership 沒(méi)有轉(zhuǎn)移`
=> `caller 可能 已對(duì)線程 調(diào) join()`
=> [4] thread_guard `dtor 中 調(diào) join() 前 必須 先判線程是否可 joinable`
=> 線程 `caller 函數(shù) 執(zhí)行 }`
即 ret 指令 `彈棧 逆序銷(xiāo)毀` local 對(duì)象 時(shí)
先 `銷(xiāo)毀 保護(hù)對(duì)象`
`調(diào) 保護(hù)對(duì)象 dtor 中 可能 調(diào) join()`
[5] copy 和 賦值 delete
因 `資源管理對(duì)象` 可能 `outlive 其管理的 `線程對(duì)象` 的 `scope`
//`RAII 等待線程完成
class thread_guard
{
private:
std::thread& t; // [3]
public:
explicit thread_guard(std::thread& t_) // [1]
:t(t_){} // [2]
~thread_guard()
{
// [4] 先 test joinable(), 再 join()
if( t.joinable() )
{
t.join();
}
}
// [5] copy 和 賦值 delete
thread_guard(thread_guard const&)=delete;
thread_guard& operator=(thread_guard const&)=delete;
};
struct func; // defined in list2.1
void f()
{
int state = 0;
func f(state);
std::thread t(f);
// `新線程對(duì)象` 作 arg 引用傳遞 構(gòu)造 `線程保護(hù)對(duì)象`
thread_guard g(t);
do_something_in_current_thread();
}
(3) 解決 3
若 不必 等待線程結(jié)束
`detach & copy data into thread`
可避免 `異常安全` 問(wèn)題
`detach` 會(huì) `打破 線程 與 std::thread 對(duì)象` 間 `關(guān)聯(lián)`
只要 detach 線程 先于 main 函數(shù) 退出,
1.4 后臺(tái) / background 運(yùn)行線程
1 detach 線程: std::thread instance上調(diào) detach()`
`detached 線程` 特點(diǎn)
[1] 在后臺(tái)(typically 長(zhǎng)時(shí)間)運(yùn)行
稱(chēng) `守護(hù)線程`
應(yīng)用
1> 監(jiān)控文件系統(tǒng)
2> 實(shí)現(xiàn)發(fā)后即忘/fire and forget task:
只管消息發(fā)送, 不管消息接收
[2] 無(wú)法 獲得 關(guān)聯(lián)它的 std::thread object
=> 無(wú)法直接 communicate with it
it can `no longer be joined
=> 無(wú)法等待它完成`
[3] `ownership 和 control 被傳給 C++ runtime 庫(kù)`
保證了 線程關(guān)聯(lián)資源
在線程結(jié)束時(shí) 被正確回收
#2 傳參 給 線程函數(shù)
2.1 thread ctor 接口
——————————————————————————————————————————————————————————————————————————————————————————
線程函數(shù) | thread ctor 的 paraList
——————————————————————————————————————————————————————————————————————————————————————————
非成員函數(shù) | [1] callable obj [2] 相應(yīng) `函數(shù)調(diào)用運(yùn)算符` 的 para...
——————————————————————————————————————————————————————————————————————————————————————————
成員函數(shù) | [1] `成員函數(shù)指針` [2] 相應(yīng) 對(duì)象的指針 [3] 相應(yīng) `成員函數(shù)` 的 para...
——————————————————————————————————————————————————————————————————————————————————————————
2.2 內(nèi)部機(jī)制
1 `默認(rèn)下`
std::thread Ctor 實(shí)參
[1] copied
副本 在新線程 context 中 `隱式轉(zhuǎn)換 為 相應(yīng)參數(shù)類(lèi)型`
[2] 除非 用 `std::ref 等`
void f(std::string const& s);
std::thread t(f, "hello");
如
實(shí)參 `字符串 字面量`
[0] 是 ptr: char const*
[1] copied
[2] 副本 converted 為 std::string
[3] 表面上 引用傳遞
實(shí)際是 ptr copy 作 arg 調(diào) string Ctor
問(wèn)題
caller 函數(shù)
在 thread ctor 的 `ptr/ref 型 arg` 的
`副本 轉(zhuǎn)換為 相應(yīng)參數(shù)前 退出`
`所指對(duì)象銷(xiāo)毀`
`副本指針 懸掛`
=> undefined behavior
解決
[1] 先 強(qiáng)轉(zhuǎn) arg, 再 傳給 thread ctor`
[2] ptr 用 具有 `移動(dòng)語(yǔ)義` 的 對(duì)象 管理
std::thread t(f, std::string(buffer) );
2 std::ref() / std::bind wrap 想 `引用傳遞` 的 `arg`
|
| 想
|/
更新數(shù)據(jù)
[1] std::ref()
void f(Data& data);
Data data;
std::thread t(f, std::ref(data) );
[2] std::bind <functional>
std::thread ctor 與 std::bind 機(jī)制相同
X x;
std::thread t(&X::f, &x);
#3 轉(zhuǎn)移 線程 ownership
1 為什么要引出 move of std::thread
應(yīng)對(duì) 2 種場(chǎng)景
線程 ownership 轉(zhuǎn)交給
——————————————
[1] caller
[2] 其他 函數(shù)
——————————————
(1) `ownership moved out of a function`
`形似 pass by value, 實(shí)際卻是 move 語(yǔ)義`
std::thread caller()
{
void callee();
return std::thread(callee);
}
(2) `ownership moved into a function`
void g(std::thread t);
void f()
{
void thread_func();
g( std::thread(thread_func) );
}
2 move 機(jī)制
—————————————————————————————————————————
[1] 隱含自動(dòng) move
左 或 右運(yùn)算對(duì)象 至少一側(cè)為 右值
—————————————————————————————————————————
[2] 顯式 調(diào) std::move()
—————————————————————————————————————————
3 thread_guard 與 scoped_thread
————————————————————————————————————————————————————————————————————————————
是否轉(zhuǎn)移線程 ownship | 否 | 是
————————————————————————————————————————————————————————————————————————————
成員 | 左值引用 std::thread& t | 值 std::thread t
————————————————————————————————————————————————————————————————————————————
args 傳遞方式 | 左值引用 傳遞 | 值傳遞 + 移動(dòng)語(yǔ)義
————————————————————————————————————————————————————————————————————————————
dtor 中 call join 前 | |
是否 check joinable | 是 | 否
————————————————————————————————————————————————————————————————————————————
ctor 中 | |
是否 check joinable | 否: 啥也不干 | 是
————————————————————————————————————————————————————————————————————————————
#4 runtime 時(shí) 選擇 動(dòng)態(tài)數(shù)量的線程 : 用 線程 groups 來(lái) divide work
std::thread::hardware_concurrency()
硬件支持的并發(fā)線程數(shù) 的 hint
[1] 線程 vector
std::vector<std::thread> threads(n_threads);
[2] 啟動(dòng) n_threads 個(gè)子線程
for i = 0 ~ n_threads-1
threads[i] = std::thread(
f, block_start, block_end, std::ref(results[i] )
);
[3] wait 子線程
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join) );
#5 識(shí)別線程:目的 `把 data 或 behavior 與 線程 關(guān)聯(lián)起來(lái)`
1 線程 identifier 是 std::thread::id type
std::thread::id master_thread;
void f()
{
if( std::this_thread::get_id() == master_thread )
{
do_master_thread_work();
}
do_common_work();
}
2 線程ID 可作 關(guān)聯(lián)容器 中 `key`
3 在線程間 共享 data
image.png
image.png
2個(gè)目標(biāo)
[1] 避免潛在問(wèn)題
[2] 最大化收益
`并發(fā) 好處`
線程間 `共享數(shù)據(jù)` 容易、直接
1.1 線程間 共享 數(shù)據(jù)
雙向鏈表 delete node == update next + update prev
只 update next 時(shí), `invariant 被 打破了`
若此時(shí)另一線程 正 access this node & 沒(méi)做保護(hù)處理
=> 該 `race condition` 會(huì) 引起 bug
1 問(wèn)題: race condition 競(jìng)爭(zhēng)條件
`結(jié)果取決于 多個(gè)線程` 上 `操作` 執(zhí)行的 `相對(duì)順序`
多個(gè)線程 競(jìng)爭(zhēng)地去執(zhí)行 各自的 `操作`
|
|/
搶票
data race
對(duì)象 并發(fā)修改 對(duì)象
=> undefined behavior
2 解決
(1) `用 mutex wrap 數(shù)據(jù)結(jié)構(gòu)`
以保證 `只有 真正執(zhí)行修改的那個(gè)線程 可看到 invariants 被打破的狀態(tài)`
(2) lock-free 編程
修改 數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì) 及其 invariant
`分離 變化` 成 `n 個(gè) 不可分割的 更小變化`
每個(gè)小變化 保護(hù) 其 invariant
(3) STM / software transactional(事務(wù)) memory
數(shù)據(jù)庫(kù)更新
update 數(shù)據(jù)結(jié)構(gòu) == process a transaction/事務(wù)
`store` series of `數(shù)據(jù)修改和讀` to a `事務(wù)log`
commit it in a step
`commit 失敗, 事務(wù)重啟`
2.1 mutex 機(jī)制
1 `mut`ually `ex`clusive 互斥
保護(hù) 共享數(shù)據(jù)
access 前 lock 其關(guān)聯(lián)的 `mutex`
access 后 unlock
2 mutex 并不是 silver bullet 銀彈
更重要的是
[1] `設(shè)計(jì)代碼結(jié)構(gòu)` 來(lái) `保護(hù)數(shù)據(jù)` 2.2節(jié)
[2] `接口中內(nèi)在地避免 race conditions` 2.3節(jié)
3 mutex 問(wèn)題
[1] 死鎖
[2] 保護(hù)范圍 要多大
2.2 mutexes 使用
1 mutex 創(chuàng)建/lock/unlock
創(chuàng)建
上/解 鎖
成員函數(shù) lock()/unlock()
2 對(duì) mutex 不推薦 手工 調(diào) lock()
原因: mutex 是一種 資源
與 對(duì) thread 不推薦手工調(diào) join() 一樣
———————————————————————————————————————————————
資源 | 資源管理類(lèi) / RAII 類(lèi)
———————————————————————————————————————————————
thread 對(duì)象 | thread_guard 或 scoped_thread
———————————————————————————————————————————————
mutex 對(duì)象 | std::lock_guard
———————————————————————————————————————————————
|
|
| 重構(gòu)
|
| [1] 函數(shù) 作 public 成員
|
| [2] 被保護(hù)數(shù)據(jù) + mutex 作 private 成員
|/
2.3 Structuring code 以 保護(hù) 共享數(shù)據(jù)
1 問(wèn)題
`迷途( stray ) ptr/ref` 和 `后門(mén)`
若 成員函數(shù) 將 被保護(hù)數(shù)據(jù) 的 `ref/ptr 傳出` lock 的 scope
|
| 如
|/
——————————————————————————————————————————————————————————————————————————————————
[1] `隱晦地 傳出`
成員函數(shù) para 為 `( 用戶(hù)策略 ) 函數(shù)對(duì)象`
|
| 函數(shù)調(diào)用運(yùn)算符 operator() 的 para 為
|/
ref / ptr 參數(shù)
——————————————————————————————————————————————————————————————————————————————————
[2] 顯而易見(jiàn) 的 傳出
return
——————————————————————————————————————————————————————————————————————————————————
=> 留了 `后門(mén) ( backdoor )`
=> 任何能 訪問(wèn) ptr/ref 的 code 可
`繞過(guò)(bypass) mutex 的 保護(hù)` access `被保護(hù)數(shù)據(jù)`
|
|/
不用 lock mutex
(1) code 結(jié)構(gòu)
——————————————————————————————————————————————————————————
[1] ProtectedData + mutex 作 其 管理類(lèi)(DataWrapper) 成員
——————————————————————————————————————————————————————————
[2] 成員函數(shù) para 為 `函數(shù)對(duì)象`
接受 `用戶(hù)(惡意 malicious)策略函數(shù)`
|
| [3]
|/
——————————————————————————————————————————————
1] `引用參數(shù)` => 可 `取 被保護(hù)數(shù)據(jù) 的 ptr`
2] 繞過(guò) mutex 保護(hù) access 被保護(hù)數(shù)據(jù)
——————————————————————————————————————————————————————————
(2) 根因
沒(méi)有把 access 數(shù)據(jù) 的 所有代碼段 標(biāo)記為 `互斥`
`漏掉了 保護(hù) 傳出的 被保護(hù)數(shù)據(jù) 的 ref`
(3) 更深的 陷阱 ( pitfall )
接口函數(shù)間 調(diào)用順序( 接口設(shè)計(jì) ) 導(dǎo)致 race conditions — 2.3節(jié)
2.4 在接口本身中 發(fā)現(xiàn) ( Spotting ) race conditions
1 雙向 list
刪除 1個(gè) node
線程安全
阻止 前中后共 3個(gè)節(jié)點(diǎn) 的 并發(fā) accesses
問(wèn)題
單獨(dú)保護(hù) 每個(gè)節(jié)點(diǎn)
仍有 race conditions
解決
`單個(gè) mutex` 保護(hù) `整個(gè) 數(shù)據(jù)結(jié)構(gòu)(list)`
2 線程安全 的 棧
5個(gè)操作 時(shí)
(1) 問(wèn)題
調(diào)用順序 為 empty() -> top() -> pop() 時(shí), `not 線程安全`
[1] 線程1 empty() top() 間 線程2 pop()
empty() 判非空 -> pop() 使棧 空 -> top()
原因: 接口設(shè)計(jì)導(dǎo)致
[2] 線程1 top() pop() 間 線程2 top()
`2個(gè)線程 本打算 分別 處理 頂值和次頂值, 實(shí)際 都 處理 頂值`
次頂值 沒(méi)處理 就被移除了
(2) 解決
( 有 隱患 )
|\
|
[1] mutex + 聯(lián)合調(diào) top 和 pop
若 stack 對(duì)象 copy ctor 可能 拋出異常
Herb Sutter 給出 solution
[2] pop() = std::stack 的 empty() + top() + pop()
————————————————————————————————————————————————
1] `構(gòu)造 棧元素類(lèi)型 新值` + pass by reference
+
2] 賦值 給 構(gòu)造的新值 = `引用 實(shí)參/形參`
————————————————————————————————————————————————
缺點(diǎn)
1] 構(gòu)造 代價(jià)太高 for some type
2] popped value type 必須 `可 賦值`
許多 用戶(hù)定義類(lèi)型 不支持 assignment
[3] 用 copy ctor 或 move ctor 不拋出異常 的類(lèi)型
C++11 的 `右值引用`
使 更多類(lèi)型 move ctor 不會(huì)拋出異常, 雖然 copy ctor 會(huì)
缺點(diǎn)
不通用
用戶(hù)自定義類(lèi)型
1] copy ctor 可能 拋出異常
2] 沒(méi)有 move ctor
[4] `return ptr` 指向 popped item
`ptr 可安全 copy, 不會(huì) 拋出異常`
缺點(diǎn)
要內(nèi)存管理, 簡(jiǎn)單類(lèi)型時(shí) 開(kāi)銷(xiāo)大
用 std::shared_ptr
[5] 解決 [2] + [3] 或 [4]
接口簡(jiǎn)化
例 [2] + [4]
5個(gè)操作 變 3個(gè) 好處
empty empty
top
pop pop: 2 個(gè) 重載版本 [2] [4]
push push
swap
mutex 保護(hù) std::stack 上 `完整 3 操作` empty() -> top() -> pop()
——————————————————————————————————————————————————————————————————————————————————————————————————
| [4] | [2]
——————————————————————————————————————————————————————————————————————————————————————————————————
函數(shù)原型 | std::shared_ptr<T> pop() | void pop(T& value)
——————————————————————————————————————————————————————————————————————————————————————————————————
para | 空 | 模板參數(shù) T `引用類(lèi)型`
——————————————————————————————————————————————————————————————————————————————————————————————————
實(shí)現(xiàn)關(guān)鍵 | 似 placement new + args 轉(zhuǎn)發(fā) | 1] pop 的 caller 負(fù)責(zé) 構(gòu)造
取出 top 元素 | pop() 負(fù)責(zé) 構(gòu)造 | 2] 賦值 給 構(gòu)造的新值 = `引用 實(shí)參/形參` Note: 只是 引用參數(shù), 不是 取出/top 棧中 元素的引用
怎么辦 ? | std::shared_ptr<T> const res( | value = stk.top(); // T 可 賦值
用于`構(gòu)造 棧元素類(lèi)型 新值`| std::make_shared<T>( stk.top() ) ); |
——————————————————————————————————————————————————————————————————————————————————————————————————
記住 解決 [4] 易 => 解決 [2] 等
——————————————————————————————————————————————————————————————————————————————————————————————————
template< class T, class... Args >
shared_ptr<T> make_shared( Args&&... args ) // <memory>
T 非 數(shù)組
|\
| Note
|
構(gòu)造 T 型 object
wrap 進(jìn) std::shared_ptr
就像
::new (pv) T(std::forward<Args>(args)...) // placement new + args 轉(zhuǎn)發(fā)
|
| internal
|/
internal void*
// 解決 [4]: 記住 解決 [4] => 解決 [2] 等
template<typename T>
std::shared_ptr<T> threadsafe_stack::pop()
{
std::lock_guard<std::mutex> lock(m);
if( stk.empty() )
throw empty_stack();
std::shared_ptr<T> const res(
std::make_shared<T>( stk.top() ) );
stk.pop();
return res;
}
3 lock 粒度/范圍
————————————————————————————————————————————————————————————————————
[1] 大 + 1個(gè) mutex | 并發(fā)增益小
————————————————————————————————————————————————————————————————————
[2] 小 + 1個(gè) mutex | 沒(méi)有完全保護(hù)期望的操作 => race condition
————————————————————————————————————————————————————————————————————
[3] 更小 + 多個(gè) mutex | 死鎖: 兩線程相互等待, 結(jié)果誰(shuí)也不往前走
————————————————————————————————————————————————————————————————————
2.5 死鎖 解決
1 死鎖: 2個(gè)線程, 2個(gè)mutex
2線程 都要 lock 2個(gè) mutex 才能執(zhí)行
2個(gè)線程 各鎖 1個(gè)mutex 時(shí), 都在 等對(duì)方鎖住的 mutex 被釋放
2個(gè)線程都在死等
場(chǎng)景: 無(wú)鎖 case 下的 死鎖
`2 個(gè) thread func` 在 對(duì)方 `線程對(duì)象` 上 調(diào) join()
|
| std::thread t1;
|/ std::thread t2(f2, std::ref(t1) );
para 為 `線程對(duì)象 ref` t1 = std::thread(f1, std::ref(t2) );
void f1(std::thread& t_) { t_.join(); }
死鎖: 都等 對(duì)方結(jié)束 void f2(std::thread& t_) { /* 休眠 5秒 */ t_.join(); }
2.6 避免死鎖 的方法
思想
線程1 可能等 線程2 時(shí), 線程2 就不要 等線程 1 了
1 避免 嵌套鎖
2 當(dāng) 持有1個(gè)鎖 時(shí), 避免調(diào) `用戶(hù)代碼`
|
|/
可能 獲得 另一個(gè) 鎖
=> 可能會(huì) 嵌套鎖
3 每個(gè)線程 以 相同順序 獲得多個(gè)鎖
總在 mutex1 前 鎖住 mutex2
[1] 適用于: 不同 mutexes for 不同 purposes
|
| [2] 問(wèn)題: 不能 解決 的 死鎖
|/
2個(gè) mutex 保護(hù) 同 class 2個(gè) instance
swap
|
| 解決
|/
std::lock() + std::lock_guard + std::adopt_lock 作 第 2 參數(shù)
| |
| |/
1次 lock 多個(gè) mutex 告訴 std::lock_guard Ctor
[1] 其 第1參數(shù) mutex 已上鎖
[2] 構(gòu)造時(shí)只需 接收 mutex 上 mutex 的 ownership
| [3] 問(wèn)題: 不能 解決 的 死鎖
|
| 多個(gè)鎖 被 `分開(kāi) 獲得`
|/
4 鎖層次
[1] application 分 多層
[2] 某層 已擁有 某 mutex 上的鎖
就 `不允許 上一層 lock 另一 mutex`
2.7 std::unique_lock 實(shí)現(xiàn)更靈活的 locking
1 思想
松弛 invariants
并不總是擁有 mutex
代價(jià)
內(nèi)部 要存/更新 1個(gè) flag 來(lái)標(biāo)識(shí)是否擁有 mutex
2 std::defer_lock 作 第2參數(shù), 告訴 std::unique_lock Ctor, 構(gòu)造時(shí) `mutex unlocked`
3 std::unique_lock 對(duì)象
2種方法 加鎖
1) 其 上調(diào)用 lock()
2) 傳給 std::lock()
因?yàn)?std::unique_lock 提供 3 個(gè)成員函數(shù)
lock() try_lock() unlock()
4 std::lock_guard & std::unique_lock
同 都可
RAII 鎖管理
異
1) 成員函數(shù)
前者 只有 ctor & dtor: ctor 中 加鎖 + dtor 中 解鎖
后者 還有 3 個(gè)成員函數(shù) lock() try_lock() unlock() => 更靈活
2) 是否可 `管理 mutex 的 lifetime`
否/是
=>前者 最好將 mutex 設(shè)為 global 變量
swap 的 std::unique_lock 版本
std::unique_lock + std::defer_lock 作 第2參數(shù) + std::lock(uniLkm1, uniLkm2);
|
|/
告訴 std::unique_lock Ctor, 構(gòu)造時(shí) `mutex unlocked`
2.8 轉(zhuǎn)移 mutex ownership between scope
1 mutex 的 ownership 可在 std::unique_lock 間 move
std::unique_lock: movable 但 not copyable
2 std::unique_lock 3 種靈活應(yīng)用
(1) 函數(shù) lock mutex, transfer 該 mutex 的 ownership 給其 caller 的 lock
(2) lock 不直接返回, 而是作 gateway class 的 mem,
所有 access to data 都通過(guò) gateway 類(lèi)
gateway
[1] destory 時(shí), releases lock
[2] movable
(3) std::unique_lock 可在 `銷(xiāo)毀前 釋放 其 管理的 mutex`
|
|
|/
unlock() 中
=> 提高性能
2.9 合適粒度下 上鎖
`真正 訪問(wèn) 共享數(shù)據(jù) 時(shí) 再 lock` mutex
3.1 `初始化期間` 保護(hù) 共享數(shù)據(jù)
1 延遲 初始化
共享數(shù)據(jù) 只在 `初始化 并發(fā) access 階段` 才用 mutex 保護(hù)
|
| 如: 打開(kāi) 數(shù)據(jù)庫(kù)連接
|/
構(gòu)造代價(jià)
(1) 單檢測(cè): `資源 (指針)` 是否 已初始化
1) 思想
資源 使用前檢測(cè)
若 沒(méi)初始化
先 初始化, 再 解鎖, 再 使用
|
|/
spr.reset(new R);
2) code 結(jié)構(gòu)
—————————————————————————————————————————————————
[1] 加鎖 => 所有線程 串行
[2] 檢測(cè)
[3] 初始化
[4] 解鎖
[5] 使用 資源指針
—————————————————————————————————————————————————
加/解 鎖
std::unique_lock + 其 成員函數(shù) unlock()
—————————————————————————————————————————————————
3) 問(wèn)題
第1次檢測(cè) 前 加鎖 => 檢測(cè)處 `串行` => `并發(fā)性差`
|
| (半)解決
|/
(2) 雙檢測(cè)鎖 ( Double-Checked Locking )
1) 思想
`第2次檢查 前 才加鎖`
2) code 結(jié)構(gòu)
—————————————————————————————————————————————————
[1] 第1次檢測(cè)
[2] 加鎖
[3] 第2次檢測(cè)
—————————————————————————————————————————————————
加/解鎖
std::lock_guard + scope 結(jié)束 Dtor 解鎖
—————————————————————————————————————————————————
3) 問(wèn)題
1條 new 語(yǔ)句 分解為 3 條底層語(yǔ)句
——————————-————
[1] 分配 內(nèi)存
[2] 構(gòu)造
[3] 指針賦值
————————————————
compiler 可能重排執(zhí)行順序?yàn)?[1][3][2]
線程 1 資源 `構(gòu)造未完成 -> 資源 ptr 就 已賦值`
線程 2 檢測(cè) 資源 ptr 非空
`訪問(wèn) 初始化未完成` 的 資源
data race 型 race condition
undefined behavior
|
| 解決
|/
(3) std::call_once(std::once_flag, 資源初始化函數(shù), func_args)
[1] 多線程 `并發(fā)進(jìn)入` std::call_once 函數(shù) + `同步`
但 `只有1個(gè)線程 ( active 線程 ) 真正執(zhí)行` 第 2 arg: (資源初始化) `函數(shù)`
其他線程 ( passive/被動(dòng) 線程 ) 進(jìn)入 std::call_once 后
`等待` active 線程 完成初始化 + 返回 后, 再返回
|
|/
同步
[2] Note
當(dāng)前線程 從 std::call_once 返回時(shí),
資源初始化 已完成, 但可能由 另一線程 完成
[3] std::call_once 參數(shù)
template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args ) // <mutex>
——————————————————————————————————————————————————————
1] 第1參數(shù) std::once_flag
——————————————————————————————————————————————————————
2] 其余參數(shù) 與 std::thread Ctor
——————————————————————————————————————————————————
1> 形式 同 `右值引用 args`
——————————————————————————————————————————————————
2> 目的 不同 `避免 copy, 而不是 實(shí)現(xiàn) move 語(yǔ)義`
因?yàn)?不必轉(zhuǎn)移到 另一線程 去執(zhí)行
——————————————————————————————————————————————————————
[4] std::once_flag
功能相當(dāng)于 std::mutex
同 `不能 copy`
2 `static local 變量` 初始化 時(shí) race condition
(1) static 局部變量 初始化
[1] 可能 真正在多個(gè)線程上 進(jìn)行(C++11 前編譯器) => problematic race condition
[2] 只能 真正在1個(gè)線程上 進(jìn)行 (C++11 compiler 解決)
=> race condition 只是說(shuō) `哪個(gè)線程 去 初始化`
(2) 應(yīng)用
`單例`
需要 `單個(gè) 全局 instance` 時(shí)
實(shí)現(xiàn) `像 std::call_once 一樣` 的 功能
3.2 很少更新 的 數(shù)據(jù)結(jié)構(gòu) 的 保護(hù)
boost 庫(kù) boost::shared_mutex
第4章 同步
并發(fā)操作 cv & future
& async & packaged_task & promise
image.png
image.png
image.png
image.png
多線程 2個(gè)要點(diǎn)
[1] 保護(hù) `共享數(shù)據(jù)`
第 3 章
[2] 線程 `同步`
線程 需 `等待` 另一線程 完成某任務(wù)后, 再去完成 自己的任務(wù)
實(shí)現(xiàn)
cv & futures
|
| cv: 類(lèi) std::condition_variable
|/
條件變量/condition variables
#1 循環(huán)等待 事件/條件: cv
##1.1 最簡(jiǎn)單的 方法
(1) code 結(jié)構(gòu) // 等待線程
|—— ——> `標(biāo)志變量 flag` = true /false 表示條件 是/否滿(mǎn)足
加鎖 | - - - - -
| |
check 條件 是否滿(mǎn)足 |
| => 周期性 ( 即 循環(huán) )
否 | +
| 等待
解鎖 - - - - - |\
|
sleep 固定時(shí)間 // Note: sleep 期間, other 線程 可獲得鎖 并 `modify 條件 使?jié)M足`
是
函數(shù)退出 => 自動(dòng)解鎖
(2) 問(wèn)題
sleep 多久 難確定
太短: 浪費(fèi)太多處理時(shí)間
太長(zhǎng): 事件早已發(fā)生, 等待線程還在 sleep
1] 丟幀 快節(jié)奏游戲 中
2] 過(guò)多消耗 時(shí)間片 實(shí)時(shí)應(yīng)用 中
(3) 解決
cv
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100) );
lk.lock();
}
}
##1.2 cv class
1 `同步` 基礎(chǔ)
可用于 同時(shí) `block/阻塞 多個(gè)線程`
until 通知線程
1] 修改 a `shared variable`
|
| 用于表示
|/
條件
2] notifies cv
——————————————————————————————
通知線程 modify & notify cv
|
shared variable
|
等待線程 check & wait on cv
——————————————————————————————
2 2 個(gè)問(wèn)題
(1) 問(wèn)題1
精確調(diào)度事件
=> 可能 `notify called on destroyed cv`
解決1
notify under lock 可能必要
(2) 問(wèn)題2
notify 發(fā)送時(shí), 等待線程 沒(méi)有 waiting on cv
=> `錯(cuò)過(guò) notification`
原因
————————————————
shared variable
————————————————
1] `atomic`
————————————————
2] 未加鎖
————————————————
Note
atomic copy delete
=>
不能用 cv 上 predicated wait 版本
即 不能為
cv.wait(lk, [] { return proceed; } ); // std::atomic<bool> proceed(false);
解決
1) 用 cv 上 non-predicated wait 版本
while ( !proceed ){ cv.wait(lk); }
2) shared variable 即使是 atomic
修改 也要 加鎖
3 cv 類(lèi)
[1] copy deleted
[2] notify_one & notify_all
[3] wait 2個(gè)重載
1] non-predicated 版本 (如) while ( !proceed ){ cv.wait(lk); }
非循環(huán)
`釋放` lock
`阻塞` current thread & 加到 cv 上的 `等待線程 隊(duì)列`
notified 或 spuriously wake up
`解阻塞`
`relock` + `wait exits`
2] predicated wait 版本 (如) cv.wait(lk, [] { return proceed; } );
循環(huán) + 條件
|
|/
滿(mǎn)足 = p() == true 時(shí), 返回
————————————————————————————————————————————
與 non-predicated 版本
————————————————————————————————————————
區(qū)別
僅多了1步
relock 后, (循環(huán)) 檢測(cè) 條件是否滿(mǎn)足
————————————————————————————————————————
聯(lián)系
視為 循環(huán)調(diào) non-predicated 版本
————————————————————————————————————————————
template<typename Pred>
void std::condition_variable::wait(unique_lock<mutex>& uniLk, Pred pred)
{
while ( !pred() )
wait(uniLk); // 視為 non-predicated 版本
}
//等價(jià)于
while ( !pred() )
{
wait(uniLk);
}
4 等待線程
(1) notify_one
通知線程 + 多個(gè)等待線程
沒(méi)必要 持有 the same mutex
否則
`悲觀鎖`
hurry up and wait
原因
awakened 后 可能無(wú)法 獲得 mutex ( 被 通知線程 占著 )
|
| 解決
|/
pthreads 實(shí)現(xiàn)
1] `識(shí)別` 悲觀鎖場(chǎng)景
2] `notify 內(nèi)` 把 等待線程
從 `cv 上的 隊(duì)列` 轉(zhuǎn)移到 `mutex 上的 隊(duì)列`
而 `不 喚醒它`
(2) notify_all
`驚群現(xiàn)象`
事件發(fā)生
`喚醒所有` 所有等待線程
但 `只能有1個(gè)線程 獲得事件處理權(quán)`
`其他線程 重新 陷入等待`
5 spurious wakeup / 假喚醒
notify_one/notify_all 之外的因素導(dǎo)致的
`wait 被喚醒, 但 條件不滿(mǎn)足`
|
| 解決
|/
predicate wait 版本
[1] 只影響 non-predicated wait
[2] 不影響 predicated wait
6 應(yīng)用
//------ App
threadsafe_queue<T> que;
void producer()
{
while( more_data_to_prepare() )
{
T const data = prepare_data();
que.push(data); // [1] 真正 push 前 加鎖: que.push() 中 internal std::queue 真正 push 前
}
}
void consumer()
{
while(true)
{
T data;
que.wait_and_pop(data);
process(data); // [2] process 前 解鎖: wait_and_pop() scope 結(jié)束 + 自動(dòng)解鎖
if( is_last_chunk(data) )
break;
}
}
// 記住這 1個(gè) 足夠 => others
template<typename T>
std::shared_ptr<T>
threadsafe_queue::wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
// 捕獲 this 目的: 可在 lambda 中用 當(dāng)前類(lèi)的 成員
cv.wait(lk,
[this]{ return !que.empty();} );
std::shared_ptr<T> res(
std::make_shared<T>( que.front() ) );
que.pop();
return res;
}
#2 等待 `1次性事件`: future
線程 以某種方式
獲得 future -> 表示事件
場(chǎng)景: 等待航班
##2.0 std::future
`等待 被 異步設(shè)定 的 value`
template< class T > class future; (1)
template< class T > class future<T&>; (2)
template<> class future<void>; (3)
1 異步 機(jī)制
準(zhǔn)備好
異步操作 - - - -> result + 修改 shared state
| /\ |
| 提供 / hold | link
|/ / |
std::future 對(duì)象 —— —— —— —— —— —— ——
| |\
| 給 | query / wait for / extract result: 阻塞, 直到 result 準(zhǔn)備好
|/ |
creator
std::async / std::packaged_task / std::promise
future shared_future async packaged_task promise
除 async 為 為 function template 外, 其余為 class template
##2.1 std::async
`異步 runs a function` ( 可能 in a new thread)
返回 std::future
future 上調(diào) get(), 當(dāng)前線程 阻塞, 直到 future ready, 然后 return result
傳參
與 std::thread 傳參方式相同
##2.2 std::packaged_task
std::packaged_task<> obj 綁定 future to callable object
task.get_future() 返回 future 對(duì)象
##2.3 std::promise
1 針對(duì)2種場(chǎng)景
[1] task 不能被表達(dá)為 函數(shù)調(diào)用
[2] 結(jié)果 來(lái)自多個(gè)地方
2 機(jī)制
// 3大成員函數(shù)
[1] get_future()
返回 promised 結(jié)果 關(guān)聯(lián)的 future
[2] set_value() : 主動(dòng)線程
會(huì) make state/future ready
[3] fut.get() : 被動(dòng)線程
第5章 C++內(nèi)存模型 和 原子類(lèi)型操作
用 原子操作 在 非原子操作間 迫使順序.png
順序一致 seq_cst:虛線 暗含 順序關(guān)系.png
relaxed.png
2個(gè) consumer 線程間若無(wú) release 語(yǔ)義 => 2個(gè) consumer 線程 data race. 虛線: release sequence/實(shí)線: happens-before 關(guān)系.png
C++ `原子類(lèi)型` 如何用于
[1] 線程間 `同步`
[2] `lock-free` 數(shù)據(jù)結(jié)構(gòu)
#1 C++11新標(biāo)準(zhǔn): 有 `多線程意識(shí)` 的 `內(nèi)存模型`
##1.1 `兩個(gè)線程 access 間 強(qiáng)迫順序`
2種辦法
(1) mutex
(2) 原子操作 的 同步特性: 迫使
##1.2 修改順序
1 對(duì)象 寫(xiě)過(guò)程中 不允許 寫(xiě)或讀
但每次寫(xiě)/讀 由 哪個(gè)線程來(lái)做, 沒(méi)有規(guī)定
內(nèi)核 線程調(diào)度 決定
2 coder 還是 編譯器 保證
(1) 非原子類(lèi)型: 由 coder 在代碼中保證
(2) 原子類(lèi)型: 編譯器保證
#2 C++中 原子 操作/類(lèi)型
`原子: 不可分割, 要么不執(zhí)行, 要么執(zhí)行完`
#2.1 標(biāo)準(zhǔn)原子類(lèi)型 <atomic>
`內(nèi)存順序參數(shù)`, 以指定 `內(nèi)存順序語(yǔ)義`
#3 同步操作 和 迫使順序
思想
`用 原子操作 在 非原子操作間 迫使順序`
=> 線程間 `同步/強(qiáng)加順序`
1 寫(xiě)/讀 線程
寫(xiě)線程1
填/populate 數(shù)據(jù)結(jié)構(gòu) -> data ready -> 原子變量 set flag
讀線程2:
直到 flag 被 set, 才 讀
#include <vector>
#include <atomic>
#include <iostream>
std::vector<int> data;
std::atomic<bool> data_ready(false);// [1] flag
void reader_thread()
{
while( !data_ready.load() ) // [2]
{
std::this_thread::sleep(std::milliseconds(1));
}
std::cout<<”The answer=”<< data[0] <<”\n”;
}
void writer_thread()
{
data.push_back(42);
data_ready = true; // [3]
}
2 synchronizes-with 關(guān)系 ( => happens-before ) + sequenced-before
線程間 (操作間) synchronizes-with ( 在...之前 ) 關(guān)系
=> `線程間 (操作間) happens-before` 關(guān)系
+ `線程內(nèi) sequenced-before` 關(guān)系
=> `執(zhí)行順序 傳遞性`
3 `內(nèi)存 ordering`: 用于 `原子操作`
內(nèi)存 排序 => synchronizes-with 關(guān)系
`3種模型 6種 memory ordering`
memory_order_
seq_cst // 1 順序一致: 默認(rèn) 最嚴(yán)格
relaxed // 2
//3 acquire-release
consume
acquire // acquire 操作 : 原子 load
release // release 操作 : 原子 store
acq_rel // acquire / release / both : 原子 read-modify-write ( fetch_add() / exchange() )
怎么選褐健?根據(jù)它們 對(duì)程序 behavior 的影響
[1] 順序一致 seq_cst
1) `多線程` 像 單線程
`所有線程 看到的 操作執(zhí)行 順序相同`
2) 從`同步` 角度 看
同一 原子變量, `所有線程` 間 `全局同步`
3) 優(yōu)劣
優(yōu): 最簡(jiǎn)單直觀
劣: `所有線程間 全局同步 -> 耗時(shí) / 耗資源`
[2] relaxed
線程內(nèi), 是 單線程 happens-before
`線程間, 線程1 看到 線程2 中的操作是 out of order/亂序的`
`未必是 線程2自己 看到的順序(代碼表現(xiàn)的順序)`
|
| 對(duì) relaxed ordering 在操作間 force 同步/順序關(guān)系
|/
[3] acquire-release
通過(guò) 2 個(gè)線程間 `局部順序` 強(qiáng)迫出 `全局順序`
| |
| 同一 |
|/ |/
原子變量 `令 release 操作(線程) synchronizes-with acquire 操作(線程)`
| |
store load
線程間同步
用 `1個(gè) 原子變量` 上
`2個(gè) 線程` 間 `release / (循環(huán)) acquire 操作`
來(lái) `force` 線程間 順序關(guān)系
————————————————————————————————————————————————————————————————————————————————————
release 線程 acquire 線程
release 前 操作
|
| 1 線程內(nèi) (natural) 順序關(guān)系 ( sequenced-before )
|
release - - - - - - - - - - - - -
| 2 強(qiáng)迫 線程間順序關(guān)系 ( synchronizes-with 關(guān)系 )
|
acquire
|
| 3 線程內(nèi) (natural) 順序關(guān)系 ( sequenced-before )
|
acquire 后操作
————————————————————————————————————————————————————————————————————————————————————
4 Fences 圍欄/隔離措施
`全局操作, 不修改數(shù)據(jù), 在代碼中 放一條線, 使 特定操作不能越過(guò)
=> 引入 happens-before 和 synchronizes-with 關(guān)系`
`release / acquire fence` 等價(jià)于
store / load tagged with memory_order_`release/acquire`
5 用原子操作 強(qiáng)迫 非原子操作
1 原子操作 的 happens-before + fence => 非原子操作 的 happens-before
第9章 高級(jí)線程管理 — 線程池
#1 線程池
work 即 task: 是 func_wrap obj
|
| 如
|/
std::function
1 3 階段
(1) 創(chuàng)建 線程池 object
啟動(dòng) 多線程 - - - - -> 線程函數(shù)統(tǒng)一入口 worker_thread()
push 多個(gè) thread object 到 `線程 vector`
(2) push 想 并行 的 `work` 到 work_queue
|
|/
submit
(3) thread_pool 自動(dòng)管理 線程 + work
`每個(gè) thread 的 線程函數(shù)` 從 `work_queue` 中
循環(huán) try_pop 出 work 去執(zhí)行: work()
調(diào) wrap 的 callable object 的 operator()
若 try_pop 失敗, 則 yield 切出 線程
Note
`每個(gè) work` 被 `worker threads` 中 `某線程 取出 并執(zhí)行`
|
|/
未指定
#1.1 最簡(jiǎn)單情況: `線程池` 中 `線程數(shù)量 固定`
1 `std::function` 可 store / copy / invoke
`有 copy 語(yǔ)義` 的 `callable object`
2 保證 線程一定會(huì)被 等待/join()
RAII 類(lèi)
class join_threads
{
private:
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_):
threads(threads_) {}
~join_threads()
{
for(unsigned long i=0; i<threads.size(); ++i)
{
if( threads[i].joinable() )
threads[i].join();
}
}
};
3 有 work 要做時(shí),
調(diào) submit( Func ) 去 push work 到 pending work_queue
4 適用場(chǎng)景
(1) work 間 完全獨(dú)立
(2) 等待線程完成 而不是 work 完成
(3) 無(wú) 阻塞
class thread_pool
{
private:
// (1) 標(biāo)志 thread_pool 是否已銷(xiāo)毀
std::atomic_bool done;
// (2) work_queue
thread_safe_queue< std::function< void() > > work_queue;
// (3) worker_threads: 用 vector
std::vector<std::thread> threads;
// (4) 等待 worker_threads 中 所有 thread 結(jié)束 => RAII 類(lèi)
join_threads joiner;
void
worker_thread()
{
while( !done ) // done==false / thread_pool 銷(xiāo)毀前, n 個(gè) 線程函數(shù) worker_thread() 均一直循環(huán) ( try_pop work 失敗, 立即 切出線程 )
{
std::function<void()> work; // std::function object
// `每個(gè) thread` 從 work_queue 中 `循環(huán) try_pop 出 work 去執(zhí)行: 若 try_pop 失敗, yield 切出 線程`
if( work_queue.try_pop(work) )
{
work(); // invoke std::function 內(nèi) store 的 target ( callable object ) 的 operator()
}
else
{
std::this_thread::yield();
}
}
}
public:
template<typename F>
void
submit(F f)
{
work_queue.push( std::function<void()>(f) );
}
thread_pool():
done(false), joiner(threads)
{
unsigned const thread_count =
std::thread::hardware_concurrency();
try
{
for(unsigned i=0; i < thread_count; ++i)
{
threads.push_back(
std::thread( &thread_pool::worker_thread, this) );
}
}
catch(...)
{
// 保證 線程啟動(dòng)失敗時(shí)矛缨,已啟動(dòng)的線程 被停掉 和 清理
done = true; // 只需把 線程統(tǒng)一入口函數(shù)中 循環(huán)停掉
throw;
}
}
// 2. dtor
~thread_pool()
{
done = true;
}
};
##1.2 等待 work 而不是 線程
|
|/
callable object
———————————————————————————————————————————————————————————
work wrap 進(jìn) Work
|
|/
[1] 只有 copy 語(yǔ)義 std::function 等待 線程
|
| 換為
|/
[2] 只含 move 語(yǔ)義
自定義: 提供 move 語(yǔ)義 的 ctor 等待 work
———————————————————————————————————————————————————————————
3 個(gè) 可 move class
[1] callable object
|
| 左值引用 para: Work 的 ctor / move ctor / move assignment
|/
[2] Work
unique_ptr
成員類(lèi)型
1] Ctor: callable object 左值引用 para
template<typename F>
Work(F&& f):
sp_impl( new impl_type<F>(std::move(f) ) ) {}
2] 函數(shù)調(diào)用運(yùn)算符 調(diào) call()
void operator()() { up_impl->call(); }
3] call() 調(diào) callable object 的 函數(shù)調(diào)用運(yùn)算符
[3] pask_task
1] std::result_of<CallableType()>::type
取出 callable object
函數(shù)調(diào)用運(yùn)算符 的 return type
|
| 作 future
|/
模板參數(shù)類(lèi)型 T
2] callable object 的 ownership
依次 轉(zhuǎn)移給 std::packaged_task // std::packaged_task< return_type() > pack_tsk( std::move(f) );
轉(zhuǎn)移給 work_queue 中 ( 隊(duì)尾 ) Work //work_queue.push( std::move(pack_tsk) );
template< class R, class ...Args >
class packaged_task< R(Args...) >; // R: return type
// ctor
template <class F>
explicit packaged_task( F&& f );
template< class S, class... Args >
class result_of< S (Args...) >;
Note
S: callable type, not return type
std::result_of<S(char, int&)>::type 是 S 的 operator() 的 return type
struct S {
double operator()(char, int&);
};
int main()
{
std::result_of<S(char, int&)>::type f = 3.14; // f has type double
}
// List9.2 線程池 with waitable works
class Work
{
private:
struct impl_base
{
virtual void call() = 0;
virtual ~impl_base() {}
};
template<typename F>
struct impl_type: impl_base
{
F f;
impl_type(F&& f_)
: f( std::move(f_) ) {}
void call() { f(); }
};
// (2) struct 用 unique_ptr 管理
std::unique_ptr<impl_base> up_impl;
public:
// (1) default ctor: empty
Work() = default;
// (2) move 語(yǔ)義: ctor / move ctor / move assignment
template<typename F>
Work(F&& f):
up_impl( new impl_type<F>(std::move(f) ) ) {}
Work(Work&& rhs):
up_impl( std::move(rhs.up_impl) ) {}
Work&
operator=(Work&& rhs)
{
up_impl=std::move(rhs.up_impl);
return *this;
}
// (3) 函數(shù)調(diào)用運(yùn)算符
void operator()() { up_impl->call(); }
// copy 語(yǔ)義 delete
Work(const Work&)=delete;
Work(Work&) = delete;
Work& operator=(const Work&)=delete;
};
class thread_pool
{
thread_safe_queue< Work > work_queue;
void worker_thread()
{
while(!done)
{
Work work;
if( work_queue.try_pop(work) )
{
work();
}
else
{
std::this_thread::yield();
}
}
}
public:
template<typename CallableType>
std::future<typename std::result_of<CallableType()>::type>
submit(CallableType f)
{
typedef typename std::result_of<CallableType()>::type
return_type;
std::packaged_task< return_type() > pack_tsk( std::move(f) );
std::future<return_type> fut( pack_tsk.get_future() );
work_queue.push( std::move(pack_tsk) );
return fut;
}
// rest as before
};