SpringBoot+Netty構(gòu)建高并發(fā)穩(wěn)健的部標JT808網(wǎng)關(guān)

應(yīng)很多朋友的要求胞锰,今天分享一下如何使用SpringBoot和Netty構(gòu)建高并發(fā)穩(wěn)健的JT808網(wǎng)關(guān)流强,并且是兼容JT808-2011和JT808-2019的網(wǎng)關(guān),此網(wǎng)關(guān)已經(jīng)有多個客戶在商用。

JT808網(wǎng)關(guān)作為部標終端連接的服務(wù)端雪侥,承載了終端登錄斑芜、心跳肩刃、位置、拍照等基礎(chǔ)業(yè)務(wù)以及信令交互杏头,是整個系統(tǒng)最核心的模塊盈包,一旦崩潰,則所有部標終端都會離線醇王,所有信令交互包括1078和主動安全的信令交互也會大受影響呢燥。所以,JT808網(wǎng)關(guān)的并發(fā)性穩(wěn)定性健壯性成為整個系統(tǒng)最重要的考量之一寓娩。

很多朋友用Mina或者Netty編寫網(wǎng)關(guān)程序時遇到過很多問題:

  • 線程阻塞叛氨、內(nèi)存溢出等。
  • 將所有數(shù)據(jù)轉(zhuǎn)成16進制字符串棘伴,用字符串操作數(shù)據(jù)寞埠。字符串處理的效率是最低的,當終端越來越多時焊夸,性能問題就會凸顯仁连。應(yīng)當充分使用Netty的ByteBuf處理數(shù)據(jù)。
  • 未充分利用Netty的pipeline鏈式處理器阱穗,將所有的業(yè)務(wù)都放在一個handler中處理饭冬。
  • JT808消息類型多使鹅,幾十上百個,如果采用if/else或者枚舉case判斷昌抠,造成業(yè)務(wù)處理類臃腫龐大患朱,維護和新增業(yè)務(wù)處理及其困難。
  • 今年推出的JT808-2019扰魂,不知道如何兼容擴展麦乞。

本文使用JDK8+的環(huán)境開發(fā),使用SpringBoot2.x以及Netty4.x劝评,如有不懂JDK8的新語法姐直,請查閱資料。
此網(wǎng)關(guān)的特性:
1.支持JT808-2011蒋畜、JT808-2019声畏、JT1078報警、主動安全報警
2.使用MQ和Redis解耦姻成,多模塊數(shù)據(jù)共享訂閱插龄,不與任何數(shù)據(jù)庫關(guān)聯(lián)
3.多環(huán)境開發(fā)
4.跨平臺,部署簡單
5.支持ProtoBuf和JSON序列化
6.本公司首創(chuàng)的利用策略模式的底層封裝庫科展,模板可用于任何協(xié)議的開發(fā)均牢,簡化了網(wǎng)絡(luò)編程的復雜度,只專注于業(yè)務(wù)開發(fā)才睹,無任何網(wǎng)絡(luò)編程經(jīng)驗的人員都可接手徘跪,節(jié)省開發(fā)成本。

1.通用TcpServer創(chuàng)建
public class TcpServer {
    public TcpServer(int threadPoolSize, int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                    }
                });

        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
            //啟動成功
            } else {
            //啟動失敗
            }
        });
    }
}
  • 首先看到琅攘,我們創(chuàng)建了兩個NioEventLoopGroup垮庐,這兩個對象可以看做是傳統(tǒng)IO編程模型的兩大線程組。bossGroup負責監(jiān)聽端口坞琴,accept 新連接的線程組哨查,這個線程數(shù)不宜過大,1-2個即可剧辐。workerGroup是負責處理每個連接的數(shù)據(jù)讀寫的線程組寒亥,默認線程數(shù)為CPU核心數(shù)的2倍。用通俗易懂的例子就是荧关,一個企業(yè)運作护盈,當然要有一個老板負責從外面接活,然后下面有很多員工負責具體干活羞酗,老板就是bossGroup,員工就是workerGroup紊服。bossGroup接收完連接檀轨,扔給workerGroup處理胸竞。
  • ChannelOption.SO_KEEPALIVE表示是否開啟TCP底層心跳機制,true為開啟
    ChannelOption.TCP_NODELAY表示是否開啟Nagle算法参萄,true表示關(guān)閉卫枝,false表示開啟,通俗地說讹挎,如果要求高實時性校赤,有數(shù)據(jù)發(fā)送時就馬上發(fā)送,就關(guān)閉筒溃,如果需要減少發(fā)送次數(shù)減少網(wǎng)絡(luò)交互马篮,就開啟。
  • 接著怜奖,我們調(diào)用childHandler()方法浑测,給這個引導類創(chuàng)建一個ChannelInitializer,這里主要定義后續(xù)每個連接的數(shù)據(jù)讀寫歪玲,業(yè)務(wù)處理邏輯迁央。
2.接著設(shè)計最重要的Channel Pipeline中的鏈式處理器

先貼上我們pipeline的處理器都有哪些:

ch.pipeline().addLast(new IdleStateHandler(Jt808Constant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new Jt808FrameDecoder());
ch.pipeline().addLast(Jt808ProtocolDecoder.INSTANCE, new Jt808ProtocolEncoder());
ch.pipeline().addLast(Jt808LoginHandler.INSTANCE);
ch.pipeline().addLast(executorGroup, Jt808BusinessHandler.INSTANCE);
  • IdleStateHandler:Netty提供的讀寫空閑狀態(tài)檢測的處理器。
  • Jt808FrameDecoder:首先我們需要獲取0x7E開頭0x7E結(jié)尾的完整JT808消息才能進行下一步的解包滥崩。由于網(wǎng)絡(luò)問題岖圈,數(shù)據(jù)包可能會出現(xiàn)斷包或者粘包的情況。很多朋友會采用DelimiterBasedFrameDecoder的方式截取以0x7E結(jié)尾的數(shù)據(jù)作為完整的數(shù)據(jù)包钙皮。這里有個問題蜂科,如果黑客是熟悉JT808協(xié)議的,他發(fā)了幾十M的數(shù)據(jù)中間都是不含0x7E的株灸,截取的數(shù)據(jù)就會有幾十M崇摄,發(fā)多條進行攻擊內(nèi)存一下子就爆了。我們采用了最原始的ByteToMessageDecoder慌烧,這種方式很靈活逐抑,可以處理斷包粘包,還可以控制每個包的大小屹蚊,保證了靈活性安全性厕氨,性能也更高。這一步我們已經(jīng)獲取了0x7E開頭和結(jié)尾并且已經(jīng)反轉(zhuǎn)義的數(shù)據(jù)包了汹粤,交給下一個處理器處理命斧。
  • Jt808ProtocolDecoder:這個處理器接收ByteBuf,按照JT808協(xié)議解析每個字段嘱兼,根據(jù)消息體屬性国葬,可以區(qū)分數(shù)據(jù)是JT808-2011還是JT808-2019,消息體內(nèi)容在BaseMessage里,然后封裝成JT808消息實體類汇四,傳遞給下一個處理器接奈。

以下是Jt808Message的代碼,我們要把每條消息所有字段都看成一個整體通孽,沒必要把消息頭消息體分離出去新建其他類序宦,最后還派生出一堆子類,只會把自己和別人繞暈背苦。

public class Jt808Message extends BaseMessage {
    /**
     * 消息ID
     */
    private int msgId;

    /**
     * 終端手機號
     */
    private String phoneNumber;

    /**
     * 終端手機號數(shù)組
     */
    private byte[] phoneNumberArr;

    /**
     * 協(xié)議版本號
     */
    private int protocolVersion;

    /**
     * 消息流水號
     */
    private int msgFlowId;

    /**
     * 是否分包
     */
    private boolean multiPacket;

    /**
     * 版本標識
     */
    private int versionFlag;

    /**
     * 加密方式互捌,0:不加密,1:RSA加密
     */
    private int encryptType;

    /**
     * 消息總包數(shù)
     */
    private int packetTotalCount;

    /**
     * 包序號
     */
    private int packetOrder;

    /**
     * 協(xié)議類型(JT808_2011行剂、JT808_2013秕噪、JT905、JT808_2019)
     */
    private ProtocolEnum protocolType;
}

協(xié)議解碼器代碼:

@Slf4j
@Sharable
public class Jt808ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {

    public static final Jt808ProtocolDecoder INSTANCE = new Jt808ProtocolDecoder();

    private Jt808ProtocolDecoder() {
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        //消息長度
        int msgLen = msg.readableBytes();
        //包頭
        msg.readByte();
        //消息ID
        int msgId = msg.readUnsignedShort();
        //消息體屬性
        short msgBodyAttr = msg.readShort();
        //消息體長度
        int msgBodyLen = msgBodyAttr & 0b00000011_11111111;
        //是否分包
        boolean multiPacket = (msgBodyAttr & 0b00100000_00000000) > 0;
        //版本標識(版本標識0為2011年的版本,1為2019年的版本)
        int versionFlag = (msgBodyAttr & 0b01000000_00000000) >> 14;
        //去除消息體的基礎(chǔ)長度
        int baseLen = Jt808Constant.MSG_BASE_LENGTH;
        ProtocolEnum protocolType = ProtocolEnum.JT808_2011;
        if (versionFlag == 1) {
            baseLen = Jt808Constant.JT2019_MSG_BASE_LENGTH;
            protocolType = ProtocolEnum.JT808_2019;
        }

        //根據(jù)消息體長度和是否分包得出后面的包長
        int ensureLen = multiPacket ? baseLen + msgBodyLen + 4 : baseLen + msgBodyLen;
        if (msgLen < ensureLen) {
            log.info("包長不對,數(shù)據(jù)長度:{},正確長度:{},數(shù)據(jù):{}", msgLen, ensureLen, ByteBufUtil.hexDump(msg));
            return;
        }

        //數(shù)據(jù)加密方式
        int encryptType = (msgBodyAttr & 0b00011100_00000000) >> 10;
        //協(xié)議版本號
        int protocolVersion = 0;
        //終端手機號數(shù)組,JT808-2019為10個字節(jié)
        byte[] phoneNumberArr;
        if (protocolType == ProtocolEnum.JT808_2019) {
            protocolVersion = msg.readByte();
            phoneNumberArr = new byte[10];
        } else {
            phoneNumberArr = new byte[6];
        }
        msg.readBytes(phoneNumberArr);
        //終端手機號(去除前面的0)
        String phoneNumber = StringUtils.stripStart(ByteBufUtil.hexDump(phoneNumberArr), "0");
        //消息流水號
        int msgFlowId = msg.readUnsignedShort();
        //消息總包數(shù)
        int packetTotalCount = 0;
        //包序號
        int packetOrder = 0;
        //分包
        if (multiPacket) {
            packetTotalCount = msg.readShort();
            packetOrder = msg.readShort();
        }
        //消息體
        byte[] msgBodyArr = new byte[msgBodyLen];
        msg.readBytes(msgBodyArr);
        //校驗碼
        int checkCode = msg.readUnsignedByte();
        //包尾
        msg.readByte();

        //計算和驗證校驗碼
        ByteBuf checksumBuf = msg.slice(1, msgLen - 3);
        int checksumResult = CommonUtil.xor(checksumBuf);
        if (checksumResult != checkCode) {
            log.error("校驗碼驗證失敗,計算結(jié)果:{},校驗碼:{},消息ID:{},手機號:{},數(shù)據(jù):{}", checksumResult, checkCode, NumberUtil.formatMessageId(msgId), phoneNumber, ByteBufUtil.hexDump(msg));
            return;
        }

        //構(gòu)造Jt808消息,傳遞到下一個handler處理
        Jt808Message jt808Msg = new Jt808Message();
        jt808Msg.setMsgId(msgId);
        jt808Msg.setEncryptType(encryptType);
        jt808Msg.setVersionFlag(versionFlag);
        jt808Msg.setProtocolType(protocolType);
        jt808Msg.setMultiPacket(multiPacket);
        jt808Msg.setProtocolVersion(protocolVersion);
        jt808Msg.setPhoneNumber(phoneNumber);
        jt808Msg.setPhoneNumberArr(phoneNumberArr);
        jt808Msg.setMsgFlowId(msgFlowId);
        jt808Msg.setPacketTotalCount(packetTotalCount);
        jt808Msg.setPacketOrder(packetOrder);
        jt808Msg.setMsgBodyArr(msgBodyArr);
        out.add(jt808Msg);
    }
}
  • Jt808LoginHandler:終端登錄的處理器硼讽,這個處理器接收了上一步解析傳遞的Jt808Message實體類巢价。我們知道,終端想接入網(wǎng)關(guān)固阁,必須先在系統(tǒng)中錄入了資料的才是合法終端壤躲。
    驗證的方案有多種:1.查詢數(shù)據(jù)庫,這種方式不太好备燃,壓力測試時會給數(shù)據(jù)庫帶來壓力碉克,網(wǎng)關(guān)跟數(shù)據(jù)庫關(guān)聯(lián)也會造成解耦性不好,維護和部署都困難并齐。2.查詢緩存(Redis漏麦、Memcache等),web后臺啟動時把所有終端的資料都加載到緩存中况褪,這種方式性能高撕贞,解耦性好,其他模塊如JT809和主動安全的程序都可以共用緩存测垛。唯一要注意的是web對終端資料CRUD操作時捏膨,要同步好緩存。
    這一步的驗證食侮,如果是部標過檢号涯,是需要驗證多個字段的,如車牌號锯七,車牌顏色链快,終端ID等,如果是商用環(huán)境眉尸,只需要驗證終端手機號是否合法即可域蜗。我們可以配置開關(guān)巨双,嚴格模式下按照部標過檢驗證多個字段,否則就驗證終端手機號地消。
    登錄成功后炉峰,下一個數(shù)據(jù)包就不用再經(jīng)過登錄處理器了,可以在pipeline中移除這個處理器提高性能脉执,還可以設(shè)置一些屬性(如終端信息)到Channel的上下文中,交給下一個處理器戒劫。
  • Jt808BusinessHandler:這個處理器是Pipeline的最后一個處理器半夷,是對所有消息業(yè)務(wù)邏輯的處理器。為了提高并發(fā)性迅细,這個處理器用了另外一組線程池處理巫橄,以免阻塞workerGroup。這個處理器是整個網(wǎng)關(guān)最重要的環(huán)節(jié)茵典,很多問題都發(fā)生在這里湘换。常見的問題有:
    1.分包數(shù)據(jù)處理問題,我們知道统阿,拍照數(shù)據(jù)彩倚、錄像資源列表等經(jīng)常會分包上傳的,未接收完所有分包是無法處理業(yè)務(wù)的扶平。
    2.很多朋友直接在這個處理器中if(msgId == 0x***) else if()進行處理了帆离,JT808和JT1078加起來上百種消息類型,這個類得幾百上千行了结澄,維護的人是不是會崩潰哥谷,我們設(shè)計時也要考慮維護人員的感受。
    3.消息體內(nèi)容在ByteBuf中麻献,處理完或者出現(xiàn)異常時有沒有及時釋放们妥?我們知道ByteBuf有個引用計數(shù)器refCnt,如果大于0勉吻,則永遠不會釋放监婶,累積多了則會內(nèi)存溢出。
    針對這些痛點餐曼,我們采用了設(shè)計模式中的策略模式压储。JT808的每種消息類型對應(yīng)一種Service,每個Service繼承BaseMessageService<T>源譬,在網(wǎng)關(guān)程序啟動時會把這些Service注冊到MessageServiceProvider中集惋,收到JT808消息時會從MessageServiceProvider中查找相應(yīng)的處理器。
    業(yè)務(wù)處理器的關(guān)鍵代碼如下踩娘,十幾行代碼搞定了分包刮刑、業(yè)務(wù)統(tǒng)一處理喉祭、日志統(tǒng)一打印、指令下發(fā)應(yīng)答雷绢、異常統(tǒng)一處理泛烙、資源統(tǒng)一釋放,而且這個模板是通用的翘紊,適用于任何其他協(xié)議的業(yè)務(wù)處理:
@Override
protected void channelRead0(ChannelHandlerContext ctx, Jt808Message msg) throws Exception {
    //未接收完的分包不進入業(yè)務(wù)處理
    Jt808Message wholeMsg = handleMultiPacket(ctx, msg);
    if (wholeMsg == null) {
        return;
    }

    //獲取對應(yīng)的消息處理器
    int msgId = wholeMsg.getMsgId();
    BaseMessageService messageService = messageServiceProvider.getMessageService(msgId);
    ByteBuf msgBodyBuf = Unpooled.wrappedBuffer(wholeMsg.getMsgBodyArr());
    try {
        Object result = messageService.process(ctx, wholeMsg, msgBodyBuf);
        log.info("收到{}({}),終端手機號:{},消息流水號:{},內(nèi)容:{}", messageService.getDesc(), NumberUtil.formatMessageId(msgId), wholeMsg.getPhoneNumber(), wholeMsg.getMsgFlowId(), wholeMsg.getMsgBodyItems());
        //發(fā)送指令應(yīng)答給調(diào)用方
        if (result != null) {
            downCommandReceiver.sendUpCommand(ctx, NumberUtil.hexStr(msgId), result);
        }
    } catch (Exception e) {
        printExceptionLog(wholeMsg, messageService, e);
    } finally {
        //處理完業(yè)務(wù)邏輯統(tǒng)一釋放資源
        ReferenceCountUtil.release(msgBodyBuf);
    }
}

首先我們先處理分包蔽氨,不分包的消息直接返回給業(yè)務(wù)處理器處理。如果是分包的帆疟,收到第一包時會創(chuàng)建一個分包接收器鹉究,里面會自動判斷有無接收完,接收完后會自動把所有分包數(shù)據(jù)整合在一起踪宠,然后返回給業(yè)務(wù)處理器處理自赔。分包接收器代碼篇幅有限暫時不貼出:

private Jt808Message handleMultiPacket(ChannelHandlerContext ctx, Jt808Message msg) {
    //不分包
    if (!msg.isMultiPacket()) {
        return msg;
    }

    //總包數(shù)
    int packetTotalCount = msg.getPacketTotalCount();
    //當前包序號
    int packetOrder = msg.getPacketOrder();
    //第一包,創(chuàng)建分包接收器
    if (packetTotalCount > 1 && packetOrder == 1) {
        multiPacketService.createMultiPacketReceiver(ctx, msg);
        Jt808PacketUtil.reply8001(ctx, msg);
        log.info("收到{},終端手機號:{},消息流水號:{},分包總包數(shù):{},第{}包,內(nèi)容:{}", NumberUtil.formatMessageId(msg.getMsgId()), msg.getPhoneNumber(), msg.getMsgFlowId(), packetTotalCount, packetOrder, ByteBufUtil.hexDump(msg.getMsgBodyArr()));
        return null;
    }
    //后續(xù)包
    if (packetTotalCount > 1 && packetOrder > 1) {
        Jt808Message wholeMsg = multiPacketService.addSubPacket(msg);
        Jt808PacketUtil.reply8001(ctx, msg);
        log.info("收到{},終端手機號:{},消息流水號:{},分包總包數(shù):{},第{}包,內(nèi)容:{}", NumberUtil.formatMessageId(msg.getMsgId()), msg.getPhoneNumber(), msg.getMsgFlowId(), packetTotalCount, packetOrder, ByteBufUtil.hexDump(msg.getMsgBodyArr()));
        return wholeMsg;
    }
    //單個數(shù)據(jù)包
    return msg;
}

再往下看業(yè)務(wù)處理,我們設(shè)計了一個通用的泛型消息服務(wù)類BaseMessageService<T>柳琢,T表示各種協(xié)議的消息實體類绍妨,可以處理任何私有協(xié)議(JT809和主動安全程序也使用了這種處理方式),有些私有協(xié)議的消息ID是字符串類型的柬脸,這個服務(wù)類也做了兼容他去,只需要實現(xiàn)里面的process方法即可。這個方法傳遞了socket上下文可以獲取該socket綁定的終端信息肖粮,消息實體類T以及消息體內(nèi)容的ByteBuf孤页。每種消息類型的處理都集中在這個方法中,按照協(xié)議從ByteBuf解析消息體內(nèi)容即可涩馆。
以下是BaseService的代碼:

public abstract class BaseMessageService<T extends BaseMessage> {

    /**
     * 消息ID
     */
    private int messageId;

    /**
     * 字符串消息ID
     */
    private String strMessageId;

    /**
     * 消息處理器描述
     */
    private String desc;

    /**
     * 獲取終端信息
     *
     * @param ctx socket上下文
     * @return 終端信息
     */
    public TerminalProto getTerminalInfo(ChannelHandlerContext ctx) {
        return SessionUtil.getTerminalInfo(ctx);
    }

    /**
     * 檢查消息體長度
     *
     * @param msg        消息
     * @param msgBodyLen 消息體長度
     * @throws ApplicationException 應(yīng)用異常
     */
    public void checkMessageBodyLen(T msg, int msgBodyLen) throws ApplicationException {
        byte[] msgBody = msg.getMsgBodyArr();
        if (msgBody.length < msgBodyLen) {
            throw new ApplicationException("消息體長度不對,不能小于" + msgBodyLen);
        }
    }

    /**
     * 處理消息
     *
     * @param ctx        socket上下文
     * @param msg        消息
     * @param msgBodyBuf 消息體
     * @return 返回結(jié)果
     * @throws Exception 異常
     */
    public abstract Object process(ChannelHandlerContext ctx, T msg, ByteBuf msgBodyBuf) throws Exception;
}

這里貼出0x0200位置匯報的服務(wù)類:

public class Message0200Service extends BaseMessageService<Jt808Message> {

    @Autowired
    private RabbitMessageSender messageSender;

    @Override
    public Object process(ChannelHandlerContext ctx, Jt808Message msg, ByteBuf msgBodyBuf) throws Exception {
        //檢查消息體長度
        checkMessageBodyLen(msg, 28);
        //通用應(yīng)答
        Jt808PacketUtil.reply8001(ctx, msg);
        //解析位置信息和附加信息
        LocationProto location = LocationParser.parse(msg, msgBodyBuf);
        //發(fā)送到MQ
        messageSender.sendLocation(getTerminalInfo(ctx), location);

        msg.putMessageBodyItem("位置", location);
        return null;
    }
}

幾行代碼完成了消息應(yīng)答行施、位置解析、位置發(fā)送到MQ魂那。
以下是拍照的0x0801多媒體數(shù)據(jù)上傳處理:

public class Message0801Service extends BaseMessageService<Jt808Message> {

    @Autowired
    private RabbitMessageSender messageSender;

    @Override
    public Object process(ChannelHandlerContext ctx, Jt808Message msg, ByteBuf msgBodyBuf) throws Exception {
        //多媒體ID
        long mediaId = msgBodyBuf.readUnsignedInt();
        //多媒體類型
        int mediaType = msgBodyBuf.readByte();
        //多媒體格式編碼
        int mediaFormatCode = msgBodyBuf.readByte();
        //事件項編碼
        int eventItemCode = msgBodyBuf.readByte();
        //通道ID
        int channelId = msgBodyBuf.readByte();

        //老協(xié)議不帶位置數(shù)據(jù)(28 bytes)蛾号,圖片數(shù)據(jù)以0xFFD8開頭
        LocationProto location = null;
        if (mediaFormatCode != 0 || msgBodyBuf.getUnsignedShort(0) != 0xFFD8) {
            location = LocationParser.parseLocation(msgBodyBuf);
        }

        //多媒體數(shù)據(jù)
        byte[] mediaData = new byte[msgBodyBuf.readableBytes()];
        msgBodyBuf.readBytes(mediaData);

        MediaFileProto mediaFile = new MediaFileProto();
        mediaFile.setMediaId(mediaId);
        mediaFile.setMediaType(mediaType);
        mediaFile.setMediaFormatCode(mediaFormatCode);
        mediaFile.setEventItemCode(eventItemCode);
        mediaFile.setChannelId(channelId);
        mediaFile.setLocation(location);
        mediaFile.setMediaData(mediaData);
        mediaFile.setTerminalInfo(getTerminalInfo(ctx));

        //發(fā)送到MQ
        messageSender.sendMediaFile(mediaFile);
        return mediaFile;
    }
}

其他協(xié)議的處理也是采用這個方法,比如JT809的從鏈路連接保持請求消息處理:

public class DownLinkTestReqProcessor extends BaseMessageService<Jt809Message> {

    @Autowired
    private MessageSendService messageSendService;

    @Autowired
    private DownLinkTestRspSender downLinkTestRspSender;

    @Override
    public Object process(ChannelHandlerContext ctx, Jt809Message jt809Msg, ByteBuf msgBodyBuf) throws Exception {
        //發(fā)送JT809日志到MQ
        Jt809Status jt809Status = Jt809Manager.getStatusAttr(ctx);
        Jt809ConfigDTO jt809Config = jt809Status.getJt809Config();
        messageSendService.publishJt809Log(jt809Config, jt809Msg);

        //發(fā)送從鏈路連接保持應(yīng)答消息
        downLinkTestRspSender.send(ctx, jt809Status);
        return null;
    }
}

至此涯雅,整個網(wǎng)關(guān)的工作量就全部集中在每種消息服務(wù)的開發(fā)了鲜结。內(nèi)存溢出、資源未釋放活逆、異常等問題全部都得到了統(tǒng)一的處理精刷,可以放心大膽的開發(fā)業(yè)務(wù)邏輯。有了通用處理模板蔗候,開發(fā)效率大幅提升怒允,其他私有協(xié)議網(wǎng)關(guān)的開發(fā)也變得異常簡單。如果MQ消息傳輸格式定義好的話锈遥,整個網(wǎng)關(guān)程序2-3天就能全部開發(fā)完纫事。而且無任何網(wǎng)絡(luò)編程經(jīng)驗的人員都能很快接手勘畔。


在這里插入圖片描述

整個工程除了業(yè)務(wù)service,其他類只有十來個:


在這里插入圖片描述
3.整合SpringBoot

TcpServer需要另外開啟線程啟動的丽惶,不要占用阻塞SpringBoot的主線程炫七。
配置多環(huán)境開發(fā):


在這里插入圖片描述

生產(chǎn)環(huán)境的配置:application-prod.yml。

gnss:
    jt808:
      tcpPort: 6608
    middleware-ip: 127.0.0.1
    threadPool:
      size: 10
    message:
      converter: proto

spring:
  redis:
    host: ${gnss.middleware-ip}
    port: 6379
    password: gps-pro@cn

  rabbitmq:
    host: ${gnss.middleware-ip}
    port: 5672
    username: guest
    password: guest

支持2種MQ序列化方式:ProtoBuf和JSON钾唬,可以在配置文件切換万哪。
ProtoBuf性能高,安全性高抡秆,傳輸量少壤圃,Web后臺是JAVA開發(fā)的話可以選用這種方式,雖然也跨語言但要復雜一些琅轧。
JSON性能差,安全性差踊挠,傳輸量大乍桂,優(yōu)點是跨語言兼容性好。如果后臺是非JAVA的可以選擇這種方式效床。
如圖睹酌,發(fā)送一條相同的位置到MQ,ProtoBuf需要152字節(jié)剩檀,JSON需要675字節(jié)憋沿,傳輸量差了5倍。


在這里插入圖片描述

在這里插入圖片描述

啟動后會自動加載消息處理器:


在這里插入圖片描述

終端連接服務(wù)器并且發(fā)送位置沪猴,然后斷開連接時辐啄,日志打犹几臁:
在這里插入圖片描述
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末轻纪,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子铛漓,更是在濱河造成了極大的恐慌担租,老刑警劉巖砸民,帶你破解...
    沈念sama閱讀 212,294評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異奋救,居然都是意外死亡岭参,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,493評論 3 385
  • 文/潘曉璐 我一進店門尝艘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來演侯,“玉大人,你說我怎么就攤上這事利耍“霰荆” “怎么了盔粹?”我有些...
    開封第一講書人閱讀 157,790評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長程癌。 經(jīng)常有香客問我舷嗡,道長,這世上最難降的妖魔是什么嵌莉? 我笑而不...
    開封第一講書人閱讀 56,595評論 1 284
  • 正文 為了忘掉前任进萄,我火速辦了婚禮,結(jié)果婚禮上锐峭,老公的妹妹穿的比我還像新娘中鼠。我一直安慰自己,他們只是感情好沿癞,可當我...
    茶點故事閱讀 65,718評論 6 386
  • 文/花漫 我一把揭開白布援雇。 她就那樣靜靜地躺著,像睡著了一般椎扬。 火紅的嫁衣襯著肌膚如雪惫搏。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,906評論 1 290
  • 那天蚕涤,我揣著相機與錄音筐赔,去河邊找鬼。 笑死揖铜,一個胖子當著我的面吹牛茴丰,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播天吓,決...
    沈念sama閱讀 39,053評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼贿肩,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了失仁?” 一聲冷哼從身側(cè)響起尸曼,我...
    開封第一講書人閱讀 37,797評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎萄焦,沒想到半個月后控轿,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,250評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡拂封,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,570評論 2 327
  • 正文 我和宋清朗相戀三年茬射,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片冒签。...
    茶點故事閱讀 38,711評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡在抛,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出萧恕,到底是詐尸還是另有隱情刚梭,我是刑警寧澤肠阱,帶...
    沈念sama閱讀 34,388評論 4 332
  • 正文 年R本政府宣布,位于F島的核電站朴读,受9級特大地震影響屹徘,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜衅金,卻給世界環(huán)境...
    茶點故事閱讀 40,018評論 3 316
  • 文/蒙蒙 一噪伊、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧氮唯,春花似錦鉴吹、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,796評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至瞒渠,卻和暖如春肆糕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背在孝。 一陣腳步聲響...
    開封第一講書人閱讀 32,023評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留淮摔,地道東北人私沮。 一個月前我還...
    沈念sama閱讀 46,461評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像和橙,于是被迫代替她去往敵國和親仔燕。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,595評論 2 350

推薦閱讀更多精彩內(nèi)容