okhttp3.6.0源碼分析系列文章整體內(nèi)容如下:
前言
在開(kāi)始本章之前有三個(gè)概念要明確一個(gè):
1.connection: socket連接水孩,具體實(shí)現(xiàn)類(lèi)是RealConnection
2.stream: okhttp里面的stream指的是在connection上層的http請(qǐng)求/響應(yīng)數(shù)據(jù)流压恒。它對(duì)應(yīng)的類(lèi)是HttpCodec
3.StreamAllocation:負(fù)責(zé)提供一個(gè)connection(如果有就復(fù)用粤咪,沒(méi)有就會(huì)創(chuàng)建)冷溶,和一個(gè)stream(根據(jù)Call組裝成一個(gè)httpCodec對(duì)象)混稽,并讓stream通過(guò)該connection進(jìn)行傳輸杂彭。
正文
到攔截器為止双揪,都是在介紹okhttp如何構(gòu)造請(qǐng)求報(bào)文和處理響應(yīng)報(bào)文县耽,都是對(duì)http協(xié)議的具體實(shí)現(xiàn)句喷。本章關(guān)注的內(nèi)容則是數(shù)據(jù)是怎么調(diào)用socket進(jìn)行傳輸?shù)摹?/p>
http協(xié)議是無(wú)狀態(tài)的镣典,要進(jìn)行一個(gè)http請(qǐng)求,就要先進(jìn)行tcp握手唾琼,然后傳輸數(shù)據(jù)兄春,最后釋放。
本文主要是分析RealConnection是如何被使用的锡溯。
RealConnection是對(duì)數(shù)據(jù)傳輸連接的一個(gè)抽象赶舆。
通過(guò)跟蹤第一次建立socket連接的流程,來(lái)分析RealConnection 的使用祭饭。
在執(zhí)行ConnectInterceptor的intercept的時(shí)候有兩個(gè)非常重要的步驟:
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
- 獲取httpCodec實(shí)例芜茵,如果沒(méi)有socket連接則創(chuàng)建一個(gè)
- 獲取一個(gè)RealConnection,僅僅是保存一個(gè)RealConnection 的引用甜癞,開(kāi)啟連接的工作已經(jīng)在新建HttpCodec的時(shí)候完成了夕晓。
具體看下streamAllocation.newStream的方法:
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
①RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
②HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
可以看到HttpCodec對(duì)象是通過(guò)RealConnection實(shí)例來(lái)構(gòu)建的。
深入到①方法里面悠咱,看下它是怎么獲取一個(gè)realConnection對(duì)象的:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
③ RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
具體工作是由③完成的蒸辆,進(jìn)入findConnection方法內(nèi)部:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
}
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
findConnection里面主要有六步:
- 嘗試返回已經(jīng)和這個(gè)allocatedConnection對(duì)象關(guān)聯(lián)的連接
- 如果沒(méi)有,嘗試從連接池里面取出一個(gè)
- 如果還沒(méi)有析既,設(shè)置一個(gè)新的路由躬贡,然后再次嘗試從連接池里取一個(gè)
- 如果依舊沒(méi)有,則創(chuàng)建一個(gè)RealConnection眼坏,然后讓他被當(dāng)前的streamAllocation引用拂玻,并記錄引用次數(shù)
- 建立socket傳輸連接
- 將新建的RealConnection對(duì)象放入連接池里面
到這里我們就有一個(gè)連接了,然后構(gòu)建HttpCodec對(duì)象宰译,具體方法:
public HttpCodec newCodec(
OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
該方法有三步:
- 設(shè)置socket超時(shí)時(shí)間
- 數(shù)據(jù)讀取超時(shí)時(shí)間
- 數(shù)據(jù)寫(xiě)入超時(shí)時(shí)間
現(xiàn)在ConnectInterceptor的工作已經(jīng)完成了檐蚜。
我們?cè)倩仡^看一下RealConnection的socket傳輸是怎樣創(chuàng)建的。
如果是https沿侈,還會(huì)先對(duì)傳輸?shù)膬?nèi)容進(jìn)行加密/解密
public void connect(
int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
if (protocol != null) throw new IllegalStateException("already connected");
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
while (true) {
try {
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
//建立socket連接
connectSocket(connectTimeout, readTimeout);
}
establishProtocol(connectionSpecSelector);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
如果用的是http協(xié)議闯第,那么該方法只做一件事就是調(diào)用connectSocket(connectTimeout, readTimeout);建立一個(gè)socket連接。
進(jìn)入connectSocket內(nèi)部看下:
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
調(diào)用connectSocket建立socket連接的步驟:
- 獲取socket實(shí)例缀拭,我們使用的是http協(xié)議咳短,那就會(huì)從調(diào)用socket的工廠方法創(chuàng)建一個(gè)socket。
- 嘗試和服務(wù)端建立socket連接
- 設(shè)置可以往數(shù)據(jù)傳輸流里讀寫(xiě)數(shù)據(jù)的對(duì)象
(完)
參考: