在《OKHTTP3源碼和設(shè)計模式-1》浊闪,中整體介紹了 OKHttp3 的源碼架構(gòu),重點講解了請求任務的分發(fā)管理和線程池以及請求執(zhí)行過程中的攔截器概荷。這一章我們接著往下走認識一下 OKHttp3 底層連接和連接池工作機制秕岛。
RealCall 封裝了請求過程, 組織了用戶和內(nèi)置攔截器误证,其中內(nèi)置攔截器 retryAndFollowUpInterceptor -> BridgeInterceptor -> CacheInterceptor 完執(zhí)行層的大部分邏輯 继薛,ConnectInterceptor -> CallServerInterceptor 兩個攔截器開始邁向連接層最終完成網(wǎng)絡(luò)請求。
連接層連接器
ConnectInterceptor 的工作很簡單愈捅, 負責打開連接; CallServerIntercerceptor 是核心連接器鏈上的最后一個連接器遏考,
負責從當前連接中寫入和讀取數(shù)據(jù)。
連接的打開
/** Opens a connection to the target server and proceeds to the next interceptor. */
// 打開一個和目標服務器的連接蓝谨,并把處理交個下一個攔截器
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public
Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// 打開連接
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
// 交個下一個攔截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
單獨看 ConnectInterceptor 的代碼很簡單诈皿,不過連接正在打開的過程需要看看 streamAllocation.newStream(client, doExtensiveHealthChecks),內(nèi)部執(zhí)行過程像棘。還是先整體上了看看 StreamAllocation 這個類的作用稽亏。
StreamAllocation
StreamAllocation 處于上層請求和底層連接池直接 , 協(xié)調(diào)請求和連接池直接的關(guān)系缕题。先來看看 StreamAllocation 對象在哪里創(chuàng)建的? 回到之前文章中介紹的 RetryAndFollowUpInterceptor截歉, 這是核心攔截器鏈上的頂層攔截器其中源碼:
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
...省略代碼
}
這里, 每一次請求創(chuàng)建了一個 StreamAllocation 對象烟零, 那么問題來了瘪松? 之前我們說過每一個 OkHttpClient 對象只有一個對應的連接池咸作, 剛剛又說到 StreamAllocation 打開連接, 那么 StreamAllocation 是如何創(chuàng)建連接池的呢宵睦?我們很容易就去 StreamAllocation 中找連接池創(chuàng)建的邏輯记罚,但是找不到。 連接池創(chuàng)建的地方在 OkHttpClient 中:
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
// 創(chuàng)建連接池
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
OkHttpClient 默認構(gòu)造函數(shù)的 Builder 壳嚎, 在這里創(chuàng)建了連接池桐智。所以這里我們也可以看到, 如果我們對默認連接池不滿烟馅,我們是可以直通過 builder 接指定的说庭。
搞懂了 StreamAllocation 和 ConnectionPool 的創(chuàng)建 , 我們再來看看 StreamAllocation 是怎么打開連接的郑趁?直接兜源碼可能有點繞 刊驴,先給一個粗略流程圖,然后逐點分析寡润。
鏈接池實現(xiàn)
相信大家都有一些 Http 協(xié)議的基礎(chǔ)(如果沒有就去補了捆憎,不然看不懂)都知道 Http 的下層協(xié)議是 TCP。TCP 連接的創(chuàng)建和斷開是有性能開銷的梭纹,在 Http1.0 中攻礼,每一次請求就打開一個連接,在一些老的舊的瀏覽器上栗柒,如果還是基于 Http1.0礁扮,體驗會非常差; Http1.1 以后支持長連接, 運行一個請求打開連接完成請求后瞬沦, 連接可以不關(guān)閉太伊, 下次請求時復用此連接,從而提高連接的利用率逛钻。當然并不是連接打開后一直開著不關(guān)僚焦,這樣又會造成連接浪費,怎么管理曙痘?
在OKHttp3 的默認實現(xiàn)中芳悲,使用一個雙向隊列來緩存所有連接, 這些連接中最空閑時間已經(jīng)超過了keep-alive指定的時間就要移除了边坤。
定期清理實現(xiàn)
public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
// 后臺定期清理連接的線程池
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
// 后臺定期清理連接的任務
private final Runnable cleanupRunnable = new Runnable() {
@Override
public void run() {
while (true) {
// cleanup 執(zhí)行清理
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
雙向隊列
// 存儲連接的雙向隊列
private final Deque<RealConnection> connections = new ArrayDeque<>();
放入連接
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
獲取連接
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}
StreamAllocation.連接創(chuàng)建和復用
ConnectionPool 的源碼邏輯還是相當比較簡單名扛, 主要提供一個雙向列表來存取連接, 使用一個定時任務定期清理無用連接茧痒。 二連接的創(chuàng)建和復用邏輯主要在 StreamAllocation 中肮韧。
尋找連接
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
// 核心邏輯在 findConnection()中
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
findConnection():
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
// 省略部分代碼...
// Attempt to get a connection from the pool. Internal.instance 就是 ConnectionPool 的實例
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
// 復用此連接
return connection;
}
// 省略部分代碼...
// 創(chuàng)建新新連接
result = new RealConnection(connectionPool, selectedRoute);
// 引用計數(shù)
acquire(result);
}
synchronized (connectionPool) {
// Pool the connection. 放入連接池
Internal.instance.put(connectionPool, result);
}
// 省略部分代碼...
return result;
}
StreamAllocation 主要是為上層提供一個連接, 如果連接池中有復用的連接則復用連接, 如果沒有則創(chuàng)建新的弄企。無論是拿到可復用的還是創(chuàng)建新的超燃, 都要為此連接計算一下引用計數(shù)。
public void acquire(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
// 連接使用allocations列表來記錄每一個引用
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
Realconnection
Realconnection 封裝了底層 socket 連接拘领, 同時使用 OKio 來進行數(shù)據(jù)讀寫意乓, OKio 是 square 公司的另一個獨立的開源項目, 大家感興趣可以去深入讀下 OKio 源碼约素, 這里不展開届良。
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
// 打開 socket 連接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// 使用 OKil 連上 socket 后續(xù)讀寫使用 Okio
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}