關于@Async異步執(zhí)行及TransmittableThreadLocal(TTL)使用時theadLocal數據錯亂或者失效

@Async 是spring提供的非常方便的異步執(zhí)行的注解,非常方便翅雏,可以指定線程池執(zhí)行撒穷,但是它不是動態(tài)代理實現妆绞,也就是和其它動態(tài)代理注解(例如@Transactional)放在一起會導致動態(tài)代理失效恋沃。因為spring在拿到 @Async注解后直接委托給 AnnotationAsyncExecutionInterceptor 來執(zhí)行@Async目標方法必搞,而不是執(zhí)行代理方法會走層層動態(tài)代理。
然后包裝一個callable提交給TaskExecutor 來執(zhí)行囊咏。
我們不會具體討論@Async和線程池以及threadLocal的具體實現恕洲,只跟隨我們的使用場景涉及到的源碼

使用場景及問題
// 定義個 ttl threadLocal用于存儲一些信息
    private static final TransmittableThreadLocal<FlowContext> FLOW_CONTEXT = new TransmittableThreadLocal<>();
//使用spring mvc 提供的HandlerInterceptor 接口的攔截能力做請求前放入threadLocal中一些信息 然后在 afterCompletion 時機清理掉
//如下例子
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        String processInstanceIdStr = request.getHeader(METADATA_FLOW_PROCESS_INSTANCE_ID);
        String processIdStr = request.getHeader(METADATA_FLOW_FIRST_PROCESS_ID);
        try {
            Long processInstanceId = null;
            Long processId = null;
            if (StringUtils.isNotBlank(processInstanceIdStr)) {
                processInstanceId = Long.valueOf(processInstanceIdStr);
            }
            if (StringUtils.isNotBlank(processIdStr)) {
                processId = Long.valueOf(processIdStr);
            }
            doFill(processInstanceId, processId);
        } catch (Exception e) {
            FLOW_CONTEXT.remove();
        }
        return true;
    }


    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        super.afterCompletion(request, response, handler, ex);
        FLOW_CONTEXT.remove();
    }

如上代碼非常簡單的 通過header透傳一些信息。在servlet線程周期管理 theadLocal信息梅割。但是我們在請求進來后有一些異步操作也想要獲取threadLocal信息如下

    @Async
    public void test(Long a, TransmittableThreadLocal<FlowContext> local) throws InterruptedException {
        Long processId = MetadataFlowContextHandler.getFirstProcessId();
        if (MetadataFlowContextHandler.getLocal() == local) {
            System.out.println("會進入到這里");
        }
        if (!Objects.equals(a, processId)) {
            System.out.println("有問題霜第,除了第一次都不相等");
        }
    }

在我們本地進行一次測試會發(fā)現ThreadLocal信息如預想般獲取到了正確的值,但是如果你仔細測試户辞,并發(fā)情況泌类,或者你測試幾下,然后等一會再測試就會出現錯誤的情況底燎,那么下面列出了錯誤的情況和簡略原因刃榨,然后分析一下源碼

影響因素
  1. 內存足夠
  2. @Aync 線程池core線程數量都已經創(chuàng)建
  3. @Aync 線程池任務隊列沒有排滿
會出現theadLocal錯誤的情況
  1. 滿足上述條件1,2书蚪,3新的請求進入就會錯誤
  2. 1不滿足喇澡,滿足2迅栅,3則可能會在子線程(也就是@Async)獲取到null(未驗證憑借猜想)
原理就是因為主線程在第一次傳遞theadLocal對象的引用給子線程后放到當前線程的threadLocalMap中殊校,后續(xù)子線程由于線程復用會在get時先通過當前線程對象去theadLocalMap中獲取緩存的值,如果獲取到直接返回读存,那么大部分時候會一直返回第一次主線程傳遞過來的引用为流。而主線程remove是不會傳遞的呕屎。

為什么要滿足上述3個影響因素,如果1不滿足敬察,jvm的gc會將theadLocalMap對象清理秀睛,因為他是一個弱引用 WeakReference,而TTL主線程傳遞給子線程時也是存入主線程的theadLocal對象weakHashMap返回莲祸,如果內存不足會清理掉后在子線程會調用ThreadLocal#setInitialValue方法委托到子類TransmittableThreadLocal#initialValue其實也是返回一個空的map就會獲取失敗(未驗證)蹂安,那么2,3也是這個道理锐帜,如果服務剛啟動線程池可能會new新的thead那么主線程也一定傳遞正確的

@Async的執(zhí)行代碼

我們直接看ReflectiveMethodInvocation類進入攔截邏輯

    @Override
    @Nullable
    public Object proceed() throws Throwable {
        // We start with an index of -1 and increment early.
        if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
            return invokeJoinpoint();
        }
// 這里會獲取到一個AnnotationAsyncExecutionInterceptor田盈,它不屬于動態(tài)代理,在下面不會執(zhí)行其它所有動態(tài)代理了,至于這個@Async的排序是不是最前面的index缴阎,如果是后面的index其實前面的動態(tài)代理也是可以執(zhí)行的允瞧,這里不詳細研究了
        Object interceptorOrInterceptionAdvice =
                this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
        if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
            // Evaluate dynamic method matcher here: static part will already have
            // been evaluated and found to match.
            InterceptorAndDynamicMethodMatcher dm =
                    (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
            Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
// 這里算是將動態(tài)代理的方法作為一個適配器去匹配
            if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
// 如果匹配成功則會執(zhí)行動態(tài)代理,而后又會執(zhí)行到當前方法體內
                return dm.interceptor.invoke(this);
            }
            else {
                // Dynamic matching failed.
                // Skip this interceptor and invoke the next in the chain.
                return proceed();
            }
        }
        else {
            // It's an interceptor, so we just invoke it: The pointcut will have
            // been evaluated statically before this object was constructed.
// 如果不是動態(tài)代理這里直接執(zhí)行目標方法攔截器蛮拔,那么不會重復進入當前方法體內了述暂,其它的動態(tài)代理會失效
            return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
        }
    }

下面是AnnotationAsyncExecutionInterceptor#invoke方法

    @Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
// 這里必須是 異步的
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };
// 提交到線程池執(zhí)行
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

下面是 doSubmit方法

    @Nullable
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// 這里根據返回值進行了封裝,如果是CompletableFuture 則將這個callable 封裝為CompletableFuture 返回給客戶端自由操作
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
// 這是 spring提供的一個可以添加監(jiān)聽的Future建炫,也就是將返回值設置為ListenableFuture的子類便可以添加一些監(jiān)聽例如異步方法成功后畦韭,或者拋出異常的后進行一些信息收集和邏輯判斷日志打印之類
// 例如 StompBrokerRelayMessageHandler#forward 方法,這是spring-messaging中的stomp協(xié)議的一個future監(jiān)聽實現

        else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
// 這是一個基礎的 Future封裝返回
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
// 其它返回值或者沒有返回值的
            executor.submit(task);
            return null;
        }
    }

再下面進入了線程池的邏輯踱卵,為什么要知道線程池的邏輯廊驼,因為影響了ttl傳遞threadLocal的邏輯,因為在子線程是new的情況下會將當前主線程的threadLocal的引用傳遞給異步的子線程惋砂,如果是復用時則什么也不會做妒挎!那為什么你在測試代碼時已經復用的線程還是好用呢,因為子線程通過弱引用的threadLocalMap保存了第一次在new Thread時的主線程threadLocal信息西饵,你換個信息的值再試試酝掩!
/*
*分三步進行:

  • 1。如果運行的線程少于corePoolSize眷柔,則嘗試
    *以給定的命令作為第一個啟動一個新線程
    *任務期虾。對addWorker的調用自動地檢查runState和
  • workerCount,這樣可以防止添加錯誤警報
    *當它不應該返回false線程驯嘱。
  • 2镶苞。如果一個任務可以成功排隊,那么我們仍然需要
    *再次檢查我們是否應該添加一個線程
    *(因為已經存在的線程在上次檢查后已經死亡)
    *池在進入該方法后關閉鞠评。所以我們
    *重新檢查狀態(tài)茂蚓,必要時回滾排隊
    *停止,或啟動一個新的線程,如果沒有聋涨。
  • 3晾浴。如果不能對任務進行排隊,則嘗試添加一個新的
    *線程牍白。如果失敗脊凰,我們就知道系統(tǒng)關閉或飽和了
    *等拒絕該任務。
    */
    上面復制于源碼的注釋茂腥,具體大家可以百度其它的文章來學習狸涌,或者直接看源碼ThreadPoolExecutor#execute(Runnable command)的代碼
ttl在子線程為new Thread時傳遞邏輯

直接看Thread#init代碼

  private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
      // 省略部分代碼...
        this.group = g;
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
// 這里判斷主線程是否含有inheritableThreadLocals && 當前子線程是否可以傳遞線程私有變量
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 子線程創(chuàng)建 threadLocal 并傳入父線程的 threadLocalMap
            this.inheritableThreadLocals =
              ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;

        /* Set thread ID */
        tid = nextThreadID();
    }

下面會調用ThreadLocal#ThreadLocalMap(ThreadLocalMap parentMap) -> ThreadLocal#childValue(T parentValue)

// 這是ThreadLocal 的代碼,會直接報錯不支持最岗,只有InheritableThreadLocal及其子類支持杈抢,而TTL繼承了InheritableThreadLocal類
T childValue(T parentValue) {
        throw new UnsupportedOperationException();
    }
// 而InheritableThreadLocal的實現如下直接返回主線程的值,雖然傳遞了但是客戶端不容易拿到
  protected T childValue(T parentValue) {
        return parentValue;
    }
// TransmittableThreadLocal 的實現是返回一個以TransmittableThreadLocal對象為key的weakHashMap仑性,作為InheritableThreadLocal的增強保持了弱引用的語義及傳入主線程值的引用惶楼,并且可以在子線程通過TTL的get時從這個weakHashMap直接獲取到從主線程傳遞過來的引用
    private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
        protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
            return new WeakHashMap();
        }
//這里看出TTL 不單單是將主線程的 threadLocal的值引用傳遞,并且將主線程的TransmittableThreadLocal 對象作為key傳入到子線程的ThreadLocalMap中
        protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
            return new WeakHashMap(parentValue);
        }
    };
下面時TTL 的get方法
   @Override
    public final T get() {
// 調用父類 ThreadLocal的get
        T value = super.get();
// 這里很重要诊杆,
        if (null != value) addValue();
        return value;
    }
//  threadLocal 的get方法
    public T get() {
        Thread t = Thread.currentThread();
// 獲取當前線程的 ThreadLocalMap 
        ThreadLocalMap map = getMap(t);
        if (map != null) {
// 這里的this其實是當前TransmittableThreadLocal 對象
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
// 如果沒有獲取到值會初始化一下新的theadLocal中的對象歼捐,先會調用子類中的initialValue然后如果ThreadLocalMap沒有被回收直接返回init的值并返回,如果被回收了會create新的map晨汹,實際TTL也只會new一個空的map返回
        return setInitialValue();
    }
     private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
// 通過threadLocal對象的hashCode從ThreadLocalMap獲取到緩存的對象
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }

我們盡量不討論 TTL之外的代碼豹储,上述代碼是一個標準的從ThreadLocal中get對象的流程,但是TTL的get有一個addValue的操作

// 這里的邏輯是在TTL 傳遞childValue時的 map重新灌入的邏輯淘这,目前還不知道為什么這樣做剥扣,后續(xù)文章會仔細探討
    private void addValue() {
        if (!holder.get().containsKey(this)) {
            holder.get().put(this, null); // WeakHashMap supports null value.
        }
    }

如上述這些代碼,雖然使用TTL在new Thread時將主線程的引用灌入了子線程中铝穷,并處理業(yè)務對象本身還放入了一個weakHashMap以threadLocal對象為key钠怯,但是在get時候并沒有什么不同啊,我們通過測試發(fā)現了問題后debug這里也看不出什么貓膩曙聂,但是發(fā)現就是子線程一直獲取第一次傳遞過來的對象引用晦炊,實際實現的邏輯也沒有用到TTL重寫的childValue方法中構造的map,而是直接使用了InheritableThreadLocal實現的業(yè)務對象的直接引用宁脊。然后看TTL的代碼中內部類Transmitter有大量的復制断国,重放,還原的邏輯如下

       @Nonnull
        public static Object replay(@Nonnull Object captured) {
// 這個方法一看就是在復制替換數據榆苞,實際就是在各種線程池的工作線程執(zhí)行前的重放(替換threadLocal變量)具體邏輯后續(xù)文章探討稳衬,那么看來這個動作在debug中一直沒有執(zhí)行,所以沒有產生TTL線程私有變量的正確傳遞坐漏,我們看看是誰在調用它
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();
                // backup
                backup.put(threadLocal, threadLocal.get());
                // clear the TTL values that is not in captured
                // avoid the extra TTL values after replay when run task
                if (!capturedMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }
            // set values to captured TTL
            setTtlValuesTo(capturedMap);
            // call beforeExecute callback
            // 執(zhí)行時機 為 目標方法執(zhí)行前
            doExecuteCallback(true);
            return backup;
        }

看看是誰在調用這個replay方法


誰在調用replay

我們進入一個Runnable的地方TtlRunnable類薄疚,看的出來是一個裝飾器做了增強弄砍,然后對目標Runnable執(zhí)行前執(zhí)行后對threadLocal進行了重放,還原工作 如下 TtlRunnable

    @Override
    public void run() {
        Object captured = capturedRef.get();
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
        Object backup = replay(captured);
        try {
            runnable.run();
        } finally {
            restore(backup);
        }
    }

那么繼續(xù)输涕,TtlRunnable又是誰搞的呢


image.png

原來是ExecutorServiceTtlWrapper 類,另一個先忽略一看就是和定時相關的。那么這是一個ExecutorService的裝飾器,也是做了增強崔列,目的是可以使用TtlRunnable這個增強再往下看


image.png

TtlExecutors 這個類有一堆靜態(tài)方法遮精,都是返回傳入目標對象返回其裝飾器的方法,那就是我們在構造ExecutorService線程池時可以直接使用這個類的返回裝飾器應該就可以了

    @Bean(name = "taskExecutor")
    public ExecutorService threadPoolTaskExecutor() {
// 返回裝飾器
        return TtlExecutors.getTtlExecutorService(
            new ThreadPoolExecutor(50, 300, 300, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()));
    }

經過測試沒有問題砸王,嗐,原來是自己不知道TTL還需要結合線程池的裝飾器來實現threadLocal的正確傳遞!菜是原罪啊T.T
網上很多關于TTL的實現原理的講解乃正,我們后續(xù)也會通過這次經驗來詳細了解一下TTL的實現機制和設計思想

TransmittableThreadLocal線程間傳遞邏輯 - 簡書 (jianshu.com)

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市婶博,隨后出現的幾起案子瓮具,更是在濱河造成了極大的恐慌,老刑警劉巖凡人,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件名党,死亡現場離奇詭異,居然都是意外死亡挠轴,警方通過查閱死者的電腦和手機传睹,發(fā)現死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來岸晦,“玉大人欧啤,你說我怎么就攤上這事∑羯希” “怎么了邢隧?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長冈在。 經常有香客問我府框,道長,這世上最難降的妖魔是什么讥邻? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任迫靖,我火速辦了婚禮,結果婚禮上兴使,老公的妹妹穿的比我還像新娘系宜。我一直安慰自己,他們只是感情好发魄,可當我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布盹牧。 她就那樣靜靜地躺著俩垃,像睡著了一般。 火紅的嫁衣襯著肌膚如雪汰寓。 梳的紋絲不亂的頭發(fā)上口柳,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天,我揣著相機與錄音有滑,去河邊找鬼跃闹。 笑死,一個胖子當著我的面吹牛毛好,可吹牛的內容都是我干的望艺。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼肌访,長吁一口氣:“原來是場噩夢啊……” “哼找默!你這毒婦竟也來了?” 一聲冷哼從身側響起吼驶,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤惩激,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后蟹演,有當地人在樹林里發(fā)現了一具尸體咧欣,經...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年轨帜,在試婚紗的時候發(fā)現自己被綠了魄咕。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡蚌父,死狀恐怖哮兰,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情苟弛,我是刑警寧澤喝滞,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站膏秫,受9級特大地震影響右遭,放射性物質發(fā)生泄漏。R本人自食惡果不足惜缤削,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一窘哈、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧亭敢,春花似錦滚婉、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽远剩。三九已至,卻和暖如春骇窍,著一層夾襖步出監(jiān)牢的瞬間瓜晤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工腹纳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留痢掠,地道東北人。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓只估,卻偏偏與公主長得像,于是被迫代替她去往敵國和親着绷。 傳聞我的和親對象是個殘疾皇子蛔钙,可洞房花燭夜當晚...
    茶點故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內容