十三痊土、soul源碼學(xué)習(xí)-http請求鏈路跟蹤

前面我們從配置的修改是如何更新SoulAdmin本地緩存的,再到網(wǎng)關(guān)和SoulAdmin是如何同步數(shù)據(jù)等墨林,講解了數(shù)據(jù)同步的機(jī)制赁酝,是為了保證我們網(wǎng)關(guān)能夠正確的處理請求,并針對配置的插件進(jìn)行正確的處理旭等,接下來我們從一個(gè)真正的用戶請求http到網(wǎng)關(guān)以及如何最后到我們真正請求的整個(gè)鏈路做一下分析

SoulWebHandler酌呆,是網(wǎng)關(guān)請求的入口。

//org.dromara.soul.web.handler.SoulWebHandler
//實(shí)現(xiàn)了WebHandler
public final class SoulWebHandler implements WebHandler {

    private final List<SoulPlugin> plugins;

    private final Scheduler scheduler;

    /**
     * 初始化的時(shí)候注入所有的SoulPlugin插件
     */
    public SoulWebHandler(final List<SoulPlugin> plugins) {
        this.plugins = plugins;
        String schedulerType = System.getProperty("soul.scheduler.type", "fixed");
        if (Objects.equals(schedulerType, "fixed")) {
            int threads = Integer.parseInt(System.getProperty(
                    "soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16)));
            scheduler = Schedulers.newParallel("soul-work-threads", threads);
        } else {
            scheduler = Schedulers.elastic();
        }
    }
}

在我們請求過來的時(shí)候搔耕,會(huì)走到handle

//org.dromara.soul.web.handler.SoulWebHandler#handle
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
  //監(jiān)控相關(guān)
  MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
  Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
  //構(gòu)造DefaultSoulPluginChain肪笋,默認(rèn)的插件鏈進(jìn)行處理
  return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
    .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}

DefaultSoulPluginChain 使用了責(zé)任鏈的設(shè)計(jì)模式,針對一個(gè)請求度迂,對所有的插件進(jìn)行過濾

//org.dromara.soul.web.handler.SoulWebHandler.DefaultSoulPluginChain
private static class DefaultSoulPluginChain implements SoulPluginChain {

  private int index;

  private final List<SoulPlugin> plugins;
  
  DefaultSoulPluginChain(final List<SoulPlugin> plugins) {
    this.plugins = plugins;
  }

  /**
         * Delegate to the next {@code WebFilter} in the chain.
         *
         * @param exchange the current server exchange
         * @return {@code Mono<Void>} to indicate when request handling is complete
         */
  @Override
  public Mono<Void> execute(final ServerWebExchange exchange) {
    return Mono.defer(() -> {
      if (this.index < plugins.size()) {
        SoulPlugin plugin = plugins.get(this.index++);
        Boolean skip = plugin.skip(exchange);
        if (skip) {
          return this.execute(exchange);
        }
        return plugin.execute(exchange, this);
      }
      return Mono.empty();
    });
  }
}

我們看到plugins是按照順序循環(huán)處理的,而且每次的順序是一致的,GlobalPlugin肯定在第一位猜揪,這是怎么實(shí)現(xiàn)的惭墓,我們看下GlobalPlugin插件

//org.dromara.soul.plugin.global.GlobalPlugin
public class GlobalPlugin implements SoulPlugin {
    
    private final SoulContextBuilder builder;
    
    public GlobalPlugin(final SoulContextBuilder builder) {
        this.builder = builder;
    }
  
  //通過getOrder保證初始化的順序
    @Override
    public int getOrder() {
        return 0;
    }
    
}

通過看getOrder的調(diào)用方我們發(fā)現(xiàn)

//org.dromara.soul.web.configuration.SoulConfiguration
@Configuration
@ComponentScan("org.dromara.soul")
@Import(value = {ErrorHandlerConfiguration.class, SoulExtConfiguration.class, SpringExtConfiguration.class})
@Slf4j
public class SoulConfiguration {

   
    @Bean("webHandler")
    public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
        List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
      //在這里進(jìn)行重排序
        final List<SoulPlugin> soulPlugins = pluginList.stream()
                .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
        soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
        return new SoulWebHandler(soulPlugins);
    }
}

所以 插件的順序是定義好的,每次請求的第一個(gè)肯定是GlobalPlugin而姐。GlobalPlugin是最先執(zhí)行的插件

//org.dromara.soul.plugin.global.GlobalPlugin
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  final ServerHttpRequest request = exchange.getRequest();
  final HttpHeaders headers = request.getHeaders();
  final String upgrade = headers.getFirst("Upgrade");
  SoulContext soulContext;
  //先忽略Upgrade腊凶,普通請求upgrade為空
  if (StringUtils.isBlank(upgrade) || !"websocket".equals(upgrade)) {
    soulContext = builder.build(exchange);
  } else {
    final MultiValueMap<String, String> queryParams = request.getQueryParams();
    soulContext = transformMap(queryParams);
  }
  exchange.getAttributes().put(Constants.CONTEXT, soulContext);
  return chain.execute(exchange);
}

這里會(huì)走到DefaultSoulContextBuilder

//org.dromara.soul.plugin.global.DefaultSoulContextBuilder
@Override
public SoulContext build(final ServerWebExchange exchange) {
  final ServerHttpRequest request = exchange.getRequest();
  //獲取到請求的path
  String path = request.getURI().getPath();
  //http先不關(guān)注metaData
  MetaData metaData = MetaDataCache.getInstance().obtain(path);
  if (Objects.nonNull(metaData) && metaData.getEnabled()) {
    exchange.getAttributes().put(Constants.META_DATA, metaData);
  }
  //將請求和元數(shù)據(jù)轉(zhuǎn)換成SoulContext
  return transform(request, metaData);
}
//org.dromara.soul.plugin.global.DefaultSoulContextBuilder#transform
//構(gòu)造Soul的上下文信息
private SoulContext transform(final ServerHttpRequest request, final MetaData metaData) {
  //Constants.APP_KEY = appKey
  final String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
  //Constants.SIGN = sign
  final String sign = request.getHeaders().getFirst(Constants.SIGN);
  //Constants.TIMESTAMP = timestamp
  final String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
  //從header獲取信息
  SoulContext soulContext = new SoulContext();
  String path = request.getURI().getPath();
  soulContext.setPath(path);
  //判斷元數(shù)據(jù)信息,通過元數(shù)據(jù)來拍斷當(dāng)前的請求是屬于什么類型
  if (Objects.nonNull(metaData) && metaData.getEnabled()) {
    if (RpcTypeEnum.SPRING_CLOUD.getName().equals(metaData.getRpcType())) {
      setSoulContextByHttp(soulContext, path);
      soulContext.setRpcType(metaData.getRpcType());
    } else if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
      setSoulContextByDubbo(soulContext, metaData);
    } else if (RpcTypeEnum.SOFA.getName().equals(metaData.getRpcType())) {
      setSoulContextBySofa(soulContext, metaData);
    } else if (RpcTypeEnum.TARS.getName().equals(metaData.getRpcType())) {
      setSoulContextByTars(soulContext, metaData);
    } else {
      setSoulContextByHttp(soulContext, path);
      soulContext.setRpcType(RpcTypeEnum.HTTP.getName());
    }
    //默認(rèn)當(dāng)成http處理
  } else {
    setSoulContextByHttp(soulContext, path);
    soulContext.setRpcType(RpcTypeEnum.HTTP.getName());
  }
  //注入必要信息
  soulContext.setAppKey(appKey);
  soulContext.setSign(sign);
  soulContext.setTimestamp(timestamp);
  soulContext.setStartDateTime(LocalDateTime.now());
  Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> soulContext.setHttpMethod(httpMethod.name()));
  return soulContext;
}

GlobalPlugin是最關(guān)鍵的插件拴念,通過上面流程钧萍,構(gòu)造Soul的上下文,從而使得后面的插件判斷才有依據(jù)來決定是走哪一個(gè)插件

通過責(zé)任鏈政鼠,依次循環(huán)調(diào)用所有的插件风瘦,直到中間某個(gè)插件匹配調(diào)用生效為止。我們來看下http請求公般,最終會(huì)命中Divide插件万搔,我們來看下DividePlugin插件

//org.dromara.soul.plugin.divide.DividePlugin#skip
@Override
public Boolean skip(final ServerWebExchange exchange) {
  //GlobalPlugin構(gòu)造的上下文
  final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
  //DividePlugin會(huì)判斷當(dāng)前的SoulContext的RpcType是否是Http
  return !Objects.equals(Objects.requireNonNull(soulContext).getRpcType(), RpcTypeEnum.HTTP.getName());
}

發(fā)現(xiàn)不需要跳過后胡桨,會(huì)進(jìn)入AbstractSoulPlugin的execute。包括剛才的GlobalPlugin也會(huì)經(jīng)過這里

//org.dromara.soul.plugin.base.AbstractSoulPlugin#execute
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  String pluginName = named();
  //從本地緩存中獲取PluginData數(shù)據(jù)瞬雹,這里的本地緩存就是之前我們講的數(shù)據(jù)同步所維護(hù)的緩存
  final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
  //判斷plugin是否為空并且開啟
  if (pluginData != null && pluginData.getEnabled()) {
    //在獲取本地Selector數(shù)據(jù)緩存昧谊。獲取SelectorData
    final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
    //如果該插件selector為空會(huì)走這里,里面邏輯實(shí)際上就是調(diào)用下一個(gè)插件
    if (CollectionUtils.isEmpty(selectors)) {
      return handleSelectorIsNull(pluginName, exchange, chain);
    }
    //如果Selectors不為空酗捌,則去看是否匹配呢诬,匹配邏輯暫時(shí)先不展開講,之后再看
    final SelectorData selectorData = matchSelector(exchange, selectors);
    //如果匹配為空則繼續(xù)調(diào)用下一個(gè)插件
    if (Objects.isNull(selectorData)) {
      return handleSelectorIsNull(pluginName, exchange, chain);
    }
    selectorLog(selectorData, pluginName);
    //在獲取規(guī)則數(shù)據(jù)
    final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
    //規(guī)則為空則繼續(xù)執(zhí)行下一個(gè)插件
    if (CollectionUtils.isEmpty(rules)) {
      return handleRuleIsNull(pluginName, exchange, chain);
    }
    RuleData rule;
    //如果是全流量
    if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
      //則獲取最后一個(gè)規(guī)則
      rule = rules.get(rules.size() - 1);
    } else {
      //判斷是否有匹配規(guī)則胖缤,具體匹配校驗(yàn)之后再說
      rule = matchRule(exchange, rules);
    }
    //如果規(guī)則為空尚镰,繼續(xù)執(zhí)行下一個(gè)插件
    if (Objects.isNull(rule)) {
      return handleRuleIsNull(pluginName, exchange, chain);
    }
    ruleLog(rule, pluginName);
    //如果規(guī)則不為空,則調(diào)用doExecute草姻。對應(yīng)插件的具體實(shí)現(xiàn)
    return doExecute(exchange, chain, selectorData, rule);
  }
  return chain.execute(exchange);
}
//org.dromara.soul.plugin.divide.DividePlugin
public class DividePlugin extends AbstractSoulPlugin {

  
    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
      //這里拿到之前GlobalPlugin的上下文
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
      //獲取Divide規(guī)則處理器钓猬,DivideRuleHandle包含了負(fù)載均衡策略以及重試次數(shù)和超時(shí)時(shí)間信息
        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
      //這里根據(jù)之前的探活機(jī)制,獲取到對應(yīng)選擇器的上游服務(wù)列表
        final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
      //如果為空則拋異常撩独,并直接返回WebFlux結(jié)果
        if (CollectionUtils.isEmpty(upstreamList)) {
            log.error("divide upstream configuration error: {}", rule.toString());
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
      //獲取當(dāng)前調(diào)用方IP
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
      //通過負(fù)載均衡獲取對應(yīng)的上游
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
      //如果上游為空則返回異常信息
        if (Objects.isNull(divideUpstream)) {
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // 注入必要信息
        String domain = buildDomain(divideUpstream);
        String realURL = buildRealURL(domain, soulContext, exchange);
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
      //在調(diào)用下一個(gè)插件
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        return chain.execute(exchange);
    }
}

我們發(fā)現(xiàn)Divide插件并沒有真正的去調(diào)用敞曹,而是主要做一些獲取上游服務(wù)器列表以及根據(jù)負(fù)載均衡選擇一個(gè)有效的遠(yuǎn)端服務(wù),并注入到對應(yīng)的屬性中综膀,供后面使用澳迫,真正調(diào)用遠(yuǎn)端的插件式WebClientPlugin插件

//org.dromara.soul.plugin.httpclient.WebClientPlugin#execute
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  //獲取上下文
  final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
  assert soulContext != null;
  //獲取url地址
  String urlPath = exchange.getAttribute(Constants.HTTP_URL);
  if (StringUtils.isEmpty(urlPath)) {
    Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
    return WebFluxResultUtils.result(exchange, error);
  }
  long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
  int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
  log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
  HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
  WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
  //調(diào)用遠(yuǎn)端服務(wù)
  return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}
//org.dromara.soul.plugin.httpclient.WebClientPlugin#handleRequestBody
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                         final ServerWebExchange exchange,
                                         final long timeout,
                                         final int retryTimes,
                                         final SoulPluginChain chain) {
  //使用異步編程方式調(diào)用遠(yuǎn)端服務(wù)并返回結(jié)果
  return requestBodySpec.headers(httpHeaders -> {
    httpHeaders.addAll(exchange.getRequest().getHeaders());
    httpHeaders.remove(HttpHeaders.HOST);
  })
    .contentType(buildMediaType(exchange))
    .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
    .exchange()
    .doOnError(e -> log.error(e.getMessage()))
    .timeout(Duration.ofMillis(timeout))
    .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
               .retryMax(retryTimes)
               .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
    .flatMap(e -> doNext(e, exchange, chain));

}

到這里我們就完整走了一遍http的鏈路跟蹤,中間還有很多其他的細(xì)節(jié)需要之后在講解

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末剧劝,一起剝皮案震驚了整個(gè)濱河市橄登,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌讥此,老刑警劉巖拢锹,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異萄喳,居然都是意外死亡卒稳,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門他巨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來充坑,“玉大人,你說我怎么就攤上這事染突∧硪” “怎么了?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵份企,是天一觀的道長也榄。 經(jīng)常有香客問我,道長薪棒,這世上最難降的妖魔是什么手蝎? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任榕莺,我火速辦了婚禮,結(jié)果婚禮上棵介,老公的妹妹穿的比我還像新娘钉鸯。我一直安慰自己,他們只是感情好邮辽,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布唠雕。 她就那樣靜靜地躺著,像睡著了一般吨述。 火紅的嫁衣襯著肌膚如雪岩睁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天揣云,我揣著相機(jī)與錄音捕儒,去河邊找鬼。 笑死邓夕,一個(gè)胖子當(dāng)著我的面吹牛刘莹,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播焚刚,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼点弯,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了矿咕?” 一聲冷哼從身側(cè)響起抢肛,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎碳柱,沒想到半個(gè)月后捡絮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡莲镣,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年锦援,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片剥悟。...
    茶點(diǎn)故事閱讀 39,902評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖曼库,靈堂內(nèi)的尸體忽然破棺而出区岗,到底是詐尸還是另有隱情,我是刑警寧澤毁枯,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布慈缔,位于F島的核電站,受9級(jí)特大地震影響种玛,放射性物質(zhì)發(fā)生泄漏藐鹤。R本人自食惡果不足惜瓤檐,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望娱节。 院中可真熱鬧挠蛉,春花似錦、人聲如沸肄满。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽稠歉。三九已至掰担,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間怒炸,已是汗流浹背带饱。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留阅羹,地道東北人勺疼。 一個(gè)月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像灯蝴,于是被迫代替她去往敵國和親恢口。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容