skynet call的實現(xiàn)--服務(wù)與服務(wù)的交互

在實現(xiàn)業(yè)務(wù)邏輯的過程中,各個邏輯一般會抽象成一個服務(wù),例如游戲中的登錄服務(wù),訪問數(shù)據(jù)庫服務(wù),創(chuàng)建房間服務(wù)等等.服務(wù)與服務(wù)之間肯定是要通信和交互的,例如登錄的時候要請求數(shù)據(jù)庫驗證.skynet是怎么實現(xiàn)兩個服務(wù)的通信和交互的呢?為了突出主干,我簡化了lua服務(wù)代碼,A服務(wù)調(diào)用B服務(wù):

A服務(wù)代碼(bootstrap.lua):

skynet.start(function()
    local addr = c.command("LAUNCH", "snlua main")
    skynet.name(".main", addr)
    for i = 1, 100000 do              --為了保證調(diào)用B服務(wù)時,該服務(wù)已啟動
        table.insert({}, 123)
    end
    local kv = skynet.call('.main',"lua","get", 'name')
    print('kv is ', kv)
end)

B服務(wù)代碼(main.lua)

local skynet = require "skynet"

local command = {name = 'shonm'}

function command.get(key)
    return command[key]
end

skynet.start(function()
    skynet.dispatch("lua", function(session, address, cmd, ...)
        local f = command[cmd]      
        if f then
            skynet.ret(skynet.pack(f(...))) --回應(yīng)消息
        else
            error(string.format("Unknown command %s", tostring(cmd)))
        end
    end)
end)

入口函數(shù)如何執(zhí)行,上篇skynet lua業(yè)務(wù)邏輯的執(zhí)行我們已經(jīng)講過了.這篇主要講解A服務(wù)調(diào)用call(B)時,代碼的執(zhí)行過程.

skynet.call的實現(xiàn)為:

function skynet.call(addr, typename, ...)
    local p = proto[typename]
    local session = c.send(addr, p.id , nil , p.pack(...))       --①
    if session == nil then
        error("call to invalid address " .. skynet.address(addr))
    end
    return p.unpack(yield_call(addr, session))                  --②
end

① 調(diào)用c接口給B服務(wù)的地址發(fā)送消息,即向?qū)Ψ较㈥犃衟ush消息,并且返回一個session.注意該消息結(jié)構(gòu)里也是這個session字段的,他的作用下面將會講到.有消息是經(jīng)過pack打包的,關(guān)于消息怎么打包再另外講解.

② 是消息的發(fā)送者(服務(wù)A)接下來執(zhí)行的代碼,yield_cal的實現(xiàn)為:

local function yield_call(service, session)
    watching_session[session] = service
    local succ, msg, sz = coroutine_yield("CALL", session)    --①
    watching_session[session] = nil
    if not succ then
        error "call failed"
    end
    return msg,sz
end

①的意圖很明顯,暫停協(xié)程,傳入的參數(shù)是'CALL',和消息的session.

上篇中提到raw_dispatch_message中suspend(co, coroutine.resume(co, true, msg, sz)),這里resume會返回'CALL',session,并傳給suspend函數(shù),即suspend(co, result, 'CALL', session)(還沒有搞懂協(xié)程之間是怎么傳參數(shù)的童鞋請參考lua協(xié)程相關(guān)參數(shù)與返回值),result為resume返回的協(xié)程狀態(tài),正常為true. suspend的簡化實現(xiàn)為:

function suspend(co, result, command, param, size)
    --print(' command is ', command, ' ', string.format('%0x', skynet.self()))
    if not result then
        ...
        error(debug.traceback(co,tostring(command)))
    end
    if command == "CALL" then
        session_id_coroutine[param] = co
    elseif command == "SLEEP" then
        session_id_coroutine[param] = co
        sleep_session[co] = param
    elseif command == "RETURN" then
        local co_session = session_coroutine_id[co]
        local co_address = session_coroutine_address[co]

        session_response[co] = true
        ...
                ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
                    return suspend(co, coroutine.resume(co, ret))
    elseif command == "RESPONSE" then
        ...
        return suspend(co, coroutine.resume(co, response))
    elseif command == "EXIT" then
        ...
    elseif command == "QUIT" then
        -- service exit
        return
    elseif command == nil then
        -- debug trace
        return
    else
        error("Unknown command : " .. command .. "\n" .. debug.traceback(co))
    end
    dispatch_wakeup()
    dispatch_error_queue()
end

我們可以看到如果command是'CALL'只是將session與co關(guān)聯(lián),然后什么也沒有做,他仍然是被掛起的(coroutine.yield()會一直掛起協(xié)程,直到coroutine.resume恢復(fù)).

可見call另外一個服務(wù)會掛起當(dāng)前服務(wù).那么call何時返回呢?即當(dāng)前服務(wù)(即發(fā)送消息的服務(wù))何時恢復(fù)呢?

話分兩頭,我們再說說B服務(wù).

B服務(wù)的skynet.start函數(shù)中只是調(diào)用skynet.dispatch('lua', func),他只是在skynet中注冊了'lua'協(xié)議的回調(diào)函數(shù),然后就返回了.

服務(wù)捕獲消息并執(zhí)行后也會調(diào)用raw_dispatch_message.通過前幾篇的介紹,我們知道lua層執(zhí)行的第一個消息是skynet.start()調(diào)用timeout來發(fā)送的,當(dāng)執(zhí)行完這個消息之后,代碼繼續(xù)怎么走呢?還是回到那個協(xié)程池函數(shù):

local function co_create(f)
    local co = table.remove(coroutine_pool)
    if co == nil then
        co = coroutine.create(function(...)
            f(...)
            while true do
                f = nil
                coroutine_pool[#coroutine_pool+1] = co
                f = coroutine_yield "EXIT"      --①
                f(coroutine_yield())
            end
        end)
    else
        coroutine.resume(co, f)
    end
    return co
end

①的代碼就是這個消息函數(shù)最后的結(jié)果,協(xié)程被掛起.至此,skynet.start()要執(zhí)行的函數(shù)就執(zhí)行完畢了.我們再看后續(xù)的流程.coroutine.yield傳入的參數(shù)是'EXIT'(這個代表了一條消息處理完畢,而不是整個服務(wù)退出),在suspend函數(shù)中有相應(yīng)的處理,但是它并不重新恢復(fù)協(xié)程,什么時候恢復(fù)下面會講到.

為了敘述方便,我們再貼一遍raw_dispatch_message代碼:

local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
    -- skynet.PTYPE_RESPONSE = 1, read skynet.h
    if prototype == 1 then
        local co = session_id_coroutine[session]
        if co == "BREAK" then
            session_id_coroutine[session] = nil
        elseif co == nil then
            unknown_response(session, source, msg, sz)
        else
            session_id_coroutine[session] = nil
            suspend(co, coroutine.resume(co, true, msg, sz))
        end
    else    --①
        local p = proto[prototype]
        if p == nil then
            if session ~= 0 then
                c.send(source, skynet.PTYPE_ERROR, session, "")
            else
                unknown_request(session, source, msg, sz, prototype)
            end
            return
        end
        local f = p.dispatch
        if f then
            local ref = watching_service[source]
            if ref then
                watching_service[source] = ref + 1
            else
                watching_service[source] = 1
            end
            local co = co_create(f)    --②
            session_coroutine_id[co] = session
            session_coroutine_address[co] = source
            suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))  --③
        else
            unknown_request(session, source, msg, sz, proto[prototype].name)
        end
    end
end

我們注意到收發(fā)一般的消息用到的協(xié)議是"lua",這個在skynet.lua中有注冊過.當(dāng)B服務(wù)收到A服務(wù)發(fā)來的消息之后,走分支①.剛才我們說到上一消息結(jié)束后導(dǎo)致協(xié)程被掛起了.

再看②,又調(diào)用了co_create(),回去看看他的實現(xiàn),else分支會重新恢復(fù)協(xié)程,傳遞我們自己定義的函數(shù),即B服務(wù)代碼(main.lua)中的skynet.dispatch函數(shù)注冊了一個新的回調(diào)函數(shù)f,然后又掛起了.在③中才恢復(fù)了協(xié)程,傳入resume的參數(shù),執(zhí)行f.即他將執(zhí)行f(session,source, p.unpack(msg,sz, ...)).

B的服務(wù)解包這個消息的內(nèi)容為get,name.對應(yīng)的有command[get]函數(shù),所以他會執(zhí)行skynet.ret(skynet.pack(f(...))).關(guān)鍵在于skynet.ret(),它用來回復(fù)A服務(wù)消息,他的實現(xiàn)為:

function skynet.ret(msg, sz)
    msg = msg or ""
    return coroutine_yield("RETURN", msg, sz)
end

于是他又將協(xié)程掛起,傳入消息相關(guān)參數(shù). corutine.resume函數(shù)返回,來到suspend()函數(shù).在command == "RETURN"中,他給A服務(wù)發(fā)送了一條回應(yīng)消息,那么A服務(wù)的地址又是怎么獲取的呢?

原來在B收到A的消息時,執(zhí)行raw_dispatch_message函數(shù)中通過表session_coroutine_id和session_coroutine_address保存有A服務(wù)的地址,以及該消息的session.

回復(fù)的這條消息的類型為skynet.PTYPE_RESPONSE,參數(shù)是B處理A的請求并打包的結(jié)果.

前面講到A調(diào)用call之后會被掛起,當(dāng)收到B回復(fù)的消息時,再次調(diào)用raw_dispatch_message.上面提到消息的類型為skynet.PTYPE_RESPONSE.由于在Call的時候保存了session對應(yīng)的協(xié)程,這里再次通過session就很容易找到協(xié)程了.所以一個消息的session至關(guān)重要.總結(jié)一下,就是A服務(wù)給B服務(wù)發(fā)送消息時會產(chǎn)生一個session,同時消息中也會包含session字段.傳遞給B時又會回應(yīng)給A,這樣A服務(wù)就找到了對應(yīng)的那個session相關(guān)的信息.再次調(diào)用coroutine.resume,于是之前掛起的協(xié)程恢復(fù)運行,也就是yield_call函數(shù)(就是在那里coroutine_yield("CALL", session)掛起的)成功返回了B回應(yīng)的消息,然后解包就可以得到正確的結(jié)果.

另外,skynet還有個send函數(shù),他的實現(xiàn)為:

function skynet.send(addr, typename, ...)
    local p = proto[typename]
    return c.send(addr, p.id, 0 , p.pack(...))
end

可以看出,send只是發(fā)送一條消息到給對方就直接返回,不會被掛起.還有就是他提供的session ID為0,而call的session ID為nil.他們的區(qū)別是,在c接口層,如果sessionID為nil,就會重新生成一個新的session ID.這個sessionID是用來關(guān)聯(lián)當(dāng)前協(xié)程的,上面有講到.

另外比較有意思的是,服務(wù)也可以給自己發(fā)消息.對于call,他保存新的sessionID后會掛起,再次收到response類型的消息,根據(jù)sessionID找到協(xié)程,然后恢復(fù).對于send,他的sessionID為0,沒有關(guān)聯(lián)的協(xié)程,所以不會響應(yīng).

好了,這篇比較長,也比較繞,結(jié)合之前寫的,多看幾遍應(yīng)該能夠理解.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末嫁乘,一起剝皮案震驚了整個濱河市叙凡,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖论泛,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件骇钦,死亡現(xiàn)場離奇詭異辩撑,居然都是意外死亡界斜,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進(jìn)店門合冀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來各薇,“玉大人,你說我怎么就攤上這事君躺∏团校” “怎么了?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵棕叫,是天一觀的道長林螃。 經(jīng)常有香客問我,道長俺泣,這世上最難降的妖魔是什么疗认? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮伏钠,結(jié)果婚禮上横漏,老公的妹妹穿的比我還像新娘。我一直安慰自己熟掂,他們只是感情好绊茧,可當(dāng)我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著打掘,像睡著了一般。 火紅的嫁衣襯著肌膚如雪鹏秋。 梳的紋絲不亂的頭發(fā)上尊蚁,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天,我揣著相機(jī)與錄音侣夷,去河邊找鬼横朋。 笑死,一個胖子當(dāng)著我的面吹牛百拓,可吹牛的內(nèi)容都是我干的琴锭。 我是一名探鬼主播,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼衙传,長吁一口氣:“原來是場噩夢啊……” “哼决帖!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蓖捶,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤地回,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體刻像,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡畅买,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了尖淘。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片齿税。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡咧党,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出湃缎,到底是詐尸還是另有隱情,我是刑警寧澤萌京,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布雁歌,位于F島的核電站,受9級特大地震影響知残,放射性物質(zhì)發(fā)生泄漏靠瞎。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一求妹、第九天 我趴在偏房一處隱蔽的房頂上張望乏盐。 院中可真熱鬧,春花似錦制恍、人聲如沸父能。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽何吝。三九已至,卻和暖如春鹃唯,著一層夾襖步出監(jiān)牢的瞬間爱榕,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工坡慌, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留黔酥,地道東北人。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓洪橘,卻偏偏與公主長得像跪者,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子熄求,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容