Mina粘包块仆,斷包問題處理(附完整實例构蹬,客戶端,服務端)

1.什么是斷包悔据,粘包庄敛?

在講斷包,粘包之前科汗,先說下消息保護邊界和無消息保護邊界铐姚。
1.保護消息邊界,就是指傳輸協(xié)議把數(shù)據(jù)當作一條獨立的消息在網(wǎng)上傳輸,接收端只能接收獨立的消息.也就是說存在保護消息邊界,接收端一次只能接收發(fā)送端發(fā)出的一個數(shù)據(jù)包.
2.而面向流則是無消息保護邊界的,如果發(fā)送端連續(xù)發(fā)送數(shù)據(jù), 接收端有可能在一次接收動作中,會接收兩個或者更多的數(shù)據(jù)包肛捍。

而tcp是面向流的隐绵,需要在消息接收端處理消息邊界問題。

接收端在接受數(shù)據(jù)時有可能會遇到下面四種情況

A.先接收到dataA然后接收到dataB.
B.先接收到dataA的部分數(shù)據(jù),然后接收到dataA余下的部分以及dataB的全部.
C.先接收到了dataA的全部數(shù)據(jù)和dataB的部分數(shù)據(jù),然后接收到了dataB的余下的數(shù)據(jù).
D.一次性接收到了dataA和dataB的全部數(shù)據(jù).

A為正常情況拙毫,無粘包或斷包依许。
B為斷包+粘包。
C為粘包+斷包缀蹄。
D為粘包峭跳。

2.如何處理Mina中遇到的粘包和斷包問題

在Mina框架中有個CumulativeProtocolDecoder 累積性的協(xié)議解碼器,專門用來處理粘包和斷包問題缺前。doDecode()的返回值有重要作用蛀醉。

A.你的doDecode()方法返回true 時,CumulativeProtocolDecoder 的decode()方法會首先判斷你是否在doDecode()方法中從內(nèi)部的IoBuffer 緩沖區(qū)讀取了數(shù)據(jù)衅码,如果沒有拯刁,則會拋出非法的狀態(tài)異常,也就是你的doDecode()方法返回true 就表示你已經(jīng)消費了本次數(shù)據(jù)(相當于聊天室中一個完整的消息已經(jīng)讀取完畢)逝段,進一步說垛玻,也就是此時你必須已經(jīng)消費過內(nèi)部的IoBuffer 緩沖區(qū)的數(shù)據(jù)(哪怕是消費了一個字節(jié)的數(shù)據(jù))。如果驗證過通過奶躯,那么CumulativeProtocolDecoder 會檢查緩沖區(qū)內(nèi)是否還有數(shù)據(jù)未讀取帚桩,如果有就繼續(xù)調(diào)用doDecode()方法,沒有就停止對doDecode()方法的調(diào)用嘹黔,直到有新的數(shù)據(jù)被緩沖账嚎。

B. 當你的doDecode()方法返回false 時,CumulativeProtocolDecoder 會停止對doDecode()方法的調(diào)用,但此時如果本次數(shù)據(jù)還有未讀取完的郭蕉,就將含有剩余數(shù)據(jù)的IoBuffer 緩沖區(qū)保存到IoSession 中乏悄,以便下一次數(shù)據(jù)到來時可以從IoSession 中提取合并。如果發(fā)現(xiàn)本次數(shù)據(jù)全都讀取完畢恳不,則清空IoBuffer 緩沖區(qū)(讓父類進行接收下一個包)檩小。簡而言之,當你認為讀取到的數(shù)據(jù)已經(jīng)夠解碼了烟勋,那么就返回true规求,否則就返回false。這個CumulativeProtocolDecoder 其實最重要的工作就是幫你完成了數(shù)據(jù)的累積卵惦,因為這個工作是很煩瑣的阻肿。也就是說返回true,那么CumulativeProtocolDecoder會再次調(diào)用decoder沮尿,并把剩余的數(shù)據(jù)發(fā)下來丛塌;(意思就是會把剩余數(shù)據(jù)給doDecode()處理,剩余數(shù)據(jù)就是remaining()的數(shù)據(jù))畜疾,返回false就不處理剩余的赴邻,(不把剩余數(shù)據(jù)給doDecode()處理)當有新數(shù)據(jù)包來的時候就把剩余的數(shù)據(jù)和新的數(shù)據(jù)拼接在一起,然后再調(diào)用decoder啡捶。

下面附上一個完整的實例
1.消息的格式
包頭+消息長度(int)+消息內(nèi)容(json字符串)+包尾姥敛,包頭包尾是十六進制字符串00 aa bb cc,轉化成字節(jié)數(shù)組0, -86, -69, -52四個字節(jié),下面的完整實例有客戶端瞎暑,服務端彤敛,將會解析數(shù)據(jù),獲取其中的消息內(nèi)容(Json字符串)并且打印處理了赌,消息以字節(jié)數(shù)組的方式在服務端墨榄,客戶端之間傳遞。

服務端代碼

package com.my.mina;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Date;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 * mina的Service端
 * 
 * @author linbin
 *
 */
public class MinaService {

    public static void main(String[] args) {

        // 創(chuàng)建一個非阻塞的server端的Socket
        IoAcceptor acceptor = new NioSocketAcceptor();
        // 添加日志過濾器
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));// 自定義解編碼器

        // 設置Handler
        acceptor.setHandler(new DemoServerHandler());
        // 設置讀取數(shù)據(jù)的緩存區(qū)大小
        acceptor.getSessionConfig().setReadBufferSize(2048);
        // 讀寫通道10秒內(nèi)無操作進入空閑狀態(tài)
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        try {
            // 綁定端口
            acceptor.bind(new InetSocketAddress(20000));
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("啟動服務");
    }

    /**
     * @ClassName: DemoServerHandler
     * @Description: 負責session對象的創(chuàng)建和監(jiān)聽以及消息的創(chuàng)建和接收監(jiān)聽
     * @author chenzheng
     * @date 2016-12-9 下午3:57:11
     */
    private static class DemoServerHandler extends IoHandlerAdapter {

        // 服務器與客戶端創(chuàng)建連接
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            System.out.println("服務器與客戶端創(chuàng)建連接...");
            super.sessionCreated(session);
        }

        @Override
        public void sessionOpened(IoSession session) throws Exception {
            System.out.println("服務器與客戶端連接打開...");
            super.sessionOpened(session);
        }

        // 消息的接收處理
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            // TODO Auto-generated method stub
            super.messageReceived(session, message);// 消息的接受

            // 傳遞自定義解編碼器傳遞數(shù)組和解析數(shù)組丟包斷包的
            String a = (String) message;
            System.out.println("接收到的數(shù)據(jù):" + a);
            session.write(a);

        }

        // 消息發(fā)送后調(diào)用
        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            // TODO Auto-generated method stub
            super.messageSent(session, message);
            System.out.println("服務器發(fā)送消息成功...");
        }

        // session關閉
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            // TODO Auto-generated method stub
            super.sessionClosed(session);
            System.out.println("斷開連接:");
        }
    }

}

編碼器


package com.my.mina;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

import java.nio.charset.Charset;

/**
 * 編碼器
 * 
 */
public class ByteArrayEncoder extends ProtocolEncoderAdapter {

    private final Charset charset;

    public ByteArrayEncoder(Charset charset) {
        this.charset = charset;

    }

    /**
     * 直接將數(shù)據(jù)發(fā)出去,數(shù)據(jù)格式勿她,包頭+消息長度(int)+消息內(nèi)容(json字符串)+包尾 包頭包尾是十六進制字符串00 aa bb cc,轉化成字節(jié)數(shù)組0,
     * -86, -69, -52四個字節(jié)
     *
     * @param session
     * @param message
     * @param out
     * @throws Exception
     */
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        // 仿項目袄秩,解決斷包,粘包問題
        String value = (message == null ? "" : message.toString());// 消息值
        byte[] content = value.getBytes(charset);// 消息內(nèi)容,字節(jié)數(shù)組
        IoBuffer buf = IoBuffer.allocate(38 + content.length).setAutoExpand(true);// 緩沖區(qū)容量大小38字節(jié)加上字符長度
        buf.put(new byte[] { 0, -86, -69, -52 });// 輸入包開頭固定值十六進制00 aa bb cc,轉化成字節(jié)數(shù)組
        buf.putUnsignedInt(content.length);// int為4字節(jié)嫂拴,一個字節(jié)等于2個16進制字符播揪,所以有八位 00 00 00 0c,內(nèi)容長度筒狠。
        buf.put(content);// 消息內(nèi)容
        buf.put(new byte[] { 0, -86, -69, -52 });// 包尾
        buf.flip();
        out.write(buf);// 寫入
    }

}

解碼器,重點箱沦,解決Mina斷包辩恼,丟包問題

package com.my.mina;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

import java.nio.charset.Charset;

/**
 * 自定義解碼器,確保能讀到完整的包
 */
public class ByteArrayDecoder extends CumulativeProtocolDecoder {

    private final Charset charset;

    public ByteArrayDecoder(Charset charset) {
        this.charset = charset;

    }

    @Override
    protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput)
            throws Exception {
        // 丟包,斷包處理
        if (ioBuffer.remaining() > 4)// 有包頭灶伊,包頭足夠
        {
            ioBuffer.mark();// 標記當前position的快照標記mark疆前,以便后繼的reset操作能恢復position位置,開始是0
            byte[] l = new byte[4];
            ioBuffer.get(l);// 讀取包頭聘萨,占4個字節(jié)
            if (ioBuffer.remaining() < 4)// 內(nèi)容長度的4個字節(jié)不夠竹椒,斷包
            {
                ioBuffer.reset();
                return false;//
            } else {// 內(nèi)容長度的4個字節(jié)數(shù)組足夠
                byte[] bytesLegth = new byte[4];// 內(nèi)容長度
                ioBuffer.get(bytesLegth);// 讀取內(nèi)容長度,int類型,占四個字節(jié)
                int len = MinaUtil.byteArrayToInt(bytesLegth);// 內(nèi)容長度有多少
                if (ioBuffer.remaining() < len)// 內(nèi)容不夠米辐,斷包
                {
                    ioBuffer.reset();
                    return false;//

                } else { // 消息內(nèi)容足夠

                    byte[] bytes = new byte[len];
                    ioBuffer.get(bytes, 0, len);
                    protocolDecoderOutput.write(new String(bytes, charset));// 讀取內(nèi)容胸完,并且發(fā)送

                    if (ioBuffer.remaining() < 4) {// 包尾不夠
                        ioBuffer.reset();
                        return false;//

                    } else {// 包尾足夠
                        byte[] tails = new byte[4];
                        ioBuffer.get(tails);// 讀取包尾
                        if (ioBuffer.remaining() > 0)// 最后如果粘了包,會再次調(diào)用doDeocde()方法翘贮,把剩余數(shù)據(jù)給doDeocde()方法處理
                        {
                            return true;
                        }

                    }
                }

            }

        }
        return false;// 斷包赊窥,或者執(zhí)行完,

    }
}

解編碼工廠

package com.my.mina;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

import java.nio.charset.Charset;

/**
 * 自定義解編碼器工廠
 *
 */

public class ByteArrayCodecFactory implements ProtocolCodecFactory {

    private ByteArrayDecoder decoder;
    private ByteArrayEncoder encoder;

    public ByteArrayCodecFactory() {
        this(Charset.defaultCharset());
    }

    public ByteArrayCodecFactory(Charset charSet) {
        encoder = new ByteArrayEncoder(charSet);
        decoder = new ByteArrayDecoder(charSet);
    }

    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return decoder;
    }

    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return encoder;
    }

}

注意:客戶端狸页,服務端需要和服務端有同樣的解碼器锨能,編碼器,解編碼工廠這三個類芍耘。

客戶端核心代碼


package com.example.mina.minaapplication.view;

import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import android.widget.Toast;

import com.example.mina.minaapplication.R;
import com.example.mina.minaapplication.mina.ByteArrayCodecFactory;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * Mina客戶端
 */
public class MainActivity extends Activity {
    /**
     * 線程池址遇,避免阻塞主線程,與服務器建立連接使用斋竞,創(chuàng)建一個只有單線程的線程池傲隶,盡快執(zhí)行線程的線程池
     */
    private static ExecutorService executorService = Executors.newSingleThreadExecutor();


    /**
     * 連接對象
     */
    private NioSocketConnector mConnection;
    /**
     * session對象
     */
    private IoSession mSession;
    /**
     * 連接服務器的地址
     */
    private InetSocketAddress mAddress;

    private ConnectFuture mConnectFuture;


    public static final int UPADTE_TEXT = 1;
    /**
     * 服務端返回的信息
     */
    private TextView tvShow;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        tvShow = findViewById(R.id.tv_show);
        initConfig();
        connect();
        findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {//發(fā)送消息數(shù)據(jù)


            @Override
            public void onClick(View view) {
                if (mConnectFuture != null && mConnectFuture.isConnected()) {//與服務器連接上
                    mConnectFuture.getSession().write("{\"id\":11,\"name\":\"ccc\"}");//發(fā)送json字符串
                }

            }
        });
    }

    /**
     * 初始化Mina配置信息
     */
    private void initConfig() {
        mAddress = new InetSocketAddress("192.168.0.1", 20000);//連接地址,此數(shù)據(jù)可改成自己要連接的IP和端口號
        mConnection = new NioSocketConnector();// 創(chuàng)建連接
        // 設置讀取數(shù)據(jù)的緩存區(qū)大小
        SocketSessionConfig socketSessionConfig = mConnection.getSessionConfig();
        socketSessionConfig.setReadBufferSize(2048);
        socketSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, 4);//設置4秒沒有讀寫操作進入空閑狀態(tài)
        mConnection.getFilterChain().addLast("logging", new LoggingFilter());//logging過濾器
        mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));//自定義解編碼器
        mConnection.setHandler(new DefaultHandler());//設置handler
        mConnection.setDefaultRemoteAddress(mAddress);//設置地址


    }

    /**
     * 創(chuàng)建連接
     */

    private void connect() {

        FutureTask<Void> futureTask = new FutureTask<>(new Callable<Void>() {
            @Override
            public Void call() {//

                try {
                    while (true) {
                        mConnectFuture = mConnection.connect();
                        mConnectFuture.awaitUninterruptibly();//一直等到他連接為止
                        mSession = mConnectFuture.getSession();//獲取session對象
                        if (mSession != null && mSession.isConnected()) {
                            Toast.makeText(MainActivity.this, "連接成功", Toast.LENGTH_SHORT).show();
                            break;
                        }
                        Thread.sleep(3000);//每隔三秒循環(huán)一次
                    }

                } catch (Exception e) {//連接異常


                }
                return null;
            }
        });
        executorService.execute(futureTask);//執(zhí)行連接線程
    }


    /**
     * Mina處理消息的handler,從服務端返回的消息一般在這里處理
     */
    private class DefaultHandler extends IoHandlerAdapter {


        @Override
        public void sessionOpened(IoSession session) throws Exception {
            super.sessionOpened(session);

        }

        /**
         * 接收到服務器端消息
         *
         * @param session
         * @param message
         * @throws Exception
         */
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            Log.e("tag", "接收到服務器端消息:" + message.toString());

            Message message1 = new Message();
            message1.what = UPADTE_TEXT;
            message1.obj = message;
            handler.sendMessage(message1);
        }


        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {//客戶端進入空閑狀態(tài).
            super.sessionIdle(session, status);

        }
    }

    /**
     * 更新UI
     */
    private Handler handler = new Handler() {
        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            switch (msg.what) {
                case UPADTE_TEXT:
                    String message = (String) msg.obj;
                    tvShow.setText(message);
                    break;
            }
        }
    };
}

客戶端截圖:

客戶端截圖

服務端截圖:

服務端截圖

本文完整項目代碼地址(歡迎來star):
https://github.com/lb1207087645/Android-Mina-master

參考資源:

淺談TCP粘包、斷包問題與解決方案

mina自定義編解碼器接收處理byte數(shù)組(同時解決數(shù)據(jù)傳輸中的粘包窃页、缺包問題)

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末跺株,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子脖卖,更是在濱河造成了極大的恐慌乒省,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件畦木,死亡現(xiàn)場離奇詭異袖扛,居然都是意外死亡,警方通過查閱死者的電腦和手機十籍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進店門蛆封,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人勾栗,你說我怎么就攤上這事惨篱。” “怎么了围俘?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵砸讳,是天一觀的道長琢融。 經(jīng)常有香客問我,道長簿寂,這世上最難降的妖魔是什么漾抬? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮常遂,結果婚禮上纳令,老公的妹妹穿的比我還像新娘。我一直安慰自己克胳,他們只是感情好平绩,可當我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著毯欣,像睡著了一般馒过。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上酗钞,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天腹忽,我揣著相機與錄音,去河邊找鬼砚作。 笑死窘奏,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的葫录。 我是一名探鬼主播着裹,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼米同!你這毒婦竟也來了骇扇?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤面粮,失蹤者是張志新(化名)和其女友劉穎少孝,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體熬苍,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡稍走,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了柴底。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片婿脸。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖柄驻,靈堂內(nèi)的尸體忽然破棺而出狐树,到底是詐尸還是另有隱情,我是刑警寧澤凿歼,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布褪迟,位于F島的核電站冗恨,受9級特大地震影響答憔,放射性物質(zhì)發(fā)生泄漏味赃。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一虐拓、第九天 我趴在偏房一處隱蔽的房頂上張望心俗。 院中可真熱鬧,春花似錦蓉驹、人聲如沸城榛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽狠持。三九已至,卻和暖如春瞻润,著一層夾襖步出監(jiān)牢的瞬間喘垂,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工绍撞, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留正勒,地道東北人。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓傻铣,卻偏偏與公主長得像章贞,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子非洲,可洞房花燭夜當晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容