在前面兩篇我們分析了Divide
插件的處理,soul
網(wǎng)關對整個http
請求赵颅,Divide
插件還只是其中一部分,其主要功能只是完成了后端節(jié)點
的選取。至于如何將請求再轉(zhuǎn)發(fā)到后端節(jié)點
秕噪,則就是我們今天要分析的插件HttpClient
集去實現(xiàn)的。
http請求的處理流圖
http-request-process
我們分析一下這個圖厚宰,這個圖表示了最簡版的soul
網(wǎng)關處理http
請求的流程
-
http
請求進來腌巾,經(jīng)由Divide
插件處理,其主要邏輯就是通過負載均衡算法選取出后端服務節(jié)點來處理這次的請求铲觉,實際上就是拿到了一個后端服務的httpUrl
澈蝙,并將httpUrl
塞進exchange
對象 - 接下來
HttpClient
插件處理,該插件則會向httpUrl
發(fā)起請求撵幽,在拿到返回結(jié)果webHandlerClientResponse
后灯荧,并將其塞進exchange
對象 - 接下來
WebClientResponse
插件會取出返回結(jié)果webHandlerClientResponse
,將其作為最終的返回結(jié)果輸出給到客戶端
分析
- 先從
configuration
入手盐杂,找到HttpClient
插件的配置類HttpClientPluginConfiguration
逗载,會有幾個關鍵的bean
:HttpClient
、WebClientConfiguration
和NettyHttpClientConfiguration
- 先看
HttpClient
链烈,涉及到httpClient
的實例化厉斟,包括連接池(禁用、定長强衡、彈性)擦秽、連接超時(默認45s
)、是否走代理(代理服務器漩勤、用戶名感挥、密碼;)锯七、https
public HttpClient httpClient(final HttpClientProperties properties) {
// configure pool resources
// httpClient的初始化链快,包括連接池(禁用、定長眉尸、彈性)域蜗、連接超時(默認45s)巨双、
// 是否走代理(代理服務器、用戶名霉祸、密碼筑累;)、https
// 連接池
HttpClientProperties.Pool pool = properties.getPool();
ConnectionProvider connectionProvider;
// 省略丝蹭。慢宗。。
HttpClient httpClient = HttpClient.create(connectionProvider)
.tcpConfiguration(tcpClient -> {
// 連接超時
if (properties.getConnectTimeout() != null) {
// 省略奔穿。镜沽。。
}
// configure proxy if proxy host is set.
// 是否走代理
HttpClientProperties.Proxy proxy = properties.getProxy();
// 省略贱田。缅茉。。
});
// ssl
HttpClientProperties.Ssl ssl = properties.getSsl();
// 省略男摧。蔬墩。。
return httpClient;
}
-
WebClientConfiguration
則包含了WebClientPlugin
與WebClientResponsePlugin
耗拓;而NettyHttpClientConfiguration
則包含了NettyHttpClientPlugin
與NettyClientResponsePlugin
- 他們都是為了實現(xiàn)
http
后端服務的調(diào)用拇颅,以及返回結(jié)果的處理,是一個二選一的事情乔询,由配置項soul.httpclient.strategy
決定樟插,其中WebClientConfiguration
為默認 - 我們具體分析下這兩種方式
WebClientConfiguration
WebClientPlugin
-
WebClientPlugin
實例化,需傳入WebClient
@Bean
public SoulPlugin webClientPlugin(final ObjectProvider<HttpClient> httpClient) {
// 構(gòu)造webClient哥谷,是一個非阻塞岸夯、用于執(zhí)行http請求的響應式的客戶端麻献,其底層是Reactor Netty
// 類似restTemplate们妥,我們使用時也需要進行構(gòu)造
WebClient webClient = WebClient.builder()
// 客戶端連接器,傳入已預先配置好的httpClient勉吻,也就是我們在上面實例化的bean httpClient
.clientConnector(new ReactorClientHttpConnector(Objects.requireNonNull(httpClient.getIfAvailable())))
.build();
return new WebClientPlugin(webClient);
}
-
WebClientPlugin
處理邏輯监婶,關鍵方法execute
與handleRequestBody
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
// 取出網(wǎng)關上下文
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
// 請求路徑
String urlPath = exchange.getAttribute(Constants.HTTP_URL);
if (StringUtils.isEmpty(urlPath)) {
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
// 原始請求的方法類型
HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
// 請求體的構(gòu)造
WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
// 處理請求體以及發(fā)送請求
return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
final ServerWebExchange exchange,
final long timeout,
final int retryTimes,
final SoulPluginChain chain) {
return requestBodySpec
// 請求頭
.headers(httpHeaders -> {
httpHeaders.addAll(exchange.getRequest().getHeaders());
httpHeaders.remove(HttpHeaders.HOST);
})
// 請求內(nèi)容的類型
.contentType(buildMediaType(exchange))
// 請求body
.body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
// 執(zhí)行請求
.exchange()
// 請求異常時的處理
.doOnError(e -> log.error(e.getMessage()))
// 超時異常的拋出
.timeout(Duration.ofMillis(timeout))
// 重試:只有當請求發(fā)生連接超時,重試次數(shù)齿桃,重試算法使用2的指數(shù)退讓惑惶,第一次重試等待200ms,期間最大間隔為20s
.retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
// 將flux -> mono返回
.flatMap(e -> doNext(e, exchange, chain));
}
WebClientResponsePlugin
- 看下其處理邏輯
excute
方法
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
// 先執(zhí)行插件鏈
return chain.execute(exchange)
// then表示插件鏈處理完成后才執(zhí)行的邏輯短纵,then里邊的邏輯也是異步處理的
.then(Mono.defer(() -> {
ServerHttpResponse response = exchange.getResponse();
// 后端服務返回的響應
ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
// 一些服務異常的處理
if (Objects.isNull(clientResponse)
|| response.getStatusCode() == HttpStatus.BAD_GATEWAY
|| response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 后端服務返回的狀態(tài)碼 cookie header的處理
response.setStatusCode(clientResponse.statusCode());
response.getCookies().putAll(clientResponse.cookies());
response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
// 最后將響應體寫入到客戶端響應中
return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
}));
}
小結(jié)
-
WebClientPlugin
底層直接用spring
官方實現(xiàn)的webClient
去完成后端服務的請求带污,請求之前需要將客戶端請求過來的請求頭header
、請求方法httpMethod
香到、請求內(nèi)容類型contentType
鱼冀、請求體requestBody
一并發(fā)送給后端服務报破;執(zhí)行請求后會將后端服務返回的響應放于exchange
中,交由插件WebClientResponsePlugin
處理千绪;最后繼續(xù)執(zhí)行插件鏈 -
WebClientResponsePlugin
是先執(zhí)行插件鏈充易,待插件鏈上的所有插件全部執(zhí)行完畢后才執(zhí)行自己的邏輯:處理客戶端原始請求的響應,即將由后端服務返回的響應clientResponse
荸型、狀態(tài)碼盹靴、cookie
、header
設置到其內(nèi)容之中
NettyHttpClientConfiguration
NettyHttpClientPlugin
-
NettyHttpClientPlugin
實例化瑞妇,傳入httpClient
即可
@Bean
public SoulPlugin nettyHttpClientPlugin(final ObjectProvider<HttpClient> httpClient) {
return new NettyHttpClientPlugin(httpClient.getIfAvailable());
}
- 看下插件的
execute
方法稿静,只保留關鍵邏輯
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
// 后端服務請求方法、請求頭的生成
final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
HttpHeaders filtered = request.getHeaders();
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
String url = exchange.getAttribute(Constants.HTTP_URL);
// 辕狰。自赔。。
Flux<HttpClientResponse> responseFlux =
// 設置請求頭
this.httpClient.headers(headers -> headers.add(httpHeaders))
// 構(gòu)造requestSender
.request(method).uri(url)
// 發(fā)送請求 nettyOutBound出站的一個處理柳琢,即發(fā)送請求前的一個處理
// 需要將請求體的內(nèi)容轉(zhuǎn)換為netty的ByteBuf傳輸出去
.send((req, nettyOutbound) -> nettyOutbound.send(request.getBody().map(dataBuffer -> ((NettyDataBuffer) dataBuffer) .getNativeBuffer())))
// 將connection提取出Flux<HttpClientResponse>绍妨,
.responseConnection((res, connection) -> {
// 將后端服務返回的響應結(jié)果放到exchange中
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
// 將connection對象放到exchange中,需要傳遞給NettyClientResponsePlugin處理
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_CONN_ATTR, connection);
// 處理header柬脸、cookie他去、httpStatus
ServerHttpResponse response = exchange.getResponse();
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(Constants.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
}
HttpStatus status = HttpStatus.resolve(res.status().code());
if (status != null) {
response.setStatusCode(status);
} else if (response instanceof AbstractServerHttpResponse) {
((AbstractServerHttpResponse) response)
.setStatusCodeValue(res.status().code());
} else {
throw new IllegalStateException("Unable to set status code on response: " + res.status().code() + ", " + response.getClass());
}
response.getHeaders().putAll(headers);
return Mono.just(res);
});
long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
Duration duration = Duration.ofMillis(timeout);
responseFlux = responseFlux
// 超時
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
// 異常的映射
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
// 繼續(xù)執(zhí)行插件鏈
return responseFlux.then(chain.execute(exchange));
}
NettyClientResponsePlugin
- 直接看插件的
execute
方法
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
// 先執(zhí)行插件自身的邏輯,為異步執(zhí)行
return Mono.defer(() -> {
Connection connection = exchange.getAttribute(Constants.CLIENT_RESPONSE_CONN_ATTR);
if (connection == null) {
return Mono.empty();
}
if (log.isTraceEnabled()) {
log.trace("NettyWriteResponseFilter start inbound: "
+ connection.channel().id().asShortText() + ", outbound: "
+ exchange.getLogPrefix());
}
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
// NettyDataBuffer的響應
final Flux<NettyDataBuffer> body = connection
// 對于client端的connection對象倒堕,你響應的處理反而是inbound灾测;
// 必須先配置inbound作為connection的橋梁,否則是無法接收數(shù)據(jù)的
.inbound()
// 接收:轉(zhuǎn)換數(shù)據(jù)
.receive()
// 將內(nèi)存buffers保留進行復用
.retain()
// 將ByteBuf轉(zhuǎn)換為spring中的NettyDataBuffer
.map(factory::wrap);
MediaType contentType = response.getHeaders().getContentType();
// 是否為流媒體響應垦巴,若是直接返回
return isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body);
})
// 執(zhí)行插件
.then(chain.execute(exchange)
//TODO question 成功的情況下怎么沒有釋放connection
.doOnError(throwable -> cleanup(exchange))).doOnCancel(() -> cleanup(exchange));
}
小結(jié)
-
NettyHttpClientPlugin
直接使用httpClient
做為發(fā)送請求的載體媳搪,先構(gòu)造requestSender
(請求方法、請求路徑骤宣、請求頭)秦爆,然后調(diào)用其send
方法獲取到responseReceiver
,再通過responseConnection
提取出Flux<HttpClientResponse>
憔披,其中對于響應中的(header
等限、cookie
、httpStatus
)這些就直接在這個提取邏輯中處理芬膝,而后端服務返回的響應webHandlerClientResponse
望门,還有整個connection
對象則放到exchange
中 -
NettyClientResponsePlugin
主要邏輯就是處理后端服務返回的響應。有些奇怪的是锰霜,這里并沒有直接使用NettyHttpClientPlugin
中放到exchange
對象的webHandlerClientResponse
筹误,而是從connection
對象中讀取響應數(shù)據(jù);隨后再執(zhí)行插件鏈邏輯
總結(jié)
- 上述基本將遠程客戶端的
http
請求轉(zhuǎn)發(fā)到后端服務癣缅,以及將后端服務的響應再返回給遠程客戶端的代碼實現(xiàn)進行了分析 -
soul
網(wǎng)關提供了webClient
與nettyClient
兩套實現(xiàn)的機制厨剪,可供選擇 - 其中
webClient
方式是直接使用spring
自己封裝的WebClient
(是在HttpClient
之上封裝實現(xiàn)的一套非阻塞勘畔,響應式的http
客戶端)作為請求的載體,整個代碼更簡潔明了;而nettyClient
則是用更底層的HttpClient
去做http
請求操作,需自行處理netty
的出站outbound
與入站inbound
邏輯音羞,整個代碼會更復雜一些。