網(wǎng)絡編程框架Netty的介紹與使用
一杯活、簡介
Netty的官網(wǎng)https://netty.io/
Netty是一個為了快速開發(fā)可維護的高性能協(xié)議處理器與客戶端的異步事件驅動的網(wǎng)絡應用框架
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty 是一個基于NIO的客戶整葡、服務器端的編程框架,使用Netty 可以確保你快速和簡單的開發(fā)出一個網(wǎng)絡應用,例如實現(xiàn)了某種協(xié)議的客戶、服務端應用浦译。Netty相當于簡化和流線化了網(wǎng)絡應用的編程開發(fā)過程,例如:基于TCP和UDP的socket服務開發(fā)溯职。
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
“快速”和“簡單”并不用產(chǎn)生維護性或性能上的問題精盅。Netty 是一個吸收了多種協(xié)議(包括FTP、SMTP谜酒、HTTP等各種二進制文本協(xié)議)的實現(xiàn)經(jīng)驗叹俏,并經(jīng)過相當精心設計的項目。最終僻族,Netty 成功的找到了一種方式粘驰,在保證易于開發(fā)的同時還保證了其應用的性能,穩(wěn)定性和伸縮性述么。
'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.
BIO蝌数、NIO、AIO的區(qū)別:
同步與阻塞的區(qū)別:
這點可以參考知乎https://www.zhihu.com/question/19732473:
同步與異步的關注的是消息通信機制度秘,所謂同步顶伞,就是在發(fā)出一個調用時,在沒有得到結果之前剑梳,該調用就不返回唆貌。但是一旦調用返回,就得到返回值了阻荒。換句話說挠锥,就是由調用者主動等待這個調用的結果众羡。而異步則是相反侨赡,調用在發(fā)出之后,這個調用就直接返回了粱侣,所以沒有返回結果羊壹。換句話說,當一個異步過程調用發(fā)出后齐婴,調用者不會立刻得到結果油猫。而是在調用發(fā)出后,被調用者通過狀態(tài)柠偶、通知來通知調用者情妖,或通過回調函數(shù)處理這個調用睬关。
阻塞與非阻塞關注的是程序在等待調用結果(消息,返回值)時的狀態(tài).阻塞調用是指調用結果返回之前毡证,當前線程會被掛起电爹。調用線程只有在得到結果之后才會返回。非阻塞調用指在不能立刻得到結果之前料睛,該調用不會阻塞當前線程丐箩,當前線程可以去處理其他任務。
下面這部分參考了http://www.reibang.com/p/a4e03835921a
-
BIO(Block IO)
BIO是同步阻塞式的IO恤煞,數(shù)據(jù)的讀取寫入必須阻塞在一個線程內等待其完成屎勘。采用BIO通信模式的應用,通尘影牵客戶端需要開兩個線程:一個線程負責監(jiān)聽服務器發(fā)過來的消息概漱,一個線程負責讀取用戶的輸入并發(fā)送消息。服務器有一個線程負責監(jiān)聽客戶端的連接苔货,多個線程處理客戶端的信息犀概。通常一個客戶端一個線程。這種方式如果有大量請求同時連接的話要創(chuàng)建大量線程夜惭,增加了服務器的壓力姻灶,雖然可以通過線程池機制來改善,但仍然是治標不治本的措施诈茧,只能適用于并發(fā)請求量很小的情況产喉。
-
NIO(Non-block IO)
NIO是同步非阻塞式的IO,NIO引入了 Channel , Selector敢会,Buffer等抽象對象曾沈。NIO 是直接面向緩沖區(qū)(ByteBuffer),而傳統(tǒng)的IO面向流的鸥昏。NIO 通過Channel(通道) 進行讀寫塞俱,通道是雙向的,可讀也可寫吏垮,而流的讀寫是單向的障涯。無論讀寫,通道只能和Buffer交互膳汪。因為 Buffer唯蝶,通道可以異步地讀寫。Selector(選擇器)用于使用單個線程處理多個通道遗嗽。因此粘我,它需要較少的線程來處理這些通道。線程之間的切換對于操作系統(tǒng)來說是昂貴的痹换。 因此征字,為了提高系統(tǒng)效率選擇器是有用的都弹。
NIO通常有兩個線程,每個線程綁定一個輪詢器(selector)匙姜,A輪詢器負責輪詢是否有新的連接缔杉,B輪詢器負責輪詢連接是否有數(shù)據(jù)可讀。服務端監(jiān)測到新的連接之后搁料,不再創(chuàng)建一個新的線程或详,而是直接將新連接綁定到B輪詢器上。
-
AIO(Asynchronous IO)
AIO是異步非阻塞的IO郭计,異步 IO 是基于事件和回調機制實現(xiàn)的霸琴,也就是應用操作之后會直接返回,不會堵塞在那里昭伸,當后臺處理完成梧乘,操作系統(tǒng)會通知相應的線程進行后續(xù)的操作。
二庐杨、Netty實戰(zhàn)
1选调、添加依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
2、Netty群發(fā)消息
2.1 Netty的線程模型
單線程模型 - 只有一個線程處理所有客戶端的所有請求
多線程模型 - 有一個線程池處理多個客戶端的所有請求
主從線程模型(商用) - 主線程池的線程用來處理客戶端的連接請求灵份,從線程池的線程用來處理客戶端的消息請求
服務器
public static void main(String[] args) {
//創(chuàng)建兩個主從線程池
EventLoopGroup master = new NioEventLoopGroup();
EventLoopGroup slave = new NioEventLoopGroup();
//創(chuàng)建服務器的初始化引導對象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//配置引導對象
serverBootstrap
//設置當前Netty的線程模型
.group(master, slave)
//設置Channel的類型
.channel(NioServerSocketChannel.class)
//設置事件處理器 -- 重要
.childHandler(new ServerChannelHandler());
//綁定端口
ChannelFuture future = serverBootstrap.bind(8080);//綁定這個動作其實是一個異步的動作
try {
future.sync();//同步阻塞
System.out.println("端口綁定完成仁堪,服務已經(jīng)啟動!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
服務器的事件處理器:
@ChannelHandler.Sharable
public class ServerChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {
List<Channel> channels = new ArrayList<Channel>();
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("有一個客戶端連接了服務器填渠!");
channels.add(ctx.channel());
}
//消息處理的方法
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
System.out.println("接收到客戶端的消息:" + byteBuf.toString(Charset.forName("UTF-8")));
//將消息群發(fā)給其他的客戶端
for (Channel channel : channels) {
if(channel != ctx.channel()){
ByteBuf buf = Unpooled.copiedBuffer(byteBuf);
channel.writeAndFlush(buf);
}
}
}
}
客戶端
public class NettyClient {
public static void main(String[] args) {
//創(chuàng)建引導對象
Bootstrap bootstrap = new Bootstrap();
//設置線程模型
bootstrap
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
//設置服務端消息處理器
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println("接收到服務端的消息:" + byteBuf.toString(Charset.forName("UTF-8")));
}
});
//連接服務器
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
try {
future.sync();
System.out.println("連接服務器成功弦聂!");
//給服務器循環(huán)發(fā)送消息
Scanner scanner = new Scanner(System.in);
while(true){
System.out.println("請輸入發(fā)送的內容:");
String content = scanner.next();
//發(fā)送消息到服務端
Channel channel = future.channel();//和服務器的連接對象
byte[] bytes = content.getBytes("UTF-8");
ByteBuf byteBuf = Unpooled.buffer(bytes.length);
byteBuf.writeBytes(bytes);
channel.writeAndFlush(byteBuf);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.2 Netty的事件處理器
Netty中的消息按照方向可以分為兩類:出站消息與入站消息。在Netty中可以配置事件處理器鏈氛什,對出站消息和入站消息進行處理莺葫。 入站處理器 - 繼承SimpleChannelInboundHandler類 出站處理器 - 繼承ChannelOutboundHandlerAdapter類。
事件處理器最常用的是編碼與解碼枪眉,常見的編碼與解碼器有:
pipeline.addLast(new StringDecoder()); - 消息的解碼捺檬,將ByteBuf轉換成
Stringpipeline.addLast(new StringEncoder()); - 消息的編碼,將String轉成ByteBuf
pipeline.addLast(new LineBasedFrameDecoder(1024 * 1024)); - 按行解決拆包贸铜、粘包問題的解碼器
public class NettyServer {
public static void main(String[] args) {
//主線程池 處理客戶端的連接請求
EventLoopGroup master = new NioEventLoopGroup();
//從線程池 處理客戶端的消息請求
EventLoopGroup slave = new NioEventLoopGroup();
//創(chuàng)建服務器的初始化引導對象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
//設置netty的線程模型:單線程堡纬、多線程、主從線程池(主線程池負責連接萨脑,從線程池負責消息的發(fā)送)
.group(master,slave)
//設置管道類型
.channel(NioServerSocketChannel.class)
//設置子線程池的事件處理
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
//事件處理器鏈的使用
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ServerChannelHandler());
}
});
//綁定端口是一個異步動作
ChannelFuture future = serverBootstrap.bind(8080);
try {
//同步
future.sync();
System.out.println("綁定端口已完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3隐轩、Netty對HTTP的支持
添加了三個事件處理器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
public class HttpNettyServer {
public static void main(String[] args) {
//主線程池 處理客戶端的連接請求
EventLoopGroup master = new NioEventLoopGroup();
//從線程池 處理客戶端的消息請求
EventLoopGroup slave = new NioEventLoopGroup();
//創(chuàng)建服務器的初始化引導對象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
//設置netty的線程模型:單線程饺饭、多線程渤早、主從線程池(主線程池負責連接,從線程池負責消息的發(fā)送)
.group(master,slave)
//設置管道類型
.channel(NioServerSocketChannel.class)
//設置子線程池的事件處理
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
//事件處理器鏈的使用
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(1024*1024));
pipeline.addLast(new HttpChannelHandler());
}
});
//綁定端口是一個異步動作
ChannelFuture future = serverBootstrap.bind(8080);
try {
//同步
future.sync();
System.out.println("綁定端口已完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
HttpChannelHandler
package com.qianfeng.http;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import java.nio.charset.StandardCharsets;
public class HttpChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
System.out.println(fullHttpRequest.method());
System.out.println(fullHttpRequest.uri());
System.out.println(fullHttpRequest.headers());
System.out.println(fullHttpRequest.content().toString(StandardCharsets.UTF_8));
}
}
3.1 使用Netty編寫一個Http文件服務器
服務器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
public class HttpNettyServer {
public static void main(String[] args) {
//主線程池 處理客戶端的連接請求
EventLoopGroup master = new NioEventLoopGroup();
//從線程池 處理客戶端的消息請求
EventLoopGroup slave = new NioEventLoopGroup();
//創(chuàng)建服務器的初始化引導對象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
//設置netty的線程模型:單線程瘫俊、多線程鹊杖、主從線程池(主線程池負責連接悴灵,從線程池負責消息的發(fā)送)
.group(master,slave)
//設置管道類型
.channel(NioServerSocketChannel.class)
//設置子線程池的事件處理
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
//事件處理器鏈的使用
ChannelPipeline pipeline = channel.pipeline();
//用于傳輸文件
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpServerCodec());
//http對象聚合處理器
pipeline.addLast(new HttpObjectAggregator(1024*1024));
//自定義的處理器
pipeline.addLast(new HttpChannelHandler());
}
});
//綁定端口是一個異步動作
ChannelFuture future = serverBootstrap.bind(80);
try {
//同步
future.sync();
System.out.println("綁定端口已完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
自定義的處理器
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedNioFile;
import java.io.File;
import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
public class HttpChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final String path = "d:\\ceshi";
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
//如果請求方式不是get
if(!"get".equalsIgnoreCase(fullHttpRequest.method().toString())){
setError(channelHandlerContext,"不支持除get外的其他請求方式");
return;
}
//獲得請求路徑
String uri = fullHttpRequest.uri();
//中文的處理
uri = URLDecoder.decode(uri, "UTF-8");
File file = new File(path,uri);
//如果請求的文件不存在
if(!file.exists()){
setError(channelHandlerContext,"對不起,您訪問的資源不存在");
return;
}
//如果是目錄
if(file.isDirectory()){
directoryHandler(channelHandlerContext,file);
}
//如果是文件
else if(file.isFile()){
fileHandler(channelHandlerContext,file);
}
}
private void setError(ChannelHandlerContext ctx,String error){
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
//告訴瀏覽器響應體中的內容是什么類型
response.headers().add("Content-Type","text/html;charset=utf-8");
response.content().writeBytes(("<html><head><meta charset=\"UTF-8\"></head><body>"+error+"</body></html>").getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(response);
ctx.close();
}
private void fileHandler(ChannelHandlerContext ctx,File file){
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//設置下載的響應頭
response.headers().add("Content-Type","application/octet-stream");
response.headers().add("Content-Length",file.length());
ctx.writeAndFlush(response);
try {
ChunkedNioFile nioFile = new ChunkedNioFile(file,1024*1024);
//因為這是一個異步的操作骂蓖,所以要設置一個監(jiān)聽器監(jiān)聽文件下載
ChannelFuture future = ctx.writeAndFlush(nioFile);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
System.out.println("下載完成积瞒,關閉連接");
ctx.close();
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
private void directoryHandler(ChannelHandlerContext ctx,File file){
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
StringBuilder sb = new StringBuilder("<html><head><meta charset=\"UTF-8\"></head><body>");
sb.append("<ul>");
String filePath = file.getPath();
String substring = filePath.substring(filePath.indexOf(path) + path.length());
substring = substring.replaceAll("\\\\","/");
for (File f : Objects.requireNonNull(file.listFiles())) {
sb.append("<li>");
sb.append("<a href='").append(substring).append(f.getName()).append("'>").append(f.getName()).append("</a>");
sb.append("</li>");
}
sb.append("</ul>").append("</body></html>");
response.headers().add("Content-Type","text/html;charset=utf-8");
response.content().writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(response);
ctx.close();
}
}
4、Netty對WebSocket的支持
WebSocket是一種在單個TCP連接上進行全雙工通信的協(xié)議登下。WebSocket通信協(xié)議于2011年被IETF定為標準RFC 6455茫孔,并由RFC7936補充規(guī)范。WebSocket API也被W3C定為標準被芳。
WebSocket使得客戶端和服務器之間的數(shù)據(jù)交換變得更加簡單缰贝,允許服務端主動向客戶端推送數(shù)據(jù)。在WebSocket API中畔濒,瀏覽器和服務器只需要完成一次握手剩晴,兩者之間就直接可以創(chuàng)建持久性的連接,并進行雙向數(shù)據(jù)傳輸侵状。
package com.qianfeng.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
public class WebSocketServer {
public static void main(String[] args) {
//主線程池 處理客戶端的連接請求
EventLoopGroup master = new NioEventLoopGroup();
//從線程池 處理客戶端的消息請求
EventLoopGroup slave = new NioEventLoopGroup();
//創(chuàng)建服務器的初始化引導對象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
//設置netty的線程模型:單線程赞弥、多線程、主從線程池(主線程池負責連接趣兄,從線程池負責消息的發(fā)送)
.group(master,slave)
//設置管道類型
.channel(NioServerSocketChannel.class)
//設置子線程池的事件處理
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
//事件處理器鏈的使用
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
//http對象聚合處理器
pipeline.addLast(new HttpObjectAggregator(1024*1024));
//websocket
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
//自定義的處理器
pipeline.addLast(new WebSocketHandler());
}
});
//綁定端口是一個異步動作
ChannelFuture future = serverBootstrap.bind(80);
try {
//同步
future.sync();
System.out.println("綁定端口已完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
WebSocketHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("有客戶端連接");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("客戶端斷開連接");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
System.out.println("接收到客戶端發(fā)來的消息:"+textWebSocketFrame.text());
ctx.writeAndFlush(new TextWebSocketFrame("你好"));
}
}
客戶端的編寫:
var ws;
//初始化連接websocket
function initWebSocket(){
//判斷瀏覽器是否支持WebSocket
if(window.WebSocket){
//連接WebSocket服務器
ws = new WebSocket("ws://127.0.0.1");
//設置websocket的各種回調方法
ws.onopen = function(){
console.log("已經(jīng)正常連接WebSocket服務器绽左!");
};
ws.onclose = function(){
console.log("連接已經(jīng)關閉!");
}
ws.onerror = function(){
console.log("連接異常艇潭!");
}
ws.onmessage = function(msg){
console.log("已經(jīng)接收到服務器的消息:" + msg.data);
//
var msg = "<li> 服務器:" + msg.data + "</li>";
document.getElementById("msgUl")
.insertAdjacentHTML("beforeEnd", msg);//startAfter startBefore
//'beforeBegin', 'afterBegin', 'beforeEnd', 'afterEnd'
}
} else {
alert("騷瑞妇菱,您的瀏覽器太垃圾了,請換個高級的瀏覽器暴区!");
}
}