0.what is Netty?
簡(jiǎn)單來(lái)說(shuō)啥箭,Netty是異步的谍珊,事務(wù)驅(qū)動(dòng)的,高性能的NIO框架
Netty官網(wǎng)
其中有詳細(xì)的netty介紹急侥,剛開始學(xué)習(xí)砌滞,有什么不對(duì),歡迎指出坏怪。
1.Netty中幾個(gè)比較重要的概念
1.0 Handler
Handler其實(shí)就是事件的處理器贝润,Netty通過(guò)Channel讀入請(qǐng)求內(nèi)容后會(huì)分配給Handler進(jìn)行事件處理,Handler能夠處理的事件包括:數(shù)據(jù)接收铝宵,異常處理打掘,數(shù)據(jù)轉(zhuǎn)換华畏,編碼解碼等問(wèn)題,其中包含兩個(gè)非常重要的接口ChannelInboundHandler尊蚁,ChannelOutboundHandler亡笑,前者負(fù)責(zé)處理客戶端發(fā)送到服務(wù)端的請(qǐng)求,后者反之横朋。關(guān)于Handler執(zhí)行順序的一些介紹可以看一看這篇文章: handler的執(zhí)行順序
在這個(gè)地方有一個(gè)值得注意的點(diǎn)仑乌,無(wú)論是InboundHandler還是OutboundHandler都不適合用于做耗時(shí)操作,官方要求耗時(shí)操作應(yīng)當(dāng)使用單獨(dú)的EventExcutorGroup+專門的Handler來(lái)進(jìn)行操作
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder()); pipeline.addLast("encoder", new MyProtocolEncoder());
// Tell the pipeline to run MyBusinessLogicHandler's event handler methods
// in a different thread than an I/O thread so that the I/O thread is not blocked by
// a time-consuming task.
// If your business logic is fully asynchronous or finished very quickly, you don't
// need to specify a group.
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
1.1 Channel
這里的Channel的概念和NIO中Channel的概念是一樣的琴锭,相當(dāng)于一個(gè)Socket連接
1.2 Bootstrap
Bootstrap其實(shí)就是Netty服務(wù)的啟動(dòng)器晰甚,服務(wù)端使用的是ServerBootstrap,客戶端使用的是Bootstrap,我們可以通過(guò)配置Bootstrap來(lái)配置Netty使用哪種的Channel决帖,Group厕九,Handler和Encoder,Decoder……
1.3 LoopGroup
一個(gè)LoopGroup可以包含多個(gè)EventLoop古瓤,我目前的理解是將其理解為一個(gè)線程池止剖,其中的EventLoop為其中的線程
1.4 ChannelFuture
這點(diǎn)在官方的Guide中也有提到,在Netty中落君,所有的處理都是異步的穿香,因此需要一個(gè)Future對(duì)象,可以注冊(cè)監(jiān)聽在異步線程處理完以后進(jìn)行一些處理
2.HelloWorld
2.0 Server端代碼
public class Server{
public void start(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
EventExecutorGroup bussinessGroup = new DefaultEventExecutorGroup(16);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
//使用哪一種Channel
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加http請(qǐng)求所需要的編碼器與解碼器
ch.pipeline().addLast(new HttpResponseEncoder());
ch.pipeline().addLast(new HttpRequestDecoder());
//ch.pipeline().addLast(new ServerOutboundHandler()); //如果存在OutboundHandler則必須在最后一個(gè)inbound前面
ch.pipeline().addLast(new ServerHandler());
//耗時(shí)操作使用的group和專門的handler
ch.pipeline().addLast(bussinessGroup, "handler", new BusinessHandler());
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port);
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
Server server = new Server();
server.start(8090);
}
}
2.1 ServerHandler
public class ServerHandler extends ChannelInboundHandlerAdapter {
private HttpRequest request;
private HttpContent content;
private FullHttpResponse response;
private StringBuilder sb = new StringBuilder();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
request = (HttpRequest) msg;
System.out.println(request.uri());
System.out.println("request received success");
sb.append("request received success \n");
}
if (msg instanceof HttpContent) {
content = (HttpContent) msg;
ByteBuf buf = content.content();
String result = buf.toString(CharsetUtil.UTF_8);
System.out.println(result);
buf.release();
System.out.println("content received success");
sb.append("content received success");
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(sb.toString().getBytes()));
ctx.writeAndFlush(response);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_GATEWAY);
ctx.writeAndFlush(response);
ctx.close();
}
}
3. 請(qǐng)求服務(wù)器
3.0 使用postman來(lái)請(qǐng)求服務(wù)器
我使用post方式隨意發(fā)送了任意內(nèi)容到服務(wù)器绎速,能夠得到返回內(nèi)容皮获,并且idea控制臺(tái)輸出如下,表明可以獲得postman發(fā)送的數(shù)據(jù)
3.1 使用NettyClient訪問(wèn)Netty服務(wù)器
3.1.0 Client
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 8090).sync();
Channel ch = f.channel();
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "http://127.0.0.1:8090", Unpooled.wrappedBuffer("{ \"test\" : \"test\" }".getBytes())); //發(fā)送請(qǐng)求道服務(wù)端
request.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, 1024); //必須設(shè)置Content length否則服務(wù)端收不到content
System.out.println(request.content().toString(CharsetUtil.UTF_8));
ch.writeAndFlush(request);
//wait until server to close the connection
ch.closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
3.1.1 ClientHandler
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
HttpContent content;
HttpResponse response;
if (msg instanceof HttpResponse) {
response = (HttpResponse) msg;
System.out.println(response.status().toString());
}
if (msg instanceof HttpContent) {
content = (HttpContent) msg;
ByteBuf buf = content.content();
String responseContent = buf.toString(CharsetUtil.UTF_8);
System.out.println(responseContent);
buf.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
以上代碼也能獲得和postman訪問(wèn)相同的效果
4.其中思考的問(wèn)題
- Q:耗時(shí)的業(yè)務(wù)邏輯操作應(yīng)該放在哪里
A:使用EventExcutorGroup和單獨(dú)的Handler來(lái)實(shí)現(xiàn)業(yè)務(wù)邏輯代碼(在上面的代碼中有提到) - Q :如何將InboundMessageHandler讀到的msg傳到其他Handler
A:使用ctx.pipeline().channel().attr()可以設(shè)置屬性來(lái)在Handler傳遞屬性值