前言
ThreadLocal解決了在多個(gè)線程針對(duì)一個(gè)變量維護(hù)不同值的功能,如果你想在同一個(gè)線程內(nèi)傳遞一些值,那么就可以用到這個(gè)類普办,它的好處是無(wú)侵入性,這樣我們就不需要再每個(gè)方法內(nèi)透?jìng)鬟@個(gè)參數(shù)徘钥,比如Dubbo的RpcContext衔蹲。另外我們也可以利用這個(gè)類來(lái)解決在多線程情況下使用線程不安全的類的問題,比如SimpleDateFormat呈础。ThreadLocal的子類InheritableThreadLocal在ThreadLocal的基礎(chǔ)上舆驶,解決了和線程相關(guān)的副本從父線程向子線程傳遞的問題。如果不使用InheritableThreadLocal而钞,這個(gè)變量在父線程和子線程是兩個(gè)副本沙廉。
但是還有另外一種特殊情況,就是我們比較常用的線程池臼节,線程池中的線程會(huì)被復(fù)用撬陵,線程在創(chuàng)建的時(shí)候會(huì)把父線程當(dāng)前的inheritableThreadLocals拷貝過去(如果存在,淺拷貝)网缝,之后我們?cè)僭诖a中設(shè)置了InternalThreadLocal后巨税,在線程池中的線程就再也獲取不到這個(gè)新的InheritableThreadLocal了。影響最大的問題就是粉臊,我們調(diào)用鏈跟蹤系統(tǒng)的traceid等信息草添,會(huì)在線程池中的線程丟失,我們也會(huì)丟失一部分調(diào)用信息扼仲。阿里開源的transmittable-thread-local框架就正是解決這個(gè)問題远寸。
我們先來(lái)看下InheritableThreadLocal是怎么實(shí)現(xiàn)讓子線程能訪問到父線程的InheritableThreadLocal變量,并且通過這部分源碼屠凶,也能看出來(lái)為什么線程池中的線程一旦創(chuàng)建完成之后被復(fù)用時(shí)為什么會(huì)丟失InheritableThreadLocal驰后。
首先我們?cè)赥hread類的構(gòu)造函數(shù)能發(fā)現(xiàn)下面這段代碼
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
意思就是說(shuō)父線程的inheritableThreadLocals存在時(shí),子線程的inheritableThreadLocals會(huì)淺拷貝父線程的inheritableThreadLocals
然后我們看InheritableThreadLocal的重載方法
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
getMap中的返回從threadLocals變?yōu)榱薸nheritableThreadLocals阅畴。
因?yàn)榫€程的復(fù)用倡怎,所以這個(gè)inheritableThreadLocals只能維持在這個(gè)線程創(chuàng)建時(shí)候的狀態(tài)。
下面是測(cè)試這個(gè)問題的測(cè)試用例
@Data
@AllArgsConstructor
static class Pet {
private String name;
}
@Test
public void testThreadLocalInPool() throws InterruptedException {
final ThreadLocal<Pet> tl1 = new InheritableThreadLocal<>();
final ThreadLocal<Pet> tl2 = new InheritableThreadLocal<>();
Pet pet = new Pet("xiaomao");
ExecutorService executorService = Executors.newFixedThreadPool(2);
tl1.set(pet);
for(int i =0 ;i<2;i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " " + tl1.get());
});
}
Thread.sleep(2000L);
//inheritableThreadLocal是淺拷貝
pet.setName("xiaogou");
for(int i =0 ;i<2;i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " " + tl1.get());
});
}
//線程池中線程一旦創(chuàng)建完成贱枣,InheritableThreadLocal就再也傳不進(jìn)去
pet.setName("xiaoji");
tl2.set(pet);
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" "+tl2.get());
}
});
}
原理
我們先貼一個(gè)不使用ttl框架應(yīng)該怎么解決線程池傳遞threadlocal變量的解決方案监署。
private static ThreadLocal<Map<String,String>> holder = new InheritableThreadLocal<Map<String,String>>(){
@Override
protected Map<String,String> initialValue() {
return new HashMap<>();
}
};
@Data
public static class WrapedRunnable implements Runnable{
private Map<String,String> attachment;
private Runnable runnable;
public WrapedRunnable(Runnable runnable) {
this.runnable=runnable;
this.attachment = new HashMap<>(holder.get());
}
@Override
public void run() {
holder.set(this.attachment);
runnable.run();
}
}
@Test
public void testMandatoryTTL(){
Executor executor = Executors.newFixedThreadPool(1);
executor.execute(()->{
System.out.println("init");
});
HashMap<String,String> attachment = new HashMap<>();
attachment.put("123","456");
holder.set(attachment);
//普通方式
executor.execute(()->{
System.out.println(holder.get().containsKey("123"));
});
//處理后的方式
executor.execute(new WrapedRunnable(()->{
System.out.println(holder.get().containsKey("123"));
}));
}
上面的這種方式和ttl的設(shè)計(jì)思想差不多,但是ttl肯定更加優(yōu)雅纽哥,通用性更高钠乏。
下面這張圖是ttl核心設(shè)計(jì)邏輯的時(shí)序圖。我分析過源碼后春塌,大家就能很容易看懂它的設(shè)計(jì)思想了晓避。
源碼講解
后面出現(xiàn)的 tl=ThreadLocal itl=InheritableThreadLocal ttl=TransmittableThreadLocal
就如上面我自己寫的那個(gè)傳遞方式一樣簇捍,ttl也會(huì)把需要傳遞的threadlocal緩存起來(lái),然后在包裝類的run方法內(nèi)重放俏拱,設(shè)置到子線程暑塑。這個(gè)緩存的邏輯封裝在TransmittableThreadLocal類中。
TransmittableThreadLocal
TransmittableThreadLocal繼承了InheritableThreadLocal锅必,重載了get和set方法
@Override
public final T get() {
T value = super.get();
if (null != value) addValue();
return value;
}
@Override
public final void set(T value) {
super.set(value);
// may set null to remove value
if (null == value) removeValue();
else addValue();
}
可以看到在調(diào)用父類的邏輯上事格,新增了addValue和removeValue的邏輯,這個(gè)就是緩存的邏輯
private void addValue() {
if (!holder.get().containsKey(this)) {
holder.get().put(this, null); // WeakHashMap supports null value.
}
}
private void removeValue() {
holder.get().remove(this);
}
會(huì)把當(dāng)前這個(gè)threadlocal緩存到holder上面搞隐。
下面介紹下這個(gè)很關(guān)鍵的holder
holder
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
@Override
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
}
@Override
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
}
};
首先這個(gè)holder本身是InheritableThreadLocal類型的驹愚,所以它也是和線程相關(guān)聯(lián)的×痈伲可以在父子線程間傳遞逢捺,但是對(duì)于線程池內(nèi)已經(jīng)創(chuàng)建的線程肯定是傳遞不進(jìn)去的。所以在初始化wrapper類的時(shí)候癞季,那個(gè)時(shí)候還是父線程劫瞳,在wrapper類構(gòu)造的時(shí)候,要把這些threadlocal捕獲出來(lái)余佛,這個(gè)捕獲相關(guān)邏輯見下一個(gè)Transmitter的分析柠新。其次這個(gè)holder內(nèi)保存的是一個(gè)WeakHashMap<TransmittableThreadLocal<?>, Object>,所以這個(gè)WeakHashMap的key是在沒被強(qiáng)引用的情況下可以被回收的辉巡。另外需要注意的是恨憎,這個(gè)WeakHashMap設(shè)計(jì)者是為了利用到它的key可以被回收的特性,就是當(dāng)做set在使用郊楣。
Transmitter
Transmitter內(nèi)有3個(gè)核心方法
方法 | 作用 |
---|---|
capture | 捕獲父線程的ttl |
replay | 重放父線程ttl |
restore | 恢復(fù)之前子線程的ttl |
capture用于捕獲父線程的ttl憔恳,捕獲操作要在父線程執(zhí)行
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
captured.put(threadLocal, threadLocal.copyValue());
}
return captured;
}
replay傳入capture方法捕獲的ttl,然后在子線程重放净蚤,也就是調(diào)用ttl的set方法钥组,會(huì)設(shè)置到當(dāng)前的線程中去,最后會(huì)把子線程之前存在的ttl返回
public static Object replay(@Nonnull Object captured) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
//清除之前上下文今瀑,不在capturedMap中程梦,都清除
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set values to captured TTL
//這邊是在子線程設(shè)置ttl的邏輯
setTtlValuesTo(capturedMap);
// call beforeExecute callback
doExecuteCallback(true);
return backup;
}
setTtlValuesTo用于在子線程設(shè)置ttl,邏輯如下
private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
//這邊是設(shè)置到當(dāng)前線程
threadLocal.set(entry.getValue());
}
}
其實(shí)就是調(diào)用ttl的set方法橘荠,看過ThreadLocal源碼的你應(yīng)該懂屿附。
最后就是執(zhí)行結(jié)束,restore之前的上下文哥童,用到replay返回的back挺份。
public static void restore(@Nonnull Object backup) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
// call afterExecute callback
doExecuteCallback(false);
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// clear the TTL values that is not in backup
// avoid the extra TTL values after restore
// 清除之前的上下文,不在backupMap中的都清除了
if (!backupMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// restore TTL values
// 恢復(fù)到運(yùn)行之前的狀態(tài)
setTtlValuesTo(backupMap);
}
要把capture贮懈,repaly和restore的邏輯串起來(lái)匀泊,那么就需要看下面的TtlRunnable類优训,這個(gè)就是我一直說(shuō)的包裝類。
TtlRunnable
我們先看TtlRunnable的構(gòu)造函數(shù)
private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
//捕獲父線程ttl
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
在構(gòu)造函數(shù)各聘,也就是父線程揣非,會(huì)通過capture捕獲父線程的ttl,然后保存在capturedRef中伦吠。
在run方法中妆兑,replay魂拦,restore邏輯一目了然毛仪,不多解釋。
public void run() {
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
Object backup = replay(captured);
try {
runnable.run();
} finally {
restore(backup);
}
}
所以我們?cè)陧?xiàng)目中想用到ttl的時(shí)候芯勘,可以這么使用
@Data
@AllArgsConstructor
static class Pet {
private String name;
}
@Test
public void compareTLAndTTL() throws InterruptedException {
Executor executor = Executors.newFixedThreadPool(1);
executor.execute(()->{
System.out.println("init");
});
ThreadLocal<Pet> tl = new ThreadLocal<>();
tl.set(new Pet("xiaogou"));
executor.execute(()->{
//這邊根本拿不到父線程的tl
System.out.println(tl.get());
});
TransmittableThreadLocal<Pet> ttl = new TransmittableThreadLocal<>();
ttl.set(new Pet("xiaomao"));
executor.execute(TtlRunnable.get(()->{
System.out.println(ttl.get());
//證明ttl是淺拷貝
ttl.get().setName("xiaogou");
}));
Thread.sleep(1000L);
System.out.println(ttl.get());
}
輸出如下
init
null
SPITest.Pet(name=xiaomao)
SPITest.Pet(name=xiaogou)
但是這樣使用起來(lái)也太麻煩了箱靴,我們需要修改我們的使用方式,有沒有無(wú)侵入的使用方式荷愕?我們可以把上面包裝Runnable的邏輯封裝到線程池中去衡怀。因此用到了ExecutorTtlWrapper。
ExecutorTtlWrapper
class ExecutorTtlWrapper implements Executor, TtlEnhanced {
private final Executor executor;
ExecutorTtlWrapper(@Nonnull Executor executor) {
this.executor = executor;
}
@Override
public void execute(@Nonnull Runnable command) {
executor.execute(TtlRunnable.get(command));
}
@Nonnull
public Executor unwrap() {
return executor;
}
}
代碼很簡(jiǎn)單安疗,不多解釋了抛杨。
我們可以通過TtlExecutors這個(gè)工具類來(lái)快捷獲取這些包裝TtlRunbale邏輯的線程池。但是這樣還是比較麻煩的荐类,因此用到下面這個(gè)TtlAgent類怖现,它利用了jvm的Instrument機(jī)制,可以在編譯的時(shí)候修改字節(jié)碼玉罐,在jdk的線程池源碼中加入TtlRunnable封裝的邏輯屈嗤。
TtlAgent
instrument的原理以及如何配置不是本文重點(diǎn),大家知道它干了什么就好了吊输,可以在參考貼的鏈接學(xué)習(xí)饶号,這個(gè)技術(shù)在很多中間件用到
public static void premain(String agentArgs, @Nonnull Instrumentation inst) {
//解析key-value配置
kvs = splitCommaColonStringToKV(agentArgs);
//根據(jù)kv配置 設(shè)置日志打印方式
Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs));
final Logger logger = Logger.getLogger(TtlAgent.class);
try {
logger.info("[TtlAgent.premain] begin, agentArgs: " + agentArgs + ", Instrumentation: " + inst);
//獲取kv中關(guān)于是否禁止向子線程傳遞ttl的配置
final boolean disableInheritable = isDisableInheritableForThreadPool();
final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
//修改java.util.concurrent.ThreadPoolExecutor,java.util.concurrent.ScheduledThreadPoolExecutor的代碼
transformletList.add(new TtlExecutorTransformlet(disableInheritable));
//修改另外一個(gè)線程池
transformletList.add(new TtlForkJoinTransformlet(disableInheritable));
//根據(jù)配置決定是否修改TimeTask源碼季蚂,阿里規(guī)范不建議使用這個(gè)類做定時(shí)任務(wù)
if (isEnableTimerTask()) transformletList.add(new TtlTimerTaskTransformlet());
//把我們的轉(zhuǎn)換器設(shè)置到inst中去
final ClassFileTransformer transformer = new TtlTransformer(transformletList);
inst.addTransformer(transformer, true);
logger.info("[TtlAgent.premain] addTransformer " + transformer.getClass() + " success");
logger.info("[TtlAgent.premain] end");
ttlAgentLoaded = true;
} catch (Exception e) {
String msg = "Fail to load TtlAgent , cause: " + e.toString();
logger.log(Level.SEVERE, msg, e);
throw new IllegalStateException(msg, e);
}
}
premain用于我們向jvm注冊(cè)我們的轉(zhuǎn)換器茫船,根據(jù)轉(zhuǎn)換器內(nèi)的邏輯,我們可以修改對(duì)應(yīng)的class文件源碼扭屁。
我們直接來(lái)看下TtlExecutorTransformlet中是怎么修改源碼的算谈,核心代碼如下
PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put("java.lang.Runnable", "com.alibaba.ttl.TtlRunnable");
PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put("java.util.concurrent.Callable", "com.alibaba.ttl.TtlCallable");
CtClass[] parameterTypes = method.getParameterTypes();
StringBuilder insertCode = new StringBuilder();
for (int i = 0; i < parameterTypes.length; i++) {
final String paramTypeName = parameterTypes[i].getName();
if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
String code = String.format("$%d = %s.get($%d, false, true);", i + 1, PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.get(paramTypeName), i + 1);
logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ": " + code);
insertCode.append(code);
}
}
大概意思就是,在ThreadPoolExecutor類中疯搅,如果方法參數(shù)含有Runnable或者Callable濒生,會(huì)在方法體第一行,加上一段代碼
runnable = com.alibaba.ttl.TtlRunnable.get(runnable,false,true)
callable = com.alibaba.ttl.TtlCallable(runnable,false,true)
這樣就實(shí)現(xiàn)了無(wú)感知包裝Runnable的邏輯幔欧。
具體如何使用這個(gè)agent 我們需要增加如下的jvm啟動(dòng)參數(shù)
-javaagent:/path/to/transmittable-thread-local-2.x.x.jar=ttl.agent.logger:STDOUT,ttl.agent.disable.inheritable.for.thread.pool:true
等于號(hào)后面的是額外配置參數(shù)罪治,具體如何配置可以看TtlAgent類的注釋
最后
通過agent相當(dāng)于無(wú)侵入引入了ttl丽声,但是ttl的創(chuàng)建這一步還是需要我們手動(dòng)的,不可能去改寫tl或者itl的字節(jié)碼觉义,tl雁社,itl,ttl三者在jvm內(nèi)共存
ttl框架主要還是用于中間件晒骇,但是我們還是需要了解的霉撵,學(xué)習(xí)一個(gè)知識(shí)點(diǎn)需要深入,萬(wàn)一以后遇到這種坑了呢洪囤。
參考
ThreadLocal原理及內(nèi)存泄露預(yù)防
transmittable-thread-local github
TransmittableThreadLocal詳解
agent官方文檔