This part analyzes the coroutine implementation in Pebble. It shares some ideas with the Phxrpc coroutines covered in the previous part: both are built on ucontext_t, and both have a coroutine manager, a scheduler, and timers. Unlike Phxrpc, the timer here is not implemented as a min-heap; the STL unordered_map container is used instead. It also differs from the coroutines in Libco, which do not use ucontext_t at all but implement the context-switching logic and data structures directly in assembly; when analyzing Libco later, I will look closely at how the switch works, in comparison with the ucontext_t-based implementation.
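Since both Pebble and Phxrpc build on ucontext_t, here is a minimal standalone sketch of the three primitives involved (getcontext/makecontext/swapcontext). The names and the 128 KB stack are illustrative only, not taken from Pebble:

#include <ucontext.h>
#include <cstdio>

static ucontext_t main_ctx;               // context of the "main coroutine"
static ucontext_t co_ctx;                 // context of the worker coroutine
static char co_stack[128 * 1024];         // private stack for the coroutine

static void co_entry() {
    printf("running inside the coroutine\n");
    // returning here transfers control to uc_link, i.e. back to main_ctx
}

int main() {
    getcontext(&co_ctx);                  // initialize from the current context
    co_ctx.uc_stack.ss_sp = co_stack;     // give the coroutine its own stack
    co_ctx.uc_stack.ss_size = sizeof(co_stack);
    co_ctx.uc_link = &main_ctx;           // where to go when co_entry returns
    makecontext(&co_ctx, co_entry, 0);    // install the entry function
    printf("switching to the coroutine\n");
    swapcontext(&main_ctx, &co_ctx);      // save main_ctx, run co_ctx
    printf("back in main\n");
    return 0;
}

Pebble's coroutine_resume/coroutine_yield below are essentially this pattern plus bookkeeping.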
The coroutine-related code in Pebble mainly involves the following class declarations:
1) The coroutine instance
68 struct coroutine {
69 coroutine_func func;
71 void *ud;
72 ucontext_t ctx;
73 struct schedule * sch;
74 int status;
76 char* stack;
77 int32_t result;
78
79 coroutine() {
//more code ...
87 memset(&ctx, 0, sizeof(ucontext_t));
88 }
89 };
The role of each member follows from the declaration: func is the coroutine entry function, ud its user argument, ctx the saved context, sch the owning scheduler, status the coroutine state, stack its private stack, and result the value handed back when the coroutine is resumed.
2) The coroutine manager
92 struct schedule {
93 ucontext_t main;
94 int64_t nco;
95 int64_t running;
96 cxx::unordered_map<int64_t, coroutine*> co_hash_map;
97 std::list<coroutine*> co_free_list;
98 int32_t co_free_num;
99 uint32_t stack_size;
100 };
Here main is the context of the main coroutine, nco is the counter used to assign coroutine ids, running is the id of the currently running coroutine (-1 when none is running), co_hash_map maps ids to live coroutines, co_free_list holds idle coroutine instances for reuse, and stack_size is 256 KB by default in this code.
3) The coroutine task classes
173 class CoroutineTask {
174 friend class CoroutineSchedule;
175 public:
176 CoroutineTask();
177 virtual ~CoroutineTask();
178
179 int64_t Start(bool is_immediately = true);
180 virtual void Run() = 0;
181
186 int32_t Yield(int32_t timeout_ms = -1);
188 CoroutineSchedule* schedule_obj();
189 private:
190 int64_t id_;
191 CoroutineSchedule* schedule_obj_;
192 };
193
194 class CommonCoroutineTask : public CoroutineTask {
195 public:
196 CommonCoroutineTask() {}
198 virtual ~CommonCoroutineTask() {}
199
200 void Init(const cxx::function<void(void)>& run) { m_run = run; }
201
202 virtual void Run() {
203 m_run();
204 }
206 private:
207 cxx::function<void(void)> m_run;
208 };
4) The coroutine scheduling manager
233 class CoroutineSchedule {
234 friend class CoroutineTask;
235 public:
236 int Init(Timer* timer = NULL, uint32_t stack_size = 256 * 1024);
237
238 int Close();
239 CoroutineTask* CurrentTask() const;
240
241 int64_t CurrentTaskId() const;
242
243 int32_t Yield(int32_t timeout_ms = -1);
244 int32_t Resume(int64_t id, int32_t result = 0);
245
246 template<typename TASK>
247 TASK* NewTask() {
248 if (CurrentTaskId() != INVALID_CO_ID) {
249 return NULL;
250 }
251 TASK* task = new TASK();
252 if (AddTaskToSchedule(task)) {
253 delete task;
254 task = NULL;
255 }
256 return task;
257 }
259 private:
260 int AddTaskToSchedule(CoroutineTask* task);
262 int32_t OnTimeout(int64_t id);
263
264 struct schedule* schedule_;
265 Timer* timer_;
266 cxx::unordered_map<int64_t, CoroutineTask*> task_map_;
267 std::set<CoroutineTask*> pre_start_task_;
268 };
Below, the implementation of each interface is analyzed from the caller's point of view, starting in pebble_server.cpp:
741 int32_t PebbleServer::InitCoSchedule() {
742 if (m_coroutine_schedule) {
743 return 0;
744 }
745
746 m_coroutine_schedule = new CoroutineSchedule();
747 int32_t ret = m_coroutine_schedule->Init(GetTimer(), m_options._co_stack_size_bytes);
748 if (ret != 0) {
749 delete m_coroutine_schedule;
750 m_coroutine_schedule = NULL;
752 return -1;
753 }
755 return 0;
756 }
This creates a coroutine scheduler; in its Init implementation:
579 int CoroutineSchedule::Init(Timer* timer, uint32_t stack_size) {
580 timer_ = timer;
581 schedule_ = coroutine_open(stack_size);
582 if (schedule_ == NULL)
583 return -1;
584 return 0;
585 }
a coroutine manager is created. coroutine_open mainly does the following:
262 struct schedule *
263 coroutine_open(uint32_t stack_size) {
264 if (0 == stack_size) {
265 stack_size = 256 * 1024;
266 }
267 pid_t pid = GetPid();
268 stCoRoutineEnv_t *env = GetCoEnv(pid);
269 if (env) {
270 return env->co_schedule;
271 }
272
273 void* p = calloc(1, sizeof(stCoRoutineEnv_t));
274 env = reinterpret_cast<stCoRoutineEnv_t*>(p);
275 SetCoEnv(pid, env);
278 struct schedule *S = new schedule;
279 S->nco = 0;
280 S->running = -1;
281 S->co_free_num = 0;
282 S->stack_size = stack_size;
283
284 env->co_schedule = S;
285
286 stCoEpoll_t *ev = AllocEpoll();
287 SetEpoll(env, ev);
290 return S;
291 }
coroutine_open returns the per-thread coroutine scheduler. Each thread indexes g_CoEnvArrayForThread by its own thread id to find its stCoRoutineEnv_t; if none exists yet, one is created and initialized: the stack size is set, a coroutine manager (struct schedule) is allocated, and an stCoEpoll_t is allocated via AllocEpoll, shown right after the sketch below.
129 stCoEpoll_t *AllocEpoll() {
130 stCoEpoll_t *ctx = reinterpret_cast<stCoEpoll_t*>(calloc(1, sizeof(stCoEpoll_t)));
131
132 ctx->iEpollFd = epoll_create(stCoEpoll_t::_EPOLL_SIZE);
133 ctx->pTimeout = AllocTimeout(60 * 1000);
134
135 ctx->pstActiveList = reinterpret_cast<stTimeoutItemLink_t*>
136 (calloc(1, sizeof(stTimeoutItemLink_t)));
137 ctx->pstTimeoutList = reinterpret_cast<stTimeoutItemLink_t*>
138 (calloc(1, sizeof(stTimeoutItemLink_t)));
139
140 return ctx;
141 }
The declaration of this type is listed later when the scheduler is analyzed, so it is skipped here.
How does the whole framework work once a client connects? The rough call flow is:
Serve()--->Update()--->ProcessMessage()--->Message::Poll()--->RawMessageDriver::Poll()--->NetMessage::Poll()--->Accept()
When accept returns new_socket, the socket is set to non-blocking, the related state is initialized, and m_epoll->AddFd(new_socket, EPOLLIN | EPOLLERR, net_addr) registers new_socket for readable and error events.
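What m_epoll->AddFd amounts to is roughly the standard fcntl + epoll_ctl sequence; the sketch below is illustrative only (function and variable names are not Pebble's):

#include <sys/epoll.h>
#include <sys/socket.h>
#include <fcntl.h>

// Accept a connection, make it non-blocking, and watch it for read/error events.
int accept_and_watch(int listen_fd, int epoll_fd) {
    int new_socket = accept(listen_fd, NULL, NULL);
    if (new_socket < 0) return -1;

    int flags = fcntl(new_socket, F_GETFL, 0);        // switch the fd to non-blocking mode
    fcntl(new_socket, F_SETFL, flags | O_NONBLOCK);

    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLERR;                   // readable + error, as in m_epoll->AddFd
    ev.data.fd = new_socket;
    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &ev);
}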
Setting aside the details of the network layer, once a complete rpc request has been received, the upper layer calls ProcessRequest, which creates a task, binds the handler ProcessRequestInCoroutine, and calls Start:
181 int32_t RpcUtil::ProcessRequest(int64_t handle, const RpcHead& rpc_head,
182 const uint8_t* buff, uint32_t buff_len) {
183 if (!m_coroutine_schedule) {
184 return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
185 }
186
187 if (m_coroutine_schedule->CurrentTaskId() != INVALID_CO_ID) {
188 return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
189 }
190
191 CommonCoroutineTask* task = m_coroutine_schedule->NewTask<CommonCoroutineTask>();
192 cxx::function<void(void)> run = cxx::bind(&RpcUtil::ProcessRequestInCoroutine, this,
193 handle, rpc_head, buff, buff_len);
194 task->Init(run);
195 task->Start();
196
197 return kRPC_SUCCESS;
198 }
200 int32_t RpcUtil::ProcessRequestInCoroutine(int64_t handle, const RpcHead& rpc_head,
201 const uint8_t* buff, uint32_t buff_len) {
202 return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
203 }
The coroutine-related creation work:
287 template<typename TASK>
288 TASK* NewTask() {
289 if (CurrentTaskId() != INVALID_CO_ID) {
290 return NULL;
291 }
292 TASK* task = new TASK();
293 if (AddTaskToSchedule(task)) {
294 delete task;
295 task = NULL;
296 }
297 return task;
298 }
639 int CoroutineSchedule::AddTaskToSchedule(CoroutineTask* task) {
640 task->schedule_obj_ = this;
641 pre_start_task_.insert(task);
642 return 0;
643 }
546 int64_t CoroutineTask::Start(bool is_immediately) {
547 if (is_immediately && schedule_obj_->CurrentTaskId() != INVALID_CO_ID) {
548 delete this;
549 return -1;
550 }
551 id_ = coroutine_new(schedule_obj_->schedule_, DoTask, this);
552 if (id_ < 0)
553 id_ = -1;
554 int64_t id = id_;
555 schedule_obj_->task_map_[id_] = this;
556 schedule_obj_->pre_start_task_.erase(this);
557 if (is_immediately) {
558 int32_t ret = coroutine_resume(schedule_obj_->schedule_, id_);
559 if (ret != 0) {
560 id = -1;
561 }
562 }
563 return id;
564 }
The code above starts a coroutine task (the task object is deleted once it finishes, in DoTask). coroutine_new mainly allocates a coroutine object and sets its entry function DoTask; since the default argument is_immediately is true, coroutine_resume is called right away:
342 int64_t coroutine_new(struct schedule *S, coroutine_func func, void *ud) {
343 if (NULL == S || NULL == func) {
344 return -1;
345 }
346 struct coroutine *co = _co_new(S, func, ud);
347 int64_t id = S->nco;
348 S->co_hash_map[id] = co;
349 S->nco++;
352 return id;
353 }
231 struct coroutine *
232 _co_new(struct schedule *S, coroutine_func func, void *ud) {
233 if (NULL == S) {
234 assert(0);
235 return NULL;
236 }
237
238 struct coroutine * co = NULL;
239
240 if (S->co_free_list.empty()) {
241 co = new coroutine;
242 co->stack = new char[S->stack_size];
243 } else {
244 co = S->co_free_list.front();
245 S->co_free_list.pop_front();
246
247 S->co_free_num--;
248 }
249 co->func = func;
250 co->ud = ud;
251 co->sch = S;
252 co->status = COROUTINE_READY;
253
254 return co;
255 }
518 void DoTask(struct schedule*, void *ud) {
519 CoroutineTask* task = static_cast<CoroutineTask*>(ud);
520 assert(task != NULL);
521 task->Run();
522 delete task;
523 }
The main work of coroutine_new (via _co_new) is to check the coroutine pool for an idle coroutine object and reuse it, or create a new one otherwise, then set the coroutine's handler function and argument and mark status as COROUTINE_READY. task->Run() eventually runs ProcessRequestInCoroutine. Once the coroutine object has been created, coroutine_resume is called:
381 int32_t coroutine_resume(struct schedule * S, int64_t id, int32_t result) {
382 if (NULL == S) {
383 return kCO_INVALID_PARAM;
384 }
385 if (S->running != -1) {
386 return kCO_CANNOT_RESUME_IN_COROUTINE;
387 }
388 cxx::unordered_map<int64_t, coroutine*>::iterator pos = S->co_hash_map.find(id);
391 if (pos == S->co_hash_map.end()) {
393 return kCO_COROUTINE_UNEXIST;
394 }
395
396 struct coroutine *C = pos->second;
397 if (NULL == C) {
399 return kCO_COROUTINE_UNEXIST;
400 }
402 C->result = result;
403 int status = C->status;
404 switch (status) {
405 case COROUTINE_READY: {
408 getcontext(&C->ctx);
409 C->ctx.uc_stack.ss_sp = C->stack;
410 C->ctx.uc_stack.ss_size = S->stack_size;
411 C->ctx.uc_stack.ss_flags = 0;
412 C->ctx.uc_link = &S->main;
413 S->running = id;
414 C->status = COROUTINE_RUNNING;
415 uintptr_t ptr = (uintptr_t) S;
416 makecontext(&C->ctx, (void (*)(void)) mainfunc, 2,
417 (uint32_t)ptr,
418 (uint32_t)(ptr>>32));
419
420 swapcontext(&S->main, &C->ctx);
422 break;
423 }
424 case COROUTINE_SUSPEND: {
428 S->running = id;
429 C->status = COROUTINE_RUNNING;
430 swapcontext(&S->main, &C->ctx);
431
432 break;
433 }
435 default:
437 return kCO_COROUTINE_STATUS_ERROR;
438 }
439
440 return 0;
441 }
355 static void mainfunc(uint32_t low32, uint32_t hi32) {
356 uintptr_t ptr = (uintptr_t) low32 | ((uintptr_t) hi32 << 32);
357 struct schedule *S = (struct schedule *) ptr;
358 int64_t id = S->running;
359 struct coroutine *C = S->co_hash_map[id];
360 if (C->func != NULL) {
361 C->func(S, C->ud);
362 } else {
363 C->std_func();
364 }
365 S->co_free_list.push_back(C);
366 S->co_free_num++;
367
368 if (S->co_free_num > MAX_FREE_CO_NUM) {
369 coroutine* co = S->co_free_list.front();
370 _co_delete(co);
371
372 S->co_free_list.pop_front();
373 S->co_free_num--;
374 }
375
376 S->co_hash_map.erase(id);
377 S->running = -1;
379 }
The code above branches on status. COROUTINE_READY means the coroutine has not run yet; the logic is the same as in Phxrpc: get the current context, set the stack address and size, point uc_link at the main context S->main so the coroutine returns there when it finishes, update status, install the entry function mainfunc and its arguments with makecontext (see the note after this paragraph on why the pointer is split into two 32-bit halves), and finally swapcontext into the coroutine.
mainfunc runs the actual callback (DoTask) and, when it returns, recycles the coroutine object: it is pushed onto the free list, excess free coroutines are deleted, and the entry is erased from co_hash_map.
COROUTINE_SUSPEND means we are switching back into a coroutine that was switched out earlier; status is updated and swapcontext restores its context so it continues where it left off.
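One detail worth spelling out is why mainfunc takes two uint32_t parameters: makecontext only forwards int-sized arguments, so the 64-bit schedule pointer has to be split into its low and high 32 bits and reassembled inside the entry function. A minimal illustration (assuming a 64-bit platform, as the original code does; schedule_stub is a stand-in type):

#include <cstdint>
#include <cstdio>

struct schedule_stub { int64_t running; };    // stand-in for struct schedule

// What mainfunc does with the two halves it receives:
static void entry(uint32_t low32, uint32_t hi32) {
    uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hi32 << 32);
    schedule_stub* S = (schedule_stub*)ptr;
    printf("running id seen inside entry: %lld\n", (long long)S->running);
}

int main() {
    schedule_stub S = { 42 };
    uintptr_t ptr = (uintptr_t)&S;
    // What coroutine_resume does before calling makecontext(..., 2, low, high):
    entry((uint32_t)ptr, (uint32_t)(ptr >> 32));
    return 0;
}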
Below is the implementation that switches a coroutine off the CPU:
443 int32_t coroutine_yield(struct schedule * S) {
444 if (NULL == S) {
445 return kCO_INVALID_PARAM;
446 }
447
448 int64_t id = S->running;
449 if (id < 0) {
451 return kCO_NOT_IN_COROUTINE;
452 }
453
454 assert(id >= 0);
455 struct coroutine * C = S->co_hash_map[id];
456
457 if (C->status != COROUTINE_RUNNING) {
459 return kCO_NOT_RUNNING;
460 }
462 C->status = COROUTINE_SUSPEND;
463 S->running = -1;
466 swapcontext(&C->ctx, &S->main);
468 return C->result;
469 }
The analysis above covers how a coroutine is switched onto and off the CPU.
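To see how application code drives these primitives through the task API, here is a hedged usage sketch. It is not taken from Pebble's sources, it assumes Pebble's coroutine headers are available, and g_schedule/TaskBody/OnSomeRequest are illustrative names:

static CoroutineSchedule* g_schedule = NULL;   // set to the server's CoroutineSchedule during init

static void TaskBody() {
    // ... kick off some asynchronous operation, then give up the CPU ...
    int32_t result = g_schedule->Yield(5000);  // park; presumably the Timer wakes us after 5 s
    (void)result;                              // result is the value passed to Resume(id, result)
}

static void OnSomeRequest() {
    CommonCoroutineTask* task = g_schedule->NewTask<CommonCoroutineTask>();
    if (task == NULL) return;                  // NewTask() refuses to nest inside a running coroutine
    task->Init(cxx::bind(&TaskBody));
    int64_t id = task->Start();                // runs immediately up to the first Yield()
    (void)id;                                  // later: g_schedule->Resume(id, 0) when the event fires
    // no delete here: DoTask deletes the task once Run() returns
}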
Coroutine scheduling, timeout handling, and any pending read/write events still have to be processed, together with their callbacks; take the two hooked socket APIs from the coroutine-basics introduction as an example again:
228 ssize_t read(int fd, void *buf, size_t nbyte)
229 {
230 HOOK_SYS_FUNC(read);
231
232 if (!co_is_enable_sys_hook()) {
233 return g_sys_read_func(fd, buf, nbyte);
234 }
235 rpchook_t *lp = get_by_fd(fd);
236
237 if (!lp || (O_NONBLOCK & lp->user_flag)) {
238 ssize_t ret = g_sys_read_func(fd, buf, nbyte);
239 return ret;
240 }
241 int timeout = (lp->read_timeout.tv_sec * 1000)
242 + (lp->read_timeout.tv_usec / 1000);
243
244 struct pollfd pf = { 0 };
245 pf.fd = fd;
246 pf.events = (POLLIN | POLLERR | POLLHUP);
247
248 int pollret = poll(&pf, 1, timeout);
250 ssize_t readret = g_sys_read_func(fd, reinterpret_cast<char*>(buf), nbyte);
251
252 if (readret < 0) {
255 }
257 return readret;
258 }
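The excerpt relies on HOOK_SYS_FUNC and the g_sys_*_func pointers, which are not shown; such hooks are typically implemented with dlsym(RTLD_NEXT, ...) so the hooked symbol can still reach the real libc function. A hedged sketch of that mechanism (the macro and pointer names follow the libco convention and are assumptions here):

#include <dlfcn.h>       // dlsym, RTLD_NEXT (g++ defines _GNU_SOURCE by default)
#include <unistd.h>
#include <cstdio>

typedef ssize_t (*read_pfn_t)(int fd, void* buf, size_t nbyte);
static read_pfn_t g_sys_read_func = NULL;

// Resolve the next definition of the symbol, i.e. the real libc function,
// the first time the hook is entered.
#define HOOK_SYS_FUNC(name) \
    if (!g_sys_##name##_func) { \
        g_sys_##name##_func = (name##_pfn_t)dlsym(RTLD_NEXT, #name); \
    }

// In the real code this function is exported as read() itself, shadowing libc;
// it is renamed here only to keep the sketch self-contained.
static ssize_t hooked_read(int fd, void* buf, size_t nbyte) {
    HOOK_SYS_FUNC(read);
    // ... coroutine-aware poll()/yield logic as in the excerpt above ...
    return g_sys_read_func(fd, buf, nbyte);    // finally call the real read()
}

int main() {
    char c;
    ssize_t n = hooked_read(0, &c, 1);         // reads one byte from stdin; link with -ldl on older glibc
    printf("read %zd byte(s)\n", n);
    return 0;
}

The write() and poll() hooks below follow the same pattern.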
259 ssize_t write(int fd, const void *buf, size_t nbyte)
260 {
261 HOOK_SYS_FUNC(write);
262
263 if (!co_is_enable_sys_hook()) {
264 return g_sys_write_func(fd, buf, nbyte);
265 }
266 rpchook_t *lp = get_by_fd(fd);
267
268 if (!lp || (O_NONBLOCK & lp->user_flag)) {
269 ssize_t ret = g_sys_write_func(fd, buf, nbyte);
270 return ret;
271 }
272
273 size_t wrotelen = 0;
274 int timeout = (lp->write_timeout.tv_sec * 1000)
275 + (lp->write_timeout.tv_usec / 1000);
276
277 ssize_t writeret = g_sys_write_func(fd, (const char*)buf + wrotelen, nbyte - wrotelen);
278
279 if (writeret > 0) {
280 wrotelen += writeret;
281 }
282 while (wrotelen < nbyte) {
284 struct pollfd pf = {0};
285 pf.fd = fd;
286 pf.events = (POLLOUT | POLLERR | POLLHUP);
287 poll(&pf, 1, timeout);
288
289 writeret = g_sys_write_func(fd, (const char*)buf + wrotelen, nbyte - wrotelen);
290
291 if (writeret <= 0) {
292 break;
293 }
294 wrotelen += writeret;
295 }
296 return wrotelen;
297 }
427 int poll(struct pollfd fds[], nfds_t nfds, int timeout)
428 {
429 HOOK_SYS_FUNC(poll);
430
431 if (!co_is_enable_sys_hook()) {
432 return g_sys_poll_func(fds, nfds, timeout);
433 }
434
435 return co_poll(co_get_epoll_ct(), fds, nfds, timeout);
436 }
1027 int co_poll(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout)
1028 {
1029 if (timeout > stTimeoutItem_t::eMaxTimeout) {
1030 timeout = stTimeoutItem_t::eMaxTimeout;
1031 }
1032 int epfd = ctx->iEpollFd;
1033
1034 // 1.struct change
1035 stPoll_t arg;
1036 memset(&arg, 0, sizeof(arg));
1037
1038 arg.iEpollFd = epfd;
1039 arg.fds = fds;
1040 arg.nfds = nfds;
1041
1042 stPollItem_t arr[2];
1043 if (nfds < sizeof(arr) / sizeof(arr[0])) {
1044 arg.pPollItems = arr;
1045 } else {
1046 arg.pPollItems = reinterpret_cast<stPollItem_t*>(malloc(nfds * sizeof(stPollItem_t)));
1047 }
1048 memset(arg.pPollItems, 0, nfds * sizeof(stPollItem_t));
1049
1050 arg.pfnProcess = OnPollProcessEvent;
1051 arg.co_id = get_curr_co_id();
1053 // 2.add timeout
1054 unsigned long long now = GetTickMS();
1055 arg.ullExpireTime = now + timeout;
1056 int ret = AddTimeout(ctx->pTimeout, &arg, now);
1057 if (ret != 0) {
1060 errno = EINVAL;
1061 return -__LINE__;
1062 }
1063
1064 for (nfds_t i = 0; i < nfds; i++) {
1065 arg.pPollItems[i].pSelf = fds + i;
1066 arg.pPollItems[i].pPoll = &arg;
1067
1068 arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
1069 struct epoll_event &ev = arg.pPollItems[i].stEvent;
1070
1071 if (fds[i].fd > -1) {
1072 ev.data.ptr = arg.pPollItems + i;
1073 ev.events = PollEvent2Epoll(fds[i].events);
1074
1075 epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i].fd, &ev);
1076 }
1077 }
1079 coroutine_yield(co_get_curr_thread_env()->co_schedule);
1080
1081 RemoveFromLink<stTimeoutItem_t, stTimeoutItemLink_t>(&arg);
1082 for (nfds_t i = 0; i < nfds; i++) {
1083 int fd = fds[i].fd;
1084 if (fd > -1) {
1085 epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &arg.pPollItems[i].stEvent);
1086 }
1087 }
1088
1089 if (arg.pPollItems != arr) {
1090 free(arg.pPollItems);
1091 arg.pPollItems = NULL;
1092 }
1093 return arg.iRaiseCnt;
1094 }
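PollEvent2Epoll is not shown above; a hedged sketch of the usual poll-to-epoll event-mask translation (libco ships an equivalent helper) looks like this:

#include <poll.h>
#include <sys/epoll.h>
#include <stdint.h>

static uint32_t PollEvent2Epoll(short events) {
    uint32_t e = 0;
    if (events & POLLIN)   e |= EPOLLIN;     // readable
    if (events & POLLOUT)  e |= EPOLLOUT;    // writable
    if (events & POLLHUP)  e |= EPOLLHUP;    // peer closed
    if (events & POLLERR)  e |= EPOLLERR;    // error condition
    return e;
}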
Because the accepted fd is non-blocking, a read or write may need to wait for the corresponding event. For that purpose every thread owns a struct stCoRoutineEnv_t object, whose stCoEpoll_t member looks like this:
66 struct stCoEpoll_t {
67 int iEpollFd;
68 static const int _EPOLL_SIZE = 1024 * 10;
69
70 struct stTimeout_t *pTimeout;
72 struct stTimeoutItemLink_t *pstTimeoutList;
74 struct stTimeoutItemLink_t *pstActiveList;
76 co_epoll_res *result;
77 };
Both are initialized in coroutine_open; the env ties the schedule and the epoll context together:
37 struct stCoRoutineEnv_t {
38 schedule* co_schedule;
39 stCoEpoll_t *pEpoll;
40 };
942 void OnPollProcessEvent(stTimeoutItem_t * ap)
943 {
944 coroutine_resume(co_get_curr_thread_env()->co_schedule, ap->co_id);
945 }
1050 arg.pfnProcess = OnPollProcessEvent;
1051 arg.co_id = get_curr_co_id();
The two lines above set up the callback invoked when the event fires; OnPollProcessEvent (shown above) resumes the coroutine with the stored id.
1055 arg.ullExpireTime = now + timeout;
1056 int ret = AddTimeout(ctx->pTimeout, &arg, now);
The two lines above register the timeout. Lines 1064~1077 then register each fd with epoll one by one; when an event fires, OnPollPreparePfn fills in the triggered events, removes the item from the timeout list, and appends it to the active list. Note the idiom ev.data.ptr = arg.pPollItems + i: when epoll returns because of an event or a timeout, the relevant information is recovered from ev.data.ptr, e.g. stTimeoutItem_t *item = reinterpret_cast<stTimeoutItem_t*>(result->events[i].data.ptr);
After that, coroutine_yield(co_get_curr_thread_env()->co_schedule) switches this coroutine off the CPU. When the event occurs (or the timeout fires) it is switched back in, and lines 1079~1088 remove the item from the timeout list, delete the related fds from epoll, and release the resources.
Finally there is an event-loop-like wait function:
965 void co_update()
966 {
967 stCoEpoll_t* ctx = co_get_epoll_ct();
//more code....
974 co_epoll_res *result = ctx->result;
975 int ret = epoll_wait(ctx->iEpollFd, result->events, stCoEpoll_t::_EPOLL_SIZE, 1);
976
977 stTimeoutItemLink_t *active = (ctx->pstActiveList);
978 stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
979
980 memset(active, 0, sizeof(stTimeoutItemLink_t));
981 memset(timeout, 0, sizeof(stTimeoutItemLink_t));
982
983 for (int i = 0; i < ret; i++)
984 {
985 stTimeoutItem_t *item = reinterpret_cast<stTimeoutItem_t*>(result->events[i].data.ptr);
986 if (item->pfnPrepare) {
987 item->pfnPrepare(item, result->events[i], active);
988 } else {
989 AddTail(active, item);
990 }
991 }
993 unsigned long long now = GetTickMS();
994 TakeAllTimeout(ctx->pTimeout, now, timeout);
995
996 stTimeoutItem_t *lp = timeout->head;
997 while (lp) {
998 lp->bTimeout = true;
999 lp = lp->pNext;
1000 }
1001
1002 Join<stTimeoutItem_t, stTimeoutItemLink_t>(active, timeout);
1003
1004 lp = active->head;
1005 while (lp) {
1006
1007 PopHead<stTimeoutItem_t, stTimeoutItemLink_t>(active);
1008 if (lp->pfnProcess) {
1009 lp->pfnProcess(lp);
1010 }
1011
1012 lp = active->head;
1013 }
1014 }
The implementation above blocks in epoll_wait for one millisecond at a time; for every event that fired it runs pfnProcess in turn, i.e. OnPollProcessEvent, which resumes the corresponding coroutine:
942 void OnPollProcessEvent(stTimeoutItem_t * ap)
943 {
944 coroutine_resume(co_get_curr_thread_env()->co_schedule, ap->co_id);
945 }
Items with no event are timeouts; they are then processed with the same logic as above. Some finer details are not covered here; the overall design is much the same as Phxrpc's, and as the libco coroutines analyzed later.
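To tie this back to the server: co_update() must be called periodically for any of this to make progress. A hedged sketch of the driving loop (in Pebble the Serve()/Update() path plays this role; the loop below is illustrative only):

void co_update();                     // declared in Pebble's coroutine module (shown above)

void ServeLoopSketch(volatile bool& stop) {
    while (!stop) {
        // ... network Poll()/message dispatch would happen here ...
        co_update();                  // 1 ms epoll_wait, then resume coroutines whose events
                                      // fired or whose timeouts expired
    }
}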
There are many similar coroutine implementations around, libgo for example, which I will look into when I have time. A coroutine's stack should not be set too large; the implementations I have read use around 128 KB. They generally consist of a scheduler, timers, a coroutine pool, and an event loop; the parts worth focusing on are the coroutine switch, the state machine, and the entry function. For lower-level performance concerns, such as reducing malloc/free and new/delete, pinning threads to CPUs, and avoiding the cache misses caused by threads migrating between cores, the brpc design is a good reference; if the server hardware supports NUMA, the typical usage in DPDK/SPDK is worth studying, and I plan to spend some time on their key techniques later.
All of these coroutines are stackful with a fixed stack size, which inevitably wastes some memory; after finishing the libco analysis I will list a few ways to optimize this.