一违霞、場景
使用spring cloud gateway后霜第,有了熔斷泌类,問題也就隨之而來,服務(wù)間調(diào)用有了hystrix可以及時的排除壞接口弹砚、壞服務(wù)的問題桌吃,對系統(tǒng)很有幫助苞轿。但是搬卒!不是所有的接口都是極短時間內(nèi)完成的,不是所有的接口都可以設(shè)置一樣的超時時間的摆寄!
那么我們面臨一個問題微饥,那就是百分之99的接口都可以在1s內(nèi)完美完成,但是就是那幾個特殊接口矩肩,需要十幾秒蛮拔,幾十秒的等待時間痹升,而默認(rèn)熔斷的時間又只有一個疼蛾。
二、分析
在前面springcloudgateway源碼解析之請求篇中我們知道請求會經(jīng)過一些列的過濾器(GatewayFilter),而springcloudgateway的降級熔斷處理就是由一個特殊的過濾器來處理的衍慎,通過源碼分析我們關(guān)注到HystrixGatewayFilterFactory這個類稳捆,這個類的作用就是生產(chǎn)GatewayFilter用的麦轰,我們看下它的實現(xiàn)
可以看到紅框處最后構(gòu)建了一個匿名的GatewayFilter對象返回末荐,這個對象在接口請求過程中會被加載到過濾器鏈條中新锈,仔細(xì)看到 這里是創(chuàng)建了一個RouteHystrixCommand這個命令對象,最終調(diào)用command.toObservable()方法處理請求块请,如果超時熔斷調(diào)用resumeWithFallback方法
通過源碼分析 gateway在路由時可以指定HystrixCommandKey负乡,并且對HystrixCommandKey設(shè)置超時時間
三、方案
知道網(wǎng)關(guān)熔斷的原理就好辦了,自定義熔斷的過濾器配置到接口請求過程中狸涌,由過濾器來讀取接口熔斷配置并構(gòu)建HystrixObservableCommand處理請求。
自定義一個類XXXGatewayFilterFactory繼承AbstractGatewayFilterFactory朝捆,將api和對應(yīng)的timeout配置化芙盘,來實現(xiàn)細(xì)化到具體接口的熔斷配置脸秽,具體實現(xiàn)如下:
package org.unicorn.framework.gateway.filter;
import cn.hutool.core.collection.CollectionUtil;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.gateway.support.TimeoutException;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Subscription;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
/**
* @Author: xiebin
* @Description:
* @Date:Create:in 2022-07-06 9:17
*/
@Component
public class UnicornHystrixGatewayFilterFactoryextends AbstractGatewayFilterFactory {
private static final StringNAME ="unicornHystrix";
? ? private ObjectProviderdispatcherHandlerProvider;
? ? private AntPathMatcherantPathMatcher;
? ? public UnicornHystrixGatewayFilterFactory(ObjectProvider dispatcherHandlerProvider) {
super(Config.class);
? ? ? ? this.dispatcherHandlerProvider = dispatcherHandlerProvider;
? ? ? ? this.antPathMatcher =new AntPathMatcher();
? ? }
@Override
? ? public ListshortcutFieldOrder() {
return Collections.singletonList(NAME_KEY);
? ? }
/**
* 獲取服務(wù)ID
*
? ? * @param exchange
? ? * @return
? ? */
? ? public StringserviceId(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
? ? ? ? String path = request.getPath().pathWithinApplication().value();
? ? ? ? try {
path = path.split("/")[1];
? ? ? ? ? ? return path;
? ? ? ? }catch (Exception e) {
return "default";
? ? ? ? }
}
/**
? ? * @param key
? ? * @param timeout
? ? * @return
? ? */
? ? private HystrixObservableCommand.SetterinitSetter(String key, Integer timeout) {
HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(key)).andCommandKey(HystrixCommandKey.Factory.asKey(key));
? ? ? ? if (timeout !=null) {
setter.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(timeout));
? ? ? ? }
return setter;
? ? }
/**
? ? * @param exchange
? ? * @param chain
? ? * @param config
? ? * @return
? ? */
? ? private UnicornRouteHystrixCommandinitUnicornRouteHystrixCommand(ServerWebExchange exchange, GatewayFilterChain chain, Config config) {
//路由配置的超時設(shè)置
? ? ? ? List apiTimeoutList = config.getTimeouts();
? ? ? ? ServerHttpRequest request = exchange.getRequest();
? ? ? ? String path = request.getPath().pathWithinApplication().value();
? ? ? ? UnicornRouteHystrixCommand command;
? ? ? ? if (CollectionUtil.isNotEmpty(apiTimeoutList)) {
//request匹配屬于那種模式
? ? ? ? ? ? ApiHystrixTimeout apiHystrixTimeout = getApiHystrixTimeout(apiTimeoutList, path);
? ? ? ? ? ? command =new UnicornRouteHystrixCommand(config.getFallbackUri(), exchange, chain, initSetter(apiHystrixTimeout.getApiPattern(), apiHystrixTimeout.getTimeout()));
? ? ? ? }else {
command =new UnicornRouteHystrixCommand(config.getFallbackUri(), exchange, chain, initSetter(serviceId(exchange), null));
? ? ? ? }
return command;
? ? }
/**
? ? * @param apiTimeoutList
? ? * @param path
? ? * @return
? ? */
? ? private ApiHystrixTimeoutgetApiHystrixTimeout(List apiTimeoutList, String path) {
for (ApiHystrixTimeout apiTimeoutPattern : apiTimeoutList) {
if (this.antPathMatcher.match(apiTimeoutPattern.getApiPattern(), path)) {
return apiTimeoutPattern;
? ? ? ? ? ? }
}
ApiHystrixTimeout apiHystrixTimeout =new ApiHystrixTimeout();
? ? ? ? apiHystrixTimeout.setApiPattern("default");
? ? ? ? apiHystrixTimeout.timeout =null;
? ? ? ? return apiHystrixTimeout;
? ? }
@Override
? ? public GatewayFilterapply(Config config) {
return (exchange, chain) -> {
UnicornRouteHystrixCommand command = initUnicornRouteHystrixCommand(exchange, chain, config);
? ? ? ? ? ? return Mono.create(s -> {
Subscription sub =command.toObservable().subscribe(s::success, s::error, s::success);
? ? ? ? ? ? ? ? s.onCancel(sub::unsubscribe);
? ? ? ? ? ? }).onErrorResume((Function>) throwable -> {
if (throwableinstanceof HystrixRuntimeException) {
HystrixRuntimeException e = (HystrixRuntimeException) throwable;
? ? ? ? ? ? ? ? ? ? HystrixRuntimeException.FailureType failureType = e.getFailureType();
? ? ? ? ? ? ? ? ? ? switch (failureType) {
case TIMEOUT:
return Mono.error(new TimeoutException());
? ? ? ? ? ? ? ? ? ? ? ? case COMMAND_EXCEPTION: {
Throwable cause = e.getCause();
? ? ? ? ? ? ? ? ? ? ? ? ? ? if (causeinstanceof ResponseStatusException || AnnotatedElementUtils
.findMergedAnnotation(cause.getClass(), ResponseStatus.class) !=null) {
return Mono.error(cause);
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
}
default:
break;
? ? ? ? ? ? ? ? ? ? }
}
return Mono.error(throwable);
? ? ? ? ? ? }).then();
? ? ? ? };
? ? }
@Override
? ? public Stringname() {
return NAME;
? ? }
private class UnicornRouteHystrixCommandextends HystrixObservableCommand {
private final URIfallbackUri;
? ? ? ? private final ServerWebExchangeexchange;
? ? ? ? private final GatewayFilterChainchain;
? ? ? ? /**
? ? ? ? * @param fallbackUri
? ? ? ? * @param exchange
? ? ? ? * @param chain
? ? ? ? */
? ? ? ? public UnicornRouteHystrixCommand(URI fallbackUri, ServerWebExchange exchange, GatewayFilterChain chain, HystrixObservableCommand.Setter setter) {
super(setter);
? ? ? ? ? ? this.fallbackUri = fallbackUri;
? ? ? ? ? ? this.exchange = exchange;
? ? ? ? ? ? this.chain = chain;
? ? ? ? }
@Override
? ? ? ? protected Observableconstruct() {
return RxReactiveStreams.toObservable(this.chain.filter(exchange));
? ? ? ? }
@Override
? ? ? ? protected ObservableresumeWithFallback() {
if (null ==fallbackUri) {
return super.resumeWithFallback();
? ? ? ? ? ? }
URI uri =exchange.getRequest().getURI();
? ? ? ? ? ? boolean encoded = ServerWebExchangeUtils.containsEncodedParts(uri);
? ? ? ? ? ? URI requestUrl = UriComponentsBuilder.fromUri(uri)
.host(null)
.port(null)
.uri(this.fallbackUri)
.build(encoded)
.toUri();
? ? ? ? ? ? exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
? ? ? ? ? ? ServerHttpRequest request =this.exchange.getRequest().mutate().uri(requestUrl).build();
? ? ? ? ? ? ServerWebExchange mutated =exchange.mutate().request(request).build();
? ? ? ? ? ? DispatcherHandler dispatcherHandler = UnicornHystrixGatewayFilterFactory.this.dispatcherHandlerProvider.getIfAvailable();
? ? ? ? ? ? return RxReactiveStreams.toObservable(dispatcherHandler.handle(mutated));
? ? ? ? }
}
public static class ApiHystrixTimeout {
public StringgetApiPattern() {
return apiPattern;
? ? ? ? }
public void setApiPattern(String apiPattern) {
this.apiPattern = apiPattern;
? ? ? ? }
public IntegergetTimeout() {
return timeout;
? ? ? ? }
public void setTimeout(Integer timeout) {
this.timeout = timeout;
? ? ? ? }
private StringapiPattern;
? ? ? ? private Integertimeout;
? ? }
public static class Config {
private Stringid;
? ? ? ? private URIfallbackUri;
? ? ? ? /**
* url -> timeout ms
*/
? ? ? ? private Listtimeouts;
? ? ? ? public StringgetId() {
return id;
? ? ? ? }
public ConfigsetId(String id) {
this.id = id;
return this;
? ? ? ? }
public URIgetFallbackUri() {
return fallbackUri;
? ? ? ? }
public ConfigsetFallbackUri(URI fallbackUri) {
if (fallbackUri !=null && !"forward".equals(fallbackUri.getScheme())) {
throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri);
? ? ? ? ? ? }
this.fallbackUri = fallbackUri;
return this;
? ? ? ? }
public ListgetTimeouts() {
return timeouts;
? ? ? ? }
public ConfigsetTimeouts(List timeouts) {
this.timeouts = timeouts;
return this;
? ? ? ? }
}
}
配置示例
spring.cloud.gateway.default-filters[0].name=unicornHystrix
spring.cloud.gateway.default-filters[0].args.fallbackUri=forward:/defaultFallback
spring.cloud.gateway.default-filters[0].args.timeouts[0].apiPattern=/gf-oss-service//oss/part/upload
spring.cloud.gateway.default-filters[0].args.timeouts[0].timeout=100000