本文同步發(fā)表在豆米的博客
既前篇nodejs深入學(xué)習(xí)系列之libuv基礎(chǔ)篇(一)學(xué)習(xí)的基本概念之后藕帜,我們?cè)诘诙獙Т蠹胰W(xué)習(xí)為什么libuv的并發(fā)能力這么優(yōu)秀讯柔?這并發(fā)后面的實(shí)現(xiàn)機(jī)制是什么?
3揪罕、libuv的事件循環(huán)機(jī)制
好了德澈,了解了上述的基本概念之后,我們來扯一扯Libuv的事件循環(huán)機(jī)制驮樊,也就是event-loop。還是以[譯文]libuv設(shè)計(jì)思想概述一文展示的兩張圖片片酝,再結(jié)合代碼來學(xué)習(xí)整個(gè)Libuv的事件循環(huán)機(jī)制囚衔。
3.1、解密第一張圖片
首先是第一張圖片:
細(xì)心的童鞋會(huì)發(fā)現(xiàn)這張圖片被我用紅框分割成了兩部分雕沿,為什么呢练湿?因?yàn)長(zhǎng)ibuv處理fs I/O和網(wǎng)絡(luò)I/O用了兩套機(jī)制去實(shí)現(xiàn),或者說更全面的講應(yīng)該是fs I/O和 DNS等實(shí)現(xiàn)的方式和網(wǎng)絡(luò) I/O是不一樣的审轮。為什么這么說呢肥哎?請(qǐng)看下圖,你就會(huì)明白了:
上圖左側(cè)是libuv的兩大基石:event-loop
線程和thread pool
断国。而從圖的右側(cè)有兩條軌跡分別連接到這兩個(gè)基石贤姆,我特別用紅色加粗標(biāo)記,可以看到:
- Network I/O最后的調(diào)用都會(huì)歸結(jié)到
uv__io_start
這個(gè)函數(shù)稳衬,而該函數(shù)會(huì)將需要執(zhí)行的I/O事件和回調(diào)塞到watcher
隊(duì)列中霞捡,之后uv_run
函數(shù)執(zhí)行的Poll for I/O
階段做的便是從watcher隊(duì)列中取出事件調(diào)用系統(tǒng)的接口,這是其中一條主線 - Fs I/O和DNS的所有操作都會(huì)歸結(jié)到調(diào)用
uv__work_sumit
這個(gè)函數(shù)薄疚,而該函數(shù)就是執(zhí)行線程池初始化并調(diào)度的終極函數(shù)碧信。這是另外一條主線赊琳。
3.2、解密第二張圖片
接著我們來看第二張圖片砰碴,我們依然將該圖片進(jìn)行改造如下:
整個(gè)事件循環(huán)的執(zhí)行主體是在uv_run
中躏筏,每一次的循環(huán)經(jīng)歷的階段對(duì)應(yīng)的函數(shù)在上圖中已經(jīng)標(biāo)注出來,有幾個(gè)重點(diǎn)要說一下:
-
循環(huán)是否退出(也就是進(jìn)程是否結(jié)束)取決于以下幾個(gè)條件中的一個(gè):
1.1呈枉、loop->stop_flag變?yōu)?并且uv__loop_alive返回不為0趁尼,也就是調(diào)用
uv_stop
函數(shù)并且loop不存在活躍的和被引用的句柄、活躍的請(qǐng)求或正在關(guān)閉的句柄猖辫。1.2酥泞、事件循環(huán)運(yùn)行模式等于
UV_RUN_ONCE
或者是UV_RUN_NOWAIT
-
I/O循環(huán)的超時(shí)時(shí)間的確定:
2.1、如果時(shí)間循環(huán)運(yùn)行模式是
UV_RUN_NOWAIT
啃憎,超時(shí)為0芝囤。2.2、如果循環(huán)將要停止(代碼調(diào)用了
uv_stop()
)辛萍,超時(shí)為0悯姊。2.3、如果沒有活躍句柄或請(qǐng)求贩毕,超時(shí)為0悯许。
2.4、如果有任何Idle句柄處于活躍狀態(tài)辉阶,超時(shí)為0岸晦。
2.5、如果有等待關(guān)閉的句柄睛藻,超時(shí)為0。
2.6邢隧、如果以上情況都不匹配店印,則采用最近的計(jì)時(shí)器的超時(shí)時(shí)間-當(dāng)前時(shí)間(handle->timeout-loop->time),或者如果沒有活動(dòng)計(jì)時(shí)器倒慧,則為無(wú)窮大(即返回-1)按摘。
I/O循環(huán)的實(shí)現(xiàn)主體
uv__io_poll
根據(jù)系統(tǒng)不同,使用方式不一樣纫谅,如果對(duì)linux系統(tǒng)熟悉的話炫贤,epoll方式應(yīng)該也會(huì)了解。更多epoll的只是可以參考該文章:Linux IO模式及 select付秕、poll兰珍、epoll詳解
4、libuv的線程池
說完時(shí)間循環(huán)的主線程询吴,接下去我們繼續(xù)揭秘libuv的線程池掠河。
libuv提供了一個(gè)threadpool亮元,可用來運(yùn)行用戶代碼并在事件循環(huán)線程(event-loop)中得到通知。這個(gè)線程池在內(nèi)部用于運(yùn)行所有文件系統(tǒng)操作唠摹,以及getaddrinfo和getnameinfo請(qǐng)求爆捞。當(dāng)然如果你想要將自己的代碼放在線程池中運(yùn)行也是可以的,libuv提供除了uv_queue_work
的方法供開發(fā)者自己選擇勾拉。
它的默認(rèn)大小為4煮甥,但是可以在啟動(dòng)時(shí)通過將UV_THREADPOOL_SIZE
環(huán)境變量設(shè)置為任意值(最大值為1024)來更改它。
threadpool是全局的藕赞,并在所有事件循環(huán)中共享成肘。當(dāng)一個(gè)特定的函數(shù)使用threadpool(即當(dāng)使用uv_queue_work())時(shí),libuv預(yù)先分配并初始化UV_THREADPOOL_SIZE所允許的最大線程數(shù)找默。這導(dǎo)致了相對(duì)較小的內(nèi)存開銷(128個(gè)線程大約1MB)艇劫,但在運(yùn)行時(shí)提高了線程的性能。
關(guān)于線程的操作惩激,demo中的文件是:傳送門
在實(shí)例中店煞,我們用了三種方式來實(shí)現(xiàn)和線程相關(guān)的一些操作:
- 從線程池中調(diào)度一個(gè)線程運(yùn)行回調(diào): uv_queue_work
- 使用
uv_async_send
來“喚醒” event loop主線程并執(zhí)行uv_async_init
當(dāng)初設(shè)置好的回調(diào) - 使用uv_thread_create手動(dòng)創(chuàng)建一個(gè)線程來執(zhí)行
我們?cè)谏弦还?jié)中知道,想要?jiǎng)?chuàng)建線程池并讓他們工作风钻,唯一繞不開的函數(shù)是uv__work_submit
顷蟀,大家可以在libuv源碼中搜尋這個(gè),可以發(fā)現(xiàn)能夠找到的也就這幾個(gè)文件:(以u(píng)nix系統(tǒng)為例)
threadpool.c
1. uv__work_submit實(shí)現(xiàn)地方
2. uv_queue_work調(diào)用
fs.c
1. 宏定義POST調(diào)用骡技,所有的fs操作都會(huì)調(diào)用POST這個(gè)宏
getaddrinfo.c
1. uv_getaddrinfo調(diào)用
getnameinfo.c
1. uv_getnameinfo調(diào)用
細(xì)心的童鞋發(fā)現(xiàn)鸣个,每一處調(diào)用的地方都會(huì)傳一個(gè)叫做enum uv__work_kind kind
的操作,根據(jù)上面的調(diào)用布朦,可以看出分為了3種任務(wù)類型:
- UV__WORK_CPU:CPU 密集型囤萤,UV_WORK 類型的請(qǐng)求被定義為這種類型。因此根據(jù)這個(gè)分類是趴,不推薦在 uv_queue_work 中做 I/O 密集的操作涛舍。
- UV__WORK_FAST_IO:快 IO 型,UV_FS 類型的請(qǐng)求被定義為這種類型唆途。
- UV__WORK_SLOW_IO:慢 IO 型富雅,UV_GETADDRINFO 和 UV_GETNAMEINFO 類型的請(qǐng)求被定義為這種類型
4.2、線程池的初始化
學(xué)習(xí)線程池初始化之前肛搬,我們先得普及一下線程間的同步原語(yǔ)没佑。這樣后面看的代碼才不會(huì)糊里糊涂
libuv提供了mutex鎖
、讀寫鎖
温赔、信號(hào)量(Semaphores)
蛤奢、條件量(Conditions)
、屏障(Barriers)
五種手段來實(shí)現(xiàn)線程間資源競(jìng)爭(zhēng)互斥同步等操作。接下去會(huì)簡(jiǎn)單地介紹远剩,以便待會(huì)的初始化流程可以讀懂扣溺。
4.2.1、Mutex鎖
互斥鎖用于對(duì)資源的互斥訪問瓜晤,當(dāng)你訪問的內(nèi)存資源可能被別的線程訪問到锥余,這個(gè)時(shí)候你就可以考慮使用互斥鎖,在訪問的時(shí)候鎖住痢掠。對(duì)應(yīng)的使用流程可能是這樣的:
- 初始化互斥鎖:uv_mutex_init(uv_mutex_t* handle)
- 鎖住互斥資源:uv_mutex_lock(uv_mutex_t* handle)
- 解鎖互斥資源:uv_mutex_unlock(uv_mutex_t* handle)
在線程初始化的過程中驱犹,我們會(huì)初始化一個(gè)全局的互斥鎖:
static void init_threads(void) {
...
if (uv_mutex_init(&mutex))
abort()
...
}
而后在每個(gè)線程的執(zhí)行實(shí)體worker
函數(shù)中,就使用互斥鎖對(duì)下面幾個(gè)公共資源進(jìn)行鎖住與解鎖:
- 請(qǐng)求隊(duì)列 wq:線程池收到 UV__WORK_CPU 和 UV__WORK_FAST_IO 類型的請(qǐng)求后將其插到此隊(duì)列的尾部足画,并通過 uv_cond_signal 喚醒 worker 線程去處理雄驹,這是線程池請(qǐng)求的主隊(duì)列。
- 慢 I/O 隊(duì)列 slow_io_pending_wq:線程池收到 UV__WORK_SLOW_IO 類型的請(qǐng)求后將其插到此隊(duì)列的尾部淹辞。
- 慢 I/O 標(biāo)志位節(jié)點(diǎn) run_slow_work_message:當(dāng)存在慢 I/O 請(qǐng)求時(shí)医舆,用來作為一個(gè)標(biāo)志位放在請(qǐng)求隊(duì)列 wq 中,表示當(dāng)前有慢 I/O 請(qǐng)求象缀,worker 線程處理請(qǐng)求時(shí)需要關(guān)注慢 I/O 隊(duì)列的請(qǐng)求蔬将;當(dāng)慢 I/O 隊(duì)列的請(qǐng)求都處理完畢后這個(gè)標(biāo)志位將從請(qǐng)求隊(duì)列 wq 中移除。
static void worker(void* arg) {
...
uv_mutex_lock(&mutex);
...
uv_mutex_unlock(&mutex);
}
4.2.2央星、讀寫鎖
讀寫鎖沒有用在線程的啟動(dòng)過程中霞怀,我們?cè)?a target="_blank">demo中用來實(shí)踐對(duì)某個(gè)全局變量的訪問。具體使用步驟參考代碼莉给,這里就不再贅述毙石。
4.2.3、信號(hào)量
信號(hào)量是一種專門用于提供不同進(jìn)程間或線程間同步手段的原語(yǔ)颓遏。信號(hào)量本質(zhì)上是一個(gè)非負(fù)整數(shù)計(jì)數(shù)器徐矩,代表共享資源的數(shù)目,通常是用來控制對(duì)共享資源的訪問叁幢。一般使用步驟是這樣的:
- 初始化信號(hào)量:int uv_sem_init(uv_sem_t* sem, unsigned int value)
- 信號(hào)量加1:void uv_sem_wait(uv_sem_t* sem)
- 信號(hào)量減1:void uv_sem_post(uv_sem_t* sem)
- 信號(hào)量銷毀:void uv_sem_wait(uv_sem_t* sem)
在線程池初始化過程中丧蘸,我們利用信號(hào)量來等待所有的線程初始化結(jié)束,如下代碼:
static void init_threads(void) {
...
for (i = 0; i < nthreads; i++)
uv_sem_wait(&sem);
uv_sem_destroy(&sem);
}
// 而每個(gè)線程的執(zhí)行實(shí)體都會(huì)去將信號(hào)量-1:
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;
int is_slow_work;
uv_sem_post((uv_sem_t*) arg);
...
}
這樣只要所有的線程沒有初始化完成遥皂,uv_sem_destroy
這個(gè)函數(shù)是不會(huì)執(zhí)行到的,整個(gè)初始化函數(shù)也不會(huì)返回刽漂,此時(shí)的主線程也就阻塞在這里了演训。
4.2.4、條件變量
而條件變量通過允許線程阻塞和等待另一個(gè)線程發(fā)送信號(hào)的方法彌補(bǔ)了互斥鎖的不足贝咙。條件變量的內(nèi)部實(shí)質(zhì)上是一個(gè)等待隊(duì)列样悟,放置等待(阻塞)的線程,線程在條件變量上等待和通知,互斥鎖用來保護(hù)等待隊(duì)列(因?yàn)樗械木€程都可以放入等待隊(duì)列窟她,所以等待隊(duì)列成為了一個(gè)共享的資源陈症,需要被上鎖保護(hù)),因此條件變量通常和互斥鎖一起使用震糖。一般使用步驟是這樣的:
- 初始化條件變量:int uv_cond_init(uv_cond_t* cond)
- 線程阻塞等待被喚醒:void uv_cond_wait(uv_cond_t* cond, uv_mutex_t* mutex)
- 別的線程喚醒阻塞的線程:void uv_cond_signal(uv_cond_t* cond)
libuv使用條件變量來阻塞線程池和喚醒線程池录肯,使用代碼如下:
static void init_threads(void) {
if (uv_cond_init(&cond))
abort();
}
static void worker(void* arg) {
...
for (;;) {
/* `mutex` should always be locked at this point. */
/* Keep waiting while either no work is present or only slow I/O
and we're at the threshold for that. */
while (QUEUE_EMPTY(&wq) ||
(QUEUE_HEAD(&wq) == &run_slow_work_message &&
QUEUE_NEXT(&run_slow_work_message) == &wq &&
slow_io_work_running >= slow_work_thread_threshold())) {
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
...
}
}
static void post(QUEUE* q, enum uv__work_kind kind) {
...
if (idle_threads > 0)
uv_cond_signal(&cond)
...
}
從上面三處代碼可以看到線程啟動(dòng)之后就進(jìn)入阻塞狀態(tài),直到有I/O請(qǐng)求調(diào)用uv_cond_signal來喚醒吊说,按照uv_cond_wait
調(diào)用的順序形成一個(gè)等待隊(duì)列论咏,循環(huán)調(diào)用。
4.2.5颁井、屏障
在多線程的時(shí)候厅贪,我們總會(huì)碰到一個(gè)需求,就是需要等待一組進(jìn)程全部執(zhí)行完畢后再執(zhí)行某些事雅宾,由于多線程是亂序的养涮,無(wú)法預(yù)估線程都執(zhí)行到哪里了,這就要求我們有一個(gè)屏障作為同步點(diǎn)眉抬,在所有有屏障的地方都會(huì)阻塞等待贯吓,直到所有的線程都的代碼都執(zhí)行到同步點(diǎn),再繼續(xù)執(zhí)行后續(xù)代碼吐辙。使用步驟一般是:
- 初始化屏障需要達(dá)到的個(gè)數(shù):int uv_barrier_init(uv_barrier_t* barrier, unsigned int count)
- 每當(dāng)達(dá)到條件便將計(jì)數(shù)+1:int uv_barrier_wait(uv_barrier_t* barrier)
- 銷毀屏障:void uv_barrier_destroy(uv_barrier_t* barrier)
只有當(dāng)初始化計(jì)數(shù)的值為0宣决,主線程才會(huì)繼續(xù)執(zhí)行,具體使用方法可以參考demo昏苏。
至此借助于線程間同步原語(yǔ)尊沸,我們就嘩啦啦地把線程的初始化以及大概的工作機(jī)制講完了,總結(jié)出了下面一張圖:
4.1贤惯、線程池工作調(diào)度
線程池的工作利用的是主線程post
函數(shù)和各個(gè)線程的worker
函數(shù)洼专,post
函數(shù)的工作內(nèi)容如下:
- 判斷請(qǐng)求的請(qǐng)求類型是否是 UV__WORK_SLOW_IO:
- 如果是,將這個(gè)請(qǐng)求插到慢 I/O 請(qǐng)求隊(duì)列 slow_io_pending_wq 的尾部孵构,同時(shí)在請(qǐng)求隊(duì)列 wq 的尾部插入一個(gè) run_slow_work_message 節(jié)點(diǎn)作為標(biāo)志位屁商,告知請(qǐng)求隊(duì)列 wq 當(dāng)前存在慢 I/O 請(qǐng)求。
- 如果不是颈墅,將請(qǐng)求插到請(qǐng)求隊(duì)列 wq 尾部蜡镶。
- 如果有空閑的線程,喚醒某一個(gè)去執(zhí)行請(qǐng)求恤筛。
并發(fā)的慢 I/O 的請(qǐng)求數(shù)量不會(huì)超過線程池大小的一半官还,這樣做的好處是避免多個(gè)慢 I/O 的請(qǐng)求在某段時(shí)間內(nèi)把所有線程都占滿,導(dǎo)致其它能夠快速執(zhí)行的請(qǐng)求需要排隊(duì)毒坛。
static unsigned int slow_work_thread_threshold(void) {
return (nthreads + 1) / 2;
}
而各個(gè)線程的工作內(nèi)容如下:
- 等待喚醒望伦。
- 取出請(qǐng)求隊(duì)列 wq 或者慢 I/O 請(qǐng)求隊(duì)列的頭部請(qǐng)求去執(zhí)行林说。 =>
w->work(w);
- 通知 uv loop 線程完成了一個(gè)請(qǐng)求的處理。=>
uv_async_send
- 回到最開始循環(huán)的位置屯伞。
4.2腿箩、線程間的通信
上一小節(jié)清晰地描述了libuv的主線程是如何將請(qǐng)求分給各個(gè)線程以及線程是如何處理請(qǐng)求的,那么上述過程中還有一個(gè)步驟:線程池里面的線程完成工作之后是如何通知主線程的劣摇?主線程收到通知之后又繼續(xù)做了些什么珠移?
這個(gè)過程我們稱之為線程間的通信。上一小節(jié)中或者我們的demo中已經(jīng)知道饵撑,完成這個(gè)事情的主要函數(shù)是uv_async_send
剑梳,那么這個(gè)函數(shù)是如何實(shí)現(xiàn)的呢?請(qǐng)看下圖:
從圖中我們可以看到滑潘,借助于io poll與管道垢乙,線程池的線程寫入數(shù)據(jù),被主線程輪詢出來语卤,知道有消息過來追逮,就開始執(zhí)行對(duì)應(yīng)的回調(diào)函數(shù)。整個(gè)流程就是這么easy~