netty搭建tcp服務(wù)器通信(解決粘包問題)

最近做的項(xiàng)目有需求跟硬件通信,使用tcp實(shí)現(xiàn)長(zhǎng)連接,協(xié)議自己規(guī)定,于是后端決定選用netty來(lái)作為tcp服務(wù)器着降,這里簡(jiǎn)單說(shuō)一下netty的工作流程。外部的數(shù)據(jù)傳入netty服務(wù)器中汁展,netty首先通過解碼器對(duì)數(shù)據(jù)進(jìn)行一次預(yù)處理(比如把字節(jié)轉(zhuǎn)為字符串或?qū)ο髞?lái)方便操作)鹊碍,接著把預(yù)處理后的數(shù)據(jù)轉(zhuǎn)發(fā)給處理器,在處理器中執(zhí)行業(yè)務(wù)邏輯食绿,最后如果有必要返回?cái)?shù)據(jù)給連接者,可以通過netty提供的channel發(fā)送公罕。

  • netty—>decode—>handler

首先是啟動(dòng)一個(gè)tcp服務(wù)器

package server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * @author lanni
 * @date 2020/8/19 23:05
 * @description
 **/
public class TCPServer {
    public void run(int port) throws Exception {
        //創(chuàng)建線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //創(chuàng)建啟動(dòng)類
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer())
                    .option(ChannelOption.SO_BACKLOG, 256)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // 綁定端口器紧,開始接收進(jìn)來(lái)的連接
            ChannelFuture f = b.bind(port).sync();
            // 等待服務(wù)器 socket 關(guān)閉 。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        try {
            System.out.println("tcp服務(wù)器啟動(dòng)...");
            new TCPServer().run(8998);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

初始化解碼器楼眷、處理器

package server;

import handler.CustomDecode;
import handler.TCPServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

/**
 * @author lanni
 * @date 2020/8/22 11:58
 * @description
 **/
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline().
                addLast(new CustomDecode()).        //自定義解碼器
                addLast(new TCPServerHandler())     //自定義處理器
        ;
    }
}

解碼器中解決tcp粘包問題铲汪,關(guān)于什么是粘包、拆包我就不做解釋了罐柳,我這里直接上解決方案掌腰,這里我簡(jiǎn)單說(shuō)一下我做的項(xiàng)目數(shù)據(jù)傳輸,規(guī)定數(shù)據(jù)格式:

固定頭部(2字節(jié))+數(shù)據(jù)長(zhǎng)度(4字節(jié))+其它(17字節(jié))+數(shù)據(jù)(可變長(zhǎng)度)+crc校驗(yàn)碼(2字節(jié))+固定結(jié)尾(2字節(jié))

所以每次收到的數(shù)據(jù)包中包含了數(shù)據(jù)的長(zhǎng)度张吉,就以此長(zhǎng)度來(lái)組裝數(shù)據(jù)包傳遞給handler齿梁,這里注意看我的注釋部分。

import util.StringUtil;

import java.util.List;

/**
 * @Author lanni
 * @Date 2020/8/23 9:30
 * @Description
 **/
public class CustomDecode extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
        int len = in.readableBytes();       //這里得到可讀取的字節(jié)長(zhǎng)度
        in.markReaderIndex();               //包頭做標(biāo)記位肮蛹,后面可以重新回到數(shù)據(jù)包頭開始讀數(shù)據(jù)
        //有數(shù)據(jù)時(shí)開始讀數(shù)據(jù)包
        if (len > 0) {
            byte[] src = new byte[len];
            in.readBytes(src);          //把數(shù)據(jù)讀到字節(jié)數(shù)組中(讀取完之后指針會(huì)到最后一個(gè)數(shù)據(jù))
            in.resetReaderIndex();      //重置當(dāng)前指針到標(biāo)記位(包頭)
            //驗(yàn)證首部為A5 5A,只接收首部正確的數(shù)據(jù)包,如果包頭錯(cuò)誤可以直接丟棄或關(guān)閉連接
            if ((src[0] & 0x000000ff) == 0xA5 && (src[1] & 0x000000ff) == 0x5A) {
                //計(jì)算報(bào)文長(zhǎng)度
                byte[] data =  {src[3],src[2]};
                String hexLen = StringUtil.byteArrayToHexString(data);
                //這里計(jì)算出來(lái)的是數(shù)據(jù)長(zhǎng)度的報(bào)文長(zhǎng)度,需要加27個(gè)固定長(zhǎng)度
                int pLen = Integer.parseInt(hexLen, 16) + 27;
                if (len < pLen) {
                    //當(dāng)數(shù)據(jù)包的長(zhǎng)度不夠時(shí)直接return勺择,netty在緩沖區(qū)有數(shù)據(jù)時(shí)會(huì)一直調(diào)用decode方法,所以我們只需要等待下一個(gè)數(shù)據(jù)包傳輸過來(lái)一起解析
                    return;
                }
                byte[] packet = new byte[pLen];
                in.readBytes(packet,0,pLen);
                out.add(packet);
            }else {
                channelHandlerContext.close();
            }
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("連接異常:"+cause);
//        ctx.close();
    }

然后就是處理器伦忠,用于處理得到的數(shù)據(jù)包省核,這個(gè)大家可以自己編寫邏輯。

package handler;

import cn.hutool.core.util.StrUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import util.StringUtil;
/**
 * @Author lanni
 * @Date 2020/8/19 23:07
 * @Description
 **/
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //這里msg就是從解碼器中傳來(lái)的數(shù)據(jù)昆码,解碼器傳輸過來(lái)是什么格式气忠,這里直接轉(zhuǎn)成對(duì)應(yīng)的格式就可以
        byte[] src = (byte[]) msg;
        try {
            //這里做自己的業(yè)務(wù)邏輯
            
            
            //獲取鏈接實(shí)例
            Channel channel = ctx.channel();
            //響應(yīng)消息一定要這樣去發(fā)送,只能使用字節(jié)傳輸
            //netty中發(fā)送數(shù)據(jù)需要把待發(fā)送的字節(jié)數(shù)組包裝一下成為ByteBuf來(lái)發(fā)送
            byte[] dest = null;
            ByteBuf buf = Unpooled.copiedBuffer(dest);
            //數(shù)據(jù)沖刷
            ChannelFuture cf = channel.writeAndFlush(buf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 當(dāng)出現(xiàn)異常就關(guān)閉連接
        cause.printStackTrace();
        ctx.close();
    }
}

netty中當(dāng)然還涉及到服務(wù)器主動(dòng)發(fā)送消息給客戶端赋咽,但是需要注意的是如果是主動(dòng)發(fā)消息旧噪,有一個(gè)先決條件是需要知道客戶端的唯一標(biāo)識(shí)(id或其它標(biāo)識(shí)),我們需要用一個(gè)map來(lái)保存好channel和這個(gè)標(biāo)識(shí)的對(duì)應(yīng)關(guān)系冬耿。我所做的項(xiàng)目是服務(wù)器來(lái)維護(hù)設(shè)備id和連接通道channel的對(duì)應(yīng)關(guān)系舌菜。

首先需要一個(gè)統(tǒng)一管理channel的類,這里有CHANNEL_POOLKEY_POOL兩個(gè)map亦镶,是為了讓id和channel能夠互相對(duì)應(yīng)起來(lái)日月,可能有人會(huì)想著只需要維護(hù)id—>channel的關(guān)系就可以了袱瓮,但是可以看見上面在發(fā)生異常時(shí)所使用的處理方法exceptionCaught(ChannelHandlerContext ctx, Throwable cause)時(shí),只能拿到channel爱咬,所以需要通過channel找到id來(lái)做出相應(yīng)的操作尺借。

import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author lanni
 * @date 2020/9/11 20:21
 *
 **/
@Slf4j
public class NettyChannelManager {
    /**
     * 保存連接 Channel 的地方
     */
    private static Map<String, Channel> CHANNEL_POOL = new ConcurrentHashMap<>();
    private static Map<Channel, String> KEY_POOL = new ConcurrentHashMap<>();

    /**
     * 添加 Channel
     *
     * @param key
     */
    public static void add(String key, Channel channel) {
        CHANNEL_POOL.put(key, channel);
        KEY_POOL.put(channel, key);
    }

    /**
     * 刪除 Channel
     *
     * @param key
     */
    public static void remove(String key) {
        Channel channel = CHANNEL_POOL.get(key);
        if (channel == null) {
            return;
        }
        CHANNEL_POOL.remove(key);
        KEY_POOL.remove(channel);
    }

    /**
     * 刪除并同步關(guān)閉連接
     *
     * @param key
     */
    public static void removeAndClose(String key) {
        Channel channel = CHANNEL_POOL.get(key);
        remove(key);
        if (channel != null) {
            // 關(guān)閉連接
            try {
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void removeAndClose(Channel channel) {
        String key = KEY_POOL.get(channel);
        removeAndClose(key);
    }

    /**
     * 獲得 Channel
     *
     * @param key
     * @return String
     */
    public static Channel getChannel(String key) {
        return CHANNEL_POOL.get(key);
    }

    /**
     * 獲得 key
     *
     * @param channel
     * @return Channel
     */
    public static String getKey(Channel channel) {
        return KEY_POOL.get(channel);
    }

    /**
     * 判斷是否存在key
     * @author lanni
     * @date 2020/9/16 10:10
     * @param key
     * @return boolean
     **/
    public static boolean hasKey(String key) {
        return CHANNEL_POOL.containsKey(key);
    }

    /**
     * 判斷是否存在channel
     * @author lanni
     * @date 2020/10/12 9:34
     * @param channel
     * @return boolean
     **/
    public static boolean hasChannel(Channel channel) {
        return KEY_POOL.containsKey(channel);
    }

}

我這里是在處理器中獲取到設(shè)備的id,然后交給NettyChannelManager管理精拟,當(dāng)發(fā)生異常時(shí)關(guān)閉channel并移除對(duì)應(yīng)的連接信息燎斩。

package handler;

import cn.hutool.core.util.StrUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import util.StringUtil;
/**
 * @Author lanni
 * @Date 2020/8/19 23:07
 * @Description
 **/
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //這里msg就是從解碼器中傳來(lái)的數(shù)據(jù),解碼器傳輸過來(lái)是什么格式蜂绎,這里直接轉(zhuǎn)成對(duì)應(yīng)的格式就可以
        byte[] src = (byte[]) msg;
        try {
            // 從數(shù)據(jù)包中拿到設(shè)備id
            byte[] deviceId = new byte[17];
            System.arraycopy(src, 4, deviceId, 0, 17);
            String devId = StrUtil.str(deviceId, CharsetUtil.UTF_8);
            // 保存channel,key
            // deviceId為空時(shí)表示設(shè)備斷線重連
            if (!NettyChannelManager.hasKey(devId)) {
                NettyChannelManager.add(devId, ctx.channel());
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 當(dāng)出現(xiàn)異常就關(guān)閉連接
        cause.printStackTrace();
        log.error("發(fā)生異常:" + cause.getMessage());
        String devId = NettyChannelManager.getKey(ctx.channel());
        if (devId == null || "".equals(devId)) {
            return;
        }
        // 刪除鏈接信息并關(guān)閉鏈接
        NettyChannelManager.removeAndClose(ctx.channel());
    }


    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String devId = NettyChannelManager.getKey(ctx.channel());
        if (devId == null || "".equals(devId)) {
            return;
        }
        // 刪除鏈接信息并關(guān)閉鏈接
        NettyChannelManager.removeAndClose(ctx.channel());
    }

}

現(xiàn)在有了這樣一個(gè)對(duì)應(yīng)關(guān)系之后栅表,如果我們想給客戶端主動(dòng)發(fā)送消息,那么我們只需要通過客戶端的id拿到對(duì)應(yīng)的channel就可以在任意位置發(fā)送數(shù)據(jù)师枣。

        // 先準(zhǔn)備好需要發(fā)送的數(shù)據(jù)
        byte[] pkg = 
        // 通過id獲取netty連接通道channel
        Channel channel = NettyChannelManager.getChannel(deviceId);
        // 封裝數(shù)據(jù)
        ByteBuf buf = Unpooled.copiedBuffer(pkg);
        // 把數(shù)據(jù)寫入通道并發(fā)送
        channel.writeAndFlush(buf);

結(jié)語(yǔ):以上所說(shuō)都是在單機(jī)環(huán)境下怪瓶,如果說(shuō)是分布式環(huán)境的話那么關(guān)于id-channel的維護(hù)就需要修改。我們可以使用spring session來(lái)代替這里的

NettyChannelManager践美,只需要幾個(gè)配置就能解決分布式的問題洗贰,當(dāng)然也可以有其它的方案,我在這里就不列舉了陨倡。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末敛滋,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子兴革,更是在濱河造成了極大的恐慌绎晃,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,743評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件帖旨,死亡現(xiàn)場(chǎng)離奇詭異箕昭,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)解阅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門落竹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人货抄,你說(shuō)我怎么就攤上這事述召。” “怎么了蟹地?”我有些...
    開封第一講書人閱讀 157,285評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵积暖,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我怪与,道長(zhǎng)夺刑,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,485評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮遍愿,結(jié)果婚禮上噪径,老公的妹妹穿的比我還像新娘窘拯。我一直安慰自己,他們只是感情好螟深,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,581評(píng)論 6 386
  • 文/花漫 我一把揭開白布变屁。 她就那樣靜靜地躺著谊迄,像睡著了一般梅猿。 火紅的嫁衣襯著肌膚如雪碎赢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,821評(píng)論 1 290
  • 那天薛夜,我揣著相機(jī)與錄音籍茧,去河邊找鬼。 笑死梯澜,一個(gè)胖子當(dāng)著我的面吹牛硕糊,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播腊徙,決...
    沈念sama閱讀 38,960評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼檬某!你這毒婦竟也來(lái)了撬腾?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,719評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤恢恼,失蹤者是張志新(化名)和其女友劉穎民傻,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體场斑,經(jīng)...
    沈念sama閱讀 44,186評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡漓踢,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,516評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了漏隐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片喧半。...
    茶點(diǎn)故事閱讀 38,650評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖青责,靈堂內(nèi)的尸體忽然破棺而出挺据,到底是詐尸還是另有隱情,我是刑警寧澤脖隶,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布扁耐,位于F島的核電站,受9級(jí)特大地震影響产阱,放射性物質(zhì)發(fā)生泄漏婉称。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,936評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望王暗。 院中可真熱鬧悔据,春花似錦、人聲如沸瘫筐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)策肝。三九已至肛捍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間之众,已是汗流浹背拙毫。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留棺禾,地道東北人缀蹄。 一個(gè)月前我還...
    沈念sama閱讀 46,370評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像膘婶,于是被迫代替她去往敵國(guó)和親缺前。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,527評(píng)論 2 349