寫在前面
這篇文章是分析skynet框架,自己“用”skynet已經(jīng)有一年,項目中是以它為底層框架宅倒,上層使用lua,以消息方式驅(qū)動邏輯,做到隔離保護罢维;
我先說說自己這段時間使用的感受,不會涉及到項目里功能的具體實現(xiàn)。當時我剛?cè)肼毨钇剩o自己列下三周的計劃椎麦,前一周熟悉下lua語言,比較重要和常用的部分现诀,能做到怎么使用好lua以及規(guī)范這樣膘螟;第二周熟悉skynet框架,從main開始备徐,分析它有幾個部分,每個部分做了什么琼锋,然后不懂的地方看下官方的wiki和別人寫的分析,再然后gdb看下一些變量值什么的女仰;第三周是看項目的整體框架及各個模塊組成及作用,以及消息走向迈着,和登陸流程等,當然也是結(jié)合lua具體怎么使用;第四周開始接需求和從其他同事手中接手原來的所有玩法功能岗照,一方面是維護,另一方面是增加或修改需求吧,我是比較贊成老人做新事,新人做老事,這樣有個熟悉的過程抬纸,也減少些不必要的風險和其他成本。
當然我一直做的是新需求就斤,而且比較重要悍募,且系統(tǒng)有點復雜,附加任務是維護老的功能洋机。我實現(xiàn)功能主要是從復雜度和擴展性上面考慮坠宴,一般先把需求文檔弄清楚,約定通信協(xié)議格式绷旗,把臨時數(shù)據(jù)和需要持久化的數(shù)據(jù)定好喜鼓,然后將整個關(guān)鍵點列出來然后逐一分解,而且策劃需求總是變更刁标,把可能會變的功能颠通,跟基礎功能分開,改動的比較少引入新bug可能性會小些膀懈。
后來也斷斷續(xù)續(xù)利用業(yè)余時間研究skynet源碼顿锰,所以準備記錄下來。我這里以“淺析”是因為對skynet框架還沒熟練運用并理解里面的實現(xiàn)原理,這里能看懂源碼硼控,但對云風作者為什么要這么設計等當時的思考不知刘陶,雖然會參考他的博客,但是有些一步步走過來的坑還不知道牢撼。
我會帶著幾個問題去探索這個skynet設計以及運行原理匙隔,研究源碼,并不是把代碼給分析一遍熏版,而是需要思考為什么這么設計纷责,換一種方式是否可行?要帶著why撼短,去思考how和why再膳,這樣才有可能學到,并且可能進行二次開發(fā)等曲横。
以下是幾個問題喂柒,可能并不能全面理解所有實現(xiàn),因為一篇文章是無法分析完的禾嫉,當然可能有后續(xù)的分析灾杰,下面正式開始。
問題:
1)整個skynet框架的組成熙参,以及各模塊的作用艳吠,以及啟動流程;
2)如何加載一個服務尊惰,以及從服務a發(fā)消息至服務b的流程讲竿,處理并返回;
3)當并發(fā)時弄屡,如何保證正確時序题禀,以及如何使用協(xié)程處理消息(同步/異步/超時);
以上三個問題涉及到的東西比較多膀捷,有不明白的可以參考網(wǎng)上資料和源碼迈嘹。
這里不會詳細介紹lua的東西,我會在后續(xù)博客中重點分析下lua的協(xié)程實現(xiàn)源碼以及閉包實現(xiàn)全庸,這是為了填以往博客的坑秀仲。也不會分析游戲中的框架,這里會用身邊的場景來代入問題壶笼。這里的分析只考慮單點神僵,不會考慮分布式節(jié)點。
首先說明的是覆劈,skynet是多線程框架保礼,然后里面對應了一些服務(service)沛励,每個服務對應一個lua 虛擬機,而一個虛擬機中可以跑很多個協(xié)程炮障,但同一時刻就一個協(xié)程目派,每條消息處理由協(xié)程來做,且運行在保護模式下胁赢,lua層實現(xiàn)了協(xié)程池和時序相關(guān)的隊列企蹭,這里可以類比前面c++協(xié)程相關(guān)實現(xiàn),關(guān)于虛擬機的原理以及如何和c/c++交互可以查閱資料智末。
skynet框架那些事兒
整個框架相關(guān)的源碼在skynet-src
目錄下谅摄,下面分析時不會涉及到哪個目錄哪個文件,會從啟動流程開始吹害,有些初始化會跳過不作詳細分析螟凭,但重要的相關(guān)的會說明。
在main
啟動時它呀,會有個lua格式的config
配置文件,里面配置了工作線程個數(shù)棒厘,要加載的lua服務纵穿,以及環(huán)境相關(guān)的參數(shù)信息等,部分代碼如下奢人,其中skynet_start
是關(guān)鍵谓媒。
118 int
119 main(int argc, char *argv[]) {
135 struct skynet_config config;
140 interr = luaL_loadbufferx(L, load_config, strlen(load_config),"=[skynet config]", "t");
152 config.thread = optint("thread",8);
153 config.module_path = optstring("cpath","./cservice/?.so");
155 config.bootstrap = optstring("bootstrap","snlua bootstrap");
156 config.daemon = optstring("daemon", NULL);
157 config.logger = optstring("logger", NULL);
158 config.logservice = optstring("logservice", "logger");
163 skynet_start(&config);
167 return 0;
168 }
246 void
247 skynet_start(struct skynet_config * config) {
262 skynet_mq_init();
264 skynet_timer_init();
265 skynet_socket_init();
268 struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
274 bootstrap(ctx, config->bootstrap);
276 start(config->thread);
以上是部分實現(xiàn)代碼,在skynet_start
過程中何乎,會先設置信號處理函數(shù)句惯,判斷是否作為daemon
進程,初始化harbor
是否作為分布式節(jié)點之一(這時只考慮有一個節(jié)點)支救。
代碼行262初始化全局消息隊列:
211 void
212 skynet_mq_init() {
213 struct global_queue *q = skynet_malloc(sizeof(*q));
214 memset(q,0,sizeof(*q));
215 SPIN_INIT(q);
216 Q=q;
217 }
35 struct global_queue {
36 struct message_queue *head;
37 struct message_queue *tail;
38 struct spinlock lock;
39 };
skynet中的消息隊列是非常重要的抢野,服務之間通信正是通過消息來驅(qū)動的,這里的設計是有一個全局消息隊列各墨,然后每個服務在創(chuàng)建時有一個消息隊列指孤,是發(fā)往到該服務待處理的消息,然后掛載到全局隊列中等工作線程處理贬堵,引用一張圖如下:
關(guān)于把消息入隊和出隊在這里不做詳細分析恃轩。
代碼行264是初始化定時器相關(guān)的數(shù)據(jù),數(shù)據(jù)結(jié)構(gòu)如下:
46 struct timer {
47 struct link_list near[TIME_NEAR];
48 struct link_list t[4][TIME_LEVEL];
49 struct spinlock lock;
50 uint32_t time;
51 uint32_t starttime;
52 uint64_t current;
53 uint64_t current_point;
54 };
其中為什么有這一行struct link_list t[4][TIME_LEVEL]
黎做,當時我分析的時候也是一頭霧水叉跛,會在后面說明原因,以及如何跟協(xié)程綁定和處理超時蒸殿,會在后面說明筷厘。
代碼行265是初始化socket相關(guān)的數(shù)據(jù)挽铁,主要結(jié)構(gòu)如下:
100 struct socket_server {
101 int recvctrl_fd;
102 int sendctrl_fd;
103 int checkctrl;
104 poll_fd event_fd;
105 int alloc_id;
106 int event_n;
107 int event_index;
108 struct socket_object_interface soi;
109 struct event ev[MAX_EVENT];
110 struct socket slot[MAX_SOCKET];
111 char buffer[MAX_INFO];
112 uint8_t udpbuffer[MAX_UDP_PACKAGE];
113 fd_set rfds;
114 };
初始化部分實現(xiàn)如下:
328 struct socket_server *
329 socket_server_create() {
331 int fd[2];
332 poll_fd efd = sp_create();
337 if (pipe(fd)) { }
342 if (sp_add(efd, fd[0], NULL)) { }
351 struct socket_server *ss = MALLOC(sizeof(*ss));
352 ss->event_fd = efd;
353 ss->recvctrl_fd = fd[0];
354 ss->sendctrl_fd = fd[1];
371 }
以上這塊設計的挺好的,每個字段作用從命名能知道一二敞掘,具體的流程會在代碼中分析(不知道能寫多少)叽掘。
代碼行268會先創(chuàng)建個log服務,struct skynet_context
對應一個虛擬機:
125 struct skynet_context *
126 skynet_context_new(const char * name, const char *param) {
127 struct skynet_module * mod = skynet_module_query(name);
132 void *inst = skynet_module_instance_create(mod);
135 struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
136 CHECKCALLING_INIT(ctx)
138 ctx->mod = mod;
139 ctx->instance = inst;
140 ctx->ref =2;
141 ctx->cb =NULL;
142 ctx->cb_ud =NULL;
143 ctx->session_id =0;
154 ctx->handle =0;
155 ctx->handle = skynet_handle_register(ctx);
156 struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
161 int r = skynet_module_instance_init(mod, inst, ctx, param);
162 CHECKCALLING_END(ctx)
163 if (r == 0) {
164 struct skynet_context * ret = skynet_context_release(ctx);
165 if (ret) {
166 ctx->init =true;
167 }
168 skynet_globalmq_push(queue);
172 return ret;
173 }else {
174 //more code...
181 }
182 }
以上這段實現(xiàn)非常重要玖雁,這里加載動態(tài)庫的模塊為logger.so
更扁,非snlua.so
,在用snlua
啟動bootstrap
服務時會說明其他部分赫冬。
這里skynet_module_query
根據(jù)name加載動態(tài)庫用以初始化各服務浓镜,沒有加載過的話會dlopen(tmp, RTLD_NOW | RTLD_GLOBAL)
,并設置相關(guān)的接口:
92 static int
93 open_sym(struct skynet_module *mod) {
94 mod->create = get_api(mod,"_create");
95 mod->init = get_api(mod,"_init");
96 mod->release = get_api(mod,"_release");
97 mod->signal = get_api(mod,"_signal");
99 return mod->init == NULL;
100 }
后面的邏輯就是調(diào)用logger_create
劲厌,這里不對logger
相關(guān)的接口說明膛薛,畢竟是次要的,重點會分析snlua
的补鼻;其中ctx->cb =NULL;ctx->cb_ud =NULL;
是消息分發(fā)到lua層的回調(diào)函數(shù)和參數(shù)哄啄,ctx->handle = skynet_handle_register(ctx)
是分配一個唯一的句柄handle與每個服務關(guān)聯(lián),通過handle可找到對應的服務风范,相當于地址咨跌;然后為每個服務創(chuàng)建服務消息隊列,接著logger_init
硼婿,最后把消息隊列通過skynet_globalmq_push
到全局隊列锌半,一方面接收新的消息,另一方面由工作線程依次處理寇漫。
代碼行274通過snlua啟動bootstrap第一個服務(其實logger也算是一個服務):
232 static void
233 bootstrap(struct skynet_context * logger, const char * cmdline) {
238 struct skynet_context *ctx = skynet_context_new(name, args);
244 }
其中cmdline
為snlua bootstrap
刊殉,然后解析后name
為snlua
,args
為bootstrap
州胳;同logger一樣记焊,這里加載的是snlua.so
,其他邏輯一樣陋葡,這時重點分析下snlua_create
和snlua_init
:
180 struct snlua *
181 snlua_create(void) {
182 struct snlua * l = skynet_malloc(sizeof(*l));
183 memset(l,0,sizeof(*l));
186 l->L = lua_newstate(lalloc, l);
187 return l;
188 }
14 struct snlua {
15 lua_State * L;
16 struct skynet_context * ctx;
20 };
其中lua_State
是一個虛擬機的上下文亚亲,每個服務都有一個,做到服務與服務之間隔離腐缤,具體的數(shù)據(jù)組成可以看下lua源碼中的聲明捌归;
147 int
148 snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
152 skynet_callback(ctx, l , launch_cb);
156 skynet_send(ctx,0, handle, PTYPE_TAG_DONTCOPY,0, "bootstrap", 9);
158 }
這里設置服務的回調(diào)函數(shù)launch_cb
,然后發(fā)第一條消息給自己岭粤,由launch_cb
處理惜索,處理完后,重新設置回調(diào)函數(shù)為lua層的剃浇,那么后面路由到該服務的消息就能正確的分發(fā)到對應的回調(diào)函數(shù)了:
134 static int
135 launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , co nst void * msg, size_t sz) {
136 assert(type ==0 && session == 0);
137 struct snlua *l = ud;
138 skynet_callback(context,NULL, NULL);
139 int err = init_cb(l, context, msg, sz);
145 }
75 static int
76 init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
104 const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
106 int r = luaL_loadfile(L,loader);
113 r = lua_pcall(L,1,0,1);
108 static int
109 lcallback(lua_State *L) {
110 struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
111 int forward = lua_toboolean(L, 2);
119 if (forward) {
120 skynet_callback(context, gL, forward_cb);
121 }else {
122 skynet_callback(context, gL,_cb);
123 }
125 return 0;
126 }
55 static int
56 _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const vo id * msg, size_t sz) {
57 lua_State *L = ud;
58 int trace = 1;
59 int r;
60 int top = lua_gettop(L);
61 if (top == 0) {
62 lua_pushcfunction(L, traceback);
63 lua_rawgetp(L, LUA_REGISTRYINDEX,_cb);
64 }else {
65 assert(top ==2);
66 }
67 lua_pushvalue(L,2);
69 lua_pushinteger(L, type);
70 lua_pushlightuserdata(L, (void *)msg);
71 lua_pushinteger(L,sz);
72 lua_pushinteger(L, session);
73 lua_pushinteger(L, source);
75 r = lua_pcall(L,5, 0 , trace);
97
98 return 0;
99}
init_cb
比較復雜巾兆,設置虛擬機的相關(guān)環(huán)境變量猎物,加載lua文件等,怎么加載的看上面列的幾行關(guān)鍵代碼角塑,具體為什么看下源碼和lua的luaL_loadfile
和c與lua的交互棧蔫磨。
代碼行276是啟動線程:
181 static void
182 start(int thread) {
183 pthread_t pid[thread+3];
185 struct monitor *m = skynet_malloc(sizeof(*m));
186 memset(m,0, sizeof(*m));
187 m->count = thread;
188 m->sleep =0;
189
190 m->m = skynet_malloc(thread *sizeof(struct skynet_monitor *));
191 int i;
192 for (i=0;i<thread;i++) {
193 m->m[i] = skynet_monitor_new();
194 }
204 create_thread(&pid[0], thread_monitor, m);
205 create_thread(&pid[1], thread_timer, m);
206 create_thread(&pid[2], thread_socket, m);
208 static int weight[] = {
209 -1, -1, -1, -1, 0, 0, 0, 0,
210 1, 1, 1, 1, 1, 1, 1, 1,
211 2, 2, 2, 2, 2, 2, 2, 2,
212 3, 3, 3, 3, 3, 3, 3, 3, };
213 struct worker_parm wp[thread];
214 for (i=0;i<thread;i++) {
215 wp[i].m = m;
216 wp[i].id = i;
217 if (i < sizeof(weight)/sizeof(weight[0])) {
218 wp[i].weight= weight[i];
219 }else {
220 wp[i].weight =0;
221 }
222 create_thread(&pid[i+3], thread_worker, &wp[i]);
223 }
224 //pthread_join all thread
230 }
以上是創(chuàng)建多個線程,有thread_monitor
線程圃伶,用于判斷相應服務的消息列表是否過載堤如;創(chuàng)建定時器thread_timer
線程,用于處理超時窒朋;主要實現(xiàn)如下:
172 static void
173 timer_update(struct timer *T) {
174 SPIN_LOCK(T);
176 // try to dispatch timeout 0 (rare condition)
177 timer_execute(T);
179 // shift time first, and then dispatch timer message
180 timer_shift(T);
182 timer_execute(T);
184 SPIN_UNLOCK(T);
185 }
接著再啟動thread_socket
網(wǎng)絡線程搀罢,主要功能在skynet_socket_poll
實現(xiàn)中,處理讀和寫侥猩,重點是讀事件榔至,這部分會放在后面分析。
下面是處理消息的工作線程實現(xiàn)原理:
152 static void *
153 thread_worker(void *p) {
154 struct worker_parm *wp = p;
155 int id = wp->id;
156 int weight = wp->weight;
160 struct message_queue * q = NULL;
161 while (!m->quit) {
162 q = skynet_context_message_dispatch(sm, q, weight);
163 //check q is null
177 }
178 return NULL;
179 }
上面是一個列循環(huán)欺劳,當m->quit
為true
時才退出唧取,當沒有消息時會進行pthread_cond_wait
,然后就是一直skynet_context_message_dispatch
杰标,其中weight
是權(quán)重兵怯,在隊列消息數(shù)有一定數(shù)量的情況下,有些線程會根據(jù)weight
嘗試處理多條消息:
296 structmessage_queue *
297 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
298 //check q is null
299 q = skynet_globalmq_pop();
300 //check q is null
304 uint32_t handle = skynet_mq_handle(q);
305
306 struct skynet_context * ctx = skynet_handle_grab(handle);
307 if (ctx == NULL) {
308 struct drop_t d = { handle };
309 skynet_mq_release(q, drop_message, &d);
310 return skynet_globalmq_pop();
311 }
13 int i,n=1;
314 struct skynet_message msg;
315
316 for (i=0;i<n;i++) {
317 if (skynet_mq_pop(q,&msg)) {
318 skynet_context_release(ctx);
319 return skynet_globalmq_pop();
320 } else if (i==0 && weight >= 0) {
321 n = skynet_mq_length(q);
322 n >>= weight;
323 }
324 int overload = skynet_mq_overload(q);
325 if (overload) {
326 skynet_error(ctx, "May overload, message queue length = %d", overload);
327 }
328
329 skynet_monitor_trigger(sm, msg.source , handle);
330
331 if (ctx->cb == NULL) {
332 skynet_free(msg.data);
333 } else {
334 dispatch_message(ctx, &msg);
335 }
336
337 skynet_monitor_trigger(sm, 0,0);
338 }
40 assert(q == ctx->queue);
341 struct message_queue *nq = skynet_globalmq_pop();
342 if (nq) {
343 // If global mq is not empty , push q back, and return next queue (nq)
344 // Else (global mq is empty or block, don't push q back, and return q again (for next di spatch)
345 skynet_globalmq_push(q);
346 q = nq;
347 }
348 skynet_context_release(ctx);
349
350 return q;
351 }
上面代碼工作線程從全局隊列中pop出一個服務的隊列腔剂,然后根據(jù)隊列的handle
獲取到對應服務的skynet_context
,然后根據(jù)weight
要處理多少條消息驼仪,主要邏輯在dispatch_message
中掸犬,然后pop下一個服務的消息隊列,這里算是公平吧绪爸,然后把原來的隊列push到全局隊列中湾碎,這里也有一些負載統(tǒng)計邏輯。
dispatch_message
里面最終執(zhí)行的是ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)
奠货,就回到了上面說的調(diào)用lua的回調(diào)函數(shù)介褥。
以上是skynet框架的整體實現(xiàn),可能有些細節(jié)或遺漏的地方?jīng)]有在這里分析递惋。
總結(jié)下柔滔,主要引用連接中作者的設計思想:
1)把一個符合規(guī)范的 C 模塊,從動態(tài)庫(so 文件)中啟動起來萍虽,綁定一個永不重復(即使模塊退出)的數(shù)字 id 做為其 handle 睛廊。模塊被稱為服務(Service),服務間可以自由發(fā)送消息杉编。每個模塊可以向 Skynet 框架注冊一個 callback 函數(shù)超全,用來接收發(fā)給它的消息咆霜。每個服務都是被一個個消息包驅(qū)動,當沒有包到來的時候嘶朱,它們就會處于掛起狀態(tài)蛾坯,對 CPU 資源零消耗。如果需要自主邏輯疏遏,則可以利用 Skynet 系統(tǒng)提供的 timeout 消息脉课,定期觸發(fā)。
2)簡單說改览,Skynet 只負責把一個數(shù)據(jù)包從一個服務內(nèi)發(fā)送出去下翎,讓同一進程內(nèi)的另一個服務收到,調(diào)用對應的 callback 函數(shù)處理宝当。它保證视事,模塊的初始化過程,每個獨立的 callback 調(diào)用庆揩,都是相互線程安全的俐东。編寫服務的人不需要特別的為多線程環(huán)境考慮任何問題。專心處理發(fā)送給它的一個個數(shù)據(jù)包订晌。
3)它僅僅是把數(shù)據(jù)包的指針虏辫,以及你聲稱的數(shù)據(jù)包長度(并不一定是真實長度)傳遞出去。由于服務都是在同一個進程內(nèi)锈拨,接收方取得這個指針后砌庄,就可以直接處理其引用的數(shù)據(jù)了。
這個機制可以在必要時娄昆,保證絕對的零拷貝缝彬,幾乎等價于在同一線程內(nèi)做一次函數(shù)調(diào)用的開銷。
但谷浅,這只是 Skynet 提供的性能上的可能性扒俯。它推薦的是一種更可靠,性能略低的方案:它約定撼玄,每個服務發(fā)送出去的包都是復制到用 malloc 分配出來的連續(xù)內(nèi)存违施。接收方在處理完這個數(shù)據(jù)塊(在處理的 callback 函數(shù)調(diào)用完畢)后互纯,會默認調(diào)用 free 函數(shù)釋放掉所占的內(nèi)存。即磕蒲,發(fā)送方申請內(nèi)存留潦,接收方釋放兔院。
4)在 Skynet 啟動時,建立了若干工作線程(數(shù)量可配置)坊萝,它們不斷的從主消息列隊中取出一個次級消息隊列來,再從次級隊列中取去一條消息菩鲜,調(diào)用對應的服務的 callback 函數(shù)進行出來惦积。為了調(diào)用公平,一次僅處理一條消息狮崩,而不是耗凈所有消息(雖然那樣的局部效率更高睦柴,因為減少了查詢服務實體的次數(shù),以及主消息隊列進出的次數(shù))坦敌,這樣可以保證沒有服務會被餓死。
用戶定義的 callback 函數(shù)不必保證線程安全窝趣,因為在 callback 函數(shù)被調(diào)用的過程中训柴,其它工作線程沒有可能獲得這個 callback 函數(shù)所熟服務的次級消息隊列妇拯,也就不可能被并發(fā)了。一旦一個服務的消息隊列暫時為空越锈,它的消息隊列就不再被放回全局消息隊列了甘凭。這樣使大部分不工作的服務不會空轉(zhuǎn) CPU 。
收發(fā)處理消息的那些事兒
這節(jié)會說明如何從在lua層丹弱,從服務發(fā)條消息到另一個服務铲咨,這里為了簡化起見纤勒,考慮的是send調(diào)用隆檀,發(fā)的參數(shù)是string類型,順便印證上個小結(jié)最后總結(jié)引用的設計思想泉坐。
當我們在lua層的業(yè)務邏輯中寫這么一條語句skynet.send(agentAddr, "lua", "CallFunc", "hello world.")
后會發(fā)生什么事情裳仆?
他調(diào)用的是:
416 function skynet.send(addr, typename, ...)
417 local p = proto[typename]
418 return c.send(addr, p.id, 0 , p.pack(...))
419 end
其中"lua"是我們的協(xié)議類型, "CallFunc"和"hello world."表示的是方法名和參數(shù)记某,表示agentAddr對方服務handle构捡,skynet.pack是對方法名和參數(shù)名編碼,是調(diào)用C層的:
599 LUAMOD_API int
600 luaseri_pack(lua_State *L) {
601 struct block temp;
602 temp.next = NULL;
603 struct write_block wb;
604 wb_init(&wb, &temp);
605 pack_from(L,&wb,0);
606 assert(wb.head == &temp);
607 seri(L, &temp, wb.len);
609 wb_free(&wb);
611 return 2;
612 }
534 static void
535 seri(lua_State *L, struct block *b, int len) {
536 uint8_t * buffer = skynet_malloc(len);
537 uint8_t * ptr = buffer;
538 int sz = len;
539 while(len>0) {
540 if (len >= BLOCK_SIZE) {
541 memcpy(ptr, b->buffer, BLOCK_SIZE);
542 ptr += BLOCK_SIZE;
543 len -= BLOCK_SIZE;
544 b = b->next;
545 } else {
546 memcpy(ptr, b->buffer, len);
547 break;
548 }
549 }
550
551 lua_pushlightuserdata(L, buffer);
552 lua_pushinteger(L, sz);
553 }
296 static void
297 pack_one(lua_State *L, struct write_block *b, int index, int depth) {
320 case LUA_TSTRING: {
321 size_t sz = 0;
322 const char *str = lua_tolstring(L,index,&sz);
323 wb_string(b, str, (int)sz);
324 break;
325 }
打包的時候滑凉,會進行一次拷貝喘帚,最后返回一個C指針和數(shù)據(jù)長度,接著調(diào)用C層的send
函數(shù)若未,即lsend
倾鲫,lsend
調(diào)用send_message(L, 0, 2)
:
232 static int
233 send_message(lua_State *L, int source, int idx_type) {
234 struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
252 int mtype = lua_type(L,idx_type+2);
253 switch (mtype) {
267 case LUA_TLIGHTUSERDATA: {
268 void * msg = lua_touserdata(L,idx_type+2);
269 int size = luaL_checkinteger(L,idx_type+3);
270 if (dest_string) {
271 session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
272 } else {
273 session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
274 }
275 break;
276 }
285 lua_pushinteger(L,session);
286 return 1;
287 }
其中type或上PTYPE_TAG_DONTCOPY
表示不要拷貝數(shù)據(jù)乌昔,因為msg是C層變量指針,會在適當?shù)臅r候由C釋放磕道;最后會返回session
用于可能后續(xù)的響應消息回來找到上下文(對于call調(diào)用);
699 int
700 skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, i nt session, void * data, size_t sz) {
708 _filter_args(context, type, &session, (void **)&data, &sz);
710 if (source == 0) {
711 source = context->handle;
712 }
713
714 if (destination == 0) {
715 return session;
716 }
717 if (skynet_harbor_message_isremote(destination)) {
718 //跟另外節(jié)點通訊
724 } else {
725 struct skynet_message smsg;
726 smsg.source = source;
727 smsg.session = session;
728 smsg.data = data;
729 smsg.sz = sz;
730
731 if (skynet_context_push(destination, &smsg)) {
732 skynet_free(data);
733 return -1;
734 }
735 }
736 return session;
737 }
229 int
230 skynet_context_push(uint32_t handle, struct skynet_message *message) {
231 struct skynet_context * ctx = skynet_handle_grab(handle);
232 if (ctx == NULL) {
233 return -1;
234 }
235 skynet_mq_push(ctx->queue, message);
236 skynet_context_release(ctx);
237
238 return 0;
239 }
上面代碼是把消息壓入對方服務的消息隊列悼做,而_filter_args
主要是(可能)分配一個session
值撵割,對長度進行編碼,高八位存放type信息羹与。
另外每個服務分配的session
都是大于0的庶灿,每條消息唯一session
,且當int變?yōu)樨摂?shù)時重置session
為1往踢,不大可能造成兩條不同的消息而session
相同峻呕。
當對方服務被工作線程處理時,會進行消息的回調(diào)瘦癌,就回到了上面的實現(xiàn)。
工作中整理的問題:
1)如果處理某個服務的消息造成死循環(huán)或處理時間過久热押,那么可能導致底層一些服務隊列得不到調(diào)度和處理斤寇,可能造成消息隊列過大,占用更多內(nèi)存牙寞,從而導致“雪崩”問題莫秆,后面處理的消息都可能是超時的。比如定時任務運行耗時過多;業(yè)務需求比如從一些道具列表中隨機幾個不同的道具伟端,直到選到幾個不同的才退出循環(huán),沒有選擇適合的隨機算法党巾;有的業(yè)務邏輯不正確,導致使用協(xié)程數(shù)量過多等待驳规,無法切換回來并釋放署海。
2)協(xié)程的調(diào)度,切換回來后出現(xiàn)各種各樣的問題捻勉,比如某個對象已經(jīng)釋放刀森,雖然通過閉包引用著某個對象(地址),但是obj->isReleased()是true埠偿,所以處理了各種錯誤的數(shù)據(jù)榜晦。
3)用不到的對象本應該釋放,不小心相互引用著對方浊服,導致lua gc時不能夠回收相應資源咐刨,造成內(nèi)存泄漏唧领。
4)不合理的使用call孽拷,導致在某些關(guān)鍵路徑上任性的切換協(xié)程半抱,而可能導致時序問題,因為使用協(xié)程后炼幔,有些是不確定的史简,即切出后什么時候切回來,如果玩家在登陸的時候請求其他服務數(shù)據(jù)導致切出協(xié)程跺讯,那什么時候切回來呢。
5)還有跟順序有關(guān)的消息處理局荚,引用云風作者舉的例子“如果 B 是一個 lua 服務愈污,當 A 向 B 發(fā)送了兩條消息 x 和 y 。skynet 一定保證 x 先被 B 中的 lua 虛擬機收到茫陆,并為 x 消息生成了一個 coroutine X 擎析,并運行這個 coroutine 。然后才會收到消息 y 桨醋,重新生成一個新的 coroutine Y 现斋,接下來運行∷材冢”
如果處理X的時候限书,協(xié)程因為某些原因掛起,那么處理Y的時候可能會改變一些狀態(tài)等能真。Skynet中Lua服務的消息處理
6)還有使用不適合的算法扰柠,比如時間復雜度的O(n)或更差實現(xiàn)等。
淺析skynet底層框架中篇主要分析skynet的定時器和網(wǎng)絡實現(xiàn)部分蝙泼,再加個消息隊列劝枣,和本篇的第三個小問題倡缠。
下面是參考資料:
云風博客
skynet源碼
Skynet 設計綜述
skynet服務的過載保護