Okhttp的5個內置攔截器可以說是Okhttp的核心,因為整個請求的過程都被封裝在這5個攔截器里面。而5個攔截器里面的核心就是這篇要分析的ConnectInterceptor
,因為ConnectInterceptor
才是真正發(fā)起請求黍析,建立連接地方
ConnectInterceptor
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//從攔截器鏈里面拿到流分配管理類StreamAllocation對象
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//通過流分配管理類創(chuàng)建一個流
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//通過通過流分配管理類建立連接
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
整個ConnectInterceptor
類的代碼也就這么幾行,但從這幾行中可以看出,真正創(chuàng)建流和建立連接的邏輯其實都在StreamAllocation
里面. StreamAllocation
對象是從RealInterceptorChain
獲取的. 通過之前的幾篇對Okhttp的源碼的學習福澡,我們知道攔截器的攔截方法intercept(Chain chain)
中的chain
最開始是在RealCall
的getResponseWithInterceptorChain()
中初始化的:
Response getResponseWithInterceptorChain() throws IOException {
........
//第二個參數(shù)就是StreamAllocation 類型,但是傳的是null
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
可以看到第一個攔截器鏈創(chuàng)建的時候驹马,StreamAllocation 傳的是null革砸,那么StreamAllocation 是什么時候賦值的呢,其實是在第一個攔截器RetryAndFollowUpInterceptor
的攔截方法里面賦值的:
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
.......
response = realChain.proceed(request, streamAllocation, null, null);
........
}
第一個攔截器給StreamAllocation
賦值后糯累,后面的攔截器中的攔截器鏈的StreamAllocation
就都是這個值. 前面說StreamAllocation
很重要算利,具體的建立連接過程,還有創(chuàng)建流都是在這個類里面泳姐,下面就看看StreamAllocation
StreamAllocation
先看一下StreamAllocation
的成員和構造函數(shù)
public final class StreamAllocation {
public final Address address; //請求的url地址
private RouteSelector.Selection routeSelection; //選擇的路由
private Route route;
private final ConnectionPool connectionPool; //連接池
public final Call call;
public final EventListener eventListener;
private final Object callStackTrace;
// State guarded by connectionPool.
private final RouteSelector routeSelector; //路由選擇器
private int refusedStreamCount; //拒絕的次數(shù)
private RealConnection connection; //連接
private boolean reportedAcquired;
private boolean released;
private boolean canceled;
private HttpCodec codec; //負責寫入請求數(shù)據或讀出響應數(shù)據的IO流
public StreamAllocation(ConnectionPool connectionPool, Address address, Call call,
EventListener eventListener, Object callStackTrace) {
this.connectionPool = connectionPool;
this.address = address;
this.call = call;
this.eventListener = eventListener;
this.routeSelector = new RouteSelector(address, routeDatabase(), call, eventListener);
this.callStackTrace = callStackTrace;
}
....
}
從StreamAllocation
的成員和構造函數(shù)可以看到效拭,StreamAllocation
里面主要包含了連接池,連接胖秒,還有流. 為了更好理解這個類缎患,先屢一下基本概念:
- HTTP請求網絡的時候,首先需要通過一個Socket與服務端建立TCP連接阎肝,Socket中還需要有主機名host和端口號port
- 建立好連接后挤渔,就可以使用流在這個連接上向服務端寫入數(shù)據和讀取服務端返回的數(shù)據
- HTTP/1.1提出了Keep-Alive機制:當一個HTTP請求的數(shù)據傳輸結束后,TCP連接不立即釋放风题,如果此時有新的HTTP請求判导,且其請求的Host同上次請求相同嫉父,則可以直接復用未釋放的TCP連接,從而省去了TCP的釋放和再次創(chuàng)建的開銷眼刃,減少了網絡延時
- HTTP2.0的多路復用:允許同時通過單一的 HTTP/2 連接發(fā)起多重的請求-響應消息
OkHttp為了解耦熔号,對請求中的各個概念進行了封裝, RealConnection
就對應著請求中的連接鸟整,HttpCodec
對應流引镊,為了HTTP1.1的連接復用以及HTTP2.0的多路復用,就需要將請求連接保存下來篮条,以便復用弟头,所以就有了ConnectionPool
. 而為了執(zhí)行一次網絡請求,需要從連接池找到可用的的連接涉茧,然后創(chuàng)建流赴恨,所以就需要一個分配管理流的角色,這個角色就是StreamAllocation
StreamAllocation.newStream()
public HttpCodec newStream(OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//尋找一個健康的連接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//通過找到的連接獲取流
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
調用了findHealthyConnection()
和resultConnection.newCodec()
伴栓,先看findHealthyConnection()
//StreamAllocation.findHealthyConnection()
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
//找到可用的連接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
//successCount為0伦连,說明是新建立的連接,沒有用過钳垮,默認可用惑淳,直接返回
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
//如果找到的連接不健康,就不讓創(chuàng)建流饺窿,并關閉該連接歧焦,繼續(xù)找
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
又調用了findConnection
去查找連接
//StreamAllocation.findConnection()
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
//如果當前StreamAllocation持有的連接不為空
if (this.connection != null) {
// We had an already-allocated connection and it's good.
//將這個持有的連接賦值給result
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
//當前StreamAllocation持有的連接為空,reuslt在這里就會為空肚医,說明還沒找到可用連接
if (result == null) {
// Attempt to get a connection from the pool.
//從連接池中找一下绢馍,如果找到了會給持有的連接賦值
Internal.instance.get(connectionPool, address, this, null);
//如果從連接池中找到了可用的連接
if (connection != null) {
foundPooledConnection = true;
//賦值給result
result = connection;
} else {
//如果沒找到,將StreamAllocation持有的路由賦值給已找到的路由
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) { //result不為空意味著當前連接可以用肠套,或者從連接池中找到了可以復用的連接
// If we found an already-allocated or pooled connection, we're done.
return result; //直接返回
}
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
//如果沒找到路由舰涌,并且路由選擇區(qū)為空
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
//就切換路由
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
//遍歷路由選擇區(qū)的所有路由
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
//根據新的路由地址,再從連接池找一遍
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) { //如果找到了
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) { //如果還沒找到可用連接
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
//就新建一個連接
result = new RealConnection(connectionPool, selectedRoute);
//關聯(lián)到流管理引用列表connection.allocations你稚,并用this.connection記錄當前連接
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
//與服務端建立TCP連接
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
//將新建的連接存進連接池
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
從源碼可以看到瓷耙,OkHttp尋找可用連接的過程如下:
1. 如果是重定向請求,就使用StreamAllocation持有的連接
releasedConnection = this.connection;
//如果當前連接不能創(chuàng)建新的流入宦,就釋放
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
//將StreamAllocation持有的連接賦值給result哺徊,表示就用這個持有的連接
result = this.connection;
//不釋放當前連接
releasedConnection = null;
}
StreamAllocation持有的連接this.connection
一開始肯定是為null,但是當從連接池中找到了可用的連接后乾闰,或者從連接池沒找到落追,新建一個連接后,StreamAllocation持有的連接this.connection
就不為null. 不為null的時候就把它賦值給 result
涯肩,表示就用這個持有的連接轿钠。
但我們知道在RetryAndFollowUpInterceptor
的攔截方法里面巢钓,StreamAllocation是新建的。每調用一次RetryAndFollowUpInterceptor
的攔截方法就會新建一個StreamAllocation
疗垛,也就是說每一次請求都會新建一個StreamAllocation
. 那么也就意味著每一次請求使用的連接根本不可能用到StreamAllocation
持有的連接this.connection
症汹,因為StreamAllocation 是新建的,this.connection
一直是null. 那么什么時候this.connection
才會不為null呢贷腕?其實只有在第一次請求背镇,服務端返回一個比如狀態(tài)碼為307這樣的需要重定向的響應的時候,并且重定向的Request的host泽裳、port瞒斩、scheme與之前一致時出現(xiàn)。在RetryAndFollowUpInterceptor
中涮总,如果響應為需要重定向胸囱,那么會再發(fā)起一次請求,第二次請求時瀑梗,使用的StreamAllocation
就是第一次創(chuàng)建的烹笔,這個時候就會用到這個StreamAllocation
持有的連接(不太明白可以去看下Okhttp源碼學習三(重試和重定向,橋接抛丽,緩存攔截器的內部原理))
2. 如果不是重定向請求谤职,就遍歷連接池中的所有連接,看是否有可復用的連接
if (result == null) { //result為null铺纽,意味著不是重定向請求
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
調用了Internal.instance.get(connectionPool, address, this, null)
從連接池中去找
public abstract class Internal {
public static void initializeInstanceForTests() {
// Needed in tests to ensure that the instance is actually pointing to something.
new OkHttpClient();
}
public static Internal instance;
public abstract void addLenient(Headers.Builder builder, String line);
.......
}
Internal
是一個抽象類柬帕,它的唯一實現(xiàn)是在 OkHttpClient
中:
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
static final List<Protocol> DEFAULT_PROTOCOLS = Util.immutableList(Protocol.HTTP_2, Protocol.HTTP_1_1);
static final List<ConnectionSpec> DEFAULT_CONNECTION_SPECS = Util.immutableList(ConnectionSpec.MODERN_TLS, ConnectionSpec.CLEARTEXT);
static {
Internal.instance = new Internal() {
........
@Override public RealConnection get(ConnectionPool pool, Address address,StreamAllocation streamAllocation, Route route) {
//調用的是連接池ConnectionPool的get方法
return pool.get(address, streamAllocation, route);
}
.........
};
......
}
看一下ConnectionPool
的get()方法:
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
//變量連接池中的所有連接哟忍,connections是Deque類型
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
調用了 connection.isEligible(address, route)
來判斷是否可以復用
//RealConnection.isEligible()
public boolean isEligible(Address address, @Nullable Route route) {
//如果當前連接上的并發(fā)流數(shù)量超過最大值1狡门,或當前連接不能創(chuàng)建新的流,返回false
if (allocations.size() >= allocationLimit || noNewStreams) return false;
//如果兩個address除了host以外的所有域不相同锅很,返回false
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
//如果host也相同其馏,那么當前連接可以復用,直接返回
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
//http2連接為空爆安,直接返回false
if (http2Connection == null) return false;
if (route == null) return false;
//路由用到了代理叛复,返回false
if (route.proxy().type() != Proxy.Type.DIRECT) return false;
if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
//socket地址相同不同,返回false
if (!this.route.socketAddress().equals(route.socketAddress())) return false;
// 3. This connection's server certificate's must cover the new host.
if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
if (!supportsUrl(address.url())) return false;
// 4. Certificate pinning must match the host.
try {
address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}
return true; // The caller's address can be carried by this connection.
}
判斷是否可以復用的條件大致就是(后面對HTTP2的復用條件判斷暫時沒看明白):
當前連接的流的數(shù)量要少于1個扔仓,請求地址的host相同
如果連接池中有符合上面的條件的連接褐奥,就調用streamAllocation.acquire(connection, true);
//StreamAllocation.acquire()
public void acquire(RealConnection connection, boolean reportedAcquired) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
//給StreamAllocation持有的連接賦值
this.connection = connection;
this.reportedAcquired = reportedAcquired;
//將連接connection與StreamAllocation綁定
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
connection.allocations
是一個列表:
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
每一個連接對象RealConnection
都有一個列表,列表的元素類型是StreamAllocation
的弱引用翘簇,它用來記錄當前連接上建立的流撬码。因為每一次請求都會創(chuàng)建一個新的StreamAllocation
回到StreamAllocation.findConnection()
中:
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
//如果連接池中有復用的連接,connection就不為null
if (connection != null) {
foundPooledConnection = true;
//使用復用的連接
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
如果從連接池找到了可以復用的連接版保,直接返回這個復用的連接
3. 如果在連接池沒有找到可復用的連接呜笑,就切換路由夫否,再從連接池中找一次
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
//切換路由
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
關于RouteSelector
以及路由的選擇,切換叫胁,下篇再分析
4. 切換路由后凰慈,連接池中還是沒有找到可以復用的連接,就新建一個連接驼鹅,并將新建的connection和當前的StreamAllocation綁定
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
//新建一個連接
result = new RealConnection(connectionPool, selectedRoute);
//綁定connection和StreamAllocation
acquire(result, false);
}