1.什么是斷包悔据,粘包庄敛?
在講斷包,粘包之前科汗,先說下消息保護邊界和無消息保護邊界铐姚。
1.保護消息邊界,就是指傳輸協(xié)議把數(shù)據(jù)當作一條獨立的消息在網(wǎng)上傳輸,接收端只能接收獨立的消息.也就是說存在保護消息邊界,接收端一次只能接收發(fā)送端發(fā)出的一個數(shù)據(jù)包.
2.而面向流則是無消息保護邊界的,如果發(fā)送端連續(xù)發(fā)送數(shù)據(jù), 接收端有可能在一次接收動作中,會接收兩個或者更多的數(shù)據(jù)包肛捍。
而tcp是面向流的隐绵,需要在消息接收端處理消息邊界問題。
接收端在接受數(shù)據(jù)時有可能會遇到下面四種情況
A.先接收到dataA然后接收到dataB.
B.先接收到dataA的部分數(shù)據(jù),然后接收到dataA余下的部分以及dataB的全部.
C.先接收到了dataA的全部數(shù)據(jù)和dataB的部分數(shù)據(jù),然后接收到了dataB的余下的數(shù)據(jù).
D.一次性接收到了dataA和dataB的全部數(shù)據(jù).
A為正常情況拙毫,無粘包或斷包依许。
B為斷包+粘包。
C為粘包+斷包缀蹄。
D為粘包峭跳。
2.如何處理Mina中遇到的粘包和斷包問題
在Mina框架中有個CumulativeProtocolDecoder 累積性的協(xié)議解碼器,專門用來處理粘包和斷包問題缺前。doDecode()的返回值有重要作用蛀醉。
A.你的doDecode()方法返回true 時,CumulativeProtocolDecoder 的decode()方法會首先判斷你是否在doDecode()方法中從內(nèi)部的IoBuffer 緩沖區(qū)讀取了數(shù)據(jù)衅码,如果沒有拯刁,則會拋出非法的狀態(tài)異常,也就是你的doDecode()方法返回true 就表示你已經(jīng)消費了本次數(shù)據(jù)(相當于聊天室中一個完整的消息已經(jīng)讀取完畢)逝段,進一步說垛玻,也就是此時你必須已經(jīng)消費過內(nèi)部的IoBuffer 緩沖區(qū)的數(shù)據(jù)(哪怕是消費了一個字節(jié)的數(shù)據(jù))。如果驗證過通過奶躯,那么CumulativeProtocolDecoder 會檢查緩沖區(qū)內(nèi)是否還有數(shù)據(jù)未讀取帚桩,如果有就繼續(xù)調(diào)用doDecode()方法,沒有就停止對doDecode()方法的調(diào)用嘹黔,直到有新的數(shù)據(jù)被緩沖账嚎。
B. 當你的doDecode()方法返回false 時,CumulativeProtocolDecoder 會停止對doDecode()方法的調(diào)用,但此時如果本次數(shù)據(jù)還有未讀取完的郭蕉,就將含有剩余數(shù)據(jù)的IoBuffer 緩沖區(qū)保存到IoSession 中乏悄,以便下一次數(shù)據(jù)到來時可以從IoSession 中提取合并。如果發(fā)現(xiàn)本次數(shù)據(jù)全都讀取完畢恳不,則清空IoBuffer 緩沖區(qū)(讓父類進行接收下一個包)檩小。簡而言之,當你認為讀取到的數(shù)據(jù)已經(jīng)夠解碼了烟勋,那么就返回true规求,否則就返回false。這個CumulativeProtocolDecoder 其實最重要的工作就是幫你完成了數(shù)據(jù)的累積卵惦,因為這個工作是很煩瑣的阻肿。也就是說返回true,那么CumulativeProtocolDecoder會再次調(diào)用decoder沮尿,并把剩余的數(shù)據(jù)發(fā)下來丛塌;(意思就是會把剩余數(shù)據(jù)給doDecode()處理,剩余數(shù)據(jù)就是remaining()的數(shù)據(jù))畜疾,返回false就不處理剩余的赴邻,(不把剩余數(shù)據(jù)給doDecode()處理)當有新數(shù)據(jù)包來的時候就把剩余的數(shù)據(jù)和新的數(shù)據(jù)拼接在一起,然后再調(diào)用decoder啡捶。
下面附上一個完整的實例
1.消息的格式
包頭+消息長度(int)+消息內(nèi)容(json字符串)+包尾姥敛,包頭包尾是十六進制字符串00 aa bb cc,轉化成字節(jié)數(shù)組0, -86, -69, -52四個字節(jié),下面的完整實例有客戶端瞎暑,服務端彤敛,將會解析數(shù)據(jù),獲取其中的消息內(nèi)容(Json字符串)并且打印處理了赌,消息以字節(jié)數(shù)組的方式在服務端墨榄,客戶端之間傳遞。
服務端代碼
package com.my.mina;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Date;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
/**
* mina的Service端
*
* @author linbin
*
*/
public class MinaService {
public static void main(String[] args) {
// 創(chuàng)建一個非阻塞的server端的Socket
IoAcceptor acceptor = new NioSocketAcceptor();
// 添加日志過濾器
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));// 自定義解編碼器
// 設置Handler
acceptor.setHandler(new DemoServerHandler());
// 設置讀取數(shù)據(jù)的緩存區(qū)大小
acceptor.getSessionConfig().setReadBufferSize(2048);
// 讀寫通道10秒內(nèi)無操作進入空閑狀態(tài)
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
try {
// 綁定端口
acceptor.bind(new InetSocketAddress(20000));
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("啟動服務");
}
/**
* @ClassName: DemoServerHandler
* @Description: 負責session對象的創(chuàng)建和監(jiān)聽以及消息的創(chuàng)建和接收監(jiān)聽
* @author chenzheng
* @date 2016-12-9 下午3:57:11
*/
private static class DemoServerHandler extends IoHandlerAdapter {
// 服務器與客戶端創(chuàng)建連接
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println("服務器與客戶端創(chuàng)建連接...");
super.sessionCreated(session);
}
@Override
public void sessionOpened(IoSession session) throws Exception {
System.out.println("服務器與客戶端連接打開...");
super.sessionOpened(session);
}
// 消息的接收處理
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
super.messageReceived(session, message);// 消息的接受
// 傳遞自定義解編碼器傳遞數(shù)組和解析數(shù)組丟包斷包的
String a = (String) message;
System.out.println("接收到的數(shù)據(jù):" + a);
session.write(a);
}
// 消息發(fā)送后調(diào)用
@Override
public void messageSent(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
super.messageSent(session, message);
System.out.println("服務器發(fā)送消息成功...");
}
// session關閉
@Override
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
super.sessionClosed(session);
System.out.println("斷開連接:");
}
}
}
編碼器
package com.my.mina;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import java.nio.charset.Charset;
/**
* 編碼器
*
*/
public class ByteArrayEncoder extends ProtocolEncoderAdapter {
private final Charset charset;
public ByteArrayEncoder(Charset charset) {
this.charset = charset;
}
/**
* 直接將數(shù)據(jù)發(fā)出去,數(shù)據(jù)格式勿她,包頭+消息長度(int)+消息內(nèi)容(json字符串)+包尾 包頭包尾是十六進制字符串00 aa bb cc,轉化成字節(jié)數(shù)組0,
* -86, -69, -52四個字節(jié)
*
* @param session
* @param message
* @param out
* @throws Exception
*/
@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
// 仿項目袄秩,解決斷包,粘包問題
String value = (message == null ? "" : message.toString());// 消息值
byte[] content = value.getBytes(charset);// 消息內(nèi)容,字節(jié)數(shù)組
IoBuffer buf = IoBuffer.allocate(38 + content.length).setAutoExpand(true);// 緩沖區(qū)容量大小38字節(jié)加上字符長度
buf.put(new byte[] { 0, -86, -69, -52 });// 輸入包開頭固定值十六進制00 aa bb cc,轉化成字節(jié)數(shù)組
buf.putUnsignedInt(content.length);// int為4字節(jié)嫂拴,一個字節(jié)等于2個16進制字符播揪,所以有八位 00 00 00 0c,內(nèi)容長度筒狠。
buf.put(content);// 消息內(nèi)容
buf.put(new byte[] { 0, -86, -69, -52 });// 包尾
buf.flip();
out.write(buf);// 寫入
}
}
解碼器,重點箱沦,解決Mina斷包辩恼,丟包問題
package com.my.mina;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import java.nio.charset.Charset;
/**
* 自定義解碼器,確保能讀到完整的包
*/
public class ByteArrayDecoder extends CumulativeProtocolDecoder {
private final Charset charset;
public ByteArrayDecoder(Charset charset) {
this.charset = charset;
}
@Override
protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput)
throws Exception {
// 丟包,斷包處理
if (ioBuffer.remaining() > 4)// 有包頭灶伊,包頭足夠
{
ioBuffer.mark();// 標記當前position的快照標記mark疆前,以便后繼的reset操作能恢復position位置,開始是0
byte[] l = new byte[4];
ioBuffer.get(l);// 讀取包頭聘萨,占4個字節(jié)
if (ioBuffer.remaining() < 4)// 內(nèi)容長度的4個字節(jié)不夠竹椒,斷包
{
ioBuffer.reset();
return false;//
} else {// 內(nèi)容長度的4個字節(jié)數(shù)組足夠
byte[] bytesLegth = new byte[4];// 內(nèi)容長度
ioBuffer.get(bytesLegth);// 讀取內(nèi)容長度,int類型,占四個字節(jié)
int len = MinaUtil.byteArrayToInt(bytesLegth);// 內(nèi)容長度有多少
if (ioBuffer.remaining() < len)// 內(nèi)容不夠米辐,斷包
{
ioBuffer.reset();
return false;//
} else { // 消息內(nèi)容足夠
byte[] bytes = new byte[len];
ioBuffer.get(bytes, 0, len);
protocolDecoderOutput.write(new String(bytes, charset));// 讀取內(nèi)容胸完,并且發(fā)送
if (ioBuffer.remaining() < 4) {// 包尾不夠
ioBuffer.reset();
return false;//
} else {// 包尾足夠
byte[] tails = new byte[4];
ioBuffer.get(tails);// 讀取包尾
if (ioBuffer.remaining() > 0)// 最后如果粘了包,會再次調(diào)用doDeocde()方法翘贮,把剩余數(shù)據(jù)給doDeocde()方法處理
{
return true;
}
}
}
}
}
return false;// 斷包赊窥,或者執(zhí)行完,
}
}
解編碼工廠
package com.my.mina;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
import java.nio.charset.Charset;
/**
* 自定義解編碼器工廠
*
*/
public class ByteArrayCodecFactory implements ProtocolCodecFactory {
private ByteArrayDecoder decoder;
private ByteArrayEncoder encoder;
public ByteArrayCodecFactory() {
this(Charset.defaultCharset());
}
public ByteArrayCodecFactory(Charset charSet) {
encoder = new ByteArrayEncoder(charSet);
decoder = new ByteArrayDecoder(charSet);
}
@Override
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return decoder;
}
@Override
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return encoder;
}
}
注意:客戶端狸页,服務端需要和服務端有同樣的解碼器锨能,編碼器,解編碼工廠這三個類芍耘。
客戶端核心代碼
package com.example.mina.minaapplication.view;
import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import android.widget.Toast;
import com.example.mina.minaapplication.R;
import com.example.mina.minaapplication.mina.ByteArrayCodecFactory;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
* Mina客戶端
*/
public class MainActivity extends Activity {
/**
* 線程池址遇,避免阻塞主線程,與服務器建立連接使用斋竞,創(chuàng)建一個只有單線程的線程池傲隶,盡快執(zhí)行線程的線程池
*/
private static ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
* 連接對象
*/
private NioSocketConnector mConnection;
/**
* session對象
*/
private IoSession mSession;
/**
* 連接服務器的地址
*/
private InetSocketAddress mAddress;
private ConnectFuture mConnectFuture;
public static final int UPADTE_TEXT = 1;
/**
* 服務端返回的信息
*/
private TextView tvShow;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
tvShow = findViewById(R.id.tv_show);
initConfig();
connect();
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {//發(fā)送消息數(shù)據(jù)
@Override
public void onClick(View view) {
if (mConnectFuture != null && mConnectFuture.isConnected()) {//與服務器連接上
mConnectFuture.getSession().write("{\"id\":11,\"name\":\"ccc\"}");//發(fā)送json字符串
}
}
});
}
/**
* 初始化Mina配置信息
*/
private void initConfig() {
mAddress = new InetSocketAddress("192.168.0.1", 20000);//連接地址,此數(shù)據(jù)可改成自己要連接的IP和端口號
mConnection = new NioSocketConnector();// 創(chuàng)建連接
// 設置讀取數(shù)據(jù)的緩存區(qū)大小
SocketSessionConfig socketSessionConfig = mConnection.getSessionConfig();
socketSessionConfig.setReadBufferSize(2048);
socketSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, 4);//設置4秒沒有讀寫操作進入空閑狀態(tài)
mConnection.getFilterChain().addLast("logging", new LoggingFilter());//logging過濾器
mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));//自定義解編碼器
mConnection.setHandler(new DefaultHandler());//設置handler
mConnection.setDefaultRemoteAddress(mAddress);//設置地址
}
/**
* 創(chuàng)建連接
*/
private void connect() {
FutureTask<Void> futureTask = new FutureTask<>(new Callable<Void>() {
@Override
public Void call() {//
try {
while (true) {
mConnectFuture = mConnection.connect();
mConnectFuture.awaitUninterruptibly();//一直等到他連接為止
mSession = mConnectFuture.getSession();//獲取session對象
if (mSession != null && mSession.isConnected()) {
Toast.makeText(MainActivity.this, "連接成功", Toast.LENGTH_SHORT).show();
break;
}
Thread.sleep(3000);//每隔三秒循環(huán)一次
}
} catch (Exception e) {//連接異常
}
return null;
}
});
executorService.execute(futureTask);//執(zhí)行連接線程
}
/**
* Mina處理消息的handler,從服務端返回的消息一般在這里處理
*/
private class DefaultHandler extends IoHandlerAdapter {
@Override
public void sessionOpened(IoSession session) throws Exception {
super.sessionOpened(session);
}
/**
* 接收到服務器端消息
*
* @param session
* @param message
* @throws Exception
*/
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
Log.e("tag", "接收到服務器端消息:" + message.toString());
Message message1 = new Message();
message1.what = UPADTE_TEXT;
message1.obj = message;
handler.sendMessage(message1);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {//客戶端進入空閑狀態(tài).
super.sessionIdle(session, status);
}
}
/**
* 更新UI
*/
private Handler handler = new Handler() {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
switch (msg.what) {
case UPADTE_TEXT:
String message = (String) msg.obj;
tvShow.setText(message);
break;
}
}
};
}
客戶端截圖:
服務端截圖:
本文完整項目代碼地址(歡迎來star):
https://github.com/lb1207087645/Android-Mina-master
參考資源: