使用場景草巡,往往我們使用theadLocal存取用戶登陸信息增淹,但是當(dāng)開啟hystrix時使用線程隔離模式,會使用對應(yīng)線程池內(nèi)的線程執(zhí)行feignClient的方法倔韭,那么就會導(dǎo)致threadLocal丟失
通過百度以及看源碼可以發(fā)現(xiàn)hystrix提供了HystrixPlugins恳谎,可以看到他的方法
我們先來看看HystrixPlugins暴露的方法
// 可以看到 HystrixPlugins 提供了很多東西芝此,包括線程策略,鉤子因痛,事件婚苹,等等。鸵膏。
// 首先第一感覺 鉤子是可以用的
HystrixConcurrencyStrategy hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
首先看看HystrixCommandExecutionHook 的方法膊升,以及執(zhí)行時機(jī)
/**
* Invoked before {@link HystrixInvokable} begins executing.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.2
*/
public <T> void onStart(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when {@link HystrixInvokable} emits a value.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param value value emitted
*
* @since 1.4
*/
public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) {
return value; //by default, just pass through
}
/**
* Invoked when {@link HystrixInvokable} fails with an Exception.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param failureType {@link FailureType} enum representing which type of error
* @param e exception object
*
* @since 1.2
*/
public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
return e; //by default, just pass through
}
/**
* Invoked when {@link HystrixInvokable} finishes a successful execution.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.4
*/
public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked at start of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
*
* @param commandInstance The executing HystrixCommand instance.
*
* @since 1.2
*/
public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked at completion of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
* This will get invoked whenever the Hystrix thread is done executing, regardless of whether the thread finished
* naturally, or was unsubscribed externally
*
* @param commandInstance The executing HystrixCommand instance.
*
* @since 1.2
*/
public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
// do nothing by default
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} starts.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.4
*/
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} emits a value.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param value value emitted
*
* @since 1.4
*/
public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) {
return value; //by default, just pass through
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} fails with an Exception.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param e exception object
*
* @since 1.4
*/
public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
return e; //by default, just pass through
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.4
*/
public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when the fallback method in {@link HystrixInvokable} starts.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.2
*/
public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when the fallback method in {@link HystrixInvokable} emits a value.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param value value emitted
*
* @since 1.4
*/
public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) {
return value; //by default, just pass through
}
/**
* Invoked when the fallback method in {@link HystrixInvokable} fails with an Exception.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param e exception object
*
* @since 1.2
*/
public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
//by default, just pass through
return e;
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.4
*/
public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when the command response is found in the {@link com.netflix.hystrix.HystrixRequestCache}.
*
* @param commandInstance The executing HystrixCommand
*
* @since 1.4
*/
public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked with the command is unsubscribed before a terminal state
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.5.9
*/
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
通過查找文檔以及 測試確定了我們需要的方法,這是我們可以寫一個自己實(shí)現(xiàn)的鉤子,但是這個鉤子是用不同的執(zhí)行時機(jī)的回調(diào)來實(shí)現(xiàn)谭企,那么我們需要想辦法把需要傳遞的信息從前面鉤子的方法傳到后面的鉤子方法廓译,簡單的方式就是用ConcurrentMap來進(jìn)行映射,但是沒有合適的key,那么我們找到了一個類HystrixRequestContext,其實(shí)使用ConcurrentMap映射是一個比較蠢的方式债查,也就是實(shí)在沒辦法了再這樣非区,正常應(yīng)該是像TTL那樣做一個裝飾器,在成員變量里操作盹廷,或者統(tǒng)一的地方征绸,那么我們又繼續(xù)找到了HystrixRequestVariableDefault 這個類在操作HystrixRequestContext,再看一下源碼速和,它是通過包內(nèi)方法來操作HystrixRequestContext的threadLocal變量
public void set(T value) {
// 這個方法沒有暴露給我們使用歹垫,而是提供了一個操作器
HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
}
下面看看我們的鉤子模樣
public class MyHystrixHook extends HystrixCommandExecutionHook {
private final HystrixRequestVariableDefault <Long> reuqestVariable = new HystrixRequestVariableDefault<>();
@Override
public <T> void onStart(HystrixInvokable<T> commandInstance) {
//這里是hystrix 執(zhí)行對應(yīng)feignClient方法時開始時的鉤子,那么就是在當(dāng)前主線程內(nèi)操作
// 先初始化 hystrix的線程私有變量容器
HystrixRequestContext.initializeContext();
// 你要傳遞的 變量颠放,其實(shí)就是從當(dāng)前主線程的threadLocal中獲取到的
reuqestVariable.set(111L);
}
@Override
public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
// 這里是你的 FeignClient方法執(zhí)行失敗后的回調(diào)鉤子,需要清理你操作的數(shù)據(jù)
// 需要清理 子線程的threadLocal以及HystrixRequestContext
HystrixRequestContext.getContextForCurrentThread().shutdown();
return e; //by default, just pass through
}
@Override
public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
// 這里是你的 FeignClient方法執(zhí)行成功后的回調(diào)鉤子,需要清理你操作的數(shù)據(jù)
// 需要清理 子線程的threadLocal以及HystrixRequestContext
HystrixRequestContext.getContextForCurrentThread().shutdown();
}
@Override
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
// 這里已經(jīng)是子線程了排惨,執(zhí)行目標(biāo) feignClient方法前
Long tenantId = requestVariableDefault.get();
// 這里你可以獲取到想要的信息,可以存到threadLocal保證后續(xù)方法的一致性
}
@Override
public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
// 這里是熔斷回調(diào)方法開始碰凶,也可以管理threadLocal的信息傳遞暮芭, 和上述onStart方法一致
}
@Override
public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
//by default, just pass through
// 熔斷回調(diào)失敗 的鉤子,清理即可
return e;
}
@Override
public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
// 熔斷回調(diào)成功 的鉤子欲低,清理即可
}
}
同時怎么才能將我們自定義的鉤子讓框架去調(diào)用呢辕宏?
// 直接在項(xiàng)目啟動時 調(diào)用這個方法 會報錯,那么直接這樣是不行的砾莱。
// 而且我們也可以看到HystrixPlugins.reset() 方法重置瑞筐,那么再看看getInstance方法內(nèi)部
HystrixPlugins.getInstance().registerCommandExecutionHook(你的鉤子);
hystrix的spi體系
下面來看鉤子的實(shí)例獲取方法
public HystrixCommandExecutionHook getCommandExecutionHook() {
// 這里可以看到hystrix各種類型的插件默認(rèn)只有一個,但是我們可以在自定義的插件套一層裝飾器來實(shí)現(xiàn)多個相同類型的插件對方法進(jìn)行增強(qiáng)(seata集成hystrix中使用到此方式)
if (commandExecutionHook.get() == null) {
// check for an implementation from Archaius first
// 內(nèi)部spi的方式獲取實(shí)例
Object impl = getPluginImplementation(HystrixCommandExecutionHook.class);
if (impl == null) {
// cas的方式來保證多個組件或?qū)崿F(xiàn)只有一個實(shí)例正常放入
commandExecutionHook.compareAndSet(null, HystrixCommandExecutionHookDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from Archaius so use it
commandExecutionHook.compareAndSet(null, (HystrixCommandExecutionHook) impl);
}
}
return commandExecutionHook.get();
}
private <T> T getPluginImplementation(Class<T> pluginClass) {
// hystrix 自己提供的 配置文件spi方式獲取實(shí)例
T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
if (p != null) return p;
// 利用jdk 自動的 serviceLoader方式獲取實(shí)例
// serviceLoader可以參考 http://www.reibang.com/p/ccfd80d407ef
return findService(pluginClass, classLoader);
}
我們來看看hystrix配置方式的spi, 其實(shí)就是讀取hystrix-plugins.properties中文件讀取到key value腊瑟,例如我們替換hook的配置就為
hystrix.plugin.HystrixConcurrencyStrategy.implementation=xxx.xxx.MyTestHystrix
private static <T> T getPluginImplementationViaProperties(Class<T> pluginClass, HystrixDynamicProperties dynamicProperties) {
String classSimpleName = pluginClass.getSimpleName();
// Check Archaius for plugin class.
String propertyName = "hystrix.plugin." + classSimpleName + ".implementation";
String implementingClass = dynamicProperties.getString(propertyName, null).get();
if (implementingClass != null) {
try {
Class<?> cls = Class.forName(implementingClass);
// narrow the scope (cast) to the type we're expecting
cls = cls.asSubclass(pluginClass);
return (T) cls.newInstance();
} catch (ClassCastException e) {
throw new RuntimeException(classSimpleName + " implementation is not an instance of " + classSimpleName + ": " + implementingClass);
} catch (ClassNotFoundException e) {
throw new RuntimeException(classSimpleName + " implementation class not found: " + implementingClass, e);
} catch (InstantiationException e) {
throw new RuntimeException(classSimpleName + " implementation not able to be instantiated: " + implementingClass, e);
} catch (IllegalAccessException e) {
throw new RuntimeException(classSimpleName + " implementation not able to be accessed: " + implementingClass, e);
}
} else {
return null;
}
}
關(guān)于seata集成hystrix以及feign的實(shí)現(xiàn)
上述使用鉤子 + spi 替換自己的鉤子方式實(shí)現(xiàn)聚假,但是我們還可以觀察到HystrixConcurrencyStrategy#wrapCallable方法块蚌,這不就是妥妥的一個線程執(zhí)行器的裝飾器預(yù)留的方法嘛?很顯然也可以通過這種方式來實(shí)現(xiàn)
如果你引入了spring-cloud-starter-alibaba-seata的話膘格,可以看到線程策略的一個實(shí)現(xiàn)com.alibaba.cloud.seata.feign.hystrix.SeataHystrixConcurrencyStrategy
下面來看看代碼
public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private final Logger logger = LoggerFactory
.getLogger(SeataHystrixConcurrencyStrategy.class);
// 這里是seata自定義線程策略的被裝飾的對象峭范,那么其實(shí)是允許多個同樣插件存在,不過是通過裝飾器包裹后層層增強(qiáng)
private HystrixConcurrencyStrategy delegate;
public SeataHystrixConcurrencyStrategy() {
try {
// 這里通過spi獲取實(shí)例
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof SeataHystrixConcurrencyStrategy) {
// 如果已經(jīng)是當(dāng)前實(shí)現(xiàn)則不做任何操作
return;
}
// 這里會重新獲取所有其它插件瘪贱,然后重置后統(tǒng)一再注冊進(jìn)去
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
}
catch (Exception ex) {
logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
}
}
private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (logger.isDebugEnabled()) {
logger.debug("Current Hystrix plugins configuration is ["
+ "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
+ eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
logger.debug("Registering Seata Hystrix Concurrency Strategy.");
}
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}
// 這里是上面提到的 線程執(zhí)行的裝飾增強(qiáng)
@Override
public <K> Callable<K> wrapCallable(Callable<K> c) {
// 如果是已經(jīng)裝飾了 提前返回
if (c instanceof SeataContextCallable) {
return c;
}
Callable<K> wrappedCallable;
if (this.delegate != null) {
// 如果有其它的自定義的插件纱控,需要再被裝飾一層
wrappedCallable = this.delegate.wrapCallable(c);
}
else {
wrappedCallable = c;
}
// 如果是已經(jīng)裝飾了 提前返回
if (wrappedCallable instanceof SeataContextCallable) {
return wrappedCallable;
}
// 真正的對其裝飾
return new SeataContextCallable<>(wrappedCallable,
RequestContextHolder.getRequestAttributes());
}
private static class SeataContextCallable<K> implements Callable<K> {
private final Callable<K> actual;
// 哈哈,我們看到了 seata對線程策略的線程執(zhí)行裝飾的真正目的菜秦,用于傳遞seata全局事務(wù)id
private final String xid;
private final RequestAttributes requestAttributes;
SeataContextCallable(Callable<K> actual, RequestAttributes requestAttribute) {
this.actual = actual;
this.requestAttributes = requestAttribute;
// 當(dāng)前還是主線程在執(zhí)行甜害,所以直接從當(dāng)前線程獲取全局事務(wù)id
this.xid = RootContext.getXID();
}
@Override
public K call() throws Exception {
// 典型的裝飾增強(qiáng),這里已經(jīng)是子線程在執(zhí)行了
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
RootContext.bind(xid);
return actual.call();
}
finally {
RootContext.unbind();
RequestContextHolder.resetRequestAttributes();
}
}
}
}
那么我們自己可不可以用他這種方式實(shí)現(xiàn)threadLocal傳遞呢喷户,答案是可以的唾那,但是要注意的是循環(huán)創(chuàng)建問題,我們添加了一個volatile 變量防止自定義插件的構(gòu)造方法中通過spi獲取實(shí)例形成循環(huán)創(chuàng)建實(shí)例
// 邏輯和seata的自定義插件基本一致
public class MyTestHystrix extends HystrixConcurrencyStrategy {
private final Logger logger = LoggerFactory
.getLogger(SeataHystrixConcurrencyStrategy.class);
private HystrixConcurrencyStrategy delegate;
private static volatile boolean alreadyInit = false;
public MyTestHystrix() {
// 這里添加一個 volatile變量防止構(gòu)造方法和spi獲取實(shí)例形成循環(huán)
if (alreadyInit) {
return;
}
alreadyInit = true;
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof MyTestHystrix) {
return;
}
HystrixConcurrencyStrategy hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
} catch (Exception ex) {
logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
}
}
@Override
public <K> Callable<K> wrapCallable(Callable<K> c) {
if (c instanceof MyTestHystrix.MyContext) {
return c;
}
Callable<K> wrappedCallable;
if (this.delegate != null) {
wrappedCallable = this.delegate.wrapCallable(c);
} else {
wrappedCallable = c;
}
if (wrappedCallable instanceof MyTestHystrix.MyContext) {
return wrappedCallable;
}
return new MyTestHystrix.MyContext<>(wrappedCallable,
RequestContextHolder.getRequestAttributes());
}
private static class MyContext<K> implements Callable<K> {
private final Callable<K> actual;
private final String tenantId;
private final RequestAttributes requestAttributes;
MyContext(Callable<K> actual, RequestAttributes requestAttribute) {
this.actual = actual;
this.requestAttributes = requestAttribute;
this.tenantId = LoginInfoUtils.getTenantId();
}
@Override
public K call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
// 這里就是手動的將當(dāng)前子線程信息填充
LoginInfoUtils.fillLoginInfo(null, this.tenantId, null);
return actual.call();
} finally {
// 清理或者 還原(TTL的重放邏輯是還原)
LoginInfoUtils.remove();
RequestContextHolder.resetRequestAttributes();
}
}
}
}
以上就是另一種方式來實(shí)現(xiàn)threadLocal的傳遞褪尝,但是據(jù)其它文章描述通過callable的wrap方式并不能覆蓋到hystrix的fallback闹获,而鉤子的自定義實(shí)現(xiàn)可以,本人沒有去考證河哑,因?yàn)橹苯邮褂玫你^子方式
seata集成feign以及hystrix
先看hystrix的集成
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(HystrixCommand.class)
public class SeataHystrixAutoConfiguration {
// 通過注入bean的方式 使用自定義的線程策略避诽,這樣的好處是可以通過裝飾器來拼裝多個自定義實(shí)現(xiàn)的插件
@Bean
SeataHystrixConcurrencyStrategy seataHystrixConcurrencyStrategy() {
return new SeataHystrixConcurrencyStrategy();
}
}
再來看feign的集成
再bean的注入會一層層的進(jìn)行裝飾器增強(qiáng)來完成我們需要的目標(biāo),即處理全局事務(wù)id
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Client.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class SeataFeignClientAutoConfiguration {
// 如果開啟了hystrix璃谨,返回一個feign構(gòu)造器
@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")
@ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")
Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {
return SeataHystrixFeignBuilder.builder(beanFactory);
}
// 如果使用了阿里的 sentinal 返回一個feign構(gòu)造器
@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU")
@ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true")
Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {
return SeataSentinelFeignBuilder.builder(beanFactory);
}
// 如果沒有開啟hystrix和sentinal 返回一個只有feign裝飾器的feign構(gòu)造器實(shí)例
@Bean
@ConditionalOnMissingBean
@Scope("prototype")
Feign.Builder feignBuilder(BeanFactory beanFactory) {
return SeataFeignBuilder.builder(beanFactory);
}
@Configuration(proxyBeanMethods = false)
protected static class FeignBeanPostProcessorConfiguration {
// 一個 beanPostProcessor 對所有spring 管理的bean進(jìn)行選擇性的裝飾
@Bean
SeataBeanPostProcessor seataBeanPostProcessor(
SeataFeignObjectWrapper seataFeignObjectWrapper) {
return new SeataBeanPostProcessor(seataFeignObjectWrapper);
}
// 對于feignContext修飾的一個 攔截處理器
@Bean
SeataContextBeanPostProcessor seataContextBeanPostProcessor(
BeanFactory beanFactory) {
return new SeataContextBeanPostProcessor(beanFactory);
}
// seata自己的真正選擇某些bean進(jìn)行裝飾的類
@Bean
SeataFeignObjectWrapper seataFeignObjectWrapper(BeanFactory beanFactory) {
return new SeataFeignObjectWrapper(beanFactory);
}
}
}
SeataContextBeanPostProcessor 和 SeataBeanPostProcessor 分別為feignContext沙庐,feignClient裝飾,這兩個都是利用了beanPostProcessor這個鉤子所有spring管理的bean都會判斷是否需要處理佳吞,如果是想要處理的bean則進(jìn)行裝飾
下面來看看SeataFeignObjectWrapper裝飾邏輯
// 首先是一個包內(nèi)可調(diào)用的方法拱雏,我們自己業(yè)務(wù)服務(wù)內(nèi)是調(diào)用不到的噢,這是封裝的一種保護(hù)機(jī)制
Object wrap(Object bean) {
// 分別有兩種feignClient進(jìn)行裝飾
if (bean instanceof Client && !(bean instanceof SeataFeignClient)) {
if (bean instanceof LoadBalancerFeignClient) {
LoadBalancerFeignClient client = ((LoadBalancerFeignClient) bean);
return new SeataLoadBalancerFeignClient(client.getDelegate(), factory(),
clientFactory(), this);
}
if (bean instanceof FeignBlockingLoadBalancerClient) {
FeignBlockingLoadBalancerClient client = (FeignBlockingLoadBalancerClient) bean;
return new SeataFeignBlockingLoadBalancerClient(client.getDelegate(),
beanFactory.getBean(BlockingLoadBalancerClient.class), this);
}
return new SeataFeignClient(this.beanFactory, (Client) bean);
}
return bean;
}
feignContext的裝飾最后也是為了繼續(xù)裝飾feign底扳,這種邏輯見到了很多铸抑,例如TTL 等一層層的裝飾為了覆蓋所有情況,保證目標(biāo)要被裝飾的實(shí)例各種情況一定被裝飾到
下面看看裝飾后干了些啥
public class SeataFeignClient implements Client {
private final Client delegate;
private final BeanFactory beanFactory;
private static final int MAP_SIZE = 16;
SeataFeignClient(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
this.delegate = new Client.Default(null, null);
}
SeataFeignClient(BeanFactory beanFactory, Client delegate) {
// 被裝飾的client
this.delegate = delegate;
this.beanFactory = beanFactory;
}
@Override
public Response execute(Request request, Request.Options options) throws IOException {
// 調(diào)用時增強(qiáng)方法
Request modifiedRequest = getModifyRequest(request);
return this.delegate.execute(modifiedRequest, options);
}
private Request getModifyRequest(Request request) {
// 目的明確還是處理 全局事務(wù)id
String xid = RootContext.getXID();
if (StringUtils.isEmpty(xid)) {
return request;
}
Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
headers.putAll(request.headers());
List<String> seataXid = new ArrayList<>();
seataXid.add(xid);
headers.put(RootContext.KEY_XID, seataXid);
return Request.create(request.method(), request.url(), headers, request.body(),
request.charset());
}
}
RestTemplate的攔截器衷模,我們發(fā)現(xiàn)seata在裝飾feignClient的同時也對RestTemplate進(jìn)行了攔截鹊汛,防止我們的項(xiàng)目直接使用RestTemplate進(jìn)行接口調(diào)用
@Configuration(proxyBeanMethods = false)
public class SeataRestTemplateAutoConfiguration {
@Bean
public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
// 注入我們的攔截器
return new SeataRestTemplateInterceptor();
}
// RestTemplate可能存在多個
@Autowired(required = false)
private Collection<RestTemplate> restTemplates;
@Autowired
private SeataRestTemplateInterceptor seataRestTemplateInterceptor;
@PostConstruct
public void init() {
if (this.restTemplates != null) {
// 對所有注入的RestTemplate 進(jìn)行攔截
for (RestTemplate restTemplate : restTemplates) {
List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
restTemplate.getInterceptors());
interceptors.add(this.seataRestTemplateInterceptor);
restTemplate.setInterceptors(interceptors);
}
}
}
}
我們再來看看SeataRestTemplateInterceptor
public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
// 目標(biāo)還是非常明確,對全局事務(wù)id進(jìn)行處理阱冶,放入header中
String xid = RootContext.getXID();
if (!StringUtils.isEmpty(xid)) {
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
return clientHttpRequestExecution.execute(requestWrapper, bytes);
}
}
最后我們再看一眼 被調(diào)用接口如何處理吧刁憋,還是我們非常熟悉的HandlerInterceptor
public class SeataHandlerInterceptor implements HandlerInterceptor {
private static final Logger log = LoggerFactory
.getLogger(SeataHandlerInterceptor.class);
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) {
// 請求進(jìn)入前取出header的 全局事務(wù)id放入 threadLocal中
String xid = RootContext.getXID();
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (log.isDebugEnabled()) {
log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
}
if (StringUtils.isBlank(xid) && rpcXid != null) {
RootContext.bind(rpcXid);
if (log.isDebugEnabled()) {
log.debug("bind {} to RootContext", rpcXid);
}
}
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception e) {
// 請求完成清理 threadLocal信息
if (StringUtils.isNotBlank(RootContext.getXID())) {
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (StringUtils.isEmpty(rpcXid)) {
return;
}
String unbindXid = RootContext.unbind();
if (log.isDebugEnabled()) {
log.debug("unbind {} from RootContext", unbindXid);
}
// 如果解綁和綁定的 全局事務(wù)id不同,則對后面的全局事務(wù)id再次進(jìn)行綁定木蹬,存入threadLocal
// 這里可能是處理全局事務(wù)沖突的特殊情況至耻,目前不太了解
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
log.warn("bind {} back to RootContext", unbindXid);
}
}
}
}
}
解決hystrix跨線程threadLocal丟失總結(jié)
- 通過HystrixCommandExecutionHook + spi 配置讓自定義鉤子被使用,然后通過不同的執(zhí)行時機(jī)處理threadLocal
- 具體傳遞threadLocal利用 hystrix內(nèi)部的threadLocal重放機(jī)制,即HystrixRequestContext和HystrixRequestVariableDefault 的使用
- 也可以學(xué)習(xí)seata集成的方式尘颓,利用線程策略HystrixConcurrencyStrategy 的Callable 裝飾方法進(jìn)行裝飾增強(qiáng)
- 通過@Bean注入自定義HystrixConcurrencyStrategy 插件后重置原有的注冊是尖,并且留了一個可裝飾的口子,可以讓多個自定義插件層層裝飾(hystrix的本身的插件只允許存在一個)
seata繼承hystrix和feign
- 如果開啟了hystrix/sentinal等注入自定義的client構(gòu)造裝飾器泥耀,否則使用默認(rèn)的裝飾器
- 對hystrix線程切換定義了一個可多層裝飾的 自定義HystrixConcurrencyStrategy 插件,通過HystrixConcurrencyStrategy#wrapCallable 進(jìn)行增強(qiáng)
- 對feignContext蛔添,feignClient進(jìn)行裝飾痰催,最終目標(biāo)裝飾為SeataFeignClient類將全局事務(wù)id放入header
- 對所有注入到spring容器內(nèi)的RestTemplate進(jìn)行攔截,將全局事務(wù)id放入header
- 通過spring mvc的HandlerInterceptor取出header的事務(wù)id放入threadLocal