原文鏈接深入理解GCD之dispatch_semaphore
再研究完dispatch_queue
之后,本來是打算進(jìn)入到dispath_group
的源碼,但是dispath_group
基本是圍繞著dispatch_semaphore
即信號量實現(xiàn)的屠凶,所以我們先進(jìn)入到dispatch_semaphore
的源碼學(xué)習(xí)祠锣。在GCD中使用dispatch_semaphore
用來保證資源使用的安全性(隊列的同步執(zhí)行就是依賴信號量實現(xiàn))绒障。可想而知喘帚,dispatch_semaphore
的性能應(yīng)該是不差的。
dispatch_semaphore_t
dispatch_semaphore_s
是信號量的結(jié)構(gòu)體咒钟。代碼如下:
struct dispatch_semaphore_s {
DISPATCH_STRUCT_HEADER(dispatch_semaphore_s, dispatch_semaphore_vtable_s);
long dsema_value; //當(dāng)前信號量
long dsema_orig; //初始化信號量
size_t dsema_sent_ksignals;
#if USE_MACH_SEM && USE_POSIX_SEM
#error "Too many supported semaphore types"
#elif USE_MACH_SEM
semaphore_t dsema_port;
semaphore_t dsema_waiter_port;
#elif USE_POSIX_SEM
sem_t dsema_sem;
#else
#error "No supported semaphore type"
#endif
size_t dsema_group_waiters;
struct dispatch_sema_notify_s *dsema_notify_head; //notify鏈表頭部
struct dispatch_sema_notify_s *dsema_notify_tail; //notify鏈表尾部
};
typedef mach_port_t semaphore_t;
struct dispatch_sema_notify_s {
struct dispatch_sema_notify_s *volatile dsn_next; //下一個信號節(jié)點
dispatch_queue_t dsn_queue; //操作的隊列
void *dsn_ctxt; //上下文
void (*dsn_func)(void *); //執(zhí)行函數(shù)
};
雖然上面還有一些屬性不知道是做什么作用的吹由,但我們繼續(xù)往下走。
dispatch_semaphore_create
dispatch_semaphore_create
用于信號量的創(chuàng)建朱嘴。
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {//value必須大于等于0
return NULL;
}
//申請dispatch_semaphore_s的內(nèi)存
dsema = calloc(1, sizeof(struct dispatch_semaphore_s));
if (fastpath(dsema)) {
//設(shè)置dispatch_semaphore_s 的操作函數(shù)
dsema->do_vtable = &_dispatch_semaphore_vtable;
//設(shè)置鏈表尾部
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
//引用計數(shù)
dsema->do_ref_cnt = 1;
dsema->do_xref_cnt = 1;
//目標(biāo)隊列的設(shè)置
dsema->do_targetq = dispatch_get_global_queue(
DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//當(dāng)前信號量和初始化信號的賦值
dsema->dsema_value = value;
dsema->dsema_orig = value;
#if USE_POSIX_SEM
int ret = sem_init(&dsema->dsema_sem, 0, 0);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif
}
return dsema;
}
上面的源碼中dsema->do_vtable = &_dispatch_semaphore_vtable;
_dispatch_semaphore_vtable
定義如下:
const struct dispatch_semaphore_vtable_s _dispatch_semaphore_vtable = {
.do_type = DISPATCH_SEMAPHORE_TYPE,
.do_kind = "semaphore",
.do_dispose = _dispatch_semaphore_dispose,
.do_debug = _dispatch_semaphore_debug,
};
這里有個_dispatch_semaphore_dispose
函數(shù)就是信號量的銷毀函數(shù)倾鲫。代碼如下:
static void
_dispatch_semaphore_dispose(dispatch_semaphore_t dsema)
{
//信號量的當(dāng)前值小于初始化,會發(fā)生閃退萍嬉。因為信號量已經(jīng)被釋放了
if (dsema->dsema_value < dsema->dsema_orig) {
DISPATCH_CLIENT_CRASH(
"Semaphore/group object deallocated while in use");
}
#if USE_MACH_SEM
kern_return_t kr;
//釋放信號乌昔,這個信號是dispatch_semaphore使用的信號
if (dsema->dsema_port) {
kr = semaphore_destroy(mach_task_self(), dsema->dsema_port);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}
//釋放信號,這個信號是dispatch_group使用的信號
if (dsema->dsema_waiter_port) {
kr = semaphore_destroy(mach_task_self(), dsema->dsema_waiter_port);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}
#elif USE_POSIX_SEM
int ret = sem_destroy(&dsema->dsema_sem);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif
_dispatch_dispose(dsema);
}
dispatch_semaphore_wait
創(chuàng)建好一個信號量后就會開始進(jìn)入等待信號發(fā)消息帚湘。
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
//原子性減1玫荣,這里說明dsema_value是當(dāng)前信號值,并將新值賦給value
long value = dispatch_atomic_dec2o(dsema, dsema_value);
dispatch_atomic_acquire_barrier();
if (fastpath(value >= 0)) {
//說明有資源可用大诸,直接返回0捅厂,表示等到信號量的信息了
return 0;
}
//等待信號量喚醒或者timeout超時
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
_dispatch_semaphore_wait_slow
在dispatch_semaphore_wait
中,如果value
小于0
资柔,就會執(zhí)行_dispatch_semaphore_wait_slow
等待信號量喚醒或者timeout超時焙贷。_dispatch_semaphore_wait_slow
的代碼如下:
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
again:
// Mach semaphores appear to sometimes spuriously wake up. Therefore,
// we keep a parallel count of the number of times a Mach semaphore is
// signaled (6880961).
//第一部分:
//只要dsema->dsema_sent_ksignals不為零就會進(jìn)入循環(huán)
//dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,orig - 1)的意思是
//dsema->dsema_sent_ksignals如果等于orig,則將orig - 1賦值給dsema_sent_ksignals贿堰,
//并且返回true辙芍,否則返回false。
//如果返回true羹与,說明又獲取了資源
while ((orig = dsema->dsema_sent_ksignals)) {
if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,
orig - 1)) {
return 0;
}
}
#if USE_MACH_SEM
mach_timespec_t _timeout;
kern_return_t kr;
//第二部分:dispatch_semaphore_s中的dsema_port賦值故硅,以懶加載的形式
_dispatch_semaphore_create_port(&dsema->dsema_port);
// From xnu/osfmk/kern/sync_sema.c:
// wait_semaphore->count = -1; /* we don't keep an actual count */
//
// The code above does not match the documentation, and that fact is
// not surprising. The documented semantics are clumsy to use in any
// practical way. The above hack effectively tricks the rest of the
// Mach semaphore logic to behave like the libdispatch algorithm.
//第三部分:
switch (timeout) {
default:
//計算剩余時間,調(diào)用mach內(nèi)核的等待函數(shù)semaphore_timedwait()進(jìn)行等待纵搁。
//如果在指定時間內(nèi)沒有得到通知吃衅,則會一直阻塞住,監(jiān)聽dsema_port等待其通知腾誉;
//當(dāng)超時的時候徘层,會執(zhí)行下面的case代碼(這個default沒有break)峻呕。
do {
uint64_t nsec = _dispatch_timeout(timeout);
_timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
_timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
kr = slowpath(semaphore_timedwait(dsema->dsema_port, _timeout));
} while (kr == KERN_ABORTED);
if (kr != KERN_OPERATION_TIMED_OUT) {
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
//若當(dāng)前信號量desma_value小于0,對其加一并返回超時信號KERN_OPERATION_TIMED_OUT趣效。
//KERN_OPERATION_TIMED_OUT代表等待超時而返回
//由于一開始在第一部分代碼中進(jìn)行了減1操作瘦癌,所以需要加1以撤銷之前的操作。
while ((orig = dsema->dsema_value) < 0) {
if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) {
return KERN_OPERATION_TIMED_OUT;
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
//一直等待直到有信號跷敬。當(dāng)有信號的時候說明dsema_value大于0讯私,會跳轉(zhuǎn)到again,重新執(zhí)行本函數(shù)的流程
do {
kr = semaphore_wait(dsema->dsema_port);
} while (kr == KERN_ABORTED);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
break;
}
#elif USE_POSIX_SEM
//此處的代碼省略干花,跟上面USE_MACH_SEM代碼類似
#endif
goto again;
}
在上面的源碼還有幾個地方需要注意:
第一部分的那個while循環(huán)和if條件妄帘。在
dsema_sent_ksignals
非0的情況下便會進(jìn)入while循環(huán),if的條件是dsema->dsema_sent_ksignals
如果等于orig
池凄,則將orig - 1
賦值給dsema_sent_ksignals
抡驼,并且返回true
,否則返回false
肿仑。很明顯致盟,只要能進(jìn)入循環(huán),這個條件是一定成立的尤慰,函數(shù)直接返回0馏锡,表示等到信號。而在初始化信號量的時候沒有對dsema_sent_ksignals
賦值伟端,所以就會進(jìn)入之后的代碼杯道。也就是說沒有信號量的實際通知或者遭受了系統(tǒng)異常通知,并不會解除等待在上面中出現(xiàn)了
semaphore_timedwait
和semaphore_wait
责蝠。這些方法是在semaphore.h
中的党巾。所以說dispatch_semaphore是基于mach內(nèi)核的信號量接口實現(xiàn)的。另外這兩個方法傳入的參數(shù)是dsema_port
即dsema_port
被mach內(nèi)核semaphore監(jiān)聽霜医,所以我們理解dsema_port
是dispatch_semaphore的信號齿拂。我們回過頭再看一下
dispatch_semaphore_s
結(jié)構(gòu)體中的dsema_waiter_port
。全局搜索一下可以發(fā)現(xiàn)肴敛,這個屬性是用在dispatch_group
中署海。之前也說了dispatch_group
的實現(xiàn)是基于dispatch_semaphore
,在dispatch_group
里semaphore_wait
監(jiān)聽的并不是dsema_port
而是dsema_waiter_port
医男。
dispatch_semaphore_wait
流程如下圖所示:
dispatch_semaphore_signal
發(fā)送信號的代碼相對等待信號來說簡單很多砸狞,它不需要阻塞,只發(fā)送喚醒镀梭。
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
dispatch_atomic_release_barrier();
//原子性加1趾代,value大于0 說明有資源立即返回
long value = dispatch_atomic_inc2o(dsema, dsema_value);
if (fastpath(value > 0)) {
return 0;
}
if (slowpath(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
_dispatch_semaphore_signal_slow
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
// Before dsema_sent_ksignals is incremented we can rely on the reference
// held by the waiter. However, once this value is incremented the waiter
// may return between the atomic increment and the semaphore_signal(),
// therefore an explicit reference must be held in order to safely access
// dsema after the atomic increment.
_dispatch_retain(dsema);
(void)dispatch_atomic_inc2o(dsema, dsema_sent_ksignals);
#if USE_MACH_SEM
_dispatch_semaphore_create_port(&dsema->dsema_port);
kern_return_t kr = semaphore_signal(dsema->dsema_port);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
#elif USE_POSIX_SEM
int ret = sem_post(&dsema->dsema_sem);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif
_dispatch_release(dsema);
return 1;
}
_dispatch_semaphore_signal_slow
的作用就是內(nèi)核的semaphore_signal
函數(shù)喚醒在dispatch_semaphore_wait
中等待的線程量,然后返回1丰辣。
dispatch_semaphore_signal
流程如下圖所示:
總結(jié)
dispatch_semaphore
是基于mach內(nèi)核的信號量接口實現(xiàn)的調(diào)用
dispatch_semaphore_wait
信號量減1撒强,調(diào)用dispatch_semaphore_signal
信號量加1在
wait
中,信號量大于等于0代表有資源立即返回笙什,否則等待信號量或者返回超時飘哨;在signal
中,信號量大于0代表有資源立即返回琐凭,否則喚醒某個正在等待的線程dispatch_semaphore
利用了兩個變量desma_value
和dsema_sent_ksignals
來處理wait
和signal
芽隆,在singnal
中如果有資源,則不需要喚醒線程统屈,那么此時只需要使用desma_value
胚吁。當(dāng)需要喚醒線程的時候,發(fā)送的信號是dsema_sent_ksignals
的值愁憔,此時會重新執(zhí)行wait
的流程腕扶,所以在wait
中一開始是用dsema_sent_ksignals
做判斷。再看一下
dispatch_semaphore_s
結(jié)構(gòu)體的變量吨掌。
struct dispatch_semaphore_s {
DISPATCH_STRUCT_HEADER(dispatch_semaphore_s, dispatch_semaphore_vtable_s);
long dsema_value; //當(dāng)前信號量
long dsema_orig; //初始化信號量
size_t dsema_sent_ksignals; //喚醒時候的信號量
#if USE_MACH_SEM && USE_POSIX_SEM
#error "Too many supported semaphore types"
#elif USE_MACH_SEM
semaphore_t dsema_port; //結(jié)構(gòu)體使用的semaphore信號
semaphore_t dsema_waiter_port;//dispatch_group使用的使用的semaphore信號
#elif USE_POSIX_SEM
sem_t dsema_sem;
#else
#error "No supported semaphore type"
#endif
size_t dsema_group_waiters;
struct dispatch_sema_notify_s *dsema_notify_head; //notify鏈表頭部
struct dispatch_sema_notify_s *dsema_notify_tail; //notify鏈表尾部
};
補(bǔ)充
如何控制線程并發(fā)數(shù)
方法1:使用信號量進(jìn)行并發(fā)控制
dispatch_queue_t concurrentQueue = dispatch_queue_create("concurrentQueue", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t serialQueue = dispatch_queue_create("serialQueue",DISPATCH_QUEUE_SERIAL);
dispatch_semaphore_t semaphore = dispatch_semaphore_create(4);
for (NSInteger i = 0; i < 15; i++) {
dispatch_async(serialQueue, ^{
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
dispatch_async(concurrentQueue, ^{
NSLog(@"thread:%@開始執(zhí)行任務(wù)%d",[NSThread currentThread],(int)i);
sleep(1);
NSLog(@"thread:%@結(jié)束執(zhí)行任務(wù)%d",[NSThread currentThread],(int)i);
dispatch_semaphore_signal(semaphore);});
});
}
NSLog(@"主線程...!");
結(jié)果
方法2:YYDispatchQueuePool的實現(xiàn)思路
YYKit組件中的YYDispatchQueuePool
也能控制并發(fā)隊列的并發(fā)數(shù)
在iOS保持界面流暢的技巧
原文中提到:
其思路是為不同優(yōu)先級創(chuàng)建和 CPU 數(shù)量相同的 serial queue半抱,每次從 pool 中獲取 queue 時,會輪詢返回其中一個 queue膜宋。我把 App 內(nèi)所有異步操作窿侈,包括圖像解碼、對象釋放秋茫、異步繪制等史简,都按優(yōu)先級不同放入了全局的 serial queue 中執(zhí)行,這樣盡量避免了過多線程導(dǎo)致的性能問題肛著。