Netty中的FastThreadLocal
版本:4.1.23
大家都應該接觸過JDK的ThreadLocal,它使用每個Thread中的ThreadLocalMap存儲ThreadLocal,ThreadLocalMap內部使用ThreadLocalMap.Entry數組存儲每一個ThreadLocal,存儲計算和HashMap類似,要計算key的索引位置=key.threadLocalHashCode&(len-1),中間還需要處理沖突,使用的是線性探測方法(當前索引被占用時,使用下一個索引)。達到一定條件後,還需擴充數組長度並rehash,可謂效率不是太高。另外,JDK的ThreadLocal還需要使用者注意內存泄漏問題。作為高性能框架的Netty為了解決上面的兩個問題重構了ThreadLocal,產生了FastThreadLocal。下面講解它具體是如何解決剛才所說的問題的。
1、與ThreadLocal內部使用類對比
不同對象 | JDK | Netty | 備注 |
---|---|---|---|
線程 | Thread | FastThreadLocalThread:繼承JDK的Thread | Netty使用自己的DefaultThreadFactory |
map | ThreadLocalMap | InternalThreadLocalMap | map |
map內部數組 | ThreadLocalMap.Entry[] | UnpaddedInternalThreadLocalMap.indexedVariables | 存儲ThreadLocal的值 |
Runnable | Runnable | FastThreadLocalRunnable | 為了防止內存泄漏,Netty的FastThreadLocalRunnable包裝了Runnable |
ThreadLocal | ThreadLocal | FastThreadLocal | |
Thread與FastThreadLocalThread
/**
 * {@link Thread} subclass that carries its own {@link InternalThreadLocalMap} field, used in place of the
 * JDK ThreadLocal storage of a plain {@link Thread}.
 */
public class FastThreadLocalThread extends Thread {
// This will be set to true if we have a chance to wrap the Runnable.
private final boolean cleanupFastThreadLocals;
// Map holding the FastThreadLocal values of this thread.
private InternalThreadLocalMap threadLocalMap;
// ... remaining members omitted from this excerpt
}
DefaultThreadFactory
/**
 * {@link ThreadFactory} that creates {@link FastThreadLocalThread}s and wraps every task in a
 * {@link FastThreadLocalRunnable} before handing it to the new thread.
 */
public class DefaultThreadFactory implements ThreadFactory {
// ... fields and constructors omitted from this excerpt
@Override
public Thread newThread(Runnable r) {
// Wrap the task in a FastThreadLocalRunnable; the thread name is the prefix plus the next id.
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
// Creates the FastThreadLocalThread that runs the given task under the given name.
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}
FastThreadLocalRunnable
/**
 * {@link Runnable} decorator: runs the wrapped task and then, whether or not the task threw,
 * calls {@link FastThreadLocal#removeAll()} so the thread's FastThreadLocal values do not leak.
 */
final class FastThreadLocalRunnable implements Runnable {
    /** The task being decorated; never {@code null}. */
    private final Runnable task;

    private FastThreadLocalRunnable(Runnable runnable) {
        task = ObjectUtil.checkNotNull(runnable, "runnable");
    }

    /**
     * Returns {@code runnable} itself when it is already a {@link FastThreadLocalRunnable},
     * otherwise a new decorator around it.
     */
    static Runnable wrap(Runnable runnable) {
        if (runnable instanceof FastThreadLocalRunnable) {
            return runnable;
        }
        return new FastThreadLocalRunnable(runnable);
    }

    @Override
    public void run() {
        try {
            task.run();
        } finally {
            // Task finished (normally or exceptionally): drop all FastThreadLocal values of this thread.
            FastThreadLocal.removeAll();
        }
    }
}
UnpaddedInternalThreadLocalMap
/**
 * Field holder for the per-thread map: the indexed slot array plus a number of dedicated
 * thread-local fields.
 */
class UnpaddedInternalThreadLocalMap {
// For threads that are not FastThreadLocalThreads the values still live in an InternalThreadLocalMap,
// but that map itself is stored in this JDK ThreadLocal (see InternalThreadLocalMap.slowGet()).
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
// Counter from which InternalThreadLocalMap.nextVariableIndex() hands out slot indices.
static final AtomicInteger nextIndex = new AtomicInteger();
/** Used by {@link FastThreadLocal} */
Object[] indexedVariables; // slot array that stores the FastThreadLocal values
// Core thread-locals
int futureListenerStackDepth;
int localChannelReaderStackDepth;
Map<Class<?>, Boolean> handlerSharableCache;
IntegerHolder counterHashCode;
ThreadLocalRandom random;
Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;
// String-related thread-locals
StringBuilder stringBuilder;
Map<Charset, CharsetEncoder> charsetEncoderCache;
Map<Charset, CharsetDecoder> charsetDecoderCache;
// ArrayList-related thread-locals
ArrayList<Object> arrayList;
// Stores the given array as the slot array; the array is not copied.
UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
this.indexedVariables = indexedVariables;
}
}
2、FastThreadLocal源代碼
FastThreadLocal中的三個index
// Slot index shared by all FastThreadLocal instances; each map stores its set of variables to remove there.
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
// Slot index of this instance's value.
private final int index;
// Slot index of the flag that records whether registerCleaner(...) already registered a cleaner.
private final int cleanerFlagIndex;
// Allocates the two per-instance slot indices: first the value index, then the cleaner-flag index.
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
cleanerFlagIndex = InternalThreadLocalMap.nextVariableIndex();
}
/**
 * Hands out the next unused slot index from the shared {@code nextIndex} counter.
 *
 * @return the allocated, non-negative index
 * @throws IllegalStateException if the counter has overflowed into negative values; the increment
 *         is undone before throwing
 */
public static int nextVariableIndex() {
    final int allocated = nextIndex.getAndIncrement();
    if (allocated >= 0) {
        return allocated;
    }
    // Counter wrapped around: roll the increment back and refuse the allocation.
    nextIndex.decrementAndGet();
    throw new IllegalStateException("too many thread-local indexed variables");
}
set()
設置v的過程是最難的部分,包括創建InternalThreadLocalMap、放入待刪除的Set;非FastThreadLocalThread的線程還需要放入待清除任務隊列。
/**
 * Sets the value for the current thread.
 *
 * <p>Passing {@code InternalThreadLocalMap.UNSET} (the sentinel that marks an empty slot) is treated
 * as a request to remove the variable instead.
 */
public final void set(V value) {
    if (value == InternalThreadLocalMap.UNSET) {
        // The sentinel means "no value": clear the slot rather than storing it.
        remove();
        return;
    }
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    // Only a newly created variable needs a cleaner registration.
    if (setKnownNotUnset(map, value)) {
        registerCleaner(map);
    }
}
InternalThreadLocalMap.get()
/**
 * Returns the {@link InternalThreadLocalMap} of the current thread: via {@code fastGet} for a
 * {@link FastThreadLocalThread}, via {@code slowGet} for any other thread.
 */
public static InternalThreadLocalMap get() {
    final Thread current = Thread.currentThread();
    return current instanceof FastThreadLocalThread
            ? fastGet((FastThreadLocalThread) current)
            : slowGet();
}
/**
 * Returns the map stored on the given {@link FastThreadLocalThread}, creating and attaching a new
 * one when the thread has none yet.
 */
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    final InternalThreadLocalMap existing = thread.threadLocalMap();
    if (existing != null) {
        return existing;
    }
    final InternalThreadLocalMap created = new InternalThreadLocalMap();
    thread.setThreadLocalMap(created);
    return created;
}
/**
 * Returns the map of a thread that is not a {@link FastThreadLocalThread}. The map is kept in the JDK
 * {@link ThreadLocal} {@code UnpaddedInternalThreadLocalMap.slowThreadLocalMap} and is created on
 * first access.
 */
private static InternalThreadLocalMap slowGet() {
    final ThreadLocal<InternalThreadLocalMap> holder = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    InternalThreadLocalMap map = holder.get();
    if (map != null) {
        return map;
    }
    // First access from this thread: create the map and remember it in the JDK ThreadLocal.
    map = new InternalThreadLocalMap();
    holder.set(map);
    return map;
}
InternalThreadLocalMap初始化
// Superclass constructor: stores the given array as the slot array (no copy is made).
UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
this.indexedVariables = indexedVariables;
}
// Creates a map whose slot array comes from newIndexedVariableTable().
private InternalThreadLocalMap() {
super(newIndexedVariableTable()); // the superclass constructor stores it in indexedVariables
}
/**
 * Creates the initial slot array: 32 entries, every one pre-filled with the {@code UNSET} sentinel.
 */
private static Object[] newIndexedVariableTable() {
    final Object[] table = new Object[32];
    Arrays.fill(table, UNSET);
    return table;
}
setKnownNotUnset
/**
 * Stores {@code value} in this variable's slot. When that creates a new variable, this instance is
 * also recorded in the map's set of variables to remove.
 *
 * @return {@code true} if and only if a new thread-local variable has been created
 */
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    final boolean newlyCreated = threadLocalMap.setIndexedVariable(index, value);
    if (newlyCreated) {
        addToVariablesToRemove(threadLocalMap, this);
    }
    return newlyCreated;
}
/**
 * Records {@code variable} in the set kept in the map's {@code variablesToRemoveIndex} slot, creating
 * that set first when the slot is still empty. The set is identity-based, so adding the same
 * variable repeatedly keeps a single entry.
 */
@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    final Object slot = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    final Set<FastThreadLocal<?>> pending;
    if (slot != InternalThreadLocalMap.UNSET && slot != null) {
        pending = (Set<FastThreadLocal<?>>) slot;
    } else {
        // Nothing tracked yet for this map: create the set and store it in the shared slot.
        pending = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, pending);
    }
    pending.add(variable);
}
// Method of InternalThreadLocalMap.
/**
 * Stores {@code value} in the slot at {@code index}, growing the slot array through
 * {@code expandIndexedVariableTableAndSet} when the index lies beyond its current length.
 *
 * @return {@code true} if and only if a new thread-local variable has been created
 */
public boolean setIndexedVariable(int index, Object value) {
    final Object[] slots = indexedVariables;
    if (index >= slots.length) {
        // Index is past the end of the array: expand and store.
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
    final Object previous = slots[index];
    slots[index] = value;
    // Overwriting a slot that already held something other than UNSET yields false.
    return previous == UNSET;
}
/**
 * Registers with {@link ObjectCleaner} a task that calls {@link #remove(InternalThreadLocalMap)} on the
 * given map; the original inline comment states the task runs once the current thread has been collected
 * by the GC. Registration is skipped when the thread cleans up its own FastThreadLocals or when this
 * variable's cleaner flag is already set in the map.
 */
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
Thread current = Thread.currentThread();
// Nothing to do when the thread reports that it will clean up its FastThreadLocals itself
// (presumably a FastThreadLocalThread whose Runnable was wrapped - confirm in willCleanupFastThreadLocals).
// The cleanerFlagIndex slot records whether registration already happened, so it happens only once.
if (FastThreadLocalThread.willCleanupFastThreadLocals(current) ||
threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {
return;
}
// removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime
// of the thread, and this Object will be discarded if the associated thread is GCed.
threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);
// We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
// and FastThreadLocal.onRemoval(...) will be called.
ObjectCleaner.register(current, new Runnable() {
@Override
public void run() {
remove(threadLocalMap); // runs when the current thread has been collected by the GC; clears this variable from the thread's map
// It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
// the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
}
});
}
ObjectCleaner
//這個是防止內存泄漏的核心代碼:和FastThreadLocal綁定的線程被回收時,執行該類中註冊的任務來清除map中的數據
package io.netty.util.internal;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.util.internal.SystemPropertyUtil.getInt;
import static java.lang.Math.max;
/**
 * Allows a way to register some {@link Runnable} that will executed once there are no references to an {@link Object}
 * anymore.
 *
 * <p>Each registration is tracked as a {@link WeakReference} bound to a shared {@link ReferenceQueue}; a
 * daemon cleaner thread, started on demand, polls that queue and runs the cleanup task of every reference
 * it receives.
 */
public final class ObjectCleaner {
// Poll timeout for the reference queue in milliseconds: the system property value (default 10000), but at least 500.
private static final int REFERENCE_QUEUE_POLL_TIMEOUT_MS =
max(500, getInt("io.netty.util.internal.ObjectCleaner.refQueuePollTimeout", 10000));
// Package-private for testing
static final String CLEANER_THREAD_NAME = ObjectCleaner.class.getSimpleName() + "Thread";
// This will hold a reference to the AutomaticCleanerReference which will be removed once we called cleanup()
private static final Set<AutomaticCleanerReference> LIVE_SET = new ConcurrentSet<AutomaticCleanerReference>();
// Queue on which every AutomaticCleanerReference is registered (see its constructor).
private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<Object>();
// True while a cleaner thread is considered to be running; toggled with CAS in register() and CLEANER_TASK.
private static final AtomicBoolean CLEANER_RUNNING = new AtomicBoolean(false);
// Body of the cleaner thread: drains REFERENCE_QUEUE while LIVE_SET is non-empty, then tries to exit.
private static final Runnable CLEANER_TASK = new Runnable() {
@Override
public void run() {
boolean interrupted = false;
for (;;) {
// Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
// See if we can let this thread complete.
while (!LIVE_SET.isEmpty()) {
final AutomaticCleanerReference reference;
try {
// Waits up to the poll timeout; yields the AutomaticCleanerReference of an object that was GCed,
// or null when the timeout elapsed.
reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);
} catch (InterruptedException ex) {
// Just consume and move on
interrupted = true;
continue;
}
if (reference != null) {
try {
reference.cleanup(); // run the registered cleanup task
} catch (Throwable ignored) {
// ignore exceptions, and don't log in case the logger throws an exception, blocks, or has
// other unexpected side effects.
}
LIVE_SET.remove(reference);
}
}
CLEANER_RUNNING.set(false);
// Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
// behavior in multi-threaded environments.
if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
// There was nothing added after we set STARTED to false or some other cleanup Thread
// was started already so its safe to let this Thread complete now.
break;
}
}
if (interrupted) {
// As we caught the InterruptedException above we should mark the Thread as interrupted.
Thread.currentThread().interrupt();
}
}
};
/**
 * Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references
 * to the object anymore.
 *
 * This should only be used if there are no other ways to execute some cleanup once the Object is not reachable
 * anymore because it is not a cheap way to handle the cleanup.
 */
// Wraps the object and its cleanup task in an AutomaticCleanerReference, adds that to LIVE_SET and
// starts a cleaner thread when none is running.
public static void register(Object object, Runnable cleanupTask) {
// AutomaticCleanerReference extends WeakReference.
AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
// Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct
// behavior in multi-threaded environments.
LIVE_SET.add(reference);
// Check if there is already a cleaner running.
if (CLEANER_RUNNING.compareAndSet(false, true)) {
// CAS succeeded, so no cleaner thread was running: create one to execute CLEANER_TASK.
final Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
cleanupThread.setPriority(Thread.MIN_PRIORITY); // lowest priority
// Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
// classloader.
// See:
// - https://github.com/netty/netty/issues/7290
// - https://bugs.openjdk.java.net/browse/JDK-7008595
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
cleanupThread.setContextClassLoader(null);
return null;
}
});
cleanupThread.setName(CLEANER_THREAD_NAME);
// Mark this as a daemon thread to ensure that we the JVM can exit if this is the only thread that is
// running.
cleanupThread.setDaemon(true);
cleanupThread.start();
}
}
// Number of references currently held in LIVE_SET.
public static int getLiveSetCount() {
return LIVE_SET.size();
}
private ObjectCleaner() {
// Only contains a static method.
}
private static final class AutomaticCleanerReference extends WeakReference<Object> {
private final Runnable cleanupTask;
// Binding the WeakReference to REFERENCE_QUEUE means this reference object is enqueued there after the
// GC has cleared its referent; CLEANER_TASK takes it from the queue and calls cleanup().
AutomaticCleanerReference(Object referent, Runnable cleanupTask) {
super(referent, REFERENCE_QUEUE);
this.cleanupTask = cleanupTask;
}
// Runs the cleanup task.
void cleanup() {
cleanupTask.run();
}
// Always returns null: the referent is never handed out through this reference.
@Override
public Thread get() {
return null;
}
@Override
public void clear() { // also drops this reference from LIVE_SET
LIVE_SET.remove(this);
super.clear();
}
}
}
remove
執行清除動作
public final void remove() {
remove(InternalThreadLocalMap.getIfSet());取到當(dāng)前線程的InternalThreadLocalMap
}
/**
 * Resets this variable to the uninitialized state in the given thread local map, so that a
 * subsequent {@code get()} triggers {@code initialValue()} again.
 * The specified thread local map must be for the current thread; {@code null} is ignored.
 */
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }
    // Clear the value slot first, then stop tracking this variable for bulk removal.
    final Object removed = threadLocalMap.removeIndexedVariable(index);
    removeFromVariablesToRemove(threadLocalMap, this);
    if (removed == InternalThreadLocalMap.UNSET) {
        // The slot was already empty: no removal callback.
        return;
    }
    try {
        onRemoval((V) removed);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
}
/**
 * Drops {@code variable} from the set stored in the map's {@code variablesToRemoveIndex} slot.
 * Does nothing when that slot holds no set.
 */
private static void removeFromVariablesToRemove(
        InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    final Object slot = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    if (slot != InternalThreadLocalMap.UNSET && slot != null) {
        @SuppressWarnings("unchecked")
        Set<FastThreadLocal<?>> pending = (Set<FastThreadLocal<?>>) slot;
        pending.remove(variable);
    }
}
// Method of InternalThreadLocalMap.
/**
 * Resets the slot at {@code index} to {@code UNSET}.
 *
 * @return the value the slot held before, or {@code UNSET} when the index lies beyond the slot array
 */
public Object removeIndexedVariable(int index) {
    final Object[] slots = indexedVariables;
    if (index >= slots.length) {
        return UNSET;
    }
    final Object previous = slots[index];
    slots[index] = UNSET; // back to the default filler value
    return previous;
}
FastThreadLocal.removeAll()
/**
 * Removes every tracked FastThreadLocal value of the current thread and finally calls
 * {@code InternalThreadLocalMap.remove()}. Called from {@code FastThreadLocalRunnable.run()} after the
 * wrapped task has finished, to prevent memory leaks. Does nothing when the thread has no map.
 */
public static void removeAll() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.getIfSet();
    if (map == null) {
        return;
    }
    try {
        final Object slot = map.indexedVariable(variablesToRemoveIndex);
        if (slot == null || slot == InternalThreadLocalMap.UNSET) {
            // No variables tracked; the finally block below still runs.
            return;
        }
        @SuppressWarnings("unchecked")
        Set<FastThreadLocal<?>> pending = (Set<FastThreadLocal<?>>) slot;
        // Iterate over a snapshot: remove(map) takes each variable out of the tracked set.
        FastThreadLocal<?>[] snapshot = pending.toArray(new FastThreadLocal[pending.size()]);
        for (FastThreadLocal<?> local : snapshot) {
            local.remove(map);
        }
    } finally {
        InternalThreadLocalMap.remove();
    }
}
get
/**
 * Returns this variable's value for the current thread. When the slot is still {@code UNSET}, the
 * value comes from {@code initialize(...)} and a cleaner is registered for the thread's map.
 */
@SuppressWarnings("unchecked")
public final V get() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    final Object current = map.indexedVariable(index);
    if (current == InternalThreadLocalMap.UNSET) {
        // First access on this thread: initialize, then register the cleaner.
        final V initial = initialize(map);
        registerCleaner(map);
        return initial;
    }
    return (V) current;
}
總結:
1.從代碼來看,Netty內部使用了與FastThreadLocal關聯的一些自定義類:線程、threadLocalMap、runnable等。
2.為防止內存泄露,FastThreadLocal針對Netty內部自己的線程和用戶自定義線程,在清除map數據時有不同的處理方法。
3.底層和JDK一樣使用數組來存儲threadLocal的值,但Netty直接使用FastThreadLocal的索引定位數組中的位置,更高效;但也應清楚,每一個FastThreadLocal都占用了數組的兩個位置(index、cleanerFlagIndex),所有的FastThreadLocal都使用variablesToRemoveIndex來存儲要清除的FastThreadLocal。相比JDK的ThreadLocal,這是用空間換時間。
4.使用非FastThreadLocalThread時,底層是封裝了JDK的ThreadLocal來存儲InternalThreadLocalMap;如2所述,不管哪類線程,都有對應的防止內存泄露的方法。