最近做的項(xiàng)目有需求跟硬件通信,使用tcp實(shí)現(xiàn)長(zhǎng)連接,協(xié)議自己規(guī)定,于是后端決定選用netty來(lái)作為tcp服務(wù)器着降,這里簡(jiǎn)單說(shuō)一下netty的工作流程。外部的數(shù)據(jù)傳入netty服務(wù)器中汁展,netty首先通過解碼器對(duì)數(shù)據(jù)進(jìn)行一次預(yù)處理(比如把字節(jié)轉(zhuǎn)為字符串或?qū)ο髞?lái)方便操作)鹊碍,接著把預(yù)處理后的數(shù)據(jù)轉(zhuǎn)發(fā)給處理器,在處理器中執(zhí)行業(yè)務(wù)邏輯食绿,最后如果有必要返回?cái)?shù)據(jù)給連接者,可以通過netty提供的channel
發(fā)送公罕。
- netty—>decode—>handler
首先是啟動(dòng)一個(gè)tcp服務(wù)器
package server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author lanni
* @date 2020/8/19 23:05
* @description
**/
public class TCPServer {
public void run(int port) throws Exception {
//創(chuàng)建線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//創(chuàng)建啟動(dòng)類
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer())
.option(ChannelOption.SO_BACKLOG, 256)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口器紧,開始接收進(jìn)來(lái)的連接
ChannelFuture f = b.bind(port).sync();
// 等待服務(wù)器 socket 關(guān)閉 。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
try {
System.out.println("tcp服務(wù)器啟動(dòng)...");
new TCPServer().run(8998);
} catch (Exception e) {
e.printStackTrace();
}
}
}
初始化解碼器楼眷、處理器
package server;
import handler.CustomDecode;
import handler.TCPServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
/**
* @author lanni
* @date 2020/8/22 11:58
* @description
**/
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().
addLast(new CustomDecode()). //自定義解碼器
addLast(new TCPServerHandler()) //自定義處理器
;
}
}
解碼器中解決tcp粘包問題铲汪,關(guān)于什么是粘包、拆包我就不做解釋了罐柳,我這里直接上解決方案掌腰,這里我簡(jiǎn)單說(shuō)一下我做的項(xiàng)目數(shù)據(jù)傳輸,規(guī)定數(shù)據(jù)格式:
固定頭部(2字節(jié))+數(shù)據(jù)長(zhǎng)度(4字節(jié))+其它(17字節(jié))+數(shù)據(jù)(可變長(zhǎng)度)+crc校驗(yàn)碼(2字節(jié))+固定結(jié)尾(2字節(jié))
所以每次收到的數(shù)據(jù)包中包含了數(shù)據(jù)的長(zhǎng)度张吉,就以此長(zhǎng)度來(lái)組裝數(shù)據(jù)包傳遞給handler齿梁,這里注意看我的注釋部分。
import util.StringUtil;
import java.util.List;
/**
* @Author lanni
* @Date 2020/8/23 9:30
* @Description
**/
public class CustomDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
int len = in.readableBytes(); //這里得到可讀取的字節(jié)長(zhǎng)度
in.markReaderIndex(); //包頭做標(biāo)記位肮蛹,后面可以重新回到數(shù)據(jù)包頭開始讀數(shù)據(jù)
//有數(shù)據(jù)時(shí)開始讀數(shù)據(jù)包
if (len > 0) {
byte[] src = new byte[len];
in.readBytes(src); //把數(shù)據(jù)讀到字節(jié)數(shù)組中(讀取完之后指針會(huì)到最后一個(gè)數(shù)據(jù))
in.resetReaderIndex(); //重置當(dāng)前指針到標(biāo)記位(包頭)
//驗(yàn)證首部為A5 5A,只接收首部正確的數(shù)據(jù)包,如果包頭錯(cuò)誤可以直接丟棄或關(guān)閉連接
if ((src[0] & 0x000000ff) == 0xA5 && (src[1] & 0x000000ff) == 0x5A) {
//計(jì)算報(bào)文長(zhǎng)度
byte[] data = {src[3],src[2]};
String hexLen = StringUtil.byteArrayToHexString(data);
//這里計(jì)算出來(lái)的是數(shù)據(jù)長(zhǎng)度的報(bào)文長(zhǎng)度,需要加27個(gè)固定長(zhǎng)度
int pLen = Integer.parseInt(hexLen, 16) + 27;
if (len < pLen) {
//當(dāng)數(shù)據(jù)包的長(zhǎng)度不夠時(shí)直接return勺择,netty在緩沖區(qū)有數(shù)據(jù)時(shí)會(huì)一直調(diào)用decode方法,所以我們只需要等待下一個(gè)數(shù)據(jù)包傳輸過來(lái)一起解析
return;
}
byte[] packet = new byte[pLen];
in.readBytes(packet,0,pLen);
out.add(packet);
}else {
channelHandlerContext.close();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("連接異常:"+cause);
// ctx.close();
}
然后就是處理器伦忠,用于處理得到的數(shù)據(jù)包省核,這個(gè)大家可以自己編寫邏輯。
package handler;
import cn.hutool.core.util.StrUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import util.StringUtil;
/**
* @Author lanni
* @Date 2020/8/19 23:07
* @Description
**/
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//這里msg就是從解碼器中傳來(lái)的數(shù)據(jù)昆码,解碼器傳輸過來(lái)是什么格式气忠,這里直接轉(zhuǎn)成對(duì)應(yīng)的格式就可以
byte[] src = (byte[]) msg;
try {
//這里做自己的業(yè)務(wù)邏輯
//獲取鏈接實(shí)例
Channel channel = ctx.channel();
//響應(yīng)消息一定要這樣去發(fā)送,只能使用字節(jié)傳輸
//netty中發(fā)送數(shù)據(jù)需要把待發(fā)送的字節(jié)數(shù)組包裝一下成為ByteBuf來(lái)發(fā)送
byte[] dest = null;
ByteBuf buf = Unpooled.copiedBuffer(dest);
//數(shù)據(jù)沖刷
ChannelFuture cf = channel.writeAndFlush(buf);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 當(dāng)出現(xiàn)異常就關(guān)閉連接
cause.printStackTrace();
ctx.close();
}
}
netty中當(dāng)然還涉及到服務(wù)器主動(dòng)發(fā)送消息給客戶端赋咽,但是需要注意的是如果是主動(dòng)發(fā)消息旧噪,有一個(gè)先決條件是需要知道客戶端的唯一標(biāo)識(shí)(id或其它標(biāo)識(shí)),我們需要用一個(gè)map來(lái)保存好channel和這個(gè)標(biāo)識(shí)的對(duì)應(yīng)關(guān)系冬耿。我所做的項(xiàng)目是服務(wù)器來(lái)維護(hù)設(shè)備id和連接通道channel的對(duì)應(yīng)關(guān)系舌菜。
首先需要一個(gè)統(tǒng)一管理channel的類,這里有CHANNEL_POOL
和KEY_POOL
兩個(gè)map亦镶,是為了讓id和channel能夠互相對(duì)應(yīng)起來(lái)日月,可能有人會(huì)想著只需要維護(hù)id—>channel的關(guān)系就可以了袱瓮,但是可以看見上面在發(fā)生異常時(shí)所使用的處理方法exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
時(shí),只能拿到channel爱咬,所以需要通過channel找到id來(lái)做出相應(yīng)的操作尺借。
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author lanni
* @date 2020/9/11 20:21
*
**/
@Slf4j
public class NettyChannelManager {
/**
* 保存連接 Channel 的地方
*/
private static Map<String, Channel> CHANNEL_POOL = new ConcurrentHashMap<>();
private static Map<Channel, String> KEY_POOL = new ConcurrentHashMap<>();
/**
* 添加 Channel
*
* @param key
*/
public static void add(String key, Channel channel) {
CHANNEL_POOL.put(key, channel);
KEY_POOL.put(channel, key);
}
/**
* 刪除 Channel
*
* @param key
*/
public static void remove(String key) {
Channel channel = CHANNEL_POOL.get(key);
if (channel == null) {
return;
}
CHANNEL_POOL.remove(key);
KEY_POOL.remove(channel);
}
/**
* 刪除并同步關(guān)閉連接
*
* @param key
*/
public static void removeAndClose(String key) {
Channel channel = CHANNEL_POOL.get(key);
remove(key);
if (channel != null) {
// 關(guān)閉連接
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void removeAndClose(Channel channel) {
String key = KEY_POOL.get(channel);
removeAndClose(key);
}
/**
* 獲得 Channel
*
* @param key
* @return String
*/
public static Channel getChannel(String key) {
return CHANNEL_POOL.get(key);
}
/**
* 獲得 key
*
* @param channel
* @return Channel
*/
public static String getKey(Channel channel) {
return KEY_POOL.get(channel);
}
/**
* 判斷是否存在key
* @author lanni
* @date 2020/9/16 10:10
* @param key
* @return boolean
**/
public static boolean hasKey(String key) {
return CHANNEL_POOL.containsKey(key);
}
/**
* 判斷是否存在channel
* @author lanni
* @date 2020/10/12 9:34
* @param channel
* @return boolean
**/
public static boolean hasChannel(Channel channel) {
return KEY_POOL.containsKey(channel);
}
}
我這里是在處理器中獲取到設(shè)備的id,然后交給NettyChannelManager
管理精拟,當(dāng)發(fā)生異常時(shí)關(guān)閉channel并移除對(duì)應(yīng)的連接信息燎斩。
package handler;
import cn.hutool.core.util.StrUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import util.StringUtil;
/**
* @Author lanni
* @Date 2020/8/19 23:07
* @Description
**/
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//這里msg就是從解碼器中傳來(lái)的數(shù)據(jù),解碼器傳輸過來(lái)是什么格式蜂绎,這里直接轉(zhuǎn)成對(duì)應(yīng)的格式就可以
byte[] src = (byte[]) msg;
try {
// 從數(shù)據(jù)包中拿到設(shè)備id
byte[] deviceId = new byte[17];
System.arraycopy(src, 4, deviceId, 0, 17);
String devId = StrUtil.str(deviceId, CharsetUtil.UTF_8);
// 保存channel,key
// deviceId為空時(shí)表示設(shè)備斷線重連
if (!NettyChannelManager.hasKey(devId)) {
NettyChannelManager.add(devId, ctx.channel());
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 當(dāng)出現(xiàn)異常就關(guān)閉連接
cause.printStackTrace();
log.error("發(fā)生異常:" + cause.getMessage());
String devId = NettyChannelManager.getKey(ctx.channel());
if (devId == null || "".equals(devId)) {
return;
}
// 刪除鏈接信息并關(guān)閉鏈接
NettyChannelManager.removeAndClose(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String devId = NettyChannelManager.getKey(ctx.channel());
if (devId == null || "".equals(devId)) {
return;
}
// 刪除鏈接信息并關(guān)閉鏈接
NettyChannelManager.removeAndClose(ctx.channel());
}
}
現(xiàn)在有了這樣一個(gè)對(duì)應(yīng)關(guān)系之后栅表,如果我們想給客戶端主動(dòng)發(fā)送消息,那么我們只需要通過客戶端的id拿到對(duì)應(yīng)的channel就可以在任意位置發(fā)送數(shù)據(jù)师枣。
// 先準(zhǔn)備好需要發(fā)送的數(shù)據(jù)
byte[] pkg =
// 通過id獲取netty連接通道channel
Channel channel = NettyChannelManager.getChannel(deviceId);
// 封裝數(shù)據(jù)
ByteBuf buf = Unpooled.copiedBuffer(pkg);
// 把數(shù)據(jù)寫入通道并發(fā)送
channel.writeAndFlush(buf);
結(jié)語(yǔ):以上所說(shuō)都是在單機(jī)環(huán)境下怪瓶,如果說(shuō)是分布式環(huán)境的話那么關(guān)于id-channel的維護(hù)就需要修改。我們可以使用spring session來(lái)代替這里的
NettyChannelManager
践美,只需要幾個(gè)配置就能解決分布式的問題洗贰,當(dāng)然也可以有其它的方案,我在這里就不列舉了陨倡。