1 概述
應(yīng)用程序建立與數(shù)據(jù)庫(kù)的連接其實(shí)是一項(xiàng)開銷很大的工作式塌,其中涉及網(wǎng)絡(luò)連接的建立博敬、會(huì)話的建立、數(shù)據(jù)庫(kù)端與應(yīng)用程序的適配等諸多操作峰尝。因此偏窝,大部分情況下我們會(huì)選擇將數(shù)據(jù)庫(kù)連接進(jìn)行池化管理。
連接池基本的思想是在系統(tǒng)初始化的時(shí)候,將數(shù)據(jù)庫(kù)連接作為對(duì)象存儲(chǔ)在內(nèi)存中囚枪,當(dāng)用戶需要訪問(wèn)數(shù)據(jù)庫(kù)時(shí),并非建立一個(gè)新的連接劳淆,而是從連接池中取出一個(gè)已建立的空閑連接對(duì)象链沼。使用完畢后,用戶也并非將連接關(guān)閉沛鸵,而是將連接放回連接池中括勺,以供下一個(gè)請(qǐng)求訪問(wèn)使用。而連接的建立曲掰、斷開都由連接池自身來(lái)管理疾捍。
同時(shí),還可以通過(guò)設(shè)置連接池的參數(shù)來(lái)控制連接池中的初始連接數(shù)栏妖、連接的上下限數(shù)以及每個(gè)連接的最大使用次數(shù)乱豆、最大空閑時(shí)間等等。也可以通過(guò)其自身的管理機(jī)制來(lái)監(jiān)視數(shù)據(jù)庫(kù)連接的數(shù)量吊趾、使用情況等宛裕。此外,大部分?jǐn)?shù)據(jù)庫(kù)連接池還提供了不同SQL Dialect的適配论泛、查詢緩存揩尸、性能監(jiān)控、插件擴(kuò)展等特性屁奏,進(jìn)一步豐富了數(shù)據(jù)庫(kù)連接池的功能岩榆。
2 整體架構(gòu)
HikariCP與Druid同屬于第二代連接池,但前者代碼與結(jié)構(gòu)極其精簡(jiǎn)坟瓢。只需要從其核心類HikariPool入手勇边,就可以把整體架構(gòu)梳理出來(lái)。HikariCP啟動(dòng)時(shí)首先根據(jù)用戶配置創(chuàng)建HikariConfig類折联,然后通過(guò)JDBC驅(qū)動(dòng)加載DataSource粥诫,加載完成后根據(jù)配置初始化連接池,然后創(chuàng)建連接崭庸。連接的創(chuàng)建回收都是通過(guò)獨(dú)立的線程池來(lái)異步處理的怀浆,同時(shí)還是用一個(gè)定時(shí)線程池來(lái)處理連接泄漏和數(shù)據(jù)監(jiān)控統(tǒng)計(jì)的任務(wù)。所有的連接以PoolEntry的形式存儲(chǔ)在ConcurrentBag中怕享,每個(gè)PoolEntry對(duì)應(yīng)一個(gè)被HikariCP代理的JDBC連接执赡。
3 連接管理
3.1 申請(qǐng)連接
HikariPool負(fù)責(zé)對(duì)資源連接進(jìn)行管理,而ConcurrentBag則是作為物理連接的共享資源站函筋,PoolEntry則是對(duì)物理連接的一對(duì)一封裝沙合。PoolEntry通過(guò)borrow方法從bag中取出,之后通過(guò)PoolEntry.createProxyConnection調(diào)用工廠類生成HikariProxyConnection返回跌帐。
3.2 歸還連接
HikariProxyConnection調(diào)用close方法時(shí)實(shí)際上通過(guò)代理調(diào)用了PooleEntry的recycle方法首懈,之后通過(guò)HikariPool調(diào)用了ConcurrentBag的requite放回绊率。(poolEntry通過(guò)borrow從bag中取出,再通過(guò)requite放回究履。資源成功回收)滤否。
3.3 創(chuàng)建連接
HikariCP中通過(guò)獨(dú)立的線程池addConnectionExecutor進(jìn)行新連接的生成,連接生成方法為PoolEntryCreator最仑。物理鏈接的生成只由PoolBase的newConnection()實(shí)現(xiàn)藐俺,之后封裝成PoolEntry,通過(guò)Bag的add方法加入ConcurrentBag泥彤。當(dāng)ConcurrentBag存在等待線程欲芹,或者有連接被關(guān)閉時(shí),會(huì)觸發(fā)IBagItemListener的addBagItem(wait)方法吟吝,調(diào)用PoolEntryCreator進(jìn)行新連接的生成菱父。
3.4 回收連接
closeConnectionExecutor關(guān)閉連接后,會(huì)調(diào)用fillPool()方法對(duì)連接池進(jìn)行連接填充剑逃。同時(shí)HikariPool提供evictConnection(Connection)方法對(duì)物理連接進(jìn)行手動(dòng)關(guān)閉滞伟。雖然連接池提供了直接回收連接的接口,但對(duì)于開發(fā)者來(lái)說(shuō)一般不需要顯示調(diào)用炕贵,只有連接本身狀態(tài)異嘲鹉危或者連接池shutdown的時(shí)候才需要回收連接。
4 數(shù)據(jù)結(jié)構(gòu)
4.1 ConcurrentBag
ConcurrentBag內(nèi)部同時(shí)使用了ThreadLocal和CopyOnWriteArrayList來(lái)存儲(chǔ)元素称开,其中CopyOnWriteArrayList是線程共享的亩钟。ConcurrentBag采用了queue-stealing的機(jī)制獲取元素:首先嘗試從ThreadLocal中獲取屬于當(dāng)前線程的元素來(lái)避免鎖競(jìng)爭(zhēng),如果沒(méi)有可用元素則掃描公共集合鳖轰、再次從共享的CopyOnWriteArrayList中獲取清酥。(ThreadLocal列表中沒(méi)有被使用的items在借用線程沒(méi)有屬于自己的時(shí)候,是可以被“竊取”的)
ThreadLocal和CopyOnWriteArrayList在ConcurrentBag中都是成員變量蕴侣,線程間不共享焰轻,避免了偽共享(false sharing)的發(fā)生。
//負(fù)責(zé)存放ConcurrentBag中全部用于出借的資源
private final CopyOnWriteArrayList<T> sharedList;
//是否使用弱引用的標(biāo)志位
private final boolean weakThreadLocals;
//用于加速線程本地化資源訪問(wèn)
private final ThreadLocal<List<Object>> threadList;
//任務(wù)竊取時(shí)的事件監(jiān)聽器
private final IBagStateListener listener;
//等待資源交接的線程計(jì)數(shù)器
private final AtomicInteger waiters;
private volatile boolean closed;
//用于存在資源等待線程時(shí)的第一手資源交接
private final SynchronousQueue<T> handoffQueue;
ConcurrentBag的添加和刪除方法比較簡(jiǎn)單昆雀,直接對(duì)sharedList進(jìn)行添加操作辱志,同時(shí)嘗試通過(guò)handoffQueue交接新添加的Connection;而刪除時(shí)則先CAS修改Connection的狀態(tài)狞膘,如果操作成功才會(huì)移除揩懒。
public void add(final T bagEntry)
{
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);//新添加的資源優(yōu)先放入CopyOnWriteArrayList
// 自旋等待直到將資源交到某個(gè)等待線程后才返回(SynchronousQueue)
while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
yield();
}
}
public boolean remove(final T bagEntry)
{
// 如果資源正在使用且無(wú)法進(jìn)行狀態(tài)切換,則返回失敗
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
final boolean removed = sharedList.remove(bagEntry);// 從CopyOnWriteArrayList中移出
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
return removed;
}
ConcurrentBag中通過(guò)borrow方法進(jìn)行數(shù)據(jù)資源借用挽封,通過(guò)requite方法進(jìn)行資源回收已球,注意其中borrow方法只提供對(duì)象引用,不移除對(duì)象。所以從bag中“借用”的items實(shí)際上并沒(méi)有從任何集合中刪除智亮,因此即使引用廢棄了忆某,垃圾收集也不會(huì)發(fā)生。因此使用時(shí)通過(guò)borrow取出的對(duì)象必須通過(guò)requite方法進(jìn)行放回阔蛉,否則會(huì)導(dǎo)致內(nèi)存泄露弃舒,只有”remove”方法才能完全從bag中刪除一個(gè)對(duì)象。
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// 優(yōu)先查看有沒(méi)有可用的本地化的資源
final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
final int waiting = waiters.incrementAndGet();
try {
// 當(dāng)無(wú)可用本地化資源時(shí)馍忽,遍歷全部資源,查看是否存在可用資源
// 因此被一個(gè)線程本地化的資源也可能被另一個(gè)線程“搶走”
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
if (waiting > 1) {
// 因?yàn)榭赡堋皳屪摺绷似渌€程的資源燕差,因此提醒包裹進(jìn)行資源添加
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
listener.addBagItem(waiting);
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
// 當(dāng)現(xiàn)有全部資源全部在使用中遭笋,等待一個(gè)被釋放的資源或者一個(gè)新資源
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
waiters.decrementAndGet();
}
}
public void requite(final T bagEntry)
{
// 將狀態(tài)轉(zhuǎn)為未在使用
bagEntry.setState(STATE_NOT_IN_USE);
// 判斷是否存在等待線程,若存在徒探,則直接轉(zhuǎn)手資源
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
yield();
}
}
// 否則瓦呼,進(jìn)行資源本地化
final List<Object> threadLocalList = threadList.get();
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
4.2 FastList
FastList是一個(gè)List接口的精簡(jiǎn)實(shí)現(xiàn),只實(shí)現(xiàn)了接口中必要的幾個(gè)方法测暗。JDK ArrayList每次調(diào)用get()方法時(shí)都會(huì)進(jìn)行rangeCheck檢查索引是否越界央串,F(xiàn)astList的實(shí)現(xiàn)中去除了這一檢查,只要保證索引合法那么rangeCheck就成為了不必要的計(jì)算開銷(當(dāng)然開銷極小)碗啄。此外质和,HikariCP使用List來(lái)保存打開的Statement,當(dāng)Statement關(guān)閉或Connection關(guān)閉時(shí)需要將對(duì)應(yīng)的Statement從List中移除稚字。通常情況下饲宿,同一個(gè)Connection創(chuàng)建了多個(gè)Statement時(shí),后打開的Statement會(huì)先關(guān)閉胆描。ArrayList的remove(Object)方法是從頭開始遍歷數(shù)組瘫想,而FastList是從數(shù)組的尾部開始遍歷,因此更為高效昌讲。
@Override
public T get(int index)
{
return elementData[index];
}
@Override
public boolean add(T element)
{
if (size < elementData.length) {
elementData[size++] = element;
} else {
// 容量溢出時(shí)進(jìn)行擴(kuò)容国夜,創(chuàng)建一個(gè)新的2倍容量數(shù)組然后淺拷貝過(guò)去,最后插入新元素
final int oldCapacity = elementData.length;
final int newCapacity = oldCapacity << 1;
@SuppressWarnings("unchecked")
final T[] newElementData = (T[]) Array.newInstance(clazz, newCapacity);
System.arraycopy(elementData, 0, newElementData, 0, oldCapacity);
newElementData[size++] = element;
elementData = newElementData;
}
return true;
}
@Override
public boolean remove(Object element)
{
for (int index = size - 1; index >= 0; index--) {
if (element == elementData[index]) {
final int numMoved = size - index - 1;
if (numMoved > 0) {
System.arraycopy(elementData, index + 1, elementData, index, numMoved);
}
elementData[--size] = null;
return true;
}
}
return false;
}
4.3 SuspendResumeLock
該類內(nèi)部有一個(gè)靜態(tài)常量鎖FAUX_LOCK短绸,基于Semaphore封裝车吹,默認(rèn)10000個(gè)令牌。HikariCP連接池初始化時(shí)會(huì)根據(jù)isAllowPoolSuspension來(lái)選擇為新建立的連接池新建一個(gè)鎖實(shí)例(用于實(shí)現(xiàn)連接池掛起)還是共享FAUX_LOCK醋闭。
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
private static final int MAX_PERMITS = 10000;
private final Semaphore acquisitionSemaphore;
public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false)
5 總結(jié)
連接池給開發(fā)人員使用數(shù)據(jù)庫(kù)帶來(lái)了性能和開發(fā)效率兩個(gè)方面的提升礼搁,但是也使得某些問(wèn)題變得更復(fù)雜,出現(xiàn)故障時(shí)更難定位目尖。比如數(shù)據(jù)庫(kù)局部變量聲明馒吴、事務(wù)的作用域、Prepared Statement管理、緩存管理饮戳、多線程復(fù)用連接導(dǎo)致的線程安全問(wèn)題等豪治。