OkHttp使用分析—WebSocket篇
我們先看一下怎么使用OKhtttp完成WebSocket的請求:
//設(shè)置連接超時(shí)時(shí)間
mOkHttpClient = new OkHttpClient.Builder().connectTimeout(9 * 10, TimeUnit.SECONDS).build();
Request request = new Request.Builder().url(BASE_URL).build();
mWebSocket = mOkHttpClient.newWebSocket(request, this);
重點(diǎn)在這里,打開OkHttpClient.class查找newWebSocket()方法:
/**
* Uses {@code request} to connect a new web socket.
*/
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random());
webSocket.connect(this);
return webSocket;
}
這里傳入request對象和websocket的專用監(jiān)聽WebSocketListener呼巷,WebSocketListener 對象稍后再做贅述褂微,主流程還是看RealWebSocket.class的connect()方法:
步驟1:
client = client.newBuilder()
.protocols(ONLY_HTTP1)
.build();
我們都知道普通的請求時(shí)client是需要被bulid的,這里拿到OkHttpClient又重新創(chuàng)建了一遍,一開始就創(chuàng)建好了干嘛還要?jiǎng)?chuàng)建創(chuàng)建呢?看這個(gè)方法:protocols(ONLY_HTTP1)盏浇,
private static final List<Protocol> ONLY_HTTP1 = Collections.singletonList(Protocol.HTTP_1_1);
步驟2:
final Request request = originalRequest.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.build();
對request對象的頭部加工滑绒,
步驟3:
call = Internal.instance.newWebSocketCall(client, request);
從OkHttpClient中 獲取WebSocket的call對象(回調(diào)使用)佑稠,這個(gè)Internal.instance雖然是接口方法粗卜,其實(shí)現(xiàn)是在OkHttpClient中,直接看對應(yīng)方法:
@Override public Call newWebSocketCall(OkHttpClient client, Request originalRequest) {
return new RealCall(client, originalRequest, true);
}
步驟4:搜嘎 原來enqueue()方法是使用RealCall.class的enqueue()方法桥爽,這是一個(gè)入隊(duì)的方法朱灿,而且是個(gè)異步的方法。這就說明webSocket建立連接后才響應(yīng)回調(diào)钠四。而且如果是長連接那么這個(gè)線程就一直在線程池里不會被釋放掉盗扒。
call.enqueue(new Callback() {
@Override public void onResponse(Call call, Response response) {
try {
checkResponse(response);
} catch (ProtocolException e) {
failWebSocket(e, response);
closeQuietly(response);
return;
}
照現(xiàn)在的進(jìn)度已經(jīng)到了設(shè)置好的回調(diào)要開始執(zhí)行了,那就轉(zhuǎn)戰(zhàn)RealCall
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
其實(shí)我對okhttp同步請求有幾點(diǎn)疑惑:
1一開始我沒有創(chuàng)建線程缀去,那么這個(gè)請求就是在主線程中嗎侣灶?
2如果是同步請求那么如果同時(shí)多次請求是不是如果前面的請求在執(zhí)行后面的請求在進(jìn)入等待的狀態(tài)了呢?
其實(shí)這些問題就需要從dispatcher()的線程池入手了缕碎。
這個(gè)dispatcher在一開始介紹ok的時(shí)候已經(jīng)介紹過了褥影,我們來看dispatcher中的enqueue()方法:
嘿嘿嘿,又到了OkHttp請求里了 而且 這時(shí)候realCall內(nèi)部創(chuàng)建了AsyncCall(異步的Call)咏雌,其實(shí)看方法名就應(yīng)該知道的凡怎,ok的webSocket都是使用異步的,而且我們要明白現(xiàn)在只是一個(gè)最初的socket赊抖,之后的通信统倒,都會在該線程池的一個(gè)線程中進(jìn)行。
問題1:ok的websocket是異步的氛雪,并不會阻塞主線程房匆,而且也不需要單獨(dú)開辟一個(gè)子線程來創(chuàng)建連接。
問題2:會不會阻塞首先我們再次看看這個(gè)executorService的線程池結(jié)構(gòu)报亩。雖然在同步篇對dispatcher的線程池做過介紹坛缕,但是在我看來還是很解釋不夠清晰的地方:
首先 這個(gè)是dispatcher線程池的結(jié)構(gòu)
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
我在這里做一個(gè)詳細(xì)的說明:首先,SynchronousQueue是一個(gè)無緩存的阻塞的隊(duì)列捆昏,什么意思呢?我們可以理解為當(dāng)這個(gè)隊(duì)列中有元素的時(shí)候毙沾,這個(gè)元素沒有被取走(take方法)之前是不允許繼續(xù)對之后的內(nèi)容進(jìn)行操作骗卜。
注意1:它一種阻塞隊(duì)列,其中每個(gè) put 必須等待一個(gè) take左胞,反之亦然寇仓。同步隊(duì)列沒有任何內(nèi)部容量,甚至連一個(gè)隊(duì)列的容量都沒有烤宙。
注意2:它是線程安全的遍烦,是阻塞的。
注意3:不允許使用 null 元素躺枕。
注意4:公平排序策略是指調(diào)用put的線程之間服猪,或take的線程之間供填。公平排序策略可以查考ArrayBlockingQueue中的公平策略。
所以這又解決了一個(gè)困擾我多年的難題:
okhttp的能同時(shí)執(zhí)行多少個(gè)請求罢猪?
這個(gè)線程池的配置其實(shí)就是Executors提供的線程池配置方案之一近她,構(gòu)造一個(gè)緩沖功能的線程池,配置corePoolSize=0膳帕,maximumPoolSize=Integer.MAX_VALUE粘捎,keepAliveTime=60s,以及一個(gè)無容量的阻塞隊(duì)列 SynchronousQueue,因此任務(wù)提交之后危彩,將會創(chuàng)建新的線程執(zhí)行攒磨;線程空閑超過60s將會銷毀:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
用一個(gè)形象的比喻就是一個(gè)傳球手,當(dāng)從主線程傳進(jìn)了任務(wù)汤徽,就創(chuàng)建一個(gè)runnable來接收娩缰。
這里是Dispatcher的異步啟動(dòng)方法:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
在這里專門用runningAsyncCalls來記錄在執(zhí)行的Call,每次執(zhí)行都會記錄泻骤,當(dāng)向executor添加call的時(shí)候漆羔,根據(jù)2,將任務(wù)放入SynchronousQueue中等待前面的request被取出才能執(zhí)行之后的request狱掂,這里maxRequests 被定為64.超出64的將會被放入readyAsyncCalls演痒。
ready和running之間怎么傳遞呢?
這就需要我們對比分析下RealCall這個(gè)類:
同步的時(shí)候是調(diào)用RealCall的:@Override public Response execute() throws IOException
異步的時(shí)候是調(diào)用AsyncCall的:
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
事件的回調(diào)已經(jīng)具備了趋惨,回收需要看這里.finished(this)方法鸟顺,最終會調(diào)用這個(gè):
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
那么問題又來了,
請對比分析Ok與Volley的優(yōu)缺點(diǎn)器虾。
websocket篇:
此前我先聲明一點(diǎn)讯嫂,一個(gè)websocket鏈接的建立是在一個(gè)子線程當(dāng)中,如果鏈接不關(guān)閉這個(gè)子線程一直存在兆沙,
在鏈接前 我們創(chuàng)建了一個(gè)RealWebSocket.class我們進(jìn)它的構(gòu)造里看看也許有個(gè)驚喜:
public RealWebSocket(Request request, WebSocketListener listener, Random random) {
//省略部分代碼
this.writerRunnable = new Runnable() {
@Override public void run() {
try {
while (writeOneFrame()) {
}
} catch (IOException e) {
failWebSocket(e, null);
}
}
};
}
在這里創(chuàng)建了一個(gè)寫的線程欧芽,writerRunnable
再看connect()方法:這次只需要看call的回調(diào)就可以。根據(jù)現(xiàn)在的流程葛圃,鏈接成功千扔,走了成功的回調(diào),Call的onResponse方法:
try {
listener.onOpen(RealWebSocket.this, response);
String name = "OkHttp WebSocket " + request.url().redact();
initReaderAndWriter(name, pingIntervalMillis, streams);
streamAllocation.connection().socket().setSoTimeout(0);
loopReader();
} catch (Exception e) {
failWebSocket(e, null);
}
}
核心代碼在這里:
1.initReaderAndWriter()初始化讀寫者库正。這是為同服務(wù)器交互進(jìn)行準(zhǔn)備曲楚?
this.writer = new WebSocketWriter(streams.client, streams.sink, random);
this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
準(zhǔn)備了Writer,準(zhǔn)備了定時(shí)任務(wù)(心跳鏈接ping——pong)
runWriter();方法都做了什么呢褥符?
private void runWriter() {
assert (Thread.holdsLock(this));
if (executor != null) {
executor.execute(writerRunnable);
}
}
哈哈 原來是為心跳鏈接做準(zhǔn)備啊龙誊,定時(shí)進(jìn)行通知服務(wù)器 我還在哈。
2.loopReader()開始輪訓(xùn)讀取消息(隨時(shí)準(zhǔn)備接受來自服務(wù)器的消息)
public void loopReader() throws IOException {
while (receivedCloseCode == -1) {
// This method call results in one or more onRead* methods being called on this thread.
reader.processNextFrame();
}
}
這不喷楣,一直循環(huán)調(diào)用reader.processNextFrame();
/**
* Process the next protocol frame.
*
* <ul>
* <li>If it is a control frame this will result in a single call to {@link FrameCallback}.
* <li>If it is a message frame this will result in a single call to {@link
* FrameCallback#onReadMessage}. If the message spans multiple frames, each interleaved
* control frame will result in a corresponding call to {@link FrameCallback}.
* </ul>
*/
void processNextFrame() throws IOException {
readHeader();
if (isControlFrame) {
readControlFrame();
} else {
readMessageFrame();
}
}
沒辦法 注釋寫的太好了趟大,我忍不住都粘貼了進(jìn)來:
1如果是控制幀將會有一個(gè)單一的callback:FrameCallback
2如果是消息幀也會有一個(gè)單一的callback:FrameCallback#onReadMessage
看到這里websocket基本上已經(jīng)完了鹤树,剩下的就是調(diào)用監(jiān)聽了。
~~~~~~~~~~~~~~ 補(bǔ)充部分 ~~~~~~~~~~~~~~~
感謝網(wǎng)友朋友細(xì)心指導(dǎo)护昧,因?yàn)閷戇@篇文章比較早(細(xì)節(jié)忘了很多魂迄,尷尬)還原問題:
“框架會自動(dòng)發(fā)送ping包嗎? 怎么設(shè)置發(fā)送間隔時(shí)間呢惋耙?”
真的會捣炬,而且在而且OkHttpClient也支持設(shè)置心跳間隔:
// Promote the HTTP streams into web socket streams.
StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
還對 ping pong的次數(shù)進(jìn)行了記錄:至于怎么發(fā)送ping 需要看這個(gè):
initReaderAndWriter(name, pingIntervalMillis, streams);
沒錯(cuò) 又追蹤到了初始化讀寫者,在初始化讀寫者的時(shí)候有這樣一句(多看一句就能回答 讀者的問題了 甚是慚愧):
if (pingIntervalMillis != 0) {
executor.scheduleAtFixedRate(
new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
}
由此可見:
1 如果pingIntervalMillis 設(shè)置為0的時(shí)候 心跳executor是不會執(zhí)行的绽榛。
2 executor 原來也負(fù)責(zé)心跳包的定時(shí)任務(wù)
讓我們看看 pingrunnable里都做了什么吧:
private final class PingRunnable implements Runnable {
PingRunnable() {
}
@Override public void run() {
writePingFrame();
}
}
void writePingFrame() {
WebSocketWriter writer;
synchronized (this) {
if (failed) return;
writer = this.writer;
}
try {
writer.writePing(ByteString.EMPTY);
} catch (IOException e) {
failWebSocket(e, null);
}
}
果然簡單實(shí)用:
一個(gè)runnable 調(diào)用writer的writePing方法湿酸。想一想還是很合理啊,畢竟發(fā)送消息就是需要 writer來做灭美,所以 writer有這些方法也不足為其推溃。具體writer怎么寫 我們看下:
/** Send a ping with the supplied {@code payload}. */
void writePing(ByteString payload) throws IOException {
synchronized (this) {
writeControlFrameSynchronized(OPCODE_CONTROL_PING, payload);
}
}
/** Send a pong with the supplied {@code payload}. */
void writePong(ByteString payload) throws IOException {
synchronized (this) {
writeControlFrameSynchronized(OPCODE_CONTROL_PONG, payload);
}
}
順便一瞅 就在下邊有個(gè)pong的發(fā)送方法,分析一下:
1 入?yún)ayload 是ByteString.EMPTY 就是一個(gè)空的字節(jié)届腐,
2 最終都是相同的方法writeControlFrameSynchronized铁坎,
3 對于消息的區(qū)分:依靠writeControlFrameSynchronized的第一個(gè)入?yún)pcode,
4 writeControlFrameSynchronized這個(gè)方法雖然沒有注釋 但是 即然寫消息都需要調(diào)用這個(gè)方法犁苏,相比這個(gè)方法才是writer的實(shí)力擔(dān)當(dāng):
private void writeControlFrameSynchronized(int opcode, ByteString payload) throws IOException {
assert Thread.holdsLock(this);
if (writerClosed) throw new IOException("closed");
int length = payload.size();
if (length > PAYLOAD_BYTE_MAX) {
throw new IllegalArgumentException(
"Payload size must be less than or equal to " + PAYLOAD_BYTE_MAX);
}
int b0 = B0_FLAG_FIN | opcode;
sink.writeByte(b0);
int b1 = length;
if (isClient) {
b1 |= B1_FLAG_MASK;
sink.writeByte(b1);
random.nextBytes(maskKey);
sink.write(maskKey);
byte[] bytes = payload.toByteArray();
toggleMask(bytes, bytes.length, maskKey, 0);
sink.write(bytes);
} else {
sink.writeByte(b1);
sink.write(payload);
}
sink.flush();
}
操作太6 硬萍,表示職能看懂個(gè)大概 , 都被寫入這個(gè)sink中了N辍F庸浴!
問題來了:sink是什么東西助赞?
/** Writes must be guarded(被守護(hù)的) by synchronizing on 'this'. */
final BufferedSink sink;
沒有交代买羞,但是有這樣一個(gè)提醒,對sink寫的時(shí)候必須是被synchronizing保護(hù)的 這樣我算是明白為嘛ping和pong的方法都會加鎖了(他說咋做就咋做 嘻嘻 稍后看)雹食。
我們先從單詞上理解這個(gè)變量的意義吧:sink畜普,水槽,洗滌池群叶,什么鬼漠嵌?看不懂。盖呼。。我還是看BufferedSink吧:
- A sink that keeps a buffer internally so that callers can do small writes
- 在內(nèi)部保留緩沖區(qū)的接收器化撕,以便調(diào)用方可以執(zhí)行小的寫入操作几晤。
- without a performance penalty.
都說了是個(gè)小型的緩沖池,因此在寫的時(shí)候會對大小進(jìn)行限制:
static final long PAYLOAD_BYTE_MAX = 125L;
雖然是個(gè)接口但是已經(jīng)給了我們足夠多的有效信息植阴,讓我們看看在創(chuàng)建的時(shí)候是怎么實(shí)現(xiàn)這個(gè)BufferedSink蟹瘾,回到最初writer創(chuàng)建的地方:
this.writer = new WebSocketWriter(streams.client, streams.sink, random);
哦圾浅?在初始化的時(shí)候從Stream中獲取的。在向上找當(dāng)初的stream是怎么創(chuàng)建的:
當(dāng)鏈接成功后就會 返回一個(gè)Call:
@Override public void onResponse(Call call, Response response)
// Promote the HTTP streams into web socket streams.
// 促進(jìn) http流初始化這個(gè)socket流
StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
// Prevent connection pooling!
// 防止連接共用
streamAllocation.noNewStreams();
//創(chuàng)建 Stream
Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);
看來一切的謎底都在 RealConnection的newWebSockerStreams里:
public RealWebSocket.Streams newWebSocketStreams(final StreamAllocation streamAllocation) {
return new RealWebSocket.Streams(true, source, sink) {
@Override public void close() throws IOException {
streamAllocation.streamFinished(true, streamAllocation.codec());
}
};
}
呵呵憾朴,看到真相我有點(diǎn)想放棄狸捕, new RealWebSocket.Streams(true, source, sink) sink就是這樣被賦予的,讓我回想一下众雷,RealConnection還是挺熟悉的灸拍,是在什么時(shí)候創(chuàng)建的呢?
今天先研究到這里我容我仔細(xì)研究一番砾省。鸡岗。。