最近在項(xiàng)目的開發(fā)中缅阳,碰到了這樣一個(gè)需求:需要在長連接的心跳發(fā)送時(shí)執(zhí)行一些業(yè)務(wù)上的邏輯。那么命锄,問題就在于如何在現(xiàn)有的長連接的基礎(chǔ)上,以盡可能小的改動(dòng)偏化,實(shí)現(xiàn)這個(gè)需求脐恩。故事也就由此開始了。
確定okhttp是否有提供相應(yīng)的API
首先肯定是要確定okhttp中是否有類似的API可以使用侦讨,或者是否可以通過更新版本來解決這個(gè)問題驶冒。剛好,我找到了GitHub中有人提出了類似的問題韵卤,可以來看看官方的說法:
WebSocket ping logic is not customizable · Issue #3197 · square/okhttp
可以看到骗污,開發(fā)者明確表示了并不希望讓應(yīng)用層自定義ping方法的邏輯,那么看來只能另想辦法了沈条。
okhttp中的心跳的使用方法與實(shí)現(xiàn)原理
首先需忿,我來簡單梳理一下okhttp中心跳的實(shí)現(xiàn)原理,如果只是想要解決方法的朋友可以直接跳過這一部分。
在okhttp中屋厘,實(shí)現(xiàn)心跳的方式非常簡單涕烧,只需要在OkHttpClient創(chuàng)建時(shí)添加相應(yīng)的配置即可:
OkHttpClient.Builder()
.pingInterval(HEART_BEAT_RATE, TimeUnit.SECONDS)
.build()
那么具體的心跳邏輯是如何實(shí)現(xiàn)的呢,一起來看看具體的代碼細(xì)節(jié)汗洒。
//OkHttpClient.java
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
webSocket.connect(this);
return webSocket;
}
//RealWebSocket.java
public RealWebSocket(Request request, WebSocketListener listener, Random random,long pingIntervalMillis) {
//...
this.pingIntervalMillis = pingIntervalMillis;
//...
}
public void initReaderAndWriter(String name, Streams streams) throws IOException {
synchronized (this) {
//...
this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
if (pingIntervalMillis != 0) {
executor.scheduleAtFixedRate(
new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
}
//...
}
}
private final class PingRunnable implements Runnable {
@Override public void run() {
writePingFrame();
}
}
void writePingFrame() {
//...
try {
writer.writePing(ByteString.EMPTY);
} catch (IOException e) {
failWebSocket(e, null);
}
//...
}
//WebSocketWriter.java
void writePing(ByteString payload) throws IOException {
writeControlFrame(OPCODE_CONTROL_PING, payload);
}
上面的代碼就是ping的主要發(fā)送邏輯了议纯,簡單總結(jié)一下就是如果pingInterval不為0,那就開啟一個(gè)的循環(huán)任務(wù)仲翎,定時(shí)的去發(fā)送代表ping的ControlFrame痹扇。
其中值得一提的就是ControlFrame這個(gè)概念,在WebSocket中的frame分為兩類溯香,一類叫做MessageFrame鲫构,也就是平時(shí)客戶端與服務(wù)端互相通信的部分。另一類叫做ControlFrame玫坛,其中包括CONTROL_PING结笨,CONTROL_PONG,CONTROL_CLOSE湿镀,可以看出這一類更偏重與功能性的方面炕吸。具體為哪一類的Frame可以在Header中進(jìn)行區(qū)分。
上面已經(jīng)介紹了心跳的發(fā)送邏輯勉痴,那么下面就輪到接收的邏輯了赫模,還是先來看看代碼:
//RealWebSocket.java
public void loopReader() throws IOException {
while (receivedCloseCode == -1) {
// This method call results in one or more onRead* methods being called on this thread.
reader.processNextFrame();
}
}
//WebSocketReader.java
void processNextFrame() throws IOException {
readHeader();
if (isControlFrame) {
readControlFrame();
} else {
readMessageFrame();
}
}
private void readControlFrame() throws IOException {
//...
switch (opcode) {
case OPCODE_CONTROL_PING:
frameCallback.onReadPing(controlFrameBuffer.readByteString());
break;
case OPCODE_CONTROL_PONG:
frameCallback.onReadPong(controlFrameBuffer.readByteString());
break;
case OPCODE_CONTROL_CLOSE:
//...
default:
throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
}
}
可以看到,接收的部分邏輯也很簡單蒸矛,就是通過一個(gè)循環(huán)去讀取瀑罗,如果接收到了消息,那就先通過header確定frame的類型雏掠,然后再分類進(jìn)行處理斩祭。
而且值得注意的是,上面代碼中出現(xiàn)了一個(gè)frameCallback的對(duì)象乡话,而這個(gè)對(duì)象是WebSocketReader.FrameCallback這個(gè)接口的實(shí)現(xiàn)摧玫,而里面的onReadPing和onReadPong就是我們之后能夠做文章的地方了。
WebSocketReader.FrameCallback
public interface FrameCallback {
void onReadMessage(String text) throws IOException;
void onReadMessage(ByteString bytes) throws IOException;
void onReadPing(ByteString buffer);
void onReadPong(ByteString buffer);
void onReadClose(int code, String reason);
}
添加回調(diào)的具體實(shí)現(xiàn)
在上面的源碼分析中绑青,我們注意到了WebSocketReader.FrameCallback這個(gè)接口诬像,如果我們能夠自己實(shí)現(xiàn)這個(gè)接口,并且注入到websocket的reader中闸婴,那么這個(gè)需求不就實(shí)現(xiàn)了嗎颅停。
那么我們?cè)賮砜纯磖eader中的frameCallback按照原來的邏輯應(yīng)該是個(gè)什么東西:
//RealWebSocket.java
reader = new WebSocketReader(streams.client, streams.source, this);
//WebSocketReader.java
WebSocketReader(boolean isClient, BufferedSource source, FrameCallback frameCallback) {
//...
this.frameCallback = frameCallback;
//...
}
原來frameCallback就是RealWebSocket,而我們所持有的webSocket正是RealWebSocket的對(duì)象掠拳,那么只需要做一個(gè)靜態(tài)代理,然后通過反射將reader替換為我們自己的實(shí)現(xiàn)就可以了:
private fun replaceReaderCallBack() {
val wsClass = webSocket!!.javaClass
val callbackClass = wsClass.interfaces.find { it.name.contains("FrameCallback") } ?: return
val readerField = wsClass.getDeclaredField("reader")
readerField.isAccessible = true
val reader = readerField.get(webSocket)
val callbackInstance = Proxy.newProxyInstance(reader.javaClass.classLoader, arrayOf(callbackClass)) { proxy, method, args ->
when (method?.name) {
"onReadMessage" -> {
if (args!![0] is String) {
webSocket?.onReadMessage(args[0] as String)
} else {
webSocket?.onReadMessage(args[0] as ByteString)
}
}
"onReadPing" -> { webSocket?.onReadPing(args!![0] as ByteString) }
"onReadPong" -> { webSocket?.onReadPong(args!![0] as ByteString) }
"onReadClose" -> { webSocket?.onReadClose(args!![0] as Int, args[1] as String) }
}
0
}
reader.javaClass.getDeclaredField("frameCallback").apply {
isAccessible = true
set(reader, callbackInstance)
}
}
至此纸肉,回調(diào)已經(jīng)添加完成溺欧,只需要在對(duì)應(yīng)的回調(diào)中補(bǔ)上自己的業(yè)務(wù)邏輯喊熟,然后在websocket創(chuàng)建完成之后調(diào)用一下這個(gè)方法就完成了。