這是最后一篇了抖拴,其實(shí)還有很多重要的模塊要分析的铆遭,但留給以后有多余時(shí)間再去研究吧,有興趣的可以自行下載源碼分析笙僚。這部分主要是圍繞第三小問(wèn)題展開,并附加些其他skynet中與此有關(guān)的設(shè)計(jì)灵再,即:當(dāng)并發(fā)時(shí)肋层,如何保證消息的正確時(shí)序,以及如何使用協(xié)程處理消息(同步/異步/超時(shí))翎迁;包括創(chuàng)建協(xié)程處理消息栋猖,掛起協(xié)程,切換汪榔。這塊其實(shí)是針對(duì)lua上層來(lái)說(shuō)的蒲拉,底層框架的消息隊(duì)列只是保證消息順序入隊(duì)列且出隊(duì)列,如果交叉執(zhí)行比如lua層的協(xié)程掛起痴腌,那么就會(huì)出現(xiàn)時(shí)序問(wèn)題雌团。
先簡(jiǎn)單回顧下前幾篇博客的分析,包括skynet本身的設(shè)計(jì)士聪,及C++協(xié)程锦援。對(duì)于C++協(xié)程,比如一個(gè)請(qǐng)求a過(guò)來(lái)后剥悟,從協(xié)程池中pop一個(gè)協(xié)程并處理該請(qǐng)求a灵寺,如果需要等待,則讓出協(xié)程并掛上定時(shí)器区岗,然后再處理下一個(gè)請(qǐng)求b略板,如果此時(shí)a和b是相關(guān)聯(lián)的,且b有可能依賴于a的執(zhí)行結(jié)果慈缔,那么就會(huì)出現(xiàn)問(wèn)題羡亩。這對(duì)于游戲中的業(yè)務(wù)來(lái)說(shuō)弊决,尤其涉及到金錢相關(guān)的邏輯凶赁,那是大問(wèn)題缕坎。而那種獨(dú)立的請(qǐng)求間,只是讀之類的操作教藻,那是沒問(wèn)題的距帅。如果需要結(jié)合業(yè)務(wù),那么就需要改造括堤。
而對(duì)于skynet來(lái)說(shuō)碌秸,當(dāng)并發(fā)上來(lái)時(shí),考慮到這個(gè)時(shí)序問(wèn)題悄窃,底層實(shí)現(xiàn)相關(guān)的順序隊(duì)列讥电,大概思路就是lua協(xié)程執(zhí)行a到一半后,哪怕有b的消息被協(xié)程調(diào)度處理轧抗,此時(shí)會(huì)把這個(gè)b協(xié)程壓入隊(duì)列(lua中的table也可以恩敌,使用數(shù)組部分),必須等a執(zhí)行完畢或超時(shí)后横媚,再處理b的纠炮,也就是在業(yè)務(wù)上層串行化了服務(wù)的消息處理。這樣保證了時(shí)序灯蝴。
但這又引起了另一個(gè)問(wèn)題恢口,即可能存在后面的消息都超時(shí)了,然而上層如果無(wú)法識(shí)別繼續(xù)處理穷躁,那么就白白浪費(fèi)了資源耕肩,處理了無(wú)用的消息。這類的相關(guān)介紹在另一篇“談?wù)劸彺娲┩秆┍篮瓦^(guò)載保護(hù)以及一致性等問(wèn)題”中有相關(guān)的介紹及應(yīng)對(duì)方案问潭。
本節(jié)分為兩個(gè)小點(diǎn)討論猿诸,即:
1)如何保證消息的正確時(shí)序,以及如何使用協(xié)程處理消息(同步/異步/超時(shí))睦授;
2)創(chuàng)建協(xié)程處理消息两芳,掛起協(xié)程,切換去枷;
第一小點(diǎn)怖辆,撇開語(yǔ)言方面的限制,考慮skynet本身的框架設(shè)計(jì)删顶,而不摻雜業(yè)務(wù)框架的設(shè)計(jì)竖螃。對(duì)于單進(jìn)程多線程,要想并發(fā)的處理同一個(gè)客戶端的請(qǐng)求逗余,不管是讀還是寫特咆,都必須路由到同一個(gè)線程處理,這樣就保證了不會(huì)導(dǎo)致同一個(gè)client的請(qǐng)求分發(fā)到不同的線程,在skynet底層抽象client為一個(gè)agent service腻格,有自己的消息隊(duì)列画拾,并且當(dāng)工作線程處理這個(gè)agent消息時(shí),先把這個(gè)消息隊(duì)列從全局隊(duì)列出摘出來(lái)菜职,從這個(gè)隊(duì)列pop一條消息青抛,處理完畢后,再把這個(gè)消息隊(duì)列掛到全局隊(duì)列中酬核;而對(duì)于push消息到agent隊(duì)列則沒有這種過(guò)程蜜另,只要獲得自旋鎖即可,相關(guān)源碼可以見前面的分析嫡意。
這一層就保證了消息不會(huì)亂序举瑰,但是對(duì)于業(yè)務(wù)層,使用lua協(xié)程來(lái)提高并發(fā)蔬螟,那么就要好好設(shè)計(jì)此迅。
這里舉例比如在主場(chǎng)景中,這樣可以考慮到client的所有消息都路由到場(chǎng)景后需要考慮到的時(shí)序問(wèn)題促煮。
當(dāng)與client有關(guān)的兩條有依賴關(guān)系的消息a和b被場(chǎng)景服務(wù)dispatch分發(fā)處理時(shí)邮屁,不考慮讀還是寫,都會(huì)創(chuàng)建一個(gè)協(xié)程菠齿,并執(zhí)行相關(guān)的處理函數(shù)佑吝。比如數(shù)據(jù)安全性不是特別嚴(yán)重的例子,玩家在幫派中绳匀,然后點(diǎn)領(lǐng)取今日獎(jiǎng)勵(lì)b消息芋忿,此時(shí)幫主把玩家踢出幫派a消息,本來(lái)是a先執(zhí)行完畢后再b執(zhí)行的順序疾棵,這時(shí)可能出現(xiàn)a先執(zhí)行導(dǎo)致掛起戈钢,而b執(zhí)行完畢后,接著執(zhí)行a的情況是尔,多領(lǐng)了一份獎(jiǎng)勵(lì)殉了。當(dāng)然這里只是為了舉例,通過(guò)檢查可以避免這種問(wèn)題拟枚。
簡(jiǎn)單分析下薪铜,在skynet的做法中,為每個(gè)服務(wù)加個(gè)lua層的消息隊(duì)列恩溅,進(jìn)入該隊(duì)列的消息會(huì)被依次處理完畢隔箍,不管中間是否掛起,這樣帶來(lái)的問(wèn)題是脚乡,并發(fā)度降底了且引入了一定的復(fù)雜度蜒滩。
17 dispatch = function(session, from, ...)
18 table.insert(message_queue, {session = session, addr = from, ... })
19 if thread_id then //有消息,如果有等待則wakeup
20 skynet.wakeup(thread_id)
21 thread_id = nil
22 end
23 end
26 local function do_func(f, msg)
27 return pcall(f, table.unpack(msg))
28 end
29
30 local function message_dispatch(f)
31 while true do
32 if #message_queue==0 then //沒消息則掛起
33 thread_id = coroutine.running()
34 skynet.wait()
35 else
36 local msg = table.remove(message_queue,1) //依次處理消息
37 local session = msg.session
38 if session == 0 then //不需要響應(yīng)
39 local ok, msg = do_func(f, msg)
40 if ok then
41 if msg then
42 skynet.fork(message_dispatch,f)
44 end
45 else
46 skynet.fork(message_dispatch,f)
48 end
49 else
50 local data, size = skynet.pack(do_func(f,msg))
51 -- 1 means response
52 c.send(msg.addr, 1, session, data, size) //需要響應(yīng)
53 end
54 end
55 end
56 end
上面代碼實(shí)現(xiàn)細(xì)節(jié)不作過(guò)多分析,簡(jiǎn)單注釋了下俯艰,大致就是從table數(shù)組中remove前面的消息并處理之捡遍,如果會(huì)掛起則等響應(yīng)結(jié)果或超時(shí),再處理下一條蟆炊。
如上面的實(shí)現(xiàn)稽莉,新消息來(lái)了fork一個(gè)協(xié)程處理:
533 function skynet.fork(func,...)
534 local args = table.pack(...) //打包參數(shù)
535 local co = co_create(function()
536 func(table.unpack(args,1,args.n)) //設(shè)置協(xié)程執(zhí)行函數(shù)和參數(shù)
537 end)
538 table.insert(fork_queue, co) //回收協(xié)程資源
539 return co
540 end
104 local function co_create(f)
105 local co = table.remove(coroutine_pool)
106 if co == nil then
107 co = coroutine.create(function(...)
108 f(...)
109 while true do
110 local session = session_coroutine_id[co]
111 if session and session ~= 0 then
112 local source = debug.getinfo(f,"S")
//log error
117 end
118 f = nil
119 coroutine_pool[#coroutine_pool+1] = co
120 f = coroutine_yield "EXIT"
121 f(coroutine_yield())
122 end
123 end)
124 else
125 coroutine_resume(co, f)
126 end
127 return co
128 end
上面co_create就從協(xié)程池中取一個(gè)協(xié)程對(duì)象處理消息,如果沒有協(xié)程對(duì)象則創(chuàng)建涩搓。你一定會(huì)好奇執(zhí)行完后,返回結(jié)果在哪劈猪?
對(duì)于lua的協(xié)程api昧甘,當(dāng)create協(xié)程時(shí)它的狀態(tài)還沒開始,處于掛起suspended狀態(tài)战得,然后resume后會(huì)處理running狀態(tài)充边,執(zhí)行完后為dead狀態(tài),引用下面的:
a)coroutine.create(arg):根據(jù)一個(gè)函數(shù)創(chuàng)建一個(gè)協(xié)同程序常侦,參數(shù)為一個(gè)函數(shù)浇冰;
b)coroutine.resume(co):使協(xié)同從掛起變?yōu)檫\(yùn)行(1)激活coroutine,也就是讓協(xié)程函數(shù)開始運(yùn)行;(2)喚醒yield聋亡,使掛起的協(xié)同接著上次的地方繼續(xù)運(yùn)行肘习。該函數(shù)可以傳入?yún)?shù);
c)coroutine.yield():使正在運(yùn)行的協(xié)同掛起坡倔,可以傳入?yún)?shù)漂佩;
而真正強(qiáng)大之處在于當(dāng)?shù)诙蝦esume時(shí),resume和yield相關(guān)交換數(shù)據(jù)罪塔,具體怎么交互的建議看下lua協(xié)程基礎(chǔ)投蝉。
在skynet中進(jìn)行了對(duì)lua原始協(xié)程api進(jìn)行封裝并管理,下面說(shuō)明第二個(gè)小點(diǎn)征堪,當(dāng)然會(huì)把第一小點(diǎn)也部分說(shuō)明下瘩缆,畢竟是個(gè)整體,從創(chuàng)建到處理到回收佃蚜,以及中間的注意點(diǎn)庸娱。通過(guò)幾個(gè)常用的接口來(lái)說(shuō)明這套工作流程。
以下實(shí)現(xiàn)是wakeup
相關(guān):
493 function skynet.wakeup(token)
494 if sleep_session[token] then
495 table.insert(wakeup_queue, token) //在下一次suspend時(shí)被處理
496 return true
497 end
498 end
339 function skynet.wait(token)
340 local session = c.genid()
341 token = token or coroutine.running()
342 local ret, msg = coroutine_yield("SLEEP", session, token)//切出協(xié)程(A)
343 sleep_session[token] = nil //協(xié)程切回來(lái)重置相關(guān)數(shù)據(jù)
344 session_id_coroutine[session] = nil
345 end
130 local function dispatch_wakeup()
131 local token = table.remove(wakeup_queue,1)
132 if token then
133 local session = sleep_session[token]
134 if session then
135 local co = session_id_coroutine[session]
136 local tag = session_coroutine_tracetag[co]
137 if tag then c.trace(tag, "resume") end
138 session_id_coroutine[session] = "BREAK"
139 return suspend(co, coroutine_resume(co, false, "BREAK"))(B) 調(diào)度被掛起的協(xié)程
140 end
141 end
142 end
157 function suspend(co, result, command, param, param2)
//more code
183 elseif command == "SLEEP" then
184 local tag = session_coroutine_tracetag[co]
185 if tag then c.trace(tag, "sleep", co, 2) end
186 session_id_coroutine[param] = co
187 if sleep_session[param2] then
188 error(debug.traceback(co, "token duplicative"))
189 end
190 sleep_session[param2] = param
307 dispatch_wakeup()
308 dispatch_error_queue()
309 end
把要喚醒的協(xié)程通過(guò)token插入到wakeup_queue
數(shù)組中(注意下爽锥,很多實(shí)現(xiàn)邏輯是使用table的數(shù)組部分涌韩,因?yàn)橛行虻珟?lái)的問(wèn)題是從索引x處刪除元素后,涉及到移動(dòng))
然后dispatch_wakeup
會(huì)處理wakeup_queue
氯夷,重點(diǎn)是這一句return suspend(co, coroutine_resume(co, false, "BREAK"))
臣樱,這部分在后面分析。
(A)處把當(dāng)前協(xié)程切出去后,那三個(gè)參數(shù)作為主協(xié)程的返回值雇毫,即coroutine_resume
的返回值玄捕,再加一個(gè)本身返回的true or false,然后調(diào)用suspend
棚放,同理coroutine_resume
的后兩個(gè)參數(shù)作為coroutine_yield
的返回值枚粘。
以上部分還是比較容易理解,這里可以結(jié)合c++協(xié)程中的實(shí)現(xiàn)飘蚯,有專門的協(xié)程調(diào)度器馍迄,要么超時(shí)要么有數(shù)據(jù)過(guò)來(lái)(響應(yīng))進(jìn)而切回相應(yīng)的協(xié)程處理。
不過(guò)經(jīng)歷過(guò)的項(xiàng)目貌似沒有那種加限時(shí)的請(qǐng)求局骤,如果call長(zhǎng)時(shí)間收不到響應(yīng)攀圈,可能會(huì)出問(wèn)題,這個(gè)需要多研究下峦甩。不過(guò)赘来,結(jié)合skynet基礎(chǔ)實(shí)現(xiàn)也好辦;另外底層框架也是skynet凯傲,lua層的源碼部分都有返回犬辰,不管正確還是失敗都會(huì)返回,除非這條call請(qǐng)求消息根本沒有被目標(biāo)服務(wù)的消息隊(duì)列收到(可能出錯(cuò))冰单,或者沒有被工作線程調(diào)度幌缝,再或者沒有被上層服務(wù)處理;前者可能基本為零球凰,第一種可能性不大狮腿,因?yàn)榭蚣芤呀?jīng)保證消息一定會(huì)被發(fā)送到消息隊(duì)列中(消息隊(duì)列目前是無(wú)界的),而后面兩種可能確實(shí)存在呕诉,比如一個(gè)死循環(huán)或者處理耗時(shí)的功能等缘厢,這些只能靠開發(fā)人員注意及必要code review了。
311 function skynet.timeout(ti, func)
312 local session = c.intcommand("TIMEOUT",ti)
313 assert(session)
314 local co = co_create(func)
315 assert(session_id_coroutine[session] == nil)
316 session_id_coroutine[session] = co
317 end
318
319 function skynet.sleep(ti, token)
320 local session = c.intcommand("TIMEOUT",ti)
321 assert(session)
322 token = token or coroutine.running()
323 local succ, ret = coroutine_yield("SLEEP", session, token)
324 sleep_session[token] = nil
325 if succ then
326 return
327 end
328 if ret == "BREAK" then
329 return "BREAK"
330 else
331 error(ret)
332 end
333 end
上面就是超時(shí)的實(shí)現(xiàn)甩挫,也即弄個(gè)協(xié)程贴硫,向skynet框架注冊(cè)個(gè)定時(shí)器,當(dāng)超時(shí)時(shí)伊者,發(fā)條消息到上層英遭,上層創(chuàng)建協(xié)程處理。這個(gè)跟c++協(xié)程一樣亦渗,實(shí)現(xiàn)中不能有sleep這種調(diào)用挖诸,只能用超時(shí),然后掛到事件列表中法精,超時(shí)resume協(xié)程回調(diào)多律,不然阻塞其他痴突。
剩下的不過(guò)多分析,這三篇只是簡(jiǎn)單分析了個(gè)大概狼荞,還有蠻多值得學(xué)習(xí)辽装,關(guān)鍵在于思考為什么要這么做,可以根據(jù)自己的經(jīng)驗(yàn)相味,去嘗試改進(jìn)或在github上提pr拾积,分析別人的設(shè)計(jì),可能并不像作者一路踩坑過(guò)來(lái)丰涉,并持續(xù)重構(gòu)那樣拓巧,恰到好處的設(shè)計(jì)。
接下來(lái)的一篇準(zhǔn)備研究下鎖的性能一死,主要是對(duì)前幾天學(xué)習(xí)的一個(gè)總結(jié)玲销。
skynet 中 Lua 服務(wù)的消息處理
Lua中的協(xié)同程序 coroutine
Lua Coroutine詳解
skynet 里的 coroutine
skynet coroutine 運(yùn)行筆記