雨露均沾的OkHttp——WebSocket長連接(源碼篇)

前言

雨露均沾的OkHttp—WebSocket長連接(使用篇)
雨露均沾的OkHttp—WebSocket長連接(源碼篇)

上期我們熟悉了OkHttp中實現(xiàn)WebSocket長連接的接入斩松,并且可以通過OkHttp官方的MockWebSocket服務來模擬服務端皆尔,實現(xiàn)整個流程迁央。

今天我們就來說下具體OkHttp中是怎么實現(xiàn)這些功能的呢?相信看過這篇文章你也能深刻了解WebSocket這個協(xié)議纠屋。

使用回顧

簡單貼下WebSocket使用方法,方便下面解析:

       //初始化
        mClient = new OkHttpClient.Builder()
                .pingInterval(10, TimeUnit.SECONDS)
                .build();
        Request request = new Request.Builder()
                .url(mWbSocketUrl)
                .build();
        mWebSocket = mClient.newWebSocket(request, new WsListener());
        
        //收到消息回調(diào)
        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            super.onMessage(webSocket, text);
            Log.e(TAG,"收到消息盾计!");
            onWSDataChanged(DATE_NORMAL, text);
        }        
        
        //發(fā)送消息
        mWebSocket.send(message);
        
        //主動關(guān)閉連接
        mWebSocket.close(code, reason);

源碼解析

WebSocket整個流程無非三個功能:連接售担,接收消息赁遗,發(fā)送消息。下面我們就從這三個方面分析下具體是怎么實現(xiàn)的族铆。

連接

通過上面的代碼我們得知岩四,WebSocket連接是通過newWebSocket方法。直接點進去看這個方法:

  override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
    val webSocket = RealWebSocket(
        taskRunner = TaskRunner.INSTANCE,
        originalRequest = request,
        listener = listener,
        random = Random(),
        pingIntervalMillis = pingIntervalMillis.toLong(),
        extensions = null, // Always null for clients.
        minimumDeflateSize = minWebSocketMessageToCompress
    )
    webSocket.connect(this)
    return webSocket
  }

這里做了兩件事:

  • 初始化RealWebSocket骑素,主要是設置了一些參數(shù)(比如pingIntervalMillis心跳包時間間隔炫乓,還有監(jiān)聽事件之類的)
  • connect方法進行WebSocket連接

繼續(xù)查看connect方法:

connect(WebSocket連接握手)
  fun connect(client: OkHttpClient) {
    //***
    val webSocketClient = client.newBuilder()
        .eventListener(EventListener.NONE)
        .protocols(ONLY_HTTP1)
        .build()
    val request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .header("Sec-WebSocket-Extensions", "permessage-deflate")
        .build()
    call = RealCall(webSocketClient, request, forWebSocket = true)
    call!!.enqueue(object : Callback {
      override fun onResponse(call: Call, response: Response) {
        
        //得到數(shù)據(jù)流
        val streams: Streams
        try {
          checkUpgradeSuccess(response, exchange)
          streams = exchange!!.newWebSocketStreams()
        } 
        
        //***
        // Process all web socket messages.
        try {
          val name = "$okHttpName WebSocket ${request.url.redact()}"
          initReaderAndWriter(name, streams)
          listener.onOpen(this@RealWebSocket, response)
          loopReader()
        } catch (e: Exception) {
          failWebSocket(e, null)
        }
      }
    })
  }

上一篇使用篇文章中說過,Websocket連接需要一次Http協(xié)議的握手献丑,然后才能把協(xié)議升級成WebSocket末捣。所以這段代碼就體現(xiàn)出這個功能了。

首先就new了一個用來進行Http連接的request创橄,其中Header的參數(shù)就表示我要進行WebSocket連接了箩做,參數(shù)解析如下:

  • Connection:Upgrade,表示客戶端要連接升級
  • Upgrade:websocket妥畏, 表示客戶端要升級建立Websocket連接
  • Sec-Websocket-Key:key邦邦, 這個key是隨機生成的,服務器會通過這個參數(shù)驗證該請求是否有效
  • Sec-WebSocket-Version:13醉蚁, websocket使用的版本燃辖,一般就是13
  • Sec-webSocket-Extension:permessage-deflate,客戶端指定的一些擴展協(xié)議网棍,比如這里permessage-deflate就是WebSocket的一種壓縮協(xié)議黔龟。

Header設置好之后,就調(diào)用了callenqueue方法滥玷,這個方法大家應該都很熟悉吧氏身,OkHttp里面對于Http請求的異步請求就是這個方法。
至此惑畴,握手結(jié)束蛋欣,服務器返回響應碼101,表示協(xié)議升級如贷。

然后我們繼續(xù)看看獲取服務器響應之后又做了什么陷虎?
在發(fā)送Http請求成功之后,onResponse響應方法里面主要表現(xiàn)為四個處理邏輯:

  • Http流轉(zhuǎn)換成WebSocket流杠袱,得到Streams對象泻红,這個流后面會轉(zhuǎn)化成輸入流和輸出流,也就是進行發(fā)送和讀取的操作流
  • listener.onOpen(this@RealWebSocket, response)霞掺,回調(diào)了接口WebSocketListeneronOpen方法谊路,告訴用戶WebSocket已經(jīng)連接
  • initReaderAndWriter(name, streams)
  • loopReader()

前兩個邏輯還是比較好理解,主要是后兩個方法菩彬,我們分別解析下缠劝。
首先看initReaderAndWriter方法潮梯。

initReaderAndWriter(初始化輸入流輸出流)
  //RealWebSocket.kt

  @Throws(IOException::class)
  fun initReaderAndWriter(name: String, streams: Streams) {
    val extensions = this.extensions!!
    synchronized(this) {
      //***
      
      //寫數(shù)據(jù),發(fā)送數(shù)據(jù)的工具類
      this.writer = WebSocketWriter()
      
      //設置心跳包事件
      if (pingIntervalMillis != 0L) {
        val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
        taskQueue.schedule("$name ping", pingIntervalNanos) {
          writePingFrame()
          return@schedule pingIntervalNanos
        }
      }
      //***
    }

        //***
        
        //讀取數(shù)據(jù)的工具類
    reader = WebSocketReader(     
      ***
      frameCallback = this,
      ***
    )
  }
  
  internal fun writePingFrame() {
   //***
    try {
      writer.writePing(ByteString.EMPTY)
    } catch (e: IOException) {
      failWebSocket(e, null)
    }
  }  
  

這個方法主要干了兩件事:

  • 實例化輸出流輸入流工具類惨恭,也就是WebSocketWriterWebSocketReader秉馏,用來處理數(shù)據(jù)的收發(fā)。
  • 設置心跳包事件脱羡。如果pingIntervalMillis參數(shù)不為0萝究,就通過計時器,每隔pingIntervalNanos發(fā)送一個ping消息锉罐。其中writePingFrame方法就是發(fā)送了ping幀數(shù)據(jù)帆竹。

接收消息處理消息

loopReader

接著看看這個loopReader方法是干什么的,看這個名字我們大膽猜測下脓规,難道這個方法就是用來循環(huán)讀取數(shù)據(jù)的栽连?去代碼里找找答案:

  fun loopReader() {
    while (receivedCloseCode == -1) {
      // This method call results in one or more onRead* methods being called on this thread.
      reader!!.processNextFrame()
    }
  }

代碼很簡單,一個while循環(huán)侨舆,循環(huán)條件是receivedCloseCode == -1的時候秒紧,做的事情是reader!!.processNextFrame()方法。繼續(xù):

  //WebSocketWriter.kt
  fun processNextFrame() {
    //讀取頭部信息
    readHeader()
    if (isControlFrame) {
      //如果是控制幀挨下,讀取控制幀內(nèi)容
      readControlFrame()
    } else {
      //讀取普通消息內(nèi)容
      readMessageFrame()
    }
  }
  
  //讀取頭部信息
  @Throws(IOException::class, ProtocolException::class)
  private fun readHeader() {
    if (closed) throw IOException("closed")
    
    try {
     //讀取數(shù)據(jù)熔恢,獲取數(shù)據(jù)幀的前8位
      b0 = source.readByte() and 0xff
    } finally {
      source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS)
    }    
    //***
    //獲取數(shù)據(jù)幀的opcode(數(shù)據(jù)格式)
    opcode = b0 and B0_MASK_OPCODE
    //是否為最終幀
    isFinalFrame = b0 and B0_FLAG_FIN != 0
    //是否為控制幀(指令)
    isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0

    //判斷最終幀,獲取幀長度等等
  }  
  
  
  //讀取控制幀(指令)
    @Throws(IOException::class)
  private fun readControlFrame() {
    if (frameLength > 0L) {
      source.readFully(controlFrameBuffer, frameLength)
    }

    when (opcode) {
      OPCODE_CONTROL_PING -> {
      //ping 幀
        frameCallback.onReadPing(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_PONG -> {
        //pong 幀
        frameCallback.onReadPong(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_CLOSE -> {
        //關(guān)閉 幀
        var code = CLOSE_NO_STATUS_CODE
        var reason = ""
        val bufferSize = controlFrameBuffer.size
        if (bufferSize == 1L) {
          throw ProtocolException("Malformed close payload length of 1.")
        } else if (bufferSize != 0L) {
          code = controlFrameBuffer.readShort().toInt()
          reason = controlFrameBuffer.readUtf8()
          val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code)
          if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage)
        }
        //回調(diào)onReadClose方法
        frameCallback.onReadClose(code, reason)
        closed = true
      }
    }
  }
  
  //讀取普通消息
  @Throws(IOException::class)
  private fun readMessageFrame() {
    
    readMessage()

    if (readingCompressedMessage) {
      val messageInflater = this.messageInflater
          ?: MessageInflater(noContextTakeover).also { this.messageInflater = it }
      messageInflater.inflate(messageFrameBuffer)
    }

    if (opcode == OPCODE_TEXT) {
      frameCallback.onReadMessage(messageFrameBuffer.readUtf8())
    } else {
      frameCallback.onReadMessage(messageFrameBuffer.readByteString())
    }
  }  
  

代碼還是比較直觀臭笆,這個processNextFrame其實就是讀取數(shù)據(jù)用的叙淌,首先讀取頭部信息,獲取數(shù)據(jù)幀的類型耗啦,判斷是否為控制幀凿菩,再分別去讀取控制幀數(shù)據(jù)或者普通消息幀數(shù)據(jù)机杜。

數(shù)據(jù)幀格式

問題來了帜讲,什么是數(shù)據(jù)頭部信息,什么是控制幀椒拗?
這里就要說下WebSocket的數(shù)據(jù)幀了似将,先附上一個數(shù)據(jù)幀格式:


   0 1 2 3 4 5 6 7    0 1 2 3 4 5 6 7  0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
  +-+-+-+-+-------+  +-+-------------+ +-----------------------------+
  |F|R|R|R| OP    |  |M| LENGTH      |   Extended payload length
  |I|S|S|S| CODE  |  |A|             |  (if LENGTH=126)
  |N|V|V|V|       |  |S|             |
  | |1|2|3|       |  |K|             |
  +-+-+-+-+-------+  +-+-------------+
  |                      Extended payload length(if LENGTH=127)
  +                                  +-------------------------------
  |      Extended payload length     | Masking-key,if Mask set to 1
  +----------------------------------+-------------------------------
  |   Masking-key                    |       Data
  +----------------------------------+-------------------------------
  |                                Data
  +----------------------------------+-------------------------------


我承認蚀苛,我懵逼了在验。
冷靜冷靜,一步一步分析下吧堵未。

首先每一行代表4個字節(jié)腋舌,一共也就是32位數(shù),哦渗蟹,那也就是幾個字節(jié)而已嘛块饺,每個字節(jié)有他自己的代表意義唄赞辩,這樣想是不是就很簡單了,下面來具體看看每個字節(jié)授艰。

第1個字節(jié):

  • 第一位是FIN碼辨嗽,其實就是一個標示位,因為數(shù)據(jù)可能多幀操作嘛淮腾,所以多幀情況下糟需,只有最后一幀的FIN設置成1,標示結(jié)束幀谷朝,前面所有幀設置為0洲押。
  • 第二位到第四位是RSV碼,一般通信兩端沒有設置自定義協(xié)議徘禁,就默認為0诅诱。
  • 后四位是opcode,我們叫它操作碼送朱。這個就是判斷這個數(shù)據(jù)幀的類型了娘荡,一般有以下幾個被定義好的類型:

1) 0x0 表示附加數(shù)據(jù)幀
2) 0x1 表示文本數(shù)據(jù)幀
3) 0x2 表示二進制數(shù)據(jù)幀
4) 0x3-7 保留用于未來的非控制幀
5) 0x8 表示連接關(guān)閉
6) 0x9 表示ping
7) 0xA 表示pong
8) 0xB-F 保留用于未來的非控制幀

是不是發(fā)現(xiàn)了些什么,這不就對應了我們應用中的幾種格式嗎驶沼?2和3對應的是普通消息幀炮沐,包括了文本和二進制數(shù)據(jù)。567對應的就是控制幀格式回怜,包括了close大年,ping,pong玉雾。

第2個字節(jié):

  • 第一位是Mask掩碼翔试,其實就是標識數(shù)據(jù)是否加密混淆,1代表數(shù)據(jù)經(jīng)過掩碼的复旬,0是沒有經(jīng)過掩碼的垦缅,如果是1的話,后續(xù)就會有4個字節(jié)代表掩碼key驹碍,也就是數(shù)據(jù)幀中Masking-key所處的位置壁涎。
  • 后7位是LENGTH,用來標示數(shù)據(jù)長度志秃。因為只有7位怔球,所以最大只能儲存1111111對應的十進制數(shù)127長度的數(shù)據(jù),如果需要更大的數(shù)據(jù)浮还,這個儲存長度肯定就不夠了竟坛。
    所以規(guī)定來了,1) 小于126長度則數(shù)據(jù)用這七位表示實際長度。2) 如果長度設置為126担汤,也就是二進制1111110又官,就代表取額外2個字節(jié)表示數(shù)據(jù)長度,共是16位表示數(shù)據(jù)長度漫试。3) 如果長度設置為127六敬,也就是二進制1111111,就代表取額外8個字節(jié)驾荣,共是64位表示數(shù)據(jù)長度外构。

需要注意的是LENGHT的三種情況在一個數(shù)據(jù)幀里面只會出現(xiàn)一種情況,不共存播掷,所以在圖中是用if表示审编。同樣的,Masking-key也是當Mask為1的時候才存在歧匈。

所以也就有了數(shù)據(jù)幀里面的Extended payload length(LENGTH=126)所處的2個字節(jié)垒酬,以及Extended payload length(LENGTH=127)所處的8個字節(jié)。

最后的字節(jié)部分自然就是掩碼key(Mask為1的時候才存在)和具體的傳輸數(shù)據(jù)了件炉。
還是有點暈吧??勘究,來張圖總結(jié)下:

數(shù)據(jù)幀格式.jpeg

好了,了解了數(shù)據(jù)幀格式后斟冕,我們再來讀源碼就清晰多了口糕。
先看看怎么讀的頭部信息并解析的:

  //取數(shù)據(jù)幀前8位數(shù)據(jù)
  b0 = source.readByte() and 0xff
  //獲取數(shù)據(jù)幀的opcode(數(shù)據(jù)格式)
  opcode = b0 and B0_MASK_OPCODE(15)
  //是否為最終幀
  isFinalFrame = b0 and B0_FLAG_FIN(128) != 0
  //是否為控制幀(指令)
  isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0  
  • 第一句獲取頭信息,and是按位與計算磕蛇,and 0xff意思就是按位與11111111景描,所以頭部信息其實就是取了數(shù)據(jù)幀的前8位數(shù)據(jù),一個字節(jié)秀撇。
  • 第二句獲取opcode超棺,and 15也就是按位與00001111,其實也就是取了后四位數(shù)據(jù)呵燕,剛好對應上opcode的位置棠绘,第一個字節(jié)的后四位。
  • 第三句獲取是否為最終幀虏等,剛才數(shù)據(jù)幀格式中說過弄唧,第一位FIN標識了是否為最后一幀數(shù)據(jù)适肠,1代表結(jié)束幀霍衫,所以這里and 128也就是按位與10000000,也就是取的第一位數(shù)侯养。
  • 第四句獲取是否為控制幀敦跌,and 8也就是按位與00001000,取得是第五位,也就是opcode的第一位柠傍,這是什么意思呢麸俘?我們看看剛才的數(shù)據(jù)幀格式,發(fā)現(xiàn)從0x8開始就是所謂的控制幀了惧笛。0x8對應的二進制是1000从媚,0x7對應的二進制是0111。發(fā)現(xiàn)了吧患整,如果為控制幀的時候拜效,opcode第一位肯定是為1的,所以這里就判斷的第五位各谚。

后面還有讀取第二個字節(jié)的代碼紧憾,大家可以自己沿著這個思路自己看看,包括了讀取MASK昌渤,讀取數(shù)據(jù)長度的三種長度等赴穗。

所以這個processNextFrame方法主要做了三件事:

  • readHeader方法中,判斷了是否為控制幀膀息,是否為結(jié)束幀般眉,然后獲取了Mask標識,幀長度等參數(shù)
  • readControlFrame方法中潜支,主要處理了該幀數(shù)據(jù)為ping煤篙,pong,close三種情況毁腿,并且在收到close關(guān)閉幀的情況下辑奈,回調(diào)了onReadClose方法,這個待會要細看下已烤。
  • readMessageFrame方法中鸠窗,主要是讀取了消息后,回調(diào)了onReadMessage方法胯究。

至此可以發(fā)現(xiàn)稍计,其實WebSocket傳輸數(shù)據(jù)并不是一個簡單的事,只是OkHttp都幫我們封裝好了裕循,我們只需要直接傳輸數(shù)據(jù)即可臣嚣,感謝這些三方庫為我們開發(fā)作出的貢獻,不知道什么時候我也能做出點貢獻呢??剥哑。

對了硅则,剛才說回調(diào)也很重要,接著看看株婴。onReadCloseonReadMessage回調(diào)到哪了呢怎虫?還記得上文初始化WebSocketWriter的時候設置了回調(diào)接口嗎。所以就是回調(diào)給RealWebSocket了:

  //RealWebSocket.kt
  override fun onReadClose(code: Int, reason: String) {
    require(code != -1)

    var toClose: Streams? = null
    var readerToClose: WebSocketReader? = null
    var writerToClose: WebSocketWriter? = null
    synchronized(this) {
      check(receivedCloseCode == -1) { "already closed" }
      receivedCloseCode = code
      receivedCloseReason = reason 
      //...
    }

    try {
      listener.onClosing(this, code, reason)

      if (toClose != null) {
        listener.onClosed(this, code, reason)
      }
    } finally {
      toClose?.closeQuietly()
      readerToClose?.closeQuietly()
      writerToClose?.closeQuietly()
    }
  }
  
  @Throws(IOException::class)
  override fun onReadMessage(text: String) {
    listener.onMessage(this, text)
  }

  @Throws(IOException::class)
  override fun onReadMessage(bytes: ByteString) {
    listener.onMessage(this, bytes)
  }  

onReadClose回調(diào)方法里面有個關(guān)鍵的參數(shù),receivedCloseCode大审。還記得這個參數(shù)嗎蘸际?上文中解析消息的循環(huán)條件就是receivedCloseCode == -1,所以當收到關(guān)閉幀的時候徒扶,receivedCloseCode就不再等于-1(規(guī)定大于1000)粮彤,也就不再去讀取解析消息了。這樣整個流程就結(jié)束了姜骡。

其中還有一些WebSocketListener的回調(diào)驾诈,比如onClosing,onClosed溶浴,onMessage等乍迄,就直接回調(diào)給用戶使用了。至此士败,接收消息處理消息說完了闯两。

發(fā)消息

好了。接著說發(fā)送谅将,看看send方法:

  @Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean {
    // ***
    // Enqueue the message frame.
    queueSize += data.size.toLong()
    messageAndCloseQueue.add(Message(formatOpcode, data))
    runWriter()
    return true
  }

首先漾狼,把要發(fā)送的data封裝成Message對象,然后入隊列messageAndCloseQueue饥臂。最后執(zhí)行runWriter方法逊躁。這都不用猜了,runWriter肯定就要開始發(fā)送消息了隅熙,繼續(xù)看:

  //RealWebSocket.kt
  private fun runWriter() {
    this.assertThreadHoldsLock()

    val writerTask = writerTask
    if (writerTask != null) {
      taskQueue.schedule(writerTask)
    }
  }
  
  private inner class WriterTask : Task("$name writer") {
    override fun runOnce(): Long {
      try {
        if (writeOneFrame()) return 0L
      } catch (e: IOException) {
        failWebSocket(e, null)
      }
      return -1L
    }
  }  
  
  //以下是schedule方法轉(zhuǎn)到WriterTask的runOnce方法過程

  //TaskQueue.kt
  fun schedule(task: Task, delayNanos: Long = 0L) {
    synchronized(taskRunner) {
      if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
        taskRunner.kickCoordinator(this)
      }
    }
  }
  
  internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
    //***
    if (insertAt == -1) insertAt = futureTasks.size
    futureTasks.add(insertAt, task)

    // Impact the coordinator if we inserted at the front.
    return insertAt == 0
  }  

  //TaskRunner.kt
  internal fun kickCoordinator(taskQueue: TaskQueue) {
    this.assertThreadHoldsLock()
    
    if (taskQueue.activeTask == null) {
      if (taskQueue.futureTasks.isNotEmpty()) {
        readyQueues.addIfAbsent(taskQueue)
      } else {
        readyQueues.remove(taskQueue)
      }
    }    
    
    if (coordinatorWaiting) {
      backend.coordinatorNotify(this@TaskRunner)
    } else {
      backend.execute(runnable)
    }
  }  
  
  private val runnable: Runnable = object : Runnable {
    override fun run() {
      while (true) {
        val task = synchronized(this@TaskRunner) {
          awaitTaskToRun()
        } ?: return

        logElapsed(task, task.queue!!) {
          var completedNormally = false
          try {
            runTask(task)
            completedNormally = true
          } finally {
            // If the task is crashing start another thread to service the queues.
            if (!completedNormally) {
              backend.execute(this)
            }
          }
        }
      }
    }
  }
  
  private fun runTask(task: Task) {
    try {
      delayNanos = task.runOnce()
    } 
  }  
  

代碼有點長稽煤,這里是從runWriter開始跟的幾個方法,拿到writerTask實例后囚戚,存到TaskQueuefutureTasks列表里酵熙,然后到runnable這里可以看到是一個while死循環(huán),不斷的從futureTasks中取出Task并執(zhí)行runTask方法驰坊,直到Task為空匾二,循環(huán)停止。

其中涉及到兩個新的類:

  • TaskQueue類主要就是管理消息任務列表拳芙,保證按順序執(zhí)行
  • TaskRunner類主要就是做一些任務的具體操作察藐,比如線程池里執(zhí)行任務,記錄消息任務的狀態(tài)(準備發(fā)送的任務隊列readyQueues舟扎,正在執(zhí)行的任務隊列busyQueues等等)

而每一個Task最后都是執(zhí)行到了WriterTaskrunOnce方法分飞,也就是writeOneFrame方法:

  internal fun writeOneFrame(): Boolean {
    synchronized(this@RealWebSocket) {
      if (failed) {
        return false // Failed web socket.
      }
      writer = this.writer
      pong = pongQueue.poll()
      if (pong == null) {
        messageOrClose = messageAndCloseQueue.poll()
        if (messageOrClose is Close) {
        } else if (messageOrClose == null) {
            return false // The queue is exhausted.
        }
      }
    }

   //發(fā)送消息邏輯,包括`pong`消息浆竭,普通消息浸须,關(guān)閉消息
    try {
      if (pong != null) {
        writer!!.writePong(pong)
      } else if (messageOrClose is Message) {
        val message = messageOrClose as Message
        writer!!.writeMessageFrame(message.formatOpcode, message.data)
        synchronized(this) {
          queueSize -= message.data.size.toLong()
        }
      } else if (messageOrClose is Close) {
        val close = messageOrClose as Close
        writer!!.writeClose(close.code, close.reason)
        // We closed the writer: now both reader and writer are closed.
        if (streamsToClose != null) {
          listener.onClosed(this, receivedCloseCode, receivedCloseReason!!)
        }
      } 
      return true
    } finally {
      streamsToClose?.closeQuietly()
      readerToClose?.closeQuietly()
      writerToClose?.closeQuietly()
    }
  }

這里就會執(zhí)行發(fā)送消息的邏輯了,主要有三種消息情況處理:

  • pong消息邦泄,這個主要是為服務器端準備的顽素,發(fā)送給客戶端回應心跳包先较。
  • 普通消息,就會把數(shù)據(jù)類型Opcode和具體數(shù)據(jù)發(fā)送過去
  • 關(guān)閉消息,其實當用戶執(zhí)行close方法關(guān)閉WebSocket的時候伸但,也是發(fā)送了一條Close控制幀消息給服務器告知這個關(guān)閉需求,并帶上code狀態(tài)碼reason關(guān)閉原因蹋岩,然后服務器端就會關(guān)閉當前連接增蹭。

好了。最后一步了午乓,就是把這些數(shù)據(jù)組裝成WebSocket數(shù)據(jù)幀并寫入流站宗,分成控制幀數(shù)據(jù)和普通消息數(shù)據(jù)幀


  //寫入(發(fā)送)控制幀
  private fun writeControlFrame(opcode: Int, payload: ByteString) {
    if (writerClosed) throw IOException("closed")
    
    val length = payload.size
    require(length <= PAYLOAD_BYTE_MAX) {
      "Payload size must be less than or equal to $PAYLOAD_BYTE_MAX"
    }
    val b0 = B0_FLAG_FIN or opcode
    sinkBuffer.writeByte(b0)

    var b1 = length
    if (isClient) {
      b1 = b1 or B1_FLAG_MASK
      sinkBuffer.writeByte(b1)
      random.nextBytes(maskKey!!)
      sinkBuffer.write(maskKey)

      if (length > 0) {
        val payloadStart = sinkBuffer.size
        sinkBuffer.write(payload)
        sinkBuffer.readAndWriteUnsafe(maskCursor!!)
        maskCursor.seek(payloadStart)
        toggleMask(maskCursor, maskKey)
        maskCursor.close()
      }
    } else {
      sinkBuffer.writeByte(b1)
      sinkBuffer.write(payload)
    }

    sink.flush()
  }


  //寫入(發(fā)送)普通消息數(shù)據(jù)幀
  @Throws(IOException::class)
  fun writeMessageFrame(formatOpcode: Int, data: ByteString) {
    if (writerClosed) throw IOException("closed")

    messageBuffer.write(data)

    var b0 = formatOpcode or B0_FLAG_FIN
    val dataSize = messageBuffer.size
    sinkBuffer.writeByte(b0)

    var b1 = 0
    if (isClient) {
      b1 = b1 or B1_FLAG_MASK
    }
    when {
      dataSize <= PAYLOAD_BYTE_MAX -> {
        b1 = b1 or dataSize.toInt()
        sinkBuffer.writeByte(b1)
      }
      dataSize <= PAYLOAD_SHORT_MAX -> {
        b1 = b1 or PAYLOAD_SHORT
        sinkBuffer.writeByte(b1)
        sinkBuffer.writeShort(dataSize.toInt())
      }
      else -> {
        b1 = b1 or PAYLOAD_LONG
        sinkBuffer.writeByte(b1)
        sinkBuffer.writeLong(dataSize)
      }
    }

    if (isClient) {
      random.nextBytes(maskKey!!)
      sinkBuffer.write(maskKey)

      if (dataSize > 0L) {
        messageBuffer.readAndWriteUnsafe(maskCursor!!)
        maskCursor.seek(0L)
        toggleMask(maskCursor, maskKey)
        maskCursor.close()
      }
    }

    sinkBuffer.write(messageBuffer, dataSize)
    sink.emit()
  }


大家應該都能看懂了吧,其實就是組裝數(shù)據(jù)幀益愈,包括Opcode梢灭,mask,數(shù)據(jù)長度等等蒸其。兩個方法的不同就在于普通數(shù)據(jù)需要判斷數(shù)據(jù)長度的三種情況敏释,再組裝數(shù)據(jù)幀。最后都會通過sinkBuffer寫入到輸出數(shù)據(jù)流摸袁。

終于钥顽,基本的流程說的差不多了。其中還有很多細節(jié)靠汁,同學們可以自己花時間看看琢磨琢磨蜂大,比如Okio部分。還是那句話蝶怔,希望大家有空自己也讀一讀相關(guān)源碼县爬,這樣理解才能深刻,而且你肯定會發(fā)現(xiàn)很多我沒說到的細節(jié)添谊,歡迎大家討論财喳。我也會繼續(xù)努力,最后大家給我加個油點個贊吧斩狱,感謝感謝耳高。

總結(jié)

再來個圖總結(jié)下吧!??


OkHttp-WebSocket源碼.jpg

參考

OkHttp源碼
《WebSocket協(xié)議翻譯》


你的一個??所踊,就是我分享的動力??泌枪。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市秕岛,隨后出現(xiàn)的幾起案子碌燕,更是在濱河造成了極大的恐慌误证,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件修壕,死亡現(xiàn)場離奇詭異愈捅,居然都是意外死亡,警方通過查閱死者的電腦和手機慈鸠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門蓝谨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人青团,你說我怎么就攤上這事譬巫。” “怎么了督笆?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵芦昔,是天一觀的道長。 經(jīng)常有香客問我娃肿,道長烟零,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任咸作,我火速辦了婚禮锨阿,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘记罚。我一直安慰自己墅诡,他們只是感情好,可當我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布桐智。 她就那樣靜靜地躺著末早,像睡著了一般。 火紅的嫁衣襯著肌膚如雪说庭。 梳的紋絲不亂的頭發(fā)上然磷,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天,我揣著相機與錄音刊驴,去河邊找鬼姿搜。 笑死,一個胖子當著我的面吹牛捆憎,可吹牛的內(nèi)容都是我干的舅柜。 我是一名探鬼主播,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼躲惰,長吁一口氣:“原來是場噩夢啊……” “哼致份!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起础拨,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤氮块,失蹤者是張志新(化名)和其女友劉穎绍载,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體滔蝉,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡击儡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了锰提。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片曙痘。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡芳悲,死狀恐怖立肘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情名扛,我是刑警寧澤谅年,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站肮韧,受9級特大地震影響融蹂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜弄企,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一超燃、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧拘领,春花似錦意乓、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至圣猎,卻和暖如春士葫,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背送悔。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工慢显, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人欠啤。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓鳍怨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親跪妥。 傳聞我的和親對象是個殘疾皇子鞋喇,可洞房花燭夜當晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 1 述 WebSocket是一種網(wǎng)絡通信協(xié)議WebSocket 協(xié)議在2008年誕生,2011年成為國際標準眉撵。HT...
    凱玲之戀閱讀 672評論 0 0
  • 黑色的海島上懸著一輪又大又圓的明月,毫不嫌棄地把溫柔的月色照在這寸草不生的小島上罐韩。一個少年白衣白發(fā)憾赁,悠閑自如地倚坐...
    小水Vivian閱讀 3,099評論 1 5
  • 漸變的面目拼圖要我怎么拼矾睦? 我是疲乏了還是投降了晦款? 不是不允許自己墜落, 我沒有滴水不進的保護膜枚冗。 就是害怕變得面...
    悶熱當乘涼閱讀 4,237評論 0 13
  • 感覺自己有點神經(jīng)衰弱缓溅,總是覺得手機響了;屋外有人走過赁温;每次媽媽不聲不響的進房間突然跟我說話坛怪,我都會被嚇得半死!一整...
    章魚的擁抱閱讀 2,169評論 4 5
  • 夜鶯2517閱讀 127,716評論 1 9