前面我們從配置的修改是如何更新SoulAdmin本地緩存的,再到網(wǎng)關(guān)和SoulAdmin是如何同步數(shù)據(jù)等墨林,講解了數(shù)據(jù)同步的機(jī)制赁酝,是為了保證我們網(wǎng)關(guān)能夠正確的處理請求,并針對配置的插件進(jìn)行正確的處理旭等,接下來我們從一個(gè)真正的用戶請求http到網(wǎng)關(guān)以及如何最后到我們真正請求的整個(gè)鏈路做一下分析
SoulWebHandler酌呆,是網(wǎng)關(guān)請求的入口。
//org.dromara.soul.web.handler.SoulWebHandler
//實(shí)現(xiàn)了WebHandler
public final class SoulWebHandler implements WebHandler {
private final List<SoulPlugin> plugins;
private final Scheduler scheduler;
/**
* 初始化的時(shí)候注入所有的SoulPlugin插件
*/
public SoulWebHandler(final List<SoulPlugin> plugins) {
this.plugins = plugins;
String schedulerType = System.getProperty("soul.scheduler.type", "fixed");
if (Objects.equals(schedulerType, "fixed")) {
int threads = Integer.parseInt(System.getProperty(
"soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16)));
scheduler = Schedulers.newParallel("soul-work-threads", threads);
} else {
scheduler = Schedulers.elastic();
}
}
}
在我們請求過來的時(shí)候搔耕,會(huì)走到handle
//org.dromara.soul.web.handler.SoulWebHandler#handle
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
//監(jiān)控相關(guān)
MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
//構(gòu)造DefaultSoulPluginChain肪笋,默認(rèn)的插件鏈進(jìn)行處理
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
.doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}
DefaultSoulPluginChain 使用了責(zé)任鏈的設(shè)計(jì)模式,針對一個(gè)請求度迂,對所有的插件進(jìn)行過濾
//org.dromara.soul.web.handler.SoulWebHandler.DefaultSoulPluginChain
private static class DefaultSoulPluginChain implements SoulPluginChain {
private int index;
private final List<SoulPlugin> plugins;
DefaultSoulPluginChain(final List<SoulPlugin> plugins) {
this.plugins = plugins;
}
/**
* Delegate to the next {@code WebFilter} in the chain.
*
* @param exchange the current server exchange
* @return {@code Mono<Void>} to indicate when request handling is complete
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
我們看到plugins是按照順序循環(huán)處理的,而且每次的順序是一致的,GlobalPlugin肯定在第一位猜揪,這是怎么實(shí)現(xiàn)的惭墓,我們看下GlobalPlugin插件
//org.dromara.soul.plugin.global.GlobalPlugin
public class GlobalPlugin implements SoulPlugin {
private final SoulContextBuilder builder;
public GlobalPlugin(final SoulContextBuilder builder) {
this.builder = builder;
}
//通過getOrder保證初始化的順序
@Override
public int getOrder() {
return 0;
}
}
通過看getOrder的調(diào)用方我們發(fā)現(xiàn)
//org.dromara.soul.web.configuration.SoulConfiguration
@Configuration
@ComponentScan("org.dromara.soul")
@Import(value = {ErrorHandlerConfiguration.class, SoulExtConfiguration.class, SpringExtConfiguration.class})
@Slf4j
public class SoulConfiguration {
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
//在這里進(jìn)行重排序
final List<SoulPlugin> soulPlugins = pluginList.stream()
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
return new SoulWebHandler(soulPlugins);
}
}
所以 插件的順序是定義好的,每次請求的第一個(gè)肯定是GlobalPlugin而姐。GlobalPlugin是最先執(zhí)行的插件
//org.dromara.soul.plugin.global.GlobalPlugin
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
final ServerHttpRequest request = exchange.getRequest();
final HttpHeaders headers = request.getHeaders();
final String upgrade = headers.getFirst("Upgrade");
SoulContext soulContext;
//先忽略Upgrade腊凶,普通請求upgrade為空
if (StringUtils.isBlank(upgrade) || !"websocket".equals(upgrade)) {
soulContext = builder.build(exchange);
} else {
final MultiValueMap<String, String> queryParams = request.getQueryParams();
soulContext = transformMap(queryParams);
}
exchange.getAttributes().put(Constants.CONTEXT, soulContext);
return chain.execute(exchange);
}
這里會(huì)走到DefaultSoulContextBuilder
//org.dromara.soul.plugin.global.DefaultSoulContextBuilder
@Override
public SoulContext build(final ServerWebExchange exchange) {
final ServerHttpRequest request = exchange.getRequest();
//獲取到請求的path
String path = request.getURI().getPath();
//http先不關(guān)注metaData
MetaData metaData = MetaDataCache.getInstance().obtain(path);
if (Objects.nonNull(metaData) && metaData.getEnabled()) {
exchange.getAttributes().put(Constants.META_DATA, metaData);
}
//將請求和元數(shù)據(jù)轉(zhuǎn)換成SoulContext
return transform(request, metaData);
}
//org.dromara.soul.plugin.global.DefaultSoulContextBuilder#transform
//構(gòu)造Soul的上下文信息
private SoulContext transform(final ServerHttpRequest request, final MetaData metaData) {
//Constants.APP_KEY = appKey
final String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
//Constants.SIGN = sign
final String sign = request.getHeaders().getFirst(Constants.SIGN);
//Constants.TIMESTAMP = timestamp
final String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
//從header獲取信息
SoulContext soulContext = new SoulContext();
String path = request.getURI().getPath();
soulContext.setPath(path);
//判斷元數(shù)據(jù)信息,通過元數(shù)據(jù)來拍斷當(dāng)前的請求是屬于什么類型
if (Objects.nonNull(metaData) && metaData.getEnabled()) {
if (RpcTypeEnum.SPRING_CLOUD.getName().equals(metaData.getRpcType())) {
setSoulContextByHttp(soulContext, path);
soulContext.setRpcType(metaData.getRpcType());
} else if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
setSoulContextByDubbo(soulContext, metaData);
} else if (RpcTypeEnum.SOFA.getName().equals(metaData.getRpcType())) {
setSoulContextBySofa(soulContext, metaData);
} else if (RpcTypeEnum.TARS.getName().equals(metaData.getRpcType())) {
setSoulContextByTars(soulContext, metaData);
} else {
setSoulContextByHttp(soulContext, path);
soulContext.setRpcType(RpcTypeEnum.HTTP.getName());
}
//默認(rèn)當(dāng)成http處理
} else {
setSoulContextByHttp(soulContext, path);
soulContext.setRpcType(RpcTypeEnum.HTTP.getName());
}
//注入必要信息
soulContext.setAppKey(appKey);
soulContext.setSign(sign);
soulContext.setTimestamp(timestamp);
soulContext.setStartDateTime(LocalDateTime.now());
Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> soulContext.setHttpMethod(httpMethod.name()));
return soulContext;
}
GlobalPlugin是最關(guān)鍵的插件拴念,通過上面流程钧萍,構(gòu)造Soul的上下文,從而使得后面的插件判斷才有依據(jù)來決定是走哪一個(gè)插件
通過責(zé)任鏈政鼠,依次循環(huán)調(diào)用所有的插件风瘦,直到中間某個(gè)插件匹配調(diào)用生效為止。我們來看下http請求公般,最終會(huì)命中Divide插件万搔,我們來看下DividePlugin插件
//org.dromara.soul.plugin.divide.DividePlugin#skip
@Override
public Boolean skip(final ServerWebExchange exchange) {
//GlobalPlugin構(gòu)造的上下文
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
//DividePlugin會(huì)判斷當(dāng)前的SoulContext的RpcType是否是Http
return !Objects.equals(Objects.requireNonNull(soulContext).getRpcType(), RpcTypeEnum.HTTP.getName());
}
發(fā)現(xiàn)不需要跳過后胡桨,會(huì)進(jìn)入AbstractSoulPlugin的execute。包括剛才的GlobalPlugin也會(huì)經(jīng)過這里
//org.dromara.soul.plugin.base.AbstractSoulPlugin#execute
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
//從本地緩存中獲取PluginData數(shù)據(jù)瞬雹,這里的本地緩存就是之前我們講的數(shù)據(jù)同步所維護(hù)的緩存
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
//判斷plugin是否為空并且開啟
if (pluginData != null && pluginData.getEnabled()) {
//在獲取本地Selector數(shù)據(jù)緩存昧谊。獲取SelectorData
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
//如果該插件selector為空會(huì)走這里,里面邏輯實(shí)際上就是調(diào)用下一個(gè)插件
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
//如果Selectors不為空酗捌,則去看是否匹配呢诬,匹配邏輯暫時(shí)先不展開講,之后再看
final SelectorData selectorData = matchSelector(exchange, selectors);
//如果匹配為空則繼續(xù)調(diào)用下一個(gè)插件
if (Objects.isNull(selectorData)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
//在獲取規(guī)則數(shù)據(jù)
final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
//規(guī)則為空則繼續(xù)執(zhí)行下一個(gè)插件
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
RuleData rule;
//如果是全流量
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//則獲取最后一個(gè)規(guī)則
rule = rules.get(rules.size() - 1);
} else {
//判斷是否有匹配規(guī)則胖缤,具體匹配校驗(yàn)之后再說
rule = matchRule(exchange, rules);
}
//如果規(guī)則為空尚镰,繼續(xù)執(zhí)行下一個(gè)插件
if (Objects.isNull(rule)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
//如果規(guī)則不為空,則調(diào)用doExecute草姻。對應(yīng)插件的具體實(shí)現(xiàn)
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
//org.dromara.soul.plugin.divide.DividePlugin
public class DividePlugin extends AbstractSoulPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
//這里拿到之前GlobalPlugin的上下文
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
//獲取Divide規(guī)則處理器钓猬,DivideRuleHandle包含了負(fù)載均衡策略以及重試次數(shù)和超時(shí)時(shí)間信息
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
//這里根據(jù)之前的探活機(jī)制,獲取到對應(yīng)選擇器的上游服務(wù)列表
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
//如果為空則拋異常撩独,并直接返回WebFlux結(jié)果
if (CollectionUtils.isEmpty(upstreamList)) {
log.error("divide upstream configuration error: {}", rule.toString());
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
//獲取當(dāng)前調(diào)用方IP
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
//通過負(fù)載均衡獲取對應(yīng)的上游
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
//如果上游為空則返回異常信息
if (Objects.isNull(divideUpstream)) {
log.error("divide has no upstream");
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 注入必要信息
String domain = buildDomain(divideUpstream);
String realURL = buildRealURL(domain, soulContext, exchange);
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
//在調(diào)用下一個(gè)插件
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
return chain.execute(exchange);
}
}
我們發(fā)現(xiàn)Divide插件并沒有真正的去調(diào)用敞曹,而是主要做一些獲取上游服務(wù)器列表以及根據(jù)負(fù)載均衡選擇一個(gè)有效的遠(yuǎn)端服務(wù),并注入到對應(yīng)的屬性中综膀,供后面使用澳迫,真正調(diào)用遠(yuǎn)端的插件式WebClientPlugin插件
//org.dromara.soul.plugin.httpclient.WebClientPlugin#execute
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
//獲取上下文
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
//獲取url地址
String urlPath = exchange.getAttribute(Constants.HTTP_URL);
if (StringUtils.isEmpty(urlPath)) {
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
//調(diào)用遠(yuǎn)端服務(wù)
return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}
//org.dromara.soul.plugin.httpclient.WebClientPlugin#handleRequestBody
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
final ServerWebExchange exchange,
final long timeout,
final int retryTimes,
final SoulPluginChain chain) {
//使用異步編程方式調(diào)用遠(yuǎn)端服務(wù)并返回結(jié)果
return requestBodySpec.headers(httpHeaders -> {
httpHeaders.addAll(exchange.getRequest().getHeaders());
httpHeaders.remove(HttpHeaders.HOST);
})
.contentType(buildMediaType(exchange))
.body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
.exchange()
.doOnError(e -> log.error(e.getMessage()))
.timeout(Duration.ofMillis(timeout))
.retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
.flatMap(e -> doNext(e, exchange, chain));
}
到這里我們就完整走了一遍http的鏈路跟蹤,中間還有很多其他的細(xì)節(jié)需要之后在講解