A simple RestTemplate example
Register the RestTemplate
@Bean
@LoadBalanced
RestTemplate restTemplate() {
    return new RestTemplate();
}
Send a GET request with the restTemplate
@RequestMapping("/ask")
public String getHello(@RequestParam(value = "name", required = false) String name) {
    return restTemplate.getForEntity("http://SERVICE-NAME/hello?hello={1}", String.class, name).getBody();
}
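Source code analysis
The @LoadBalanced annotation marks a RestTemplate so that it is configured to use a client-side load balancer, the LoadBalancerClient. Searching for the LoadBalancerClient class gives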
public interface LoadBalancerClient extends ServiceInstanceChooser {
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
URI reconstructURI(ServiceInstance instance, URI original);
}
ServiceInstanceChooser selects a service instance by serviceId
public interface ServiceInstanceChooser {
ServiceInstance choose(String serviceId);
}
From the above, a client-side load balancer should provide these capabilities
- Given a service name, choose one instance of that service from the load balancer
- Send the request to the service instance that was chosen
- Build a suitable request URI
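The three capabilities can be sketched in plain Java without any Spring or Ribbon types. Everything below (Instance, SimpleLoadBalancerClient, the in-memory registry, the round-robin choice) is a hypothetical simplification for illustration, not the Spring Cloud API.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for a service instance (not Spring Cloud's ServiceInstance).
class Instance {
    final String host;
    final int port;

    Instance(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

// Hypothetical client mirroring the three capabilities: choose, execute, reconstructURI.
class SimpleLoadBalancerClient {
    private final Map<String, List<Instance>> registry;
    private final AtomicInteger position = new AtomicInteger();

    SimpleLoadBalancerClient(Map<String, List<Instance>> registry) {
        this.registry = registry;
    }

    // 1. Pick one instance of the named service (plain round robin in this sketch).
    Instance choose(String serviceId) {
        List<Instance> candidates = registry.getOrDefault(serviceId, Collections.<Instance>emptyList());
        if (candidates.isEmpty()) {
            return null;
        }
        return candidates.get(Math.floorMod(position.getAndIncrement(), candidates.size()));
    }

    // 2. Run the request against the chosen instance.
    <T> T execute(String serviceId, Function<Instance, T> request) {
        Instance instance = choose(serviceId);
        if (instance == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        return request.apply(instance);
    }

    // 3. Swap the logical service name in the URI for the instance's host and port.
    URI reconstructURI(Instance instance, URI original) {
        String query = original.getRawQuery() == null ? "" : "?" + original.getRawQuery();
        return URI.create(original.getScheme() + "://" + instance.host + ":" + instance.port
                + original.getRawPath() + query);
    }

    public static void main(String[] args) {
        SimpleLoadBalancerClient client = new SimpleLoadBalancerClient(
                Collections.singletonMap("SERVICE-NAME",
                        Arrays.asList(new Instance("10.0.0.1", 8080), new Instance("10.0.0.2", 8081))));
        URI original = URI.create("http://SERVICE-NAME/hello?hello=world");
        // Each call resolves the logical name to the next instance in turn.
        System.out.println(client.execute("SERVICE-NAME", i -> client.reconstructURI(i, original)));
        System.out.println(client.execute("SERVICE-NAME", i -> client.reconstructURI(i, original)));
    }
}
```

The request is passed in as a function of the chosen instance, which is the same shape as LoadBalancerRequest<T> in the interface above: the balancer decides where, the caller decides what.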
LoadBalancerAutoConfiguration is the auto-configuration class that sets up the client-side load balancer
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnClass({RestTemplate.class})
@ConditionalOnBean({LoadBalancerClient.class})
@EnableConfigurationProperties({LoadBalancerRetryProperties.class})
public class LoadBalancerAutoConfiguration {
...
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> {
restTemplateCustomizers.ifAvailable((customizers) -> {
Iterator var2 = this.restTemplates.iterator();
while(var2.hasNext()) {
RestTemplate restTemplate = (RestTemplate)var2.next();
Iterator var4 = customizers.iterator();
while(var4.hasNext()) {
RestTemplateCustomizer customizer = (RestTemplateCustomizer)var4.next();
customizer.customize(restTemplate);
}
}
});
};
}
...
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
static class LoadBalancerInterceptorConfig {
LoadBalancerInterceptorConfig() {
}
@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return (restTemplate) -> {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}
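This configuration class takes effect only when two conditions hold
- The RestTemplate class must be on the classpath
- A LoadBalancerClient bean must exist
The example does not have RetryTemplate on the classpath, so a LoadBalancerInterceptor is created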
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
static class LoadBalancerInterceptorConfig {
LoadBalancerInterceptorConfig() {
}
@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return (restTemplate) -> {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
The LoadBalancerInterceptor constructor receives the LoadBalancerClient implementation
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
When a RestTemplate annotated with @LoadBalanced sends an HTTP request, the interceptor intercepts it, takes the host of the URI as the service name, and calls execute to issue the actual request
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
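The interception step can be illustrated with a small chain in plain Java. This is a sketch under stated assumptions: Interceptor, Chain, ServiceNameInterceptor, and the resolver function are hypothetical names, the response is just a string, and choosing an instance and rebuilding the URI are folded into one step for brevity.

```java
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical interceptor contract (much simpler than Spring's ClientHttpRequestInterceptor).
interface Interceptor {
    String intercept(URI uri, Chain chain);
}

// Walks the interceptors in order; when none are left it performs the "real" call.
class Chain {
    private final Iterator<Interceptor> iterator;

    Chain(List<Interceptor> interceptors) {
        this.iterator = interceptors.iterator();
    }

    String proceed(URI uri) {
        if (iterator.hasNext()) {
            return iterator.next().intercept(uri, this);
        }
        return "GET " + uri; // stand-in for sending the HTTP request
    }
}

// Reads the host part of the URI as a service name and resolves it to "host:port".
class ServiceNameInterceptor implements Interceptor {
    private final Function<String, String> resolver; // stands in for the load balancer

    ServiceNameInterceptor(Function<String, String> resolver) {
        this.resolver = resolver;
    }

    @Override
    public String intercept(URI uri, Chain chain) {
        String serviceName = uri.getHost();
        if (serviceName == null) {
            throw new IllegalStateException("Request URI does not contain a valid hostname: " + uri);
        }
        String query = uri.getRawQuery() == null ? "" : "?" + uri.getRawQuery();
        URI target = URI.create(uri.getScheme() + "://" + resolver.apply(serviceName)
                + uri.getRawPath() + query);
        return chain.proceed(target);
    }

    public static void main(String[] args) {
        Interceptor lb = new ServiceNameInterceptor(name -> "10.0.0.1:8080");
        Chain chain = new Chain(Collections.singletonList(lb));
        System.out.println(chain.proceed(URI.create("http://SERVICE-NAME/hello")));
    }
}
```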
The concrete LoadBalancerClient implementation is org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient
Its execute method
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
Server server = this.getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
} else {
RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
}
}
It first obtains a concrete service instance for the given serviceId
Looking at getServer, it does not use the choose function of LoadBalancerClient; it calls Ribbon's own chooseServer on ILoadBalancer instead
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
return loadBalancer == null ? null : loadBalancer.chooseServer(hint != null ? hint : "default");
}
RibbonClientConfiguration shows that ZoneAwareLoadBalancer is the default load balancer implementation
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
}
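Back to the execute function of RibbonLoadBalancerClient
After ZoneAwareLoadBalancer's chooseServer returns the service instance selected by the load-balancing strategy, the instance is wrapped in a RibbonServer object. Besides the server instance itself, RibbonServer also carries information such as the serviceId. That object is then passed to the apply function of the LoadBalancerRequest created in LoadBalancerInterceptor, which sends the request to one concrete service instance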
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
...
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
} catch (IOException var8) {
statsRecorder.recordStats(var8);
throw var8;
} catch (Exception var9) {
statsRecorder.recordStats(var9);
ReflectionUtils.rethrowRuntimeException(var9);
return null;
}
}
}
ServiceInstance is the abstract definition of a service; it lists the basic information every service instance must provide
public interface ServiceInstance {
default String getInstanceId() {
return null;
}
String getServiceId();
String getHost();
int getPort();
boolean isSecure();
URI getUri();
Map<String, String> getMetadata();
default String getScheme() {
return null;
}
}
RibbonServer is the implementation of ServiceInstance. Besides the Server instance, it holds the serviceId, a flag for whether HTTPS is used, and a map of metadata
public static class RibbonServer implements ServiceInstance {
private final String serviceId;
private final Server server;
private final boolean secure;
private Map<String, String> metadata;
...
}
In the createRequest factory method of LoadBalancerRequestFactory
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
return (instance) -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
LoadBalancerRequestTransformer transformer;
if (this.transformers != null) {
for(Iterator var6 = this.transformers.iterator(); var6.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) {
transformer = (LoadBalancerRequestTransformer)var6.next();
}
}
return execution.execute((HttpRequest)serviceRequest, body);
};
}
a ServiceRequestWrapper object is passed along as the request
public class ServiceRequestWrapper extends HttpRequestWrapper {
private final ServiceInstance instance;
private final LoadBalancerClient loadBalancer;
...
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(this.instance, this.getRequest().getURI());
return uri;
}
}
When the execute call inside that factory method actually runs, it invokes the execute function found in InterceptingClientHttpRequest
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
return nextInterceptor.intercept(request, body, this);
} else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> {
delegate.getHeaders().addAll(key, value);
});
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
streamingOutputMessage.setBody((outputStream) -> {
StreamUtils.copy(body, outputStream);
});
} else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
The request.getURI() call here invokes getURI in ServiceRequestWrapper, which delegates to reconstructURI of LoadBalancerClient. This builds the access URI for the service-governance environment and completes the proxied call
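The point that the URI is rewritten only when something asks for it can be shown with a small decorator. This is a minimal sketch: Request, RewritingRequest, and the reconstruct function are hypothetical stand-ins for HttpRequest, ServiceRequestWrapper, and reconstructURI, and the address used is made up.

```java
import java.net.URI;
import java.util.function.UnaryOperator;

// Hypothetical minimal request type (not Spring's HttpRequest).
interface Request {
    URI getURI();
}

// Decorator in the spirit of ServiceRequestWrapper: the original request is kept
// untouched and the URI is rewritten lazily, each time getURI() is called.
class RewritingRequest implements Request {
    private final Request delegate;
    private final UnaryOperator<URI> reconstruct; // stands in for reconstructURI(instance, original)

    RewritingRequest(Request delegate, UnaryOperator<URI> reconstruct) {
        this.delegate = delegate;
        this.reconstruct = reconstruct;
    }

    @Override
    public URI getURI() {
        return reconstruct.apply(delegate.getURI());
    }

    public static void main(String[] args) {
        Request original = () -> URI.create("http://SERVICE-NAME/hello?hello=world");
        Request wrapped = new RewritingRequest(original,
                uri -> URI.create(uri.toString().replace("SERVICE-NAME", "10.0.0.2:9000")));
        System.out.println(original.getURI()); // still the logical service name
        System.out.println(wrapped.getURI());  // resolved to a concrete address
    }
}
```

Because the rewrite lives in getURI(), code further down the chain does not need to know that load balancing happened; it simply reads the URI of the request it was given.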
Load balancers
AbstractLoadBalancer
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public AbstractLoadBalancer() {
}
public Server chooseServer() {
return this.chooseServer((Object)null);
}
public abstract List<Server> getServerList(AbstractLoadBalancer.ServerGroup var1);
public abstract LoadBalancerStats getLoadBalancerStats();
public static enum ServerGroup {
ALL,
STATUS_UP,
STATUS_NOT_UP;
private ServerGroup() {
}
}
}
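This is the abstract implementation of ILoadBalancer. It defines the server-group enum and implements the no-argument chooseServer, which passes a null key, meaning the key is ignored when a concrete service instance is chosen
LoadBalancerStats records the attributes and statistics of each service instance in the load balancer
getServerList returns a different list of service instances depending on the group type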
BaseLoadBalancer
This is the base implementation class of Ribbon's load balancers; it defines much of the basic load-balancer machinery
It defines the list of all servers and the list of servers that are up
@Monitor(
name = "LoadBalancer_AllServerList",
type = DataSourceType.INFORMATIONAL
)
protected volatile List<Server> allServerList;
@Monitor(
name = "LoadBalancer_UpServerList",
type = DataSourceType.INFORMATIONAL
)
protected volatile List<Server> upServerList;
It defines the strategy object that runs the server checks; the default is SerialPingStrategy
private static final BaseLoadBalancer.SerialPingStrategy DEFAULT_PING_STRATEGY = new BaseLoadBalancer.SerialPingStrategy((SyntheticClass_1)null);
SerialPingStrategy pings the servers one by one in a loop, which performs poorly. If needed, implement IPingStrategy and override the pingServers method
private static class SerialPingStrategy implements IPingStrategy {
private SerialPingStrategy() {
}
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
BaseLoadBalancer.logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for(int i = 0; i < numCandidates; ++i) {
results[i] = false;
try {
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception var7) {
BaseLoadBalancer.logger.error("Exception while pinging Server: '{}'", servers[i], var7);
}
}
return results;
}
}
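One way to avoid the serial loop is to ping all servers concurrently. The sketch below shows only the idea in plain Java: ParallelPinger is a hypothetical helper, servers are plain strings, and the Predicate stands in for IPing.isAlive. A real replacement would implement Ribbon's IPingStrategy instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical helper: pings every server on a thread pool instead of one by one.
class ParallelPinger {
    static boolean[] pingServers(Predicate<String> isAlive, String[] servers, int threads) {
        boolean[] results = new boolean[servers.length]; // all false by default
        if (servers.length == 0) {
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (String server : servers) {
                tasks.add(() -> isAlive.test(server));
            }
            // invokeAll waits for all tasks and returns futures in task order.
            List<Future<Boolean>> futures = pool.invokeAll(tasks);
            for (int i = 0; i < futures.size(); i++) {
                try {
                    results[i] = futures.get(i).get();
                } catch (ExecutionException e) {
                    results[i] = false; // a ping that throws counts as "not alive"
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, return what we have
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        boolean[] alive = pingServers(s -> !s.startsWith("down"), new String[] {"a:80", "down:80"}, 2);
        System.out.println(Arrays.toString(alive));
    }
}
```

As in SerialPingStrategy, a failing ping leaves the entry false rather than aborting the whole round.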
It defines the IRule object that handles load balancing. The load-balancing strategy is delegated to IRule, and the default implementation is RoundRobinRule
...
private static final IRule DEFAULT_RULE = new RoundRobinRule();
...
protected IRule rule;
...
public BaseLoadBalancer() {
...
this.rule = DEFAULT_RULE;
...
}
public Server chooseServer(Object key) {
if (this.counter == null) {
this.counter = this.createCounter();
}
this.counter.increment();
if (this.rule == null) {
return null;
} else {
try {
return this.rule.choose(key);
} catch (Exception var3) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, var3});
return null;
}
}
}
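The round-robin idea behind the default rule can be sketched with an atomic counter. This is a simplified illustration, not RoundRobinRule itself: RoundRobinChooser is a hypothetical class, servers are plain strings, and reachability checks and retry limits are left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical chooser: hands out servers in turn, safe for concurrent callers.
class RoundRobinChooser {
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    String choose(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        int size = servers.size();
        while (true) {
            int current = nextIndex.get();
            int next = (current + 1) % size;
            // Only the thread that wins the compare-and-set uses this index.
            if (nextIndex.compareAndSet(current, next)) {
                return servers.get(current % size);
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser();
        List<String> servers = Arrays.asList("a", "b", "c");
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.choose(servers));
        }
    }
}
```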
It starts the ping task
class PingTask extends TimerTask {
PingTask() {
}
public void run() {
try {
(BaseLoadBalancer.this.new Pinger(BaseLoadBalancer.this.pingStrategy)).runPinger();
} catch (Exception var2) {
BaseLoadBalancer.logger.error("LoadBalancer [{}]: Error pinging", BaseLoadBalancer.this.name, var2);
}
}
}
DynamicServerListLoadBalancer
This is an extension of BaseLoadBalancer that can update the list of service instances dynamically at runtime. Among its member definitions is
volatile ServerList<T> serverListImpl;
T is bounded in the class declaration to subclasses of Server
public class DynamicServerListLoadBalancer<T extends Server>
ServerList declares two abstract methods
public interface ServerList<T extends Server> {
// Returns the initial list of service instances
List<T> getInitialListOfServers();
// Returns the updated list of service instances
List<T> getUpdatedListOfServers();
}
In the Spring Cloud package that integrates Ribbon with Eureka, look for EurekaRibbonClientConfiguration
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, this.serviceId)) {
return (ServerList)this.propertiesFactory.get(ServerList.class, config, this.serviceId);
} else {
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
}
DiscoveryEnabledNIWSServerList obtains the server list through its private function obtainServersViaDiscovery
public List<DiscoveryEnabledServer> getInitialListOfServers() {
return this.obtainServersViaDiscovery();
}
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
return this.obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList();
if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {
EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
if (this.vipAddresses != null) { // service name(s)
String[] var3 = this.vipAddresses.split(",");
int var4 = var3.length;
for(int var5 = 0; var5 < var4; ++var5) {
String vipAddress = var3[var5];
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
Iterator var8 = listOfInstanceInfo.iterator();
while(var8.hasNext()) {
InstanceInfo ii = (InstanceInfo)var8.next();
if (ii.getStatus().equals(InstanceStatus.UP)) { // check the status
if (this.shouldUseOverridePort) {
if (logger.isDebugEnabled()) {
logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
}
InstanceInfo copy = new InstanceInfo(ii);
if (this.isSecure) {
ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
} else {
ii = (new Builder(copy)).setPort(this.overridePort).build();
}
}
DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) {
break;
}
}
}
return serverList;
} else {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList();
}
}
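It iterates over the instances of the service and, for each one whose status is UP, converts it to a DiscoveryEnabledServer object and collects the results into the list it returns.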
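Stripped of the Eureka types, the loop above is a filter on status followed by an optional port override. A minimal sketch, assuming a hypothetical RegisteredInstance class with a string status in place of InstanceInfo, and plain "host:port" strings in place of DiscoveryEnabledServer:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical record of a registered instance (not Eureka's InstanceInfo).
class RegisteredInstance {
    final String host;
    final int port;
    final String status;

    RegisteredInstance(String host, int port, String status) {
        this.host = host;
        this.port = port;
        this.status = status;
    }
}

class UpInstanceFilter {
    // Keeps only instances whose status is "UP".
    // overridePort may be null, in which case each instance keeps its own port.
    static List<String> upAddresses(List<RegisteredInstance> instances, Integer overridePort) {
        List<String> result = new ArrayList<>();
        for (RegisteredInstance instance : instances) {
            if (!"UP".equals(instance.status)) {
                continue; // skip instances that are down, starting, and so on
            }
            int port = overridePort != null ? overridePort : instance.port;
            result.add(instance.host + ":" + port);
        }
        return result;
    }

    public static void main(String[] args) {
        List<RegisteredInstance> instances = Arrays.asList(
                new RegisteredInstance("10.0.0.1", 8080, "UP"),
                new RegisteredInstance("10.0.0.2", 8080, "DOWN"));
        System.out.println(upAddresses(instances, null));
    }
}
```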
After DiscoveryEnabledNIWSServerList gets the server list from Eureka, DomainExtractingServerList processes it further in setZones, adding some necessary attributes
private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
List<DiscoveryEnabledServer> result = new ArrayList();
boolean isSecure = this.ribbon.isSecure(true);
boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
Iterator var5 = servers.iterator();
while(var5.hasNext()) {
DiscoveryEnabledServer server = (DiscoveryEnabledServer)var5.next();
result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr, this.approximateZoneFromHostname));
}
return result;
}
How the server list is updated: to be continued, I have not understood this part yet
ZoneAwareLoadBalancer
ZoneAwareLoadBalancer extends DynamicServerListLoadBalancer and overrides setServerListForZones
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (this.balancers == null) {
this.balancers = new ConcurrentHashMap();
}
Iterator var2 = zoneServersMap.entrySet().iterator();
Entry existingLBEntry;
while(var2.hasNext()) {
existingLBEntry = (Entry)var2.next();
String zone = ((String)existingLBEntry.getKey()).toLowerCase();
this.getLoadBalancer(zone).setServersList((List)existingLBEntry.getValue());
}
var2 = this.balancers.entrySet().iterator();
while(var2.hasNext()) {
existingLBEntry = (Entry)var2.next();
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
((BaseLoadBalancer)existingLBEntry.getValue()).setServersList(Collections.emptyList());
}
}
}
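The bookkeeping in setServerListForZones can be reduced to two passes over a map: refresh every zone that appears in the new data, then empty every zone that no longer appears. The sketch below is a simplification with hypothetical names (ZoneServerLists, update, serversIn); it stores plain lists of strings where the original keeps one BaseLoadBalancer per zone, and it compares lower-cased zone names in both passes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-zone registry: one server list per lower-cased zone name.
class ZoneServerLists {
    private final Map<String, List<String>> perZone = new ConcurrentHashMap<>();

    void update(Map<String, List<String>> zoneServersMap) {
        Set<String> present = new HashSet<>();
        // Pass 1: create or refresh the list of every zone in the new data.
        for (Map.Entry<String, List<String>> entry : zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            present.add(zone);
            perZone.put(zone, new ArrayList<>(entry.getValue()));
        }
        // Pass 2: a zone that disappeared keeps its entry but loses its servers.
        for (String zone : perZone.keySet()) {
            if (!present.contains(zone)) {
                perZone.put(zone, Collections.<String>emptyList());
            }
        }
    }

    List<String> serversIn(String zone) {
        return perZone.getOrDefault(zone.toLowerCase(), Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        ZoneServerLists zones = new ZoneServerLists();
        zones.update(Collections.singletonMap("Zone-A", Collections.singletonList("a1")));
        zones.update(Collections.singletonMap("zone-b", Collections.singletonList("b1")));
        System.out.println(zones.serversIn("zone-a") + " " + zones.serversIn("zone-b"));
    }
}
```

Emptying a stale zone instead of removing it means a zone that comes back later reuses its existing entry.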
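Load-balancing strategies
- AbstractLoadBalancerRule: abstract base class of the strategies
- RandomRule: picks a random instance
- RoundRobinRule: round robin
- RetryRule: retries; uses RoundRobinRule internally by default
- WeightedResponseTimeRule: weighted by response time
- BestAvailableRule: picks the least busy instance
- PredicateBasedRule: abstract predicate-based strategy
- AvailabilityFilteringRule: a PredicateBasedRule implementation that filters first, then applies round robin
- ZoneAvoidanceRule: a PredicateBasedRule implementation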
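To make one entry of the list above concrete, here is a sketch of response-time weighting. It assumes a scheme in which a server's weight is the sum of all average response times minus its own, so faster servers get a larger share; WeightedChooser and its methods are hypothetical names, and the random point is passed in so the behaviour is easy to follow. It illustrates the idea only and is not the WeightedResponseTimeRule source.

```java
import java.util.Arrays;

// Hypothetical chooser that favours servers with a lower average response time.
class WeightedChooser {
    // cumulative[i] is the sum of the weights of servers 0..i,
    // where weight(i) = (sum of all averages) - average(i).
    static double[] cumulativeWeights(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        double[] cumulative = new double[avgResponseTimes.length];
        double running = 0;
        for (int i = 0; i < avgResponseTimes.length; i++) {
            running += total - avgResponseTimes[i];
            cumulative[i] = running;
        }
        return cumulative;
    }

    // Returns the first index whose cumulative weight exceeds the point, or -1 if none does.
    // Callers would draw the point uniformly from [0, cumulative[last]).
    static int pick(double[] cumulative, double point) {
        for (int i = 0; i < cumulative.length; i++) {
            if (point < cumulative[i]) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        double[] cumulative = cumulativeWeights(new double[] {10, 40, 50});
        System.out.println(Arrays.toString(cumulative));
        System.out.println(pick(cumulative, Math.random() * cumulative[cumulative.length - 1]));
    }
}
```

With averages of 10, 40, and 50 the weights are 90, 60, and 50, so the fastest server owns the widest interval and is picked most often.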
Custom configuration
@Bean
IPing ping() {
    return new PingUrl();
}

@Bean
IRule iRule() {
    return new ZoneAvoidanceRule();
}