一、線程池功能組件
總共包含三個(gè)組件:線程池渠驼、線程執(zhí)行任務(wù)蜈块,任務(wù)詳情。
線程池包含條件等待迷扇,鎖百揭,鏈中線程任務(wù)某一個(gè),鏈中job隊(duì)列中某一個(gè)蜓席。
線程執(zhí)行任務(wù)包含線程池器一,當(dāng)前線程,當(dāng)前線程的的prev和next厨内。
任務(wù)詳情包含回調(diào)函數(shù)祈秕,回調(diào)函數(shù)的參數(shù)渺贤,當(dāng)前job的prev和next。
二请毛、代碼編寫流程
- 初始化線程池:ThreadPool pool
- 為線程池添加線程:參數(shù)是線程池指針志鞍,線程數(shù)量
2.1 如果線程數(shù)量小于1,賦值1
2.2 為線程池pool分配內(nèi)存空間
2.3 創(chuàng)建條件信號量方仿,將條件信號量賦值給線程池的條件信號量
2.4 創(chuàng)建鎖固棚,將鎖賦值給線程池的鎖
2.5 遍歷線程數(shù)量,生成執(zhí)行任務(wù)的結(jié)構(gòu)體仙蚜,并分配內(nèi)存空間此洲,同時(shí)初始化執(zhí)行任務(wù)結(jié)構(gòu)體
2.6 創(chuàng)建線程pthread_create(&worker->thread,NULL,ntyWorkThread,(void*)worker),將其指針給執(zhí)行任務(wù)的線程委粉,并且執(zhí)行的方法是線程執(zhí)行任務(wù)的方法黍翎,worker是線程方法的參數(shù)。通過worker的線程池的等待任務(wù)隊(duì)列中艳丛,獲取對應(yīng)的job回調(diào)函數(shù)和回調(diào)函數(shù)的參數(shù)匣掸,并執(zhí)行任務(wù)的回調(diào)函數(shù),釋放worker氮双,線程退出 - 為線程池添加任務(wù): 參數(shù)是線程池指針碰酝,和job結(jié)構(gòu)體
3.1 加鎖
3.2 將線程池添加job - 關(guān)閉線程池
4.1 遍歷線程池,將所有執(zhí)行線程設(shè)置終止條件設(shè)置為1
4.2 加鎖
4.3 將線程池的當(dāng)前work設(shè)置為NULL
4.4 線程池的等待隊(duì)列job設(shè)置為NULL
4.5 廣播信號量
4.6 解鎖
main的執(zhí)行順序
- 初始化線程池結(jié)構(gòu)體
- 初始化線程池
- 添加job
- 關(guān)閉線程池
三戴差、具體代碼
//
// Created by rosy on 2022/7/24.
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#define MAX_NUM_THREADS 100
#define NUM_JOBS 1000
#define LL_ADD(item,list) do{ \
item->prev = NULL; \
item->next = list; \
list = item; \
} while(0)
#define LL_REMOVE(item,list) do { \
if(item->prev != NULL) item->prev->next = item->next; \
if(item->next != NULL) item->next->prev = item->prev; \
if(list == item) list = item->next; \
item->prev = item->next = NULL ;\
} while(0)
typedef struct NWORK{
pthread_t thread;
struct THREAD_POOL *thread_pool;
struct NWORK *prev;
struct NWORK *next;
int terminate;
} nWork;
typedef struct THREAD_POOL{
pthread_cond_t job_cond;
pthread_mutex_t job_mtx;
struct NWORK *works;
struct NJOB *jobs;
} thread_pool;
typedef struct NJOB{
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;
static void *work_do_job(void *ptr){
nWork *work = (nWork*)ptr;
while(1) {
pthread_mutex_lock(&work->thread_pool->job_mtx);
while(work->thread_pool->jobs == NULL){
if(work->terminate){
break;
}
pthread_cond_wait(&work->thread_pool->job_cond,&work->thread_pool->job_mtx);
}
if(work->terminate){
pthread_mutex_unlock(&work->thread_pool->job_mtx);
break;
}
nJob *job = work->thread_pool->jobs;
if(job != NULL){ //移除當(dāng)前job
LL_REMOVE(job,work->thread_pool->jobs);
}
pthread_mutex_unlock(&work->thread_pool->job_mtx);
if(job == NULL) continue;
job->job_function(job);
}
free(work);
pthread_exit(NULL);
}
int createThreadPool(thread_pool* pool,int num_works){
if(num_works < 1) {
num_works = 1;
}
memset(pool,0,sizeof(thread_pool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->job_cond,&blank_cond,sizeof(pool->job_cond));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->job_mtx,&blank_mutex,sizeof(pool->job_mtx));
for(int i = 0; i < num_works;i++){
nWork *work = (nWork*)malloc(sizeof(nWork));
if(work == NULL){
perror("malloc work");
return 1;
}
memset(work,0,sizeof(nWork));
work->thread_pool = pool;
int ret = pthread_create(&work->thread,NULL,work_do_job,(void *)work);
if(ret){
perror("pthread_create");
free(work);
return 1;
}
LL_ADD(work,work->thread_pool->works);
}
return 0;
}
void add_job(thread_pool *pool,nJob* job){
pthread_mutex_lock(&pool->job_mtx);
LL_ADD(job,pool->jobs);
pthread_cond_signal(&pool->job_cond);
pthread_mutex_unlock(&pool->job_mtx);
}
void shut_down(thread_pool *pool){
nWork *work = NULL;
for(work = pool->works; work != NULL; work = work->next){
work->terminate = 1;
}
pthread_mutex_lock(&pool->job_mtx);
pool->works = NULL;
pool->jobs = NULL;
pthread_cond_broadcast(&pool->job_cond);
pthread_mutex_unlock(&pool->job_mtx);
}
void counter(nJob *job){
int index = *(int*)job->user_data;
printf("index:%d,selfid:%lu\n",index,pthread_self());
free(job->user_data);
free(job);
}
int main(int argc,char *argv[]) {
printf("hello yes\n");
thread_pool pool;
createThreadPool(&pool,MAX_NUM_THREADS);
printf("create thread pool\n");
for(int i = 0; i < NUM_JOBS; i++){
nJob *job = (nJob*)malloc(sizeof(nJob));
if(job == NULL){
perror("malloc");
exit(1);
}
job->job_function = counter;
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;
add_job(&pool,job);
}
printf("關(guān)閉...\n");
shut_down(&pool);
getchar();
printf("finish\n");
}