(轉(zhuǎn))Memcached多線程模型

原文

關(guān)鍵數(shù)據(jù)結(jié)構(gòu)

  1. CQ_ITEM
typedef struct conn_queue_item CQ_ITEM;        
struct conn_queue_item {                
    int                     sfd;                
    enum conn_states        init_state;               
    int                     event_flags;                
    int                     read_buffer_size;                
    enum network_transport  transport;                
    CQ_ITEM                  *next;        
};

可以將這個(gè)結(jié)構(gòu)體看著是主線程accept觸發(fā)時(shí)即有客戶端連入時(shí),主線程寫入工作線程有關(guān)socket連接相關(guān)句柄數(shù)據(jù)結(jié)構(gòu),綁定了socket描述符、狀態(tài)、發(fā)生的事件、讀buffer大小等树肃,不難對(duì)號(hào)入座。

  1. CQ
typedef struct conn_queue CQ;        
struct conn_queue {            
      CQ_ITEM *head;            
      CQ_ITEM *tail;            
      pthread_mutex_t lock;            
      pthread_cond_t  cond;        
};

這是socket連接通知隊(duì)列瀑罗。

  1. LIBEVENT_THREAD
typedef struct {                
      pthread_t thread_id;        /* unique ID of this thread */                
      struct event_base *base;    /* libevent handle this thread uses */ 
      struct event notify_event;  /* listen event for notify pipe */
      int notify_receive_fd;      /* receiving end of notify pipe */
      int notify_send_fd;         /* sending end of notify pipe */
      struct thread_stats stats;  /* Stats generated by this thread */
      struct conn_queue *new_conn_queue; /* queue of new connections to handle */                
      cache_t *suffix_cache;      /* suffix cache */                
      uint8_t item_lock_type;     /* use fine-grained or global item lock */        
} LIBEVENT_THREAD;

可以將這個(gè)結(jié)構(gòu)體看著是線程句柄數(shù)據(jù)結(jié)構(gòu)胸嘴,綁定了線程ID、Libevent實(shí)例斩祭、用于通知管道的event劣像、通知接收的socket描述符,通知發(fā)送的socket描述符摧玫、socket連接通知隊(duì)列耳奕,這也很好對(duì)號(hào)入座吧。

  1. conn
typedef struct conn conn;        
struct conn {            
      int    sfd;       
       ...            
      struct event event;            
      short  ev_flags;            
      short  which;   /** which events were just triggered */    
        ...            
      LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */        
};

這個(gè)結(jié)構(gòu)非常龐大诬像,提取關(guān)鍵的幾個(gè)字段來分析一下屋群, 可以將這個(gè)結(jié)構(gòu)體看著是socket連接句柄數(shù)據(jù)結(jié)構(gòu),綁定了socket描述符颅停、觸發(fā)的事件谓晌、處理連接的線程指針等,這個(gè)也很好對(duì)號(hào)入座吧癞揉。

整體流程

?整體流程
  1. 在main函數(shù)中調(diào)用main_base = event_init()來初始化主線程Libevent實(shí)例。
  2. 在main函數(shù)中調(diào)用thread_init來初始化工作線程溺欧,并將主線程Libevent實(shí)例作為參數(shù)傳入喊熟。
  3. 在thread_init函數(shù)中為指定數(shù)量的工作線程分配內(nèi)存,為每個(gè)線程創(chuàng)建管道姐刁,并分別綁定到通知收和發(fā)的socket描述符上芥牌,調(diào)用函數(shù)setup_thread初始化線程信息,調(diào)用函數(shù)create_worker為每個(gè)線程注冊(cè)回調(diào)函數(shù)聂使。關(guān)鍵代碼:
for (i = 0; i < nthreads; i++) {
      int fds[2];
      if (pipe(fds)) {
      ...
      }

      threads[i].notify_receive_fd = fds[0];
      threads[i].notify_send_fd = fds[1];

      setup_thread(&threads[i]);
      ...
}

  // Create threads after we've done all the libevent setup. 
  for (i = 0; i < nthreads; i++) {
       create_worker(worker_libevent, &threads[i]);
  }
  1. 在setup_thread函數(shù)中壁拉,為工作線程初始化Libevent實(shí)例谬俄,為主線程通知讀(notify_receive_fd)注冊(cè)回調(diào)函數(shù)thread_libevent_process,初始化cq隊(duì)列弃理,關(guān)鍵代碼如下:
static void setup_thread(LIBEVENT_THREAD *me) {
       me->base = event_init();
       ...
       /* Listen for notifications from other threads */
       event_set(&me->notify_event, me->notify_receive_fd,
                 EV_READ | EV_PERSIST, thread_libevent_process, me);
       event_base_set(me->base, &me->notify_event);

       if (event_add(&me->notify_event, 0) == -1) {
               ...
       }

       me->new_conn_queue = malloc(sizeof(struct conn_queue));
       ...
       cq_init(me->new_conn_queue);
       ...
}
  1. 在thread_libevent_process函數(shù)中溃论,讀取主線程發(fā)送的通知接收消息,將主線程accept來的fd注冊(cè)到工作線程的Libevent實(shí)例中痘昌,主線程accept來的fd從conn_queue隊(duì)列獲取钥勋,關(guān)鍵代碼如下:
static void thread_libevent_process(int fd, short which, void *arg) {
        LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;
        char buf[1];

        if (read(fd, buf, 1) != 1)
                ...

        switch (buf[0]) {
        case 'c':
        item = cq_pop(me->new_conn_queue);

        if (NULL != item) {
                conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport, me->base);
       ...
        }
}
  1. 在函數(shù)conn_new中,創(chuàng)建conn句柄辆苔,為句柄注冊(cè)回調(diào)函數(shù)event_handler處理事件算灸,將該句柄作為參數(shù)傳入回調(diào)函數(shù)并設(shè)置到Libevent中,該函數(shù)的關(guān)鍵代碼如下:
conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, 
                enum network_transport transport,
                struct event_base *base) {
      conn *c = conn_from_freelist();

      if (NULL == c) {
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
                ...
        }
        ...
        event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
        event_base_set(base, &c->event);
        c->ev_flags = event_flags;

        if (event_add(&c->event, 0) == -1) {
              ...
        }
        ...
      }
}
  1. 在create_worker函數(shù)中驻啤,創(chuàng)建工作線程并注冊(cè)回調(diào)函數(shù)菲驴,在工作線程的回調(diào)函數(shù)work_libevent中,開始Libevent主循環(huán)骑冗。
  2. 在main函數(shù)中赊瞬,調(diào)用函數(shù)server_sockets,再調(diào)用函數(shù)server_socket沐旨,進(jìn)而調(diào)用函數(shù)new_socket森逮,在調(diào)用函數(shù)conn_new,創(chuàng)建并注冊(cè)listen fd到主線程Libevent實(shí)例上磁携,最后開始Libevent主循環(huán)即event_base_loop褒侧。在conn_new函數(shù)關(guān)鍵代碼見步驟(6)
  3. 在event_handler函數(shù)中,調(diào)用函數(shù)drive_machine谊迄,在該函數(shù)中處理所有事件闷供,其關(guān)鍵代碼如下:
static void drive_machine(conn *c) {
        ...
        while (!stop) {
            switch(c->state) {
                case conn_listening:
                        addrlen = sizeof(addr);
                        if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                               ...                                       
                        }
                       ...
                       if (settings.maxconns_fast &&
                            stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                                ...
                       } else {
                            dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
                      }

                    stop = true;
                    break;
                    ...
                }
            }

        return;
}

在處理事件時(shí),如果是listening事件统诺,則調(diào)用函數(shù)dispatch_conn_new將accept fd分配給工作線程歪脏。

  1. 在dispatch_conn_new函數(shù)中,根據(jù)round-robin算法將新連接push到所分配線程的CQ隊(duì)列中粮呢,并通過管道發(fā)送通知消息“c”婿失,關(guān)鍵代碼如下:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
        CQ_ITEM *item = cqi_new();
        char buf[1];
        int tid = (last_thread + 1) % settings.num_threads;

        LIBEVENT_THREAD *thread = threads + tid;

        last_thread = tid;

        ...

        cq_push(thread->new_conn_queue, item);
        ...
        buf[0] = 'c';
        if (write(thread->notify_send_fd, buf, 1) != 1) {
            perror("Writing to thread notify pipe");
        }
}

dispatch_conn_new函數(shù)只在主線程中調(diào)用,last_thread為靜態(tài)變量啄寡,每次將該變量值+1豪硅,再模線程數(shù)來選擇工作線程。

線程模型

Libevent本身是單線程的挺物,Memcached采用消息通知+同步層機(jī)制使得其支持多線程懒浮,整體模型見如下神圖:

?線程模型

每個(gè)線程包括主線程都各自有獨(dú)立的Libevent實(shí)例,Memcached的listen fd注冊(cè)到主線程的Libevent實(shí)例上识藤,由主線程來accept新的連接砚著,接受新的連接后根據(jù)Round-robin算法選擇工作線程次伶,將新連接的socket fd封裝為CQ_ITEM后push到所選工作線程的CQ隊(duì)列中,然后主線程(notify_send_fd)通過管道發(fā)送字符“c”到工作線程(notify_receive_fd)稽穆,而notify_receive_fd已經(jīng)注冊(cè)到工作線程的Libevent實(shí)例上了冠王,這樣工作線程就能收到通知“c”,然后從該工作線程的CQ隊(duì)列中pop出CQ_ITEM進(jìn)而取出新連接并將fd注冊(cè)到工作線程的Libevent實(shí)例上秧骑,從而由工作線程來處理該連接的所有后續(xù)事件版确。 需要注意的數(shù)據(jù):Memcached默認(rèn)開啟線程數(shù)為4,也可以通過參數(shù)-t來指定開啟線程數(shù)乎折,當(dāng)線程數(shù)大于64時(shí)會(huì)給出錯(cuò)誤提示绒疗,建議線程數(shù)為小于或等于CPU核數(shù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末骂澄,一起剝皮案震驚了整個(gè)濱河市吓蘑,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌坟冲,老刑警劉巖磨镶,帶你破解...
    沈念sama閱讀 221,548評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異健提,居然都是意外死亡琳猫,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門私痹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來脐嫂,“玉大人,你說我怎么就攤上這事紊遵≌饲В” “怎么了?”我有些...
    開封第一講書人閱讀 167,990評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵暗膜,是天一觀的道長(zhǎng)匀奏。 經(jīng)常有香客問我,道長(zhǎng)学搜,這世上最難降的妖魔是什么娃善? 我笑而不...
    開封第一講書人閱讀 59,618評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮瑞佩,結(jié)果婚禮上会放,老公的妹妹穿的比我還像新娘。我一直安慰自己钉凌,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,618評(píng)論 6 397
  • 文/花漫 我一把揭開白布捂人。 她就那樣靜靜地躺著御雕,像睡著了一般矢沿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上酸纲,一...
    開封第一講書人閱讀 52,246評(píng)論 1 308
  • 那天捣鲸,我揣著相機(jī)與錄音,去河邊找鬼闽坡。 笑死栽惶,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的疾嗅。 我是一名探鬼主播外厂,決...
    沈念sama閱讀 40,819評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼代承!你這毒婦竟也來了汁蝶?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,725評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤论悴,失蹤者是張志新(化名)和其女友劉穎掖棉,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體膀估,經(jīng)...
    沈念sama閱讀 46,268評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡幔亥,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,356評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了察纯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帕棉。...
    茶點(diǎn)故事閱讀 40,488評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖捐寥,靈堂內(nèi)的尸體忽然破棺而出笤昨,到底是詐尸還是另有隱情,我是刑警寧澤握恳,帶...
    沈念sama閱讀 36,181評(píng)論 5 350
  • 正文 年R本政府宣布瞒窒,位于F島的核電站,受9級(jí)特大地震影響乡洼,放射性物質(zhì)發(fā)生泄漏崇裁。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,862評(píng)論 3 333
  • 文/蒙蒙 一束昵、第九天 我趴在偏房一處隱蔽的房頂上張望拔稳。 院中可真熱鬧,春花似錦锹雏、人聲如沸巴比。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽轻绞。三九已至采记,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間政勃,已是汗流浹背唧龄。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留奸远,地道東北人既棺。 一個(gè)月前我還...
    沈念sama閱讀 48,897評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像懒叛,于是被迫代替她去往敵國(guó)和親丸冕。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,500評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容