上一篇攔截器分析中,在ConnectInterceptor的intercept方法中鸥跟,有這樣一句代碼來(lái)獲得stream丢郊。
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
點(diǎn)streamAllocation進(jìn)去之后看到newStream方法中調(diào)用了findHealthyConnection方法,實(shí)現(xiàn)如下:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
// 這一句代碼是實(shí)際獲得連接的
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
如果是新的連接就跳過(guò)健康檢查医咨,如果不是就查一下是否已經(jīng)斷開(kāi)啦枫匾,輸入輸出是否關(guān)閉啦。再進(jìn)去看findConnection方法
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
// 在pool中取連接的話拟淮,就需要拿這個(gè)pool做同步鎖干茉,如果是第一次發(fā)起請(qǐng)求應(yīng)該是拿不到的,會(huì)走到下面
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
// 如果allocatedConnection 不為空并且連接池還沒(méi)滿很泊,就直接使用這個(gè)連接
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
// 調(diào)用了okhttpclient的get方法角虫,從connectionPool中根據(jù)address拿到連接。
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
// 第一次進(jìn)來(lái)就是空
selectedRoute = route;
}
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
// 這里面先是查找內(nèi)存緩存委造,根據(jù)proxies的類(lèi)型在routeSelector的集合inetSocketAddresses中查找戳鹅,沒(méi)有的話就重設(shè)一個(gè)
// 調(diào)用address.dns().lookup(socketHost)方法,通過(guò)DNS服務(wù)器查詢(xún)返回一組ip地址(一個(gè)域名可能對(duì)應(yīng)多個(gè)ip地址昏兆,可用于自動(dòng)重連)
// 最后將得到的address 加入集合inetSocketAddresses中緩存起來(lái)粉楚。
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
// 再進(jìn)行一次嘗試,從連接池中拿連接
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) {
route = selectedRoute;
return connection;
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
// 新搞一個(gè)連接
result = new RealConnection(connectionPool, selectedRoute);
// 將connection的引用交給streamAllocation亮垫,將streamAllocation的弱引用加入到connection的allocations集合中
acquire(result);
}
// Do TCP + TLS handshakes. This is a blocking operation.
// 這里調(diào)用RealConnection的connect方法模软,傳入ip 端口號(hào)進(jìn)行connect,RealConnection內(nèi)部的source和sink就是在這個(gè)方法中賦值的饮潦。
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
// connectionPool中維護(hù)了一個(gè)鍵值對(duì)燃异,里面存了所有連接失敗的route,每個(gè)連接失敗的route都加入進(jìn)去继蜡。
// 而這句話是把連接成功的從route黑名單中去除掉回俐。
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
// 用okhttpclient的代碼把當(dāng)前的連接放入連接池中逛腿,這種麻煩的寫(xiě)法估計(jì)是跟設(shè)計(jì)模式有關(guān)系
// 注意放進(jìn)去的同時(shí)會(huì)觸發(fā)清理
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
// 處理多線程產(chǎn)生的問(wèn)題,如果產(chǎn)生了多個(gè)connection就release掉當(dāng)前的仅颇,用另一個(gè)線程創(chuàng)建的connection
// 并且關(guān)閉掉多余的socket
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
代碼很長(zhǎng)单默,我看到的是:
RouteSelector做準(zhǔn)備;
ConnectionPool管理連接忘瓦;
RealConnection做具體執(zhí)行搁廓;
以StreamAllocation為中心協(xié)調(diào)各個(gè)類(lèi);
將RealConnection生成的Http1Codec和Http2Codec這種面向協(xié)議(設(shè)置請(qǐng)求頭讀取回復(fù))進(jìn)行sink和read的類(lèi) 傳遞到攔截器中耕皮。
代碼執(zhí)行步驟大致分三部分:
- 獲得route境蜕、ip、port那些鬼凌停,由RouteSelector這個(gè)類(lèi)完成粱年,拿到address之后再?lài)L試連接池中拿connection。
- 實(shí)在從連接池中拿不到了罚拟,就新建connection台诗,用raw socket進(jìn)行三握手那些鬼,由RealConnection這個(gè)類(lèi)完成赐俗,拿到source和sink拉队;
讓connection和streamAllocation相互引用(一個(gè)強(qiáng)引用一個(gè)弱引用),連接池里面有一個(gè)ArrayDeque來(lái)記錄所有的socket連接秃励。將新的connection放入連接池氏仗,觸發(fā)清理;
將route從黑名單移除夺鲜。 - 檢查是否有多線程導(dǎo)致的問(wèn)題皆尔,如果有,就釋放當(dāng)前連接币励,用別的線程創(chuàng)建的連接慷蠕。
上面說(shuō)將連接加入連接池時(shí)會(huì)觸發(fā)清理操作,下面貼上代碼詳細(xì)說(shuō)明是如何清理的食呻。
在connectionPool中流炕,有個(gè)cleanup方法來(lái)執(zhí)行清理操作
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
// 這里具體執(zhí)行streamAllocation的清理,具體代碼在下面
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
// 如果說(shuō):閑置的時(shí)間超過(guò)了設(shè)定值仅胞,或者最大限制連接數(shù)超過(guò)設(shè)定值每辟,就把connection從連接池中移除,并關(guān)掉connection干旧。
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
// 根據(jù)閑置時(shí)間和閑置連接數(shù)渠欺,還有可能立刻執(zhí)行下一次清理
return 0;
}
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
//遍歷每一個(gè)connection的streamAllocation弱引用集合,發(fā)現(xiàn)弱引用已被回收椎眯,就將其在弱引用集合中移除
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
// 打印警告挠将,告知程序員胳岂,你的使用有問(wèn)題
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
// 如果所有的弱引用都被移除掉了,說(shuō)明這個(gè)connection是閑置的舔稀,記錄閑置的時(shí)間乳丰。將閑置最久的connection記錄下來(lái)。
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
總結(jié)一下内贮,清理策略就是:
- connection自身記錄streamAllocation的連接數(shù)产园,達(dá)到0的時(shí)候就標(biāo)記自己為閑置連接,記錄閑置時(shí)間等待清理
- 滿足閑置時(shí)間太長(zhǎng)或者閑置連接太多時(shí)贺归,ConnectionPool就執(zhí)行清理操作關(guān)掉連接(默認(rèn)空閑的socket最大連接數(shù)為5個(gè)淆两,socket的keepAlive時(shí)間為5秒)断箫。
- 正常情況下ConnectionPool會(huì)每隔一段時(shí)間就嘗試清理一次拂酣。看連接使用情況仲义,忙的話就一直嘗試清理婶熬,閑的時(shí)候加入任務(wù)也會(huì)觸發(fā)清理。
okhttp對(duì)socket的直接管理還是通過(guò)ConnectionPool來(lái)實(shí)現(xiàn)的埃撵。
回顧一下前面的攔截器的知識(shí)赵颅,結(jié)合一下:
- 在RealInterceptorChain中有一個(gè)streamAllocation成員變量
- 在RetryAndFollowUpInterceptor中初始化streamAllocation傳到RealInterceptorChain中,此時(shí)還是沒(méi)有任何連接和這個(gè)streamAllocation綁定的
- 到了ConnectInterceptor中暂刘,調(diào)用streamAllocation的newStream方法饺谬,內(nèi)部調(diào)用findConnection方法,獲得連接
- 連接的獲得是先嘗試從連接池中取谣拣,取不到就初始化一個(gè)連接募寨,將streamAllocation弱引用給connection(此時(shí)connection可能已經(jīng)有很多streamAllocation在用了),同時(shí)在連接池中嘗試清理森缠。
- 拿到連接之后拔鹰,返回給ConnectInterceptor一個(gè)HttpCodec,這是一個(gè)接口的實(shí)現(xiàn)類(lèi)贵涵,根據(jù)http協(xié)議是 1.x 還是2 內(nèi)部有不同的實(shí)現(xiàn)
- 回到CallServerInterceptor中列肢,拿HttpCodec來(lái)執(zhí)行寫(xiě)入請(qǐng)求頭、讀取返回信息宾茂、構(gòu)造responseBody等瓷马。