libuv 源碼分析1: loop和poll
0 背景
libuv是一個開源異步I/O庫(Asynchronous I/O)薄嫡。主頁在這里libuv
應用案例:Nodejs . 比起libevent來說八酒,比較年輕盐数。
前提假設:本文假設你對unix上的套接字編程比較熟悉,熟悉阻塞/非阻塞套接字错洁,了解select, poll, epoll映穗。
先上一張libuv架構(gòu)圖:
在linux上荆烈,libuv是對epoll的封裝淌山;在windows上裸燎,是對完成端口的封裝;在macOS/FreeBSD上泼疑,是對kqueue的封裝德绿。
本文不討論libuv基本的用法,只是稍微分析一下源碼退渗。本文只分析loop, poll和tcp移稳。tcp下一章再講。
1. uv_run執(zhí)行了些什么
uv_run是uv消息隊列的入口函數(shù)会油,我們看一下里面執(zhí)行了些什么:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop);
if (mode == UV_RUN_ONCE) {
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
/* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}
我們可以看到主要執(zhí)行了以下幾個步驟:
-
uv__update_time
: 更新時間 -
uv__run_timers
: 調(diào)用timer秒裕。 -
uv__run_pending
: 調(diào)用回調(diào)。通常來說回調(diào)一般在poll fd結(jié)束后就立即執(zhí)行钞啸,但是總有例外:有一些I/O回調(diào)會延遲到下一次迭代中執(zhí)行。那些被延遲的回調(diào)正是在這里執(zhí)行 -
uv__run_idle
: 執(zhí)行idle。查看uv__run_idle
的實現(xiàn)体斩,發(fā)現(xiàn)僅僅是對idle進行遍歷梭稚、執(zhí)行回調(diào),并沒有刪除的操作絮吵,所以idle是每次while都執(zhí)行的弧烤。idle名不副實。(除了idle, prepare, check蹬敲,都是執(zhí)行完一個刪除一個暇昂。既然回調(diào)都執(zhí)行完了,保存也肯定沒有必要) -
uv__run_prepare
: 執(zhí)行prepare -
uv__io_poll(loop, timeout)
: 把新增的需要被監(jiān)聽的fd放到poll中伴嗡;poll我們所關心的fd急波,注意有一個timeout。 -
uv__run_check
: 執(zhí)行check -
uv__run_closing_handles
: 執(zhí)行close handle瘪校。
這和uv官網(wǎng)上貼的圖是一致的:
下面我們簡要來看一下幾個主要函數(shù):
1.0 uv_handle_t
uv_handle_t
: 一個handle, 是uv_tcp_t, uv_udp_t, uv_timer_t, uv_poll_t
等的公共父類(通過包含uv_handle_t
結(jié)構(gòu)體的所有成員來達到繼承澄暮,通過uv_handle_t.type
達到辨別是哪個子類)。
對與uv_handle_t
的任何子類的關閉阱扬,比如uv_tcp_t
泣懊,都需要調(diào)用uv_close(uv_handle_t*, uv_close_cb)
。其中uv_close_cb
一般用于釋放資源麻惶,比如我們的handle是通過malloc得到的馍刮,此時就要free。如果是在棧上分配的窃蹋,就不能free卡啰,可以進行其他清理工作。特別是timer脐彩,不僅僅要stop碎乃,還要close。
1.1 uv_run_timers
看一下實現(xiàn):
void uv__run_timers(uv_loop_t* loop) {
struct heap_node* heap_node;
uv_timer_t* handle;
for (;;) {
heap_node = heap_min((struct heap*) &loop->timer_heap);
if (heap_node == NULL)
break;
handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout > loop->time)
break;
uv_timer_stop(handle);
uv_timer_again(handle);
handle->timer_cb(handle);
}
}
從一個小堆中依次取出node惠奸,如果超時了梅誓,執(zhí)行cb,再把timer放回小堆里面佛南;如果沒超時梗掰,則break,后續(xù)的也不再檢查(最小的都沒超時嗅回,后面更大的也不可能超時)及穗。
1.2 uv_run_pending
簡單的從隊列從取出,再自行回調(diào):
while (!QUEUE_EMPTY(&pq)) {
q = QUEUE_HEAD(&pq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, pending_queue);
w->cb(loop, w, POLLOUT);
}
1.3 uv__run_idle
uv__run_prepare
uv__run_check
同上绵载。不過idle從隊列取出后埂陆,會再放回去苛白。所以idle每次都要手動的stop。我就碰到過一次焚虱,cpu占用率100%购裙,檢查后發(fā)現(xiàn)是idle沒有暫停。
QUEUE_MOVE(&loop->name##_handles, &queue); \
while (!QUEUE_EMPTY(&queue)) { \
q = QUEUE_HEAD(&queue); \
h = QUEUE_DATA(q, uv_##name##_t, queue); \
QUEUE_REMOVE(q); \
QUEUE_INSERT_TAIL(&loop->name##_handles, q); \
h->name##_cb(h); \
}
可以看到先把loop中的隊列move到queue上鹃栽,依次遍歷queue每個handle后躏率,再把遍歷的handle放到loop中的隊列。
uv__run_prepare
民鼓、uv__run_check
和uv__run_idle
都是一樣的薇芝。因為它們的init, start, stop,run定義都是通過同一個宏實現(xiàn)的(所以prepare和check也要手動停止):
UV_LOOP_WATCHER_DEFINE(prepare, PREPARE)
UV_LOOP_WATCHER_DEFINE(check, CHECK)
UV_LOOP_WATCHER_DEFINE(idle, IDLE)
#define UV_LOOP_WATCHER_DEFINE(name, type) \
int uv_##name##_init(uv_loop_t* loop, uv_##name##_t* handle) { \
uv__handle_init(loop, (uv_handle_t*)handle, UV_##type); \
handle->name##_cb = NULL; \
return 0; \
} \
int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) { \
... \
}
int uv_##name##_stop(uv_##name##_t* handle) { \
... \
}
void uv__run_##name(uv_loop_t* loop) { \
... \
} \
\
void uv__##name##_close(uv_##name##_t* handle) { \
... \
}
UV_LOOP_WATCHER_DEFINE
定義了init, start, top, run函數(shù)丰嘉,有興趣的可以看一下
1.4 uv_backend_timeout
和 uv__io_poll
這個是監(jiān)聽文件描述符的函數(shù)夯到,也就是調(diào)用epoll/kqueue/IOCP監(jiān)聽套接字的函數(shù)。
首先看一下它的timeout是怎么計算的:
int uv_backend_timeout(const uv_loop_t* loop) {
if (loop->stop_flag != 0)
return 0;
if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
return 0;
if (!QUEUE_EMPTY(&loop->idle_handles))
return 0;
if (!QUEUE_EMPTY(&loop->pending_queue))
return 0;
if (loop->closing_handles)
return 0;
return uv__next_timeout(loop);
}
int uv__next_timeout(const uv_loop_t* loop) {
const struct heap_node* heap_node;
const uv_timer_t* handle;
uint64_t diff;
heap_node = heap_min((const struct heap*) &loop->timer_heap);
if (heap_node == NULL)
return -1; /* block indefinitely */
handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout <= loop->time)
return 0;
diff = handle->timeout - loop->time;
if (diff > INT_MAX)
diff = INT_MAX;
return diff;
}
- 沒有活躍的handle供嚎,返回0. epoll中黄娘,timeout為0的意思是立即返回。
- idle或pending_queue不為空克滴,返回0
- loop被關閉返會0
- timer隊列為空逼争,返回-1. epoll中,timeout為-1的意思是阻塞直到有fd有event產(chǎn)生
- timer隊列不為空劝赔,如果timer超時了誓焦,返回0。否則timeout為從現(xiàn)在到最早timer要超時的時間着帽。比如現(xiàn)在是19:00:00, timer最早超時未19:00:10杂伟,那么timeout為10s。
這也就解釋了仍翰,為什么不手動stop idle赫粥,loop會一直轉(zhuǎn)。就是因為這里io poll不等待予借,使得while一直空轉(zhuǎn)越平。
下面看看真正的poll:
void uv__io_poll(uv_loop_t* loop, int timeout) {
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
// ...
uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_ADD, w->fd, &e)
// ...
}
for (;;) {
// ...
nfds = uv__epoll_wait(loop->backend_fd, events, ARRAY_SIZE(events), timeout);
// ...
pe->events &= w->pevents | POLLERR | POLLHUP;
w->cb(loop, w, pe->events);
}
// ...
}
int uv__epoll_pwait(int epfd, struct uv__epoll_event* events, int nevents, int timeout, uint64_t sigmask) {
#if defined(__NR_epoll_pwait)
int result;
result = syscall(__NR_epoll_pwait, epfd, events, nevents, timeout, &sigmask, sizeof(sigmask));
#if MSAN_ACTIVE
if (result > 0)
__msan_unpoison(events, sizeof(events[0]) * result);
#endif
return result;
#else
return errno = ENOSYS, -1;
#endif
}
- 先把
watcher_queue
中剩余的fd移出來并放到poll隊列中; - poll等待灵迫,如果在超時之前有fd返回瀑粥,則調(diào)用對應fd的回調(diào), 然后繼續(xù)等待直到timeout超時避咆。
uv__io_poll
中包含了非常精巧的一點:libuv中的timer是由poll等待fd的時間來達到的:如果還沒達到timer超時度气,則繼續(xù)等待膨报;如果超時了适荣,就不在等待,進入到loop下一個步驟當中够吩。并不是調(diào)用了操作系統(tǒng)timer相關的API。
通過syscall調(diào)用epoll湾笛。我們調(diào)用系統(tǒng)函數(shù),不一定要通過具體名字临扮,可以通過syscall然后傳入要調(diào)用的系統(tǒng)函數(shù)對應的id。
1.5 uv__run_closing_handles
遍歷等待關閉的隊列杆勇,關閉stream(包括tcp路捧,pipe等)或者udp以及其他handle,調(diào)用handle對應的close_cb
總結(jié):uv在linux上是對epoll的封裝佳遣;idle零渐、prepare风宁、check要手動關閉,否則while會一直循環(huán)。
下面來整理一下loop的數(shù)據(jù)結(jié)構(gòu)(挑選了幾個比較重要的field,有刪減):
struct uv_loop_s {
int backend_fd; // epoll_create返回的fd歉铝。最終針對這個fd進行epoll操作
void* data; // 用戶可以用這個字段來保存數(shù)據(jù)
void* handle_queue[2]; // handle雙鏈表臼勉。一切uv_handle_t子類(包括但不限于tcp, udp餐弱,pipe)實例的地址宴霸、地址、地址膏蚓,都在這個雙鏈表里面瓢谢。為了簡便,后面會忽略存儲的是handle的地址驮瞧。
void* watcher_queue[2]; // 用戶在上一個迭代中新增加的需要監(jiān)聽的fd氓扛,這些fd還沒有添加到epoll中。這些fd,將在這次迭代中被添加到epoll中
uv__io_t** watchers; // 保存我們監(jiān)聽的fd相關的數(shù)據(jù)結(jié)構(gòu)(即uv__io_t采郎,包括fd本身千所,callback回調(diào),epoll關心的events等)蒜埋。是通過watchers[fd]來索引得到uv__io_t的淫痰。
unsigned int nwatchers; // watchers的大小,不是個數(shù)
unsigned int nfds; // watchers的個數(shù)
struct {
void* min;
unsigned int nelts;
} timer_heap; // 小堆整份。用來保存timer
uv_handle_t* closing_handles; // 待關閉隊列待错。執(zhí)行了uv_close后的handle被放入到這個隊列
};
2. uv_poll_t
要對一個fd poll的話,主要有以下步驟:
int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd);
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb);
int uv_poll_stop(uv_poll_t* handle);
-
uv_close
: 把handle放到loop->closing_handles隊列中皂林。
2.1 uv_poll_init
主要執(zhí)行了一下幾個步驟:
-
uv__io_check_fd
: 檢測fd是否有效朗鸠。先把fd放到epoll隊列里(UV__EPOLL_CTL_ADD),再移出來(UV__EPOLL_CTL_DEL)础倍,出錯了返回或者abort; -
uv__nonblock
: 設置套接字為非阻塞 -
uv__handle_init
: 初始化handle。把handle插入到loop->handle_queue的隊列尾端胎挎,設置handle類型為UV_POLL
沟启。 -
uv__io_init
: 初始化uv__io_t
2.2 uv_poll_start
主要執(zhí)行了以下幾個步驟:
-
uv__io_start
: 把fd對應的uv__io_t
添加到watcher_queue
隊尾。更新loop->watchers, loop->nfds等等犹菇。如果loop->watchers太小德迹,則先擴容再更新loop->watchers - 把我們傳入進來的回調(diào)賦值給handle->poll_cb。當有數(shù)據(jù)來了后揭芍,會調(diào)用我們這個回調(diào)胳搞。
2.2 uv_poll_stop
主要做了以下幾個步驟:
- 把
uv_poll_t
對應的uv__io_t
(因為是handle子類,所有有uv__io_t
)從loop->watcher_queue中移出称杨。
2.3 uv_close
- 把fd對應的handle放到
loop->closing_handles
中肌毅。uv會在下個循環(huán)close掉closing_handles
中的fd,以及會釋放相關資源姑原。