基于2.4版本
ConcurrentBag是什么
ConcurrentBag是HikariCP中實現(xiàn)的一個無鎖化集合,比JDK中的LinkedBlockingQueue
和LinkedTransferQueue
的性能更好骇扇。借鑒了C#中的設(shè)計但金,作者在這篇文章中說提到的幾個點是:
- A lock-free design
- ThreadLocal caching
- Queue-stealing
- Direct hand-off optimizations
源碼剖析
設(shè)計目的
ConcurrentBag的類注釋如下:
This is a specialized concurrent bag that achieves superior performance to LinkedBlockingQueue and LinkedTransferQueue for the purposes of a connection pool. It uses ThreadLocal storage when possible to avoid locks, but resorts to scanning a common collection if there are no available items in the ThreadLocal list. Not-in-use items in the ThreadLocal lists can be "stolen" when the borrowing thread has none of its own. It is a "lock-less" implementation using a specialized AbstractQueuedLongSynchronizer to manage cross-thread signaling. Note that items that are "borrowed" from the bag are not actually removed from any collection, so garbage collection will not occur even if the reference is abandoned. Thus care must be taken to "requite" borrowed objects otherwise a memory leak will result. Only the "remove" method can completely remove an object from the bag
簡單翻譯一下:
ConcurrentBag是為追求鏈接池操作高性能而設(shè)計的并發(fā)工具盖淡。它使用ThreadLocal緩存來避免鎖爭搶冗恨,當(dāng)ThreadLocal中沒有可用的鏈接時會去公共集合中“借用”鏈接。ThreadLocal中處于
Not-in-use
狀態(tài)的鏈接也可能會“借走”。ConcurrentBag使用
AbstractQueuedLongSynchronizer
來管理跨線程通信(實際新版本已經(jīng)刪掉了AbstractQueuedLongSynchronizer
)狠持。注意被“借走”的鏈接并沒有從任何集合中刪除甜刻,所以即使鏈接的引用被棄用也不會進行g(shù)c昭齐。所以要及時將被“借走”的鏈接歸還回來,否則可能會發(fā)生內(nèi)存泄露丧荐。只有
remove
方法才會真正將鏈接從ConcurrentBag中刪除车荔。
看下HikariCP中是如何實現(xiàn)ConcurrentBag的帽借。
源碼實現(xiàn)
類定義
public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable
ConcurrentBag只是實現(xiàn)了AutoCloseable
接口巍举,而沒有實現(xiàn)List
或Map
等接口。其中的元素要集成IConcurrentBagEntry
。我們看下IConcurrentBagEntry
的定義:
public interface IConcurrentBagEntry
{
//定義鏈接的狀態(tài)
int STATE_NOT_IN_USE = 0;
int STATE_IN_USE = 1;
int STATE_REMOVED = -1;
int STATE_RESERVED = -2;
//對鏈接狀態(tài)的操作
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
int getState();
}
再看下類成員變量:
//存放共享元素,線程安全的List
private final CopyOnWriteArrayList<T> sharedList;
//是否使用弱引用
private final boolean weakThreadLocals;
//線程本地緩存
private final ThreadLocal<List<Object>> threadList;
//添加元素的監(jiān)聽器,在HikariPool中實現(xiàn)
private final IBagStateListener listener;
//當(dāng)前等待獲取元素的線程數(shù)
private final AtomicInteger waiters;
//ConcurrentBag是否處于關(guān)于狀態(tài)
private volatile boolean closed;
//接力隊列
private final SynchronousQueue<T> handoffQueue;
鏈接PoolEntry
在HikariCP中使用PoolEntry
對鏈接實例Connection進行了封裝,記錄了Connection相關(guān)的數(shù)據(jù),如Connection實例惜犀、鏈接狀態(tài)莉御、當(dāng)前活躍會話迄薄、對鏈接池引用等人乓。
PoolEntry
也是ConcurrentBag
管理的對象,sharedList
和threadList
中保存的對象就是PoolEntry
的實例。
/**
* Entry used in the ConcurrentBag to track Connection instances.
*
* @author Brett Wooldridge
*/
final class PoolEntry implements IConcurrentBagEntry {
//用來更新鏈接的狀態(tài)state
private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater;
//鏈接實例
Connection connection;
//鏈接狀態(tài),如STATE_IN_USE精续、STATE_NOT_IN_USE
private volatile int state;
//驅(qū)逐狀態(tài),刪除該鏈接時標(biāo)記為true
private volatile boolean evict;
//當(dāng)前打開的會話
private final FastList<Statement> openStatements;
//鏈接池引用
private final HikariPool hikariPool;
private final boolean isReadOnly;
private final boolean isAutoCommit;
}
ConcurrentBag中的方法比較少,我們一個個看一下:
1. 增加鏈接
add
方法很簡單菊匿,只是將新的鏈接放入sharedList
中佩厚,如果有等待鏈接的線程思恐,則將鏈接給該線程基跑。
可以發(fā)現(xiàn)其實所有的鏈接都保存在sharedList
中萄凤,ThreadList
只是其中一部分漾月。
/**
* Add a new object to the bag for others to borrow.
*
*@parambagEntryan object to add to the bag
*/
public void add(final T bagEntry) {
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
//將鏈接放入共享隊列
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
// 等待直到?jīng)]有waiter或有線程拿走它
while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
//yield什么都不做烛芬,只是為了讓渡CPU使用西采,避免長期占用
yield();
}
}
2. 獲取鏈接
鏈接獲取順序:
- 從線程本地緩存
ThreadList
中獲取境析,這里保持的是該線程之前使用過的鏈接 - 從共享集合
sharedList
中獲取,如果獲取不到派诬,會通知listener新建鏈接(但不一定真的會新建鏈接出來) - 從
handoffQueue
中阻塞獲取劳淆,新建的鏈接或一些轉(zhuǎn)為可用的鏈接會放入該隊列中
/**
* The method will borrow a BagEntry from the bag, blocking for the
* specified timeout if none are available.
*
* @param timeout how long to wait before giving up, in units of unit
* @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
* @return a borrowed instance from the bag or null if a timeout occurs
* @throws InterruptedException if interrupted while waiting
*/
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
// 先看是否能從ThreadList中拿到可用鏈接,這里的List通常為FastList
List<Object> list = threadList.get();
if (weakThreadLocals && list == null) {
list = new ArrayList<>(16);
threadList.set(list);
}
//1. 試從ThreadList中獲取鏈接默赂,倒序獲取
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
//獲取鏈接沛鸵,鏈接可能使用了弱引用
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
//如果能夠獲取鏈接且鏈接可用,則將該鏈接的狀態(tài)從STATE_NOT_IN_USE置為STATE_IN_USE
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
//2. 如果ThreadList中沒有可用的鏈接缆八,則嘗試從共享集合中獲取鏈接
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
//通知監(jiān)聽器添加鏈接
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
listener.addBagItem(waiting);
//3. 嘗試從handoffQueue隊列中獲取谒臼。在等待時可能鏈接被新建或改為轉(zhuǎn)為可用狀態(tài)
//SynchronousQueue是一種無容量的BlockingQueue,在poll時如果沒有元素耀里,則阻塞等待timeout時間
timeout = timeUnit.toNanos(timeout);
do {
final long start = CLOCK.currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= CLOCK.elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
waiters.decrementAndGet();
}
}
3. 歸還鏈接
歸還鏈接的順序:
將鏈接置為可用狀態(tài)
STATE_NOT_IN_USE
-
如果有等待鏈接的線程蜈缤,則將該鏈接通過
handoffQueue
給出去由于該鏈接可能在當(dāng)前線程的threadList里,所以可以發(fā)現(xiàn)A線程的threadList中的鏈接可能被B線程使用
-
將它放入當(dāng)前線程的theadList中
這里可以看出來threadList一開始是空的冯挎,當(dāng)線程從sharedList中借用了鏈接并使用完后底哥,會放入自己的緩存中
/**
* This method will return a borrowed object to the bag. Objects
* that are borrowed from the bag but never "requited" will result
* in a memory leak.
*
* @param bagEntry the value to return to the bag
* @throws NullPointerException if value is null
* @throws IllegalStateException if the bagEntry was not borrowed from the bag
*/
public void requite(final T bagEntry) {
//1. 將鏈接狀態(tài)改為STATE_NOT_IN_USE
bagEntry.setState(STATE_NOT_IN_USE);
//2. 如果有等待鏈接的線程咙鞍,將該鏈接交出去
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
} else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
} else {
yield();
}
}
//3. 將鏈接放入線程本地緩存ThreadList中
final List<Object> threadLocalList = threadList.get();
if (threadLocalList != null) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
鏈接借用流程
我們可以畫個圖簡單看下鏈接的借用過程
github項目地址:https://github.com/caychan/CCoding
求star