ByteToMessageDecoder

ByteToMessageDecoder是解碼器的基類(lèi), 具有最基本的能力, 將字節(jié)解碼成消息, 以便在pipeline上進(jìn)行傳遞.

1564365188195.png

關(guān)鍵屬性

// 對(duì)入站數(shù)據(jù)進(jìn)行臨時(shí)緩沖, 直到它準(zhǔn)備好處理
ByteBuf cumulation;
// 緩沖的策略
private Cumulator cumulator = MERGE_CUMULATOR;
// 是否只解碼一次
private boolean singleDecode;
// 意思是解碼有沒(méi)有結(jié)果, true為沒(méi)有
private boolean decodeWasNull;
// 這批數(shù)據(jù)是不是第一次處理
private boolean first;
private int discardAfterReads = 16;
private int numReads;

channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 如果消息是ByteBuf類(lèi)型
    if (msg instanceof ByteBuf) {
        // 從對(duì)象池取出一個(gè)CodecOutputList, 用來(lái)收集解碼后的消息
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            // 看累積器是不是為空來(lái)決定是不是首次處理
            first = cumulation == null;
            // 看是不是第一次處理,如果是,那么直接賦予累積器
            if (first) {
                cumulation = data;
            } else {
                // 否則累加到累積器, 見(jiàn)cumulator部分
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 對(duì)消息進(jìn)行解碼
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            // 如果累積器的內(nèi)容已經(jīng)讀取完畢,那么回收掉
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            // 當(dāng)這批數(shù)據(jù)已經(jīng)讀了有16次之多后,需要整理下內(nèi)存
            } else if (++ numReads >= discardAfterReads) {               
                numReads = 0;
                // 這里主要是對(duì)累積器進(jìn)行整理,清理discard區(qū)域?yàn)樽x寫(xiě)空間騰地方
                discardSomeReadBytes();
            }
            
            // 將解碼后的結(jié)果通知下游, 且回收out容器
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        // 如果不是ByteBuf類(lèi)型,直接傳遞給下游
        ctx.fireChannelRead(msg);
    }
}

callDecode

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // 如果緩沖器有內(nèi)容可讀
        while (in.isReadable()) {
            int outSize = out.size();

            // 如果out容器有內(nèi)容,那么說(shuō)明解碼有結(jié)果了,那么馬上需要通知下游handle
            // 通知完,清理容器out
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();
                
                // 如果此時(shí)hanler被移除,那么不用繼續(xù)處理,直接退出
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }

            int oldInputLength = in.readableBytes();
            // 子類(lèi)來(lái)處理具體解碼的工作,最終將解碼后的消息放在out里面就好
            decode(ctx, in, out);

            // 如果此時(shí)hanler被移除,那么不用繼續(xù)處理,直接退出
            if (ctx.isRemoved()) {
                break;
            }
            
            // 如果這時(shí),out容器的大小沒(méi)有變化,說(shuō)明子類(lèi)那邊解碼還沒(méi)有結(jié)果
            if (outSize == out.size()) {
                // 如果沒(méi)有讀取任何數(shù)據(jù), 那么結(jié)束循環(huán)
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    // 如果有讀取,但不足與產(chǎn)生結(jié)果,那么需要繼續(xù)讀取
                    continue;
                }
            }
            
            // 如果有結(jié)果,但是根本就沒(méi)有讀取數(shù)據(jù),那么不是很詭異么?
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }
            
            // 是否只執(zhí)行一次
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}

Cumulator

MERGE_CUMULATOR

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        // 簡(jiǎn)單的情況是當(dāng)前的累積器空間不足,需要擴(kuò)容
        // 因?yàn)閞ead的時(shí)候,每隔一段時(shí)間都需要對(duì)累積器的內(nèi)存空間進(jìn)行整理,那么整理的過(guò)程會(huì)導(dǎo)致
        // 讀寫(xiě)index變更, 進(jìn)而導(dǎo)致淺拷貝后的ByteBuf不可用.
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1) { 
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};

子緩沖區(qū)

Netty中調(diào)用ByteBuf.duplicate(),ByteBuf.slice()和ByteBuf.order(ByteOrder)三個(gè)方法缆瓣, 會(huì)創(chuàng)建一個(gè)子緩沖區(qū)闪盔,子緩沖區(qū)共享父緩沖區(qū)的內(nèi)存區(qū)域抚官。子緩沖區(qū)沒(méi)有自己的引用計(jì)數(shù),而是 共享父緩沖區(qū)的引用計(jì)數(shù)科雳。

當(dāng)父緩沖區(qū)release的時(shí)候, 會(huì)引用計(jì)數(shù)清零, 導(dǎo)致該內(nèi)存區(qū)域被回收, 進(jìn)而影響子緩沖區(qū), 導(dǎo)致讀寫(xiě)失敗. 那么需要注意的事, 子緩沖區(qū)需要顯示調(diào)用retain來(lái)提示Netty有其他人在使用, 防止被錯(cuò)誤回收. 這里帶來(lái)額外的壞處是, 所有子緩沖區(qū)在使用完后, 要及時(shí)release, 防止內(nèi)存泄漏.

在前面的場(chǎng)景中可以看到, 解碼器在解碼完后視情況來(lái)決定要不要做內(nèi)存整理, 而整理的過(guò)程會(huì)進(jìn)行數(shù)據(jù)移動(dòng), 且按照整理后的結(jié)果重置read和write索引, 這樣會(huì)影響到子緩沖區(qū)的讀寫(xiě). 下面是個(gè)簡(jiǎn)單的例子, 自己體會(huì). 另外一個(gè)問(wèn)題是, 如果源Buffer提前release, 那么子緩沖區(qū)也會(huì)讀寫(xiě)異常.

ByteBuf source = ByteBufAllocator.DEFAULT.buffer(20, 20);

source.writeInt(1);
source.readInt();

source.writeInt(2);

ByteBuf duplicate = source.duplicate();
System.out.println("source" + source.toString());
System.out.println("duplicate" + duplicate.toString());
source.discardReadBytes();
System.out.println("source" + source.toString());
System.out.println("duplicate" + duplicate.toString());
System.out.println("duplicate" + duplicate.readInt());
duplicate.readerIndex(0);
System.out.println("duplicate" + duplicate.toString());
System.out.println("duplicate" + duplicate.readInt());
source:PooledUnsafeDirectByteBuf(ridx: 4, widx: 16, cap: 20/20)
duplicate:UnpooledDuplicatedByteBuf(ridx: 4, widx: 16, cap: 20/20)
source:PooledUnsafeDirectByteBuf(ridx: 0, widx: 12, cap: 20/20)
duplicate:UnpooledDuplicatedByteBuf(ridx: 4, widx: 16, cap: 20/20)
duplicate:0
duplicate:UnpooledDuplicatedByteBuf(ridx: 0, widx: 16, cap: 20/20)
duplicate:2

expandCumulation

可以看到解決上面問(wèn)題的方案也是簡(jiǎn)單粗暴, 直接重建一個(gè)ByteBuf, 將數(shù)據(jù)拷貝過(guò)來(lái). 這樣, 之前的子緩沖區(qū)也不會(huì)被之后可能的內(nèi)存整理給影響.

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
    ByteBuf oldCumulation = cumulation;
    cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
    cumulation.writeBytes(oldCumulation);
    oldCumulation.release();
    return cumulation;
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子尤揣,更是在濱河造成了極大的恐慌,老刑警劉巖柬祠,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件北戏,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡漫蛔,警方通過(guò)查閱死者的電腦和手機(jī)嗜愈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)莽龟,“玉大人蠕嫁,你說(shuō)我怎么就攤上這事√河” “怎么了剃毒?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)。 經(jīng)常有香客問(wèn)我赘阀,道長(zhǎng)陪拘,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任纤壁,我火速辦了婚禮左刽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘酌媒。我一直安慰自己欠痴,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布秒咨。 她就那樣靜靜地躺著喇辽,像睡著了一般。 火紅的嫁衣襯著肌膚如雪雨席。 梳的紋絲不亂的頭發(fā)上菩咨,一...
    開(kāi)封第一講書(shū)人閱讀 52,158評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音陡厘,去河邊找鬼抽米。 笑死,一個(gè)胖子當(dāng)著我的面吹牛糙置,可吹牛的內(nèi)容都是我干的云茸。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼谤饭,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼标捺!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起揉抵,我...
    開(kāi)封第一講書(shū)人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤亡容,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后冤今,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體闺兢,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年辟汰,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了列敲。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡帖汞,死狀恐怖戴而,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情翩蘸,我是刑警寧澤所意,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站,受9級(jí)特大地震影響扶踊,放射性物質(zhì)發(fā)生泄漏泄鹏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一秧耗、第九天 我趴在偏房一處隱蔽的房頂上張望备籽。 院中可真熱鬧,春花似錦分井、人聲如沸车猬。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)珠闰。三九已至,卻和暖如春瘫辩,著一層夾襖步出監(jiān)牢的瞬間伏嗜,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工伐厌, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留承绸,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓弧械,卻偏偏與公主長(zhǎng)得像八酒,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子刃唐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容