背景
在zookeeper服務(wù)端中需要管理session和connection這兩類具有超時(shí)屬性的對(duì)象。zookeeper提供了ExpiryQueue來實(shí)現(xiàn)通用對(duì)象超時(shí)管理容器孕锄。
實(shí)現(xiàn)
我拿connection來說如何管理他的超時(shí)問題,不同的連接超時(shí)的時(shí)間點(diǎn)是不同的
那么zookeeper是如何高效的管理這些不同的超時(shí)時(shí)間點(diǎn)的呢?
在ExpiryQueue有個(gè)expirationInterval屬性真慢,zookeeper會(huì)使用expirationInterval作為基準(zhǔn)把每一個(gè)連接的超時(shí)時(shí)間點(diǎn)歸一化為expirationInterval整數(shù)倍服爷,歸一化的計(jì)算方式為
我們拿上一個(gè)圖為例子药薯,通過歸一化處理connection_1,connection_2,connection_3這三個(gè)連接的超時(shí)時(shí)間點(diǎn)可能會(huì)變成同一個(gè)
假定expirationInterval = 10000您朽,
connection_1_timeout_point = 1599715479084
connection_2_timeout_point = 1599715479184
connection_3_timeout_point = 1599715479384
那么通過歸一化計(jì)算他們?nèi)齻€(gè)的超時(shí)時(shí)間點(diǎn)都變成了1599715480000
我們現(xiàn)在看下ExpiryQueue兩個(gè)重要的屬性
//記錄了每一個(gè)對(duì)象的歸一化后的超時(shí)時(shí)間點(diǎn)咪橙,key是被管理的對(duì)象,value是超時(shí)時(shí)間點(diǎn)
private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
//存儲(chǔ)了相同超時(shí)時(shí)間點(diǎn)的所有對(duì)象,key是超時(shí)時(shí)間點(diǎn)美侦,value是相同超時(shí)時(shí)間點(diǎn)的對(duì)象集合
private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
超時(shí)管理線程
有了超時(shí)對(duì)象管理容器,還需要相應(yīng)的超時(shí)管理線程來監(jiān)控容器中對(duì)象的狀態(tài)魂奥,超時(shí)管理線程也會(huì)按照expirationInterval為時(shí)間間隔單位來運(yùn)行菠剩,它每次運(yùn)行的時(shí)間點(diǎn)記錄在ExpiryQueue的nextExpirationTime屬性中,nextExpirationTime初始化值由
now是ExpiryQueue創(chuàng)建時(shí)的系統(tǒng)時(shí)間點(diǎn)
nextExpirationTime之后每次更新的值通過下面公式得到
可以看出nextExpirationTime和歸一化后的connection超時(shí)時(shí)間點(diǎn)是一致的耻煤,都是expirationInterval的倍數(shù)
超時(shí)管理線程處理超時(shí)對(duì)象
超時(shí)管理線程會(huì)每隔expirationInterval去ExpiryQueue中獲取超時(shí)對(duì)象具壮,具體實(shí)現(xiàn)是ExpiryQueue.poll方法
public Set<E> poll() {
long now = Time.currentElapsedTime();
long expirationTime = nextExpirationTime.get();
if (now < expirationTime) {
return Collections.emptySet();
}
Set<E> set = null;
//更新nextExpirationTime = nextExpirationTime + expirationInterval
long newExpirationTime = expirationTime + expirationInterval;
if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) {
//獲取expirationTime超時(shí)時(shí)間點(diǎn)對(duì)應(yīng)的超時(shí)對(duì)象集合
set = expiryMap.remove(expirationTime);
}
if (set == null) {
return Collections.emptySet();
}
//返回在本次expirationTime超時(shí)時(shí)間點(diǎn)超時(shí)的對(duì)象
return set;
}
每次在超時(shí)時(shí)間點(diǎn)獲取超時(shí)對(duì)象之后,超時(shí)管理線程可以根據(jù)超時(shí)對(duì)象的不同業(yè)務(wù)特性做不同的業(yè)務(wù)邏輯
超時(shí)對(duì)象的更新
超時(shí)對(duì)象在放入到ExpiryQueue中之后哈蝇,這些對(duì)象會(huì)根據(jù)自己的特性(比如對(duì)于連接對(duì)象來說當(dāng)連接上發(fā)生IO事件棺妓,那么就需要更新連接超時(shí)時(shí)間點(diǎn))更新自己的超時(shí)時(shí)間點(diǎn),具體的實(shí)現(xiàn)邏輯是在ExpiryQueue.update方法中
//timeout是超時(shí)對(duì)象的超時(shí)時(shí)間(或者說存活時(shí)長(zhǎng))
public Long update(E elem, int timeout) {
//通過elemMap獲取超時(shí)對(duì)象之前的超時(shí)時(shí)間點(diǎn)
Long prevExpiryTime = elemMap.get(elem);
long now = Time.currentElapsedTime();
//通過歸一化方法獲取超時(shí)對(duì)象新的超時(shí)時(shí)間點(diǎn)
Long newExpiryTime = roundToNextInterval(now + timeout);
//如果新超時(shí)時(shí)間點(diǎn)和老超時(shí)時(shí)間點(diǎn)一樣那么不做任何處理
if (newExpiryTime.equals(prevExpiryTime)) {
// No change, so nothing to update
return null;
}
// First add the elem to the new expiry time bucket in expiryMap.
//使用新超時(shí)時(shí)間點(diǎn)從expiryMap獲取所有在新超時(shí)時(shí)間點(diǎn)超時(shí)的對(duì)象集合
Set<E> set = expiryMap.get(newExpiryTime);
if (set == null) {
// Construct a ConcurrentHashSet using a ConcurrentHashMap
//如果超時(shí)對(duì)象集合為空炮赦,那么創(chuàng)建一個(gè)
set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
// Put the new set in the map, but only if another thread
// hasn't beaten us to it
//并發(fā)的情況下可能會(huì)出現(xiàn)多個(gè)線程同時(shí)創(chuàng)建相同超時(shí)時(shí)間點(diǎn)對(duì)象集合怜跑,所以需要做如下是否存在判斷處理
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
if (existingSet != null) {
set = existingSet;
}
}
//把本超時(shí)對(duì)象加入集合
set.add(elem);
// Map the elem to the new expiry time. If a different previous
// mapping was present, clean up the previous expiry bucket.
//同時(shí)更新超時(shí)對(duì)象在elemMap中新的超時(shí)時(shí)間點(diǎn)
prevExpiryTime = elemMap.put(elem, newExpiryTime);
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
//根據(jù)超時(shí)對(duì)象上一個(gè)超時(shí)時(shí)間點(diǎn)從expiryMap對(duì)應(yīng)的超時(shí)對(duì)象集合中把本超時(shí)對(duì)象刪除
Set<E> prevSet = expiryMap.get(prevExpiryTime);
if (prevSet != null) {
prevSet.remove(elem);
}
}
return newExpiryTime;
}
上面從源碼的角度分析了zookeeper是如何實(shí)現(xiàn)超時(shí)對(duì)象管理的,關(guān)于這一塊的理解強(qiáng)烈推薦大家看《從 Paxos 到 ZooKeeper:分布式一致性原理與實(shí)踐》這本書吠勘,這本書寫的太棒了