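1. Preface
The previous chapter covered the libevent-based network event module. When a request arrives on a connection, it triggers the worker thread's read/write event callback, event_handler, which calls drive_machine to process the event according to the connection's current state. Memcached is a high-performance, open-source, distributed in-memory object cache, yet its client-server communication does not rely on a complex format such as XML. It uses a simple line-based text protocol, so plain text commands are enough to operate memcached.
This chapter walks through the set and get commands to show how memcached parses client requests.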
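Brief command description:
set
Syntax:
set key flags exptime bytes [noreply]
value
Parameters:
- key: the key of the key-value pair, used to look up the cached value.
- flags: an integer stored alongside the key-value pair; clients use it to keep extra information about the pair.
- exptime: how long the pair is kept in the cache, in seconds (0 means it never expires).
- bytes: the number of bytes of value to store.
- noreply (optional): tells the server not to send a reply.
- value: the value to store, always on the second line (the value of the key-value pair).
get
Syntax:
get key
Parameters:
- key: the key of the key-value pair, used to look up the cached value.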
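To make the wire format concrete, here is a minimal sketch of how a client could assemble a set request as raw bytes. The helper name build_set_request and its parameters are hypothetical; they are not part of memcached or of any client library.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: format a text-protocol "set" request into buf.
 * Layout: "set <key> <flags> <exptime> <bytes>\r\n<value>\r\n".
 * Returns the request length, or -1 if buf is too small. */
int build_set_request(char *buf, size_t cap, const char *key,
                      unsigned flags, long exptime,
                      const char *value, size_t vlen) {
    assert(buf != NULL && key != NULL && value != NULL);
    /* Command line first; snprintf reports the length it wanted to write. */
    int n = snprintf(buf, cap, "set %s %u %ld %zu\r\n", key, flags, exptime, vlen);
    if (n < 0 || (size_t)n + vlen + 2 >= cap) {
        return -1;
    }
    /* The value follows on the second line and is also terminated by \r\n. */
    memcpy(buf + n, value, vlen);
    memcpy(buf + n + vlen, "\r\n", 2);
    buf[(size_t)n + vlen + 2] = '\0';
    return n + (int)vlen + 2;
}
```

Called with the key testkey and the 5-byte value hello, the buffer holds the command line followed by the value line, which is the input the server-side code in the rest of this chapter parses.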
2. Command flowchart
When a client sends a command to memcached, the worker thread's read/write event callback event_handler fires and calls the state-machine function drive_machine.
[Figure: state-machine flowchart for the set and get commands]
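The happy path of both commands can be sketched as a small transition table. This is a simplified illustration, not memcached's code: the enum names are toy copies of the state names used in this chapter, error paths are ignored, and the step from the value-reading state to the write state assumes that a reply is queued once the value has been stored.

```c
#include <assert.h>
#include <stddef.h>

/* Toy copies of the connection states discussed in this chapter. */
enum state { ST_WAITING, ST_READ, ST_PARSE_CMD, ST_NREAD,
             ST_WRITE, ST_MWRITE, ST_NEW_CMD };
enum command { CMD_SET, CMD_GET };

/* Happy-path successor of a state for one command. */
enum state next_state(enum state s, enum command cmd) {
    switch (s) {
    case ST_WAITING:   return ST_READ;       /* readable event fired */
    case ST_READ:      return ST_PARSE_CMD;  /* command line received */
    case ST_PARSE_CMD: return cmd == CMD_SET ? ST_NREAD : ST_MWRITE;
    case ST_NREAD:     return ST_WRITE;      /* assumption: reply is queued */
    case ST_WRITE:     return ST_NEW_CMD;
    case ST_MWRITE:    return ST_NEW_CMD;
    case ST_NEW_CMD:   return ST_WAITING;    /* no more buffered commands */
    }
    return ST_WAITING;
}

/* Record the states visited after ST_WAITING until the connection waits again. */
int trace(enum command cmd, enum state *out, int cap) {
    assert(out != NULL && cap > 0);
    int n = 0;
    enum state s = ST_WAITING;
    do {
        s = next_state(s, cmd);
        out[n++] = s;
    } while (s != ST_WAITING && n < cap);
    return n;
}
```

The only difference between the two traces is the step after parsing: set has to read the value first, while get can go straight to writing the reply.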
3. Source code analysis
This section analyzes the source code behind the flow described above.
Before reading the code, here are the important data structures of the modules involved.
struct conn {
int sfd;
#ifdef TLS
SSL *ssl;
char *ssl_wbuf;
bool ssl_enabled;
#endif
sasl_conn_t *sasl_conn;
bool sasl_started;
bool authenticated;
enum conn_states state;
enum bin_substates substate;
rel_time_t last_cmd_time;
struct event event;
short ev_flags;
short which; /** which events were just triggered */
//rbuf: buffer that holds the command data read from the socket
char *rbuf; /** buffer to read commands into */
//rcurr: if part of the data has already been parsed, the cursor points to where parsing stopped
char *rcurr; /** but if we parsed some already, this is where we stopped */
//rsize: size of the memory allocated for rbuf
int rsize; /** total allocated size of rbuf */
//rbytes: number of bytes received but not yet parsed, counted from rcurr
int rbytes; /** how much data, starting from rcur, do we have unparsed */
char *wbuf;
char *wcurr;
int wsize;
int wbytes;
/** which state to go into after finishing current write */
enum conn_states write_and_go;
void *write_and_free; /** free this memory after finishing writing */
//ritem points into the item structure. For a set command it points to the data area that follows
//the key inside the item; once the value has been read from the client it is stored there.
//Example: the client first sends "set testkey 0 0 5". The server receives the command line and
//allocates an item to hold it. ritem points to the memory after testkey in that item, and rlbytes
//records how many bytes of value data remain to be read (5 bytes of value in this example).
//When the client then sends the value, the event fires again and the server reads rlbytes bytes
//from the connection into the memory ritem points to.
char *ritem; /** when we read in an item's value, it goes here */
//number of bytes still to be read
int rlbytes;
/* data for the nread state */
/**
* item is used to hold an item structure created after reading the command
* line of set/add/replace commands, but before we finished reading the actual
* data. The data is read into ITEM_data(item) to avoid extra copying.
*/
//item allocated for the set command
void *item; /* for commands set/add/replace */
/* data for the swallow state */
int sbytes; /* how many bytes to swallow */
/* data for the mwrite state */
struct iovec *iov;
int iovsize; /* number of elements allocated in iov[] */
int iovused; /* number of elements used in iov[] */
struct msghdr *msglist;
int msgsize; /* number of elements allocated in msglist[] */
int msgused; /* number of elements used in msglist[] */
int msgcurr; /* element in msglist[] being transmitted now */
int msgbytes; /* number of bytes in current msg */
item **ilist; /* list of items to write out */
int isize;
item **icurr;
int ileft;
char **suffixlist;
int suffixsize;
char **suffixcurr;
int suffixleft;
#ifdef EXTSTORE
int io_wrapleft;
unsigned int recache_counter;
io_wrap *io_wraplist; /* linked list of io_wraps */
bool io_queued; /* FIXME: debugging flag */
#endif
enum protocol protocol; /* which protocol this connection speaks */
enum network_transport transport; /* what transport is used by this connection */
/* data for UDP clients */
int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
socklen_t request_addr_size;
unsigned char *hdrbuf; /* udp packet headers */
int hdrsize; /* number of headers' worth of space is allocated */
bool noreply; /* True if the reply should not be sent. */
/* current stats command */
struct {
char *buffer;
size_t size;
size_t offset;
} stats;
/* Binary protocol stuff */
/* This is where the binary header goes */
protocol_binary_request_header binary_header;
uint64_t cas; /* the cas to return */
short cmd; /* current command being processed */
int opaque;
int keylen;
conn *next; /* Used for generating a list of conn structures */
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
ssize_t (*read)(conn *c, void *buf, size_t count);
ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags);
ssize_t (*write)(conn *c, void *buf, size_t count);
};
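The four read-buffer fields are easiest to follow as a cursor over one allocation. The sketch below uses a toy struct with the same field names; read_buf, rb_consume and rb_tail_space are hypothetical names introduced for illustration only.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-in for the four read-buffer fields of struct conn. */
typedef struct {
    char *rbuf;   /* start of the allocation */
    char *rcurr;  /* first unparsed byte */
    int rsize;    /* allocated size of rbuf */
    int rbytes;   /* unparsed bytes starting at rcurr */
} read_buf;

/* Mark n bytes as parsed: the cursor advances, the unparsed count shrinks. */
void rb_consume(read_buf *b, int n) {
    assert(b != NULL && n >= 0 && n <= b->rbytes);
    b->rcurr += n;
    b->rbytes -= n;
}

/* Space left behind the unparsed data, i.e. how much could still be
 * appended without moving or growing the buffer. */
int rb_tail_space(const read_buf *b) {
    assert(b != NULL);
    return b->rsize - (int)(b->rcurr - b->rbuf) - b->rbytes;
}
```

With two 7-byte commands in a 16-byte buffer, consuming the first one leaves rcurr 7 bytes in, rbytes at 7 and 2 bytes of tail space. rbytes counts received data, so it is generally smaller than rsize minus the cursor offset.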
drive_machine is the core function of the whole event-processing flow. Its internal while loop switches between connection states and runs the logic that belongs to each state.
static void drive_machine(conn *c) {
//...
while (!stop) {
switch(c->state) {
//...
case conn_waiting:
//...
//switch to the read state and wait for a readable event
conn_set_state(c, conn_read);
stop = true;
break;
case conn_read:
//...
//read command data from conn->sfd
//e.g. set testkey 0 0 5\r\n
res = try_read_network(c);
switch (res) {
//....
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
break;
//...
}
break;
case conn_parse_cmd:
//parse the command
if (try_read_command(c) == 0) {
/* we need more data! */
//the command is incomplete, so wait until the rest of it has been read
conn_set_state(c, conn_waiting);
}
break;
case conn_new_cmd:
--nreqs;
if (nreqs >= 0) {
//reset the connection state for the next command
reset_cmd_handler(c);
}else{
//....
}
break;
case conn_nread:
if (c->rlbytes == 0) {
//all value data has been read
complete_nread(c);
break;
}
//...
if (!c->item || (((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) {
//...
/* now try reading from the socket */
//read from the socket into the memory c->ritem points to
//e.g. for set testkey 0 0 5, the command line was already read by try_read_network;
//this read fetches the value of the key-value pair (declared as 5 bytes long)
res = c->read(c, c->ritem, c->rlbytes);
//...
}
break;
case conn_write:
//...
/* fall through... */
//note: conn_write has no break, so execution falls through into conn_mwrite
case conn_mwrite:
//...
//write data to the client, e.g. the value returned for a get command
switch (transmit(c)) {
case TRANSMIT_COMPLETE:
if (c->state == conn_mwrite) {
conn_release_items(c);
/* XXX: I don't know why this wasn't the general case */
if(c->protocol == binary_prot) {
conn_set_state(c, c->write_and_go);
} else {
//set the connection state
//in conn_new_cmd, any unprocessed data left in rbuf is handled next;
//if everything has been processed, the connection moves to conn_waiting and waits for the next event
conn_set_state(c, conn_new_cmd);
}
}
//...
break;
//...
}
break;
}
}
}
In the conn_read state, try_read_network is called to read data from the connection.
/*
* read from network as much as we can, handle buffer overflow and connection
* close.
* before reading, move the remaining incomplete fragment of a command
* (if any) to the beginning of the buffer.
*
* To protect us from someone flooding a connection with bogus data causing
* the connection to eat up all available memory, break out and start looking
* at the data I've got after a number of reallocs...
*
* @return enum try_read_result
*/
//read the command data sent by the client
static enum try_read_result try_read_network(conn *c) {
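//... (declarations of res, num_allocs and gotdata omitted)
//rbuf: buffer that holds the command data read from the socket
//rcurr: if part of the data has already been parsed, the cursor points to where parsing stopped
//rsize: size of the memory allocated for rbuf
//rbytes: number of bytes received but not yet parsed, counted from rcurr
//before reading, check whether c->rcurr != c->rbuf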
if (c->rcurr != c->rbuf) {
if (c->rbytes != 0)
//part of an earlier command is still unparsed: move it to the start of rbuf
memmove(c->rbuf, c->rcurr, c->rbytes);
//reset rcurr to rbuf, the start of the command buffer
c->rcurr = c->rbuf;
}
//read as much command data as possible
while (1) {
//if the buffered bytes fill rbuf, grow it; memcached caps the number of reallocations per call
if (c->rbytes >= c->rsize) {
if (num_allocs == 4) {
return gotdata;
}
++num_allocs;
char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
//...
c->rcurr = c->rbuf = new_rbuf;
c->rsize *= 2;
}
//free space left in rbuf
int avail = c->rsize - c->rbytes;
//try to read up to avail bytes
res = c->read(c, c->rbuf + c->rbytes, avail);
if (res > 0) {
//...
c->rbytes += res;
//if we read exactly as many bytes as we asked for,
//there is very likely more data pending, so continue and read the socket again
if (res == avail) {
continue;
}else{
break;
}
}
//...
}
return gotdata;
}
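The two buffer-management steps of try_read_network, compacting the unparsed tail and doubling a full buffer, can be isolated as follows. This is a minimal sketch on a toy struct: net_buf, nb_compact and nb_grow_if_full are hypothetical names, and the limit of four reallocations per call from the listing above is left out.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Toy stand-in for the read-buffer fields of struct conn. */
typedef struct {
    char *rbuf;
    char *rcurr;
    int rsize;
    int rbytes;
} net_buf;

/* Move any unparsed tail to the front so new bytes can be appended after it. */
void nb_compact(net_buf *b) {
    assert(b != NULL);
    if (b->rcurr != b->rbuf) {
        if (b->rbytes != 0) {
            /* memmove, not memcpy: source and destination may overlap. */
            memmove(b->rbuf, b->rcurr, (size_t)b->rbytes);
        }
        b->rcurr = b->rbuf;
    }
}

/* Double the buffer when it is full. Expects a compacted buffer.
 * Returns 0 on success, -1 on failure (the old buffer stays valid). */
int nb_grow_if_full(net_buf *b) {
    assert(b != NULL && b->rcurr == b->rbuf);
    if (b->rbytes < b->rsize) {
        return 0;
    }
    char *p = realloc(b->rbuf, (size_t)b->rsize * 2);
    if (p == NULL) {
        return -1;
    }
    b->rcurr = b->rbuf = p;
    b->rsize *= 2;
    return 0;
}
```

Compacting first is what keeps the later arithmetic simple: once rcurr equals rbuf, the free space is just rsize minus rbytes, and new data is appended at rbuf + rbytes.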
In the conn_parse_cmd state, try_read_command is the entry point for command parsing.
/*
* if we have a complete line in the buffer, process it.
*/
//process the command data
static int try_read_command(conn *c) {
//...
if (c->protocol == binary_prot) {
//...
}else{
char *el, *cont;
//nothing to parse
if (c->rbytes == 0)
return 0;
//look for '\n' in the command data
//e.g. set testkey 0 0 5\r\n
//or get testkey\r\n
el = memchr(c->rcurr, '\n', c->rbytes);
if (!el) {
//...
}
cont = el + 1;
if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
el--;
}
//terminate the line: '\r' (or '\n' when there is no '\r') becomes '\0', so c->rcurr now reads "set testkey 0 0 5\0\n"
*el = '\0';
//...
//parse and dispatch the command
process_command(c, c->rcurr);
//account for the consumed line; whatever follows it is still unparsed
c->rbytes -= (cont - c->rcurr);
c->rcurr = cont;
}
return 1;
}
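The line-splitting step can be tried in isolation. The function below restates the memchr logic from the listing on a plain buffer; terminate_line is a hypothetical name, and the binary protocol and oversized-line handling are not covered.

```c
#include <assert.h>
#include <string.h>

/* Terminate the first line in buf[0..len) in place.
 * Returns a pointer to the byte after '\n' (the start of the next command),
 * or NULL if no complete line has arrived yet. */
char *terminate_line(char *buf, size_t len) {
    assert(buf != NULL);
    char *el = memchr(buf, '\n', len);
    if (el == NULL) {
        return NULL;            /* incomplete command: need more data */
    }
    char *cont = el + 1;        /* where the next command begins */
    if (el - buf > 1 && el[-1] == '\r') {
        el--;                   /* accept "\r\n" as well as a bare "\n" */
    }
    *el = '\0';                 /* the command is now a C string */
    return cont;
}
```

When two commands arrive in one read, the first call leaves the first command as a C string at the start of the buffer and returns the start of the second. The distance between the two pointers is what try_read_command subtracts from rbytes.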
//split the command line into tokens and dispatch on the command name
static void process_command(conn *c, char *command) {
token_t tokens[MAX_TOKENS];
size_t ntokens;
int comm;
//...
//split the command and store the pieces in the tokens array
ntokens = tokenize_command(command, tokens, MAX_TOKENS);
if (ntokens >= 3 &&
((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
(strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
//handle the get command
process_get_command(c, tokens, ntokens, false, false);
}else if ((ntokens == 6 || ntokens == 7) &&
((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
//handle set and the other storage commands
process_update_command(c, tokens, ntokens, comm, false);
}//...
return;
}
//tokenizing a command
//e.g. set testkey 0 0 5 is split into
//tokens[0].value = "set" ; tokens[0].length = 3
//tokens[1].value = "testkey" ; tokens[1].length = 7
//tokens[2].value = "0" ; tokens[2].length = 1
//tokens[3].value = "0" ; tokens[3].length = 1
//tokens[4].value = "5" ; tokens[4].length = 1
//tokens[5].value = NULL ; tokens[5].length = 0 (terminal token)
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
char *s, *e;
size_t ntokens = 0;
size_t len = strlen(command);
unsigned int i = 0;
assert(command != NULL && tokens != NULL && max_tokens > 1);
s = e = command;
for (i = 0; i < len; i++) {
if (*e == ' ') {
if (s != e) {
tokens[ntokens].value = s;
tokens[ntokens].length = e - s;
ntokens++;
*e = '\0';
if (ntokens == max_tokens - 1) {
e++;
s = e; /* so we don't add an extra token */
break;
}
}
s = e + 1;
}
e++;
}
if (s != e) {
tokens[ntokens].value = s;
tokens[ntokens].length = e - s;
ntokens++;
}
/*
* If we scanned the whole string, the terminal value pointer is null,
* otherwise it is the first unprocessed character.
*/
tokens[ntokens].value = *e == '\0' ? NULL : e;
tokens[ntokens].length = 0;
ntokens++;
return ntokens;
}
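To check the token layout by hand, the tokenizer can be run on a sample command outside memcached. The block below is a condensed restatement of the function above with a local token_t. The name tokenize and the struct definition are assumptions made so the example is self-contained.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Local stand-in for memcached's token type. */
typedef struct {
    char *value;
    size_t length;
} token_t;

/* Split command on single spaces, in place; the last slot is a terminal token. */
size_t tokenize(char *command, token_t *tokens, size_t max_tokens) {
    assert(command != NULL && tokens != NULL && max_tokens > 1);
    size_t ntokens = 0;
    size_t len = strlen(command);
    char *s = command;  /* start of the current token */
    char *e = command;  /* scan position */
    for (size_t i = 0; i < len; i++, e++) {
        if (*e != ' ') {
            continue;
        }
        if (s != e) {
            tokens[ntokens].value = s;
            tokens[ntokens].length = (size_t)(e - s);
            ntokens++;
            *e = '\0';
            if (ntokens == max_tokens - 1) {
                e++;
                s = e;  /* so we don't add an extra token */
                break;
            }
        }
        s = e + 1;
    }
    if (s != e) {       /* last token, ended by the string terminator */
        tokens[ntokens].value = s;
        tokens[ntokens].length = (size_t)(e - s);
        ntokens++;
    }
    /* Terminal token: NULL if the whole string was scanned. */
    tokens[ntokens].value = (*e == '\0') ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;
    return ntokens;
}
```

For set testkey 0 0 5 the function returns 6: five real tokens plus the terminal one. That is why process_command accepts ntokens == 6 for a storage command, and 7 when noreply is appended.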
Handler for the get command:
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
bool return_cas, bool should_touch) {
//...
//key nkey
token_t *key_token = &tokens[KEY_TOKEN];
//...
//look up the value stored in memory for the key,
//then format the reply
//...
if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
|| (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
if (fail_length) {
out_string(c, "CLIENT_ERROR bad command line format");
} else {
out_of_memory(c, "SERVER_ERROR out of memory writing get response");
}
conn_release_items(c);
}else {
//switch to conn_mwrite so the reply is sent to the client
conn_set_state(c, conn_mwrite);
c->msgcurr = 0;
}
}
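For reference, the text-protocol reply to a get hit is a VALUE line, the data block, and a closing END line. The sketch below builds that reply in one flat buffer. memcached itself gathers the pieces with add_iov instead of copying them, and build_get_reply is a hypothetical helper used only to show the byte layout.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: write the reply for a single-key get hit into buf.
 * Layout: "VALUE <key> <flags> <bytes>\r\n<data>\r\nEND\r\n".
 * Returns the reply length, or -1 if buf is too small. */
int build_get_reply(char *buf, size_t cap, const char *key,
                    unsigned flags, const char *data, size_t nbytes) {
    assert(buf != NULL && key != NULL && data != NULL);
    int n = snprintf(buf, cap, "VALUE %s %u %zu\r\n", key, flags, nbytes);
    /* 7 more bytes are needed for "\r\nEND\r\n", plus one for the NUL. */
    if (n < 0 || (size_t)n + nbytes + 7 >= cap) {
        return -1;
    }
    memcpy(buf + n, data, nbytes);
    memcpy(buf + n + nbytes, "\r\nEND\r\n", 8); /* copies the trailing NUL too */
    return n + (int)nbytes + 7;
}
```

On a miss the reply is only the END line, which matches the "END\r\n" string the handler above appends after the loop over the requested keys.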
Handler for the set command:
static void process_update_command(conn *c, token_t *tokens, const size_t ntokens,
int comm, bool handle_cas){
//...
//key
key = tokens[KEY_TOKEN].value;
nkey = tokens[KEY_TOKEN].length;
//convert the tokenized fields to integers
if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
&& safe_strtol(tokens[3].value, &exptime_int)
&& safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
//...
//allocate the item for this command; item allocation involves the slab allocator,
//which is analyzed in a later chapter
it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
//...
//ITEM_data returns the address of the item's data area, which follows the key
//c->ritem points there; the conn_nread state reads the value into that memory
//once the value has been read, the item holds both key and value
c->ritem = ITEM_data(it);
//number of value bytes to read
c->rlbytes = it->nbytes;
//command type
c->cmd = comm;
conn_set_state(c, conn_nread);
}
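The numeric validation step can be illustrated with the standard C library. The sketch below is not memcached's safe_strtoul or safe_strtol; parse_ll and parse_set_args are hypothetical helpers built on strtoll that reject trailing garbage, out-of-range input and a negative length.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Strict base-10 parse: the whole string must be a number that fits. */
bool parse_ll(const char *s, long long *out) {
    assert(s != NULL && out != NULL);
    char *end = NULL;
    errno = 0;
    long long v = strtoll(s, &end, 10);
    if (errno == ERANGE || end == s || *end != '\0') {
        return false;
    }
    *out = v;
    return true;
}

/* Validate the numeric fields of "set key flags exptime bytes".
 * On failure the output parameters may be partially written. */
bool parse_set_args(const char *flags_s, const char *exptime_s,
                    const char *bytes_s, uint32_t *flags,
                    long long *exptime, long long *vlen) {
    long long f;
    if (!parse_ll(flags_s, &f) || f < 0 || f > UINT32_MAX) {
        return false;   /* flags must fit an unsigned 32-bit integer */
    }
    if (!parse_ll(exptime_s, exptime)) {
        return false;
    }
    if (!parse_ll(bytes_s, vlen) || *vlen < 0) {
        return false;   /* a negative value length is never valid */
    }
    *flags = (uint32_t)f;
    return true;
}
```

A false return corresponds to the branch in process_update_command that replies with CLIENT_ERROR bad command line format.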
transmit sends the result to the client:
static enum transmit_result transmit(conn *c) {
//...
if (c->msgcurr < c->msgused) {
//...
//send the result back to the requesting client
res = c->sendmsg(c, m, 0);
//...
}
}