Linux 多線程 - 線程異步與同步機制
I. 同步機制
線程間的同步機制主要包括三個:
互斥鎖:
以排他的方式,防止共享資源被并發(fā)訪問褐荷;
互斥鎖為二元變量转捕, 狀態(tài)為0-開鎖胳螟、1-上鎖;
開鎖必須由上鎖的線程執(zhí)行,不受其它線程干擾.條件變量:
滿足某個特定條件時接校,可通過條件變量通知其它線程do-something;
必須與互斥鎖*聯(lián)合使用猛频,單獨無法執(zhí)行.讀寫鎖:
針對多讀者,少寫者的情況設(shè)定> * 允許**多讀**,但此時**不可寫**鹿寻; > > * **唯一寫**睦柴,此時**不可讀**.
函數(shù)的頭文件為:
#include <phtread.h>
1. 互斥鎖
操作流程:
I. 創(chuàng)建互斥鎖
II. 申請鎖:若可用,立刻占用毡熏;否則爱只,阻塞等待
III. do-something
IV. 釋放鎖
V. 銷毀鎖
以下是互斥鎖的基本操作函數(shù):
功能 | 函數(shù) | 參數(shù) | 返回值 | 說明 |
---|---|---|---|---|
初始化鎖 | int pthread_mutex_init( |
pthread_mutex_t *mutex,
const pthread_mutexattr_t *attr) | 1. mutex: 欲建立的互斥鎖
2.attr:屬性,一般為NULL | 成功:0
失斦猩病:非零值 | |
| 阻塞申請鎖 | int pthread_mutex_lock(
pthread_mutex_t *mutex) | mutex:互斥鎖 | 成功:0
失斕袷浴:非零值 | 若未申請到,
阻塞等待 |
| 非阻塞申請 | int pthread_mutex_trylock(
pthread_mutex_t *mutex) | mutex:互斥鎖 | 成功:0
失敺枋睢:非零值 | 若未申請到训柴,
返回錯誤 |
| 釋放鎖 | int pthread_mutex_unlock(
pthread_mutex_t *mutex) | mutex:互斥鎖 | 成功:0
失敗:非零值 | |
| 銷毀鎖 | int pthread_mutex_destroy(
pthread_mutex_t *mutex) | mutex:互斥鎖 | 成功:0
失敻菊:非零值 | |
2. 條件變量
注意幻馁,條件變量必須與互斥鎖共同使用;
以下是條件變量的基本操作函數(shù):
功能 | 函數(shù) | 參數(shù) | 返回值 | 說明 |
---|---|---|---|---|
初始化鎖 | int pthread_cond_init( |
pthread_cond_t *cond,
const pthread_condattr_t *attr) | 1. cond: 欲建立的條件變量
2.attr:屬性越锈,一般為NULL | 成功:0
失斦锑隆:非零值 | |
| 等待條件變量 | int pthread_cond_wait(
pthread_cond_t *cond,
pthread_mutex_t *mutex) | 1.cond:條件變量
2.mutex:互斥鎖 | 成功:0
失敗:非零值 | 阻塞等待
隱含釋放申請到的互斥鎖 |
| 限時等待條件變量 | int pthread_cond_timewait(
pthread_cond_t *cond,
pthread_mutex_t *mutex,
const struct timespec *time) | 3.time:等待過期的絕對時間
從1970-1-1:0:0:0起 | 成功:0
失敻势尽:非零值 | struct timespec{long ts_sec;
long ts_nsec} |
| 單一通知 | int pthread_cond_signal(
pthread_cond_t *cond) | cond:條件變量 | 成功:0
失斚」铡:非零值 | 喚醒等待cond的第一個線程
隱含獲取需要的互斥鎖 |
| 廣播通知 | int pthread_cond_broadcast(
pthread_cond_t *cond) | cond:條件變量 | 成功:0
失敗:非零值 | 喚醒所有等待cond的線程
隱含獲取需要的互斥鎖 |
| 銷毀條件變量 | int pthread_cond_destroy(
pthread_cond_t *cond) | cond:條件變量 | 成功:0
失數と酢:非零值 | |
3. 讀寫鎖
讀寫基本原則:
若當(dāng)前線程讀數(shù)據(jù)德撬,則允許其他線程讀數(shù)據(jù),但不允許寫
若當(dāng)前線程寫數(shù)據(jù)躲胳,則不允許其他線程讀蜓洪、寫數(shù)據(jù)
以下是基本的操作:
功能 | 函數(shù) | 參數(shù) | 返回值 | 說明 |
---|---|---|---|---|
初始化鎖 | int pthread_rwlock_init( |
pthread_rwlock_t *rwlock,
const pthread_rwlockattr_t *attr) | 1. rwlock: 欲建立的讀寫鎖
2.attr:屬性,一般為NULL | 成功:0
失斉髌弧:非零值 | |
| 阻塞申請讀鎖 | int pthread_rwlock_rdlock(
pthread_rwlock_t *rwlock) | rwlock:讀寫鎖 | 成功:0
失斅√础:非零值 | 若未申請到,
阻塞等待 |
| 非阻塞申請 | int pthread_rwlock_tryrdlock(
pthread_rwlock_t *rwlock) | rwlock:讀寫鎖 | 成功:0
失敶馀取:非零值 | 若未申請到恐仑,
返回錯誤 |
| 阻塞申請寫鎖 | int pthread_rwlock_wrlock(
pthread_rwlock_t *rwlock) | rwlock:讀寫鎖 | 成功:0
失敗:非零值 | 若未申請到再芋,
阻塞等待 |
| 非阻塞申請寫鎖 | int pthread_rwlock_trywrlock(
pthread_rwlock_t *rwlock) | rwlock:讀寫鎖 | 成功:0
失斁账:非零值 | 若未申請到,
返回錯誤 |
| 釋放鎖 | int pthread_mutex_unlock(
pthread_rwlock_t *rwlock) | rwlock:讀寫鎖 | 成功:0
失敿檬辍:非零值 | |
| 銷毀鎖 | int pthread_rwlock_destroy(
pthread_rwlock_t *rwlock) | rwlock:讀寫鎖 | 成功:0
失敿选:非零值 | |
4. 線程信號量
線程信號量類似進(jìn)程的信號量记某,主要是使得多個線程訪問共享資源時,順序互斥訪問构捡。
與互斥鎖的區(qū)別在于:
- 互斥鎖:只有一個bool類型的值液南,只允許2個線程進(jìn)行排隊;
- 信號量:允許多個線程共同等待一個共享資源
函數(shù)如下:
#include <semaphore.h>
功能 | 函數(shù) | 參數(shù) | 返回值 | 說明 |
---|---|---|---|---|
創(chuàng)建信號量 | int sem_init(sem_t *sem, | |||
int pshared, unsigned int value) | 1. sem:信號量地址; |
2. pshared:是(!=0)否(0)為共享信號量
3. value:信號量初值 | 0: 成功
-1: 失敗 | |
| P操作(阻塞) | int sem_wait(sem_t *sem) | sem:信號量地址 | 0: 成功
-1: 失敗 | |
| P操作(非阻塞) | int sem_trywait(sem_t *sem) | sem:信號量地址 | 0: 成功
-1: 失敗 | |
| P操作(時間) | int sem_timedwait(sem_t *sem,
const struct timespec *abs_timeout) | 1. sem:信號量地址
2. abs_timeout:超時時間 | 0: 成功
-1: 失敗 | struct timespec 見下面 |
| V操作 | int sem_post(sem_t *sem) | sem:信號量地址 | 0: 成功
-1: 失敗 | |
| 獲取信號量值 | int sem_getvalue(sem_t *sem, int *sval) | 1. sem:信號量地址
2. sval: 將信號量值放到該地址 | 0: 成功
-1: 失敗 | |
| 刪除信號量 | int sem_destroy(sem_t *sem) | sem:信號量地址 | 0: 成功
-1: 失敗 | |
struct timespec {
time_t tv_sec; /* Seconds */
long tv_nsec; /* Nanoseconds [0 .. 999999999] */
};
II. 異步機制 - 信號
線程的異步機制只有信號
勾徽,類似于線程的信號滑凉。
線程信號具備以下特點
- 任何線程都可以向其它線程(同一進(jìn)程下)發(fā)送信號;
- 每個線程都具備自己獨立的信號屏蔽集喘帚,不影響其它線程畅姊;
- 線程創(chuàng)建時,不繼承原線程的信號屏蔽集吹由;
- 同進(jìn)程下若未,所有線程共享對某信號的處理方式,即一個設(shè)置倾鲫,所有有效粗合;
- 多個線程的程序,向某一個線程發(fā)送終止信號乌昔,則整個進(jìn)程終止
信號的基本操作如下:
功能 | 函數(shù) | 參數(shù) | 返回值 | 說明 |
---|---|---|---|---|
安裝信號 | sighandler_t signal( |
int signum,
sighandler_t handler) | 1.signum:信號值
2.handler:信號操作 | ? | 詳情參見:
http://www.cnblogs.com/Jimmy1988/p/7575103.html |
| 發(fā)送信號 | int pthread_kill(
pthread_t threadid,
int signo | 1.threadid: 目標(biāo)線程id
2.signo:信號值 | 成功:0
失斚毒巍:非零值 | 若signo=0,
檢測該線程是否存在,
不發(fā)送信號 |
| 設(shè)置屏蔽集 | pthread_sigmask(int how,
const sigset_t *set,
sigset_t *oldset) | 1.how:如何更改信號掩碼
2.newmask:新的信號屏蔽集
3.原信號屏蔽集 | 成功:0
失斂牡馈:非零值 | how值:
?1.SIG_BLOCK:添加新掩碼
?2.SIG_UNBLOCK:刪除新掩碼
?3.SIG_SETMASK:設(shè)置新掩碼完全替換舊值 |
也可以參考這篇博客:https://www.cnblogs.com/coding-my-life/p/4782529.html
III供屉、示例代碼
1.同步機制:
1). 互斥鎖:
兩個線程:
- 讀線程:從
stdin
中讀取數(shù)據(jù),并存儲- 寫線程:從存儲buffer中讀取數(shù)據(jù)并顯示
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#define SIZE 128
pthread_mutex_t mutex;
int EXIT = 0;
char word[SIZE];
void * child(void *arg)
{
while(1)
{
while(strlen(word) == 0)
usleep(100);
pthread_mutex_lock(&mutex);
printf("The input words: %s\n", word);
pthread_mutex_unlock(&mutex);
if(strcmp("end\n", word) == 0)
{
printf("The process end\n");
EXIT = 1;
break;
}
memset(word, '\0', SIZE);
}
return ;
}
int main()
{
//1\. create the lock
pthread_mutex_init(&mutex, NULL);
//2.create a new thread
pthread_t tid;
pthread_create(&tid, NULL, (void *)*child, NULL);
//3\. Input words
while(EXIT == 0)
{
if(strlen(word)!=0)
usleep(100);
//add the lock
else
{
pthread_mutex_lock(&mutex);
printf("Input words: ");
fgets(word, SIZE, stdin);
pthread_mutex_unlock(&mutex);
}
}
pthread_join(tid, NULL);
printf("The child has joined\n");
pthread_mutex_destroy(&mutex);
return 0;
}
2). 條件變量:
生產(chǎn)者和消費者問題:
生產(chǎn)者:
向倉庫生產(chǎn)數(shù)據(jù)(大小可任意設(shè)定)捅厂,當(dāng)滿時贯卦,阻塞等待倉庫有空閑(由消費者消費完后通知)消費者:
從倉庫讀數(shù)據(jù),若倉庫為空焙贷,則阻塞等待,當(dāng)生產(chǎn)者再次生產(chǎn)產(chǎn)品后通知
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#define SIZE 2
int Data[SIZE];
typedef struct
{
pthread_mutex_t lock;
pthread_cond_t notFull;
pthread_cond_t notEmpty;
int read_point;
int write_point;
}sCOND;
sCOND *pCondLock;
void init(void)
{
//memset(pCondLock, 0, sizeof(sCOND));
//1.Create a mutex lock
pthread_mutex_init(&pCondLock->lock, NULL);
//2.Create two condition variable
pthread_cond_init(&pCondLock->notFull, NULL);
pthread_cond_init(&pCondLock->notEmpty, NULL);
//set the read and write point 0
pCondLock->read_point = 0;
pCondLock->write_point = 0;
}
int put(int data)
{
//obtain the mutex lock
pthread_mutex_lock(&pCondLock->lock);
//check the global variable Data full or not
while((pCondLock->write_point+1)%SIZE == pCondLock->read_point)
{
printf("The buf is full, waitting for not_full signal\n");
pthread_cond_wait(&pCondLock->notFull, &pCondLock->lock);
}
//write the data to buffer
Data[pCondLock->write_point] = data;
pCondLock->write_point++;
if(pCondLock->write_point == SIZE)
pCondLock->write_point = 0;
//unlock the mutex lock
pthread_mutex_unlock(&pCondLock->lock);
//wake up the not_empty signal
pthread_cond_signal(&pCondLock->notEmpty);
return 0;
}
int get(int *data)
{
//obtain the mutex lock
pthread_mutex_lock(&pCondLock->lock);
//check the global variable Data empty or not
while(pCondLock->write_point == pCondLock->read_point)
{
printf("The buf is empty, waitting for not_empty signal\n");
pthread_cond_wait(&pCondLock->notEmpty, &pCondLock->lock);
}
//read the data from buffer
*data = Data[pCondLock->read_point];
pCondLock->read_point++;
if(pCondLock->read_point == SIZE)
pCondLock->read_point = 0;
//wake up the not_empty signal
pthread_cond_signal(&pCondLock->notFull);
pthread_mutex_unlock(&pCondLock->lock);
return *data;
}
void *produce(void)
{
int times=0;
//1\. first 5 times, every second write a data to buffer
for(times=0; times < 5; times++)
{
sleep(1);
put(times+1);
printf("Input date=%d\n", times+1);
}
//2\. last 5 times, every 3 seconds write a data to buffer
for(times = 5; times < 10; times++)
{
sleep(3);
put(times+1);
printf("Input date=%d\n", times+1);
}
}
void *consume(void)
{
int times=0;
int data=0;
//10 times, every 2 seconds read the buffer
for(times = 0; times < 10; times++)
{
sleep(2);
data = get(&data);
printf("The data is %d\n", data);
}
}
int main()
{
pthread_t tid1, tid2;
pCondLock = malloc(sizeof(sCOND));
memset(pCondLock, '\0', sizeof(sCOND));
//1.init the struct of sCondLock
init();
//2\. start two threads
pthread_create(&tid1, NULL, (void*)*produce, NULL);
pthread_create(&tid2, NULL, (void*)*consume, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
free(pCondLock);
return 0;
}
3). 讀寫鎖:
四個線程:兩讀兩寫贿堰;
多進(jìn)程可同時讀辙芍,但此時不可寫;
只有一個線程可寫羹与,其它線程等待該線程寫完后執(zhí)行響應(yīng)的讀/寫操作
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <stdlib.h>
#define BUF_SIZE 128
char buf[BUF_SIZE];
pthread_rwlock_t rwlock;
int time_to_exit = 0;
void *read_first(void *arg);
void *read_second(void *arg);
void *write_first(void *arg);
void *write_second(void *arg);
int main()
{
pthread_t tid_rd1, tid_rd2;
pthread_t tid_wr1, tid_wr2;
//1.create a read-write-lock
int ret = pthread_rwlock_init(&rwlock, NULL);
if(ret != 0)
{
perror("pthread_rwlock_init");
exit(EXIT_FAILURE);
}
//2\. Create the read and write threads
ret = pthread_create(&tid_rd1, NULL, (void *)*read_first, NULL);
if(ret != 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
ret = pthread_create(&tid_rd2, NULL, (void *)*read_second, NULL);
if(ret != 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
ret = pthread_create(&tid_wr1, NULL, (void *)*write_first, NULL);
if(ret != 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
ret = pthread_create(&tid_wr2, NULL, (void *)*write_second, NULL);
if(ret != 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
//3\. wait for the threads finish
pthread_join(tid_rd1, NULL);
pthread_join(tid_rd2, NULL);
pthread_join(tid_wr1, NULL);
pthread_join(tid_wr2, NULL);
//4\. delete the read-write-lock
pthread_rwlock_destroy(&rwlock);
return 0;
}
/***************************************************/
// Write threads
void *write_first(void *arg)
{
while(!time_to_exit)
{
sleep(5);
//1\. get the read-lock
pthread_rwlock_wrlock(&rwlock);
printf("\nThis is thread write_first!\n");
printf("Pls input the string: ");
fgets(buf, BUF_SIZE, stdin);
pthread_rwlock_unlock(&rwlock);
}
printf("Exit the write_first!\n");
pthread_exit(0);
}
void *write_second(void *arg)
{
while(!time_to_exit)
{
sleep(10);
//1\. get the read-lock
pthread_rwlock_wrlock(&rwlock);
printf("\nThis is thread write_second!\n");
printf("Pls input the string: ");
fgets(buf, BUF_SIZE, stdin);
pthread_rwlock_unlock(&rwlock);
}
printf("Exit the write_second!\n");
pthread_exit(0);
}
//-----2\. read the threads
void *read_first(void *arg)
{
while(1)
{
sleep(5);
pthread_rwlock_rdlock(&rwlock);
printf("\nThis is thread read_first\n");
//if write an string of "end"
if(!strncmp("end", buf, 3))
{
printf("Exit the read_first!\n");
break;
}
//if nothing in the BUFFER
while(strlen(buf) == 0)
{
pthread_rwlock_unlock(&rwlock);
sleep(2);
pthread_rwlock_rdlock(&rwlock);
}
//output the string in BUFFER
printf("The string is: %s\n", buf);
pthread_rwlock_unlock(&rwlock);
}
pthread_rwlock_unlock(&rwlock);
//make the exit true
time_to_exit = 1;
pthread_exit(0);
}
void *read_second(void *arg)
{
while(1)
{
sleep(4);
pthread_rwlock_rdlock(&rwlock);
printf("\nThis is thread read_second\n");
//if write an string of "end"
if(!strncmp("end", buf, 3))
{
printf("Exit the read_second!\n");
break;
}
//if nothing in the BUFFER
while(strlen(buf) == 0)
{
pthread_rwlock_unlock(&rwlock);
sleep(2);
pthread_rwlock_rdlock(&rwlock);
}
//output the string in BUFFER
printf("The string is: %s\n", buf);
pthread_rwlock_unlock(&rwlock);
}
pthread_rwlock_unlock(&rwlock);
//make the exit true
time_to_exit = 1;
pthread_exit(0);
}
2. 異步機制 - 信號:
本程序包括兩個線程:
線程1安裝SIGUSR1,阻塞除SIGUSR2外的所有信號故硅;
線程2安裝SIGUSR2,不阻塞任何信號
操作流程:
1- 線程1、2安裝信號纵搁;
2- 主線程發(fā)送SIGUSR1和SIGUSR2至線程1和線程2吃衅;
3- 線程1接收到除SIGUSR2之外的信號,阻塞不執(zhí)行腾誉;當(dāng)收到SIGUSR2后徘层,執(zhí)行對應(yīng)操作峻呕;
4- 線程2接收到SIGUSR1和SIGUSR2后,分別執(zhí)行對應(yīng)操作
5- 主線程發(fā)送SIGKILL信號趣效,結(jié)束整個進(jìn)程
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
void *th_first(void *arg);
void *th_second(void *arg);
pthread_t tid1, tid2;
void handler(int signo)
{
printf("In handler: tid_%s, signo=%d\n", ((pthread_self() == tid1)?"first":"second"), signo);
}
int main()
{
int ret = 0;
//1. create first thread
ret = pthread_create(&tid1, NULL, (void *)*th_first, NULL);
if(0 !=ret)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
//2. create second thread
ret = pthread_create(&tid2, NULL, (void *)*th_second, NULL);
if(0 !=ret)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
sleep(2);
//3. send the signal of SIG_USER1 and SIG_USER2 to thread_first
ret = pthread_kill(tid1, SIGUSR1);
if(0 !=ret)
{
perror("pthread_kill");
exit(EXIT_FAILURE);
}
ret = pthread_kill(tid1, SIGUSR2);
if(0 !=ret)
{
perror("pthread_kill");
exit(EXIT_FAILURE);
}
//4. send the signal of SIG_USER1 and SIG_USER2 to thread_second_
sleep(1);
ret = pthread_kill(tid2, SIGUSR1);
if(0 !=ret)
{
perror("pthread_kill");
exit(EXIT_FAILURE);
}
ret = pthread_kill(tid2, SIGUSR2);
if(0 !=ret)
{
perror("pthread_kill");
exit(EXIT_FAILURE);
}
sleep(1);
//5. send SIGKILL to all threads
ret = pthread_kill(tid1, SIGKILL);
if(0 !=ret)
{
perror("pthread_kill");
exit(EXIT_FAILURE);
}
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
return 0;
}
void *th_first(void *arg)
{
//1. Add SIGUSR1 signal
signal(SIGUSR1, handler);
//2. Set the sinagl set
sigset_t set;
sigfillset(&set); //init set to be full, include all signal
sigdelset(&set, SIGUSR2); //delete the SIGUSR2 from the set variable
pthread_sigmask(SIG_SETMASK, &set, NULL); //set the current mask set to be defined set variable
//3. Circular wait the signal
int i;
for(i=0; i<5; i++)
{
printf("\nThis is th_first, tid=%#x\n ", pthread_self());
pause();
}
}
void *th_second(void *arg)
{
usleep(100);
//1. Add the signal of SIGUSR2
signal(SIGUSR2, handler);
//2. Circular wait the signal
int i;
for(i=0; i<5; i++)
{
printf("\nThis is th_second, tid=%#x\n", pthread_self());
pause();
}
}