bthread概述
[TOC]
TaskControl
TaskControl用于管理brpc創(chuàng)建的worker pthread。
初始化
創(chuàng)建一個(gè) TaskControl
對象后孕锄,調(diào)用 init
函數(shù)進(jìn)行初始化吮廉,主要做:
- 啟動(dòng)定時(shí)器線程
- 創(chuàng)建指定數(shù)量的worker pthreads,用于執(zhí)行bthread
- 曝光一些bvar,用來統(tǒng)計(jì)狀態(tài)
- 等待至少一個(gè)worker pthread創(chuàng)建完成
Worker Pthread
每個(gè)worker pthread運(yùn)行 worker_thread
函數(shù)畸肆,這個(gè)函數(shù)主要做:
- 創(chuàng)建一個(gè)
TaskGroup
對象宦芦,調(diào)用init
函數(shù)初始化完成后加入到TaskControl
中。在brpc中轴脐,每個(gè)worker pthread有各自的TaskGroup调卑。 - 然后調(diào)用
run_main_task
函數(shù),開始調(diào)用bthread大咱。
worker pthread在任意時(shí)刻只會運(yùn)行一個(gè)bthread恬涧。它優(yōu)先運(yùn)行本地隊(duì)列,遠(yuǎn)程隊(duì)列的bthread碴巾,如果沒有溯捆,就從其它TaskGroup的本地隊(duì)列或遠(yuǎn)程隊(duì)列中偷取。如果仍然沒有找到厦瓢,就會睡眠直到有新的bthread可以運(yùn)行時(shí)被喚醒提揍。
管理TaskGroup
添加TaskGroup
worker pthread會在一開始就創(chuàng)建自己的TaskGroup結(jié)構(gòu)啤月,然后調(diào)用 TaskControl::_add_group
將該TaskGroup注冊到TaskControl中。
刪除TaskGroup
worker pthread在退出前劳跃,調(diào)用 TaskContol::_destroy_group
將對應(yīng)的TaskGroup從TaskControl中刪除顽冶。
線程安全
不管是添加還是刪除接口,都有可能被多個(gè)線程同時(shí)調(diào)用售碳,所以brpc使用一個(gè)互斥量 _modify_group_mutex
來保護(hù)强重。
偷任務(wù)
前面說到,worker pthread 如果沒有bthread可以執(zhí)行贸人,就會嘗試從其他worker pthread 偷取间景,調(diào)用的是 TaskControl::steal_task
接口,
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
// 1: Acquiring fence is paired with releasing fence in _add_group to
// avoid accessing uninitialized slot of _groups.
const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
if (0 == ngroup) {
return false;
}
// NOTE: Don't return inside `for' iteration since we need to update |seed|
bool stolen = false;
size_t s = *seed;
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = _groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->_rq.steal(tid)) {
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
}
}
}
*seed = s;
return stolen;
}
從上面可以看到艺智,偷任務(wù)的一些設(shè)計(jì)要點(diǎn):
- 盡量減少與其它也在偷任務(wù)的worker pthread的沖突倘要,使得它們盡量分散開,采用參數(shù)
seed
和offset
決定TaskGroup的索引十拣,而不是輪詢封拧。 - 避免沒有bthread時(shí)的無限循環(huán),限定了每次偷任務(wù)最多可查找
_ngroup
個(gè)TaskGroup夭问。 - 這個(gè)接口會與增加泽西、刪除TaskGroup接口并發(fā),理論上應(yīng)該使用鎖保護(hù)缰趋。但是出于性能考慮捧杉,這里沒有用鎖。所以秘血,我們需要更小心地分析多線程并發(fā)的競爭情況味抖。
我們先來分析下增加一個(gè)TaskGroup對偷任務(wù)的影響。從上面的代碼可知灰粮,讀取_ngroup
的值時(shí)采用了butil::memory_order_acquire
內(nèi)存序仔涩,這保證了當(dāng)我們看到_ngroup
的最新值時(shí),_groups
數(shù)組中的指針?biāo)赶虻腡askGroup對象都已經(jīng)初始化完成了粘舟。不會出現(xiàn)這種情況:指針不為空熔脂,但是指針?biāo)傅膶ο筮€沒初始化完成,引起程序crash蓖乘。如果我們看到的是_ngroup
的舊值锤悄,我們會錯(cuò)過這個(gè)新加的TaskGroup。但是這個(gè)沒有關(guān)系嘉抒,這個(gè)TaskGroup多余的bthread也會很快得到調(diào)度零聚。因?yàn)?code>TaskControl::_add_group 函數(shù)在最后會調(diào)用 TaskControl::signal_task
來喚醒休眠的worker pthread來偷任務(wù)。
我們再來分析下刪除一個(gè)TaskGroup對偷任務(wù)的影響。如果worker pthread找到自己的TaskGroup后隶症,立刻將它從 _groups
中刪除政模,就可能使得 TaskControl::steal_task
解引用一個(gè)已析構(gòu)的TaskGroup。所以蚂会,在 TaskContol::_destroy_group
中找到對應(yīng)的TaskGroup后沒有立刻刪除淋样,而是注冊一個(gè)定時(shí)器任務(wù),在1秒后再刪除胁住。
綜上所述趁猴,在 TaskControl::steal_task
中遍歷 _groups
數(shù)組是可以不需要加鎖的。
通知任務(wù)
當(dāng)有新的bthread需要調(diào)度時(shí)彪见,調(diào)用 TaskControl::signal_task
喚醒那些休眠的worker pthread儡司。如果沒有足夠的休眠worker pthread并且當(dāng)前worker pthread的數(shù)量小于設(shè)置的并發(fā)度,那么調(diào)用 TaskControl::add_workers
增加worker pthread余指。
TaskGroup
初始化
創(chuàng)建一個(gè) TaskGroup
對象后捕犬,調(diào)用 init
函數(shù)進(jìn)行初始化,主要做:
- 初始化
_rq
隊(duì)列酵镜,用來存儲worker pthread創(chuàng)建的bthread碉碉。隊(duì)列中的bthread可能被其他TaskGroup偷取。 - 初始化
_remote_rq
隊(duì)列淮韭,大小為_rq
隊(duì)列的一半垢粮,用來存儲non-worker pthread創(chuàng)建的bthread。隊(duì)列中的bthread可能被其他TaskGroup偷取缸濒。 - 為worker pthread分配
TaskMeta
對象和棧足丢。
偷任務(wù)
如果worker pthread執(zhí)行完TaskGroup _rq
隊(duì)列中的bthread后粱腻,會嘗試:
- 運(yùn)行
_remote_rq
隊(duì)列中的bthread庇配。 - 偷取其他 TaskGroup 對象
_rq
隊(duì)列或_remote_rq
隊(duì)列的bthread。
調(diào)度bthread
棧
bthread定義了幾種棧類型绍些,分別代表不同的棧大欣袒拧:
棧類型 | 說明 | 大小 |
---|---|---|
STACK_TYPE_MAIN | worker pthread的棧 | 默認(rèn)大小是8MB |
STACK_TYPE_PTHREAD | 使用worker pthread的棧,不需要額外分配 | 默認(rèn)大小是8MB |
STACK_TYPE_SMALL | 小型棧 | 32KB |
STACK_TYPE_NORMAL | 默認(rèn)棧 | 1MB |
STACK_TYPE_LARGE | 大型棧 | 8MB |
調(diào)用 bthread_start_*
創(chuàng)建的bthread默認(rèn)情況下棧大小是1MB(STACK_TYPE_NORMAL)柬批,可通過 attr
參數(shù)來選擇棧的大小啸澡。如果分配棧空間失敗氮帐,將其棧類型改為STACK_TYPE_PTHREAD嗅虏,即直接在worker pthread的棧上運(yùn)行該bthread。另外上沐,pthread task的棧類型也是STACK_TYPE_PTHREAD皮服。注意,worker pthread的棧不是brpc分配的,不能釋放它龄广。至于STACK_TYPE_MAIN硫眯,是為了告訴 get_stack
函數(shù)只需要分配棧控制結(jié)構(gòu)择同,不需要分配椓饺耄空間,因?yàn)閣orker pthread已經(jīng)由系統(tǒng)線程庫分配了椙貌牛空間裹纳。
這里要注意,創(chuàng)建bthread時(shí)沒有立刻為其分配棧紧武,直到第一次運(yùn)行時(shí)才會分配痊夭。這個(gè)便于我們優(yōu)化內(nèi)存的使用,如果前一個(gè)bthread即將退出并且棧類型和下一個(gè)bhtread相同脏里,我們可以直接轉(zhuǎn)移棧而不需要重新分配她我。
任務(wù)運(yùn)行函數(shù)
雖然調(diào)用 bthread_start_*
創(chuàng)建新的bthread傳入了上下文函數(shù)及其參數(shù),但是bthread上不是直接運(yùn)行該上下文函數(shù)迫横,實(shí)際上運(yùn)行的是
TaskGroup::task_runner
函數(shù),
- 執(zhí)行上一個(gè)bthread設(shè)置的
_last_context_remained
函數(shù)番舆,可能是用來釋放棧空間之類的矾踱。如果是在worker pthread的棧上運(yùn)行task恨狈,這個(gè)步驟會跳過。 - 運(yùn)行上下文函數(shù)呛讲。執(zhí)行過程中可能有多次跳出禾怠、跳回過程,所在的worker pthread可能也變了贝搁。
- 執(zhí)行完之后吗氏,執(zhí)行一些清理動(dòng)作,例如釋放tls等雷逆。
- 設(shè)置下一個(gè)bthread需要運(yùn)行的
_last_context_remained
函數(shù)弦讽。 - 找到下一個(gè)可以運(yùn)行的bthread,查找次序依次為本地隊(duì)列膀哲,遠(yuǎn)程隊(duì)列往产,其它TaskGroup的本地隊(duì)列或遠(yuǎn)程隊(duì)列。
- 如果沒有找到某宪,就返回worker pthread仿村。
要注意,有兩個(gè)地方會調(diào)用到TaskGroup::task_runner
函數(shù):
- bthread執(zhí)行時(shí)兴喂,實(shí)際執(zhí)行的是
TaskGroup::task_runner
函數(shù)蔼囊,而在該函數(shù)里面調(diào)用上下文函數(shù)包颁。 - worker pthread 執(zhí)行 pthread task時(shí)會調(diào)用
TaskGroup::task_runner
函數(shù)。
所以压真,兩種情況下會回到worker pthread:
- 沒有需要執(zhí)行的bthread娩嚼。返回后,可以調(diào)用
TaskGroup::wait_task
從其它TaskGroup偷取任務(wù)岳悟。如果仍然沒有,就休眠等待喚醒泼差。 - 下一個(gè)task使用的是worker-pthread的棧贵少。
調(diào)度策略
常見的調(diào)度策略:
- 星切:主線程 --> 協(xié)程1 --> 主線程 --> 協(xié)程2 --> ... -->主線程
- 環(huán)切:主線程 --> 協(xié)程1 --> 協(xié)程2 --> 協(xié)程3 --> ... -->主線程
從上可以看出滔灶,環(huán)切比星切少了一半的切換次數(shù)录平,效率更高斗这。bthread采用的是環(huán)切。
TaskMeta
TaskMeta
是bthread的控制結(jié)構(gòu)钮莲,管理bthread的相關(guān)信息免钻,包括
- 棧
- 運(yùn)行函數(shù)及其參數(shù)
待續(xù)。
tid
每個(gè)bthread都有一個(gè)唯一ID崔拥,大小為64bit极舔,
- 高32bit是版本號
- 低32biit是bthread控制結(jié)構(gòu)的起始地址
在 task_group_inl.h
文件中定義了tid相關(guān)函數(shù),
// Utilities to manipulate bthread_t
inline bthread_t make_tid(uint32_t version, butil::ResourceId<TaskMeta> slot) {
return (((bthread_t)version) << 32) | (bthread_t)slot.value;
}
inline butil::ResourceId<TaskMeta> get_slot(bthread_t tid) {
butil::ResourceId<TaskMeta> id = { (tid & 0xFFFFFFFFul) };
return id;
}
inline uint32_t get_version(bthread_t tid) {
return (uint32_t)((tid >> 32) & 0xFFFFFFFFul);
}