本示例首選介紹Java原生API實現(xiàn)BIO通信依鸥,然后進階實現(xiàn)NIO通信拆火,最后利用Netty實現(xiàn)NIO通信及Netty主要模塊組件介紹抗悍。
Netty 是一個異步事件驅(qū)動的網(wǎng)絡應用程序框架亡笑,用于快速開發(fā)可維護的高性能協(xié)議服務器和客戶端吃嘿。
BIO(Blocking I/O) 方案
BIO通信(一請求一應答)模型圖如下
采用 BIO 通信模型 的服務端祠乃,通常由一個獨立的 Acceptor 線程負責監(jiān)聽客戶端的連接。我們一般通過在while(true) 循環(huán)中服務端會調(diào)用 accept() 方法等待接收客戶端的連接的方式監(jiān)聽請求兑燥,一旦接收到一個連接請求亮瓷,就可以在這個通信套接字上進行讀寫操作,此時不能再接收其他客戶端連接請求降瞳,只能等待當前連接的客戶端的操作執(zhí)行完成嘱支, 如果要讓 BIO 通信模型 能夠同時處理多個客戶端請求,就必須使用多線程(主要原因是socket.accept()力崇、socket.read()斗塘、socket.write() 涉及的三個主要函數(shù)都是同步阻塞的)
代碼實現(xiàn)
BIO服務端
BIOServer.java
package com.easy.javaBio;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
@Slf4j
public class BIOServer {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(10002);
while (true) {
Socket client = server.accept(); //等待客戶端的連接,如果沒有獲取連接 ,在此步一直等待
new Thread(new ServerThread(client)).start(); //為每個客戶端連接開啟一個線程
}
//server.close();
}
}
@Slf4j
class ServerThread extends Thread {
private Socket client;
public ServerThread(Socket client) {
this.client = client;
}
@SneakyThrows
@Override
public void run() {
log.info("客戶端:" + client.getInetAddress().getLocalHost() + "已連接到服務器");
BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
//讀取客戶端發(fā)送來的消息
String mess = br.readLine();
log.info("客戶端:" + mess);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
bw.write(mess + "\n");
bw.flush();
}
}
BIO客戶端
BIOClient.java
package com.easy.javaBio;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.net.Socket;
@Slf4j
public class BIOClient {
public static void main(String[] args) throws IOException {
Socket s = new Socket("0.0.0.0", 10002);
InputStream input = s.getInputStream();
OutputStream output = s.getOutputStream();
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(output));
bw.write("客戶端給服務端發(fā)消息測試\n"); //向服務器端發(fā)送一條消息
bw.flush();
BufferedReader br = new BufferedReader(new InputStreamReader(input)); //讀取服務器返回的消息
String mess = br.readLine();
log.info("服務器:" + mess);
}
}
運行示例
運行BIO服務端亮靴,然后再運行BIO客戶端馍盟,觀察控制臺
BIOServer控制臺輸出:
Connected to the target VM, address: '127.0.0.1:64346', transport: 'socket'
17:29:52.519 [Thread-1] INFO com.easy.javaBio.ServerThread - 客戶端:YHE6OR5UXQJ6D35/192.168.9.110已連接到服務器
17:29:52.523 [Thread-1] INFO com.easy.javaBio.ServerThread - 客戶端:客戶端給服務端發(fā)消息測試
BIOClient控制臺輸出:
Connected to the target VM, address: '127.0.0.1:64355', transport: 'socket'
17:29:52.527 [main] INFO com.easy.javaBio.BIOClient - 服務器:客戶端給服務端發(fā)消息測試
Disconnected from the target VM, address: '127.0.0.1:64355', transport: 'socket'
這表示我們實現(xiàn)了一個最簡單的BIO通信了
這種方式為每個客戶端開啟一個線程,高并發(fā)時消耗資源較多茧吊,容易浪費贞岭,甚至導致服務端崩潰,對性能造成負面影響搓侄,高并發(fā)下不推薦使用瞄桨。
NIO(New I/O)方案
NIO通信模型圖如下
NIO是一種同步非阻塞的I/O模型,在Java 1.4 中引入了 NIO 框架讶踪,對應 java.nio 包芯侥,提供了 Channel , Selector,Buffer等抽象。
NIO中的N可以理解為Non-blocking柱查,不單純是New廓俭。它支持面向緩沖的,基于通道的I/O操作方法唉工。 NIO提供了與傳統(tǒng)BIO模型中的 Socket 和 ServerSocket 相對應的 SocketChannel 和 ServerSocketChannel 兩種不同的套接字通道實現(xiàn),兩種通道都支持阻塞和非阻塞兩種模式研乒。阻塞模式使用就像傳統(tǒng)中的支持一樣,比較簡單淋硝,但是性能和可靠性都不好雹熬;非阻塞模式正好與之相反。對于低負載谣膳、低并發(fā)的應用程序竿报,可以使用同步阻塞I/O來提升開發(fā)速率和更好的維護性;對于高負載参歹、高并發(fā)的(網(wǎng)絡)應用仰楚,應使用 NIO 的非阻塞模式來開發(fā)。
NIO服務端
NIOServer.java
package com.easy.javaBio;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
@Slf4j
public class NIOServer {
private InetAddress addr;
private int port;
private Selector selector;
private static int BUFF_SIZE = 1024;
public NIOServer(InetAddress addr, int port) throws IOException {
this.addr = addr;
this.port = port;
startServer();
}
private void startServer() throws IOException {
// 獲得selector及通道(socketChannel)
this.selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// 綁定地址及端口
InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
serverChannel.socket().bind(listenAddr);
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
log.info("NIOServer運行中...按下Ctrl-C停止服務");
while (true) {
log.info("服務器等待新的連接和selector選擇…");
this.selector.select();
// 選擇key工作
Iterator keys = this.selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
// 防止出現(xiàn)重復的key犬庇,處理完需及時移除
keys.remove();
//無效直接跳過
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
this.accept(key);
} else if (key.isReadable()) {
this.read(key);
} else if (key.isWritable()) {
this.write(key);
} else if (key.isConnectable()) {
this.connect(key);
}
}
}
}
private void connect(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
if (channel.finishConnect()) {
// 成功
log.info("成功連接了");
} else {
// 失敗
log.info("失敗連接");
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
channel.register(this.selector, SelectionKey.OP_READ);
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
log.info("連接到: " + remoteAddr);
}
private void read(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(BUFF_SIZE);
int numRead = channel.read(buffer);
if (numRead == -1) {
log.info("關閉客戶端連接: " + channel.socket().getRemoteSocketAddress());
channel.close();
return;
}
String msg = new String(buffer.array()).trim();
log.info("得到了: " + msg);
// 回復客戶端
String reMsg = msg + " 你好,這是BIOServer給你的回復消息:" + System.currentTimeMillis();
channel.write(ByteBuffer.wrap(reMsg.getBytes()));
}
private void write(SelectionKey key) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(BUFF_SIZE);
byteBuffer.flip();
SocketChannel clientChannel = (SocketChannel) key.channel();
while (byteBuffer.hasRemaining()) {
clientChannel.write(byteBuffer);
}
byteBuffer.compact();
}
public static void main(String[] args) throws IOException {
new NIOServer(null, 10002);
}
}
使用NIO, 可以用Selector最終決定哪一組注冊的socket準備執(zhí)行I/O
NIO客戶端
NIOClient.java
package com.easy.javaBio;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
@Slf4j
public class NIOClient {
private static int BUFF_SIZE = 1024;
public static void main(String[] args) throws IOException, InterruptedException {
InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", 10002);
SocketChannel socketChannel = SocketChannel.open(socketAddress);
log.info("連接 BIOServer 服務侨嘀,端口:10002...");
ArrayList<String> companyDetails = new ArrayList<>();
// 創(chuàng)建消息列表
companyDetails.add("騰訊");
companyDetails.add("阿里巴巴");
companyDetails.add("京東");
companyDetails.add("百度");
companyDetails.add("google");
for (String companyName : companyDetails) {
socketChannel.write(ByteBuffer.wrap(companyName.getBytes()));
log.info("發(fā)送: " + companyName);
ByteBuffer buffer = ByteBuffer.allocate(BUFF_SIZE);
buffer.clear();
socketChannel.read(buffer);
String result = new String(buffer.array()).trim();
log.info("收到NIOServer回復的消息:" + result);
// 等待2秒鐘再發(fā)送下一條消息
Thread.sleep(2000);
}
socketChannel.close();
}
}
運行示例
首先運行我們的NIOServer臭挽,然后再運行NIOClient,觀察控制臺輸出
NIOServer控制臺輸出
17:35:40.921 [main] INFO com.easy.javaBio.NIOServer - NIOServer運行中...按下Ctrl-C停止服務
17:35:40.924 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的連接和selector選擇…
17:36:29.188 [main] INFO com.easy.javaBio.NIOServer - 連接到: /192.168.9.110:64443
17:36:29.188 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的連接和selector選擇…
17:36:29.194 [main] INFO com.easy.javaBio.NIOServer - 得到了: 騰訊
17:36:29.194 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的連接和selector選擇…
17:36:31.194 [main] INFO com.easy.javaBio.NIOServer - 得到了: 阿里巴巴
17:36:31.195 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的連接和selector選擇…
17:36:33.195 [main] INFO com.easy.javaBio.NIOServer - 得到了: 京東
17:36:33.195 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的連接和selector選擇…
17:36:35.196 [main] INFO com.easy.javaBio.NIOServer - 得到了: 百度
17:36:35.197 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的連接和selector選擇…
17:36:37.197 [main] INFO com.easy.javaBio.NIOServer - 得到了: google
17:36:37.198 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的連接和selector選擇…
17:36:39.198 [main] INFO com.easy.javaBio.NIOServer - 關閉客戶端連接: /192.168.9.110:64443
17:36:39.198 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的連接和selector選擇…
NIOClient控制臺輸出
17:36:29.189 [main] INFO com.easy.javaBio.NIOClient - 連接 BIOServer 服務咬腕,端口:10002...
17:36:29.194 [main] INFO com.easy.javaBio.NIOClient - 發(fā)送: 騰訊
17:36:29.194 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回復的消息:騰訊 你好欢峰,這是BIOServer給你的回復消息:1576229789194
17:36:31.194 [main] INFO com.easy.javaBio.NIOClient - 發(fā)送: 阿里巴巴
17:36:31.195 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回復的消息:阿里巴巴 你好,這是BIOServer給你的回復消息:1576229791194
17:36:33.195 [main] INFO com.easy.javaBio.NIOClient - 發(fā)送: 京東
17:36:33.196 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回復的消息:京東 你好涨共,這是BIOServer給你的回復消息:1576229793195
17:36:35.196 [main] INFO com.easy.javaBio.NIOClient - 發(fā)送: 百度
17:36:35.197 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回復的消息:百度 你好纽帖,這是BIOServer給你的回復消息:1576229795197
17:36:37.197 [main] INFO com.easy.javaBio.NIOClient - 發(fā)送: google
17:36:37.198 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回復的消息:google 你好,這是BIOServer給你的回復消息:1576229797198
NIO服務端每隔兩秒會收到客戶端的請求举反,并對客戶端的消息做出回復懊直。
直接使用Java NIO API構建應用程序是可以的,但要做到正確和安全并不容易火鼻。特別是在高負載下室囊,可靠和高效地處理和調(diào)度I/O操作是一項繁瑣而且容易出錯的任務】鳎可以選中Netty, Apache Mina等高性能網(wǎng)絡編程框架融撞。
Netty 構建 NIO 通信服務 方案
使用JDK原生網(wǎng)絡應用程序API,會存在的問題
NIO的類庫和API繁雜粗蔚,使用麻煩尝偎,你需要熟練掌握Selector、ServerSocketChannel鹏控、SocketChannel致扯、ByteBuffer等
需要具備其它的額外技能做鋪墊趁窃,例如熟悉Java多線程編程,因為NIO編程涉及到Reactor模式急前,你必須對多線程和網(wǎng)路編程非常熟悉醒陆,才能編寫出高質(zhì)量的NIO程序
可靠性能力補齊,開發(fā)工作量和難度都非常大裆针。例如客戶端面臨斷連重連刨摩、網(wǎng)絡閃斷、半包讀寫世吨、失敗緩存澡刹、網(wǎng)絡擁塞和異常碼流的處理等等,NIO編程的特點是功能開發(fā)相對容易耘婚,但是可靠性能力補齊工作量和難度都非常大
Netty對JDK自帶的NIO的API進行封裝罢浇,解決上述問題,主要特點有
- 高并發(fā)
Netty是一款基于NIO(Nonblocking I/O沐祷,非阻塞IO)開發(fā)的網(wǎng)絡通信框架嚷闭,對比于BIO(Blocking I/O,阻塞IO)赖临,他的并發(fā)性能得到了很大提高 胞锰。
- 傳輸快
Netty的傳輸快其實也是依賴了NIO的一個特性——零拷貝。
- 封裝好
Netty封裝了NIO操作的很多細節(jié)兢榨,提供易于使用的API嗅榕。
Netty框架的優(yōu)勢
- API使用簡單,開發(fā)門檻低吵聪;
- 功能強大凌那,預置了多種編解碼功能,支持多種主流協(xié)議吟逝;
- 定制能力強帽蝶,可以通過ChannelHandler對通信框架進行靈活地擴展;
- 性能高澎办,通過與其他業(yè)界主流的NIO框架對比嘲碱,Netty的綜合性能最優(yōu);
- 成熟局蚀、穩(wěn)定麦锯,Netty修復了已經(jīng)發(fā)現(xiàn)的所有JDK NIO BUG,業(yè)務開發(fā)人員不需要再為NIO的BUG而煩惱琅绅;
- 社區(qū)活躍扶欣,版本迭代周期短,發(fā)現(xiàn)的BUG可以被及時修復,同時料祠,更多的新功能會加入骆捧;
- 經(jīng)歷了大規(guī)模的商業(yè)應用考驗,質(zhì)量得到驗證髓绽。在互聯(lián)網(wǎng)敛苇、大數(shù)據(jù)、網(wǎng)絡游戲顺呕、企業(yè)應用枫攀、電信軟件等眾多行業(yè)得到成功商用,證明了它已經(jīng)完全能夠滿足不同行業(yè)的商業(yè)應用了株茶。
代碼實現(xiàn)
pom.xml依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.easy</groupId>
<artifactId>netty</artifactId>
<version>0.0.1</version>
<name>netty</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
<modules>
<module>java-tcp</module>
<module>netty-server</module>
<module>netty-client</module>
</modules>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
搭建 Netty 服務端
NettyServer.java
package com.easy.nettyServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
@Component
@Slf4j
public class NettyServer {
/**
* boss 線程組用于處理連接工作
*/
private EventLoopGroup boss = new NioEventLoopGroup();
/**
* work 線程組用于數(shù)據(jù)處理
*/
private EventLoopGroup work = new NioEventLoopGroup();
@Value("${netty.port}")
private Integer port;
/**
* 啟動Netty Server
*
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口設置套接字地址
.localAddress(new InetSocketAddress(port))
//服務端可連接隊列數(shù),對應TCP/IP協(xié)議listen函數(shù)中backlog參數(shù)
.option(ChannelOption.SO_BACKLOG, 1024)
//設置TCP長連接,一般如果兩個小時內(nèi)沒有數(shù)據(jù)的通信時,TCP會自動發(fā)送一個活動探測數(shù)據(jù)報文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//將小的數(shù)據(jù)包包裝成更大的幀進行傳送来涨,提高網(wǎng)絡的負載
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ServerChannelInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
log.info("啟動 Netty Server");
}
}
@PreDestroy
public void destory() throws InterruptedException {
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
log.info("關閉Netty");
}
}
NettyServerHandler.java
package com.easy.nettyServer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客戶端連接會觸發(fā)
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active......");
}
/**
* 客戶端發(fā)消息會觸發(fā)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("服務器收到消息: {}", msg.toString());
ctx.write("我是服務端,我收到你的消息了启盛!");
ctx.flush();
}
/**
* 發(fā)生異常觸發(fā)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
ServerChannelInitializer.java
package com.easy.nettyServer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加編解碼
socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}
創(chuàng)建 Netty 客戶端
NettyClient.java
package com.easy.nettyClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class NettyClient {
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.port}")
private Integer port;
@Value("${netty.host}")
private String host;
private SocketChannel socketChannel;
/**
* 發(fā)送消息
*/
public void sendMsg(String msg) {
socketChannel.writeAndFlush(msg);
}
@PostConstruct
public void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new NettyClientInitializer());
ChannelFuture future = bootstrap.connect();
//客戶端斷線重連邏輯
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("連接Netty服務端成功");
} else {
log.info("連接失敗蹦掐,進行斷線重連");
future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
}
}
NettyClientHandler.java
package com.easy.nettyClient;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客戶端Active .....");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("客戶端收到消息: {}", msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
NettyClientInitializer.java
package com.easy.nettyClient;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("decoder", new StringDecoder());
socketChannel.pipeline().addLast("encoder", new StringEncoder());
socketChannel.pipeline().addLast(new NettyClientHandler());
}
}
運行示例
打開瀏覽器,地址欄輸入:http://localhost:8091/send?msg=%E4%BD%A0%E5%A5%BD僵闯,觀察服務端和客戶端控制臺
服務端控制臺輸出
2019-12-13 18:01:37.901 INFO 11288 --- [ main] com.easy.nettyServer.NettyServer : 啟動 Netty Server
2019-12-13 18:01:45.834 INFO 11288 --- [ntLoopGroup-3-1] com.easy.nettyServer.NettyServerHandler : Channel active......
2019-12-13 18:02:07.858 INFO 11288 --- [ntLoopGroup-3-1] com.easy.nettyServer.NettyServerHandler : 服務器收到消息: 你好
客戶端控制臺輸出
2019-12-13 18:01:45.822 INFO 11908 --- [ntLoopGroup-2-1] com.easy.nettyClient.NettyClient : 連接Netty服務端成功
2019-12-13 18:01:45.822 INFO 11908 --- [ntLoopGroup-2-1] com.easy.nettyClient.NettyClientHandler : 客戶端Active .....
2019-12-13 18:02:08.005 INFO 11908 --- [ntLoopGroup-2-1] com.easy.nettyClient.NettyClientHandler : 客戶端收到消息: 我是服務端卧抗,我收到你的消息了!
表示使用Netty實現(xiàn)了我們的NIO通信了
Netty 模塊組件
Bootstrap棍厂、ServerBootstrap
一個Netty應用通常由一個Bootstrap開始颗味,主要作用是配置整個Netty程序,串聯(lián)各個組件牺弹,Netty中Bootstrap類是客戶端程序的啟動引導類,ServerBootstrap是服務端啟動引導類时呀。
Future张漂、ChannelFuture
在Netty中所有的IO操作都是異步的,不能立刻得知消息是否被正確處理谨娜,但是可以過一會等它執(zhí)行完成或者直接注冊一個監(jiān)聽航攒,具體的實現(xiàn)就是通過Future和ChannelFuture,他們可以注冊一個監(jiān)聽趴梢,當操作執(zhí)行成功或失敗時監(jiān)聽會自動觸發(fā)注冊的監(jiān)聽事件漠畜。
Channel
Netty網(wǎng)絡通信組件,能夠用于執(zhí)行網(wǎng)絡I/O操作坞靶。Channel為用戶提供:
- 當前網(wǎng)絡連接的通道的狀態(tài)(例如是否打開憔狞?是否已連接?)
- 網(wǎng)絡連接的配置參數(shù) (例如接收緩沖區(qū)大姓靡酢)
- 提供異步的網(wǎng)絡I/O操作(如建立連接瘾敢,讀寫,綁定端口),異步調(diào)用意味著任何I/O調(diào)用都將立即返回簇抵,并且不保證在調(diào)用結束時所請求的I/O操作已完成庆杜。調(diào)用立即返回一個ChannelFuture實例,通過注冊監(jiān)聽器到ChannelFuture上碟摆,可以I/O操作成功晃财、失敗或取消時回調(diào)通知調(diào)用方。
- 支持關聯(lián)I/O操作與對應的處理程序
不同協(xié)議典蜕、不同阻塞類型的連接都有不同的 Channel 類型與之對應断盛,下面是一些常用的 Channel 類型
- NioSocketChannel,異步的客戶端 TCP Socket 連接
- NioServerSocketChannel嘉裤,異步的服務器端 TCP Socket 連接
- NioDatagramChannel郑临,異步的 UDP 連接
- NioSctpChannel,異步的客戶端 Sctp 連接
- NioSctpServerChannel屑宠,異步的 Sctp 服務器端連接
Selector
Netty基于Selector對象實現(xiàn)I/O多路復用厢洞,通過 Selector, 一個線程可以監(jiān)聽多個連接的Channel事件, 當向一個Selector中注冊Channel 后,Selector 內(nèi)部的機制就可以自動不斷地查詢(select) 這些注冊的Channel是否有已就緒的I/O事件(例如可讀, 可寫, 網(wǎng)絡連接完成等)典奉,這樣程序就可以很簡單地使用一個線程高效地管理多個 Channel
NioEventLoop
NioEventLoop中維護了一個線程和任務隊列躺翻,支持異步提交執(zhí)行任務,線程啟動時會調(diào)用NioEventLoop的run方法卫玖,執(zhí)行I/O任務和非I/O任務:
- I/O任務 即selectionKey中ready的事件公你,如accept、connect假瞬、read陕靠、write等,由processSelectedKeys方法觸發(fā)脱茉。
- 非IO任務 添加到taskQueue中的任務剪芥,如register0、bind0等任務琴许,由runAllTasks方法觸發(fā)税肪。
兩種任務的執(zhí)行時間比由變量ioRatio控制,默認為50榜田,則表示允許非IO任務執(zhí)行的時間與IO任務的執(zhí)行時間相等益兄。
NioEventLoopGroup
NioEventLoopGroup,主要管理eventLoop的生命周期箭券,可以理解為一個線程池净捅,內(nèi)部維護了一組線程,每個線程(NioEventLoop)負責處理多個Channel上的事件邦鲫,而一個Channel只對應于一個線程灸叼。
ChannelHandler
ChannelHandler是一個接口神汹,處理I/O事件或攔截I/O操作,并將其轉(zhuǎn)發(fā)到其ChannelPipeline(業(yè)務處理鏈)中的下一個處理程序古今。
ChannelHandlerContext
保存Channel相關的所有上下文信息屁魏,同時關聯(lián)一個ChannelHandler對象
ChannelPipline
保存ChannelHandler的List,用于處理或攔截Channel的入站事件和出站操作捉腥。 ChannelPipeline實現(xiàn)了一種高級形式的攔截過濾器模式氓拼,使用戶可以完全控制事件的處理方式,以及Channel中各個的ChannelHandler如何相互交互抵碟。