[C++] Thread Pools in C Style, C++ Style, and with C++11 Features

Thread pool concepts

Suppose the time needed to complete a task = thread creation time T1 + task execution time T2 + thread destruction time T3. If T1 + T3 is much larger than T2, a thread pool is usually worth considering as a way to improve server performance.
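As a rough illustration of this rule of thumb, the sketch below computes the fraction of a task's total time that goes into creating and destroying its thread. The helper name `overhead_fraction` and the sample timings in the note are hypothetical and only meant to make the arithmetic concrete.

```cpp
#include <cassert>

// Fraction of the total time spent creating and destroying the thread.
// t1: creation time, t2: task execution time, t3: destruction time,
// all in the same unit. Hypothetical helper, not part of the pool code.
double overhead_fraction(double t1, double t2, double t3) {
    assert(t1 >= 0 && t2 >= 0 && t3 >= 0 && (t1 + t2 + t3) > 0);
    return (t1 + t3) / (t1 + t2 + t3);
}
```

With made-up timings T1 = 40, T2 = 10, T3 = 50 (in any one unit), the overhead is 90 out of 100, so most of the time goes to thread management rather than to the task itself. That is the situation in which reusing threads tends to pay off.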

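A thread pool is a usage pattern for threads: the pool maintains a set of threads that wait for a manager to hand them tasks that can run concurrently.

  • Avoids the cost of creating and destroying threads for short-lived tasks

  • Keeps the cores fully utilized while preventing over-scheduling

  • The number of usable threads should depend on the available concurrent processors, processor cores, memory, and network sockets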
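One possible starting point for sizing a pool is the number of hardware threads reported by the standard library. The sketch below assumes a CPU-bound workload and looks only at the core count, not at memory or sockets. The function name `default_pool_size` and its fallback parameter are hypothetical.

```cpp
#include <cassert>
#include <thread>

// Pick a worker count from the number of hardware threads.
// std::thread::hardware_concurrency() may return 0 when the value
// cannot be determined, so fall back to a caller-supplied default.
unsigned default_pool_size(unsigned fallback) {
    assert(fallback > 0);
    unsigned n = std::thread::hardware_concurrency();
    return n != 0 ? n : fallback;
}
```

For I/O-bound tasks a larger count may work better, so treat the result as an initial guess to tune by measurement.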

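Thread pool components

  • Thread pool manager (thread pool): creates and destroys the thread pool

  • Worker thread (pool worker): waits while there are no tasks; otherwise loops, fetching and executing tasks from the task queue

  • Task (task): an abstraction of a unit of work; it mainly specifies the task's entry point, the cleanup after the task finishes, the task's execution state, and so on

  • Task queue (task queue): holds tasks that have not been processed yet, providing a buffering mechanism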

C-style ThreadPool

1. Abstracting a task

Pending work is abstracted into a task struct:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n26" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">typedef struct task {
    void* (*run)(void* args);  // job function to run
    void* arg;                 // argument passed to the run function
    struct task* next;         // next task in the task queue
} task_t;</pre>

2. The task queue

  • threadpool uses its first and last pointers to point to the head and tail tasks

  • The task struct lets each task point to the next task in the queue

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n33" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">typedef struct task {
    void* (*run)(void* args);  // job function to run
    void* arg;                 // argument passed to the run function
    struct task* next;         // next task in the task queue
} task_t;

typedef struct threadpool {
    condition_t ready;  // condition variable & mutex
    task_t* first;      // first task in the task queue
    task_t* last;       // last task in the task queue
    int counter;        // total number of threads
    int idle;           // number of idle threads
    int max_threads;    // maximum number of threads
    int quit;           // the quit flag
} threadpool_t;
</pre>
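The first/last pointers give an O(1) append at the tail and an O(1) removal at the head. The sketch below shows that queue discipline in isolation, using simplified stand-in types (`node`, `fifo`) that are assumptions for illustration and not the author's structs. Locking is left out on purpose; in the pool these operations would run while holding the mutex.

```cpp
#include <cassert>
#include <cstddef>

// Simplified stand-ins for task_t and the queue part of threadpool_t.
struct node {
    int value;
    node* next;
};

struct fifo {
    node* first = nullptr;
    node* last = nullptr;
};

// Append at the tail in O(1).
void fifo_push(fifo* q, node* n) {
    assert(q != nullptr && n != nullptr);
    n->next = nullptr;
    if (q->first == nullptr) {
        q->first = n;        // queue was empty: n is also the head
    } else {
        q->last->next = n;   // link behind the current tail
    }
    q->last = n;
}

// Remove from the head in O(1); returns nullptr when the queue is empty.
node* fifo_pop(fifo* q) {
    node* n = q->first;
    if (n != nullptr) {
        q->first = n->next;
        if (q->first == nullptr) {
            q->last = nullptr;  // queue became empty
        }
    }
    return n;
}
```

Resetting `last` when the queue empties is not strictly required if every push checks `first` before using `last`, but it avoids keeping a dangling pointer around.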

3. Thread safety

A condition_t struct is used to make concurrent access safe:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n36" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">typedef struct condition {
    /**
     * Mutex
     */
    pthread_mutex_t pmutex;
    /**
     * Condition variable
     */
    pthread_cond_t pcond;
} condition_t;</pre>

The corresponding interface:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n38" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">/**
 * Initialize
 */
int condition_init(condition_t* cond);

/**
 * Lock
 */
int condition_lock(condition_t* cond);

/**
 * Unlock
 */
int condition_unlock(condition_t* cond);

/**
 * Wait on the condition
 * pthread_cond_wait(cond, mutex) does three things:
 *   1. The calling thread first releases the mutex
 *   2. It then blocks, waiting to be woken by another thread
 *   3. Once woken, the calling thread re-acquires the mutex
 */
int condition_wait(condition_t* cond);

/**
 * Timed wait
 */
int condition_timedwait(condition_t* cond, const timespec* abstime);

/**
 * Wake one thread waiting on the condition
 *   1. Purpose: signal a thread blocked in a wait so that it leaves the
 *      blocked state and continues running
 *   2. If no thread is blocked, pthread_cond_signal still returns
 *      successfully, so the caller checks the number of idle threads first
 *   3. It is intended to wake a single waiter, which avoids the
 *      "thundering herd" effect; POSIX nevertheless permits spurious
 *      wakeups, so waiters re-check their predicate in a loop
 *   4. The scheduling policy decides which waiter is woken: where priority
 *      scheduling applies, the highest-priority thread is chosen, and among
 *      equal priorities typically the one that has waited longest
 *   5. Key point: pthread_cond_wait must be called between lock and unlock,
 *      because it decides whether to wait based on shared state, whereas
 *      pthread_cond_signal may be called either between lock and unlock
 *      or after unlock
 */
int condition_signal(condition_t* cond);

/**
 * Wake all waiting threads
 */
int condition_broadcast(condition_t *cond);

/**
 * Destroy
 */
    int condition_destroy(condition_t *cond);</pre>
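The notes on waiting and signalling can be sketched with raw pthread calls as follows. The `mailbox` struct and both function names are hypothetical and stand in for whatever shared state a real program would protect. The point is the shape: the waiter holds the mutex, checks a predicate in a `while` loop, and the signaller changes the predicate under the same mutex.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical shared state for the illustration.
struct mailbox {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool ready;
    int value;
};

// Producer: change the shared state under the lock, then signal.
void* producer(void* arg) {
    mailbox* box = static_cast<mailbox*>(arg);
    pthread_mutex_lock(&box->mutex);
    box->value = 42;
    box->ready = true;
    pthread_cond_signal(&box->cond);
    pthread_mutex_unlock(&box->mutex);
    return nullptr;
}

// Consumer: wait in a loop, because a wakeup does not by itself
// guarantee that the predicate (ready) holds.
int wait_for_value() {
    mailbox box;
    pthread_mutex_init(&box.mutex, nullptr);
    pthread_cond_init(&box.cond, nullptr);
    box.ready = false;
    box.value = 0;

    pthread_t tid;
    int rc = pthread_create(&tid, nullptr, producer, &box);
    assert(rc == 0);
    (void)rc;

    pthread_mutex_lock(&box.mutex);
    while (!box.ready) {
        // Releases the mutex, blocks, and re-acquires it before returning.
        pthread_cond_wait(&box.cond, &box.mutex);
    }
    int result = box.value;
    pthread_mutex_unlock(&box.mutex);

    pthread_join(tid, nullptr);
    pthread_cond_destroy(&box.cond);
    pthread_mutex_destroy(&box.mutex);
    return result;
}
```

Because the predicate is checked before the first wait, the code also behaves correctly when the producer runs and signals before the consumer ever blocks.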

4. Thread pool implementation

4.1 Initializing a thread pool

This only initializes the condition variable and mutex, plus a few attributes of the pool. The task queue is empty, and no threads exist yet.

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n42" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// initialize the thread pool
void threadpool_init(threadpool_t* pool, int threads_num) {
int n_status = condition_init(&pool ->ready);
if (n_status == 0) {
printf("Info: initialize the thread pool successfully!\n");
} else {
printf("Error: initialize the thread pool failed, status:%d\n", n_status);
}
pool->first = NULL;
pool->last = NULL;
pool->counter = 0;
pool->idle = 0;
pool->max_threads = threads_num;
pool->quit = 0;
}</pre>

4.2 Adding a task to the pool and assigning it a thread

First build a task struct, then append it to the task queue.

  • If there is an idle thread, wake it up to run the task

  • If there is no idle thread and the thread count is below the maximum, create a new thread to run the task

  • If there is no idle thread and the thread count has reached the maximum, the task stays in the task queue until the pool frees up a thread

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n52" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// add a task to thread pool
void threadpool_add_task(threadpool_t* pool, void* (*run)(void* arg), void* arg) {
// create a task
task_t* new_task = static_cast<task_t*>(malloc(sizeof(task_t)));
new_task->run = run;
new_task->arg = arg;
new_task->next = NULL;

// lock the condition
condition_lock(&pool->ready);

// add the task to task queue
if (pool->first == NULL) {
    pool->first = new_task;
} else {  // else add to the last task
    pool->last->next = new_task;
}
pool->last = new_task;

/*
 * after you add a task to task queue, you need to allocate it to a thread:
 * (1)if idle thread num > 0: awake a idle thread
 * (2)if idle thread num = 0 & thread num does not reach maximum: create a new thread to run the task
 */
if (pool->idle > 0) {
    // awake a thread that wait for longest time
    condition_signal(&pool->ready);
} else if (pool->counter < pool->max_threads) {
    // define a tid to get the thread identifier that we are going to create
    pthread_t tid;
    /*
     * pthread_create():
     * (1)thread identifier
     * (2)set the pthread attribute
     * (3)the function that thread is going to run
     * (4)the args of run func
     *
     *  A realistic limit of thread num is 200 to 400 threads
     *  https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/ptcrea.htm
     */
    if (pthread_create(&tid, NULL, thread_routine, pool) == 0) {
        pthread_detach(tid);  // workers are never joined, so detach them
        pool->counter++;
    }
} else {  // when (idle == 0 & counter = max_threads), then wait
    printf("Warning: no idle thread, please wait...\n");
}

condition_unlock(&pool->ready);

}</pre>
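The three-way decision made after enqueueing can be isolated as a small pure function, which makes the policy easy to read and to test without any threads. The enum and function names below are hypothetical. The conditions mirror the if / else-if / else chain in threadpool_add_task.

```cpp
#include <cassert>

// What to do with a task that has just been queued.
enum class dispatch { wake_idle, spawn_thread, leave_queued };

// idle: number of idle threads, counter: current number of threads,
// max_threads: upper bound on the number of threads.
dispatch choose_dispatch(int idle, int counter, int max_threads) {
    assert(idle >= 0 && counter >= 0 && max_threads >= 0);
    if (idle > 0) {
        return dispatch::wake_idle;      // an idle worker can take it
    }
    if (counter < max_threads) {
        return dispatch::spawn_thread;   // room to grow the pool
    }
    return dispatch::leave_queued;       // pool is full and busy
}
```

In the last case nothing is lost: the task stays in the queue and is picked up by whichever worker next finishes its current task.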

5. How a worker thread runs

5.1 When the task queue is empty

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n55" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// when task queue is empty, then block 2 second to get the new task
// If timeout, then destroy the thread
while (pool->first == NULL && !pool->quit) {
printf("Info: thread %ld is waiting for a task\n", (u_int64_t)pthread_self());
// get the system time
clock_gettime(CLOCK_REALTIME, &abs_name);
abs_name.tv_sec += 2;
int status;
status = condition_timedwait(&pool->ready, &abs_name); // block for 2 second
if (status == ETIMEDOUT) {
printf("Info: thread %ld wait timed out\n", (u_int64_t)pthread_self());
timeout = true;
break;
}
}

...

// if the wait timed out (no task arrived in time), destroy the thread
if (timeout) {
pool->counter--;
condition_unlock(&pool->ready);
break; // destroy the thread
}</pre>
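pthread_cond_timedwait takes an absolute deadline and not a relative timeout, which is why the code reads the current time and adds 2 seconds to it. If a sub-second timeout were ever needed, the nanosecond field would have to be kept in range. The helper below is a hypothetical sketch of that normalization and is not part of the original code.

```cpp
#include <cassert>
#include <time.h>

// Return `base` advanced by `ms` milliseconds, keeping tv_nsec within
// [0, 1e9). Assumes base.tv_nsec is already in that range.
timespec deadline_after(timespec base, long ms) {
    assert(ms >= 0);
    const long kNsPerSec = 1000000000L;
    base.tv_sec += ms / 1000;
    base.tv_nsec += (ms % 1000) * 1000000L;
    if (base.tv_nsec >= kNsPerSec) {  // carry into the seconds field
        base.tv_sec += 1;
        base.tv_nsec -= kNsPerSec;
    }
    return base;
}
```

A caller would fill a timespec with clock_gettime(CLOCK_REALTIME, ...), pass it through `deadline_after`, and hand the result to the timed wait. CLOCK_REALTIME is the clock a condition variable uses unless its attributes say otherwise.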

5.2 When the task queue is not empty

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n57" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// when the thread run the task, we should unlock the thread pool
if (pool->first != NULL) {
// get the task from task queue
task_t* t = pool->first;
pool->first = t->next;
// unlock the thread pool to make other threads visit task queue
condition_unlock(&pool->ready);

// run the task run func
t->run(t->arg);
free(t);

// lock
condition_lock(&pool->ready);

}</pre>

5.3 No tasks left and the quit flag is set

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n59" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// when task queue is clean and quit flag is 1, then destroy the thread
if (pool->quit && pool->first == NULL) {
pool->counter--;
// if no threads remain in the pool, notify the waiting (main) thread that all tasks are done
if (pool->counter == 0) {
condition_signal(&pool->ready);
}
condition_unlock(&pool->ready);
break; // destroy the thread
}</pre>
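For comparison, the same three worker states (wait while empty, run a task outside the lock, exit once the queue is drained and quit is set) can be sketched with C++11 standard library primitives. This is a simplified illustration under stated assumptions and not the author's C++11 version: the class name `mini_pool` and the helper `sum_with_pool` are hypothetical, the pool has a fixed size, and there is no idle timeout or on-demand thread creation.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class mini_pool {
public:
    explicit mini_pool(unsigned n) {
        assert(n > 0);
        for (unsigned i = 0; i < n; ++i) {
            workers_.emplace_back([this] { worker_loop(); });
        }
    }

    // Set the quit flag, wake every worker, and wait until they have
    // drained the queue and exited.
    ~mini_pool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            quit_ = true;
        }
        ready_.notify_all();
        for (std::thread& t : workers_) {
            t.join();
        }
    }

    void add_task(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

private:
    void worker_loop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // Queue empty and no quit request: sleep.
                ready_.wait(lock, [this] { return quit_ || !tasks_.empty(); });
                // Nothing left and quit requested: exit the thread.
                if (tasks_.empty()) {
                    return;
                }
                // Take one task; it runs after the lock is released.
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
    bool quit_ = false;
    std::vector<std::thread> workers_;  // declared last: starts after the rest
};

// Usage: add the numbers 1..10 on three workers.
int sum_with_pool() {
    std::atomic<int> sum(0);
    {
        mini_pool pool(3);
        for (int i = 1; i <= 10; ++i) {
            pool.add_task([&sum, i] { sum += i; });
        }
    }  // ~mini_pool drains the queue and joins the workers here
    return sum.load();
}
```

Joining the workers in the destructor replaces the counter-and-signal handshake of the C version: the main thread simply blocks in join until every worker has returned.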

6. Code

condition.h:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n62" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef CONDITION_H_

#define CONDITION_H_

#include <pthread.h>
#include <cstdio>

typedef struct condition {
    /**
     * Mutex
     */
    pthread_mutex_t pmutex;
    /**
     * Condition variable
     */
    pthread_cond_t pcond;
} condition_t;

/**
 * Initialize
 */
int condition_init(condition_t* cond);

/**
 * Lock
 */
int condition_lock(condition_t* cond);

/**
 * Unlock
 */
int condition_unlock(condition_t* cond);

/**
 * Wait on the condition
 * pthread_cond_wait(cond, mutex) does three things:
 *   1. The calling thread first releases the mutex
 *   2. It then blocks, waiting to be woken by another thread
 *   3. Once woken, the calling thread re-acquires the mutex
 */
int condition_wait(condition_t* cond);

/**
 * Timed wait
 */
int condition_timedwait(condition_t* cond, const timespec* abstime);

/**
 * Wake one thread waiting on the condition
 *   1. Purpose: signal a thread blocked in a wait so that it leaves the
 *      blocked state and continues running
 *   2. If no thread is blocked, pthread_cond_signal still returns
 *      successfully, so the caller checks the number of idle threads first
 *   3. It is intended to wake a single waiter, which avoids the
 *      "thundering herd" effect; POSIX nevertheless permits spurious
 *      wakeups, so waiters re-check their predicate in a loop
 *   4. The scheduling policy decides which waiter is woken: where priority
 *      scheduling applies, the highest-priority thread is chosen, and among
 *      equal priorities typically the one that has waited longest
 *   5. Key point: pthread_cond_wait must be called between lock and unlock,
 *      because it decides whether to wait based on shared state, whereas
 *      pthread_cond_signal may be called either between lock and unlock
 *      or after unlock
 */
int condition_signal(condition_t* cond);

/**
 * Wake all waiting threads
 */
int condition_broadcast(condition_t* cond);

/**
 * Destroy
 */
int condition_destroy(condition_t* cond);

#endif  // CONDITION_H_</pre>

condition.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n64" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include "condition.h"

// Initialize
int condition_init(condition_t* cond) {
    int status;
    status = pthread_mutex_init(&cond->pmutex, NULL);
    if (status != 0) {
        printf("Error: pthread_mutex_init failed, return value:%d\n", status);
        return status;
    }
    status = pthread_cond_init(&cond->pcond, NULL);
    if (status != 0) {
        printf("Error: pthread_cond_init failed, return value:%d\n", status);
        return status;
    }
    return 0;
}

// Lock
int condition_lock(condition_t* cond) {
    return pthread_mutex_lock(&cond->pmutex);
}

// Unlock
int condition_unlock(condition_t* cond) {
    return pthread_mutex_unlock(&cond->pmutex);
}

// Wait on the condition
int condition_wait(condition_t* cond) {
    return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}

// Timed wait
int condition_timedwait(condition_t* cond, const timespec* abstime) {
    return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}

// Wake one thread waiting on the condition
int condition_signal(condition_t *cond) {
    return pthread_cond_signal(&cond->pcond);
}

// Wake all waiting threads
int condition_broadcast(condition_t *cond) {
    return pthread_cond_broadcast(&cond->pcond);
}

// Destroy
int condition_destroy(condition_t *cond) {
    int status;
    status = pthread_mutex_destroy(&cond->pmutex);
    if (status != 0) {
        return status;
    }

    status = pthread_cond_destroy(&cond->pcond);
    if (status != 0) {
        return status;
    }
    return 0;
}</pre>

threadpool.h:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n66" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef THREAD_POLL_H_

#define THREAD_POLL_H_

#include "condition.h"

typedef struct task {
    void* (*run)(void* args);  // job function to run
    void* arg;                 // argument passed to the run function
    struct task* next;         // next task in the task queue
} task_t;

typedef struct threadpool {
    condition_t ready;  // condition variable & mutex
    task_t* first;      // first task in the task queue
    task_t* last;       // last task in the task queue
    int counter;        // total number of threads
    int idle;           // number of idle threads
    int max_threads;    // maximum number of threads
    int quit;           // the quit flag
} threadpool_t;

/**
 * Initialize the thread pool
 */
void threadpool_init(threadpool_t* pool, int threads_num);

/**
 * Add a task to the thread pool
 */
void threadpool_add_task(threadpool_t* pool, void* (*run)(void* args), void* arg);

/**
 * Destroy the thread pool
 */
void threadpool_destroy(threadpool_t* pool);

#endif  // THREAD_POLL_H_</pre>

threadpool.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n68" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">

#include <pthread.h>
#include <cstdlib>
#include <cstdio>
#include <cerrno>
#include <ctime>
#include "threadpool.h"

void *thread_routine(void *arg);

// initialize the thread pool
void threadpool_init(threadpool_t* pool, int threads_num) {
int n_status = condition_init(&pool ->ready);
if (n_status == 0) {
printf("Info: initialize the thread pool successfully!\n");
} else {
printf("Error: initialize the thread pool failed, status:%d\n", n_status);
}
pool->first = NULL;
pool->last = NULL;
pool->counter = 0;
pool->idle = 0;
pool->max_threads = threads_num;
pool->quit = 0;
}

// add a task to thread pool
void threadpool_add_task(threadpool_t* pool, void* (*run)(void* arg), void* arg) {
// create a task
task_t* new_task = static_cast<task_t*>(malloc(sizeof(task_t)));
new_task->run = run;
new_task->arg = arg;
new_task->next = NULL;

// lock the condition
condition_lock(&pool->ready);

// add the task to task queue
if (pool->first == NULL) {
    pool->first = new_task;
} else {  // else add to the last task
    pool->last->next = new_task;
}
pool->last = new_task;

/*
 * after you add a task to task queue, you need to allocate it to a thread:
 * (1)if idle thread num > 0: awake a idle thread
 * (2)if idle thread num = 0 & thread num does not reach maximum: create a new thread to run the task
 */
if (pool->idle > 0) {
    // awake a thread that wait for longest time
    condition_signal(&pool->ready);
} else if (pool->counter < pool->max_threads) {
    // define a tid to get the thread identifier that we are going to create
    pthread_t tid;
    /*
     * pthread_create():
     * (1)thread identifier
     * (2)set the pthread attribute
     * (3)the function that thread is going to run
     * (4)the args of run func
     *
     *  A realistic limit of thread num is 200 to 400 threads
     *  https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/ptcrea.htm
     */
    if (pthread_create(&tid, NULL, thread_routine, pool) == 0) {
        pthread_detach(tid);  // workers are never joined, so detach them
        pool->counter++;
    }
} else {  // when (idle == 0 & counter = max_threads), then wait
    printf("Warning: no idle thread, please wait...\n");
}

condition_unlock(&pool->ready);

}

// create a thread to run the task run func
// and the void *arg means the arg passed by pthread_create: pool
void *thread_routine(void *arg) {
struct timespec abs_name;
bool timeout;
printf("Info: create thread, and the thread id is: %ld\n", (u_int64_t)pthread_self());
threadpool_t *pool = reinterpret_cast<threadpool_t *>(arg);

// keep visiting the task queue
while (true) {
    timeout = false;
    condition_lock(&pool->ready);
    pool->idle++;

    // when task queue is empty, then block 2 second to get the new task
    // If timeout, then destroy the thread
    while (pool->first == NULL && !pool->quit) {
        printf("Info: thread %ld is waiting for a task\n", (u_int64_t)pthread_self());
        // get the system time
        clock_gettime(CLOCK_REALTIME, &abs_name);
        abs_name.tv_sec += 2;
        int status;
        status = condition_timedwait(&pool->ready, &abs_name);  // block for 2 second
        if (status == ETIMEDOUT) {
            printf("Info: thread %ld wait timed out\n", (u_int64_t)pthread_self());
            timeout = true;
            break;
        }
    }

    pool->idle--;
    // when the thread run the task, we should unlock the thread pool
    if (pool->first != NULL) {
        // get the task from task queue
        task_t* t = pool->first;
        pool->first = t->next;
        // unlock the thread pool to make other threads visit task queue
        condition_unlock(&pool->ready);

        // run the task run func
        t->run(t->arg);
        free(t);

        // lock
        condition_lock(&pool->ready);
    }

    // when task queue is clean and quit flag is 1, then destroy the thread
    if (pool->quit && pool->first == NULL) {
        pool->counter--;
        // if no threads remain in the pool, notify the waiting (main) thread that all tasks are done
        if (pool->counter == 0) {
            condition_signal(&pool->ready);
        }
        condition_unlock(&pool->ready);
        break;  // destroy the thread
    }

    // if the wait timed out (no task arrived in time), destroy the thread
    if (timeout) {
        pool->counter--;
        condition_unlock(&pool->ready);
        break;  // destroy the thread
    }

    condition_unlock(&pool->ready);
}

// if break, destroy the thread
printf("Info: thread %ld quit\n", (u_int64_t)pthread_self());
return NULL;

}

/*
 * destroy a thread pool:
 *   1. wake all the idle threads
 *   2. wait for the running threads to finish
 */
void threadpool_destroy(threadpool_t *pool) {
    if (pool->quit) {
        return;
    }

    condition_lock(&pool->ready);
    pool->quit = 1;
    if (pool->counter > 0) {
        if (pool->idle > 0) {
            condition_broadcast(&pool->ready);
        }
        while (pool->counter > 0) {
            condition_wait(&pool->ready);
        }
    }
    condition_unlock(&pool->ready);
    condition_destroy(&pool->ready);
}</pre>

test.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n70" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

#include <stdlib.h>
#include <stdio.h>
#include "threadpool.h"

#define THREADPOOL_MAX_NUM 30

void* mytask(void *arg) {
printf("Info: thread %ld is working on task %d\n", (u_int64_t)pthread_self(), *static_cast<int*>(arg));
sleep(1);
free(arg);
return NULL;
}

int main(int argc, char* argv[]) {
threadpool_t pool;
threadpool_init(&pool, THREADPOOL_MAX_NUM);

// add task to task queue
for (int i=0; i < 100; i++) {
    int *arg = reinterpret_cast<int *>(malloc(sizeof(int)));
    *arg = i;
    threadpool_add_task(&pool, mytask, arg);
}
threadpool_destroy(&pool);
return 0;

}</pre>

Compile and run:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n72" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp threadpool.cpp condition.cpp -o test -std=c++11 -lpthread
./test
Info: initialize the thread pool successfully!
Info: create thread, and the thread id is: 139898193295104
Info: create thread, and the thread id is: 139898176509696
Info: thread 139898176509696 is working on task 0
Info: create thread, and the thread id is: 139898168116992
Info: create thread, and the thread id is: 139898184902400
Info: create thread, and the thread id is: 139898134546176
Info: create thread, and the thread id is: 139898126153472
Info: create thread, and the thread id is: 139898117760768
Info: thread 139898117760768 is working on task 1
Info: create thread, and the thread id is: 139898100975360
Info: create thread, and the thread id is: 139898092582656
Info: create thread, and the thread id is: 139898084189952
Info: create thread, and the thread id is: 139898159724288
Info: create thread, and the thread id is: 139898109368064
Info: create thread, and the thread id is: 139898067404544
Info: create thread, and the thread id is: 139898059011840
Info: create thread, and the thread id is: 139898050619136
Info: create thread, and the thread id is: 139898042226432
Info: create thread, and the thread id is: 139898033833728
Info: create thread, and the thread id is: 139898025441024
Info: create thread, and the thread id is: 139898017048320
Info: create thread, and the thread id is: 139898008655616
Info: create thread, and the thread id is: 139898075797248
Info: create thread, and the thread id is: 139898000262912
Info: create thread, and the thread id is: 139898142938880
Info: create thread, and the thread id is: 139898151331584
Info: thread 139898159724288 is working on task 2
Info: thread 139898151331584 is working on task 3
Info: create thread, and the thread id is: 139897991870208
Info: create thread, and the thread id is: 139897966692096
Info: create thread, and the thread id is: 139897958299392
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Info: create thread, and the thread id is: 139897949906688
Info: create thread, and the thread id is: 139897983477504
Info: create thread, and the thread id is: 139897975084800
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Info: thread 139898067404544 is working on task 4
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Info: thread 139898168116992 is working on task 5
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Info: thread 139898142938880 is working on task 6
Warning: no idle thread, please wait...
Info: thread 139898042226432 is working on task 7
Warning: no idle thread, please wait...
Info: thread 139897949906688 is working on task 8
Info: thread 139898184902400 is working on task 11
Info: thread 139898134546176 is working on task 13
Info: thread 139898017048320 is working on task 14
Info: thread 139898008655616 is working on task 16
Info: thread 139898193295104 is working on task 18
Info: thread 139898000262912 is working on task 20
Info: thread 139898100975360 is working on task 21
Info: thread 139897983477504 is working on task 9
Info: thread 139897975084800 is working on task 10
Info: thread 139898092582656 is working on task 26
Info: thread 139898050619136 is working on task 24
Info: thread 139897991870208 is working on task 28
Info: thread 139898025441024 is working on task 12
Info: thread 139898084189952 is working on task 15
Info: thread 139898109368064 is working on task 17
Info: thread 139897966692096 is working on task 25
Info: thread 139898075797248 is working on task 19
Info: thread 139898059011840 is working on task 22
Info: thread 139897958299392 is working on task 27
Info: thread 139898033833728 is working on task 29
Info: thread 139898126153472 is working on task 23
Info: thread 139898176509696 is working on task 30
Info: thread 139898117760768 is working on task 31
Info: thread 139898159724288 is working on task 32
Info: thread 139898151331584 is working on task 33
Info: thread 139898067404544 is working on task 34
Info: thread 139898168116992 is working on task 35
Info: thread 139898142938880 is working on task 36
Info: thread 139898042226432 is working on task 37
Info: thread 139897949906688 is working on task 38
Info: thread 139898184902400 is working on task 39
Info: thread 139898000262912 is working on task 40
Info: thread 139898017048320 is working on task 41
Info: thread 139898008655616 is working on task 42
Info: thread 139898134546176 is working on task 43
Info: thread 139898193295104 is working on task 44
Info: thread 139898050619136 is working on task 49
Info: thread 139897991870208 is working on task 50
Info: thread 139898025441024 is working on task 51
Info: thread 139898084189952 is working on task 52
Info: thread 139898109368064 is working on task 53
Info: thread 139898075797248 is working on task 54
Info: thread 139897975084800 is working on task 48
Info: thread 139898100975360 is working on task 45
Info: thread 139898033833728 is working on task 57
Info: thread 139897983477504 is working on task 47
Info: thread 139897966692096 is working on task 55
Info: thread 139897958299392 is working on task 56
Info: thread 139898092582656 is working on task 46
Info: thread 139898126153472 is working on task 58
Info: thread 139898059011840 is working on task 59
Info: thread 139898176509696 is working on task 60
Info: thread 139898117760768 is working on task 61
Info: thread 139898159724288 is working on task 62
Info: thread 139898151331584 is working on task 63
Info: thread 139898067404544 is working on task 64
Info: thread 139898168116992 is working on task 65
Info: thread 139898142938880 is working on task 66
Info: thread 139898042226432 is working on task 67
Info: thread 139897949906688 is working on task 69
Info: thread 139898184902400 is working on task 68
Info: thread 139898000262912 is working on task 70
Info: thread 139898008655616 is working on task 71
Info: thread 139898017048320 is working on task 72
Info: thread 139898050619136 is working on task 73
Info: thread 139898134546176 is working on task 74
Info: thread 139898109368064 is working on task 78
Info: thread 139898100975360 is working on task 80
Info: thread 139897975084800 is working on task 82
Info: thread 139898075797248 is working on task 81
Info: thread 139897966692096 is working on task 85
Info: thread 139897958299392 is working on task 86
Info: thread 139898126153472 is working on task 88
Info: thread 139898059011840 is working on task 89
Info: thread 139898092582656 is working on task 87
Info: thread 139898025441024 is working on task 76
Info: thread 139898084189952 is working on task 77
Info: thread 139898193295104 is working on task 75
Info: thread 139897991870208 is working on task 79
Info: thread 139898033833728 is working on task 83
Info: thread 139897983477504 is working on task 84
Info: thread 139898176509696 is working on task 90
Info: thread 139898117760768 is working on task 91
Info: thread 139898159724288 is working on task 92
Info: thread 139898151331584 is working on task 93
Info: thread 139898067404544 is working on task 94
Info: thread 139898168116992 is working on task 95
Info: thread 139898142938880 is working on task 96
Info: thread 139898042226432 is working on task 97
Info: thread 139897949906688 is working on task 98
Info: thread 139898184902400 is working on task 99
Info: thread 139898000262912 quit
Info: thread 139898008655616 quit
Info: thread 139898017048320 quit
Info: thread 139898050619136 quit
Info: thread 139898134546176 quit
Info: thread 139898109368064 quit
Info: thread 139898100975360 quit
Info: thread 139897966692096 quit
Info: thread 139898126153472 quit
Info: thread 139897975084800 quit
Info: thread 139898075797248 quit
Info: thread 139897958299392 quit
Info: thread 139898059011840 quit
Info: thread 139898092582656 quit
Info: thread 139898025441024 quit
Info: thread 139898084189952 quit
Info: thread 139898193295104 quit
Info: thread 139897991870208 quit
Info: thread 139898033833728 quit
Info: thread 139897983477504 quit
Info: thread 139898176509696 quit
Info: thread 139898117760768 quit
Info: thread 139898159724288 quit
Info: thread 139898151331584 quit
Info: thread 139898067404544 quit
Info: thread 139898168116992 quit
Info: thread 139898142938880 quit
Info: thread 139898042226432 quit
Info: thread 139897949906688 quit
Info: thread 139898184902400 quit</pre>

ThreadPool with C++03 features

1. Thread pool based on condition variables

threadpool.h:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n76" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef THREAD_POOL_H

#define THREAD_POOL_H

#include <stdio.h>
#include <pthread.h>
#include <functional>
#include <vector>
#include <queue>

class ThreadPool {
public:
    // a worker function takes an opaque argument and returns an opaque result
    typedef void* (WrokerFunc)(void* arg);

    struct Task {
        WrokerFunc* run;
        void* arg;
    };

    explicit ThreadPool(int thread_num);
    ~ThreadPool();
    void addTask(WrokerFunc* func, void* arg);

private:
    std::queue<Task*> task_queue_;
    std::vector<pthread_t> thread_list_;
    bool is_running_;  // note: accessing is_running_ without an atomic variable or a lock may cause a hang
    pthread_mutex_t mutex_;
    pthread_cond_t condition_;

    static void* thread_routine(void* pool_ptr);
    void thread_worker();
};

// =========================implementation=========================
inline ThreadPool::ThreadPool(int thread_num) : is_running_(true) {
    pthread_mutex_init(&mutex_, NULL);
    pthread_cond_init(&condition_, NULL);

    for (int i = 0; i < thread_num; i++) {
        pthread_t pid;
        pthread_create(&pid, NULL, thread_routine, this);
        thread_list_.push_back(pid);
    }
}

inline ThreadPool::~ThreadPool() {
    pthread_mutex_lock(&mutex_);
    is_running_ = false;
    pthread_mutex_unlock(&mutex_);

    pthread_cond_broadcast(&condition_);  // wake up all threads blocked waiting for a task
    for (size_t i = 0; i < thread_list_.size(); i++) {
        pthread_join(thread_list_[i], NULL);
    }

    pthread_cond_destroy(&condition_);
    pthread_mutex_destroy(&mutex_);
}

inline void ThreadPool::addTask(WrokerFunc* func, void* arg) {
    Task* task = new Task();
    task->run = func;
    task->arg = arg;

    pthread_mutex_lock(&mutex_);
    task_queue_.push(task);
    pthread_mutex_unlock(&mutex_);
    pthread_cond_signal(&condition_);
}

inline void* ThreadPool::thread_routine(void* pool_ptr) {
    ThreadPool* pool = static_cast<ThreadPool*>(pool_ptr);
    pool->thread_worker();
    return NULL;
}

inline void ThreadPool::thread_worker() {
    Task* task = NULL;

    while (true) {
        pthread_mutex_lock(&mutex_);
        if (!is_running_) {
            pthread_mutex_unlock(&mutex_);
            break;
        }

        if (task_queue_.empty()) {
            pthread_cond_wait(&condition_, &mutex_);  // block while no task is available, until a new task is enqueued
            if (task_queue_.empty()) {
                pthread_mutex_unlock(&mutex_);
                continue;
            }
        }
        task = task_queue_.front();
        task_queue_.pop();
        pthread_mutex_unlock(&mutex_);

        (*(task->run))(task->arg);
        delete task;
        task = NULL;
    }

    // when the pool is shutting down (is_running_ = false), empty the task queue before exiting;
    // tasks still in the queue are discarded without being run
    while (true) {
        pthread_mutex_lock(&mutex_);
        if (task_queue_.empty()) {
            pthread_mutex_unlock(&mutex_);
            break;
        }
        task = task_queue_.front();
        task_queue_.pop();
        pthread_mutex_unlock(&mutex_);
        delete task;
        task = NULL;
    }

    printf("Info: thread[%lu] exit\n", pthread_self());
}

#endif // THREAD_POOL_H</pre>

Test code threadpool_test.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n78" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <stdio.h>

#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <vector>
#include "threadpool.h"

void* MyTaskFunc(void* arg) {
    int* i = static_cast<int*>(arg);
    printf("[MyTaskFunc]: thread[%lu] is working on %d\n", pthread_self(), *i);
    return NULL;
}

int main() {
    ThreadPool pool(10);

    for (int i = 0; i < 100; i++) {
        int* arg = new int(i);
        pool.addTask(&MyTaskFunc, arg);
    }

    return 0;
}</pre>

Compile and run:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n80" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g threadpool_test.cpp -o threadpool_test -lpthread
./threadpool_test
[MyTaskFunc]: thread[140224777099008] is working on 0
[MyTaskFunc]: thread[140224793884416] is working on 8
[MyTaskFunc]: thread[140224844240640] is working on 2
Info: thread[140224844240640] exit
[MyTaskFunc]: thread[140224827455232] is working on 4
Info: thread[140224827455232] exit
[MyTaskFunc]: thread[140224810669824] is working on 6
Info: thread[140224810669824] exit
[MyTaskFunc]: thread[140224777099008] is working on 10
Info: thread[140224777099008] exit
[MyTaskFunc]: thread[140224852633344] is working on 1
Info: thread[140224852633344] exit
[MyTaskFunc]: thread[140224835847936] is working on 3
Info: thread[140224835847936] exit
[MyTaskFunc]: thread[140224802277120] is working on 7
Info: thread[140224802277120] exit
Info: thread[140224793884416] exit
[MyTaskFunc]: thread[140224819062528] is working on 5
Info: thread[140224819062528] exit
[MyTaskFunc]: thread[140224785491712] is working on 9
Info: thread[140224785491712] exit</pre>

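2. Synchronous task queue based on semaphores

The thread pool above has no good way to support synchronous tasks, that is, to let the caller block until the tasks it submitted have finished. We therefore implement SyncTaskQueue on top of a semaphore.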
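
The underlying idea is that every finished task posts the semaphore once, and the waiting side calls `sem_wait` once per submitted task. Here is a minimal sketch of that counting pattern, using plain threads instead of the pool; `Job`, `job_main` and `run_and_wait` are hypothetical names, and it assumes a platform with POSIX unnamed semaphores (for example Linux).

```cpp
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <vector>

// Hypothetical job descriptor: where to store the result and whom to notify.
struct Job {
    sem_t* done;  // shared semaphore, posted once per finished job
    int* slot;    // this job's private result slot
};

static void* job_main(void* arg) {
    Job* job = static_cast<Job*>(arg);
    *job->slot = 1;       // stand-in for the real work
    sem_post(job->done);  // report completion
    return NULL;
}

// Runs n jobs on n threads and returns how many had finished once the waits returned.
int run_and_wait(int n) {
    sem_t done;
    sem_init(&done, 0, 0);  // not shared between processes, initial count 0

    std::vector<int> results(n, 0);
    std::vector<Job> jobs(n);
    std::vector<pthread_t> threads(n);
    for (int i = 0; i < n; ++i) {
        jobs[i].done = &done;
        jobs[i].slot = &results[i];
        pthread_create(&threads[i], NULL, job_main, &jobs[i]);
    }

    // One sem_wait per submitted job; retry if a signal interrupts the wait.
    for (int i = 0; i < n; ++i) {
        while (sem_wait(&done) == -1 && errno == EINTR) {
        }
    }

    int finished = 0;
    for (int i = 0; i < n; ++i) {
        finished += results[i];
    }

    for (int i = 0; i < n; ++i) {
        pthread_join(threads[i], NULL);
    }
    sem_destroy(&done);
    return finished;
}
```

Because the waiter decrements the semaphore exactly as many times as jobs were started, it cannot get past the loop before every job has posted, regardless of the order in which the jobs finish.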

sync_task_queue.h:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n84" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef SYNC_TASK_QUEUE_H

#define SYNC_TASK_QUEUE_H

#include <semaphore.h>
#include <vector>
#include "threadpool.h"

class SyncTaskQueue {
public:
    struct SyncTask {
        ThreadPool::Task task;
        sem_t* sem;
    };

    explicit SyncTaskQueue(ThreadPool* pool_ptr);
    ~SyncTaskQueue();
    void addTask(ThreadPool::WrokerFunc* func, void* arg);
    void wait();

private:
    ThreadPool* threadpool_;
    sem_t sem_;
    int sync_task_num_;  // must only be read and written from a single thread; otherwise it needs a lock

    static ThreadPool::WrokerFunc workerFuncWrapper;
};

inline SyncTaskQueue::SyncTaskQueue(ThreadPool* pool_ptr) : threadpool_(pool_ptr), sync_task_num_(0) {
    sem_init(&sem_, 0, 0);
}

inline SyncTaskQueue::~SyncTaskQueue() {
    // make sure that all tasks have finished before destroying the semaphore
    if (sync_task_num_ > 0) {
        wait();
    }
    sem_destroy(&sem_);
}

inline void SyncTaskQueue::addTask(ThreadPool::WrokerFunc* func, void* arg) {
    sync_task_num_++;

    // wrap the worker function together with the semaphore
    SyncTask* sync_task = new SyncTask();
    sync_task->sem = &(this->sem_);
    sync_task->task.run = func;
    sync_task->task.arg = arg;
    threadpool_->addTask(&workerFuncWrapper, sync_task);
}

inline void SyncTaskQueue::wait() {
    // one sem_wait per submitted task
    while (sync_task_num_) {
        sem_wait(&sem_);
        sync_task_num_--;
    }
}

inline void* SyncTaskQueue::workerFuncWrapper(void* sync_task_ptr) {
    SyncTask* sync_task = static_cast<SyncTask*>(sync_task_ptr);
    (*(sync_task->task.run))(sync_task->task.arg);
    sem_post(sync_task->sem);  // signal that this task has finished
    delete sync_task;
    return NULL;
}

#endif // SYNC_TASK_QUEUE_H</pre>

Test file sync_task_queue_test.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n86" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

#include "sync_task_queue.h"

void* MyTaskFunc(void* arg) {
    int* i = static_cast<int*>(arg);
    printf("[MyTaskFunc]: thread[%lu] is working on %d\n", pthread_self(), *i);
    sleep(2);
    return NULL;
}

int main() {
    ThreadPool threadpool(20);
    SyncTaskQueue sync_task_queue(&threadpool);

    for (int i = 0; i < 15; i++) {
        int* arg = new int(i);
        sync_task_queue.addTask(&MyTaskFunc, arg);
    }

    printf("====================================wait for result===================================\n");
    sync_task_queue.wait();
}</pre>

Compile and run:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n88" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g sync_task_queue_test.cpp -o sync_task_queue_test -lpthread
./sync_task_queue_test
[MyTaskFunc]: thread[140349199148800] is working on 0
[MyTaskFunc]: thread[140349266290432] is working on 12
[MyTaskFunc]: thread[140349358610176] is working on 2
[MyTaskFunc]: thread[140349341824768] is working on 3
[MyTaskFunc]: thread[140349333432064] is working on 4
[MyTaskFunc]: thread[140349325039360] is working on 5
[MyTaskFunc]: thread[140349308253952] is working on 6
[MyTaskFunc]: thread[140349299861248] is working on 8
[MyTaskFunc]: thread[140349316646656] is working on 7
[MyTaskFunc]: thread[140349283075840] is working on 9
[MyTaskFunc]: thread[140349291468544] is working on 10
[MyTaskFunc]: thread[140349274683136] is working on 11
[MyTaskFunc]: thread[140349257897728] is working on 13
[MyTaskFunc]: thread[140349249505024] is working on 14
[MyTaskFunc]: thread[140349350217472] is working on 1
====================================wait for result===================================
Info: thread[140349232719616] exit
Info: thread[140349241112320] exit
Info: thread[140349266290432] exit
Info: thread[140349207541504] exit
Info: thread[140349299861248] exit
Info: thread[140349283075840] exit
Info: thread[140349291468544] exit
Info: thread[140349249505024] exit
Info: thread[140349274683136] exit
Info: thread[140349333432064] exit
Info: thread[140349325039360] exit
Info: thread[140349308253952] exit
Info: thread[140349224326912] exit
Info: thread[140349316646656] exit
Info: thread[140349199148800] exit
Info: thread[140349350217472] exit
Info: thread[140349257897728] exit
Info: thread[140349215934208] exit
Info: thread[140349358610176] exit
Info: thread[140349341824768] exit</pre>

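ThreadPool with C++11 features

A traditional C++ thread pool only accepts one special kind of Task (the worker function must match a fixed signature). A thread pool built with C++11 features can better support Tasks whose arguments have arbitrary types.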
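
What makes this possible is type erasure: any callable plus its arguments can be bound into a `std::packaged_task` and stored as a uniform `std::function<void()>`, while the caller keeps a `std::future` for the result. The sketch below shows only that wrapping step, without any pool. `wrap_task`, `add`, `repeat` and the two demo functions are hypothetical names, and the return type is deduced with `decltype` here rather than `std::result_of`, purely as a choice for this sketch.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <utility>

// Binds f and args into a packaged_task, stores a type-erased runner in `out`,
// and returns the future that will receive f's result.
template <typename F, typename... Args>
auto wrap_task(std::function<void()>& out, F&& f, Args&&... args)
    -> std::future<decltype(f(args...))> {
    using R = decltype(f(args...));
    // shared_ptr because packaged_task is move-only, while std::function
    // requires a copyable callable.
    auto task = std::make_shared<std::packaged_task<R()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    out = [task]() { (*task)(); };
    return task->get_future();
}

int add(int a, int b) { return a + b; }

std::string repeat(const std::string& s, int n) {
    std::string result;
    for (int i = 0; i < n; ++i) {
        result += s;
    }
    return result;
}

int demo_sum() {
    std::function<void()> erased;
    std::future<int> result = wrap_task(erased, add, 2, 3);
    erased();             // a pool worker would make this call
    return result.get();  // yields add(2, 3)
}

std::string demo_repeat() {
    std::function<void()> erased;
    std::future<std::string> result = wrap_task(erased, repeat, std::string("ab"), 3);
    erased();
    return result.get();  // yields repeat("ab", 3)
}
```

Both `add` and `repeat` end up behind the same `std::function<void()>` type, so a single queue can hold them even though their signatures differ.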

1. Code

threadpool.h:

Adapted from: https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h

An alternative implementation: https://github.com/mtrebi/thread-pool/blob/master/include/ThreadPool.h

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n96" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef THREAD_POOL_H

#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <utility>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    explicit ThreadPool(size_t);
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();

private:
    // need to keep track of threads so we can join them
    std::vector<std::thread> workers_;
    // the task queue
    std::queue<std::function<void()>> tasks_;

    // synchronization
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop_(false) {
    for (size_t i = 0; i < threads; ++i) {
        workers_.emplace_back(
            [this] {
                for (;;) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex_);
                        // sleep until the pool is stopped or a task is available
                        this->condition_.wait(lock,
                            [this] { return this->stop_ || !this->tasks_.empty(); });
                        // exit only once the pool is stopped and the queue is empty
                        if (this->stop_ && this->tasks_.empty()) {
                            return;
                        }
                        task = std::move(this->tasks_.front());
                        this->tasks_.pop();
                    }
                    task();  // run the task outside the lock
                }
            });
    }
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);

        // don't allow enqueueing after stopping the pool
        if (stop_) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }

        tasks_.emplace([task]() { (*task)(); });
    }
    condition_.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        stop_ = true;
    }
    condition_.notify_all();
    for (std::thread &worker : workers_) {
        worker.join();
    }
}

#endif // THREAD_POOL_H</pre>
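
One property of this worker loop is easy to miss: a worker returns only when `stop_` is set and the queue is empty, so tasks that were queued before the destructor ran are still executed. The C++03 pool earlier in this article instead discards whatever is left in its queue. The sketch below isolates that behaviour in a stripped-down pool without futures; `MiniPool`, `submit` and `count_completed` are hypothetical names used only for illustration.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stripped-down pool: same wait/stop logic as above, no futures.
class MiniPool {
public:
    explicit MiniPool(std::size_t n) : stop_(false) {
        for (std::size_t i = 0; i < n; ++i) {
            workers_.emplace_back([this] { run(); });
        }
    }

    void submit(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(job));
        }
        cv_.notify_one();
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (std::thread& w : workers_) {
            w.join();
        }
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) {
                    return;  // stopped and nothing left to do
                }
                job = std::move(tasks_.front());
                tasks_.pop();
            }
            job();  // run outside the lock
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_;
};

// Submits `jobs` tasks, destroys the pool right away, and reports how many ran.
int count_completed(int jobs) {
    std::atomic<int> done(0);
    {
        MiniPool pool(4);
        for (int i = 0; i < jobs; ++i) {
            pool.submit([&done] { ++done; });
        }
    }  // destructor: workers drain the queue, then are joined
    return done.load();
}
```

With this loop, `count_completed(100)` should return 100 even though the pool is destroyed immediately after the last `submit`, because the destructor joins the workers only after they have emptied the queue.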

test.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n98" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

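#include <pthread.h>
#include <stdio.h>

#include "threadpool.h"

// Each task reports which worker thread picked it up, then simulates 1s of work.
void mytask(int i) {
    printf("Info: thread %lu is working on task %d\n", (unsigned long)pthread_self(), i);
    sleep(1);
}

int main() {
    ThreadPool threadpool(20);
    for (int i = 0; i < 100; ++i) {
        threadpool.enqueue(mytask, i);
    }
    // ~ThreadPool() runs when main() returns and joins all worker threads.
    return 0;
}</pre>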

Compile and run:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n100" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp -o test -std=c++11 -lpthread
./test
Info: thread 139679726323456 is working on task 1
Info: thread 139679709538048 is working on task 3
Info: thread 139679717930752 is working on task 2
Info: thread 139679734716160 is working on task 0
Info: thread 139679600432896 is working on task 15
Info: thread 139679684359936 is working on task 6
Info: thread 139679617218304 is working on task 14
Info: thread 139679634003712 is working on task 12
Info: thread 139679667574528 is working on task 8
Info: thread 139679625611008 is working on task 13
Info: thread 139679575254784 is working on task 19
Info: thread 139679659181824 is working on task 9
Info: thread 139679701145344 is working on task 4
Info: thread 139679692752640 is working on task 5
Info: thread 139679592040192 is working on task 17
Info: thread 139679583647488 is working on task 18
Info: thread 139679675967232 is working on task 7
Info: thread 139679642396416 is working on task 11
Info: thread 139679608825600 is working on task 16
Info: thread 139679650789120 is working on task 10
Info: thread 139679684359936 is working on task 21
Info: thread 139679617218304 is working on task 24
Info: thread 139679600432896 is working on task 28
Info: thread 139679709538048 is working on task 23
Info: thread 139679659181824 is working on task 30
Info: thread 139679692752640 is working on task 32
Info: thread 139679583647488 is working on task 34
Info: thread 139679608825600 is working on task 35
Info: thread 139679592040192 is working on task 33
Info: thread 139679634003712 is working on task 25
Info: thread 139679625611008 is working on task 29
Info: thread 139679726323456 is working on task 20
Info: thread 139679701145344 is working on task 31
Info: thread 139679650789120 is working on task 36
Info: thread 139679667574528 is working on task 27
Info: thread 139679575254784 is working on task 37
Info: thread 139679734716160 is working on task 26
Info: thread 139679675967232 is working on task 38
Info: thread 139679717930752 is working on task 22
Info: thread 139679642396416 is working on task 39
Info: thread 139679684359936 is working on task 40
Info: thread 139679692752640 is working on task 45
Info: thread 139679625611008 is working on task 51
Info: thread 139679583647488 is working on task 43
Info: thread 139679659181824 is working on task 44
Info: thread 139679575254784 is working on task 55
Info: thread 139679592040192 is working on task 47
Info: thread 139679617218304 is working on task 41
Info: thread 139679717930752 is working on task 57
Info: thread 139679726323456 is working on task 49
Info: thread 139679634003712 is working on task 50
Info: thread 139679650789120 is working on task 52
Info: thread 139679675967232 is working on task 59
Info: thread 139679667574528 is working on task 54
Info: thread 139679608825600 is working on task 46
Info: thread 139679734716160 is working on task 56
Info: thread 139679600432896 is working on task 48
Info: thread 139679642396416 is working on task 58
Info: thread 139679709538048 is working on task 42
Info: thread 139679701145344 is working on task 53
Info: thread 139679684359936 is working on task 60
Info: thread 139679625611008 is working on task 62
Info: thread 139679692752640 is working on task 61
Info: thread 139679583647488 is working on task 63
Info: thread 139679659181824 is working on task 64
Info: thread 139679575254784 is working on task 65
Info: thread 139679592040192 is working on task 66
Info: thread 139679617218304 is working on task 67
Info: thread 139679717930752 is working on task 68
Info: thread 139679726323456 is working on task 69
Info: thread 139679650789120 is working on task 71
Info: thread 139679634003712 is working on task 70
Info: thread 139679675967232 is working on task 72
Info: thread 139679667574528 is working on task 73
Info: thread 139679608825600 is working on task 74
Info: thread 139679734716160 is working on task 75
Info: thread 139679642396416 is working on task 77
Info: thread 139679709538048 is working on task 78
Info: thread 139679701145344 is working on task 79
Info: thread 139679600432896 is working on task 76
Info: thread 139679684359936 is working on task 80
Info: thread 139679625611008 is working on task 81
Info: thread 139679583647488 is working on task 83
Info: thread 139679692752640 is working on task 82
Info: thread 139679659181824 is working on task 84
Info: thread 139679575254784 is working on task 85
Info: thread 139679717930752 is working on task 88
Info: thread 139679617218304 is working on task 87
Info: thread 139679592040192 is working on task 86
Info: thread 139679634003712 is working on task 91
Info: thread 139679650789120 is working on task 90
Info: thread 139679667574528 is working on task 93
Info: thread 139679675967232 is working on task 92
Info: thread 139679608825600 is working on task 94
Info: thread 139679726323456 is working on task 89
Info: thread 139679734716160 is working on task 95
Info: thread 139679709538048 is working on task 96
Info: thread 139679642396416 is working on task 97
Info: thread 139679701145344 is working on task 98
Info: thread 139679600432896 is working on task 99</pre>

2. Usage

2.1 Global thread pool + asynchronous tasks

Create a global ThreadPool variable and submit every task that should run asynchronously to it:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n104" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

#include <stdio.h>

#include "threadpool.h"

// global asynchronous thread pool
ThreadPool g_threadpool2(20);

int main() {
    // run an asynchronous task
    g_threadpool2.enqueue(
        [] {
            sleep(1);
            printf("async task done\n");
        });
    return 0;
}</pre>

Compile and run:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n106" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp -o test -std=c++11 -lpthread
./test
async task done</pre>

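2.2 Global thread pool + synchronous tasks

Create a global ThreadPool variable and add a synchronous task, then block on the wait() method of the returned std::future until the task has finished. The get() method can be used instead to obtain the function's return value.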

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n109" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

#include <stdio.h>

#include <memory>

#include "threadpool.h"

// global thread pool
ThreadPool g_threadpool2(20);

int main() {
    // create a synchronous task
    auto res = g_threadpool2.enqueue(
        [] {
            sleep(1);
            printf("sync task done\n");
        });

    // block until the task has finished
    res.wait();

    return 0;
}</pre>

Compile and run:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n111" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp -o test -std=c++11 -lpthread
./test
sync task done</pre>
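The program above only calls wait(). As a rough sketch of the get() path, the example below uses a condensed stand-in pool so that it compiles on its own. `MiniPool`, `square_via_pool` and `failure_reaches_caller` are hypothetical names, and `MiniPool::enqueue()` accepts a single nullary callable, which is a simplification of the variadic enqueue() in the header. It illustrates two standard-library behaviours: get() returns the task's return value, and an exception thrown inside the task is rethrown by get() in the caller.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Condensed stand-in for the ThreadPool class (hypothetical name).
class MiniPool {
public:
    explicit MiniPool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) {
            workers_.emplace_back([this] {
                for (;;) {
                    std::function<void()> job;
                    {
                        std::unique_lock<std::mutex> lock(mutex_);
                        cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) return;
                        job = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    job();
                }
            });
        }
    }

    // Simplified enqueue(): takes one nullary callable, returns its future.
    template <class F>
    auto enqueue(F f) -> std::future<decltype(f())> {
        using return_type = decltype(f());
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::move(f));
        std::future<return_type> res = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.emplace([task] { (*task)(); });
        }
        cv_.notify_one();
        return res;
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (std::thread& w : workers_) w.join();
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
};

// get() blocks until the task is done and hands back its return value.
int square_via_pool(int x) {
    MiniPool pool(2);
    std::future<int> res = pool.enqueue([x] { return x * x; });
    assert(res.valid());
    return res.get();
}

// An exception thrown inside the task is stored in the shared state and
// rethrown by get() on the calling thread.
bool failure_reaches_caller() {
    MiniPool pool(2);
    std::future<int> res = pool.enqueue([]() -> int {
        throw std::runtime_error("task failed");
    });
    try {
        res.get();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

Note that get() may be called only once per future; wait() can be called repeatedly and leaves the result in place.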

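2.3 Local thread pool for concurrent synchronization

Create a temporary ThreadPool and rely on its destructor to wait for a batch of concurrent tasks:

Note that this usage departs from the original purpose of a thread pool (avoiding the cost of creating and destroying threads for short tasks). Its main use is to achieve "multi-threaded concurrency", and it is commonly used to issue several I/O requests concurrently and wait for all of them to complete.

Consider this scenario: the code needs to request several HTTP links concurrently, but only in a special case that is rarely triggered. On the one hand, we do not want these requests to affect the process's business thread pool; on the other hand, we do not want to create a dedicated global thread pool for this case that would sit idle most of the time.

The usage in 2.3 solves this local concurrency problem of "temporarily create threads + run tasks in parallel + destroy threads", and spares us from creating threads directly in user code.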

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n118" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

#include <pthread.h>
#include <stdio.h>

#include <memory>

#include "threadpool.h"

int main() {
    // create a local thread pool with a concurrency of 5
    std::shared_ptr<ThreadPool> threadpool = std::make_shared<ThreadPool>(5);

    // create 30 asynchronous tasks
    for (int i = 0; i < 30; i++) {
        threadpool->enqueue(
            [i] {
                sleep(1);
                printf("Info: thread %lu is working on task %d\n", (unsigned long)pthread_self(), i);
            });
    }

    // destroy the pool; its destructor joins the workers, so this call blocks
    threadpool.reset();

    return 0;
}</pre>

Compile and run:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n120" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp -o test -std=c++11 -lpthread
./test
Info: thread 139811129124608 is working on task 4
Info: thread 139811145910016 is working on task 2
Info: thread 139811137517312 is working on task 3
Info: thread 139811162695424 is working on task 0
Info: thread 139811154302720 is working on task 1
Info: thread 139811129124608 is working on task 5
Info: thread 139811137517312 is working on task 7
Info: thread 139811145910016 is working on task 6
Info: thread 139811162695424 is working on task 8
Info: thread 139811154302720 is working on task 9
Info: thread 139811129124608 is working on task 10
Info: thread 139811137517312 is working on task 11
Info: thread 139811162695424 is working on task 13
Info: thread 139811154302720 is working on task 14
Info: thread 139811145910016 is working on task 12
Info: thread 139811129124608 is working on task 15
Info: thread 139811137517312 is working on task 18
Info: thread 139811145910016 is working on task 19
Info: thread 139811162695424 is working on task 16
Info: thread 139811154302720 is working on task 17
Info: thread 139811129124608 is working on task 21
Info: thread 139811162695424 is working on task 23
Info: thread 139811154302720 is working on task 24
Info: thread 139811145910016 is working on task 22
Info: thread 139811137517312 is working on task 20
Info: thread 139811162695424 is working on task 25
Info: thread 139811154302720 is working on task 26
Info: thread 139811129124608 is working on task 27
Info: thread 139811137517312 is working on task 29
Info: thread 139811145910016 is working on task 28</pre>
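The same "destructor as a barrier" idea can also be expressed with a stack object and a block scope instead of `std::shared_ptr` plus `reset()`. The sketch below is only an illustration under stated assumptions: `ScopedPool` and `run_batch` are hypothetical names, the pool is a fire-and-forget variant without futures, and its workers are assumed to drain the queue before exiting once the stop flag is set.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fire-and-forget pool, kept short by leaving out futures.
class ScopedPool {
public:
    explicit ScopedPool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) {
            workers_.emplace_back([this] { work(); });
        }
    }

    void submit(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(job));
        }
        cv_.notify_one();
    }

    // Workers exit only once stop_ is set AND the queue is empty, so joining
    // them here waits for every submitted task.
    ~ScopedPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (std::thread& w : workers_) w.join();
    }

private:
    void work() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;
                job = std::move(tasks_.front());
                tasks_.pop();
            }
            job();
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
};

// Runs `count` tasks on a pool that lives only for the inner scope and
// returns how many of them completed.
int run_batch(int count) {
    assert(count >= 0);
    std::atomic<int> done{0};
    {
        ScopedPool pool(5);
        for (int i = 0; i < count; ++i) {
            pool.submit([&done] { done.fetch_add(1); });
        }
    }  // ~ScopedPool() blocks here until all submitted tasks have run
    return done.load();
}
```

Capturing `done` by reference is safe here only because the pool is destroyed, and therefore every task has finished, before `done` goes out of scope.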

Reference

[1] https://www.cnblogs.com/ailumiyana/p/10016965.html

[2] https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h

[3] https://blog.csdn.net/qq_34771252/article/details/90319537

[4] https://github.com/lizhenghn123/zl_threadpool/tree/master/ThreadPoolCpp03
