0.為什么需要線程池滨巴?
當(dāng)我們需要完成一些持續(xù)時間短、發(fā)生頻率高的工作時,每次為他們開啟一個線程既顯得繁瑣又會造成不必要的開銷夭咬,所以為這一類工作寫一個簡單的線程池就很有必要了。
1.如何設(shè)計我們的線程池铆隘?
考慮這樣一個應(yīng)用場景卓舵,我們有一個設(shè)計軟件,當(dāng)我們需要下載一些模板用作接下來的設(shè)計工作膀钠。我們顯然不能將下載工作放在主線程去完成掏湾,如果你的用戶網(wǎng)絡(luò)速度飛快還好說,一旦他的網(wǎng)絡(luò)稍有波動肿嘲,你的設(shè)計軟件將會停止GUI界面的渲染融击,卡在那邊不懂,這顯然不是好的體驗雳窟。正確的做法是另外開啟一個線程去下載尊浪,一旦這個下載工作完成后,我們可以通知主線程去做一個提示,例如彈出一個對話框告訴用戶模板已經(jīng)可以用了拇涤。
如果這個設(shè)計軟件需要頻繁的做下載工作捣作,那么每次單開一個線程就比較麻煩。我們可以將下載任務(wù)打包好工育,一個一個的推送到隊列中虾宇,由線程池自動從隊列中獲取任務(wù),完成下載工作如绸。當(dāng)隊列為空時嘱朽,線程池里面的線程就進入等待狀態(tài),這也不會造成資源的占用怔接。
2.先完成隊列的設(shè)計
STL中提供了一個關(guān)于隊列的實現(xiàn)std::queue
搪泳,但是std::queue
并不是線程安全的,我們需要在此基礎(chǔ)上進行一些包裝扼脐,讓它在多線程的情況下也可以使用岸军。
我們的隊列需要擁有這幾個功能:
1.從隊列里彈出一個任務(wù)
2.向隊列里推送任務(wù)
3.了解隊列里的任務(wù)還有多少個
4.清除任務(wù)隊列
下面是一個簡單的線程安全隊列:
namespace multi_thread {
template<typename Task>
class thread_safe_queue {
public:
thread_safe_queue() :done_(false) {
}
void push(const Task& t) {
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(t);
ready_.notify_one();
}
void wait_and_pop(Task& t) {
std::unique_lock<std::mutex> lock(mtx_);
if (queue_.empty() && !done_)
ready_.wait(lock);
if (done_)
return;
t = queue_.front();
queue_.pop();
}
bool empty()const {
std::lock_guard<std::mutex> lock(mtx_);
return queue_.empty();
}
void clear() {
std::lock_guard<std::mutex> lock(mtx_);
for (int i = 0; i < queue_.size(); ++i)
queue_.pop();
}
void done() {
done_ = true;
ready_.notify_all();
}
private:
std::queue<Task> queue_;
mutable std::mutex mtx_;
std::condition_variable ready_;
std::atomic_bool done_;
};
}
簡單講解一下:
我們的線程安全隊列thread_safe_queue
遵循了STL的命名規(guī)范组力,對于任意容器猴誊,他們的判斷為空的函數(shù)簽名都是bool empty()const;
,清除容器中的內(nèi)容的都為void clear()
提佣。對于推入任務(wù)的函數(shù)肚吏,我們跟std::queue
保持了一致方妖,而彈出函數(shù)的命名我們做了一些改變,與函數(shù)的操作更加切合罚攀,表示對std::queue::pop()
的一種擴展党觅。
這里我們采用了std::mutex
作為我們的互斥量,每次訪問隊列中的內(nèi)容時斋泄,都用std::lock_guard
或者std::unique_lock
加鎖杯瞻,保證在同一時段只有一個線程可以讀寫隊列。std::lock_guard
是一個方便的RAII的體現(xiàn)炫掐,他會在析構(gòu)時自動調(diào)用unlock()
魁莉,std::unique_lock
與之的區(qū)別就是,你可以顯式調(diào)用unlock()
函數(shù)完成解鎖募胃,它更加的靈活沛厨,當(dāng)然你可以用{}
和std::lock_guard
完成相同的工作。我們將std::mutex_
設(shè)計成mutable
摔认,是因為我們需要在const member functionempty()
中使用它逆皮。
在push
函數(shù)中,每次完成push后就調(diào)用std::conditional_variable::notify_one()
來喚醒一個處于等待狀態(tài)的線程参袱,通知他隊列中已經(jīng)有任務(wù)了电谣,可以開始彈出了秽梅。在對應(yīng)了wait_and_pop
函數(shù)中,同樣有一個std::conditional_variable::wait()
剿牺,這個函數(shù)可以傳入一個或者兩個參數(shù)企垦,第一個參數(shù)表示要釋放的鎖,第二個函數(shù)是一個predicate晒来,不接受參數(shù)钞诡,返回值為bool
。他會在調(diào)用wait
時首先調(diào)用這個predicate湃崩,如果值是true
荧降,那么就會重新獲得鎖并繼續(xù)向下執(zhí)行,如果值是false
攒读,他會釋放鎖并且進入等待狀態(tài)朵诫,直到接收到前面所說的喚醒通知,他才會重新檢查predicate并做前述操作薄扁。這里我們沒有使用predicate剪返,僅使用了簡單的版本,他會在接收到喚醒通知時直接向下執(zhí)行邓梅。
再來看最后一個變量std::atomic_bool
脱盲,它是標(biāo)準(zhǔn)庫提供的一個原子類型,在訪問時標(biāo)準(zhǔn)保證在同一時刻只有一個線程在讀寫他日缨。原子類型是不可拷貝且不可移動的钱反,所以我們不能對他進行類內(nèi)初始化,這會調(diào)用copy constructor殿遂,正確的方法是使用constructor initializer list進行初始化诈铛,當(dāng)然你也可以在構(gòu)造函數(shù)體內(nèi)使用賦值運算符給其賦上一個值乙各,但是請務(wù)必在這兩個中間選擇一個墨礁。回到代碼耳峦,這個變量的作用很簡單恩静,就是讓彈出的函數(shù)wait_and_pop
直接結(jié)束,我們只有在需要析構(gòu)這個線程池時才會這么做蹲坷,具體做法在線程池中細說驶乾。
3.完成線程池的設(shè)計
在啟用線程池時,我們需要指定在線程池中存放的線程的數(shù)量循签,當(dāng)然我們也可以在后續(xù)為線程池添加新的線程级乐。
我們將使用STL提供的容器std::vector
來存放我們的線程,這些線程做的工作都是一致的县匠,他們將在隊列中不斷地彈出任務(wù)并執(zhí)行它风科,如果隊列中沒有任務(wù)了撒轮,他們就將進入等待狀態(tài)。
結(jié)合上面的隊列實現(xiàn)贼穆,給出一個簡單的線程池實現(xiàn):
namespace multi_thread {
template<typename Task>
class thread_safe_queue {
public:
thread_safe_queue() :done_(false) {
}
void push(const Task& t) {
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(t);
ready_.notify_one();
}
void wait_and_pop(Task& t) {
std::unique_lock<std::mutex> lock(mtx_);
if (queue_.empty() && !done_)
ready_.wait(lock);
if (done_)
return;
t = queue_.front();
queue_.pop();
}
bool empty()const {
std::lock_guard<std::mutex> lock(mtx_);
return queue_.empty();
}
void clear() {
std::lock_guard<std::mutex> lock(mtx_);
for (int i = 0; i < queue_.size(); ++i)
queue_.pop();
}
void done() {
done_ = true;
ready_.notify_all();
}
private:
std::queue<Task> queue_;
mutable std::mutex mtx_;
std::condition_variable ready_;
std::atomic_bool done_;
};
class thread_pool {
public:
thread_pool(const int threadNum) :done_(false) {
for (int i = 0; i < threadNum; ++i) {
threads_.emplace_back(&thread_pool::workerThread, this);
}
}
~thread_pool() {
done_.store(true);
queue_.done();
for (auto& thread : threads_) {
if (thread.joinable())
thread.join();
}
}
void submit(const std::function<void(void)>& t) {
queue_.push(t);
}
bool isEmpty()const {
return queue_.empty();
}
void clearTask() {
queue_.clear();
}
private:
void workerThread() {
while (true) {
std::function<void(void)> t;
queue_.wait_and_pop(t);
if (done_)
break;
if (t)
t();
}
}
private:
std::vector<std::thread> threads_;
thread_safe_queue<std::function<void(void)>> queue_;
std::atomic_bool done_;
};
}
隊列里存放的是std::function<void(void)>
题山,是一個沒有參數(shù)和返回值的Callable
。在C++中故痊,常用的Callable包括lambda表達式顶瞳、function object(也就是operator()
)、std::bind
和std::function
愕秫。我個人比較喜歡使用lambda和std::bind
慨菱,他們都很簡單易用,在使用std::bind
時豫领,如果你的函數(shù)是一個member function抡柿,別忘了把this
作為第一個參數(shù)。
這個線程池非常簡單等恐,當(dāng)你調(diào)用構(gòu)造函數(shù)時洲劣,傳入一個你想要的線程數(shù)量,然后構(gòu)造函數(shù)體里就會開啟對應(yīng)數(shù)量的線程课蔬。std::thread
接受一個Callable和它的參數(shù)作為構(gòu)造函數(shù)的參數(shù)囱稽,構(gòu)造函數(shù)完成后線程立即啟動,這里的workerThread
是一個member function二跋,所以战惊,我們還得加上this
作為參數(shù),跟std::bind
一樣扎即。
workerThread
函數(shù)里面循環(huán)地從隊列中獲取任務(wù)并執(zhí)行吞获,如果沒有任務(wù),他就會處于等待狀態(tài)谚鄙「骺剑可以看到這里同樣有一個std::atomic_bool
的變量done_
,它用于使線程結(jié)束闷营。我們會在線程池的析構(gòu)函數(shù)里將這個變量設(shè)為true
烤黍,然后我們調(diào)用隊列的done
函數(shù),讓隊列退出傻盟。最后我們逐個檢查線程池中的線程速蕊,如果他們還在運行,等待他們結(jié)束娘赴。一個std::thread
在析構(gòu)前必須指定等待該線程結(jié)束還是分離該線程规哲,否則會發(fā)生錯誤。如果我們不做這些工作诽表,那么線程池在析構(gòu)時要么是其中的線程一直處于等待狀態(tài)唉锌,要么是一直處于循環(huán)狀態(tài)腥光,這會導(dǎo)致析構(gòu)函數(shù)停滯不前,也就是程序的卡死糊秆。
4.總結(jié)
這次實現(xiàn)的線程池只能說是實現(xiàn)了最基礎(chǔ)的功能武福,還缺少一些功能,例如任務(wù)返回值的缺失等痘番。本文實現(xiàn)的線程安全隊列也十分簡單捉片,還有很多可以優(yōu)化的地方。