前言
作為一個網絡框架,最為核心的就是消息的接收與發送。高效的 reactor 模式一直是眾多網絡框架的首要選擇,本節主要講解 swoole 中的 reactor 模塊。
Reactor 的數據結構

- Reactor 的數據結構比較複雜。首先,object 是具體 Reactor 對象的首地址,ptr 是擁有 Reactor 對象的類的指針。
- event_num 存放現有監控的 fd 個數,max_event_num 存放允許持有的最大事件數目,flag 為標記位。
- id 用於存放對應 reactor 的 id;running 用於標記該 reactor 是否正在運行,一般是創建時會被置為 1;start 標記著 reactor 是否已經被啟動,一般是進行 wait 監控時被置為 1;once 標誌著 reactor 是否僅需要一次性監控;check_timer 標誌著是否要檢查定時任務。
- singal_no:每次 reactor 由於 fd 就緒而返回時,reactor 都會檢查這個 singal_no,如果這個值不為 0,那麼就會調用相應的信號回調函數。
- disable_accept 標誌著是否暫停接受新的連接,這個標記只有在主 reactor 中才會被重新置為 0,其他 reactor 線程不需要接受新的連接,只需要接收數據即可。
- check_signalfd 標誌著是否需要檢查 signalfd。
- thread 用於標記當前是使用 reactor 多線程模式還是多進程模式,一般都會使用多線程模式。
- timeout_msec 用於記錄每次 reactor->wait 的超時時間(單位為毫秒,或為 -1)。
- max_socket 記錄著 reactor 中最大的連接數,與 max_connection 的值一致;socket_list 是 reactor 多線程模式中監聽的 socket 列表,與 connection_list 保持一致;socket_array 是 reactor 多進程模式中監聽的 fd 列表。
- handle 是默認就緒的回調函數,write_handle 是寫就緒的回調函數,error_handle 是錯誤就緒的回調函數。
- timewheel、heartbeat_interval、last_heartbeat_time 用於心跳檢測,專門剔除空閒連接。
- last_malloc_trim_time 記錄了上次將空閒內存返還給系統的時間,swoole 會定期地通過 malloc_trim 函數返還空閒的內存空間。
/**
 * Reactor (event loop) state.
 *
 * swReactor_create() zeroes the whole struct, calls the backend constructor
 * (swReactorEpoll_create() when HAVE_EPOLL is defined) and then installs the
 * generic setHandle/onFinish/onTimeout/write/defer/close callbacks.
 * NOTE(review): add/set/del/wait/free are presumably filled in by the backend
 * constructor -- confirm in the backend sources.
 */
struct _swReactor
{
    /* address of the concrete reactor object (presumably the backend's own state) */
    void *object;
    /* pointer to the object that owns this reactor */
    void *ptr; //reserve
    /**
     * last signal number
     * Dispatched via swSignal_callback() and reset to 0 by swReactor_onFinish().
     */
    int singal_no;
    /* number of fds currently registered; swReactor_empty() treats 0 as "nothing to wait for" */
    uint32_t event_num;
    /* maximum number of events the reactor may hold */
    uint32_t max_event_num;
    /* when set, swTimer_select() runs on every timeout/finish pass */
    uint32_t check_timer :1;
    /* set to 1 by swReactor_create(); cleared to stop the event loop */
    uint32_t running :1;
    /* NOTE(review): not touched in this file; described as "set once wait() starts" */
    uint32_t start :1;
    /* NOTE(review): not touched in this file; described as "monitor only once" */
    uint32_t once :1;
    /**
     * disable accept new connection
     * When set, swReactor_onTimeout() calls enable_accept() and clears it.
     */
    uint32_t disable_accept :1;
    /* whether signalfd has to be checked */
    uint32_t check_signalfd :1;
    /**
     * multi-thread reactor, cannot realloc sockets.
     * When set, swReactor_get() indexes socket_list; otherwise socket_array.
     */
    uint32_t thread :1;
    /**
     * reactor->wait timeout (millisecond) or -1
     */
    int32_t timeout_msec;
    uint16_t id; //Reactor ID
    uint16_t flag; //flag
    /* maximum number of sockets (described as equal to max_connection) */
    uint32_t max_socket;
#ifdef SW_USE_MALLOC_TRIM
    /* time of the last malloc_trim() call, compared against SwooleG.serv->gs->now */
    time_t last_malloc_trim_time;
#endif
#ifdef SW_USE_TIMEWHEEL
    /* heartbeat bookkeeping used to drop idle connections */
    swTimeWheel *timewheel;
    uint16_t heartbeat_interval;
    time_t last_heartbeat_time;
#endif
    /**
     * for thread
     * Pre-allocated connection slots, indexed directly by fd.
     */
    swConnection *socket_list;
    /**
     * for process
     * Grown on demand by swArray_alloc() in swReactor_get().
     */
    swArray *socket_array;
    swReactor_handle handle[SW_MAX_FDTYPE]; //default (read) handler, indexed by swFd_type
    swReactor_handle write_handle[SW_MAX_FDTYPE]; //extension event 1 (usually the write event)
    swReactor_handle error_handle[SW_MAX_FDTYPE]; //extension event 2 (usually the error event, e.g. socket closed)
    /* backend operations: register, modify and unregister an fd */
    int (*add)(swReactor *, int fd, int fdtype);
    int (*set)(swReactor *, int fd, int fdtype);
    int (*del)(swReactor *, int fd);
    /* NOTE(review): the timeval argument is presumably the wait timeout -- confirm */
    int (*wait)(swReactor *, struct timeval *);
    void (*free)(swReactor *);
    /* installs handle/write_handle/error_handle entries (swReactor_setHandle) */
    int (*setHandle)(swReactor *, int fdtype, swReactor_handle);
    /* callbacks queued by swReactor_defer(); run and freed on the next timeout/finish pass */
    swDefer_callback *defer_callback_list;
    /* run at the end of every timeout/finish pass when callback is non-NULL */
    swDefer_callback idle_task;
    /* NOTE(review): not referenced in this file */
    swDefer_callback future_task;
    void (*onTimeout)(swReactor *);
    void (*onFinish)(swReactor *);
    /* NOTE(review): not referenced in this file */
    void (*onBegin)(swReactor *);
    /* resumes accepting new connections when disable_accept is set */
    void (*enable_accept)(swReactor *);
    /* optional hook consulted by swReactor_empty() */
    int (*can_exit)(swReactor *);
    int (*write)(swReactor *, int, void *, int);
    int (*close)(swReactor *, int);
    int (*defer)(swReactor *, swCallback, void *);
};
reactor 的創建

- reactor 的創建主要是調用 swReactorEpoll_create 函數。
- setHandle 函數為監聽的 fd 設置回調函數,包括讀就緒、寫就緒、錯誤。
- onFinish 是每次 epoll 函數返回、處理完具體邏輯後,最後調用的回調函數。
- onTimeout 是每次 epoll 函數超時後的回調函數。
- write 函數是利用 reactor 向 socket 發送數據的接口。
- defer 函數用於向 defer_callback_list 成員變量添加回調函數。這個成員變量是一個回調函數列表,epoll 函數超時和 onFinish 時都會循環調用 defer_callback_list 裡面的回調函數。
- socket_array 是監聽的 fd 列表。
/**
 * Initialise a reactor in place.
 *
 * Zeroes *reactor, creates the I/O backend (epoll when HAVE_EPOLL is
 * defined) and installs the generic callbacks shared by every backend.
 *
 * @param reactor   caller-owned reactor object to initialise
 * @param max_event forwarded unchanged to the backend constructor
 * @return the backend constructor's result (SW_OK) on success,
 *         SW_ERR if the backend or the socket array cannot be created
 */
int swReactor_create(swReactor *reactor, int max_event)
{
    /* Start from SW_ERR so a build without any backend reports failure
     * instead of returning an uninitialised value. */
    int ret = SW_ERR;

    bzero(reactor, sizeof(swReactor));

#ifdef HAVE_EPOLL
    ret = swReactorEpoll_create(reactor, max_event);
#else
    (void) max_event;
#endif
    if (ret < 0)
    {
        swWarn("create reactor backend failed.");
        return SW_ERR;
    }

    reactor->running = 1;

    /* Generic callbacks shared by every backend. */
    reactor->setHandle = swReactor_setHandle;
    reactor->onFinish = swReactor_onFinish;
    reactor->onTimeout = swReactor_onTimeout;
    reactor->write = swReactor_write;
    reactor->defer = swReactor_defer;
    reactor->close = swReactor_close;

    /* Per-fd connection slots used in process (non-thread) mode. */
    reactor->socket_array = swArray_new(1024, sizeof(swConnection));
    if (!reactor->socket_array)
    {
        swWarn("create socket array failed.");
        return SW_ERR;
    }
    return ret;
}
reactor 的函數

reactor 設置文件就緒回調函數 swReactor_setHandle

- reactor 中設置的 fdtype 由兩部分構成:一部分是 swFd_type,標識著文件描述符的類型;另一部分是 swEvent_type,標識著文件描述符感興趣的讀寫事件。
/**
 * File-descriptor types.
 *
 * These form the low bits of the fdtype value handed to the reactor; event
 * bits from enum swEvent_type are OR-ed on top. After stripping those bits
 * (swReactor_fdtype) the value indexes handle/write_handle/error_handle.
 * Values 10 and 14 are not assigned.
 */
enum swFd_type
{
    SW_FD_TCP = 0,              /* tcp socket */
    SW_FD_LISTEN = 1,           /* server socket */
    SW_FD_CLOSE = 2,            /* socket closed */
    SW_FD_ERROR = 3,            /* socket error */
    SW_FD_UDP = 4,              /* udp socket */
    SW_FD_PIPE = 5,             /* pipe */
    SW_FD_STREAM = 6,           /* stream socket */
    SW_FD_WRITE = 7,            /* fd can write */
    SW_FD_TIMER = 8,            /* timer fd */
    SW_FD_AIO = 9,              /* linux native aio */
    SW_FD_SIGNAL = 11,          /* signalfd */
    SW_FD_DNS_RESOLVER = 12,    /* dns resolver */
    SW_FD_INOTIFY = 13,         /* inotify fd */
    SW_FD_USER = 15,            /* SW_FD_USER or SW_FD_USER+n: for custom event */
    SW_FD_STREAM_CLIENT = 16,   /* swClient stream */
    SW_FD_DGRAM_CLIENT = 17,    /* swClient dgram */
};
/**
 * Event-interest bits OR-ed onto a swFd_type value.
 *
 * SW_EVENT_DEAULT (sic) is not a flag of its own but a threshold:
 * swReactor_event_read() treats any value below it (a bare fd type with no
 * event bits) as a read request. The remaining members are single bits.
 */
enum swEvent_type
{
    SW_EVENT_DEAULT = 1u << 8,  /* 256 */
    SW_EVENT_READ   = 1u << 9,  /* 512 */
    SW_EVENT_WRITE  = 1u << 10, /* 1024 */
    SW_EVENT_ERROR  = 1u << 11, /* 2048 */
    SW_EVENT_ONCE   = 1u << 12, /* 4096 */
};
- swReactor_fdtype 用於從 fdtype 參數中提取 swFd_type,也就是文件描述符的類型:
static sw_inline int swReactor_fdtype(int fdtype)
{
return fdtype & (~SW_EVENT_READ) & (~SW_EVENT_WRITE) & (~SW_EVENT_ERROR);
}
- swReactor_event_read、swReactor_event_write、swReactor_event_error 這三個函數與 swReactor_fdtype 正相反,是從 fdtype 參數中提取讀寫事件:
/**
 * Does this fdtype value request read readiness?
 *
 * A value below SW_EVENT_DEAULT is a bare fd type without any event bits,
 * which defaults to reading. Returns 1 or 0.
 */
static sw_inline int swReactor_event_read(int fdtype)
{
    if (fdtype < SW_EVENT_DEAULT)
    {
        return 1;
    }
    return (fdtype & SW_EVENT_READ) != 0;
}
/**
 * Does this fdtype value request write readiness?
 * Returns SW_EVENT_WRITE when it does, 0 otherwise.
 */
static sw_inline int swReactor_event_write(int fdtype)
{
    const int mask = SW_EVENT_WRITE;
    return (fdtype & mask) ? mask : 0;
}
/**
 * Does this fdtype value request error notification?
 * Returns SW_EVENT_ERROR when it does, 0 otherwise.
 */
static sw_inline int swReactor_event_error(int fdtype)
{
    const int mask = SW_EVENT_ERROR;
    return (fdtype & mask) ? mask : 0;
}
- swReactor_setHandle 用於為 _fdtype 所表示的文件描述符類型設定讀就緒、寫就緒或錯誤的回調函數:
/**
 * Install the callback for one (fd type, event) combination.
 *
 * @param reactor reactor whose handler tables are updated
 * @param _fdtype a swFd_type value OR-ed with SW_EVENT_READ, SW_EVENT_WRITE
 *                or SW_EVENT_ERROR. A bare fd type (no event bits) selects
 *                the read handler. If several event bits are set, only the
 *                first match in the order read, write, error is used.
 * @param handle  callback to install
 * @return SW_OK, or SW_ERR when the fd type is out of range or no supported
 *         event bit is present
 */
int swReactor_setHandle(swReactor *reactor, int _fdtype, swReactor_handle handle)
{
    int fdtype = swReactor_fdtype(_fdtype);

    /* A negative input survives the masking above and would index the
     * handler tables out of bounds, so reject it as well. */
    if (fdtype < 0 || fdtype >= SW_MAX_FDTYPE)
    {
        swWarn("fdtype[%d] is out of range [0, SW_MAX_FDTYPE[%d]).", fdtype, SW_MAX_FDTYPE);
        return SW_ERR;
    }

    if (swReactor_event_read(_fdtype))
    {
        reactor->handle[fdtype] = handle;
    }
    else if (swReactor_event_write(_fdtype))
    {
        reactor->write_handle[fdtype] = handle;
    }
    else if (swReactor_event_error(_fdtype))
    {
        reactor->error_handle[fdtype] = handle;
    }
    else
    {
        swWarn("unknow fdtype");
        return SW_ERR;
    }
    return SW_OK;
}
reactor 添加 defer 函數

- defer 函數會在每次事件循環結束或超時的時候被調用。
- swReactor_defer 函數會為 defer_callback_list 添加新的回調函數。
/**
 * Queue callback(data) to run once at the end of the current event-loop
 * pass (timeout or finish). Callbacks run in the order they were queued.
 *
 * @return SW_OK, or SW_ERR when the queue node cannot be allocated
 */
static int swReactor_defer(swReactor *reactor, swCallback callback, void *data)
{
    swDefer_callback *cb = sw_malloc(sizeof(*cb));
    if (!cb)
    {
        /* sizeof yields size_t, which must be printed with %zu, not %ld. */
        swWarn("malloc(%zu) failed.", sizeof(*cb));
        return SW_ERR;
    }
    cb->callback = callback;
    cb->data = data;
    /* Append (not prepend) so callbacks keep their submission order. */
    LL_APPEND(reactor->defer_callback_list, cb);
    return SW_OK;
}
reactor 超時回調函數

如果在設置的超時時間內沒有任何事件就緒,epoll 也會自動返回,這個時候就會調用超時回調函數:
/**
 * Timeout callback: runs the shared timeout/finish housekeeping, then
 * resumes accepting new connections if accepting had been paused
 * (disable_accept).
 */
static void swReactor_onTimeout(swReactor *reactor)
{
    swReactor_onTimeout_and_Finish(reactor);

    /* Accepting was never paused: nothing else to do. */
    if (!reactor->disable_accept)
    {
        return;
    }
    reactor->enable_accept(reactor);
    reactor->disable_accept = 0;
}
- swReactor_onTimeout_and_Finish 函數會在超時、finish 等情況下被調用。
- 這個函數首先會檢查 check_timer 標記,如果需要檢查定時任務,就會調用 swTimer_select 執行定時任務的回調函數。
- 接下來要執行存儲在 defer_callback_list 中的多個回調函數,該列表保存的是事先添加好的、需要延遲(defer)執行的函數。
- idle_task 是 EventLoop 中使用的、在每一輪事件循環結束時調用的函數。
- 如果當前 reactor 在 worker 進程中,並且 SwooleWG.wait_exit 為 1,那麼就要調用 swWorker_try_to_exit 函數來判斷 event_num 是否為 0;如果為 0,就將 running 置為 0,停止等待事件就緒。
- 如果當前 SwooleG.serv 為空,就用 swReactor_empty 函數判斷當前 reactor 是否還有事件在監聽;如果沒有,就會將 running 設置為 0。
- 最後判斷當前是否可以調用 malloc_trim 釋放空閒的內存:如果距離上次釋放內存的時間超過了 SW_MALLOC_TRIM_INTERVAL,就調用 malloc_trim 並更新 last_malloc_trim_time。
/**
 * Housekeeping shared by the timeout and finish callbacks. Runs once per
 * event-loop pass, in this order:
 *   1. timers, via swTimer_select() (only when check_timer is set);
 *   2. callbacks queued with reactor->defer(), which are then freed;
 *   3. idle_task, if one is set;
 *   4. coroutine timeouts (SW_COROUTINE builds, non-master processes only);
 *   5. worker exit attempt when SwooleWG.wait_exit == 1;
 *   6. stop the loop (running = 0) when there is no server and
 *      swReactor_empty() reports nothing left to wait for;
 *   7. periodic malloc_trim() (SW_USE_MALLOC_TRIM builds with a server).
 */
static void swReactor_onTimeout_and_Finish(swReactor *reactor)
{
    //check timer
    if (reactor->check_timer)
    {
        swTimer_select(&SwooleG.timer);
    }
    //defer callback
    swDefer_callback *cb, *tmp;
    /* Detach the list before running it: callbacks that call reactor->defer()
     * again append to a fresh list, which runs on the next pass. */
    swDefer_callback *defer_callback_list = reactor->defer_callback_list;
    reactor->defer_callback_list = NULL;
    LL_FOREACH(defer_callback_list, cb)
    {
        cb->callback(cb->data);
    }
    /* Nodes are freed while iterating, hence the _SAFE variant. */
    LL_FOREACH_SAFE(defer_callback_list, cb, tmp)
    {
        sw_free(cb);
    }
    //callback at the end
    if (reactor->idle_task.callback)
    {
        reactor->idle_task.callback(reactor->idle_task.data);
    }
#ifdef SW_COROUTINE
    //coro timeout
    if (!swIsMaster())
    {
        coro_handle_timeout();
    }
#endif
    //server worker
    /* NOTE(review): SwooleWG.worker is presumably non-NULL only inside a worker process. */
    swWorker *worker = SwooleWG.worker;
    if (worker != NULL)
    {
        if (SwooleWG.wait_exit == 1)
        {
            swWorker_try_to_exit();
        }
    }
    //not server, the event loop is empty
    if (SwooleG.serv == NULL && swReactor_empty(reactor))
    {
        reactor->running = 0;
    }
#ifdef SW_USE_MALLOC_TRIM
    /* Return free heap memory to the OS at most once per SW_MALLOC_TRIM_INTERVAL. */
    if (SwooleG.serv && reactor->last_malloc_trim_time < SwooleG.serv->gs->now - SW_MALLOC_TRIM_INTERVAL)
    {
        malloc_trim(SW_MALLOC_TRIM_PAD);
        reactor->last_malloc_trim_time = SwooleG.serv->gs->now;
    }
#endif
}
- swReactor_empty 用來判斷當前的 reactor 是否還有事件需要監聽。
- 從函數中可以看出來,如果定時器 timer 裡面還有等待的任務,那麼就直接返回 false。
- 如果 event_num 為 0(或者異步線程池已初始化、只剩下一個事件且沒有未完成的 AIO 任務),就可以返回 true,結束事件循環。
- 對於協程來說,還要調用 can_exit 來判斷是否可以退出事件循環。
/**
 * Decide whether the event loop has nothing left to wait for.
 *
 * @return SW_FALSE while any timer is pending. Otherwise SW_TRUE when no fd
 *         is registered, or when exactly one is registered while the AIO
 *         thread pool is initialised with no tasks in flight (presumably the
 *         pool's own notification fd -- confirm). The optional can_exit hook
 *         (e.g. for coroutines) may still veto the exit by returning 0.
 */
int swReactor_empty(swReactor *reactor)
{
    //timer
    if (SwooleG.timer.num > 0)
    {
        return SW_FALSE;
    }

    int empty = SW_FALSE;
    //thread pool
    if (SwooleAIO.init && reactor->event_num == 1 && SwooleAIO.task_num == 0)
    {
        empty = SW_TRUE;
    }
    //no event
    else if (reactor->event_num == 0)
    {
        empty = SW_TRUE;
    }
    /* coroutine: the hook must be able to keep the loop alive. The previous
     * form only ever re-assigned SW_TRUE, so its result was ignored. */
    if (empty && reactor->can_exit && !reactor->can_exit(reactor))
    {
        empty = SW_FALSE;
    }
    return empty;
}
reactor 事件循環結束函數

- 每次事件循環結束之後,都會調用 onFinish 函數。
- 該函數主要是調用 swReactor_onTimeout_and_Finish,在此之前還會檢查在事件循環過程中是否有信號觸發。
/**
 * Called at the end of each event-loop pass.
 *
 * A signal recorded in singal_no during the pass is dispatched through
 * swSignal_callback() and cleared. The shared timeout/finish housekeeping
 * then runs.
 */
static void swReactor_onFinish(swReactor *reactor)
{
    if (reactor->singal_no != 0)
    {
        swSignal_callback(reactor->singal_no);
        reactor->singal_no = 0;
    }
    swReactor_onTimeout_and_Finish(reactor);
}
reactor 的 socket 關閉函數

- 當一個 socket 關閉的時候,會調用 close 函數,對應的回調函數就是 swReactor_close。
- 該函數用於釋放 swConnection 內部申請的內存(out_buffer、in_buffer、websocket_buffer),並調用 close 函數關閉連接。
/**
 * Release the reactor's per-fd connection state and close the descriptor.
 *
 * Frees out_buffer, in_buffer and websocket_buffer, zeroes the swConnection
 * slot and marks it removed, then closes fd.
 *
 * @return the result of close(fd)
 */
int swReactor_close(swReactor *reactor, int fd)
{
    swConnection *socket = swReactor_get(reactor, fd);

    /* swReactor_get() returns NULL when swArray_alloc() fails. In that case
     * there is no state to release, but the descriptor must still be closed. */
    if (socket != NULL)
    {
        if (socket->out_buffer)
        {
            swBuffer_free(socket->out_buffer);
        }
        if (socket->in_buffer)
        {
            swBuffer_free(socket->in_buffer);
        }
        if (socket->websocket_buffer)
        {
            swString_free(socket->websocket_buffer);
        }
        bzero(socket, sizeof(swConnection));
        socket->removed = 1;
    }
    swTraceLog(SW_TRACE_CLOSE, "fd=%d.", fd);
    return close(fd);
}
- swReactor_get 用於根據文件描述符從 reactor 中獲取對應的 swConnection 對象。由於 swoole 一般都會採用 reactor 多線程模式,因此基本只會執行 return &reactor->socket_list[fd]; 這一句。
- socket_list 這個列表與 connection_list 保持一致,是事先申請的、大小為 max_connection、類型為 swConnection 的數組。
- socket_list 中的數據,一部分是已經建立連接的 swConnection 對象,另一部分僅僅是空的 swConnection,這時 swConnection->fd 為 0。
/**
 * Look up the connection slot for fd.
 *
 * Thread mode: returns the pre-allocated socket_list entry (no bounds check).
 * Process mode: allocates the slot on demand in socket_array and returns
 * NULL if that fails. For an inactive slot, fd is recorded in it.
 */
static sw_inline swConnection* swReactor_get(swReactor *reactor, int fd)
{
    swConnection *conn;

    if (!reactor->thread)
    {
        conn = (swConnection*) swArray_alloc(reactor->socket_array, fd);
        if (conn != NULL && !conn->active)
        {
            conn->fd = fd;
        }
    }
    else
    {
        conn = &reactor->socket_list[fd];
    }
    return conn;
}
reactor 的數據寫入

- 如果想對一個 socket 寫入數據,並不能簡單地直接調用 send 函數,因為這個函數可能被信號打斷(EINTR)、可能暫時不可用(EAGAIN)、可能只寫入了部分數據,也有可能寫入成功。因此,reactor 定義了一個函數專門處理寫數據這一邏輯。
- 首先要利用 swReactor_get 取出對應的 swConnection 對象。
- 如果取出的對象的 fd 是 0,說明這個文件描述符事先並沒有在 reactor 裡面進行監聽,此時會先記錄下 fd。
- 如果這個 socket 的 out_buffer 為空,那麼就先嘗試利用 swConnection_send 函數調用 send 函數,觀察是否可以直接把所有數據發送成功:
  - 如果 errno 為 EINTR,說明被信號打斷了,重新發送即可;
  - 如果 errno 為 EAGAIN,說明此時 socket 暫時不可寫,此時需要將 fd 的寫就緒事件添加到 reactor 中,然後將數據拷貝到 out_buffer 中去;
  - 如果返回的寫入數據量小於 n,說明只寫入了一部分,此時需要把沒有寫入的部分拷貝到 out_buffer 中去;
  - 如果是其他錯誤,則將 errno 記錄到 SwooleG.error 中並返回 SW_ERR。
- 如果 out_buffer 不為空,說明此時 socket 不可寫,那麼就要將數據拷貝到 out_buffer 中去,等 reactor 監控到寫就緒之後,再把 out_buffer 發送出去。
- 如果此時 out_buffer 存儲空間不足,那麼就要調用 swYield 讓進程休眠一段時間,並通過 swSocket_wait 等待 fd 的寫就緒狀態。
/**
 * Send data on fd through the reactor, buffering whatever cannot be written
 * immediately.
 *
 * When out_buffer is empty (and ssl_send is not set), the data is first sent
 * directly with swConnection_send(). EINTR retries the send. EAGAIN (or
 * ENOBUFS on kqueue builds) and partial writes fall through to buffering:
 * the rest goes into socket->out_buffer and the fd is registered for
 * SW_EVENT_WRITE, so the reactor can flush it later.
 *
 * @param reactor reactor that owns fd
 * @param fd      descriptor to write to; switched to non-blocking mode on first use
 * @param buf     data to send
 * @param n       number of bytes in buf; must not exceed socket->buffer_size
 * @return n when everything was sent immediately; SW_OK when (part of) the
 *         data was buffered; SW_ERR on failure. SwooleG.error is set for send
 *         errors and, with dontwait, for output-buffer overflow.
 */
int swReactor_write(swReactor *reactor, int fd, void *buf, int n)
{
    int ret;
    swConnection *socket = swReactor_get(reactor, fd);

    /* swReactor_get() returns NULL when swArray_alloc() fails (process mode). */
    if (socket == NULL)
    {
        swWarn("get socket#%d failed.", fd);
        return SW_ERR;
    }

    swBuffer *buffer = socket->out_buffer;

    /* fd == 0 marks a slot that was never registered with the reactor. */
    if (socket->fd == 0)
    {
        socket->fd = fd;
    }
    if (socket->buffer_size == 0)
    {
        socket->buffer_size = SwooleG.socket_buffer_size;
    }
    if (socket->nonblock == 0)
    {
        swoole_fcntl_set_option(fd, 1, -1);
        socket->nonblock = 1;
    }
    if (n > socket->buffer_size)
    {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "data is too large, cannot exceed buffer size.");
        return SW_ERR;
    }

    if (swBuffer_empty(buffer))
    {
        /* With ssl_send set, skip the direct send and always buffer. */
        if (socket->ssl_send)
        {
            goto do_buffer;
        }
        do_send:
        ret = swConnection_send(socket, buf, n, 0);
        if (ret > 0)
        {
            if (n == ret)
            {
                return ret;
            }
            else
            {
                /* Partial write: buffer the unsent tail. Arithmetic on
                 * void * is not standard C, so step through char *. */
                buf = (char *) buf + ret;
                n -= ret;
                goto do_buffer;
            }
        }
#ifdef HAVE_KQUEUE
        else if (errno == EAGAIN || errno == ENOBUFS)
#else
        else if (errno == EAGAIN)
#endif
        {
            do_buffer:
            if (!socket->out_buffer)
            {
                buffer = swBuffer_new(sizeof(swEventData));
                if (!buffer)
                {
                    swWarn("create worker buffer failed.");
                    return SW_ERR;
                }
                socket->out_buffer = buffer;
            }

            /* Ask the reactor to report writability so out_buffer gets flushed. */
            socket->events |= SW_EVENT_WRITE;
            if (socket->events & SW_EVENT_READ)
            {
                if (reactor->set(reactor, fd, socket->fdtype | socket->events) < 0)
                {
                    swSysError("reactor->set(%d, SW_EVENT_WRITE) failed.", fd);
                }
            }
            else
            {
                if (reactor->add(reactor, fd, socket->fdtype | SW_EVENT_WRITE) < 0)
                {
                    swSysError("reactor->add(%d, SW_EVENT_WRITE) failed.", fd);
                }
            }
            goto append_buffer;
        }
        else if (errno == EINTR)
        {
            goto do_send;
        }
        else
        {
            SwooleG.error = errno;
            return SW_ERR;
        }
    }
    else
    {
        append_buffer:
        if (buffer->length > socket->buffer_size)
        {
            if (socket->dontwait)
            {
                SwooleG.error = SW_ERROR_OUTPUT_BUFFER_OVERFLOW;
                return SW_ERR;
            }
            else
            {
                /* Blocking-style back-pressure: wait until fd is writable. */
                swoole_error_log(SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "socket#%d output buffer overflow.", fd);
                swYield();
                swSocket_wait(fd, SW_SOCKET_OVERFLOW_WAIT, SW_EVENT_WRITE);
            }
        }
        if (swBuffer_append(buffer, buf, n) < 0)
        {
            return SW_ERR;
        }
    }
    return SW_OK;
}