work pool
内部维护一个队列,生产者调用 work_pool_submit
将 entry 插入队列,有多个线程作为消费者,去处理队列中的 entry。线程的数量会根据 entry 的个数自动调节。
pqh.qcount
pool->pqh.qcount
小于 0 时,其绝对值代表有几个 entry 在队列中等待处理。大于 0 时,代表有几个没事干的线程在睡觉(此时队列中存放的就是这些线程)。等于 0 时队列为空。
弹性地调节线程数量
当所有线程都在干活且队列里还有 entry 没有处理,或者睡觉的线程很少时(留出一些裕量),创建更多的处理线程:线程开始处理一个 entry 前,若 qcount < thrd_min 且 n_threads < thrd_max,就再创建一个线程。当没事做、在睡觉的线程太多时,退出一些线程:线程等待 timeout_ms 后仍没有拿到 entry,且 qcount >= thrd_min,就退出。
work_pool_submit 的处理方法
当所有线程都在干活的时候,work_pool_submit 将 entry 插入队列。当有一些没事干而睡觉的线程时,唤醒其中一个,让它去处理这个 entry。
svc_work_pool 全局变量
struct work_pool svc_work_pool
(gdb) p svc_work_pool
$23 = {
pqh = {
qh = { //work pool 的共享队列:qcount < 0 时存放待处理的 entry,qcount > 0 时存放睡眠中的空闲线程
tqh_first = 0x7fffd80008c0, //队列的第一个元素(每个元素都有 prev 和 next 指针)
tqh_last = 0x7fffe00008c0 //最后一个元素的 next 指针所在的地址(并不是最后一个元素本身)
},
qmutex = {... }, //mutex
qsize = 0,
qcount = 10 //此值大于 0,说明有 10 个线程无事可做在睡觉(此时 qh 中存放的就是这些线程)。如果小于 0,说明所有线程都在干活,其绝对值就是队列中等待处理的 entry 个数
},
wptqh = { //所有工作线程(work_pool_thread)组成的队列,线程启动时加入,退出时移除
tqh_first = 0x7e00d0,
tqh_last = 0x7fffe8000cb8
},
name = 0x7e00b0 "svc_",
attr = { ... },
params = {
thrd_max = 200,
thrd_min = 7
},
timeout_ms = 31000,
n_threads = 15,
worker_index = 15
}
函数
- work_pool_init: 初始化 work pool
- work_pool_thread: 线程处理函数,运行该函数的线程个数等于 svc_work_pool.n_threads
- work_pool_spawn: 为 work pool 创建新线程
- work_pool_submit: 将 work_pool_entry 插入到 pool 中处理
- work_pool_shutdown
work pool 的应用场景
- 每种连接(TCP、UDP、RDMA)都对应一个 channel。每个 channel 在创建时,都会构造一个 work pool entry,其处理函数是 svc_rqst_run_task,并将此 entry 插入到 work pool。
- 在 channel 内部对 epoll 的处理函数 svc_rqst_epoll_events 中,如果只有一个 event,就直接调用 svc_rqst_xprt_task;如果有多于 1 个 event,就为多余的 event 构造相应的 entry,并扔到 work pool 里处理。
- svc_rqst_epoll_events 的退出也将导致 svc_rqst_run_task 的退出。所以在 svc_rqst_epoll_events 退出前,要重新将 svc_rqst_run_task 对应的 entry 插入 work pool 中。
- 如果在一定时间内 epoll 没有接到数据,就将 svc_rqst_expire_task 对应的 entry 插入 work pool 中。
数据结构
/*
 * A pool of worker threads that run work_pool_entry callbacks.
 * The pqh queue, n_threads and wptqh are updated under pqh.qmutex;
 * worker_index is bumped with atomic_inc_uint32_t().
 */
struct work_pool {
struct poolq_head pqh; /* shared queue: queued entries when pqh.qcount < 0, parked idle workers when pqh.qcount > 0 */
TAILQ_HEAD(work_pool_s, work_pool_thread) wptqh;/* every live worker; a worker adds itself on start and removes itself on exit */
char *name; /* pool name, e.g. "svc_"; NOTE(review): presumably the worker-name prefix (logs show "svc_51") */
pthread_attr_t attr; /* NOTE(review): presumably the attributes used to create workers - confirm in work_pool_spawn() */
struct work_pool_params params; /* thrd_min: target number of idle workers; thrd_max: cap on n_threads */
long timeout_ms; /* how long an idle worker waits for work before it may retire */
uint32_t n_threads; /* current number of workers: incremented before spawning, decremented when a worker exits */
uint32_t worker_index; /* counter used to number workers */
};
/* Per-worker state for one work-pool thread. */
struct work_pool_thread {
struct poolq_entry pqe; /*** 1st ***/ /* must stay first: an idle worker parks on pool->pqh.qh through this pqe, and work_pool_submit() casts the queue entry back to struct work_pool_thread * */
TAILQ_ENTRY(work_pool_thread) wptq; /* link in pool->wptqh */
pthread_cond_t pqcond; /* private condition this worker waits on; signalled by work_pool_submit() */
struct work_pool *pool; /* owning pool */
struct work_pool_entry *work; /* entry to run next; NULL while idle */
char worker_name[16]; /* thread name; not set in the code shown - NOTE(review): presumably filled in elsewhere (logs show "svc_51") */
pthread_t pt; /* NOTE(review): presumably the handle from thread creation in work_pool_spawn() - confirm */
uint32_t worker_index; /* number taken from pool->worker_index when the thread starts */
};
/* A unit of work submitted to a work pool with work_pool_submit(). */
struct work_pool_entry {
struct poolq_entry pqe; /*** 1st ***/ /* must stay first: work_pool_thread() casts the dequeued struct poolq_entry * to struct work_pool_entry * */
struct work_pool_thread *wpt; /* worker running this entry; set just before fun is called */
work_pool_fun_t fun; /* callback, invoked by the worker as fun(entry) */
void *arg; /* caller-supplied context; not used by the pool code shown */
};
/* Queue linkage embedded as the first member of anything placed on a poolq_head. */
struct poolq_entry {
TAILQ_ENTRY(poolq_entry) q; /*** 1st ***/
u_int qsize; /* allocated size of q entry,
* 0: default size */
uint16_t qflags; /* flag bits; not used by the work-pool code shown */
};
/* Head of a mutex-protected queue of poolq_entry items. */
struct poolq_head {
TAILQ_HEAD(poolq_head_s, poolq_entry) qh;
pthread_mutex_t qmutex; /* guards qh and qcount */
u_int qsize; /* default size of q entries,
* 0: static size */
int qcount; /* signed occupancy of qh, as used by the work pool:
* > 0: that many idle workers are parked on qh;
* < 0: -qcount entries are waiting on qh;
* 0: qh is empty. */
};
代码注释
static void * work_pool_thread(void *arg)
{
struct work_pool_thread *wpt = arg;
struct work_pool *pool = wpt->pool;
struct poolq_entry *have;
struct timespec ts;
int rc;
bool spawn;
pthread_cond_init(&wpt->pqcond, NULL);
pthread_mutex_lock(&pool->pqh.qmutex);
TAILQ_INSERT_TAIL(&pool->wptqh, wpt, wptq); //將當(dāng)前線程插入pool->wptqh
wpt->worker_index = atomic_inc_uint32_t(&pool->worker_index);
do {
//如果當(dāng)前線程有事做
if (wpt->work) {
wpt->work->wpt = wpt;
spawn = pool->pqh.qcount < pool->params.thrd_min
&& pool->n_threads < pool->params.thrd_max;
if (spawn)
pool->n_threads++;
pthread_mutex_unlock(&pool->pqh.qmutex);
if (spawn) {
//線程不夠,需要?jiǎng)?chuàng)建新線程
(void)work_pool_spawn(pool);
}
wpt->work->fun(wpt->work);
wpt->work = NULL;
pthread_mutex_lock(&pool->pqh.qmutex);
}
//pool->pqh.qcount小于0說明所有線程都在干活栅干,隊(duì)列積攢了很多entry需要處理
if (0 > pool->pqh.qcount++) {
//從隊(duì)列中取出entry
have = TAILQ_FIRST(&pool->pqh.qh);
TAILQ_REMOVE(&pool->pqh.qh, have, q);
//告訴當(dāng)前線程去處理這個(gè)entry
wpt->work = (struct work_pool_entry *)have;
continue;
}
//小技巧迈套,將wpt->pqe插入隊(duì)列,等同于將當(dāng)前線程插入隊(duì)列尾部
TAILQ_INSERT_TAIL(&pool->pqh.qh, &wpt->pqe, q);
clock_gettime(CLOCK_REALTIME_FAST, &ts);
timespec_addms(&ts, pool->timeout_ms);
//等待CLOCK_REALTIME_FAST時(shí)間碱鳞,看是否被work_pool_submit喚醒
rc = pthread_cond_timedwait(&wpt->pqcond, &pool->pqh.qmutex,
&ts);
if (!wpt->work) {
//如果這期間沒有發(fā)生work_pool_submit,wpt->work就還為NULL
//將剛才插入隊(duì)列假的entry桑李,從隊(duì)列中刪除
pool->pqh.qcount--;
TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
}
} while (wpt->work || pool->pqh.qcount < pool->params.thrd_min);
//如果有太多無所事事的線程在睡覺,則退出當(dāng)前線程
pool->n_threads--;
TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
pthread_mutex_unlock(&pool->pqh.qmutex);
cond_destroy(&wpt->pqcond);
mem_free(wpt, sizeof(*wpt));
return (NULL);
}
int work_pool_submit(struct work_pool *pool, struct work_pool_entry *work)
{
int rc = 0;
pthread_mutex_lock(&pool->pqh.qmutex);
//如果有沒事做的線程在睡覺窿给,此時(shí)隊(duì)列里的元素都是睡覺的線程贵白,而非需要處理的entry
if (0 < pool->pqh.qcount--) {
struct work_pool_thread *wpt = (struct work_pool_thread *)
TAILQ_FIRST(&pool->pqh.qh);
TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
//告訴這個(gè)線程去做這件事情
wpt->work = work;
//喚醒這個(gè)線程
pthread_cond_signal(&wpt->pqcond);
} else {
//如果所有線程都在忙,就把entry插入隊(duì)列尾部
TAILQ_INSERT_TAIL(&pool->pqh.qh, &work->pqe, q);
}
pthread_mutex_unlock(&pool->pqh.qmutex);
return rc;
}
- 最多有几个线程可以同时处理 epoll 产生的数据:
RPC_Ioq_ThrdMax
Log分析
//svc_51 working thread 在等待事件
TRACE 0213 11:23:58.916742 10680 : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_51 waiting
//svc_85 正在处理 0x387a9990 指向的 work_pool_entry
TRACE 0213 11:23:58.916872 3267 : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_85 task 0x387a9990
//接收了 5440 字节,但还有 84192 个字节没有读出来
TRACE 0213 11:23:58.921465 8398 : xxxxxxx : <no-file>:0 :rpc :svc_vc_recv: 0x3d60cc00 fd 274 recv 5440, need 84192, flags 2