前言
使用Mina大概也有半年了,一直忙于開發(fā)而忘了總結让蕾,項目里的業(yè)務系統(tǒng)只需要發(fā)送文字消息皿淋,而TCP底層是不區(qū)分文字還是文件的,所有的應用層報文最終都要轉換為字節(jié)流妥粟,有了上次開發(fā)語音通話的經(jīng)驗审丘,這次來嘗試一下文件的傳輸,希望這篇文章對初學者有所幫助勾给!
本文將提供部分Mina框架的原理,使用Mina開發(fā)文件傳輸協(xié)議的思路滩报、解決方案和遇到的難題。如所述的內(nèi)容有錯誤還望指出播急!
Mina框架的JAR開發(fā)包可以到官方網(wǎng)站下載:
http://mina.apache.org/
為什么使用Mina框架
Mina提供基于Java NIO的Reactor網(wǎng)絡模型API脓钾,并且封裝了會話層、表示層桩警,減輕開發(fā)者開發(fā)編碼器可训、解碼器的負擔,即便不需要NIO的特性也能很大程序的提高開發(fā)效率捶枢。通過Mina內(nèi)置的API可以方便地解決粘包缺包問題握截,方便的將字節(jié)報文轉換為應用層消息報文,通過IDLE事件可以輕松開發(fā)心跳協(xié)議烂叔,支持并發(fā)地處理應用層報文川蒙,滿足于文件的并發(fā)隨機讀寫需求。
Java NIO相比于傳統(tǒng)的BIO(阻塞IO)模型长已,不需要為每一個連接創(chuàng)建一個線程,而是通過一個或一組線程監(jiān)聽多路復用器Selector中的事件昼牛,然后由具體的事件決定哪些線程去處理术瓮。多路復用器的事件變化是由底層驅動的,因此開發(fā)者可以通過輪詢事件的變化來編程贰健。
Mina框架使用固定的一個單一線程池創(chuàng)建或監(jiān)聽連接事件胞四,IoConnector和IoAcceptor都有一個Selector對象,該對象上注冊了OP_CONNECT或者OP_ACCEPT事件伶椿,由IoConnector和IoAcceptor各自的單一線程池輪詢辜伟。
當連接通道創(chuàng)建完畢,它會被封裝為一個IoSession接口脊另,然后分配給某個Processor線程執(zhí)行該會話的讀寫导狡。每個Processor都只維護一個Selector對象,IoSession相關事件就是被注冊到這個Selector對象上偎痛。Processor線程通過select輪詢注冊在該Selector上的事件旱捧。
因此某個IoSession的讀寫一定是按順序由某個Processor線程執(zhí)行的,因為每個IoSession只注冊到一個Selector上,因為一個Selector對應一個Processor線程,所以每個IoSession的讀寫也是固定在一個Processor線程上執(zhí)行的枚赡。這一點是至關重要的氓癌,因為使用單個線程順序讀寫通道的數(shù)據(jù)才可以保證數(shù)據(jù)發(fā)送和接收的有序性。如果用多個線程并發(fā)讀寫通道贫橙,并不能保證數(shù)據(jù)發(fā)送和接收的順序贪婉。
雖然底層字節(jié)流數(shù)據(jù)的順序一定要保證有序性,但是封裝成應用層報文對象后并不一定要順序處理卢肃,Mina提供了ExecutorFilter提供并發(fā)處理應用層報文的能力疲迂。
Mina框架底層具體是如何讀寫的?請閱讀另一篇文章 《Mina框架會話讀寫源碼分析》
文件傳輸思路
文件的傳輸實際就是文件報文的設計問題践剂。設計好的文件報文鬼譬,由編碼器轉換為底層字節(jié)流發(fā)送,接收端通過解碼器接收字節(jié)流并解決粘包逊脯、缺包問題优质,最后轉換為文件報文對象由具體的消息處理器處理。
文件協(xié)議的設計既要保證文件傳輸?shù)耐暾跃荩€要考慮如并發(fā)巩螃、斷點等額外功能需求,由要兼顧性能負載方面的需求匕争。
本例實現(xiàn)了并發(fā)隨機讀取文件傳輸和并發(fā)隨機寫入文件避乏。
應用層報文設計
RequestSendFileMessage:請求發(fā)送文件,包含文件名甘桑、文件MD5拍皮、文件長度、業(yè)務編號跑杭、壓縮后的文件長度铆帽、文件分段長度。
setType(TYPE_REQUEST_SEND_FILE);
JSONObject json=new JSONObject();
json.put("fileName", fileTask.fileName);
json.put("md5", fileTask.md5);
json.put("size", fileTask.size);
json.put("id", fileTask.id);
json.put("zippedFileSize", fileTask.zippedFileSize);
json.put("fileSegmentSize", fileTask.fileSegmentSize);
setBody(json.toString().getBytes(Charset.forName("UTF-8")));
AcceptReceiveFileMessage:同意接收文件報文,包含業(yè)務編號德谅。
setType(TYPE_ACCEPT_RECEIVE_FILE);
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", id);
setBody(jsonObject.toString());
RefuseReceiveFileMessage:拒絕接收文件報文爹橱,包含業(yè)務編號、錯誤代號窄做、錯誤描述愧驱。
setType(TYPE_REFUSE_RECEIVE_FILE);
JSONObject json = new JSONObject();
json.put("id", id);
json.put("errorCode", errorCode);
json.put("des", des);
setBody(json.toString());
SendFilePartMessage:發(fā)送文件分段報文,包含業(yè)務編號椭盏、分段編號组砚、文件分段字節(jié)流.
setType(TYPE_SEND_FILE_PART);
JSONObject json = new JSONObject();
json.put("id", id);
json.put("partId", partId);
byte[] jsonBody = json.toString().getBytes("UTF-8");
ByteBuffer buffer = ByteBuffer.allocate(jsonBody.length + 2 + data.length);
buffer.putShort((short) jsonBody.length);
buffer.put(jsonBody);
buffer.put(data);
setBody(buffer.array());
ReceiveFileFinishMessage:通知對方已完成文件的接收和是否完成對文件的MD5校驗。
setType(TYPE_RECEIVE_FILE_PART);
JSONObject json = new JSONObject();
json.put("id", id);
json.put("success", success);
setBody(json.toString());
流程分析
發(fā)送者先將原文件壓縮掏颊,計算出壓縮后的文件的長度惫确,除以分段長度得到分段總數(shù),一并發(fā)送給接收端:
一方A請求發(fā)送文件,另一方B判斷是否可以接收文件改化,然后通知A發(fā)送文件:
A收到B回復開發(fā)文件分段掩蛤,先計算出文件要分為多少個應用報文,然后由多線程隨機讀取文件發(fā)送:
B依次收到SendFilePartMessage報文陈肛,采用并發(fā)隨機寫入的方式將文件分段寫入本地硬盤揍鸟,當文件的同步分段計數(shù)器數(shù)值等于總分段數(shù),說明任務完成句旱,通知發(fā)送端校驗結果:
發(fā)送者收到 ReceiveFileFinishMessage阳藻,將本地的文件任務刪除。
實現(xiàn)分析
壓縮/解壓文件
本例中使用了GZIPInputStream/GZIPInputStream壓縮與解壓文件谈撒,可以節(jié)約一定的網(wǎng)絡傳輸流量:
public static void zipFile(String source, String target) throws IOException {
FileInputStream fin = null;
FileOutputStream fout = null;
GZIPOutputStream gzout = null;
try {
fin = new FileInputStream(source);
fout = new FileOutputStream(target);
gzout = new GZIPOutputStream(fout);
byte[] buf = new byte[1024];
int num;
while ((num = fin.read(buf)) != -1) {
gzout.write(buf, 0, num);
}
} finally {
if (gzout != null)
gzout.close();
if (fout != null)
fout.close();
if (fin != null)
fin.close();
}
}
public static void unZipFile(String source, String target) throws IOException {
FileInputStream fin = null;
GZIPInputStream gzin = null;
FileOutputStream fout = null;
try {
fin = new FileInputStream(source);
gzin = new GZIPInputStream(fin);
fout = new FileOutputStream(target);
byte[] buf = new byte[1024];
int num;
while ((num = gzin.read(buf, 0, buf.length)) != -1) {
fout.write(buf, 0, num);
}
} finally {
if (fout != null)
fout.close();
if (gzin != null)
gzin.close();
if (fin != null)
fin.close();
}
}
文件的并發(fā)隨機讀寫
利用多線程并發(fā)讀寫文件需要用到RandomAccessFile腥泥,因為FileInputStream/FileReader、FileOutputStream/FileWriter均不支持在文件內(nèi)部使用搜尋方法啃匿,它們的讀寫是流式進行的蛔外。RandomAccessFile內(nèi)部有一個指針標記了當前讀寫文件的位置,通過使用seek方法可以將該指針定位到文件的任意位置實現(xiàn)隨機讀寫溯乒。
讀取指定位置的文件分段:
RandomAccessFile randomAccessFile = new RandomAccessFile(
fileTask.zippedFilePath, "rw");
byte[] buffer = new byte[fileTask.fileSegmentSize];
int availableSize;
randomAccessFile.seek(pardIdLocal * fileTask.fileSegmentSize);
availableSize = randomAccessFile.read(buffer);
randomAccessFile.close();
寫入指定位置的文件分段:
RandomAccessFile randomAccessFile = new RandomAccessFile(
fileTask.zippedFilePath, "rw");
long beginIndex = fileTask.fileSegmentSize * filePart.partId;
randomAccessFile.seek(beginIndex);
// System.out.println("file length = "+randomAccessFile.length()+" , beginIndex = "+beginIndex);
randomAccessFile.write(filePart.data);
randomAccessFile.close();
編碼器與解碼器
了解TCP協(xié)議原理的同學都知道TCP在實際的網(wǎng)絡傳輸過程中是有可能會被分片的夹厌,即便是我們通過API顯示發(fā)送了一個報文,在底層緩沖區(qū)中該報文也可能會被拆分重組再發(fā)送裆悄。
假設發(fā)送端的報文有2K個字節(jié)矛纹,那么接收端處理的過程中有可能分多次獲取這2K個字節(jié),每次獲取的數(shù)據(jù)都是不確定的光稼。
Mina中可以通過自定義解碼器和編碼器實現(xiàn)上傳應用報文對象到底層字節(jié)流的相關轉換或南。
所以通過TCP協(xié)議傳輸數(shù)據(jù)必須有明確的應用層報文規(guī)定以避免實際網(wǎng)絡傳輸中的缺包粘包問題。
本例中的應用層報文格式: |兩個字節(jié)的魔數(shù)|一個字節(jié)的消息類型定義 |兩個字節(jié)的消息數(shù)據(jù)長度|若干字節(jié)的消息數(shù)據(jù)|
消息基類:
public class SocketMessage implements Serializable{
public SocketMessage(){
}
public SocketMessage(SocketMessage msg){
setType(msg.getType());
setBody(msg.getBody());
}
private static final short MAX_BODY_LENGTH = 1400;
public static final byte HEADER1 = 0x5c;
public static final byte HEADER2 = 0x74;
private byte type;
//定義消息類型
public static final byte TYPE_HEART_BEAT = -1;
public static final byte TYPE_SEND_TEXT = 0;
public static final byte TYPE_REQUEST_SEND_FILE = 1;
public static final byte TYPE_ACCEPT_RECEIVE_FILE = 2;
public static final byte TYPE_REFUSE_RECEIVE_FILE = 3;
public static final byte TYPE_SEND_FILE_PART = 4;
public static final byte TYPE_RECEIVE_FILE_PART = 5;
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
private byte[] body;
public byte[] getBody() {
return body;
}
public String getJSONBody() {
return new String(body,Charset.forName("UTF-8"));
}
public void setBody(byte[] bytes){
body = bytes;
}
/**
* 設置消息體艾君,一般用json解析
*/
public void setBody(String str) {
this.body = str.getBytes(Charset.forName("UTF-8"));
}
}
編碼器:
public class BaseEncoder extends ProtocolEncoderAdapter{
@Override
public void encode(IoSession session, Object obj, ProtocolEncoderOutput output)
throws Exception {
SocketMessage msg = (SocketMessage) obj;
IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
buffer.order(ByteOrder.BIG_ENDIAN);
buffer.put(SocketMessage.HEADER1);
buffer.put(SocketMessage.HEADER2);
buffer.put(msg.getType());
if(msg.getType()!=SocketMessage.TYPE_HEART_BEAT){
byte[] body = msg.getBody();
short bodyLength = (short) body.length;
buffer.putShort(bodyLength);
buffer.put(body);
}
buffer.flip();
output.write(buffer);
}
}
解碼器:
public class BaseDecoder extends CumulativeProtocolDecoder{
@Override
protected boolean doDecode(IoSession session,IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if(in.remaining()>=3){
in.mark(); // 標記當前位置迎献,方便reset
byte[] header = new byte[2];
in.get(header, 0, header.length);
if(header[0]== SocketMessage.HEADER1 && header[1]== SocketMessage.HEADER2){
byte type=in.get();
//System.out.println("類型 :"+type);
if(type==SocketMessage.TYPE_HEART_BEAT){
SocketMessage msg = new SocketMessage();
msg.setType(SocketMessage.TYPE_HEART_BEAT);
out.write(msg);
if(in.remaining()>=3){
return true;
}
}else{
if(in.remaining()>=2){
short bodyLength = in.getShort();
if(in.remaining()>=bodyLength){
byte[] body = new byte[bodyLength];
in.get(body, 0, bodyLength);
SocketMessage msg = new SocketMessage();
msg.setType(type);
msg.setBody(body);
out.write(msg);
if(in.remaining()>=3){
//再來一遍
return true;
}
}else{
//長度不夠
in.reset();
}
}else{
//長度不夠
in.reset();
}
}
}else{
System.err.println("HEADER[0] = "+header[0]+" , HEADER[1] = "+header[1]);
throw new IllegalArgumentException("錯誤的HEADER");
}
}
return false;
}
}
文件分段長度的設定
假設設定文件分段長度為 1KB,那么發(fā)送端的每個隨機讀取線程都將占用1KB的內(nèi)存。同理服務端處理文件分段接收的線程池的每個線程也會占用1KB的緩沖區(qū)腻贰。
雖然應用層報文在底層會被分片發(fā)送,但還是要注意扒秸,如果設置這個值過大播演,當執(zhí)行文件任務的會話增多,很容易內(nèi)存溢出伴奥。
本例中設計報文數(shù)據(jù)長度最大不能超過2^32K写烤,使用固定長度的線程池FixedThreadPool執(zhí)行文件的并發(fā)寫入,即使session數(shù)量很多但是緩沖區(qū)的數(shù)量是固定的拾徙。在使用CachedThreadPool要注意內(nèi)存問題洲炊。
發(fā)送端與接收端的分段同步
發(fā)送端和接收端都維護了一個FileTask對象,這個對象代表一個文件任務:
public class FileTask {
public boolean running = true;
public long id;
public String zippedFilePath;
public long zippedFileSize;
public String fileName;
public String filePath;
public String md5;
public long startTime;
public long startTime2; //網(wǎng)絡計時
public long size;
public int fileSegmentSize;
public AtomicInteger partId = new AtomicInteger(0);
}
其中的partId 是一個原子整型同步計數(shù)器,其原理是Compare And Swipe暂衡。發(fā)送端每發(fā)送一個SendFilePartMessage都將partId +1,同理接收端每接收一個SendFilePartMessage并將文件分段寫入后也執(zhí)行partId +1询微。
這種粗略的方式只能用于統(tǒng)計文件所有分段是否發(fā)送完成,如果要做斷點傳送還需要維護一個partId數(shù)組狂巢,服務端記錄那些partId已成功接收哪些沒有撑毛,在恢復文件傳輸時將該數(shù)組發(fā)送給發(fā)送端,這樣發(fā)送端就知道接下來哪些分段需要發(fā)送唧领。
測試
在項目中創(chuàng)建了兩個項目藻雌,一個客戶端用于發(fā)送文件,一個服務端處理文件的接收斩个。
客戶端:
14:47:05 已建立連接
14:47:05 session:1 已建立
開始壓縮原文件
壓縮結束 胯杭, 耗時:11478 ms
原文件大小:215089914 壓縮后大小:174372279
請求發(fā)送文件 G:\test2.pdf id 1496990826746
總計5677個分段
發(fā)送完畢
發(fā)送文件用時:2.447 s
平均速度:69589.46821043624 kbps
對方接收并校驗成功,總計用時(壓縮+發(fā)送文件+解壓):21.057 s
移除FileTask 1496990826746
服務端:
已綁定 8890
14:47:05 session:1 已建立
收到接收文件請求 : test2.pdf id 1496990826746
預計占用空間 : 389462193 Bytes
同意接收文件:test2.pdf 臨時存放路徑:G:\server_download\1496990838242_zipped_test2.pdf 目標路徑 G:\server_download\1496990838242_test2.pdf
文件 G:\server_download\1496990838242_zipped_test2.pdf 接收完畢 文件總計174372279字節(jié)
開始解壓
解壓結束 受啥, 耗時:3498 ms
原文件大小:215089914 壓縮文件大小:174372279
原MD5:594a131a0166f3c6e4043f77eb6c0a2b
下載文件md5:594a131a0166f3c6e4043f77eb6c0a2b
校驗通過
移除FileTask 1496990826746
因為是本地測試所以速度很快做个,放到實際的網(wǎng)絡中會遇到延時、斷線異常等各種問題腔呜,待完善叁温。
項目地址:MinaExample