繼續(xù)閱讀《基于物理的渲染:從理論到實踐》一書凡桥,遇到了多線程的代碼蟀伸,發(fā)現(xiàn)自己看不懂它的代碼,于是找了本書補(bǔ)充了一下多線程的知識缅刽,終于理解了pbrt中的代碼啊掏,在此把學(xué)到的東西整理一下,希望能對讀者有所幫助衰猛。
提起多線程迟蜜,總有一種熟悉的陌生感,為什么呢啡省?因為我們可以從很多地方聽到線程的概念娜睛,比如4核8線程之類的髓霞,但是真正要用的話,卻總感覺無處下手畦戒,我們需要一種可以把概念轉(zhuǎn)換成代碼的方法方库。
多線程的應(yīng)用非常普遍,現(xiàn)在你幾乎找不到單線程的應(yīng)用了障斋,舉個簡單的例子薪捍,下圖是QQ擁有的線程:
Excuse me? 為啥你要這么多線程?Anyway配喳,從這個常用的軟件就能看出來多線程地運(yùn)用有多么普遍酪穿。
要使用多線程,我們必須解決下面四個問題:
- 為什么要用多線程晴裹?
- 如何進(jìn)行任務(wù)分割被济?
- 如何共享數(shù)據(jù)?
- 如何進(jìn)行線程同步涧团?
解決這些問題的過程只磷,也是我們使用多線程的過程。話不多說泌绣,我們開始吧钮追。
為什么要用多線程?
通常阿迈,我們用多線程有兩個原因:
- 提高性能元媚,加快運(yùn)行速度(99%的情況是如此)
- 分離關(guān)注點(diǎn)
第一個原因不用多說,很容易理解苗沧。如果我們有一個計算量很大的任務(wù)刊棕,我們自然希望把它拆成幾個子任務(wù),然后同時進(jìn)行待逞,充分利用計算機(jī)資源甥角,減少總體的運(yùn)行時間。
那么识樱,什么是分離關(guān)注點(diǎn)呢嗤无?雖然不太容易理解,但是用的也很普遍怜庸。舉個例子当犯,一個普通的應(yīng)用,需要一個UI線程和一個業(yè)務(wù)線程休雌。UI線程將用戶的動作捕捉灶壶,然后發(fā)給業(yè)務(wù)線程執(zhí)行,獲得業(yè)務(wù)線程的執(zhí)行結(jié)果后杈曲,再反饋給用戶驰凛。這里的UI線程起到的就是分離關(guān)注點(diǎn)的作用,它負(fù)責(zé)且僅負(fù)責(zé)與用戶的交互担扑,不負(fù)責(zé)任何具體的計算工作恰响。
如何進(jìn)行任務(wù)分割?
簡單分割
最簡單的情況就是沒有共享數(shù)據(jù)涌献,大家各干各的事胚宦,干完之后,整個任務(wù)也就完成了燕垃。由于不涉及數(shù)據(jù)共享枢劝,實現(xiàn)簡單分割的方式可以是直接開n個線程,然后把它們各自需要的數(shù)據(jù)傳過去卜壕,等著線程都執(zhí)行完畢您旁,收集結(jié)果。其執(zhí)行的方式如下圖所示:
快速排序非常適合多線程模式轴捎,其原理如下:
單線程百萬個整數(shù)排序執(zhí)行了3秒多的時間鹤盒,不知道多線程的話能有多快。
任務(wù)管線
如果我們的任務(wù)是對不同的數(shù)據(jù)進(jìn)行相同的一系列操作侦副,那么我們就可以使用任務(wù)管線來提高執(zhí)行效率侦锯。
任務(wù)管線的方法正如它的名字所“明示”的那樣,將任務(wù)分成多個階段秦驯,每個階段使用一個線程去執(zhí)行尺碰,這樣任務(wù)的一個階段執(zhí)行完后,就到下一個階段繼續(xù)執(zhí)行译隘,像是流水線一樣葱蝗,所有的線程都有任務(wù)做,直到所有的數(shù)據(jù)都操作完畢细燎。
管線這種設(shè)計并不只有多線程會用两曼,很多地方都用到了這個方法,最常見的就是CPU玻驻。CPU會將一個指令分解成多個階段悼凑,最經(jīng)典的是5個階段:
獲取指令(IF)
解碼指令并從寄存器獲取操作數(shù)(ID/RF)
執(zhí)行(EX)
讀取內(nèi)存(MEM)
寫回寄存器(WB)
執(zhí)行指令也是每一個階段都有執(zhí)行的元件,所有元件可以同時運(yùn)行璧瞬,提高了指令執(zhí)行的效率户辫。事實證明,這是一個非常好的策略嗤锉,CPU的運(yùn)行速度也大大提高渔欢,甚至有些CPU把指令分成幾十個階段以提高效率。
如何共享數(shù)據(jù)瘟忱?
多線程需要考慮如何共享數(shù)據(jù)是因為線程的調(diào)度精度實在是太高了奥额,在一個指令到另一個指令的間隔苫幢,有可能就切換成另一個線程運(yùn)行了,而我們寫的每一行代碼都會被分解成多個指令執(zhí)行垫挨,舉個簡單的粒子:
假如說有一個變量i韩肝,要把它自增1,我們使用代碼++i;就行了九榔。這就夠了嗎哀峻?遠(yuǎn)遠(yuǎn)不夠。++i在執(zhí)行過程中會分解成多個指令哲泊,在這些指令的間隔剩蟀,另外一個線程可能就執(zhí)行了,然后也是獲取i的數(shù)據(jù)切威,對其進(jìn)行修改育特,然后再切換回來,對i進(jìn)行修改牢屋,這樣另一個線程的操作完全就被覆蓋了且预。這種情況稱為競爭條件(race condition)。參考如下的代碼:
#include <iostream>
#include <thread>
int32_t i = 0;
void Add100Times1()
{
for (int32_t j = 0; j < 100000; ++j)
++i;
}
void Add100Times2()
{
for (int32_t j = 0; j < 100000; ++j)
++i;
}
int main()
{
std::thread t1(Add100Times1);
std::thread t2(Add100Times2);
std::cout << "The final i is " << std::endl;
t1.join();
t2.join();
std::cout << i << std::endl;
}
上面的代碼的輸出結(jié)果可能是185524,166968,200000,186661等等烙无。
這問題就非常嚴(yán)重了锋谐,如果我在寫代碼的時候都無法控制我的數(shù)據(jù),那運(yùn)行后的結(jié)果怎么可能對截酷?好在涮拗,我們有方法可以把數(shù)據(jù)保護(hù)起來,使得當(dāng)一個線程使用數(shù)據(jù)的時候迂苛,不允許其他的線程使用三热,這就要用到互斥體(mutex)。
互斥體的使用方式如下所示:
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex); // 1
some_list.push_back(new_value); // 2
}
第1行代碼是獲得一個互斥體三幻,std::lock_guard會在初始化的時候獲得互斥體就漾,在離開作用于的時候自動釋放互斥體。這樣我們就不用擔(dān)心會忘了釋放而卡死其他線程了念搬。
C++17中可以使用std::scoped_guard來代替std::lock_guard抑堡。并且這是其推薦的做法,而std::scope_guard會逐漸廢棄朗徊。
還有一種使用互斥的方式是用std::unique_lock
首妖。它提供了它提供了lock和unlock操作,也就是說這可以循環(huán)利用爷恳。在初始化的時候它也會獲得互斥體有缆,離開作用于的時候也會自動釋放。也就是說,它比std::scoped_guard要靈活很多棚壁。
對上面的代碼使用互斥體后的效果就不貼出來了杯矩,用腳指頭想想也能知道結(jié)果是200000。
如何進(jìn)行線程同步灌曙?
同步的意思是管理和調(diào)度線程菊碟。像是管理一個團(tuán)隊一樣节芥,我們必須要知道團(tuán)隊中的每個人在做哪些事在刺,做到什么階段了,需要什么資源等等头镊。當(dāng)我們有很多線程的時候蚣驼,我們就必須采用某些方法來知道線程的狀態(tài),從而可以控制線程的執(zhí)行相艇。比如某一個線程需要在另一個線程執(zhí)行到一定階段之后才能開始執(zhí)行颖杏,或者某一個線程執(zhí)行得到某一個結(jié)果,然后另一個線程獲得這個結(jié)果然后繼續(xù)執(zhí)行等等坛芽。
最簡單也是使用地最廣泛的方式是條件變量(condition variable)留储,C++標(biāo)準(zhǔn)庫(C++ 11)就有提供,std::condition_variable咙轩。它的使用方法是:
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return ready;}); // If the condition is not satified, the mutex will be unlocked.
// after the wait, we own the lock.
std::cout << "Worker thread is processing data\n";
data += " after processing";
// Send data back to main()
processed = true;
std::cout << "Worker thread signals data processing completed \n";
// Manual unlocking is done before notifying, to avoid waking up
// the waiting thread only to block again.(see notify_one for details)
lk.unlock();
cv.notify_one();
}
int main()
{
using namespace std::chrono_literals;
std::thread worker (worker_thread);
data = "Example data";
// send data to the worker thread
std::this_thread::sleep_for(1s);
{
std::lock_guard<std::mutex> lk(m);
ready = true;
std::cout << "main() signals data ready for processing\n";
}
cv.notify_one(); // notify a thread to check its condition
// wait for the worker
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return processed;});
}
std::cout << "Back in main(), data = " << data << '\n';
worker.join();
}
condition_variable.wait()用來等待直到條件滿足获讳,返回true。然后繼續(xù)執(zhí)行下去活喊。condition_variable.notify_one()表示激活一個在此條件變量上等待的線程(如果有多個線程丐膝,那么無法確定哪一個被激活),激活之后钾菊,condition_variable會執(zhí)行其關(guān)聯(lián)的檢測函數(shù)帅矗,如果檢測函數(shù)返回true,則獲得鎖煞烫,然后繼續(xù)往下執(zhí)行浑此。除此之外,condition_variable還提供一個notify_all函數(shù)滞详,表示激活所有在此條件變量上等待的線程凛俱,在pbrt的代碼中就用到了這個函數(shù)。
線程池
這算是對多線程的一個高級應(yīng)用茵宪,嚴(yán)格來說最冰,可以不出現(xiàn)在“基礎(chǔ)”之中。不過稀火,pbrt中使用了線程池暖哨,所以,把線程池的概念也放到文章中來,以便對代碼有更好的理解篇裁。
說起來沛慢,線程池的概念也非常容易理解。在應(yīng)用啟動的時候达布,創(chuàng)建n個線程团甲,所有的線程初始化完成后就讓它進(jìn)入等待狀態(tài),直到有任務(wù)喚醒它為止黍聂。
當(dāng)有任務(wù)時躺苦,等待的線程被喚醒,執(zhí)行任務(wù)产还,完成之后繼續(xù)進(jìn)入等待狀態(tài)匹厘,直到再次被喚醒。
讓線程進(jìn)入等待狀態(tài)非常容易做到脐区,一個條件變量就可以了愈诚,pbrt中就是這樣做的。
pbrt中的并發(fā)代碼
ParallelInit函數(shù)
void ParallelInit() {
CHECK_EQ(threads.size(), 0);
int nThreads = MaxThreadIndex();
ThreadIndex = 0;
// Create a barrier so that we can be sure all worker threads get past
// their call to ProfilerWorkerThreadInit() before we return from this
// function. In turn, we can be sure that the profiling system isn't
// started until after all worker threads have done that.
std::shared_ptr<Barrier> barrier = std::make_shared<Barrier>(nThreads);
// Launch one fewer worker thread than the total number we want doing
// work, since the main thread helps out, too.
for (int i = 0; i < nThreads - 1; ++i)
threads.push_back(std::thread(workerThreadFunc, i + 1, barrier));
barrier->Wait();
}
代碼很簡單牛隅,就是創(chuàng)建了多個線程炕柔,把這些線程放容器中保存。比較難理解的是barrier對象媒佣,它也是一種同步機(jī)制匕累,是pbrt中自定義的一個結(jié)構(gòu),作用是讓所有的線程都執(zhí)行到一定程度后丈攒,ParallelInit函數(shù)才繼續(xù)執(zhí)行下去哩罪,這就是barrier->Wait()的作用。
std::thread(workerThreadFunc, i + 1, barrier)
表示創(chuàng)建一個新的線程巡验,線程的入口函數(shù)是workerThreadFunc际插,參數(shù)是i+1和barrier。每一個線程都需要一個入口函數(shù)显设,主線程也一樣框弛,所以我們才有int main()。注意捕捂,線程創(chuàng)建后會立刻執(zhí)行入口函數(shù)瑟枫,不會等到所有線程創(chuàng)建好了,運(yùn)行到barrier->Wait()才開始執(zhí)行指攒。
接著來看看workerThreadFunc函數(shù):
static void workerThreadFunc(int tIndex, std::shared_ptr<Barrier> barrier) {
LOG(INFO) << "Started execution in worker thread " << tIndex;
ThreadIndex = tIndex;
// Give the profiler a chance to do per-thread initialization for
// the worker thread before the profiling system actually stops running.
ProfilerWorkerThreadInit();
// The main thread sets up a barrier so that it can be sure that all
// workers have called ProfilerWorkerThreadInit() before it continues
// (and actually starts the profiling system).
barrier->Wait();
// Release our reference to the Barrier so that it's freed once all of
// the threads have cleared it.
barrier.reset();
std::unique_lock<std::mutex> lock(workListMutex);
while (!shutdownThreads) {
if (reportWorkerStats) {
ReportThreadStats();
if (--reporterCount == 0)
// Once all worker threads have merged their stats, wake up
// the main thread.
reportDoneCondition.notify_one();
// Now sleep again.
workListCondition.wait(lock);
} else if (!workList) {
// Sleep until there are more tasks to run
workListCondition.wait(lock);
} else {
// Get work from _workList_ and run loop iterations
ParallelForLoop &loop = *workList;
// Run a chunk of loop iterations for _loop_
// Find the set of loop iterations to run next
int64_t indexStart = loop.nextIndex;
int64_t indexEnd =
std::min(indexStart + loop.chunkSize, loop.maxIndex);
// Update _loop_ to reflect iterations this thread will run
loop.nextIndex = indexEnd;
if (loop.nextIndex == loop.maxIndex) workList = loop.next;
loop.activeWorkers++;
// Run loop indices in _[indexStart, indexEnd)_
lock.unlock();
for (int64_t index = indexStart; index < indexEnd; ++index) {
uint64_t oldState = ProfilerState;
ProfilerState = loop.profilerState;
if (loop.func1D) {
loop.func1D(index);
}
// Handle other types of loops
else {
CHECK(loop.func2D);
loop.func2D(Point2i(index % loop.nX, index / loop.nX));
}
ProfilerState = oldState;
}
lock.lock();
// Update _loop_ to reflect completion of iterations
loop.activeWorkers--;
if (loop.Finished()) workListCondition.notify_all();
}
}
LOG(INFO) << "Exiting worker thread " << tIndex;
}
我們最關(guān)心的是std::unique_lock<std::mutex> lock(workListMutex);
這一行之后的代碼慷妙。進(jìn)入循環(huán)中,如果工作列表中沒有任務(wù)了允悦,那么就在條件變量上等待膝擂,這個功能是由這兩行代碼實現(xiàn)的:
} else if (!workList) {
// Sleep until there are more tasks to run
workListCondition.wait(lock);
如果還有任務(wù),就從工作列表中取一個任務(wù)出來。要注意的是架馋,取任務(wù)這個操作是被互斥體包圍的狞山,這點(diǎn)在上面的代碼中就可以看到。取完之后叉寂,真正執(zhí)行任務(wù)的時候萍启,互斥體就被釋放了(lock.unlock();),然后執(zhí)行任務(wù)屏鳍。在任務(wù)執(zhí)行的過程中勘纯,其他線程可以從工作列表中獲取任務(wù)執(zhí)行,這是我們使用多線程的目的孕蝉。完成任務(wù)后屡律,繼續(xù)獲得互斥體(lock.lock()腌逢;)繼續(xù)循環(huán)看看是否還有任務(wù)降淮。
這些操作與我們之前學(xué)到的線程池是一致的,說明pbrt中實現(xiàn)了線程池搏讶。
ParallelFor函數(shù)
void ParallelFor(std::function<void(int64_t)> func, int64_t count,
int chunkSize) {
CHECK(threads.size() > 0 || MaxThreadIndex() == 1);
// Run iterations immediately if not using threads or if _count_ is small
if (threads.empty() || count < chunkSize) {
for (int64_t i = 0; i < count; ++i) func(i);
return;
}
// Create and enqueue _ParallelForLoop_ for this loop
ParallelForLoop loop(std::move(func), count, chunkSize,
CurrentProfilerState());
workListMutex.lock();
loop.next = workList;
workList = &loop;
workListMutex.unlock();
// Notify worker threads of work to be done
std::unique_lock<std::mutex> lock(workListMutex);
workListCondition.notify_all();
// Help out with parallel loop iterations in the current thread
while (!loop.Finished()) {
// Run a chunk of loop iterations for _loop_
// Find the set of loop iterations to run next
int64_t indexStart = loop.nextIndex;
int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex);
// Update _loop_ to reflect iterations this thread will run
loop.nextIndex = indexEnd;
if (loop.nextIndex == loop.maxIndex) workList = loop.next;
loop.activeWorkers++;
// Run loop indices in _[indexStart, indexEnd)_
lock.unlock();
for (int64_t index = indexStart; index < indexEnd; ++index) {
uint64_t oldState = ProfilerState;
ProfilerState = loop.profilerState;
if (loop.func1D) {
loop.func1D(index);
}
// Handle other types of loops
else {
CHECK(loop.func2D);
loop.func2D(Point2i(index % loop.nX, index / loop.nX));
}
ProfilerState = oldState;
}
lock.lock();
// Update _loop_ to reflect completion of iterations
loop.activeWorkers--;
}
}
ParallelFor函數(shù)主要做兩件事情:1佳鳖、把任務(wù)放到工作列表中去。2媒惕、和線程池中的線程一起完成任務(wù)系吩。第1件事容易理解,第2件事為啥要做呢妒蔚?
因為調(diào)用ParallelFor的線程也是資源啊穿挨,不能讓他閑著,和線程池中的線程一起工作肴盏,這樣也能加快速度科盛。
而且,執(zhí)行代碼與線程池中的線程有區(qū)別菜皂,就是它不需要去等待條件變量贞绵。它是被主線程調(diào)用的,如果任務(wù)完成恍飘,它還需要繼續(xù)往下執(zhí)行榨崩,所以直接檢測任務(wù)是否執(zhí)行完畢就行了。當(dāng)然章母,獲取任務(wù)的時候也需要互斥體保護(hù)母蛛。
下面來看使用ParallelFor的代碼:
// Compute Morton indices of primitives
std::vector<MortonPrimitive> mortonPrims(primitiveInfo.size());
ParallelFor([&](int i) {
// Initialize _mortonPrims[i]_ for _i_th primitive
PBRT_CONSTEXPR int mortonBits = 10;
PBRT_CONSTEXPR int mortonScale = 1 << mortonBits;
mortonPrims[i].primitiveIndex = primitiveInfo[i].primitiveNumber;
Vector3f centroidOffset = bounds.Offset(primitiveInfo[i].centroid);
mortonPrims[i].mortonCode = EncodeMorton3(centroidOffset * mortonScale);
}, primitiveInfo.size(), 512);
這段代碼的作用是將所有的primitive轉(zhuǎn)換成mortonPrims。就是將場景中的所有物體的包圍盒的中心坐標(biāo)乳怎,用Morton Code表示彩郊。這任務(wù)非常簡單,不涉及到數(shù)據(jù)共享,所以可以同時執(zhí)行轉(zhuǎn)換操作焦辅,這也就是為什么我們在線程中執(zhí)行這個函數(shù)的時候博杖,不用獲得互斥體的原因。
好了筷登,就到這里剃根,洗洗睡了:)
參考資料
C++ Concurrency in Action 2nd edition
C++并發(fā)編程第2版中文版:我同學(xué)翻譯的,質(zhì)量不錯
pbrt源碼第3版