轉(zhuǎn)自:https://blog.csdn.net/wzq6578702/article/details/78826494
在介紹ReplayingDecoder之前 想看一下它的用法储耐,構(gòu)建一個(gè)服務(wù)端和客戶端的模型:
服務(wù)端:
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服務(wù)端initializer:
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipline = ch.pipeline();
pipline.addLast(new MyReplayingDecoder());//使用ReplayingDecoder
pipline.addLast(new MyLongToByteEncoder());
pipline.addLast(new MyServerHandler());
}
}
ServerHandler:
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+" --> "+msg);
ctx.writeAndFlush(654321L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
MyReplayingDecoder:
public class MyReplayingDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyReplayingDecoder decode invoked!");
//注意沒有判斷字節(jié)數(shù)C弁佟!O煊亍!
out.add(in.readLong());
}
}
MyLongToByteEncoder
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("encode invoked");
System.out.println(msg);
out.writeLong(msg);
}
}
客戶端:
public class Myclient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientIniatializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().writeAndFlush("hello");
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
客戶端Iniatializer:
public class MyClientIniatializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipline = ch.pipeline();
pipline.addLast(new MyReplayingDecoder());
pipline.addLast(new MyLongToByteEncoder());
pipline.addLast(new MyClientHandler());
}
}
客戶端Handler:
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println("client output "+msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(123456L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
運(yùn)行服務(wù)端冀偶,之后運(yùn)行客戶端:
服務(wù)端輸出結(jié)果:
MyReplayingDecoder decode invoked!
/127.0.0.1:4448 --> 123456
encode invoked
654321
客戶端輸出結(jié)果:
encode invoked
123456
MyReplayingDecoder decode invoked!
localhost/127.0.0.1:8899
client output 654321
加了下胳施,數(shù)據(jù)傳輸流程分析
/**
* describe: 1. client channelActive 發(fā)送12345
* 2. client 觸發(fā)encode方法發(fā)送到server端
* 3. server 觸發(fā)decode方法
* 4. server channelRead0
* 5. server write flush 發(fā)送出去
* 6. server 觸發(fā)encode方法發(fā)送到客戶端
* 7. clinet 觸發(fā)decode方法
* 8. clinet 觸發(fā)channelRead0
*/
ReplayingDecoder java doc
ByteToMessageDecoder一種特殊變體,它可以在阻塞I / O范例中實(shí)現(xiàn)非阻塞解碼器坟比。
最大的區(qū)別ReplayingDecoder和ByteToMessageDecoder是ReplayingDecoder可以
讓你實(shí)現(xiàn)decode()和decodeLast()方法,就像已經(jīng)獲得所有所需的字節(jié)嚷往,而不是檢查
所需的字節(jié)的可用性葛账。
例如,以下ByteToMessageDecoder實(shí)現(xiàn):
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
if (buf.readableBytes() < 4) {
return;
}
buf.markReaderIndex();
int length = buf.readInt();
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return;
}
out.add(buf.readBytes(length));
}
}
使用ReplayingDecoder了如下簡(jiǎn)化:
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<Void> {
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf) throws Exception {
out.add(buf.readBytes(buf.readInt()));
}
}
這是如何運(yùn)作的间影?
ReplayingDecoder通過一個(gè)專門的ByteBuf實(shí)現(xiàn)其拋出一個(gè)Error的某些類型的時(shí)候有沒有在緩沖區(qū)足夠的數(shù)據(jù)注竿。 在上面的IntegerHeaderFrameDecoder ,您僅假設(shè)調(diào)用buf.readInt()時(shí)緩沖區(qū)中將有4個(gè)或更多字節(jié)魂贬。 如果緩沖區(qū)中確實(shí)有4個(gè)字節(jié)巩割,它將按預(yù)期返回整數(shù)標(biāo)頭。 否則付燥,將引發(fā)Error并將控件返回給ReplayingDecoder 宣谈。 如果ReplayingDecoder捕獲到Error ,則它將把緩沖區(qū)的readerIndex倒回到“初始”位置(即緩沖區(qū)的開頭)键科,并在緩沖區(qū)中收到更多數(shù)據(jù)時(shí)再次調(diào)用readerIndex decode(..)方法闻丑。
請(qǐng)注意, ReplayingDecoder始終會(huì)拋出相同的緩存Error實(shí)例勋颖,以避免創(chuàng)建新Error并為每次拋出填充其堆棧跟蹤的開銷嗦嗡。
局限性
以簡(jiǎn)單為代價(jià), ReplayingDecoder強(qiáng)制您執(zhí)行一些限制:
禁止某些緩沖區(qū)操作饭玲。
如果網(wǎng)絡(luò)速度慢且消息格式復(fù)雜侥祭,則性能可能會(huì)變差,這與上面的示例不同。 在這種情況下矮冬,您的解碼器可能不得不一遍又一遍地解碼消息的相同部分谈宛。
您必須記住,可以多次調(diào)用decode(..)方法來解碼一條消息胎署。 例如吆录,以下代碼將不起作用:
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// This assertion will fail intermittently since values.offer()
// can be called more than two times!
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
正確的實(shí)現(xiàn)如下所示,您還可以利用“檢查點(diǎn)”功能琼牧,下一部分將對(duì)此進(jìn)行詳細(xì)說明恢筝。
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
// Revert the state of the variable that might have been changed
// since the last partial decode.
values.clear();
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// Now we know this assertion will never fail.
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
改善表現(xiàn)
幸運(yùn)的是,使用checkpoint()方法可以顯著提高復(fù)雜解碼器實(shí)現(xiàn)的性能障陶。
該checkpoint()這樣方法更新緩沖區(qū)的“初始”位置ReplayingDecoder倒回readerIndex緩沖劑與在那里你叫的最后一個(gè)位置checkpoint()方法滋恬。
用Enum調(diào)用checkpoint(T)
盡管您可以只使用checkpoint()方法并自己管理解碼器的狀態(tài)聊训,
但是管理解碼器狀態(tài)的最簡(jiǎn)單方法是
創(chuàng)建一個(gè)代表解碼器當(dāng)前狀態(tài)的Enum類型并調(diào)用checkpoint(T)狀態(tài)改變時(shí)的方法抱究。
根據(jù)要解碼的消息的復(fù)雜性,可以根據(jù)需要設(shè)置任意多個(gè)狀態(tài):
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
沒有參數(shù)調(diào)用checkpoint()
管理解碼器狀態(tài)的另一種方法是自己管理带斑。
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<Void> {
private boolean readLength;
private int length;
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
if (!readLength) {
length = buf.readInt();
readLength = true;
checkpoint();
}
if (readLength) {
ByteBuf frame = buf.readBytes(length);
readLength = false;
checkpoint();
out.add(frame);
}
}
}
用流水線中的另一個(gè)解碼器替換一個(gè)解碼器
如果你打算寫一個(gè)協(xié)議復(fù)用器鼓寺,你可能會(huì)想更換ReplayingDecoder與另一個(gè)(協(xié)議檢測(cè)) ReplayingDecoder , ByteToMessageDecoder或MessageToMessageDecoder (實(shí)際協(xié)議解碼器)勋磕。 不能僅通過調(diào)用ChannelPipeline.replace(ChannelHandler, String, ChannelHandler)來實(shí)現(xiàn)此目的妈候,但是需要一些其他步驟:
public class FirstDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) {
...
// Decode the first message
Object firstMessage = ...;
// Add the second decoder
ctx.pipeline().addLast("second", new SecondDecoder());
if (buf.isReadable()) {
// Hand off the remaining data to the second decoder
out.add(firstMessage);
out.add(buf.readBytes(super.actualReadableBytes()));
} else {
// Nothing to hand off
out.add(firstMessage);
}
// Remove the first decoder (me)
ctx.pipeline().remove(this);
}
幾種常見的編解碼器:
LineBasedFrameDecoder
解碼器,將接收到的ByteBuf在行尾拆分挂滓。
"\n"和"\r\n"都被處理苦银。 有關(guān)基于分隔符的更通用解碼器,請(qǐng)參見DelimiterBasedFrameDecoder
FixedLengthFrameDecoder
解碼器赶站,將接收到的ByteBuf為固定的字節(jié)數(shù)幔虏。 例如,如果您收到以下四個(gè)分段的數(shù)據(jù)包:
+---+----+------+----+
| A | BC | DEFG | HI |
+---+----+------+----+
FixedLengthFrameDecoder (3)會(huì)將它們解碼為以下三個(gè)具有固定長(zhǎng)度的數(shù)據(jù)包:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
DelimiterBasedFrameDecoder
一種解碼器贝椿, ByteBuf通過一個(gè)或多個(gè)定界符對(duì)接收到的ByteBuf拆分想括。 這對(duì)于解碼以分隔符(例如NUL或換行符)結(jié)尾的幀特別有用。
預(yù)定義的分隔符
為了方便起見烙博, Delimiters定義了常用的定界符瑟蜈。
指定多個(gè)定界符
DelimiterBasedFrameDecoder允許您指定多個(gè)定界符。 如果在緩沖區(qū)中找到多個(gè)定界符渣窜,它將選擇產(chǎn)生`最短幀`的定界符铺根。 例如,如果緩沖區(qū)中包含以下數(shù)據(jù):
+--------------+
| ABC\nDEF\r\n |
+--------------+
DelimiterBasedFrameDecoder ( Delimiters.lineDelimiter() )將選擇'\n'作為第一個(gè)定界符并產(chǎn)生兩個(gè)幀:
+-----+-----+
| ABC | DEF |
+-----+-----+
而不是錯(cuò)誤地選擇'\r\n'作為第一個(gè)定界符:
+----------+
| ABC\nDEF |
+----------+
LengthFieldBasedFrameDecoder
解碼器按消息中的length字段的值動(dòng)態(tài)拆分接收到的ByteBuf 乔宿。 當(dāng)您解碼二進(jìn)制消息時(shí)位迂,此消息特別有用,該二進(jìn)制消息具有代表消息正文或整個(gè)消息長(zhǎng)度的整數(shù)頭字段。
LengthFieldBasedFrameDecoder具有許多配置參數(shù)囤官,因此它可以解碼帶有長(zhǎng)度字段的任何消息冬阳,這在專有的客戶端-服務(wù)器協(xié)議中經(jīng)常出現(xiàn)。 以下是一些示例党饮,可讓您基本了解哪個(gè)選項(xiàng)可以執(zhí)行什么操作肝陪。
2字節(jié)長(zhǎng)度的字段,偏移量為0刑顺,不剝離標(biāo)題
在此示例中氯窍,長(zhǎng)度字段的值為12(0x0C) ,代表“ HELLO蹲堂,WORLD”的長(zhǎng)度狼讨。 默認(rèn)情況下,解碼器假定length字段表示在length字段之后的字節(jié)數(shù)柒竞。 因此政供,可以使用簡(jiǎn)單的參數(shù)組合對(duì)其進(jìn)行解碼。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
2個(gè)字節(jié)的長(zhǎng)度字段朽基,偏移量為0布隔,帶頭
因?yàn)槲覀兛梢酝ㄟ^調(diào)用ByteBuf.readableBytes()獲得內(nèi)容的長(zhǎng)度,所以您可能希望通過指定initialBytesToStrip來剝離長(zhǎng)度字段稼虎。 在此示例中衅檀,我們指定2 ,它與length字段的長(zhǎng)度相同霎俩,以剝離前兩個(gè)字節(jié)哀军。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
2個(gè)字節(jié)的長(zhǎng)度字段,偏移量為0打却,不剝離標(biāo)題杉适,該長(zhǎng)度字段表示整個(gè)消息的長(zhǎng)度
在大多數(shù)情況下,長(zhǎng)度字段僅表示消息正文的長(zhǎng)度学密,如前面的示例所示淘衙。 但是,在某些協(xié)議中腻暮,長(zhǎng)度字段表示整個(gè)消息的長(zhǎng)度彤守,包括消息頭。 在這種情況下哭靖,我們指定一個(gè)非零的lengthAdjustment 具垫。 因?yàn)榇耸纠⒅械膌ength值總是比主體長(zhǎng)度大2 ,所以我們將-2指定為lengthAdjustment以進(jìn)行補(bǔ)償试幽。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = -2 (= the length of the Length field)
initialBytesToStrip = 0
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
3個(gè)字節(jié)的長(zhǎng)度字段位于5個(gè)字節(jié)的報(bào)頭末尾筝蚕,不剝離報(bào)頭
以下消息是第一個(gè)示例的簡(jiǎn)單變體。 該消息之前會(huì)附加一個(gè)額外的標(biāo)頭值。 lengthAdjustment再次為零起宽,因?yàn)榻獯a器始終在幀長(zhǎng)度計(jì)算過程中考慮前置數(shù)據(jù)的長(zhǎng)度洲胖。
lengthFieldOffset = 2 (= the length of Header 1)
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
5字節(jié)標(biāo)題開頭的3字節(jié)長(zhǎng)度字段,不剝離標(biāo)題
這是一個(gè)高級(jí)示例坯沪,顯示了在length字段和消息正文之間存在一個(gè)額外的標(biāo)頭的情況绿映。 您必須指定一個(gè)正的lengthAdjustment,以便解碼器將額外的標(biāo)頭計(jì)入幀長(zhǎng)度計(jì)算中腐晾。
lengthFieldOffset = 0
lengthFieldLength = 3
lengthAdjustment = 2 (= the length of Header 1)
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
2個(gè)字節(jié)的長(zhǎng)度字段在4字節(jié)標(biāo)題的中間偏移量1處叉弦,除去第一個(gè)標(biāo)題字段和長(zhǎng)度字段
這是以上所有示例的組合。 在length字段之前有前置報(bào)頭藻糖,在length字段之后有多余的報(bào)頭淹冰。 前置標(biāo)頭會(huì)影響lengthFieldOffset ,額外標(biāo)頭會(huì)影響lengthAdjustment 巨柒。 我們還指定了一個(gè)非零的initialBytesToStrip來從幀中剝離長(zhǎng)度字段和前置標(biāo)頭樱拴。 如果不想剝離前置標(biāo)頭,則可以將initialBytesToSkip指定為0 潘拱。
lengthFieldOffset = 1 (= the length of HDR1)
lengthFieldLength = 2
lengthAdjustment = 1 (= the length of HDR2)
initialBytesToStrip = 3 (= the length of HDR1 + LEN)
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
2個(gè)字節(jié)的長(zhǎng)度字段位于4字節(jié)標(biāo)題的中間偏移量1處疹鳄,去掉第一個(gè)標(biāo)題字段和length字段拧略,length字段代表整個(gè)消息的長(zhǎng)度
讓我們?cè)賹?duì)前面的示例進(jìn)行一些修改芦岂。 與前一個(gè)示例的唯一區(qū)別是,長(zhǎng)度字段表示整個(gè)消息的長(zhǎng)度垫蛆,而不是消息主體禽最,就像第三個(gè)示例一樣。 我們必須將HDR1的長(zhǎng)度和Length計(jì)入lengthAdjustment中袱饭。 請(qǐng)注意川无,我們不需要考慮HDR2的長(zhǎng)度,因?yàn)閘ength字段已經(jīng)包含了整個(gè)標(biāo)頭長(zhǎng)度虑乖。
lengthFieldOffset = 1
lengthFieldLength = 2
lengthAdjustment = -3 (= the length of HDR1 + LEN, negative)
initialBytesToStrip = 3
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
也可以看看:
LengthFieldPrepender