目前,Spring Cloud已在南京公司推廣開(kāi)來(lái)姥芥,不僅如此兔乞,深圳那邊近期也要基于Spring Cloud新開(kāi)微服務(wù)了。
于是凉唐,領(lǐng)導(dǎo)要求我出一套基于Spring Cloud的快速開(kāi)發(fā)腳手架(近期開(kāi)源)庸追。在編寫(xiě)腳手架的過(guò)程中,也順帶總結(jié)一下以前在項(xiàng)目中遇到的問(wèn)題:
使用Hystrix時(shí)台囱,如何傳播ThreadLocal對(duì)象淡溯?
我們知道,Hystrix有隔離策略:THREAD以及SEMAPHORE簿训。
如果你不知道Hystrix的隔離策略咱娶,可以閱讀我的書(shū)籍《Spring Cloud與Docker微服務(wù)架構(gòu)實(shí)戰(zhàn)》,或者參考文檔:https://github.com/Netflix/Hystrix/wiki/Configuration#executionisolationstrategy
引子
當(dāng)隔離策略為 THREAD
時(shí)强品,是沒(méi)辦法拿到 ThreadLocal
中的值的膘侮。
舉個(gè)例子,使用Feign調(diào)用某個(gè)遠(yuǎn)程API的榛,這個(gè)遠(yuǎn)程API需要傳遞一個(gè)Header琼了,這個(gè)Header是動(dòng)態(tài)的,跟你的HttpRequest相關(guān)夫晌,我們選擇編寫(xiě)一個(gè)攔截器來(lái)實(shí)現(xiàn)Header的傳遞(當(dāng)然也可以在Feign Client接口的方法上加RequestHeader
)雕薪。
示例代碼:
public class KeycloakRequestInterceptor implements RequestInterceptor {
private static final String AUTHORIZATION_HEADER = "Authorization";
@Override
public void apply(RequestTemplate template) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
Principal principal = attributes.getRequest().getUserPrincipal();
if (principal != null && principal instanceof KeycloakPrincipal) {
KeycloakSecurityContext keycloakSecurityContext = ((KeycloakPrincipal) principal)
.getKeycloakSecurityContext();
if (keycloakSecurityContext instanceof RefreshableKeycloakSecurityContext) {
RefreshableKeycloakSecurityContext.class.cast(keycloakSecurityContext)
.refreshExpiredToken(true);
template.header(AUTHORIZATION_HEADER, "Bearer " + keycloakSecurityContext.getTokenString());
}
}
// 否則啥都不干
}
}
你可能不知道Keycloak是什么,不過(guò)沒(méi)有關(guān)系晓淀,相信這段代碼并不難閱讀所袁,該攔截器做了幾件事:
- 使用
RequestContextHolder.getRequestAttributes()
靜態(tài)方法獲得Request。 - 從Request獲得當(dāng)前用戶(hù)的身份凶掰,然后使用Keycloak的API拿到Token燥爷,并扔到Header里蜈亩。
- 這樣,F(xiàn)eign使用這個(gè)攔截器時(shí)局劲,就會(huì)用你這個(gè)Header去請(qǐng)求了勺拣。
注:Keycloak是一個(gè)非常容易上手,并且功能強(qiáng)大的單點(diǎn)認(rèn)證平臺(tái)鱼填。
現(xiàn)實(shí)很骨感
以上代碼可完美運(yùn)行——但僅限于Feign不開(kāi)啟Hystrix支持時(shí)药有。
注:Spring Cloud Dalston以及更高版可使用
feign.hystrix.enabled=true
為Feign開(kāi)啟Hystrix支持。
當(dāng)Feign開(kāi)啟Hystrix支持時(shí)苹丸,
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
是 null
愤惰。
原因在于,Hystrix的默認(rèn)隔離策略是THREAD
赘理。而 RequestContextHolder
源碼中宦言,使用了兩個(gè)血淋淋的ThreadLocal
。
解決方案一:調(diào)整隔離策略
將隔離策略設(shè)為SEMAPHORE即可:
hystrix.command.default.execution.isolation.strategy: SEMAPHORE
這樣配置后商模,F(xiàn)eign可以正常工作奠旺。
但該方案不是特別好。原因是Hystrix官方強(qiáng)烈建議使用THREAD作為隔離策略施流! 參考文檔:
Thread or Semaphore
The default, and the recommended setting, is to run
HystrixCommand
s using thread isolation (THREAD
) andHystrixObservableCommand
s using semaphore isolation (SEMAPHORE
).Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.
Generally the only time you should use semaphore isolation for
HystrixCommand
s is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.
于是响疚,那么有沒(méi)有更好的方案呢?
解決方案二:自定義并發(fā)策略
既然Hystrix不太建議使用SEMAPHORE作為隔離策略瞪醋,那么是否有其他方案呢忿晕?答案是自定義并發(fā)策略,目前银受,Spring Cloud Sleuth以及Spring Security都通過(guò)該方式傳遞 ThreadLocal
對(duì)象践盼。
下面我們來(lái)編寫(xiě)自定義的并發(fā)策略。
編寫(xiě)自定義并發(fā)策略
編寫(xiě)自定義并發(fā)策略比較簡(jiǎn)單宾巍,只需編寫(xiě)一個(gè)類(lèi)咕幻,讓其繼承HystrixConcurrencyStrategy
,并重寫(xiě)wrapCallable
方法即可顶霞。
代碼示例:
@Component
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private static final Log log = LogFactory.getLog(RequestHystrixConcurrencyStrategy.class);
public RequestHystrixConcurrencyStrategy() {
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return new WrappedCallable<>(callable, requestAttributes);
}
static class WrappedCallable<T> implements Callable<T> {
private final Callable<T> target;
private final RequestAttributes requestAttributes;
public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
this.target = target;
this.requestAttributes = requestAttributes;
}
@Override
public T call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
return target.call();
} finally {
RequestContextHolder.resetRequestAttributes();
}
}
}
}
如代碼所示谅河,我們編寫(xiě)了一個(gè)RequestHystrixConcurrencyStrategy
,在其中:
-
wrapCallable
方法拿到RequestContextHolder.getRequestAttributes()
确丢,也就是我們想傳播的對(duì)象; - 在
WrappedCallable
類(lèi)中吐限,我們將要傳播的對(duì)象作為成員變量鲜侥,并在其中的call方法中,為靜態(tài)方法設(shè)值诸典。 - 這樣描函,在Hystrix包裹的方法中,就可以使用
RequestContextHolder.getRequestAttributes()
獲取到相關(guān)屬性——也就是說(shuō),可以拿到RequestContextHolder
中的ThreadLocal
屬性舀寓。
經(jīng)過(guò)測(cè)試胆数,代碼能正常工作。
新的問(wèn)題
至此互墓,我們已經(jīng)實(shí)現(xiàn)了ThreadLocal
屬性的傳遞必尼,然而Hystrix只允許有一個(gè)并發(fā)策略!這意味著——如果不做任何處理篡撵,Sleuth判莉、Spring Security將無(wú)法正常拿到上下文!(上文說(shuō)過(guò)育谬,目前Sleuth券盅、Spring Security都是通過(guò)自定義并發(fā)策略的方式來(lái)傳遞ThreadLocal對(duì)象的。)
如何解決這個(gè)問(wèn)題呢膛檀?
我們知道锰镀,Spring Cloud中,Spring Cloud Security與Spring Cloud Sleuth是可以共存的咖刃!我們不妨參考下Sleuth以及Spring Security的實(shí)現(xiàn):
- Sleuth:
org.springframework.cloud.sleuth.instrument.hystrix.SleuthHystrixConcurrencyStrategy
- Spring Security:
org.springframework.cloud.netflix.hystrix.security.SecurityContextConcurrencyStrategy
閱讀完后泳炉,你將恍然大悟——于是,我們可以模仿它們的寫(xiě)法僵缺,改寫(xiě)上文編寫(xiě)的并發(fā)策略:
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private static final Log log = LogFactory.getLog(RequestAttributeHystrixConcurrencyStrategy.class);
private HystrixConcurrencyStrategy delegate;
public RequestAttributeHystrixConcurrencyStrategy() {
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
// Welcome to singleton hell...
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
}
catch (Exception e) {
log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
}
}
private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (log.isDebugEnabled()) {
log.debug("Current Hystrix plugins configuration is ["
+ "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
+ eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
}
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return new WrappedCallable<>(callable, requestAttributes);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}
static class WrappedCallable<T> implements Callable<T> {
private final Callable<T> target;
private final RequestAttributes requestAttributes;
public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
this.target = target;
this.requestAttributes = requestAttributes;
}
@Override
public T call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
return target.call();
}
finally {
RequestContextHolder.resetRequestAttributes();
}
}
}
}
簡(jiǎn)單講解下:
- 將現(xiàn)有的并發(fā)策略作為新并發(fā)策略的成員變量
- 在新并發(fā)策略中胡桃,返回現(xiàn)有并發(fā)策略的線(xiàn)程池、Queue磕潮。
Pull Request
筆者已將該實(shí)現(xiàn)方式Pull Request:https://github.com/spring-cloud/spring-cloud-netflix/pull/2509 翠胰,希望官方能夠接納,也希望在不久的將來(lái)自脯,能夠更舒服之景、更爽地使用Spring Cloud。
PS. Pull Request的代碼跟博客中的代碼略有區(qū)別膏潮,有少量簡(jiǎn)單的優(yōu)化锻狗,主要是增加了一個(gè)開(kāi)關(guān)。
靈感來(lái)自
- https://stackoverflow.com/questions/34062900/forward-a-request-header-with-a-feign-client-requestinterceptor
- https://stackoverflow.com/questions/34719809/unreachable-security-context-using-feign-requestinterceptor
- https://github.com/spring-cloud/spring-cloud-netflix/pull/1379
- https://github.com/Writtscher/kanoon
原文首發(fā)
http://www.itmuch.com/spring-cloud-sum/hystrix-threadlocal/