@Async 是spring提供的非常方便的異步執(zhí)行的注解,非常方便翅雏,可以指定線程池執(zhí)行撒穷,但是它不是動態(tài)代理實現妆绞,也就是和其它動態(tài)代理注解(例如@Transactional)放在一起會導致動態(tài)代理失效恋沃。因為spring在拿到 @Async注解后直接委托給 AnnotationAsyncExecutionInterceptor 來執(zhí)行@Async目標方法必搞,而不是執(zhí)行代理方法會走層層動態(tài)代理。
然后包裝一個callable提交給TaskExecutor 來執(zhí)行囊咏。
我們不會具體討論@Async和線程池以及threadLocal的具體實現恕洲,只跟隨我們的使用場景涉及到的源碼
使用場景及問題
// 定義個 ttl threadLocal用于存儲一些信息
private static final TransmittableThreadLocal<FlowContext> FLOW_CONTEXT = new TransmittableThreadLocal<>();
//使用spring mvc 提供的HandlerInterceptor 接口的攔截能力做請求前放入threadLocal中一些信息 然后在 afterCompletion 時機清理掉
//如下例子
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String processInstanceIdStr = request.getHeader(METADATA_FLOW_PROCESS_INSTANCE_ID);
String processIdStr = request.getHeader(METADATA_FLOW_FIRST_PROCESS_ID);
try {
Long processInstanceId = null;
Long processId = null;
if (StringUtils.isNotBlank(processInstanceIdStr)) {
processInstanceId = Long.valueOf(processInstanceIdStr);
}
if (StringUtils.isNotBlank(processIdStr)) {
processId = Long.valueOf(processIdStr);
}
doFill(processInstanceId, processId);
} catch (Exception e) {
FLOW_CONTEXT.remove();
}
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
super.afterCompletion(request, response, handler, ex);
FLOW_CONTEXT.remove();
}
如上代碼非常簡單的 通過header透傳一些信息。在servlet線程周期管理 theadLocal信息梅割。但是我們在請求進來后有一些異步操作也想要獲取threadLocal信息如下
@Async
public void test(Long a, TransmittableThreadLocal<FlowContext> local) throws InterruptedException {
Long processId = MetadataFlowContextHandler.getFirstProcessId();
if (MetadataFlowContextHandler.getLocal() == local) {
System.out.println("會進入到這里");
}
if (!Objects.equals(a, processId)) {
System.out.println("有問題霜第,除了第一次都不相等");
}
}
在我們本地進行一次測試會發(fā)現ThreadLocal信息如預想般獲取到了正確的值,但是如果你仔細測試户辞,并發(fā)情況泌类,或者你測試幾下,然后等一會再測試就會出現錯誤的情況底燎,那么下面列出了錯誤的情況和簡略原因刃榨,然后分析一下源碼
影響因素
- 內存足夠
- @Aync 線程池core線程數量都已經創(chuàng)建
- @Aync 線程池任務隊列沒有排滿
會出現theadLocal錯誤的情況
- 滿足上述條件1,2书蚪,3新的請求進入就會錯誤
- 1不滿足喇澡,滿足2迅栅,3則可能會在子線程(也就是@Async)獲取到null(未驗證憑借猜想)
原理就是因為主線程在第一次傳遞theadLocal對象的引用給子線程后放到當前線程的threadLocalMap中殊校,后續(xù)子線程由于線程復用會在get時先通過當前線程對象去theadLocalMap中獲取緩存的值,如果獲取到直接返回读存,那么大部分時候會一直返回第一次主線程傳遞過來的引用为流。而主線程remove是不會傳遞的呕屎。
為什么要滿足上述3個影響因素,如果1不滿足敬察,jvm的gc會將theadLocalMap對象清理秀睛,因為他是一個弱引用 WeakReference,而TTL主線程傳遞給子線程時也是存入主線程的theadLocal對象weakHashMap返回莲祸,如果內存不足會清理掉后在子線程會調用ThreadLocal#setInitialValue方法委托到子類TransmittableThreadLocal#initialValue其實也是返回一個空的map就會獲取失敗(未驗證)蹂安,那么2,3也是這個道理锐帜,如果服務剛啟動線程池可能會new新的thead那么主線程也一定傳遞正確的
@Async的執(zhí)行代碼
我們直接看ReflectiveMethodInvocation類進入攔截邏輯
@Override
@Nullable
public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
// 這里會獲取到一個AnnotationAsyncExecutionInterceptor田盈,它不屬于動態(tài)代理,在下面不會執(zhí)行其它所有動態(tài)代理了,至于這個@Async的排序是不是最前面的index缴阎,如果是后面的index其實前面的動態(tài)代理也是可以執(zhí)行的允瞧,這里不詳細研究了
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
// 這里算是將動態(tài)代理的方法作為一個適配器去匹配
if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
// 如果匹配成功則會執(zhí)行動態(tài)代理,而后又會執(zhí)行到當前方法體內
return dm.interceptor.invoke(this);
}
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
return proceed();
}
}
else {
// It's an interceptor, so we just invoke it: The pointcut will have
// been evaluated statically before this object was constructed.
// 如果不是動態(tài)代理這里直接執(zhí)行目標方法攔截器蛮拔,那么不會重復進入當前方法體內了述暂,其它的動態(tài)代理會失效
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}
下面是AnnotationAsyncExecutionInterceptor#invoke方法
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
// 這里必須是 異步的
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// 提交到線程池執(zhí)行
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
下面是 doSubmit方法
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// 這里根據返回值進行了封裝,如果是CompletableFuture 則將這個callable 封裝為CompletableFuture 返回給客戶端自由操作
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
// 這是 spring提供的一個可以添加監(jiān)聽的Future建炫,也就是將返回值設置為ListenableFuture的子類便可以添加一些監(jiān)聽例如異步方法成功后畦韭,或者拋出異常的后進行一些信息收集和邏輯判斷日志打印之類
// 例如 StompBrokerRelayMessageHandler#forward 方法,這是spring-messaging中的stomp協(xié)議的一個future監(jiān)聽實現
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
// 這是一個基礎的 Future封裝返回
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
// 其它返回值或者沒有返回值的
executor.submit(task);
return null;
}
}
再下面進入了線程池的邏輯踱卵,為什么要知道線程池的邏輯廊驼,因為影響了ttl傳遞threadLocal的邏輯,因為在子線程是new的情況下會將當前主線程的threadLocal的引用傳遞給異步的子線程惋砂,如果是復用時則什么也不會做妒挎!那為什么你在測試代碼時已經復用的線程還是好用呢,因為子線程通過弱引用的threadLocalMap保存了第一次在new Thread時的主線程threadLocal信息西饵,你換個信息的值再試試酝掩!
/*
*分三步進行:
*
- 1。如果運行的線程少于corePoolSize眷柔,則嘗試
*以給定的命令作為第一個啟動一個新線程
*任務期虾。對addWorker的調用自動地檢查runState和 - workerCount,這樣可以防止添加錯誤警報
*當它不應該返回false線程驯嘱。
* - 2镶苞。如果一個任務可以成功排隊,那么我們仍然需要
*再次檢查我們是否應該添加一個線程
*(因為已經存在的線程在上次檢查后已經死亡)
*池在進入該方法后關閉鞠评。所以我們
*重新檢查狀態(tài)茂蚓,必要時回滾排隊
*停止,或啟動一個新的線程,如果沒有聋涨。
* - 3晾浴。如果不能對任務進行排隊,則嘗試添加一個新的
*線程牍白。如果失敗脊凰,我們就知道系統(tǒng)關閉或飽和了
*等拒絕該任務。
*/
上面復制于源碼的注釋茂腥,具體大家可以百度其它的文章來學習狸涌,或者直接看源碼ThreadPoolExecutor#execute(Runnable command)的代碼
ttl在子線程為new Thread時傳遞邏輯
直接看Thread#init代碼
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// 省略部分代碼...
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
// 這里判斷主線程是否含有inheritableThreadLocals && 當前子線程是否可以傳遞線程私有變量
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 子線程創(chuàng)建 threadLocal 并傳入父線程的 threadLocalMap
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
tid = nextThreadID();
}
下面會調用ThreadLocal#ThreadLocalMap(ThreadLocalMap parentMap) -> ThreadLocal#childValue(T parentValue)
// 這是ThreadLocal 的代碼,會直接報錯不支持最岗,只有InheritableThreadLocal及其子類支持杈抢,而TTL繼承了InheritableThreadLocal類
T childValue(T parentValue) {
throw new UnsupportedOperationException();
}
// 而InheritableThreadLocal的實現如下直接返回主線程的值,雖然傳遞了但是客戶端不容易拿到
protected T childValue(T parentValue) {
return parentValue;
}
// TransmittableThreadLocal 的實現是返回一個以TransmittableThreadLocal對象為key的weakHashMap仑性,作為InheritableThreadLocal的增強保持了弱引用的語義及傳入主線程值的引用惶楼,并且可以在子線程通過TTL的get時從這個weakHashMap直接獲取到從主線程傳遞過來的引用
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap();
}
//這里看出TTL 不單單是將主線程的 threadLocal的值引用傳遞,并且將主線程的TransmittableThreadLocal 對象作為key傳入到子線程的ThreadLocalMap中
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap(parentValue);
}
};
下面時TTL 的get方法
@Override
public final T get() {
// 調用父類 ThreadLocal的get
T value = super.get();
// 這里很重要诊杆,
if (null != value) addValue();
return value;
}
// threadLocal 的get方法
public T get() {
Thread t = Thread.currentThread();
// 獲取當前線程的 ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null) {
// 這里的this其實是當前TransmittableThreadLocal 對象
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 如果沒有獲取到值會初始化一下新的theadLocal中的對象歼捐,先會調用子類中的initialValue然后如果ThreadLocalMap沒有被回收直接返回init的值并返回,如果被回收了會create新的map晨汹,實際TTL也只會new一個空的map返回
return setInitialValue();
}
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
// 通過threadLocal對象的hashCode從ThreadLocalMap獲取到緩存的對象
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
我們盡量不討論 TTL之外的代碼豹储,上述代碼是一個標準的從ThreadLocal中get對象的流程,但是TTL的get有一個addValue的操作
// 這里的邏輯是在TTL 傳遞childValue時的 map重新灌入的邏輯淘这,目前還不知道為什么這樣做剥扣,后續(xù)文章會仔細探討
private void addValue() {
if (!holder.get().containsKey(this)) {
holder.get().put(this, null); // WeakHashMap supports null value.
}
}
如上述這些代碼,雖然使用TTL在new Thread時將主線程的引用灌入了子線程中铝穷,并處理業(yè)務對象本身還放入了一個weakHashMap以threadLocal對象為key钠怯,但是在get時候并沒有什么不同啊,我們通過測試發(fā)現了問題后debug這里也看不出什么貓膩曙聂,但是發(fā)現就是子線程一直獲取第一次傳遞過來的對象引用晦炊,實際實現的邏輯也沒有用到TTL重寫的childValue方法中構造的map,而是直接使用了InheritableThreadLocal實現的業(yè)務對象的直接引用宁脊。然后看TTL的代碼中內部類Transmitter有大量的復制断国,重放,還原的邏輯如下
@Nonnull
public static Object replay(@Nonnull Object captured) {
// 這個方法一看就是在復制替換數據榆苞,實際就是在各種線程池的工作線程執(zhí)行前的重放(替換threadLocal變量)具體邏輯后續(xù)文章探討稳衬,那么看來這個動作在debug中一直沒有執(zhí)行,所以沒有產生TTL線程私有變量的正確傳遞坐漏,我們看看是誰在調用它
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set values to captured TTL
setTtlValuesTo(capturedMap);
// call beforeExecute callback
// 執(zhí)行時機 為 目標方法執(zhí)行前
doExecuteCallback(true);
return backup;
}
看看是誰在調用這個replay方法
我們進入一個Runnable的地方TtlRunnable類薄疚,看的出來是一個裝飾器做了增強弄砍,然后對目標Runnable執(zhí)行前執(zhí)行后對threadLocal進行了重放,還原工作 如下 TtlRunnable
@Override
public void run() {
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
Object backup = replay(captured);
try {
runnable.run();
} finally {
restore(backup);
}
}
那么繼續(xù)输涕,TtlRunnable又是誰搞的呢
原來是ExecutorServiceTtlWrapper 類,另一個先忽略一看就是和定時相關的。那么這是一個ExecutorService的裝飾器,也是做了增強崔列,目的是可以使用TtlRunnable這個增強再往下看
TtlExecutors 這個類有一堆靜態(tài)方法遮精,都是返回傳入目標對象返回其裝飾器的方法,那就是我們在構造ExecutorService線程池時可以直接使用這個類的返回裝飾器應該就可以了
@Bean(name = "taskExecutor")
public ExecutorService threadPoolTaskExecutor() {
// 返回裝飾器
return TtlExecutors.getTtlExecutorService(
new ThreadPoolExecutor(50, 300, 300, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()));
}
經過測試沒有問題砸王,嗐,原來是自己不知道TTL還需要結合線程池的裝飾器來實現threadLocal的正確傳遞!菜是原罪啊T.T
網上很多關于TTL的實現原理的講解乃正,我們后續(xù)也會通過這次經驗來詳細了解一下TTL的實現機制和設計思想