(一)父子進(jìn)程通信
nginx父子進(jìn)程之間或子進(jìn)程之間勢(shì)必涉及到進(jìn)程間通信狈蚤,這里采用了socketpair進(jìn)行通信茫孔。在Linux下誉裆,可使用socketpair函數(shù)創(chuàng)造一對(duì)的政勃、相互連接的域套接字眨层。套接字對(duì)建立的通道是雙向的涣楷,每一端都可以進(jìn)行讀寫
// socketpair — create a pair of connected sockets
int socketpair(int domain, int type, int protocol, int *sv);
前一篇文章(nginx啟動(dòng)過(guò)程中的進(jìn)程創(chuàng)建)中提到了nginx啟動(dòng)子進(jìn)程的函數(shù)凿将,ngx_spawn_process此叠,nginx進(jìn)程間通信的套接字就是在這個(gè)函數(shù)中創(chuàng)建的堤撵。其主要代碼如下:
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"socketpair() failed while spawning \"%s\"", name);
return NGX_INVALID_PID;
}
/* ...*/
pid = fork();
switch (pid) {
/* ... */
}
重點(diǎn)在這一句:
socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel)
AF_UNIX用于同一臺(tái)機(jī)器上的進(jìn)程間通信;SOCK_STREAM提供的穩(wěn)定數(shù)據(jù)傳輸仁讨,即TCP協(xié)議; ngx_processes是全局變量,ngx_processes[s].channel用于指定存儲(chǔ)套接字的容器实昨。
由于socketpair函數(shù)是在fork之前調(diào)用的洞豁,所以在fork之后,父子進(jìn)程都會(huì)擁有該套接字荒给。那么丈挟,只要父進(jìn)程使用channel[0]志电,子進(jìn)程使用channel[1],就能實(shí)現(xiàn)父子進(jìn)程之間的通信挑辆。
(二)子進(jìn)程間通信
若是不同子進(jìn)程之間想要通信,又該如何呢之拨?既然套接字存儲(chǔ)在ngx_processes[s]
.channel中,而ngx_processes又是全局變量蚀乔,只要子進(jìn)程的ngx_processes中存儲(chǔ)著其它所有子進(jìn)程的channel信息烁竭,就能給任意一個(gè)子進(jìn)程發(fā)送消息。
顯然吉挣,ngx_processes是從父進(jìn)程處繼承而來(lái)的派撕,雖然父進(jìn)程中的ngx_processes始終是最新最全,但子進(jìn)程之間是有先后順序的睬魂。比如說(shuō)终吼,在用戶自定義工作進(jìn)程為5個(gè)時(shí),nginx的master進(jìn)程將for循環(huán)5次產(chǎn)生5個(gè)子進(jìn)程氯哮,則第5個(gè)子進(jìn)程可以從父進(jìn)程處獲得前四個(gè)子進(jìn)程的channel信息际跪,而第4個(gè)子進(jìn)程繼承父進(jìn)程時(shí),由于第5個(gè)子進(jìn)程還未產(chǎn)生,自然無(wú)法獲得第5個(gè)子進(jìn)程的channel信息姆打。
如此一來(lái)良姆,后產(chǎn)生的子進(jìn)程擁有其“哥哥”們的channel信息,可以給“哥哥”們發(fā)消息幔戏,而“哥哥”們沒(méi)有后產(chǎn)生的子進(jìn)程的channel信息玛追,便無(wú)法給“弟弟”們發(fā)消息。解決辦法很簡(jiǎn)單闲延,只需把“弟弟”們的相關(guān)信息發(fā)送給“哥哥”們即可痊剖。
執(zhí)行這個(gè)任務(wù)的,正是父進(jìn)程(master)垒玲。在ngx_start_worker_processes中的定義如下:
for (i = 0; i < n; i++) {
cpu_affinity = ngx_get_cpu_affinity(i);
ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
"worker process", type);
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0];
ngx_pass_open_channel(cycle, &ch);
}
ngx_spawn_process函數(shù)用于產(chǎn)生子進(jìn)程陆馁,然后將子進(jìn)程的信息存儲(chǔ)在結(jié)構(gòu)體變量ch中,最后用 ngx_pass_open_channel函數(shù)將存儲(chǔ)了新子進(jìn)程相關(guān)信息的結(jié)構(gòu)體ch發(fā)送給其它子進(jìn)程侍匙。ngx_pass_open_channel的定義如下:
static void
ngx_pass_open_channel(ngx_cycle_t *cycle, ngx_channel_t *ch)
{
ngx_int_t i;
for (i = 0; i < ngx_last_process; i++) {
/* ... */
ngx_write_channel(ngx_processes[i].channel[0],
ch, sizeof(ngx_channel_t), cycle->log);
}
}
代碼很清晰氮惯,就是不斷地把子進(jìn)程的信息通過(guò)第i個(gè)子進(jìn)程的channel[0]發(fā)送,當(dāng)?shù)趇個(gè)進(jìn)程接收到結(jié)構(gòu)體ch(存儲(chǔ)著新子進(jìn)程的pid和channel信息)后想暗,再進(jìn)行相關(guān)處理即可。
那么當(dāng)子進(jìn)程收到其它新子進(jìn)程的信息時(shí)帘不,具體是怎么處理的呢说莫?
在上一篇文章中,已經(jīng)知道在子進(jìn)程產(chǎn)生后寞焙,會(huì)執(zhí)行ngx_worker_process_cycle函數(shù)储狭。此函數(shù)的開(kāi)頭將調(diào)用ngx_worker_process_init函數(shù)初始化子進(jìn)程,而在初始化過(guò)程中捣郊,將把自己從父進(jìn)程繼承的ngx_channel[1](channel[0]被用于父進(jìn)程或其它子進(jìn)程寫消息)加入到讀事件監(jiān)聽(tīng)集里辽狈。
下面先看一看ngx_worker_process_init的定義:
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
ngx_channel_handler) == NGX_ERROR)
{
/* fatal */
exit(2);
}
其中,ngx_channel 是全局變量呛牲,在ngx_spawn_process中被賦值為:
ngx_channel = ngx_processes[s].channel[1];
ngx_channel_handler是對(duì)應(yīng)的處理函數(shù)刮萌,在該函數(shù)中,對(duì)此事件的處理方式為:
case NGX_CMD_OPEN_CHANNEL:
/*...*/
ngx_processes[ch.slot].pid = ch.pid;
ngx_processes[ch.slot].channel[0] = ch.fd;
break;
也就是將接收到的新子進(jìn)程的pid和channel信息存儲(chǔ)到全局變量ngx_processes的相應(yīng)位置中娘扩,如此一來(lái)着茸,進(jìn)程之間都互相有了彼此的channel信息和pid號(hào)琐旁,也就可以互相通信了。
(三)關(guān)于nginx_channel.c
這里關(guān)注一下nginx_channel.c敬特。該文件定義了有關(guān)nginx利用channel通信的函數(shù)。其結(jié)構(gòu)如下:
可以看到辣之,只有四個(gè)函數(shù)召烂,分別對(duì)應(yīng)著寫奏夫、讀历筝、添加事件監(jiān)聽(tīng)和關(guān)閉channel的功能。
-
關(guān)于寫消息的函數(shù)麻削,其定義如下春弥。
ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log)
該函數(shù)通過(guò)socket s發(fā)送大小為size字節(jié)的消息ch匿沛。
-
關(guān)于讀消息的函數(shù),其定義如下鳖孤。
ngx_int_t ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log)
該函數(shù)通過(guò)socket s讀取大小為size字節(jié)的消息ch苏揣。
-
關(guān)于關(guān)閉channel的函數(shù)推姻,其定義如下。
void ngx_close_channel(ngx_fd_t *fd, ngx_log_t *log) { if (close(fd[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "close() channel failed"); } if (close(fd[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "close() channel failed"); } }
該函數(shù)將socket_pair的兩個(gè)文件描述符依次關(guān)閉吐葱。
-
關(guān)于ngx_add_channel_event弟跑,其定義為
ngx_int_t ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event, ngx_event_handler_pt handler)
這里大概是將某文件描述符添加到某事件集中孟辑,涉及到nginx的事件監(jiān)聽(tīng)和處理,其具體的運(yùn)行原理饲嗽,待我下一章詳述貌虾。