前言
雨露均沾的OkHttp—WebSocket長連接(使用篇)
雨露均沾的OkHttp—WebSocket長連接(源碼篇)
上期我們熟悉了OkHttp中實現(xiàn)WebSocket
長連接的接入斩松,并且可以通過OkHttp
官方的MockWebSocket
服務來模擬服務端皆尔,實現(xiàn)整個流程迁央。
今天我們就來說下具體OkHttp
中是怎么實現(xiàn)這些功能的呢?相信看過這篇文章你也能深刻了解WebSocket
這個協(xié)議纠屋。
使用回顧
簡單貼下WebSocket
使用方法,方便下面解析:
//初始化
mClient = new OkHttpClient.Builder()
.pingInterval(10, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
.url(mWbSocketUrl)
.build();
mWebSocket = mClient.newWebSocket(request, new WsListener());
//收到消息回調(diào)
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
super.onMessage(webSocket, text);
Log.e(TAG,"收到消息盾计!");
onWSDataChanged(DATE_NORMAL, text);
}
//發(fā)送消息
mWebSocket.send(message);
//主動關(guān)閉連接
mWebSocket.close(code, reason);
源碼解析
WebSocket
整個流程無非三個功能:連接售担,接收消息赁遗,發(fā)送消息。下面我們就從這三個方面
分析下具體是怎么實現(xiàn)的族铆。
連接
通過上面的代碼我們得知岩四,WebSocket
連接是通過newWebSocket
方法。直接點進去看這個方法:
override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
val webSocket = RealWebSocket(
taskRunner = TaskRunner.INSTANCE,
originalRequest = request,
listener = listener,
random = Random(),
pingIntervalMillis = pingIntervalMillis.toLong(),
extensions = null, // Always null for clients.
minimumDeflateSize = minWebSocketMessageToCompress
)
webSocket.connect(this)
return webSocket
}
這里做了兩件事:
- 初始化
RealWebSocket
骑素,主要是設置了一些參數(shù)(比如pingIntervalMillis
心跳包時間間隔炫乓,還有監(jiān)聽事件之類的) -
connect
方法進行WebSocket
連接
繼續(xù)查看connect方法:
connect(WebSocket連接握手)
fun connect(client: OkHttpClient) {
//***
val webSocketClient = client.newBuilder()
.eventListener(EventListener.NONE)
.protocols(ONLY_HTTP1)
.build()
val request = originalRequest.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Extensions", "permessage-deflate")
.build()
call = RealCall(webSocketClient, request, forWebSocket = true)
call!!.enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
//得到數(shù)據(jù)流
val streams: Streams
try {
checkUpgradeSuccess(response, exchange)
streams = exchange!!.newWebSocketStreams()
}
//***
// Process all web socket messages.
try {
val name = "$okHttpName WebSocket ${request.url.redact()}"
initReaderAndWriter(name, streams)
listener.onOpen(this@RealWebSocket, response)
loopReader()
} catch (e: Exception) {
failWebSocket(e, null)
}
}
})
}
上一篇使用篇文章中說過,Websocket
連接需要一次Http
協(xié)議的握手献丑,然后才能把協(xié)議升級成WebSocket
末捣。所以這段代碼就體現(xiàn)出這個功能了。
首先就new
了一個用來進行Http
連接的request
创橄,其中Header
的參數(shù)就表示我要進行WebSocket
連接了箩做,參數(shù)解析如下:
-
Connection:Upgrade
,表示客戶端要連接升級 -
Upgrade:websocket
妥畏, 表示客戶端要升級建立Websocket連接 -
Sec-Websocket-Key:key
邦邦, 這個key是隨機生成的,服務器會通過這個參數(shù)驗證該請求是否有效 -
Sec-WebSocket-Version:13
醉蚁, websocket使用的版本燃辖,一般就是13 -
Sec-webSocket-Extension:permessage-deflate
,客戶端指定的一些擴展協(xié)議网棍,比如這里permessage-deflate
就是WebSocket
的一種壓縮協(xié)議黔龟。
Header
設置好之后,就調(diào)用了call
的enqueue
方法滥玷,這個方法大家應該都很熟悉吧氏身,OkHttp
里面對于Http
請求的異步請求就是這個方法。
至此惑畴,握手結(jié)束蛋欣,服務器返回響應碼101
,表示協(xié)議升級如贷。
然后我們繼續(xù)看看獲取服務器響應之后又做了什么陷虎?
在發(fā)送Http
請求成功之后,onResponse
響應方法里面主要表現(xiàn)為四個處理邏輯:
- 將
Http
流轉(zhuǎn)換成WebSocket
流杠袱,得到Streams
對象泻红,這個流后面會轉(zhuǎn)化成輸入流和輸出流,也就是進行發(fā)送和讀取的操作流 -
listener.onOpen(this@RealWebSocket, response)
霞掺,回調(diào)了接口WebSocketListener
的onOpen
方法谊路,告訴用戶WebSocket
已經(jīng)連接 initReaderAndWriter(name, streams)
loopReader()
前兩個邏輯還是比較好理解,主要是后兩個方法菩彬,我們分別解析下缠劝。
首先看initReaderAndWriter
方法潮梯。
initReaderAndWriter(初始化輸入流輸出流)
//RealWebSocket.kt
@Throws(IOException::class)
fun initReaderAndWriter(name: String, streams: Streams) {
val extensions = this.extensions!!
synchronized(this) {
//***
//寫數(shù)據(jù),發(fā)送數(shù)據(jù)的工具類
this.writer = WebSocketWriter()
//設置心跳包事件
if (pingIntervalMillis != 0L) {
val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
taskQueue.schedule("$name ping", pingIntervalNanos) {
writePingFrame()
return@schedule pingIntervalNanos
}
}
//***
}
//***
//讀取數(shù)據(jù)的工具類
reader = WebSocketReader(
***
frameCallback = this,
***
)
}
internal fun writePingFrame() {
//***
try {
writer.writePing(ByteString.EMPTY)
} catch (e: IOException) {
failWebSocket(e, null)
}
}
這個方法主要干了兩件事:
- 實例化輸出流輸入流工具類惨恭,也就是
WebSocketWriter
和WebSocketReader
秉馏,用來處理數(shù)據(jù)的收發(fā)。 - 設置心跳包事件脱羡。如果
pingIntervalMillis
參數(shù)不為0萝究,就通過計時器,每隔pingIntervalNanos
發(fā)送一個ping
消息锉罐。其中writePingFrame
方法就是發(fā)送了ping
幀數(shù)據(jù)帆竹。
接收消息處理消息
loopReader
接著看看這個loopReader
方法是干什么的,看這個名字我們大膽猜測下脓规,難道這個方法就是用來循環(huán)讀取數(shù)據(jù)的栽连?去代碼里找找答案:
fun loopReader() {
while (receivedCloseCode == -1) {
// This method call results in one or more onRead* methods being called on this thread.
reader!!.processNextFrame()
}
}
代碼很簡單,一個while
循環(huán)侨舆,循環(huán)條件是receivedCloseCode == -1
的時候秒紧,做的事情是reader!!.processNextFrame()
方法。繼續(xù):
//WebSocketWriter.kt
fun processNextFrame() {
//讀取頭部信息
readHeader()
if (isControlFrame) {
//如果是控制幀挨下,讀取控制幀內(nèi)容
readControlFrame()
} else {
//讀取普通消息內(nèi)容
readMessageFrame()
}
}
//讀取頭部信息
@Throws(IOException::class, ProtocolException::class)
private fun readHeader() {
if (closed) throw IOException("closed")
try {
//讀取數(shù)據(jù)熔恢,獲取數(shù)據(jù)幀的前8位
b0 = source.readByte() and 0xff
} finally {
source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS)
}
//***
//獲取數(shù)據(jù)幀的opcode(數(shù)據(jù)格式)
opcode = b0 and B0_MASK_OPCODE
//是否為最終幀
isFinalFrame = b0 and B0_FLAG_FIN != 0
//是否為控制幀(指令)
isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0
//判斷最終幀,獲取幀長度等等
}
//讀取控制幀(指令)
@Throws(IOException::class)
private fun readControlFrame() {
if (frameLength > 0L) {
source.readFully(controlFrameBuffer, frameLength)
}
when (opcode) {
OPCODE_CONTROL_PING -> {
//ping 幀
frameCallback.onReadPing(controlFrameBuffer.readByteString())
}
OPCODE_CONTROL_PONG -> {
//pong 幀
frameCallback.onReadPong(controlFrameBuffer.readByteString())
}
OPCODE_CONTROL_CLOSE -> {
//關(guān)閉 幀
var code = CLOSE_NO_STATUS_CODE
var reason = ""
val bufferSize = controlFrameBuffer.size
if (bufferSize == 1L) {
throw ProtocolException("Malformed close payload length of 1.")
} else if (bufferSize != 0L) {
code = controlFrameBuffer.readShort().toInt()
reason = controlFrameBuffer.readUtf8()
val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code)
if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage)
}
//回調(diào)onReadClose方法
frameCallback.onReadClose(code, reason)
closed = true
}
}
}
//讀取普通消息
@Throws(IOException::class)
private fun readMessageFrame() {
readMessage()
if (readingCompressedMessage) {
val messageInflater = this.messageInflater
?: MessageInflater(noContextTakeover).also { this.messageInflater = it }
messageInflater.inflate(messageFrameBuffer)
}
if (opcode == OPCODE_TEXT) {
frameCallback.onReadMessage(messageFrameBuffer.readUtf8())
} else {
frameCallback.onReadMessage(messageFrameBuffer.readByteString())
}
}
代碼還是比較直觀臭笆,這個processNextFrame
其實就是讀取數(shù)據(jù)用的叙淌,首先讀取頭部信息,獲取數(shù)據(jù)幀的類型耗啦,判斷是否為控制幀凿菩,再分別去讀取控制幀數(shù)據(jù)或者普通消息幀數(shù)據(jù)机杜。
數(shù)據(jù)幀格式
問題來了帜讲,什么是數(shù)據(jù)頭部信息,什么是控制幀椒拗?
這里就要說下WebSocket
的數(shù)據(jù)幀了似将,先附上一個數(shù)據(jù)幀格式:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-------+ +-+-------------+ +-----------------------------+
|F|R|R|R| OP | |M| LENGTH | Extended payload length
|I|S|S|S| CODE | |A| | (if LENGTH=126)
|N|V|V|V| | |S| |
| |1|2|3| | |K| |
+-+-+-+-+-------+ +-+-------------+
| Extended payload length(if LENGTH=127)
+ +-------------------------------
| Extended payload length | Masking-key,if Mask set to 1
+----------------------------------+-------------------------------
| Masking-key | Data
+----------------------------------+-------------------------------
| Data
+----------------------------------+-------------------------------
我承認蚀苛,我懵逼了在验。
冷靜冷靜,一步一步分析下吧堵未。
首先每一行代表4個字節(jié)腋舌,一共也就是32位數(shù),哦渗蟹,那也就是幾個字節(jié)而已嘛块饺,每個字節(jié)有他自己的代表意義唄赞辩,這樣想是不是就很簡單了,下面來具體看看每個字節(jié)授艰。
第1個字節(jié):
- 第一位是
FIN碼
辨嗽,其實就是一個標示位,因為數(shù)據(jù)可能多幀操作嘛淮腾,所以多幀情況下糟需,只有最后一幀的FIN
設置成1,標示結(jié)束幀谷朝,前面所有幀設置為0洲押。 - 第二位到第四位是
RSV碼
,一般通信兩端沒有設置自定義協(xié)議徘禁,就默認為0诅诱。 - 后四位是
opcode
,我們叫它操作碼送朱。這個就是判斷這個數(shù)據(jù)幀的類型了娘荡,一般有以下幾個被定義好的類型:
1) 0x0
表示附加數(shù)據(jù)幀
2) 0x1
表示文本數(shù)據(jù)幀
3) 0x2
表示二進制數(shù)據(jù)幀
4) 0x3-7
保留用于未來的非控制幀
5) 0x8
表示連接關(guān)閉
6) 0x9
表示ping
7) 0xA
表示pong
8) 0xB-F
保留用于未來的非控制幀
是不是發(fā)現(xiàn)了些什么,這不就對應了我們應用中的幾種格式嗎驶沼?2和3
對應的是普通消息幀炮沐,包括了文本和二進制數(shù)據(jù)。567
對應的就是控制幀格式回怜,包括了close大年,ping,pong
玉雾。
第2個字節(jié):
- 第一位是
Mask
掩碼翔试,其實就是標識數(shù)據(jù)是否加密混淆,1代表數(shù)據(jù)經(jīng)過掩碼的复旬,0是沒有經(jīng)過掩碼的垦缅,如果是1的話,后續(xù)就會有4個字節(jié)代表掩碼key
驹碍,也就是數(shù)據(jù)幀中Masking-key
所處的位置壁涎。 - 后7位是
LENGTH
,用來標示數(shù)據(jù)長度志秃。因為只有7位怔球,所以最大只能儲存1111111對應的十進制數(shù)127長度
的數(shù)據(jù),如果需要更大的數(shù)據(jù)浮还,這個儲存長度肯定就不夠了竟坛。
所以規(guī)定來了,1)小于126長度
則數(shù)據(jù)用這七位表示實際長度。2) 如果長度設置為126
担汤,也就是二進制1111110又官,就代表取額外2個字節(jié)
表示數(shù)據(jù)長度,共是16位表示數(shù)據(jù)長度漫试。3) 如果長度設置為127
六敬,也就是二進制1111111,就代表取額外8個字節(jié)
驾荣,共是64位表示數(shù)據(jù)長度外构。
需要注意的是LENGHT的三種情況在一個數(shù)據(jù)幀里面只會出現(xiàn)一種情況,不共存播掷,所以在圖中是用if表示审编。同樣的,Masking-key也是當Mask為1的時候才存在歧匈。
所以也就有了數(shù)據(jù)幀里面的Extended payload length(LENGTH=126)
所處的2個字節(jié)垒酬,以及Extended payload length(LENGTH=127)
所處的8個字節(jié)。
最后的字節(jié)部分自然就是掩碼key
(Mask為1的時候才存在)和具體的傳輸數(shù)據(jù)
了件炉。
還是有點暈吧??勘究,來張圖總結(jié)下:
好了,了解了數(shù)據(jù)幀格式后斟冕,我們再來讀源碼就清晰多了口糕。
先看看怎么讀的頭部信息
并解析的:
//取數(shù)據(jù)幀前8位數(shù)據(jù)
b0 = source.readByte() and 0xff
//獲取數(shù)據(jù)幀的opcode(數(shù)據(jù)格式)
opcode = b0 and B0_MASK_OPCODE(15)
//是否為最終幀
isFinalFrame = b0 and B0_FLAG_FIN(128) != 0
//是否為控制幀(指令)
isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0
- 第一句獲取頭信息,
and
是按位與計算磕蛇,and 0xff
意思就是按位與11111111景描,所以頭部信息其實就是取了數(shù)據(jù)幀的前8位數(shù)據(jù)
,一個字節(jié)秀撇。 - 第二句獲取
opcode
超棺,and 15
也就是按位與00001111,其實也就是取了后四位數(shù)據(jù)呵燕,剛好對應上opcode
的位置棠绘,第一個字節(jié)的后四位。 - 第三句獲取是否為
最終幀
虏等,剛才數(shù)據(jù)幀格式中說過弄唧,第一位FIN
標識了是否為最后一幀數(shù)據(jù)适肠,1代表結(jié)束幀霍衫,所以這里and 128
也就是按位與10000000,也就是取的第一位數(shù)侯养。 - 第四句獲取是否為控制幀敦跌,
and 8
也就是按位與00001000,取得是第五位,也就是opcode
的第一位柠傍,這是什么意思呢麸俘?我們看看剛才的數(shù)據(jù)幀格式,發(fā)現(xiàn)從0x8
開始就是所謂的控制幀了惧笛。0x8
對應的二進制是1000从媚,0x7
對應的二進制是0111。發(fā)現(xiàn)了吧患整,如果為控制幀的時候拜效,opcode
第一位肯定是為1的,所以這里就判斷的第五位各谚。
后面還有讀取第二個字節(jié)的代碼紧憾,大家可以自己沿著這個思路自己看看,包括了讀取MASK
昌渤,讀取數(shù)據(jù)長度的三種長度等赴穗。
所以這個processNextFrame
方法主要做了三件事:
-
readHeader
方法中,判斷了是否為控制幀膀息,是否為結(jié)束幀
般眉,然后獲取了Mask
標識,幀長度等參數(shù) -
readControlFrame
方法中潜支,主要處理了該幀數(shù)據(jù)為ping煤篙,pong,close
三種情況毁腿,并且在收到close關(guān)閉幀
的情況下辑奈,回調(diào)了onReadClose
方法,這個待會要細看下已烤。 -
readMessageFrame
方法中鸠窗,主要是讀取了消息后,回調(diào)了onReadMessage方法胯究。
至此可以發(fā)現(xiàn)稍计,其實WebSocket
傳輸數(shù)據(jù)并不是一個簡單的事,只是OkHttp
都幫我們封裝好了裕循,我們只需要直接傳輸數(shù)據(jù)即可臣嚣,感謝這些三方庫為我們開發(fā)作出的貢獻,不知道什么時候我也能做出點貢獻呢??剥哑。
對了硅则,剛才說回調(diào)也很重要,接著看看株婴。onReadClose
和onReadMessage
回調(diào)到哪了呢怎虫?還記得上文初始化WebSocketWriter
的時候設置了回調(diào)接口嗎。所以就是回調(diào)給RealWebSocket
了:
//RealWebSocket.kt
override fun onReadClose(code: Int, reason: String) {
require(code != -1)
var toClose: Streams? = null
var readerToClose: WebSocketReader? = null
var writerToClose: WebSocketWriter? = null
synchronized(this) {
check(receivedCloseCode == -1) { "already closed" }
receivedCloseCode = code
receivedCloseReason = reason
//...
}
try {
listener.onClosing(this, code, reason)
if (toClose != null) {
listener.onClosed(this, code, reason)
}
} finally {
toClose?.closeQuietly()
readerToClose?.closeQuietly()
writerToClose?.closeQuietly()
}
}
@Throws(IOException::class)
override fun onReadMessage(text: String) {
listener.onMessage(this, text)
}
@Throws(IOException::class)
override fun onReadMessage(bytes: ByteString) {
listener.onMessage(this, bytes)
}
onReadClose
回調(diào)方法里面有個關(guān)鍵的參數(shù),receivedCloseCode
大审。還記得這個參數(shù)嗎蘸际?上文中解析消息的循環(huán)條件就是receivedCloseCode == -1
,所以當收到關(guān)閉幀的時候徒扶,receivedCloseCode
就不再等于-1(規(guī)定大于1000)粮彤,也就不再去讀取解析消息了。這樣整個流程就結(jié)束了姜骡。
其中還有一些WebSocketListener
的回調(diào)驾诈,比如onClosing,onClosed溶浴,onMessage
等乍迄,就直接回調(diào)給用戶使用了。至此士败,接收消息處理消息說完了闯两。
發(fā)消息
好了。接著說發(fā)送谅将,看看send
方法:
@Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean {
// ***
// Enqueue the message frame.
queueSize += data.size.toLong()
messageAndCloseQueue.add(Message(formatOpcode, data))
runWriter()
return true
}
首先漾狼,把要發(fā)送的data
封裝成Message
對象,然后入隊列messageAndCloseQueue
饥臂。最后執(zhí)行runWriter
方法逊躁。這都不用猜了,runWriter
肯定就要開始發(fā)送消息了隅熙,繼續(xù)看:
//RealWebSocket.kt
private fun runWriter() {
this.assertThreadHoldsLock()
val writerTask = writerTask
if (writerTask != null) {
taskQueue.schedule(writerTask)
}
}
private inner class WriterTask : Task("$name writer") {
override fun runOnce(): Long {
try {
if (writeOneFrame()) return 0L
} catch (e: IOException) {
failWebSocket(e, null)
}
return -1L
}
}
//以下是schedule方法轉(zhuǎn)到WriterTask的runOnce方法過程
//TaskQueue.kt
fun schedule(task: Task, delayNanos: Long = 0L) {
synchronized(taskRunner) {
if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
taskRunner.kickCoordinator(this)
}
}
}
internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
//***
if (insertAt == -1) insertAt = futureTasks.size
futureTasks.add(insertAt, task)
// Impact the coordinator if we inserted at the front.
return insertAt == 0
}
//TaskRunner.kt
internal fun kickCoordinator(taskQueue: TaskQueue) {
this.assertThreadHoldsLock()
if (taskQueue.activeTask == null) {
if (taskQueue.futureTasks.isNotEmpty()) {
readyQueues.addIfAbsent(taskQueue)
} else {
readyQueues.remove(taskQueue)
}
}
if (coordinatorWaiting) {
backend.coordinatorNotify(this@TaskRunner)
} else {
backend.execute(runnable)
}
}
private val runnable: Runnable = object : Runnable {
override fun run() {
while (true) {
val task = synchronized(this@TaskRunner) {
awaitTaskToRun()
} ?: return
logElapsed(task, task.queue!!) {
var completedNormally = false
try {
runTask(task)
completedNormally = true
} finally {
// If the task is crashing start another thread to service the queues.
if (!completedNormally) {
backend.execute(this)
}
}
}
}
}
}
private fun runTask(task: Task) {
try {
delayNanos = task.runOnce()
}
}
代碼有點長稽煤,這里是從runWriter
開始跟的幾個方法,拿到writerTask
實例后囚戚,存到TaskQueue
的futureTasks列表
里酵熙,然后到runnable
這里可以看到是一個while
死循環(huán),不斷的從futureTasks
中取出Task
并執(zhí)行runTask
方法驰坊,直到Task
為空匾二,循環(huán)停止。
其中涉及到兩個新的類:
-
TaskQueue類
主要就是管理消息任務列表拳芙,保證按順序執(zhí)行 -
TaskRunner類
主要就是做一些任務的具體操作察藐,比如線程池里執(zhí)行任務,記錄消息任務的狀態(tài)(準備發(fā)送的任務隊列readyQueues
舟扎,正在執(zhí)行的任務隊列busyQueues
等等)
而每一個Task最后都是執(zhí)行到了WriterTask
的runOnce
方法分飞,也就是writeOneFrame
方法:
internal fun writeOneFrame(): Boolean {
synchronized(this@RealWebSocket) {
if (failed) {
return false // Failed web socket.
}
writer = this.writer
pong = pongQueue.poll()
if (pong == null) {
messageOrClose = messageAndCloseQueue.poll()
if (messageOrClose is Close) {
} else if (messageOrClose == null) {
return false // The queue is exhausted.
}
}
}
//發(fā)送消息邏輯,包括`pong`消息浆竭,普通消息浸须,關(guān)閉消息
try {
if (pong != null) {
writer!!.writePong(pong)
} else if (messageOrClose is Message) {
val message = messageOrClose as Message
writer!!.writeMessageFrame(message.formatOpcode, message.data)
synchronized(this) {
queueSize -= message.data.size.toLong()
}
} else if (messageOrClose is Close) {
val close = messageOrClose as Close
writer!!.writeClose(close.code, close.reason)
// We closed the writer: now both reader and writer are closed.
if (streamsToClose != null) {
listener.onClosed(this, receivedCloseCode, receivedCloseReason!!)
}
}
return true
} finally {
streamsToClose?.closeQuietly()
readerToClose?.closeQuietly()
writerToClose?.closeQuietly()
}
}
這里就會執(zhí)行發(fā)送消息的邏輯了,主要有三種消息情況處理:
-
pong消息
邦泄,這個主要是為服務器端準備的顽素,發(fā)送給客戶端回應心跳包先较。 -
普通消息
,就會把數(shù)據(jù)類型Opcode
和具體數(shù)據(jù)發(fā)送過去 -
關(guān)閉消息
,其實當用戶執(zhí)行close
方法關(guān)閉WebSocket
的時候伸但,也是發(fā)送了一條Close控制幀
消息給服務器告知這個關(guān)閉需求,并帶上code狀態(tài)碼
和reason關(guān)閉原因
蹋岩,然后服務器端就會關(guān)閉當前連接增蹭。
好了。最后一步了午乓,就是把這些數(shù)據(jù)組裝成WebSocket
數(shù)據(jù)幀并寫入流站宗,分成控制幀
數(shù)據(jù)和普通消息數(shù)據(jù)幀
:
//寫入(發(fā)送)控制幀
private fun writeControlFrame(opcode: Int, payload: ByteString) {
if (writerClosed) throw IOException("closed")
val length = payload.size
require(length <= PAYLOAD_BYTE_MAX) {
"Payload size must be less than or equal to $PAYLOAD_BYTE_MAX"
}
val b0 = B0_FLAG_FIN or opcode
sinkBuffer.writeByte(b0)
var b1 = length
if (isClient) {
b1 = b1 or B1_FLAG_MASK
sinkBuffer.writeByte(b1)
random.nextBytes(maskKey!!)
sinkBuffer.write(maskKey)
if (length > 0) {
val payloadStart = sinkBuffer.size
sinkBuffer.write(payload)
sinkBuffer.readAndWriteUnsafe(maskCursor!!)
maskCursor.seek(payloadStart)
toggleMask(maskCursor, maskKey)
maskCursor.close()
}
} else {
sinkBuffer.writeByte(b1)
sinkBuffer.write(payload)
}
sink.flush()
}
//寫入(發(fā)送)普通消息數(shù)據(jù)幀
@Throws(IOException::class)
fun writeMessageFrame(formatOpcode: Int, data: ByteString) {
if (writerClosed) throw IOException("closed")
messageBuffer.write(data)
var b0 = formatOpcode or B0_FLAG_FIN
val dataSize = messageBuffer.size
sinkBuffer.writeByte(b0)
var b1 = 0
if (isClient) {
b1 = b1 or B1_FLAG_MASK
}
when {
dataSize <= PAYLOAD_BYTE_MAX -> {
b1 = b1 or dataSize.toInt()
sinkBuffer.writeByte(b1)
}
dataSize <= PAYLOAD_SHORT_MAX -> {
b1 = b1 or PAYLOAD_SHORT
sinkBuffer.writeByte(b1)
sinkBuffer.writeShort(dataSize.toInt())
}
else -> {
b1 = b1 or PAYLOAD_LONG
sinkBuffer.writeByte(b1)
sinkBuffer.writeLong(dataSize)
}
}
if (isClient) {
random.nextBytes(maskKey!!)
sinkBuffer.write(maskKey)
if (dataSize > 0L) {
messageBuffer.readAndWriteUnsafe(maskCursor!!)
maskCursor.seek(0L)
toggleMask(maskCursor, maskKey)
maskCursor.close()
}
}
sinkBuffer.write(messageBuffer, dataSize)
sink.emit()
}
大家應該都能看懂了吧,其實就是組裝數(shù)據(jù)幀益愈,包括Opcode梢灭,mask,數(shù)據(jù)長度
等等蒸其。兩個方法的不同就在于普通數(shù)據(jù)需要判斷數(shù)據(jù)長度的三種情況敏释,再組裝數(shù)據(jù)幀。最后都會通過sinkBuffer
寫入到輸出數(shù)據(jù)流摸袁。
終于钥顽,基本的流程說的差不多了。其中還有很多細節(jié)靠汁,同學們可以自己花時間看看琢磨琢磨蜂大,比如Okio
部分。還是那句話蝶怔,希望大家有空自己也讀一讀相關(guān)源碼县爬,這樣理解才能深刻,而且你肯定會發(fā)現(xiàn)很多我沒說到的細節(jié)添谊,歡迎大家討論财喳。我也會繼續(xù)努力,最后大家給我加個油點個贊吧斩狱,感謝感謝耳高。
總結(jié)
再來個圖總結(jié)下吧!??
參考
你的一個??所踊,就是我分享的動力??泌枪。