[redis 源碼走讀] 多線程通信 I/O

本章重點(diǎn)走讀 redis 網(wǎng)絡(luò) I/O 的多線程部分源碼。

哈希表 + 內(nèi)存數(shù)據(jù)庫(kù) + 非阻塞系統(tǒng)調(diào)用 + 多路復(fù)用 I/O 事件驅(qū)動(dòng)王凑,使得 redis 單線程處理主邏輯足夠高效山林。當(dāng)并發(fā)上來(lái)后伍伤,數(shù)據(jù)的邏輯處理肯定要占用大量時(shí)間沧烈,那樣阴孟,客戶(hù)端與服務(wù)端通信處理就會(huì)變得遲鈍耘眨。所以在合適的時(shí)候(根據(jù)任務(wù)量自適應(yīng))采用多線程處理昼榛,充分地利用多核優(yōu)勢(shì),分擔(dān)主線程壓力剔难,使得客戶(hù)端和服務(wù)端通信更加敏捷胆屿。


redis 6.0 新增多線程處理網(wǎng)絡(luò) I/O奥喻,默認(rèn)是關(guān)閉的,需要修改配置開(kāi)啟莺掠。對(duì)于這個(gè)新特性衫嵌,redis 作者建議:如果項(xiàng)目確實(shí)遇到性能問(wèn)題,再開(kāi)啟多線程處理網(wǎng)絡(luò)讀寫(xiě)事件彻秆。否則開(kāi)啟沒(méi)什么意義楔绞,還會(huì)浪費(fèi) CPU 資源。線程數(shù)量不要超過(guò) cpu 核心數(shù)量 - 1唇兑,預(yù)留一個(gè)核心酒朵。


?? 文章來(lái)源:wenfh2020.com


1. 配置

多線程這兩個(gè)設(shè)置項(xiàng),默認(rèn)是關(guān)閉的扎附。

# redis.conf

# 配置多線程處理線程個(gè)數(shù)蔫耽,數(shù)量最好少于 cpu 核心,默認(rèn) 4留夜。
# io-threads 4
#
# 多線程是否處理讀事件匙铡,默認(rèn)關(guān)閉。
# io-threads-do-reads no

redis 作者建議:

  • 配置線程數(shù)量碍粥,最好少于 cpu 核心鳖眼。起碼預(yù)留一個(gè)空閑核心處理系統(tǒng)其它業(yè)務(wù),線程數(shù)量超過(guò) cpu 核心對(duì) redis 性能有一定影響嚼摩,因?yàn)?redis 主線程處理主邏輯钦讳,如果被系統(tǒng)頻繁切換,效率會(huì)降低枕面。
  • 提供了多線程處理網(wǎng)絡(luò)讀事件開(kāi)關(guān)愿卒。多線程處理網(wǎng)絡(luò)讀事件,對(duì) redis 性能影響不大潮秘。redis 作為緩存琼开,查詢(xún)操作的頻率比較大,系統(tǒng)的網(wǎng)絡(luò)瓶頸一般在查詢(xún)返回?cái)?shù)據(jù)枕荞,根據(jù)系統(tǒng)實(shí)際應(yīng)用場(chǎng)景進(jìn)行配置吧稠通。

2. 主線程工作流程

redis 多線程I/O通信流程

流程圖來(lái)源:《redis 異步網(wǎng)絡(luò)I/O通信流程 - 多線程

  1. 主線程通過(guò)事件驅(qū)動(dòng)從內(nèi)核獲取就緒事件,記錄下需要延時(shí)操作的客戶(hù)端連接买猖。
  2. 多線程并行處理延時(shí)讀事件。
  3. 多線程處理延時(shí)寫(xiě)事件滋尉。
  4. 重新執(zhí)行第一步玉控,循環(huán)執(zhí)行。

  • 加載循環(huán)事件管理狮惜。
int main(int argc, char **argv) {
    ...
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    ...
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
  • 事件循環(huán)管理高诺。
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 向內(nèi)核獲取就緒的可讀可寫(xiě)事件事件進(jìn)行處理碌识,處理時(shí)鐘事件。
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
  • 獲取就緒事件處理和處理時(shí)鐘事件虱而。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    ...
    // 從內(nèi)核中取出就緒的可讀可寫(xiě)事件筏餐。
    numevents = aeApiPoll(eventLoop, tvp);

    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop);

    for (j = 0; j < numevents; j++) {
        // 處理讀寫(xiě)事件。
    }
    ...
    // 處理時(shí)鐘事件牡拇。
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    ...
}
  • 讀寫(xiě)邏輯處理魁瞪。
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    // write
    handleClientsWithPendingWritesUsingThreads();
    ...
}

void afterSleep(struct aeEventLoop *eventLoop) {
    ...
    // read
    handleClientsWithPendingReadsUsingThreads();
}

3. 多線程協(xié)作

redis 多線程I/O通信流程

流程圖來(lái)源:《redis 異步網(wǎng)絡(luò)I/O通信流程 - 多線程


3.1. 特點(diǎn)

主線程實(shí)現(xiàn)主邏輯,子線程輔助實(shí)現(xiàn)任務(wù)惠呼。

  • redis 主線程實(shí)現(xiàn)主邏輯导俘。
  • 主線程與子線程共同處理延時(shí)客戶(hù)端網(wǎng)絡(luò)讀寫(xiě)事件。
  • 主線程根據(jù)寫(xiě)事件用戶(hù)量大小剔蹋,開(kāi)啟/關(guān)閉多線程模式旅薄。
  • 雖然多線程是并行處理邏輯,但是 redis 整體工作流程是串行的泣崩。
  • 當(dāng)主線程處理延時(shí)讀寫(xiě)事件時(shí)少梁,把一次大任務(wù)進(jìn)行取模切割成小任務(wù),平均分配給(主+子)線程處理矫付。這樣每個(gè)客戶(hù)端連接被獨(dú)立的一個(gè)線程處理凯沪,不會(huì)出現(xiàn)多個(gè)線程同時(shí)處理一個(gè)客戶(hù)端連接邏輯。
  • 主線程限制多線程子線程同一個(gè)時(shí)間段只能并行處理一種類(lèi)型操作:讀/寫(xiě)技即。
  • 主線程先等待子線程處理完任務(wù)了著洼,再進(jìn)行下一步,處理分配給自己的等待事件而叼。
  • 主線程在等待子線程處理任務(wù)過(guò)程中身笤,它不是通過(guò) sleep 掛起線程讓出使用權(quán),而是通過(guò) for 循環(huán)進(jìn)行忙等葵陵,不斷檢測(cè)所有子線程處理的任務(wù)是否已經(jīng)完成液荸,如果完成再進(jìn)行下一步,處理自己的任務(wù)脱篙。相當(dāng)于主線程在等待過(guò)程中娇钱,并沒(méi)有做其它任務(wù),只是讓幫手去干活绊困,幫手都把活干完了文搂,它再干自己的,然后做一些善后工作秤朗。主線程在這里的角色有點(diǎn)像代理商或者包工頭煤蹭。
  • 子線程在完成分配的任務(wù)后,也會(huì)通過(guò) for 循環(huán)忙等,檢測(cè)主線程的工作調(diào)度硝皂,如果任務(wù)很少了常挚,等待主線程通過(guò)鎖,把自己掛起稽物。

3.2. 忙等

多線程模式奄毡,存在忙等現(xiàn)象,這個(gè)處理有點(diǎn)超出了常規(guī)思維贝或。


3.2.1. 源碼實(shí)現(xiàn)

  • 主線程分配完任務(wù)后吼过,等待所有子線程完成任務(wù)后,再進(jìn)行下一步操作傀缩。
// write
int handleClientsWithPendingWritesUsingThreads(void) {
    ...
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    ...
}

// read
int handleClientsWithPendingReadsUsingThreads(void) {
    ...
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    ...
}
  • 子線程完成任務(wù)后那先,保持繁忙狀態(tài),等待主線程上鎖掛起自己赡艰。
void *IOThreadMain(void *myid) {
    ...
    while(1) {
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        ...
    }
}

3.2.2. 優(yōu)缺點(diǎn)

  • 優(yōu)點(diǎn):

    1. 實(shí)現(xiàn)簡(jiǎn)單售淡,主線程可以通過(guò)鎖開(kāi)啟/暫停多線程工作模式,不需要復(fù)雜的通信慷垮。
    2. redis 讀寫(xiě)事件處理基本都是內(nèi)存級(jí)別操作揖闸,而且非阻塞,多線程處理任務(wù)非沉仙恚快汤纸。
    3. 反應(yīng)快,有任務(wù)能實(shí)時(shí)處理芹血。
    4. 宏觀上看贮泞,主線程是串行處理邏輯,邏輯清晰:讀寫(xiě)邏輯順序處理幔烛。主線程把一次大任務(wù)進(jìn)行取模切割成小任務(wù)啃擦,分配給子線程處理。主線程等子線程完成所有任務(wù)后饿悬,再完成自己的任務(wù)令蛉,再進(jìn)行下一步。
    5. 因?yàn)槎嗑€程處理的是客戶(hù)端鏈接的延時(shí)讀寫(xiě)邏輯狡恬,redis 服務(wù)應(yīng)用場(chǎng)景作為緩存珠叔,接入對(duì)象一般是服務(wù)端級(jí)別,而不是面向普通用戶(hù)的客戶(hù)端弟劲,所以鏈接不會(huì)太多祷安。而等待的讀寫(xiě)鏈接通過(guò)取模分散到不同的線程去處理,那每個(gè)線程處理的鏈接就會(huì)相對(duì)較少兔乞。每個(gè)線程處理任務(wù)也很快汇鞭。
  • 缺點(diǎn):

    忙等最大的問(wèn)題是以浪費(fèi)一定 cpu 性能為代價(jià)撇眯,如果 redis 鏈接并發(fā)量不是很高,redis 作者不建議開(kāi)啟多線程模式虱咧,所以主邏輯會(huì)根據(jù)寫(xiě)事件鏈接數(shù)量大小來(lái)開(kāi)啟/暫停多線程工作模式。

int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    // 如果單線程模式就直接返回锚国。
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num*2)) {
        if (io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

3.3. 源碼分析

3.3.1. 概述

  • 網(wǎng)絡(luò)讀寫(xiě)核心接口:

    接口 描述
    readQueryFromClient 服務(wù)讀客戶(hù)端數(shù)據(jù)腕巡。
    writeToClient 服務(wù)向客戶(hù)端寫(xiě)數(shù)據(jù)。
  • 多線程工作模式核心接口(networking.c)血筑,其它延時(shí)處理邏輯也有一部分源碼绘沉。

    接口 描述
    IOThreadMain 子線程處理邏輯。
    initThreadedIO 主線程創(chuàng)建掛起子線程豺总。
    startThreadedIO 主線程開(kāi)啟多線程工作模式车伞。
    stopThreadedIO 主線程暫停多線程工作模式。
    stopThreadedIOIfNeeded 主線程根據(jù)寫(xiě)并發(fā)量是否關(guān)閉多線程工作模式喻喳。
    handleClientsWithPendingWritesUsingThreads 主線程多線程處理延時(shí)寫(xiě)事件另玖。
    handleClientsWithPendingReadsUsingThreads 主線程多線程處理延時(shí)讀事件。
  • 其它延時(shí)處理邏輯表伦,看看下面這些變量和宏在代碼中的邏輯谦去,這里不會(huì)詳細(xì)展開(kāi)。

    變量/宏 描述
    server.clients_pending_read 延時(shí)處理讀事件的客戶(hù)端連接鏈表蹦哼。
    server.clients_pending_write 延時(shí)處理寫(xiě)事件的客戶(hù)端連接鏈表鳄哭。
    CLIENT_PENDING_READ 延時(shí)處理讀事件標(biāo)識(shí)。
    CLIENT_PENDING_WRITE 延時(shí)處理寫(xiě)事件標(biāo)識(shí)纲熏。
    CLIENT_PENDING_COMMAND 延時(shí)處理命令邏輯標(biāo)識(shí)妆丘。

3.3.2. 源碼

  • 變量/宏

    io_threads_mutex 互斥變量數(shù)組,為了方便主線程喚醒/掛起控制子線程局劲。
    io_threads_pending 原子變量勺拣,方便主線程統(tǒng)計(jì)子線程是否已經(jīng)處理完所有任務(wù)。

// 最大線程個(gè)數(shù)容握。
#define IO_THREADS_MAX_NUM 128

// 線程讀操作宣脉。
#define IO_THREADS_OP_READ 0

// 線程寫(xiě)操作。
#define IO_THREADS_OP_WRITE 1

// 線程數(shù)組剔氏。
pthread_t io_threads[IO_THREADS_MAX_NUM];

// 互斥變量數(shù)組塑猖,提供主線程上鎖和解鎖子線程工作。
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];

// 原子變量數(shù)組谈跛,分別存儲(chǔ)每個(gè)線程要處理的延時(shí)處理鏈接數(shù)量羊苟。主線程用來(lái)統(tǒng)計(jì)線程是否處理完等待事件,從而進(jìn)行下一步操作感憾。
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];

// 是否啟動(dòng)了多線程處理模式蜡励。
int io_threads_active;

// 線程操作類(lèi)型。多線程每次只能處理一種類(lèi)型的操作:讀/寫(xiě)。
int io_threads_op;

// 子線程列表凉倚,子線程個(gè)數(shù)為 IO_THREADS_MAX_NUM - 1兼都,因?yàn)橹骶€程也會(huì)處理延時(shí)任務(wù)。
list *io_threads_list[IO_THREADS_MAX_NUM];

  • 主線程創(chuàng)建子線程
void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    if (server.io_threads_num == 1) return;

    // 檢查配置的線程數(shù)量是否超出限制稽寒。
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    // 創(chuàng)建 server.io_threads_num - 1 個(gè)子線程扮碧。
    for (int i = 0; i < server.io_threads_num; i++) {
        io_threads_list[i] = listCreate();

        // 0 號(hào)線程不創(chuàng)建,0 號(hào)就是主線程杏糙,主線程也會(huì)處理任務(wù)邏輯慎王。
        if (i == 0) continue;

        // 創(chuàng)建子線程,主線程先對(duì)子線程上鎖宏侍,掛起子線程赖淤,不讓子線程進(jìn)入工作模式。
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]);
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
  • 開(kāi)啟多線程模式
void startThreadedIO(void) {
    serverAssert(io_threads_active == 0);
    for (int j = 1; j < server.io_threads_num; j++)
        // 子線程因?yàn)樯湘i等待主線程解鎖谅河,當(dāng)主線程解鎖子線程咱旱,子線程重新進(jìn)入工作狀態(tài)。
        pthread_mutex_unlock(&io_threads_mutex[j]);
    io_threads_active = 1;
}
  • 子線程邏輯處理
void *IOThreadMain(void *myid) {
    // 每個(gè)線程在創(chuàng)建的時(shí)候會(huì)產(chǎn)生一個(gè)業(yè)務(wù) id旧蛾。
    long id = (unsigned long)myid;

    while(1) {
        // 替代 sleep莽龟,用忙等,這樣能實(shí)時(shí)處理業(yè)務(wù)锨天。但是也付出了耗費(fèi) cpu 的代價(jià)毯盈。
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        // 留機(jī)會(huì)給主線程上鎖,掛起當(dāng)前子線程病袄。
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        // 根據(jù)操作類(lèi)型搂赋,處理對(duì)應(yīng)的讀/寫(xiě)邏輯。
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;
    }
}
  • 是否需要停止多線程模式
int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    // 如果單線程模式就直接返回益缠。
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num*2)) {
        if (io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}
  • 暫停多線程處理模式
void stopThreadedIO(void) {
    // 在停止線程前脑奠,仍然有等待處理的延時(shí)讀數(shù)據(jù)處理,需要先處理再停止線程幅慌。
    handleClientsWithPendingReadsUsingThreads();

    serverAssert(io_threads_active == 1);

    // 主給子線程上鎖宋欺,掛起子線程。
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_lock(&io_threads_mutex[j]);
    io_threads_active = 0;
}
  • 處理延時(shí)的讀事件
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    // 將等待處理的鏈接胰伍,通過(guò)取模放進(jìn)不同的隊(duì)列中去齿诞。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 分別統(tǒng)計(jì)每個(gè)隊(duì)列要處理鏈接的個(gè)數(shù)。
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主線程處理第一個(gè)隊(duì)列骂租。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 讀客戶(hù)端發(fā)送的數(shù)據(jù)到緩存祷杈。
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 主線程處理完任務(wù)后,忙等其它線程渗饮,全部線程處理完任務(wù)后但汞,再處理命令實(shí)現(xiàn)邏輯宿刮。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }

    /* 主線程處理命令邏輯,因?yàn)殒溄佣紭?biāo)識(shí)了等待狀態(tài)私蕾,讀完數(shù)據(jù)后命令對(duì)應(yīng)的業(yè)務(wù)邏輯還沒(méi)有被處理僵缺。
     * 這里去掉等待標(biāo)識(shí),處理命令業(yè)務(wù)邏輯踩叭。*/
    listRewind(server.clients_pending_read,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~ CLIENT_PENDING_COMMAND;
            // 讀取數(shù)據(jù)谤饭,解析協(xié)議取出命令參數(shù),執(zhí)行命令懊纳,填充回復(fù)緩沖區(qū)。
            processCommandAndResetClient(c);
        }
        // 繼續(xù)解析協(xié)議亡容,取出命令參數(shù)嗤疯,執(zhí)行命令,填充回復(fù)緩沖區(qū)闺兢。
        processInputBufferAndReplicate(c);
    }
    listEmpty(server.clients_pending_read);
    return processed;
}
  • 處理延時(shí)的寫(xiě)事件
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0;

    // 如果延時(shí)寫(xiě)事件對(duì)應(yīng)的 client 鏈接很少茂缚,關(guān)閉多線程模式,用主線程處理異步邏輯屋谭。
    if (stopThreadedIOIfNeeded()) {
        // 處理延時(shí)寫(xiě)事件脚囊。
        return handleClientsWithPendingWrites();
    }

    if (!io_threads_active) startThreadedIO();

    // 將等待處理的鏈接,通過(guò)取模放進(jìn)不同的隊(duì)列中去桐磁,去掉延遲寫(xiě)標(biāo)識(shí)悔耘。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 線程處理寫(xiě)事件。
    io_threads_op = IO_THREADS_OP_WRITE;

    // 分別統(tǒng)計(jì)每個(gè)隊(duì)列要處理鏈接的個(gè)數(shù)我擂。
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主線程處理第一個(gè)隊(duì)列衬以。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 寫(xiě)數(shù)據(jù),發(fā)送給回復(fù)給客戶(hù)端校摩。
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    // 主線程處理完任務(wù)后看峻,忙等其它線程,全部線程處理完任務(wù)后衙吩,再處理命令實(shí)現(xiàn)邏輯互妓。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        // 如果緩存中還有沒(méi)有發(fā)送完的數(shù)據(jù),繼續(xù)發(fā)送或者下次繼續(xù)發(fā)坤塞,否則從事件驅(qū)動(dòng)刪除 fd 注冊(cè)的可寫(xiě)事件冯勉。
        if (clientHasPendingReplies(c)
            && connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}

4. 數(shù)據(jù)結(jié)構(gòu)

redisServerclient 分別 redis 是服務(wù)端和客戶(hù)端的數(shù)據(jù)結(jié)構(gòu),理解結(jié)構(gòu)的成員作用是走讀源碼邏輯的關(guān)鍵尺锚。有興趣的朋友下個(gè)斷點(diǎn)跑下邏輯珠闰,細(xì)節(jié)就不詳細(xì)展開(kāi)了。

用 gdb 調(diào)試 redis

  • 客戶(hù)端結(jié)構(gòu)
// server.h
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    connection *conn;
    ...
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    ...
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
    ...
}
  • 服務(wù)端結(jié)構(gòu)
struct redisServer {
    ...
    list *clients;              /* List of active clients */
    list *clients_to_close;     /* Clients to close asynchronously */
    list *clients_pending_write; /* There is to write or install handler. */
    list *clients_pending_read;  /* Client has pending read socket buffers. */
    ...
}

5. 測(cè)試

8 核心瘫辩,16G 內(nèi)存伏嗜, mac book 本地測(cè)試坛悉。

redis 服務(wù)默認(rèn)開(kāi) 4 線程,壓測(cè)工具開(kāi) 2 線程承绸。有剩余核心處理機(jī)器的其它業(yè)務(wù)裸影,這樣不影響 redis 工作。

Linux 系統(tǒng)军熏,如果安裝不了 redis 最新版本轩猩,請(qǐng)升級(jí)系統(tǒng) gcc 版本。

  • 配置荡澎,多線程模式測(cè)試均践,開(kāi)啟讀寫(xiě)兩個(gè)選項(xiàng);單線程模式測(cè)試則會(huì)關(guān)閉摩幔。
# redis.conf

io-threads 4
io-threads-do-reads yes
  • 壓測(cè)命令彤委,會(huì)針對(duì)客戶(hù)端鏈接數(shù)/測(cè)試包體大小進(jìn)行測(cè)試。

命令邏輯已整理成腳本或衡,放到 github焦影,順手錄制了測(cè)試視頻:壓力測(cè)試 redis 多線程處理網(wǎng)絡(luò) I/O

# 壓測(cè)工具會(huì)模擬多個(gè)終端封断,防止超出限制斯辰,被停止。
ulimit -n 16384

# 可以設(shè)置對(duì)應(yīng)的鏈接數(shù)/包體大小進(jìn)行測(cè)試坡疼。
./redis-benchmark -c xxxx -r 1000000 -n 100000 -t set,get -q --threads 2  -d yyyy
  • 壓測(cè)結(jié)果

在 mac book 上測(cè)試彬呻,從測(cè)試結(jié)果看,多線程沒(méi)有單線程好柄瑰》掀瘢看到網(wǎng)上很多同學(xué)用壓測(cè)工具測(cè)試,性能有很大的提升狱意,有時(shí)間用其它機(jī)器跑下湖苞。可能是機(jī)器配置不一樣详囤,但是至少一點(diǎn)财骨,這個(gè)多線程功能目前還有很大的優(yōu)化空間,所以新特性藏姐,還需要放到真實(shí)環(huán)境中測(cè)試過(guò)隆箩,才能投產(chǎn)。

redis 壓測(cè)過(guò)程

6. 總結(jié)

  • 多線程模式使得網(wǎng)絡(luò)讀寫(xiě)快速處理羔杨。
  • 多線程模式會(huì)浪費(fèi)一定 cpu捌臊,并發(fā)量不高不建議開(kāi)啟多線程模式。
  • 主線程實(shí)現(xiàn)主邏輯兜材,子線程輔助完成任務(wù)理澎。
  • redis 即便開(kāi)啟多線程模式處理網(wǎng)絡(luò)讀寫(xiě)事件逞力,宏觀邏輯還是串行的。
  • 實(shí)踐是檢驗(yàn)真理的試金石糠爬,壓測(cè)過(guò)程中寇荧,單線程比多線程優(yōu)秀,沒(méi)有體現(xiàn)出多線程應(yīng)有的性能提升执隧,其它尚待驗(yàn)證揩抡。

7. 參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市镀琉,隨后出現(xiàn)的幾起案子峦嗤,更是在濱河造成了極大的恐慌,老刑警劉巖屋摔,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件寻仗,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡凡壤,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)耙替,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)亚侠,“玉大人,你說(shuō)我怎么就攤上這事俗扇∠趵茫” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵铜幽,是天一觀的道長(zhǎng)滞谢。 經(jīng)常有香客問(wèn)我,道長(zhǎng)除抛,這世上最難降的妖魔是什么狮杨? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮到忽,結(jié)果婚禮上橄教,老公的妹妹穿的比我還像新娘。我一直安慰自己喘漏,他們只是感情好护蝶,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著翩迈,像睡著了一般持灰。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上负饲,一...
    開(kāi)封第一講書(shū)人閱讀 51,692評(píng)論 1 305
  • 那天堤魁,我揣著相機(jī)與錄音喂链,去河邊找鬼。 笑死姨涡,一個(gè)胖子當(dāng)著我的面吹牛衩藤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播涛漂,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼赏表,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了匈仗?” 一聲冷哼從身側(cè)響起瓢剿,我...
    開(kāi)封第一講書(shū)人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎悠轩,沒(méi)想到半個(gè)月后间狂,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡火架,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年鉴象,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片何鸡。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡纺弊,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出骡男,到底是詐尸還是另有隱情淆游,我是刑警寧澤,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布隔盛,位于F島的核電站犹菱,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏吮炕。R本人自食惡果不足惜腊脱,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望龙亲。 院中可真熱鬧虑椎,春花似錦、人聲如沸俱笛。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)迎膜。三九已至泥技,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間磕仅,已是汗流浹背珊豹。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工簸呈, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人店茶。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓蜕便,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親贩幻。 傳聞我的和親對(duì)象是個(gè)殘疾皇子轿腺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355