Netty自定義TCP協(xié)議通訊實例

Netty自定義TCP協(xié)議通訊實例

網(wǎng)絡(luò)編程的基本模型就是客戶機到服務(wù)器模型,簡單來說就是進程與進程之間的通訊

兩個進程之間必須要有一個提供固定的位置叁熔,讓另一個進程知道這個位置并建立聯(lián)系,這樣兩個進程就可以相互通訊

其中提供固定位置的叫服務(wù)端床牧,連接固定位置的叫客戶端

實際上因為是雙向通訊者疤,服務(wù)端客戶端并沒有太大的界限

今天寫一個實例,使用netty4來實現(xiàn)一個自定義的tcp報文解析

先說說自定義tcp協(xié)議的意義

文本協(xié)議:
對協(xié)議頭的 size 沒特別要求對解析速度沒特別要求需要協(xié)議輕松
支持變長 header 且保持較友好的擴展性(如像 HTTP 一樣可以隨意增加 header)
可升級協(xié)議且不增加協(xié)議本身復(fù)雜度
要求容易調(diào)試

二進制協(xié)議
要求協(xié)議頭 size 要足夠小
要求解析速度要足夠快
協(xié)議頭相對較固定叠赦,變動的可能性較低,但仍然需要留一些擴展位

比如即時通訊就很適合用二進制協(xié)議革砸,因為即時通訊需要頻繁收發(fā)消息除秀,對于傳輸和解析的速度都有比較高的要求
另外一個就是二進制協(xié)議沒有協(xié)議文檔幾乎不可讀


進入代碼階段

本文代碼鏈接:后端代碼

一、服務(wù)端

1.新建工程

這次不需要boot了算利,新建一個空的maven項目引入依賴

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.53.Final</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
</dependency>

<!-- log -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
    <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
    <!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.15</version>
</dependency>

其中l(wèi)ogback是給lombok的@Slf4j的實現(xiàn)册踩,commons-codec用于獲取md5

2.實現(xiàn)一個服務(wù)端

netty的基本使用就是建立一個worker和一個boss EventLoopGroup,然后配置編碼解碼器以及自定義handler效拭,事件監(jiān)聽等暂吉,使得開發(fā)者的核心可以轉(zhuǎn)移到處理數(shù)據(jù)。



        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true).handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        
                        ch.pipeline().addLast(
                                // 自定義解碼器
                                new DecodeHandler(),
                                new StringEncoder(CharsetUtil.UTF_8),
                                // 自定義的文件處理handler
                                new TcpServerHandler());
                    }
                });

        try {
            int port = 8888;
            String host = "127.0.0.1";

            ChannelFuture future = bootstrap.bind(host, port).sync();

            log.info("啟動服務(wù)器 端口[{}]", port);

            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("%s", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    

除了常規(guī)配置之外缎患,我們要做的就是實現(xiàn)一個自定義的解碼器DecodeHandler以及消息處理器TcpServerHandler

3.協(xié)議制定

開始解碼前我們需要有一個協(xié)議慕的,才能知道該怎么解

業(yè)務(wù)的報文處理
解析
定義協(xié)議 PS:x表示占幾個字節(jié)
[[版本x2][報文總長度x2][消息類型x1][文件標(biāo)識x16][分片序號x2][文件總大小x8][分片文件長度x2][分片文件數(shù)據(jù)]]
消息類型 0x01 上傳文件 0x02 合并文件

示例報文(16進制):

0055041d015b886f17f511ed7908128e68520b8aa300000000000001159cf50400504b03041400000000003b0a3948000000000000000000000000180000007068616e746f6d6a732d322e312e312d77696e646f77732f504b0304140000000000670a39480000000000000000000000001c0000007068616e746f6d6a732d322e312e312d77696e646f77732f62696e2f504b0304140000000800bd0a394821ec01d2cbbf140100a01b01290000007068616e746f6d6a732d322e312e312d77696e646f77732f62696e2f7068616e746f6d6a732e657865ec9a8b3fd3ff1ec777373336b5106313b94d482a22e6325b85264285ad92cba2167329b731b799cbeae474fd955427bf2ebf748e44c45c6a2315e9942256a9c8a52531b9ec7cbf9ddbe35c1e8fd31f70be0f1eefafd79efb787edf9feff7cdc383ef4e31040e814010c0a74a0581d442fe7a5021fffba04221102dd25d2d48b5fa23e35aa8cf23e3c0e898043237fe6054fcee38f2dedd070e1ce491f7ec23c7271e20c71c207b6d0d20c71d8cd867a3a98931fddb1aebdb1ef4245caee4fffd73edd4267e3250c79107f987809a3193cc27ffedb5f81f5f6fe2b32e8195c15f0bd4df7646f27997c19af2e375ad7e063fe30a58937fbcff81b30f3ff5c7eb7bf9d13fd6f9951f7e05cc197cc48fd737f1e3ae8395f7b7f513800a726e4005f3a41f755bccde68d0e1df7bc0a441203e503844e07739e8ef991c02836a40b11088d8000a210341d504944bc3c021920928840954b0bb7820874120a81feff867857081d7f17f3d8543a85900f8e3e39ff5af85fa8b0216f20e02896561e062601d70dd58dc7feed1fc6e0c1c0bf9df47281d0367407efed8ce0cb103ca8f6bfbabd78f9be85f0e3ed00a805b0d9cfee8c58feb2f00eabf715480b3894f88df0b7e41feebb54008402d02d87fe52490ff1f3f7bacb1715a0dee93b1a63a1a1588db19d4fccaf6eb7d730c7c7f2ed0573b185c0702d963fc7a56150db579201e865a5ec7ee68ed24950f0e86c75771542b6c77b7d5c1be3db2860a069b7c1bcf52a56437eb7b29111780fdf157f52da82ec0be86f0d7b4318bbd4cd11117cd39c8ac61c8a275ace0ee8b86f136b4d00e518c92e2f7fda95aa5520d6c80af7511598bd0b9ed3c5dc1ac8a4712b4ab14b86634ae4cd27560d74c8e2451db890383310317ba8862eceb17b13138b65eac4fbf50f3c28e59b27334119f00756bb177350fb6db88c0464477b8cfb02ec2be94eb71ec8d535c89a38d9ff7ae264a911832cc6c8dc4199a3a6ac5ba8cbabed63472bb458c615f12c99a4371af7e106e0da3d80de2c2a6590e656be19b22e4b68eac59f1de52172a8731ec62ea9d93217b2f4678315c901c8ca7565dda3e071fe65d4ae34420c7d69d1ae2835a495d1d6253469adb175a8655e3e0b8ec8059b1674434c7923fd39b3c4644536259dc52a2ba71faf5b95e5e560a47

推薦使用NetAssist調(diào)試

4.解碼器

解碼器的意義在于先一步處理得到報文

TCP是一個“流”協(xié)議,所謂流挤渔,就是沒有界限的一長串二進制數(shù)據(jù)肮街。TCP作為傳輸層協(xié)議并不不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會根據(jù)TCP緩沖區(qū)的實際情況進行數(shù)據(jù)包的劃分判导,所以在業(yè)務(wù)上認為是一個完整的包嫉父,可能會被TCP拆分成多個包進行發(fā)送,也有可能把多個小的包封裝成一個大的數(shù)據(jù)包發(fā)送眼刃,這就是所謂的TCP粘包和拆包問題绕辖。

所以我們需要在解碼器對每一包進行校驗,小于校驗和的包需要粘包擂红,大于校驗和的包需要拆包仪际,這樣才能正確解析
至于怎么解決這個可以專門分析,網(wǎng)上也有很多方案,這里不談

除了粘包拆包問題弟头,我們還可能有對原始報文進行解密的操作吩抓,為了保證下游的handler拿到的bytebuf是與協(xié)議文檔一樣的格式,我們需要先解密赴恨,并把業(yè)務(wù)部分報文傳達下去疹娶,甚至可以在定義了一定格式的報文后,直接在decoder中把二進制數(shù)據(jù)變成對象或者等類型伦连,下游再解析雨饺。

public class DecodeHandler extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        // 取到tag與報文長度后再處理,各2字節(jié)
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        // 記錄當(dāng)前ByteBuf的讀指針位置惑淳,以便下面取報文長度字節(jié)
        // pos是一個完整報文的開始位置额港,報文整體會在ByteBuf中移動,類似內(nèi)存管理歧焦,所以基于字節(jié)的判斷報文長度等等移斩,都是基于pos,否則可以在byteBuf.readBytes()之后加绢馍,byteBuf.discardReadBytes();整理ByteBuf向瓷,使pos回到0開始位置
        int pos = byteBuf.readerIndex();
        
        //不同協(xié)議頭建議使用策略模式處理
        short protocol = byteBuf.readShort();//前面兩個字節(jié)是協(xié)議版本 0x55
        
        int msgLen = byteBuf.readShort();//第三第四個字節(jié)表示報文長度

        // 收到的報文長度不足一個完整的報文,繼續(xù)接收
        if (byteBuf.readableBytes() < msgLen) {
            return;
        }
        
        // 提出完整報文(readBytes讀到msg中)舰涌,放到list給下一個Handler處理
        if (msgLen > 0) {
            out.add(byteBuf.readBytes(msgLen));
        }

    }

}

5.自定義handler

從上面的DecodeHandler可以看到猖任,前4個字節(jié)用來做協(xié)議版本和報文長度,已經(jīng)read了瓷耙,丟到out中給到下游的是這四個字節(jié)以外的數(shù)據(jù)

// 前面4個字節(jié)已經(jīng)在decode階段跳過了
        byte msgType = byteBuf.readByte();// 讀取一個字節(jié)為消息類型

        switch (msgType) {
        case 0x01:
            upload(byteBuf);
            break;
        case 0x02:
            merge(byteBuf);
            break;
        }

所以在handler中可以直接讀取一個字節(jié)判斷當(dāng)前的報文類型再分發(fā)
然后根據(jù)已有的協(xié)議進行解析

// 業(yè)務(wù)的報文處理
// 解析
// 定義協(xié)議 PS:x表示占幾個字節(jié)
// [[版本x2][報文總長度x2][消息類型x1][文件標(biāo)識x16][分片序號x2][文件總大小x8][分片文件長度x2][分片文件數(shù)據(jù)]]
// 消息類型 0x01 上傳文件 0x02 合并文件

byte[] filenameBytes = new byte[16];
buf.readBytes(filenameBytes);// 讀取16字節(jié)是文件名
String filename = ByteUtils.bytesToHex(filenameBytes);// 我這里是md5不會有中文沒有亂碼的問題

short fileseq = buf.readShort();// 讀取2字節(jié)是分片序號

long filesize = buf.readLong();// 讀取8字節(jié)是文件總大小

short chunksize = buf.readShort();// 讀取2字節(jié)是分片文件長度

byte[] dataBytes = new byte[chunksize];
buf.readBytes(dataBytes);

已經(jīng)有大佬寫好框架朱躺,只需要做解析的同學(xué),做這個不算太難搁痛,稍微有些枯燥长搀,需要認真細心,不要解錯

二落追、客戶端

相對服務(wù)端而言盈滴,客戶端需要搜集本次通訊的報文數(shù)據(jù),無論是protobuf也好byte數(shù)據(jù)也好轿钠,都比服務(wù)端要復(fù)雜一些

由于我們的netty實現(xiàn)了一個tcp服務(wù)端巢钓,所以可以客戶端只要使用tcp協(xié)議就可以通訊,我們使用java提供的socketapi即可

核心代碼

/**
     * TODO 本來發(fā)送是不需要返回值的疗垛,但是這里為了使用latch來計算發(fā)了多少症汹,通過返回true來表示發(fā)送成功一個
     * 
     * @param msg
     * @return
     */
    synchronized public boolean sendMsgBySocket(ByteBuffer msg) {
        try {
//          Socket socket = getSocketClient();

            //這段代碼僅僅調(diào)試用,盡量一個不要開多個socket跟服務(wù)端通信贷腕,應(yīng)該使用同一個鏈路背镇,多次發(fā)送消息咬展,服務(wù)端解決粘包和拆包的問題
            // 要連接的服務(wù)端IP地址和端口-----------------------------------
            String host = "127.0.0.1";
            int port = 8888;

            // 與服務(wù)端建立連接
            this.socket = new Socket(host, port);
            //---------------------------------------------------------

            // 建立連接后獲得輸出流
            OutputStream outputStream = socket.getOutputStream();
            byte[] msgArray = msg.array();
            // 報文頭start-----------------------------------------------
            // 55表示協(xié)議版本
            short protocol = 0x55;
            ByteBuffer header = ByteBuffer.allocate(4);
            header.putShort(protocol);// 版本x2

            // 寫入本次報文總長度 固定是37 + 分片文件長度
            short length = (short) (msgArray.length);
            header.putShort(length);// 報文總長度x2
            // 報文頭end-----------------------------------------------
            outputStream.write(header.array());
            outputStream.write(msgArray);
            outputStream.flush();

            InputStream inputStream = socket.getInputStream();
            byte[] bytes = new byte[1024];
            int read = 0;

            while (true) {
                // 死循環(huán)直到服務(wù)器返回 這里可以添加超時機制,防止服務(wù)器出問題后無限循環(huán)
                read = inputStream.read(bytes);
                if (read > 0) {
                    String result = new String(bytes);
                    log.info("receive : {}", result);
                    if ("ok".equals(result)) {
                        // 假定回復(fù)ok就是成功了
                        return true;
                    }
                    break;
                }
            }

        } catch (Exception e) {
            log.error("[{}]", e);
        }

        return false;

    }

這里的含義是我們先組裝好要發(fā)送的報文ByteBuffer瞒斩,然后調(diào)用這個發(fā)起socket通訊

其他分享的代碼
這次做的是通過tcp分片上傳大文件

思想還是和上篇一樣破婆,客戶端分片,通過md5獲得唯一標(biāo)識以免重復(fù)胸囱,每個報文包括分片序號祷舀,唯一標(biāo)識,總大小分片大小還有分片數(shù)據(jù)

因為我們要規(guī)定每個分片的最大大小才能計算分片數(shù)量

所以這里可以利用FileChannel烹笔,分段讀取裳扯,無需切片文件,也避免把整個文件讀取到內(nèi)存

FileChannel fileChannel = FileChannel.open(Paths.get(file.getAbsolutePath()),
                    EnumSet.of(StandardOpenOption.READ));

    // 每個分片最大1024谤职,所以只有最后一片不一定是1024饰豺,但是也能算出來
    int lastSize = (int) (fileChannel.size() - ((chunk - 1) * 1024));// 總長度減去前片的和

    for (int chunkNum = 0; chunkNum < chunk; chunkNum++) {

        int size = chunkNum == chunk ? lastSize : chunkSize;

        ByteBuffer buffer = ByteBuffer.allocate(size);

        fileChannel.read(buffer);

我規(guī)定每個分片最大1024字節(jié)也就是1kb,所以每次最多讀1k允蜈,最后一片可能小于等于1k冤吨,每循環(huán)一次就把報文放到線程池中等待發(fā)送

利用CountDownLatch設(shè)定一個屏障,要求發(fā)送的總數(shù)達到才關(guān)閉線程池饶套,在服務(wù)端返回ok后latch.countDown()達到計數(shù)的效果

// 設(shè)定一個計數(shù)器
    CountDownLatch latch = new CountDownLatch(chunk);
    fixedThreadPool.execute(() -> {
            // 將組裝好的報文锅很,在線程池中創(chuàng)建線程運行
            client.sendMsgBySocket(msgBuffer);
            // 應(yīng)該要判斷是否成功的。這里不判斷認為都成功
            latch.countDown();
        });
    // 所有線程沒發(fā)送完全之前 一直等待
    latch.await();

分片大小1k確實有點小凤跑,不過這是為了看測試效果,實際上要根據(jù)通訊網(wǎng)絡(luò)來定義分片大小叛复,帶寬好的大一些

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末仔引,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子褐奥,更是在濱河造成了極大的恐慌咖耘,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件撬码,死亡現(xiàn)場離奇詭異儿倒,居然都是意外死亡,警方通過查閱死者的電腦和手機呜笑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門夫否,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人叫胁,你說我怎么就攤上這事凰慈。” “怎么了驼鹅?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵微谓,是天一觀的道長森篷。 經(jīng)常有香客問我,道長豺型,這世上最難降的妖魔是什么仲智? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮姻氨,結(jié)果婚禮上钓辆,老公的妹妹穿的比我還像新娘。我一直安慰自己哼绑,他們只是感情好岩馍,可當(dāng)我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著抖韩,像睡著了一般蛀恩。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上茂浮,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天双谆,我揣著相機與錄音,去河邊找鬼席揽。 笑死顽馋,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的幌羞。 我是一名探鬼主播寸谜,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼属桦!你這毒婦竟也來了熊痴?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤聂宾,失蹤者是張志新(化名)和其女友劉穎果善,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體系谐,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡巾陕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了纪他。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鄙煤。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡东跪,死狀恐怖梯轻,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情嘴办,我是刑警寧澤弹谁,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布乾巧,位于F島的核電站句喜,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏沟于。R本人自食惡果不足惜咳胃,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望旷太。 院中可真熱鬧展懈,春花似錦、人聲如沸供璧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽睡毒。三九已至来惧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間演顾,已是汗流浹背供搀。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留钠至,地道東北人葛虐。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像棉钧,于是被迫代替她去往敵國和親屿脐。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,700評論 2 354