Pebble協(xié)程庫實現(xiàn)

這部分準備分析下Pebble里的協(xié)程實現(xiàn),它和上部分的Phxrpc協(xié)程有一部分相似點蜈漓,即都使用了ucontext_t,也有協(xié)程管理器融虽,調(diào)度器,定時器等設計思想般又,和Phxrpc不同的是定時器實現(xiàn)并非小根堆,是用了STL中的unordered_map組件巍佑;另外和Libco中的協(xié)程不同的是茴迁,后者沒使用ucontext_t,直接使用匯編實現(xiàn)協(xié)程上下文切換的邏輯和數(shù)據(jù)結(jié)構(gòu)萤衰,后期在分析Libco協(xié)程的時候會結(jié)合ucontext_t相關(guān)的實現(xiàn)重點分析下切換時的工作原理堕义。

Pebble中與協(xié)程相關(guān)的主要有以下幾個類聲明:
1)協(xié)程實例

 68 struct coroutine {
 69     coroutine_func func;
 71     void *ud;
 72     ucontext_t ctx;
 73     struct schedule * sch;
 74     int status;
 76     char* stack;
 77     int32_t result; 
 78 
 79     coroutine() {
            //more code ...
 87         memset(&ctx, 0, sizeof(ucontext_t));
 88     }
 89 };

每個成員變量的作用從聲明可以得知;
2)協(xié)程管理器

 92 struct schedule {
 93     ucontext_t main;
 94     int64_t nco; 
 95     int64_t running;
 96     cxx::unordered_map<int64_t, coroutine*> co_hash_map;
 97     std::list<coroutine*> co_free_list;
 98     int32_t co_free_num;
 99     uint32_t stack_size;
100 };

其中main為主協(xié)程脆栋,running表示正在運行的協(xié)程id倦卖,co_hash_map為協(xié)程管理器,co_free_list為空閑的協(xié)程實例椿争,stack_size大小文中為256k怕膛;
3)協(xié)程任務類相關(guān)

173 class CoroutineTask {
174     friend class CoroutineSchedule;
175 public:
176     CoroutineTask();
177     virtual ~CoroutineTask();
178 
179     int64_t Start(bool is_immediately = true);
180     virtual void Run() = 0;
181    
186     int32_t Yield(int32_t timeout_ms = -1);
188     CoroutineSchedule* schedule_obj();
189 private:
190     int64_t id_;
191     CoroutineSchedule* schedule_obj_;
192 };  
193 
194 class CommonCoroutineTask : public CoroutineTask {
195 public:
196     CommonCoroutineTask() {}  
198     virtual ~CommonCoroutineTask() {}
199     
200     void Init(const cxx::function<void(void)>& run) { m_run = run; }
201     
202     virtual void Run() {
203         m_run();
204     }
206 private:
207     cxx::function<void(void)> m_run;
208 };

4)協(xié)程調(diào)試管理器

233 class CoroutineSchedule {
234     friend class CoroutineTask;
235 public:
236     int Init(Timer* timer = NULL, uint32_t stack_size = 256 * 1024);
237     
238     int Close();
239     CoroutineTask* CurrentTask() const;
240     
241     int64_t CurrentTaskId() const;
242     
243     int32_t Yield(int32_t timeout_ms = -1);
244     int32_t Resume(int64_t id, int32_t result = 0);
245     
246     template<typename TASK>
247     TASK* NewTask() { 
248         if (CurrentTaskId() != INVALID_CO_ID) {
249             return NULL;
250         }   
251         TASK* task = new TASK();
252         if (AddTaskToSchedule(task)) {
253             delete task;
254             task = NULL;
255         }   
256         return task;
257     }     
259 private:
260     int AddTaskToSchedule(CoroutineTask* task);
262     int32_t OnTimeout(int64_t id);
263 
264     struct schedule* schedule_;
265     Timer* timer_;
266     cxx::unordered_map<int64_t, CoroutineTask*> task_map_;
267     std::set<CoroutineTask*> pre_start_task_;
268 };

下面從使用的角度去分析每個接口的實現(xiàn),從pebble_server.cpp中:

 741 int32_t PebbleServer::InitCoSchedule() {
 742     if (m_coroutine_schedule) {
 743         return 0;
 744     }
 745 
 746     m_coroutine_schedule = new CoroutineSchedule();
 747     int32_t ret = m_coroutine_schedule->Init(GetTimer(), m_options._co_stack_size_bytes);
 748     if (ret != 0) {
 749         delete m_coroutine_schedule;
 750         m_coroutine_schedule = NULL;
 752         return -1;
 753     }
 755     return 0;
 756 }

創(chuàng)建一個協(xié)程調(diào)度器丘薛,在Init實現(xiàn)中:

 579 int CoroutineSchedule::Init(Timer* timer, uint32_t stack_size) {
 580     timer_ = timer;
 581     schedule_ = coroutine_open(stack_size);
 582     if (schedule_ == NULL)
 583         return -1;
 584     return 0;
 585 }

創(chuàng)建一個協(xié)程管理器嘉竟,coroutine_open主要做的事情如下:

 262 struct schedule *
 263 coroutine_open(uint32_t stack_size) {
 264     if (0 == stack_size) {
 265         stack_size = 256 * 1024;
 266     }
 267     pid_t pid = GetPid();
 268     stCoRoutineEnv_t *env = GetCoEnv(pid);
 269     if (env) {
 270         return env->co_schedule;
 271     }
 272 
 273     void* p = calloc(1, sizeof(stCoRoutineEnv_t));
 274     env = reinterpret_cast<stCoRoutineEnv_t*>(p);
 275     SetCoEnv(pid, env);
 278     struct schedule *S = new schedule;
 279     S->nco = 0;
 280     S->running = -1;
 281     S->co_free_num = 0;
 282     S->stack_size = stack_size;
 283 
 284     env->co_schedule = S;
 285 
 286     stCoEpoll_t *ev = AllocEpoll();
 287     SetEpoll(env, ev);
 290     return S;
 291 }

coroutine_open是獲取協(xié)程調(diào)度器,沒有的話則為每個線程創(chuàng)建一個stCoRoutineEnv_t洋侨,根據(jù)自己的線程id索引到g_CoEnvArrayForThread舍扰,不存在的話會初始化:設置棧大小,創(chuàng)建協(xié)程管理器希坚,分配stCoEpoll_t

 129 stCoEpoll_t *AllocEpoll() {
 130     stCoEpoll_t *ctx = reinterpret_cast<stCoEpoll_t*>(calloc(1, sizeof(stCoEpoll_t)));
 131 
 132     ctx->iEpollFd = epoll_create(stCoEpoll_t::_EPOLL_SIZE);
 133     ctx->pTimeout = AllocTimeout(60 * 1000);
 134 
 135     ctx->pstActiveList = reinterpret_cast<stTimeoutItemLink_t*>
 136         (calloc(1, sizeof(stTimeoutItemLink_t)));
 137     ctx->pstTimeoutList = reinterpret_cast<stTimeoutItemLink_t*>
 138         (calloc(1, sizeof(stTimeoutItemLink_t)));
 139 
 140     return ctx;
 141 }

這個類的聲明會在分析調(diào)度器時會列出來边苹,這里先跳過。

下面舉例有一個client connect過來后裁僧,整個框架是如何工作的呢个束?先介紹下大概工作流程吧:
Serve()--->Update()--->ProcessMessage()--->Message::Poll()--->RawMessageDriver::Poll()--->NetMessage::Poll()--->Accept()
acceptnew_socket時慕购,會設置非阻塞并初始化相關(guān)的數(shù)據(jù),并m_epoll->AddFd(new_socket, EPOLLIN | EPOLLERR, net_addr)進行對new_socket監(jiān)聽可讀和錯誤事件茬底。

如果不考慮網(wǎng)絡層的具體實現(xiàn)細節(jié)沪悲,在上層收到完整的rpc請求數(shù)據(jù)后殿如,會進行ProcessRequest涉馁,會創(chuàng)建一個task,綁定處理函數(shù)ProcessRequestInCoroutine帮坚,并Start

181 int32_t RpcUtil::ProcessRequest(int64_t handle, const RpcHead& rpc_head,
182     const uint8_t* buff, uint32_t buff_len) {
183     if (!m_coroutine_schedule) {
184         return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
185     }   
186 
187     if (m_coroutine_schedule->CurrentTaskId() != INVALID_CO_ID) {
188         return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
189     }
190      
191     CommonCoroutineTask* task = m_coroutine_schedule->NewTask<CommonCoroutineTask>();
192     cxx::function<void(void)> run = cxx::bind(&RpcUtil::ProcessRequestInCoroutine, this,
193         handle, rpc_head, buff, buff_len);
194     task->Init(run);
195     task->Start();
196     
197     return kRPC_SUCCESS;
198 }
200 int32_t RpcUtil::ProcessRequestInCoroutine(int64_t handle, const RpcHead& rpc_head,
201     const uint8_t* buff, uint32_t buff_len) {
202     return m_rpc->ProcessRequestImp(handle, rpc_head, buff, buff_len);
203 }

協(xié)程相關(guān)的創(chuàng)建工作:

287     template<typename TASK>
288     TASK* NewTask() {
289         if (CurrentTaskId() != INVALID_CO_ID) {
290             return NULL; 
291         } 
292         TASK* task = new TASK();
293         if (AddTaskToSchedule(task)) {
294             delete task;
295             task = NULL;
296         }
297         return task;
298     }

 639 int CoroutineSchedule::AddTaskToSchedule(CoroutineTask* task) {
 640     task->schedule_obj_ = this;
 641     pre_start_task_.insert(task);
 642     return 0;
 643 }

 546 int64_t CoroutineTask::Start(bool is_immediately) {
 547     if (is_immediately && schedule_obj_->CurrentTaskId() != INVALID_CO_ID) {
 548         delete this;
 549         return -1;
 550     }
 551     id_ = coroutine_new(schedule_obj_->schedule_, DoTask, this);
 552     if (id_ < 0)
 553         id_ = -1;
 554     int64_t id = id_;
 555     schedule_obj_->task_map_[id_] = this;
 556     schedule_obj_->pre_start_task_.erase(this);
 557     if (is_immediately) {
 558         int32_t ret = coroutine_resume(schedule_obj_->schedule_, id_);
 559         if (ret != 0) {
 560             id = -1;
 561         }
 562     }
 563     return id;
 564 }

以上實現(xiàn)是開始一個協(xié)程任務忘朝,結(jié)束后會delete taskcoroutine_new主要是分配一個協(xié)程對象悦昵,并設置入口函數(shù)DoTask但指,由于默認形參為true,所以會立即coroutine_resume

 342 int64_t coroutine_new(struct schedule *S, coroutine_func func, void *ud) {
 343     if (NULL == S || NULL == func) {
 344         return -1;
 345     }
 346     struct coroutine *co = _co_new(S, func, ud);
 347     int64_t id = S->nco;
 348     S->co_hash_map[id] = co;
 349     S->nco++;
 352     return id;
 353 }

 231 struct coroutine *
 232 _co_new(struct schedule *S, coroutine_func func, void *ud) {
 233     if (NULL == S) {
 234         assert(0);
 235         return NULL;
 236     }
 237 
 238     struct coroutine * co = NULL;
 239 
 240     if (S->co_free_list.empty()) {
 241         co = new coroutine;
 242         co->stack = new char[S->stack_size];
 243     } else {
 244         co = S->co_free_list.front();
 245         S->co_free_list.pop_front();
 246 
 247         S->co_free_num--;
 248     }
 249     co->func = func;
 250     co->ud = ud;
 251     co->sch = S;
 252     co->status = COROUTINE_READY;
 253 
 254     return co;
 255 }

 518 void DoTask(struct schedule*, void *ud) {
 519     CoroutineTask* task = static_cast<CoroutineTask*>(ud);
 520     assert(task != NULL);
 521     task->Run();
 522     delete task;
 523 }

coroutine_new里主要工作是從協(xié)程池中看一下有沒有空閑的協(xié)程對象,有的話復用入热,否則創(chuàng)建,然后設置協(xié)程的處理函數(shù)和對象骄噪,以及statusCOROUTINE_READY狀態(tài)腰池,其中task->Run()最終運行的是ProcessRequestInCoroutine示弓,創(chuàng)建完一個協(xié)程對象后進行coroutine_resume

 381 int32_t coroutine_resume(struct schedule * S, int64_t id, int32_t result) {
 382     if (NULL == S) {
 383         return kCO_INVALID_PARAM;
 384     }
 385     if (S->running != -1) {
 386         return kCO_CANNOT_RESUME_IN_COROUTINE;
 387     }
 388     cxx::unordered_map<int64_t, coroutine*>::iterator pos = S->co_hash_map.find(id);
 391     if (pos == S->co_hash_map.end()) {
 393         return kCO_COROUTINE_UNEXIST;
 394     }
 395 
 396     struct coroutine *C = pos->second;
 397     if (NULL == C) {
 399         return kCO_COROUTINE_UNEXIST;
 400     }
 402     C->result = result;
 403     int status = C->status;
 404     switch (status) {
 405         case COROUTINE_READY: {
 408             getcontext(&C->ctx);
 409             C->ctx.uc_stack.ss_sp = C->stack;
 410             C->ctx.uc_stack.ss_size = S->stack_size;
 411             C->ctx.uc_stack.ss_flags = 0;
 412             C->ctx.uc_link = &S->main;
 413             S->running = id;
 414             C->status = COROUTINE_RUNNING;
 415             uintptr_t ptr = (uintptr_t) S;
 416             makecontext(&C->ctx, (void (*)(void)) mainfunc, 2,
 417             (uint32_t)ptr,  
 418             (uint32_t)(ptr>>32));
 419 
 420             swapcontext(&S->main, &C->ctx);
 422             break;
 423         }
 424         case COROUTINE_SUSPEND: {
 428             S->running = id;
 429             C->status = COROUTINE_RUNNING;
 430             swapcontext(&S->main, &C->ctx);
 431 
 432             break;
 433         }
 435         default:
 437             return kCO_COROUTINE_STATUS_ERROR;
 438     }
 439 
 440     return 0;
 441 }

 355 static void mainfunc(uint32_t low32, uint32_t hi32) {
 356     uintptr_t ptr = (uintptr_t) low32 | ((uintptr_t) hi32 << 32);
 357     struct schedule *S = (struct schedule *) ptr;
 358     int64_t id = S->running;
 359     struct coroutine *C = S->co_hash_map[id];
 360     if (C->func != NULL) {
 361         C->func(S, C->ud);
 362     } else {
 363         C->std_func();
 364     }
 365     S->co_free_list.push_back(C);
 366     S->co_free_num++;
 367 
 368     if (S->co_free_num > MAX_FREE_CO_NUM) {
 369         coroutine* co = S->co_free_list.front();
 370         _co_delete(co);
 371 
 372         S->co_free_list.pop_front();
 373         S->co_free_num--;
 374     }
 375 
 376     S->co_hash_map.erase(id);
 377     S->running = -1;
 379 }

以上根據(jù)stauts分別處理不同的邏輯囱皿,如果是COROUTINE_READY表示協(xié)程還未運行嘱腥,這里的邏輯同Phxrpc一樣:獲取上下文齿兔,設置棧大小和地址,然后設置協(xié)程執(zhí)行完后回到的主協(xié)程main医寿,并更新status靖秩,并設置協(xié)程入口函數(shù)mainfunc和參數(shù)等工作沟突,之后切換到該協(xié)程運行事扭;
mainfunc執(zhí)行具體的回調(diào)函數(shù)DoTask求橄,處理完后釋放協(xié)程對象資源条霜;
如果是COROUTINE_SUSPEND表示切入上一次被切出的協(xié)程宰睡,此時更新status并由swapcontext切換上下文,繼續(xù)運行麸恍;

下面是切出cpu協(xié)程的實現(xiàn):

 443 int32_t coroutine_yield(struct schedule * S) {
 444     if (NULL == S) {
 445         return kCO_INVALID_PARAM;
 446     }
 447 
 448     int64_t id = S->running;
 449     if (id < 0) {
 451         return kCO_NOT_IN_COROUTINE;
 452     }
 453 
 454     assert(id >= 0);
 455     struct coroutine * C = S->co_hash_map[id];
 456 
 457     if (C->status != COROUTINE_RUNNING) {
 459         return kCO_NOT_RUNNING;
 460     }
 462     C->status = COROUTINE_SUSPEND;
 463     S->running = -1;

 466     swapcontext(&C->ctx, &S->main);
 468     return C->result;
 469 }

以上整個分析是一個協(xié)程切入和切出cpu的工作原理抹沪。

協(xié)程之間的調(diào)度,超時事件的處理卦羡,可能有可讀寫事件的發(fā)生等逝薪,都要去處理,并執(zhí)行可能的回調(diào)函數(shù)步清;

還是以介紹協(xié)程基礎的時候舉的兩個hook socket api為例:

228 ssize_t read(int fd, void *buf, size_t nbyte)
229 {   
230     HOOK_SYS_FUNC(read);
231     
232     if (!co_is_enable_sys_hook()) {
233         return g_sys_read_func(fd, buf, nbyte);
234     }
235     rpchook_t *lp = get_by_fd(fd);
236     
237     if (!lp || (O_NONBLOCK & lp->user_flag)) { 
238         ssize_t ret = g_sys_read_func(fd, buf, nbyte);
239         return ret;
240     }
241     int timeout = (lp->read_timeout.tv_sec * 1000)
242                 + (lp->read_timeout.tv_usec / 1000);
243     
244     struct pollfd pf = { 0 };
245     pf.fd = fd; 
246     pf.events = (POLLIN | POLLERR | POLLHUP);
247     
248     int pollret = poll(&pf, 1, timeout);  
250     ssize_t readret = g_sys_read_func(fd, reinterpret_cast<char*>(buf), nbyte);
251     
252     if (readret < 0) {
255     }
257     return readret;
258 }

259 ssize_t write(int fd, const void *buf, size_t nbyte)
260 {   
261     HOOK_SYS_FUNC(write);
262 
263     if (!co_is_enable_sys_hook()) {
264         return g_sys_write_func(fd, buf, nbyte);
265     }
266     rpchook_t *lp = get_by_fd(fd);
267 
268     if (!lp || (O_NONBLOCK & lp->user_flag)) {
269         ssize_t ret = g_sys_write_func(fd, buf, nbyte);
270         return ret;
271     }
272 
273     size_t wrotelen = 0;
274     int timeout = (lp->write_timeout.tv_sec * 1000)
275                 + (lp->write_timeout.tv_usec / 1000);
276 
277     ssize_t writeret = g_sys_write_func(fd, (const char*)buf + wrotelen, nbyte - wrotelen);
278 
279     if (writeret > 0) {
280         wrotelen += writeret;
281     }
282     while (wrotelen < nbyte) {
284         struct pollfd pf = {0};
285         pf.fd = fd;
286         pf.events = (POLLOUT | POLLERR | POLLHUP);
287         poll(&pf, 1, timeout);
288 
289         writeret = g_sys_write_func(fd, (const char*)buf + wrotelen, nbyte - wrotelen);
290 
291         if (writeret <= 0) {
292             break;
293         }
294         wrotelen += writeret;
295     }
296     return wrotelen;
297 }

427 int poll(struct pollfd fds[], nfds_t nfds, int timeout)
428 {
429     HOOK_SYS_FUNC(poll);
430 
431     if (!co_is_enable_sys_hook()) {
432         return g_sys_poll_func(fds, nfds, timeout);
433     }
434 
435     return co_poll(co_get_epoll_ct(), fds, nfds, timeout);
436 }

1027 int co_poll(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout)
1028 {
1029     if (timeout > stTimeoutItem_t::eMaxTimeout) {
1030         timeout = stTimeoutItem_t::eMaxTimeout;
1031     }
1032     int epfd = ctx->iEpollFd;
1033    
1034     // 1.struct change
1035     stPoll_t arg;
1036     memset(&arg, 0, sizeof(arg));
1037 
1038     arg.iEpollFd = epfd;
1039     arg.fds = fds;
1040     arg.nfds = nfds;
1041 
1042     stPollItem_t arr[2];
1043     if (nfds < sizeof(arr) / sizeof(arr[0])) {
1044         arg.pPollItems = arr;
1045     } else {
1046         arg.pPollItems = reinterpret_cast<stPollItem_t*>(malloc(nfds * sizeof(stPollItem_t)));
1047     }
1048     memset(arg.pPollItems, 0, nfds * sizeof(stPollItem_t));
1049 
1050     arg.pfnProcess = OnPollProcessEvent;
1051     arg.co_id = get_curr_co_id();

1053     // 2.add timeout
1054     unsigned long long now = GetTickMS();
1055     arg.ullExpireTime = now + timeout;
1056     int ret = AddTimeout(ctx->pTimeout, &arg, now);
1057     if (ret != 0) {
1060         errno = EINVAL;
1061         return -__LINE__;
1062     }
1063 
1064     for (nfds_t i = 0; i < nfds; i++) {
1065         arg.pPollItems[i].pSelf = fds + i;
1066         arg.pPollItems[i].pPoll = &arg;
1067 
1068         arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
1069         struct epoll_event &ev = arg.pPollItems[i].stEvent;
1070 
1071         if (fds[i].fd > -1) {
1072             ev.data.ptr = arg.pPollItems + i;
1073             ev.events = PollEvent2Epoll(fds[i].events);
1074 
1075             epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i].fd, &ev);
1076         }
1077     }

1079     coroutine_yield(co_get_curr_thread_env()->co_schedule);
1080 
1081     RemoveFromLink<stTimeoutItem_t, stTimeoutItemLink_t>(&arg);
1082     for (nfds_t i = 0; i < nfds; i++) {
1083         int fd = fds[i].fd;
1084         if (fd > -1) {
1085             epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &arg.pPollItems[i].stEvent);
1086         }
1087     }
1088 
1089     if (arg.pPollItems != arr) {
1090         free(arg.pPollItems);
1091         arg.pPollItems = NULL;
1092     }
1093     return arg.iRaiseCnt;
1094 }

對于accept fd,設置了非阻塞疮装,當讀或?qū)懙臅r候,需要監(jiān)聽相應的事件刷袍,為此,每個線程都會有一個struct stCoRoutineEnv_t對象:

  66 struct stCoEpoll_t {
  67     int iEpollFd;
  68     static const int _EPOLL_SIZE = 1024 * 10;
  69 
  70     struct stTimeout_t *pTimeout;
  72     struct stTimeoutItemLink_t *pstTimeoutList;
  74     struct stTimeoutItemLink_t *pstActiveList;
  76     co_epoll_res *result;
  77 };

coroutine_open的時候初始化:

  37 struct stCoRoutineEnv_t {
  38     schedule* co_schedule;
  39     stCoEpoll_t *pEpoll;
  40 };
 942 void OnPollProcessEvent(stTimeoutItem_t * ap)
 943 {
 944     coroutine_resume(co_get_curr_thread_env()->co_schedule, ap->co_id);
 945 }

1050     arg.pfnProcess = OnPollProcessEvent;
1051     arg.co_id = get_curr_co_id();

以上兩行當事件發(fā)生時的回調(diào)函數(shù)雷酪,里面會resume對應id的協(xié)程。

1055     arg.ullExpireTime = now + timeout;
1056     int ret = AddTimeout(ctx->pTimeout, &arg, now);

以上是添加超時處理省骂,代碼行1064?1077是依次監(jiān)聽fd钞澳,OnPollPreparePfn當事件發(fā)生時會設置event并從超時鏈表中移走,并添加到活躍鏈表中兰吟,重點關(guān)注下這種使用方法:ev.data.ptr = arg.pPollItems + i混蔼,當epoll有事件或超時返回惭嚣,需要從ev.data.ptr獲得相關(guān)的信息:stTimeoutItem_t *item = reinterpret_cast<stTimeoutItem_t*>(result->events[i].data.ptr)
然后進行coroutine_yield(co_get_curr_thread_env()->co_schedule)切出該協(xié)程槽地;
如果當事件發(fā)生時,再切回來,代碼行1079?1088從超時鏈表中稱走捌蚊,并從epoll中刪除相關(guān)的fd集畅,釋放資源等;

最后是一個eventloop類似的wait功能:

 965 void co_update()
 966 {
 967     stCoEpoll_t* ctx = co_get_epoll_ct();
             //more code....
 974     co_epoll_res *result = ctx->result;
 975     int ret = epoll_wait(ctx->iEpollFd, result->events, stCoEpoll_t::_EPOLL_SIZE, 1);
 976 
 977     stTimeoutItemLink_t *active = (ctx->pstActiveList);
 978     stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
 979 
 980     memset(active, 0, sizeof(stTimeoutItemLink_t));
 981     memset(timeout, 0, sizeof(stTimeoutItemLink_t));
 982 
 983     for (int i = 0; i < ret; i++)
 984     {   
 985         stTimeoutItem_t *item = reinterpret_cast<stTimeoutItem_t*>(result->events[i].data.ptr);
 986         if (item->pfnPrepare) {
 987             item->pfnPrepare(item, result->events[i], active);
 988         } else {
 989             AddTail(active, item);
 990         }
 991     }
 993     unsigned long long now = GetTickMS();
 994     TakeAllTimeout(ctx->pTimeout, now, timeout);
 995 
 996     stTimeoutItem_t *lp = timeout->head;
 997     while (lp) {
 998         lp->bTimeout = true;
 999         lp = lp->pNext;
1000     }
1001 
1002     Join<stTimeoutItem_t, stTimeoutItemLink_t>(active, timeout);
1003 
1004     lp = active->head;
1005     while (lp) {
1006 
1007         PopHead<stTimeoutItem_t, stTimeoutItemLink_t>(active);
1008         if (lp->pfnProcess) {
1009             lp->pfnProcess(lp);
1010         }
1011 
1012         lp = active->head;
1013     }
1014 }

上面的實現(xiàn)就是使用epoll阻塞一毫秒逢勾,有事件發(fā)生時依次執(zhí)行pfnProcess牡整,即OnPollProcessEvent,resume這個協(xié)程:

 942 void OnPollProcessEvent(stTimeoutItem_t * ap)
 943 {   
 944     coroutine_resume(co_get_curr_thread_env()->co_schedule, ap->co_id);
 945 }

沒有就是超時溺拱;然后處理超時情況逃贝,和上面的邏輯一樣;其他有些更細節(jié)的沒作過多分析沐扳,整體的實現(xiàn)原理和Phxrpc差不多杨拐,包括和后面分析的libco協(xié)程屋吨。

類似的協(xié)程實現(xiàn),網(wǎng)上有很多方案,再比如libgo懊缺,這個有時間會去研究下途蒋,協(xié)程的棧不能設置過大,看過的幾個實現(xiàn)都是128kb号胚;一般會包括調(diào)度器杜漠,定時器锈至,協(xié)程池,eventloop,重點關(guān)注下協(xié)程的切換和狀態(tài)迄损,以及入口函數(shù)拗引;其他更底層的關(guān)于性能方面的断部,比如少用malloc/free和new/delete等蔑祟,綁定cpu等锄贷,減少線程在cpu間切換導致cache miss等,這些可以參考下brpc設計勿决,如果服務器架構(gòu)支持NUMA的玩般,可以參考下DPDK/SPDK 的典型應用,后期也會找些時間去研究它倆的一些關(guān)鍵技術(shù)實現(xiàn)洒忧。

這幾個協(xié)程都是stackfull的,且棧大小固定惩系,難免會有點浪費纠屋,在結(jié)束libco分析后,會列幾個優(yōu)化它的地方涨薪。

https://www.zhihu.com/question/65647171

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末骑素,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子刚夺,更是在濱河造成了極大的恐慌献丑,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件侠姑,死亡現(xiàn)場離奇詭異创橄,居然都是意外死亡,警方通過查閱死者的電腦和手機莽红,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門妥畏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人安吁,你說我怎么就攤上這事醉蚁。” “怎么了鬼店?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵网棍,是天一觀的道長。 經(jīng)常有香客問我妇智,道長滥玷,這世上最難降的妖魔是什么氏身? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮惑畴,結(jié)果婚禮上蛋欣,老公的妹妹穿的比我還像新娘。我一直安慰自己桨菜,他們只是感情好豁状,可當我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著倒得,像睡著了一般。 火紅的嫁衣襯著肌膚如雪夭禽。 梳的紋絲不亂的頭發(fā)上霞掺,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天,我揣著相機與錄音讹躯,去河邊找鬼菩彬。 笑死,一個胖子當著我的面吹牛潮梯,可吹牛的內(nèi)容都是我干的骗灶。 我是一名探鬼主播,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼秉馏,長吁一口氣:“原來是場噩夢啊……” “哼耙旦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起萝究,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤免都,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后帆竹,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绕娘,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年栽连,在試婚紗的時候發(fā)現(xiàn)自己被綠了险领。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡秒紧,死狀恐怖绢陌,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情噩茄,我是刑警寧澤下面,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站绩聘,受9級特大地震影響沥割,放射性物質(zhì)發(fā)生泄漏耗啦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一机杜、第九天 我趴在偏房一處隱蔽的房頂上張望帜讲。 院中可真熱鬧,春花似錦椒拗、人聲如沸似将。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽在验。三九已至,卻和暖如春堵未,著一層夾襖步出監(jiān)牢的瞬間腋舌,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工渗蟹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留块饺,地道東北人。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓雌芽,卻偏偏與公主長得像授艰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子世落,可洞房花燭夜當晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容