soul網(wǎng)關學習18-插件實現(xiàn)2-HttpClient插件集-http請求的轉(zhuǎn)發(fā)與結(jié)果處理

在前面兩篇我們分析了Divide插件的處理,soul網(wǎng)關對整個http請求赵颅,Divide插件還只是其中一部分,其主要功能只是完成了后端節(jié)點的選取。至于如何將請求再轉(zhuǎn)發(fā)到后端節(jié)點秕噪,則就是我們今天要分析的插件HttpClient集去實現(xiàn)的。

http請求的處理流圖

http-request-process

我們分析一下這個圖厚宰,這個圖表示了最簡版的soul網(wǎng)關處理http請求的流程

  1. http請求進來腌巾,經(jīng)由Divide插件處理,其主要邏輯就是通過負載均衡算法選取出后端服務節(jié)點來處理這次的請求铲觉,實際上就是拿到了一個后端服務的httpUrl澈蝙,并將httpUrl塞進exchange對象
  2. 接下來HttpClient插件處理,該插件則會向httpUrl發(fā)起請求撵幽,在拿到返回結(jié)果webHandlerClientResponse后灯荧,并將其塞進exchange對象
  3. 接下來WebClientResponse插件會取出返回結(jié)果webHandlerClientResponse,將其作為最終的返回結(jié)果輸出給到客戶端

分析

  1. 先從configuration入手盐杂,找到HttpClient插件的配置類HttpClientPluginConfiguration逗载,會有幾個關鍵的beanHttpClientWebClientConfigurationNettyHttpClientConfiguration
  2. 先看HttpClient链烈,涉及到httpClient的實例化厉斟,包括連接池(禁用、定長强衡、彈性)擦秽、連接超時(默認45s)、是否走代理(代理服務器漩勤、用戶名感挥、密碼;)锯七、https
    public HttpClient httpClient(final HttpClientProperties properties) {
        // configure pool resources
        // httpClient的初始化链快,包括連接池(禁用、定長眉尸、彈性)域蜗、連接超時(默認45s)巨双、
        // 是否走代理(代理服務器、用戶名霉祸、密碼筑累;)、https
        // 連接池
        HttpClientProperties.Pool pool = properties.getPool();
        ConnectionProvider connectionProvider;
        // 省略丝蹭。慢宗。。
        HttpClient httpClient = HttpClient.create(connectionProvider)
                .tcpConfiguration(tcpClient -> {
                    // 連接超時
                    if (properties.getConnectTimeout() != null) {
                        // 省略奔穿。镜沽。。
                    }
                    // configure proxy if proxy host is set.
                    // 是否走代理
                    HttpClientProperties.Proxy proxy = properties.getProxy();
                   // 省略贱田。缅茉。。
                });
        // ssl
        HttpClientProperties.Ssl ssl = properties.getSsl();
        // 省略男摧。蔬墩。。
        return httpClient;
    }
  • WebClientConfiguration則包含了 WebClientPluginWebClientResponsePlugin耗拓;而NettyHttpClientConfiguration則包含了NettyHttpClientPluginNettyClientResponsePlugin
  • 他們都是為了實現(xiàn)http 后端服務的調(diào)用拇颅,以及返回結(jié)果的處理,是一個二選一的事情乔询,由配置項soul.httpclient.strategy決定樟插,其中WebClientConfiguration為默認
  • 我們具體分析下這兩種方式

WebClientConfiguration

WebClientPlugin

  • WebClientPlugin實例化,需傳入WebClient
        @Bean
        public SoulPlugin webClientPlugin(final ObjectProvider<HttpClient> httpClient) {
            // 構(gòu)造webClient哥谷,是一個非阻塞岸夯、用于執(zhí)行http請求的響應式的客戶端麻献,其底層是Reactor Netty
            // 類似restTemplate们妥,我們使用時也需要進行構(gòu)造
            WebClient webClient = WebClient.builder()
                    // 客戶端連接器,傳入已預先配置好的httpClient勉吻,也就是我們在上面實例化的bean httpClient
                    .clientConnector(new ReactorClientHttpConnector(Objects.requireNonNull(httpClient.getIfAvailable())))
                    .build();
            return new WebClientPlugin(webClient);
        }
  • WebClientPlugin處理邏輯监婶,關鍵方法executehandleRequestBody
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        // 取出網(wǎng)關上下文
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        // 請求路徑
        String urlPath = exchange.getAttribute(Constants.HTTP_URL);
        if (StringUtils.isEmpty(urlPath)) {
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
        int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
        log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
        // 原始請求的方法類型
        HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
        // 請求體的構(gòu)造
        WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
        // 處理請求體以及發(fā)送請求
        return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
    }
   
    private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                         final ServerWebExchange exchange,
                                         final long timeout,
                                         final int retryTimes,
                                         final SoulPluginChain chain) {
        return requestBodySpec
                // 請求頭
                .headers(httpHeaders -> {
                    httpHeaders.addAll(exchange.getRequest().getHeaders());
                    httpHeaders.remove(HttpHeaders.HOST);
                })
                // 請求內(nèi)容的類型
                .contentType(buildMediaType(exchange))
                // 請求body
                .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
                // 執(zhí)行請求
                .exchange()
                // 請求異常時的處理
                .doOnError(e -> log.error(e.getMessage()))
                // 超時異常的拋出
                .timeout(Duration.ofMillis(timeout))
                // 重試:只有當請求發(fā)生連接超時,重試次數(shù)齿桃,重試算法使用2的指數(shù)退讓惑惶,第一次重試等待200ms,期間最大間隔為20s
                .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
                        .retryMax(retryTimes)
                        .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
                // 將flux -> mono返回
                .flatMap(e -> doNext(e, exchange, chain));

    }

WebClientResponsePlugin

  • 看下其處理邏輯excute方法
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        // 先執(zhí)行插件鏈
        return chain.execute(exchange)
                // then表示插件鏈處理完成后才執(zhí)行的邏輯短纵,then里邊的邏輯也是異步處理的
                .then(Mono.defer(() -> {
            ServerHttpResponse response = exchange.getResponse();
            // 后端服務返回的響應
            ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
            // 一些服務異常的處理
            if (Objects.isNull(clientResponse)
                    || response.getStatusCode() == HttpStatus.BAD_GATEWAY
                    || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
                return WebFluxResultUtils.result(exchange, error);
            }
            if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
                return WebFluxResultUtils.result(exchange, error);
            }
            // 后端服務返回的狀態(tài)碼 cookie header的處理
            response.setStatusCode(clientResponse.statusCode());
            response.getCookies().putAll(clientResponse.cookies());
            response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
            // 最后將響應體寫入到客戶端響應中
            return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
        }));
    }

小結(jié)

  • WebClientPlugin底層直接用spring官方實現(xiàn)的webClient去完成后端服務的請求带污,請求之前需要將客戶端請求過來的請求頭header、請求方法httpMethod香到、請求內(nèi)容類型contentType鱼冀、請求體requestBody一并發(fā)送給后端服務报破;執(zhí)行請求后會將后端服務返回的響應放于exchange中,交由插件WebClientResponsePlugin處理千绪;最后繼續(xù)執(zhí)行插件鏈
  • WebClientResponsePlugin是先執(zhí)行插件鏈充易,待插件鏈上的所有插件全部執(zhí)行完畢后才執(zhí)行自己的邏輯:處理客戶端原始請求的響應,即將由后端服務返回的響應clientResponse荸型、狀態(tài)碼盹靴、cookieheader設置到其內(nèi)容之中

NettyHttpClientConfiguration

NettyHttpClientPlugin

  • NettyHttpClientPlugin實例化瑞妇,傳入httpClient即可
        @Bean
        public SoulPlugin nettyHttpClientPlugin(final ObjectProvider<HttpClient> httpClient) {
            return new NettyHttpClientPlugin(httpClient.getIfAvailable());
        }
  • 看下插件的execute方法稿静,只保留關鍵邏輯
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        // 后端服務請求方法、請求頭的生成
        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        HttpHeaders filtered = request.getHeaders();
        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);
        String url = exchange.getAttribute(Constants.HTTP_URL);
        // 辕狰。自赔。。
        Flux<HttpClientResponse> responseFlux =
                // 設置請求頭
                this.httpClient.headers(headers -> headers.add(httpHeaders))
                // 構(gòu)造requestSender
                .request(method).uri(url)
                // 發(fā)送請求 nettyOutBound出站的一個處理柳琢,即發(fā)送請求前的一個處理
                // 需要將請求體的內(nèi)容轉(zhuǎn)換為netty的ByteBuf傳輸出去
                .send((req, nettyOutbound) -> nettyOutbound.send(request.getBody().map(dataBuffer -> ((NettyDataBuffer) dataBuffer) .getNativeBuffer())))
                // 將connection提取出Flux<HttpClientResponse>绍妨,
                .responseConnection((res, connection) -> {
                    // 將后端服務返回的響應結(jié)果放到exchange中
                    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
                    // 將connection對象放到exchange中,需要傳遞給NettyClientResponsePlugin處理
                    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_CONN_ATTR, connection);
                    // 處理header柬脸、cookie他去、httpStatus
                    ServerHttpResponse response = exchange.getResponse();
                    HttpHeaders headers = new HttpHeaders();
                    res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
                    String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
                    if (StringUtils.hasLength(contentTypeValue)) {
                        exchange.getAttributes().put(Constants.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
                    }
                    HttpStatus status = HttpStatus.resolve(res.status().code());
                    if (status != null) {
                        response.setStatusCode(status);
                    } else if (response instanceof AbstractServerHttpResponse) {
                        ((AbstractServerHttpResponse) response)
                                .setStatusCodeValue(res.status().code());
                    } else {
                        throw new IllegalStateException("Unable to set status code on response: " + res.status().code() + ", " + response.getClass());
                    }
                    response.getHeaders().putAll(headers);

                    return Mono.just(res);
                });
        long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
        Duration duration = Duration.ofMillis(timeout);
        responseFlux = responseFlux
                // 超時
                .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
                // 異常的映射
                .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
        // 繼續(xù)執(zhí)行插件鏈
        return responseFlux.then(chain.execute(exchange));
    }

NettyClientResponsePlugin

  • 直接看插件的execute方法
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        // 先執(zhí)行插件自身的邏輯,為異步執(zhí)行
        return Mono.defer(() -> {
            Connection connection = exchange.getAttribute(Constants.CLIENT_RESPONSE_CONN_ATTR);
            if (connection == null) {
                return Mono.empty();
            }
            if (log.isTraceEnabled()) {
                log.trace("NettyWriteResponseFilter start inbound: "
                        + connection.channel().id().asShortText() + ", outbound: "
                        + exchange.getLogPrefix());
            }
            ServerHttpResponse response = exchange.getResponse();
            NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
            // NettyDataBuffer的響應
            final Flux<NettyDataBuffer> body = connection
                    // 對于client端的connection對象倒堕,你響應的處理反而是inbound灾测;
                    // 必須先配置inbound作為connection的橋梁,否則是無法接收數(shù)據(jù)的
                    .inbound()
                    // 接收:轉(zhuǎn)換數(shù)據(jù)
                    .receive()
                    // 將內(nèi)存buffers保留進行復用
                    .retain()
                    // 將ByteBuf轉(zhuǎn)換為spring中的NettyDataBuffer
                    .map(factory::wrap);
            MediaType contentType = response.getHeaders().getContentType();
            // 是否為流媒體響應垦巴,若是直接返回
            return isStreamingMediaType(contentType)
                    ? response.writeAndFlushWith(body.map(Flux::just))
                    : response.writeWith(body);

        })
                // 執(zhí)行插件
                .then(chain.execute(exchange)
                        //TODO question 成功的情況下怎么沒有釋放connection
                        .doOnError(throwable -> cleanup(exchange))).doOnCancel(() -> cleanup(exchange));
    }

小結(jié)

  • NettyHttpClientPlugin直接使用httpClient做為發(fā)送請求的載體媳搪,先構(gòu)造requestSender(請求方法、請求路徑骤宣、請求頭)秦爆,然后調(diào)用其send方法獲取到responseReceiver,再通過responseConnection提取出Flux<HttpClientResponse>憔披,其中對于響應中的(header等限、cookiehttpStatus)這些就直接在這個提取邏輯中處理芬膝,而后端服務返回的響應webHandlerClientResponse望门,還有整個connection對象則放到exchange
  • NettyClientResponsePlugin主要邏輯就是處理后端服務返回的響應。有些奇怪的是锰霜,這里并沒有直接使用NettyHttpClientPlugin中放到exchange對象的webHandlerClientResponse筹误,而是從connection對象中讀取響應數(shù)據(jù);隨后再執(zhí)行插件鏈邏輯

總結(jié)

  • 上述基本將遠程客戶端的http請求轉(zhuǎn)發(fā)到后端服務癣缅,以及將后端服務的響應再返回給遠程客戶端的代碼實現(xiàn)進行了分析
  • soul網(wǎng)關提供了webClientnettyClient兩套實現(xiàn)的機制厨剪,可供選擇
  • 其中webClient方式是直接使用spring自己封裝的WebClient(是在HttpClient之上封裝實現(xiàn)的一套非阻塞勘畔,響應式的http客戶端)作為請求的載體,整個代碼更簡潔明了;而 nettyClient 則是用更底層的HttpClient 去做http請求操作,需自行處理netty的出站outbound與入站inbound邏輯音羞,整個代碼會更復雜一些。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末万哪,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子抡秆,更是在濱河造成了極大的恐慌奕巍,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件儒士,死亡現(xiàn)場離奇詭異的止,居然都是意外死亡,警方通過查閱死者的電腦和手機着撩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進店門诅福,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人拖叙,你說我怎么就攤上這事氓润。” “怎么了薯鳍?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵咖气,是天一觀的道長。 經(jīng)常有香客問我挖滤,道長崩溪,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任斩松,我火速辦了婚禮伶唯,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘砸民。我一直安慰自己抵怎,他們只是感情好奋救,可當我...
    茶點故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布岭参。 她就那樣靜靜地躺著,像睡著了一般尝艘。 火紅的嫁衣襯著肌膚如雪演侯。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天背亥,我揣著相機與錄音秒际,去河邊找鬼悬赏。 笑死,一個胖子當著我的面吹牛娄徊,可吹牛的內(nèi)容都是我干的闽颇。 我是一名探鬼主播,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼寄锐,長吁一口氣:“原來是場噩夢啊……” “哼兵多!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起橄仆,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤剩膘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后盆顾,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體怠褐,經(jīng)...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年您宪,在試婚紗的時候發(fā)現(xiàn)自己被綠了奈懒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,137評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡宪巨,死狀恐怖筐赔,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情揖铜,我是刑警寧澤茴丰,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站天吓,受9級特大地震影響贿肩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜龄寞,卻給世界環(huán)境...
    茶點故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一汰规、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧物邑,春花似錦溜哮、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至科阎,卻和暖如春述吸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背锣笨。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工蝌矛, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留道批,地道東北人。 一個月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓入撒,卻偏偏與公主長得像隆豹,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子茅逮,可洞房花燭夜當晚...
    茶點故事閱讀 45,086評論 2 355

推薦閱讀更多精彩內(nèi)容