1. 協(xié)程介紹
協(xié)程(coroutine)是近些年來在后臺開發(fā)方向比較火的一個概念瞬内,實際上秕衙,協(xié)程在歷史上比線程還要早些,而最近火起來則是因為近來后臺服務開發(fā)中遇到的C10K問題導致慷蠕。Wiki中給出的定義:
協(xié)程是一種程序組件掸冤,是由子例程(過程厘托、函數(shù)、例程稿湿、方法铅匹、子程序)的概念泛化而來的,子例程只有一個入口點且只返回一次饺藤,而協(xié)程允許多個入口點包斑,可以在指定位置掛起和恢復執(zhí)行。
單看定義涕俗,比較晦澀難懂罗丰。本質上,協(xié)程可以粗略理解為用戶態(tài)線程再姑,主要是用來解決在IO Bound型服務當中萌抵,如何更加有效地榨取CPU的使用效率。(同時元镀,還要兼顧程序開發(fā)的效率和可讀性绍填,因此異步回調的開發(fā)模式雖然在Nginx引領下大放異彩,但對程序開發(fā)人員來說并不友好)栖疑。用戶態(tài)線程有兩個問題必須被處理:
- 碰到阻塞條件(或者其他觸發(fā)條件)讨永,進程必須能夠被掛起;
- 由于沒有時鐘阻塞遇革,進程需要有自己進行線程調度的能力卿闹。
因此揭糕,如果一種“用戶態(tài)線程”的實現(xiàn),使得每個線程可以通過自己調用某個方法(如yield等)锻霎,主動交出控制權插佛,那么我們就稱這種用戶態(tài)線程為協(xié)程(協(xié)作式線程)。
一個例子就是生產者消費者模型量窘,如果在此處將線程換為協(xié)程雇寇,即一個協(xié)程負責生產產品并放入隊列,另一個負責把產品從隊列中取出并進行消費蚌铜,同時為了提高效率可以一次生產或消費多個產品锨侯,則偽代碼如下:
# producer coroutine
loop
while queue is not full
create some new items
add the items to queue
yield to consumer
# consumer coroutine
loop
while queue is not empty
remove some items from queue
use the items
yield to producer
2. 協(xié)程實現(xiàn)學習筆記
在C/C++中,函數(shù)調用通過壓棧的方式完成冬殃,具體過程如下:
- 第一個進棧的是主函數(shù)中函數(shù)調用后的下一條指令(函數(shù)調用語句的下一條可執(zhí)行語句)的地址囚痴;
- 然后是函數(shù)的各個參數(shù),在大多數(shù)的C編譯器中审葬,參數(shù)是由右往左入棧的深滚,然后是函數(shù)中的局部變量。注意靜態(tài)變量是不入棧的涣觉;
- 當本次函數(shù)調用結束后痴荐,局部變量先出棧,然后是參數(shù)官册,最后棧頂指針指向最開始存的地址生兆,也就是主函數(shù)中的下一條指令,程序由該點繼續(xù)運行膝宁。
在被調函數(shù)執(zhí)行完成前鸦难,主調函數(shù)無法獲取其上下文,因為函數(shù)變量已經壓棧暫時無法使用员淫。因此合蔽,實現(xiàn)協(xié)程本質就是如何讓C++實現(xiàn)yield的功能。
利用Linux提供的上下文保存機制介返,協(xié)程實現(xiàn)有ucontext系列和setjmp/longjmp系列拴事。這里先對ucontext系列進行簡單說明。ucontext是GNU C提供的一組用于創(chuàng)建映皆、保存挤聘、切換用戶態(tài)執(zhí)行上下文(context)的API轰枝,主要包括以下四個函數(shù):
void makecontext (ucontext_t *ucp, void (*func)(), int argc, ...);
int swapcontext (ucontext_t *oucp, ucontext_t *ucp);
int getcontext (ucontext_t *ucp);
int setcontext (const ucontext_t *ucp);
typedef struct ucontext {
struct ucontext *uc_link;
sigset_t uc_sigmask;
stack_t uc_stack;
mcontext_t uc_mcontext;
...
} ucontext_t;
其中捅彻,
- sigset_t和stack_t定義在標準頭文件<signal.h>中,uc_link字段保存當前context執(zhí)行結束后執(zhí)行的下一個context記錄鞍陨;
- uc_sigmask記錄該context運行階段需要屏蔽的信號步淹;
- uc_stack是該context運行時的棧信息从隆,最后一個字段uc_mcontext保存具體的程序執(zhí)行上下文——PC值、堆棧指針缭裆、寄存器值等键闺,實現(xiàn)方式與平臺、硬件相關澈驼。
簡單介紹下四個函數(shù):
int makecontext(ucontext_t ucp, void (func)(), int argc, ...)
makecontext函數(shù)用來初始化一個ucontext_t類型結構辛燥,func指明該context的入口函數(shù),argc指明參數(shù)個數(shù)缝其,后續(xù)緊跟各個參數(shù)(每個參數(shù)都是int型)挎塌;
另外,在調用makecontext之前内边,一般還需要顯式的指明其初始棧信息(棧指針SP及棧大辛穸肌)和運行時的信號屏蔽掩碼(signal mask)。同時也可以指定uc_link字段漠其,這樣在func函數(shù)返回后嘴高,就會切換到uc_link指向的context繼續(xù)執(zhí)行。
int getcontext(ucontext_t *ucp)
getcontext用來將當前執(zhí)行狀態(tài)上下文保存到ucp指向的上下文結構當中和屎,若后續(xù)調用setcontext或者swapcontext恢復該上下文拴驮,則程序會沿著getcontext調用點之后繼續(xù)執(zhí)行,看起來好像剛從getcontext函數(shù)返回一樣柴信。這個功能和setjmp類似莹汤,都是保存執(zhí)行狀態(tài)以便后續(xù)能夠繼續(xù)執(zhí)行,但是:getcontext函數(shù)的返回值只能表示本次操作是否執(zhí)行正確颠印,而不能用來區(qū)分是直接從getcontext操作返回纲岭,還是由于setcontext/swapcontext恢復狀態(tài)導致的返回,這與setjmp是不一樣的线罕。
int setcontext(const ucontext_t *ucp)
setcontext用來將當前程序執(zhí)行切換到ucp所指向的上下文狀態(tài)止潮,在執(zhí)行正確的情況下,setcontext直接切入到新的執(zhí)行狀態(tài)钞楼,不會再返回喇闸。比如我們用上面介紹的makecontext初始化了一個新的上下文,并將入口指向某函數(shù)func()询件,那么setcontext成功后就會馬上運行func()函數(shù)燃乍。
int swapcontext(ucontext_t *oucp, ucontext_t *ucp)
swapcontext可以理解為以上兩個操作的組合:
- 首先getcontext(oucp); //保存當前上下文到oucp
- 然后setcontext(ucp); //執(zhí)行ucp上下文
理論上有上面3個函數(shù)可以滿足需要。但由于getcontext不能區(qū)分返回狀態(tài)宛琅,因此進行上下文切換時需要保存額外的信息來判斷刻蟹,比較麻煩。為了簡化實現(xiàn)嘿辟,swapcontext用來“原子”地完成舊狀態(tài)的保存和切換到新狀態(tài)舆瘪。(并非真正的原子操作片效,在多線程情況下也會引入一些調度方面的問題)
一個具體的例子:
#include <ucontext.h>
#include <stdio.h>
void func1(void* arg)
{
puts("func1");
}
void func2(void* arg)
{
puts("func2");
}
int main()
{
char stack[1024*128];
ucontext_t child,main;
getcontext(&child); //獲取當前上下文
child.uc_stack.ss_sp = stack;//指定棧空間
child.uc_stack.ss_size = sizeof(stack);//指定椨⒐牛空間大小
child.uc_stack.ss_flags = 0;
child.uc_link = &main;//設置后繼上下文
makecontext(&child,(void (*)(void))func1,0);//設置child協(xié)程執(zhí)行func1函數(shù)
swapcontext(&main,&child);//保存當前上下文到main,執(zhí)行child上下文,因為child上下文后繼是main淀衣,所以執(zhí)行了func1函數(shù)后,會回到此處
puts("back to main 1st");//如果設置了后繼上下文召调,func1函數(shù)指向完后會返回此處
makecontext(&child,(void (*)(void))func2,0);
swapcontext(&main,&child);
puts("back to main 2nd");
return 0;
}
//程序輸出:
//func1
//back to main 1st
//func2
//back to main 2nd
在云風實現(xiàn)的協(xié)程庫中膨桥,就是依據(jù)ucontext庫,抽象并實現(xiàn)了自己的協(xié)程調度Scheduler唠叛,在Scheduler中申請了單獨的椆欤空間,與進程執(zhí)行流的棽J空間進行交換保存保存介牙,具體可以參考這一段(來自云風的協(xié)程庫):
void coroutine_resume(struct schedule * S, int id) {
assert(S->running == -1);
assert(id >=0 && id < S->cap);
struct coroutine *C = S->co[id];
if (C == NULL)
return;
int status = C->status;
switch(status) {
case COROUTINE_READY:
getcontext(&C->ctx);
C->ctx.uc_stack.ss_sp = S->stack;
C->ctx.uc_stack.ss_size = STACK_SIZE;
C->ctx.uc_link = &S->main;
S->running = id;
C->status = COROUTINE_RUNNING;
uintptr_t ptr = (uintptr_t)S;
makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32));
swapcontext(&S->main, &C->ctx);
break;
case COROUTINE_SUSPEND:
memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);
S->running = id;
C->status = COROUTINE_RUNNING;
swapcontext(&S->main, &C->ctx);
break;
default:
assert(0);
}
}
static void _save_stack(struct coroutine *C, char *top) {
char dummy = 0;
assert(top - &dummy <= STACK_SIZE);
if (C->cap < top - &dummy) {
free(C->stack);
C->cap = top-&dummy;
C->stack = malloc(C->cap);
}
C->size = top - &dummy;
memcpy(C->stack, &dummy, C->size);
}
void coroutine_yield(struct schedule * S) {
int id = S->running;
assert(id >= 0);
struct coroutine * C = S->co[id];
assert((char *)&C > S->stack);
_save_stack(C,S->stack + STACK_SIZE);
C->status = COROUTINE_SUSPEND;
S->running = -1;
swapcontext(&C->ctx , &S->main);
}