應(yīng)很多朋友的要求胞锰,今天分享一下如何使用SpringBoot和Netty構(gòu)建高并發(fā)穩(wěn)健的JT808網(wǎng)關(guān)流强,并且是兼容JT808-2011和JT808-2019的網(wǎng)關(guān),此網(wǎng)關(guān)已經(jīng)有多個客戶在商用。
JT808網(wǎng)關(guān)作為部標終端連接的服務(wù)端雪侥,承載了終端登錄斑芜、心跳肩刃、位置、拍照等基礎(chǔ)業(yè)務(wù)以及信令交互杏头,是整個系統(tǒng)最核心的模塊盈包,一旦崩潰,則所有部標終端都會離線醇王,所有信令交互包括1078和主動安全的信令交互也會大受影響呢燥。所以,JT808網(wǎng)關(guān)的并發(fā)性穩(wěn)定性健壯性成為整個系統(tǒng)最重要的考量之一寓娩。
很多朋友用Mina或者Netty編寫網(wǎng)關(guān)程序時遇到過很多問題:
- 線程阻塞叛氨、內(nèi)存溢出等。
- 將所有數(shù)據(jù)轉(zhuǎn)成16進制字符串棘伴,用字符串操作數(shù)據(jù)寞埠。字符串處理的效率是最低的,當終端越來越多時焊夸,性能問題就會凸顯仁连。應(yīng)當充分使用Netty的ByteBuf處理數(shù)據(jù)。
- 未充分利用Netty的pipeline鏈式處理器阱穗,將所有的業(yè)務(wù)都放在一個handler中處理饭冬。
- JT808消息類型多使鹅,幾十上百個,如果采用if/else或者枚舉case判斷昌抠,造成業(yè)務(wù)處理類臃腫龐大患朱,維護和新增業(yè)務(wù)處理及其困難。
- 今年推出的JT808-2019扰魂,不知道如何兼容擴展麦乞。
本文使用JDK8+的環(huán)境開發(fā),使用SpringBoot2.x以及Netty4.x劝评,如有不懂JDK8的新語法姐直,請查閱資料。
此網(wǎng)關(guān)的特性:
1.支持JT808-2011蒋畜、JT808-2019声畏、JT1078報警、主動安全報警
2.使用MQ和Redis解耦姻成,多模塊數(shù)據(jù)共享訂閱插龄,不與任何數(shù)據(jù)庫關(guān)聯(lián)
3.多環(huán)境開發(fā)
4.跨平臺,部署簡單
5.支持ProtoBuf和JSON序列化
6.本公司首創(chuàng)的利用策略模式的底層封裝庫科展,模板可用于任何協(xié)議的開發(fā)均牢,簡化了網(wǎng)絡(luò)編程的復雜度,只專注于業(yè)務(wù)開發(fā)才睹,無任何網(wǎng)絡(luò)編程經(jīng)驗的人員都可接手徘跪,節(jié)省開發(fā)成本。
1.通用TcpServer創(chuàng)建
public class TcpServer {
public TcpServer(int threadPoolSize, int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
}
});
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
//啟動成功
} else {
//啟動失敗
}
});
}
}
- 首先看到琅攘,我們創(chuàng)建了兩個NioEventLoopGroup垮庐,這兩個對象可以看做是傳統(tǒng)IO編程模型的兩大線程組。bossGroup負責監(jiān)聽端口坞琴,accept 新連接的線程組哨查,這個線程數(shù)不宜過大,1-2個即可剧辐。workerGroup是負責處理每個連接的數(shù)據(jù)讀寫的線程組寒亥,默認線程數(shù)為CPU核心數(shù)的2倍。用通俗易懂的例子就是荧关,一個企業(yè)運作护盈,當然要有一個老板負責從外面接活,然后下面有很多員工負責具體干活羞酗,老板就是bossGroup,員工就是workerGroup紊服。bossGroup接收完連接檀轨,扔給workerGroup處理胸竞。
- ChannelOption.SO_KEEPALIVE表示是否開啟TCP底層心跳機制,true為開啟
ChannelOption.TCP_NODELAY表示是否開啟Nagle算法参萄,true表示關(guān)閉卫枝,false表示開啟,通俗地說讹挎,如果要求高實時性校赤,有數(shù)據(jù)發(fā)送時就馬上發(fā)送,就關(guān)閉筒溃,如果需要減少發(fā)送次數(shù)減少網(wǎng)絡(luò)交互马篮,就開啟。 - 接著怜奖,我們調(diào)用childHandler()方法浑测,給這個引導類創(chuàng)建一個ChannelInitializer,這里主要定義后續(xù)每個連接的數(shù)據(jù)讀寫歪玲,業(yè)務(wù)處理邏輯迁央。
2.接著設(shè)計最重要的Channel Pipeline中的鏈式處理器
先貼上我們pipeline的處理器都有哪些:
ch.pipeline().addLast(new IdleStateHandler(Jt808Constant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new Jt808FrameDecoder());
ch.pipeline().addLast(Jt808ProtocolDecoder.INSTANCE, new Jt808ProtocolEncoder());
ch.pipeline().addLast(Jt808LoginHandler.INSTANCE);
ch.pipeline().addLast(executorGroup, Jt808BusinessHandler.INSTANCE);
- IdleStateHandler:Netty提供的讀寫空閑狀態(tài)檢測的處理器。
- Jt808FrameDecoder:首先我們需要獲取0x7E開頭0x7E結(jié)尾的完整JT808消息才能進行下一步的解包滥崩。由于網(wǎng)絡(luò)問題岖圈,數(shù)據(jù)包可能會出現(xiàn)斷包或者粘包的情況。很多朋友會采用DelimiterBasedFrameDecoder的方式截取以0x7E結(jié)尾的數(shù)據(jù)作為完整的數(shù)據(jù)包钙皮。這里有個問題蜂科,如果黑客是熟悉JT808協(xié)議的,他發(fā)了幾十M的數(shù)據(jù)中間都是不含0x7E的株灸,截取的數(shù)據(jù)就會有幾十M崇摄,發(fā)多條進行攻擊內(nèi)存一下子就爆了。我們采用了最原始的ByteToMessageDecoder慌烧,這種方式很靈活逐抑,可以處理斷包粘包,還可以控制每個包的大小屹蚊,保證了靈活性安全性厕氨,性能也更高。這一步我們已經(jīng)獲取了0x7E開頭和結(jié)尾并且已經(jīng)反轉(zhuǎn)義的數(shù)據(jù)包了汹粤,交給下一個處理器處理命斧。
- Jt808ProtocolDecoder:這個處理器接收ByteBuf,按照JT808協(xié)議解析每個字段嘱兼,根據(jù)消息體屬性国葬,可以區(qū)分數(shù)據(jù)是JT808-2011還是JT808-2019,消息體內(nèi)容在BaseMessage里,然后封裝成JT808消息實體類汇四,傳遞給下一個處理器接奈。
以下是Jt808Message的代碼,我們要把每條消息所有字段都看成一個整體通孽,沒必要把消息頭消息體分離出去新建其他類序宦,最后還派生出一堆子類,只會把自己和別人繞暈背苦。
public class Jt808Message extends BaseMessage {
/**
* 消息ID
*/
private int msgId;
/**
* 終端手機號
*/
private String phoneNumber;
/**
* 終端手機號數(shù)組
*/
private byte[] phoneNumberArr;
/**
* 協(xié)議版本號
*/
private int protocolVersion;
/**
* 消息流水號
*/
private int msgFlowId;
/**
* 是否分包
*/
private boolean multiPacket;
/**
* 版本標識
*/
private int versionFlag;
/**
* 加密方式互捌,0:不加密,1:RSA加密
*/
private int encryptType;
/**
* 消息總包數(shù)
*/
private int packetTotalCount;
/**
* 包序號
*/
private int packetOrder;
/**
* 協(xié)議類型(JT808_2011行剂、JT808_2013秕噪、JT905、JT808_2019)
*/
private ProtocolEnum protocolType;
}
協(xié)議解碼器代碼:
@Slf4j
@Sharable
public class Jt808ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
public static final Jt808ProtocolDecoder INSTANCE = new Jt808ProtocolDecoder();
private Jt808ProtocolDecoder() {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//消息長度
int msgLen = msg.readableBytes();
//包頭
msg.readByte();
//消息ID
int msgId = msg.readUnsignedShort();
//消息體屬性
short msgBodyAttr = msg.readShort();
//消息體長度
int msgBodyLen = msgBodyAttr & 0b00000011_11111111;
//是否分包
boolean multiPacket = (msgBodyAttr & 0b00100000_00000000) > 0;
//版本標識(版本標識0為2011年的版本,1為2019年的版本)
int versionFlag = (msgBodyAttr & 0b01000000_00000000) >> 14;
//去除消息體的基礎(chǔ)長度
int baseLen = Jt808Constant.MSG_BASE_LENGTH;
ProtocolEnum protocolType = ProtocolEnum.JT808_2011;
if (versionFlag == 1) {
baseLen = Jt808Constant.JT2019_MSG_BASE_LENGTH;
protocolType = ProtocolEnum.JT808_2019;
}
//根據(jù)消息體長度和是否分包得出后面的包長
int ensureLen = multiPacket ? baseLen + msgBodyLen + 4 : baseLen + msgBodyLen;
if (msgLen < ensureLen) {
log.info("包長不對,數(shù)據(jù)長度:{},正確長度:{},數(shù)據(jù):{}", msgLen, ensureLen, ByteBufUtil.hexDump(msg));
return;
}
//數(shù)據(jù)加密方式
int encryptType = (msgBodyAttr & 0b00011100_00000000) >> 10;
//協(xié)議版本號
int protocolVersion = 0;
//終端手機號數(shù)組,JT808-2019為10個字節(jié)
byte[] phoneNumberArr;
if (protocolType == ProtocolEnum.JT808_2019) {
protocolVersion = msg.readByte();
phoneNumberArr = new byte[10];
} else {
phoneNumberArr = new byte[6];
}
msg.readBytes(phoneNumberArr);
//終端手機號(去除前面的0)
String phoneNumber = StringUtils.stripStart(ByteBufUtil.hexDump(phoneNumberArr), "0");
//消息流水號
int msgFlowId = msg.readUnsignedShort();
//消息總包數(shù)
int packetTotalCount = 0;
//包序號
int packetOrder = 0;
//分包
if (multiPacket) {
packetTotalCount = msg.readShort();
packetOrder = msg.readShort();
}
//消息體
byte[] msgBodyArr = new byte[msgBodyLen];
msg.readBytes(msgBodyArr);
//校驗碼
int checkCode = msg.readUnsignedByte();
//包尾
msg.readByte();
//計算和驗證校驗碼
ByteBuf checksumBuf = msg.slice(1, msgLen - 3);
int checksumResult = CommonUtil.xor(checksumBuf);
if (checksumResult != checkCode) {
log.error("校驗碼驗證失敗,計算結(jié)果:{},校驗碼:{},消息ID:{},手機號:{},數(shù)據(jù):{}", checksumResult, checkCode, NumberUtil.formatMessageId(msgId), phoneNumber, ByteBufUtil.hexDump(msg));
return;
}
//構(gòu)造Jt808消息,傳遞到下一個handler處理
Jt808Message jt808Msg = new Jt808Message();
jt808Msg.setMsgId(msgId);
jt808Msg.setEncryptType(encryptType);
jt808Msg.setVersionFlag(versionFlag);
jt808Msg.setProtocolType(protocolType);
jt808Msg.setMultiPacket(multiPacket);
jt808Msg.setProtocolVersion(protocolVersion);
jt808Msg.setPhoneNumber(phoneNumber);
jt808Msg.setPhoneNumberArr(phoneNumberArr);
jt808Msg.setMsgFlowId(msgFlowId);
jt808Msg.setPacketTotalCount(packetTotalCount);
jt808Msg.setPacketOrder(packetOrder);
jt808Msg.setMsgBodyArr(msgBodyArr);
out.add(jt808Msg);
}
}
- Jt808LoginHandler:終端登錄的處理器硼讽,這個處理器接收了上一步解析傳遞的Jt808Message實體類巢价。我們知道,終端想接入網(wǎng)關(guān)固阁,必須先在系統(tǒng)中錄入了資料的才是合法終端壤躲。
驗證的方案有多種:1.查詢數(shù)據(jù)庫,這種方式不太好备燃,壓力測試時會給數(shù)據(jù)庫帶來壓力碉克,網(wǎng)關(guān)跟數(shù)據(jù)庫關(guān)聯(lián)也會造成解耦性不好,維護和部署都困難并齐。2.查詢緩存(Redis漏麦、Memcache等),web后臺啟動時把所有終端的資料都加載到緩存中况褪,這種方式性能高撕贞,解耦性好,其他模塊如JT809和主動安全的程序都可以共用緩存测垛。唯一要注意的是web對終端資料CRUD操作時捏膨,要同步好緩存。
這一步的驗證食侮,如果是部標過檢号涯,是需要驗證多個字段的,如車牌號锯七,車牌顏色链快,終端ID等,如果是商用環(huán)境眉尸,只需要驗證終端手機號是否合法即可域蜗。我們可以配置開關(guān)巨双,嚴格模式下按照部標過檢驗證多個字段,否則就驗證終端手機號地消。
登錄成功后炉峰,下一個數(shù)據(jù)包就不用再經(jīng)過登錄處理器了,可以在pipeline中移除這個處理器提高性能脉执,還可以設(shè)置一些屬性(如終端信息)到Channel的上下文中,交給下一個處理器戒劫。 - Jt808BusinessHandler:這個處理器是Pipeline的最后一個處理器半夷,是對所有消息業(yè)務(wù)邏輯的處理器。為了提高并發(fā)性迅细,這個處理器用了另外一組線程池處理巫橄,以免阻塞workerGroup。這個處理器是整個網(wǎng)關(guān)最重要的環(huán)節(jié)茵典,很多問題都發(fā)生在這里湘换。常見的問題有:
1.分包數(shù)據(jù)處理問題,我們知道统阿,拍照數(shù)據(jù)彩倚、錄像資源列表等經(jīng)常會分包上傳的,未接收完所有分包是無法處理業(yè)務(wù)的扶平。
2.很多朋友直接在這個處理器中if(msgId == 0x***) else if()進行處理了帆离,JT808和JT1078加起來上百種消息類型,這個類得幾百上千行了结澄,維護的人是不是會崩潰哥谷,我們設(shè)計時也要考慮維護人員的感受。
3.消息體內(nèi)容在ByteBuf中麻献,處理完或者出現(xiàn)異常時有沒有及時釋放们妥?我們知道ByteBuf有個引用計數(shù)器refCnt,如果大于0勉吻,則永遠不會釋放监婶,累積多了則會內(nèi)存溢出。
針對這些痛點餐曼,我們采用了設(shè)計模式中的策略模式压储。JT808的每種消息類型對應(yīng)一種Service,每個Service繼承BaseMessageService<T>源譬,在網(wǎng)關(guān)程序啟動時會把這些Service注冊到MessageServiceProvider中集惋,收到JT808消息時會從MessageServiceProvider中查找相應(yīng)的處理器。
業(yè)務(wù)處理器的關(guān)鍵代碼如下踩娘,十幾行代碼搞定了分包刮刑、業(yè)務(wù)統(tǒng)一處理喉祭、日志統(tǒng)一打印、指令下發(fā)應(yīng)答雷绢、異常統(tǒng)一處理泛烙、資源統(tǒng)一釋放,而且這個模板是通用的翘紊,適用于任何其他協(xié)議的業(yè)務(wù)處理:
@Override
protected void channelRead0(ChannelHandlerContext ctx, Jt808Message msg) throws Exception {
//未接收完的分包不進入業(yè)務(wù)處理
Jt808Message wholeMsg = handleMultiPacket(ctx, msg);
if (wholeMsg == null) {
return;
}
//獲取對應(yīng)的消息處理器
int msgId = wholeMsg.getMsgId();
BaseMessageService messageService = messageServiceProvider.getMessageService(msgId);
ByteBuf msgBodyBuf = Unpooled.wrappedBuffer(wholeMsg.getMsgBodyArr());
try {
Object result = messageService.process(ctx, wholeMsg, msgBodyBuf);
log.info("收到{}({}),終端手機號:{},消息流水號:{},內(nèi)容:{}", messageService.getDesc(), NumberUtil.formatMessageId(msgId), wholeMsg.getPhoneNumber(), wholeMsg.getMsgFlowId(), wholeMsg.getMsgBodyItems());
//發(fā)送指令應(yīng)答給調(diào)用方
if (result != null) {
downCommandReceiver.sendUpCommand(ctx, NumberUtil.hexStr(msgId), result);
}
} catch (Exception e) {
printExceptionLog(wholeMsg, messageService, e);
} finally {
//處理完業(yè)務(wù)邏輯統(tǒng)一釋放資源
ReferenceCountUtil.release(msgBodyBuf);
}
}
首先我們先處理分包蔽氨,不分包的消息直接返回給業(yè)務(wù)處理器處理。如果是分包的帆疟,收到第一包時會創(chuàng)建一個分包接收器鹉究,里面會自動判斷有無接收完,接收完后會自動把所有分包數(shù)據(jù)整合在一起踪宠,然后返回給業(yè)務(wù)處理器處理自赔。分包接收器代碼篇幅有限暫時不貼出:
private Jt808Message handleMultiPacket(ChannelHandlerContext ctx, Jt808Message msg) {
//不分包
if (!msg.isMultiPacket()) {
return msg;
}
//總包數(shù)
int packetTotalCount = msg.getPacketTotalCount();
//當前包序號
int packetOrder = msg.getPacketOrder();
//第一包,創(chuàng)建分包接收器
if (packetTotalCount > 1 && packetOrder == 1) {
multiPacketService.createMultiPacketReceiver(ctx, msg);
Jt808PacketUtil.reply8001(ctx, msg);
log.info("收到{},終端手機號:{},消息流水號:{},分包總包數(shù):{},第{}包,內(nèi)容:{}", NumberUtil.formatMessageId(msg.getMsgId()), msg.getPhoneNumber(), msg.getMsgFlowId(), packetTotalCount, packetOrder, ByteBufUtil.hexDump(msg.getMsgBodyArr()));
return null;
}
//后續(xù)包
if (packetTotalCount > 1 && packetOrder > 1) {
Jt808Message wholeMsg = multiPacketService.addSubPacket(msg);
Jt808PacketUtil.reply8001(ctx, msg);
log.info("收到{},終端手機號:{},消息流水號:{},分包總包數(shù):{},第{}包,內(nèi)容:{}", NumberUtil.formatMessageId(msg.getMsgId()), msg.getPhoneNumber(), msg.getMsgFlowId(), packetTotalCount, packetOrder, ByteBufUtil.hexDump(msg.getMsgBodyArr()));
return wholeMsg;
}
//單個數(shù)據(jù)包
return msg;
}
再往下看業(yè)務(wù)處理,我們設(shè)計了一個通用的泛型消息服務(wù)類BaseMessageService<T>柳琢,T表示各種協(xié)議的消息實體類绍妨,可以處理任何私有協(xié)議(JT809和主動安全程序也使用了這種處理方式),有些私有協(xié)議的消息ID是字符串類型的柬脸,這個服務(wù)類也做了兼容他去,只需要實現(xiàn)里面的process方法即可。這個方法傳遞了socket上下文可以獲取該socket綁定的終端信息肖粮,消息實體類T以及消息體內(nèi)容的ByteBuf孤页。每種消息類型的處理都集中在這個方法中,按照協(xié)議從ByteBuf解析消息體內(nèi)容即可涩馆。
以下是BaseService的代碼:
public abstract class BaseMessageService<T extends BaseMessage> {
/**
* 消息ID
*/
private int messageId;
/**
* 字符串消息ID
*/
private String strMessageId;
/**
* 消息處理器描述
*/
private String desc;
/**
* 獲取終端信息
*
* @param ctx socket上下文
* @return 終端信息
*/
public TerminalProto getTerminalInfo(ChannelHandlerContext ctx) {
return SessionUtil.getTerminalInfo(ctx);
}
/**
* 檢查消息體長度
*
* @param msg 消息
* @param msgBodyLen 消息體長度
* @throws ApplicationException 應(yīng)用異常
*/
public void checkMessageBodyLen(T msg, int msgBodyLen) throws ApplicationException {
byte[] msgBody = msg.getMsgBodyArr();
if (msgBody.length < msgBodyLen) {
throw new ApplicationException("消息體長度不對,不能小于" + msgBodyLen);
}
}
/**
* 處理消息
*
* @param ctx socket上下文
* @param msg 消息
* @param msgBodyBuf 消息體
* @return 返回結(jié)果
* @throws Exception 異常
*/
public abstract Object process(ChannelHandlerContext ctx, T msg, ByteBuf msgBodyBuf) throws Exception;
}
這里貼出0x0200位置匯報的服務(wù)類:
public class Message0200Service extends BaseMessageService<Jt808Message> {
@Autowired
private RabbitMessageSender messageSender;
@Override
public Object process(ChannelHandlerContext ctx, Jt808Message msg, ByteBuf msgBodyBuf) throws Exception {
//檢查消息體長度
checkMessageBodyLen(msg, 28);
//通用應(yīng)答
Jt808PacketUtil.reply8001(ctx, msg);
//解析位置信息和附加信息
LocationProto location = LocationParser.parse(msg, msgBodyBuf);
//發(fā)送到MQ
messageSender.sendLocation(getTerminalInfo(ctx), location);
msg.putMessageBodyItem("位置", location);
return null;
}
}
幾行代碼完成了消息應(yīng)答行施、位置解析、位置發(fā)送到MQ魂那。
以下是拍照的0x0801多媒體數(shù)據(jù)上傳處理:
public class Message0801Service extends BaseMessageService<Jt808Message> {
@Autowired
private RabbitMessageSender messageSender;
@Override
public Object process(ChannelHandlerContext ctx, Jt808Message msg, ByteBuf msgBodyBuf) throws Exception {
//多媒體ID
long mediaId = msgBodyBuf.readUnsignedInt();
//多媒體類型
int mediaType = msgBodyBuf.readByte();
//多媒體格式編碼
int mediaFormatCode = msgBodyBuf.readByte();
//事件項編碼
int eventItemCode = msgBodyBuf.readByte();
//通道ID
int channelId = msgBodyBuf.readByte();
//老協(xié)議不帶位置數(shù)據(jù)(28 bytes)蛾号,圖片數(shù)據(jù)以0xFFD8開頭
LocationProto location = null;
if (mediaFormatCode != 0 || msgBodyBuf.getUnsignedShort(0) != 0xFFD8) {
location = LocationParser.parseLocation(msgBodyBuf);
}
//多媒體數(shù)據(jù)
byte[] mediaData = new byte[msgBodyBuf.readableBytes()];
msgBodyBuf.readBytes(mediaData);
MediaFileProto mediaFile = new MediaFileProto();
mediaFile.setMediaId(mediaId);
mediaFile.setMediaType(mediaType);
mediaFile.setMediaFormatCode(mediaFormatCode);
mediaFile.setEventItemCode(eventItemCode);
mediaFile.setChannelId(channelId);
mediaFile.setLocation(location);
mediaFile.setMediaData(mediaData);
mediaFile.setTerminalInfo(getTerminalInfo(ctx));
//發(fā)送到MQ
messageSender.sendMediaFile(mediaFile);
return mediaFile;
}
}
其他協(xié)議的處理也是采用這個方法,比如JT809的從鏈路連接保持請求消息處理:
public class DownLinkTestReqProcessor extends BaseMessageService<Jt809Message> {
@Autowired
private MessageSendService messageSendService;
@Autowired
private DownLinkTestRspSender downLinkTestRspSender;
@Override
public Object process(ChannelHandlerContext ctx, Jt809Message jt809Msg, ByteBuf msgBodyBuf) throws Exception {
//發(fā)送JT809日志到MQ
Jt809Status jt809Status = Jt809Manager.getStatusAttr(ctx);
Jt809ConfigDTO jt809Config = jt809Status.getJt809Config();
messageSendService.publishJt809Log(jt809Config, jt809Msg);
//發(fā)送從鏈路連接保持應(yīng)答消息
downLinkTestRspSender.send(ctx, jt809Status);
return null;
}
}
至此涯雅,整個網(wǎng)關(guān)的工作量就全部集中在每種消息服務(wù)的開發(fā)了鲜结。內(nèi)存溢出、資源未釋放活逆、異常等問題全部都得到了統(tǒng)一的處理精刷,可以放心大膽的開發(fā)業(yè)務(wù)邏輯。有了通用處理模板蔗候,開發(fā)效率大幅提升怒允,其他私有協(xié)議網(wǎng)關(guān)的開發(fā)也變得異常簡單。如果MQ消息傳輸格式定義好的話锈遥,整個網(wǎng)關(guān)程序2-3天就能全部開發(fā)完纫事。而且無任何網(wǎng)絡(luò)編程經(jīng)驗的人員都能很快接手勘畔。
整個工程除了業(yè)務(wù)service,其他類只有十來個:
3.整合SpringBoot
TcpServer需要另外開啟線程啟動的丽惶,不要占用阻塞SpringBoot的主線程炫七。
配置多環(huán)境開發(fā):
生產(chǎn)環(huán)境的配置:application-prod.yml。
gnss:
jt808:
tcpPort: 6608
middleware-ip: 127.0.0.1
threadPool:
size: 10
message:
converter: proto
spring:
redis:
host: ${gnss.middleware-ip}
port: 6379
password: gps-pro@cn
rabbitmq:
host: ${gnss.middleware-ip}
port: 5672
username: guest
password: guest
支持2種MQ序列化方式:ProtoBuf和JSON钾唬,可以在配置文件切換万哪。
ProtoBuf性能高,安全性高抡秆,傳輸量少壤圃,Web后臺是JAVA開發(fā)的話可以選用這種方式,雖然也跨語言但要復雜一些琅轧。
JSON性能差,安全性差踊挠,傳輸量大乍桂,優(yōu)點是跨語言兼容性好。如果后臺是非JAVA的可以選擇這種方式效床。
如圖睹酌,發(fā)送一條相同的位置到MQ,ProtoBuf需要152字節(jié)剩檀,JSON需要675字節(jié)憋沿,傳輸量差了5倍。
啟動后會自動加載消息處理器:
終端連接服務(wù)器并且發(fā)送位置沪猴,然后斷開連接時辐啄,日志打犹几臁: