前言
眾所周知我們在進(jìn)行網(wǎng)絡(luò)連接的時候,建立套接字連接是一個非常消耗性能的事情纱耻,特別是在分布式的情況下芭梯,用線程池去保持多個客戶端連接,是一種非常消耗線程的行為弄喘。那么我們該通過什么技術(shù)去解決上述的問題呢玖喘,那么就不得不提一個網(wǎng)絡(luò)連接的利器——Netty.
Netty
Netty是一個NIO客戶端服務(wù)器框架:
它可快速輕松地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,例如協(xié)議服務(wù)器和客戶端限次。
它極大地簡化和簡化了網(wǎng)絡(luò)編程芒涡,例如TCP和UDP套接字服務(wù)器。
NIO是一種非阻塞IO 卖漫,它具有以下的特點
單線程可以連接多個客戶端费尽。
選擇器可以實現(xiàn)單線程管理多個Channel,新建的通道都要向選擇器注冊羊始。
一個SelectionKey鍵表示了一個特定的通道對象和一個特定的選擇器對象之間的注冊關(guān)系旱幼。
selector進(jìn)行select()操作可能會產(chǎn)生阻塞,但是可以設(shè)置阻塞時間突委,并且可以用wakeup()喚醒selector柏卤,所以NIO是非阻塞IO。
Netty模型selector模式
它相對普通NIO的在性能上有了提升匀油,采用了:
NIO采用多線程的方式可以同時使用多個selector
通過綁定多個端口的方式缘缚,使得一個selector可以同時注冊多個ServerSocketServer
單個線程下只能有一個selector,用來實現(xiàn)Channel的匹配及復(fù)用
半包問題
TCP/IP在發(fā)送消息的時候敌蚜,可能會拆包桥滨,這就導(dǎo)致接收端無法知道什么時候收到的數(shù)據(jù)是一個完整的數(shù)據(jù)。在傳統(tǒng)的BIO中在讀取不到數(shù)據(jù)時會發(fā)生阻塞弛车,但是NIO不會齐媒。為了解決NIO的半包問題,Netty在Selector模型的基礎(chǔ)上纷跛,提出了reactor模式喻括,從而解決客戶端請求在服務(wù)端不完整的問題。
netty模型reactor模式
- 在selector的基礎(chǔ)上解決了半包問題贫奠。
上圖唬血,簡單地可以描述為"boss接活望蜡,讓work干":manReactor用來接收請求(會與客戶端進(jìn)行握手驗證),而subReactor用來處理請求(不與客戶端直接連接)刁品。
SpringBoot使用Netty實現(xiàn)遠(yuǎn)程調(diào)用
maven依賴
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<optional>true</optional>
</dependency>
<!--netty-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>
服務(wù)端部分
NettyServer.java:服務(wù)啟動監(jiān)聽器
@Slf4j
public class NettyServer {
public void start() {
InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082);
//new 一個主線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//new 一個工作線程組
EventLoopGroup workGroup = new NioEventLoopGroup(200);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer())
.localAddress(socketAddress)
//設(shè)置隊列大小
.option(ChannelOption.SO_BACKLOG, 1024)
// 兩小時內(nèi)沒有數(shù)據(jù)的通信時,TCP會自動發(fā)送一個活動探測數(shù)據(jù)報文
.childOption(ChannelOption.SO_KEEPALIVE, true);
//綁定端口,開始接收進(jìn)來的連接
try {
ChannelFuture future = bootstrap.bind(socketAddress).sync();
log.info("服務(wù)器啟動開始監(jiān)聽端口: {}", socketAddress.getPort());
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("服務(wù)器開啟失敗", e);
} finally {
//關(guān)閉主線程組
bossGroup.shutdownGracefully();
//關(guān)閉工作線程組
workGroup.shutdownGracefully();
}
}
}
ServerChannelInitializer.java:netty服務(wù)初始化器
/**
* netty服務(wù)初始化器
**/
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加編解碼
socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}
NettyServerHandler.java:netty服務(wù)端處理器
/**
* netty服務(wù)端處理器
**/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客戶端連接會觸發(fā)
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active......");
}
/**
* 客戶端發(fā)消息會觸發(fā)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("服務(wù)器收到消息: {}", msg.toString());
ctx.write("你也好哦");
ctx.flush();
}
/**
* 發(fā)生異常觸發(fā)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
RpcServerApp.java:SpringBoot啟動類
/**
* 啟動類
*
*/
@Slf4j
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class RpcServerApp extends SpringBootServletInitializer {
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(RpcServerApp.class);
}
/**
* 項目的啟動方法
*
* @param args
*/
public static void main(String[] args) {
SpringApplication.run(RpcServerApp.class, args);
//開啟Netty服務(wù)
NettyServer nettyServer =new NettyServer ();
nettyServer.start();
log.info("======服務(wù)已經(jīng)啟動========");
}
}
客戶端部分
NettyClientUtil.java:NettyClient工具類
/**
* Netty客戶端
**/
@Slf4j
public class NettyClientUtil {
public static ResponseResult helloNetty(String msg) {
NettyClientHandler nettyClientHandler = new NettyClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(group)
//該參數(shù)的作用就是禁止使用Nagle算法泣特,使用于小數(shù)據(jù)即時傳輸
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("decoder", new StringDecoder());
socketChannel.pipeline().addLast("encoder", new StringEncoder());
socketChannel.pipeline().addLast(nettyClientHandler);
}
});
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync();
log.info("客戶端發(fā)送成功....");
//發(fā)送消息
future.channel().writeAndFlush(msg);
// 等待連接被關(guān)閉
future.channel().closeFuture().sync();
return nettyClientHandler.getResponseResult();
} catch (Exception e) {
log.error("客戶端Netty失敗", e);
throw new BusinessException(CouponTypeEnum.OPERATE_ERROR);
} finally {
//以一種優(yōu)雅的方式進(jìn)行線程退出
group.shutdownGracefully();
}
}
}
NettyClientHandler.java:客戶端處理器
/**
* 客戶端處理器
**/
@Slf4j
@Setter
@Getter
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private ResponseResult responseResult;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客戶端Active .....");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("客戶端收到消息: {}", msg.toString());
this.responseResult = ResponseResult.success(msg.toString(), CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDesc());
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
驗證
測試接口
@RestController
@Slf4j
public class UserController {
@PostMapping("/helloNetty")
@MethodLogPrint
public ResponseResult helloNetty(@RequestParam String msg) {
return NettyClientUtil.helloNetty(msg);
}
}
訪問測試接口
服務(wù)端打印信息
客戶端打印信息
作者:溪源的奇思妙想
原文鏈接:https://blog.csdn.net/weixin_40990818/article/details/109248198