HikariCP源碼簡潔剖析——ConcurrentBag

基于2.4版本

ConcurrentBag是什么

ConcurrentBag是HikariCP中實現(xiàn)的一個無鎖化集合,比JDK中的LinkedBlockingQueueLinkedTransferQueue的性能更好骇扇。借鑒了C#中的設(shè)計但金,作者在這篇文章中說提到的幾個點是:

  1. A lock-free design
  2. ThreadLocal caching
  3. Queue-stealing
  4. Direct hand-off optimizations

源碼剖析

設(shè)計目的

ConcurrentBag的類注釋如下:

This is a specialized concurrent bag that achieves superior performance to LinkedBlockingQueue and LinkedTransferQueue for the purposes of a connection pool. It uses ThreadLocal storage when possible to avoid locks, but resorts to scanning a common collection if there are no available items in the ThreadLocal list. Not-in-use items in the ThreadLocal lists can be "stolen" when the borrowing thread has none of its own. It is a "lock-less" implementation using a specialized AbstractQueuedLongSynchronizer to manage cross-thread signaling. Note that items that are "borrowed" from the bag are not actually removed from any collection, so garbage collection will not occur even if the reference is abandoned. Thus care must be taken to "requite" borrowed objects otherwise a memory leak will result. Only the "remove" method can completely remove an object from the bag

簡單翻譯一下:

ConcurrentBag是為追求鏈接池操作高性能而設(shè)計的并發(fā)工具盖淡。它使用ThreadLocal緩存來避免鎖爭搶冗恨,當(dāng)ThreadLocal中沒有可用的鏈接時會去公共集合中“借用”鏈接。ThreadLocal中處于Not-in-use狀態(tài)的鏈接也可能會“借走”。

ConcurrentBag使用AbstractQueuedLongSynchronizer來管理跨線程通信(實際新版本已經(jīng)刪掉了AbstractQueuedLongSynchronizer)狠持。

注意被“借走”的鏈接并沒有從任何集合中刪除甜刻,所以即使鏈接的引用被棄用也不會進行g(shù)c昭齐。所以要及時將被“借走”的鏈接歸還回來,否則可能會發(fā)生內(nèi)存泄露丧荐。只有remove方法才會真正將鏈接從ConcurrentBag中刪除车荔。

看下HikariCP中是如何實現(xiàn)ConcurrentBag的帽借。

源碼實現(xiàn)

類定義

public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable

ConcurrentBag只是實現(xiàn)了AutoCloseable接口巍举,而沒有實現(xiàn)ListMap等接口。其中的元素要集成IConcurrentBagEntry。我們看下IConcurrentBagEntry的定義:

public interface IConcurrentBagEntry
   {
      //定義鏈接的狀態(tài)
      int STATE_NOT_IN_USE = 0;
      int STATE_IN_USE = 1;
      int STATE_REMOVED = -1;
      int STATE_RESERVED = -2;

      //對鏈接狀態(tài)的操作
      boolean compareAndSet(int expectState, int newState);
      void setState(int newState);
      int getState();
   }

再看下類成員變量:


   //存放共享元素,線程安全的List
   private final CopyOnWriteArrayList<T> sharedList;
   //是否使用弱引用
   private final boolean weakThreadLocals;

   //線程本地緩存
   private final ThreadLocal<List<Object>> threadList;
   //添加元素的監(jiān)聽器,在HikariPool中實現(xiàn)
   private final IBagStateListener listener;
   //當(dāng)前等待獲取元素的線程數(shù)
   private final AtomicInteger waiters;
   //ConcurrentBag是否處于關(guān)于狀態(tài)
   private volatile boolean closed;

   //接力隊列
   private final SynchronousQueue<T> handoffQueue;

鏈接PoolEntry

在HikariCP中使用PoolEntry對鏈接實例Connection進行了封裝,記錄了Connection相關(guān)的數(shù)據(jù),如Connection實例惜犀、鏈接狀態(tài)莉御、當(dāng)前活躍會話迄薄、對鏈接池引用等人乓。

PoolEntry也是ConcurrentBag管理的對象,sharedListthreadList中保存的對象就是PoolEntry的實例。

/**
 * Entry used in the ConcurrentBag to track Connection instances.
 *
 * @author Brett Wooldridge
 */
final class PoolEntry implements IConcurrentBagEntry {
   //用來更新鏈接的狀態(tài)state
   private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater;
   //鏈接實例
   Connection connection;
   //鏈接狀態(tài),如STATE_IN_USE精续、STATE_NOT_IN_USE
   private volatile int state;
   //驅(qū)逐狀態(tài),刪除該鏈接時標(biāo)記為true
   private volatile boolean evict;
   //當(dāng)前打開的會話
   private final FastList<Statement> openStatements;
   //鏈接池引用
   private final HikariPool hikariPool;

   private final boolean isReadOnly;
   private final boolean isAutoCommit;
}

ConcurrentBag中的方法比較少,我們一個個看一下:

1. 增加鏈接

add方法很簡單菊匿,只是將新的鏈接放入sharedList中佩厚,如果有等待鏈接的線程思恐,則將鏈接給該線程基跑。

可以發(fā)現(xiàn)其實所有的鏈接都保存在sharedList中萄凤,ThreadList只是其中一部分漾月。

/**
 * Add a new object to the bag for others to borrow.
 *
 *@parambagEntryan object to add to the bag
 */
public void add(final T bagEntry) {
   if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
      throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
   }

    //將鏈接放入共享隊列
   sharedList.add(bagEntry);

   // spin until a thread takes it or none are waiting
   // 等待直到?jīng)]有waiter或有線程拿走它
   while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
       //yield什么都不做烛芬,只是為了讓渡CPU使用西采,避免長期占用
       yield();
   }
}

2. 獲取鏈接

鏈接獲取順序:

  1. 從線程本地緩存ThreadList中獲取境析,這里保持的是該線程之前使用過的鏈接
  2. 從共享集合sharedList中獲取,如果獲取不到派诬,會通知listener新建鏈接(但不一定真的會新建鏈接出來)
  3. handoffQueue中阻塞獲取劳淆,新建的鏈接或一些轉(zhuǎn)為可用的鏈接會放入該隊列中
   /**
    * The method will borrow a BagEntry from the bag, blocking for the
    * specified timeout if none are available.
    *
    * @param timeout how long to wait before giving up, in units of unit
    * @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
    * @return a borrowed instance from the bag or null if a timeout occurs
    * @throws InterruptedException if interrupted while waiting
    */
   public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
      // 先看是否能從ThreadList中拿到可用鏈接,這里的List通常為FastList
      List<Object> list = threadList.get();
      if (weakThreadLocals && list == null) {
         list = new ArrayList<>(16);
         threadList.set(list);
      }

      //1. 試從ThreadList中獲取鏈接默赂,倒序獲取
      for (int i = list.size() - 1; i >= 0; i--) {
         final Object entry = list.remove(i);
         @SuppressWarnings("unchecked")
         //獲取鏈接沛鸵,鏈接可能使用了弱引用
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
         //如果能夠獲取鏈接且鏈接可用,則將該鏈接的狀態(tài)從STATE_NOT_IN_USE置為STATE_IN_USE
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }

      //2. 如果ThreadList中沒有可用的鏈接缆八,則嘗試從共享集合中獲取鏈接
      final int waiting = waiters.incrementAndGet();
      try {
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               // If we may have stolen another waiter's connection, request another bag add.
               if (waiting > 1) {
                  //通知監(jiān)聽器添加鏈接
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }

         listener.addBagItem(waiting);

         //3. 嘗試從handoffQueue隊列中獲取谒臼。在等待時可能鏈接被新建或改為轉(zhuǎn)為可用狀態(tài)
         //SynchronousQueue是一種無容量的BlockingQueue,在poll時如果沒有元素耀里,則阻塞等待timeout時間
         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = CLOCK.currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= CLOCK.elapsedNanos(start);
         } while (timeout > 10_000);

         return null;
      }
      finally {
         waiters.decrementAndGet();
      }
   }

3. 歸還鏈接

歸還鏈接的順序:

  1. 將鏈接置為可用狀態(tài)STATE_NOT_IN_USE

  2. 如果有等待鏈接的線程蜈缤,則將該鏈接通過handoffQueue給出去

    由于該鏈接可能在當(dāng)前線程的threadList里,所以可以發(fā)現(xiàn)A線程的threadList中的鏈接可能被B線程使用

  3. 將它放入當(dāng)前線程的theadList中

    這里可以看出來threadList一開始是空的冯挎,當(dāng)線程從sharedList中借用了鏈接并使用完后底哥,會放入自己的緩存中

/**
    * This method will return a borrowed object to the bag.  Objects
    * that are borrowed from the bag but never "requited" will result
    * in a memory leak.
    *
    * @param bagEntry the value to return to the bag
    * @throws NullPointerException if value is null
    * @throws IllegalStateException if the bagEntry was not borrowed from the bag
    */
   public void requite(final T bagEntry) {
      //1. 將鏈接狀態(tài)改為STATE_NOT_IN_USE
      bagEntry.setState(STATE_NOT_IN_USE);

      //2. 如果有等待鏈接的線程咙鞍,將該鏈接交出去
      for (int i = 0; waiters.get() > 0; i++) {
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         } else if ((i & 0xff) == 0xff) {
            parkNanos(MICROSECONDS.toNanos(10));
         } else {
            yield();
         }
      }

      //3. 將鏈接放入線程本地緩存ThreadList中
      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList != null) {
          threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
   }

鏈接借用流程

我們可以畫個圖簡單看下鏈接的借用過程

鏈接借用流程

github項目地址:https://github.com/caychan/CCoding

求star

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市趾徽,隨后出現(xiàn)的幾起案子续滋,更是在濱河造成了極大的恐慌,老刑警劉巖孵奶,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件疲酌,死亡現(xiàn)場離奇詭異,居然都是意外死亡了袁,警方通過查閱死者的電腦和手機朗恳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來载绿,“玉大人粥诫,你說我怎么就攤上這事≌赣梗” “怎么了怀浆?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長怕享。 經(jīng)常有香客問我执赡,道長,這世上最難降的妖魔是什么函筋? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任沙合,我火速辦了婚禮,結(jié)果婚禮上驻呐,老公的妹妹穿的比我還像新娘灌诅。我一直安慰自己芳来,他們只是感情好含末,可當(dāng)我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著即舌,像睡著了一般佣盒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上顽聂,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天肥惭,我揣著相機與錄音,去河邊找鬼紊搪。 笑死蜜葱,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的耀石。 我是一名探鬼主播牵囤,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了揭鳞?” 一聲冷哼從身側(cè)響起炕贵,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎野崇,沒想到半個月后称开,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡乓梨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年鳖轰,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片督禽。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡脆霎,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出狈惫,到底是詐尸還是另有隱情睛蛛,我是刑警寧澤,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布胧谈,位于F島的核電站忆肾,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏菱肖。R本人自食惡果不足惜客冈,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望稳强。 院中可真熱鬧场仲,春花似錦、人聲如沸退疫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽褒繁。三九已至亦鳞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間棒坏,已是汗流浹背燕差。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留坝冕,地道東北人徒探。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像喂窟,于是被迫代替她去往敵國和親测暗。 傳聞我的和親對象是個殘疾皇子吵血,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內(nèi)容