Redis內(nèi)存管理
Redis內(nèi)存管理相關(guān)文件為zmalloc.c/zmalloc.h笆环,其只是對(duì)C中內(nèi)存管理函數(shù)做了簡(jiǎn)單的封裝冀泻,屏蔽了底層平臺(tái)的差異住涉,并增加了內(nèi)存使用情況統(tǒng)計(jì)的功能战惊。
void *zmalloc(size_t size) {
// 多申請(qǐng)的一部分內(nèi)存用于存儲(chǔ)當(dāng)前分配了多少自己的內(nèi)存
void *ptr = malloc(size+PREFIX_SIZE);
if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(zmalloc_size(ptr));
return ptr;
#else
*((size_t*)ptr) = size;
// 內(nèi)存分配統(tǒng)計(jì)
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return (char*)ptr+PREFIX_SIZE;
#endif
}
Redis事件處理
Redis的事件類(lèi)型分為時(shí)間事件和文件事件缩挑,文件事件也就是網(wǎng)絡(luò)連接事件但两。時(shí)間事件的處理是在epoll_wait返回處理文件事件后處理的,每次epoll_wait的超時(shí)時(shí)間都是Redis最近的一個(gè)定時(shí)器時(shí)間供置。
Redis在進(jìn)行事件處理前谨湘,首先會(huì)進(jìn)行初始化,初始化的主要邏輯在main/initServer函數(shù)中芥丧。初始化流程主要做的工作如下:
- 設(shè)置信號(hào)回調(diào)函數(shù)紧阔。
- 創(chuàng)建事件循環(huán)機(jī)制,即調(diào)用epoll_create续担。
- 創(chuàng)建服務(wù)監(jiān)聽(tīng)端口擅耽,創(chuàng)建定時(shí)事件,并將這些事件添加到事件機(jī)制中物遇。
void initServer(void) {
int j;
// 設(shè)置信號(hào)對(duì)應(yīng)的處理函數(shù)
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
...
createSharedObjects();
adjustOpenFilesLimit();
// 創(chuàng)建事件循環(huán)機(jī)制乖仇,及調(diào)用epoll_create創(chuàng)建epollfd用于事件監(jiān)聽(tīng)
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Open the TCP listening socket for the user commands. */
// 創(chuàng)建監(jiān)聽(tīng)服務(wù)端口,socket/bind/listen
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
...
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].eviction_pool = evictionPoolAlloc();
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
...
/* Create the serverCron() time event, that's our main way to process
* background operations. 創(chuàng)建定時(shí)事件 */
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create the serverCron time event.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// 將事件加入到事件機(jī)制中询兴,調(diào)用鏈為 aeCreateFileEvent/aeApiAddEvent/epoll_ctl
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Open the AOF file if needed. */
if (server.aof_state == AOF_ON) {
server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644);
if (server.aof_fd == -1) {
serverLog(LL_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}
...
}
事件處理流程
事件處理函數(shù)鏈:aeMain / aeProcessEvents / aeApiPoll / epoll_wait乃沙。
常見(jiàn)的事件機(jī)制處理流程是:調(diào)用epoll_wait等待事件來(lái)臨,然后遍歷每一個(gè)epoll_event诗舰,提取epoll_event中的events和data域警儒,data域常用來(lái)存儲(chǔ)fd或者指針,不過(guò)一般的做法是提取出events和data.fd始衅,然后根據(jù)fd找到對(duì)應(yīng)的回調(diào)函數(shù)冷蚂,fd與對(duì)應(yīng)回調(diào)函數(shù)之間的映射關(guān)系可以存儲(chǔ)在特定的數(shù)據(jù)結(jié)構(gòu)中缭保,比如數(shù)組或者哈希表,然后調(diào)用事件回調(diào)函數(shù)來(lái)處理蝙茶。
Redis中用了一個(gè)數(shù)組來(lái)保存fd與回調(diào)函數(shù)的映射關(guān)系艺骂,使用數(shù)組的優(yōu)點(diǎn)就是簡(jiǎn)單高效,但是數(shù)組一般使用在建立的連接不太多情況隆夯,而Redis正好符合這個(gè)情況钳恕,一般Redis的文件事件大都是客戶(hù)端建立的連接,而客戶(hù)端的連接個(gè)數(shù)是一定的蹄衷,該數(shù)量通過(guò)配置項(xiàng)maxclients
來(lái)指定忧额。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 從eventLoop->events數(shù)組中查找對(duì)應(yīng)的回調(diào)函數(shù)
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
...
}
文件事件的監(jiān)聽(tīng)
Redis監(jiān)聽(tīng)端口的事件回調(diào)函數(shù)鏈?zhǔn)牵?code>acceptTcpHandler / acceptCommonHandler / createClient / aeCreateFileEvent / aeApiAddEvent / epoll_ctl。
在Reids監(jiān)聽(tīng)事件處理流程中愧口,會(huì)將客戶(hù)端的連接fd添加到事件機(jī)制中睦番,并設(shè)置其回調(diào)函數(shù)為readQueryFromClient
,該函數(shù)負(fù)責(zé)處理客戶(hù)端的命令請(qǐng)求耍属。
命令處理流程
命令處理流程鏈?zhǔn)牵?code>readQueryFromClient / processInputBuffer / processCommand / call / 對(duì)應(yīng)命令的回調(diào)函數(shù)(c->cmd->proc)托嚣,比如get key
命令的處理回調(diào)函數(shù)為getCommand
。getCommand
的執(zhí)行流程是先到client對(duì)應(yīng)的數(shù)據(jù)庫(kù)字典中根據(jù)key來(lái)查找數(shù)據(jù)厚骗,然后根據(jù)響應(yīng)消息格式將查詢(xún)結(jié)果填充到響應(yīng)消息中示启。
如何添加自定義命令
如何在Redis中添加自定的命令呢?其中只需要改動(dòng)以下幾個(gè)地方就行了领舰,比如自定義命令hello xxx
夫嗓,然后返回redis: xxx
,因?yàn)?code>hello xxx和get key
類(lèi)似冲秽,所以就依葫蘆畫(huà)瓢舍咖。
首先在redisCommandTable
數(shù)組中添加自定義的命令,redisCommandTable
數(shù)組定義在server.c
中劳跃。
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
...
{"hello",helloCommand,2,"rF",0,NULL,1,1,1,0,0},
};
然后在getCommand
定義處后面添加``helloCommand的定義谎仲,
getCommand定義在
t_string.c`中。
void getCommand(client *c) {
getGenericCommand(c);
}
void helloCommand(client *c)
{
char buff[512] = {0};
robj *o = NULL;
sprintf(buff, "%s %s", "redis: ", c->argv[1]);
o = createObject(OBJ_STRING, sdsnewlen(buff, strlen(buff)));
addReplyBulk(c, o);
sdsfree(o);
}
最后在server.h
中添加helloCommand
的聲明刨仑。
void getCommand(client *c);
void helloCommand(client *c);
至此為止郑诺,代碼已經(jīng)編寫(xiě)完畢,編譯redis然后運(yùn)行redis-server杉武。
參考資料:
- redis源碼分析(1)內(nèi)存管理
- redis源碼