nfs-ganesha - thread model - work pool

work pool

內(nèi)部維護(hù)一個(gè)隊(duì)列刃榨,生產(chǎn)者調(diào)用work_pool_submit將entry插入隊(duì)列衰猛,有多個(gè)線程作為消費(fèi)者,去處理隊(duì)列中的entry无切。線程的數(shù)量會(huì)根據(jù)entry的個(gè)數(shù)自動(dòng)調(diào)節(jié)荡短。

pqh.qcount

pool->pqh.qcount小于0時(shí)候,代表有幾個(gè)entry在隊(duì)列中等待處理哆键。大于0時(shí)候掘托,代表有幾個(gè)沒事干的線程在睡覺。

彈性的調(diào)節(jié)線程數(shù)量

當(dāng)有所有線程都在干活且隊(duì)列里還有entry沒有處理籍嘹,或者當(dāng)有很少的線程睡覺時(shí)(流出一些裕量)闪盔,創(chuàng)建更多的處理線程。當(dāng)沒事做睡覺的線程太多時(shí)辱士,退出一些線程泪掀。

work_pool_submit的處理方法

當(dāng)所有線程都干活的時(shí)候, work_pool_submit將entry插入隊(duì)列颂碘。當(dāng)有一些沒事干而睡覺的線程時(shí)异赫,喚醒其中一個(gè),讓它去處理這個(gè)entry头岔。

svc_work_pool全局變量

struct work_pool svc_work_pool

(gdb) p svc_work_pool
$23 = {
  pqh = {
    qh = { //work pool entry處理隊(duì)列
      tqh_first = 0x7fffd80008c0, //隊(duì)列的第一個(gè)元素(每個(gè)元素都有prev和next指針)
      tqh_last = 0x7fffe00008c0 //隊(duì)列的最后一個(gè)元素
    },
    qmutex = {... }, //mutex
    qsize = 0,
    qcount = 10  //此值大于0塔拳,說明有10個(gè)線程無事可做在睡覺。如果小于零峡竣,說明所有線程都在干活靠抑,隊(duì)列中有多少entry等待處理
  },
  wptqh = { //working threads隊(duì)列
    tqh_first = 0x7e00d0,
    tqh_last = 0x7fffe8000cb8
  },
  name = 0x7e00b0 "svc_",
  attr = { ...  },
  params = {
    thrd_max = 200,
    thrd_min = 7
  },
  timeout_ms = 31000,
  n_threads = 15,
  worker_index = 15
}

函數(shù)

  • work_pool_init: 初始化work pool
  • work_pool_thread: 線程處理函數(shù),個(gè)數(shù)等于svc_work_pool.n_threads
  • work_pool_spawn: 為work pool創(chuàng)建新線程
  • work_pool_submit: 將work_pool_entry插入到pool中處理
  • work_pool_shutdown

work pool的應(yīng)用場(chǎng)景

  1. 每種連接(TCP,UDP,RDMA)都對(duì)應(yīng)一個(gè)channel澎胡,每個(gè)channel在創(chuàng)建時(shí)候孕荠,都會(huì)構(gòu)造work pool entry娩鹉,其處理函數(shù)是svc_rqst_run_task,將此entry插入到work pool稚伍。
  2. channel內(nèi)部對(duì)epoll的處理函數(shù)svc_rqst_epoll_events中弯予,如只有一個(gè)event,直接調(diào)用svc_rqst_xprt_task个曙。如果有大于1的event锈嫩,將多余的event構(gòu)造相應(yīng)的entry,并扔到work pool里處理垦搬。
  3. svc_rqst_epoll_events的退出呼寸,也將導(dǎo)致svc_rqst_run_task的退出。所以在svc_rqst_epoll_events退出前猴贰,重新將svc_rqst_run_task對(duì)應(yīng)的entry插入work pool中对雪。
  4. 在一定時(shí)間內(nèi),epoll沒有接到數(shù)據(jù)米绕,將svc_rqst_expire_task對(duì)應(yīng)的entry插入work pool中瑟捣。

數(shù)據(jù)結(jié)構(gòu)

struct work_pool {
    struct poolq_head pqh; //work pool entry list
    TAILQ_HEAD(work_pool_s, work_pool_thread) wptqh;//thread list
    char *name;
    pthread_attr_t attr;
    struct work_pool_params params;
    long timeout_ms;
    uint32_t n_threads;
    uint32_t worker_index;
};

//對(duì)worker thread的封裝
struct work_pool_thread {
    struct poolq_entry pqe;     /*** 1st ***/
    TAILQ_ENTRY(work_pool_thread) wptq;
    pthread_cond_t pqcond;

    struct work_pool *pool;
    struct work_pool_entry *work;
    char worker_name[16];
    pthread_t pt;
    uint32_t worker_index;
};

struct work_pool_entry {
    struct poolq_entry pqe;     /*** 1st ***/
    struct work_pool_thread *wpt;
    work_pool_fun_t fun;
    void *arg;
};

struct poolq_entry {
    TAILQ_ENTRY(poolq_entry) q; /*** 1st ***/
    u_int qsize;            /* allocated size of q entry,
                     * 0: default size */
    uint16_t qflags;
};

struct poolq_head {
    TAILQ_HEAD(poolq_head_s, poolq_entry) qh;
    pthread_mutex_t qmutex;

    u_int qsize;            /* default size of q entries,
                     * 0: static size */
    int qcount;         /* number of entries,
                     * < 0: has waiting workers. */
};

代碼注釋

static void * work_pool_thread(void *arg)
{
    struct work_pool_thread *wpt = arg;
    struct work_pool *pool = wpt->pool;
    struct poolq_entry *have;
    struct timespec ts;
    int rc;
    bool spawn;

    pthread_cond_init(&wpt->pqcond, NULL);
    pthread_mutex_lock(&pool->pqh.qmutex); 
    TAILQ_INSERT_TAIL(&pool->wptqh, wpt, wptq); //將當(dāng)前線程插入pool->wptqh

    wpt->worker_index = atomic_inc_uint32_t(&pool->worker_index);

    do {
                //如果當(dāng)前線程有事做
        if (wpt->work) {
            wpt->work->wpt = wpt;
            spawn = pool->pqh.qcount < pool->params.thrd_min
                  && pool->n_threads < pool->params.thrd_max;
            if (spawn)
                pool->n_threads++;
            pthread_mutex_unlock(&pool->pqh.qmutex);

            if (spawn) {
                //線程不夠,需要?jiǎng)?chuàng)建新線程
                (void)work_pool_spawn(pool);
            }

            wpt->work->fun(wpt->work);
            wpt->work = NULL;
            pthread_mutex_lock(&pool->pqh.qmutex);
        }

                //pool->pqh.qcount小于0說明所有線程都在干活栅干,隊(duì)列積攢了很多entry需要處理
        if (0 > pool->pqh.qcount++) {
                        //從隊(duì)列中取出entry
            have = TAILQ_FIRST(&pool->pqh.qh);
            TAILQ_REMOVE(&pool->pqh.qh, have, q);
                        //告訴當(dāng)前線程去處理這個(gè)entry
            wpt->work = (struct work_pool_entry *)have;
            continue;
        }
                //小技巧迈套,將wpt->pqe插入隊(duì)列,等同于將當(dāng)前線程插入隊(duì)列尾部
        TAILQ_INSERT_TAIL(&pool->pqh.qh, &wpt->pqe, q);

        clock_gettime(CLOCK_REALTIME_FAST, &ts);
        timespec_addms(&ts, pool->timeout_ms);
                //等待CLOCK_REALTIME_FAST時(shí)間碱鳞,看是否被work_pool_submit喚醒
        rc = pthread_cond_timedwait(&wpt->pqcond, &pool->pqh.qmutex,
                        &ts);
        if (!wpt->work) {
                        //如果這期間沒有發(fā)生work_pool_submit,wpt->work就還為NULL
                        //將剛才插入隊(duì)列假的entry桑李,從隊(duì)列中刪除
            pool->pqh.qcount--;
            TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
        }
    } while (wpt->work || pool->pqh.qcount < pool->params.thrd_min);
        //如果有太多無所事事的線程在睡覺,則退出當(dāng)前線程

    pool->n_threads--;
    TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
    pthread_mutex_unlock(&pool->pqh.qmutex);

    cond_destroy(&wpt->pqcond);
    mem_free(wpt, sizeof(*wpt));

    return (NULL);
}

int work_pool_submit(struct work_pool *pool, struct work_pool_entry *work)
{
    int rc = 0;

    pthread_mutex_lock(&pool->pqh.qmutex);

        //如果有沒事做的線程在睡覺窿给,此時(shí)隊(duì)列里的元素都是睡覺的線程贵白,而非需要處理的entry
    if (0 < pool->pqh.qcount--) {
        struct work_pool_thread *wpt = (struct work_pool_thread *)
            TAILQ_FIRST(&pool->pqh.qh);

        TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
                //告訴這個(gè)線程去做這件事情 
        wpt->work = work;

                //喚醒這個(gè)線程
        pthread_cond_signal(&wpt->pqcond);
    } else {
                 //如果所有線程都在忙,就把entry插入隊(duì)列尾部
        TAILQ_INSERT_TAIL(&pool->pqh.qh, &work->pqe, q);
    }

    pthread_mutex_unlock(&pool->pqh.qmutex);
    return rc;
}
  1. 最多同時(shí)幾個(gè)線程可以同時(shí)處理epoll產(chǎn)生的數(shù)據(jù)填大, RPC_Ioq_ThrdMax

Log分析

//svc_51 working thread 在等待事件
TRACE 0213 11:23:58.916742 10680  : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_51 waiting

//svc_85 正在處理0x387a9990指向的work_pool_entry
TRACE 0213 11:23:58.916872  3267  : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_85 task 0x387a9990

//接收了5440字節(jié)戒洼,但還有84192個(gè)字節(jié)沒有讀出來
TRACE 0213 11:23:58.921465  8398  : xxxxxxx : <no-file>:0 :rpc :svc_vc_recv: 0x3d60cc00 fd 274 recv 5440, need 84192, flags 2
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市允华,隨后出現(xiàn)的幾起案子圈浇,更是在濱河造成了極大的恐慌,老刑警劉巖靴寂,帶你破解...
    沈念sama閱讀 221,430評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件磷蜀,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡百炬,警方通過查閱死者的電腦和手機(jī)褐隆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來剖踊,“玉大人庶弃,你說我怎么就攤上這事衫贬。” “怎么了歇攻?”我有些...
    開封第一講書人閱讀 167,834評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵固惯,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我缴守,道長(zhǎng)葬毫,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,543評(píng)論 1 296
  • 正文 為了忘掉前任屡穗,我火速辦了婚禮贴捡,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘村砂。我一直安慰自己烂斋,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,547評(píng)論 6 397
  • 文/花漫 我一把揭開白布箍镜。 她就那樣靜靜地躺著源祈,像睡著了一般煎源。 火紅的嫁衣襯著肌膚如雪色迂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,196評(píng)論 1 308
  • 那天手销,我揣著相機(jī)與錄音歇僧,去河邊找鬼。 笑死锋拖,一個(gè)胖子當(dāng)著我的面吹牛诈悍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播兽埃,決...
    沈念sama閱讀 40,776評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼侥钳,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了柄错?” 一聲冷哼從身側(cè)響起舷夺,我...
    開封第一講書人閱讀 39,671評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎售貌,沒想到半個(gè)月后给猾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,221評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡颂跨,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,303評(píng)論 3 340
  • 正文 我和宋清朗相戀三年敢伸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片恒削。...
    茶點(diǎn)故事閱讀 40,444評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡池颈,死狀恐怖尾序,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情躯砰,我是刑警寧澤蹲诀,帶...
    沈念sama閱讀 36,134評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站弃揽,受9級(jí)特大地震影響脯爪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜矿微,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,810評(píng)論 3 333
  • 文/蒙蒙 一痕慢、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧涌矢,春花似錦掖举、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至名秀,卻和暖如春励负,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背匕得。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工继榆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人汁掠。 一個(gè)月前我還...
    沈念sama閱讀 48,837評(píng)論 3 376
  • 正文 我出身青樓略吨,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親考阱。 傳聞我的和親對(duì)象是個(gè)殘疾皇子翠忠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,455評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容