本文要求:大概了解使用方法,知道如何通過okhttp建立websocket,四個回調(diào)的名字最好大概記住,因為后面會直接使用习寸。
websocket協(xié)議有兩種消息類型, 被稱為frame器仗, 幀融涣,一種是消息類童番, 就是我們普通通信使用的精钮, 一種是控制通信用的威鹿,比如關(guān)閉用的close幀,心跳相關(guān)的ping幀和pong幀轨香。
if (isControlFrame) {
readControlFrame();
} else {
readMessageFrame();
}
而發(fā)射的概念就是write a frame忽你,到服務(wù)端, 接受呢就是響應(yīng)的read a frame
兩者最為關(guān)鍵的源碼就是runWriter()
和 loopReader()
這兩個函數(shù).
本文先從程序的入口開始查看設(shè)計和邏輯臂容, 最后再從最貼近業(yè)務(wù)的科雳,暴露給應(yīng)用層使用的 發(fā)射,接受脓杉,以及回調(diào) 來總結(jié)歸納一下糟秘。
可以說市面上大部分的博客都是寫的人都是一知半解,寫出來的更是千篇一律球散,大部分僅僅講解了demo級別的使用方法尿赚,缺失了很多情況的處理, 以及重要的注意點蕉堰。
1. 入口
val client = OkHttpClient.Builder()
client.newWebSocket(request, object : WebSocketListener() {}
還是從OkHttpClient開始的
/**
* Uses {@code request} to connect a new web socket.
*/
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
webSocket.connect(this);
return webSocket;
}
newWebSocket()
函數(shù)里完成的事情:
- 初始化RealWebSocket
- 調(diào)用RealWebSocket$connect()
1.1初始化
public RealWebSocket(Request request, WebSocketListener listener, Random random,
long pingIntervalMillis) {
if (!"GET".equals(request.method())) {
throw new IllegalArgumentException("Request must be GET: " + request.method());
}
this.originalRequest = request;
this.listener = listener;
this.random = random;
this.pingIntervalMillis = pingIntervalMillis;
byte[] nonce = new byte[16];
random.nextBytes(nonce);
this.key = ByteString.of(nonce).base64();
this.writerRunnable = new Runnable() {
@Override public void run() {
try {
while (writeOneFrame()) {
}
} catch (IOException e) {
failWebSocket(e, null);
}
}
};
}
這里request和random用來建立連接凌净,把http1升級成websocket。
this.pingIntervalMillis
是指定心跳的間隔屋讶, OkHttpClient.Builder()構(gòu)建的時候默認(rèn)是0冰寻,默認(rèn)的話,就不會發(fā)射ping幀了皿渗,后面會提到斩芭。
還有重要的一步是初始化了this.writerRunnable
,這個runnable乐疆,是后續(xù)所有write操作(write就是客戶端發(fā)送的操作划乖,read就是接受服務(wù)端發(fā)來的操作)最終調(diào)用的任務(wù)(runWriter)
這個任務(wù)是無限循環(huán)writeOneFrame()
函數(shù)
注釋基本寫的非常的清楚了:
嘗試從queue里取出一幀,然后send他诀拭,pong幀優(yōu)先級高于message幀和close幀迁筛,會優(yōu)先發(fā)射,如果一個調(diào)用者入隊了一個被pong幀跟隨的message幀耕挨,那么會發(fā)射pong幀细卧,讓message幀跟隨, pong幀當(dāng)他們?nèi)腙牭臅r候筒占, 永遠(yuǎn)會被下一個執(zhí)行贪庙。
如果queue已經(jīng)空了, 或者websokcet斷開連接了翰苫, 就不能send幀了止邮。
這樣會什么都不做这橙,只是返回false,否則會返回true且立刻再次調(diào)用本方法直到返回false导披。
這個方法只能唄writter thread調(diào)用屈扎, 可能只有一個線程在同一時間調(diào)用這個方法(應(yīng)該是一定的吧, 因為會檢查是否有鎖)撩匕。
看下源碼其實pongQueue和messageAndCloseQueue是兩個queue鹰晨,先處理pongQueue的邏輯
如果poll出了pong幀,就直接
writer.writePong(pong);
發(fā)射如果沒有poll出pong幀止毕,那么處理messageAndCloseQueue
1.1.1處理messageAndCloseQueue
如果沒有pong的話模蜡, messageAndCloseQueue.poll()取出消息messageOrClose
if(messageOrClose is null)
return false隊列空了, 這個while死循環(huán)就斷掉了
if(messageOrClose is Close幀)
if(receivedCloseCode扁凛!= -1忍疾,也就這是從服務(wù)器接收到的close幀)
關(guān)閉excutor
else 證明是自己發(fā)射的close幀
優(yōu)雅的關(guān)閉,60秒后調(diào)用call.cancel
不管優(yōu)不優(yōu)雅是不是自己enqueue的Close幀谨朝,都會執(zhí)行下面的代碼
發(fā)射close幀;
如果是服務(wù)端接受到的close幀那么會回調(diào)onClosed()方法卤妒,通知調(diào)用者;
最后return true
/** The close code from the peer, or -1 if this web socket has not yet read a close frame. */
private int receivedCloseCode = -1;
初始化的代碼基本就這些了,但卻非常的重要叠必,因為他定義了所有與發(fā)射相關(guān)的邏輯荚孵。
1.2connect()函數(shù)
public void connect(OkHttpClient client) {
client = client.newBuilder()
.eventListener(EventListener.NONE)
.protocols(ONLY_HTTP1)
.build();
final Request request = originalRequest.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.build();
call = Internal.instance.newWebSocketCall(client, request);
call.enqueue(new Callback() {
@Override public void onResponse(Call call, Response response) {
try {
checkResponse(response);
} catch (ProtocolException e) {
failWebSocket(e, response);
closeQuietly(response);
return;
}
// Promote the HTTP streams into web socket streams.
StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
streamAllocation.noNewStreams(); // Prevent connection pooling!
Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);
// Process all web socket messages.
try {
listener.onOpen(RealWebSocket.this, response);
String name = "OkHttp WebSocket " + request.url().redact();
initReaderAndWriter(name, streams);
streamAllocation.connection().socket().setSoTimeout(0);
loopReader();
} catch (Exception e) {
failWebSocket(e, null);
}
}
@Override public void onFailure(Call call, IOException e) {
failWebSocket(e, null);
}
});
}
connect函數(shù)完成了websocket協(xié)議要求的升級過程(如果不了解,請簡單了解下websocket協(xié)議)纬朝,可以看到他添加了一些請求頭收叶, 并且調(diào)用了一次http1的握手協(xié)議, 如果失敗的話共苛,回調(diào)failWebSocket給用戶通知判没。
主要看下升級成功的邏輯, 此時已經(jīng)是websocket協(xié)議了隅茎, 可以看到上來就檢查response澄峰,確認(rèn)無誤后,回調(diào)onOpen通知用戶辟犀。下面是幾件重要的步驟
- 初始化reader和writer
- loopReader()死循環(huán)讀服務(wù)端發(fā)來的消息
- 設(shè)置超時時間為0俏竞,證明是阻塞IO