1滚躯、介紹
TransmittableThreadLocal(TTL) 是 Alibaba 開源的缨恒,用于解決在使用線程池等會池化復(fù)用線程的組件情況下臣嚣,提供 ThreadLocal 值的傳遞功能颅停,解決異步執(zhí)行時上下文傳遞的問題惫搏。TransmittableThreadLocal 需要配合 TTL 提供的 TtlExecutors具温、TtlRunnable 和 TtlCallable使用,也可以使用 Java Agent 無侵入式實現(xiàn)線程池的傳遞筐赔。
2铣猩、使用場景
- 調(diào)用鏈中的 traceId 傳遞
- 框架層封裝的業(yè)務(wù)工具在業(yè)務(wù)線程池中傳遞
- Slf4j 日志框架中 MDC 傳遞
3、TTL 執(zhí)行流程
4茴丰、源碼解析
TransmittableThreadLocal 繼承于 InheritableThreadLocal达皿,并擁有了 InheritableThreadLocal 對子線程傳遞上下文的特性,只需解決線程池上下文傳遞問題贿肩。
4.1峦椰、項目結(jié)構(gòu)
4.2、核心部分源碼
在 TransmittableThreadLocal 中汰规,定義了一個全局靜態(tài)變量 holder们何,用于存儲使用 TransmittableThreadLocal set 的上下文。
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
}
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
}
};
- initialValue 方法會在 InheritableThreadLocal 創(chuàng)建時被調(diào)用控轿,默認(rèn)創(chuàng)建一個 WeakHashMap冤竹。
- childValue 方法會在創(chuàng)建子線程時,Thread 調(diào)用 init 方法茬射,會調(diào)用 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals)鹦蠕,createInheritedMap 中會創(chuàng)建 ThreadLocalMap,ThreadLocalMap 的構(gòu)造方法中會調(diào)用 childValue 方法在抛。
public class Thread implements Runnable {
public Thread() {
init(null, null, "Thread-" + nextThreadNum(), 0);
}
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
Thread parent = currentThread();
// TODO 忽略其他源碼
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
// TODO 忽略其他源碼
}
}
public class ThreadLocal<T> {
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
private ThreadLocalMap(ThreadLocalMap parentMap) {
// TODO 忽略部分代碼
for (int j = 0; j < len; j++) {
if (e != null) {
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
// TODO 忽略部分代碼
}
}
}
}
}
可能有人有疑問钟病,為什么使用 WeakHashMap。關(guān)于 WeakHashMap 不了解的,大家可以自行查詢一下肠阱。這里只是闡述一下票唆,為什么 TTL 會使用 WeakHashMap。
- 使用 WeakHashMap 是 “繼承” ThreadLocalMap.Entry 的“優(yōu)良傳統(tǒng)”屹徘,在沒有其它強(qiáng)引用的情況下走趋,下一次GC 時才會被垃圾回收,避免內(nèi)存泄露噪伊。
- 程序中可能會使用到多個 ThreadLocal簿煌,所以需要使用 Map 作為容器儲存,使用 Map 還能快速 remove 當(dāng)前 ThreadLocal鉴吹。
在使用線程池時姨伟,需要使用 TTL 提供的 TtlExecutors 包裝,如:
TtlExecutors.getTtlExecutor(Executors.newCachedThreadPool());
讓我們繼續(xù)跟進(jìn) TtlExecutors.getTtlExecutor 方法中豆励,探究下這個方法里面究竟做了什么夺荒?
public final class TtlExecutors {
public static Executor getTtlExecutor(Executor executor) {
if (null == executor || executor instanceof ExecutorTtlWrapper) {
return executor;
}
return new ExecutorTtlWrapper(executor);
}
}
- 使用 ExecutorTtlWrapper 包裝 Executor。
使用 ExecutorTtlWrapper 包裝有什么用呢良蒸?那么就繼續(xù)看看 ExecutorTtlWrapper 里面的實現(xiàn):
class ExecutorTtlWrapper implements Executor {
private final Executor executor;
ExecutorTtlWrapper(Executor executor) {
this.executor = executor;
}
public void execute(Runnable command) {
executor.execute(TtlRunnable.get(command));
}
// TODO 忽略部分代碼
}
- 重點是在執(zhí)行 execute 方法的時候技扼,使用 TtlRunnable 做了線程上下文的處理,再執(zhí)行真正的 Runnable run 方法诚啃。
現(xiàn)在重點介紹一下 TtlRunnable 里面做了什么處理:
public final class TtlRunnable implements Runnable {
private final AtomicReference<Object> capturedRef;
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;
private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(TransmittableThreadLocal.Transmitter.capture());
// TODO 忽略部分代碼
}
public void run() {
Object captured = capturedRef.get();
// TODO 忽略部分代碼
Object backup = TransmittableThreadLocal.Transmitter.replay(captured);
try {
runnable.run();
} finally {
TransmittableThreadLocal.Transmitter.restore(backup);
}
}
public static TtlRunnable get(Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
// TODO 忽略部分代碼
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
}
public static class Transmitter {
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
captured.put(threadLocal, threadLocal.copyValue());
}
return captured;
}
public static Object replay(Object captured) {
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set values to captured TTL
setTtlValuesTo(capturedMap);
// call beforeExecute callback
doExecuteCallback(true);
return backup;
}
public static void restore(Object backup) {
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
// call afterExecute callback
doExecuteCallback(false);
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// clear the TTL values that is not in backup
// avoid the extra TTL values after restore
if (!backupMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// restore TTL values
setTtlValuesTo(backupMap);
}
private static void setTtlValuesTo(Map<TransmittableThreadLocal<?>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
}
}
}
- TtlRunnable 是實現(xiàn)于 Runnable,所以線程池執(zhí)行的是 TtlRunnable私沮,但是在 TtlRunnable run 方法中會執(zhí)行 Runnable run 方法始赎。
- 線程池執(zhí)行時,執(zhí)行了 ExecutorTtlWrapper execute 方法仔燕,execute 方法中調(diào)用了 TtlRunnable.get(command) 造垛,get 方法中創(chuàng)建了一個 TtlRunnable 對象返回了。
- TtlRunnable 構(gòu)造方法中晰搀,調(diào)用了 TransmittableThreadLocal.Transmitter.capture() 獲取當(dāng)前線程中所有的上下文五辽,并儲存在 AtomicReference 中。
- 當(dāng)線程執(zhí)行時外恕,調(diào)用 TtlRunnable run 方法杆逗,TtlRunnable 會從 AtomicReference 中獲取出調(diào)用線程中所有的上下文,并把上下文給 TransmittableThreadLocal.Transmitter.replay 方法把上下文復(fù)制到當(dāng)前線程鳞疲。并把上下文備份罪郊。
- 當(dāng)線程執(zhí)行完,調(diào)用 TransmittableThreadLocal.Transmitter.restore 并把備份的上下文傳入尚洽,恢復(fù)備份的上下文悔橄,把后面新增的上下文刪除,并重新把上下文復(fù)制到當(dāng)前線程。
PS:
- Log4j2 MDC 集成 TTL
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>log4j2-ttl-thread-context-map</artifactId>
<version>1.2.0</version>
</dependency>
- Logback MDC 集成 TTL
<dependency>
<groupId>com.ofpay</groupId>
<artifactId>logback-mdc-ttl</artifactId>
<version>1.0.2</version>
</dependency>
- ThreadLocal 使用:http://www.reibang.com/p/4093add7f2cd
- TTL GitHub:https://github.com/alibaba/transmittable-thread-local