有時候簡化實現(xiàn)別人的代碼唆迁,有助于你更好的理解代碼逢享,不要一味地讀源代碼。
問題來源
客戶端往服務器發(fā)送小文件
解決思路
1秀又、使用netty(廢話)
2单寂、只是用ByteBuf
3、自定義一種協(xié)議吐辙,用最小的網(wǎng)絡代價完成數(shù)據(jù)傳送
實現(xiàn)
其實netty有很多的定義好的協(xié)議來解決各種各樣的問題宣决,這篇文章來自《netty權威指南》作者李林峰,詳細介紹了netty的編解碼框架昏苏,以及一些常用的編解碼協(xié)議尊沸。
在解決這個問題的時候威沫,我遇到的一個主要問題就是我在客戶端發(fā)送一個數(shù)據(jù)包,這個數(shù)據(jù)包的大小可以很大洼专,但是如果只用簡單的channelRead去讀取數(shù)據(jù)的話得到的數(shù)據(jù)并不是完整的棒掠。具體原因參考netty用戶指南中的tcp stream-based傳輸?shù)膯栴}。
我先做了一個簡單的協(xié)議設計:
packet = |文件名長度|文件名|文件字節(jié)長度|文件字節(jié)流|
于是就有了客戶端發(fā)送的簡單代碼
String name = "diagram.png";
FileInputStream fileInputStream = new FileInputStream(new File("src/main/resources/diagram.png"));
byte[] bytes = new byte[fileInputStream.available()];
fileInputStream.read(bytes);
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeInt("diagram.png".getBytes().length);
byteBuf.writeBytes("diagram.png".getBytes());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
channelFuture.channel().writeAndFlush(byteBuf);
這樣發(fā)送沒有問題屁商,因為byteBuf是動態(tài)擴展的烟很。但是接受的時候就有問題了。如果我們接受比較小的蜡镶,比如一個int雾袱,我們可以直接這樣寫
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof ByteBuf)
{
ByteBuf byteBuf = (ByteBuf)msg;
if(byteBuf.readableBytes() > 4)
{
int result = byteBuf.readInt();
}
}
}
但是當長度很大的時候,我們就需要解決讀半包的問題了官还。直到讀到完整的數(shù)據(jù)才進行處理芹橡。但是每次收到的數(shù)據(jù)怎么去判斷是不是和上一個數(shù)據(jù)是連續(xù)的,如何在沒有收集到完整數(shù)據(jù)時不處理數(shù)據(jù)而繼續(xù)接受呢望伦?這是我一直困擾的問題林说。因為我把每一個byteBuf當成一個message來想了,其實不是的屯伞,ByteBuf中有兩個指針readIndex和writeIndex腿箩,readIndex永遠小于writeIndex。大概如下圖所示
在netty的設計中ByteBuf是可以被重用的劣摇,所以可能針對這一個ChannelRead一直讀取的是同一個ByteBuf度秘。這其中readrIndex之前的是已經讀取過的,就是已經被調用readXXX()之后的數(shù)據(jù)饵撑,可以重新去讀取剑梳,readerIndex和writerIndex之前的是當前的readableBytes,writerIndex到capacity的是writeableBytes滑潘,當writerIndex超過capacity時就會擴展垢乙。同時為了重用這部分空間,當調用discardBytes時语卤,會把readerIndex和writerIndex拷貝到開頭追逮,這樣前面廢棄的部分就被重用了,也一定程度場避免了擴容粹舵,節(jié)省了空間钮孵。
那如何針對上面的輸入寫B(tài)yteBuf的解碼呢?
先看看netty自帶的解碼器怎么解決這個問題眼滤,其中LengthFieldBasedFrameDecoder就是用來解決這一類的問題的巴席。在李林峰的文章中有詳細介紹,這里就不贅述了诅需。
我在之前代碼的基礎上添加了兩行代碼漾唉。
//在服務器的pipeline中添加的這個解碼器荧库,然后用4個字節(jié)表示整個包的長度,并且廢棄掉這四個字節(jié)赵刑。
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024*1024, 0, 4, 0, 4));
//在發(fā)送的byteBuf頭部添加真?zhèn)€包的長度
byteBuf.writeInt(4+ name.getBytes().length +4+ bytes.length);
然后我再在ChannelRead中處理剩下的數(shù)據(jù)
packet = |文件名長度|文件名|文件字節(jié)長度|文件字節(jié)流|
if(msg instanceof ByteBuf)
{
ByteBuf byteBuf = (ByteBuf)msg;
int nameSize = byteBuf.readInt();
String name = new String(byteBuf.readBytes(nameSize).array(), "UTF-8");
int fileSize = byteBuf.readInt();
FileOutputStream fileOutputStream = new FileOutputStream(new File(name));
fileOutputStream.write(byteBuf.readBytes(fileSize).array());
System.out.println(name + " " + fileSize);
}
問題解決分衫,但是自己如何實現(xiàn)這個解碼器呢?先看看netty怎么實現(xiàn)的般此。
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"negative pre-adjustment length field: " + frameLength);
}
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
if (discard < 0) {
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else {
// Enter the discard mode and discard everything received so far.
discardingTooLongFrame = true;
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
return null;
}
if (initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than initialBytesToStrip: " + initialBytesToStrip);
}
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
好長蚪战。。里面對于不合理的協(xié)議做了很多假設铐懊,并使不合理的輸入快速失敗屎勘。但是讓我一個初學者寫還是寫不出來。所以我假設協(xié)議就是我設計的那樣居扒,簡化這部分代碼,便于理解丑慎。
變量給一個固定值
private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
private int maxFrameLength = 1024*10;
private int lengthFieldLength = 4;
private int initialBytesToStrip = 0;
private long tooLongFrameLength;
private long bytesToDiscard;
private boolean failFast = true;
然后寫decode函數(shù)喜喂,就這么簡單。竿裂。
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
int frameLength = (int) in.getUnsignedInt(0);//獲取頭部
if(in.readableBytes() < frameLength)//當ByteBuf沒有達到長度時玉吁,return null
{
return null;
}
in.skipBytes(4);//舍棄頭部
int index = in.readerIndex();
ByteBuf frame = in.slice(index, frameLength).retain();//取出自己定義的packet包返回給ChannelRead
in.readerIndex(frameLength);//這一步一定要有,不然其實bytebuf的readerIndex沒有變腻异,netty會一直從這里開始讀取进副,將readerIndex移動就相當于把前面的數(shù)據(jù)處理過了廢棄掉了。
return frame;
}
所以其實我們只要不處理bytebuf的數(shù)據(jù)知道可以讀的數(shù)據(jù)達到我們需要的長度在處理就可以了悔常。當然包的順序不會出錯是由底層tcp保證的影斑,不用關心。