C++ 中的多線程的使用和線程池建設(shè)

C++ 11 引入了 std::thread 標準庫,方便了多線程相關(guān)的開發(fā)工作。

說到多線程開發(fā)俊鱼,可不僅僅是創(chuàng)建一個新線程就好了汛聚,不可避免的要涉及到線程同步的問題锹安。

而保證線程同步,實現(xiàn)線程安全倚舀,就要用到相關(guān)的工具了叹哭,比如信號量、互斥量痕貌、條件變量风罩、原子變量等等。

這些名詞概念都是來操作系統(tǒng)里面引申來的舵稠,并不是屬于哪一種編程語言所特有的超升,在不同語言上的表現(xiàn)形式不一樣,但其背后的原理是一致的哺徊。

C++ 11 同樣引入了 mutex室琢、condition_variable、future 等實現(xiàn)線程安全的類落追,下面就來一一了解它們盈滴。

mutex

mutex 作為互斥量,提供了獨占所有權(quán)的特性轿钠。

一個線程將互斥量鎖住雹熬,直到調(diào)用 unlock 之前,該線程都是擁有該鎖的谣膳,而其他線程訪問被鎖住的互斥量竿报,則會被阻塞住。

使用示例:

#include <thread>
#include <iostream>

int num = 0;
std::mutex mutex;

void plus(){
    std::lock_guard<std::mutex> guard(mutex);
    std::cout << num++ <<std::endl;
}

int main(){
    std::thread threads[10];
    for (auto & i : threads) {
        i = std::thread(plus);
    }
    for (auto & thread : threads) {
        thread.join();
    }
    return 0;
}

如上代碼继谚,創(chuàng)建了 10 個線程烈菌,每個線程都會先打印 num 的值,然后再將 num 變量加一花履,依次打印 0 到 9 芽世。

眾所周知,+1 的操作不是線程安全的诡壁,實際包含了三步济瓢,先讀取,再加一妹卿,最后賦值旺矾。但是因為使用了互斥量 mutex 蔑鹦,保證獨占性,所以結(jié)果都是按照順序遞增打印的箕宙。

如果不使用互斥量嚎朽,那么可能前一個線程還沒有賦值完,后一個線程就進行了讀取柬帕,最后的結(jié)果就是隨機不可預料的哟忍。

condition_variable

condition_variable 作為條件變量,它會調(diào)用 wait 函數(shù)等待某個條件滿足陷寝,如果不滿足的話锅很,就通過 unique_lock 來鎖住當前線程,當前線程就處于阻塞狀態(tài)凤跑,直到其他線程調(diào)用了條件變量的 nofity 函數(shù)來喚醒粗蔚。

使用示例:

#include <iostream>
#include <thread>

int num = 0;
std::mutex mutex;
std::condition_variable cv;

void plus(int target){
    std::unique_lock<std::mutex> lock(mutex);
    cv.wait(lock,[target]{return num == target;});
    num++;
    std::cout << target <<std::endl;
    cv.notify_all();
}

int main(){
    std::thread threads[10];
    for (int i = 0; i < 10; ++i) {
        threads[i] = std::thread(plus,9-i);
    }
    for (auto & thread : threads) {
        thread.join();
    }
    return 0;
}

同樣是創(chuàng)建了 10 個線程,每個線程都會有一個 target 參數(shù)饶火,代表該線程要打印的數(shù)值鹏控,按照 9 -> 0 順序創(chuàng)建線程,最后運行結(jié)果是依次打印 0 -> 9 肤寝。

每個線程運行時都會先調(diào)用 wait 函數(shù)等待 num == target 這一條件滿足当辐,一旦滿足就會將 num 加一,并打印 target 值鲤看,然后喚醒下一個滿足條件的值缘揪。

通過改變條件變量的 wait 函數(shù)喚醒條件,就可以實現(xiàn)不同的多線程模式义桂,比如常見的生產(chǎn)者-消費者模型找筝。

condiation_variable 實現(xiàn)生產(chǎn)消費者模式

#include <iostream>
#include <thread>
#include <queue>

int main(){
    std::queue<int> products;
    std::condition_variable cv_pro,cv_con;
    std::mutex mtx;

    bool end = false;
    std::thread producer([&]{
        for (int i = 0; i < 10; ++i) {
            std::unique_lock<std::mutex> lock(mtx);
            cv_pro.wait(lock,[&]{return products.empty();});
            products.push(i);
            cv_con.notify_all();
        }
        cv_con.notify_all();
        end = true;
    });

    std::thread consumer([&]{
        while (!end){
            std::unique_lock<std::mutex> lock(mtx);
            cv_con.wait(lock,[&]{return !products.empty();});
            int d = products.front();
            products.pop();
            std::cout << d << std::endl;
            cv_pro.notify_all();
        }
    });

    producer.join();
    consumer.join();
    return 0;
}

future & promise

future 這一特性在日常開發(fā)中用的比較少,它可以用來獲取異步任務的結(jié)果慷吊,也可以當做一種線程間同步的手段袖裕。

假設(shè)現(xiàn)在程序要創(chuàng)建一個線程去執(zhí)行耗時操作,并且等耗時操作結(jié)束了之后要拿到返回值溉瓶,那么可以通過 condiation_variable 來實現(xiàn)急鳄,在異步線程執(zhí)行完了之后調(diào)用 notify 方法來喚醒主線程,同樣也可以通過 future 來實現(xiàn)堰酿。

當程序通過特定方法創(chuàng)建了一個異步操作之后會返回一個 future 疾宏,該 future 可以訪問到異步線程的狀態(tài)。

在異步線程里面設(shè)置某個共享狀態(tài)的值触创,與該共享狀態(tài)相關(guān)聯(lián)的 future 就可以通過 get 方法來獲取結(jié)果坎藐,get() 方法會阻塞調(diào)用線程,從而等待異步線程完成設(shè)置哼绑。

future 的 get 方法其實就相當于 condiation_variable 的 wait 方法岩馍,而異步線程設(shè)置共享狀態(tài)的值的方法就相當于 condiation_variable 的 notify 方法碉咆。

future 的創(chuàng)建有如下三種方式:

std::promise

promise 就像它的字面意思一樣,代表承諾兼雄,說明它一定會在異步線程設(shè)置共享狀態(tài)的,而 future 就耐心等待好了帽蝶。

promise 和 future 的調(diào)用流程如下圖所示:

image

代碼示例如下:

#include <iostream>
#include <thread>
#include <chrono>
#include <future>

void task(std::promise<int>& promise){
    std::this_thread::sleep_for(std::chrono::seconds(1));
    promise.set_value(10);
}

int main(){
    std::promise<int> promise;
    std::future<int> future = promise.get_future();
    std::thread t(task,std::ref(promise));
    int result = future.get();
    std::cout << "thread result is " << result << std::endl;
    t.join();
    return 0;
}

promise 通過 get_future 方法獲取與該 promise 關(guān)聯(lián)的 future 對象赦肋,并且通過 set_value 方法設(shè)置共享狀態(tài)的值。

std::packaged_task

packaged_task 可以用來包裝一個可調(diào)用的對象 励稳,并且能作為線程的運行函數(shù)佃乘,有點類似于 std::function 。

但不同的是驹尼,它將其包裝的可調(diào)用對象的執(zhí)行結(jié)果傳遞一個相關(guān)聯(lián)的 future 趣避,從而實現(xiàn)狀態(tài)的共享,future 通過 get 方法來等待可調(diào)用對象執(zhí)行結(jié)束新翎。

如下代碼所示:

#include <iostream>
#include <thread>
#include <chrono>
#include <future>

int task(){
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 10;
}

int main(){
    std::packaged_task<int(void)> packaged_task(task);
    std::future<int> future = packaged_task.get_future();
    std::thread thread(std::move(packaged_task));
    int result = future.get();
    std::cout << "thread result is " << result << std::endl;
    thread.join();
    return 0;
}

packaged_task 通過 get_future 方法來獲得相關(guān)聯(lián)的 future 對象程帕。

std::async

async 也能創(chuàng)建 future ,并且它更像是對 std::thread地啰,std::packaged_task愁拭,std::promise 的封裝。

如下代碼所示:

#include <iostream>
#include <thread>
#include <chrono>
#include <future>

int task(){
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 10;
}

int main(){
    std::future<int> future = std::async(std::launch::async,task);
    int result = future.get();
    std::cout << "thread result is " << result << std::endl;
    return 0;
}

通過 async 直接創(chuàng)建異步線程并且獲取相關(guān)聯(lián)的 future 對象亏吝,連 thread 創(chuàng)建線程的操作都省了岭埠。

async 有兩種執(zhí)行策略,launch::async 和 launch::deferred 蔚鸥。其中前者是立即執(zhí)行惜论,后者是等調(diào)用 future.get() 方法時才會創(chuàng)建線程執(zhí)行任務。

線程池的建設(shè)

了解了以上的線程相關(guān)操作類止喷,就可以進一步進階馆类,通過它們來打造一個線程池了。

關(guān)于線程池建設(shè)弹谁,根據(jù)具體業(yè)務和使用場景蹦掐,會有很多不同支持,但有些本質(zhì)內(nèi)容還是不會變的僵闯。

線程池的出發(fā)點當然是為了減少在頻繁創(chuàng)建和銷毀線程上所花的時間和系統(tǒng)資源的開銷卧抗,表現(xiàn)形式上就是有一池子的線程,向線程池提交任務鳖粟,最終分配到某個線程上去執(zhí)行社裆。

如下圖所示,就是一個簡易線程池的雛形向图,有任務 Task泳秀,有 Thread Pool 來分發(fā)任務标沪,也有 Worker Thread 來最終執(zhí)行任務,麻雀雖小五臟俱全嗜傅。

image

接下來就詳細拆解以上部分內(nèi)容金句。

任務類型

任務 Task 的類型根據(jù)業(yè)務需求可以有多種定義,主要差別在于任務需要的參數(shù)類型以及返回值類型吕嘀。

另外违寞,任務本身也可以有一些屬性來標識該屬性的類型,應該放到什么樣的線程去執(zhí)行等等偶房。

簡單起見趁曼,定義一個簡單的 Task 類型,無需參數(shù)棕洋,也不需要返回值類型挡闰。

using task = std::function<void()>;

線程數(shù)量

一個線程池該有多少線程呢?如果數(shù)量太多掰盘,會導致資源浪費摄悯,有些線程不一定能充分使用。如果太少就會導致頻繁創(chuàng)建新線程愧捕。

一個靈活的線程池應該是可以動態(tài)改變線程數(shù)量的射众,參考 Java線程池實現(xiàn)原理及其在美團業(yè)務中的實踐

在 Java 的 ThreadPoolExecutor 中通過 corePoolSize 和 maximumPoolSize 來限制線程的數(shù)量晃财,線程數(shù)量會在 [0 ~ corePoolSize][corePoolSize ~ maximumPoolSize]之間波動叨橱。

當任務吃緊,線程和緩存都滿了断盛,就會申請線程罗洗,數(shù)量達到 [corePoolSize ~ maximumPoolSize] 范圍,一旦任務松懈钢猛,就會釋放一些空閑線程伙菜,數(shù)量回落到 [0 ~ corePoolSize] 范圍,如果任務持續(xù)吃緊命迈,那么就會拒絕任務了贩绕。

當然,也有其他確定線程數(shù)量的策略壶愤,根據(jù)具體的業(yè)務需求來核定淑倾,比如根據(jù) CPU 多核來決定線程數(shù)量多少。

簡單起見征椒,這里就以固定線程數(shù)量作為演示了娇哆。

size_t N = std::thread::hardware_concurrency();

任務緩存

假設(shè)現(xiàn)在已經(jīng)固定了 N 個線程,并且每個線程都有任務在執(zhí)行,這時有來了一個新任務碍讨,那么該怎么處理呢治力? 這時候就需要任務緩存機制了(當然也可以直接拒絕該任務)。

任務緩存也分多種形式:

  1. 全局緩存
  2. 線程緩存
  3. 全局緩存 + 線程緩存

全局緩存

全局緩存勃黍,顧名思義就是在線程池有一個全局的緩存隊列宵统,凡是進入到線程池的任務都會先進到全局緩存中,然后由全局緩存進行任務的分發(fā)覆获,最后由不同的工作線程去執(zhí)行任務马澈。

線程緩存

線程緩存,顧名思義就是在每個工作線程都有一個緩存隊列锻梳,然后線程不斷循環(huán)處理自己緩存隊列上的任務箭券。凡是進入到線程池的任務净捅,都會由線程池進到調(diào)度和分發(fā)疑枯,然后進入到工作線程對應的緩存隊列中,最終被執(zhí)行結(jié)束

全局緩存 + 線程緩存

全局緩存 + 線程緩存 就是將上面兩者結(jié)合起來了蛔六,用如下圖來匯總演示:

image

這種緩存方式算是比較復雜的情形了荆永,適用于那種計算量大且快速執(zhí)行的情形蹦狂,一般來說還是全局緩存用的比較普遍鸽捻。

緩存隊列

有了任務緩存简卧,那么還應該定義一下緩存隊列断箫。毫無疑問糟把,緩存隊列必須是線程安全的缨历,因為它要在多個工作線程之間共享任務衫贬。

緩存隊列的形式有很多蝴乔,比如阻塞隊列四啰,雙向鏈表的阻塞隊列等等宁玫,這里定義一個簡單的隊列,把 std::queue 做一下線程安全的封裝柑晒。

#pragma once

#include <mutex>
#include <queue>

// Thread safe implementation of a Queue using an std::queue
template <typename T>
class SafeQueue {
private:
  std::queue<T> m_queue;
  std::mutex m_mutex;
public:
  SafeQueue() {

  }

  bool empty() {
    std::unique_lock<std::mutex> lock(m_mutex);
    return m_queue.empty();
  }
  
  int size() {
    std::unique_lock<std::mutex> lock(m_mutex);
    return m_queue.size();
  }

  void enqueue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_queue.push(t);
  }
  
  bool dequeue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex);

    if (m_queue.empty()) {
      return false;
    }
    t = std::move(m_queue.front());
    
    m_queue.pop();
    return true;
  }
};

定義了 enqueue 和 dequeue 方法向隊列中塞任務和取任務欧瘪,通過加鎖來保證線程安全。

線程調(diào)度

線程池最核心的部分也就是線程調(diào)度了匙赞,假設(shè)使用了全局緩存的形式佛掖,那么如何把全局緩存中的任務分發(fā)給空閑線程呢?

實際上從某種角度來說涌庭,全局緩存的線程池也可以認為是一個單生產(chǎn)者-多消費者模式芥被,全局緩存就是生產(chǎn)者,而多個線程就是多個消費者了坐榆。

在前面的代碼實踐中已經(jīng)寫了一個單生產(chǎn)者-單消費者模式撕彤,當生產(chǎn)者生產(chǎn)了 Task 之后,就通過 notify 方法來喚醒消費者,從而將 Task 分配到消費者去執(zhí)行羹铅。由于只有一個消費者蚀狰,那喚醒的就是唯一的那個了。

那假若有多個消費者职员,喚醒的又是哪一個呢麻蹋?答案是隨機的。調(diào)用 notify_one 方法會隨機喚醒一個線程焊切,調(diào)用 notify_all 則會喚醒全部線程扮授。

但是喚醒并不代表線程就會消費 Task,一個 Task 對應多個線程专肪,線程喚醒之后會去全局緩存搶奪 Task 任務刹勃,一旦得手就執(zhí)行,而其他沒有搶到的線程則繼續(xù)掛起嚎尤,等待下一次的喚醒荔仁。

線程池中線程的運行狀態(tài)如下圖所示:

image

本質(zhì)上,線程池還是通過 notify 方法來喚醒線程芽死,從而實現(xiàn)任務分發(fā)和調(diào)度的乏梁。

這種方式具有一定的隨機性,不能確保到底喚醒了哪個線程关贵,可以根據(jù)業(yè)務需要定制相關(guān)的調(diào)度邏輯遇骑,比如只喚醒某些具有共同屬性的線程,或者根據(jù) Task 任務的要求來喚醒指定線程揖曾,更可以不通過 notify 的方式落萎,直接把任務派發(fā)給對應的線程去執(zhí)行。

根據(jù)上述流程圖就可以給出工作線程運行的代碼了:

class WorkerThread {
private:
    int m_id;
    ThreadPool *m_pool;
public:
    WorkerThread(ThreadPool *pool, int id) : m_pool(pool), m_id(id) {

    }

    void operator()() {
        task func;
        bool dequeued;
        while (!m_pool->m_shutdown) {
            std::unique_lock<std::mutex> lock(m_pool->m_mutex);
            if (m_pool->m_queue.empty()){
                m_pool->m_condition_variable.wait(lock);
            }
            dequeued = m_pool->m_queue.dequeue(func);
            if (dequeued) {
                func();
            }
        }
    }
};

下面就是一個簡單的線程池代碼實踐:


#ifndef THREAD_POOL_THREADPOOL_H
#define THREAD_POOL_THREADPOOL_H

#include <functional>
#include <future>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>
#include "SafeQueue.h"

using task = std::function<void()>;

class ThreadPool {
public:
    ThreadPool(size_t thread_num = std::thread::hardware_concurrency()) : m_threads(
            std::vector<std::thread>(thread_num)), m_shutdown(false) {
    }

    void init() {
        for (int i = 0; i < m_threads.size(); ++i) {
            m_threads[i] = std::thread(WorkerThread(this, i));
        }
    }

    void shutdown() {
        m_shutdown = true;
        m_condition_variable.notify_all();
        for (int i = 0; i < m_threads.size(); ++i) {
            if (m_threads[i].joinable()) {
                m_threads[i].join();
            }
        }
    }


    std::future<void> submit(task t){
        auto p_task = std::make_shared<std::packaged_task<void()>>(t);
        task wrapper_task = [p_task](){
             (*p_task)();
        };
        m_queue.enqueue(wrapper_task);
        m_condition_variable.notify_one();
        return p_task->get_future();
    }

private:
    class WorkerThread {
    private:
        int m_id;
        ThreadPool *m_pool;
    public:
        WorkerThread(ThreadPool *pool, int id) : m_pool(pool), m_id(id) {

        }

        void operator()() {
            task func;
            bool dequeued;
            while (!m_pool->m_shutdown) {
                std::unique_lock<std::mutex> lock(m_pool->m_mutex);
                if (m_pool->m_queue.empty()){
                    m_pool->m_condition_variable.wait(lock);
                }
                dequeued = m_pool->m_queue.dequeue(func);
                if (dequeued) {
                    func();
                }
            }
        }
    };

    bool m_shutdown;
    SafeQueue<task> m_queue;
    std::vector<std::thread> m_threads;
    std::mutex m_mutex;
    std::condition_variable m_condition_variable;

};

#endif //THREAD_POOL_THREADPOOL_H

通過 submit 方法提交任務到全局緩存隊列中炭剪,然后喚醒線程去消費任務執(zhí)行练链。

小結(jié)

關(guān)于 C++ 多線程的使用還有很多知識點,以上只是介紹了部分內(nèi)容念祭,還有很多不足之處兑宇,后續(xù)再補充了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末粱坤,一起剝皮案震驚了整個濱河市隶糕,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌站玄,老刑警劉巖枚驻,帶你破解...
    沈念sama閱讀 221,888評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異株旷,居然都是意外死亡再登,警方通過查閱死者的電腦和手機尔邓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來锉矢,“玉大人梯嗽,你說我怎么就攤上這事」了穑” “怎么了灯节?”我有些...
    開封第一講書人閱讀 168,386評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長绵估。 經(jīng)常有香客問我炎疆,道長,這世上最難降的妖魔是什么国裳? 我笑而不...
    開封第一講書人閱讀 59,726評論 1 297
  • 正文 為了忘掉前任形入,我火速辦了婚禮,結(jié)果婚禮上缝左,老公的妹妹穿的比我還像新娘亿遂。我一直安慰自己,他們只是感情好盒使,可當我...
    茶點故事閱讀 68,729評論 6 397
  • 文/花漫 我一把揭開白布崩掘。 她就那樣靜靜地躺著七嫌,像睡著了一般少办。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上诵原,一...
    開封第一講書人閱讀 52,337評論 1 310
  • 那天英妓,我揣著相機與錄音,去河邊找鬼绍赛。 笑死蔓纠,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的吗蚌。 我是一名探鬼主播腿倚,決...
    沈念sama閱讀 40,902評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蚯妇!你這毒婦竟也來了敷燎?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,807評論 0 276
  • 序言:老撾萬榮一對情侶失蹤箩言,失蹤者是張志新(化名)和其女友劉穎硬贯,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體陨收,經(jīng)...
    沈念sama閱讀 46,349評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡饭豹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,439評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片拄衰。...
    茶點故事閱讀 40,567評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡它褪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出翘悉,到底是詐尸還是另有隱情列赎,我是刑警寧澤,帶...
    沈念sama閱讀 36,242評論 5 350
  • 正文 年R本政府宣布镐确,位于F島的核電站包吝,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏源葫。R本人自食惡果不足惜诗越,卻給世界環(huán)境...
    茶點故事閱讀 41,933評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望息堂。 院中可真熱鬧嚷狞,春花似錦、人聲如沸荣堰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,420評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽振坚。三九已至薇搁,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間渡八,已是汗流浹背啃洋。 一陣腳步聲響...
    開封第一講書人閱讀 33,531評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留屎鳍,地道東北人宏娄。 一個月前我還...
    沈念sama閱讀 48,995評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像逮壁,于是被迫代替她去往敵國和親孵坚。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,585評論 2 359

推薦閱讀更多精彩內(nèi)容