在關(guān)于同步的一點(diǎn)思考-下一文中谬运,我們知道glibc的pthread_cond_timedwait
底層是用linux futex機(jī)制實(shí)現(xiàn)的阔拳。
更多文章見個人博客:https://github.com/farmerjohngit/myblog
理想的同步機(jī)制應(yīng)該是沒有鎖沖突時在用戶態(tài)利用原子指令就解決問題勃痴,而需要掛起等待時再使用內(nèi)核提供的系統(tǒng)調(diào)用進(jìn)行睡眠與喚醒古瓤。換句話說膀藐,在用戶態(tài)的自旋失敗時媚值,能不能讓進(jìn)程掛起睦焕,由持有鎖的線程釋放鎖時將其喚醒藐握?
如果你沒有較深入地考慮過這個問題,很可能想當(dāng)然的認(rèn)為類似于這樣就行了(偽代碼):
void lock(int lockval) {
//trylock是用戶級的自旋鎖
while(!trylock(lockval)) {
wait();//釋放cpu垃喊,并將當(dāng)期線程加入等待隊(duì)列猾普,是系統(tǒng)調(diào)用
}
}
boolean trylock(int lockval){
int i=0;
//localval=1代表上鎖成功
while(!compareAndSet(lockval,0,1)){
if(++i>10){
return false;
}
}
return true;
}
void unlock(int lockval) {
compareAndSet(lockval,1,0);
notify();
}
上述代碼的問題是trylock和wait兩個調(diào)用之間存在一個窗口:
如果一個線程trylock失敗,在調(diào)用wait時持有鎖的線程釋放了鎖本谜,當(dāng)前線程還是會調(diào)用wait進(jìn)行等待初家,但之后就沒有人再喚醒該線程了。
為了解決上述問題,linux內(nèi)核引入了futex機(jī)制溜在,futex主要包括等待和喚醒兩個方法:futex_wait
和futex_wake
陌知,其定義如下
//uaddr指向一個地址,val代表這個地址期待的值掖肋,當(dāng)*uaddr==val時仆葡,才會進(jìn)行wait
int futex_wait(int *uaddr, int val);
//喚醒n個在uaddr指向的鎖變量上掛起等待的進(jìn)程
int futex_wake(int *uaddr, int n);
futex在真正將進(jìn)程掛起之前會檢查addr指向的地址的值是否等于val,如果不相等則會立即返回志笼,由用戶態(tài)繼續(xù)trylock沿盅。否則將當(dāng)期線程插入到一個隊(duì)列中去,并掛起纫溃。
在關(guān)于同步的一點(diǎn)思考-上文章中對futex的背景與基本原理有介紹腰涧,對futex不熟悉的人可以先看下。
本文將深入分析futex的實(shí)現(xiàn)紊浩,讓讀者對于鎖的最底層實(shí)現(xiàn)方式有直觀認(rèn)識窖铡,再結(jié)合之前的兩篇文章(關(guān)于同步的一點(diǎn)思考-上和關(guān)于同步的一點(diǎn)思考-下)能對操作系統(tǒng)的同步機(jī)制有個全面的理解。
下文中的進(jìn)程一詞包括常規(guī)進(jìn)程與線程郎楼。
futex_wait
在看下面的源碼分析前万伤,先思考一個問題:如何確保掛起進(jìn)程時,val的值是沒有被其他進(jìn)程修改過的呜袁?
代碼在kernel/futex.c中
static int futex_wait(u32 __user *uaddr, int fshared,
u32 val, ktime_t *abs_time, u32 bitset, int clockrt)
{
struct hrtimer_sleeper timeout, *to = NULL;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q;
int ret;
...
//設(shè)置hrtimer定時任務(wù):在一定時間(abs_time)后敌买,如果進(jìn)程還沒被喚醒則喚醒wait的進(jìn)程
if (abs_time) {
...
hrtimer_init_sleeper(to, current);
...
}
retry:
//該函數(shù)中判斷uaddr指向的值是否等于val,以及一些初始化操作
ret = futex_wait_setup(uaddr, val, fshared, &q, &hb);
//如果val發(fā)生了改變阶界,則直接返回
if (ret)
goto out;
//將當(dāng)前進(jìn)程狀態(tài)改為TASK_INTERRUPTIBLE虹钮,并插入到futex等待隊(duì)列,然后重新調(diào)度膘融。
futex_wait_queue_me(hb, &q, to);
/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
//如果unqueue_me成功芙粱,則說明是超時觸發(fā)(因?yàn)閒utex_wake喚醒時,會將該進(jìn)程移出等待隊(duì)列氧映,所以這里會失敶号稀)
if (!unqueue_me(&q))
goto out_put_key;
ret = -ETIMEDOUT;
if (to && !to->task)
goto out_put_key;
/*
* We expect signal_pending(current), but we might be the
* victim of a spurious wakeup as well.
*/
if (!signal_pending(current)) {
put_futex_key(fshared, &q.key);
goto retry;
}
ret = -ERESTARTSYS;
if (!abs_time)
goto out_put_key;
...
out_put_key:
put_futex_key(fshared, &q.key);
out:
if (to) {
//取消定時任務(wù)
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}
在將進(jìn)程阻塞前會將當(dāng)期進(jìn)程插入到一個等待隊(duì)列中,需要注意的是這里說的等待隊(duì)列其實(shí)是一個類似Java HashMap的結(jié)構(gòu)岛都,全局唯一律姨。
struct futex_hash_bucket {
spinlock_t lock;
//雙向鏈表
struct plist_head chain;
};
static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];
著重看futex_wait_setup
和兩個函數(shù)futex_wait_queue_me
static int futex_wait_setup(u32 __user *uaddr, u32 val, int fshared,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval;
int ret;
retry:
q->key = FUTEX_KEY_INIT;
//初始化futex_q
ret = get_futex_key(uaddr, fshared, &q->key, VERIFY_READ);
if (unlikely(ret != 0))
return ret;
retry_private:
//獲得自旋鎖
*hb = queue_lock(q);
//原子的將uaddr的值設(shè)置到uval中
ret = get_futex_value_locked(&uval, uaddr);
...
//如果當(dāng)期uaddr指向的值不等于val,即說明其他進(jìn)程修改了
//uaddr指向的值臼疫,等待條件不再成立择份,不用阻塞直接返回。
if (uval != val) {
//釋放鎖
queue_unlock(q, *hb);
ret = -EWOULDBLOCK;
}
...
return ret;
}
函數(shù)futex_wait_setup
中主要做了兩件事烫堤,一是獲得自旋鎖荣赶,二是判斷*uaddr是否為預(yù)期值凤价。
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
struct hrtimer_sleeper *timeout)
{
//設(shè)置進(jìn)程狀態(tài)為TASK_INTERRUPTIBLE,cpu調(diào)度時只會選擇
//狀態(tài)為TASK_RUNNING的進(jìn)程
set_current_state(TASK_INTERRUPTIBLE);
//將當(dāng)期進(jìn)程(q封裝)插入到等待隊(duì)列中去拔创,然后釋放自旋鎖
queue_me(q, hb);
//啟動定時任務(wù)
if (timeout) {
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
if (!hrtimer_active(&timeout->timer))
timeout->task = NULL;
}
/*
* If we have been removed from the hash list, then another task
* has tried to wake us, and we can skip the call to schedule().
*/
if (likely(!plist_node_empty(&q->list))) {
//如果沒有設(shè)置過期時間 || 設(shè)置了過期時間且還沒過期
if (!timeout || timeout->task)
//系統(tǒng)重新進(jìn)行進(jìn)程調(diào)度利诺,這個時候cpu會去執(zhí)行其他進(jìn)程,該進(jìn)程會阻塞在這里
schedule();
}
//走到這里說明又被cpu選中運(yùn)行了
__set_current_state(TASK_RUNNING);
}
futex_wait_queue_me
中主要做幾件事:
- 將當(dāng)期進(jìn)程插入到等待隊(duì)列
- 啟動定時任務(wù)
- 重新調(diào)度進(jìn)程
如何保證條件與等待之間的原子性
在futex_wait_setup
方法中會加自旋鎖剩燥;在futex_wait_queue_me
中將狀態(tài)設(shè)置為TASK_INTERRUPTIBLE
立轧,調(diào)用queue_me
將當(dāng)期線程插入到等待隊(duì)列中,然后才釋放自旋鎖躏吊。也就是說檢查uaddr的值的過程跟進(jìn)程掛起的過程放在同一個臨界區(qū)中。當(dāng)釋放自旋鎖后帐萎,這時再更改addr地址的值已經(jīng)沒有關(guān)系了比伏,因?yàn)楫?dāng)期進(jìn)程已經(jīng)加入到等待隊(duì)列中,能被wake喚醒疆导,不會出現(xiàn)本文開頭提到的沒人喚醒的問題赁项。
futex_wait小結(jié)
總結(jié)下futex_wait
流程:
- 加自旋鎖
- 檢測*uaddr是否等于val,如果不相等則會立即返回
- 將進(jìn)程狀態(tài)設(shè)置為
TASK_INTERRUPTIBLE
- 將當(dāng)期進(jìn)程插入到等待隊(duì)列中
- 釋放自旋鎖
- 創(chuàng)建定時任務(wù):當(dāng)超過一定時間還沒被喚醒時澈段,將進(jìn)程喚醒
- 掛起當(dāng)前進(jìn)程
futex_wake
futex_wake
static int futex_wake(u32 __user *uaddr, int fshared, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
struct plist_head *head;
union futex_key key = FUTEX_KEY_INIT;
int ret;
...
//根據(jù)uaddr的值填充&key的內(nèi)容
ret = get_futex_key(uaddr, fshared, &key, VERIFY_READ);
if (unlikely(ret != 0))
goto out;
//根據(jù)&key獲得對應(yīng)uaddr所在的futex_hash_bucket
hb = hash_futex(&key);
//對該hb加自旋鎖
spin_lock(&hb->lock);
head = &hb->chain;
//遍歷該hb的鏈表悠菜,注意鏈表中存儲的節(jié)點(diǎn)是plist_node類型,而而這里的this卻是futex_q類型败富,這種類型轉(zhuǎn)換是通過c中的container_of機(jī)制實(shí)現(xiàn)的
plist_for_each_entry_safe(this, next, head, list) {
if (match_futex (&this->key, &key)) {
...
//喚醒對應(yīng)進(jìn)程
wake_futex(this);
if (++ret >= nr_wake)
break;
}
}
//釋放自旋鎖
spin_unlock(&hb->lock);
put_futex_key(fshared, &key);
out:
return ret;
}
futex_wake
流程如下:
- 找到uaddr對應(yīng)的
futex_hash_bucket
悔醋,即代碼中的hb - 對hb加自旋鎖
- 遍歷fb的鏈表,找到uaddr對應(yīng)的節(jié)點(diǎn)
- 調(diào)用
wake_futex
喚起等待的進(jìn)程 - 釋放自旋鎖
wake_futex
中將制定進(jìn)程狀態(tài)設(shè)置為TASK_RUNNING
并加入到系統(tǒng)調(diào)度列表中兽叮,同時將進(jìn)程從futex的等待隊(duì)列中移除掉芬骄,具體代碼就不分析了,有興趣的可以自行研究鹦聪。
End
Java中的ReentrantLock,Object.wait和Thread.sleep等等底層都是用futex進(jìn)行線程同步账阻,理解futex的實(shí)現(xiàn)能幫助你更好的理解與使用這些上層的同步機(jī)制。另外因篇幅與精力有限泽本,涉及到進(jìn)程調(diào)度的相關(guān)內(nèi)容沒有具體分析淘太,不過并不妨礙理解文章內(nèi)容,