前三節(jié)分析,我們主要分析和總結(jié)了Ribbon結(jié)合RestTemplate的使用,本節(jié)我們主要分析Ribbon結(jié)合Feign客戶端的使用蓖议,畢竟當(dāng)下使用Feign非常廣泛虏杰,那么下面我們就分析下Feign客戶端是如何使用Ribbon負(fù)載的,以下分析需要依賴OpenFeign依賴庫模塊
FeignRibbonClientAutoConfiguration配置文件
@ConditionalOnClass({ ILoadBalancer.class, Feign.class })
@Configuration
@AutoConfigureBefore(FeignAutoConfiguration.class)
@EnableConfigurationProperties({ FeignHttpClientProperties.class })
@Import({ HttpClientFeignLoadBalancedConfiguration.class,
OkHttpFeignLoadBalancedConfiguration.class,
DefaultFeignLoadBalancedConfiguration.class })
public class FeignRibbonClientAutoConfiguration {
@Bean
@Primary
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
public CachingSpringLoadBalancerFactory cachingLBClientFactory(
SpringClientFactory factory) {
return new CachingSpringLoadBalancerFactory(factory);
}
@Bean
@Primary
@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
public CachingSpringLoadBalancerFactory retryabeCachingLBClientFactory(
SpringClientFactory factory,
LoadBalancedRetryFactory retryFactory) {
return new CachingSpringLoadBalancerFactory(factory, retryFactory);
}
//設(shè)置默認(rèn)請求選項(xiàng)拒担,從Ribbon的SpringClientFactory獲取IClientConfig配置參數(shù)
@Bean
@ConditionalOnMissingBean
public Request.Options feignRequestOptions() {
return LoadBalancerFeignClient.DEFAULT_OPTIONS;
}
}
當(dāng)前配置類我們關(guān)注下CachingSpringLoadBalancerFactory這個工廠類嘹屯,主要職責(zé)是獲取FeignLoadBalancer并緩存提供給LoadBalancerFeignClient使用,LoadBalancerFeignClient是Feign負(fù)載均衡客戶端的默認(rèn)實(shí)現(xiàn)
CachingSpringLoadBalancerFactory工廠類
public class CachingSpringLoadBalancerFactory {
private final SpringClientFactory factory;
private LoadBalancedRetryFactory loadBalancedRetryFactory = null;
private volatile Map<String, FeignLoadBalancer> cache = new ConcurrentReferenceHashMap<>();
public CachingSpringLoadBalancerFactory(SpringClientFactory factory) {
this.factory = factory;
}
public CachingSpringLoadBalancerFactory(SpringClientFactory factory, LoadBalancedRetryFactory loadBalancedRetryPolicyFactory) {
this.factory = factory;
this.loadBalancedRetryFactory = loadBalancedRetryPolicyFactory;
}
public FeignLoadBalancer create(String clientName) {
FeignLoadBalancer client = this.cache.get(clientName);
if(client != null) {
return client;
}
IClientConfig config = this.factory.getClientConfig(clientName);
ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
client = loadBalancedRetryFactory != null ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
loadBalancedRetryFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
this.cache.put(clientName, client);
return client;
}
}
代碼片段中我們著重關(guān)注下SpringClientFactory對象,這個對象在分析一中已經(jīng)講述過从撼,通過SpringClientFactory獲取Ribbon管理配置接口(IClientConfig)與Ribbon負(fù)載均衡接口(ILoadBalancer)最終生成Feign負(fù)載均衡實(shí)現(xiàn)類(FeignLoadBalancer)州弟,最終提供給LoadBalancerFeignClient使用,LoadBalancerFeignClient內(nèi)部會調(diào)用到ILoadBalancer.chooseServer達(dá)到負(fù)載均衡的效果低零,整個調(diào)用鏈會在下文闡述
DefaultFeignLoadBalancedConfiguration
@Configuration
class DefaultFeignLoadBalancedConfiguration {
@Bean
@ConditionalOnMissingBean
public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
SpringClientFactory clientFactory) {
return new LoadBalancerFeignClient(new Client.Default(null, null),
cachingFactory, clientFactory);
}
}
通過FeignRibbonClientAutoConfiguration的@Import({DefaultFeignLoadBalancedConfiguration.class.class})進(jìn)行加載婆翔,默認(rèn)的Feign客戶端接口實(shí)現(xiàn)類并且有負(fù)載均衡的效果
FeignAutoConfiguration
@Configuration
@ConditionalOnClass(Feign.class)
@EnableConfigurationProperties({FeignClientProperties.class, FeignHttpClientProperties.class})
public class FeignAutoConfiguration {
@Autowired(required = false)
private List<FeignClientSpecification> configurations = new ArrayList<>();
@Bean
public HasFeatures feignFeature() {
return HasFeatures.namedFeature("Feign", Feign.class);
}
@Bean
public FeignContext feignContext() {
FeignContext context = new FeignContext();
context.setConfigurations(this.configurations);
return context;
}
@Configuration
@ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
protected static class HystrixFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new HystrixTargeter();
}
}
......
}
在FeignRibbonClientAutoConfiguration之后被裝配,這個配置類我們只大概講述下HystrixFeignTargeterConfiguration配置類下的Targeter,這個的作用就是生成一個Feign客戶端代理類掏婶,這個代理類會實(shí)現(xiàn)Hystrix熔斷的功能啃奴,然后調(diào)用到最終的LoadBalancerFeignClient進(jìn)行請求,整個生成Feign代理類的調(diào)用鏈為@EnableFeignClients->FeignClientsRegistrar.registerBeanDefinitions->FeignClientsRegistrar.registerFeignClient->FeignClientFactoryBean.getObject->FeignClientFactoryBean.loadBalance->HystrixTargeter.target->Feign.target->HystrixFeign#build->ReflectiveFeign.newInstance->HystrixInvocationHandler.invoke->SynchronousMethodHandler.invoke對于整個調(diào)用鏈本節(jié)暫不過多講述雄妥,感興趣可以通過堆棧進(jìn)行跟蹤,具體分析會放到單獨(dú)講述Feign相關(guān)的分析進(jìn)行澄清
LoadBalancerFeignClient負(fù)載均衡實(shí)現(xiàn)類
public class LoadBalancerFeignClient implements Client {
static final Request.Options DEFAULT_OPTIONS = new Request.Options();
private final Client delegate;
private CachingSpringLoadBalancerFactory lbClientFactory;
private SpringClientFactory clientFactory;
......
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
IClientConfig requestConfig = getClientConfig(options, clientName);
return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
IClientConfig getClientConfig(Request.Options options, String clientName) {
IClientConfig requestConfig;
if (options == DEFAULT_OPTIONS) {
requestConfig = this.clientFactory.getClientConfig(clientName);
} else {
requestConfig = new FeignOptionsClientConfig(options);
}
return requestConfig;
}
......
private FeignLoadBalancer lbClient(String clientName) {
return this.lbClientFactory.create(clientName);
}
......
}
通過上文我們得知最終生成的Feign代理類會調(diào)用到LoadBalancerFeignClient的execute方法,我們主要看executeWithLoadBalancer這個方法最蕾,通過負(fù)載均衡獲取結(jié)果
/**
* 當(dāng)調(diào)用者想要通過負(fù)載均衡分發(fā)請求到一個服務(wù)器時候可以使用這個方法,
* 通過recostructuriWithServer計算最終的URI老厌,而不是在請求URI中指定服務(wù)器
*/
public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {
......
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
//構(gòu)建LoadBalancerCommand命令行
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
//提交命令請求,并返回一個RxJava的被觀察者對象瘟则,通過阻塞方式獲取最終的結(jié)果
return command.submit(
new ServerOperation<T>() {
//通過LoadBalancerContext.getServerFromLoadBalancer獲取負(fù)載均衡的Server,
//通過Server svc = lb.chooseServer(loadBalancerKey);獲取負(fù)載均衡Server枝秤,
//如何負(fù)載均衡獲取Server可以查看[分析三](http://www.reibang.com/p/f076ab3e4031)
@Override
public Observable<T> call(Server server) {
//計算最終的URI
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
//執(zhí)行RetryableFeignLoadBalancer.execute方法
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
//阻塞BlockingObservable
.toBlocking()
//返回結(jié)果
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
}
通過注釋我們得知時候通過阻塞方式獲取結(jié)果,下文會大致分析調(diào)用流程
LoadBalancerCommand命令類
public class LoadBalancerCommand<T> {
......
//創(chuàng)建被觀察者
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
//通過負(fù)載均衡方法獲取Server
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
//發(fā)射數(shù)據(jù)
next.onNext(server);
//結(jié)束當(dāng)前被觀察者
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
......
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
// Called for each attempt and retry
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
//執(zhí)行return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null);
// TODO: What to do if onNext or onError are never called?
}
@Override
public void onError(Throwable e) {
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
//發(fā)生錯誤時繼續(xù)執(zhí)行下一個
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
}
我們看見LoadBalancerCommand. submit方法內(nèi)部醋拧,最終執(zhí)行return operation.call(server)->RetryableFeignLoadBalancer.execute從下往上返回Observable<T>返回值,最終返回給最外層淀弹,最外層獲取得到Observable<T>然后.toBlocking().single()通過內(nèi)部阻塞方式獲取最終請求的返回結(jié)果,這個請求方式涉及RxJava的被觀察者與觀察者模式丹壕,所以更多RxJava使用方法請參考RxJava相關(guān)資料
BlockingObservable
public final class BlockingObservable<T> {
......
private T blockForSingle(final Observable<? extends T> observable) {
final AtomicReference<T> returnItem = new AtomicReference<T>();
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
//線程等待其他線程各自執(zhí)行完畢后再執(zhí)行
final CountDownLatch latch = new CountDownLatch(1);
@SuppressWarnings("unchecked")
//觀察者,訂閱的數(shù)據(jù)為LoadBalancerCommand. selectServer返回的薇溃,調(diào)用subscribe開啟訂閱菌赖,被觀察者開始執(zhí)行call回調(diào)
Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(final Throwable e) {
returnException.set(e);
latch.countDown();
}
@Override
public void onNext(final T item) {
returnItem.set(item);
}
});
//阻塞等待,直到latch計數(shù)器的值為0
BlockingUtils.awaitForComplete(latch, subscription);
if (returnException.get() != null) {
Exceptions.propagate(returnException.get());
}
//返回最終數(shù)據(jù)
return returnItem.get();
}
}
RetryableFeignLoadBalancer
public class RetryableFeignLoadBalancer extends FeignLoadBalancer implements ServiceInstanceChooser {
@Override
public RibbonResponse execute(final RibbonRequest request, IClientConfig configOverride)
throws IOException {
......
return retryTemplate.execute(new RetryCallback<RibbonResponse, IOException>() {
@Override
public RibbonResponse doWithRetry(RetryContext retryContext) throws IOException {
Request feignRequest = null;
if (retryContext instanceof LoadBalancedRetryContext) {
ServiceInstance service = ((LoadBalancedRetryContext) retryContext).getServiceInstance();
if (service != null) {
feignRequest = ((RibbonRequest) request.replaceUri(reconstructURIWithServer(new Server(service.getHost(), service.getPort()), request.getUri()))).toRequest();
}
}
if (feignRequest == null) {
feignRequest = request.toRequest();
}
Response response = request.client().execute(feignRequest, options);
if (retryPolicy.retryableStatusCode(response.status())) {
byte[] byteArray = response.body() == null ? new byte[]{} : StreamUtils.copyToByteArray(response.body().asInputStream());
response.close();
throw new RibbonResponseStatusCodeException(RetryableFeignLoadBalancer.this.clientName, response,
byteArray, request.getUri());
}
//返回最終的結(jié)果
return new RibbonResponse(request.getUri(), response);
}
}, new LoadBalancedRecoveryCallback<RibbonResponse, Response>() {
@Override
protected RibbonResponse createResponse(Response response, URI uri) {
return new RibbonResponse(uri, response);
}
});
}
}
通過return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));請求就得到了最終的請求結(jié)果
經(jīng)過四節(jié)分析,總結(jié)了RestTemplate與Feign在使用Ribbon時候如何進(jìn)行負(fù)載均衡相關(guān)的知識痊焊,后續(xù)會繼續(xù)分析和總結(jié)Hystrix盏袄、Feign、Zuul等相關(guān)組件薄啥!