Skynet是多線程框架,其中對(duì)應(yīng)了一些服務(wù)(Service),每個(gè)服務(wù)對(duì)應(yīng)一個(gè)Lua虛擬機(jī)嗤瞎,一個(gè)虛擬機(jī)上可以跑多個(gè)協(xié)程,但同一時(shí)刻只能有一個(gè)協(xié)程听系,每條消息處理由協(xié)程來(lái)完成贝奇,且運(yùn)行在保護(hù)模式下。Lua層實(shí)現(xiàn)的協(xié)議池和時(shí)序相關(guān)的隊(duì)列靠胜,可以類比C++協(xié)程相關(guān)實(shí)現(xiàn)掉瞳。
Skynet中的協(xié)程
Skynet本質(zhì)上只是一個(gè)消息分發(fā)器,以服務(wù)為單位并給每個(gè)服務(wù)分配一個(gè)獨(dú)立的ID浪漠,可以從任意服務(wù)向另一個(gè)服務(wù)發(fā)送消息陕习。在此基礎(chǔ)上,在服務(wù)中接入Lua虛擬機(jī)址愿,并將消息收發(fā)的API封裝成了Lua模塊该镣。
目前使用Lua編寫(xiě)的服務(wù)在最底層只有一個(gè)入口,就是接收和處理一條Skynet框架轉(zhuǎn)發(fā)過(guò)來(lái)的消息响谓∷鸷希可以通過(guò)skynet.core.callback
這個(gè)內(nèi)部用C編寫(xiě)的API(通常由skynet.start
調(diào)用),把一個(gè)Lua函數(shù)設(shè)置到所屬的服務(wù)模塊中娘纷。
每個(gè)模塊必須設(shè)置且只能設(shè)置一個(gè)回調(diào)函數(shù)嫁审,這個(gè)回調(diào)函數(shù)在每次收到一條消息時(shí),會(huì)接收5個(gè)參數(shù):消息類型赖晶、消息指針律适、消息長(zhǎng)度、消息Session嬉探、消息來(lái)源擦耀。
消息分為兩類:
- 別人對(duì)你發(fā)起的請(qǐng)求
- 你過(guò)去對(duì)外的請(qǐng)求所收到的回應(yīng)
無(wú)論是哪一類,都是通過(guò)同一個(gè)回調(diào)函數(shù)進(jìn)入涩堤。在實(shí)際使用Skynet時(shí)可以直接使用rpc
的語(yǔ)法眷蜓,向外部服務(wù)發(fā)起一個(gè)遠(yuǎn)程調(diào)用,等待對(duì)方發(fā)送了回應(yīng)消息后胎围,邏輯接著向下走吁系。那么德召,框架如何把回調(diào)函數(shù)的模式轉(zhuǎn)換為阻塞API調(diào)用的形式的呢?這多虧了Lua支持協(xié)程coroutine
汽纤,使一段代碼運(yùn)行了一半時(shí)掛起上岗,在之后合適的時(shí)候再繼續(xù)運(yùn)行。
為了實(shí)現(xiàn)這點(diǎn)蕴坪,需要在收到每條請(qǐng)求消息時(shí)先創(chuàng)建一個(gè)協(xié)程肴掷,在協(xié)程中去運(yùn)行該類消息的dispatch
函數(shù),可使用框架中skynet.dispath
函數(shù)設(shè)置消息的處理函數(shù)背传。之所以必須先創(chuàng)建協(xié)程而不能直接調(diào)用消息處理函數(shù)呆瞻,是因?yàn)闊o(wú)法預(yù)知在消息處理的過(guò)程中會(huì)不會(huì)因?yàn)樽枞鸄PI而需要掛起執(zhí)行流程。等到第一次需要掛起時(shí)才把執(zhí)行流程綁定到協(xié)程上是做不到的径玖。
接著痴脾,所有的阻塞都通過(guò)coroutine.yield
函數(shù)掛起當(dāng)前協(xié)程,并把掛起類型以及可能用到的數(shù)據(jù)傳出來(lái)梳星≡蘩担框架會(huì)捕獲這些參數(shù)也就進(jìn)一步知道去做什么,也也就解釋了阻塞API為什么必須在消息處理函數(shù)中調(diào)用冤灾,而不能直接卸載服務(wù)的主體代碼中的原因前域。因?yàn)槌跏蓟糠值拇a并不運(yùn)行在框架創(chuàng)建出來(lái)的協(xié)程中。
例如:對(duì)于skynet.call
其實(shí)是生成一個(gè)對(duì)當(dāng)前服務(wù)來(lái)說(shuō)唯一的session
號(hào)瞳购,調(diào)用yield
給框架發(fā)送CALL
指令话侄】魍疲框架中的resume
捕獲到CALL
之后学赛,會(huì)把Session和Coroutine對(duì)象記錄在表中,然后掛起協(xié)程吞杭,并結(jié)束當(dāng)前的回調(diào)函數(shù)盏浇。等待Skynet底層框架后續(xù)消息進(jìn)來(lái)時(shí)再處理。實(shí)際上芽狗,這里還會(huì)處理skynet.fork
創(chuàng)建的額外線程绢掰。
服務(wù)調(diào)度API
local skynet = require "skynet"
skynet.sleep(time)
設(shè)置當(dāng)前任務(wù)休眠等待的微秒數(shù)
skynet.fork(func, ...)
fork
用于創(chuàng)建并啟動(dòng)新任務(wù)
fork
用于啟動(dòng)一個(gè)新的任務(wù)去執(zhí)行函數(shù)func
,實(shí)質(zhì)上它是開(kāi)了一個(gè)協(xié)程童擎,函數(shù)調(diào)用完成后會(huì)返回線程句柄滴劲。雖然可以使用原生的coroutine.create
來(lái)創(chuàng)建協(xié)程,但這樣做會(huì)打亂Skynet的工作流程顾复。
skynet.yield()
yield
讓出當(dāng)前任務(wù)執(zhí)行流程
yield
會(huì)讓出當(dāng)前任務(wù)執(zhí)行流程班挖,使本服務(wù)內(nèi)其它任務(wù)有機(jī)會(huì)執(zhí)行,隨后會(huì)繼續(xù)運(yùn)行芯砸。
skynet.wait()
wait
讓出當(dāng)前任務(wù)執(zhí)行流程直到使用wakeup
喚醒它
skynet.wakeup(co)
wakeup
用于喚醒使用wait
或sleep
后處于等待狀態(tài)的任務(wù)
skynet.timeout(time, func)
timeout
用于設(shè)定一個(gè)定時(shí)觸發(fā)函數(shù)func
萧芙,在time * 0.01s
后觸發(fā)给梅。
skynet.starttime()
starttime
用于返回當(dāng)前進(jìn)程的啟動(dòng)UTC時(shí)間(秒)
skynet.now()
now
用于返回當(dāng)前進(jìn)程啟動(dòng)后經(jīng)過(guò)的時(shí)間(微秒)
skynet.time()
time
用于通過(guò)starttime
和now
計(jì)算出當(dāng)前UTC時(shí)間秒數(shù)
sleep 休眠 定時(shí)器
skynet.sleep(ti)
函數(shù)是將當(dāng)前協(xié)程掛起ti
個(gè)單位時(shí)間,一個(gè)單位時(shí)間是1/100秒双揪。sleep
向框架注冊(cè)注冊(cè)了一個(gè)定時(shí)器的實(shí)現(xiàn)动羽,框架會(huì)在ti
時(shí)間后發(fā)送一個(gè)定時(shí)器消息來(lái)喚醒這個(gè)協(xié)程。sleep
函數(shù)是一個(gè)阻塞API渔期,返回值nil
會(huì)告訴你時(shí)間到了运吓,返回值BREAK
則表示被skynet.wakeup
給喚醒了。
$ cd skynet
$ vim demo/service_sleep.lua
local skynet = require "skynet"
skynet.start(function()
skynet.error("sleep begin")
skynet.sleep(300)
skynet.error("sleep end")
end)
$ cp example/config demo/config
$ cp example/config.path demo/config.path
$ vim demo/config.path
# 將example替換為demo
$ vim demo/config
# 將main替換為service_sleep
$ ./skynet demo/config
[:01000009] sleep begin
service_sleep # 手工輸入
[:01000009] sleep end
[:01000002] KILL self
注意:在console
服務(wù)中輸入service_sleep
后會(huì)發(fā)現(xiàn)疯趟,新服務(wù)不會(huì)立即啟動(dòng)羽德,因?yàn)?code>console服務(wù)正忙于第一個(gè)服務(wù)的初始化,需要等待3秒后新服務(wù)才會(huì)被console
處理迅办。這種做法實(shí)際上是錯(cuò)誤的宅静,在skynet.start
中服務(wù)初始化中是不允許有阻塞的存在,服務(wù)初始化要求盡量快的執(zhí)行完成站欺,所以業(yè)務(wù)邏輯代碼一般不應(yīng)該寫(xiě)在skynet.start
函數(shù)中姨夹。
fork 在服務(wù)中開(kāi)啟新線程
在Skynet的服務(wù)中,可以開(kāi)啟一個(gè)新的線程用來(lái)處理業(yè)務(wù)矾策,注意這里的線程并非傳統(tǒng)意義上的線程磷账,而更像是虛擬線程,其實(shí)是通過(guò)協(xié)程來(lái)模擬的贾虽。
在Skynet中所有的Lua層函數(shù)都是以協(xié)程的方式被執(zhí)行的逃糟,包括skynet.fork
產(chǎn)生的函數(shù)。除非在skynet.start
之外調(diào)用函數(shù)蓬豁,由于start
函數(shù)調(diào)用timeout
產(chǎn)生協(xié)程绰咽,而fork
則產(chǎn)生的是協(xié)程列表
Lua層設(shè)置的回調(diào)函數(shù)skynet.dispatch_message
主要調(diào)用了raw_dispatch_message
,這里才是驅(qū)動(dòng)協(xié)程函數(shù)執(zhí)行的位置地粪。一個(gè)協(xié)程結(jié)束或掛起后將由suspend
函數(shù)來(lái)接管取募。
如果入口函數(shù)start
中沒(méi)有調(diào)用fork
、sleep
蟆技、wait
之類的函數(shù)玩敏,那么驅(qū)動(dòng)start
執(zhí)行的消息將結(jié)束。
$ vim demo/service_fork.lua
local skynet = require "skynet"
function task(timeout)
skynet.error("coroutine fork: ", coroutine.running())
skynet.error("sleep begin ", timeout)
skynet.sleep(timeout)
skynet.error("sleep end")
end
skynet.start(function()
skynet.error("coroutine start: ", coroutine.running())
-- 開(kāi)啟新線程來(lái)執(zhí)行task任務(wù)
skynet.fork(task, 500)
end)
$ vim demo/config
start = "service_fork"
$ ./skynet demo/service_fork
[:01000009] coroutine start: thread: 0x7f173b4f1068 false
[:01000009] coroutine fork: thread: 0x7f173b4f1148 false
[:01000009] sleep begin 500
[:01000002] KILL self
[:01000009] sleep end
注意:可以看到當(dāng)service_fork
啟動(dòng)后质礼,console
服務(wù)仍然可以接收終端輸入的服務(wù)并啟動(dòng)旺聚。若是需要長(zhǎng)時(shí)間運(yùn)行并且出現(xiàn)阻塞的情況,可使用skynet.fork
創(chuàng)建新的協(xié)程來(lái)運(yùn)行眶蕉。
查看skynet/lualib/skynet.lua
文件可知道skynet.fork()
函數(shù)其實(shí)是使用的coroutine.create()
函數(shù)來(lái)實(shí)現(xiàn)的砰粹。
$ vim lublib/skynet.lua
local coroutine_pool = setmetable({}, {__mode = "kv"})
local function co_create(f)
local co = table.remove(coroutine_pool)
if co==nil then
co = coroutine.create(function(...)
-- 函數(shù)執(zhí)行完畢
f(...)
while true do
f = nil
coroutine_pool[#coroutine_pool + 1] = co
-- 協(xié)程掛起,將由suspend函數(shù)接管妻坝,執(zhí)行cmd=="TEXT"分支
f = coroutine_yield "EXIT"
f(coroutine_yield())
end
end)
else
-- 回到前一次消息掛起的位置返回的f就是要執(zhí)行的
coroutine_resume(co, f)
end
return co
end
每次使用skynet.fork()
其實(shí)都是從協(xié)程池中獲取未被使用的協(xié)程伸眶,并把該協(xié)程加入到fork
隊(duì)列中惊窖,等待一個(gè)消息調(diào)度,然會(huì)會(huì)一次把fork
隊(duì)列中的協(xié)程拿出來(lái)執(zhí)行一遍厘贼。執(zhí)行結(jié)束后會(huì)把協(xié)程重新丟入?yún)f(xié)程池中界酒,這樣可避免重復(fù)開(kāi)啟和關(guān)閉協(xié)程帶來(lái)的額外開(kāi)銷。
案例:長(zhǎng)時(shí)間占用執(zhí)行權(quán)限的任務(wù)