一丸升、ConnectInterceptor攔截器
ConnectInterceptor攔截器,intercept()方法牺氨,創(chuàng)建Network鏈路狡耻。
每一次RealCall請求類,分配一個StreamAllocation對象猴凹,負責RealConnection管理夷狰,連接池操作,HttpCodec郊霎,網(wǎng)絡鏈路的分配沼头、取消、釋放书劝,RealConnection可復用进倍。
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();//從Chain中獲取Request
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
//處理返回Response
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
RetryAndFollowUpInterceptor攔截器(位于攔截器列表第二項目),intercept()方法庄撮,創(chuàng)建StreamAllocation對象背捌,每次遞歸將引用傳給下一個新建Chain。
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
到達ConnectInterceptor攔截器時洞斯,從Chain中取出毡庆,StreamAllocation類的newStream()方法,創(chuàng)建/查找鏈路烙如。
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
...
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout,
readTimeout,writeTimeout, connectionRetryEnabled,
doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
}
}
findConnection()方法么抗,通過策略尋找一個真正的RealConnection連接,該連接和Server建立個Socket連接亚铁,RealConnection類初始化HttpCodec蝇刀。
public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation)
throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
若采用Protocol.HTTP_2協(xié)議,即Http2Connection存在徘溢,創(chuàng)建Http2Codec吞琐,否則捆探,創(chuàng)建Http1Codec,封裝BufferedSource和BufferedSink站粟。
這些新初始化的對象StreamAllocation黍图、HttpCodec、RealConnection奴烙,一起傳遞給Chain節(jié)點助被,下一個攔截器使用。
二切诀、分配策略
StreamAllocation類findConnection()方法揩环。
private RealConnection findConnection(int connectTimeout, int readTimeout,
int writeTimeout,boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
//先使用StreamAllocation內(nèi)部保存的RealConnection
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
//連接池中尋找
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
...
RealConnection result;
synchronized (connectionPool) {
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
//創(chuàng)建RealConnection
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
}
//socket連接
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
//入池
Internal.instance.put(connectionPool, result);
...
}
return result;
}
1,StreamAllocation內(nèi)部RealConnection連接幅虑。
2丰滑,從ConnectionPool連接池查找。
3翘单,創(chuàng)建RealConnection對象吨枉,保存在StreamAllocation內(nèi)部蹦渣,入池哄芜。
OkHttpClient類靜態(tài)代碼段初始化一個Internal對象。
static {
Internal.instance = new Internal() {
@Override public
RealConnection get(ConnectionPool pool, Address address,StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
@Override
public void put(ConnectionPool pool, RealConnection connection) {
pool.put(connection);
}
....//其他方法.
};
}
ConnectionPool類get()方法柬唯,查詢連接池中的RealConnection认臊。
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
遍歷連接池每項RealConnection,當Address相同且StreamAllocationReference數(shù)量小于限制锄奢,說明是可用連接失晴。
RealConnection內(nèi)部引用StreamAllocation類型弱引用列表,allocationLimit變量限制StreamAllocation弱引用數(shù)量拘央。
public void acquire(RealConnection connection) {
..
this.connection = connection;
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
將可用RealConnection設置成StreamAllocation內(nèi)部connection涂屁,同時將StreamAllocation加入RealConnection內(nèi)部弱引用列表。
達到限制時灰伟,不能再被StreamAllocation使用拆又,新建鏈路,新RealConnection同樣賦值到StreamAllocation內(nèi)部栏账,將該StreamAllocation加入弱引用列表帖族,最后put連接池。
RealConnection代表一個真正的鏈路挡爵,封裝BufferedSource竖般、BufferedSink、路由茶鹃、socket涣雕、協(xié)議艰亮、Handshake握手信息。
若是新鏈接挣郭,進行Socket連接垃杖,connect()方法,connectSocket()方法建立連接丈屹。
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
//創(chuàng)建Socket
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}
創(chuàng)建一個Socket调俘,Platform#connectSocket方法,連接旺垒。
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout) throws IOException {
socket.connect(address, connectTimeout);
}
成功彩库,和Server建立一個Socket連接,失敗先蒋,拋出異常骇钦。
三、連接清理
ConnectionPool連接池管理所有Socket連接竞漾,當有新的請求時眯搭,從池中分配一個鏈路。
ArrayDeque雙向隊列业岁,線性連續(xù)空間鳞仙,雙向開口,在頭尾兩端插入刪除高效笔时,同時具有隊列和棧性質棍好,緩存常用。
默認支持5個并發(fā)keepalive允耿,鏈路生命為5分鐘借笙,即鏈路數(shù)據(jù)傳輸完成,可保持5分鐘的存活時間较锡。
自動清除線程业稼,將查找超過5分鐘的鏈路,關閉socket蚂蕴。
ConnectionPool的get()/put()操作方法低散。
void put(RealConnection connection) {
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);//執(zhí)行清理任務
}
connections.add(connection);//加入連接池隊列
}
線程池執(zhí)行cleanupRunnable清理任務,設置cleanupRunning標志位掂墓。實質上是一個阻塞的清理任務谦纱。若while一直運行,下次put()將不會觸發(fā)君编。
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
waitNanos等待下次清理的間隔時間跨嘉,-1表示不需要再次清理,退出循環(huán)吃嘿,0表示立即再次清理祠乃,wait()方法等待梦重,釋放鎖與時間片。
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// 是否在使用
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
//找空閑最長的
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
//決定是否清理
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
//刪除連接
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
return keepAliveDurationNs;
} else {
cleanupRunning = false;
return -1;
}
}
//關閉Socket
closeQuietly(longestIdleConnection.socket());
return 0;
}
遍歷連接亮瓷,pruneAndGetAllocationCount()方法查看連接是否在使用琴拧。inUseConnectionCount,正在使用數(shù)量嘱支,自增蚓胸,idleConnectionCount,空閑數(shù)量除师,未使用自增沛膳。
idleDurationNs,空閑連接汛聚,計算已空閑時間锹安,查找到空閑時間最長的。
清理算法
1倚舀,longestIdleDurationNs(最長空閑時間)叹哭,大于5min或空閑數(shù)量超過5個(默認),將對應longestIdleConnection連接刪除痕貌,返回0风罩,下一次立即清理。
2芯侥,不足5min泊交,空閑連接數(shù)量<5,返回timeout(距離5min還有多久)柱查,線程阻塞timeout后,再次清理云石。
3唉工,空閑連接=0,且正使用連接>0汹忠,返回5min淋硝,最長等待時間。
4宽菜,空閑和正使用連接都0谣膳,返回-1,不清理铅乡,線程結束继谚。設置cleanupRunning標志位,下次put()方法重新喚起cleanupRunnable任務阵幸。
清理過程
從連接池隊列ArrayDeque中刪除該項連接花履。
closeQuietly()方法關閉Socket芽世。
using連接判斷
遍歷RealConnection內(nèi)部Reference<StreamAllocation>列表
reference.get()不空,說明有StreamAllocation正引用RealConnection诡壁。
reference.get()不存在济瓢,說明StreamAllocation已被Jvm清理,同時妹卿,從references列表中刪除該項reference旺矾。
若references列表是空,說明references弱引用都是空夺克,沒有StreamAllocation使用該連接宠漩。
四、總結
ConnectInterceptor攔截器負責Network連接懊直。
每一個RealCall請求對應一個StreamAllocation扒吁。
真正的連接RealConnection復用。
連接池采用ArrayDeque雙向隊列數(shù)據(jù)結構室囊。
連接清理任務雕崩。
任重而道遠