上一篇文章我們知道了TTL利用了InheritableThreadLocal線程傳遞的特性進(jìn)行擴(kuò)展瀑志,也可以在使用線程池時(shí)線程復(fù)用的情況也可以正確的傳遞線程私有變量鞍帝,現(xiàn)在我們就學(xué)習(xí)一下其設(shè)計(jì)
首先聲明TTL重寫(xiě)了InheritableThreadLocal#childValue(T parentValue) 提供了一個(gè)以InheritableThreadLocal為基礎(chǔ)的擴(kuò)展斗遏。
InheritableThreadLocal 的線程傳遞只在當(dāng)子線程為new的時(shí)候會(huì)調(diào)用匾二,接下來(lái)分析代碼
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
/**
* Computes the child's initial value for this inheritable thread-local
* variable as a function of the parent's value at the time the child
* thread is created. This method is called from within the parent
* thread before the child is started.
* <p>
* This method merely returns its input argument, and should be overridden
* if a different behavior is desired.
*
* @param parentValue the parent thread's value
* @return the child thread's initial value
*/
// 這是ThreadLocal的執(zhí)行邏輯闷供,相當(dāng)于一個(gè)模板方法,由子類實(shí)現(xiàn)夺荒,ThreadLocal不支持傳遞給子線程
protected T childValue(T parentValue) {
return parentValue;
}
/**
* Get the map associated with a ThreadLocal.
*
* @param t the current thread
*/
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
/**
* Create the map associated with a ThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the table.
*/
// 顧名思義瞒渠,只有在線程new出來(lái)的時(shí)刻會(huì)調(diào)用當(dāng)前方法,然后調(diào)用childValue
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
然后看看 TTL的重寫(xiě)邏輯
// Note about holder:
// 1. The value of holder is type Map<TransmittableThreadLocal<?>, ?> (WeakHashMap implementation),
// but it is used as *set*.
// 2. WeakHashMap support null value.
// 這是TTL的核心設(shè)計(jì)技扼,組裝為一個(gè) 以TTL對(duì)象為key的map返回,同同時(shí)這個(gè)map對(duì)象還是TTL對(duì)象的一個(gè)內(nèi)部靜態(tài)對(duì)象伍玖,一直跟隨客戶端使用的TTL對(duì)象。
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
// 只有子線程 為new時(shí)調(diào)用
@Override
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
}
// 只有子線程 為new時(shí)調(diào)用,雖然做了拓展剿吻,通過(guò)一個(gè)跟隨客戶端使用的TTL對(duì)象內(nèi)部構(gòu)造了這個(gè)holder中轉(zhuǎn)站窍箍,但是還是使用的引用傳遞,如果主子線程一邊直接修改了引用的對(duì)象丽旅,另一邊也會(huì)感知到椰棘。并且存在并發(fā)修改問(wèn)題。因?yàn)槭窃鰪?qiáng)InheritableThreadLocal魔招,并沒(méi)有修改這里的引用傳遞邏輯晰搀。實(shí)際其它擴(kuò)展有傳遞為不可變對(duì)象的邏輯
@Override
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
}
};
TTL也是使用的引用邏輯實(shí)際也有一些拓展是不可變對(duì)象的邏輯五辽,例如
我們看一下CopyOnWriteSortedArrayThreadContextMap中的代碼
private ThreadLocal<StringMap> createThreadLocalMap() {
return (ThreadLocal)(inheritableMap ? new InheritableThreadLocal<StringMap>() {
protected StringMap childValue(StringMap parentValue) {
if (parentValue == null) {
return null;
} else {
// 主要看看這個(gè)接口
StringMap stringMap = CopyOnWriteSortedArrayThreadContextMap.this.createStringMap(parentValue);
stringMap.freeze();
return stringMap;
}
}
} : new ThreadLocal());
}
// 看名字就知道了办斑,是一個(gè)不可變對(duì)象,也就是不同于InheritableThreadLocal和TTL傳遞的對(duì)象引用,這里做了復(fù)制后變?yōu)椴豢勺儗?duì)象的邏輯乡翅,日后小伙伴們也可以借助TTL實(shí)現(xiàn)自己不可變對(duì)象的邏輯
public interface StringMap extends ReadOnlyStringMap {
void clear();
boolean equals(Object var1);
void freeze();
int hashCode();
boolean isFrozen();
void putAll(ReadOnlyStringMap var1);
void putValue(String var1, Object var2);
void remove(String var1);
}
接下來(lái)看裝飾器
裝飾器的引入鳞疲,實(shí)際是對(duì)ExecutorService的執(zhí)行Runnable,Callable等真正執(zhí)行邏輯的攔截蠕蚜,做前尚洽,后的邏輯,而裝飾器在不改變?cè)袑?duì)象的邏輯包裹一層后靶累,可以做到增強(qiáng)的目的腺毫,其實(shí)這個(gè)裝飾器本身也是 Runnable,Callable的一個(gè)代理。
看看使用的接入
@Override
public Executor getAsyncExecutor() {
// 這里原本是設(shè)置一個(gè) @Async的默認(rèn)線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(512), FACTORY);
executor.setRejectedExecutionHandler(new CustomRejectedHandler());
// 最后我們裝入了TTL的裝飾器返回
return TtlExecutors.getTtlExecutorService(executor);
}
ExecutorServiceTtlWrapper作為ExecutorService的裝飾器目的就是為了再進(jìn)行真正執(zhí)行的目標(biāo)接口再封裝一層裝飾器挣柬。
如上圖各種目標(biāo)接口的裝飾器潮酒,我們就看看 TtlCallable這個(gè)裝飾器作為@Async線程池執(zhí)行單元的增強(qiáng)
public final class TtlCallable<V> implements Callable<V>, TtlEnhanced {
//用于threadLocal中轉(zhuǎn)的對(duì)象,通過(guò)Transmitter#capture()在裝飾器初始化時(shí)就創(chuàng)建好邪蛔,實(shí)際就是獲取當(dāng)前主線程的threadLocal
private final AtomicReference<Object> capturedRef;
// 被裝飾的目標(biāo)對(duì)象接口
private final Callable<V> callable;
// 是否釋放TTL對(duì)象傳遞過(guò)來(lái)的業(yè)務(wù)對(duì)象引用急黎,從代碼看這里只決定了當(dāng)前TtlCallable對(duì)象的引用是否釋放,TtlCallable對(duì)象本身有一定生命周期侧到,再者如果復(fù)用主線程傳遞過(guò)來(lái)的TTL對(duì)象引用也一直存在于主線程勃教,目前都是false,子線程引用也會(huì)一直隨著主線程傳遞而更新
private final boolean releaseTtlValueReferenceAfterCall;
private TtlCallable(@Nonnull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {
// 在new這個(gè)對(duì)象時(shí)是在主線程匠抗,所以capture()方法拿到的是主線程的TTL對(duì)象最新的引用故源,包括業(yè)務(wù)對(duì)象也是最新的
this.capturedRef = new AtomicReference<Object>(capture());
this.callable = callable;
this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
}
/**
* wrap method {@link Callable#call()}.
*/
@Override
public V call() throws Exception {
// 獲取主線程的 TTL對(duì)象map,就是通過(guò)Transmitter#capture()方法從 TTL對(duì)象中上面所說(shuō)的TTL對(duì)象中的內(nèi)部holder中轉(zhuǎn)map獲取到主線程的所有TTL及業(yè)務(wù)對(duì)象引用
Object captured = capturedRef.get();
// 如果為空 或者 需要清理TTL對(duì)象引用汞贸,則進(jìn)行一次原子操作對(duì)TTL對(duì)象引用置為空
if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after call!");
}
// 重放 captured為當(dāng)前裝飾器初始化時(shí)從主線程拿到的心软,這里對(duì)其進(jìn)行重放替換
// 并返回當(dāng)前子線程的 TTL對(duì)象作為還原
Object backup = replay(captured);
try {
//被增項(xiàng)的目標(biāo)方法執(zhí)行
return callable.call();
} finally {
// 再將當(dāng)前子線程還原
restore(backup);
}
}
// ----------------省略大部分代碼--------------
}
Transmitter#capture()方法
@Nonnull
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
// 復(fù)制的核心 是從 holder中轉(zhuǎn)對(duì)象中獲取每個(gè)key的threadLocal中的業(yè)務(wù)對(duì)象引用
// 然后再用其TTL對(duì)象作為key 組裝一個(gè) TTL對(duì)象 -> 業(yè)務(wù)對(duì)象的map返回
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
captured.put(threadLocal, threadLocal.copyValue());
}
return captured;
}
//-----------copyValue 方法----------
private T copyValue() {
// 復(fù)制就是從當(dāng)前主線程 的threadLocal get
return copy(get());
}
//------------copy 方法------
protected T copy(T parentValue) {
// 復(fù)制的是對(duì)象的引用
return parentValue;
}
下面看Transmitter#replay(@Nonnull Object captured) 重放邏輯
@Nonnull
public static Object replay(@Nonnull Object captured) {
@SuppressWarnings("unchecked")
// 主線程傳遞過(guò)來(lái)的引用
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
// 當(dāng)前子線程的TTL引用用于返回后 還原
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
//清除掉可能失效和舊的子線程的TTL對(duì)象引用,為什么這么做著蛙,目前不太清楚
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set values to captured TTL
// 我們上一篇文章以及當(dāng)前文章上面提到删铃,在thread new的時(shí)候調(diào)用initialValue和childValue 方法時(shí),會(huì)將主線程的TTL對(duì)象引用傳遞給子線程踏堡,但是不同裝飾器增強(qiáng)時(shí)猎唁,子線程里的TTL對(duì)象中的業(yè)務(wù)對(duì)象引用是一直不變的,一直是第一次傳遞過(guò)來(lái)的業(yè)務(wù)對(duì)象的值顷蟆,而主線程的業(yè)務(wù)對(duì)象變更子線程感知不到诫隅,但是TTL對(duì)象也一直是一個(gè)引用這里將其舊的TTL引用
// 放入主線程新得 TTL中的業(yè)務(wù)對(duì)象引用,實(shí)際因?yàn)樽泳€程的TTL對(duì)象引用和主線程的TTL對(duì)象是一樣的,只不過(guò)主線程更新了業(yè)務(wù)對(duì)象引用子線程感知不到帐偎,因?yàn)閖ava內(nèi)存模型的原因逐纬,所以這里直接重新操作一次 子線程的TTL對(duì)象更新 *業(yè)務(wù)對(duì)象引用* 重復(fù)了一次主線程的操作
setTtlValuesTo(capturedMap);
// call beforeExecute callback
// 這里其實(shí)是一個(gè)模板方法,包括目標(biāo)對(duì)象執(zhí)行前也就是重放削樊,及目標(biāo)對(duì)象執(zhí)行后豁生,還原的實(shí)際的一個(gè)鉤子
doExecuteCallback(true);
return backup;
}
我們來(lái)看看 setTtlValuesTo(capturedMap); 實(shí)際就是重復(fù)了主線程的操作兔毒,使用相同的TTL對(duì)象引用對(duì)業(yè)務(wù)對(duì)象引用進(jìn)行更新
private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
}
}
看看鉤子方法,可以用于我們擴(kuò)展TTL對(duì)象進(jìn)行鉤子回調(diào)
private static void doExecuteCallback(boolean isBefore) {
for (Map.Entry<TransmittableThreadLocal<?>, ?> entry : holder.get().entrySet()) {
TransmittableThreadLocal<?> threadLocal = entry.getKey();
try {
// 兩個(gè)模板方法鉤子
if (isBefore) threadLocal.beforeExecute();
else threadLocal.afterExecute();
} catch (Throwable t) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t);
}
}
}
}
目前TTL對(duì)象中是空實(shí)現(xiàn)甸箱。如果繼承擴(kuò)展TTL對(duì)象可能用到噢
然后是 還原方法Transmitter#restore(@Nonnull Object backup)邏輯和上面的replay方法基本相同不過(guò)邏輯是反過(guò)來(lái)的小伙伴可以自行看代碼
總結(jié)
- 利用裝飾器對(duì)ExectorService包裝后一步步的繼續(xù)利用裝飾器一直裝飾到要執(zhí)行的目標(biāo)對(duì)象接口例如Runnable育叁,Callable等對(duì)初始化,執(zhí)行前芍殖,執(zhí)行后三個(gè)時(shí)機(jī)進(jìn)行增強(qiáng)
- 重寫(xiě)了InheritableThreadLocal#childValue 方法來(lái)傳遞 TTL定義的一個(gè)中轉(zhuǎn)map對(duì)象 key為 TTL對(duì)象
- 利用了主子線程傳遞 TTL對(duì)象的引用一致豪嗽,同時(shí)用以TTL對(duì)象為key的map進(jìn)行重放,直接對(duì)主線程傳遞過(guò)來(lái)的TTL對(duì)象業(yè)務(wù)對(duì)象引用進(jìn)行更新豌骏,因?yàn)樽訉?duì)象的引用相同相當(dāng)于對(duì)子線程的TTL的業(yè)務(wù)對(duì)象引用更新龟梦。感覺(jué)用其它集合也可以,但是看代碼map可以在重放的同時(shí)更方便的清理子線程的多余的TTL對(duì)象窃躲,保證主子線程的TTL對(duì)應(yīng)一致性变秦。
- 提供了 一些模板方法提高了擴(kuò)展性 例如beforeExecute ,afterExecute
- 提供了屏蔽ForkJoin工作線程屏蔽InheritableThreadLocal的傳遞框舔,幫助開(kāi)發(fā)期間及時(shí)發(fā)現(xiàn)threadLocal的問(wèn)題
其它問(wèn)題蹦玫,java8提供的parallelStream 并行流和CompletableFuture 都是使用ForkJoin框架實(shí)現(xiàn),使用TTL還是會(huì)有問(wèn)題
在TTL源碼沒(méi)有看到關(guān)于forkJoin的增強(qiáng)刘绣,但是發(fā)現(xiàn)了TtlForkJoinPoolHelper類樱溉,提供了DisableInheritableForkJoinWorkerThreadFactory 的支持,為了屏蔽掉InheritableThreadLocal的傳遞防止開(kāi)發(fā)測(cè)試時(shí)theadLocal錯(cuò)誤傳遞的假象纬凤。
// ForkJoinWorkerThreadFactory 的裝飾器
public interface DisableInheritableForkJoinWorkerThreadFactory extends ForkJoinWorkerThreadFactory {
/**
* Unwrap {@link DisableInheritableThreadFactory} to the original/underneath one.
*/
@Nonnull
ForkJoinWorkerThreadFactory unwrap();
}
ForkJoin的邏輯大家自行查詢資料福贞,因?yàn)榇嬖诠ぷ鞲`取等邏輯理論上是無(wú)法避免的ThreadLocal錯(cuò)亂問(wèn)題。所以TTL提供了屏蔽裝飾器停士,但是forkJoin的工作線程也可能是主線程挖帘,所以使用TTL的屏蔽邏輯只能屏蔽掉ForkJoin的工作線程,無(wú)法避免ForkJoin直接使用主線程執(zhí)行任務(wù)單元時(shí)還是有正確的threadLocal對(duì)象引用恋技。但是這樣也足夠開(kāi)發(fā)測(cè)試期間及時(shí)發(fā)現(xiàn)threadLocal的問(wèn)題了拇舀。
經(jīng)過(guò)我網(wǎng)上搜索我們可以替換掉ForkJoin默認(rèn)的ForkJoinWorkerThreadFactory,增強(qiáng)線程創(chuàng)建邏輯蜻底。
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
// ForkJoin會(huì)有一個(gè)擴(kuò)展邏輯骄崩,這里如果獲取到指定的線程工廠類則不會(huì)使用默認(rèn)的。但是當(dāng)前makeCommonPool 方法在 static {} 代碼塊中執(zhí)行薄辅,經(jīng)過(guò)測(cè)試直接System.setProperty無(wú)法掌控好加載順序要拂,可能獲取不到自定義的系統(tǒng)變量,索性直接通過(guò)jvm啟動(dòng)參數(shù)指定
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
// 如果有自定義的線程工廠會(huì)初始化
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
但是看到TTL包內(nèi)的DisableInheritableForkJoinWorkerThreadFactoryWrapper 線程工廠裝飾器并沒(méi)有構(gòu)造方法站楚,并且不是public不能繼承脱惰,也就是直接指定這個(gè)類不能被正常加載后newInstance(),又不能繼承窿春,可能只是一個(gè)示例拉一?那么我自定義一個(gè)類復(fù)制它的邏輯
class DisableInheritableForkJoinWorkerThreadFactoryWrapper implements DisableInheritableForkJoinWorkerThreadFactory {
final ForkJoinWorkerThreadFactory threadFactory;
public DisableInheritableForkJoinWorkerThreadFactoryWrapper(@Nonnull ForkJoinWorkerThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
// 看到這里在new thread時(shí)進(jìn)行了 TTL對(duì)象的清理
//這個(gè)執(zhí)行時(shí)機(jī)其實(shí)還是在主線程中采盒,如果正常不執(zhí)行這個(gè)代碼子線程會(huì)拿到一個(gè)舊的主線程的TTL對(duì)象引用,但是這里清除了舅踪,就不會(huì)拿到了纽甘,方便開(kāi)發(fā)測(cè)試階段發(fā)現(xiàn)問(wèn)題
final Object backup = TransmittableThreadLocal.Transmitter.clear();
try {
return threadFactory.newThread(pool);
} finally {
// 執(zhí)行完后進(jìn)行還原
TransmittableThreadLocal.Transmitter.restore(backup);
}
}
@Nonnull
@Override
public ForkJoinWorkerThreadFactory unwrap() {
return threadFactory;
}
}
我們自定義仿照上述類,直接復(fù)制的良蛮,區(qū)別是提供了構(gòu)造方法抽碌,可以讓ForkJoinPool#makeCommonPool方法可以加載擴(kuò)展工廠,并且直接指定被增強(qiáng)的默認(rèn)ForkJoinWorkerThreadFactory
public class CustomForkJoinThreadFactory implements DisableInheritableForkJoinWorkerThreadFactory {
// 被增強(qiáng)的默認(rèn)的線程工廠
final ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;
// 有無(wú)參構(gòu)造才可以 加載成功噢
public CustomForkJoinThreadFactory() {
}
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final Object backup = TransmittableThreadLocal.Transmitter.clear();
try {
return threadFactory.newThread(pool);
} finally {
TransmittableThreadLocal.Transmitter.restore(backup);
}
}
@Nonnull
@Override
public ForkJoinWorkerThreadFactory unwrap() {
return threadFactory;
}
}
jvm啟動(dòng)參數(shù)(如果有辦法在ForkJoinPool的static加載前System.setProperty也可以)
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=xxx.xxx.xxx.CustomForkJoinThreadFactory