本章重點(diǎn)走讀 redis 網(wǎng)絡(luò) I/O 的多線程部分源碼。
哈希表 + 內(nèi)存數(shù)據(jù)庫(kù) + 非阻塞系統(tǒng)調(diào)用 + 多路復(fù)用 I/O 事件驅(qū)動(dòng)王凑,使得 redis 單線程處理主邏輯足夠高效山林。當(dāng)并發(fā)上來(lái)后伍伤,數(shù)據(jù)的邏輯處理肯定要占用大量時(shí)間沧烈,那樣阴孟,客戶(hù)端與服務(wù)端通信處理就會(huì)變得遲鈍耘眨。所以在合適的時(shí)候(根據(jù)任務(wù)量自適應(yīng))采用多線程處理昼榛,充分地利用多核優(yōu)勢(shì),分擔(dān)主線程壓力剔难,使得客戶(hù)端和服務(wù)端通信更加敏捷胆屿。
redis 6.0 新增多線程處理網(wǎng)絡(luò) I/O奥喻,默認(rèn)是關(guān)閉的,需要修改配置開(kāi)啟莺掠。對(duì)于這個(gè)新特性衫嵌,redis 作者建議:如果項(xiàng)目確實(shí)遇到性能問(wèn)題,再開(kāi)啟多線程處理網(wǎng)絡(luò)讀寫(xiě)事件彻秆。否則開(kāi)啟沒(méi)什么意義楔绞,還會(huì)浪費(fèi) CPU 資源。線程數(shù)量不要超過(guò) cpu 核心數(shù)量 - 1唇兑,預(yù)留一個(gè)核心酒朵。
?? 文章來(lái)源:wenfh2020.com
1. 配置
多線程這兩個(gè)設(shè)置項(xiàng),默認(rèn)是關(guān)閉的扎附。
# redis.conf
# 配置多線程處理線程個(gè)數(shù)蔫耽,數(shù)量最好少于 cpu 核心,默認(rèn) 4留夜。
# io-threads 4
#
# 多線程是否處理讀事件匙铡,默認(rèn)關(guān)閉。
# io-threads-do-reads no
redis 作者建議:
- 配置線程數(shù)量碍粥,最好少于 cpu 核心鳖眼。起碼預(yù)留一個(gè)空閑核心處理系統(tǒng)其它業(yè)務(wù),線程數(shù)量超過(guò) cpu 核心對(duì) redis 性能有一定影響嚼摩,因?yàn)?redis 主線程處理主邏輯钦讳,如果被系統(tǒng)頻繁切換,效率會(huì)降低枕面。
- 提供了多線程處理網(wǎng)絡(luò)讀事件開(kāi)關(guān)愿卒。多線程處理網(wǎng)絡(luò)讀事件,對(duì) redis 性能影響不大潮秘。redis 作為緩存琼开,查詢(xún)操作的頻率比較大,系統(tǒng)的網(wǎng)絡(luò)瓶頸一般在查詢(xún)返回?cái)?shù)據(jù)枕荞,根據(jù)系統(tǒng)實(shí)際應(yīng)用場(chǎng)景進(jìn)行配置吧稠通。
2. 主線程工作流程
流程圖來(lái)源:《redis 異步網(wǎng)絡(luò)I/O通信流程 - 多線程》
- 主線程通過(guò)事件驅(qū)動(dòng)從內(nèi)核獲取就緒事件,記錄下需要延時(shí)操作的客戶(hù)端連接买猖。
- 多線程并行處理延時(shí)讀事件。
- 多線程處理延時(shí)寫(xiě)事件滋尉。
- 重新執(zhí)行第一步玉控,循環(huán)執(zhí)行。
- 加載循環(huán)事件管理狮惜。
int main(int argc, char **argv) {
...
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}
- 事件循環(huán)管理高诺。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 向內(nèi)核獲取就緒的可讀可寫(xiě)事件事件進(jìn)行處理碌识,處理時(shí)鐘事件。
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
- 獲取就緒事件處理和處理時(shí)鐘事件虱而。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
...
// 從內(nèi)核中取出就緒的可讀可寫(xiě)事件筏餐。
numevents = aeApiPoll(eventLoop, tvp);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
// 處理讀寫(xiě)事件。
}
...
// 處理時(shí)鐘事件牡拇。
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
...
}
- 讀寫(xiě)邏輯處理魁瞪。
void beforeSleep(struct aeEventLoop *eventLoop) {
...
// write
handleClientsWithPendingWritesUsingThreads();
...
}
void afterSleep(struct aeEventLoop *eventLoop) {
...
// read
handleClientsWithPendingReadsUsingThreads();
}
3. 多線程協(xié)作
流程圖來(lái)源:《redis 異步網(wǎng)絡(luò)I/O通信流程 - 多線程》
3.1. 特點(diǎn)
主線程實(shí)現(xiàn)主邏輯,子線程輔助實(shí)現(xiàn)任務(wù)惠呼。
- redis 主線程實(shí)現(xiàn)主邏輯导俘。
- 主線程與子線程共同處理延時(shí)客戶(hù)端網(wǎng)絡(luò)讀寫(xiě)事件。
- 主線程根據(jù)寫(xiě)事件用戶(hù)量大小剔蹋,開(kāi)啟/關(guān)閉多線程模式旅薄。
- 雖然多線程是并行處理邏輯,但是 redis 整體工作流程是串行的泣崩。
- 當(dāng)主線程處理延時(shí)讀寫(xiě)事件時(shí)少梁,把一次大任務(wù)進(jìn)行取模切割成小任務(wù),平均分配給(主+子)線程處理矫付。這樣每個(gè)客戶(hù)端連接被獨(dú)立的一個(gè)線程處理凯沪,不會(huì)出現(xiàn)多個(gè)線程同時(shí)處理一個(gè)客戶(hù)端連接邏輯。
- 主線程限制多線程子線程同一個(gè)時(shí)間段只能并行處理一種類(lèi)型操作:讀/寫(xiě)技即。
- 主線程先等待子線程處理完任務(wù)了著洼,再進(jìn)行下一步,處理分配給自己的等待事件而叼。
- 主線程在等待子線程處理任務(wù)過(guò)程中身笤,它不是通過(guò)
sleep
掛起線程讓出使用權(quán),而是通過(guò)for
循環(huán)進(jìn)行忙等葵陵,不斷檢測(cè)所有子線程處理的任務(wù)是否已經(jīng)完成液荸,如果完成再進(jìn)行下一步,處理自己的任務(wù)脱篙。相當(dāng)于主線程在等待過(guò)程中娇钱,并沒(méi)有做其它任務(wù),只是讓幫手去干活绊困,幫手都把活干完了文搂,它再干自己的,然后做一些善后工作秤朗。主線程在這里的角色有點(diǎn)像代理商或者包工頭煤蹭。 - 子線程在完成分配的任務(wù)后,也會(huì)通過(guò)
for
循環(huán)忙等,檢測(cè)主線程的工作調(diào)度硝皂,如果任務(wù)很少了常挚,等待主線程通過(guò)鎖,把自己掛起稽物。
3.2. 忙等
多線程模式奄毡,存在忙等現(xiàn)象,這個(gè)處理有點(diǎn)超出了常規(guī)思維贝或。
3.2.1. 源碼實(shí)現(xiàn)
- 主線程分配完任務(wù)后吼过,等待所有子線程完成任務(wù)后,再進(jìn)行下一步操作傀缩。
// write
int handleClientsWithPendingWritesUsingThreads(void) {
...
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
...
}
// read
int handleClientsWithPendingReadsUsingThreads(void) {
...
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
...
}
- 子線程完成任務(wù)后那先,保持繁忙狀態(tài),等待主線程上鎖掛起自己赡艰。
void *IOThreadMain(void *myid) {
...
while(1) {
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
...
}
}
3.2.2. 優(yōu)缺點(diǎn)
-
優(yōu)點(diǎn):
- 實(shí)現(xiàn)簡(jiǎn)單售淡,主線程可以通過(guò)鎖開(kāi)啟/暫停多線程工作模式,不需要復(fù)雜的通信慷垮。
- redis 讀寫(xiě)事件處理基本都是內(nèi)存級(jí)別操作揖闸,而且非阻塞,多線程處理任務(wù)非沉仙恚快汤纸。
- 反應(yīng)快,有任務(wù)能實(shí)時(shí)處理芹血。
- 宏觀上看贮泞,主線程是串行處理邏輯,邏輯清晰:讀寫(xiě)邏輯順序處理幔烛。主線程把一次大任務(wù)進(jìn)行取模切割成小任務(wù)啃擦,分配給子線程處理。主線程等子線程完成所有任務(wù)后饿悬,再完成自己的任務(wù)令蛉,再進(jìn)行下一步。
- 因?yàn)槎嗑€程處理的是客戶(hù)端鏈接的延時(shí)讀寫(xiě)邏輯狡恬,redis 服務(wù)應(yīng)用場(chǎng)景作為緩存珠叔,接入對(duì)象一般是服務(wù)端級(jí)別,而不是面向普通用戶(hù)的客戶(hù)端弟劲,所以鏈接不會(huì)太多祷安。而等待的讀寫(xiě)鏈接通過(guò)取模分散到不同的線程去處理,那每個(gè)線程處理的鏈接就會(huì)相對(duì)較少兔乞。每個(gè)線程處理任務(wù)也很快汇鞭。
-
缺點(diǎn):
忙等最大的問(wèn)題是以浪費(fèi)一定 cpu 性能為代價(jià)撇眯,如果 redis 鏈接并發(fā)量不是很高,redis 作者不建議開(kāi)啟多線程模式虱咧,所以主邏輯會(huì)根據(jù)寫(xiě)事件鏈接數(shù)量大小來(lái)開(kāi)啟/暫停多線程工作模式。
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
// 如果單線程模式就直接返回锚国。
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
3.3. 源碼分析
3.3.1. 概述
-
網(wǎng)絡(luò)讀寫(xiě)核心接口:
接口 描述 readQueryFromClient 服務(wù)讀客戶(hù)端數(shù)據(jù)腕巡。 writeToClient 服務(wù)向客戶(hù)端寫(xiě)數(shù)據(jù)。 -
多線程工作模式核心接口(
networking.c
)血筑,其它延時(shí)處理邏輯也有一部分源碼绘沉。接口 描述 IOThreadMain 子線程處理邏輯。 initThreadedIO 主線程創(chuàng)建掛起子線程豺总。 startThreadedIO 主線程開(kāi)啟多線程工作模式车伞。 stopThreadedIO 主線程暫停多線程工作模式。 stopThreadedIOIfNeeded 主線程根據(jù)寫(xiě)并發(fā)量是否關(guān)閉多線程工作模式喻喳。 handleClientsWithPendingWritesUsingThreads 主線程多線程處理延時(shí)寫(xiě)事件另玖。 handleClientsWithPendingReadsUsingThreads 主線程多線程處理延時(shí)讀事件。 -
其它延時(shí)處理邏輯表伦,看看下面這些變量和宏在代碼中的邏輯谦去,這里不會(huì)詳細(xì)展開(kāi)。
變量/宏 描述 server.clients_pending_read 延時(shí)處理讀事件的客戶(hù)端連接鏈表蹦哼。 server.clients_pending_write 延時(shí)處理寫(xiě)事件的客戶(hù)端連接鏈表鳄哭。 CLIENT_PENDING_READ 延時(shí)處理讀事件標(biāo)識(shí)。 CLIENT_PENDING_WRITE 延時(shí)處理寫(xiě)事件標(biāo)識(shí)纲熏。 CLIENT_PENDING_COMMAND 延時(shí)處理命令邏輯標(biāo)識(shí)妆丘。
3.3.2. 源碼
-
變量/宏
io_threads_mutex
互斥變量數(shù)組,為了方便主線程喚醒/掛起控制子線程局劲。
io_threads_pending
原子變量勺拣,方便主線程統(tǒng)計(jì)子線程是否已經(jīng)處理完所有任務(wù)。
// 最大線程個(gè)數(shù)容握。
#define IO_THREADS_MAX_NUM 128
// 線程讀操作宣脉。
#define IO_THREADS_OP_READ 0
// 線程寫(xiě)操作。
#define IO_THREADS_OP_WRITE 1
// 線程數(shù)組剔氏。
pthread_t io_threads[IO_THREADS_MAX_NUM];
// 互斥變量數(shù)組塑猖,提供主線程上鎖和解鎖子線程工作。
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
// 原子變量數(shù)組谈跛,分別存儲(chǔ)每個(gè)線程要處理的延時(shí)處理鏈接數(shù)量羊苟。主線程用來(lái)統(tǒng)計(jì)線程是否處理完等待事件,從而進(jìn)行下一步操作感憾。
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
// 是否啟動(dòng)了多線程處理模式蜡励。
int io_threads_active;
// 線程操作類(lèi)型。多線程每次只能處理一種類(lèi)型的操作:讀/寫(xiě)。
int io_threads_op;
// 子線程列表凉倚,子線程個(gè)數(shù)為 IO_THREADS_MAX_NUM - 1兼都,因?yàn)橹骶€程也會(huì)處理延時(shí)任務(wù)。
list *io_threads_list[IO_THREADS_MAX_NUM];
- 主線程創(chuàng)建子線程
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
if (server.io_threads_num == 1) return;
// 檢查配置的線程數(shù)量是否超出限制稽寒。
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
// 創(chuàng)建 server.io_threads_num - 1 個(gè)子線程扮碧。
for (int i = 0; i < server.io_threads_num; i++) {
io_threads_list[i] = listCreate();
// 0 號(hào)線程不創(chuàng)建,0 號(hào)就是主線程杏糙,主線程也會(huì)處理任務(wù)邏輯慎王。
if (i == 0) continue;
// 創(chuàng)建子線程,主線程先對(duì)子線程上鎖宏侍,掛起子線程赖淤,不讓子線程進(jìn)入工作模式。
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]);
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
- 開(kāi)啟多線程模式
void startThreadedIO(void) {
serverAssert(io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
// 子線程因?yàn)樯湘i等待主線程解鎖谅河,當(dāng)主線程解鎖子線程咱旱,子線程重新進(jìn)入工作狀態(tài)。
pthread_mutex_unlock(&io_threads_mutex[j]);
io_threads_active = 1;
}
- 子線程邏輯處理
void *IOThreadMain(void *myid) {
// 每個(gè)線程在創(chuàng)建的時(shí)候會(huì)產(chǎn)生一個(gè)業(yè)務(wù) id旧蛾。
long id = (unsigned long)myid;
while(1) {
// 替代 sleep莽龟,用忙等,這樣能實(shí)時(shí)處理業(yè)務(wù)锨天。但是也付出了耗費(fèi) cpu 的代價(jià)毯盈。
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
// 留機(jī)會(huì)給主線程上鎖,掛起當(dāng)前子線程病袄。
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
// 根據(jù)操作類(lèi)型搂赋,處理對(duì)應(yīng)的讀/寫(xiě)邏輯。
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
}
}
- 是否需要停止多線程模式
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
// 如果單線程模式就直接返回益缠。
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
- 暫停多線程處理模式
void stopThreadedIO(void) {
// 在停止線程前脑奠,仍然有等待處理的延時(shí)讀數(shù)據(jù)處理,需要先處理再停止線程幅慌。
handleClientsWithPendingReadsUsingThreads();
serverAssert(io_threads_active == 1);
// 主給子線程上鎖宋欺,掛起子線程。
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0;
}
- 處理延時(shí)的讀事件
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
// 將等待處理的鏈接胰伍,通過(guò)取模放進(jìn)不同的隊(duì)列中去齿诞。
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 分別統(tǒng)計(jì)每個(gè)隊(duì)列要處理鏈接的個(gè)數(shù)。
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主線程處理第一個(gè)隊(duì)列骂租。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 讀客戶(hù)端發(fā)送的數(shù)據(jù)到緩存祷杈。
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
// 主線程處理完任務(wù)后,忙等其它線程渗饮,全部線程處理完任務(wù)后但汞,再處理命令實(shí)現(xiàn)邏輯宿刮。
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
/* 主線程處理命令邏輯,因?yàn)殒溄佣紭?biāo)識(shí)了等待狀態(tài)私蕾,讀完數(shù)據(jù)后命令對(duì)應(yīng)的業(yè)務(wù)邏輯還沒(méi)有被處理僵缺。
* 這里去掉等待標(biāo)識(shí),處理命令業(yè)務(wù)邏輯踩叭。*/
listRewind(server.clients_pending_read,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~ CLIENT_PENDING_COMMAND;
// 讀取數(shù)據(jù)谤饭,解析協(xié)議取出命令參數(shù),執(zhí)行命令懊纳,填充回復(fù)緩沖區(qū)。
processCommandAndResetClient(c);
}
// 繼續(xù)解析協(xié)議亡容,取出命令參數(shù)嗤疯,執(zhí)行命令,填充回復(fù)緩沖區(qū)闺兢。
processInputBufferAndReplicate(c);
}
listEmpty(server.clients_pending_read);
return processed;
}
- 處理延時(shí)的寫(xiě)事件
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0;
// 如果延時(shí)寫(xiě)事件對(duì)應(yīng)的 client 鏈接很少茂缚,關(guān)閉多線程模式,用主線程處理異步邏輯屋谭。
if (stopThreadedIOIfNeeded()) {
// 處理延時(shí)寫(xiě)事件脚囊。
return handleClientsWithPendingWrites();
}
if (!io_threads_active) startThreadedIO();
// 將等待處理的鏈接,通過(guò)取模放進(jìn)不同的隊(duì)列中去桐磁,去掉延遲寫(xiě)標(biāo)識(shí)悔耘。
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 線程處理寫(xiě)事件。
io_threads_op = IO_THREADS_OP_WRITE;
// 分別統(tǒng)計(jì)每個(gè)隊(duì)列要處理鏈接的個(gè)數(shù)我擂。
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主線程處理第一個(gè)隊(duì)列衬以。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 寫(xiě)數(shù)據(jù),發(fā)送給回復(fù)給客戶(hù)端校摩。
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
// 主線程處理完任務(wù)后看峻,忙等其它線程,全部線程處理完任務(wù)后衙吩,再處理命令實(shí)現(xiàn)邏輯互妓。
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 如果緩存中還有沒(méi)有發(fā)送完的數(shù)據(jù),繼續(xù)發(fā)送或者下次繼續(xù)發(fā)坤塞,否則從事件驅(qū)動(dòng)刪除 fd 注冊(cè)的可寫(xiě)事件冯勉。
if (clientHasPendingReplies(c)
&& connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) {
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
4. 數(shù)據(jù)結(jié)構(gòu)
redisServer
和 client
分別 redis 是服務(wù)端和客戶(hù)端的數(shù)據(jù)結(jié)構(gòu),理解結(jié)構(gòu)的成員作用是走讀源碼邏輯的關(guān)鍵尺锚。有興趣的朋友下個(gè)斷點(diǎn)跑下邏輯珠闰,細(xì)節(jié)就不詳細(xì)展開(kāi)了。
- 客戶(hù)端結(jié)構(gòu)
// server.h
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn;
...
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
...
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
...
}
- 服務(wù)端結(jié)構(gòu)
struct redisServer {
...
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
...
}
5. 測(cè)試
8 核心瘫辩,16G 內(nèi)存伏嗜, mac book 本地測(cè)試坛悉。
redis 服務(wù)默認(rèn)開(kāi) 4 線程,壓測(cè)工具開(kāi) 2 線程承绸。有剩余核心處理機(jī)器的其它業(yè)務(wù)裸影,這樣不影響 redis 工作。
Linux 系統(tǒng)军熏,如果安裝不了 redis 最新版本轩猩,請(qǐng)升級(jí)系統(tǒng)
gcc
版本。
- 配置荡澎,多線程模式測(cè)試均践,開(kāi)啟讀寫(xiě)兩個(gè)選項(xiàng);單線程模式測(cè)試則會(huì)關(guān)閉摩幔。
# redis.conf
io-threads 4
io-threads-do-reads yes
- 壓測(cè)命令彤委,會(huì)針對(duì)客戶(hù)端鏈接數(shù)/測(cè)試包體大小進(jìn)行測(cè)試。
命令邏輯已整理成腳本或衡,放到 github焦影,順手錄制了測(cè)試視頻:壓力測(cè)試 redis 多線程處理網(wǎng)絡(luò) I/O。
# 壓測(cè)工具會(huì)模擬多個(gè)終端封断,防止超出限制斯辰,被停止。
ulimit -n 16384
# 可以設(shè)置對(duì)應(yīng)的鏈接數(shù)/包體大小進(jìn)行測(cè)試坡疼。
./redis-benchmark -c xxxx -r 1000000 -n 100000 -t set,get -q --threads 2 -d yyyy
- 壓測(cè)結(jié)果
在 mac book 上測(cè)試彬呻,從測(cè)試結(jié)果看,多線程沒(méi)有單線程好柄瑰》掀瘢看到網(wǎng)上很多同學(xué)用壓測(cè)工具測(cè)試,性能有很大的提升狱意,有時(shí)間用其它機(jī)器跑下湖苞。可能是機(jī)器配置不一樣详囤,但是至少一點(diǎn)财骨,這個(gè)多線程功能目前還有很大的優(yōu)化空間,所以新特性藏姐,還需要放到真實(shí)環(huán)境中測(cè)試過(guò)隆箩,才能投產(chǎn)。
6. 總結(jié)
- 多線程模式使得網(wǎng)絡(luò)讀寫(xiě)快速處理羔杨。
- 多線程模式會(huì)浪費(fèi)一定 cpu捌臊,并發(fā)量不高不建議開(kāi)啟多線程模式。
- 主線程實(shí)現(xiàn)主邏輯兜材,子線程輔助完成任務(wù)理澎。
- redis 即便開(kāi)啟多線程模式處理網(wǎng)絡(luò)讀寫(xiě)事件逞力,宏觀邏輯還是串行的。
- 實(shí)踐是檢驗(yàn)真理的試金石糠爬,壓測(cè)過(guò)程中寇荧,單線程比多線程優(yōu)秀,沒(méi)有體現(xiàn)出多線程應(yīng)有的性能提升执隧,其它尚待驗(yàn)證揩抡。