C++ 11 引入了 std::thread 標準庫,方便了多線程相關(guān)的開發(fā)工作。
說到多線程開發(fā)俊鱼,可不僅僅是創(chuàng)建一個新線程就好了汛聚,不可避免的要涉及到線程同步的問題锹安。
而保證線程同步,實現(xiàn)線程安全倚舀,就要用到相關(guān)的工具了叹哭,比如信號量、互斥量痕貌、條件變量风罩、原子變量等等。
這些名詞概念都是來操作系統(tǒng)里面引申來的舵稠,并不是屬于哪一種編程語言所特有的超升,在不同語言上的表現(xiàn)形式不一樣,但其背后的原理是一致的哺徊。
C++ 11 同樣引入了 mutex室琢、condition_variable、future 等實現(xiàn)線程安全的類落追,下面就來一一了解它們盈滴。
mutex
mutex 作為互斥量,提供了獨占所有權(quán)的特性轿钠。
一個線程將互斥量鎖住雹熬,直到調(diào)用 unlock 之前,該線程都是擁有該鎖的谣膳,而其他線程訪問被鎖住的互斥量竿报,則會被阻塞住。
使用示例:
#include <thread>
#include <iostream>
int num = 0;
std::mutex mutex;
void plus(){
std::lock_guard<std::mutex> guard(mutex);
std::cout << num++ <<std::endl;
}
int main(){
std::thread threads[10];
for (auto & i : threads) {
i = std::thread(plus);
}
for (auto & thread : threads) {
thread.join();
}
return 0;
}
如上代碼继谚,創(chuàng)建了 10 個線程烈菌,每個線程都會先打印 num 的值,然后再將 num 變量加一花履,依次打印 0 到 9 芽世。
眾所周知,+1 的操作不是線程安全的诡壁,實際包含了三步济瓢,先讀取,再加一妹卿,最后賦值旺矾。但是因為使用了互斥量 mutex 蔑鹦,保證獨占性,所以結(jié)果都是按照順序遞增打印的箕宙。
如果不使用互斥量嚎朽,那么可能前一個線程還沒有賦值完,后一個線程就進行了讀取柬帕,最后的結(jié)果就是隨機不可預料的哟忍。
condition_variable
condition_variable 作為條件變量,它會調(diào)用 wait 函數(shù)等待某個條件滿足陷寝,如果不滿足的話锅很,就通過 unique_lock 來鎖住當前線程,當前線程就處于阻塞狀態(tài)凤跑,直到其他線程調(diào)用了條件變量的 nofity 函數(shù)來喚醒粗蔚。
使用示例:
#include <iostream>
#include <thread>
int num = 0;
std::mutex mutex;
std::condition_variable cv;
void plus(int target){
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock,[target]{return num == target;});
num++;
std::cout << target <<std::endl;
cv.notify_all();
}
int main(){
std::thread threads[10];
for (int i = 0; i < 10; ++i) {
threads[i] = std::thread(plus,9-i);
}
for (auto & thread : threads) {
thread.join();
}
return 0;
}
同樣是創(chuàng)建了 10 個線程,每個線程都會有一個 target 參數(shù)饶火,代表該線程要打印的數(shù)值鹏控,按照 9 -> 0 順序創(chuàng)建線程,最后運行結(jié)果是依次打印 0 -> 9 肤寝。
每個線程運行時都會先調(diào)用 wait 函數(shù)等待 num == target 這一條件滿足当辐,一旦滿足就會將 num 加一,并打印 target 值鲤看,然后喚醒下一個滿足條件的值缘揪。
通過改變條件變量的 wait 函數(shù)喚醒條件,就可以實現(xiàn)不同的多線程模式义桂,比如常見的生產(chǎn)者-消費者模型找筝。
condiation_variable 實現(xiàn)生產(chǎn)消費者模式
#include <iostream>
#include <thread>
#include <queue>
int main(){
std::queue<int> products;
std::condition_variable cv_pro,cv_con;
std::mutex mtx;
bool end = false;
std::thread producer([&]{
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
cv_pro.wait(lock,[&]{return products.empty();});
products.push(i);
cv_con.notify_all();
}
cv_con.notify_all();
end = true;
});
std::thread consumer([&]{
while (!end){
std::unique_lock<std::mutex> lock(mtx);
cv_con.wait(lock,[&]{return !products.empty();});
int d = products.front();
products.pop();
std::cout << d << std::endl;
cv_pro.notify_all();
}
});
producer.join();
consumer.join();
return 0;
}
future & promise
future 這一特性在日常開發(fā)中用的比較少,它可以用來獲取異步任務的結(jié)果慷吊,也可以當做一種線程間同步的手段袖裕。
假設(shè)現(xiàn)在程序要創(chuàng)建一個線程去執(zhí)行耗時操作,并且等耗時操作結(jié)束了之后要拿到返回值溉瓶,那么可以通過 condiation_variable 來實現(xiàn)急鳄,在異步線程執(zhí)行完了之后調(diào)用 notify 方法來喚醒主線程,同樣也可以通過 future 來實現(xiàn)堰酿。
當程序通過特定方法創(chuàng)建了一個異步操作之后會返回一個 future 疾宏,該 future 可以訪問到異步線程的狀態(tài)。
在異步線程里面設(shè)置某個共享狀態(tài)的值触创,與該共享狀態(tài)相關(guān)聯(lián)的 future 就可以通過 get 方法來獲取結(jié)果坎藐,get() 方法會阻塞調(diào)用線程,從而等待異步線程完成設(shè)置哼绑。
future 的 get 方法其實就相當于 condiation_variable 的 wait 方法岩馍,而異步線程設(shè)置共享狀態(tài)的值的方法就相當于 condiation_variable 的 notify 方法碉咆。
future 的創(chuàng)建有如下三種方式:
std::promise
promise 就像它的字面意思一樣,代表承諾兼雄,說明它一定會在異步線程設(shè)置共享狀態(tài)的,而 future 就耐心等待好了帽蝶。
promise 和 future 的調(diào)用流程如下圖所示:
代碼示例如下:
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
void task(std::promise<int>& promise){
std::this_thread::sleep_for(std::chrono::seconds(1));
promise.set_value(10);
}
int main(){
std::promise<int> promise;
std::future<int> future = promise.get_future();
std::thread t(task,std::ref(promise));
int result = future.get();
std::cout << "thread result is " << result << std::endl;
t.join();
return 0;
}
promise 通過 get_future 方法獲取與該 promise 關(guān)聯(lián)的 future 對象赦肋,并且通過 set_value 方法設(shè)置共享狀態(tài)的值。
std::packaged_task
packaged_task 可以用來包裝一個可調(diào)用的對象 励稳,并且能作為線程的運行函數(shù)佃乘,有點類似于 std::function 。
但不同的是驹尼,它將其包裝的可調(diào)用對象的執(zhí)行結(jié)果傳遞一個相關(guān)聯(lián)的 future 趣避,從而實現(xiàn)狀態(tài)的共享,future 通過 get 方法來等待可調(diào)用對象執(zhí)行結(jié)束新翎。
如下代碼所示:
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
int task(){
std::this_thread::sleep_for(std::chrono::seconds(1));
return 10;
}
int main(){
std::packaged_task<int(void)> packaged_task(task);
std::future<int> future = packaged_task.get_future();
std::thread thread(std::move(packaged_task));
int result = future.get();
std::cout << "thread result is " << result << std::endl;
thread.join();
return 0;
}
packaged_task 通過 get_future 方法來獲得相關(guān)聯(lián)的 future 對象程帕。
std::async
async 也能創(chuàng)建 future ,并且它更像是對 std::thread地啰,std::packaged_task愁拭,std::promise 的封裝。
如下代碼所示:
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
int task(){
std::this_thread::sleep_for(std::chrono::seconds(1));
return 10;
}
int main(){
std::future<int> future = std::async(std::launch::async,task);
int result = future.get();
std::cout << "thread result is " << result << std::endl;
return 0;
}
通過 async 直接創(chuàng)建異步線程并且獲取相關(guān)聯(lián)的 future 對象亏吝,連 thread 創(chuàng)建線程的操作都省了岭埠。
async 有兩種執(zhí)行策略,launch::async 和 launch::deferred 蔚鸥。其中前者是立即執(zhí)行惜论,后者是等調(diào)用 future.get() 方法時才會創(chuàng)建線程執(zhí)行任務。
線程池的建設(shè)
了解了以上的線程相關(guān)操作類止喷,就可以進一步進階馆类,通過它們來打造一個線程池了。
關(guān)于線程池建設(shè)弹谁,根據(jù)具體業(yè)務和使用場景蹦掐,會有很多不同支持,但有些本質(zhì)內(nèi)容還是不會變的僵闯。
線程池的出發(fā)點當然是為了減少在頻繁創(chuàng)建和銷毀線程上所花的時間和系統(tǒng)資源的開銷卧抗,表現(xiàn)形式上就是有一池子的線程,向線程池提交任務鳖粟,最終分配到某個線程上去執(zhí)行社裆。
如下圖所示,就是一個簡易線程池的雛形向图,有任務 Task泳秀,有 Thread Pool 來分發(fā)任務标沪,也有 Worker Thread 來最終執(zhí)行任務,麻雀雖小五臟俱全嗜傅。
接下來就詳細拆解以上部分內(nèi)容金句。
任務類型
任務 Task 的類型根據(jù)業(yè)務需求可以有多種定義,主要差別在于任務需要的參數(shù)類型以及返回值類型吕嘀。
另外违寞,任務本身也可以有一些屬性來標識該屬性的類型,應該放到什么樣的線程去執(zhí)行等等偶房。
簡單起見趁曼,定義一個簡單的 Task 類型,無需參數(shù)棕洋,也不需要返回值類型挡闰。
using task = std::function<void()>;
線程數(shù)量
一個線程池該有多少線程呢?如果數(shù)量太多掰盘,會導致資源浪費摄悯,有些線程不一定能充分使用。如果太少就會導致頻繁創(chuàng)建新線程愧捕。
一個靈活的線程池應該是可以動態(tài)改變線程數(shù)量的射众,參考 Java線程池實現(xiàn)原理及其在美團業(yè)務中的實踐。
在 Java 的 ThreadPoolExecutor 中通過 corePoolSize 和 maximumPoolSize 來限制線程的數(shù)量晃财,線程數(shù)量會在 [0 ~ corePoolSize]
和 [corePoolSize ~ maximumPoolSize]
之間波動叨橱。
當任務吃緊,線程和緩存都滿了断盛,就會申請線程罗洗,數(shù)量達到 [corePoolSize ~ maximumPoolSize]
范圍,一旦任務松懈钢猛,就會釋放一些空閑線程伙菜,數(shù)量回落到 [0 ~ corePoolSize]
范圍,如果任務持續(xù)吃緊命迈,那么就會拒絕任務了贩绕。
當然,也有其他確定線程數(shù)量的策略壶愤,根據(jù)具體的業(yè)務需求來核定淑倾,比如根據(jù) CPU 多核來決定線程數(shù)量多少。
簡單起見征椒,這里就以固定線程數(shù)量作為演示了娇哆。
size_t N = std::thread::hardware_concurrency();
任務緩存
假設(shè)現(xiàn)在已經(jīng)固定了 N 個線程,并且每個線程都有任務在執(zhí)行,這時有來了一個新任務碍讨,那么該怎么處理呢治力? 這時候就需要任務緩存機制了(當然也可以直接拒絕該任務)。
任務緩存也分多種形式:
- 全局緩存
- 線程緩存
- 全局緩存 + 線程緩存
全局緩存
全局緩存勃黍,顧名思義就是在線程池有一個全局的緩存隊列宵统,凡是進入到線程池的任務都會先進到全局緩存中,然后由全局緩存進行任務的分發(fā)覆获,最后由不同的工作線程去執(zhí)行任務马澈。
線程緩存
線程緩存,顧名思義就是在每個工作線程都有一個緩存隊列锻梳,然后線程不斷循環(huán)處理自己緩存隊列上的任務箭券。凡是進入到線程池的任務净捅,都會由線程池進到調(diào)度和分發(fā)疑枯,然后進入到工作線程對應的緩存隊列中,最終被執(zhí)行結(jié)束
全局緩存 + 線程緩存
全局緩存 + 線程緩存 就是將上面兩者結(jié)合起來了蛔六,用如下圖來匯總演示:
這種緩存方式算是比較復雜的情形了荆永,適用于那種計算量大且快速執(zhí)行的情形蹦狂,一般來說還是全局緩存用的比較普遍鸽捻。
緩存隊列
有了任務緩存简卧,那么還應該定義一下緩存隊列断箫。毫無疑問糟把,緩存隊列必須是線程安全的缨历,因為它要在多個工作線程之間共享任務衫贬。
緩存隊列的形式有很多蝴乔,比如阻塞隊列四啰,雙向鏈表的阻塞隊列等等宁玫,這里定義一個簡單的隊列,把 std::queue 做一下線程安全的封裝柑晒。
#pragma once
#include <mutex>
#include <queue>
// Thread safe implementation of a Queue using an std::queue
template <typename T>
class SafeQueue {
private:
std::queue<T> m_queue;
std::mutex m_mutex;
public:
SafeQueue() {
}
bool empty() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
int size() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
void enqueue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(t);
}
bool dequeue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty()) {
return false;
}
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
};
定義了 enqueue 和 dequeue 方法向隊列中塞任務和取任務欧瘪,通過加鎖來保證線程安全。
線程調(diào)度
線程池最核心的部分也就是線程調(diào)度了匙赞,假設(shè)使用了全局緩存的形式佛掖,那么如何把全局緩存中的任務分發(fā)給空閑線程呢?
實際上從某種角度來說涌庭,全局緩存的線程池也可以認為是一個單生產(chǎn)者-多消費者模式芥被,全局緩存就是生產(chǎn)者,而多個線程就是多個消費者了坐榆。
在前面的代碼實踐中已經(jīng)寫了一個單生產(chǎn)者-單消費者模式撕彤,當生產(chǎn)者生產(chǎn)了 Task 之后,就通過 notify 方法來喚醒消費者,從而將 Task 分配到消費者去執(zhí)行羹铅。由于只有一個消費者蚀狰,那喚醒的就是唯一的那個了。
那假若有多個消費者职员,喚醒的又是哪一個呢麻蹋?答案是隨機的。調(diào)用 notify_one 方法會隨機喚醒一個線程焊切,調(diào)用 notify_all 則會喚醒全部線程扮授。
但是喚醒并不代表線程就會消費 Task,一個 Task 對應多個線程专肪,線程喚醒之后會去全局緩存搶奪 Task 任務刹勃,一旦得手就執(zhí)行,而其他沒有搶到的線程則繼續(xù)掛起嚎尤,等待下一次的喚醒荔仁。
線程池中線程的運行狀態(tài)如下圖所示:
本質(zhì)上,線程池還是通過 notify 方法來喚醒線程芽死,從而實現(xiàn)任務分發(fā)和調(diào)度的乏梁。
這種方式具有一定的隨機性,不能確保到底喚醒了哪個線程关贵,可以根據(jù)業(yè)務需要定制相關(guān)的調(diào)度邏輯遇骑,比如只喚醒某些具有共同屬性的線程,或者根據(jù) Task 任務的要求來喚醒指定線程揖曾,更可以不通過 notify 的方式落萎,直接把任務派發(fā)給對應的線程去執(zhí)行。
根據(jù)上述流程圖就可以給出工作線程運行的代碼了:
class WorkerThread {
private:
int m_id;
ThreadPool *m_pool;
public:
WorkerThread(ThreadPool *pool, int id) : m_pool(pool), m_id(id) {
}
void operator()() {
task func;
bool dequeued;
while (!m_pool->m_shutdown) {
std::unique_lock<std::mutex> lock(m_pool->m_mutex);
if (m_pool->m_queue.empty()){
m_pool->m_condition_variable.wait(lock);
}
dequeued = m_pool->m_queue.dequeue(func);
if (dequeued) {
func();
}
}
}
};
下面就是一個簡單的線程池代碼實踐:
#ifndef THREAD_POOL_THREADPOOL_H
#define THREAD_POOL_THREADPOOL_H
#include <functional>
#include <future>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>
#include "SafeQueue.h"
using task = std::function<void()>;
class ThreadPool {
public:
ThreadPool(size_t thread_num = std::thread::hardware_concurrency()) : m_threads(
std::vector<std::thread>(thread_num)), m_shutdown(false) {
}
void init() {
for (int i = 0; i < m_threads.size(); ++i) {
m_threads[i] = std::thread(WorkerThread(this, i));
}
}
void shutdown() {
m_shutdown = true;
m_condition_variable.notify_all();
for (int i = 0; i < m_threads.size(); ++i) {
if (m_threads[i].joinable()) {
m_threads[i].join();
}
}
}
std::future<void> submit(task t){
auto p_task = std::make_shared<std::packaged_task<void()>>(t);
task wrapper_task = [p_task](){
(*p_task)();
};
m_queue.enqueue(wrapper_task);
m_condition_variable.notify_one();
return p_task->get_future();
}
private:
class WorkerThread {
private:
int m_id;
ThreadPool *m_pool;
public:
WorkerThread(ThreadPool *pool, int id) : m_pool(pool), m_id(id) {
}
void operator()() {
task func;
bool dequeued;
while (!m_pool->m_shutdown) {
std::unique_lock<std::mutex> lock(m_pool->m_mutex);
if (m_pool->m_queue.empty()){
m_pool->m_condition_variable.wait(lock);
}
dequeued = m_pool->m_queue.dequeue(func);
if (dequeued) {
func();
}
}
}
};
bool m_shutdown;
SafeQueue<task> m_queue;
std::vector<std::thread> m_threads;
std::mutex m_mutex;
std::condition_variable m_condition_variable;
};
#endif //THREAD_POOL_THREADPOOL_H
通過 submit 方法提交任務到全局緩存隊列中炭剪,然后喚醒線程去消費任務執(zhí)行练链。
小結(jié)
關(guān)于 C++ 多線程的使用還有很多知識點,以上只是介紹了部分內(nèi)容念祭,還有很多不足之處兑宇,后續(xù)再補充了。