Okhttp的淺層架構(gòu)分析
Okhttp的責(zé)任鏈模式和攔截器分析
Okhttp之RetryAndFollowUpInterceptor攔截器分析
Okhttp之BridgeInterceptor攔截器分析
Okhttp之CacheInterceptor攔截器分析
Okhttp之ConnectInterceptor攔截器分析
Okhttp之網(wǎng)絡(luò)連接相關(guān)三大類RealConnection乌询、ConnectionPool、StreamAllocation
Okhttp之CallServerInterceptor攔截器分析
淺析okio的架構(gòu)和源碼實現(xiàn)
隨著okhttp的不斷深入豌研,已經(jīng)開始接觸到了難啃的內(nèi)容妹田,即核心連接通信邏輯唬党,下面就來扒一扒它是怎么設(shè)計和構(gòu)造的。開始之前先來看看http2.0多路復(fù)用的概念秆麸。
HTTP 1.1 默認啟用長TCP連接初嘹,但所有的請求-響應(yīng)都是按序進行的(這里的長連接可理解成半雙工協(xié)議。即便是HTTP1.1引入了管道機制沮趣,也是如此)屯烦。復(fù)用同一個TCP連接期間,即便是通過管道同時發(fā)送了多個請求房铭,服務(wù)端也是按請求的順序依次給出響應(yīng)的驻龟;而客戶端在未收到之前所發(fā)出所有請求的響應(yīng)之前,將會阻塞后面的請求(排隊等待)缸匪,這稱為"隊頭堵塞"(Head-of-line blocking)翁狐。
HTTP2.0復(fù)用TCP連接則不同,雖然依然遵循請求-響應(yīng)模式凌蔬,但客戶端發(fā)送多個請求和服務(wù)端給出多個響應(yīng)的順序不受限制露懒,這樣既避免了"隊頭堵塞",又能更快獲取響應(yīng)砂心。在復(fù)用同一個TCP連接時懈词,服務(wù)器同時(或先后)收到了A、B兩個請求辩诞,先回應(yīng)A請求坎弯,但由于處理過程非常耗時,于是就發(fā)送A請求已經(jīng)處理好的部分译暂, 接著回應(yīng)B請求抠忘,完成后,再發(fā)送A請求剩下的部分外永。HTTP2.0長連接可以理解成全雙工的協(xié)議崎脉。
HTTP2.0 使用 多路復(fù)用 的技術(shù),多個 stream 可以共用一個 socket 連接象迎。每個 tcp連接都是通過一個 socket 來完成的荧嵌,socket 對應(yīng)一個 host 和 port,如果有多個stream(即多個 Request) 都是連接在一個 host 和 port上砾淌,那么它們就可以共同使用同一個 socket ,這樣做的好處就是 可以減少TCP的一個三次握手的時間啦撮。
在OKHttp里面,負責(zé)連接的是 RealConnection 汪厨。
一赃春、RealConnection
RealConnection是Connection的實現(xiàn)類,代表著鏈接socket的鏈路劫乱,如果擁有了一個RealConnection就代表了我們已經(jīng)跟服務(wù)器有了一條通信鏈路织中。與服務(wù)的三次握手也是在這里實現(xiàn)的锥涕。下面看看它的屬性和構(gòu)造函數(shù)。
public final class RealConnection extends Http2Connection.Listener implements Connection {
private static final String NPE_THROW_WITH_NULL = "throw with null exception";
private static final int MAX_TUNNEL_ATTEMPTS = 21;
private final ConnectionPool connectionPool;//連接池
private final Route route;//路由
// The fields below are initialized by connect() and never reassigned.
//下面這些字段狭吼,通過connect()方法開始初始化层坠,并且絕對不會再次賦值
/** The low-level TCP socket. */
private Socket rawSocket; //底層socket
/**
* The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
* {@link #rawSocket} itself if this connection does not use SSL.
*/
private Socket socket; //應(yīng)用層socket
//握手
private Handshake handshake;
//協(xié)議
private Protocol protocol;
// http2的鏈接
private Http2Connection http2Connection;
//通過okio的source和sink,大家可以猜到是與服務(wù)器交互的輸入輸出流
private BufferedSource source;
private BufferedSink sink;
// The fields below track connection state and are guarded by connectionPool.
//下面這個字段是 屬于表示鏈接狀態(tài)的字段刁笙,并且有connectPool統(tǒng)一管理
/** If true, no new streams can be created on this connection. Once true this is always true. */
//如果noNewStreams被設(shè)為true破花,則noNewStreams一直為true,不會被改變疲吸,并且表示這個鏈接不會再創(chuàng)新的stream流
public boolean noNewStreams;
//成功的次數(shù)
public int successCount;
/**
* The maximum number of concurrent streams that can be carried by this connection. If {@code
* allocations.size() < allocationLimit} then new streams can be created on this connection.
*/
//此鏈接可以承載最大并發(fā)流的限制座每,如果不超過限制,可以隨意增加
public int allocationLimit = 1;
/** Current streams carried by this connection. */
//此鏈接當前攜帶的流
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
/** Nanotime timestamp when {@code allocations.size()} reached zero. */
public long idleAtNanos = Long.MAX_VALUE;
//構(gòu)造方法摘悴,傳入連接池和路由
public RealConnection(ConnectionPool connectionPool, Route route) {
this.connectionPool = connectionPool;
this.route = route;
}
...
}
下面看看核心方法connect():
public void connect( int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
if (protocol != null) throw new IllegalStateException("already connected");
// 線路的選擇
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
// 連接開始,注意這是個死循環(huán)峭梳,創(chuàng)建連接成功才會跳出
while (true) {
try {
// 如果要求隧道模式,建立通道連接蹂喻,通常不是這種
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
// 一般都走這條邏輯了葱椭,實際上很簡單就是socket的連接
connectSocket(connectTimeout, readTimeout);
}
// 建立協(xié)議,構(gòu)造讀寫橋梁口四,很重要的方法
establishProtocol(connectionSpecSelector);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
connectTunnel()隧道鏈接
/**
* Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
* proxy server can issue an auth challenge and then close the connection.
* 是否通過代理隧道建立HTTPS連接的所有工作挫以。 這里的問題是代理服務(wù)器可以發(fā)出一個驗證質(zhì)詢,然后關(guān)閉連接窃祝。
*/
private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
EventListener eventListener) throws IOException {
//1-構(gòu)造一個 建立隧道連接 請求。
Request tunnelRequest = createTunnelRequest();
HttpUrl url = tunnelRequest.url();
for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
//2 與HTTP代理服務(wù)器建立TCP連接踱侣。
connectSocket(connectTimeout, readTimeout, call, eventListener);
//3 創(chuàng)建隧道粪小。這主要是將 建立隧道連接 請求發(fā)送給HTTP代理服務(wù)器,并處理它的響應(yīng)
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
if (tunnelRequest == null) break; // Tunnel successfully created.
// The proxy decided to close the connection after an auth challenge. We need to create a new
// connection, but this time with the auth credentials.
closeQuietly(rawSocket);
rawSocket = null;
sink = null;
source = null;
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
//重復(fù)上面的第2和第3步抡句,直到建立好了隧道連接探膊。
}
}
最終還是要調(diào)用到connectSocket():
/**
* Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
* 完成在原始套接字上構(gòu)建完整的HTTP或HTTPS連接所需的所有工作。
*/
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
//如果是直連或者明文的HTTP代理模式則調(diào)用address.socketFactory().createSocket()
//是普通的創(chuàng)建new Socket(host, port, clientAddress, clientPort);否則用代理
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
//監(jiān)聽回調(diào)
eventListener.connectStart(call, route.socketAddress(), proxy);
//設(shè)置socket讀數(shù)據(jù)超時時間
rawSocket.setSoTimeout(readTimeout);
try {
//建立Socket連接待榔,實際調(diào)用的就是socket.connect(address, connectTimeout);
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
//okio 拿到輸入流逞壁,最終的目的就是建立了管道流
source = Okio.buffer(Okio.source(rawSocket));
// 輸出流
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
public class Platform {
public void connectSocket(Socket socket, InetSocketAddress address,
int connectTimeout) throws IOException {
//最終調(diào)用java的connect
socket.connect(address, connectTimeout);
}
}
總結(jié)連接的流程:
1、創(chuàng)建Socket锐锣,非SOCKS代理的情況下腌闯,通過SocketFactory創(chuàng)建;在SOCKS代理則傳入proxy手動new一個出來雕憔。
2姿骏、為Socket設(shè)置超時
3、完成特定于平臺的連接建立
4斤彼、創(chuàng)建用于I/O的source和sink
至于代理的相關(guān)邏輯分瘦,這里暫時就不深究了蘸泻,后續(xù)會再單獨去了解。
再看看establishProtocol()方法嘲玫,這也是核心方法悦施,正是在這里這里把讀寫操作架設(shè)好的:
private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
// 一些協(xié)議的設(shè)置,主要方法我們看http2.0的startHttp2()方法
if (route.address().sslSocketFactory() == null) {
//如果當前協(xié)議包含了HTTP_2去团,OKhttp就會開啟Http2.0模式抡诞,主要取決于服務(wù)器
if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
socket = rawSocket;
protocol = Protocol.H2_PRIOR_KNOWLEDGE;
startHttp2(pingIntervalMillis);
return;
}
//否則降級為1.1
socket = rawSocket;
protocol = Protocol.HTTP_1_1;
return;
}
eventListener.secureConnectStart(call);
//獲取證書
connectTls(connectionSpecSelector);
eventListener.secureConnectEnd(call, handshake);
//在connectTls()方法里明確了是2.0模式
if (protocol == Protocol.HTTP_2) {
startHttp2(pingIntervalMillis);
}
}
//開啟了http2Connection 線程,這是個流讀寫的線程渗勘,就是服務(wù)器客戶端直接的通道交互
private void startHttp2(int pingIntervalMillis) throws IOException {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.listener(this)
.pingIntervalMillis(pingIntervalMillis)
.build();
http2Connection.start();
}
繼續(xù)看http2Connection.start()方法:
/**
* Sends any initial frames and starts reading frames from the remote peer. This should be called
* after {@link Builder#build} for all new connections.
*/
public void start() throws IOException {
start(true);
}
/**
* @param sendConnectionPreface true to send connection preface frames. This should always be true
* except for in tests that don't check for a connection preface.
*/
void start(boolean sendConnectionPreface) throws IOException {
//這部分代碼暫不深究
if (sendConnectionPreface) {
writer.connectionPreface();
writer.settings(okHttpSettings);
int windowSize = okHttpSettings.getInitialWindowSize();
if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
}
}
//實際是調(diào)用了這個readerRunnable封裝的線程start()方法
new Thread(readerRunnable).start(); // Not a daemon thread.
}
看看這個readerRunnable
//前面分析過這個NamedRunnable就是繼承了runnable的接口沐绒,run()方法里面有個execute()抽象方法
//所以最終還是執(zhí)行了execute()
class ReaderRunnable extends NamedRunnable implements Http2Reader.Handler {
final Http2Reader reader;
ReaderRunnable(Http2Reader reader) {
super("OkHttp %s", hostname);
this.reader = reader;
}
@Override
protected void execute() {
ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
try {
reader.readConnectionPreface(this);
//重點是這個nextFrame()方法,循環(huán)執(zhí)行旺坠,看起來應(yīng)該是個讀的操作
while (reader.nextFrame(false, this)) {
}
connectionErrorCode = ErrorCode.NO_ERROR;
streamErrorCode = ErrorCode.CANCEL;
} catch (IOException e) {
connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
streamErrorCode = ErrorCode.PROTOCOL_ERROR;
} finally {
try {
close(connectionErrorCode, streamErrorCode);
} catch (IOException ignored) {
}
Util.closeQuietly(reader);
}
}
...
}
在此重新梳理一下:
從Realconnection調(diào)用connect()創(chuàng)建了socket連接之后(這里討論走http2.0協(xié)議分支)乔遮,創(chuàng)建了一個http2Connection 對象,啟用了一個readerRunnable的線程取刃,run()方法的主要工作是循環(huán)地執(zhí)行reader.nextFrame()方法蹋肮。
看看reader.nextFrame()干了啥:
public boolean nextFrame(boolean requireSettings, Handler handler) throws IOException {
//不停的在讀數(shù)據(jù)幀,直到流關(guān)閉(發(fā)生IOException )返回false
//數(shù)據(jù)幀是個什么概念我也不是很清楚璧疗。坯辩。。立個flag
try {
source.require(9); // Frame header size
} catch (IOException e) {
return false; // This might be a normal socket close.
}
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Length (24) |
// +---------------+---------------+---------------+
// | Type (8) | Flags (8) |
// +-+-+-----------+---------------+-------------------------------+
// |R| Stream Identifier (31) |
// +=+=============================================================+
// | Frame Payload (0...) ...
// +---------------------------------------------------------------+
int length = readMedium(source);
if (length < 0 || length > INITIAL_MAX_FRAME_SIZE) {
throw ioException("FRAME_SIZE_ERROR: %s", length);
}
byte type = (byte) (source.readByte() & 0xff);
if (requireSettings && type != TYPE_SETTINGS) {
throw ioException("Expected a SETTINGS frame but was %s", type);
}
byte flags = (byte) (source.readByte() & 0xff);
//streamId 很重要崩侠,用來追蹤是哪次請求流的
//Map<Integer, Http2Stream> streams ,Http2Connection里維護的一個map漆魔,用來保存各個請求流
int streamId = (source.readInt() & 0x7fffffff); // Ignore reserved bit.
if (logger.isLoggable(FINE)) logger.fine(frameLog(true, streamId, length, type, flags));
//根據(jù)數(shù)據(jù)類型,data却音,header改抡,priopity等等,雖然我不是很懂但是大概理解了
//這個循環(huán)讀操作就是等著我們的request發(fā)過服務(wù)器后系瓢,服務(wù)器返回應(yīng)答后我們在循環(huán)
//讀取的一個動作阿纤,這個方法的理解,會在后面的CallServerInterceptor攔截器里讓你
//醍醐灌頂,特別是那個readHeaders(handler, length, flags, streamId)方法
switch (type) {
case TYPE_DATA:
readData(handler, length, flags, streamId);
break;
case TYPE_HEADERS:
readHeaders(handler, length, flags, streamId);
break;
case TYPE_PRIORITY:
readPriority(handler, length, flags, streamId);
break;
case TYPE_RST_STREAM:
readRstStream(handler, length, flags, streamId);
break;
case TYPE_SETTINGS:
readSettings(handler, length, flags, streamId);
break;
case TYPE_PUSH_PROMISE:
readPushPromise(handler, length, flags, streamId);
break;
case TYPE_PING:
readPing(handler, length, flags, streamId);
break;
case TYPE_GOAWAY:
readGoAway(handler, length, flags, streamId);
break;
case TYPE_WINDOW_UPDATE:
readWindowUpdate(handler, length, flags, streamId);
break;
default:
// Implementations MUST discard frames that have unknown or unsupported types.
source.skip(length);
}
return true;
}
再來重點看看 readHeaders(handler, length, flags, streamId)方法夷陋,因為在后面的CallServerInterceptor攔截器會追蹤到欠拾,提前了解一下,是怎么讀取response的headers的:
private void readHeaders(Handler handler, int length, byte flags, int streamId)
throws IOException {
//streamId 很重要骗绕,用來追蹤是哪次請求的流
if (streamId == 0) throw ioException("PROTOCOL_ERROR: TYPE_HEADERS streamId == 0");
boolean endStream = (flags & FLAG_END_STREAM) != 0;
short padding = (flags & FLAG_PADDED) != 0 ? (short) (source.readByte() & 0xff) : 0;
if ((flags & FLAG_PRIORITY) != 0) {
readPriority(handler, streamId);
length -= 5; // account for above read.
}
length = lengthWithoutPadding(length, flags, padding);
List<Header> headerBlock = readHeaderBlock(length, padding, flags, streamId);
//重點方法藐窄,繼續(xù)追蹤
handler.headers(endStream, streamId, -1, headerBlock);
}
@Override
public void headers(boolean inFinished, int streamId, int associatedStreamId,
List<Header> headerBlock) {
if (pushedStream(streamId)) {
pushHeadersLater(streamId, headerBlock, inFinished);
return;
}
Http2Stream stream;
synchronized (Http2Connection.this) {
//通過id拿到stream
stream = getStream(streamId);
if (stream == null) {
// If we're shutdown, don't bother with this stream.
if (shutdown) return;
// If the stream ID is less than the last created ID, assume it's already closed.
if (streamId <= lastGoodStreamId) return;
// If the stream ID is in the client's namespace, assume it's already closed.
if (streamId % 2 == nextStreamId % 2) return;
// Create a stream.
Headers headers = Util.toHeaders(headerBlock);
final Http2Stream newStream = new Http2Stream(streamId, Http2Connection.this,
false, inFinished, headers);
lastGoodStreamId = streamId;
streams.put(streamId, newStream);
listenerExecutor.execute(new NamedRunnable("OkHttp %s stream %d", hostname, streamId) {
@Override public void execute() {
try {
listener.onStream(newStream);
} catch (IOException e) {
Platform.get().log(INFO, "Http2Connection.Listener failure for " + hostname, e);
try {
newStream.close(ErrorCode.PROTOCOL_ERROR);
} catch (IOException ignored) {
}
}
}
});
return;
}
}
// Update an existing stream.
//更新一個存在的流信息,看下去這個方法
stream.receiveHeaders(headerBlock);
if (inFinished) stream.receiveFin();
}
/**
* Accept headers from the network and store them until the client calls {@link #takeHeaders}, or
* {@link FramingSource#read} them.
*/
void receiveHeaders(List<Header> headers) {
assert (!Thread.holdsLock(Http2Stream.this));
boolean open;
synchronized (this) {
hasResponseHeaders = true;
//headersQueue是一個雙端隊列酬土,用它來接受保存返回的headers信息
headersQueue.add(Util.toHeaders(headers));
open = isOpen();
//配合wait方法枷邪,實際是喚醒takeHeaders()方法里的wait,告訴它讀到了headers
notifyAll();
}
if (!open) {
connection.removeStream(id);
}
}
//取headers 這個也是stream的方法,stream是什么時候創(chuàng)建的呢东揣,下篇講CallServerInterceptor的時候會講到
public synchronized Headers takeHeaders() throws IOException {
readTimeout.enter();
try {
//headersQueue為空的時候一直等待
while (headersQueue.isEmpty() && errorCode == null) {
waitForIo();
}
} finally {
readTimeout.exitAndThrowIfTimedOut();
}
//被喚醒后返回header
if (!headersQueue.isEmpty()) {
return headersQueue.removeFirst();
}
throw new StreamResetException(errorCode);
}
二践惑、ConnectionPool
鏈接池,看名字就能聯(lián)想到線程池之類的池設(shè)計嘶卧,都是為了減少資源創(chuàng)建,提高資源復(fù)用率而設(shè)計的芥吟。連接池是用來管理http和http/2的鏈接復(fù)用,通過讓同一個address將共享同一個connection贡未,以便減少網(wǎng)絡(luò)請求延遲狠怨。
成員變量和構(gòu)造函數(shù):
public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
//這是一個用于清除過期鏈接的線程池孽鸡,每個線程池最多只能運行一個線程灵奖,并且這個線程池允許被垃圾回收
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
//每個address的最大空閑連接數(shù)箫踩。
private final int maxIdleConnections;
private final long keepAliveDurationNs;
//清理過期鏈接任務(wù)
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
//清除方法
long waitNanos = cleanup(System.nanoTime());
//-1代表沒有鏈接猿规,直接退出
if (waitNanos == -1) return;
//等待
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
//這是object頂級父類的方法张遭,大部分時間清理線程會在這里等待,這里會配合connectionBecameIdle()
//里的notifyAll方法被喚醒來使用聘裁,詳見 http://www.reibang.com/p/c518f9c07a80
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
//鏈接的雙向隊列,在這里存儲可復(fù)用的鏈接
private final Deque<RealConnection> connections = new ArrayDeque<>();
//路由的數(shù)據(jù)庫
final RouteDatabase routeDatabase = new RouteDatabase();
//清理任務(wù)正在執(zhí)行的標志
boolean cleanupRunning;
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
//創(chuàng)建一個適用于單個應(yīng)用程序的新連接池善镰。
//該連接池的參數(shù)將在未來的okhttp中發(fā)生改變
//目前最多可容乃5個空閑的連接,存活期是5分鐘
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
//保持活著的時間转晰,否則清理將旋轉(zhuǎn)循環(huán)
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
...
}
先搞明白那個清除cleanup(long now)方法:
//對該池執(zhí)行維護,如果它超出保持活動限制或空閑連接限制士飒,則驅(qū)逐空閑時間最長的連接查邢。
//返回納秒級的睡眠持續(xù)時間,直到下次預(yù)定調(diào)用此方法為止变汪。 如果不需要進一步清理,則返回-1蚁趁。
long cleanup(long now) {
//在用的鏈接數(shù)
int inUseConnectionCount = 0;
//閑置的鏈接數(shù)
int idleConnectionCount = 0;
//閑置最久的鏈接
RealConnection longestIdleConnection = null;
//閑置最久的時間
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
//遍歷所有的連接裙盾,標記處不活躍的連接。
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
//1. 查詢此連接內(nèi)部的StreanAllocation的引用數(shù)量,大于0則跳過這個連接
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
//2. 標記閑置最久的空閑連接
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
//遍歷完后
//3. 如果空閑連接超過5個或者keepalive時間大于5分鐘番官,則將該連接清理掉庐完。
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
//longestIdleConnection此時必然不為空
//只有這個分支才會清理連接,清理后需要關(guān)閉鏈接徘熔,最終return 0
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
//這種情況是只是有空閑的鏈接但是還沒有超過5個且閑置時間還沒到5分鐘
//4. 返回此連接的到期時間门躯,供下次進行清理。
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
//5. 全部都是活躍連接酷师,5分鐘時候再進行清理讶凉。
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
//6. 沒有任何連接,跳出循環(huán)山孔。
cleanupRunning = false;
return -1;
}
}
//7. 關(guān)閉連接懂讯,返回時間0,立即再次進行清理台颠。
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
pruneAndGetAllocationCount()方法:
/**
* Prunes any leaked allocations and then returns the number of remaining live allocations on
* {@code connection}. Allocations are leaked if the connection is tracking them but the
* application code has abandoned them. Leak detection is imprecise and relies on garbage
* collection.
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
//遍歷弱引用列表
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
//若StreamAllocation被使用則接著循環(huán)
if (reference.get() != null) {
i++;
continue;
}
//到了這里表示這個connect沒有被引用了褐望,但是卻還存在,說明沒有關(guān)閉流操作而導(dǎo)致了內(nèi)存泄露
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
//若StreamAllocation未被使用則移除引用串前,這邊注釋為泄露
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
//如果列表為空則說明此連接沒有被引用了瘫里,則返回0,表示此連接是空閑連接
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
//走到這說明鏈接被引用
return references.size();
}
再來看看清理的任務(wù)是什么時候執(zhí)行的:
//加入一個鏈接到這個鏈接池
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//cleanupRunning 這個變量是在上面的cleanup()方法里控制的荡碾,這個變量保證了同時間只會有一個清理任務(wù)在跑
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
總結(jié)清理閑置鏈接機制:
- 利用一個線程池在不斷監(jiān)控當前的閑置鏈接數(shù)量和鏈接閑置的時長谨读,當數(shù)量和時長出現(xiàn)超載的情況的時候就會執(zhí)行清除動作。
- 每當往連接池加入一個鏈接的時候玩荠,會根據(jù)當前是否有清理線程來決定是否開啟一個新的清理線程漆腌,保證始終只有一個清理線程任務(wù)在跑缠沈。
下面ConnectionPool的實例化的過程佳晶,一個OkHttpClient只包含一個ConnectionPool给猾,其實例化是在OkHttpClient的實例化過程中進行的锥累。ConnectionPool各個方法的調(diào)用并沒有直接對外暴露淤齐,而是通過OkHttpClient的Internal接口統(tǒng)一對外暴露螟左。
Internal.instance = new Internal() {
...
@Override public boolean connectionBecameIdle(
ConnectionPool pool, RealConnection connection) {
return pool.connectionBecameIdle(connection);
}
@Override public RealConnection get(ConnectionPool pool, Address address,
StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
@Override public Socket deduplicate(
ConnectionPool pool, Address address, StreamAllocation streamAllocation) {
return pool.deduplicate(address, streamAllocation);
}
@Override public void put(ConnectionPool pool, RealConnection connection) {
pool.put(connection);
}
@Override public RouteDatabase routeDatabase(ConnectionPool connectionPool) {
return connectionPool.routeDatabase;
}
...
}
再來看看ConnectionPool的其他方法憨愉,看明白了這些方法也就大概了解了它的工作流程艰额,get()方法:
/**
* Returns a recycled connection to {@code address}, or null if no such connection exists. The route is null if the address has not yet been routed.
* 返回一個可復(fù)用鏈接匆骗,如果還沒有被創(chuàng)建則為null
*/
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
//斷言劳景,判斷線程是不是被自己鎖住了
assert (Thread.holdsLock(this));
// 遍歷已有連接集合
for (RealConnection connection : connections) {
//如果connection和需求中的"地址"和"路由"匹配
if (connection.isEligible(address, route)) {
//復(fù)用這個連接
streamAllocation.acquire(connection);
//返回這個連接
return connection;
}
}
return null;
}
connectionBecameIdle()方法:
/**
* Notify this pool that {@code connection} has become idle. Returns true if the connection has
* been removed from the pool and should be closed.
*/
//手動把一個connection置為閑置狀態(tài),一般是外部主動調(diào)用
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
//該連接已經(jīng)不可用可直接移除
if (connection.noNewStreams || maxIdleConnections == 0) {
connections.remove(connection);
return true;
} else {
//喚醒cleanupRunnable 線程來清理他
notifyAll();
return false;
}
}
deduplicate()方法:
/**
* Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
* This recovers when multiple multiplexed connections are created concurrently.
*/
//如果可能碉就,用共享連接替換 {@code streamAllocation} 持有的連接盟广。
// 當同時創(chuàng)建多個多路復(fù)用連接時會恢復(fù)。
//該方法主要是針對多路復(fù)用連接清除的場景瓮钥。如果是當前連接是HTTP/2筋量,那么所有指向該站點的請求都應(yīng)該基于同一個TCP連接烹吵。
Socket deduplicate(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
//遍歷當前的connection
for (RealConnection connection : connections) {
//connection 能讓address復(fù)用&&connection支持多路復(fù)用(http2.0就能支持)&&
//connection 不等于streamAllocation持有的connection
if (connection.isEligible(address, null)
&& connection.isMultiplexed()
&& connection != streamAllocation.connection()) {
return streamAllocation.releaseAndAcquire(connection);
}
}
return null;
}
evictAll()方法:
/** Close and remove all idle connections in the pool. */
//關(guān)閉并移除連接池里的所有空閑鏈接
//這應(yīng)該也是一個外部主動調(diào)用的方法
public void evictAll() {
List<RealConnection> evictedConnections = new ArrayList<>();
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
if (connection.allocations.isEmpty()) {
connection.noNewStreams = true;
evictedConnections.add(connection);
i.remove();
}
}
}
for (RealConnection connection : evictedConnections) {
closeQuietly(connection.socket());
}
}
總結(jié):
ConnectionPool的主要職責(zé)就是維護了一個RealConnection的雙端隊列,并且維護了一個定時清理空閑和多余connection的線程池桨武,并提供了一些相應(yīng)的操作方法來維護連接池的穩(wěn)定性和提供相應(yīng)的功能肋拔。
三、StreamAllocation
流分配呀酸,Connection是建立在Socket之上的物理通信信道凉蜂,而Stream則是代表邏輯的流,至于Call是對一次請求過程的封裝性誉。之前也說過一個Call可能會涉及多個流(比如重定向或者auth認證等情況)窿吩。所以我們想一下,如果StreamAllocation要想解決上述問題艾栋,需要兩個步驟爆存,一是尋找連接,二是獲取流蝗砾。所以StreamAllocation里面應(yīng)該包含一個Stream先较;還應(yīng)該包含連接Connection。如果想找到合適的鏈接悼粮,還需要一個連接池ConnectionPool屬性闲勺。所以應(yīng)該有一個獲取流的方法在StreamAllocation里面,還應(yīng)該有完成請求任務(wù)的之后的方法來關(guān)閉流對象扣猫,還有終止和取消等方法菜循,以及釋放資源的方法。
成員變量及構(gòu)造函數(shù):
public final class StreamAllocation {
public final Address address;//地址
private Route route; //路由
private final ConnectionPool connectionPool; //連接池
private final Object callStackTrace; //日志
// State guarded by connectionPool.
private final RouteSelector routeSelector; //路由選擇器
private int refusedStreamCount; //拒絕的次數(shù)
private RealConnection connection; //連接
private boolean released; //是否已經(jīng)被釋放
private boolean canceled //是否被取消了
public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
this.connectionPool = connectionPool;
this.address = address;
this.routeSelector = new RouteSelector(address, routeDatabase());
this.callStackTrace = callStackTrace;
}
}
看到這些成員變量是不是很眼熟申尤,就是之前講過的鏈接以及連接池癌幕,路由這些,下面看看它的幾個重要的方法昧穿,先看看在ConnectInterceptor里調(diào)用到的newStream()方法:
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
//各種超時值獲取
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
//鏈接失敗是否重試
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//獲取一個健康可用的連接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
//實例化HttpCodec,如果是HTTP/2則是Http2Codec否則是Http1Codec
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
findHealthyConnection()方法
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
//循環(huán)獲取勺远,一直得到健康可用的connection為止
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
//如果這是個全新的鏈接,可以跳過健康檢測直接返回
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
//如果不是健康的鏈接时鸵,繼續(xù)循環(huán)獲取
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
//拿到了健康可用的胶逢,返回
return candidate;
}
}
繼續(xù)看下findConnection()方法:
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
//1.獲取存在的連接
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
// 如果已經(jīng)存在的連接滿足要求,則使用已存在的連接
return allocatedConnection;
}
//2. 嘗試從鏈接池中去取饰潜,這個get()方法我們在前面講到過初坠,實際調(diào)用的是connectionPool的get()方法
// 最終會調(diào)用到StreamAllocation里的acquire()方法,這個方法會給connection變量賦值
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// 線路的選擇彭雾,多ip的支持
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
//里面是個遞歸
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
//更換路由再次嘗試
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
// 以上都不符合碟刺,創(chuàng)建一個連接
result = new RealConnection(connectionPool, selectedRoute);
//給connection賦值一下
acquire(result);
}
//連接并握手
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
//更新本地數(shù)據(jù)庫
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
//把連接放到連接池中
Internal.instance.put(connectionPool, result);
//如果這個連接是多路復(fù)用
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
//調(diào)用connectionPool的deduplicate方法去重。調(diào)用的其實是
//connectionPool里的方法,這個方法在上面有貼出來
//最終返回socket的方法是streamAllocation的releaseAndAcquire()薯酝,下面有貼出
//socket 不為空則代表有確實有重復(fù)的socket半沽,下面會把他關(guān)掉達到復(fù)用不浪費資源的目的
//其實我在這里有個疑問,前面調(diào)用連接池的get方法如果沒有拿到復(fù)用的connection,那不就說明沒有
//重復(fù)的connection嗎,何必要在這去重呢重挑?
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
//如果是重復(fù)的socket則關(guān)閉socket,不是則socket為nul史煎,什么也不做
closeQuietly(socket);
//返回整個連接
return result;
}
/**
* Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
* {@link #release} on the same connection.
獲得一個connection
*/
public void acquire(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
//此時connection 必須是空的,才能被賦值,如果不為空會報非法異常
if (this.connection != null) throw new IllegalStateException();
//七拐八繞最終在這里賦值
this.connection = connection;
//connection被StreamAllocation引用的集合+1
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
//釋放socket并且得到一個可復(fù)用的connection
public Socket releaseAndAcquire(RealConnection newConnection) {
assert (Thread.holdsLock(connectionPool));
//此時的成員connection的allocations必然只有一個,因為他是新建的
if (codec != null || connection.allocations.size() != 1) throw new IllegalStateException();
// Release the old connection.
Reference<StreamAllocation> onlyAllocation = connection.allocations.get(0);
//這個方法后面分析
Socket socket = deallocate(true, false, false);
// Acquire the new connection.
this.connection = newConnection;
//這個newConnection是傳進來的復(fù)用的诡延,關(guān)聯(lián)自身StreamAllocation
newConnection.allocations.add(onlyAllocation);
return socket;
}
/**
* Releases resources held by this allocation. If sufficient resources are allocated, the
* connection will be detached or closed. Callers must be synchronized on the connection pool.
*
* <p>Returns a closeable that the caller should pass to {@link Util#closeQuietly} upon completion
* of the synchronized block. (We don't do I/O while synchronized on the connection pool.)
*/
private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
assert (Thread.holdsLock(connectionPool));
if (streamFinished) {
this.codec = null;
}
if (released) {
this.released = true;
}
Socket socket = null;
if (connection != null) {
if (noNewStreams) {
connection.noNewStreams = true;
}
if (this.codec == null && (this.released || connection.noNewStreams)) {
//清除此鏈接的StreamAllocation引用
release(connection);
if (connection.allocations.isEmpty()) {
connection.idleAtNanos = System.nanoTime();
//connectionBecameIdle()孕暇,把connection從鏈接池移除妖滔,此時拿到對應(yīng)的socket
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
socket = connection.socket();
}
}
connection = null;
}
}
return socket;
}
以上findHealthyConnection()方法的總結(jié):
1座舍、循環(huán)獲取connection實例采蚀,直到獲取到健康可用的,獲取實例的時候先找是否有已經(jīng)存在的連接,如果有已經(jīng)存在的連接神妹,并且可以使用(!noNewStreams)則直接返回。
2蛹找、沒有現(xiàn)成的就根據(jù)已知的address在connectionPool里面找熄赡,如果有連接,則返回
3彼硫、更換路由凌箕,更換線路拧篮,在connectionPool里面再次查找牵舱,如果有則返回串绩。
4芜壁、如果以上條件都不滿足則直接new一個RealConnection出來
5慧妄、new出來的RealConnection通過acquire關(guān)聯(lián)到connection.allocations上
6、做去重判斷饱普,如果有重復(fù)的socket則關(guān)閉
最終得到了一個可用的鏈接connection谁帕,并且在newStream()方法后還用這個鏈接新建了一個Http2Codec(http2.0) 峡继,Http2Codec 的作用主要是對請求進行編碼和對response進行解碼,可以理解成對流的一些操作封裝匈挖。
其他方法暫時沒用到鬓椭,不做一一講解,下篇分析最后一個攔截器CallServerInterceptor关划,最終跟服務(wù)器產(chǎn)生通信的階段,結(jié)合這個攔截器再來重新組織起來看看這篇文章講到的知識點翘瓮。