Java HttpComponents源碼閱讀2

Java HttpComponents源碼閱讀1
Java HttpComponents源碼閱讀2

MainClientExec#exec

MainClientExec作為執(zhí)行鏈中的最后一個(gè)請(qǐng)求執(zhí)行器宠互,負(fù)責(zé)執(zhí)行實(shí)際的請(qǐng)求操作。所以這個(gè)方法匯總了大部分連接的處理椭坚,下面一步步進(jìn)行拆解予跌;

public CloseableHttpResponse execute(
        final HttpRoute route,
        final HttpRequestWrapper request,
        final HttpClientContext context,
        final HttpExecutionAware execAware) throws IOException, HttpException {
    ... 
    Object userToken = context.getUserToken();
    // 從連接池中獲取http請(qǐng)求句柄
    final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);

connManagerPoolingHttpClientConnectionManager對(duì)象,每一個(gè)HTTP請(qǐng)求都會(huì)調(diào)用requestConnection方法來獲取一個(gè)請(qǐng)求的句柄善茎;

PoolingHttpClientConnectionManager#requestConnection
private final CPool pool;
public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        // pool對(duì)象為CPool類型券册,根據(jù)路由從池中租用連接句柄
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() { ... };
}

根據(jù)路由從連接池中租用連接的句柄,從該句柄可以拿到HttpClientConnection對(duì)象巾表,注意這里新分配的連接可以以關(guān)閉狀態(tài)返回汁掠;state代表著期望的連接狀態(tài),如果為null表示新的連接不期望攜帶任何狀態(tài)集币,否則從可用連接池中獲取請(qǐng)求時(shí)會(huì)去尋找連接狀態(tài)為state的連接考阱;

CPool#lease
public Future<E> lease(final T route, final Object state, 
                       final FutureCallback<E> callback) {
        return new Future<E>() { ... }
}

lease方法返回一個(gè)Future句柄,該句柄定義了租用連接的具體細(xì)節(jié)鞠苟,這里該句柄已經(jīng)和路由綁定起來乞榨;

繼續(xù)接MainClientExec#exec

    ...
    if (execAware != null) {
        if (execAware.isAborted()) {
            // 如果請(qǐng)求被中斷,取消請(qǐng)求
            connRequest.cancel();
            throw new RequestAbortedException("Request aborted");
        }
        // 通過cas設(shè)置cancellable方法
        execAware.setCancellable(connRequest);
    }
    ... 

所有的方法類如HttpGet都是繼承HttpExecutionAware当娱,這里把請(qǐng)求取消的處理和請(qǐng)求綁定起來吃既;

    ... 
    final RequestConfig config = context.getRequestConfig();
    final HttpClientConnection managedConn;
    try {
        // 獲取配置中的超時(shí)時(shí)間
        final int timeout = config.getConnectionRequestTimeout();
        // 在一定時(shí)間內(nèi)從句柄接口中獲取HttpClientConnection連接
        // 見下面
        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
    } catch(final InterruptedException interrupted) {
        // 如果遇到線程中斷直接拋異常
        Thread.currentThread().interrupt();
        throw new RequestAbortedException("Request aborted", interrupted);
    } catch(final ExecutionException ex) {
        // 試圖檢索因拋出異常而中止的任務(wù)的結(jié)果時(shí)引發(fā)的異常,
        // 主要是和Future有關(guān)的跨细,在后面可以看到為什么會(huì)發(fā)生這個(gè)異常
        Throwable cause = ex.getCause();
        if (cause == null) {
            cause = ex;
        }
        throw new RequestAbortedException("Request execution failed", cause);
    }
    ...

根據(jù)配置來的超時(shí)時(shí)間請(qǐng)求HttpClientConnection連接對(duì)象鹦倚;

ConnectionRequest#get
public HttpClientConnection get(
        final long timeout,
        final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
    // 根據(jù)route從連接池中獲取connection
    // 見下面
    final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit); 
    if (conn.isOpen()) {
        // 獲取到重用連接,socket還未被關(guān)閉冀惭,重新刷新socket配置
        final HttpHost host;
        if (route.getProxyHost() != null) {
            host = route.getProxyHost();
        } else {
            host = route.getTargetHost();
        }
        final SocketConfig socketConfig = resolveSocketConfig(host);
        // 重新刷新timeout
        conn.setSocketTimeout(socketConfig.getSoTimeout());
    }
    return conn;
}

在給定時(shí)間內(nèi)在連接池中獲取連接震叙,調(diào)用此方法時(shí)將阻塞掀鹅,直到連接變?yōu)榭捎谩⒊瑫r(shí)過期或連接管理器關(guān)閉為止媒楼。如果在阻塞時(shí)或開始之前調(diào)用了cancel()乐尊,將拋出一個(gè)InterruptedException

PoolingHttpClientConnectionManager#leaseConnection
protected HttpClientConnection leaseConnection(
        final Future<CPoolEntry> future,
        final long timeout,
        final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
    final CPoolEntry entry;
    try {
        // future里面含有路由信息划址,從future中獲取CPoolEntry對(duì)象
        // 見下面
        entry = future.get(timeout, timeUnit);
        if (entry == null || future.isCancelled()) {
            throw new ExecutionException(new CancellationException("Operation cancelled"));
        }
        return CPoolProxy.newProxy(entry); // 返回的是CPoolEntry的代理對(duì)象
    } catch (final TimeoutException ex) {
        throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
    }
}

leaseConnection做的事情就是執(zhí)行future句柄定義好的獲取連接操作扔嵌,成功獲取到CPoolEntry后會(huì)返回一個(gè)代理對(duì)象CPoolProxy,該代理的核心功能就是校驗(yàn)連接的可用性夺颤,一旦發(fā)現(xiàn)連接不可用了痢缎,就會(huì)拋出ConnectionShutdownException異常;

AbstractConnPool$future#get
private final AtomicBoolean cancelled = new AtomicBoolean(false);
// 代表完成獲取連接
private final AtomicBoolean done = new AtomicBoolean(false);
// 儲(chǔ)存的是CPoolEntry對(duì)象
private final AtomicReference<E> entryRef = new AtomicReference<E>(null); 

public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
    for (;;) {
        // 這里開始阻塞
        synchronized (this) { // this對(duì)象是指future對(duì)象
            try {
                // 第一次獲取entry為null
                final E entry = entryRef.get();
                if (entry != null) {
                    return entry;
                }
                if (done.get()) {
                    // 如果已經(jīng)成功獲取到了連接
                    throw new ExecutionException(operationAborted());
                }
                // 租用entry拂共,這里獲取到的entry一定是非空
                final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                if (validateAfterInactivity > 0)  {
                    // 如果開啟校驗(yàn)連接可用性
                    if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                        // 檢查連接是否可用
                        if (!validate(leasedEntry)) {  
                            // 連接不可用牺弄,釋放連接姻几,并重新獲取連接
                            leasedEntry.close();
                            release(leasedEntry, false);
                            continue;
                        }
                    }
                }
                // 防止重復(fù)租用連接
                if (done.compareAndSet(false, true)) {
                    // 更新狀態(tài)
                    entryRef.set(leasedEntry);
                    done.set(true);
                    // 成功租用連接的鉤子
                    onLease(leasedEntry);
                    if (callback != null) {
                        callback.completed(leasedEntry);
                    }
                    return leasedEntry;
                } else {
                    // cas失敗宜狐,該future已被其他線程用來租用連接
                    release(leasedEntry, true);
                    throw new ExecutionException(operationAborted());
                }
            } catch (final IOException ex) {
                // getPoolEntryBlocking會(huì)拋IOException,通常是socket的IO異常
                if (done.compareAndSet(false, true)) {
                    if (callback != null) {
                        callback.failed(ex);
                    }
                }
                throw new ExecutionException(ex);
            }
        }
    }
}

自旋的獲取CPoolEntry對(duì)象蛇捌,無(wú)論是從空閑的連接池或是新創(chuàng)建一個(gè)新的連接抚恒,總之從這個(gè)方法獲取到的CPoolEntry一定是可用的連接對(duì)象;

AbstractConnPool#getPoolEntryBlocking
private E getPoolEntryBlocking(
        final T route, final Object state,
        final long timeout, final TimeUnit timeUnit,
        final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {

    Date deadline = null;
    if (timeout > 0) {
        deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
    }
    this.lock.lock();
    try {
        // 根據(jù)路由返回特定的連接池
        final RouteSpecificPool<T, C, E> pool = getPool(route);
        E entry;
        for (;;) {
            if (future.isCancelled()) {
                throw new ExecutionException(operationAborted());
            }
            for (;;) {
                // 根據(jù)state去available連接池中獲取連接
                entry = pool.getFree(state); 
                if (entry == null) {
                    // 連接池中沒有空閑的連接
                    break;
                }
                // 獲取到連接
                if (entry.isExpired(System.currentTimeMillis())) {
                    // 連接已經(jīng)過期络拌,將連接關(guān)閉
                    entry.close();
                }
                if (entry.isClosed()) {
                    // 如果連接已被關(guān)閉俭驮,將其從available移除
                    this.available.remove(entry);
                    pool.free(entry, false);
                } else {
                    break;
                }
            }
            if (entry != null) {
                // 如果從連接池中獲取到連接,將連接從available中拿出來放置到租用池中
                this.available.remove(entry);
                this.leased.add(entry);
                onReuse(entry); // 重用連接的鉤子
                return entry;
            }

            // 連接池中沒有可用連接春贸,將創(chuàng)建新的連接
            final int maxPerRoute = getMax(route);
            // 在分配新連接之前縮小連接池大小
            final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
            if (excess > 0) {
                for (int i = 0; i < excess; i++) {
                    final E lastUsed = pool.getLastUsed();
                    if (lastUsed == null) {
                        break;
                    }
                    lastUsed.close();
                    this.available.remove(lastUsed);
                    pool.remove(lastUsed);
                }
            }

            if (pool.getAllocatedCount() < maxPerRoute) {
                // 每個(gè)路由的最大容量是有限的
                final int totalUsed = this.leased.size();
                final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                if (freeCapacity > 0) {
                    final int totalAvailable = this.available.size();
                    if (totalAvailable > freeCapacity - 1) {
                        // 空閑連接池容量滿了混萝,將最近一個(gè)空閑連接移出空閑池
                        if (!this.available.isEmpty()) {
                            final E lastUsed = this.available.removeLast();
                            lastUsed.close();
                            final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                            otherpool.remove(lastUsed);
                        }
                    }
                    // 創(chuàng)建出來的對(duì)象是LoggingManagedHttpClientConnection
                    final C conn = this.connFactory.create(route);
                    // 同一個(gè)連接會(huì)在RouteSpecificPool中和AbstractConnPool保存兩份
                    entry = pool.add(conn);
                    this.leased.add(entry);
                    return entry;
                }
            }
            // 創(chuàng)建連接失敗,因?yàn)樵撀酚傻倪B接已經(jīng)達(dá)到最大上限
            boolean success = false;
            try {
                // 將該創(chuàng)建連接的任務(wù)緩存起來
                pool.queue(future);
                this.pending.add(future);
                // 等待直到有連接被釋放
                if (deadline != null) { 
                    success = this.condition.awaitUntil(deadline);
                } else {
                    this.condition.await();
                    success = true;
                }
                if (future.isCancelled()) {
                    throw new ExecutionException(operationAborted());
                }
            } finally {
                // 一般情況下等待線程被連接池喚醒萍恕,現(xiàn)在應(yīng)該有一個(gè)空閑連接可用
                // 但是也有可能等待超時(shí)了逸嘀,無(wú)論如何只要繼續(xù)循環(huán),這兩種情況都會(huì)被檢查允粤。
                pool.unqueue(future);
                this.pending.remove(future);
            }
            // check for spurious wakeup vs. timeout
            if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                break;
            }
        }
        throw new TimeoutException("Timeout waiting for connection");
    } finally {
        this.lock.unlock();
    }
}

// 在連接池內(nèi)部每一個(gè)route對(duì)應(yīng)一個(gè)連接池
private RouteSpecificPool<T, C, E> getPool(final T route) {
    RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
    if (pool == null) {
        pool = new RouteSpecificPool<T, C, E>(route) {
            @Override
            protected E createEntry(final C conn) {
                return AbstractConnPool.this.createEntry(route, conn);
            }
        };
        this.routeToPool.put(route, pool);
    }
    return pool;
}

該方法的主要處理邏輯都是發(fā)生在下面兩個(gè)結(jié)構(gòu)中

CPool作為連接池的核心類崭倘,承擔(dān)了連接池的管理工作,內(nèi)部每個(gè)路由
都對(duì)應(yīng)一個(gè)RouteSpecificPool类垫,所以同一份連接會(huì)在兩個(gè)地方都保存起來司光;

這個(gè)方法最主要做的事情就是:

  1. 根據(jù)路由在RouteSpecificPool池中查找一個(gè)空閑連接,如果有則判斷連接是否過期超時(shí)悉患,判斷連接可用后直接返回残家;
  2. 如果在空閑連接池中找不到可用連接,則判斷當(dāng)前條件下是否允許創(chuàng)建連接售躁,如果成功創(chuàng)建連接則直接返回坞淮;
  3. 如果連接池已經(jīng)滿了谴仙,將請(qǐng)求加入等待隊(duì)列,然后將自己阻塞等待其他連接釋放碾盐;

繼續(xù)回到MainClientExec#exec

    ...
    // 創(chuàng)建連接成功后晃跺,將連接保存到上下文中
    context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
    // 如果開啟老化連接檢查
    if (config.isStaleConnectionCheckEnabled()) {
        if (managedConn.isOpen()) {  // 驗(yàn)證連接
            if (managedConn.isStale()) { 
                managedConn.close(); // 發(fā)現(xiàn)陳舊的連接,將連接斷開
            }
        }
    }
    // 創(chuàng)建holder毫玖,相當(dāng)于一個(gè)請(qǐng)求包裝類
    final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
    try {
        if (execAware != null) {
            // 如果前面已經(jīng)設(shè)置了cancellable掀虎,這行代碼就沒有效果了
            execAware.setCancellable(connHolder);
        }
        HttpResponse response;
        for (int execCount = 1;; execCount++) {
            // 開始執(zhí)行具體請(qǐng)求
            if (!managedConn.isOpen()) {
                // 如果是新的連接,則需要建立請(qǐng)求的路由
                try {
                    // 建立請(qǐng)求路由
                    establishRoute(proxyAuthState, managedConn, route, request, context);
                } catch (final TunnelRefusedException ex) {
                    response = ex.getResponse();
                    break;
                }
            }
            final int timeout = config.getSocketTimeout();
            if (timeout >= 0) {
                managedConn.setSocketTimeout(timeout);
            }

            ...

成功拿到可用連接后付枫,就是開始建立請(qǐng)求路由烹玉,會(huì)根據(jù)當(dāng)前請(qǐng)求目標(biāo)選擇不同的路徑,但是最終都會(huì)調(diào)用下面這個(gè)方法阐滩;

PoolingHttpClientConnectionManager#connect
public void connect(         
        final HttpClientConnection managedConn,
        final HttpRoute route,
        final int connectTimeout,
        final HttpContext context) throws IOException {
    final ManagedHttpClientConnection conn;
    synchronized (managedConn) {
        final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
        conn = entry.getConnection();
    }
    final HttpHost host;
    if (route.getProxyHost() != null) { // 有代理二打,建立代理路由
        host = route.getProxyHost();
    } else {
        host = route.getTargetHost(); // 沒有代理,直接路由
    }
    // 實(shí)際建立socket并連接的操作
    this.connectionOperator.connect(
            conn, host, route.getLocalSocketAddress(), connectTimeout, resolveSocketConfig(host), context);
}

如果是直接路由掂榔,則將底層連接套接字連接到目標(biāo)地址继效,如果是通過代理(或多個(gè)代理)的路由,則連接到第一個(gè)代理跳轉(zhuǎn)装获。

結(jié)合第一張圖片看瑞信,將LoggingManagedHttpClientConnection對(duì)象與實(shí)際負(fù)責(zé)通信的socket綁定;

繼續(xù)回到MainClientExec#exec

    ... 
    context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
    response = requestExecutor.execute(request, managedConn, context);
    ...

實(shí)際發(fā)送請(qǐng)求消息并拿到響應(yīng)的方法穴豫;

HttpRequestExecutor#execute
public HttpResponse execute(
        final HttpRequest request,
        final HttpClientConnection conn,
        final HttpContext context) throws IOException, HttpException {
    try {
        // 發(fā)送請(qǐng)求凡简,有些情況下response會(huì)隨著請(qǐng)求返回
        HttpResponse response = doSendRequest(request, conn, context);
        if (response == null) {
            // 接受請(qǐng)求
            response = doReceiveResponse(request, conn, context);
        }
        return response;
    } catch (final IOException ex) {
        closeConnection(conn);
        throw ex;
    } catch (final HttpException ex) {
        closeConnection(conn);
        throw ex;
    } catch (final RuntimeException ex) {
        closeConnection(conn);
        throw ex;
    }
}

該方法主要就是處理發(fā)送請(qǐng)求和接受請(qǐng)求;

HttpRequestExecutor#doSendRequest
protected HttpResponse doSendRequest(
        final HttpRequest request,
        final HttpClientConnection conn,
        final HttpContext context) throws IOException, HttpException {
    HttpResponse response = null;
    context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
    context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.FALSE);
    // 發(fā)送header
    conn.sendRequestHeader(request); 
    if (request instanceof HttpEntityEnclosingRequest) {
        // 檢查expect-continue握手精肃。我們必須刷新頭并等待100-continue響應(yīng)來處理它秤涩。
        // 如果我們得到不同的響應(yīng),我們一定不能發(fā)送實(shí)體司抱。
        boolean sendentity = true;
        final ProtocolVersion ver =
                request.getRequestLine().getProtocolVersion();
        if (((HttpEntityEnclosingRequest) request).expectContinue() &&
                !ver.lessEquals(HttpVersion.HTTP_1_0)) {
            // 需要處理100-continue邏輯
            conn.flush();
            // 在指定時(shí)間內(nèi)等到對(duì)端回復(fù)筐眷,不需要永遠(yuǎn)等待100-continue響應(yīng),如果發(fā)生超時(shí)則發(fā)送實(shí)體請(qǐng)求
            if (conn.isResponseAvailable(this.waitForContinue)) {
                // 意味著在指定時(shí)間內(nèi)獲取到了100-continue應(yīng)答
                response = conn.receiveResponseHeader();
                if (canResponseHaveBody(request, response)) {
                    // 意味著response存在響應(yīng)包状植,解析response
                    conn.receiveResponseEntity(response);
                }
                final int status = response.getStatusLine().getStatusCode();
                if (status < 200) {
                    if (status != HttpStatus.SC_CONTINUE) {
                        throw new ProtocolException(
                                "Unexpected response: " + response.getStatusLine());
                    }
                    // 非正常的狀態(tài)碼浊竟,重置response
                    response = null;
                } else {
                    sendentity = false;
                } 
            }
        }
        if (sendentity) {
            // 發(fā)送請(qǐng)求實(shí)體
            conn.sendRequestEntity((HttpEntityEnclosingRequest) request);
        }
    }
    conn.flush();
    context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.TRUE);
    return response;
}

通過給定的連接發(fā)送請(qǐng)求,此方法還處理expect-continue握手津畸;

HTTP/1.1 協(xié)議里的Expect: 100-continue 意思是振定,在客戶端發(fā)送 Request Message 之前,HTTP/1.1 協(xié)議允許客戶端先判定服務(wù)器是否愿意接受客戶端發(fā)來的消息主體(基于 Request Headers)肉拓。
目的:它可以讓客戶端在發(fā)送請(qǐng)求數(shù)據(jù)之前去判斷服務(wù)器是否愿意接收該數(shù)據(jù)后频,如果服務(wù)器愿意接收,客戶端才會(huì)真正發(fā)送數(shù)據(jù),如果客戶端直接發(fā)送請(qǐng)求數(shù)據(jù)卑惜,但是服務(wù)器又將該請(qǐng)求拒絕的話膏执,這種行為將帶來很大的資源開銷。
發(fā)送了100-continue的客戶端不應(yīng)該永遠(yuǎn)等待服務(wù)器端做出回應(yīng)露久,超時(shí)一段時(shí)間之后更米,客戶端應(yīng)該直接將實(shí)體發(fā)送出去。

比如在POST方法時(shí)HttpPost就會(huì)去處理expect-continue的邏輯毫痕,如果在這里有判斷到返回了響應(yīng)征峦,則處理響應(yīng)實(shí)體;

HttpRequestExecutor#doReceiveResponse
protected HttpResponse doReceiveResponse(
        final HttpRequest request,
        final HttpClientConnection conn,
        final HttpContext context) throws HttpException, IOException {
     HttpResponse response = null;
     int statusCode = 0;

     while (response == null || statusCode < HttpStatus.SC_OK) {
         response = conn.receiveResponseHeader();
         statusCode = response.getStatusLine().getStatusCode();
         if (statusCode < HttpStatus.SC_CONTINUE) {
             throw new ProtocolException("Invalid response: " + response.getStatusLine());
         }
         // 解析response entity
         if (canResponseHaveBody(request, response)) {
             conn.receiveResponseEntity(response);
         }
     }

    return response;
}

doSendRequest中如果是Get請(qǐng)求消请,會(huì)將HTTP報(bào)文的每一行逐一寫入socket緩沖區(qū)中栏笆,當(dāng)緩沖區(qū)滿了或是數(shù)據(jù)全部寫入后會(huì)發(fā)送出去;
如果是Post請(qǐng)求臊泰,除了發(fā)送請(qǐng)求頭蛉加、請(qǐng)求行、header等信息缸逃,還會(huì)發(fā)送請(qǐng)求體针饥;

doReceiveResponse方法用來接受響應(yīng),但是不負(fù)責(zé)解析具體的entity察滑,只是將對(duì)應(yīng)的socket句柄封裝在響應(yīng)實(shí)體HttpResponse里面等待讀却蚶濉修肠;

繼續(xù)回到MainClientExec#exec贺辰,到這里已經(jīng)拿到請(qǐng)求響應(yīng)了;

            ... 
            // 判斷連接是否是可重用狀態(tài)
            if (reuseStrategy.keepAlive(response, context)) {
                final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                 // 返回的響應(yīng)頭中含有Keep-Alive
                connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                connHolder.markReusable();
            } else {
                // 連接不可重用
                connHolder.markNonReusable();
            }

            ...
            // 校驗(yàn)請(qǐng)求
            ...

        final HttpEntity entity = response.getEntity();
        if (entity == null || !entity.isStreaming()) {
            // 連接不需要并且(假定)處于可重用狀態(tài)
            connHolder.releaseConnection();
            return new HttpResponseProxy(response, null);
        }
        // HttpResponse的代理類嵌施,可以用來釋放與原始響應(yīng)關(guān)聯(lián)的客戶端連接
        return new HttpResponseProxy(response, connHolder);
    } catch (final Exception ex) {
       ...
    }
}

至此整個(gè)請(qǐng)求流程就大概完成了饲化,下面看看連接什么時(shí)候放回連接池;

EntityUtils.toString(response.getEntity(), "UTF-8");

最后會(huì)走到下面這個(gè)方法

private static String toString(
        final HttpEntity entity,
        final ContentType contentType) throws IOException {
    final InputStream inStream = entity.getContent();
    if (inStream == null) {
        return null;
    }
    try {
        int capacity = (int)entity.getContentLength();
        Charset charset = null;
        ...
        final Reader reader = new InputStreamReader(inStream, charset);
        final CharArrayBuffer buffer = new CharArrayBuffer(capacity);
        final char[] tmp = new char[1024];
        int l;
        while((l = reader.read(tmp)) != -1) {
            buffer.append(tmp, 0, l);
        }
        return buffer.toString();
    } finally {
        inStream.close();
    }
}

public InputStream getContent() throws IOException {
    return new EofSensorInputStream(this.wrappedEntity.getContent(), this);
}

關(guān)鍵在于EofSensorInputStream這個(gè)對(duì)象吗伤,該對(duì)象的核心功能就是檢測(cè)EOF標(biāo)識(shí)符并觸發(fā)close()操作吃靠,主要用于在使用響應(yīng)體或不再需要連接時(shí)自動(dòng)釋放底層托管的連接。

public int read() throws IOException {
    int readLen = -1;
    if (isReadAllowed()) {
        try {
            readLen = wrappedStream.read();
            checkEOF(readLen); // 檢查eof
        } catch (final IOException ex) {
            checkAbort();
            throw ex;
        }
    }
    return readLen;
}
// checkEOF會(huì)走到這里
public boolean eofDetected(final InputStream wrapped) throws IOException {
    try {
        if (wrapped != null) {
            wrapped.close();
        }
        releaseConnection(); // 釋放連接
    } catch (final IOException ex) {
        abortConnection(); // 中斷連接
        throw ex;
    } catch (final RuntimeException ex) {
        abortConnection();
        throw ex;
    } finally {
        cleanup(); // 關(guān)閉連接
    }
    return false;
}

當(dāng)從流中讀取到EOF標(biāo)志符后會(huì)主動(dòng)釋放連接足淆;

ConnectionHolder#releaseConnection
private void releaseConnection(final boolean reusable) {
    if (this.released.compareAndSet(false, true)) {
        synchronized (this.managedConn) {
            if (reusable) {
                // 把連接放回連接池
                this.manager.releaseConnection(this.managedConn, 
                        this.state, this.validDuration, this.timeUnit);
            } else {
                try {
                    this.managedConn.close();
                } catch (final IOException ex) {
                } finally {
                    this.manager.releaseConnection(
                        this.managedConn, null, 0, TimeUnit.MILLISECONDS);
                }
            }
        }
    }
}

這里的managedConnCPoolProxy對(duì)象巢块,managerPoolingHttpClientConnectionManager對(duì)象,如果請(qǐng)求完畢后只是釋放巧号,那么會(huì)把連接放回到連接池族奢;

AbstractConnPool#release
public void release(final E entry, final boolean reusable) {     
    this.lock.lock();
    try {
        if (this.leased.remove(entry)) {
            // 根據(jù)route取連接池
            final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
            // 將RouteSpecificPool的連接從租用池中移除,放入空閑池
            pool.free(entry, reusable);
            if (reusable && !this.isShutDown) {
                this.available.addFirst(entry);
            } else {
                entry.close();
            }
            onRelease(entry); // hook
            Future<E> future = pool.nextPending();
            // 如果有的話從等待隊(duì)列中拿到等待請(qǐng)求的future
            if (future != null) {
                this.pending.remove(future);
            } else {
                future = this.pending.poll();
            }
            if (future != null) {
                // 喚醒其他被阻塞的請(qǐng)求線程
                this.condition.signalAll();
            }
        }
    } finally {
        this.lock.unlock();
    }
}

將釋放的連接從租用池中移回到空閑連接池中丹鸿,然后從等待隊(duì)列取出第一個(gè)請(qǐng)求越走,如果有的話會(huì)喚醒被阻塞線程,注意這里取的操作,是優(yōu)先取同一個(gè)路由下的請(qǐng)求廊敌,如果同一個(gè)路由下沒有才會(huì)取其他路由的請(qǐng)求铜跑;

shutdown和close的區(qū)別
  • close
    此方法會(huì)優(yōu)雅地關(guān)閉連接,在關(guān)閉底層socket之前會(huì)刷新內(nèi)部緩沖區(qū)骡澈,close不允許從其他線程調(diào)用來強(qiáng)制關(guān)閉連接锅纺。
public void close() throws IOException {
    final Socket socket = this.socketHolder.getAndSet(null);
    if (socket != null) {
        try {
            // 先會(huì)刷新緩存區(qū)
            this.inBuffer.clear();
            this.outbuffer.flush();
            try {
                try {
                    socket.shutdownOutput();
                } catch (final IOException ignore) {
                }
                try {
                    socket.shutdownInput();
                } catch (final IOException ignore) {
                }
            } catch (final UnsupportedOperationException ignore) {
                // if one isn't supported, the other one isn't either
            }
        } finally {
            socket.close();
        }
    }
}
  • shutdown
    會(huì)立即強(qiáng)制關(guān)閉這個(gè)連接,該方法可以允許從不同的線程來調(diào)用它終止連接肋殴。在關(guān)閉底層scoket之前伞广,不會(huì)嘗試刷新任何的內(nèi)部緩沖區(qū);
public void shutdown() throws IOException {
    final Socket socket = this.socketHolder.getAndSet(null);
    if (socket != null) {
        try {
            // 設(shè)置關(guān)閉socket的操作
            socket.setSoLinger(true, 0);
        } catch (final IOException ex) {
        } finally {
            socket.close();
        }
    }
}

TCP協(xié)議中RST標(biāo)志位用來表示重置連接疼电、復(fù)位連接嚼锄,關(guān)閉異常的連接,
發(fā)送RST包關(guān)閉連接時(shí)蔽豺,不必等緩沖區(qū)的包都發(fā)出去(正常要說關(guān)閉要經(jīng)歷4次揮手)区丑,直接就丟棄緩存區(qū)的數(shù)據(jù)并發(fā)送RST包;而接收端收到RST包后修陡,也不必發(fā)送ACK包來確認(rèn)沧侥。

setSoLinger的第一個(gè)參數(shù)為false時(shí),TCP關(guān)閉連接時(shí)會(huì)保存默認(rèn)的操作魄鸦,緩沖區(qū)的數(shù)據(jù)會(huì)正常刷新宴杀;

當(dāng)?shù)谝粋€(gè)參數(shù)為true時(shí),會(huì)依賴第二個(gè)參數(shù)的值拾因,當(dāng)?shù)诙€(gè)參數(shù)為0時(shí)旺罢,調(diào)用close的時(shí)候,TCP連接會(huì)立即斷開绢记;緩沖區(qū)中未被發(fā)送的數(shù)據(jù)將被丟棄扁达,并向?qū)Ψ桨l(fā)送一個(gè)RST包,TCP連接將不會(huì)進(jìn)入TIME_WAIT狀態(tài)(上面提到的對(duì)端不會(huì)復(fù)回ACK)蠢熄;

當(dāng)?shù)诙€(gè)參數(shù)值不為0時(shí)跪解,會(huì)在設(shè)置的時(shí)間段內(nèi)繼續(xù)嘗試發(fā)送緩沖區(qū)的數(shù)據(jù),如果超時(shí)則丟棄緩沖區(qū)的數(shù)據(jù)签孔,并且調(diào)用close會(huì)阻塞直到設(shè)置的時(shí)間結(jié)束后才返回叉讥;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市饥追,隨后出現(xiàn)的幾起案子图仓,更是在濱河造成了極大的恐慌,老刑警劉巖判耕,帶你破解...
    沈念sama閱讀 221,820評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件透绩,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)帚豪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門碳竟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人狸臣,你說我怎么就攤上這事莹桅。” “怎么了烛亦?”我有些...
    開封第一講書人閱讀 168,324評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵诈泼,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我煤禽,道長(zhǎng)铐达,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評(píng)論 1 297
  • 正文 為了忘掉前任檬果,我火速辦了婚禮瓮孙,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘选脊。我一直安慰自己杭抠,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評(píng)論 6 397
  • 文/花漫 我一把揭開白布恳啥。 她就那樣靜靜地躺著偏灿,像睡著了一般。 火紅的嫁衣襯著肌膚如雪钝的。 梳的紋絲不亂的頭發(fā)上翁垂,一...
    開封第一講書人閱讀 52,328評(píng)論 1 310
  • 那天,我揣著相機(jī)與錄音扁藕,去河邊找鬼沮峡。 笑死,一個(gè)胖子當(dāng)著我的面吹牛亿柑,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播棍弄,決...
    沈念sama閱讀 40,897評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼望薄,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了呼畸?” 一聲冷哼從身側(cè)響起痕支,我...
    開封第一講書人閱讀 39,804評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蛮原,沒想到半個(gè)月后卧须,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評(píng)論 3 340
  • 正文 我和宋清朗相戀三年花嘶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了笋籽。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,561評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡椭员,死狀恐怖车海,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情隘击,我是刑警寧澤侍芝,帶...
    沈念sama閱讀 36,238評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站埋同,受9級(jí)特大地震影響州叠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜凶赁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評(píng)論 3 334
  • 文/蒙蒙 一留量、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧哟冬,春花似錦楼熄、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至翰灾,卻和暖如春缕粹,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背纸淮。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工平斩, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人咽块。 一個(gè)月前我還...
    沈念sama閱讀 48,983評(píng)論 3 376
  • 正文 我出身青樓绘面,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親侈沪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子揭璃,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評(píng)論 2 359