MMKV的原理與實現(xiàn)(三)
MMKV多線程設計
1. C/C++中的線程:POSIX線程
POSIX斋射,全稱為可移植性操作系統(tǒng)接口。它包括了系統(tǒng)應用程序接口(簡稱API)蕾额。該標準的目的是定義了標準的基于UNIX操作系統(tǒng)的系統(tǒng)接口和環(huán)境來支持源代碼級的可移植性早芭,致力于提供基于不同語言的規(guī)范。POSIX的線程標準诅蝶,定義了創(chuàng)建和操縱線程的一套API退个。
2. 基本使用
#include <iostream>
#include <queue>
#include <pthread.h>
void *run(void* args) {
//異步方法
int i = *(int*)i; // 10
cout << "i === " << i << endl;
return nullptr;
}
int main() {
int i = 10;
pthread_t pid;
pthread_create(&pid, nullptr, run, &i);
pthread_join(pid, nullptr);
return 0;
}
上面運行會將10打印出來募壕,pthread_create中有4個參數(shù):
- pid, 線程id语盈,傳入pthread_t類型的地址引用
- attr舱馅, 這個是線程的類型,后面會詳細講到
- 相當于回調函數(shù)
- 將值傳入到回調函數(shù)中
3.線程同步
同java一樣刀荒,多線程同時讀寫同一份共享資源的時候代嗤,可能會引起沖突。需要引入線程“同步”機制缠借,即各線程之間有序地對共享資源進行操作干毅。
拿以下舉例:
queue<int> q;
void *pop(void *args) {
if (!q.empty()) {
printf("取出數(shù)據(jù):%d\n", q.front());
q.pop();
} else {
printf("無數(shù)據(jù)\n");
}
return 0;
}
int main() {
for (size_t i = 0; i < 5; i++) {
q.push(i);
}
pthread_t pid[10];
for (size_t i = 0; i < 10; i++) {
pthread_create(&pid[i], 0, pop, &q);
}
return 0;
}
我們創(chuàng)建了5個隊列,然后創(chuàng)建10個線程分別從隊列里面取值烈炭,可以看到前3個線程都取了0溶锭,這就是線程不同步導致的問題,那么怎么解決呢符隙?
互斥量
首先要了解一下互斥量(
pthread_mutex_t
)的概念:互斥量就是一把鎖趴捅。 當一個線程要訪問一個共享變量時,先用鎖把變量鎖住霹疫,操作完了之后再釋放掉鎖拱绑。
當另一個線程也要訪問這個變量時,發(fā)現(xiàn)這個變量被鎖住了丽蝎,此時無法訪問猎拨,一直等待直到鎖沒了,才能夠上鎖與使用屠阻。
使用互斥量前要先初始化红省,使用的函數(shù)如下:
queue<int> q;
pthread_mutex_t mutex; //加入互斥量:鎖
void *pop(void *args) {
// 鎖
pthread_mutex_lock(&mutex);
if (!q.empty()) {
printf("取出數(shù)據(jù):%d\n", q.front());
q.pop();
} else {
printf("無數(shù)據(jù)\n");
}
// 放
pthread_mutex_unlock(&mutex);
return 0;
}
int main() {
//初始化互斥鎖
pthread_mutex_init(&mutex, 0);
for (size_t i = 0; i < 5; i++) {
q.push(i);
}
pthread_t pid[10];
for (size_t i = 0; i < 10; i++) {
pthread_create(&pid[i], 0, pop, &q);
}
//需要釋放
pthread_mutex_destroy(&mutex);
return 0;
}
在以上的例子中,我們加入互斥量国觉,這樣打印出來就正常了吧恃。但是pthread_mutex_t鎖是默認是非遞歸的,即不可重入鎖麻诀。如果一個線程多次獲取同一個非遞歸鎖痕寓,則會產生死鎖:
(以下代碼在win編輯器中可能不會報錯,但是程序并不會正常結束)
queue<int> q;
pthread_mutex_t mutex; //加入互斥量:
void test(){
pthread_mutex_lock(&mutex); //線程阻塞蝇闭,死鎖
printf("隊列大小:%d\n", q.size());
pthread_mutex_unlock(&mutex);
}
void *pop(void *args) {
// 鎖
pthread_mutex_lock(&mutex);
if (!q.empty()) {
printf("取出數(shù)據(jù):%d\n", q.front());
q.pop();
} else {
printf("無數(shù)據(jù)\n");
}
// test中也鎖了一下呻率,這樣就產生了死鎖
test();
// 放
pthread_mutex_unlock(&mutex);
return 0;
}
int main() {
//初始化互斥鎖
pthread_mutex_init(&mutex, 0);
for (size_t i = 0; i < 5; i++) {
q.push(i);
}
pthread_t pid[10];
for (size_t i = 0; i < 10; i++) {
pthread_create(&pid[i], 0, pop, &q);
}
//需要釋放
pthread_mutex_destroy(&mutex);
return 0;
}
pthread_mutex_init(&mutex, 0);
互斥量初始化有兩個參數(shù),如果要讓互斥量成為一把可重入鎖呻引,需要在初始化的時候設置attr的類型:
// 鎖的屬性 : pthread_mutex_t鎖默認是非遞歸的
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
// 設置為遞歸鎖
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
// 初始化mutex
pthread_mutex_init(&mutex, &attr);
// 完成初始化后即可釋放
pthread_mutexattr_destroy(&attr);
這樣初始化test中的打印也就正常了礼仗。
4. 條件變量(干貨,不感興趣可跳過)
條件變量是線程間進行同步的一種機制,主要包括兩個動作:一個線程等待"條件變量的條件成立"而掛起藐守;另一個線程使"條件成立",從而喚醒掛起線程
舉個例子:
template <class T>
class SafeQueue {
public:
SafeQueue() {
pthread_mutex_init(&mutex,0);
}
~SafeQueue() {
pthread_mutex_destory(&mutex);
}
void enqueue(T t) {
pthread_mutex_lock(&mutex);
q.push(t);
pthread_mutex_unlock(&mutex);
}
int dequeue(T& t) {
pthread_mutex_lock(&mutex);
if (!q.empty())
{
t = q.front();
q.pop();
pthread_mutex_unlock(&mutex);
return 1;
}
pthread_mutex_unlock(&mutex);
return 0;
}
private:
queue<T> q;
pthread_mutex_t mutex;
};
上面的模板類存放數(shù)據(jù)T挪丢,并使用互斥鎖保證對queue的操作是線程安全的。這就是一個生產/消費模式卢厂。如果需要在取出數(shù)據(jù)的時候,queue為空惠啄,則一直等待慎恒,直到下一次enqueue加入數(shù)據(jù)。此時可以加入條件變量使 “dequeue” 掛起,直到由其他地方喚醒:
#include <queue>
using namespace std;
template <class T>
class SafeQueue {
queue<T> q;
pthread_mutex_t mutex;
pthread_cond_t cond; //條件變量
public:
SafeQueue() {
pthread_mutex_init(&mutex,0);
pthread_cond_init(&cond, 0); //初始化
}
~SafeQueue() {
pthread_mutex_destory(&mutex);
pthread_cond_destory(&cond); //銷毀
}
void enqueue(T t) {
pthread_mutex_lock(&mutex);
q.push(t);
//發(fā)出信號 通知掛起線程
//1撵渡、由系統(tǒng)喚醒一個線程(隨機)
//pthread_cond_signal(&cond);
//2融柬、廣播 喚醒所有等待條件線程
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex);
}
int dequeue(T& t) {
pthread_mutex_lock(&mutex);
//可能因為某些特殊條件虛假喚醒 所以while循環(huán)等待喚醒。(與Java的wait一樣)
while (q.empty())
{
pthread_cond_wait(&cond, &mutex); //等待并自動釋放互斥鎖
}
t = q.front();
q.pop();
pthread_mutex_unlock(&mutex);
return 1;
}
};
比如存在三個線程趋距,分別為:生產者P粒氧、消費者C1與C2。
1节腐、C1從隊列中取出數(shù)據(jù)外盯,此時隊列為空;
2翼雀、C2也想從隊列中獲取一個元素饱苟,但此時隊列為空,C2進入阻塞(cond.wait())狼渊,等待隊列非空箱熬;
3、 P將一個元素入隊狈邑,并喚醒條件變量城须;
4、C1與C2接收到喚醒信號米苹,解除阻塞狀態(tài)糕伐,上鎖并獲取隊列中的元素;
5、C2優(yōu)先獲取到鎖驱入,移除隊列元素并釋放鎖赤炒;
6、C1此時操作的隊列為空亏较,被虛假喚醒莺褒。
5. 鎖的自動管理
每次創(chuàng)建一個線程時,都需要初始化(init)和銷毀(destroy)雪情, 使用時也需要鎖(lock)和解鎖(unlock)遵岩。為了防止忘記手動銷毀和解鎖,同時方便使用,MMKV利用了C++中對象的構造函數(shù)和析構函數(shù)進行了封裝尘执,實現(xiàn)鎖的自動管理舍哄。
這里我們看一下源碼: ThreadLock.cpp
#include "ThreadLock.h"
#include "MMKVLog.h"
ThreadLock::ThreadLock() {
// 創(chuàng)建一把遞歸鎖
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&m_lock, &attr);
pthread_mutexattr_destroy(&attr);
}
ThreadLock::~ThreadLock() {
pthread_mutex_destroy(&m_lock);
}
void ThreadLock::lock() {
auto ret = pthread_mutex_lock(&m_lock);
if (ret != 0) {
// 失敗
MMKVError("fail to lock %p, ret=%d, errno=%s", &m_lock, ret, strerror(errno));
}
}
bool ThreadLock::try_lock() {
auto ret = pthread_mutex_trylock(&m_lock);
if (ret != 0) {
MMKVError("fail to try lock %p, ret=%d, errno=%s", &m_lock, ret, strerror(errno));
}
return (ret == 0);
}
void ThreadLock::unlock() {
auto ret = pthread_mutex_unlock(&m_lock);
if (ret != 0) {
MMKVError("fail to unlock %p, ret=%d, errno=%s", &m_lock, ret, strerror(errno));
}
}
這個類只是對鎖進行了封裝,使用的時候還需要手動調用lock()和unlock()方法誊锭。我們是不是可以把這一步也省略掉呢表悬?當然可以,我們來對這把鎖進行二次封裝:
ScopedLock.cpp源碼
#ifndef MMKV_SCOPEDLOCK_HPP
#define MMKV_SCOPEDLOCK_HPP
#include "MMKVLog.h"
template <typename T>
class ScopedLock {
T *m_lock;
// just forbid it for possibly misuse
ScopedLock(const ScopedLock<T> &other) = delete;
ScopedLock &operator=(const ScopedLock<T> &other) = delete;
public:
// 構造方法中上鎖
ScopedLock(T *oLock) : m_lock(oLock) {
assert(m_lock);
lock();
}
// 析構方法解除鎖
~ScopedLock() {
unlock();
m_lock = nullptr;
}
void lock() {
if (m_lock) {
m_lock->lock();
}
}
bool try_lock() {
if (m_lock) {
return m_lock->try_lock();
}
return false;
}
void unlock() {
if (m_lock) {
m_lock->unlock();
}
}
};
//宏函數(shù) __COUNTER__:初始值為0丧靡,編譯單元內每出現(xiàn)一次出現(xiàn)該宏,便會加1蟆沫。
#define SCOPEDLOCK(lock) _SCOPEDLOCK(lock, __COUNTER__)
#define _SCOPEDLOCK(lock, counter) __SCOPEDLOCK(lock, counter)
#define __SCOPEDLOCK(lock, counter) ScopedLock<decltype(lock)> __scopedLock##counter(&lock)
//#include <type_traits>
//#define __SCOPEDLOCK(lock, counter)
//decltype:推斷變量類型;__scopedLock##counter(&lock):##為連接符ScopedLock<std::remove_pointer<decltype(lock)>::type> __scopedLock##counter(lock)
#endif //MMKV_SCOPEDLOCK_HPP
可以看到温治,這個類在構造方法中上鎖饭庞,析構方法中解鎖,使用的時候只需要初始化這個類就可以了熬荆,函數(shù)調用完成之后舟山,會自動調用這個類的析構函數(shù),這樣就完成了自動上鎖 和解鎖的操作卤恳。
為了更加方便累盗,下面定義了一系列的宏函數(shù),最終會調用到ScopedLock<decltype(lock)> __scopedLock##counter(&lock)
這個方法纬黎。具體可以看下注釋幅骄。
使用的時候直接調用SCOPEDLOCK()宏函數(shù)即可
//使用
ThreadLock lock;
void test(){
//創(chuàng)建 ScopedLock<ThreadLock> __scopedLock1對象,使用lock上鎖
SCOPEDLOCK(lock);
//退出方法 執(zhí)行__scopedLock1析構本今,解鎖lock
}
這樣自動推導lock的類型并自動lock和unlock拆座。我們在MMKV的源碼中也可以找到大量這樣的語句:
不得不說C++還真是博大精深吶! 這一篇多線程的處理方案就到此為止了冠息。那么挪凑,MMKV的跨進程是如何實現(xiàn)的?
MMKV多進程設計
1. 文件鎖
和多線程一個道理逛艰,在多個進程同時操作同一份文件的過程中躏碳,很容易導致文件中的數(shù)據(jù)混亂,需要鎖操作來保證數(shù)據(jù)的完整性散怖。 在最新版本的MMKV中使用flock文件鎖來完成多進程操作文件的同步
#include <sys/file.h>
// Returns 0 on success, or -1 on error
int flock (int fd, int operation);
flock()系統(tǒng)調用是在整個文件中加鎖菇绵,通過對傳入的fd所指向的文件進行操作,然后在通過operation參數(shù)所設置
的值來確定做什么樣的操作镇眷。operation可以賦如下值:
LOCK_SH咬最,共享鎖,多個進程可以使用同一把鎖:讀鎖欠动;
LOCK_EX永乌,排他鎖惑申,同時只允許一個進程使用:寫鎖;
LOCK_UN翅雏,釋放鎖
LOCK_BN圈驼,發(fā)起非阻塞請求,如:LOCK_SH|LOCK_BN望几。
任意數(shù)量的進程可同時持有一個文件上的共享鎖(讀鎖)绩脆,但只能有一個進程能夠持有一個文件上的互斥鎖(寫
鎖)。flock支持鎖升級:只有自己進程存在讀鎖橄妆,可以直接升級為寫鎖衙伶,在轉換的過程中首先會刪除既有的鎖,然
后創(chuàng)建新鎖 害碾。若其他進程存在讀鎖,需要等待釋放讀鎖赦拘;
在設計MMKV中的文件鎖需要實現(xiàn):
以下內容可見官方文檔慌随,比較詳細,這里把重要 的知識點羅列出來躺同,具體細節(jié)可以去深究以下:
https://github.com/Tencent/MMKV/wiki/android_ipc
-
遞歸鎖
意思是如果一個進程/線程已經(jīng)擁有了鎖阁猜,那么后續(xù)的加鎖操作不會導致卡死,并且解鎖也不會導致外層的鎖
被解掉蹋艺。對于文件鎖來說剃袍,前者是滿足的,后者則不然捎谨。因為文件鎖是狀態(tài)鎖民效,沒有計數(shù)器,無論加了多少
次鎖涛救,一個解鎖操作就全解掉畏邢。只要用到子函數(shù),就非常需要遞歸鎖检吆。 -
鎖升級/降級
鎖升級是指將已經(jīng)持有的共享鎖舒萎,升級為互斥鎖,亦即將讀鎖升級為寫鎖蹭沛;鎖降級則是反過來臂寝。文件鎖支持
鎖升級,但是容易死鎖:假如 A摊灭、B 進程都持有了讀鎖咆贬,現(xiàn)在都想升級到寫鎖,就會陷入相互等待的困境斟或,產生死鎖素征。另外,由于文件鎖不支持遞歸鎖,也導致了鎖降級無法進行御毅,一降就降到?jīng)]有鎖根欧。
為了解決這兩個難題,需要對文件鎖進行封裝端蛆,增加讀鎖凤粗、寫鎖計數(shù)器。
- 加寫鎖時今豆,如果當前已經(jīng)持有讀鎖嫌拣,那么先嘗試加寫鎖(try_lock ),try_lock 失敗說明其他進程持有了讀
鎖呆躲,我們需要先將自己的讀鎖釋放掉异逐,再進行加寫鎖操作,以避免死鎖的發(fā)生插掂。 - 解寫鎖時灰瞻,假如之前曾經(jīng)持有讀鎖,那么我們不能直接釋放掉寫鎖辅甥,這樣會導致讀鎖也解了酝润。我們應該加一
個讀鎖,將鎖降級璃弄。
對于讀寫鎖的計數(shù)器要销,我們來看一下官方文檔的圖:
讀鎖計數(shù)器 | 寫鎖計數(shù)器 | 加讀鎖 | 加寫鎖 | 解讀鎖 | 解寫鎖 |
---|---|---|---|---|---|
0 | 0 | 加讀鎖 | 加寫鎖 | - | - |
0 | 1 | +1 | +1 | - | 解寫鎖 |
0 | N | +1 | +1 | - | -1 |
1 | 0 | +1 | 解讀鎖再加寫鎖 | 解讀鎖 | - |
1 | 1 | +1 | +1 | -1 | 加讀鎖 |
1 | N | +1 | +1 | -1 | -1 |
N | 0 | +1 | 解讀鎖再加寫鎖 | -1 | - |
N | 1 | +1 | +1 | -1 | 加讀鎖 |
N | N | +1 | +1 | -1 | -1 |
需要注意的地方有兩點:
- 加寫鎖時,如果當前已經(jīng)持有讀鎖夏块,那么先嘗試加寫鎖疏咐,try_lock 失敗說明其他進程持有了讀鎖,我們需要先將自己的讀鎖釋放掉拨扶,再進行加寫鎖操作凳鬓,以避免死鎖的發(fā)生。
- 解寫鎖時患民,假如之前曾經(jīng)持有讀鎖缩举,那么我們不能直接釋放掉寫鎖,這樣會導致讀鎖也解了匹颤。我們應該加一個讀鎖仅孩,將鎖降級。
2. MMKV源碼解讀
基于以上原理印蓖,封裝flock文件鎖C++類為:
#ifndef MMKV_INTERPROCESSLOCK_H
#define MMKV_INTERPROCESSLOCK_H
#include <cassert>
#include <fcntl.h>
enum LockType {
SharedLockType,
ExclusiveLockType,
};
// a recursive POSIX file-lock wrapper
// handles lock upgrade & downgrade correctly
class FileLock {
// 文件句柄
int m_fd;
// 讀鎖計數(shù)器
size_t m_sharedLockCount;
// 解鎖計數(shù)器
size_t m_exclusiveLockCount;
bool doLock(LockType lockType, bool wait);
bool isFileLockValid() { return m_fd >= 0; }
// just forbid it for possibly misuse
FileLock(const FileLock &other) = delete;
FileLock &operator=(const FileLock &other) = delete;
public:
FileLock(int fd) : m_fd(fd), m_sharedLockCount(0), m_exclusiveLockCount(0) {}
bool lock(LockType lockType);
bool try_lock(LockType lockType);
bool unlock(LockType lockType);
};
class InterProcessLock {
FileLock *m_fileLock;
LockType m_lockType;
public:
InterProcessLock(FileLock *fileLock, LockType lockType)
: m_fileLock(fileLock), m_lockType(lockType), m_enable(true) {
assert(m_fileLock);
}
bool m_enable;
void lock() {
if (m_enable) {
m_fileLock->lock(m_lockType);
}
}
bool try_lock() {
if (m_enable) {
return m_fileLock->try_lock(m_lockType);
}
return false;
}
void unlock() {
if (m_enable) {
m_fileLock->unlock(m_lockType);
}
}
};
#endif //MMKV_INTERPROCESSLOCK_H
在實現(xiàn)中辽慕,關鍵點再于讀寫計數(shù)器的操作,加鎖:
bool FileLock::doLock(LockType lockType, bool wait) {
if (!isFileLockValid()) {
return false;
}
bool unLockFirstIfNeeded = false;
// 讀鎖
if (lockType == SharedLockType) {
m_sharedLockCount++;
// 以下兩個判斷:如果本進程之前被上過讀鎖或者寫鎖 還未釋放赦肃,那么不再加讀鎖
if (m_sharedLockCount > 1 || m_exclusiveLockCount > 0) {
return true;
}
} else {
m_exclusiveLockCount++;
// 如果本進程之前上過寫鎖還未釋放
if (m_exclusiveLockCount > 1) {
return true;
}
// 如果當前已經(jīng)持有讀鎖溅蛉,那么先嘗試加寫鎖公浪,
// try_lock 失敗說明其他進程持有了讀鎖,需要先將自己的讀鎖釋放掉船侧,再進行加寫鎖操作欠气,以免其他進 程也在請求加寫鎖造成死鎖
if (m_sharedLockCount > 0) {
unLockFirstIfNeeded = true;
}
}
int realLockType = LockType2FlockType(lockType);
// LOCK_NB: 不阻塞
int cmd = wait ? realLockType : (realLockType | LOCK_NB);
if (unLockFirstIfNeeded) {
// // try lock,這里肯定就是 LOCK_EX|LOCK_NB 镜撩,
auto ret = flock(m_fd, realLockType | LOCK_NB);
if (ret == 0) { //加鎖成功
return true;
}
// 加鎖失敗, 先把自己的讀鎖釋放
ret = flock(m_fd, LOCK_UN);
if (ret != 0) {
MMKVError("fail to try unlock first fd=%d, ret=%d, error:%s", m_fd, ret,
strerror(errno));
}
}
//加鎖lock方法都是阻塞
auto ret = flock(m_fd, cmd);
if (ret != 0) {
MMKVError("fail to lock fd=%d, ret=%d, error:%s", m_fd, ret, strerror(errno));
return false;
} else {
return true;
}
}
解鎖:
bool FileLock::unlock(LockType lockType) {
if (!isFileLockValid()) {
return false;
}
bool unlockToSharedLock = false;
//讀鎖
if (lockType == SharedLockType) {
if (m_sharedLockCount == 0) {
//如果一個鎖都沒有预柒,還解什么,失敗
return false;
}
m_sharedLockCount--;
// don't want shared-lock to break any existing locks
// 還存在讀鎖(讀鎖計數(shù)器不為0)或者還存在寫鎖袁梗,不執(zhí)行解鎖
if (m_sharedLockCount > 0 || m_exclusiveLockCount > 0) {
//本次解鎖完成
return true;
}
} else {
//寫鎖計數(shù)器為0宜鸯,不操作
if (m_exclusiveLockCount == 0) {
return false;
}
//寫鎖計數(shù)器-1,不為0遮怜,同樣不操作
m_exclusiveLockCount--;
if (m_exclusiveLockCount > 0) {
return true;
}
// restore shared-lock when all exclusive-locks are done
//到這一步表示無寫鎖了(寫鎖計數(shù)器為0)
// 同時還存在讀鎖淋袖,不能解鎖,需要降級寫鎖為讀鎖
if (m_sharedLockCount > 0) {
unlockToSharedLock = true;
}
}
//是否降級
int cmd = unlockToSharedLock ? LOCK_SH : LOCK_UN;
auto ret = flock(m_fd, cmd);
if (ret != 0) {
MMKVError("fail to unlock fd=%d, ret=%d, error:%s", m_fd, ret, strerror(errno));
return false;
} else {
return true;
}
}
上面加鎖和解鎖的步驟锯梁,具體細節(jié)都在注釋里面了适贸,大家可以看看。
多進程的講解官方文檔比較詳細涝桅,這里再把官方地址貼一下,這里大家只需要了解加鎖和解鎖的步驟就完全oK
了烙样。
<a >https://github.com/Tencent/MMKV/wiki/android_ipc</a>