概述
本文通過一個簡單的 demo 來介紹 Netty 在 spring boot 項目中的使用,其中包括了服務器端和客戶端的啟動代碼,客戶端向服務器端發送文本消息。
maven 依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
</dependency>
關鍵點
服務器端在啟動的時候開放一個端口:19080
客戶端在啟動的時候通過 ip 和 端口連上服務器端
客戶端和服務器端都通過 Channel 對象向彼此發送數據
服務器和客戶端都通過繼承 ChannelInboundHandlerAdapter 類實現對消息的讀取和回寫等操作
服務器和客戶端都通過 StringDecoder 和 StringEncoder 實現對消息的解碼和轉碼操作
服務器和客戶端啟動的時候都會阻塞當前線程,因此需要在一個單獨的線程中進行啟動
消息發送的例子
本例是一個 spring boot web 項目,項目占用了 8080 端口
服務器端在啟動的時候開放 19080 端口(注意不要和 web 端口沖突了)
客戶端在啟動的時候連上服務器端
通過 web api 向客戶端發送數據,客戶端再通過 Channel 對象向服務器端發送數據
服務器接收到客戶端數據后也通過 Channel 對象向客戶端發送數據
server 服務器端
通過 @PostConstruct 注解的方法進行啟動,具體如下
package me.yizhou.test.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.concurrent.ForkJoinPool;
@Slf4j
@Component
public class HelloWorldServer {
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup();
@Resource
private HelloWorldServerHandler helloWorldServerHandler;
public void startServer(int port) {
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(helloWorldServerHandler);
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口馅巷,開始接收進來的連接
sbs.bind(port).addListener(future -> {
log.info(String.format("服務(wù)器啟動成功膛虫,并監(jiān)聽端口:%s ", port));
});
} catch (Exception e) {
log.error("啟動 netty 服務(wù)器端出現(xiàn)異常", e);
}
}
// 服務(wù)器端啟動,并綁定 19080 端口
@PostConstruct
public void init() {
ForkJoinPool.commonPool().submit(() -> startServer(19080));
}
@PreDestroy
public void destroy() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
服務器端 HelloWorldServerHandler 如下
package me.yizhou.test.server;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@ChannelHandler.Sharable
@Slf4j
public class HelloWorldServerHandler extends ChannelInboundHandlerAdapter {
// 服務(wù)器端讀取到 客戶端發(fā)送過來的數(shù)據(jù)钓猬,然后通過 Channel 回寫數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info(String.format("服務(wù)器端讀取到從客戶端:%s 發(fā)送過來的數(shù)據(jù):%s", ctx.channel().remoteAddress(), msg.toString()));
ctx.channel().writeAndFlush(String.format("server write:%s", msg));
}
// 捕獲到異常的處理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
client 客戶端
通過 @PostConstruct 注解的方法進行啟動,具體如下
package me.yizhou.test.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.concurrent.ForkJoinPool;
@Slf4j
@Component
public class HelloWorldClient {
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
private final EventLoopGroup group = new NioEventLoopGroup();
private ChannelFuture mChannelFuture = null;
private final ThreadLocal<Channel> mChannel = new ThreadLocal<>();
@Resource
private HelloWorldClientHandler helloWorldClientHandler;
public void startClient(String host, int port) {
// Configure the client.
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new StringDecoder());
p.addLast("encoder", new StringEncoder());
p.addLast(helloWorldClientHandler);
}
});
mChannelFuture = b.connect(host, port).addListener(future -> {
log.info(String.format("客戶端啟動成功,并監(jiān)聽端口:%s ", port));
});
} catch (Exception e) {
log.error("啟動 netty 客戶端出現(xiàn)異常", e);
}
}
/**
* 客戶端通過 Channel 對象向服務(wù)器端發(fā)送數(shù)據(jù)
* @param data 文本數(shù)據(jù)
*/
public void send(String data) {
try {
if (mChannel.get() == null) {
mChannel.set(mChannelFuture.channel());
}
mChannel.get().writeAndFlush(data);
} catch (Exception e) {
log.error(this.getClass().getName().concat(".send has error"), e);
}
}
// 客戶端啟動敞曹,并連上服務(wù)器端
@PostConstruct
public void init() {
ForkJoinPool.commonPool().submit(() -> startClient("127.0.0.1", 19080));
}
@PreDestroy
public void destroy() {
group.shutdownGracefully();
}
}
客戶端 HelloWorldClientHandler 實現如下
package me.yizhou.test.server;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@ChannelHandler.Sharable
@Slf4j
public class HelloWorldServerHandler extends ChannelInboundHandlerAdapter {
// 服務(wù)器端讀取到 客戶端發(fā)送過來的數(shù)據(jù)账月,然后通過 Channel 回寫數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info(String.format("服務(wù)器端讀取到從客戶端:%s 發(fā)送過來的數(shù)據(jù):%s", ctx.channel().remoteAddress(), msg.toString()));
ctx.channel().writeAndFlush(String.format("server write:%s", msg));
}
// 捕獲到異常的處理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
web api 數據發送入口
這里只是通過一個 GET 接口(/nettyClient)接收文本數據,再交給 HelloWorldClient 發送給服務器端
package com.ckjava.test.web;
import com.ckjava.test.client.HelloWorldClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
 * Web entry point that forwards text from an HTTP request to the Netty client.
 *
 * @author yizhou
 * @date 2022/6/16 20:50
 */
@RestController
@RequestMapping(produces = "application/json;charset=utf-8")
public class HelloNettyController {

    /** Netty client used to push the received text to the server. */
    @Resource
    private HelloWorldClient mHelloWorldClient;

    /**
     * Handles {@code GET /nettyClient} by passing the {@code data} request parameter to
     * {@link HelloWorldClient#send(String)}. The response has no body.
     *
     * @param data the text to send through the Netty client
     * @throws Exception declared for the web framework; nothing in this method throws it
     */
    @GetMapping("/nettyClient")
    public void nettyClient(@RequestParam final String data) throws Exception {
        this.mHelloWorldClient.send(data);
    }
}
啟動類
package me.yizhou.test;
import me.yizhou.test.server.HelloWorldServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
/**
 * Spring Boot entry point of the Netty demo.
 *
 * <p>The Netty server and client beans start themselves from their own {@code @PostConstruct}
 * methods, so this class only has to bootstrap the Spring context.
 *
 * @author yizhou
 * @date 2022/6/16 20:40
 */
@SpringBootApplication
public class TestNettyApplication implements CommandLineRunner {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(TestNettyApplication.class, args);
    }

    /**
     * Intentionally does nothing.
     *
     * <p>This callback used to start a second Netty listener on port 8080. That is the port the
     * embedded web server of this demo occupies, and the server bean already binds its own port
     * during bean initialization, so the extra bind was redundant and could only collide.
     *
     * @param strings command-line arguments (unused)
     */
    @Override
    public void run(String... strings) throws Exception {
        // No start-up work required here; see the method documentation.
    }
}
測試
執行如下請求
GET localhost:8080/nettyClient?data=%E4%BD%A0%E5%A5%BD%20yizhou
Content-Type: application/json;charset=utf-8
輸出如下