Java HttpComponents源碼閱讀1
Java HttpComponents源碼閱讀2
MainClientExec#exec
MainClientExec
作為執(zhí)行鏈中的最后一個(gè)請(qǐng)求執(zhí)行器宠互,負(fù)責(zé)執(zhí)行實(shí)際的請(qǐng)求操作。所以這個(gè)方法匯總了大部分連接的處理椭坚,下面一步步進(jìn)行拆解予跌;
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException, HttpException {
...
Object userToken = context.getUserToken();
// 從連接池中獲取http請(qǐng)求句柄
final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
connManager
為PoolingHttpClientConnectionManager
對(duì)象,每一個(gè)HTTP請(qǐng)求都會(huì)調(diào)用requestConnection
方法來獲取一個(gè)請(qǐng)求的句柄善茎;
PoolingHttpClientConnectionManager#requestConnection
private final CPool pool;
public ConnectionRequest requestConnection(
final HttpRoute route,
final Object state) {
// pool對(duì)象為CPool類型券册,根據(jù)路由從池中租用連接句柄
final Future<CPoolEntry> future = this.pool.lease(route, state, null);
return new ConnectionRequest() { ... };
}
根據(jù)路由從連接池中租用連接的句柄,從該句柄可以拿到HttpClientConnection
對(duì)象巾表,注意這里新分配的連接可以以關(guān)閉狀態(tài)返回汁掠;state
代表著期望的連接狀態(tài),如果為null表示新的連接不期望攜帶任何狀態(tài)集币,否則從可用連接池中獲取請(qǐng)求時(shí)會(huì)去尋找連接狀態(tài)為state
的連接考阱;
CPool#lease
public Future<E> lease(final T route, final Object state,
final FutureCallback<E> callback) {
return new Future<E>() { ... }
}
lease
方法返回一個(gè)Future
句柄,該句柄定義了租用連接的具體細(xì)節(jié)鞠苟,這里該句柄已經(jīng)和路由綁定起來乞榨;
繼續(xù)接MainClientExec#exec
...
if (execAware != null) {
if (execAware.isAborted()) {
// 如果請(qǐng)求被中斷,取消請(qǐng)求
connRequest.cancel();
throw new RequestAbortedException("Request aborted");
}
// 通過cas設(shè)置cancellable方法
execAware.setCancellable(connRequest);
}
...
所有的方法類如HttpGet
都是繼承HttpExecutionAware
当娱,這里把請(qǐng)求取消的處理和請(qǐng)求綁定起來吃既;
...
final RequestConfig config = context.getRequestConfig();
final HttpClientConnection managedConn;
try {
// 獲取配置中的超時(shí)時(shí)間
final int timeout = config.getConnectionRequestTimeout();
// 在一定時(shí)間內(nèi)從句柄接口中獲取HttpClientConnection連接
// 見下面
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
} catch(final InterruptedException interrupted) {
// 如果遇到線程中斷直接拋異常
Thread.currentThread().interrupt();
throw new RequestAbortedException("Request aborted", interrupted);
} catch(final ExecutionException ex) {
// 試圖檢索因拋出異常而中止的任務(wù)的結(jié)果時(shí)引發(fā)的異常,
// 主要是和Future有關(guān)的跨细,在后面可以看到為什么會(huì)發(fā)生這個(gè)異常
Throwable cause = ex.getCause();
if (cause == null) {
cause = ex;
}
throw new RequestAbortedException("Request execution failed", cause);
}
...
根據(jù)配置來的超時(shí)時(shí)間請(qǐng)求HttpClientConnection
連接對(duì)象鹦倚;
ConnectionRequest#get
public HttpClientConnection get(
final long timeout,
final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
// 根據(jù)route從連接池中獲取connection
// 見下面
final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit);
if (conn.isOpen()) {
// 獲取到重用連接,socket還未被關(guān)閉冀惭,重新刷新socket配置
final HttpHost host;
if (route.getProxyHost() != null) {
host = route.getProxyHost();
} else {
host = route.getTargetHost();
}
final SocketConfig socketConfig = resolveSocketConfig(host);
// 重新刷新timeout
conn.setSocketTimeout(socketConfig.getSoTimeout());
}
return conn;
}
在給定時(shí)間內(nèi)在連接池中獲取連接震叙,調(diào)用此方法時(shí)將阻塞掀鹅,直到連接變?yōu)榭捎谩⒊瑫r(shí)過期或連接管理器關(guān)閉為止媒楼。如果在阻塞時(shí)或開始之前調(diào)用了cancel()乐尊,將拋出一個(gè)InterruptedException
。
PoolingHttpClientConnectionManager#leaseConnection
protected HttpClientConnection leaseConnection(
final Future<CPoolEntry> future,
final long timeout,
final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
final CPoolEntry entry;
try {
// future里面含有路由信息划址,從future中獲取CPoolEntry對(duì)象
// 見下面
entry = future.get(timeout, timeUnit);
if (entry == null || future.isCancelled()) {
throw new ExecutionException(new CancellationException("Operation cancelled"));
}
return CPoolProxy.newProxy(entry); // 返回的是CPoolEntry的代理對(duì)象
} catch (final TimeoutException ex) {
throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
}
}
leaseConnection
做的事情就是執(zhí)行future
句柄定義好的獲取連接操作扔嵌,成功獲取到CPoolEntry
后會(huì)返回一個(gè)代理對(duì)象CPoolProxy
,該代理的核心功能就是校驗(yàn)連接的可用性夺颤,一旦發(fā)現(xiàn)連接不可用了痢缎,就會(huì)拋出ConnectionShutdownException
異常;
AbstractConnPool$future#get
private final AtomicBoolean cancelled = new AtomicBoolean(false);
// 代表完成獲取連接
private final AtomicBoolean done = new AtomicBoolean(false);
// 儲(chǔ)存的是CPoolEntry對(duì)象
private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
for (;;) {
// 這里開始阻塞
synchronized (this) { // this對(duì)象是指future對(duì)象
try {
// 第一次獲取entry為null
final E entry = entryRef.get();
if (entry != null) {
return entry;
}
if (done.get()) {
// 如果已經(jīng)成功獲取到了連接
throw new ExecutionException(operationAborted());
}
// 租用entry拂共,這里獲取到的entry一定是非空
final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
if (validateAfterInactivity > 0) {
// 如果開啟校驗(yàn)連接可用性
if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
// 檢查連接是否可用
if (!validate(leasedEntry)) {
// 連接不可用牺弄,釋放連接姻几,并重新獲取連接
leasedEntry.close();
release(leasedEntry, false);
continue;
}
}
}
// 防止重復(fù)租用連接
if (done.compareAndSet(false, true)) {
// 更新狀態(tài)
entryRef.set(leasedEntry);
done.set(true);
// 成功租用連接的鉤子
onLease(leasedEntry);
if (callback != null) {
callback.completed(leasedEntry);
}
return leasedEntry;
} else {
// cas失敗宜狐,該future已被其他線程用來租用連接
release(leasedEntry, true);
throw new ExecutionException(operationAborted());
}
} catch (final IOException ex) {
// getPoolEntryBlocking會(huì)拋IOException,通常是socket的IO異常
if (done.compareAndSet(false, true)) {
if (callback != null) {
callback.failed(ex);
}
}
throw new ExecutionException(ex);
}
}
}
}
自旋的獲取CPoolEntry
對(duì)象蛇捌,無(wú)論是從空閑的連接池或是新創(chuàng)建一個(gè)新的連接抚恒,總之從這個(gè)方法獲取到的CPoolEntry
一定是可用的連接對(duì)象;
AbstractConnPool#getPoolEntryBlocking
private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit timeUnit,
final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
Date deadline = null;
if (timeout > 0) {
deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
}
this.lock.lock();
try {
// 根據(jù)路由返回特定的連接池
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry;
for (;;) {
if (future.isCancelled()) {
throw new ExecutionException(operationAborted());
}
for (;;) {
// 根據(jù)state去available連接池中獲取連接
entry = pool.getFree(state);
if (entry == null) {
// 連接池中沒有空閑的連接
break;
}
// 獲取到連接
if (entry.isExpired(System.currentTimeMillis())) {
// 連接已經(jīng)過期络拌,將連接關(guān)閉
entry.close();
}
if (entry.isClosed()) {
// 如果連接已被關(guān)閉俭驮,將其從available移除
this.available.remove(entry);
pool.free(entry, false);
} else {
break;
}
}
if (entry != null) {
// 如果從連接池中獲取到連接,將連接從available中拿出來放置到租用池中
this.available.remove(entry);
this.leased.add(entry);
onReuse(entry); // 重用連接的鉤子
return entry;
}
// 連接池中沒有可用連接春贸,將創(chuàng)建新的連接
final int maxPerRoute = getMax(route);
// 在分配新連接之前縮小連接池大小
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
if (excess > 0) {
for (int i = 0; i < excess; i++) {
final E lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
if (pool.getAllocatedCount() < maxPerRoute) {
// 每個(gè)路由的最大容量是有限的
final int totalUsed = this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
if (freeCapacity > 0) {
final int totalAvailable = this.available.size();
if (totalAvailable > freeCapacity - 1) {
// 空閑連接池容量滿了混萝,將最近一個(gè)空閑連接移出空閑池
if (!this.available.isEmpty()) {
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
}
// 創(chuàng)建出來的對(duì)象是LoggingManagedHttpClientConnection
final C conn = this.connFactory.create(route);
// 同一個(gè)連接會(huì)在RouteSpecificPool中和AbstractConnPool保存兩份
entry = pool.add(conn);
this.leased.add(entry);
return entry;
}
}
// 創(chuàng)建連接失敗,因?yàn)樵撀酚傻倪B接已經(jīng)達(dá)到最大上限
boolean success = false;
try {
// 將該創(chuàng)建連接的任務(wù)緩存起來
pool.queue(future);
this.pending.add(future);
// 等待直到有連接被釋放
if (deadline != null) {
success = this.condition.awaitUntil(deadline);
} else {
this.condition.await();
success = true;
}
if (future.isCancelled()) {
throw new ExecutionException(operationAborted());
}
} finally {
// 一般情況下等待線程被連接池喚醒萍恕,現(xiàn)在應(yīng)該有一個(gè)空閑連接可用
// 但是也有可能等待超時(shí)了逸嘀,無(wú)論如何只要繼續(xù)循環(huán),這兩種情況都會(huì)被檢查允粤。
pool.unqueue(future);
this.pending.remove(future);
}
// check for spurious wakeup vs. timeout
if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
break;
}
}
throw new TimeoutException("Timeout waiting for connection");
} finally {
this.lock.unlock();
}
}
// 在連接池內(nèi)部每一個(gè)route對(duì)應(yīng)一個(gè)連接池
private RouteSpecificPool<T, C, E> getPool(final T route) {
RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
if (pool == null) {
pool = new RouteSpecificPool<T, C, E>(route) {
@Override
protected E createEntry(final C conn) {
return AbstractConnPool.this.createEntry(route, conn);
}
};
this.routeToPool.put(route, pool);
}
return pool;
}
該方法的主要處理邏輯都是發(fā)生在下面兩個(gè)結(jié)構(gòu)中
CPool
作為連接池的核心類崭倘,承擔(dān)了連接池的管理工作,內(nèi)部每個(gè)路由
都對(duì)應(yīng)一個(gè)RouteSpecificPool
类垫,所以同一份連接會(huì)在兩個(gè)地方都保存起來司光;
這個(gè)方法最主要做的事情就是:
- 根據(jù)路由在
RouteSpecificPool
池中查找一個(gè)空閑連接,如果有則判斷連接是否過期超時(shí)悉患,判斷連接可用后直接返回残家; - 如果在空閑連接池中找不到可用連接,則判斷當(dāng)前條件下是否允許創(chuàng)建連接售躁,如果成功創(chuàng)建連接則直接返回坞淮;
- 如果連接池已經(jīng)滿了谴仙,將請(qǐng)求加入等待隊(duì)列,然后將自己阻塞等待其他連接釋放碾盐;
繼續(xù)回到MainClientExec#exec
...
// 創(chuàng)建連接成功后晃跺,將連接保存到上下文中
context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
// 如果開啟老化連接檢查
if (config.isStaleConnectionCheckEnabled()) {
if (managedConn.isOpen()) { // 驗(yàn)證連接
if (managedConn.isStale()) {
managedConn.close(); // 發(fā)現(xiàn)陳舊的連接,將連接斷開
}
}
}
// 創(chuàng)建holder毫玖,相當(dāng)于一個(gè)請(qǐng)求包裝類
final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
try {
if (execAware != null) {
// 如果前面已經(jīng)設(shè)置了cancellable掀虎,這行代碼就沒有效果了
execAware.setCancellable(connHolder);
}
HttpResponse response;
for (int execCount = 1;; execCount++) {
// 開始執(zhí)行具體請(qǐng)求
if (!managedConn.isOpen()) {
// 如果是新的連接,則需要建立請(qǐng)求的路由
try {
// 建立請(qǐng)求路由
establishRoute(proxyAuthState, managedConn, route, request, context);
} catch (final TunnelRefusedException ex) {
response = ex.getResponse();
break;
}
}
final int timeout = config.getSocketTimeout();
if (timeout >= 0) {
managedConn.setSocketTimeout(timeout);
}
...
成功拿到可用連接后付枫,就是開始建立請(qǐng)求路由烹玉,會(huì)根據(jù)當(dāng)前請(qǐng)求目標(biāo)選擇不同的路徑,但是最終都會(huì)調(diào)用下面這個(gè)方法阐滩;
PoolingHttpClientConnectionManager#connect
public void connect(
final HttpClientConnection managedConn,
final HttpRoute route,
final int connectTimeout,
final HttpContext context) throws IOException {
final ManagedHttpClientConnection conn;
synchronized (managedConn) {
final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
conn = entry.getConnection();
}
final HttpHost host;
if (route.getProxyHost() != null) { // 有代理二打,建立代理路由
host = route.getProxyHost();
} else {
host = route.getTargetHost(); // 沒有代理,直接路由
}
// 實(shí)際建立socket并連接的操作
this.connectionOperator.connect(
conn, host, route.getLocalSocketAddress(), connectTimeout, resolveSocketConfig(host), context);
}
如果是直接路由掂榔,則將底層連接套接字連接到目標(biāo)地址继效,如果是通過代理(或多個(gè)代理)的路由,則連接到第一個(gè)代理跳轉(zhuǎn)装获。
結(jié)合第一張圖片看瑞信,將LoggingManagedHttpClientConnection
對(duì)象與實(shí)際負(fù)責(zé)通信的socket
綁定;
繼續(xù)回到MainClientExec#exec
...
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
response = requestExecutor.execute(request, managedConn, context);
...
實(shí)際發(fā)送請(qǐng)求消息并拿到響應(yīng)的方法穴豫;
HttpRequestExecutor#execute
public HttpResponse execute(
final HttpRequest request,
final HttpClientConnection conn,
final HttpContext context) throws IOException, HttpException {
try {
// 發(fā)送請(qǐng)求凡简,有些情況下response會(huì)隨著請(qǐng)求返回
HttpResponse response = doSendRequest(request, conn, context);
if (response == null) {
// 接受請(qǐng)求
response = doReceiveResponse(request, conn, context);
}
return response;
} catch (final IOException ex) {
closeConnection(conn);
throw ex;
} catch (final HttpException ex) {
closeConnection(conn);
throw ex;
} catch (final RuntimeException ex) {
closeConnection(conn);
throw ex;
}
}
該方法主要就是處理發(fā)送請(qǐng)求和接受請(qǐng)求;
HttpRequestExecutor#doSendRequest
protected HttpResponse doSendRequest(
final HttpRequest request,
final HttpClientConnection conn,
final HttpContext context) throws IOException, HttpException {
HttpResponse response = null;
context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.FALSE);
// 發(fā)送header
conn.sendRequestHeader(request);
if (request instanceof HttpEntityEnclosingRequest) {
// 檢查expect-continue握手精肃。我們必須刷新頭并等待100-continue響應(yīng)來處理它秤涩。
// 如果我們得到不同的響應(yīng),我們一定不能發(fā)送實(shí)體司抱。
boolean sendentity = true;
final ProtocolVersion ver =
request.getRequestLine().getProtocolVersion();
if (((HttpEntityEnclosingRequest) request).expectContinue() &&
!ver.lessEquals(HttpVersion.HTTP_1_0)) {
// 需要處理100-continue邏輯
conn.flush();
// 在指定時(shí)間內(nèi)等到對(duì)端回復(fù)筐眷,不需要永遠(yuǎn)等待100-continue響應(yīng),如果發(fā)生超時(shí)則發(fā)送實(shí)體請(qǐng)求
if (conn.isResponseAvailable(this.waitForContinue)) {
// 意味著在指定時(shí)間內(nèi)獲取到了100-continue應(yīng)答
response = conn.receiveResponseHeader();
if (canResponseHaveBody(request, response)) {
// 意味著response存在響應(yīng)包状植,解析response
conn.receiveResponseEntity(response);
}
final int status = response.getStatusLine().getStatusCode();
if (status < 200) {
if (status != HttpStatus.SC_CONTINUE) {
throw new ProtocolException(
"Unexpected response: " + response.getStatusLine());
}
// 非正常的狀態(tài)碼浊竟,重置response
response = null;
} else {
sendentity = false;
}
}
}
if (sendentity) {
// 發(fā)送請(qǐng)求實(shí)體
conn.sendRequestEntity((HttpEntityEnclosingRequest) request);
}
}
conn.flush();
context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.TRUE);
return response;
}
通過給定的連接發(fā)送請(qǐng)求,此方法還處理expect-continue
握手津畸;
HTTP/1.1 協(xié)議里的Expect: 100-continue 意思是振定,在客戶端發(fā)送 Request Message 之前,HTTP/1.1 協(xié)議允許客戶端先判定服務(wù)器是否愿意接受客戶端發(fā)來的消息主體(基于 Request Headers)肉拓。
目的:它可以讓客戶端在發(fā)送請(qǐng)求數(shù)據(jù)之前去判斷服務(wù)器是否愿意接收該數(shù)據(jù)后频,如果服務(wù)器愿意接收,客戶端才會(huì)真正發(fā)送數(shù)據(jù),如果客戶端直接發(fā)送請(qǐng)求數(shù)據(jù)卑惜,但是服務(wù)器又將該請(qǐng)求拒絕的話膏执,這種行為將帶來很大的資源開銷。
發(fā)送了100-continue的客戶端不應(yīng)該永遠(yuǎn)等待服務(wù)器端做出回應(yīng)露久,超時(shí)一段時(shí)間之后更米,客戶端應(yīng)該直接將實(shí)體發(fā)送出去。
比如在POST方法時(shí)HttpPost
就會(huì)去處理expect-continue
的邏輯毫痕,如果在這里有判斷到返回了響應(yīng)征峦,則處理響應(yīng)實(shí)體;
HttpRequestExecutor#doReceiveResponse
protected HttpResponse doReceiveResponse(
final HttpRequest request,
final HttpClientConnection conn,
final HttpContext context) throws HttpException, IOException {
HttpResponse response = null;
int statusCode = 0;
while (response == null || statusCode < HttpStatus.SC_OK) {
response = conn.receiveResponseHeader();
statusCode = response.getStatusLine().getStatusCode();
if (statusCode < HttpStatus.SC_CONTINUE) {
throw new ProtocolException("Invalid response: " + response.getStatusLine());
}
// 解析response entity
if (canResponseHaveBody(request, response)) {
conn.receiveResponseEntity(response);
}
}
return response;
}
doSendRequest
中如果是Get請(qǐng)求消请,會(huì)將HTTP報(bào)文的每一行逐一寫入socket緩沖區(qū)中栏笆,當(dāng)緩沖區(qū)滿了或是數(shù)據(jù)全部寫入后會(huì)發(fā)送出去;
如果是Post請(qǐng)求臊泰,除了發(fā)送請(qǐng)求頭蛉加、請(qǐng)求行、header等信息缸逃,還會(huì)發(fā)送請(qǐng)求體针饥;
doReceiveResponse
方法用來接受響應(yīng),但是不負(fù)責(zé)解析具體的entity察滑,只是將對(duì)應(yīng)的socket句柄封裝在響應(yīng)實(shí)體HttpResponse
里面等待讀却蚶濉修肠;
繼續(xù)回到MainClientExec#exec贺辰,到這里已經(jīng)拿到請(qǐng)求響應(yīng)了;
...
// 判斷連接是否是可重用狀態(tài)
if (reuseStrategy.keepAlive(response, context)) {
final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
// 返回的響應(yīng)頭中含有Keep-Alive
connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
connHolder.markReusable();
} else {
// 連接不可重用
connHolder.markNonReusable();
}
...
// 校驗(yàn)請(qǐng)求
...
final HttpEntity entity = response.getEntity();
if (entity == null || !entity.isStreaming()) {
// 連接不需要并且(假定)處于可重用狀態(tài)
connHolder.releaseConnection();
return new HttpResponseProxy(response, null);
}
// HttpResponse的代理類嵌施,可以用來釋放與原始響應(yīng)關(guān)聯(lián)的客戶端連接
return new HttpResponseProxy(response, connHolder);
} catch (final Exception ex) {
...
}
}
至此整個(gè)請(qǐng)求流程就大概完成了饲化,下面看看連接什么時(shí)候放回連接池;
EntityUtils.toString(response.getEntity(), "UTF-8");
最后會(huì)走到下面這個(gè)方法
private static String toString(
final HttpEntity entity,
final ContentType contentType) throws IOException {
final InputStream inStream = entity.getContent();
if (inStream == null) {
return null;
}
try {
int capacity = (int)entity.getContentLength();
Charset charset = null;
...
final Reader reader = new InputStreamReader(inStream, charset);
final CharArrayBuffer buffer = new CharArrayBuffer(capacity);
final char[] tmp = new char[1024];
int l;
while((l = reader.read(tmp)) != -1) {
buffer.append(tmp, 0, l);
}
return buffer.toString();
} finally {
inStream.close();
}
}
public InputStream getContent() throws IOException {
return new EofSensorInputStream(this.wrappedEntity.getContent(), this);
}
關(guān)鍵在于EofSensorInputStream
這個(gè)對(duì)象吗伤,該對(duì)象的核心功能就是檢測(cè)EOF標(biāo)識(shí)符并觸發(fā)close()
操作吃靠,主要用于在使用響應(yīng)體或不再需要連接時(shí)自動(dòng)釋放底層托管的連接。
public int read() throws IOException {
int readLen = -1;
if (isReadAllowed()) {
try {
readLen = wrappedStream.read();
checkEOF(readLen); // 檢查eof
} catch (final IOException ex) {
checkAbort();
throw ex;
}
}
return readLen;
}
// checkEOF會(huì)走到這里
public boolean eofDetected(final InputStream wrapped) throws IOException {
try {
if (wrapped != null) {
wrapped.close();
}
releaseConnection(); // 釋放連接
} catch (final IOException ex) {
abortConnection(); // 中斷連接
throw ex;
} catch (final RuntimeException ex) {
abortConnection();
throw ex;
} finally {
cleanup(); // 關(guān)閉連接
}
return false;
}
當(dāng)從流中讀取到EOF
標(biāo)志符后會(huì)主動(dòng)釋放連接足淆;
ConnectionHolder#releaseConnection
private void releaseConnection(final boolean reusable) {
if (this.released.compareAndSet(false, true)) {
synchronized (this.managedConn) {
if (reusable) {
// 把連接放回連接池
this.manager.releaseConnection(this.managedConn,
this.state, this.validDuration, this.timeUnit);
} else {
try {
this.managedConn.close();
} catch (final IOException ex) {
} finally {
this.manager.releaseConnection(
this.managedConn, null, 0, TimeUnit.MILLISECONDS);
}
}
}
}
}
這里的managedConn
為CPoolProxy
對(duì)象巢块,manager
為PoolingHttpClientConnectionManager
對(duì)象,如果請(qǐng)求完畢后只是釋放巧号,那么會(huì)把連接放回到連接池族奢;
AbstractConnPool#release
public void release(final E entry, final boolean reusable) {
this.lock.lock();
try {
if (this.leased.remove(entry)) {
// 根據(jù)route取連接池
final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
// 將RouteSpecificPool的連接從租用池中移除,放入空閑池
pool.free(entry, reusable);
if (reusable && !this.isShutDown) {
this.available.addFirst(entry);
} else {
entry.close();
}
onRelease(entry); // hook
Future<E> future = pool.nextPending();
// 如果有的話從等待隊(duì)列中拿到等待請(qǐng)求的future
if (future != null) {
this.pending.remove(future);
} else {
future = this.pending.poll();
}
if (future != null) {
// 喚醒其他被阻塞的請(qǐng)求線程
this.condition.signalAll();
}
}
} finally {
this.lock.unlock();
}
}
將釋放的連接從租用池中移回到空閑連接池中丹鸿,然后從等待隊(duì)列取出第一個(gè)請(qǐng)求越走,如果有的話會(huì)喚醒被阻塞線程,注意這里取的操作,是優(yōu)先取同一個(gè)路由下的請(qǐng)求廊敌,如果同一個(gè)路由下沒有才會(huì)取其他路由的請(qǐng)求铜跑;
shutdown和close的區(qū)別
- close
此方法會(huì)優(yōu)雅地關(guān)閉連接,在關(guān)閉底層socket之前會(huì)刷新內(nèi)部緩沖區(qū)骡澈,close不允許從其他線程調(diào)用來強(qiáng)制關(guān)閉連接锅纺。
public void close() throws IOException {
final Socket socket = this.socketHolder.getAndSet(null);
if (socket != null) {
try {
// 先會(huì)刷新緩存區(qū)
this.inBuffer.clear();
this.outbuffer.flush();
try {
try {
socket.shutdownOutput();
} catch (final IOException ignore) {
}
try {
socket.shutdownInput();
} catch (final IOException ignore) {
}
} catch (final UnsupportedOperationException ignore) {
// if one isn't supported, the other one isn't either
}
} finally {
socket.close();
}
}
}
- shutdown
會(huì)立即強(qiáng)制關(guān)閉這個(gè)連接,該方法可以允許從不同的線程來調(diào)用它終止連接肋殴。在關(guān)閉底層scoket之前伞广,不會(huì)嘗試刷新任何的內(nèi)部緩沖區(qū);
public void shutdown() throws IOException {
final Socket socket = this.socketHolder.getAndSet(null);
if (socket != null) {
try {
// 設(shè)置關(guān)閉socket的操作
socket.setSoLinger(true, 0);
} catch (final IOException ex) {
} finally {
socket.close();
}
}
}
TCP協(xié)議中RST標(biāo)志位用來表示重置連接疼电、復(fù)位連接嚼锄,關(guān)閉異常的連接,
發(fā)送RST包關(guān)閉連接時(shí)蔽豺,不必等緩沖區(qū)的包都發(fā)出去(正常要說關(guān)閉要經(jīng)歷4次揮手)区丑,直接就丟棄緩存區(qū)的數(shù)據(jù)并發(fā)送RST包;而接收端收到RST包后修陡,也不必發(fā)送ACK包來確認(rèn)沧侥。
setSoLinger
的第一個(gè)參數(shù)為false時(shí),TCP關(guān)閉連接時(shí)會(huì)保存默認(rèn)的操作魄鸦,緩沖區(qū)的數(shù)據(jù)會(huì)正常刷新宴杀;
當(dāng)?shù)谝粋€(gè)參數(shù)為true時(shí),會(huì)依賴第二個(gè)參數(shù)的值拾因,當(dāng)?shù)诙€(gè)參數(shù)為0時(shí)旺罢,調(diào)用close的時(shí)候,TCP連接會(huì)立即斷開绢记;緩沖區(qū)中未被發(fā)送的數(shù)據(jù)將被丟棄扁达,并向?qū)Ψ桨l(fā)送一個(gè)RST包,TCP連接將不會(huì)進(jìn)入TIME_WAIT狀態(tài)(上面提到的對(duì)端不會(huì)復(fù)回ACK)蠢熄;
當(dāng)?shù)诙€(gè)參數(shù)值不為0時(shí)跪解,會(huì)在設(shè)置的時(shí)間段內(nèi)繼續(xù)嘗試發(fā)送緩沖區(qū)的數(shù)據(jù),如果超時(shí)則丟棄緩沖區(qū)的數(shù)據(jù)签孔,并且調(diào)用close會(huì)阻塞直到設(shè)置的時(shí)間結(jié)束后才返回叉讥;