傳統(tǒng)游戲項(xiàng)目一般使用TCP協(xié)議進(jìn)行通信衔统,得益于它的穩(wěn)定和可靠提针,不過在網(wǎng)絡(luò)不穩(wěn)定的情況下,會(huì)出現(xiàn)丟包嚴(yán)重施敢。
不過近期有不少基于UDP的應(yīng)用層協(xié)議周荐,聲稱對(duì)UDP的不可靠進(jìn)行了改造,這意味著我們既可以享受網(wǎng)絡(luò)層提供穩(wěn)定可靠的服務(wù)僵娃,又可以享受它的速度概作。
KCP就是這樣的一個(gè)協(xié)議
不過網(wǎng)上說的再天花亂墜,我們也得親自調(diào)研默怨,分析源碼和它的機(jī)制讯榕,并測試它的性能,是否滿足項(xiàng)目上線要求。本文從C版本的源碼入手理解KCP的機(jī)制愚屁,再研究各種Java版本的實(shí)現(xiàn)
一济竹、KCP協(xié)議
原版源碼(C代碼):https://github.com/skywind3000/kcp
基于底層協(xié)議(一般是UDP)之上,完全在應(yīng)用層實(shí)現(xiàn)類TCP的可靠機(jī)制(快速重傳霎槐,擁塞控制等)
二送浊、KCP特性
KCP實(shí)現(xiàn)以下特性,也可參考github中README中對(duì)KCP的定義
特性 | 說明 | 源碼位置 |
---|---|---|
RTO優(yōu)化 | 超時(shí)時(shí)間計(jì)算優(yōu)于TCP | ikcp_update_ack |
選擇性重傳 | KCP只重傳真正丟失的數(shù)據(jù)包栽燕,TCP會(huì)全部重傳丟失包之后的全部數(shù)據(jù) | ikcp_parse_fastack罕袋,ikcp_flush |
快速重傳 | 根據(jù)配置,可以在丟失包被跳過一定次數(shù)后直接重傳碍岔,不等RTO超時(shí) | ikcp_parse_fastack浴讯,ikcp_flush |
UNA + ACK | ARQ模型響應(yīng)有兩種,UNA(此編號(hào)前所有包已收到蔼啦,如TCP)榆纽,ACK(該編號(hào)包已收到),光用UNA將導(dǎo)致全部重傳捏肢,光用ACK則丟失成本太高奈籽,以往協(xié)議都是二選其一,而 KCP協(xié)議中鸵赫,除去單獨(dú)的 ACK包外衣屏,所有包都有UNA信息。 | ikcp_flush(每次update辩棒,都發(fā)送ACK) |
非延遲ACK | KCP可配置是否延遲發(fā)送ACK | ikcp_update_ack |
流量控制 | 同TCP的公平退讓原則狼忱,發(fā)送窗口大小由:發(fā)送緩存大小、接收端剩余接收緩存大小一睁、丟包退讓及慢啟動(dòng)這四要素決定 | ikcp_input钻弄,ikcp_flush |
三、KCP報(bào)文
1. 報(bào)文解析源碼
源碼中對(duì)報(bào)文解析部分代碼如下
data = ikcp_decode32u(data, &conv);
if (conv != kcp->conv) return -1;
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
2. 報(bào)文定義
報(bào)文中標(biāo)識(shí)的定義
名詞 | 全稱 | 備注 | 作用 |
---|---|---|---|
conv | conversation id | 會(huì)話ID | 每個(gè)連接的唯一標(biāo)識(shí) |
cmd | command | 命令 | 每個(gè)數(shù)據(jù)包指定邏輯 |
frg | fragment count | 數(shù)據(jù)分段序號(hào) | 根據(jù)mtu(最大傳輸單元)和mss(最大報(bào)文長度)的數(shù)據(jù)分段 |
wnd | window size | 接收窗口大小 | 流量控制 |
ts | timestamp | 時(shí)間戳 | 數(shù)據(jù)包發(fā)送時(shí)間記錄 |
sn | serial number | 數(shù)據(jù)報(bào)的序號(hào) | 確保包的有序 |
una | un-acknowledged serial number | 對(duì)端下一個(gè)要接收的數(shù)據(jù)報(bào)序號(hào) | 確保包的有序 |
3. 消息類型
KCP報(bào)文的四種消息類型
const IUINT32 IKCP_CMD_PUSH = 81; // cmd: push data: 推送數(shù)據(jù)
const IUINT32 IKCP_CMD_ACK = 82; // cmd: ack: 對(duì)推送數(shù)據(jù)的確認(rèn)
const IUINT32 IKCP_CMD_WASK = 83; // cmd: window probe (ask): 詢問窗口大小
const IUINT32 IKCP_CMD_WINS = 84; // cmd: window size (tell): 回復(fù)窗口大小
-
報(bào)文結(jié)構(gòu)
四者吁、源碼解析
在網(wǎng)絡(luò)四層模型中窘俺,KCP和TCP/UDP(傳輸層),IP(網(wǎng)絡(luò)層)等協(xié)議有著本質(zhì)上區(qū)別复凳,理論上KCP是屬于應(yīng)用層協(xié)議瘤泪。
KCP并不提供協(xié)議實(shí)際收發(fā)處理,它只是在傳輸層只上對(duì)消息和鏈接的一層中間管理育八。
在KCP的源碼中均芽,它僅僅包含ikcp.c和ikcp.h兩個(gè)文件,僅提供KCP的數(shù)據(jù)管理和數(shù)據(jù)接口单鹿,而用戶需要在應(yīng)用層進(jìn)行KCP的調(diào)度
1. 結(jié)構(gòu)體定義
KCP分包結(jié)構(gòu)KCP對(duì)象結(jié)構(gòu)體定義
struct IKCPSEG
{
struct IQUEUEHEAD node;
IUINT32 conv; //用來標(biāo)記這個(gè)seg屬于哪個(gè)kcp
IUINT32 cmd;//這個(gè)包的指令是: // 數(shù)據(jù) ack 詢問/應(yīng)答窗口大小
IUINT32 frg; //分包時(shí)掀宋,分包的序號(hào),0為終結(jié)
IUINT32 wnd;//發(fā)送這個(gè)seg的這個(gè)端的 窗口大小--> 遠(yuǎn)端的接收窗口大小
IUINT32 ts; //我不知道為什么要用時(shí)間軸,這個(gè)都1秒劲妙,有什么用 ??
IUINT32 sn;//相當(dāng)于tcp的ack
IUINT32 una;//una 遠(yuǎn)端等待接收的一個(gè)序號(hào)
IUINT32 len; //data的長度
IUINT32 resendts;//重發(fā)的時(shí)間軸
IUINT32 rto;//等于發(fā)送端kcp的 rx_rto->由 計(jì)算得來
IUINT32 fastack;//ack跳過的次數(shù)湃鹊,用于快速重傳
IUINT32 xmit;// fastack resend次數(shù)
char data[1];//當(dāng)malloc時(shí),只需要 malloc(sizeof(IKCPSEG)+datalen) 則镣奋,data長=數(shù)據(jù)長度+1 剛好用來放0
};
struct IKCPCB
{
//會(huì)話ID,最大傳輸單元,最大分片大小币呵,狀態(tài) mss=mtu-sizeof(IKCPSEG)
IUINT32 conv, mtu, mss, state;
//第一個(gè)未接收到的包,待發(fā)送的包(可以認(rèn)為是tcp的ack自增)侨颈,接收消息的序號(hào)-> 用來賦seg的una值
IUINT32 snd_una, snd_nxt, rcv_nxt;
//前兩個(gè)不知道干嘛 擁塞窗口的閾值 用來控制cwnd值變化的
IUINT32 ts_recent, ts_lastack, ssthresh;
//這幾個(gè)變量是用來更新rto的
// rx_rttval 接收ack的浮動(dòng)值
// rx_srtt 接收ack的平滑值
// rx_rto 計(jì)算出來的rto
// rx_minrto 最小rto
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
//發(fā)送隊(duì)列的窗口大小
//接收隊(duì)列的窗口大小
//遠(yuǎn)端的接收隊(duì)列的窗口大小
//窗口大小
//probe 用來二進(jìn)制標(biāo)記
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
//時(shí)間軸 時(shí)間間隔 下一次flush的時(shí)間 xmit發(fā)射多少次余赢? 看不到有什么地方用到
IUINT32 current, interval, ts_flush, xmit;
//接收到的數(shù)據(jù)seg個(gè)數(shù)
//需要發(fā)送的seg個(gè)數(shù)
IUINT32 nrcv_buf, nsnd_buf;
//接收隊(duì)列的數(shù)據(jù) seg個(gè)數(shù)
//發(fā)送隊(duì)列的數(shù)據(jù) seg個(gè)數(shù)
IUINT32 nrcv_que, nsnd_que;
//是否為nodelay模式:如果開啟,rto計(jì)算范圍更小
//updated 在調(diào)用flush時(shí)哈垢,有沒有調(diào)用過update
IUINT32 nodelay, updated;
//請(qǐng)求訪問窗口的時(shí)間相關(guān) 當(dāng)遠(yuǎn)程端口大小為0時(shí)
IUINT32 ts_probe, probe_wait;
IUINT32 dead_link, incr;
//發(fā)送隊(duì)列
struct IQUEUEHEAD snd_queue;
//接收隊(duì)列
struct IQUEUEHEAD rcv_queue;
//待發(fā)送隊(duì)列
struct IQUEUEHEAD snd_buf;
//待接收隊(duì)列
struct IQUEUEHEAD rcv_buf;
//用來緩存自己接收到了多少個(gè)ack
IUINT32 *acklist;
IUINT32 ackcount;
IUINT32 ackblock;
//用戶信息
void *user;
//好像就用來操作數(shù)據(jù)的中轉(zhuǎn)站
char *buffer;
//快速重傳的閾值
int fastresend;
//快速重傳的上限
int fastlimit;
//是否無視重傳等其它設(shè)置窗口
//steam模式的話妻柒,會(huì)將幾個(gè)小包合并成大包
int nocwnd, stream;
int logmask;
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};
2. 接口分析
分析C源碼,KCP作為中間管理層耘分,主要提供以下接口
//---------------------------------------------------------------------
// interface
//---------------------------------------------------------------------
// create a new kcp control object, 'conv' must equal in two endpoint
// from the same connection. 'user' will be passed to the output callback
// output callback can be setup like this: 'kcp->output = my_udp_output'
// 創(chuàng)建kcp對(duì)象举塔,conv必須在兩個(gè)端之間相同,user會(huì)被傳遞到output回調(diào)求泰,
// output回調(diào)這樣設(shè)置:kcp->output = my_udp_output
ikcpcb* ikcp_create(IUINT32 conv, void *user);
// release kcp control object
// 釋放kcp對(duì)象
void ikcp_release(ikcpcb *kcp);
// set output callback, which will be invoked by kcp
// 設(shè)置kcp調(diào)用的output回調(diào)
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user));
// user/upper level recv: returns size, returns below zero for EAGAIN
// 用戶層/上層 接收消息:返回接收長度央渣,數(shù)據(jù)讀取錯(cuò)誤返回值小于0
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);
// user/upper level send, returns below zero for error
// 用戶層/上層 發(fā)送消息,錯(cuò)誤返回值小于0
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
// 更新狀態(tài)(每10ms-100ms調(diào)用一次)渴频,或者你可以通過調(diào)用ikcp_check芽丹,
// 來得知什么時(shí)候再次調(diào)用(不調(diào)用ikcp_input/_send)
// current - 當(dāng)前時(shí)間戳(毫秒)
void ikcp_update(ikcpcb *kcp, IUINT32 current);
// Determine when should you invoke ikcp_update:
// returns when you should invoke ikcp_update in millisec, if there
// is no ikcp_input/_send calling. you can call ikcp_update in that
// time, instead of call update repeatly.
// Important to reduce unnacessary ikcp_update invoking. use it to
// schedule ikcp_update (eg. implementing an epoll-like mechanism,
// or optimize ikcp_update when handling massive kcp connections)
// 決定你什么時(shí)候調(diào)用ikcp_update
// 返回你多少毫秒后應(yīng)該調(diào)用ikcp_update,如果沒有ikcp_input/_send調(diào)用卜朗,你可以在那個(gè)時(shí)間
// 調(diào)用ikcp_updates來代替自己驅(qū)動(dòng)update調(diào)用
// 用于減少不必要的ikcp_update調(diào)用志衍。用這個(gè)來驅(qū)動(dòng)ikcp_update(比如:實(shí)現(xiàn)類epoll的機(jī)制,
// 或者優(yōu)化處理大量kcp連接時(shí)的ikcp_update調(diào)用)
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current);
// when you received a low level packet (eg. UDP packet), call it
// 接收下層數(shù)據(jù)包(比如:UDP數(shù)據(jù)包)時(shí)調(diào)用
int ikcp_input(ikcpcb *kcp, const char *data, long size);
// flush pending data
// 刷新數(shù)據(jù)
void ikcp_flush(ikcpcb *kcp);
// check the size of next message in the recv queue
// 檢測接收隊(duì)列里下條消息的長度
int ikcp_peeksize(const ikcpcb *kcp);
// change MTU size, default is 1400
// 修改MTU長度聊替,默認(rèn)1400
int ikcp_setmtu(ikcpcb *kcp, int mtu);
// set maximum window size: sndwnd=32, rcvwnd=32 by default
// 設(shè)置最大窗口大小,默認(rèn)值:sndwnd=32, rcvwnd=32
int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);
// get how many packet is waiting to be sent
// 獲取準(zhǔn)備發(fā)送的數(shù)據(jù)包
int ikcp_waitsnd(const ikcpcb *kcp);
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
// 快速設(shè)置:ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay:0:使用(默認(rèn))培廓,1:使用
// interval:update時(shí)間(毫秒)惹悄,默認(rèn)100ms
// resend:0:不適用快速重發(fā)(默認(rèn)), 其他:自己設(shè)置值,若設(shè)置為2(則2次ACK跨越將會(huì)直接重傳)
// nc:0:正常擁塞控制(默認(rèn)), 1:不適用擁塞控制
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc);
void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...);
// setup allocator
// 設(shè)置kcp allocator
void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*));
// read conv
// 獲取conv
IUINT32 ikcp_getconv(const void *ptr);
3. 調(diào)度邏輯
KCP關(guān)鍵接口:
-
更新(上層驅(qū)動(dòng)KCP狀態(tài)更新)
ikcp_update:kcp狀態(tài)更新接口肩钠,需要上層進(jìn)行調(diào)度泣港,判斷flush時(shí)間,滿足條件調(diào)用ikcp_flush刷新數(shù)據(jù)价匠,同時(shí)也負(fù)責(zé)對(duì)收到數(shù)據(jù)的kcp端回復(fù)ACK消息 -
發(fā)送
ikcp_send -> ikcp_update -> ikcp_output
ikcp_send:上層調(diào)用發(fā)送接口当纱,把數(shù)據(jù)根據(jù)mss值進(jìn)行分片,設(shè)置分包編號(hào)踩窖,放到snd_queue隊(duì)尾
ikcp_flush:發(fā)送數(shù)據(jù)接口坡氯,根據(jù)對(duì)端窗口大小,拷貝snd_queue的數(shù)據(jù)到snd_buf,遍歷snd_buf箫柳,滿足條件則調(diào)用output回調(diào)(調(diào)用網(wǎng)絡(luò)層的發(fā)送) -
接收
ikcp_input -> ikcp_update -> ikcp_recv
ikcp_input:解析上層輸入數(shù)據(jù)手形,拷貝rcv_buf到rcv_queue
ikcp_recv:數(shù)據(jù)接收接口,上層從rcv_queue中復(fù)制數(shù)據(jù)到網(wǎng)絡(luò)層buffer
五悯恍、Java版本
目前github上有幾個(gè)高star的java版本實(shí)現(xiàn)库糠,選取最高的三個(gè)進(jìn)行分析
1. https://github.com/szhnet/kcp-netty.git(star:212)
實(shí)現(xiàn)原理:
1.KCP邏輯是源碼的Java翻譯版(一模一樣)
2.UkcpServerChannel繼承ServerChannel,UkcpServerBootStrap
3.用Boss線程EventLoopGroup的read事件來驅(qū)動(dòng)KCP邏輯
優(yōu)點(diǎn):使用Netty的Boss線程Read事件來驅(qū)動(dòng)KCP涮毫,不用while(true)的驅(qū)動(dòng)瞬欧;使用簡單,只需使用指定的ServerChannel和ServerBootStrap來啟動(dòng)Netty
缺點(diǎn):無明顯缺點(diǎn)
2. https://github.com/beykery/jkcp.git(star:172)
實(shí)現(xiàn)原理:
1.KCP邏輯是源碼的Java翻譯版(一模一樣)
2.啟動(dòng)指定線程數(shù)的KcpThread自定義IO線程池罢防,進(jìn)行KCP邏輯調(diào)度
3.Netty讀消息時(shí)拋到KcpThread自定義IO線程
// 通過hash選擇IO線程處理
InetSocketAddress sender = dp.sender();
int hash = sender.hashCode();
hash = hash < 0 ? -hash : hash;
this.workers[hash % workers.length].input(dp);
優(yōu)點(diǎn):代碼簡單明了艘虎,容易理解,核心是翻譯版源碼篙梢,外殼套的是Netty+自定義IO線程池
缺點(diǎn):IO線程池會(huì)while(true)的調(diào)用KCP的update
3. https://github.com/l42111996/java-Kcp.git(star:187)
實(shí)現(xiàn)原理:
1.KCP邏輯是源碼的Java翻譯版(一模一樣)
2.Netty讀消息時(shí)顷帖,扔到定時(shí)器,1ms后渤滞,拋出任務(wù)到自定義IO線程
優(yōu)點(diǎn):擁有1的全部優(yōu)點(diǎn)贬墩,也在Netty的讀消息,把消息拋到定時(shí)器去調(diào)用KCP的邏輯妄呕,避免了2的無意義的while(true)陶舞,同時(shí)實(shí)現(xiàn)功能更全,有上線項(xiàng)目驗(yàn)證(據(jù)作者描述)
缺點(diǎn):Netty相關(guān)邏輯完全封裝起來绪励,不能修改任何Netty參數(shù)(不過源碼中對(duì)Netty的參數(shù)已配置的很好了)
目前看來肿孵,第三種實(shí)現(xiàn)(https://github.com/l42111996/java-Kcp.git)是最理想的方式
如果大家感興趣,后邊會(huì)對(duì)第三種實(shí)現(xiàn)進(jìn)行詳細(xì)的源碼分析
六疏魏、性能測試
近期準(zhǔn)備做性能測試進(jìn)行對(duì)比停做,感興趣的朋友可以關(guān)注下
// TODO