Spring Cloud Ribbon 源碼分析
前言
原理介紹
ribbon提供了http請(qǐng)求負(fù)載均衡的能力葛账,既然要擴(kuò)展調(diào)度能力梧却,就需要在請(qǐng)求之前,通過(guò)某種調(diào)度策略選擇合適的server來(lái)調(diào)用集歇。
想要實(shí)現(xiàn)這個(gè)效果需要以下幾步:
- 對(duì)http的請(qǐng)求已經(jīng)攔截绍刮,對(duì)其進(jìn)行擴(kuò)展
- 通過(guò)調(diào)度策略選擇合適的server
- 改寫(xiě)http的ip和port來(lái)定向調(diào)用
源碼分析
1. http請(qǐng)求攔截
首先介紹ribbon是如何進(jìn)行給請(qǐng)求增加攔截器温圆,需要用到LoadBalancerAutoConfiguration
LoadBalancerAutoConfiguration
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
LoadBalancerAutoConfiguration的restTemplates通過(guò)@LoadBalanced注解,將所有帶有此注解的RestTemplate的對(duì)象都注入到list中孩革。@LoadBalanced內(nèi)部其實(shí)是繼承@Qualifier實(shí)現(xiàn)岁歉。
通過(guò)RestTemplateCustomizer來(lái)對(duì)每個(gè)restTemplate實(shí)現(xiàn)擴(kuò)展,將LoadBalancerInterceptor也加入到攔截器列表中
在整個(gè)restTemplate的調(diào)用過(guò)程中膝蜈,會(huì)從getForObject -> execute -> doExecute -> request.execute 中來(lái)對(duì)攔截器進(jìn)行處理锅移。
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
// ...
ClientHttpResponse response = null;
try {
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
// ...
}
finally {
if (response != null) {
response.close();
}
}
}
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
一直調(diào)用到InterceptingRequestExecution的execute方法中熔掺,這里先判斷當(dāng)前是否有攔截器,如果存在則調(diào)用intercept非剃,我們這里通過(guò)LoadBalancerAutoConfiguration注入了LoadBalancerInterceptor置逻,具體看下LoadBalancerInterceptor的實(shí)現(xiàn)。
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
這里的loadBalancer是在創(chuàng)建Interceptor時(shí)备绽,注入的RibbonLoadBalancerClient券坞,自動(dòng)注入的過(guò)程在RibbonAutoConfiguration中
RibbonAutoConfiguration
public class RibbonAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
}
至此完成了前期的請(qǐng)求調(diào)用攔截的部分,開(kāi)始進(jìn)入ribbon調(diào)度策略的選擇環(huán)節(jié)疯坤。
2. 調(diào)度策略
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
首先實(shí)例化ILoadBalancer的bean报慕,默認(rèn)會(huì)注入ZoneAwareLoadBalancer的bean,ZoneAwareLoadBalancer的繼承關(guān)系如下圖压怠。
ILoadBalancer的注入實(shí)現(xiàn)在RibbonClientConfiguration中。
RibbonClientConfiguration
// client的請(qǐng)求的默認(rèn)配置
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties(this.name);
config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
return config;
}
// 調(diào)度規(guī)則
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
// 維持心跳ping
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, name)) {
return this.propertiesFactory.get(IPing.class, config, name);
}
return new DummyPing();
}
// 服務(wù)端提供的server列表
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}
// server列表動(dòng)態(tài)更新
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
// loadbalance的客戶端對(duì)象
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
// server列表選擇過(guò)濾器
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
return this.propertiesFactory.get(ServerListFilter.class, config, name);
}
ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
filter.initWithNiwsConfig(config);
return filter;
}
// loadbalancer的上下文配置
@Bean
@ConditionalOnMissingBean
public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
IClientConfig config, RetryHandler retryHandler) {
return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
}
// 重試處理器
@Bean
@ConditionalOnMissingBean
public RetryHandler retryHandler(IClientConfig config) {
return new DefaultLoadBalancerRetryHandler(config);
}
// server 檢查器(內(nèi)史晌)菌瘫,檢查是否安全端口,獲取基本信息等布卡。
@Bean
@ConditionalOnMissingBean
public ServerIntrospector serverIntrospector() {
return new DefaultServerIntrospector();
}
ZoneAwareLoadBalancer
的實(shí)例化還依賴的其他的bean雨让,如ZoneAvoidanceRule
、DummyPing
忿等、ConfigurationBasedServerList
栖忠、DefaultLoadBalancerRetryHandler
等
ZoneAwareLoadBalancer
ZoneAwareLoadBalancer 在構(gòu)造方法中,會(huì)完成一系列的初始化動(dòng)作贸街,如serverList的設(shè)置庵寞、開(kāi)啟server刷新任務(wù),初始化loadBalanceStat薛匪,配置負(fù)載均衡策略捐川、ping策略、filter等操作逸尖。
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
restOfInit()
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
在restOfInit方法中古沥,有兩個(gè)關(guān)鍵方法:enableAndInitLearnNewServersFeature
和updateListOfServers
enableAndInitLearnNewServersFeature()
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
可以看到這個(gè)方法開(kāi)啟了一個(gè)schedule,這個(gè)schedule的用途是每隔一段時(shí)間刷新serverList娇跟。這里傳入了一個(gè)UpdateAction岩齿,這個(gè)UpdateAction是
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
通過(guò)調(diào)用updateListOfServers來(lái)刷新serverList,而刷新的時(shí)間間隔refreshIntervalMs默認(rèn)是30s苞俘。
updateListOfServers
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
這里的servers其實(shí)就是我們配置的listOfServers列表盹沈,在獲取到最新的servers之后,會(huì)調(diào)用updateAllServerList
來(lái)更新緩存苗胀。
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
@Override
public void setServersList(List lsrv) {
super.setServersList(lsrv);
List<T> serverList = (List<T>) lsrv;
Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
for (Server server : serverList) {
// make sure ServerStats is created to avoid creating them on hot
// path
getLoadBalancerStats().getSingleServerStat(server);
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<Server>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
setServerListForZones(serversInZones);
}
首先在super.setSersList中襟诸,將全局的server列表瓦堵,也就是allServerList做更新,
在for循環(huán)中歌亲,調(diào)用getSingleServerStat
來(lái)刷新一下內(nèi)部cache菇用,最后設(shè)置zone。
這里需要說(shuō)明一下LoadBalancerStats
LoadBalancerStats
public class LoadBalancerStats implements IClientConfigAware {
private static final String PREFIX = "LBStats_";
String name;
// Map<Server,ServerStats> serverStatsMap = new ConcurrentHashMap<Server,ServerStats>();
volatile Map<String, ZoneStats> zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>();
volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();
private volatile CachedDynamicIntProperty connectionFailureThreshold;
private volatile CachedDynamicIntProperty circuitTrippedTimeoutFactor;
private volatile CachedDynamicIntProperty maxCircuitTrippedTimeout;
private static final DynamicIntProperty SERVERSTATS_EXPIRE_MINUTES =
DynamicPropertyFactory.getInstance().getIntProperty("niws.loadbalancer.serverStats.expire.minutes", 30);
private final LoadingCache<Server, ServerStats> serverStatsCache =
CacheBuilder.newBuilder()
.expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES)
.removalListener(new RemovalListener<Server, ServerStats>() {
@Override
public void onRemoval(RemovalNotification<Server, ServerStats> notification) {
notification.getValue().close();
}
})
.build(
new CacheLoader<Server, ServerStats>() {
public ServerStats load(Server server) {
return createServerStats(server);
}
});
ribbon通過(guò)這個(gè)對(duì)象來(lái)記錄每個(gè)server的運(yùn)行特征和統(tǒng)計(jì)數(shù)據(jù)陷揪,如各種連接時(shí)間惋鸥,響應(yīng)時(shí)間,熔斷配置悍缠,請(qǐng)求次數(shù)卦绣,serverStat,zoneStat等飞蚓。
LoadBalancerStats的這些參數(shù)滤港,會(huì)作為負(fù)載均衡選擇策略的重要參考依據(jù)。
至此趴拧,ZoneAwareLoadBalancer
的bean實(shí)例化完成溅漾。
回到調(diào)度選擇的開(kāi)始處,在完成loadBalance的instance之后著榴,開(kāi)始選擇Server添履。
getServer
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
chooseServer
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
選擇分兩部分
- 如果沒(méi)有配置zone,即只有一個(gè)zone脑又,直接調(diào)用super.chooseServer暮胧。
- 如果有一個(gè)以上的zone, 則根據(jù)zone的算法選擇一個(gè)server。
先看一下不使用zone算法的實(shí)現(xiàn)问麸。
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
增加調(diào)用次數(shù)往衷,并通過(guò)role的實(shí)現(xiàn)選擇一個(gè)server。
這里的zone是ZoneAvoidanceRule
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
內(nèi)部通過(guò)輪詢的機(jī)制口叙,來(lái)選擇server炼绘,除了ZoneAvoidanceRule的實(shí)現(xiàn)之外,還有提供了很多的其他rule策略妄田,如下圖:
比如WeighedResponseTimeRule就是通過(guò)響應(yīng)時(shí)間來(lái)動(dòng)態(tài)選擇server俺亮,具體算法的數(shù)據(jù)就是依賴LoadBalanceStat的數(shù)據(jù)統(tǒng)計(jì)。
3. uri改寫(xiě)和調(diào)用
繼續(xù)回到調(diào)度的最開(kāi)始疟呐,選擇好server之后脚曾,就這些對(duì)象封裝成RibbonServer,就繼續(xù)執(zhí)行execute方法启具。
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
這個(gè)方法主要就是request.apply的調(diào)用本讥。
request是在前面的調(diào)用中,this.requestFactory.createRequest(request, body, execution) 來(lái)構(gòu)建。
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest, instance);
}
}
return execution.execute(serviceRequest, body);
};
}
可以看到這里通過(guò)定義了一個(gè)匿名內(nèi)部類拷沸,定義了apply這個(gè)方法的實(shí)現(xiàn)色查。
封裝了一個(gè)ServiceRequestWrapper,并提供了一個(gè)擴(kuò)展口撞芍,可以自定義LoadBalancerRequestTransformer來(lái)改變?cè)械囊恍?shí)現(xiàn)秧了,如可以改變之前的server選擇。
轉(zhuǎn)化完畢后序无,繼續(xù)執(zhí)行execute验毡。
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
這里重新創(chuàng)建了ClientHttpRequest,在getURI中帝嗡,重構(gòu)了uri的地址晶通,將之前uri中的服務(wù)名,改成了選擇的ip和端口哟玷,重新設(shè)置headers后狮辽,發(fā)起真正的request調(diào)用。
至此碗降,一個(gè)完成的請(qǐng)求的ribbon負(fù)載過(guò)程執(zhí)行完成隘竭。