使用Mina開發(fā)文件傳輸協(xié)議

前言

使用Mina大概也有半年了,一直忙于開發(fā)而忘了總結让蕾,項目里的業(yè)務系統(tǒng)只需要發(fā)送文字消息皿淋,而TCP底層是不區(qū)分文字還是文件的,所有的應用層報文最終都要轉換為字節(jié)流妥粟,有了上次開發(fā)語音通話的經(jīng)驗审丘,這次來嘗試一下文件的傳輸,希望這篇文章對初學者有所幫助勾给!

本文將提供部分Mina框架的原理,使用Mina開發(fā)文件傳輸協(xié)議的思路滩报、解決方案和遇到的難題。如所述的內(nèi)容有錯誤還望指出播急!

Mina框架的JAR開發(fā)包可以到官方網(wǎng)站下載:
http://mina.apache.org/

為什么使用Mina框架

Mina提供基于Java NIO的Reactor網(wǎng)絡模型API脓钾,并且封裝了會話層、表示層桩警,減輕開發(fā)者開發(fā)編碼器可训、解碼器的負擔,即便不需要NIO的特性也能很大程序的提高開發(fā)效率捶枢。通過Mina內(nèi)置的API可以方便地解決粘包缺包問題握截,方便的將字節(jié)報文轉換為應用層消息報文,通過IDLE事件可以輕松開發(fā)心跳協(xié)議烂叔,支持并發(fā)地處理應用層報文川蒙,滿足于文件的并發(fā)隨機讀寫需求。

Java NIO相比于傳統(tǒng)的BIO(阻塞IO)模型长已,不需要為每一個連接創(chuàng)建一個線程,而是通過一個或一組線程監(jiān)聽多路復用器Selector中的事件昼牛,然后由具體的事件決定哪些線程去處理术瓮。多路復用器的事件變化是由底層驅動的,因此開發(fā)者可以通過輪詢事件的變化來編程贰健。

借用一張Mina框架圖

Mina框架使用固定的一個單一線程池創(chuàng)建或監(jiān)聽連接事件胞四,IoConnector和IoAcceptor都有一個Selector對象,該對象上注冊了OP_CONNECT或者OP_ACCEPT事件伶椿,由IoConnector和IoAcceptor各自的單一線程池輪詢辜伟。

當連接通道創(chuàng)建完畢,它會被封裝為一個IoSession接口脊另,然后分配給某個Processor線程執(zhí)行該會話的讀寫导狡。每個Processor都只維護一個Selector對象,IoSession相關事件就是被注冊到這個Selector對象上偎痛。Processor線程通過select輪詢注冊在該Selector上的事件旱捧。

因此某個IoSession的讀寫一定是按順序由某個Processor線程執(zhí)行的,因為每個IoSession只注冊到一個Selector上,因為一個Selector對應一個Processor線程,所以每個IoSession的讀寫也是固定在一個Processor線程上執(zhí)行的枚赡。這一點是至關重要的氓癌,因為使用單個線程順序讀寫通道的數(shù)據(jù)才可以保證數(shù)據(jù)發(fā)送和接收的有序性。如果用多個線程并發(fā)讀寫通道贫橙,并不能保證數(shù)據(jù)發(fā)送和接收的順序贪婉。

雖然底層字節(jié)流數(shù)據(jù)的順序一定要保證有序性,但是封裝成應用層報文對象后并不一定要順序處理卢肃,Mina提供了ExecutorFilter提供并發(fā)處理應用層報文的能力疲迂。

Mina框架底層具體是如何讀寫的?請閱讀另一篇文章 《Mina框架會話讀寫源碼分析》

文件傳輸思路

文件的傳輸實際就是文件報文的設計問題践剂。設計好的文件報文鬼譬,由編碼器轉換為底層字節(jié)流發(fā)送,接收端通過解碼器接收字節(jié)流并解決粘包逊脯、缺包問題优质,最后轉換為文件報文對象由具體的消息處理器處理。

文件協(xié)議的設計既要保證文件傳輸?shù)耐暾跃荩€要考慮如并發(fā)巩螃、斷點等額外功能需求,由要兼顧性能負載方面的需求匕争。

本例實現(xiàn)了并發(fā)隨機讀取文件傳輸和并發(fā)隨機寫入文件避乏。

應用層報文設計

文件協(xié)議.png

RequestSendFileMessage:請求發(fā)送文件,包含文件名甘桑、文件MD5拍皮、文件長度、業(yè)務編號跑杭、壓縮后的文件長度铆帽、文件分段長度。

        setType(TYPE_REQUEST_SEND_FILE);
        JSONObject json=new JSONObject();
        json.put("fileName", fileTask.fileName);
        json.put("md5", fileTask.md5);
        json.put("size", fileTask.size);
        json.put("id", fileTask.id);
        json.put("zippedFileSize", fileTask.zippedFileSize);
        json.put("fileSegmentSize", fileTask.fileSegmentSize);
        setBody(json.toString().getBytes(Charset.forName("UTF-8")));

AcceptReceiveFileMessage:同意接收文件報文,包含業(yè)務編號德谅。

        setType(TYPE_ACCEPT_RECEIVE_FILE);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", id);
        setBody(jsonObject.toString());

RefuseReceiveFileMessage:拒絕接收文件報文爹橱,包含業(yè)務編號、錯誤代號窄做、錯誤描述愧驱。

        setType(TYPE_REFUSE_RECEIVE_FILE);
        JSONObject json = new JSONObject();
        json.put("id", id);
        json.put("errorCode", errorCode);
        json.put("des", des);
        setBody(json.toString());

SendFilePartMessage:發(fā)送文件分段報文,包含業(yè)務編號椭盏、分段編號组砚、文件分段字節(jié)流.

        setType(TYPE_SEND_FILE_PART);
        JSONObject json = new JSONObject();
        json.put("id", id);
        json.put("partId", partId);
        byte[] jsonBody = json.toString().getBytes("UTF-8");
        ByteBuffer buffer = ByteBuffer.allocate(jsonBody.length + 2 + data.length); 
        buffer.putShort((short) jsonBody.length);
        buffer.put(jsonBody);
        buffer.put(data);
        setBody(buffer.array());

ReceiveFileFinishMessage:通知對方已完成文件的接收和是否完成對文件的MD5校驗。

        setType(TYPE_RECEIVE_FILE_PART);
        JSONObject json = new JSONObject();
        json.put("id", id);
        json.put("success", success);
        setBody(json.toString());

流程分析

發(fā)送者先將原文件壓縮掏颊,計算出壓縮后的文件的長度惫确,除以分段長度得到分段總數(shù),一并發(fā)送給接收端:

準備發(fā)送文件.png

一方A請求發(fā)送文件,另一方B判斷是否可以接收文件改化,然后通知A發(fā)送文件:

文件請求及回復.png

A收到B回復開發(fā)文件分段掩蛤,先計算出文件要分為多少個應用報文,然后由多線程隨機讀取文件發(fā)送:

發(fā)送文件分段.png

B依次收到SendFilePartMessage報文陈肛,采用并發(fā)隨機寫入的方式將文件分段寫入本地硬盤揍鸟,當文件的同步分段計數(shù)器數(shù)值等于總分段數(shù),說明任務完成句旱,通知發(fā)送端校驗結果:

接收文件分段.png

發(fā)送者收到 ReceiveFileFinishMessage阳藻,將本地的文件任務刪除。

實現(xiàn)分析

壓縮/解壓文件

本例中使用了GZIPInputStream/GZIPInputStream壓縮與解壓文件谈撒,可以節(jié)約一定的網(wǎng)絡傳輸流量:

public static void zipFile(String source, String target) throws IOException {
        FileInputStream fin = null;
        FileOutputStream fout = null;
        GZIPOutputStream gzout = null;
        try {
            fin = new FileInputStream(source);
            fout = new FileOutputStream(target);
            gzout = new GZIPOutputStream(fout);
            byte[] buf = new byte[1024];
            int num;
            while ((num = fin.read(buf)) != -1) {
                gzout.write(buf, 0, num);
            }
        } finally {
            if (gzout != null)
                gzout.close();
            if (fout != null)
                fout.close();
            if (fin != null)
                fin.close();
        }
    }

public static void unZipFile(String source, String target) throws IOException {
        FileInputStream fin = null;
        GZIPInputStream gzin = null;
        FileOutputStream fout = null;
        try {
            fin = new FileInputStream(source);
            gzin = new GZIPInputStream(fin);
            fout = new FileOutputStream(target);
            byte[] buf = new byte[1024];
            int num;
            while ((num = gzin.read(buf, 0, buf.length)) != -1) {
                fout.write(buf, 0, num);
            }
        } finally {
            if (fout != null)
                fout.close();
            if (gzin != null)
                gzin.close();
            if (fin != null)
                fin.close();
        }
    }

文件的并發(fā)隨機讀寫

利用多線程并發(fā)讀寫文件需要用到RandomAccessFile腥泥,因為FileInputStream/FileReader、FileOutputStream/FileWriter均不支持在文件內(nèi)部使用搜尋方法啃匿,它們的讀寫是流式進行的蛔外。RandomAccessFile內(nèi)部有一個指針標記了當前讀寫文件的位置,通過使用seek方法可以將該指針定位到文件的任意位置實現(xiàn)隨機讀寫溯乒。

讀取指定位置的文件分段:

                RandomAccessFile randomAccessFile = new RandomAccessFile(
                        fileTask.zippedFilePath, "rw");
                byte[] buffer = new byte[fileTask.fileSegmentSize];
                int availableSize;
                randomAccessFile.seek(pardIdLocal * fileTask.fileSegmentSize);
                availableSize = randomAccessFile.read(buffer);
                randomAccessFile.close();

寫入指定位置的文件分段:

                RandomAccessFile randomAccessFile = new RandomAccessFile(
                        fileTask.zippedFilePath, "rw");
                long beginIndex = fileTask.fileSegmentSize * filePart.partId;
                randomAccessFile.seek(beginIndex);
                // System.out.println("file length = "+randomAccessFile.length()+" , beginIndex = "+beginIndex);
                randomAccessFile.write(filePart.data);
                randomAccessFile.close();

編碼器與解碼器

了解TCP協(xié)議原理的同學都知道TCP在實際的網(wǎng)絡傳輸過程中是有可能會被分片的夹厌,即便是我們通過API顯示發(fā)送了一個報文,在底層緩沖區(qū)中該報文也可能會被拆分重組再發(fā)送裆悄。

假設發(fā)送端的報文有2K個字節(jié)矛纹,那么接收端處理的過程中有可能分多次獲取這2K個字節(jié),每次獲取的數(shù)據(jù)都是不確定的光稼。

Mina中可以通過自定義解碼器和編碼器實現(xiàn)上傳應用報文對象到底層字節(jié)流的相關轉換或南。

所以通過TCP協(xié)議傳輸數(shù)據(jù)必須有明確的應用層報文規(guī)定以避免實際網(wǎng)絡傳輸中的缺包粘包問題。

本例中的應用層報文格式: |兩個字節(jié)的魔數(shù)|一個字節(jié)的消息類型定義 |兩個字節(jié)的消息數(shù)據(jù)長度|若干字節(jié)的消息數(shù)據(jù)|

消息基類:

public class SocketMessage implements Serializable{

    public SocketMessage(){
    }
    
    public SocketMessage(SocketMessage msg){
        setType(msg.getType());
        setBody(msg.getBody());
    }

    private static final short MAX_BODY_LENGTH = 1400;
    public static final byte HEADER1 = 0x5c;
    public static final byte HEADER2 = 0x74;
    private byte type;
    
    //定義消息類型
    public static final byte TYPE_HEART_BEAT = -1;
    public static final byte TYPE_SEND_TEXT = 0;
    public static final byte TYPE_REQUEST_SEND_FILE = 1;
    public static final byte TYPE_ACCEPT_RECEIVE_FILE = 2;
    public static final byte TYPE_REFUSE_RECEIVE_FILE = 3;
    public static final byte TYPE_SEND_FILE_PART = 4;
    public static final byte TYPE_RECEIVE_FILE_PART = 5;

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }

    private byte[] body;


    public byte[] getBody() {
        return body;
    }
    
    public String getJSONBody() {
        return new String(body,Charset.forName("UTF-8"));
    }

    public void setBody(byte[] bytes){
        body = bytes;
    }
    
    /**
     * 設置消息體艾君,一般用json解析
     */
    public void setBody(String str) {
        this.body = str.getBytes(Charset.forName("UTF-8"));
    }
}

編碼器:

public class BaseEncoder extends ProtocolEncoderAdapter{

    @Override
    public void encode(IoSession session, Object obj, ProtocolEncoderOutput output)
            throws Exception {
        SocketMessage msg = (SocketMessage) obj;
        IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
        buffer.order(ByteOrder.BIG_ENDIAN);
        buffer.put(SocketMessage.HEADER1);
        buffer.put(SocketMessage.HEADER2);
        buffer.put(msg.getType());
        if(msg.getType()!=SocketMessage.TYPE_HEART_BEAT){
            byte[] body = msg.getBody();
            short bodyLength = (short) body.length;
            buffer.putShort(bodyLength);
            buffer.put(body);
        }
        buffer.flip();      
        output.write(buffer); 
    }
}

解碼器:

public class BaseDecoder extends CumulativeProtocolDecoder{
    @Override
    protected boolean doDecode(IoSession session,IoBuffer in,
                               ProtocolDecoderOutput out) throws Exception {
        if(in.remaining()>=3){
            in.mark(); // 標記當前位置迎献,方便reset
            byte[] header = new byte[2];
            in.get(header, 0, header.length);
            if(header[0]== SocketMessage.HEADER1 && header[1]== SocketMessage.HEADER2){
                byte type=in.get();
                //System.out.println("類型 :"+type);
                if(type==SocketMessage.TYPE_HEART_BEAT){
                    SocketMessage msg = new SocketMessage();
                    msg.setType(SocketMessage.TYPE_HEART_BEAT);
                    out.write(msg);
                    if(in.remaining()>=3){
                        return true;
                    }
                }else{
                    if(in.remaining()>=2){
                        short bodyLength = in.getShort();
                        if(in.remaining()>=bodyLength){
                            byte[] body = new byte[bodyLength];
                            in.get(body, 0, bodyLength);
                            SocketMessage msg = new SocketMessage();
                            msg.setType(type);
                            msg.setBody(body);
                            out.write(msg);
                            if(in.remaining()>=3){
                                //再來一遍
                                return true; 
                            }
                        }else{
                            //長度不夠
                            in.reset();
                        }
                    }else{
                        //長度不夠
                        in.reset();
                    }
                }
            }else{
                System.err.println("HEADER[0] = "+header[0]+" , HEADER[1] = "+header[1]);
                throw new IllegalArgumentException("錯誤的HEADER");
            }
        }
        return false;
    }
}

文件分段長度的設定

假設設定文件分段長度為 1KB,那么發(fā)送端的每個隨機讀取線程都將占用1KB的內(nèi)存。同理服務端處理文件分段接收的線程池的每個線程也會占用1KB的緩沖區(qū)腻贰。

雖然應用層報文在底層會被分片發(fā)送,但還是要注意扒秸,如果設置這個值過大播演,當執(zhí)行文件任務的會話增多,很容易內(nèi)存溢出伴奥。

本例中設計報文數(shù)據(jù)長度最大不能超過2^32K写烤,使用固定長度的線程池FixedThreadPool執(zhí)行文件的并發(fā)寫入,即使session數(shù)量很多但是緩沖區(qū)的數(shù)量是固定的拾徙。在使用CachedThreadPool要注意內(nèi)存問題洲炊。

發(fā)送端與接收端的分段同步

發(fā)送端和接收端都維護了一個FileTask對象,這個對象代表一個文件任務:

public class FileTask {
    public boolean running = true;
    public long id;
    public String zippedFilePath;
    public long zippedFileSize;
    public String fileName;
    public String filePath;
    public String md5;
    public long startTime;
    public long startTime2; //網(wǎng)絡計時
    public long size;
    public int fileSegmentSize;
    public AtomicInteger partId = new AtomicInteger(0);
}

其中的partId 是一個原子整型同步計數(shù)器,其原理是Compare And Swipe暂衡。發(fā)送端每發(fā)送一個SendFilePartMessage都將partId +1,同理接收端每接收一個SendFilePartMessage并將文件分段寫入后也執(zhí)行partId +1询微。

這種粗略的方式只能用于統(tǒng)計文件所有分段是否發(fā)送完成,如果要做斷點傳送還需要維護一個partId數(shù)組狂巢,服務端記錄那些partId已成功接收哪些沒有撑毛,在恢復文件傳輸時將該數(shù)組發(fā)送給發(fā)送端,這樣發(fā)送端就知道接下來哪些分段需要發(fā)送唧领。

測試

在項目中創(chuàng)建了兩個項目藻雌,一個客戶端用于發(fā)送文件,一個服務端處理文件的接收斩个。
客戶端:

14:47:05 已建立連接
14:47:05 session:1 已建立
開始壓縮原文件
壓縮結束 胯杭, 耗時:11478 ms
原文件大小:215089914 壓縮后大小:174372279
請求發(fā)送文件 G:\test2.pdf id 1496990826746
總計5677個分段
發(fā)送完畢
發(fā)送文件用時:2.447 s
平均速度:69589.46821043624 kbps
對方接收并校驗成功,總計用時(壓縮+發(fā)送文件+解壓):21.057 s
移除FileTask 1496990826746

服務端:

已綁定 8890
14:47:05 session:1 已建立
收到接收文件請求 : test2.pdf id 1496990826746
預計占用空間 : 389462193 Bytes 
同意接收文件:test2.pdf 臨時存放路徑:G:\server_download\1496990838242_zipped_test2.pdf 目標路徑 G:\server_download\1496990838242_test2.pdf
文件 G:\server_download\1496990838242_zipped_test2.pdf 接收完畢 文件總計174372279字節(jié)
開始解壓
解壓結束 受啥, 耗時:3498 ms
原文件大小:215089914 壓縮文件大小:174372279
原MD5:594a131a0166f3c6e4043f77eb6c0a2b
下載文件md5:594a131a0166f3c6e4043f77eb6c0a2b
校驗通過
移除FileTask 1496990826746

因為是本地測試所以速度很快做个,放到實際的網(wǎng)絡中會遇到延時、斷線異常等各種問題腔呜,待完善叁温。

項目地址:MinaExample

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市核畴,隨后出現(xiàn)的幾起案子膝但,更是在濱河造成了極大的恐慌,老刑警劉巖谤草,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件跟束,死亡現(xiàn)場離奇詭異,居然都是意外死亡丑孩,警方通過查閱死者的電腦和手機冀宴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來温学,“玉大人略贮,你說我怎么就攤上這事≌提” “怎么了逃延?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長轧拄。 經(jīng)常有香客問我揽祥,道長,這世上最難降的妖魔是什么檩电? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任拄丰,我火速辦了婚禮府树,結果婚禮上,老公的妹妹穿的比我還像新娘料按。我一直安慰自己奄侠,他們只是感情好,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布站绪。 她就那樣靜靜地躺著遭铺,像睡著了一般。 火紅的嫁衣襯著肌膚如雪恢准。 梳的紋絲不亂的頭發(fā)上魂挂,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機與錄音馁筐,去河邊找鬼涂召。 笑死,一個胖子當著我的面吹牛敏沉,可吹牛的內(nèi)容都是我干的果正。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼盟迟,長吁一口氣:“原來是場噩夢啊……” “哼秋泳!你這毒婦竟也來了?” 一聲冷哼從身側響起攒菠,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤迫皱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后辖众,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體卓起,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年凹炸,在試婚紗的時候發(fā)現(xiàn)自己被綠了戏阅。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡啤它,死狀恐怖奕筐,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情变骡,我是刑警寧澤离赫,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站锣光,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏铝耻。R本人自食惡果不足惜誊爹,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一蹬刷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧频丘,春花似錦办成、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至桐汤,卻和暖如春而克,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背怔毛。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工员萍, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人拣度。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓碎绎,卻偏偏與公主長得像,于是被迫代替她去往敵國和親抗果。 傳聞我的和親對象是個殘疾皇子筋帖,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容

  • 從三月份找實習到現(xiàn)在秧均,面了一些公司月洛,掛了不少,但最終還是拿到小米冯事、百度宿接、阿里赘淮、京東、新浪睦霎、CVTE梢卸、樂視家的研發(fā)崗...
    時芥藍閱讀 42,239評論 11 349
  • 非原創(chuàng)文章,網(wǎng)絡收集副女,如遇原作者蛤高,請私聊會標明出處! 1--11 tcp協(xié)議中三次握手和四次揮手建立TCP需要三次...
    Juinjonn閱讀 2,160評論 0 28
  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 綜述 netty通...
    jiangmo閱讀 5,855評論 0 13
  • 一直時不時地收到一些用戶的詢問碑幅,「簡書什么時候公測按鞫浮?」沟涨,所以覺得有必要向一直等待的用戶匯報一下簡書近期的進展恤批。 ...
    簡書閱讀 1,704評論 0 7