POSIX線程
POSIX耳舅,全稱為可移植性操作系統(tǒng)接口。它包括了系統(tǒng)應(yīng)用程序接口(簡稱API)均芽。該標準的目的是定義了標準的 基于UNIX操作系統(tǒng)的系統(tǒng)接口和環(huán)境來支持源代碼級的可移植性丘逸,致力于提供基于不同語言的規(guī)范。POSIX的線程標準掀宋,定義了創(chuàng)建和操縱線程的一套API深纲。
基本使用
#include <iostream>
#include <queue>
#include <pthread.h>
void *run(void* args) { //異步方法
int i = *(int*)i; // 1
return 0;
}
int main() {
int i = 1; //線程參數(shù)
pthread_create(&pid, 0, run, &i);//創(chuàng)建線程 pthread_join(pid,0);//等待線程結(jié)束 system("pause");
return 0;
}
線程同步
多線程同時讀寫同一份共享資源的時候,可能會引起沖突劲妙。需要引入線程“同步”機制湃鹊,即各位線程之間有序 地對共享資源進行操作。
#include <pthread.h>
using namespace std;
queue<int> q;
void *pop(void* args) {
//線程未同步導致的多線程安全問題
// 會有重復的數(shù)據(jù)取出并出現(xiàn)異常
if (!q.empty())
{
printf("取出數(shù)據(jù):%d\n", q.front());
q.pop();
}
else {
printf("無數(shù)據(jù)\n");
}
return 0;
}
int main() {
for (size_t i = 0; i < 5; i++) {
q.push(i);
}
pthread_t pid[10];
for (size_t i = 0; i < 10; i++) {
pthread_create(&pid[i], 0, pop, &q);
}
system("pause");
return 0;
}
互斥量
pthread_mutex_t 互斥量就是一把鎖镣奋。 當一個線程要訪問一個共享變量時币呵,先用鎖把變量鎖住,操作完了之后再 釋放掉鎖侨颈。
當另一個線程也要訪問這個變量時余赢,發(fā)現(xiàn)這個變量被鎖住了芯义,此時無法訪問,一直等待直到鎖沒了妻柒,才能夠上鎖與 使用扛拨。
使用互斥量前要先初始化,使用的函數(shù)如下:
加入互斥鎖
queue<int> q;
pthread_mutex_t mutex; //互斥量:鎖
void *pop(void* args) {
// 鎖
pthread_mutex_lock(&mutex);
if (!q.empty())
{
printf("取出數(shù)據(jù):%d\n", q.front());
q.pop(); }
else {
printf("無數(shù)據(jù)\n");
}
// 放
pthread_mutex_unlock(&mutex);
return 0;
}
int main() {
//初始化互斥鎖 p
thread_mutex_init(&mutex, 0);
for (size_t i = 0; i < 5; i++) {
q.push(i);
}
pthread_t pid[10];
for (size_t i = 0; i < 10; i++) {
pthread_create(&pid[i], 0, pop, &q);
}
for (size_t i = 0; i < 10; i++) {
pthread_join(pid[i], 0);
}
//需要釋放
pthread_mutex_destroy(&mutex);
return 0;
}
但是pthread_mutex_t鎖是默認是非遞歸的蛤奢,即不可重入鎖鬼癣。如果一個線程多次獲取同一個非遞歸鎖,則會產(chǎn)生死 鎖:
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
//互斥量 : 鎖
pthread_mutex_t mutex;
queue<int> q;
void test(){
pthread_mutex_lock(&mutex); //線程阻塞啤贩,死鎖
printf("隊列大小:%d\n", q.size());
pthread_mutex_unlock(&mutex);
}
void *pop(void* args) {
int ret= pthread_mutex_lock(&mutex);
if (!q.empty()) {
printf("取出數(shù)據(jù):%d\n", q.front());
q.pop();
}
else {
printf("無數(shù)據(jù)\n");
}
test(); //死鎖
pthread_mutex_unlock(&mutex);
return 0;
}
創(chuàng)建遞歸鎖 需要在初始化 pthread_mutex_t 時指明:
// 鎖的屬性 : pthread_mutex_t鎖默認是非遞歸的
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
// 設(shè)置為遞歸鎖
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
// 初始化mutex
pthread_mutex_init(&mutex, &attr);
// 完成初始化后即可釋放
pthread_mutexattr_destroy(&attr);
條件變量
條件變量是線程間進行同步的一種機制待秃,主要包括兩個動作:一個線程等待"條件變量的條件成立"而掛起; 另一個線程使"條件成立",從而喚醒掛起線程
template <class T>
class SafeQueue {
public:
SafeQueue() {
pthread_mutex_init(&mutex,0);
}
~SafeQueue() {
pthread_mutex_destory(&mutex);
}
void enqueue(T t) { pthread_mutex_lock(&mutex); q.push(t); pthread_mutex_unlock(&mutex);
}
int dequeue(T& t) {
pthread_mutex_lock(&mutex);
if (!q.empty())
{
t = q.front();
q.pop(); pthread_mutex_unlock(&mutex);
return 1;
}
pthread_mutex_unlock(&mutex);
return 0;
}
private:
queue<T> q;
pthread_mutex_t mutex;
};
上面的模板類存放數(shù)據(jù)T,并使用互斥鎖保證對queue的操作是線程安全的痹屹。這就是一個生產(chǎn)/消費模式章郁。
如果需要在取出數(shù)據(jù)的時候,queue為空志衍,則一直等待暖庄,直到下一次enqueue加入數(shù)據(jù)。
此時可以加入條件變量使 “dequeue” 掛起,直到由其他地方喚醒:
#include <queue>
using namespace std;
template <class T>
class SafeQueue {
queue<T> q;
pthread_mutex_t mutex;
pthread_cond_t cond; //條件變量
public:
SafeQueue() {
pthread_mutex_init(&mutex,0);
pthread_cond_init(&cond, 0); //初始化
}
~SafeQueue() {
pthread_mutex_destory(&mutex);
pthread_cond_destory(&cond); //銷毀
}
void enqueue(T t) {
pthread_mutex_lock(&mutex);
q.push(t);
//發(fā)出信號 通知掛起線程
//1楼肪、由系統(tǒng)喚醒一個線程(隨機)
//pthread_cond_signal(&cond);
//2培廓、廣播 喚醒所有等待條件線程
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex);
}
int dequeue(T& t) {
pthread_mutex_lock(&mutex);
//可能因為某些特殊條件虛假喚醒 所以while循環(huán)等待喚醒。(與Java的wait一樣)
while (q.empty())
{
pthread_cond_wait(&cond, &mutex); //等待并自動釋放互斥鎖
}
t = q.front();
q.pop();
pthread_mutex_unlock(&mutex);
return 1;
}
};
存在三個線程春叫,分別為:生產(chǎn)者P肩钠、消費者C1與C2。
1暂殖、C1從隊列中取出數(shù)據(jù)价匠,此時隊列為空;
2、C2也想從隊列中獲取一個元素呛每,但此時隊列為空踩窖,C2進入阻塞(cond.wait()),等待隊列非空;
3晨横、 P將一個元素入隊洋腮,并喚醒條件變量;
4、C1與C2接收到喚醒信號手形,解除阻塞狀態(tài)啥供,上鎖并獲取隊列中的元素;
5、C2優(yōu)先獲取到鎖叁幢,移除隊列元素并釋放鎖;
6、C1此時操作的隊列為空坪稽,被虛假喚醒曼玩。
自動管理
在使用pthread_mutex_t時鳞骤,lock之后,一定需要unlock黍判。為了防止忘記解鎖豫尽,同時方便使用,可以利用C++中構(gòu) 造方法與析構(gòu)方法對鎖進行封裝顷帖,實現(xiàn)鎖的自動管理美旧。
//封裝互斥量: 鎖
class ThreadLock {
private:
pthread_mutex_t m_lock;
public:
ThreadLock(){
//遞歸鎖
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(&m_lock, &attr);
pthread_mutexattr_destroy(&attr);
}
~ThreadLock(){
pthread_mutex_destroy(&m_lock);
}
void lock(){
auto ret = pthread_mutex_lock(&m_lock);
if (ret != 0) { //失敗
}
}
bool try_lock(){
auto ret = pthread_mutex_trylock(&m_lock);
return (ret == 0);
}
void unlock(){
auto ret = pthread_mutex_unlock(&m_lock);
if (ret != 0) {
//失敗
}
}
};
#ifndef MMKV_SCOPEDLOCK_H
#define MMKV_SCOPEDLOCK_H
/**
* 利用C++類的構(gòu)造與析構(gòu)函數(shù)自動加鎖與釋放
* @tparam T
*/
template<typename T>
class ScopedLock {
T *m_lock;
//表示禁止使用 編譯器的拷貝默認構(gòu)造函數(shù)和默認的=操作符
ScopedLock(const ScopedLock<T> &other) = delete;
ScopedLock &operator=(const ScopedLock<T> &other) = delete;
public:
ScopedLock(T *oLock) : m_lock(oLock) {
lock();
}
~ScopedLock() {
unlock();
m_lock = nullptr;
}
void lock() {
if (m_lock) {
m_lock->lock();
}
}
bool try_lock() {
if (m_lock) {
return m_lock->try_lock();
}
return false;
}
void unlock() {
if (m_lock) {
m_lock->unlock();
}
}
};
// __COUNTER__ 表示這個宏函數(shù)被調(diào)用幾次,整型值
#define SCOPEDLOCK(lock) _SCOPEDLOCK(lock, __COUNTER__)
//編譯前才處理贬墩,將__COUNTER__ 變?yōu)榫唧w的值榴嗅,而這里如果沒有這個玩意過度,則編輯時候會直接被識別為 __COUNTER__
#define _SCOPEDLOCK(lock, counter) __SCOPEDLOCK(lock, counter)
#define __SCOPEDLOCK(lock, counter) ScopedLock<decltype(lock)> __scopedLock##counter(&lock)
#endif //MMKV_SCOPEDLOCK_H
//使用
ThreadLock lock;
void test(){
//創(chuàng)建 ScopedLock<ThreadLock> __scopedLock1對象陶舞,使用lock上鎖
SCOPEDLOCK(lock);
//退出方法 執(zhí)行__scopedLock1析構(gòu)嗽测,解鎖lock
}
文件鎖
在多個進程同時操作同一份文件的過程中,很容易導致文件中的數(shù)據(jù)混亂肿孵,需要鎖操作來保證數(shù)據(jù)的完整性唠粥。 在在
最新版本的MMKV中使用flock文件鎖來完成多進程操作文件的同步
#include <sys/file.h>
// Returns 0 on success, or -1 on error
int flock (int fd, int operation);
flock()系統(tǒng)調(diào)用是在整個文件中加鎖,通過對傳入的fd所指向的文件進行操作停做,然后在通過operation參數(shù)所設(shè)置 的值來確定做什么樣的操作晤愧。operation可以賦如下值:
- LOCK_SH,共享鎖蛉腌,多個進程可以使用同一把鎖:讀鎖;
- LOCK_EX官份,排他鎖,同時只允許一個進程使用:寫鎖;
- LOCK_UN眉抬,釋放鎖
- LOCK_BN贯吓,發(fā)起非阻塞請求,如:LOCK_SH|LOCK_BN蜀变。
任意數(shù)量的進程可同時持有一個文件上的共享鎖(讀鎖)悄谐,但只能有一個進程能夠持有一個文件上的互斥鎖(寫 鎖)。flock支持鎖升級:只有自己進程存在讀鎖库北,可以直接升級為寫鎖爬舰,在轉(zhuǎn)換的過程中首先會刪除既有的鎖,然 后創(chuàng)建新鎖 寒瓦。若其他進程存在讀鎖情屹,需要等待釋放讀鎖;
在設(shè)計MMKV中的文件鎖需要實現(xiàn):
- 遞歸鎖
意思是如果一個進程/線程已經(jīng)擁有了鎖,那么后續(xù)的加鎖操作不會導致卡死杂腰,并且解鎖也不會導致外層的鎖 被解掉垃你。對于文件鎖來說,前者是滿足的,后者則不然惜颇。因為文件鎖是狀態(tài)鎖皆刺,沒有計數(shù)器,無論加了多少 次鎖凌摄,一個解鎖操作就全解掉羡蛾。只要用到子函數(shù),就非常需要遞歸鎖锨亏。
- 鎖升級/降級
鎖升級是指將已經(jīng)持有的共享鎖痴怨,升級為互斥鎖,亦即將讀鎖升級為寫鎖;鎖降級則是反過來器予。文件鎖支持 鎖升級浪藻,但是容易死鎖:假如 A、B 進程都持有了讀鎖劣摇,現(xiàn)在都想升級到寫鎖珠移,就會陷入相互等待的困境,發(fā)生死鎖末融。另外钧惧,由于文件鎖不支持遞歸鎖,也導致了鎖降級無法進行勾习,一降就降到?jīng)]有鎖浓瞪。
為了解決這兩個難題,需要對文件鎖進行封裝巧婶,增加讀鎖乾颁、寫鎖計數(shù)器。
- 加寫鎖時艺栈,如果當前已經(jīng)持有讀鎖英岭,那么先嘗試加寫鎖(try_lock ),try_lock 失敗說明其他進程持有了讀 鎖湿右,我們需要先將自己的讀鎖釋放掉诅妹,再進行加寫鎖操作,以避免死鎖的發(fā)生毅人。
- 解寫鎖時吭狡,假如之前曾經(jīng)持有讀鎖,那么我們不能直接釋放掉寫鎖丈莺,這樣會導致讀鎖也解了划煮。我們應(yīng)該加一 個讀鎖,將鎖降級缔俄。
基于上訴原理弛秋,封裝flock文件鎖C++類為:
enum LockType {
SharedLockType, //共享鎖 讀鎖器躏,我讀你也只能讀,不可加寫鎖
ExclusiveLockType, //排他鎖 寫鎖蟹略,只能一個單位獲得
};
/**
* 封裝支持遞歸鎖和鎖升降級的文件鎖
* 遞歸鎖
* 意思是如果一個進程/線程已經(jīng)擁有了鎖邀桑,那么后續(xù)的加鎖操作不會導致卡死,并且解鎖也不會導致外層的鎖被解掉科乎。
* 對于文件鎖來說,前者是滿足的贼急,后者則不然茅茂。
* 因為文件鎖是狀態(tài)鎖,沒有計數(shù)器太抓,無論加了多少次鎖空闲,一個解鎖操作就全解掉。只要用到子函數(shù)走敌,就非常需要遞歸鎖碴倾。
* 鎖升級/降級
* 鎖升級是指將已經(jīng)持有的共享鎖,升級為互斥鎖掉丽,亦即將讀鎖升級為寫鎖跌榔;鎖降級則是反過來。
* 文件鎖支持鎖升級捶障,但是容易死鎖:
* 假如 A僧须、B 進程都持有了讀鎖,現(xiàn)在都想升級到寫鎖项炼,就會陷入相互等待的困境担平,發(fā)生死鎖。
* 另外锭部,由于文件鎖不支持遞歸鎖暂论,也導致了鎖降級無法進行,一降就降到?jīng)]有鎖拌禾。
*/
class FileLock {
//文件句柄
int m_fd;
//文件鎖
flock m_lockInfo;
//讀計數(shù)
size_t m_sharedLockCount;
//寫計數(shù)
size_t m_exclusiveLockCount;
bool doLock(LockType lockType, bool wait);
bool isFileLockValid() { return m_fd >= 0; }
FileLock(const FileLock &other) = delete;
FileLock &operator=(const FileLock &other) = delete;
public:
FileLock(int fd) : m_fd(fd), m_sharedLockCount(0), m_exclusiveLockCount(0) {}
bool lock(LockType lockType);
bool try_lock(LockType lockType);
bool unlock(LockType lockType);
};
在實現(xiàn)中取胎,關(guān)鍵在于對讀寫計數(shù)器的操作,加鎖:
bool FileLock::doLock(LockType lockType, bool wait) {
if (!isFileLockValid()) {
return false;
}
bool unLockFirstIfNeeded = false;
//讀鎖
if (lockType == SharedLockType) {
// flock(寫鎖); // 還在用寫鎖蹋砚!
// flock(讀鎖); // 這時候上讀鎖,降級了6蟛ぁ! 但是寫鎖還在用坝咐,所以不能降級
m_sharedLockCount++;
// 如果本進程之前被上過讀鎖或者寫鎖 還未釋放循榆,那么不再加讀鎖
if (m_sharedLockCount > 1 || m_exclusiveLockCount > 0) {
return true;
}
} else {
//寫鎖
m_exclusiveLockCount++;
// 如果本進程之前上過寫鎖還未釋放
if (m_exclusiveLockCount > 1) {
return true;
}
// 如果當前已經(jīng)持有讀鎖,那么先嘗試加寫鎖墨坚,
// try_lock 失敗說明其他進程持有了讀鎖秧饮,需要先將自己的讀鎖釋放掉映挂,再進行加寫鎖操作,以免其他進程也在請求加寫鎖造成死鎖
if (m_sharedLockCount > 0) {
unLockFirstIfNeeded = true;
}
}
int realLockType = LockType2FlockType(lockType);
// LOCK_NB: 不阻塞
int cmd = wait ? realLockType : (realLockType | LOCK_NB);
if (unLockFirstIfNeeded) {
// try lock盗尸,這里肯定就是 LOCK_EX|LOCK_NB 柑船,
auto ret = flock(m_fd, realLockType | LOCK_NB);
if (ret == 0) { //加鎖成功
return true;
}
// 加鎖失敗, 先把自己的讀鎖釋放
flock(m_fd, LOCK_UN);
}
auto ret = flock(m_fd, cmd); //加鎖lock方法都是阻塞
if (ret != 0) {
return false;
} else {
return true;
}
}
解鎖:
bool FileLock::unlock(LockType lockType) {
if (!isFileLockValid()) {
return false;
}
bool unlockToSharedLock = false;
if (lockType == SharedLockType) {
if (m_sharedLockCount == 0) {
//沒鎖解,失敗
return false;
}
m_sharedLockCount--;
// 計數(shù)器不為0泼各,不解鎖
if (m_sharedLockCount > 0 || m_exclusiveLockCount > 0) {
//本次解鎖完成
return true;
}
} else {
if (m_exclusiveLockCount == 0) {
return false;
}
m_exclusiveLockCount--;
if (m_exclusiveLockCount > 0) {
return true;
}
// 寫鎖解除完了(計數(shù)為0)并且讀鎖還有計數(shù)鞍时,還原鎖為讀鎖
if (m_sharedLockCount > 0) {
unlockToSharedLock = true;
}
}
//unlockToSharedLock: 為true,需要解寫鎖,然而讀鎖還存在扣蜻!
int cmd = unlockToSharedLock ? LOCK_SH : LOCK_UN;
auto ret = flock(m_fd, cmd);
if (ret != 0) {
return false;
} else {
return true;
}
}
MMKV多進程設(shè)計
上面我們講過了flock文件鎖能夠?qū)崿F(xiàn)同一時間只有一個進程在操作持久化文件逆巍,但是如果存在AB進程,在B進程修 改完成之后莽使,A進程如何知道B進程的修改?
MMKV 本質(zhì)上是將文件 mmap 到內(nèi)存塊中锐极,將新增的 key-value 統(tǒng)統(tǒng) append 到內(nèi)存中;到達邊界后,進行重整 回寫以騰出空間芳肌,空間還是不夠的話灵再,就 double 內(nèi)存空間;對于內(nèi)存文件中可能存在的重復鍵值,MMKV 只選用 最后寫入的作為有效鍵值亿笤。那么其他進程為了保持數(shù)據(jù)一致翎迁,就需要處理這三種情況:寫指針增長、內(nèi)存重整净薛、內(nèi) 存增長鸳兽。但首先還得解決一個問題:怎么讓其他進程感知這三種情況?
-
寫指針的同步
我們可以在每個進程內(nèi)部緩存自己的寫指針,然后在寫入鍵值的同時罕拂,還要把最新的寫指針位置也寫到 mmap 內(nèi)存中;這樣每個進程只需要對比一下緩存的指針與 mmap 內(nèi)存的寫指針揍异,如果不一樣,就說明其他 進程進行了寫操作爆班。事實上 MMKV 原本就在文件頭部保存了有效內(nèi)存的大小衷掷,這個數(shù)值剛好就是寫指針的內(nèi) 存偏移量,我們可以重用這個數(shù)值來校對寫指針柿菩。
-
內(nèi)存重整的感知
考慮使用一個單調(diào)遞增的序列號戚嗅,每次發(fā)生內(nèi)存重整,就將序列號遞增枢舶。將這個序列號也放到 mmap 內(nèi)存 中懦胞,每個進程內(nèi)部也緩存一份,只需要對比序列號是否一致凉泄,就能夠知道其他進程是否觸發(fā)了內(nèi)存重整躏尉。
-
內(nèi)存增長的感知
事實上 MMKV 在內(nèi)存增長之前,會先嘗試通過內(nèi)存重整來騰出空間后众,重整后還不夠空間才申請新的內(nèi)存胀糜。所 以內(nèi)存增長可以跟內(nèi)存重整一樣處理颅拦。至于新的內(nèi)存大小,可以通過查詢文件大小來獲得教藻,無需在 mmap 內(nèi) 存另外存放距帅。
在MMKV中會生成一份與數(shù)據(jù)文件同名的.crc文件,此文件中記錄兩個關(guān)鍵數(shù)據(jù):數(shù)據(jù)內(nèi)容的crc校驗碼與單調(diào)遞增 的序列號括堤。
CRC校驗碼:循環(huán)冗余校驗 碌秸,類似文件MD5值。我們下載軟件往往會附帶MD5值悄窃,如AS哮肚。比較MD5值就能 知道文件是否合法、完整广匙,一旦修改了文件數(shù)據(jù),MD5值將不匹配恼策。
遞增的序列號:每次去重鸦致、擴容即執(zhí)行全量更新,序列號+1并記錄在crc文件中涣楷,不匹配則需要重新解析全部 文件分唾。
每次寫入與獲取數(shù)據(jù)需要執(zhí)行以下檢查:
void MMKV::checkLoadData() {
if (!m_isInterProcess) {
return;
}
SCOPEDLOCK(m_sharedProcessLock);
MMKVMetaInfo metaInfo;
metaInfo.read(m_metaFile.getMemory());
//本次讀取和記錄的不同
if (m_metaInfo.m_sequence != metaInfo.m_sequence) {
//內(nèi)存重整,序列號遞增
// 當一個進程發(fā)現(xiàn)內(nèi)存被重整了狮斗,就意味著原寫指針前面的鍵值全部失效绽乔,那么最簡單的做法是全部拋棄掉,從頭開始重新加載一遍碳褒。
LOGI("checkData:序列號改變");
clearMemoryState();
loadFromFile();
} else if (m_metaInfo.m_crcDigest != metaInfo.m_crcDigest) {
LOGI("checkData:校驗碼改變");
size_t fileSize = 0;
struct stat st = {0};
if (fstat(m_fd, &st) != -1) {
fileSize = (size_t) st.st_size;
}
if (m_size != fileSize) {
// 發(fā)生文件增長折砸,必然已經(jīng)先發(fā)生了內(nèi)存重整,與內(nèi)存重整一樣的處理
LOGI("checkData:文件大小改變");
clearMemoryState();
loadFromFile();
} else {
LOGI("checkData:寫指針增長");
// 文件大小不變沙峻,可能寫指針增長
partialLoadFromFile();
}
}
}