關(guān)鍵數(shù)據(jù)結(jié)構(gòu)
- CQ_ITEM
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int sfd;
enum conn_states init_state;
int event_flags;
int read_buffer_size;
enum network_transport transport;
CQ_ITEM *next;
};
可以將這個(gè)結(jié)構(gòu)體看著是主線程accept觸發(fā)時(shí)即有客戶端連入時(shí),主線程寫入工作線程有關(guān)socket連接相關(guān)句柄數(shù)據(jù)結(jié)構(gòu),綁定了socket描述符、狀態(tài)、發(fā)生的事件、讀buffer大小等树肃,不難對(duì)號(hào)入座。
- CQ
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
pthread_cond_t cond;
};
這是socket連接通知隊(duì)列瀑罗。
- LIBEVENT_THREAD
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
uint8_t item_lock_type; /* use fine-grained or global item lock */
} LIBEVENT_THREAD;
可以將這個(gè)結(jié)構(gòu)體看著是線程句柄數(shù)據(jù)結(jié)構(gòu)胸嘴,綁定了線程ID、Libevent實(shí)例斩祭、用于通知管道的event劣像、通知接收的socket描述符,通知發(fā)送的socket描述符摧玫、socket連接通知隊(duì)列耳奕,這也很好對(duì)號(hào)入座吧。
- conn
typedef struct conn conn;
struct conn {
int sfd;
...
struct event event;
short ev_flags;
short which; /** which events were just triggered */
...
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};
這個(gè)結(jié)構(gòu)非常龐大诬像,提取關(guān)鍵的幾個(gè)字段來分析一下屋群, 可以將這個(gè)結(jié)構(gòu)體看著是socket連接句柄數(shù)據(jù)結(jié)構(gòu),綁定了socket描述符颅停、觸發(fā)的事件谓晌、處理連接的線程指針等,這個(gè)也很好對(duì)號(hào)入座吧癞揉。
整體流程
- 在main函數(shù)中調(diào)用main_base = event_init()來初始化主線程Libevent實(shí)例。
- 在main函數(shù)中調(diào)用thread_init來初始化工作線程溺欧,并將主線程Libevent實(shí)例作為參數(shù)傳入喊熟。
- 在thread_init函數(shù)中為指定數(shù)量的工作線程分配內(nèi)存,為每個(gè)線程創(chuàng)建管道姐刁,并分別綁定到通知收和發(fā)的socket描述符上芥牌,調(diào)用函數(shù)setup_thread初始化線程信息,調(diào)用函數(shù)create_worker為每個(gè)線程注冊(cè)回調(diào)函數(shù)聂使。關(guān)鍵代碼:
for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {
...
}
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
setup_thread(&threads[i]);
...
}
// Create threads after we've done all the libevent setup.
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
- 在setup_thread函數(shù)中壁拉,為工作線程初始化Libevent實(shí)例谬俄,為主線程通知讀(notify_receive_fd)注冊(cè)回調(diào)函數(shù)thread_libevent_process,初始化cq隊(duì)列弃理,關(guān)鍵代碼如下:
static void setup_thread(LIBEVENT_THREAD *me) {
me->base = event_init();
...
/* Listen for notifications from other threads */
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
...
}
me->new_conn_queue = malloc(sizeof(struct conn_queue));
...
cq_init(me->new_conn_queue);
...
}
- 在thread_libevent_process函數(shù)中溃论,讀取主線程發(fā)送的通知接收消息,將主線程accept來的fd注冊(cè)到工作線程的Libevent實(shí)例中痘昌,主線程accept來的fd從conn_queue隊(duì)列獲取钥勋,關(guān)鍵代碼如下:
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
if (read(fd, buf, 1) != 1)
...
switch (buf[0]) {
case 'c':
item = cq_pop(me->new_conn_queue);
if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
...
}
}
- 在函數(shù)conn_new中,創(chuàng)建conn句柄辆苔,為句柄注冊(cè)回調(diào)函數(shù)event_handler處理事件算灸,將該句柄作為參數(shù)傳入回調(diào)函數(shù)并設(shè)置到Libevent中,該函數(shù)的關(guān)鍵代碼如下:
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size,
enum network_transport transport,
struct event_base *base) {
conn *c = conn_from_freelist();
if (NULL == c) {
if (!(c = (conn *)calloc(1, sizeof(conn)))) {
...
}
...
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
...
}
...
}
}
- 在create_worker函數(shù)中驻啤,創(chuàng)建工作線程并注冊(cè)回調(diào)函數(shù)菲驴,在工作線程的回調(diào)函數(shù)work_libevent中,開始Libevent主循環(huán)骑冗。
- 在main函數(shù)中赊瞬,調(diào)用函數(shù)server_sockets,再調(diào)用函數(shù)server_socket沐旨,進(jìn)而調(diào)用函數(shù)new_socket森逮,在調(diào)用函數(shù)conn_new,創(chuàng)建并注冊(cè)listen fd到主線程Libevent實(shí)例上磁携,最后開始Libevent主循環(huán)即event_base_loop褒侧。在conn_new函數(shù)關(guān)鍵代碼見步驟(6)
- 在event_handler函數(shù)中,調(diào)用函數(shù)drive_machine谊迄,在該函數(shù)中處理所有事件闷供,其關(guān)鍵代碼如下:
static void drive_machine(conn *c) {
...
while (!stop) {
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
...
}
...
if (settings.maxconns_fast &&
stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
...
} else {
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
...
}
}
return;
}
在處理事件時(shí),如果是listening事件统诺,則調(diào)用函數(shù)dispatch_conn_new將accept fd分配給工作線程歪脏。
- 在dispatch_conn_new函數(shù)中,根據(jù)round-robin算法將新連接push到所分配線程的CQ隊(duì)列中粮呢,并通過管道發(fā)送通知消息“c”婿失,關(guān)鍵代碼如下:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
char buf[1];
int tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
...
cq_push(thread->new_conn_queue, item);
...
buf[0] = 'c';
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
dispatch_conn_new函數(shù)只在主線程中調(diào)用,last_thread為靜態(tài)變量啄寡,每次將該變量值+1豪硅,再模線程數(shù)來選擇工作線程。
線程模型
Libevent本身是單線程的挺物,Memcached采用消息通知+同步層機(jī)制使得其支持多線程懒浮,整體模型見如下神圖:
每個(gè)線程包括主線程都各自有獨(dú)立的Libevent實(shí)例,Memcached的listen fd注冊(cè)到主線程的Libevent實(shí)例上识藤,由主線程來accept新的連接砚著,接受新的連接后根據(jù)Round-robin算法選擇工作線程次伶,將新連接的socket fd封裝為CQ_ITEM后push到所選工作線程的CQ隊(duì)列中,然后主線程(notify_send_fd)通過管道發(fā)送字符“c”到工作線程(notify_receive_fd)稽穆,而notify_receive_fd已經(jīng)注冊(cè)到工作線程的Libevent實(shí)例上了冠王,這樣工作線程就能收到通知“c”,然后從該工作線程的CQ隊(duì)列中pop出CQ_ITEM進(jìn)而取出新連接并將fd注冊(cè)到工作線程的Libevent實(shí)例上秧骑,從而由工作線程來處理該連接的所有后續(xù)事件版确。 需要注意的數(shù)據(jù):Memcached默認(rèn)開啟線程數(shù)為4,也可以通過參數(shù)-t來指定開啟線程數(shù)乎折,當(dāng)線程數(shù)大于64時(shí)會(huì)給出錯(cuò)誤提示绒疗,建議線程數(shù)為小于或等于CPU核數(shù)。