- 1.OkHttp源碼解析(一):OKHttp初階
- 2 OkHttp源碼解析(二):OkHttp連接的"前戲"——HTTP的那些事
- 3 OkHttp源碼解析(三):OKHttp中階之線程池和消息隊列
- 4 OkHttp源碼解析(四):OKHttp中階之攔截器及調(diào)用鏈
- 5 OkHttp源碼解析(五):OKHttp中階之OKio簡介
- 6 OkHttp源碼解析(六):OKHttp中階之緩存基礎
- 7 OkHttp源碼解析(七):OKHttp中階之緩存機制
- 8 OkHttp源碼解析(八):OKHttp中階之連接與請求值前奏
- 9 OkHttp源碼解析(九):OKHTTP連接中三個"核心"RealConnection蜡镶、ConnectionPool雾袱、StreamAllocation
- 10 OkHttp源碼解析(十) OKHTTP中連接與請求
- 11 OkHttp的感謝
本片文章終于講到OKHTTP中的核心了,復用連接池官还,本片文章的順序是
- 1芹橡、RealConnection類
- 2、ConnectionPool類
- 3望伦、StreamAllocation類
一林说、RealConnection
RealConnection是Connection的實現(xiàn)類,代表著鏈接socket的鏈路屯伞,如果擁有了一個RealConnection就代表了我們已經(jīng)跟服務器有了一條通信鏈路述么,而且通過
RealConnection代表是連接socket鏈路,RealConnection對象意味著我們已經(jīng)跟服務端有了一條通信鏈路了愕掏。很多朋友這時候會想到度秘,有通信鏈路了,是不是與意味著在這個類實現(xiàn)的三次握手饵撑,你們猜對了剑梳,的確是在這個類里面實現(xiàn)的三次握手。在講握手的之前滑潘,看下它的屬性和構造函數(shù)垢乙,對他有個大概的了解。
private final ConnectionPool connectionPool;
private final Route route;
// The fields below are initialized by connect() and never reassigned.
//下面這些字段语卤,通過connect()方法開始初始化追逮,并且絕對不會再次賦值
/** The low-level TCP socket. */
private Socket rawSocket; //底層socket
/**
* The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
* {@link #rawSocket} itself if this connection does not use SSL.
*/
private Socket socket; //應用層socket
//握手
private Handshake handshake;
//協(xié)議
private Protocol protocol;
// http2的鏈接
private Http2Connection http2Connection;
//通過source和sink酪刀,大家可以猜到是與服務器交互的輸入輸出流
private BufferedSource source;
private BufferedSink sink;
// The fields below track connection state and are guarded by connectionPool.
//下面這個字段是 屬于表示鏈接狀態(tài)的字段,并且有connectPool統(tǒng)一管理
/** If true, no new streams can be created on this connection. Once true this is always true. */
//如果noNewStreams被設為true钮孵,則noNewStreams一直為true骂倘,不會被改變,并且表示這個鏈接不會再創(chuàng)新的stream流
public boolean noNewStreams;
//成功的次數(shù)
public int successCount;
/**
* The maximum number of concurrent streams that can be carried by this connection. If {@code
* allocations.size() < allocationLimit} then new streams can be created on this connection.
*/
//此鏈接可以承載最大并發(fā)流的限制巴席,如果不超過限制历涝,可以隨意增加
public int allocationLimit = 1;
通過上面代碼,我們可以得出以下結論:
1漾唉、里面除了route 字段荧库,部分的字段都是在connect()方法里面賦值的,并且不會再次賦值
2赵刑、這里含有source和sink分衫,所以可以以流的形式對服務器進行交互
3、noNewStream可以簡單理解為它表示該連接不可用般此。這個值一旦被設為true,則這個conncetion則不會再創(chuàng)建stream丐箩。
4、allocationLimit是分配流的數(shù)量上限恤煞,一個connection最大只能支持一個1并發(fā)
5屎勘、allocations是關聯(lián)StreamAllocation,它用來統(tǒng)計在一個連接上建立了哪些流,通過StreamAllocation的acquire方法和release方法可以將一個allcation對方添加到鏈表或者移除鏈表居扒,
其實大家估計已經(jīng)猜到了connect()里面進行了三次握手概漱,大家也猜對了,那咱們就簡單的介紹下connect()方法:
public void connect( int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
if (protocol != null) throw new IllegalStateException("already connected");
// 線路的選擇
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
// 連接開始
while (true) {
try {
// 如果要求隧道模式喜喂,建立通道連接瓤摧,通常不是這種
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
// 一般都走這條邏輯了,實際上很簡單就是socket的連接
connectSocket(connectTimeout, readTimeout);
}
// https的建立
establishProtocol(connectionSpecSelector);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
這里的執(zhí)行過程大體如下玉吁;
- 1照弥、檢查連接是否已經(jīng)建立,若已經(jīng)建立进副,則拋出異常这揣,否則繼續(xù),連接的是否簡歷由protocol標示影斑,它表示在整個連接建立给赞,及可能的協(xié)商過程中選擇所有要用到的協(xié)議。
- 2矫户、用 集合connnectionspecs構造ConnectionSpecSelector片迅。
- 3、如果請求是不安全的請求皆辽,會對請求執(zhí)行一些額外的限制:
3.1柑蛇、ConnectionSpec集合必須包含ConnectionSpec.CLEARTEXT芥挣。也就是說OkHttp用戶可以通過OkHttpClient設置不包含ConnectionSpec.CLEARTEXT的ConnectionSpec集合來禁用所有的明文要求。
3.2耻台、平臺本身的安全策略允向相應的主機發(fā)送明文請求空免。對于Android平臺而言,這種安全策略主要由系統(tǒng)的組件android.security.NetworkSecurityPolicy執(zhí)行粘我。平臺的這種安全策略不是每個Android版本都有的。Android6.0之后存在這種控制痹换。
(okhttp/okhttp/src/main/java/okhttp3/internal/platform/AndroidPlatform.java 里面的isCleartextTrafficPermitted()方法) - 4征字、根據(jù)請求判斷是否需要建立隧道連接,如果建立隧道連接則調(diào)用
connectTunnel(connectTimeout, readTimeout, writeTimeout); - 5娇豫、如果不是隧道連接則調(diào)用connectSocket(connectTimeout, readTimeout);建立普通連接匙姜。
- 6、通過調(diào)用establishProtocol建立協(xié)議
- 7冯痢、如果是HTTP/2氮昧,則設置相關屬性。
整個流程已經(jīng)梳理完浦楣,咱們就摳一下具體的細節(jié)袖肥,首先來看下建立普通連接,因為隧道連接也會用到普通連接的代碼:
看下connectSocket()方法
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
// 根據(jù)代理類型來選擇socket類型振劳,是代理還是直連
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
// 連接socket椎组,之所以這樣寫是因為支持不同的平臺
//里面實際上是 socket.connect(address, connectTimeout);
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// 得到輸入/輸出流
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}
有3種情況需要建立普通連接:
- 無代理
- 明文的HTTP代理
- SOCKS代理
普通連接的建立過程為建立TCP連接,建立TCP連接的過程為:
- 1历恐、創(chuàng)建Socket寸癌,非SOCKS代理的情況下,通過SocketFactory創(chuàng)建弱贼;在SOCKS代理則傳入proxy手動new一個出來蒸苇。
- 2、為Socket設置超時
- 3吮旅、完成特定于平臺的連接建立
- 4溪烤、創(chuàng)建用于I/O的source和sink
下面我來看下connectSocket()的具體實現(xiàn),connectSocket()具體實現(xiàn)是AndroidPlatform.java里面的connectSocket()庇勃。
關于AndroidPlatform.java請看上一篇文章氛什。
設置了SOCKS代理的情況下,僅有的特別之處在于匪凉,是通過傳入proxy手動創(chuàng)建Socket枪眉。route的socketAddress包含目標HTTP服務器的域名。由此可見SOCKS協(xié)議的處理再层,主要是在Java標準庫的java.net.Socket中處理贸铜,對于外界而言堡纬,就好像是HTTP服務器直接建立連接一樣,因此連接時傳入的地址都是HTTP服務器的域名蒿秦。
而對于明文的HTTP代理的情況下烤镐,這里滅有任何特殊處理。route的socketAddress包含著代理服務器的IP地址棍鳖。HTTP代理自身會根據(jù)請求及相應的實際內(nèi)容炮叶,建立與HTTP服務器的TCP連接,并轉發(fā)數(shù)據(jù)渡处。
這時候我們再來看下建立隧道邏輯:
/**
* Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
* proxy server can issue an auth challenge and then close the connection.
*/
private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
throws IOException {
Request tunnelRequest = createTunnelRequest();
HttpUrl url = tunnelRequest.url();
int attemptedConnections = 0;
int maxAttempts = 21;
while (true) {
if (++attemptedConnections > maxAttempts) {
throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
}
connectSocket(connectTimeout, readTimeout);
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
if (tunnelRequest == null) break; // Tunnel successfully created.
// The proxy decided to close the connection after an auth challenge. We need to create a new
// connection, but this time with the auth credentials.
closeQuietly(rawSocket);
rawSocket = null;
sink = null;
source = null;
}
}
建立隧道連接的過程又分為幾個步驟:
- 創(chuàng)建隧道請求
- 建立Socket連接
- 發(fā)送請求建立隧道
隧道請求是一個常規(guī)的HTTP請求镜悉,只是請求的內(nèi)容有點特殊医瘫。最初創(chuàng)建的隧道請求如:
/**
* Returns a request that creates a TLS tunnel via an HTTP proxy. Everything in the tunnel request
* is sent unencrypted to the proxy server, so tunnels include only the minimum set of headers.
* This avoids sending potentially sensitive data like HTTP cookies to the proxy unencrypted.
*/
private Request createTunnelRequest() {
return new Request.Builder()
.url(route.address().url())
.header("Host", Util.hostHeader(route.address().url(), true))
.header("Proxy-Connection", "Keep-Alive") // For HTTP/1.0 proxies like Squid.
.header("User-Agent", Version.userAgent())
.build();
}
一個隧道請求的例子如下:
請求的"Host" header中包含了目標HTTP服務器的域名莺禁。建立socket連接的過程這里就不細說了
創(chuàng)建隧道的過程是這樣子的:
/**
* To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create
* the proxy connection. This may need to be retried if the proxy requires authorization.
*/
private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
HttpUrl url) throws IOException {
// Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
while (true) {
Http1Codec tunnelConnection = new Http1Codec(null, null, source, sink);
source.timeout().timeout(readTimeout, MILLISECONDS);
sink.timeout().timeout(writeTimeout, MILLISECONDS);
tunnelConnection.writeRequest(tunnelRequest.headers(), requestLine);
tunnelConnection.finishRequest();
Response response = tunnelConnection.readResponseHeaders(false)
.request(tunnelRequest)
.build();
// The response body from a CONNECT should be empty, but if it is not then we should consume
// it before proceeding.
long contentLength = HttpHeaders.contentLength(response);
if (contentLength == -1L) {
contentLength = 0L;
}
Source body = tunnelConnection.newFixedLengthSource(contentLength);
Util.skipAll(body, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
body.close();
switch (response.code()) {
case HTTP_OK:
// Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
// that happens, then we will have buffered bytes that are needed by the SSLSocket!
// This check is imperfect: it doesn't tell us whether a handshake will succeed, just
// that it will almost certainly fail because the proxy has sent unexpected data.
if (!source.buffer().exhausted() || !sink.buffer().exhausted()) {
throw new IOException("TLS tunnel buffered too many bytes!");
}
return null;
case HTTP_PROXY_AUTH:
tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy");
if ("close".equalsIgnoreCase(response.header("Connection"))) {
return tunnelRequest;
}
break;
default:
throw new IOException(
"Unexpected response code for CONNECT: " + response.code());
}
}
}
在前面創(chuàng)建的TCP連接值上斩例,完成代理服務器的HTTP請求/響應交互。請求的內(nèi)容類似下面這樣:
"CONNECT m.taobao.com:443 HTTP/1.1"
這里可能會根據(jù)HTTP代理是否需要認證而有多次HTTP請求/響應交互僚纷。
總結一下OkHttp3中代理相關的處理矩距;
- 1、沒有設置代理的情況下怖竭,直接與HTTP服務器建立TCP連接剩晴,然后進行HTTP請求/響應的交互。
- 2侵状、設置了SOCKS代理的情況下赞弥,創(chuàng)建Socket時,為其傳入proxy趣兄,連接時還是以HTTP服務器為目標绽左。在標準庫的Socket中完成SOCKS協(xié)議相關的處理。此時基本上感知不到代理的存在艇潭。
- 3拼窥、設置了HTTP代理時的HTTP請求,與HTTP代理服務器建立TCP連接蹋凝。HTTP代理服務器解析HTTP請求/響應的內(nèi)容鲁纠,并根據(jù)其中的信息來完成數(shù)據(jù)的轉發(fā)。也就是說鳍寂,如果HTTP請求中不包含"Host"header改含,則有可能在設置了HTTP代理的情況下無法與HTTP服務器建立連接。
- 4迄汛、HTTP代理時的HTTPS/HTTP2請求捍壤,與HTTP服務器建立通過HTTP代理的隧道連接骤视。HTTP代理不再解析傳輸?shù)臄?shù)據(jù),僅僅完成數(shù)據(jù)轉發(fā)的功能鹃觉。此時HTTP代理的功能退化為如同SOCKS代理類似专酗。
- 5、設置了代理類時盗扇,HTTP的服務器的域名解析會交給代理服務器執(zhí)行祷肯。其中設置了HTTP代理時,會對HTTP代理的域名做域名解析疗隶。
上述流程弄明白后佑笋,來看下建立協(xié)議
不管是建立隧道連接,還是建立普通連接抽减,都少不了建立協(xié)議這一步驟允青,這一步是建立好TCP連接之后橄碾,而在該TCP能被拿來手法數(shù)據(jù)之前執(zhí)行的卵沉。它主要為了數(shù)據(jù)的加密傳輸做一些初始化,比如TCL握手法牲,HTTP/2的協(xié)商史汗。
private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
//如果不是ssl
if (route.address().sslSocketFactory() == null) {
protocol = Protocol.HTTP_1_1;
socket = rawSocket;
return;
}
//如果是sll
connectTls(connectionSpecSelector);
//如果是HTTP2
if (protocol == Protocol.HTTP_2) {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.listener(this)
.build();
http2Connection.start();
}
}
上面的代碼大體上可以歸納為兩點
- 1、對于加密的數(shù)據(jù)傳輸拒垃,創(chuàng)建TLS連接停撞。對于明文傳輸,則設置protocol和socket悼瓮。socket直接指向應用層戈毒,如HTTP或HTTP/2,交互的Socket。
1.1對于明文傳輸沒有設置HTTP代理的HTTP請求横堡,它是與HTTP服務器之間的TCP socket埋市。
1.2對于加密傳輸沒有設置HTTP代理服務器的HTTP或HTTP2請求,它是與HTTP服務器之間的SSLSocket命贴。
1.3對于加密傳輸設置了HTTP代理服務器的HTTP或HTTP2請求道宅,它是與HTTP服務器之間經(jīng)過代理服務器的SSLSocket,一個隧道連接胸蛛;
1.4對于加密傳輸設置了SOCKS代理的HTTP或HTTP2請求污茵,它是一條經(jīng)過了代理服務器的SSLSocket連接。 - 2葬项、對于HTTP/2泞当,通過new 一個Http2Connection.Builder會建立HTTP/2連接 Http2Connection,然后執(zhí)行http2Connection.start()和服務器建立協(xié)議民珍。我們先來看下建立TLS連接的connectTls()方法
private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
Address address = route.address();
SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
boolean success = false;
SSLSocket sslSocket = null;
try {
// Create the wrapper over the connected socket.
//在原來的Socket加一層ssl
sslSocket = (SSLSocket) sslSocketFactory.createSocket(
rawSocket, address.url().host(), address.url().port(), true /* autoClose */);
// Configure the socket's ciphers, TLS versions, and extensions.
ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
if (connectionSpec.supportsTlsExtensions()) {
Platform.get().configureTlsExtensions(
sslSocket, address.url().host(), address.protocols());
}
// Force handshake. This can throw!
sslSocket.startHandshake();
Handshake unverifiedHandshake = Handshake.get(sslSocket.getSession());
// Verify that the socket's certificates are acceptable for the target host.
if (!address.hostnameVerifier().verify(address.url().host(), sslSocket.getSession())) {
X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
throw new SSLPeerUnverifiedException("Hostname " + address.url().host() + " not verified:"
+ "\n certificate: " + CertificatePinner.pin(cert)
+ "\n DN: " + cert.getSubjectDN().getName()
+ "\n subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
}
// Check that the certificate pinner is satisfied by the certificates presented.
address.certificatePinner().check(address.url().host(),
unverifiedHandshake.peerCertificates());
// Success! Save the handshake and the ALPN protocol.
String maybeProtocol = connectionSpec.supportsTlsExtensions()
? Platform.get().getSelectedProtocol(sslSocket)
: null;
socket = sslSocket;
source = Okio.buffer(Okio.source(socket));
sink = Okio.buffer(Okio.sink(socket));
handshake = unverifiedHandshake;
protocol = maybeProtocol != null
? Protocol.get(maybeProtocol)
: Protocol.HTTP_1_1;
success = true;
} catch (AssertionError e) {
if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
} finally {
if (sslSocket != null) {
Platform.get().afterHandshake(sslSocket);
}
if (!success) {
closeQuietly(sslSocket);
}
}
}
TLS連接是對原始TCP連接的一個封裝零蓉,以及聽過TLS握手笤受,及數(shù)據(jù)手法過程中的加解密等功能。在Java中敌蜂,用SSLSocket來描述箩兽。上面建立的TLS連接的過程大體為:
- 1、用SSLSocketFactory基于原始的TCP Socket章喉,創(chuàng)建一個SSLSocket汗贫。
- 2、并配置SSLSocket秸脱。
- 3落包、在前面選擇的ConnectionSpec支持TLS擴展參數(shù)時,配置TLS擴展參數(shù)摊唇。
- 4咐蝇、啟動TLS握手
- 5、TLS握手完成之后巷查,獲取證書信息有序。
- 6、對TLS握手過程中傳回來的證書進行驗證岛请。
- 7旭寿、在前面選擇的ConnectionSpec支持TLS擴展參數(shù)時,獲取TLS握手過程中順便完成的協(xié)議協(xié)商過程所選擇的協(xié)議崇败。這個過程主要用于HTTP/2的ALPN擴展盅称。
- 8、OkHttp主要使用Okio來做IO操作后室,這里會基于前面獲取到SSLSocket創(chuàng)建于執(zhí)行的IO的BufferedSource和BufferedSink等缩膝,并保存握手信息以及所選擇的協(xié)議。
至此連接已經(jīng)建立連接已經(jīng)結束了岸霹。
這里說一下isHealthy(boolean doExtensiveChecks)方法疾层,入?yún)⑹且粋€布爾類,表示是否需要額外的檢查松申。這里主要是檢查云芦,判斷這個連接是否是健康的連接,即是否可以重用贸桶。那我們來看下
/** Returns true if this connection is ready to host new streams. */
public boolean isHealthy(boolean doExtensiveChecks) {
if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
return false;
}
if (http2Connection != null) {
return !http2Connection.isShutdown();
}
if (doExtensiveChecks) {
try {
int readTimeout = socket.getSoTimeout();
try {
socket.setSoTimeout(1);
if (source.exhausted()) {
return false; // Stream is exhausted; socket is closed.
}
return true;
} finally {
socket.setSoTimeout(readTimeout);
}
} catch (SocketTimeoutException ignored) {
// Read timed out; socket is good.
} catch (IOException e) {
return false; // Couldn't read; socket is closed.
}
}
return true;
}
看上述代碼可知舅逸,同時滿足如下條件才是健康的連接,否則返回false
- 1皇筛、socket已經(jīng)關閉
- 2琉历、輸入流關閉
- 3、輸出流關閉
- 4、如果是HTTP/2連接旗笔,則HTTP/2連接也要關閉彪置。
讓我們再來看下isEligible(Address, Route)方法,這個方法主要是判斷面對給出的addres和route蝇恶,這個realConnetion是否可以重用拳魁。
/**
* Returns true if this connection can carry a stream allocation to {@code address}. If non-null
* {@code route} is the resolved route for a connection.
*/
public boolean isEligible(Address address, Route route) {
// If this connection is not accepting new streams, we're done.
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// If the non-host fields of the address don't overlap, we're done.
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// If the host exactly matches, we're done: this connection can carry the address.
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
// At this point we don't have a hostname match. But we still be able to carry the request if
// our connection coalescing requirements are met. See also:
// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
// 1. This connection must be HTTP/2.
if (http2Connection == null) return false;
// 2. The routes must share an IP address. This requires us to have a DNS address for both
// hosts, which only happens after route planning. We can't coalesce connections that use a
// proxy, since proxies don't tell us the origin server's IP address.
if (route == null) return false;
if (route.proxy().type() != Proxy.Type.DIRECT) return false;
if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
if (!this.route.socketAddress().equals(route.socketAddress())) return false;
// 3. This connection's server certificate's must cover the new host.
if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
if (!supportsUrl(address.url())) return false;
// 4. Certificate pinning must match the host.
try {
address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}
return true; // The caller's address can be carried by this connection.
}
判斷邏輯如下:
- 如果連接達到共享上限,則不能重用
- 非host域必須完全一樣撮弧,如果不一樣不能重用
- 如果此時host域也相同潘懊,則符合條件,可以被復用
- 如果host不相同贿衍,在HTTP/2的域名切片場景下一樣可以復用
關于HTTP/2的可以參考下面的文章
https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
最后再來看下newCodec(OkHttpClient, StreamAllocation)方法
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
里面主要是判斷是否是HTTP/2,如果是HTTP/2則new一個Http2Codec授舟。如果不是HTTP/2則new一個Http1Codec。
上面提到了connection的跟蹤狀態(tài)由ConncetionPool來管理贸辈。
二释树、ConnectionPool
大家先來看下一個類的注釋
/**
* Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
* share the same {@link Address} may share a {@link Connection}. This class implements the policy
* of which connections to keep open for future use.
*/
簡單的翻譯下,如下:
管理http和http/2的鏈接擎淤,以便減少網(wǎng)絡請求延遲奢啥。同一個address將共享同一個connection。該類實現(xiàn)了復用連接的目標揉燃。
然后看下這個類的字段:
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
//這是一個用于清楚過期鏈接的線程池扫尺,每個線程池最多只能運行一個線程筋栋,并且這個線程池允許被垃圾回收
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
//每個address的最大空閑連接數(shù)炊汤。
private final int maxIdleConnections;
private final long keepAliveDurationNs;
//清理任務
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
//鏈接的雙向隊列
private final Deque<RealConnection> connections = new ArrayDeque<>();
//路由的數(shù)據(jù)庫
final RouteDatabase routeDatabase = new RouteDatabase();
//清理任務正在執(zhí)行的標志
boolean cleanupRunning;
來看下它的屬性,
- 1弊攘、主要就是connections抢腐,可見ConnectionPool內(nèi)部以隊列方式存儲連接;
- 2襟交、routDatabase是一個黑名單迈倍,用來記錄不可用的route,但是看代碼貌似ConnectionPool并沒有使用它捣域。所以此處不做分析啼染。
- 3、剩下的就是和清理有關了焕梅,所以executor是清理任務的線程池迹鹅,cleanupRunning是清理任務的標志,cleanupRunnable是清理任務贞言。
再來看下他的構造函數(shù)
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
//創(chuàng)建一個適用于單個應用程序的新連接池斜棚。
//該連接池的參數(shù)將在未來的okhttp中發(fā)生改變
//目前最多可容乃5個空閑的連接,存活期是5分鐘
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
//保持活著的時間,否則清理將旋轉循環(huán)
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
通過這個構造器我們知道了這個連接池最多維持5個連接弟蚀,且每個鏈接最多活5分鐘蚤霞。并且包含一個線程池包含一個清理任務。
所以maxIdleConnections和keepAliveDurationNs則是清理中淘汰連接的的指標义钉,這里需要說明的是maxIdleConnections是值每個地址上最大的空閑連接數(shù)昧绣。所以OkHttp只是限制與同一個遠程服務器的空閑連接數(shù)量,對整體的空閑連接并沒有限制捶闸。
PS:
這時候說下ConnectionPool的實例化的過程滞乙,一個OkHttpClient只包含一個ConnectionPool,其實例化也是在OkHttpClient的過程鉴嗤。這里說一下ConnectionPool各個方法的調(diào)用并沒有直接對外暴露斩启,而是通過OkHttpClient的Internal接口統(tǒng)一對外暴露。
然后我們來看下他的get和put方法
/**
* Returns a recycled connection to {@code address}, or null if no such connection exists. The
* route is null if the address has not yet been routed.
*/
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
//斷言醉锅,判斷線程是不是被自己鎖住了
assert (Thread.holdsLock(this));
// 遍歷已有連接集合
for (RealConnection connection : connections) {
//如果connection和需求中的"地址"和"路由"匹配
if (connection.isEligible(address, route)) {
//復用這個連接
streamAllocation.acquire(connection);
//返回這個連接
return connection;
}
}
return null;
}
get() 方法遍歷 connections 中的所有 RealConnection 尋找同時滿足條件的RealConnection兔簇。
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
put方法更為簡單,就是異步觸發(fā)清理任務硬耍,然后將連接添加到隊列中垄琐。那么下面開始重點分析他的清理任務。
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
這個邏輯也很簡單经柴,就是調(diào)用cleanup方法執(zhí)行清理狸窘,并等待一段時間,持續(xù)清理坯认,其中cleanup方法返回的值來來決定而等待的時間長度翻擒。那我們繼續(xù)來看下cleanup函數(shù):
/**
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
*
* <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
* -1 if no further cleanups are required.
*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
//統(tǒng)計空閑連接數(shù)量
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
//找出空閑時間最長的連接以及對應的空閑時間
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
//在符合清理條件下,清理空閑時間最長的連接
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
//不符合清理條件牛哺,則返回下次需要執(zhí)行清理的等待時間陋气,也就是此連接即將到期的時間
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
//沒有空閑的連接,則隔keepAliveDuration(分鐘)之后再次執(zhí)行
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
//清理結束
cleanupRunning = false;
return -1;
}
}
//關閉socket資源
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
//這里是在清理一個空閑時間最長的連接以后會執(zhí)行到這里引润,需要立即再次執(zhí)行清理
return 0;
}
這里的首先統(tǒng)計空閑連接數(shù)量巩趁,然后通過for循環(huán)查找最長空閑時間的連接以及對應空閑時長,然后判斷是否超出最大空閑連接數(shù)(maxIdleConnections)或者或者超過最大空閑時間(keepAliveDurationNs)淳附,滿足其一則清除最長空閑時長的連接议慰。如果不滿足清理條件,則返回一個對應等待時間奴曙。
這個對應等待的時間又分二種情況:
- 1 有連接則等待下次需要清理的時間去清理:keepAliveDurationNs-longestIdleDurationNs;
- 2 沒有空閑的連接别凹,則等下一個周期去清理:keepAliveDurationNs
如果清理完畢返回-1。
綜上所述缆毁,我們來梳理一下清理任務番川,清理任務就是異步執(zhí)行的,遵循兩個指標,最大空閑連接數(shù)量和最大空閑時長颁督,滿足其一則清理空閑時長最大的那個連接践啄,然后循環(huán)執(zhí)行,要么等待一段時間沉御,要么繼續(xù)清理下一個連接屿讽,知道清理所有連接,清理任務才結束吠裆,下一次put的時候伐谈,如果已經(jīng)停止的清理任務則會被再次觸發(fā)
/**
* Prunes any leaked allocations and then returns the number of remaining live allocations on
* {@code connection}. Allocations are leaked if the connection is tracking them but the
* application code has abandoned them. Leak detection is imprecise and relies on garbage
* collection.
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
//遍歷弱引用列表
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
//若StreamAllocation被使用則接著循環(huán)
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
//若StreamAllocation未被使用則移除引用,這邊注釋為泄露
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
//如果列表為空則說明此連接沒有被引用了试疙,則返回0诵棵,表示此連接是空閑連接
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
pruneAndGetAllocationCount主要是用來標記泄露連接的。內(nèi)部通過遍歷傳入進來的RealConnection的StreamAllocation列表祝旷,如果StreamAllocation被使用則接著遍歷下一個StreamAllocation履澳。如果StreamAllocation未被使用則從列表中移除,如果列表中為空則說明此連接連接沒有引用了怀跛,返回0距贷,表示此連接是空閑連接,否則就返回非0表示此連接是活躍連接吻谋。
接下來讓我看下ConnectionPool的connectionBecameIdle()方法忠蝗,就是當有連接空閑時,喚起cleanup線程清洗連接池
/**
* Notify this pool that {@code connection} has become idle. Returns true if the connection has
* been removed from the pool and should be closed.
*/
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
//該連接已經(jīng)不可用
if (connection.noNewStreams || maxIdleConnections == 0) {
connections.remove(connection);
return true;
} else {
//歡迎clean 線程
notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
return false;
}
}
connectionBecameIdle標示一個連接處于空閑狀態(tài)漓拾,即沒有流任務阁最,那么久需要調(diào)用該方法,由ConnectionPool來決定是否需要清理該連接晦攒。
再來看下deduplicate()方法
/**
* Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
* This recovers when multiple multiplexed connections are created concurrently.
*/
Socket deduplicate(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, null)
&& connection.isMultiplexed()
&& connection != streamAllocation.connection()) {
return streamAllocation.releaseAndAcquire(connection);
}
}
return null;
}
該方法主要是針對HTTP/2場景下多個多路復用連接清除的場景闽撤。如果是當前連接是HTTP/2得哆,那么所有指向該站點的請求都應該基于同一個TCP連接脯颜。這個方法比較簡單就不詳細說了,再說下另外一個方法
/** Close and remove all idle connections in the pool. */
public void evictAll() {
List<RealConnection> evictedConnections = new ArrayList<>();
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
if (connection.allocations.isEmpty()) {
connection.noNewStreams = true;
evictedConnections.add(connection);
i.remove();
}
}
}
for (RealConnection connection : evictedConnections) {
closeQuietly(connection.socket());
}
}
該方法是刪除所有空閑的連接贩据,比較簡單栋操,不說了
三、 StreamAllocation
這個類很重要饱亮,我們先來看下類的注釋
/**
* This class coordinates the relationship between three entities:
*
* <ul>
* <li><strong>Connections:</strong> physical socket connections to remote servers. These are
* potentially slow to establish so it is necessary to be able to cancel a connection
* currently being connected.
* <li><strong>Streams:</strong> logical HTTP request/response pairs that are layered on
* connections. Each connection has its own allocation limit, which defines how many
* concurrent streams that connection can carry. HTTP/1.x connections can carry 1 stream
* at a time, HTTP/2 typically carry multiple.
* <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and
* its follow up requests. We prefer to keep all streams of a single call on the same
* connection for better behavior and locality.
* </ul>
*
* <p>Instances of this class act on behalf of the call, using one or more streams over one or more
* connections. This class has APIs to release each of the above resources:
*
* <ul>
* <li>{@link #noNewStreams()} prevents the connection from being used for new streams in the
* future. Use this after a {@code Connection: close} header, or when the connection may be
* inconsistent.
* <li>{@link #streamFinished streamFinished()} releases the active stream from this allocation.
* Note that only one stream may be active at a given time, so it is necessary to call
* {@link #streamFinished streamFinished()} before creating a subsequent stream with {@link
* #newStream newStream()}.
* <li>{@link #release()} removes the call's hold on the connection. Note that this won't
* immediately free the connection if there is a stream still lingering. That happens when a
* call is complete but its response body has yet to be fully consumed.
* </ul>
*
* <p>This class supports {@linkplain #cancel asynchronous canceling}. This is intended to have the
* smallest blast radius possible. If an HTTP/2 stream is active, canceling will cancel that stream
* but not the other streams sharing its connection. But if the TLS handshake is still in progress
* then canceling may break the entire connection.
*/
在講解這個類的時候不得不說下背景:
HTTP的版本:
HTTP的版本從最初的1.0版本矾芙,到后續(xù)的1.1版本,再到后續(xù)的google推出的SPDY,后來再推出2.0版本近上,http協(xié)議越來越完善剔宪。(ps:okhttp也是根據(jù)2.0和1.1/1.0作為區(qū)分,實現(xiàn)了兩種連接機制)這里要說下http2.0和http1.0,1.1的主要區(qū)別,2.0解決了老版本(1.1和1.0)最重要兩個問題:連接無法復用和head of line blocking (HOL)問題.2.0使用多路復用的技術葱绒,多個stream可以共用一個socket連接感帅,每個tcp連接都是通過一個socket來完成的,socket對應一個host和port地淀,如果有多個stream(也就是多個request)都是連接在一個host和port上失球,那么它們就可以共同使用同一個socket,這樣做的好處就是可以減少TCP的一個三次握手的時間。在OKHttp里面帮毁,記錄一次連接的是RealConnection实苞,這個負責連接,在這個類里面用socket來連接烈疚,用HandShake來處理握手黔牵。
在講解這個類的之前我們先熟悉3個概念:請求、連接爷肝、流荧止。我們要明白HTTP通信執(zhí)行網(wǎng)絡"請求"需要在"連接"上建立一個新的"流",我們將StreamAllocation稱之流的橋梁,它負責為一次"請求"尋找"連接"并建立"流"阶剑,從而完成遠程通信跃巡。所以說StreamAllocation與"請求"、"連接"牧愁、"流"都有關素邪。
從注釋我們看到。Connection是建立在Socket之上的物流通信信道猪半,而Stream則是代表邏輯的流兔朦,至于Call是對一次請求過程的封裝。之前也說過一個Call可能會涉及多個流(比如重定向或者auth認證等情況)磨确。所以我們想一下沽甥,如果StreamAllocation要想解決上述問題,需要兩個步驟乏奥,一是尋找連接摆舟,二是獲取流。所以StreamAllocation里面應該包含一個Stream(上文已經(jīng)說到了邓了,OKHttp里面的流是HttpCodec)恨诱;還應該包含連接Connection。如果想找到合適的劉姐骗炉,還需要一個連接池ConnectionPool屬性照宝。所以應該有一個獲取流的方法在StreamAllocation里面是newStream();找到合適的流的方法findConnection()句葵;還應該有完成請求任務的之后finish()的方法來關閉流對象厕鹃,還有終止和取消等方法兢仰,以及釋放資源的方法。
1剂碴、那咱們先就看下他的屬性
public final Address address;//地址
private Route route; //路由
private final ConnectionPool connectionPool; //連接池
private final Object callStackTrace; //日志
// State guarded by connectionPool.
private final RouteSelector routeSelector; //路由選擇器
private int refusedStreamCount; //拒絕的次數(shù)
private RealConnection connection; //連接
private boolean released; //是否已經(jīng)被釋放
private boolean canceled //是否被取消了
看完屬性旨别,我們來看下構造函數(shù)
public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
this.connectionPool = connectionPool;
this.address = address;
this.routeSelector = new RouteSelector(address, routeDatabase());
this.callStackTrace = callStackTrace;
}
這時候我們再來看下他的一個比較重要的方法
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//獲取一個連接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
//實例化HttpCodec,如果是HTTP/2則是Http2Codec否則是Http1Codec
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
這里面兩個重要方法
1是通過findHealthyConnection獲取一個連接、2是通過resultConnection.newCodec獲取流汗茄。
我們接著來看findHealthyConnection()方法
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
我們看到里面調(diào)用findConnection來獲取一個RealConnection秸弛,然后通過RealConnection自己的方法isHealthy,去判斷是否是健康的連接洪碳,如果是健康的連接递览,則重用,否則就繼續(xù)查找瞳腌。那我們繼續(xù)看下findConnection()方法
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
//獲取存在的連接
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
// 如果已經(jīng)存在的連接滿足要求绞铃,則使用已存在的連接
return allocatedConnection;
}
//從緩存中去取
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// 線路的選擇,多ip的支持
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
//里面是個遞歸
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
//更換路由再次嘗試
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
// 以上都不符合嫂侍,創(chuàng)建一個連接
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
}
//連接并握手
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
//更新本地數(shù)據(jù)庫
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
//把連接放到連接池中
Internal.instance.put(connectionPool, result);
//如果這個連接是多路復用
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
//調(diào)用connectionPool的deduplicate方法去重儿捧。
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
//如果是重復的socket則關閉socket,不是則socket為nul挑宠,什么也不做
closeQuietly(socket);
//返回整個連接
return result;
}
上面代碼大概的邏輯是:
- 1菲盾、先找是否有已經(jīng)存在的連接,如果有已經(jīng)存在的連接各淀,并且可以使用(!noNewStreams)則直接返回懒鉴。
- 2、根據(jù)已知的address在connectionPool里面找碎浇,如果有連接临谱,則返回
- 3、更換路由奴璃,更換線路悉默,在connectionPool里面再次查找,如果有則返回苟穆。
- 4抄课、如果以上條件都不滿足則直接new一個RealConnection出來
- 5、new出來的RealConnection通過acquire關聯(lián)到connection.allocations上
- 6鞭缭、做去重判斷剖膳,如果有重復的socket則關閉
里面涉及到的RealConnection的connect()方法,我們已經(jīng)在RealConnection里面講過岭辣,這里就不講了。不過這里說下acquire()方法
/**
* Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
* {@link #release} on the same connection.
*/
public void acquire(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
這里相當于給connection的引用計數(shù)器加1
這里說下StreamAllocationReference甸饱,StreamAllocationReference其實是弱引用的子類沦童。具體代碼如下:
public static final class StreamAllocationReference extends WeakReference<StreamAllocation> {
/**
* Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
* identifying the origin of connection leaks.
*/
public final Object callStackTrace;
StreamAllocationReference(StreamAllocation referent, Object callStackTrace) {
super(referent);
this.callStackTrace = callStackTrace;
}
}
下面來看下他的他的其他方法streamFinished(boolean, HttpCodec)仑濒、release(RealConnection)和deallocate(boolean, boolean, boolean)方法。
public void streamFinished(boolean noNewStreams, HttpCodec codec) {
Socket socket;
synchronized (connectionPool) {
if (codec == null || codec != this.codec) {
throw new IllegalStateException("expected " + this.codec + " but was " + codec);
}
if (!noNewStreams) {
connection.successCount++;
}
socket = deallocate(noNewStreams, false, true);
}
closeQuietly(socket);
}
/**
* Releases resources held by this allocation. If sufficient resources are allocated, the
* connection will be detached or closed. Callers must be synchronized on the connection pool.
*
* <p>Returns a closeable that the caller should pass to {@link Util#closeQuietly} upon completion
* of the synchronized block. (We don't do I/O while synchronized on the connection pool.)
*/
private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
assert (Thread.holdsLock(connectionPool));
if (streamFinished) {
this.codec = null;
}
if (released) {
this.released = true;
}
Socket socket = null;
if (connection != null) {
if (noNewStreams) {
connection.noNewStreams = true;
}
if (this.codec == null && (this.released || connection.noNewStreams)) {
release(connection);
if (connection.allocations.isEmpty()) {
connection.idleAtNanos = System.nanoTime();
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
socket = connection.socket();
}
}
connection = null;
}
}
return socket;
}
/** Remove this allocation from the connection's list of allocations. */
private void release(RealConnection connection) {
for (int i = 0, size = connection.allocations.size(); i < size; i++) {
Reference<StreamAllocation> reference = connection.allocations.get(i);
if (reference.get() == this) {
connection.allocations.remove(i);
return;
}
}
throw new IllegalStateException();
}
其中deallocate(boolean, boolean, boolean)和release(RealConnection)方法都是private偷遗,而且均在streamFinished里面調(diào)用墩瞳。
release(RealConnection)方法比較簡單,主要是把RealConnection對應的allocations清除掉氏豌,把計數(shù)器歸零喉酌。
deallocate(boolean, boolean, boolean)方法也簡單,根據(jù)傳入的三個布爾類型的值進行操作泵喘,如果streamFinished為true則代表關閉流泪电,所以要通過連接池connectionPool把這個connection設置空閑連接,如果可以設為空閑連接則返回這個socket纪铺。不能則返回null相速。
streamFinished()主要做了一些異常判斷,然后調(diào)用deallocate()方法
綜上所述:streamFinished(boolean, HttpCodec)主要是關閉流鲜锚,release(RealConnection)主要是釋放connection的引用,deallocate(boolean, boolean, boolean)主要是根據(jù)參數(shù)做一些設置突诬。
上面說到了release(RealConnection),為了防止大家混淆概念芜繁,這里說一下另外一個方法release()這個是無參的方法旺隙。
public void release() {
Socket socket;
synchronized (connectionPool) {
socket = deallocate(false, true, false);
}
closeQuietly(socket);
}
注意這個和上面的帶有RealConnection的參數(shù)release()的區(qū)別。
然后說一下noNewStreams()方法骏令,主要是設置防止別人在這個連接上開新的流催束。
/** Forbid new streams from being created on the connection that hosts this allocation. */
public void noNewStreams() {
Socket socket;
synchronized (connectionPool) {
socket = deallocate(true, false, false);
}
closeQuietly(socket);
}
還有一個方法,平時也是經(jīng)常有遇到的就是cancel()方法
public void cancel() {
HttpCodec codecToCancel;
RealConnection connectionToCancel;
synchronized (connectionPool) {
canceled = true;
codecToCancel = codec;
connectionToCancel = connection;
}
if (codecToCancel != null) {
codecToCancel.cancel();
} else if (connectionToCancel != null) {
connectionToCancel.cancel();
}
}
其實也比較簡單的就是調(diào)用RealConnection的Cancel方法伏社。
如果在連接中過程出現(xiàn)異常抠刺,會調(diào)用streamFailed(IOException)方法
public void streamFailed(IOException e) {
Socket socket;
boolean noNewStreams = false;
synchronized (connectionPool) {
if (e instanceof StreamResetException) {
StreamResetException streamResetException = (StreamResetException) e;
if (streamResetException.errorCode == ErrorCode.REFUSED_STREAM) {
refusedStreamCount++;
}
// On HTTP/2 stream errors, retry REFUSED_STREAM errors once on the same connection. All
// other errors must be retried on a new connection.
if (streamResetException.errorCode != ErrorCode.REFUSED_STREAM || refusedStreamCount > 1) {
noNewStreams = true;
route = null;
}
} else if (connection != null
&& (!connection.isMultiplexed() || e instanceof ConnectionShutdownException)) {
noNewStreams = true;
// If this route hasn't completed a call, avoid it for new connections.
if (connection.successCount == 0) {
if (route != null && e != null) {
routeSelector.connectFailed(route, e);
}
route = null;
}
}
socket = deallocate(noNewStreams, false, true);
}
closeQuietly(socket);
}
根據(jù)異常類型來采取不同的應對措施。注釋已經(jīng)比較清楚了摘昌,就不細說了速妖。
其他的方法比較簡單,我這里就不細說了聪黎。