淺析skynet底層框架上篇

寫在前面

這篇文章是分析skynet框架,自己“用”skynet已經(jīng)有一年,項目中是以它為底層框架宅倒,上層使用lua,以消息方式驅(qū)動邏輯,做到隔離保護罢维;

我先說說自己這段時間使用的感受,不會涉及到項目里功能的具體實現(xiàn)。當時我剛?cè)肼毨钇剩o自己列下三周的計劃椎麦,前一周熟悉下lua語言,比較重要和常用的部分现诀,能做到怎么使用好lua以及規(guī)范這樣膘螟;第二周熟悉skynet框架,從main開始备徐,分析它有幾個部分,每個部分做了什么琼锋,然后不懂的地方看下官方的wiki和別人寫的分析,再然后gdb看下一些變量值什么的女仰;第三周是看項目的整體框架及各個模塊組成及作用,以及消息走向迈着,和登陸流程等,當然也是結(jié)合lua具體怎么使用;第四周開始接需求和從其他同事手中接手原來的所有玩法功能岗照,一方面是維護,另一方面是增加或修改需求吧,我是比較贊成老人做新事,新人做老事,這樣有個熟悉的過程抬纸,也減少些不必要的風險和其他成本。

當然我一直做的是新需求就斤,而且比較重要悍募,且系統(tǒng)有點復雜,附加任務是維護老的功能洋机。我實現(xiàn)功能主要是從復雜度和擴展性上面考慮坠宴,一般先把需求文檔弄清楚,約定通信協(xié)議格式绷旗,把臨時數(shù)據(jù)和需要持久化的數(shù)據(jù)定好喜鼓,然后將整個關(guān)鍵點列出來然后逐一分解,而且策劃需求總是變更刁标,把可能會變的功能颠通,跟基礎功能分開,改動的比較少引入新bug可能性會小些膀懈。

后來也斷斷續(xù)續(xù)利用業(yè)余時間研究skynet源碼顿锰,所以準備記錄下來。我這里以“淺析”是因為對skynet框架還沒熟練運用并理解里面的實現(xiàn)原理,這里能看懂源碼硼控,但對云風作者為什么要這么設計等當時的思考不知刘陶,雖然會參考他的博客,但是有些一步步走過來的坑還不知道牢撼。

我會帶著幾個問題去探索這個skynet設計以及運行原理匙隔,研究源碼,并不是把代碼給分析一遍熏版,而是需要思考為什么這么設計纷责,換一種方式是否可行?要帶著why撼短,去思考how和why再膳,這樣才有可能學到,并且可能進行二次開發(fā)等曲横。

以下是幾個問題喂柒,可能并不能全面理解所有實現(xiàn),因為一篇文章是無法分析完的禾嫉,當然可能有后續(xù)的分析灾杰,下面正式開始。

問題:
1)整個skynet框架的組成熙参,以及各模塊的作用艳吠,以及啟動流程;
2)如何加載一個服務尊惰,以及從服務a發(fā)消息至服務b的流程讲竿,處理并返回;
3)當并發(fā)時弄屡,如何保證正確時序题禀,以及如何使用協(xié)程處理消息(同步/異步/超時);

以上三個問題涉及到的東西比較多膀捷,有不明白的可以參考網(wǎng)上資料和源碼迈嘹。

這里不會詳細介紹lua的東西,我會在后續(xù)博客中重點分析下lua的協(xié)程實現(xiàn)源碼以及閉包實現(xiàn)全庸,這是為了填以往博客的坑秀仲。也不會分析游戲中的框架,這里會用身邊的場景來代入問題壶笼。這里的分析只考慮單點神僵,不會考慮分布式節(jié)點。

首先說明的是覆劈,skynet是多線程框架保礼,然后里面對應了一些服務(service)沛励,每個服務對應一個lua 虛擬機,而一個虛擬機中可以跑很多個協(xié)程炮障,但同一時刻就一個協(xié)程目派,每條消息處理由協(xié)程來做,且運行在保護模式下胁赢,lua層實現(xiàn)了協(xié)程池和時序相關(guān)的隊列企蹭,這里可以類比前面c++協(xié)程相關(guān)實現(xiàn),關(guān)于虛擬機的原理以及如何和c/c++交互可以查閱資料智末。

skynet框架那些事兒

整個框架相關(guān)的源碼在skynet-src目錄下谅摄,下面分析時不會涉及到哪個目錄哪個文件,會從啟動流程開始吹害,有些初始化會跳過不作詳細分析螟凭,但重要的相關(guān)的會說明。

main啟動時它呀,會有個lua格式的config配置文件,里面配置了工作線程個數(shù)棒厘,要加載的lua服務纵穿,以及環(huán)境相關(guān)的參數(shù)信息等,部分代碼如下奢人,其中skynet_start是關(guān)鍵谓媒。

118 int
119 main(int argc, char *argv[]) {
135     struct skynet_config config;
140     interr =  luaL_loadbufferx(L, load_config, strlen(load_config),"=[skynet config]", "t");
152     config.thread =  optint("thread",8);
153     config.module_path = optstring("cpath","./cservice/?.so");
155     config.bootstrap = optstring("bootstrap","snlua bootstrap");
156     config.daemon = optstring("daemon", NULL);
157     config.logger = optstring("logger", NULL);
158     config.logservice = optstring("logservice", "logger");
163     skynet_start(&config);
167     return 0;
168 }
246 void   
247 skynet_start(struct skynet_config * config) {
262     skynet_mq_init();
264     skynet_timer_init();
265     skynet_socket_init();
268     struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
274     bootstrap(ctx, config->bootstrap);
276     start(config->thread);

以上是部分實現(xiàn)代碼,在skynet_start過程中何乎,會先設置信號處理函數(shù)句惯,判斷是否作為daemon進程,初始化harbor是否作為分布式節(jié)點之一(這時只考慮有一個節(jié)點)支救。

代碼行262初始化全局消息隊列:

211 void 
212 skynet_mq_init() {
213     struct global_queue *q = skynet_malloc(sizeof(*q));
214     memset(q,0,sizeof(*q));
215     SPIN_INIT(q);
216     Q=q;
217 }

 35 struct global_queue {
 36    struct message_queue *head;
 37    struct message_queue *tail;
 38    struct spinlock lock;
 39 };

skynet中的消息隊列是非常重要的抢野,服務之間通信正是通過消息來驅(qū)動的,這里的設計是有一個全局消息隊列各墨,然后每個服務在創(chuàng)建時有一個消息隊列指孤,是發(fā)往到該服務待處理的消息,然后掛載到全局隊列中等工作線程處理贬堵,引用一張圖如下:


框架

關(guān)于把消息入隊和出隊在這里不做詳細分析恃轩。

代碼行264是初始化定時器相關(guān)的數(shù)據(jù),數(shù)據(jù)結(jié)構(gòu)如下:

 46 struct timer {
 47    struct link_list near[TIME_NEAR];
 48    struct link_list t[4][TIME_LEVEL];
 49    struct spinlock lock;
 50    uint32_t time;
 51    uint32_t starttime;
 52    uint64_t current;
 53    uint64_t current_point;
 54 };

其中為什么有這一行struct link_list t[4][TIME_LEVEL]黎做,當時我分析的時候也是一頭霧水叉跛,會在后面說明原因,以及如何跟協(xié)程綁定和處理超時蒸殿,會在后面說明筷厘。

代碼行265是初始化socket相關(guān)的數(shù)據(jù)挽铁,主要結(jié)構(gòu)如下:

 100 struct socket_server {
 101    int recvctrl_fd;
 102    int sendctrl_fd;
 103    int checkctrl;
 104    poll_fd event_fd;
 105    int alloc_id;
 106    int event_n;
 107    int event_index;
 108    struct socket_object_interface soi;
 109    struct event ev[MAX_EVENT];
 110    struct socket slot[MAX_SOCKET];
 111    char buffer[MAX_INFO];
 112    uint8_t udpbuffer[MAX_UDP_PACKAGE];
 113    fd_set rfds;
 114 };

初始化部分實現(xiàn)如下:

 328 struct socket_server *
 329 socket_server_create() {
 331    int fd[2];
 332    poll_fd efd = sp_create();
 337    if (pipe(fd)) { }
 342    if (sp_add(efd, fd[0], NULL)) { }
 351    struct socket_server *ss = MALLOC(sizeof(*ss));
 352    ss->event_fd = efd;
 353    ss->recvctrl_fd = fd[0];
 354    ss->sendctrl_fd = fd[1];
 371 }

以上這塊設計的挺好的,每個字段作用從命名能知道一二敞掘,具體的流程會在代碼中分析(不知道能寫多少)叽掘。

代碼行268會先創(chuàng)建個log服務,struct skynet_context對應一個虛擬機:

125 struct skynet_context *
126 skynet_context_new(const char * name, const char *param) {
127     struct skynet_module * mod = skynet_module_query(name);
132     void *inst = skynet_module_instance_create(mod);
135     struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
136     CHECKCALLING_INIT(ctx)
138     ctx->mod = mod;
139     ctx->instance = inst;
140     ctx->ref =2;
141     ctx->cb =NULL;
142     ctx->cb_ud =NULL;
143     ctx->session_id =0;
154     ctx->handle =0;
155     ctx->handle = skynet_handle_register(ctx);
156     struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
161     int r = skynet_module_instance_init(mod, inst, ctx, param);
162     CHECKCALLING_END(ctx)
163     if (r == 0) {
164         struct skynet_context * ret = skynet_context_release(ctx);
165         if (ret) {
166             ctx->init =true;
167         }
168         skynet_globalmq_push(queue);
172         return ret;
173     }else {
174         //more code...
181     }
182 }

以上這段實現(xiàn)非常重要玖雁,這里加載動態(tài)庫的模塊為logger.so更扁,非snlua.so,在用snlua啟動bootstrap服務時會說明其他部分赫冬。

這里skynet_module_query根據(jù)name加載動態(tài)庫用以初始化各服務浓镜,沒有加載過的話會dlopen(tmp, RTLD_NOW | RTLD_GLOBAL),并設置相關(guān)的接口:

 92 static int
 93 open_sym(struct skynet_module *mod) {
 94    mod->create = get_api(mod,"_create");
 95    mod->init = get_api(mod,"_init");
 96    mod->release = get_api(mod,"_release");
 97    mod->signal = get_api(mod,"_signal");
 99    return mod->init == NULL;
100 }

后面的邏輯就是調(diào)用logger_create劲厌,這里不對logger相關(guān)的接口說明膛薛,畢竟是次要的,重點會分析snlua的补鼻;其中ctx->cb =NULL;ctx->cb_ud =NULL;是消息分發(fā)到lua層的回調(diào)函數(shù)和參數(shù)哄啄,ctx->handle = skynet_handle_register(ctx)是分配一個唯一的句柄handle與每個服務關(guān)聯(lián),通過handle可找到對應的服務风范,相當于地址咨跌;然后為每個服務創(chuàng)建服務消息隊列,接著logger_init硼婿,最后把消息隊列通過skynet_globalmq_push到全局隊列锌半,一方面接收新的消息,另一方面由工作線程依次處理寇漫。

代碼行274通過snlua啟動bootstrap第一個服務(其實logger也算是一個服務):

232 static void
233 bootstrap(struct skynet_context * logger, const char * cmdline) {
238     struct skynet_context *ctx = skynet_context_new(name, args);
244 }

其中cmdlinesnlua bootstrap刊殉,然后解析后namesnluaargsbootstrap州胳;同logger一樣记焊,這里加載的是snlua.so,其他邏輯一樣陋葡,這時重點分析下snlua_createsnlua_init

180 struct snlua *
181 snlua_create(void) {
182     struct snlua * l = skynet_malloc(sizeof(*l));
183     memset(l,0,sizeof(*l));
186     l->L = lua_newstate(lalloc, l);
187     return l;
188 }

 14 struct snlua {
 15    lua_State * L;
 16    struct skynet_context * ctx;
 20 };

其中lua_State是一個虛擬機的上下文亚亲,每個服務都有一個,做到服務與服務之間隔離腐缤,具體的數(shù)據(jù)組成可以看下lua源碼中的聲明捌归;

147 int
148 snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
152     skynet_callback(ctx, l , launch_cb);
156     skynet_send(ctx,0, handle, PTYPE_TAG_DONTCOPY,0, "bootstrap", 9);
158 }

這里設置服務的回調(diào)函數(shù)launch_cb,然后發(fā)第一條消息給自己岭粤,由launch_cb處理惜索,處理完后,重新設置回調(diào)函數(shù)為lua層的剃浇,那么后面路由到該服務的消息就能正確的分發(fā)到對應的回調(diào)函數(shù)了:

134 static int
135 launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , co    nst void * msg, size_t sz) {
136     assert(type ==0 && session == 0);
137     struct snlua *l = ud;
138     skynet_callback(context,NULL, NULL);
139     int err = init_cb(l, context, msg, sz);
145 }
 75 static int
 76 init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
104     const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
106     int r = luaL_loadfile(L,loader);
113     r = lua_pcall(L,1,0,1);
108 static int
109 lcallback(lua_State *L) {
110     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
111     int forward = lua_toboolean(L, 2);
119     if (forward) {
120         skynet_callback(context, gL, forward_cb);
121     }else {
122         skynet_callback(context, gL,_cb);
123     }
125     return 0;
126 }

 55 static int
 56 _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const vo    id * msg, size_t sz) {
 57    lua_State *L = ud;
 58    int trace = 1;
 59    int r;
 60    int top = lua_gettop(L);
 61    if (top == 0) {
 62        lua_pushcfunction(L, traceback);
 63        lua_rawgetp(L, LUA_REGISTRYINDEX,_cb);
 64    }else {
 65        assert(top ==2);
 66    }
 67    lua_pushvalue(L,2);
 69    lua_pushinteger(L, type);
 70    lua_pushlightuserdata(L, (void *)msg);
 71    lua_pushinteger(L,sz);
 72    lua_pushinteger(L, session);
 73    lua_pushinteger(L, source);
 75    r = lua_pcall(L,5, 0 , trace);
 97 
 98    return 0;
 99}

init_cb比較復雜巾兆,設置虛擬機的相關(guān)環(huán)境變量猎物,加載lua文件等,怎么加載的看上面列的幾行關(guān)鍵代碼角塑,具體為什么看下源碼和lua的luaL_loadfile和c與lua的交互棧蔫磨。

代碼行276是啟動線程:

181 static void
182 start(int thread) {
183     pthread_t pid[thread+3];
185     struct monitor *m = skynet_malloc(sizeof(*m));
186     memset(m,0, sizeof(*m));
187     m->count = thread;
188     m->sleep =0;
189 
190     m->m = skynet_malloc(thread *sizeof(struct skynet_monitor *));
191     int i;
192     for (i=0;i<thread;i++) {
193         m->m[i] = skynet_monitor_new();
194     }

204     create_thread(&pid[0], thread_monitor, m);
205     create_thread(&pid[1], thread_timer, m);
206     create_thread(&pid[2], thread_socket, m);
208     static int weight[] = {
209         -1, -1, -1, -1, 0, 0, 0, 0,
210         1, 1, 1, 1, 1, 1, 1, 1,
211         2, 2, 2, 2, 2, 2, 2, 2,
212         3, 3, 3, 3, 3, 3, 3, 3, };
213     struct worker_parm wp[thread];
214     for (i=0;i<thread;i++) {
215         wp[i].m = m;
216         wp[i].id = i;
217         if (i < sizeof(weight)/sizeof(weight[0])) {
218             wp[i].weight= weight[i];
219         }else {
220             wp[i].weight =0;
221         }
222         create_thread(&pid[i+3], thread_worker, &wp[i]);
223     }
224     //pthread_join all thread
230 }

以上是創(chuàng)建多個線程,有thread_monitor線程圃伶,用于判斷相應服務的消息列表是否過載堤如;創(chuàng)建定時器thread_timer線程,用于處理超時窒朋;主要實現(xiàn)如下:

172 static void
173 timer_update(struct timer *T) {
174     SPIN_LOCK(T);
176     // try to dispatch timeout 0 (rare condition)
177     timer_execute(T);
179     // shift time first, and then dispatch timer message
180     timer_shift(T);
182     timer_execute(T);
184     SPIN_UNLOCK(T);
185 }

接著再啟動thread_socket網(wǎng)絡線程搀罢,主要功能在skynet_socket_poll實現(xiàn)中,處理讀和寫侥猩,重點是讀事件榔至,這部分會放在后面分析。

下面是處理消息的工作線程實現(xiàn)原理:

152 static void *
153 thread_worker(void *p) {
154     struct worker_parm *wp = p;
155     int id = wp->id;
156     int weight = wp->weight;
160     struct message_queue * q = NULL;
161     while (!m->quit) {
162         q = skynet_context_message_dispatch(sm, q, weight);
163         //check q is null
177     }
178     return NULL;
179 }

上面是一個列循環(huán)欺劳,當m->quittrue時才退出唧取,當沒有消息時會進行pthread_cond_wait,然后就是一直skynet_context_message_dispatch杰标,其中weight是權(quán)重兵怯,在隊列消息數(shù)有一定數(shù)量的情況下,有些線程會根據(jù)weight嘗試處理多條消息:

296 structmessage_queue * 
297 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight)     {       
298     //check q is null
299     q = skynet_globalmq_pop();
300     //check q is null    
304     uint32_t handle = skynet_mq_handle(q);
305                
306     struct skynet_context * ctx = skynet_handle_grab(handle);
307     if (ctx == NULL) {
308         struct drop_t d = { handle };
309         skynet_mq_release(q, drop_message, &d);
310         return skynet_globalmq_pop();
311     }
13      int i,n=1;
314     struct skynet_message msg;
315 
316     for (i=0;i<n;i++) { 
317         if (skynet_mq_pop(q,&msg)) {
318             skynet_context_release(ctx);
319             return skynet_globalmq_pop();
320         } else if (i==0 && weight >= 0) {
321             n = skynet_mq_length(q);
322             n >>= weight;
323         }
324         int overload = skynet_mq_overload(q);
325         if (overload) {
326             skynet_error(ctx, "May overload, message queue length = %d", overload);
327         }
328 
329         skynet_monitor_trigger(sm, msg.source , handle);
330 
331         if (ctx->cb == NULL) {
332             skynet_free(msg.data);
333         } else {
334             dispatch_message(ctx, &msg);
335         }
336 
337         skynet_monitor_trigger(sm, 0,0);
338     }
40      assert(q == ctx->queue);
341     struct message_queue *nq = skynet_globalmq_pop();
342     if (nq) {
343         // If global mq is not empty , push q back, and return next queue (nq)
344         // Else (global mq is empty or block, don't push q back, and return q again (for next di    spatch)
345         skynet_globalmq_push(q);
346         q = nq;
347     }
348     skynet_context_release(ctx);
349 
350     return q;
351 }

上面代碼工作線程從全局隊列中pop出一個服務的隊列腔剂,然后根據(jù)隊列的handle獲取到對應服務的skynet_context,然后根據(jù)weight要處理多少條消息驼仪,主要邏輯在dispatch_message中掸犬,然后pop下一個服務的消息隊列,這里算是公平吧绪爸,然后把原來的隊列push到全局隊列中湾碎,這里也有一些負載統(tǒng)計邏輯。

dispatch_message里面最終執(zhí)行的是ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)奠货,就回到了上面說的調(diào)用lua的回調(diào)函數(shù)介褥。
以上是skynet框架的整體實現(xiàn),可能有些細節(jié)或遺漏的地方?jīng)]有在這里分析递惋。

總結(jié)下柔滔,主要引用連接中作者的設計思想:

1)把一個符合規(guī)范的 C 模塊,從動態(tài)庫(so 文件)中啟動起來萍虽,綁定一個永不重復(即使模塊退出)的數(shù)字 id 做為其 handle 睛廊。模塊被稱為服務(Service),服務間可以自由發(fā)送消息杉编。每個模塊可以向 Skynet 框架注冊一個 callback 函數(shù)超全,用來接收發(fā)給它的消息咆霜。每個服務都是被一個個消息包驅(qū)動,當沒有包到來的時候嘶朱,它們就會處于掛起狀態(tài)蛾坯,對 CPU 資源零消耗。如果需要自主邏輯疏遏,則可以利用 Skynet 系統(tǒng)提供的 timeout 消息脉课,定期觸發(fā)。

2)簡單說改览,Skynet 只負責把一個數(shù)據(jù)包從一個服務內(nèi)發(fā)送出去下翎,讓同一進程內(nèi)的另一個服務收到,調(diào)用對應的 callback 函數(shù)處理宝当。它保證视事,模塊的初始化過程,每個獨立的 callback 調(diào)用庆揩,都是相互線程安全的俐东。編寫服務的人不需要特別的為多線程環(huán)境考慮任何問題。專心處理發(fā)送給它的一個個數(shù)據(jù)包订晌。

3)它僅僅是把數(shù)據(jù)包的指針虏辫,以及你聲稱的數(shù)據(jù)包長度(并不一定是真實長度)傳遞出去。由于服務都是在同一個進程內(nèi)锈拨,接收方取得這個指針后砌庄,就可以直接處理其引用的數(shù)據(jù)了。
這個機制可以在必要時娄昆,保證絕對的零拷貝缝彬,幾乎等價于在同一線程內(nèi)做一次函數(shù)調(diào)用的開銷。
但谷浅,這只是 Skynet 提供的性能上的可能性扒俯。它推薦的是一種更可靠,性能略低的方案:它約定撼玄,每個服務發(fā)送出去的包都是復制到用 malloc 分配出來的連續(xù)內(nèi)存违施。接收方在處理完這個數(shù)據(jù)塊(在處理的 callback 函數(shù)調(diào)用完畢)后互纯,會默認調(diào)用 free 函數(shù)釋放掉所占的內(nèi)存。即磕蒲,發(fā)送方申請內(nèi)存留潦,接收方釋放兔院。

4)在 Skynet 啟動時,建立了若干工作線程(數(shù)量可配置)坊萝,它們不斷的從主消息列隊中取出一個次級消息隊列來,再從次級隊列中取去一條消息菩鲜,調(diào)用對應的服務的 callback 函數(shù)進行出來惦积。為了調(diào)用公平,一次僅處理一條消息狮崩,而不是耗凈所有消息(雖然那樣的局部效率更高睦柴,因為減少了查詢服務實體的次數(shù),以及主消息隊列進出的次數(shù))坦敌,這樣可以保證沒有服務會被餓死。
用戶定義的 callback 函數(shù)不必保證線程安全窝趣,因為在 callback 函數(shù)被調(diào)用的過程中训柴,其它工作線程沒有可能獲得這個 callback 函數(shù)所熟服務的次級消息隊列妇拯,也就不可能被并發(fā)了。一旦一個服務的消息隊列暫時為空越锈,它的消息隊列就不再被放回全局消息隊列了甘凭。這樣使大部分不工作的服務不會空轉(zhuǎn) CPU 。

收發(fā)處理消息的那些事兒

這節(jié)會說明如何從在lua層丹弱,從服務發(fā)條消息到另一個服務铲咨,這里為了簡化起見纤勒,考慮的是send調(diào)用隆檀,發(fā)的參數(shù)是string類型,順便印證上個小結(jié)最后總結(jié)引用的設計思想泉坐。

當我們在lua層的業(yè)務邏輯中寫這么一條語句skynet.send(agentAddr, "lua", "CallFunc", "hello world.")后會發(fā)生什么事情裳仆?
他調(diào)用的是:

416 function skynet.send(addr, typename, ...)
417     local p = proto[typename]
418     return c.send(addr, p.id, 0 , p.pack(...))
419 end

其中"lua"是我們的協(xié)議類型, "CallFunc"和"hello world."表示的是方法名和參數(shù)记某,表示agentAddr對方服務handle构捡,skynet.pack是對方法名和參數(shù)名編碼,是調(diào)用C層的:

599 LUAMOD_API int
600 luaseri_pack(lua_State *L) {
601     struct block temp;
602     temp.next = NULL;
603     struct write_block wb;
604     wb_init(&wb, &temp);
605     pack_from(L,&wb,0);
606     assert(wb.head == &temp);
607     seri(L, &temp, wb.len); 
609     wb_free(&wb);
611     return 2;
612 }

534 static void
535 seri(lua_State *L, struct block *b, int len) {
536     uint8_t * buffer = skynet_malloc(len);
537     uint8_t * ptr = buffer;
538     int sz = len;
539     while(len>0) {
540         if (len >= BLOCK_SIZE) {
541             memcpy(ptr, b->buffer, BLOCK_SIZE);
542             ptr += BLOCK_SIZE;
543             len -= BLOCK_SIZE;
544             b = b->next;
545         } else {
546             memcpy(ptr, b->buffer, len);
547             break;
548         }
549     }
550 
551     lua_pushlightuserdata(L, buffer);
552     lua_pushinteger(L, sz);
553 }

296 static void
297 pack_one(lua_State *L, struct write_block *b, int index, int depth) {
320     case LUA_TSTRING: {
321         size_t sz = 0;
322         const char *str = lua_tolstring(L,index,&sz);
323         wb_string(b, str, (int)sz);
324         break;
325     }

打包的時候滑凉,會進行一次拷貝喘帚,最后返回一個C指針和數(shù)據(jù)長度,接著調(diào)用C層的send函數(shù)若未,即lsend倾鲫,lsend調(diào)用send_message(L, 0, 2)

232 static int
233 send_message(lua_State *L, int source, int idx_type) {
234     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
252     int mtype = lua_type(L,idx_type+2);
253     switch (mtype) {
267     case LUA_TLIGHTUSERDATA: {
268         void * msg = lua_touserdata(L,idx_type+2);
269         int size = luaL_checkinteger(L,idx_type+3);
270         if (dest_string) {
271             session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
272         } else {
273             session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
274         }
275         break;
276     }
285     lua_pushinteger(L,session);
286     return 1;
287 }

其中type或上PTYPE_TAG_DONTCOPY表示不要拷貝數(shù)據(jù)乌昔,因為msg是C層變量指針,會在適當?shù)臅r候由C釋放磕道;最后會返回session用于可能后續(xù)的響應消息回來找到上下文(對于call調(diào)用);

699 int 
700 skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, i    nt session, void * data, size_t sz) {
708     _filter_args(context, type, &session, (void **)&data, &sz);
710     if (source == 0) {
711         source = context->handle;
712     }   
713             
714     if (destination == 0) {
715         return session;
716     }
717     if (skynet_harbor_message_isremote(destination)) {
718         //跟另外節(jié)點通訊
724     } else {
725         struct skynet_message smsg;
726         smsg.source = source;
727         smsg.session = session;
728         smsg.data = data;
729         smsg.sz = sz;
730         
731         if (skynet_context_push(destination, &smsg)) {
732             skynet_free(data);
733             return -1;
734         }   
735     }       
736     return session;
737 }

229 int
230 skynet_context_push(uint32_t handle, struct skynet_message *message) {
231     struct skynet_context * ctx = skynet_handle_grab(handle);
232     if (ctx == NULL) {
233         return -1;
234     }
235     skynet_mq_push(ctx->queue, message);
236     skynet_context_release(ctx);
237 
238     return 0;
239 }

上面代碼是把消息壓入對方服務的消息隊列悼做,而_filter_args主要是(可能)分配一個session值撵割,對長度進行編碼,高八位存放type信息羹与。
另外每個服務分配的session都是大于0的庶灿,每條消息唯一session,且當int變?yōu)樨摂?shù)時重置session為1往踢,不大可能造成兩條不同的消息而session相同峻呕。

當對方服務被工作線程處理時,會進行消息的回調(diào)瘦癌,就回到了上面的實現(xiàn)。

工作中整理的問題:

1)如果處理某個服務的消息造成死循環(huán)或處理時間過久热押,那么可能導致底層一些服務隊列得不到調(diào)度和處理斤寇,可能造成消息隊列過大,占用更多內(nèi)存牙寞,從而導致“雪崩”問題莫秆,后面處理的消息都可能是超時的。比如定時任務運行耗時過多;業(yè)務需求比如從一些道具列表中隨機幾個不同的道具伟端,直到選到幾個不同的才退出循環(huán),沒有選擇適合的隨機算法党巾;有的業(yè)務邏輯不正確,導致使用協(xié)程數(shù)量過多等待驳规,無法切換回來并釋放署海。

2)協(xié)程的調(diào)度,切換回來后出現(xiàn)各種各樣的問題捻勉,比如某個對象已經(jīng)釋放刀森,雖然通過閉包引用著某個對象(地址),但是obj->isReleased()是true埠偿,所以處理了各種錯誤的數(shù)據(jù)榜晦。

3)用不到的對象本應該釋放,不小心相互引用著對方浊服,導致lua gc時不能夠回收相應資源咐刨,造成內(nèi)存泄漏唧领。

4)不合理的使用call孽拷,導致在某些關(guān)鍵路徑上任性的切換協(xié)程半抱,而可能導致時序問題,因為使用協(xié)程后炼幔,有些是不確定的史简,即切出后什么時候切回來,如果玩家在登陸的時候請求其他服務數(shù)據(jù)導致切出協(xié)程跺讯,那什么時候切回來呢。

5)還有跟順序有關(guān)的消息處理局荚,引用云風作者舉的例子“如果 B 是一個 lua 服務愈污,當 A 向 B 發(fā)送了兩條消息 x 和 y 。skynet 一定保證 x 先被 B 中的 lua 虛擬機收到茫陆,并為 x 消息生成了一個 coroutine X 擎析,并運行這個 coroutine 。然后才會收到消息 y 桨醋,重新生成一個新的 coroutine Y 现斋,接下來運行∷材冢”
如果處理X的時候限书,協(xié)程因為某些原因掛起,那么處理Y的時候可能會改變一些狀態(tài)等能真。Skynet中Lua服務的消息處理

6)還有使用不適合的算法扰柠,比如時間復雜度的O(n)或更差實現(xiàn)等。

淺析skynet底層框架中篇主要分析skynet的定時器和網(wǎng)絡實現(xiàn)部分蝙泼,再加個消息隊列劝枣,和本篇的第三個小問題倡缠。

下面是參考資料:
云風博客
skynet源碼
Skynet 設計綜述
skynet服務的過載保護

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市载荔,隨后出現(xiàn)的幾起案子采桃,更是在濱河造成了極大的恐慌,老刑警劉巖工扎,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件衔蹲,死亡現(xiàn)場離奇詭異,居然都是意外死亡橱健,警方通過查閱死者的電腦和手機沙廉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門撬陵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人巨税,你說我怎么就攤上這事垢夹。” “怎么了果元?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵而晒,是天一觀的道長。 經(jīng)常有香客問我迅耘,道長,這世上最難降的妖魔是什么纽哥? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任栖秕,我火速辦了婚禮,結(jié)果婚禮上只壳,老公的妹妹穿的比我還像新娘暑塑。我一直安慰自己,他們只是感情好惕艳,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布分蓖。 她就那樣靜靜地躺著么鹤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蒸甜。 梳的紋絲不亂的頭發(fā)上柠新,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天恨憎,我揣著相機與錄音,去河邊找鬼憔恳。 笑死钥组,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的程梦。 我是一名探鬼主播橘荠,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼哥童,長吁一口氣:“原來是場噩夢啊……” “哼褒翰!你這毒婦竟也來了影暴?” 一聲冷哼從身側(cè)響起探赫,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎妆兑,沒想到半個月后毛仪,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡腺逛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年棍矛,在試婚紗的時候發(fā)現(xiàn)自己被綠了抛杨。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡茁帽,死狀恐怖屈嗤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情战秋,我是刑警寧澤讨韭,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站狰闪,受9級特大地震影響埋泵,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜丽声,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一雁社、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧磺浙,春花似錦徒坡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至跨新,卻和暖如春坏逢,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背肖揣。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工浮入, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人彤断。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像平道,于是被迫代替她去往敵國和親供炼。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容