JAVA 網(wǎng)絡(luò)編程 TCP/IP、Socket 和協(xié)議設(shè)計

【JAVA 網(wǎng)絡(luò)編程 TCP/IP嘴秸、Socket 和協(xié)議設(shè)計】

轉(zhuǎn)自:
TCP/IP、Socket 和協(xié)議設(shè)計

TCP/IP 協(xié)議簡介

IP

首先我們看 IP(Internet Protocol)協(xié)議昭躺。IP 協(xié)議提供了主機(jī)和主機(jī)間的通信。

為了完成不同主機(jī)的通信伪嫁,我們需要某種方式來唯一標(biāo)識一臺主機(jī)窍仰,這個標(biāo)識,就是著名的IP地址礼殊。通過IP地址驹吮,IP 協(xié)議就能夠幫我們把一個數(shù)據(jù)包發(fā)送給對方。

TCP

前面我們說過晶伦,IP 協(xié)議提供了主機(jī)和主機(jī)間的通信碟狞。
TCP 協(xié)議在 IP 協(xié)議提供的主機(jī)間通信功能的基礎(chǔ)上,完成這兩個主機(jī)上進(jìn)程對進(jìn)程的通信婚陪。

有了 IP族沃,不同主機(jī)就能夠交換數(shù)據(jù)。但是泌参,計算機(jī)收到數(shù)據(jù)后脆淹,并不知道這個數(shù)據(jù)屬于哪個進(jìn)程(簡單講,進(jìn)程就是一個正在運(yùn)行的應(yīng)用程序)沽一。TCP 的作用就在于盖溺,讓我們能夠知道這個數(shù)據(jù)屬于哪個進(jìn)程,從而完成進(jìn)程間的通信铣缠。

為了標(biāo)識數(shù)據(jù)屬于哪個進(jìn)程烘嘱,我們給需要進(jìn)行 TCP 通信的進(jìn)程分配一個唯一的數(shù)字來標(biāo)識它。這個數(shù)字蝗蛙,就是我們常說的端口號蝇庭。

三次握手

TCP 的全稱是 Transmission Control Protocol,大家對它說得最多的捡硅,大概就是面向連接的特性了哮内。之所以說它是有連接的,是說在進(jìn)行通信前壮韭,通信雙方需要先經(jīng)過一個三次握手的過程北发。三次握手完成后,連接便建立了泰涂。這時候我們才可以開始發(fā)送/接收數(shù)據(jù)鲫竞。(與之相對的是 UDP,不需要經(jīng)過握手逼蒙,就可以直接發(fā)送數(shù)據(jù))从绘。

下面我們簡單了解一下三次握手的過程。


tcp-three-way-handshake
  1. 首先,客戶向服務(wù)端發(fā)送一個 SYN僵井,假設(shè)此時 sequence number 為 x陕截。這個 x 是由操作系統(tǒng)根據(jù)一定的規(guī)則生成的,不妨認(rèn)為它是一個隨機(jī)數(shù)批什。
  2. 服務(wù)端收到 SYN 后农曲,會向客戶端再發(fā)送一個 SYN,此時服務(wù)器的 seq number = y驻债。與此同時乳规,會 ACK x+1,告訴客戶端“已經(jīng)收到了 SYN合呐,可以發(fā)送數(shù)據(jù)了”暮的。
  3. 客戶端收到服務(wù)器的 SYN 后,回復(fù)一個 ACK y+1淌实,這個 ACK 則是告訴服務(wù)器冻辩,SYN 已經(jīng)收到,服務(wù)器可以發(fā)送數(shù)據(jù)了拆祈。

經(jīng)過這 3 步恨闪,TCP 連接就建立了。這里需要注意的有三點:

  1. 連接是由客戶端主動發(fā)起的
  2. 在第 3 步客戶端向服務(wù)器回復(fù) ACK 的時候放坏,TCP 協(xié)議是允許我們攜帶數(shù)據(jù)的咙咽。之所以做不到,是 API 的限制導(dǎo)致的轻姿。
  3. TCP 協(xié)議還允許 “四次握手” 的發(fā)生犁珠,同樣的,由于 API 的限制互亮,這個極端的情況并不會發(fā)生。

TCP/IP 相關(guān)的理論知識我們就先了解到這里余素。關(guān)于 TCP豹休,還有諸如可靠性、流量控制桨吊、擁塞控制等非常有趣的特性威根,強(qiáng)烈推薦讀者看一看 Richard 的名著《TCP/IP 詳解 - 卷1》(注意,是第1版视乐,不是第2版)洛搀。

下面我們看一些偏實戰(zhàn)的東西。

Socket 基本用法

Socket 是 TCP 層的封裝佑淀,通過 socket留美,我們就能進(jìn)行 TCP 通信。

在 Java 的 SDK 中,socket 的共有兩個接口:用于監(jiān)聽客戶連接的 ServerSocket 和用于通信的 Socket谎砾。使用 socket 的步驟如下:

  1. 創(chuàng)建 ServerSocket 并監(jiān)聽客戶連接
  2. 使用 Socket 連接服務(wù)端
  3. 通過 Socket.getInputStream()/getOutputStream() 獲取輸入輸出流進(jìn)行通信

下面逢倍,我們通過實現(xiàn)一個簡單的 echo 服務(wù)來學(xué)習(xí) socket 的使用。所謂的 echo 服務(wù)景图,就是客戶端向服務(wù)端寫入任意數(shù)據(jù)较雕,服務(wù)器都將數(shù)據(jù)原封不動地寫回給客戶端。

1. 創(chuàng)建 ServerSocket 并監(jiān)聽客戶連接

public class EchoServer {

    private final ServerSocket mServerSocket;

    public EchoServer(int port) throws IOException {
        // 1. 創(chuàng)建一個 ServerSocket 并監(jiān)聽端口 port
        mServerSocket = new ServerSocket(port);
    }

    public void run() throws IOException {
        // 2. 開始接受客戶連接
        Socket client = mServerSocket.accept();
        handleClient(client);
    }

    private void handleClient(Socket socket) {
        // 3. 使用 socket 進(jìn)行通信 ...
    }


    public static void main(String[] argv) {
        try {
            EchoServer server = new EchoServer(9877);
            server.run();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2. 使用 Socket 連接服務(wù)端

public class EchoClient {

    private final Socket mSocket;

    public EchoClient(String host, int port) throws IOException {
        // 創(chuàng)建 socket 并連接服務(wù)器
        mSocket = new Socket(host, port);
    }

    public void run() {
        // 和服務(wù)端進(jìn)行通信
    }


    public static void main(String[] argv) {
        try {
            // 由于服務(wù)端運(yùn)行在同一主機(jī)挚币,這里我們使用 localhost
            EchoClient client = new EchoClient("localhost", 9877);
            client.run();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3. 通過 socket.getInputStream()/getOutputStream() 獲取輸入/輸出流進(jìn)行通信

首先亮蒋,我們來實現(xiàn)服務(wù)端:

public class EchoServer {
    // ...

    private void handleClient(Socket socket) throws IOException {
        InputStream in = socket.getInputStream();
        OutputStream out = socket.getOutputStream();
        byte[] buffer = new byte[1024];
        int n;
        while ((n = in.read(buffer)) > 0) {
            out.write(buffer, 0, n);
        }
    }
}

可以看到,服務(wù)端的實現(xiàn)其實很簡單妆毕,我們不停地讀取輸入數(shù)據(jù)宛蚓,然后寫回給客戶端。

下面我們看看客戶端设塔。

public class EchoClient {
    // ...

    public void run() throws IOException {
        Thread readerThread = new Thread(this::readResponse);
        readerThread.start();

        OutputStream out = mSocket.getOutputStream();
        byte[] buffer = new byte[1024];
        int n;
        while ((n = System.in.read(buffer)) > 0) {
            out.write(buffer, 0, n);
        }
    }

    private void readResponse() {
        try {
            InputStream in = mSocket.getInputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) > 0) {
                System.out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客戶端會稍微復(fù)雜一點點凄吏,在讀取用戶輸入的同時,我們又想讀取服務(wù)器的響應(yīng)闰蛔。所以痕钢,這里創(chuàng)建了一個線程來讀服務(wù)器的響應(yīng)。

不熟悉 lambda 的讀者序六,可以把Thread readerThread = new Thread(this::readResponse) 換成下面這個代碼:

Thread readerThread = new Thread(new Runnable() {
    @Override
    public void run() {
        readResponse();
    }
});

打開兩個 terminal 分別執(zhí)行如下命令:

$ javac EchoServer.java
$ java EchoServer
$ javac EchoClient.java
$ java EchoClient
hello Server
hello Server
foo
foo

在客戶端任连,我們會看到,輸入的所有字符都打印了出來例诀。

最后需要注意的有幾點:

  1. 在上面的代碼中随抠,我們所有的異常都沒有處理。實際應(yīng)用中繁涂,在發(fā)生異常時拱她,需要關(guān)閉 socket,并根據(jù)實際業(yè)務(wù)做一些錯誤處理工作
  2. 在客戶端扔罪,我們沒有停止 readThread秉沼。實際應(yīng)用中,我們可以通過關(guān)閉 socket 來讓線程從阻塞讀中返回矿酵。推薦讀者閱讀《Java并發(fā)編程實戰(zhàn)》
  3. 我們的服務(wù)端只處理了一個客戶連接唬复。如果需要同時處理多個客戶端,可以創(chuàng)建線程來處理請求全肮。這個作為練習(xí)留給讀者來完全敞咧。

Socket vs ServerSocket

在進(jìn)入這一節(jié)的主題前,讀者不妨先考慮一個問題:在上一節(jié)的實例中辜腺,我們運(yùn)行 echo 服務(wù)后休建,在客戶端連接成功時乍恐,一共有多少個 socket 存在?

答案是 3 個 socket丰包〗客戶端一個,服務(wù)端有兩個邑彪。跟這個問題的答案直接關(guān)聯(lián)的是本節(jié)的主題——SocketServerSocket 的區(qū)別是什么瞧毙。

眼尖的讀者,可能會注意到在上一節(jié)我是這樣描述他們的:

在 Java 的 SDK 中寄症,socket 的共有兩個接口:用于監(jiān)聽客戶連接的 ServerSocket 和用于通信的 Socket宙彪。

注意,我只說 ServerSocket 是用于監(jiān)聽客戶連接有巧,而沒有說它也可以用來通信释漆。下面我們來詳細(xì)了解一下他們的區(qū)別。

注:以下描述使用的是 UNIX/Linux 系統(tǒng)的 API

首先篮迎,我們創(chuàng)建 ServerSocket 后男图,內(nèi)核會創(chuàng)建一個 socket。這個 socket 既可以拿來監(jiān)聽客戶連接甜橱,也可以連接遠(yuǎn)端的服務(wù)逊笆。由于 ServerSocket 是用來監(jiān)聽客戶連接的,緊接著它就會對內(nèi)核創(chuàng)建的這個 socket 調(diào)用 listen 函數(shù)岂傲。這樣一來难裆,這個 socket 就成了所謂的 listening socket,它開始監(jiān)聽客戶的連接镊掖。

接下來乃戈,我們的客戶端創(chuàng)建一個 Socket,同樣的亩进,內(nèi)核也創(chuàng)建一個 socket 實例症虑。內(nèi)核創(chuàng)建的這個 socket 跟 ServerSocket 一開始創(chuàng)建的那個沒有什么區(qū)別。不同的是镐侯,接下來 Socket 會對它執(zhí)行 connect侦讨,發(fā)起對服務(wù)端的連接。前面我們說過苟翻,socket API 其實是 TCP 層的封裝,所以 connect 后骗污,內(nèi)核會發(fā)送一個 SYN 給服務(wù)端崇猫。

現(xiàn)在,我們切換角色到服務(wù)端需忿。服務(wù)端的主機(jī)在收到這個 SYN 后诅炉,會創(chuàng)建一個新的 socket蜡歹,這個新創(chuàng)建的 socket 跟客戶端繼續(xù)執(zhí)行三次握手過程。

三次握手完成后涕烧,我們執(zhí)行的 serverSocket.accept() 會返回一個 Socket 實例月而,這個 socket 就是上一步內(nèi)核自動幫我們創(chuàng)建的。

所以說议纯,在一個客戶端連接的情況下父款,其實有 3 個 socket。

關(guān)于內(nèi)核自動創(chuàng)建的這個 socket瞻凤,還有一個很有意思的地方憨攒。它的端口號跟 ServerSocket 是一毛一樣的。咦7Р巍肝集!不是說,一個端口只能綁定一個 socket 嗎蛛壳?其實這個說法并不夠準(zhǔn)確杏瞻。

前面我說的TCP 通過端口號來區(qū)分?jǐn)?shù)據(jù)屬于哪個進(jìn)程的說法,在 socket 的實現(xiàn)里需要改一改衙荐。Socket 并不僅僅使用端口號來區(qū)別不同的 socket 實例捞挥,而是使用 <peer addr:peer port, local addr:local port> 這個四元組。

在上面的例子中赫模,我們的 ServerSocket 長這樣:<*:*, *:9877>树肃。意思是,可以接受任何的客戶端瀑罗,和本地任何 IP胸嘴。

accept 返回的 Socket 則是這樣:<127.0.0.1:xxxx, 127.0.0.1:9877>。其中斩祭,xxxx 是客戶端的端口號劣像。

如果數(shù)據(jù)是發(fā)送給一個已連接的 socket,內(nèi)核會找到一個完全匹配的實例摧玫,所以數(shù)據(jù)準(zhǔn)確發(fā)送給了對端耳奕。

如果是客戶端要發(fā)起連接,這時候只有 <*:*, *:9877> 會匹配成功诬像,所以 SYN 也準(zhǔn)確發(fā)送給了監(jiān)聽套接字屋群。

Socket/ServerSocket 的區(qū)別我們就講到這里。如果讀者覺得不過癮坏挠,可以參考《TCP/IP 詳解》卷1芍躏、卷2。

Socket 長連接的實現(xiàn)

背景知識

Socket 長連接降狠,指的是在客戶和服務(wù)端之間保持一個 socket 連接長時間不斷開对竣。

比較熟悉 Socket 的讀者庇楞,可能知道有這樣一個 API:

socket.setKeepAlive(true);

嗯……keep alive,“保持活著”否纬,這個應(yīng)該就是讓 TCP 不斷開的意思吕晌。那么,我們要實現(xiàn)一個 socket 的長連接临燃,只需要這一個調(diào)用即可睛驳。

遺憾的是,生活并不總是那么美好谬俄。對于 4.4BSD 的實現(xiàn)來說柏靶,Socket 的這個 keep alive 選項如果打開并且兩個小時內(nèi)沒有通信,那么底層會發(fā)一個心跳溃论,看看對方是不是還活著屎蜓。

注意,兩個小時才會發(fā)一次钥勋。也就是說炬转,在沒有實際數(shù)據(jù)通信的時候,我把網(wǎng)線拔了算灸,你的應(yīng)用程序要經(jīng)過兩個小時才會知道扼劈。

在說明如果實現(xiàn)長連接前,我們先來理一理我們面臨的問題菲驴。假定現(xiàn)在有一對已經(jīng)連接的 socket荐吵,在以下情況發(fā)生時候,socket 將不再可用:

  1. 某一端關(guān)閉 socket(這不是廢話嗎)赊瞬。主動關(guān)閉的一方會發(fā)送 FIN先煎,通知對方要關(guān)閉 TCP 連接。在這種情況下巧涧,另一端如果去讀 socket薯蝎,將會讀到 EoF(End of File)。于是我們知道對方關(guān)閉了 socket谤绳。
  2. 應(yīng)用程序奔潰占锯。此時 socket 會由內(nèi)核關(guān)閉,結(jié)果跟情況1一樣缩筛。
  3. 系統(tǒng)奔潰消略。這時候系統(tǒng)是來不及發(fā)送 FIN 的,因為它已經(jīng)跪了瞎抛。此時對方無法得知這一情況疑俭。對方在嘗試讀取數(shù)據(jù)時,最后會返回 read time out婿失。如果寫數(shù)據(jù)钞艇,則是 host unreachable 之類的錯誤。
  4. 電纜被挖斷豪硅、網(wǎng)線被拔哩照。跟情況3差不多,如果沒有對 socket 進(jìn)行讀寫懒浮,兩邊都不知道發(fā)生了事故飘弧。跟情況3不同的是,如果我們把網(wǎng)線接回去砚著,socket 依舊可以正常使用次伶。

在上面的幾種情形中,有一個共同點就是稽穆,只要去讀冠王、寫 socket,只要 socket 連接不正常舌镶,我們就能夠知道柱彻。基于這一點餐胀,要實現(xiàn)一個 socket 長連接哟楷,我們需要做的就是不斷地給對方寫數(shù)據(jù),然后讀取對方的數(shù)據(jù)否灾,也就是所謂的心跳卖擅。只要心還在跳,socket 就是活的墨技。寫數(shù)據(jù)的間隔惩阶,需要根據(jù)實際的應(yīng)用需求來決定。

心跳包不是實際的業(yè)務(wù)數(shù)據(jù)健提,根據(jù)通信協(xié)議的不同琳猫,需要做不同的處理。

比方說私痹,我們使用 JSON 進(jìn)行通信脐嫂,那么,我們可以加一個 type 字段紊遵,表面這個 JSON 是心跳還是業(yè)務(wù)數(shù)據(jù)账千。

{
    "type": 0,  // 0 表示心跳

    // ...
}

使用二進(jìn)制協(xié)議的情況類似。要求就是暗膜,我們能夠區(qū)別一個數(shù)據(jù)包是心跳還是真實數(shù)據(jù)匀奏。這樣,我們便實現(xiàn)了一個 socket 長連接学搜。

實現(xiàn)示例

這一小節(jié)我們一起來實現(xiàn)一個帶長連接的 Android echo 客戶端娃善。完整的代碼可以在這里[3]找到论衍。

首先了接口部分:

public final class LongLiveSocket {

    /**
     * 錯誤回調(diào)
     */
    public interface ErrorCallback {
        /**
         * 如果需要重連,返回 true
         */
        boolean onError();
    }


    /**
     * 讀數(shù)據(jù)回調(diào)
     */
    public interface DataCallback {
        void onData(byte[] data, int offset, int len);
    }


    /**
     * 寫數(shù)據(jù)回調(diào)
     */
    public interface WritingCallback {
        void onSuccess();
        void onFail(byte[] data, int offset, int len);
    }


    public LongLiveSocket(String host, int port,
                          DataCallback dataCallback, ErrorCallback errorCallback) {
    }

    public void write(byte[] data, WritingCallback callback) {
    }

    public void write(byte[] data, int offset, int len, WritingCallback callback) {
    }

    public void close() {
    }
}

我們這個支持長連接的類就叫 LongLiveSocket 好了聚磺。如果在 socket 斷開后需要重連坯台,只需要在對應(yīng)的接口里面返回 true 即可(在真實場景里,我們還需要讓客戶設(shè)置重連的等待時間瘫寝,還有讀寫蜒蕾、連接的 timeout等。為了簡單焕阿,這里就直接不支持了咪啡。

另外需要注意的一點是,如果要做一個完整的庫暮屡,需要同時提供阻塞式和回調(diào)式API撤摸。同樣由于篇幅原因,這里直接省掉了栽惶。

下面我們直接看實現(xiàn):

public final class LongLiveSocket {
    private static final String TAG = "LongLiveSocket";

    private static final long RETRY_INTERVAL_MILLIS = 3 * 1000;
    private static final long HEART_BEAT_INTERVAL_MILLIS = 5 * 1000;
    private static final long HEART_BEAT_TIMEOUT_MILLIS = 2 * 1000;

    /**
     * 錯誤回調(diào)
     */
    public interface ErrorCallback {
        /**
         * 如果需要重連愁溜,返回 true
         */
        boolean onError();
    }


    /**
     * 讀數(shù)據(jù)回調(diào)
     */
    public interface DataCallback {
        void onData(byte[] data, int offset, int len);
    }


    /**
     * 寫數(shù)據(jù)回調(diào)
     */
    public interface WritingCallback {
        void onSuccess();
        void onFail(byte[] data, int offset, int len);
    }


    private final String mHost;
    private final int mPort;
    private final DataCallback mDataCallback;
    private final ErrorCallback mErrorCallback;

    private final HandlerThread mWriterThread;
    private final Handler mWriterHandler;
    private final Handler mUIHandler = new Handler(Looper.getMainLooper());

    private final Object mLock = new Object();
    private Socket mSocket;  // guarded by mLock
    private boolean mClosed; // guarded by mLock

    private final Runnable mHeartBeatTask = new Runnable() {
        private byte[] mHeartBeat = new byte[0];

        @Override
        public void run() {
            // 我們使用長度為 0 的數(shù)據(jù)作為 heart beat
            write(mHeartBeat, new WritingCallback() {
                @Override
                public void onSuccess() {
                    // 每隔 HEART_BEAT_INTERVAL_MILLIS 發(fā)送一次
                    mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS);
                    mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS);
                }

                @Override
                public void onFail(byte[] data, int offset, int len) {
                    // nop
                    // write() 方法會處理失敗
                }
            });
        }
    };

    private final Runnable mHeartBeatTimeoutTask = () -> {
        Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout");
        closeSocket();
    };


    public LongLiveSocket(String host, int port,
                          DataCallback dataCallback, ErrorCallback errorCallback) {
        mHost = host;
        mPort = port;
        mDataCallback = dataCallback;
        mErrorCallback = errorCallback;

        mWriterThread = new HandlerThread("socket-writer");
        mWriterThread.start();
        mWriterHandler = new Handler(mWriterThread.getLooper());
        mWriterHandler.post(this::initSocket);
    }

    private void initSocket() {
        while (true) {
            if (closed()) return;

            try {
                Socket socket = new Socket(mHost, mPort);
                synchronized (mLock) {
                    // 在我們創(chuàng)建 socket 的時候,客戶可能就調(diào)用了 close()
                    if (mClosed) {
                        silentlyClose(socket);
                        return;
                    }
                    mSocket = socket;
                    // 每次創(chuàng)建新的 socket外厂,會開一個線程來讀數(shù)據(jù)
                    Thread reader = new Thread(new ReaderTask(socket), "socket-reader");
                    reader.start();
                    mWriterHandler.post(mHeartBeatTask);
                }
                break;
            } catch (IOException e) {
                Log.e(TAG, "initSocket: ", e);
                if (closed() || !mErrorCallback.onError()) {
                    break;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS);
                } catch (InterruptedException e1) {
                    // interrupt writer-thread to quit
                    break;
                }
            }
        }
    }

    public void write(byte[] data, WritingCallback callback) {
        write(data, 0, data.length, callback);
    }

    public void write(byte[] data, int offset, int len, WritingCallback callback) {
        mWriterHandler.post(() -> {
            Socket socket = getSocket();
            if (socket == null) {
                // initSocket 失敗而客戶說不需要重連冕象,但客戶又叫我們給他發(fā)送數(shù)據(jù)
                throw new IllegalStateException("Socket not initialized");
            }
            try {
                OutputStream outputStream = socket.getOutputStream();
                DataOutputStream out = new DataOutputStream(outputStream);
                out.writeInt(len);
                out.write(data, offset, len);
                callback.onSuccess();
            } catch (IOException e) {
                Log.e(TAG, "write: ", e);
                closeSocket();
                callback.onFail(data, offset, len);
                if (!closed() && mErrorCallback.onError()) {
                    initSocket();
                }
            }
        });
    }

    private boolean closed() {
        synchronized (mLock) {
            return mClosed;
        }
    }

    private Socket getSocket() {
        synchronized (mLock) {
            return mSocket;
        }
    }

    private void closeSocket() {
        synchronized (mLock) {
            closeSocketLocked();
        }
    }

    private void closeSocketLocked() {
        if (mSocket == null) return;

        silentlyClose(mSocket);
        mSocket = null;
        mWriterHandler.removeCallbacks(mHeartBeatTask);
    }

    public void close() {
        if (Looper.getMainLooper() == Looper.myLooper()) {
            new Thread() {
                @Override
                public void run() {
                    doClose();
                }
            }.start();
        } else {
            doClose();
        }
    }

    private void doClose() {
        synchronized (mLock) {
            mClosed = true;
            // 關(guān)閉 socket,從而使得阻塞在 socket 上的線程返回
            closeSocketLocked();
        }
        mWriterThread.quit();
        // 在重連的時候汁蝶,有個 sleep
        mWriterThread.interrupt();
    }


    private static void silentlyClose(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException e) {
                Log.e(TAG, "silentlyClose: ", e);
                // error ignored
            }
        }
    }


    private class ReaderTask implements Runnable {

        private final Socket mSocket;

        public ReaderTask(Socket socket) {
            mSocket = socket;
        }

        @Override
        public void run() {
            try {
                readResponse();
            } catch (IOException e) {
                Log.e(TAG, "ReaderTask#run: ", e);
            }
        }

        private void readResponse() throws IOException {
            // For simplicity, assume that a msg will not exceed 1024-byte
            byte[] buffer = new byte[1024];
            InputStream inputStream = mSocket.getInputStream();
            DataInputStream in = new DataInputStream(inputStream);
            while (true) {
                int nbyte = in.readInt();
                if (nbyte == 0) {
                    Log.i(TAG, "readResponse: heart beat received");
                    mUIHandler.removeCallbacks(mHeartBeatTimeoutTask);
                    continue;
                }

                if (nbyte > buffer.length) {
                    throw new IllegalStateException("Receive message with len " + nbyte +
                                    " which exceeds limit " + buffer.length);
                }

                if (readn(in, buffer, nbyte) != 0) {
                    // Socket might be closed twice but it does no harm
                    silentlyClose(mSocket);
                    // Socket will be re-connected by writer-thread if you want
                    break;
                }
                mDataCallback.onData(buffer, 0, nbyte);
            }
        }

        private int readn(InputStream in, byte[] buffer, int n) throws IOException {
            int offset = 0;
            while (n > 0) {
                int readBytes = in.read(buffer, offset, n);
                if (readBytes < 0) {
                    // EoF
                    break;
                }
                n -= readBytes;
                offset += readBytes;
            }
            return n;
        }
    }
}

下面是我們新實現(xiàn)的 EchoClient

public class EchoClient {
    private static final String TAG = "EchoClient";

    private final LongLiveSocket mLongLiveSocket;

    public EchoClient(String host, int port) {
        mLongLiveSocket = new LongLiveSocket(
                host, port,
                (data, offset, len) -> Log.i(TAG, "EchoClient: received: " + new String(data, offset, len)),
                // 返回 true渐扮,所以只要出錯,就會一直重連
                () -> true);
    }

    public void send(String msg) {
        mLongLiveSocket.write(msg.getBytes(), new LongLiveSocket.WritingCallback() {
            @Override
            public void onSuccess() {
                Log.d(TAG, "onSuccess: ");
            }

            @Override
            public void onFail(byte[] data, int offset, int len) {
                Log.w(TAG, "onFail: fail to write: " + new String(data, offset, len));
                // 連接成功后掖棉,還會發(fā)送這個消息
                mLongLiveSocket.write(data, offset, len, this);
            }
        });
    }
}

就這樣墓律,一個帶 socket 長連接的客戶端就完成了。剩余代碼跟我們這里的主題沒有太大關(guān)系幔亥,感興趣的讀者可以看這里[3]或者自己完成這個例子耻讽。下面是一些輸出示例:

03:54:55.583 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:00.588 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:05.594 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:09.638 12691-12710/com.example.echo D/EchoClient: onSuccess: 
03:55:09.639 12691-12713/com.example.echo I/EchoClient: EchoClient: received: hello
03:55:10.595 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:14.652 12691-12710/com.example.echo D/EchoClient: onSuccess: 
03:55:14.654 12691-12713/com.example.echo I/EchoClient: EchoClient: received: echo
03:55:15.596 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:20.597 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:25.602 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received

最后需要說明的是,如果想節(jié)省資源帕棉,在有客戶發(fā)送數(shù)據(jù)的時候可以省略 heart beat针肥。

我們對讀出錯時候的處理,可能也存在一些爭議香伴。讀出錯后慰枕,我們只是關(guān)閉了 socket。socket 需要等到下一次寫動作發(fā)生時即纲,才會重新連接具帮。實際應(yīng)用中,如果這是一個問題,在讀出錯后可以直接開始重連蜂厅。這種情況下匪凡,還需要一些額外的同步,避免重復(fù)創(chuàng)建 socket葛峻。heart beat timeout 的情況類似锹雏。

跟 TCP/IP 學(xué)協(xié)議設(shè)計

如果僅僅是為了使用是 socket,我們大可以不去理會協(xié)議的細(xì)節(jié)术奖。之所以推薦大家去看一看《TCP/IP 詳解》,是因為它們有太多值得學(xué)習(xí)的地方轻绞。很多我們工作中遇到的問題采记,都可以在這里找到答案。

以下每一個小節(jié)的標(biāo)題都是一個小問題政勃,建議讀者獨立思考一下唧龄,再繼續(xù)往下看。如果你發(fā)現(xiàn)你的答案比我的更好奸远,請一定發(fā)送郵件到 ljtong64 AT gmail DOT com 告訴我既棺。

協(xié)議版本如何升級?

有這么一句流行的話:這個世界唯一不變的懒叛,就是變化丸冕。當(dāng)我們對協(xié)議版本進(jìn)行升級的時候,正確識別不同版本的協(xié)議對軟件的兼容非常重要薛窥。那么胖烛,我們?nèi)绾卧O(shè)計協(xié)議,才能夠為將來的版本升級做準(zhǔn)備呢诅迷?

答案可以在 IP 協(xié)議找到佩番。

IP 協(xié)議的第一個字段叫 version,目前使用的是 4 或 6罢杉,分別表示 IPv4 和 IPv6趟畏。由于這個字段在協(xié)議的開頭,接收端收到數(shù)據(jù)后滩租,只要根據(jù)第一個字段的值就能夠判斷這個數(shù)據(jù)包是 IPv4 還是 IPv6赋秀。

再強(qiáng)調(diào)一下,這個字段在兩個版本的IP協(xié)議都位于第一個字段持际,為了做兼容處理沃琅,對應(yīng)的這個字段必須位于同一位置。文本協(xié)議(如蜘欲,JSON益眉、HTML)的情況類似。

如何發(fā)送不定長數(shù)據(jù)的數(shù)據(jù)包

舉個例子,我們用微信發(fā)送一條消息郭脂。這條消息的長度是不確定的年碘,并且每條消息都有它的邊界。我們?nèi)绾蝸硖幚磉@個邊界呢展鸡?

還是一樣屿衅,看看 IP。IP 的頭部有個 header length 和 data length 兩個字段莹弊。通過添加一個 len 域涤久,我們就能夠把數(shù)據(jù)根據(jù)應(yīng)用邏輯分開。

跟這個相對的忍弛,還有另一個方案响迂,那就是在數(shù)據(jù)的末尾放置終止符。比方說细疚,想 C 語言的字符串那樣蔗彤,我們在每個數(shù)據(jù)的末尾放一個 \0 作為終止符,用以標(biāo)識一條消息的尾部疯兼。這個方法帶來的問題是然遏,用戶的數(shù)據(jù)也可能存在 \0。此時吧彪,我們就需要對用戶的數(shù)據(jù)進(jìn)行轉(zhuǎn)義待侵。比方說,把用戶數(shù)據(jù)的所有 \0 都變成 \0\0来氧。讀消息的過程總诫给,如果遇到 \0\0,那它就代表 \0啦扬,如果只有一個 \0中狂,那就是消息尾部。

使用 len 字段的好處是扑毡,我們不需要對數(shù)據(jù)進(jìn)行轉(zhuǎn)義胃榕。讀取數(shù)據(jù)的時候,只要根據(jù) len 字段瞄摊,一次性把數(shù)據(jù)都讀進(jìn)來就好勋又,效率會更高一些。

終止符的方案雖然要求我們對數(shù)據(jù)進(jìn)行掃描换帜,但是如果我們可能從任意地方開始讀取數(shù)據(jù)楔壤,就需要這個終止符來確定哪里才是消息的開頭了。

當(dāng)然惯驼,這兩個方法不是互斥的蹲嚣,可以一起使用递瑰。

上傳多個文件,只有所有文件都上傳成功時才算成功

現(xiàn)在我們有一個需求隙畜,需要一次上傳多個文件到服務(wù)器抖部,只有在所有文件都上傳成功的情況下,才算成功议惰。我們該如何來實現(xiàn)呢慎颗?

IP 在數(shù)據(jù)報過大的時候,會把一個數(shù)據(jù)報拆分成多個言询,并設(shè)置一個 MF (more fragments)位俯萎,表示這個包只是被拆分后的數(shù)據(jù)的一部分。

好倍试,我們也學(xué)一學(xué) IP讯屈。這里,我們可以給每個文件從 0 開始編號县习。上傳文件的同時,也攜帶這個編號谆趾,并額外附帶一個 MF 標(biāo)志躁愿。除了編號最大的文件,所有文件的 MF 標(biāo)志都置位沪蓬。因為 MF 沒有置位的是最后一個文件彤钟,服務(wù)器就可以根據(jù)這個得出總共有多少個文件。

另一種不使用 MF 標(biāo)志的方法是跷叉,我們在上傳文件前逸雹,就告訴服務(wù)器總共有多少個文件。

如果讀者對數(shù)據(jù)庫比較熟悉云挟,學(xué)數(shù)據(jù)庫用事務(wù)來處理梆砸,也是可以的拿霉。這里就不展開討論了扰楼。

如何保證數(shù)據(jù)的有序性

這里講一個我曾經(jīng)遇到過的面試題〔ㄗ現(xiàn)在有一個任務(wù)隊列轻抱,多個工作線程從中取出任務(wù)并執(zhí)行椰憋,執(zhí)行結(jié)果放到一個結(jié)果隊列中唤殴。先要求档痪,放入結(jié)果隊列的時候枢里,順序順序需要跟從工作隊列取出時的一樣(也就是說绑榴,先取出的任務(wù)哪轿,執(zhí)行結(jié)果需要先放入結(jié)果隊列)。

我們看看 TCP/IP 是怎么處理的翔怎。IP 在發(fā)送數(shù)據(jù)的時候窃诉,不同數(shù)據(jù)報到達(dá)對端的時間是不確定的杨耙,后面發(fā)送的數(shù)據(jù)有可能較先到達(dá)。TCP 為了解決這個問題褐奴,給所發(fā)送數(shù)據(jù)的每個字節(jié)都賦了一個序列號,通過這個序列號敦冬,TCP 就能夠把數(shù)據(jù)按原順序重新組裝。

一樣脖旱,我們也給每個任務(wù)賦一個值,根據(jù)進(jìn)入工作隊列的順序依次遞增萌庆。工作線程完成任務(wù)后溶褪,在將結(jié)果放入結(jié)果隊列前,先檢查要放入對象的寫一個序列號是不是跟自己的任務(wù)相同猿妈,如果不同巍虫,這個結(jié)果就不能放進(jìn)去彭则。此時,最簡單的做法是等待占遥,知道下一個可以放入隊列的結(jié)果是自己所執(zhí)行的那一個俯抖。但是,這個線程就沒辦法繼續(xù)處理任務(wù)了芬萍。

更好的方法是,我們維護(hù)多一個結(jié)果隊列的緩沖柬祠,這個緩沖里面的數(shù)據(jù)按序列號從小到大排序坯癣。工作線程要將結(jié)果放入,有兩種可能:

  1. 剛剛完成的任務(wù)剛好是下一個示罗,將這個結(jié)果放入隊列。然后從緩沖的頭部開始蚜点,將所有可以放入結(jié)果隊列的數(shù)據(jù)都放進(jìn)去。
  2. 所完成的任務(wù)不能放入結(jié)果隊列奶镶,這個時候就插入結(jié)果隊列。然后纤壁,跟上一種情況一樣捺信,需要檢查緩沖。

如果測試表明迄靠,這個結(jié)果緩沖的數(shù)據(jù)不多,那么使用普通的鏈表就可以雨席。如果數(shù)據(jù)比較多吠式,可以使用一個最小堆。

如何保證對方收到了消息

我們說特占,TCP 提供了可靠的傳輸。這樣不就能夠保證對方收到消息了嗎?

很遺憾查辩,其實不能。在我們往 socket 寫入的數(shù)據(jù)长踊,只要對端的內(nèi)核收到后萍倡,就會返回 ACK,此時阱佛,socket 就認(rèn)為數(shù)據(jù)已經(jīng)寫入成功戴而。然而要注意的是,這里只是對方所運(yùn)行的系統(tǒng)的內(nèi)核成功收到了數(shù)據(jù)所意,并不表示應(yīng)用程序已經(jīng)成功處理了數(shù)據(jù)催首。

解決辦法還是一樣郎任,我們學(xué) TCP备籽,添加一個應(yīng)用層的 APP ACK。應(yīng)用接收到消息并處理成功后胶台,發(fā)送一個 APP ACK 給對方。

有了 APP ACK韩脏,我們需要處理的另一個問題是铸磅,如果對方真的沒有收到,需要怎么做吹散?

TCP 發(fā)送數(shù)據(jù)的時候八酒,消息一樣可能丟失。TCP 發(fā)送數(shù)據(jù)后羞迷,如果長時間沒有收到對方的 ACK,就假設(shè)數(shù)據(jù)已經(jīng)丟失浊猾,并重新發(fā)送热鞍。

我們也一樣,如果長時間沒有收到 APP ACK偷办,就假設(shè)數(shù)據(jù)丟失昼接,重新發(fā)送一個。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末逐工,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子泪喊,更是在濱河造成了極大的恐慌袒啼,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件滑肉,死亡現(xiàn)場離奇詭異摘仅,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)娃属,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進(jìn)店門矾端,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人秩铆,你說我怎么就攤上這事殴玛。” “怎么了族阅?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵坦刀,是天一觀的道長蔬咬。 經(jīng)常有香客問我,道長林艘,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任钢坦,我火速辦了婚禮,結(jié)果婚禮上厨诸,老公的妹妹穿的比我還像新娘禾酱。我一直安慰自己,他們只是感情好颗管,可當(dāng)我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布滓走。 她就那樣靜靜地躺著,像睡著了一般疫粥。 火紅的嫁衣襯著肌膚如雪腰懂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天慷彤,我揣著相機(jī)與錄音怖喻,去河邊找鬼。 笑死锚沸,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的前标。 我是一名探鬼主播距潘,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼音比,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起焰望,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤缭付,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后秫舌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绣檬,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年墨缘,在試婚紗的時候發(fā)現(xiàn)自己被綠了零抬。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片平夜。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖忽妒,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情吃溅,我是刑警寧澤鸯檬,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布喧务,位于F島的核電站,受9級特大地震影響蹂楣,放射性物質(zhì)發(fā)生泄漏讯蒲。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一赁酝、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧衡载,春花似錦隙袁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至箱舞,卻和暖如春晴股,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背队魏。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留官帘,地道東北人昧谊。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓呢诬,卻偏偏與公主長得像,于是被迫代替她去往敵國和親尚镰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容