什么是Netty
Netty是一個異步事件驅動的網(wǎng)絡應用程序框架,用于快速開發(fā)可維護的高性能協(xié)議服務器和客戶端,Netty是一個NIO客戶端服務器框架,可以快速輕松地開發(fā)網(wǎng)絡應用程序御板,例如協(xié)議服務器和客戶端猎莲。它極大地簡化和簡化了諸如TCP和UDP套接字服務器之類的網(wǎng)絡編程煤篙。
“快速簡便”并不意味著最終的應用程序將遭受可維護性或性能問題的困擾。Netty經過精心設計,結合了許多協(xié)議(例如FTP盔几,SMTP,HTTP以及各種基于二進制和文本的舊式協(xié)議)的實施經驗掩幢。結果逊拍,Netty成功地找到了一種無需妥協(xié)即可輕松實現(xiàn)開發(fā),性能际邻,穩(wěn)定性和靈活性的方法芯丧。
特性
- 高性能 事件驅動
- 異步非堵塞 基于NIO的客戶端,服務器端編程框架
- 穩(wěn)定性和伸縮性
- 適用于各種傳輸類型的統(tǒng)一API-阻塞和非阻塞套接字
- 基于靈活且可擴展的事件模型,可將關注點明確分離
- 高度可定制的線程模型-單線程,一個活多個線程池 ,例如SEDA
- 真正的無連接數(shù)據(jù)報套接字支持 (從3.1開始)
表現(xiàn)
- 更高的吞吐量
- 減少資源消耗
- 減少不必要的內存復制
使用
在pom.xml中添加依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
一丶Springboot應用啟動加載Netty應用
暴露Netty端口,隨springboot應用一并啟動
/**
* @dete: 2021/4/21 9:08 上午
* @author: 徐子木
*/
@SpringBootApplication
public class PmSocketApplication implements CommandLineRunner {
// yml中指定netty端口號
@Value("${netty.port}")
private int nettyServerPort;
@Autowired
private NettyWebSocketServer nettyServer;
public static void main(String[] args) {
SpringApplication.run(PmSocketApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
//netty 服務端啟動的端口不可和Springboot啟動類的端口號重復
nettyServer.start(nettyServerPort);
//關閉服務器的時候同時關閉Netty服務
Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));
}
}
二丶Netty整合WebSocket服務端
啟動或銷毀netty服務端的過程實現(xiàn)
/**
* Netty整合websocket 服務端
* 運行流程:
* 1.創(chuàng)建一個ServerBootstrap的實例引導和綁定服務器
* 2.創(chuàng)建并分配一個NioEventLoopGroup實例以進行事件的處理,比如接收連接和讀寫數(shù)據(jù)
* 3.指定服務器綁定的本地的InetSocketAddress
* 4.使用一個NettyServerHandler的實例初始化每一個新的Channel
* 5.調用ServerBootstrap.bind()方法以綁定服務器
*
* @description
* @author: 徐子木
* @create: 2020-06-02 14:23
**/
@Component
@Slf4j
public class NettyWebSocketServer {
/**
* EventLoop接口
* NioEventLoop中維護了一個線程和任務隊列,支持異步提交任務,線程啟動時會調用NioEventLoop的run方法,執(zhí)行I/O任務和非I/O任務
* I/O任務即selectionKey中的ready的事件,如accept,connect,read,write等,由processSelectedKeys方法觸發(fā)
* 非I/O任務添加到taskQueue中的任務,如register0,bind0等任務,由runAllTasks方法觸發(fā)
* 兩種任務的執(zhí)行時間比由變量ioRatio控制,默認為50,則表示允許非IO任務執(zhí)行的事件與IO任務的執(zhí)行時間相等
*/
private final EventLoopGroup boosGroup = new NioEventLoopGroup();
private final EventLoopGroup workGroup = new NioEventLoopGroup();
/**
* Channel
* Channel類似Socket,它代表一個實體(如一個硬件設備,一個網(wǎng)絡套接字) 的開放連接,如讀寫操作.通俗的講,Channel字面意思就是通道,每一個客戶端與服務端之間進行通信的一個雙向通道.
* Channel主要工作:
* 1.當前網(wǎng)絡連接的通道的狀態(tài)(例如是否打開?是否已連接?)
* 2.網(wǎng)絡連接的配置參數(shù)(例如接收緩沖區(qū)的大小)
* 3.提供異步的網(wǎng)絡I/O操作(如建立連接,讀寫,綁定端口),異步調用意味著任何I/O調用都將立即返回,并且不保證在調用結束時鎖清秋的I/O操作已完成.
* 調用立即放回一個ChannelFuture實例,通過注冊監(jiān)聽器到ChannelFuture上,可以I/O操作成功,失敗或取消時回調通知調用方.
* 4.支持關聯(lián)I/O操作與對應的處理程序.
* 不同協(xié)議,不同的阻塞類型的連接都有不同的Channel類型與之對應,下面是一些常用的Channel類型
* NioSocketChannel,異步的客戶端 TCP Socket連接
* NioServerSocketChannel,異步的服務端 TCP Socket 連接
* NioDatagramChannel,異步的UDP連接
* NioSctpChannel,異步的客戶端Sctp連接
* NioSctoServerChannel,異步的Sctp服務端連接
* 這些通道涵蓋了UDP 和TCP網(wǎng)絡IO以及文件IO
*/
private Channel channel;
/**
* 啟動服務
*
* @param port
*/
public void start(int port) {
log.info("=================Netty 端口啟動:{}==================",port);
/**
* Future
* Future提供了另外一種在操作完成時通知應用程序的方式,這個對象可以看做一個異步操作的結果占位符.
* 通俗的講,它相當于一位指揮官,發(fā)送了一個請求建立完連接,通信完畢了,你通知一聲它回來關閉各種IO通道,整個過程,它是不阻塞的,異步的.
* 在Netty中所有的IO操作都是異步的,不能理科的值消息是否被正確處理,但是可以過一會兒等他執(zhí)行完成或者直接注冊一個監(jiān)聽,具體的實現(xiàn)就是通過Future和ChannelFutures.
* 他們可以注冊一個監(jiān)聽,當操作執(zhí)行成功成功或者失敗時監(jiān)聽會自動觸發(fā)注冊的監(jiān)聽事件
*/
try {
/**
* Bootstrap
* Bootstrap是引導的意思,一個Netty應用通常由一個Bootstrap開始
* 主要作用是配置整個Netty程序,串聯(lián)各個組件
* Netty中Bootstrap類是服務端啟動引導類
*/
ServerBootstrap server = new ServerBootstrap();
server.group(boosGroup, workGroup)
//非阻塞異步服務端TCP Socket 連接
.channel(NioServerSocketChannel.class)
//設置為前端WebSocket可以連接
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// HttpServerCodec: 將請求和映帶消息節(jié)嗎為HTTP消息
pipeline.addLast("http-codec", new HttpServerCodec());
// 講HTTP消息的多個部分合成一條完整的HTTP消息
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
// 向客戶端發(fā)送HTML5文件
socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// 進行設置心跳檢測
socketChannel.pipeline().addLast(new IdleStateHandler(60, 30, 60 * 30, TimeUnit.SECONDS));
// 配置通道處理 來進行業(yè)務處理
pipeline.addLast("handler", new WebSocketServerHandler());
}
});
channel = server.bind(port).sync().channel();
} catch (Exception e) {
e.printStackTrace();
}
}
@PreDestroy
public void destroy() {
log.info("=================Netty服務關閉==================");
if (channel != null) {
channel.close();
}
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
三丶Socket處理注冊連接類
接收客戶端連接信息,心跳檢測,存儲于銷毀等的處理
其余部分業(yè)務類(messageService),均為自行處理連接存儲為業(yè)務所用,請忽略
/**
* @description
* @author: 徐子木
* @create: 2020-06-02 14:57
**/
@Slf4j
@Component
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
public static final byte PING_MSG = 1;
public static final byte PONG_MSG = 2;
public static final byte CUSTOM_MSG = 3;
private int heartbeatCount = 0;
// 配置客戶端是否為https的控制
@Value("${netty.ssl-enabled:false}")
private Boolean useSsl;
/**
* 這里可以引入自己業(yè)務類來處理進行的客戶端連接
*/
@Autowired
private MessageService messageService;
public static WebSocketServerHandler webSocketServerHandler;
/**
* 解決啟動加載不到自己業(yè)務類
*/
@PostConstruct
public void init() {
webSocketServerHandler = this;
}
private WebSocketServerHandshaker handshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//http請求和tcp請求分開處理
if (msg instanceof HttpRequest) {
handlerHttpRequest(ctx, (HttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
//踩坑: simpleChannelInboundHandler 他會進行一次釋放(引用計數(shù)器減一),參考源碼,而我們釋放的時候就變?yōu)榱?,所以必須手動進行引用計數(shù)器加1
WebSocketFrame frame = (WebSocketFrame) msg;
frame.retain();
handlerWebSocketFrame(ctx, frame);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* WebSocket 消息處理
*
* @param ctx
* @param frame
*/
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
//判斷是否是關閉鏈路的指令
if (frame instanceof CloseWebSocketFrame) {
log.info("【" + ctx.channel().remoteAddress() + "】已關閉(服務器端)");
//移除channel
NioSocketChannel channel = (NioSocketChannel) ctx.channel();
webSocketServerHandler.messageService.removeConnection(channel);
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
return;
}
//判斷是否是ping消息
if (frame instanceof PingWebSocketFrame) {
log.info("【ping】");
return;
}
//判斷實時是pong消息
if (frame instanceof PongWebSocketFrame) {
log.info("【pong】");
return;
}
//本例子只支持文本世曾,不支持二進制
if (!(frame instanceof TextWebSocketFrame)) {
log.info("【不支持二進制】");
throw new UnsupportedOperationException("不支持二進制");
}
// 傳送的消息 ,接收客戶端指定格式(自己與客戶端約定json格式)的消息,并進行處理
MessageObject messageObject = JSONObject.parseObject(((TextWebSocketFrame) frame).text().toString(), MessageObject.class);
webSocketServerHandler.messageService.sendMessage(messageObject, ctx);
}
/**
* websocket第一次連接握手
*
* @param ctx
*/
@SuppressWarnings("deprecation")
private void handlerHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
// 這里接收客戶端附加連接參數(shù),根據(jù)自己業(yè)務與客戶端指定需要哪些參數(shù)來辨別連接唯一性
String userUid = null;
String sectionId = null;
if ("GET".equalsIgnoreCase(req.getMethod().toString())) {
String uri = req.getUri();
userUid = req.getUri().substring(uri.indexOf("/", 2) + 1, uri.lastIndexOf("/"));
sectionId = req.getUri().substring(uri.lastIndexOf("/") + 1);
//對用戶信息進行存儲
NioSocketChannel channel = (NioSocketChannel) ctx.channel();
webSocketServerHandler.messageService.putConnection(userUid, sectionId, channel);
}
// http 解碼失敗
if (!req.getDecoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, (FullHttpRequest) req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
}
//可以通過url獲取其他參數(shù)
WebSocketServerHandshakerFactory factory;
// 這里主要用于 客戶端為wss連接的處理
if (useSsl != null && useSsl) {
factory = new WebSocketServerHandshakerFactory(
"wss://" + req.headers().get("Host") + "/" + req.getUri() + "", null, false
);
} else {
factory = new WebSocketServerHandshakerFactory(
"ws://" + req.headers().get("Host") + "/" + req.getUri() + "", null, false
);
}
handshaker = factory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
//進行連接
handshaker.handshake(ctx.channel(), (FullHttpRequest) req);
}
}
@SuppressWarnings("deprecation")
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回應答給客戶端
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
// buf.release();
}
// 如果是非Keep-Alive缨恒,關閉連接
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* 這里是保持服務器與客戶端長連接 進行心跳檢測 避免連接斷開
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent stateEvent = (IdleStateEvent) evt;
PingWebSocketFrame ping = new PingWebSocketFrame();
switch (stateEvent.state()) {
//讀空閑(服務器端)
case READER_IDLE:
//log.info("【" + ctx.channel().remoteAddress() + "】讀空閑(服務器端)");
ctx.writeAndFlush(ping);
break;
//寫空閑(客戶端)
case WRITER_IDLE:
//log.info("【" + ctx.channel().remoteAddress() + "】寫空閑(客戶端)");
ctx.writeAndFlush(ping);
break;
case ALL_IDLE:
//log.info("【" + ctx.channel().remoteAddress() + "】讀寫空閑");
break;
default:
break;
}
}
}
/**
* 出現(xiàn)異常時
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
//移除channel
webSocketServerHandler.messageService.removeConnection((NioSocketChannel) ctx.channel());
ctx.close();
log.info("【" + ctx.channel().remoteAddress() + "】已關閉(服務器端)");
}
}
四丶與業(yè)務整合并存儲客戶連接
以下均為自己業(yè)務與socket連接對象的存儲,發(fā)送,踢出等處理,可根據(jù)自己業(yè)務自行參考
當時公司業(yè)務需要將連接存儲兩級維度,可根據(jù)自己的業(yè)務給客戶端分組,或不分組都可以
/**
* @description
* @author: 徐子木
* @create: 2020-09-30 15:06
**/
@Slf4j
@Service(value = "messageService")
public class MessageServiceImpl implements MessageService {
// 是消息通信dubbo類,請忽略
@Autowired
private ChatMsgService chatMsgService;
// 是db存儲類 ,請忽略
@Autowired
private BidPresentDao bidPresentDao;
// 這里是netty可為指定的唯一key去與連接進行分組處理并存儲
private HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
private final AttributeKey<String> userKey = AttributeKey.valueOf("user");
private final AttributeKey<String> sectionKey = AttributeKey.valueOf("section");
/**
* 裝載標段與對應在線的用戶
*/
private static final Map<String, ChannelGroup> SECTION_GROUPS = new ConcurrentHashMap<>();
/**
* 維護某標段中的socket連接
*
* @param sectionId
* @param channel
*/
@Override
public void putConnection(String userId, String sectionId, NioSocketChannel channel) {
channel.attr(userKey).set(userId);
channel.attr(sectionKey).set(sectionId);
bidPresentDao.comeOnlineByUserId(userId, sectionId);
//存儲用戶標段對應連接
ChannelGroup channelGroup = SECTION_GROUPS.get(sectionId);
if (null == channelGroup) {
//保存全局的,連接上的服務器的客戶
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
channelGroup.add(channel);
SECTION_GROUPS.put(sectionId, channelGroup);
} else {
channelGroup.add(channel);
}
}
/**
* 判斷一個通道是否有用戶在使用
*
* @param channel
* @return
*/
private boolean hasUser(Channel channel) {
return ((channel.hasAttr(userKey) || channel.hasAttr(sectionKey)));
}
/**
* 獲取連接對應用戶
*
* @param channel
* @return
*/
@Override
public String getBindUserId(NioSocketChannel channel) {
if (hasUser(channel)) {
return channel.attr(userKey).get();
}
return null;
}
/**
* 獲取連接對應標段Id
*
* @param channel
* @return
*/
@Override
public String getBindSectionId(NioSocketChannel channel) {
if (hasUser(channel)) {
return channel.attr(sectionKey).get();
}
return null;
}
/**
* 用戶退出標段在線連接
*
* @param channel
*/
@Override
public void removeConnection(NioSocketChannel channel) {
String userId = getBindUserId(channel);
String sectionId = getBindSectionId(channel);
Iterator<Map.Entry<String, ChannelGroup>> iterator = SECTION_GROUPS.entrySet().iterator();
while (iterator.hasNext()) {
ChannelGroup channelGroup = iterator.next().getValue();
if (channelGroup.contains(channel)) {
channelGroup.remove(channel);
}
if (null == channelGroup || channelGroup.size() == 0) {
iterator.remove();
}
}
if (StringUtils.isNotEmpty(userId) && StringUtils.isNotEmpty(sectionId)) {
bidPresentDao.exitOnlineByUserId(userId, sectionId);
}
}
/**
* 根據(jù)用戶Id獲取連接
*
* @param userId
* @return
*/
private NioSocketChannel getChannel(String userId) {
Iterator<Map.Entry<String, ChannelGroup>> iterator = SECTION_GROUPS.entrySet().iterator();
while (iterator.hasNext()) {
ChannelGroup channelGroup = iterator.next().getValue();
for (Channel channel : channelGroup) {
if (userId.equalsIgnoreCase(channel.attr(userKey).get())) {
return (NioSocketChannel) channel;
}
}
}
return null;
}
/**
* 發(fā)送純狀態(tài)碼的消息
*
* @param toUserId
* @param message
*/
@Override
public void sendMessage(String toUserId, String message) {
NioSocketChannel channel = getChannel(toUserId);
if (channel != null) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("message", message);
channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(jsonObject)));
}
}
/**
* 向指定投標人發(fā)送狀態(tài)
*
* @param toUserId
*/
@Override
public void sendMessage(String toUserId, MessageEnum messageEnum) {
NioSocketChannel channel = getChannel(toUserId);
if (channel != null) {
MessageObject messageObject = MessageObject.builder().
code(messageEnum.getCode())
.build();
channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageObject)));
}
}
/**
* 向當前標段所有投標人發(fā)送消息
*
* @param sectionId
*/
@Override
public void sendMessageAll(String sectionId, MessageEnum messageEnum) {
log.debug("標段Id: {},發(fā)送狀態(tài)碼: {}", sectionId, messageEnum);
MessageObject messageObject = MessageObject.builder()
.code(messageEnum.getCode())
.build();
sendMessageAll(sectionId, messageObject);
}
@Override
public void sendMessageAll(String sectionId, MessageObject messageObject) {
ChannelGroup channelGroup = SECTION_GROUPS.get(sectionId);
if (channelGroup == null || channelGroup.size() == 0) {
log.warn("暫時無客戶端在線 sectionId:{}", sectionId);
return;
}
channelGroup.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageObject)));
}
/**
* 根據(jù)狀態(tài)碼處理消息
*
* @param message
*/
@Override
public void sendMessage(MessageObject message, ChannelHandlerContext ctx) {
MessageEnum messageEnum = MessageEnum.valuesOf(message.getCode());
switch (messageEnum) {
case CHAT_SEND_MESSAGE:
ChatMsg msg = JSONObject.parseObject(message.getData().toString(), ChatMsg.class);
flushSectionChat(msg, ctx);
break;
case HEART_CONNECT:
//心跳連接
break;
case REDIRECT:
RedirectEntity redirectEntity = JSONObject.parseObject(message.getData().toString(), RedirectEntity.class);
flushSectionToObject(redirectEntity.getSectionId(), redirectEntity);
break;
case DECODE:
String sectionId = JSONObject.parseObject(message.getData().toString(), String.class);
sendMessageAll(sectionId, MessageEnum.DECODE);
break;
default:
break;
}
}
/**
* 向該標段中發(fā)送系統(tǒng)消息
*
* @param sectionId
* @param msg
*/
@Override
public void flushSectionSystem(String sectionId, String msg) {
ChatMsg chatMsg = ChatMsg.builder()
.sectionId(sectionId)
.msgType(ChatMsgType.NOTICE.getCode())
.content(msg)
.build();
flushSectionChat(chatMsg, null);
}
/**
* 向當前標段的在線人員刷新一條消息
*
* @param chatMsg
*/
private void flushSectionChat(ChatMsg chatMsg, ChannelHandlerContext ctx) {
if (ObjectUtil.isNotEmpty(ctx)) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = inetSocketAddress.getAddress().getHostAddress();
chatMsg.setIp(ip);
}
String sectionId = chatMsg.getSectionId();
MessageObject messageObject = MessageObject.builder()
.code(MessageEnum.CHAT_SEND_MESSAGE.getCode())
.data(chatMsg)
.build();
sendMessageAll(sectionId, messageObject);
chatMsgService.create(chatMsg);
}
/**
* 當前標段在線人員發(fā)送自定義數(shù)據(jù)
*
* @param sectionId
* @param object
*/
private void flushSectionToObject(String sectionId, Object object) {
MessageObject messageObject = MessageObject.builder()
.code(MessageEnum.REDIRECT.getCode())
.data(object)
.build();
sendMessageAll(sectionId, messageObject);
}
/**
* 向指定用戶發(fā)送自定義數(shù)據(jù)
*
* @param toUserId
*/
@Override
public void sendMessage(String toUserId, Object object, MessageEnum messageEnum) {
NioSocketChannel channel = getChannel(toUserId);
if (channel != null) {
MessageObject messageObject = MessageObject.builder().
code(messageEnum.getCode())
.data(object)
.build();
if(MessageEnum.EXTRACT_PARAM_RESULT.equals(messageEnum)){
log.debug("參數(shù)抽取發(fā)送內容:{}",JSONObject.toJSONString(messageObject));
}
channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageObject)));
}
}
}