前言
Netty 的解碼器有很多種摊鸡,比如基于長(zhǎng)度的绽媒,基于分割符的,私有協(xié)議的免猾。但是是辕,總體的思路都是一致的。
拆包思路:當(dāng)數(shù)據(jù)滿足了 解碼條件時(shí)猎提,將其拆開获三。放到數(shù)組。然后發(fā)送到業(yè)務(wù) handler 處理锨苏。
半包思路: 當(dāng)讀取的數(shù)據(jù)不夠時(shí)疙教,先存起來,直到滿足解碼條件后伞租,放進(jìn)數(shù)組贞谓。送到業(yè)務(wù) handler 處理。
而實(shí)現(xiàn)這個(gè)邏輯的就是我們今天的主角:ByteToMessageDecoder肯夏。
看名字的意思是:將字節(jié)轉(zhuǎn)換成消息的解碼器经宏。人如其名。而他本身也是一個(gè)入站 handler驯击,所以烁兰,我們還是從他的 channelRead 方法入手。
1. channelRead 方法
精簡(jiǎn)過的代碼如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 從對(duì)象池中取出一個(gè)List
CodecOutputList out = CodecOutputList.newInstance();
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
// 第一次解碼
cumulation = data;// 累計(jì)
} else {
// 第二次解碼徊都,就將 data 向 cumulation 追加沪斟,并釋放 data
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 得到追加后的 cumulation 后,調(diào)用 decode 方法進(jìn)行解碼
// 解碼過程中暇矫,調(diào)用 fireChannelRead 方法主之,主要目的是將累積區(qū)的內(nèi)容 decode 到 數(shù)組中
callDecode(ctx, cumulation, out);
// 如果累計(jì)區(qū)沒有可讀字節(jié)了
if (cumulation != null && !cumulation.isReadable()) {
// 將次數(shù)歸零
numReads = 0;
// 釋放累計(jì)區(qū)
cumulation.release();
// 等待 gc
cumulation = null;
} // 如果超過了 16 次,就壓縮累計(jì)區(qū)李根,主要是將已經(jīng)讀過的數(shù)據(jù)丟棄槽奕,將 readIndex 歸零。
else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
// 如果沒有向數(shù)組插入過任何數(shù)據(jù)
decodeWasNull = !out.insertSinceRecycled();
// 循環(huán)數(shù)組房轿,向后面的 handler 發(fā)送數(shù)據(jù)粤攒,如果數(shù)組是空,那不會(huì)調(diào)用
fireChannelRead(ctx, out, size);
// 將數(shù)組中的內(nèi)容清空囱持,將數(shù)組的數(shù)組的下標(biāo)恢復(fù)至原來
out.recycle();
}
樓主已經(jīng)在方法中寫了注釋夯接,但還是說說主要的步驟:
- 從對(duì)象池中取出一個(gè)空的數(shù)組。
- 判斷成員變量是否是第一次使用纷妆,(注意盔几,既然使用了成員變量,所以這個(gè) handler 不能是 handler 的掩幢。)將 unsafe 中傳遞來的數(shù)據(jù)寫入到這個(gè) cumulation 累積區(qū)中逊拍。
- 寫到累積區(qū)后上鞠,調(diào)用子類的 decode 方法,嘗試將累積區(qū)的內(nèi)容解碼顺献,每成功解碼一個(gè)旗国,就調(diào)用后面節(jié)點(diǎn)的 channelRead 方法枯怖。若沒有解碼成功注整,什么都不做。
- 如果累積區(qū)沒有未讀數(shù)據(jù)了度硝,就釋放累積區(qū)肿轨。
- 如果還有未讀數(shù)據(jù),且解碼超過了 16 次(默認(rèn))蕊程,就對(duì)累積區(qū)進(jìn)行壓縮椒袍。將讀取過的數(shù)據(jù)清空,也就是將 readIndex 設(shè)置為0.
- 設(shè)置 decodeWasNull 的值藻茂,如果上一次沒有插入任何數(shù)據(jù)驹暑,這個(gè)值就是 ture。該值在 調(diào)用 channelReadComplete 方法的時(shí)候辨赐,會(huì)觸發(fā) read 方法(不是自動(dòng)讀取的話)优俘,嘗試從 JDK 的通道中讀取數(shù)據(jù),并將之前的邏輯重來掀序。主要應(yīng)該是怕如果什么數(shù)據(jù)都沒有插入帆焕,就執(zhí)行 channelReadComplete 會(huì)遺漏數(shù)據(jù)。
- 調(diào)用 fireChannelRead 方法不恭,將數(shù)組中的元素發(fā)送到后面的 handler 中叶雹。
- 將數(shù)組清空。并還給對(duì)象池换吧。
下面來說說詳細(xì)的步驟折晦。
2. 從對(duì)象池中取出一個(gè)空的數(shù)組
代碼:
@1
CodecOutputList out = CodecOutputList.newInstance();
@2
static CodecOutputList newInstance() {
return CODEC_OUTPUT_LISTS_POOL.get().getOrCreate();
}
@3
private static final FastThreadLocal<CodecOutputLists> CODEC_OUTPUT_LISTS_POOL =
new FastThreadLocal<CodecOutputLists>() {
@Override
protected CodecOutputLists initialValue() throws Exception {
// 16 CodecOutputList per Thread are cached.
return new CodecOutputLists(16);
}
};
@4
CodecOutputLists(int numElements) {
elements = new CodecOutputList[MathUtil.safeFindNextPositivePowerOfTwo(numElements)];
for (int i = 0; i < elements.length; ++i) {
// Size of 16 should be good enough for the majority of all users as an initial capacity.
elements[i] = new CodecOutputList(this, 16);
}
count = elements.length;
currentIdx = elements.length;
mask = elements.length - 1;
}
@5
private CodecOutputList(CodecOutputListRecycler recycler, int size) {
this.recycler = recycler;
array = new Object[size];
}
@6
public CodecOutputList getOrCreate() {
if (count == 0) {
// Return a new CodecOutputList which will not be cached. We use a size of 4 to keep the overhead
// low.
return new CodecOutputList(NOOP_RECYCLER, 4);
}
--count;
int idx = (currentIdx - 1) & mask;
CodecOutputList list = elements[idx];
currentIdx = idx;
return list;
}
代碼分為 1,2沾瓦,3满着,4,5暴拄, 6 步驟漓滔。
- 靜態(tài)方法調(diào)用。
- 從 FastThreadLocal 中取出一個(gè) CodecOutputLists 對(duì)象乖篷,并從這個(gè)集合中再取出一個(gè) List响驴。也就是 List 中有 List∷喊可以理解為雙重?cái)?shù)組豁鲤。
- 調(diào)用 FastThreadLocal 的 initialValue 方法返回一個(gè) CodecOutputLists 對(duì)象秽誊。
- 創(chuàng)建數(shù)組。數(shù)組大小默認(rèn)16琳骡,循環(huán)填充 CodecOutputList 元素锅论。設(shè)置 count,currentIdx 楣号,mask 屬性最易。
- 創(chuàng)建 CodecOutputList 對(duì)象,這個(gè) recycler 就是他的父 CodecOutputLists炫狱,并創(chuàng)建一個(gè)默認(rèn) 16 的空數(shù)組藻懒。
- 首次進(jìn)入 count 不是0,應(yīng)該是 16视译,隨后將 count -1嬉荆,并與運(yùn)算出 Lists 中的下標(biāo),獲取到下標(biāo)的內(nèi)容酷含。也就是一個(gè) List鄙早。在調(diào)用 recycle 方法還給對(duì)象池的時(shí)候,會(huì)將所有參數(shù)恢復(fù)椅亚。
由于這個(gè) getOrCreate 方法會(huì)被一個(gè)線程的多個(gè)地方使用限番,因此 16 是個(gè)統(tǒng)計(jì)值。當(dāng) 16 不夠的時(shí)候什往,就會(huì)創(chuàng)建一個(gè)新的 List扳缕。也就是 count == 0 的邏輯。而 & mask 的操作就是一個(gè)取模的操作别威。
3. 寫入累積區(qū)
代碼如下:
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
這個(gè) cumulator 默認(rèn)是個(gè) Cumulator 類型的 MERGE_CUMULATOR躯舔,該實(shí)例最主要的是從重寫了 cumulate 方法:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
可以看到該方法,主要是將 unsafe.read 傳遞過來的 ByteBuf 的內(nèi)容寫入到 cumulation 累積區(qū)中省古,然后釋放掉舊的內(nèi)容粥庄,由于這個(gè)變量是成員變量,因此可以多次調(diào)用 channelRead 方法寫入豺妓。
同時(shí)這個(gè)方法也考慮到了擴(kuò)容的問題惜互,總的來說就是 copy。
當(dāng)然琳拭,ByteToMessageDecoder 中還有一個(gè) Cumulator 實(shí)例训堆,稱之為 COMPOSITE_CUMULATOR,混合累積白嘁。由于上個(gè)實(shí)例的 cumulate 方法是使用內(nèi)存拷貝的坑鱼,因此,這里提供了使用混合內(nèi)存絮缅。相較于拷貝鲁沥,性能會(huì)更好點(diǎn)呼股,但同時(shí)也會(huì)更復(fù)雜。
4. decode 方法的作用
當(dāng)數(shù)據(jù)追擊到累積區(qū)之后画恰,需要調(diào)用 decode 方法進(jìn)行解碼彭谁,代碼如下:
@ 1
callDecode(ctx, cumulation, out);
@2
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 如果累計(jì)區(qū)還有可讀字節(jié)
while (in.isReadable()) {
int outSize = out.size();
// 上次循環(huán)成功解碼
if (outSize > 0) {
// 調(diào)用后面的業(yè)務(wù) handler 的 ChannelRead 方法
fireChannelRead(ctx, out, outSize);
// 將 size 置為0
out.clear();//
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// 得到可讀字節(jié)數(shù)
int oldInputLength = in.readableBytes();
// 調(diào)用 decode 方法,將成功解碼后的數(shù)據(jù)放入道 out 數(shù)組中允扇,可能會(huì)刪除當(dāng)前節(jié)點(diǎn)缠局,刪除之前會(huì)將數(shù)據(jù)發(fā)送到最后的 handler
decodeRemovalReentryProtection(ctx, in, out);// decode()
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (isSingleDecode()) {
break;
}
}
}
該方法主要邏輯:只要累積區(qū)還有未讀數(shù)據(jù),就循環(huán)進(jìn)行讀取蔼两。
調(diào)用 decodeRemovalReentryProtection 方法甩鳄,內(nèi)部調(diào)用了子類重寫的 decode 方法逞度,很明顯额划,這里是個(gè)模板模式。decode 方法的邏輯就是將累積區(qū)的內(nèi)容按照約定進(jìn)行解碼档泽,如果成功解碼俊戳,就添加到數(shù)組中。同時(shí)該方法也會(huì)檢查該 handler 的狀態(tài)馆匿,如果被移除出 pipeline 了抑胎,就將累積區(qū)的內(nèi)容直接刷新到后面的 handler 中。
如果 Context 節(jié)點(diǎn)被移除了渐北,直接結(jié)束循環(huán)阿逃。如果解碼前的數(shù)組大小和解碼后的數(shù)組大小相等,且累積區(qū)的可讀字節(jié)數(shù)沒有變化赃蛛,說明此次讀取什么都沒做恃锉,就直接結(jié)束。如果字節(jié)數(shù)變化了呕臂,說明雖然數(shù)組沒有增加破托,但確實(shí)在讀取字節(jié),就再繼續(xù)讀取歧蒋。
如果上面的判斷過了土砂,說明數(shù)組讀到數(shù)據(jù)了,但如果累積區(qū)的 readIndex 沒有變化谜洽,則拋出異常萝映,說明沒有讀取數(shù)據(jù),但數(shù)組卻增加了阐虚,子類的操作是不對(duì)的序臂。
如果是個(gè)單次解碼器,解碼一次就直接結(jié)束了敌呈。
所以贸宏,這段代碼的關(guān)鍵就是子類需要重寫 decode 方法造寝,將累積區(qū)的數(shù)據(jù)正確的解碼并添加到數(shù)組中。每添加一次成功吭练,就會(huì)調(diào)用 fireChannelRead 方法诫龙,將數(shù)組中的數(shù)據(jù)傳遞給后面的 handler。完成之后將數(shù)組的 size 設(shè)置為 0.
所以鲫咽,如果你的業(yè)務(wù) handler 在這個(gè)地方可能會(huì)被多次調(diào)用签赃。也可能一次也不調(diào)用。取決于數(shù)組中的值分尸。當(dāng)然锦聊,如果解碼 handler 被移除了,就會(huì)將累積區(qū)的所有數(shù)據(jù)刷到后面的 handler箩绍。
5. 剩下的邏輯
上面的邏輯就是解碼器最主要的邏輯:
將 read 方法的數(shù)據(jù)讀取到累積區(qū)孔庭,使用解碼器解碼累積區(qū)的數(shù)據(jù),解碼成功一個(gè)就放入到一個(gè)數(shù)組中材蛛,并將數(shù)組中的數(shù)據(jù)一次次的傳遞到后面的handler圆到。
從上面的邏輯看,除非 handler 被移除卑吭,否則不會(huì)調(diào)用后面的 handler 方法芽淡,也就是說,只要不滿足解碼器的解碼規(guī)則豆赏,就不會(huì)傳遞給后面的 handler挣菲。
再看看后面的邏輯,主要在 finally 塊中:
- 如果累積區(qū)沒有可讀數(shù)據(jù)了掷邦,將計(jì)數(shù)器歸零白胀,并釋放累積區(qū)。
- 如果不滿足上面的條件耙饰,且計(jì)數(shù)器超過了 16 次纹笼,就壓縮累積區(qū)的內(nèi)容,壓縮手段是刪除已讀的數(shù)據(jù)苟跪。將 readIndex 置為 0廷痘。還記得 ByteBuf 的指針結(jié)構(gòu)嗎?
這樣就能節(jié)省一些內(nèi)存了件已,但這會(huì)引起一些內(nèi)存復(fù)制的過程笋额,以性能損耗為前提的。
- 記錄 decodeWasNull 屬性篷扩,這個(gè)值的決定來自于你有沒有成功的向數(shù)組中插入數(shù)據(jù)兄猩,如果插入了,它就是 fasle,沒有插入枢冤,他就是 true鸠姨。這個(gè)值的作用在于,當(dāng) channelRead 方法結(jié)束的時(shí)候淹真,執(zhí)行該 decoder 的 channelReadComplete 方法(如果你沒有重寫的話)讶迁,會(huì)判斷這個(gè)值:
如果是 true,則會(huì)判斷 autoRead 屬性核蘸,如果是 false 的話巍糯,那么 Netty 認(rèn)為還有數(shù)據(jù)沒有讀到,不然數(shù)組為什么一直是空的客扎?就主動(dòng)調(diào)用 read 方法從 Socket 讀取祟峦。
- 調(diào)用 fireChannelRead 方法,嘗試將數(shù)組中的數(shù)據(jù)發(fā)送到后面的 handler徙鱼。為什么要這么做宅楞。按道理,到這一步的時(shí)候疆偿,數(shù)組不可能是空咱筛,為什么這里還要這么謹(jǐn)慎的再發(fā)送一次?
答:如果是單次解碼器杆故,就需要發(fā)送了,因此單詞解碼器是不會(huì)再 callDecode 方法中發(fā)送的溉愁。
- 最后处铛,將數(shù)組還給對(duì)象池。并清空數(shù)組內(nèi)容拐揭。
最后一行的 recycler.recycle(this)撤蟆,有兩種結(jié)果,如果是 CodecOutputLists 的 recycle 方法堂污,內(nèi)容如下:
恢復(fù)數(shù)組下標(biāo)家肯,對(duì) count ++,表示有對(duì)象可用了盟猖。
還有第二種讨衣,當(dāng) 16 個(gè)數(shù)組不夠用了,就需要?jiǎng)?chuàng)建一個(gè)新的式镐,在 getOrCreate 方法體現(xiàn)反镇。而構(gòu)造函數(shù)中的 recycler 是一個(gè)空對(duì)象。我們看看這個(gè)對(duì)象:
當(dāng)調(diào)用 recycle 方法的時(shí)候娘汞,什么都不做歹茶。等待 GC 回收。因?yàn)檫@不是個(gè)對(duì)象池的引用。
好惊豺,到這里燎孟,關(guān)于 ByteToMessageDecoder 解碼器的主要功能就解讀完了。
5. 總結(jié)
可以說尸昧,ByteToMessageDecoder 是解碼器的核心所做缤弦,Netty 在這里使用了模板模式,留給子類擴(kuò)展的方法就是 decode 方法彻磁。
主要邏輯就是將所有的數(shù)據(jù)全部放入累積區(qū)碍沐,子類從累積區(qū)取出數(shù)據(jù)進(jìn)行解碼后放入到一個(gè) 數(shù)組中,ByteToMessageDecoder 會(huì)循環(huán)數(shù)組調(diào)用后面的 handler 方法衷蜓,將數(shù)據(jù)一幀幀的發(fā)送到業(yè)務(wù) handler 累提。完成這個(gè)的解碼邏輯。
使用這種方式磁浇,無論是粘包還是拆包斋陪,都可以完美的實(shí)現(xiàn)。
還有一些小細(xì)節(jié):
- 比如解碼器可以單次的置吓。
- 如果解碼一直不成功无虚,那么數(shù)據(jù)就一直無法到達(dá)后面的 handler。除非該解碼器從 pipeline 移除衍锚。
- 像其他的 Netty 模塊一樣友题,這里也使用了對(duì)象池的概念,數(shù)組存放在線程安全的 ThreadLocal 中戴质,默認(rèn) 16 個(gè)度宦,當(dāng)不夠時(shí),就創(chuàng)建新的告匠,用完即被 GC 回收戈抄。
- 當(dāng)數(shù)組從未成功添加數(shù)據(jù),且程序沒有開啟 autoRead 后专,就主動(dòng)調(diào)用 read 方法划鸽。嘗試讀取數(shù)據(jù)。
Netty 所有的解碼器戚哎,都可以在此類上擴(kuò)展裸诽,一切取決于 decode 的實(shí)現(xiàn)。只要遵守 ByteToMessageDecoder 的約定即可建瘫。
good luckU负础!拧殷蛇!