hystrix線程切換導(dǎo)致threadLocal丟失問題及延申seata與hystrix/feign的整合邏輯

使用場景草巡,往往我們使用theadLocal存取用戶登陸信息增淹,但是當(dāng)開啟hystrix時使用線程隔離模式,會使用對應(yīng)線程池內(nèi)的線程執(zhí)行feignClient的方法倔韭,那么就會導(dǎo)致threadLocal丟失

通過百度以及看源碼可以發(fā)現(xiàn)hystrix提供了HystrixPlugins恳谎,可以看到他的方法

我們先來看看HystrixPlugins暴露的方法

// 可以看到 HystrixPlugins 提供了很多東西芝此,包括線程策略,鉤子因痛,事件婚苹,等等。鸵膏。
// 首先第一感覺 鉤子是可以用的
         HystrixConcurrencyStrategy hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();

首先看看HystrixCommandExecutionHook 的方法膊升,以及執(zhí)行時機(jī)

 /**
     * Invoked before {@link HystrixInvokable} begins executing.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.2
     */
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param failureType {@link FailureType} enum representing which type of error
     * @param e exception object
     *
     * @since 1.2
     */
    public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked when {@link HystrixInvokable} finishes a successful execution.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked at start of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
     *
     * @param commandInstance The executing HystrixCommand instance.
     *
     * @since 1.2
     */
    public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked at completion of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
     * This will get invoked whenever the Hystrix thread is done executing, regardless of whether the thread finished
     * naturally, or was unsubscribed externally
     *
     * @param commandInstance The executing HystrixCommand instance.
     *
     * @since 1.2
     */
    public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
        // do nothing by default
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} starts.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param e exception object
     *
     * @since 1.4
     */
    public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} starts.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.2
     */
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param e exception object
     *
     * @since 1.2
     */
    public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
        //by default, just pass through
        return e;
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the command response is found in the {@link com.netflix.hystrix.HystrixRequestCache}.
     *
     * @param commandInstance The executing HystrixCommand
     *
     * @since 1.4
     */
    public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked with the command is unsubscribed before a terminal state
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.5.9
     */
    public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

通過查找文檔以及 測試確定了我們需要的方法,這是我們可以寫一個自己實(shí)現(xiàn)的鉤子,但是這個鉤子是用不同的執(zhí)行時機(jī)的回調(diào)來實(shí)現(xiàn)谭企,那么我們需要想辦法把需要傳遞的信息從前面鉤子的方法傳到后面的鉤子方法廓译,簡單的方式就是用ConcurrentMap來進(jìn)行映射,但是沒有合適的key,那么我們找到了一個類HystrixRequestContext,其實(shí)使用ConcurrentMap映射是一個比較蠢的方式债查,也就是實(shí)在沒辦法了再這樣非区,正常應(yīng)該是像TTL那樣做一個裝飾器,在成員變量里操作盹廷,或者統(tǒng)一的地方征绸,那么我們又繼續(xù)找到了HystrixRequestVariableDefault 這個類在操作HystrixRequestContext,再看一下源碼速和,它是通過包內(nèi)方法來操作HystrixRequestContext的threadLocal變量

  public void set(T value) {
// 這個方法沒有暴露給我們使用歹垫,而是提供了一個操作器
        HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
    }

下面看看我們的鉤子模樣

public class MyHystrixHook extends HystrixCommandExecutionHook {

     private final HystrixRequestVariableDefault <Long> reuqestVariable = new HystrixRequestVariableDefault<>();
    @Override
     public <T> void onStart(HystrixInvokable<T> commandInstance) {
        //這里是hystrix 執(zhí)行對應(yīng)feignClient方法時開始時的鉤子,那么就是在當(dāng)前主線程內(nèi)操作
// 先初始化 hystrix的線程私有變量容器
        HystrixRequestContext.initializeContext();
// 你要傳遞的 變量颠放,其實(shí)就是從當(dāng)前主線程的threadLocal中獲取到的
         reuqestVariable.set(111L);
        
    }
    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
        // 這里是你的 FeignClient方法執(zhí)行失敗后的回調(diào)鉤子,需要清理你操作的數(shù)據(jù)
// 需要清理 子線程的threadLocal以及HystrixRequestContext
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        return e; //by default, just pass through
    }

    @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
               // 這里是你的 FeignClient方法執(zhí)行成功后的回調(diào)鉤子,需要清理你操作的數(shù)據(jù)
// 需要清理 子線程的threadLocal以及HystrixRequestContext
        HystrixRequestContext.getContextForCurrentThread().shutdown();
    }
@Override
 public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
// 這里已經(jīng)是子線程了排惨,執(zhí)行目標(biāo) feignClient方法前
       Long tenantId = requestVariableDefault.get();
// 這里你可以獲取到想要的信息,可以存到threadLocal保證后續(xù)方法的一致性
    }
@Override
 public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        // 這里是熔斷回調(diào)方法開始碰凶,也可以管理threadLocal的信息傳遞暮芭, 和上述onStart方法一致

    }
@Override
   public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
        //by default, just pass through
// 熔斷回調(diào)失敗 的鉤子,清理即可
        return e;
    }
@Override
    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
// 熔斷回調(diào)成功 的鉤子欲低,清理即可
    }
}

同時怎么才能將我們自定義的鉤子讓框架去調(diào)用呢辕宏?

// 直接在項(xiàng)目啟動時 調(diào)用這個方法 會報錯,那么直接這樣是不行的砾莱。
// 而且我們也可以看到HystrixPlugins.reset() 方法重置瑞筐,那么再看看getInstance方法內(nèi)部
HystrixPlugins.getInstance().registerCommandExecutionHook(你的鉤子);

hystrix的spi體系

下面來看鉤子的實(shí)例獲取方法

    public HystrixCommandExecutionHook getCommandExecutionHook() {
// 這里可以看到hystrix各種類型的插件默認(rèn)只有一個,但是我們可以在自定義的插件套一層裝飾器來實(shí)現(xiàn)多個相同類型的插件對方法進(jìn)行增強(qiáng)(seata集成hystrix中使用到此方式)
        if (commandExecutionHook.get() == null) {
            // check for an implementation from Archaius first
// 內(nèi)部spi的方式獲取實(shí)例
            Object impl = getPluginImplementation(HystrixCommandExecutionHook.class);
            if (impl == null) {
                // cas的方式來保證多個組件或?qū)崿F(xiàn)只有一個實(shí)例正常放入
                commandExecutionHook.compareAndSet(null, HystrixCommandExecutionHookDefault.getInstance());
                // we don't return from here but call get() again in case of thread-race so the winner will always get returned
            } else {
                // we received an implementation from Archaius so use it
                commandExecutionHook.compareAndSet(null, (HystrixCommandExecutionHook) impl);
            }
        }
        return commandExecutionHook.get();
    }

    private <T> T getPluginImplementation(Class<T> pluginClass) {
// hystrix 自己提供的 配置文件spi方式獲取實(shí)例
        T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
        if (p != null) return p;        
// 利用jdk 自動的 serviceLoader方式獲取實(shí)例
// serviceLoader可以參考 http://www.reibang.com/p/ccfd80d407ef
        return findService(pluginClass, classLoader);
    }

我們來看看hystrix配置方式的spi, 其實(shí)就是讀取hystrix-plugins.properties中文件讀取到key value腊瑟,例如我們替換hook的配置就為
hystrix.plugin.HystrixConcurrencyStrategy.implementation=xxx.xxx.MyTestHystrix

    private static <T> T getPluginImplementationViaProperties(Class<T> pluginClass, HystrixDynamicProperties dynamicProperties) {
        String classSimpleName = pluginClass.getSimpleName();
        // Check Archaius for plugin class.
        String propertyName = "hystrix.plugin." + classSimpleName + ".implementation";
        String implementingClass = dynamicProperties.getString(propertyName, null).get();
        if (implementingClass != null) {
            try {
                Class<?> cls = Class.forName(implementingClass);
                // narrow the scope (cast) to the type we're expecting
                cls = cls.asSubclass(pluginClass);
                return (T) cls.newInstance();
            } catch (ClassCastException e) {
                throw new RuntimeException(classSimpleName + " implementation is not an instance of " + classSimpleName + ": " + implementingClass);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(classSimpleName + " implementation class not found: " + implementingClass, e);
            } catch (InstantiationException e) {
                throw new RuntimeException(classSimpleName + " implementation not able to be instantiated: " + implementingClass, e);
            } catch (IllegalAccessException e) {
                throw new RuntimeException(classSimpleName + " implementation not able to be accessed: " + implementingClass, e);
            }
        } else {
            return null;
        }
    }

關(guān)于seata集成hystrix以及feign的實(shí)現(xiàn)

上述使用鉤子 + spi 替換自己的鉤子方式實(shí)現(xiàn)聚假,但是我們還可以觀察到HystrixConcurrencyStrategy#wrapCallable方法块蚌,這不就是妥妥的一個線程執(zhí)行器的裝飾器預(yù)留的方法嘛?很顯然也可以通過這種方式來實(shí)現(xiàn)

如果你引入了spring-cloud-starter-alibaba-seata的話膘格,可以看到線程策略的一個實(shí)現(xiàn)com.alibaba.cloud.seata.feign.hystrix.SeataHystrixConcurrencyStrategy

下面來看看代碼

public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private final Logger logger = LoggerFactory
            .getLogger(SeataHystrixConcurrencyStrategy.class);
        // 這里是seata自定義線程策略的被裝飾的對象峭范,那么其實(shí)是允許多個同樣插件存在,不過是通過裝飾器包裹后層層增強(qiáng)
    private HystrixConcurrencyStrategy delegate;
    public SeataHystrixConcurrencyStrategy() {
        try {
// 這里通過spi獲取實(shí)例
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof SeataHystrixConcurrencyStrategy) {
// 如果已經(jīng)是當(dāng)前實(shí)現(xiàn)則不做任何操作
                return;
            }
// 這里會重新獲取所有其它插件瘪贱,然后重置后統(tǒng)一再注冊進(jìn)去
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
                    .getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
                    .getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
                    .getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
                    .getPropertiesStrategy();
            logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
                    propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance()
                .registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
        HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
    HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        }
        catch (Exception ex) {
            logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
        }
    }
    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
            HystrixMetricsPublisher metricsPublisher,
            HystrixPropertiesStrategy propertiesStrategy) {
        if (logger.isDebugEnabled()) {
            logger.debug("Current Hystrix plugins configuration is ["
                    + "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
                    + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
                    + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
            logger.debug("Registering Seata Hystrix Concurrency Strategy.");
        }
    }
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixProperty<Integer> corePoolSize,
            HystrixProperty<Integer> maximumPoolSize,
            HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue);
    }
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixThreadPoolProperties threadPoolProperties) {
        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
    }
    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return this.delegate.getBlockingQueue(maxQueueSize);
    }
    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(
            HystrixRequestVariableLifecycle<T> rv) {
        return this.delegate.getRequestVariable(rv);
    }
// 這里是上面提到的 線程執(zhí)行的裝飾增強(qiáng)
    @Override
    public <K> Callable<K> wrapCallable(Callable<K> c) {
// 如果是已經(jīng)裝飾了 提前返回
        if (c instanceof SeataContextCallable) {
            return c;
        }
        Callable<K> wrappedCallable;
        if (this.delegate != null) {
// 如果有其它的自定義的插件纱控,需要再被裝飾一層
            wrappedCallable = this.delegate.wrapCallable(c);
        }
        else {
            wrappedCallable = c;
        }
// 如果是已經(jīng)裝飾了 提前返回
        if (wrappedCallable instanceof SeataContextCallable) {
            return wrappedCallable;
        }
// 真正的對其裝飾
        return new SeataContextCallable<>(wrappedCallable,
                RequestContextHolder.getRequestAttributes());
    }
    private static class SeataContextCallable<K> implements Callable<K> {
        private final Callable<K> actual;
// 哈哈,我們看到了 seata對線程策略的線程執(zhí)行裝飾的真正目的菜秦,用于傳遞seata全局事務(wù)id
        private final String xid;
        private final RequestAttributes requestAttributes;
        SeataContextCallable(Callable<K> actual, RequestAttributes requestAttribute) {
            this.actual = actual;
            this.requestAttributes = requestAttribute;
                      // 當(dāng)前還是主線程在執(zhí)行甜害,所以直接從當(dāng)前線程獲取全局事務(wù)id
            this.xid = RootContext.getXID();
        }
        @Override
        public K call() throws Exception {
                      // 典型的裝飾增強(qiáng),這里已經(jīng)是子線程在執(zhí)行了
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                RootContext.bind(xid);
                return actual.call();
            }
            finally {
                RootContext.unbind();
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

那么我們自己可不可以用他這種方式實(shí)現(xiàn)threadLocal傳遞呢喷户,答案是可以的唾那,但是要注意的是循環(huán)創(chuàng)建問題,我們添加了一個volatile 變量防止自定義插件的構(gòu)造方法中通過spi獲取實(shí)例形成循環(huán)創(chuàng)建實(shí)例
// 邏輯和seata的自定義插件基本一致
public class MyTestHystrix extends HystrixConcurrencyStrategy {

    private final Logger logger = LoggerFactory
        .getLogger(SeataHystrixConcurrencyStrategy.class);
    private HystrixConcurrencyStrategy delegate;
    private static volatile boolean alreadyInit = false;
    public MyTestHystrix() {
// 這里添加一個 volatile變量防止構(gòu)造方法和spi獲取實(shí)例形成循環(huán)
        if (alreadyInit) {
            return;
        }
        alreadyInit = true;
        try {
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof MyTestHystrix) {
                return;
            }
            HystrixConcurrencyStrategy hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
            logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
                propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance()
                .registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        } catch (Exception ex) {
            logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
        }
    }


    @Override
    public <K> Callable<K> wrapCallable(Callable<K> c) {
        if (c instanceof MyTestHystrix.MyContext) {
            return c;
        }

        Callable<K> wrappedCallable;
        if (this.delegate != null) {
            wrappedCallable = this.delegate.wrapCallable(c);
        } else {
            wrappedCallable = c;
        }
        if (wrappedCallable instanceof MyTestHystrix.MyContext) {
            return wrappedCallable;
        }

        return new MyTestHystrix.MyContext<>(wrappedCallable,
            RequestContextHolder.getRequestAttributes());
    }

    private static class MyContext<K> implements Callable<K> {

        private final Callable<K> actual;

        private final String tenantId;

        private final RequestAttributes requestAttributes;


        MyContext(Callable<K> actual, RequestAttributes requestAttribute) {
            this.actual = actual;
            this.requestAttributes = requestAttribute;
            this.tenantId = LoginInfoUtils.getTenantId();
        }

        @Override
        public K call() throws Exception {
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
              // 這里就是手動的將當(dāng)前子線程信息填充
                LoginInfoUtils.fillLoginInfo(null, this.tenantId, null);
                return actual.call();
            } finally {
              // 清理或者 還原(TTL的重放邏輯是還原)
                LoginInfoUtils.remove();
                RequestContextHolder.resetRequestAttributes();
            }
        }

    }
}

以上就是另一種方式來實(shí)現(xiàn)threadLocal的傳遞褪尝,但是據(jù)其它文章描述通過callable的wrap方式并不能覆蓋到hystrix的fallback闹获,而鉤子的自定義實(shí)現(xiàn)可以,本人沒有去考證河哑,因?yàn)橹苯邮褂玫你^子方式

seata集成feign以及hystrix

先看hystrix的集成
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(HystrixCommand.class)
public class SeataHystrixAutoConfiguration {
// 通過注入bean的方式 使用自定義的線程策略避诽,這樣的好處是可以通過裝飾器來拼裝多個自定義實(shí)現(xiàn)的插件
    @Bean
    SeataHystrixConcurrencyStrategy seataHystrixConcurrencyStrategy() {
        return new SeataHystrixConcurrencyStrategy();
    }

}
再來看feign的集成

再bean的注入會一層層的進(jìn)行裝飾器增強(qiáng)來完成我們需要的目標(biāo),即處理全局事務(wù)id


@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Client.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class SeataFeignClientAutoConfiguration {
// 如果開啟了hystrix璃谨,返回一個feign構(gòu)造器
    @Bean
    @Scope("prototype")
    @ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")
    @ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")
    Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {
        return SeataHystrixFeignBuilder.builder(beanFactory);
    }
// 如果使用了阿里的 sentinal 返回一個feign構(gòu)造器
    @Bean
    @Scope("prototype")
    @ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU")
    @ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true")
    Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {
        return SeataSentinelFeignBuilder.builder(beanFactory);
    }
// 如果沒有開啟hystrix和sentinal 返回一個只有feign裝飾器的feign構(gòu)造器實(shí)例
    @Bean
    @ConditionalOnMissingBean
    @Scope("prototype")
    Feign.Builder feignBuilder(BeanFactory beanFactory) {
        return SeataFeignBuilder.builder(beanFactory);
    }

    @Configuration(proxyBeanMethods = false)
    protected static class FeignBeanPostProcessorConfiguration {
// 一個 beanPostProcessor 對所有spring 管理的bean進(jìn)行選擇性的裝飾
        @Bean
        SeataBeanPostProcessor seataBeanPostProcessor(
                SeataFeignObjectWrapper seataFeignObjectWrapper) {
            return new SeataBeanPostProcessor(seataFeignObjectWrapper);
        }
// 對于feignContext修飾的一個 攔截處理器
        @Bean
        SeataContextBeanPostProcessor seataContextBeanPostProcessor(
                BeanFactory beanFactory) {
            return new SeataContextBeanPostProcessor(beanFactory);
        }
// seata自己的真正選擇某些bean進(jìn)行裝飾的類
        @Bean
        SeataFeignObjectWrapper seataFeignObjectWrapper(BeanFactory beanFactory) {
            return new SeataFeignObjectWrapper(beanFactory);
        }

    }

}
SeataContextBeanPostProcessor 和 SeataBeanPostProcessor 分別為feignContext沙庐,feignClient裝飾,這兩個都是利用了beanPostProcessor這個鉤子所有spring管理的bean都會判斷是否需要處理佳吞,如果是想要處理的bean則進(jìn)行裝飾

下面來看看SeataFeignObjectWrapper裝飾邏輯

// 首先是一個包內(nèi)可調(diào)用的方法拱雏,我們自己業(yè)務(wù)服務(wù)內(nèi)是調(diào)用不到的噢,這是封裝的一種保護(hù)機(jī)制
    Object wrap(Object bean) {
// 分別有兩種feignClient進(jìn)行裝飾
        if (bean instanceof Client && !(bean instanceof SeataFeignClient)) {
            if (bean instanceof LoadBalancerFeignClient) {
                LoadBalancerFeignClient client = ((LoadBalancerFeignClient) bean);
                return new SeataLoadBalancerFeignClient(client.getDelegate(), factory(),
                        clientFactory(), this);
            }
            if (bean instanceof FeignBlockingLoadBalancerClient) {
                FeignBlockingLoadBalancerClient client = (FeignBlockingLoadBalancerClient) bean;
                return new SeataFeignBlockingLoadBalancerClient(client.getDelegate(),
                        beanFactory.getBean(BlockingLoadBalancerClient.class), this);
            }
            return new SeataFeignClient(this.beanFactory, (Client) bean);
        }
        return bean;
    }

feignContext的裝飾最后也是為了繼續(xù)裝飾feign底扳,這種邏輯見到了很多铸抑,例如TTL 等一層層的裝飾為了覆蓋所有情況,保證目標(biāo)要被裝飾的實(shí)例各種情況一定被裝飾到
下面看看裝飾后干了些啥

public class SeataFeignClient implements Client {

    private final Client delegate;

    private final BeanFactory beanFactory;

    private static final int MAP_SIZE = 16;

    SeataFeignClient(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
        this.delegate = new Client.Default(null, null);
    }
    SeataFeignClient(BeanFactory beanFactory, Client delegate) {
// 被裝飾的client
        this.delegate = delegate;
        this.beanFactory = beanFactory;
    }
    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
    // 調(diào)用時增強(qiáng)方法
        Request modifiedRequest = getModifyRequest(request);
        return this.delegate.execute(modifiedRequest, options);
    }

    private Request getModifyRequest(Request request) {
    // 目的明確還是處理 全局事務(wù)id
        String xid = RootContext.getXID();

        if (StringUtils.isEmpty(xid)) {
            return request;
        }

        Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
        headers.putAll(request.headers());

        List<String> seataXid = new ArrayList<>();
        seataXid.add(xid);
        headers.put(RootContext.KEY_XID, seataXid);

        return Request.create(request.method(), request.url(), headers, request.body(),
                request.charset());
    }

}

RestTemplate的攔截器衷模,我們發(fā)現(xiàn)seata在裝飾feignClient的同時也對RestTemplate進(jìn)行了攔截鹊汛,防止我們的項(xiàng)目直接使用RestTemplate進(jìn)行接口調(diào)用
@Configuration(proxyBeanMethods = false)
public class SeataRestTemplateAutoConfiguration {

    @Bean
    public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
// 注入我們的攔截器
        return new SeataRestTemplateInterceptor();
    }
      // RestTemplate可能存在多個
    @Autowired(required = false)
    private Collection<RestTemplate> restTemplates;

    @Autowired
    private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

    @PostConstruct
    public void init() {
        if (this.restTemplates != null) {
// 對所有注入的RestTemplate 進(jìn)行攔截
            for (RestTemplate restTemplate : restTemplates) {
                List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
                        restTemplate.getInterceptors());
                interceptors.add(this.seataRestTemplateInterceptor);
                restTemplate.setInterceptors(interceptors);
            }
        }
    }

}

我們再來看看SeataRestTemplateInterceptor

public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {

    @Override
    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
            ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
              // 目標(biāo)還是非常明確,對全局事務(wù)id進(jìn)行處理阱冶,放入header中
        String xid = RootContext.getXID();

        if (!StringUtils.isEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }
        return clientHttpRequestExecution.execute(requestWrapper, bytes);
    }

}
最后我們再看一眼 被調(diào)用接口如何處理吧刁憋,還是我們非常熟悉的HandlerInterceptor
public class SeataHandlerInterceptor implements HandlerInterceptor {

    private static final Logger log = LoggerFactory
            .getLogger(SeataHandlerInterceptor.class);

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
            Object handler) {
// 請求進(jìn)入前取出header的 全局事務(wù)id放入 threadLocal中
        String xid = RootContext.getXID();
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        if (log.isDebugEnabled()) {
            log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
        }

        if (StringUtils.isBlank(xid) && rpcXid != null) {
            RootContext.bind(rpcXid);
            if (log.isDebugEnabled()) {
                log.debug("bind {} to RootContext", rpcXid);
            }
        }

        return true;
    }
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
            Object handler, Exception e) {
// 請求完成清理 threadLocal信息
        if (StringUtils.isNotBlank(RootContext.getXID())) {
            String rpcXid = request.getHeader(RootContext.KEY_XID);
            if (StringUtils.isEmpty(rpcXid)) {
                return;
            }
            String unbindXid = RootContext.unbind();
            if (log.isDebugEnabled()) {
                log.debug("unbind {} from RootContext", unbindXid);
            }
// 如果解綁和綁定的 全局事務(wù)id不同,則對后面的全局事務(wù)id再次進(jìn)行綁定木蹬,存入threadLocal
// 這里可能是處理全局事務(wù)沖突的特殊情況至耻,目前不太了解
            if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
                if (unbindXid != null) {
                    RootContext.bind(unbindXid);
                    log.warn("bind {} back to RootContext", unbindXid);
                }
            }
        }
    }

}

解決hystrix跨線程threadLocal丟失總結(jié)

  1. 通過HystrixCommandExecutionHook + spi 配置讓自定義鉤子被使用,然后通過不同的執(zhí)行時機(jī)處理threadLocal
  2. 具體傳遞threadLocal利用 hystrix內(nèi)部的threadLocal重放機(jī)制,即HystrixRequestContext和HystrixRequestVariableDefault 的使用
  3. 也可以學(xué)習(xí)seata集成的方式尘颓,利用線程策略HystrixConcurrencyStrategy 的Callable 裝飾方法進(jìn)行裝飾增強(qiáng)
  4. 通過@Bean注入自定義HystrixConcurrencyStrategy 插件后重置原有的注冊是尖,并且留了一個可裝飾的口子,可以讓多個自定義插件層層裝飾(hystrix的本身的插件只允許存在一個)

seata繼承hystrix和feign

  1. 如果開啟了hystrix/sentinal等注入自定義的client構(gòu)造裝飾器泥耀,否則使用默認(rèn)的裝飾器
  2. 對hystrix線程切換定義了一個可多層裝飾的 自定義HystrixConcurrencyStrategy 插件,通過HystrixConcurrencyStrategy#wrapCallable 進(jìn)行增強(qiáng)
  3. 對feignContext蛔添,feignClient進(jìn)行裝飾痰催,最終目標(biāo)裝飾為SeataFeignClient類將全局事務(wù)id放入header
  4. 對所有注入到spring容器內(nèi)的RestTemplate進(jìn)行攔截,將全局事務(wù)id放入header
  5. 通過spring mvc的HandlerInterceptor取出header的事務(wù)id放入threadLocal
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末迎瞧,一起剝皮案震驚了整個濱河市夸溶,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌凶硅,老刑警劉巖缝裁,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異足绅,居然都是意外死亡捷绑,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進(jìn)店門氢妈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來粹污,“玉大人,你說我怎么就攤上這事首量∽撤裕” “怎么了?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵加缘,是天一觀的道長鸭叙。 經(jīng)常有香客問我,道長拣宏,這世上最難降的妖魔是什么沈贝? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮蚀浆,結(jié)果婚禮上缀程,老公的妹妹穿的比我還像新娘。我一直安慰自己市俊,他們只是感情好杨凑,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著摆昧,像睡著了一般撩满。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天伺帘,我揣著相機(jī)與錄音昭躺,去河邊找鬼。 笑死伪嫁,一個胖子當(dāng)著我的面吹牛领炫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播张咳,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼帝洪,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了脚猾?” 一聲冷哼從身側(cè)響起葱峡,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎龙助,沒想到半個月后砰奕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡提鸟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年军援,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沽一。...
    茶點(diǎn)故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡盖溺,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出铣缠,到底是詐尸還是另有隱情烘嘱,我是刑警寧澤,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布蝗蛙,位于F島的核電站蝇庭,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏捡硅。R本人自食惡果不足惜哮内,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望壮韭。 院中可真熱鬧北发,春花似錦、人聲如沸喷屋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽屯曹。三九已至狱庇,卻和暖如春惊畏,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背密任。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工颜启, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人浪讳。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓缰盏,卻偏偏與公主長得像,于是被迫代替她去往敵國和親淹遵。 傳聞我的和親對象是個殘疾皇子乳规,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評論 2 354