額
dubbo淺嘗輒止曲管,后期深入學習還會繼續(xù)跟進寫記事本院水。因為我司又要自己搞個消息隊列中間件檬某。和骨灰級玩家螟蝙,結對編程啪提呢三人一起組隊胰默。初期決定基于netty封裝,所以菜雞的我還是決定笨鳥先飛一手漏隐,沒錯奴迅,我學新東西還是喜歡邊動手邊學,不然一直學理論我真的覺得慌張脖隶。領導說離職他不反對产阱,但是做這個中間件是對自己的一個提升构蹬,也是簡歷里漂亮的一筆。我該如何抉擇······
和上一篇一樣,這僅僅是個敘事文铐姚,希望未來我能進步肛捍,變成議論文拙毫,散文,詩歌峭跳。
Netty簡介
基于NIO非阻塞。缺前。蛀醉。。Netty介紹很詳細的那種
第一步:自定義協(xié)議
小白的我第一次聽見這樣的詞語真的是覺得NB衅码,V5拯刁,但當你漸漸對網(wǎng)路傳輸有一點點啟蒙你就會發(fā)現(xiàn)協(xié)議不過是規(guī)則,它本身并不神秘逝段,厲害在于它用小小的規(guī)則徜徉在網(wǎng)絡的海洋里而不出錯垛玻。HTTP發(fā)展至今才發(fā)展到2版本,但是它的官方API有成百上千頁奶躯。消息在網(wǎng)絡間傳輸帚桩,用的是二進制嘹黔,協(xié)議本身就是讓一堆雜亂無章的01變得有意義朗儒。電腦太智障,你必須告訴他0~4
這幾位是什么意思,5~9
這幾位是什么意思醉锄,這就是協(xié)議乏悄。而netty簡化了我們創(chuàng)建Socket,使用NIO的過程恳不。必開了一些晦澀難懂的底層概念檩小。
今天要設計的協(xié)議叫Luck協(xié)議(很多教程上都叫這個)
因為我們中間件本意是為了傳輸文件,實現(xiàn)斷點續(xù)傳烟勋。所以字段名字hhh
header
/**
* 消息開頭信息
*/
private int headerData = ConstantValue.HEAD_DATA;
/**
* 消息體長度
*/
private int contentLength;
/**
* 文件名長度
*/
private byte nameLength;
/**
* 文件名
*/
private String fileName;
傳輸體
/**
* header
*/
private LuckHeader luckHeader;
/**
* 文件二進制
*/
private byte[] content;
協(xié)議就定好了规求,還是一個可變長的頭部呢厲害哦。
第二步:定義編解碼器
編碼器LuckEncoder
這個就簡單了按自己定義的協(xié)議意義一樣一樣的write進ByteBuf里
public class LuckEncoder extends MessageToByteEncoder<LuckMessage> {
@Override
protected void encode(final ChannelHandlerContext ctx, final LuckMessage msg, final ByteBuf out) throws Exception {
out.writeInt(msg.getLuckHeader().getHeaderData());
out.writeInt(msg.getLuckHeader().getContentLength());
out.writeByte(msg.getLuckHeader().getNameLength());
if (msg.getLuckHeader().getNameLength() > 0){
out.writeBytes(msg.getLuckHeader().getFileName().getBytes());
}
out.writeBytes(msg.getContent());
}
}
解碼器LuckDecoder
這個就比較復雜了卵惦,要考慮到粘包斷包的問題阻肿,其實也不是很復雜,就是嚴格的根據(jù)你定義的協(xié)議一點點的去解析沮尿,唯一要注意的就是你的每一次read操作都會導致readerIndex的后移丛塌,控制好readerIndex就不會有粘包斷包的問題
public class LuckDecoder extends ByteToMessageDecoder {
public final int BASE_LENGTH = 4 + 4 + 1;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= BASE_LENGTH) {
// if(in.readableBytes()>2048){
// in.skipBytes(in.readableBytes());
// }
int beginReader;
while (true) {
beginReader = in.readerIndex();
in.markReaderIndex();
if (in.readInt() == ConstantValue.HEAD_DATA) {
break;
}
in.resetReaderIndex();
in.readByte();
if (in.readableBytes() < BASE_LENGTH) {
return;
}
}
int contentLength = in.readInt();
byte nameLength = in.readByte();
if(in.readableBytes()<contentLength+nameLength){
in.resetReaderIndex();
return;
}
byte[] content = new byte[contentLength];
byte[] fileName = new byte[nameLength];
if (nameLength > 0) {
in.readBytes(fileName);
}
in.readBytes(content);
LuckMessage data = nameLength > 0 ?
new LuckMessage(new LuckHeader(contentLength, nameLength, new String(fileName)), content)
: new LuckMessage(new LuckHeader(contentLength), content);
out.add(data);
}
}
}
第三步:定義InboundHandler、OutboundHandler
這一部分就算是業(yè)務的過濾器了畜疾,編解碼后做一些業(yè)務處理
InboundHandler
public class LuckInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
LuckMessage message=(LuckMessage)msg;
System.out.println("server接受的信息為"+message.toString());
}finally {
ReferenceCountUtil.release(msg);
}
}
}
OutboundHandler
public class LuckOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("client發(fā)送消息:"+((LuckMessage)msg).toString());
super.write(ctx,msg,promise);
}
}
第四步:創(chuàng)建LuckInitializer
其實就是在channel的pipeline中添加一層層的handler
public class LuckInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline=ch.pipeline();
pipeline.addLast(new LuckEncoder());
pipeline.addLast(new LuckDecoder());
pipeline.addLast(new LuckInboundHandler());
pipeline.addLast(new LuckOutboundHandler());
}
}
第五步:就可以創(chuàng)建server和client發(fā)消息啦
server
public class Server {
private static final int PORT = 8888;
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 指定socket的一些屬性
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定是一個NIO連接通道
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new LuckInitializer());
// 綁定對應的端口號,并啟動開始監(jiān)聽端口上的連接
Channel ch = serverBootstrap.bind(PORT).sync().channel();
System.out.printf("luck協(xié)議啟動地址:127.0.0.1:%d/\n", PORT);
// 等待關閉,同步端口
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
client
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new LuckInitializer());
// Start the connection attempt.
Channel ch = b.connect("127.0.0.1", 8888).sync().channel();
int version = 1;
String content = "I'm the luck protocol!I'm the luck protocol!" +
"I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!I'm the luck protocol!";
String name = "nihao";
LuckHeader header = new LuckHeader(content.length(), (byte) name.length(), name);
LuckMessage message = new LuckMessage(header, content.getBytes());
ch.write(message);
ch.write(message);
ch.write(message);
ch.writeAndFlush(message);
ch.close();
} finally {
group.shutdownGracefully();
}
}
}