1.okHttp使用流程分析
// 1.創(chuàng)建OkHttpClient對(duì)象
OkHttpClient client = new OkHttpClient.Builder().build();
// 2.創(chuàng)建Request對(duì)象
Request request = new Request.Builder().build();
// 3.創(chuàng)建請(qǐng)求對(duì)象
Call call = client.newCall(request);
// 4.同步請(qǐng)求
try {
// 同步返回結(jié)果
Response execute = call.execute();
} catch (IOException e) {
e.printStackTrace();
}
// 4.異步使用 異步返回結(jié)果
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
- okhttp請(qǐng)求發(fā)起流程分析
1.同步請(qǐng)求
執(zhí)行call.execute()方法,實(shí)際上會(huì)執(zhí)行到RealCall的execute方法智政,方法所示:
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
//此處的代碼是將請(qǐng)求加入到okhttp的同步請(qǐng)求隊(duì)列中
client.dispatcher().executed(this);
//此處代碼是真正的發(fā)出去同步請(qǐng)求并返回結(jié)果狸剃,這里涉及到okhttp的攔截器,下面會(huì)細(xì)講
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
2.異步請(qǐng)求
執(zhí)行 call.enqueue()方法實(shí)際上會(huì)執(zhí)行到RealCall的enqueue方法
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
// 獲取okhttp請(qǐng)求調(diào)度器調(diào)用異步請(qǐng)求,AsyncCall方法是一個(gè)實(shí)現(xiàn)Runnable的類
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
AsyncCall繼承NamedRunnable贼陶,NamedRunnable實(shí)現(xiàn)了Runnable,rRunnable的run方法會(huì)執(zhí)行到AsyncCall的execute方法
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
// 實(shí)際是Runnable的run方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
// 進(jìn)入攔截器巧娱,發(fā)起請(qǐng)求獲取請(qǐng)求結(jié)果
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
// 請(qǐng)求失敗回調(diào)
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
// 請(qǐng)求成功回調(diào)
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
2.okHttp使用的設(shè)計(jì)模式分析
- Builder設(shè)計(jì)模式
okHttpClient的Builder類,碉怔,提供默認(rèn)配置參數(shù)
public Builder() {
dispatcher = new Dispatcher(); // 默認(rèn)請(qǐng)求調(diào)度器
protocols = DEFAULT_PROTOCOLS; // 默認(rèn)請(qǐng)求協(xié)議 http/1.1 和http/2
connectionSpecs = DEFAULT_CONNECTION_SPECS; // 提供默認(rèn)TLS 連接
eventListenerFactory = EventListener.factory(EventListener.NONE); //指標(biāo)事件監(jiān)聽器,可以監(jiān)控HTTP 呼叫的數(shù)量禁添、大小和持續(xù)時(shí)間
proxySelector = ProxySelector.getDefault(); // 默認(rèn)代理服務(wù)器
cookieJar = CookieJar.NO_COOKIES; // 默認(rèn)cookie實(shí)現(xiàn)
socketFactory = SocketFactory.getDefault(); // 默認(rèn)socket創(chuàng)建工廠
hostnameVerifier = OkHostnameVerifier.INSTANCE; // 默認(rèn)主機(jī)名驗(yàn)證器 可以進(jìn)行證書校驗(yàn)
certificatePinner = CertificatePinner.DEFAULT; // 加密證書之類的 俺也不太懂
proxyAuthenticator = Authenticator.NONE; // 代理請(qǐng)求認(rèn)證 空實(shí)現(xiàn)
authenticator = Authenticator.NONE; //請(qǐng)求認(rèn)證 空實(shí)現(xiàn)
connectionPool = new ConnectionPool(); // 默認(rèn)連接池 應(yīng)該socket連接的連接池
dns = Dns.SYSTEM; // 默認(rèn)dns解析
followSslRedirects = true; // ssl重定向
followRedirects = true; // 默認(rèn)開啟重定向
retryOnConnectionFailure = true; // 失敗重試
connectTimeout = 10_000; // 連接超時(shí)
readTimeout = 10_000; // 讀取超時(shí)
writeTimeout = 10_000; // 寫入超時(shí)
pingInterval = 0; // 心跳間隔
}
Request類的Builder類撮胧,提供默認(rèn)請(qǐng)求方式
public Builder() {
this.method = "GET"; //默認(rèn)get請(qǐng)求
this.headers = new Headers.Builder();
}
- 責(zé)任鏈設(shè)計(jì)模式
前邊已經(jīng)介紹過(guò),這里是okhttp發(fā)起請(qǐng)求的地方老翘,從這里開始通過(guò)責(zé)任鏈模式走okHttp的所有攔截器
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors()); // 加載用戶自定義攔截器
interceptors.add(retryAndFollowUpInterceptor); // 重試攔截器
interceptors.add(new BridgeInterceptor(client.cookieJar())); // 橋接攔截器
interceptors.add(new CacheInterceptor(client.internalCache())); // 緩存攔截器
interceptors.add(new ConnectInterceptor(client)); // 連接攔截器
if (!forWebSocket) { //如果不是webSocket協(xié)議 添加
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket)); // 請(qǐng)求攔截器 最后的一個(gè)攔截器
// 構(gòu)建責(zé)任鏈對(duì)象
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// 進(jìn)入責(zé)任鏈
return chain.proceed(originalRequest);
}
}
通過(guò)上述代碼芹啥,最終會(huì)進(jìn)入到RealInterceptorChain的proceed方法
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
// 構(gòu)建攔截器責(zé)任鏈的下一個(gè)節(jié)點(diǎn)
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// 獲取當(dāng)前攔截器
Interceptor interceptor = interceptors.get(index);
// 執(zhí)行當(dāng)前攔截器 注意這里的參數(shù)是next,也就是說(shuō)是下一個(gè)責(zé)任鏈上的節(jié)點(diǎn)
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
此處Response response = interceptor.intercept(next);
會(huì)執(zhí)行攔截器的intercept方法铺峭,以第一個(gè)攔截器RetryAndFollowUpInterceptor為例墓怀,查看此攔截器的intercept方法
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
//... 此處省略部分代碼
try {
// 執(zhí)行責(zé)任鏈節(jié)點(diǎn)的處理方法 記得上邊的攔截器中傳入的下一個(gè)節(jié)點(diǎn)的對(duì)象,所以這里就調(diào)到下一個(gè)節(jié)點(diǎn)的proceed方法
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
//... 此處省略部分代碼
priorResponse = response;
}
}
這樣責(zé)任鏈的每一個(gè)節(jié)點(diǎn)都會(huì)先執(zhí)行攔截的intercept方法卫键,然后執(zhí)行下一個(gè)節(jié)點(diǎn)的proceed的方法傀履,直到執(zhí)行到最后一個(gè)攔截器
CallServerInterceptor的intercept方法,會(huì)發(fā)起真正的請(qǐng)求永罚,拿到響應(yīng)體啤呼,依次返回結(jié)果。(具體請(qǐng)求過(guò)程待補(bǔ)充)
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
// ...省略部分代碼
Request request = realChain.request();
if (responseBuilder == null) {
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// ...省略部分代碼
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
streamAllocation.noNewStreams();
}
}
// ...省略部分代碼
httpCodec.finishRequest();
// 構(gòu)建返回體對(duì)象 真正返回呢袱,這里采用okIo進(jìn)行網(wǎng)絡(luò)請(qǐng)求官扣,后期有時(shí)間補(bǔ)充對(duì)應(yīng)的請(qǐng)求過(guò)程
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
return response;
}
3.okHttp的線程調(diào)度器(Dispatcher)分析
從上邊的請(qǐng)求過(guò)程可以看到,不管同步請(qǐng)求或者異步請(qǐng)求羞福,都會(huì)調(diào)用okHttpClient的dispatcher()來(lái)操作惕蹄,
//同步請(qǐng)求
client.dispatcher().executed(this);
// 異步請(qǐng)求
client.dispatcher().enqueue(new AsyncCall(responseCallback));
Dispatcher調(diào)度器的所有成員變量
private int maxRequests = 64; //同時(shí)最大請(qǐng)求數(shù)量
private int maxRequestsPerHost = 5; // 同時(shí)最大同一域名的請(qǐng)求數(shù)量
private @Nullable Runnable idleCallback; // 調(diào)度器空閑狀態(tài)的回調(diào)
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService; // 請(qǐng)求線程池
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); // 異步請(qǐng)求等待隊(duì)列
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); // 異步請(qǐng)求運(yùn)行隊(duì)列
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); // 同步請(qǐng)求運(yùn)行隊(duì)列
同步請(qǐng)求隊(duì)列分析,從RealCall的同步請(qǐng)求方法開始
@Override public Response execute() throws IOException {
synchronized (this) {
// ...省略部分代碼
try {
client.dispatcher().executed(this); // 加載同步請(qǐng)求隊(duì)列
Response result = getResponseWithInterceptorChain(); //獲取請(qǐng)求結(jié)果
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this); //完成同步請(qǐng)求
}
}
異步請(qǐng)求隊(duì)列分析,從RealCall的異步請(qǐng)求enqueue()方法開始分析
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
// ...省略部分代碼
client.dispatcher().enqueue(new AsyncCall(responseCallback)); // 加入異步請(qǐng)求隊(duì)列
}
具體實(shí)現(xiàn)治专,看Dispatcher的enqueue方法
synchronized void enqueue(AsyncCall call) {
// 如果正在運(yùn)行異步請(qǐng)求數(shù)量小于最大請(qǐng)求數(shù)量限制
//并且 同一域名的請(qǐng)求數(shù)量小于maxRequestsPerHost卖陵,將請(qǐng)求加入運(yùn)行中隊(duì)列,
// 并直接加入的線程池執(zhí)行张峰,否則添加到等待隊(duì)列
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call); //加入異步請(qǐng)求運(yùn)行中隊(duì)列
executorService().execute(call); // 執(zhí)行異步請(qǐng)求
} else {
readyAsyncCalls.add(call); // 加入異步請(qǐng)求等待隊(duì)列
}
}
異步請(qǐng)求就會(huì)在線程中執(zhí)行Runnable的run方法泪蔫,也就會(huì)執(zhí)行到AsyncCall 的execute方法,這樣就可以正在發(fā)起請(qǐng)求獲取響應(yīng)結(jié)果喘批。
final class AsyncCall extends NamedRunnable {
// ...省略部分代碼
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(); //獲取請(qǐng)求結(jié)果
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
// ...省略部分代碼
} finally {
client.dispatcher().finished(this); //請(qǐng)求結(jié)束
}
}
}
不管是同步請(qǐng)求還是異步請(qǐng)求撩荣,最后都會(huì)執(zhí)行到 client.dispatcher().finished(this); 這句代碼铣揉,我們看一下具體的實(shí)現(xiàn)
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//執(zhí)行結(jié)束,將請(qǐng)求從隊(duì)列中移除
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
// promoteCalls 同步請(qǐng)求為false,異步請(qǐng)求為true餐曹,調(diào)用promoteCalls來(lái)判斷是否可以將等待隊(duì)列的請(qǐng)求放到運(yùn)行隊(duì)列
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount(); //獲取請(qǐng)求的數(shù)量
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) { //如果請(qǐng)求隊(duì)列中沒(méi)有數(shù)據(jù)逛拱,也就是說(shuō)Dispatcher處于空閑狀態(tài),調(diào)用idleCallback的run方法
idleCallback.run();
}
}
promoteCalls()方法的實(shí)現(xiàn)
private void promoteCalls() {
//運(yùn)行隊(duì)列的數(shù)量是否大于最大請(qǐng)求數(shù)量
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
// 等到隊(duì)列是否為空
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
// 迭代等待隊(duì)列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
// 判定同一域名的請(qǐng)求是否消息最大數(shù)量
if (runningCallsForHost(call) < maxRequestsPerHost) {
// 移除等待隊(duì)列數(shù)據(jù)
i.remove();
//加入運(yùn)行中隊(duì)列
runningAsyncCalls.add(call);
// 加入線程池
executorService().execute(call);
}
// 這句代碼個(gè)人感覺(jué)冗余了
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
- Dispatcher線程池的配置
public synchronized ExecutorService executorService() {
if (executorService == null) {
// 第一個(gè)參數(shù) 核心線程數(shù)
// 第二個(gè)參數(shù) 最大線程數(shù)
// 第三個(gè)參數(shù) 線程碧ê铮活等到時(shí)間
// 第四個(gè)參數(shù) 單位秒
// 第五個(gè)參數(shù) 線程阻塞隊(duì)列
// 第六個(gè)參數(shù) 線程構(gòu)造工廠
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
1.線程池的核心線程數(shù)為什么設(shè)置成0朽合,最小線程數(shù)設(shè)置成最大值
由于okHttp已經(jīng)自己利用readyAsyncCalls
和runningAsyncCalls
管理請(qǐng)求隊(duì)列和等待隊(duì)列,沒(méi)必要再通過(guò)線程池來(lái)控制了
2.采用SynchronousQueue來(lái)實(shí)現(xiàn)等待隊(duì)列
SynchronousQueue是無(wú)界的饱狂,是一種無(wú)緩沖的等待隊(duì)列曹步,但是由于該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續(xù)添加休讳;可以認(rèn)為SynchronousQueue是一個(gè)緩存值為1的阻塞隊(duì)列箭窜,但是 isEmpty()方法永遠(yuǎn)返回是true,remainingCapacity() 方法永遠(yuǎn)返回是0衍腥,remove()和removeAll() 方法永遠(yuǎn)返回是false,iterator()方法永遠(yuǎn)返回空纳猫,peek()方法永遠(yuǎn)返回null婆咸。