skynet內(nèi)部服務(wù)都是由一個(gè)一個(gè)的消息所驅(qū)動(dòng)驹暑,每個(gè)服務(wù)的上下文結(jié)構(gòu)體struct skynet_context
有個(gè)字段struct message_queue *queue
描述其消息隊(duì)列术唬,所有服務(wù)的消息隊(duì)列掛在全局消息對(duì)列的列表struct global_queue *Q
中
skynet在啟動(dòng)時(shí)會(huì)啟動(dòng)config->thread
個(gè)worker
線程來(lái)處理所有服務(wù)的消息苫幢,worker
線程的入口函數(shù)為static void *thread_worker(void *p)
友多,其處理邏輯如下:
- 如果當(dāng)前要處理的消息隊(duì)列為空啥么,則從全局消息隊(duì)列的列表中取下一個(gè)消息隊(duì)列
- 對(duì)消息隊(duì)列中的每個(gè)消息硼砰,調(diào)用該消息隊(duì)列所屬服務(wù)的回調(diào)函數(shù)糠赦,每次至少處理一個(gè)消息碎罚,之多處理消息隊(duì)列長(zhǎng)度右移
weight
個(gè)消息磅废,其中weight
是事先配置好的
static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
比如配置啟動(dòng)n
個(gè)worker
線程,第i
個(gè)線程的weight
為:當(dāng)i小于weight
數(shù)組長(zhǎng)度時(shí)荆烈,線程weight
為weight[i-1]
拯勉,否則為0
每個(gè)服務(wù)的消息隊(duì)列都會(huì)被worker
進(jìn)程公平的進(jìn)行處理,但是每個(gè)線程一次處理的消息個(gè)數(shù)由工作線程配置的權(quán)重決定憔购。
下面以snlua
為例理解消息回調(diào)處理宫峦,在dispatch_message
函數(shù)中,通過(guò)調(diào)用服務(wù)的回調(diào)函數(shù)來(lái)讓服務(wù)處理其收到的消息:
ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)
snlua
是執(zhí)行lua
服務(wù)的沙盒環(huán)境玫鸟,啟動(dòng)一個(gè)lua
服務(wù)之后导绷,在lua
代碼中會(huì)設(shè)置回調(diào)函數(shù),通常在skynet.lua
文件中的skynet.start
中設(shè)置c.callback(skynet.dispatch_message)
屎飘,c.callback
調(diào)用的是:
83 static int
84 _callback(lua_State *L) {
85 struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
86 int forward = lua_toboolean(L, 2);
87 luaL_checktype(L,1,LUA_TFUNCTION);
88 lua_settop(L,1);
89 lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
90
91 lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
92 lua_State *gL = lua_tothread(L,-1);
93
94 if (forward) {
95 skynet_callback(context, gL, forward_cb);
96 } else {
97 skynet_callback(context, gL, _cb);
98 }
99
100 return 0;
101 }
- 85行獲取服務(wù)的上下文結(jié)構(gòu)妥曲,此upvalue是在啟動(dòng)次服務(wù)的時(shí)候設(shè)置的
- 89行在注冊(cè)表中設(shè)置
_cb=>skynet.dispatch_message
- 91-92行獲取服務(wù)的LUA狀態(tài)機(jī)結(jié)構(gòu)
- 95或者97行設(shè)置服務(wù)上下結(jié)構(gòu)體中的回調(diào)函數(shù)為
_cb
,回調(diào)函數(shù)私有數(shù)據(jù)為L(zhǎng)UA狀態(tài)機(jī)gL
下面來(lái)分析回調(diào)函數(shù)_cb
钦购,任何LUA沙盒服務(wù)收到的消息的回調(diào)函數(shù)入口都是_cb
30 static int
31 _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
32 lua_State *L = ud;
33 int trace = 1;
34 int r;
35 int top = lua_gettop(L);
36 if (top == 0) {
37 lua_pushcfunction(L, traceback);
38 lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
39 } else {
40 assert(top == 2);
41 }
42 lua_pushvalue(L,2);
43
44 lua_pushinteger(L, type);
45 lua_pushlightuserdata(L, (void *)msg);
46 lua_pushinteger(L,sz);
47 lua_pushinteger(L, session);
48 lua_pushinteger(L, source);
49
50 r = lua_pcall(L, 5, 0 , trace);
51
52 if (r == LUA_OK) {
53 return 0;
54 }
55 const char * self = skynet_command(context, "REG", NULL);
56 switch (r) {
57 case LUA_ERRRUN:
58 skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1));
59 break;
60 case LUA_ERRMEM:
61 skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
62 break;
63 case LUA_ERRERR:
64 skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
65 break;
66 case LUA_ERRGCMM:
67 skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
68 break;
69 };
70
71 lua_pop(L,1);
72
73 return 0;
74 }
- 32-38行在LUA狀態(tài)機(jī)的棧中設(shè)置即將執(zhí)行的LUA函數(shù)及參數(shù)檐盟,依次是
traceback
,skynet.dispatch_message
押桃,type
葵萎,msg
,sz
怨规,session
陌宿,source
- 50行在保護(hù)模式執(zhí)行
skynet.dispatch_message
函數(shù),在此函數(shù)進(jìn)行真正消息處理
以上粗略的分析了skynet框架是如何調(diào)度每個(gè)服務(wù)的消息隊(duì)列波丰,以及如何通過(guò)回調(diào)函數(shù)來(lái)對(duì)服務(wù)的消息進(jìn)行處理