netty介紹
Nowadays we use general purpose applications or libraries to communicate with each other. For example, we often use an HTTP client library to retrieve information from a web server and to invoke a remote procedure call via web services. However, a general purpose protocol or its implementation sometimes does not scale very well. It is like how we don't use a general purpose HTTP server to exchange huge files, e-mail messages, and near-realtime messages such as financial information and multiplayer game data. What's required is a highly optimized protocol implementation that is dedicated to a special purpose. For example, you might want to implement an HTTP server that is optimized for AJAX-based chat application, media streaming, or large file transfer. You could even want to design and implement a whole new protocol that is precisely tailored to your need. Another inevitable case is when you have to deal with a legacy proprietary protocol to ensure the interoperability with an old system. What matters in this case is how quickly we can implement that protocol while not sacrificing the stability and performance of the resulting application.
這是netty的官方介紹,大概意思就是:
我們經(jīng)常希望我們的應(yīng)用能夠和其它應(yīng)用互相通信。例如庆猫,我們經(jīng)常使用http請(qǐng)求去查詢(xún)信息或者使用rpc調(diào)用webservice粱挡,但是對(duì)于這種特定的協(xié)議(http只怎,ftp等)來(lái)說(shuō)梳凛,是不易于專(zhuān)門(mén)針對(duì)
自己應(yīng)用程序進(jìn)行擴(kuò)展的磕潮。比方說(shuō)我們不會(huì)使用http協(xié)議去傳輸大文件公罕,郵件器紧,即時(shí)通訊(金融信息),這需要對(duì)現(xiàn)有協(xié)議做出較大的優(yōu)化楼眷!這樣我們就可以使用netty定制屬于你自己的協(xié)議铲汪!
為什么要學(xué)netty?
這里借用知乎上一個(gè)回答:
作為一個(gè)學(xué)Java的罐柳,如果沒(méi)有研究過(guò)Netty掌腰,那么你對(duì)Java語(yǔ)言的使用和理解僅僅停留在表面水平,會(huì)點(diǎn)SSH张吉,寫(xiě)幾個(gè)MVC齿梁,訪(fǎng)問(wèn)數(shù)據(jù)庫(kù)和緩存,這些只是初等Java程序員干的事。如果你要進(jìn)階勺择,想了解Java服務(wù)器的深層高階知識(shí)创南,Netty絕對(duì)是一個(gè)必須要過(guò)的門(mén)檻。有了Netty酵幕,你可以實(shí)現(xiàn)自己的HTTP服務(wù)器扰藕,F(xiàn)TP服務(wù)器,UDP服務(wù)器芳撒,RPC服務(wù)器邓深,WebSocket服務(wù)器,Redis的Proxy服務(wù)器笔刹,MySQL的Proxy服務(wù)器等等芥备。如果你想知道Nginx是怎么寫(xiě)出來(lái)的,如果你想知道Tomcat和Jetty舌菜,Dubbo是如何實(shí)現(xiàn)的萌壳,如果你也想實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Redis服務(wù)器,那都應(yīng)該好好理解一下Netty日月,它們高性能的原理都是類(lèi)似的袱瓮。
while ture
events = takeEvents(fds) // 獲取事件,如果沒(méi)有事件爱咬,線(xiàn)程就休眠
for event in events {
if event.isAcceptable {
doAccept() // 新鏈接來(lái)了
} elif event.isReadable {
request = doRead() // 讀消息
if request.isComplete() {
doProcess()
}
} elif event.isWriteable {
doWrite() // 寫(xiě)消息
}
}
}
NIO的流程大致就是上面的偽代碼描述的過(guò)程尺借,跟實(shí)際真實(shí)的代碼有較多差異,不過(guò)對(duì)于初學(xué)者精拟,這樣理解也是足夠了燎斩。Netty是建立在NIO基礎(chǔ)之上,Netty在NIO之上又提供了更高層次的抽象蜂绎。在Netty里面栅表,Accept連接可以使用單獨(dú)的線(xiàn)程池去處理,讀寫(xiě)操作又是另外的線(xiàn)程池來(lái)處理师枣。Accept連接和讀寫(xiě)操作也可以使用同一個(gè)線(xiàn)程池來(lái)進(jìn)行處理怪瓶。而請(qǐng)求處理邏輯既可以使用單獨(dú)的線(xiàn)程池進(jìn)行處理,也可以跟放在讀寫(xiě)線(xiàn)程一塊處理践美。線(xiàn)程池中的每一個(gè)線(xiàn)程都是NIO線(xiàn)程劳殖。用戶(hù)可以根據(jù)實(shí)際情況進(jìn)行組裝,構(gòu)造出滿(mǎn)足系統(tǒng)需求的并發(fā)模型拨脉。Netty提供了內(nèi)置的常用編解碼器,包括行編解碼器[一行一個(gè)請(qǐng)求]宣增,前綴長(zhǎng)度編解碼器[前N個(gè)字節(jié)定義請(qǐng)求的字節(jié)長(zhǎng)度]玫膀,可重放解碼器[記錄半包消息的狀態(tài)],HTTP編解碼器爹脾,WebSocket消息編解碼器等等Netty提供了一些列生命周期回調(diào)接口帖旨,當(dāng)一個(gè)完整的請(qǐng)求到達(dá)時(shí)箕昭,當(dāng)一個(gè)連接關(guān)閉時(shí),當(dāng)一個(gè)連接建立時(shí)解阅,用戶(hù)都會(huì)收到回調(diào)事件落竹,然后進(jìn)行邏輯處理。Netty可以同時(shí)管理多個(gè)端口货抄,可以使用NIO客戶(hù)端模型述召,這些對(duì)于RPC服務(wù)是很有必要的。Netty除了可以處理TCP Socket之外蟹地,還可以處理UDP Socket积暖。在消息讀寫(xiě)過(guò)程中,需要大量使用ByteBuffer怪与,Netty對(duì)ByteBuffer在性能和使用的便捷性上都進(jìn)行了優(yōu)化和抽象夺刑。總之分别,Netty是Java程序員進(jìn)階的必備神奇遍愿。如果你知其然,還想知其所以然耘斩,一定要好好研究下Netty沼填。如果你覺(jué)得Java枯燥無(wú)謂,Netty則是重新開(kāi)啟你對(duì)Java興趣大門(mén)的鑰匙煌往。
總結(jié):程序員水平進(jìn)階的利器倾哺!
實(shí)踐
note: 對(duì)于本例中除了非常重要的核心類(lèi)會(huì)講解外,其他類(lèi)不會(huì)過(guò)多講解刽脖,本章只做入門(mén)羞海,其它章節(jié)會(huì)重點(diǎn)講解!
我們已經(jīng)知道了netty的作用(靈活優(yōu)化定制你自己的協(xié)議)曲管,以及為什么要學(xué)習(xí)netty却邓。那接下來(lái)我們就一步一步來(lái)定制自己的協(xié)議最后完成聊天室!
print協(xié)議
既然我們?nèi)∶鹥rint協(xié)議院水,那就是打印的意思:服務(wù)端接受客服端的信息并且打永搬恪!
首先我們編寫(xiě)一個(gè)ChannelInboundHandlerAdapter檬某,用于處理接收到的消息撬腾,我們首先分析下這個(gè)類(lèi)的作用,繼承關(guān)系如下:
它的作用簡(jiǎn)單概括就是:用于處理 I/O事件的處理器恢恼,所以本例我們自然是用它來(lái)處理消息民傻,于是乎有了如下類(lèi):PrintServerHandler:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.toString(Charset.forName("utf-8")));
ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
收到消息后打印,接著繼續(xù)編寫(xiě)一個(gè)啟動(dòng)類(lèi),用于啟動(dòng)一個(gè)開(kāi)啟我們自己協(xié)議的服務(wù)漓踢,PrintServerApp:
public class EchoServerApp {
private int port;
public EchoServerApp(int port) {
this.port = port;
}
public void run() throws Exception {
NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup();
NioEventLoopGroup workLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossLoopGroup, workLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossLoopGroup.shutdownGracefully();
workLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new EchoServerApp(8080).run();
}
}
啟動(dòng)牵署。然后我們使用win自帶的telnet工具來(lái)測(cè)試(控制面板-》程序和控制-》開(kāi)啟或關(guān)閉window功能,勾選telnet)喧半。打開(kāi)cmd奴迅,輸入
telnet localhost 8080
測(cè)試成功,我們完成了第一個(gè)demo挺据,實(shí)現(xiàn)了自己的print協(xié)議取具。接下來(lái)我們把客戶(hù)端也換成netty編寫(xiě)。目的:?jiǎn)?dòng)客戶(hù)端吴菠,獲取服務(wù)端時(shí)間者填,叫time協(xié)議。
Time Protocol
首先同上面一樣做葵,寫(xiě)一個(gè)TimeServerHandler:
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf timeBuf = ctx.alloc().buffer();
timeBuf.writeBytes(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()).getBytes());
ChannelFuture channelFuture = ctx.writeAndFlush(timeBuf);
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
assert channelFuture == future;
// ctx.close();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
啟動(dòng)類(lèi)同上占哟,接下來(lái),編寫(xiě)客戶(hù)端TimeClientHandler:
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf byteBuf = (ByteBuf) msg;
int length = byteBuf.readableBytes();
byte[] buff = new byte[1024];
byteBuf.readBytes(buff, 0, length);
System.out.println("current time: " + new String(buff, 0, length));
ctx.close();
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
分別啟動(dòng)服務(wù)端酿矢,客戶(hù)端榨乎。
測(cè)試結(jié)果如圖,客戶(hù)端啟動(dòng)后拿到了服務(wù)端的時(shí)間瘫筐,這樣我們就實(shí)現(xiàn)了自己的time protocol蜜暑,接下來(lái)繼續(xù)擴(kuò)展,編寫(xiě)一個(gè)客戶(hù)端與服務(wù)端通信的聊天室:
chatroom server
首先策肝,客戶(hù)端與服務(wù)端通信的信息我們抽象出一個(gè)對(duì)象肛捍,Message以及工具類(lèi):
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
private String username;
private Date sentTime;
private String msg;
}
public class Utils {
public static String encodeMsg(Message message) {
return message.getUsername() + "~" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message.getSentTime())) + "~" + message.getMsg();
}
public static String formatDateTime(Date time) {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time);
}
public static Date parseDateTime(String time) {
try {
return new SimpleDateFormat("yyyy-MM-dd Hh:mm:ss").parse(time);
} catch (ParseException e) {
return null;
}
}
public static void printMsg(Message msg) {
System.out.println("=================================================================================================");
System.out.println(" " + Utils.formatDateTime(msg.getSentTime()) + " ");
System.out.println(msg.getUsername() + ": " + msg.getMsg());
System.out.println("=================================================================================================");
}
}
三個(gè)屬性分別代表用戶(hù)名,發(fā)送時(shí)間之众,消息內(nèi)容拙毫,接著編寫(xiě)一個(gè)用于處理輸入消息的handler,用于將byte消息轉(zhuǎn)換成Message棺禾,ServerTransferMsgHandler:
public class ServerTransferMsgHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
String totalMsg = in.readCharSequence(in.readableBytes(), Charset.forName("utf-8")).toString();
String[] content = totalMsg.split("~");
out.add(new Message(content[0], Utils.parseDateTime(content[1]), content[2]));
}
}
接著缀蹄,編寫(xiě)一個(gè)處理接收消息的Handler,用于打印客戶(hù)端發(fā)送過(guò)來(lái)的消息膘婶,ServerMsgHandler:
public class ServerMsgHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("jsbintask-client進(jìn)入聊天室缺前。");
Message message = new Message(Constants.SERVER, new Date(), "Hello, I'm jsbintask-server side.");
ByteBuf buffer = ctx.alloc().buffer();
String content = Utils.encodeMsg(message);
buffer.writeBytes(content.getBytes());
ctx.writeAndFlush(buffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg1) throws Exception {
try {
Message msg = (Message) msg1;
Utils.printMsg(msg);
Scanner scanner = new Scanner(System.in);
System.out.print("jsbintask-server, please input msg: ");
String reply = scanner.nextLine();
Message message = new Message(Constants.SERVER, new Date(), reply);
ctx.writeAndFlush(message);
} finally {
ReferenceCountUtil.release(msg1);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
知道注意的是,channelActive方法悬襟,在客戶(hù)端鏈接的時(shí)候衅码,率先給客戶(hù)端發(fā)送了一條消息,最后脊岳,在編寫(xiě)一個(gè)用戶(hù)將服務(wù)端Message轉(zhuǎn)成Byte消息的handler逝段,MessageEncoder:
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
String content = Utils.encodeMsg(message);
buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buffer);
}
}
最后筛璧,編寫(xiě)server端啟動(dòng)類(lèi),ChatroomServerApp:
public class ChatroomServerApp {
public static void main(String[] args) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new MessageEncoder(), new ServerTransferMsgHandler(), new ServerMsgHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 1024 * 10)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
啟動(dòng)Server惹恃,繼續(xù)編寫(xiě)ChatroomClient。
chatroom client
同server端一樣棺牧,client的關(guān)鍵也是handler巫糙,ClientMsgHandler如下:
public class ClientMsgHandler extends SimpleChannelInboundHandler<Message> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
try {
Utils.printMsg(msg);
Scanner scanner = new Scanner(System.in);
System.out.print("jsbintask-client, please input msg: ");
String reply = scanner.nextLine();
Message message = new Message(Constants.CLIENT, new Date(), reply);
ByteBuf buffer = ctx.alloc().buffer();
String content = message.getUsername() + "~" + Utils.formatDateTime(message.getSentTime()) + "~" + message.getMsg();
buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buffer);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
接著,同樣有將byte轉(zhuǎn)換成Message的轉(zhuǎn)換器颊乘,CliengMsgHandler:
public class ClientTransferMsgHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
byte[] buff = new byte[2024];
int length = in.readableBytes();
in.readBytes(buff, 0, length);
String totalMsg = new String(buff, 0, length, StandardCharsets.UTF_8);
String[] content = totalMsg.split("~");
out.add(new Message(content[0], Utils.parseDateTime(content[1]), content[2]));
}
}
最后参淹,啟動(dòng)類(lèi)ChatroomClientApp:
public class ChatroomClientApp {
public static void main(String[] args) throws Exception {
NioEventLoopGroup workLoopGroup = new NioEventLoopGroup();
try {
Bootstrap clientBootstrap = new Bootstrap();
clientBootstrap.group(workLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientTransferMsgHandler(), new ClientMsgHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = clientBootstrap.connect("localhost", 8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
workLoopGroup.shutdownGracefully();
}
}
}
同樣啟動(dòng)client,觀察控制臺(tái)乏悄。首先浙值,server端提示client進(jìn)入了聊天室,并且客戶(hù)端看到了server端發(fā)送過(guò)來(lái)的”招呼“信息:
這樣就代表我們的鏈接建立完畢檩小,接著开呐,客戶(hù)端口予,服務(wù)端相互發(fā)送消息:
如圖桐智,這樣,我們的聊天室也就編寫(xiě)成功了记靡,完整demo如下:
[圖片上傳失敗...(image-1cea11-1548903448254)]
總結(jié)
本章阻肿,我們開(kāi)啟了學(xué)習(xí)netty的大門(mén)瓦戚,首先介紹了netty,為什么要學(xué)netty丛塌,并且通過(guò)三個(gè)案例一步一步實(shí)現(xiàn)了聊天室较解,成功踏入了netty的大門(mén),下一章赴邻,我們就來(lái)學(xué)習(xí)一下netty的架構(gòu)印衔!
例子源碼:https://github.com/jsbintask22/netty-learning.git,歡迎fork乍楚,star學(xué)習(xí)修改当编。
本文原創(chuàng)地址:https://jsbintask.cn/2019/01/30/netty/netty-chatroom/,轉(zhuǎn)載請(qǐng)注明出處徒溪。
如果你覺(jué)得本文對(duì)你有用忿偷,歡迎關(guān)注,分享臊泌!