在實現(xiàn)業(yè)務(wù)邏輯的過程中,各個邏輯一般會抽象成一個服務(wù),例如游戲中的登錄服務(wù),訪問數(shù)據(jù)庫服務(wù),創(chuàng)建房間服務(wù)等等.服務(wù)與服務(wù)之間肯定是要通信和交互的,例如登錄的時候要請求數(shù)據(jù)庫驗證.skynet是怎么實現(xiàn)兩個服務(wù)的通信和交互的呢?為了突出主干,我簡化了lua服務(wù)代碼,A服務(wù)調(diào)用B服務(wù):
A服務(wù)代碼(bootstrap.lua):
skynet.start(function()
local addr = c.command("LAUNCH", "snlua main")
skynet.name(".main", addr)
for i = 1, 100000 do --為了保證調(diào)用B服務(wù)時,該服務(wù)已啟動
table.insert({}, 123)
end
local kv = skynet.call('.main',"lua","get", 'name')
print('kv is ', kv)
end)
B服務(wù)代碼(main.lua)
local skynet = require "skynet"
local command = {name = 'shonm'}
function command.get(key)
return command[key]
end
skynet.start(function()
skynet.dispatch("lua", function(session, address, cmd, ...)
local f = command[cmd]
if f then
skynet.ret(skynet.pack(f(...))) --回應(yīng)消息
else
error(string.format("Unknown command %s", tostring(cmd)))
end
end)
end)
入口函數(shù)如何執(zhí)行,上篇skynet lua業(yè)務(wù)邏輯的執(zhí)行我們已經(jīng)講過了.這篇主要講解A服務(wù)調(diào)用call(B)時,代碼的執(zhí)行過程.
skynet.call的實現(xiàn)為:
function skynet.call(addr, typename, ...)
local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...)) --①
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session)) --②
end
① 調(diào)用c接口給B服務(wù)的地址發(fā)送消息,即向?qū)Ψ较㈥犃衟ush消息,并且返回一個session.注意該消息結(jié)構(gòu)里也是這個session字段的,他的作用下面將會講到.有消息是經(jīng)過pack打包的,關(guān)于消息怎么打包再另外講解.
② 是消息的發(fā)送者(服務(wù)A)接下來執(zhí)行的代碼,yield_cal的實現(xiàn)為:
local function yield_call(service, session)
watching_session[session] = service
local succ, msg, sz = coroutine_yield("CALL", session) --①
watching_session[session] = nil
if not succ then
error "call failed"
end
return msg,sz
end
①的意圖很明顯,暫停協(xié)程,傳入的參數(shù)是'CALL',和消息的session.
上篇中提到raw_dispatch_message中suspend(co, coroutine.resume(co, true, msg, sz)),這里resume會返回'CALL',session,并傳給suspend函數(shù),即suspend(co, result, 'CALL', session)(還沒有搞懂協(xié)程之間是怎么傳參數(shù)的童鞋請參考lua協(xié)程相關(guān)參數(shù)與返回值),result為resume返回的協(xié)程狀態(tài),正常為true. suspend的簡化實現(xiàn)為:
function suspend(co, result, command, param, size)
--print(' command is ', command, ' ', string.format('%0x', skynet.self()))
if not result then
...
error(debug.traceback(co,tostring(command)))
end
if command == "CALL" then
session_id_coroutine[param] = co
elseif command == "SLEEP" then
session_id_coroutine[param] = co
sleep_session[co] = param
elseif command == "RETURN" then
local co_session = session_coroutine_id[co]
local co_address = session_coroutine_address[co]
session_response[co] = true
...
ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
return suspend(co, coroutine.resume(co, ret))
elseif command == "RESPONSE" then
...
return suspend(co, coroutine.resume(co, response))
elseif command == "EXIT" then
...
elseif command == "QUIT" then
-- service exit
return
elseif command == nil then
-- debug trace
return
else
error("Unknown command : " .. command .. "\n" .. debug.traceback(co))
end
dispatch_wakeup()
dispatch_error_queue()
end
我們可以看到如果command是'CALL'只是將session與co關(guān)聯(lián),然后什么也沒有做,他仍然是被掛起的(coroutine.yield()會一直掛起協(xié)程,直到coroutine.resume恢復(fù)).
可見call另外一個服務(wù)會掛起當(dāng)前服務(wù).那么call何時返回呢?即當(dāng)前服務(wù)(即發(fā)送消息的服務(wù))何時恢復(fù)呢?
話分兩頭,我們再說說B服務(wù).
B服務(wù)的skynet.start函數(shù)中只是調(diào)用skynet.dispatch('lua', func),他只是在skynet中注冊了'lua'協(xié)議的回調(diào)函數(shù),然后就返回了.
服務(wù)捕獲消息并執(zhí)行后也會調(diào)用raw_dispatch_message.通過前幾篇的介紹,我們知道lua層執(zhí)行的第一個消息是skynet.start()調(diào)用timeout來發(fā)送的,當(dāng)執(zhí)行完這個消息之后,代碼繼續(xù)怎么走呢?還是回到那個協(xié)程池函數(shù):
local function co_create(f)
local co = table.remove(coroutine_pool)
if co == nil then
co = coroutine.create(function(...)
f(...)
while true do
f = nil
coroutine_pool[#coroutine_pool+1] = co
f = coroutine_yield "EXIT" --①
f(coroutine_yield())
end
end)
else
coroutine.resume(co, f)
end
return co
end
①的代碼就是這個消息函數(shù)最后的結(jié)果,協(xié)程被掛起.至此,skynet.start()要執(zhí)行的函數(shù)就執(zhí)行完畢了.我們再看后續(xù)的流程.coroutine.yield傳入的參數(shù)是'EXIT'(這個代表了一條消息處理完畢,而不是整個服務(wù)退出),在suspend函數(shù)中有相應(yīng)的處理,但是它并不重新恢復(fù)協(xié)程,什么時候恢復(fù)下面會講到.
為了敘述方便,我們再貼一遍raw_dispatch_message代碼:
local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then
local co = session_id_coroutine[session]
if co == "BREAK" then
session_id_coroutine[session] = nil
elseif co == nil then
unknown_response(session, source, msg, sz)
else
session_id_coroutine[session] = nil
suspend(co, coroutine.resume(co, true, msg, sz))
end
else --①
local p = proto[prototype]
if p == nil then
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, prototype)
end
return
end
local f = p.dispatch
if f then
local ref = watching_service[source]
if ref then
watching_service[source] = ref + 1
else
watching_service[source] = 1
end
local co = co_create(f) --②
session_coroutine_id[co] = session
session_coroutine_address[co] = source
suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...))) --③
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
end
我們注意到收發(fā)一般的消息用到的協(xié)議是"lua",這個在skynet.lua中有注冊過.當(dāng)B服務(wù)收到A服務(wù)發(fā)來的消息之后,走分支①.剛才我們說到上一消息結(jié)束后導(dǎo)致協(xié)程被掛起了.
再看②,又調(diào)用了co_create(),回去看看他的實現(xiàn),else分支會重新恢復(fù)協(xié)程,傳遞我們自己定義的函數(shù),即B服務(wù)代碼(main.lua)中的skynet.dispatch函數(shù)注冊了一個新的回調(diào)函數(shù)f,然后又掛起了.在③中才恢復(fù)了協(xié)程,傳入resume的參數(shù),執(zhí)行f.即他將執(zhí)行f(session,source, p.unpack(msg,sz, ...)).
B的服務(wù)解包這個消息的內(nèi)容為get,name.對應(yīng)的有command[get]函數(shù),所以他會執(zhí)行skynet.ret(skynet.pack(f(...))).關(guān)鍵在于skynet.ret(),它用來回復(fù)A服務(wù)消息,他的實現(xiàn)為:
function skynet.ret(msg, sz)
msg = msg or ""
return coroutine_yield("RETURN", msg, sz)
end
于是他又將協(xié)程掛起,傳入消息相關(guān)參數(shù). corutine.resume函數(shù)返回,來到suspend()函數(shù).在command == "RETURN"中,他給A服務(wù)發(fā)送了一條回應(yīng)消息,那么A服務(wù)的地址又是怎么獲取的呢?
原來在B收到A的消息時,執(zhí)行raw_dispatch_message函數(shù)中通過表session_coroutine_id和session_coroutine_address保存有A服務(wù)的地址,以及該消息的session.
回復(fù)的這條消息的類型為skynet.PTYPE_RESPONSE,參數(shù)是B處理A的請求并打包的結(jié)果.
前面講到A調(diào)用call之后會被掛起,當(dāng)收到B回復(fù)的消息時,再次調(diào)用raw_dispatch_message.上面提到消息的類型為skynet.PTYPE_RESPONSE.由于在Call的時候保存了session對應(yīng)的協(xié)程,這里再次通過session就很容易找到協(xié)程了.所以一個消息的session至關(guān)重要.總結(jié)一下,就是A服務(wù)給B服務(wù)發(fā)送消息時會產(chǎn)生一個session,同時消息中也會包含session字段.傳遞給B時又會回應(yīng)給A,這樣A服務(wù)就找到了對應(yīng)的那個session相關(guān)的信息.再次調(diào)用coroutine.resume,于是之前掛起的協(xié)程恢復(fù)運行,也就是yield_call函數(shù)(就是在那里coroutine_yield("CALL", session)掛起的)成功返回了B回應(yīng)的消息,然后解包就可以得到正確的結(jié)果.
另外,skynet還有個send函數(shù),他的實現(xiàn)為:
function skynet.send(addr, typename, ...)
local p = proto[typename]
return c.send(addr, p.id, 0 , p.pack(...))
end
可以看出,send只是發(fā)送一條消息到給對方就直接返回,不會被掛起.還有就是他提供的session ID為0,而call的session ID為nil.他們的區(qū)別是,在c接口層,如果sessionID為nil,就會重新生成一個新的session ID.這個sessionID是用來關(guān)聯(lián)當(dāng)前協(xié)程的,上面有講到.
另外比較有意思的是,服務(wù)也可以給自己發(fā)消息.對于call,他保存新的sessionID后會掛起,再次收到response類型的消息,根據(jù)sessionID找到協(xié)程,然后恢復(fù).對于send,他的sessionID為0,沒有關(guān)聯(lián)的協(xié)程,所以不會響應(yīng).
好了,這篇比較長,也比較繞,結(jié)合之前寫的,多看幾遍應(yīng)該能夠理解.