OkHttp的ConnectInterceptor連接攔截器剖析:http://www.reibang.com/p/f90aa5894cdf
連接池ConnectionPool用來(lái)管理和復(fù)用連接,但是在有限的時(shí)間內(nèi)復(fù)用鏈接的。
一進(jìn)來(lái)會(huì)創(chuàng)建一個(gè)線程池:
/**
* 后臺(tái)線程用于清除過(guò)期的連接
*
* 每個(gè)連接池最多只能運(yùn)行一個(gè)線程
*
* 線程池執(zhí)行器允許對(duì)池本身進(jìn)行垃圾收集
*/
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
上面的注釋是從官方注釋翻譯過(guò)來(lái)的說(shuō)每個(gè)連接池最多只能運(yùn)行一個(gè)線程趣兄,這個(gè)是怎么執(zhí)行的呢
來(lái)看這段代碼:
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
// 沒(méi)有任何連接時(shí),cleanupRunning = false;
// 即沒(méi)有任何鏈接時(shí)才會(huì)去執(zhí)行executor.execute(cleanupRunnable);
// 從而保證每個(gè)連接池最多只能運(yùn)行一個(gè)線程妹蔽。
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable); // 做異步處理任務(wù)
}
connections.add(connection); // 將連接加入到雙端隊(duì)列
}
put方法是連接池中的存儲(chǔ)方法跳纳,在ConnectInterceptor-->intercept-->streamAllocation.newStream-->findHealthyConnection-->創(chuàng)建新鏈接后調(diào)用茶敏,進(jìn)行存儲(chǔ)健康的連接窟坐。
什么是雙端隊(duì)列呢叉抡?
// 連接池中維護(hù)了一個(gè)雙端隊(duì)列Deque來(lái)存儲(chǔ)連接
private final Deque<RealConnection> connections = new ArrayDeque<>();
接下來(lái)看一下它的成員變量和構(gòu)造函數(shù):
/** 每個(gè)地址的最大空閑連接數(shù). */
private final int maxIdleConnections; // 默認(rèn) 5
// 每個(gè)keep-alive時(shí)長(zhǎng)為5分鐘
private final long keepAliveDurationNs; // 默認(rèn)5分鐘
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
* TODO 連接池最多保持5個(gè)地址的連接keep-alive猬膨,每個(gè)keep-alive時(shí)長(zhǎng)為5分鐘
*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
// 在保持活躍狀態(tài)的持續(xù)時(shí)間內(nèi)放置任務(wù)角撞,否則將循環(huán)清理
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
連接池的get方法:
/**
* Returns a recycled connection to {@code address}, or null if no such connection exists.
* 返回連接,如果不存在此類連接,則返回空值
* The route is null if the address has not yet been routed.
* 如果地址尚未路由靴寂,則路由為空
*/
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) { // 判斷連接是否可用
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
連接池的清除原理:
在類中創(chuàng)建了一個(gè)Runnable:
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime()); // cleanup方法里面就是具體的GC回收算法磷蜀,類似于GC的標(biāo)記清除算法
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
在put的時(shí)候回執(zhí)行這個(gè)線程。
這個(gè)Runnable的潤(rùn)方法中執(zhí)行了cleanup方法百炬,這個(gè)方法就是具體的回收方法褐隆,里面主要使用標(biāo)記清除算法。
/**
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
* 對(duì)該池執(zhí)行維護(hù)剖踊,如果已超過(guò)保持活動(dòng)狀態(tài)限制或空閑連接限制庶弃,則清除空閑時(shí)間最長(zhǎng)的連接
*
* <p>Returns the duration in nanos to sleep until the next scheduled call to this method.
* 返回在下次計(jì)劃調(diào)用此方法之前休眠的持續(xù)時(shí)間(納秒)
*
* Returns -1 if no further cleanups are required.
* 如果不需要進(jìn)一步清理,則返回-1
*
*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
// 遍歷隊(duì)列當(dāng)中所有的RealConnection集合德澈,去標(biāo)記泄露或者不活躍的連接
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
// 如果連接正在使用中歇攻,請(qǐng)繼續(xù)搜索
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
// 如果連接準(zhǔn)備好被收回,標(biāo)記為空閑連接
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
// 如果被標(biāo)記的連接滿足空閑的socekt連接超過(guò)5個(gè)
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
// 如果空閑連接超過(guò)5個(gè)或者keepalive時(shí)間大于5分鐘梆造,則將該連接清理掉缴守,然后在下面關(guān)閉它(同步塊外部)
connections.remove(longestIdleConnection); // 這時(shí)候就會(huì)把連接從集合中移除并關(guān)閉
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs; // 返回此連接的到期時(shí)間,供下次進(jìn)行清理
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs; // 全部都是活躍連接镇辉,5分鐘時(shí)候再進(jìn)行清理
} else { // 沒(méi)有連接
// No connections, idle or in use.
cleanupRunning = false;
return -1; // 返回-1 跳出循環(huán)
}
}
// 關(guān)閉連接屡穗,返回時(shí)間0,立即再次進(jìn)行清理
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
// 立即再次清理
return 0;
}
如何找到最不活躍的鏈接呢忽肛?
在RealConnection里有個(gè)StreamAllocation虛引用列表村砂,每創(chuàng)建一個(gè)StreamAllocation,就會(huì)把它添加進(jìn)該列表中屹逛,如果留關(guān)閉以后就將StreamAllocation對(duì)象從該列表中移除础废,正是利用這種引用計(jì)數(shù)的方式判定一個(gè)連接是否為空閑連接.
public final class RealConnection extends Http2Connection.Listener implements Connection {
/**
* Current streams carried by this connection.
* 由此連接攜帶的當(dāng)前流。
*/
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
}
pruneAndGetAllocationCount方法:
/**
* Prunes any leaked allocations and then returns the number of remaining live allocations on {@code connection}
* 刪除任何泄漏的分配罕模,然后返回@code connection上剩余的活動(dòng)分配數(shù)
*
* Allocations are leaked if the connection is tracking them but the application code has abandoned them.
* 如果連接正在跟蹤分配评腺,但應(yīng)用程序代碼已放棄分配,則會(huì)泄漏分配
*
* Leak detection is imprecise and relies on garbage collection.
* 泄漏檢測(cè)不精確手销,依賴于垃圾收集歇僧。
*
* 如何找到最不活躍的鏈接呢
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
// 虛引用列表
List<Reference<StreamAllocation>> references = connection.allocations;
// 遍歷虛引用列表
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
// 如果虛引用StreamAllocation正在被使用图张,則跳過(guò)進(jìn)行下一次循環(huán)
if (reference.get() != null) {
i++; // 引用計(jì)數(shù)
continue;
}
// We've discovered a leaked allocation. This is an application bug.
// 我們發(fā)現(xiàn)了一個(gè)泄露的分配锋拖。這是一個(gè)應(yīng)用程序bug
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
// 如果所有的StreamAllocation引用都沒(méi)有了,返回引用計(jì)數(shù)0
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0; // 表示這個(gè)連接沒(méi)有代碼引用了
}
}
return references.size(); // 返回剩余的活動(dòng)分配數(shù) (返回引用列表的大小祸轮,作為引用計(jì)數(shù))
}
以上就是復(fù)用連接池的核心代碼兽埃。
OkHttp的CallServerInterceptor請(qǐng)求服務(wù)器攔截器剖析:http://www.reibang.com/p/9e402c33b322