使用過zookeeper的都知道贴硫,當我們使用zookeeper創(chuàng)建一個節(jié)點時,我們能選擇節(jié)點的類型是“臨時節(jié)點”還是“永久節(jié)點”伊者。臨時節(jié)點和永久節(jié)點的區(qū)別是英遭,臨時節(jié)點會在客戶端斷開連接時被刪除,而永久節(jié)點無論客戶端是否斷開連接亦渗,都會保留挖诸。
臨時節(jié)點非常重要,我們經常利用它來實現分布式鎖法精、選舉等等
而為了實現臨時節(jié)點的功能税灌,zookeeper服務端就勢必要有一套高效的session管理機制,它能實現如下功能:當客戶端session失效后亿虽,服務端能感知到,隨后刪掉客戶端當前創(chuàng)建的臨時節(jié)點苞也,并通知給其他的客戶端洛勉。這篇文章會深入探討zookeeper的session管理機制。
zookeeper的心跳機制
為什么要有心跳機制
zookeeper底層支持兩種網絡庫如迟,一種是zookeeper基于NIO自己寫的收毫,一種是Netty。那么zookeeper能不能直接通過感知TCP連接是否斷開來感知客戶端連接是否斷開呢殷勘?
答案是不能此再,原因有很多,個人覺得最重要的一點是玲销,基于TCP連接來判斷客戶端是否存活是不靠譜的输拇。
這里舉兩個異常case
1、客戶端進程直接crash了贤斜,還沒來得及發(fā)送FIN報文策吠,這種情況下逛裤,zookeeper在TCP這一側是沒辦法及時感知到TCP連接已經失效了。(也就是說猴抹,由于zookeeper沒收到client的FIN包带族,雖然client已經掛了,TCP側還認為客戶端還活著蟀给。只能等弱雞keepalive了)
2蝙砌、客戶端和服務端之間很久沒有報文交互,TCP連接其實已經失效了跋理。(失效原因很多择克,比如路由器出問題了,網絡設備故障了等等)這時候薪介,如果客戶端發(fā)送報文給服務端祠饺,linux會進行重試,默認差不多要重試15分鐘汁政,才能感知到這個連接已經失效道偷。
這里只是舉個例子丸相。實際來說藕届,client和zookeeper的報文交互可能不會那么少。也就是第二種情況浓镜,客戶端很久不給服務端發(fā)報文目木,要發(fā)了换途,才發(fā)現tcp連接已經有問題了,這種情況不太可能出現刽射。(當然军拟,選舉這種場景是有可能出現這種情況的,后面會單獨寫文章分析這個case)
但是從上面兩個例子誓禁,我們也不難得出結論懈息,通過tcp連接的斷開與否來感知客戶端是否存活,似乎并不太靠譜摹恰。
client發(fā)送心跳
因為tcp的“不靠譜”辫继,zookeeper為了能夠實現可靠的連接管理(也為了保活)俗慈,選擇自己實現心跳機制姑宽。
正如上圖所示,client隔一段時間就會發(fā)送一個心跳報文給服務端闺阱,告訴zookeeper自己還活著炮车,別把我連接關了。
注意:這里亂入了一個create報文,因為正常報文也算是一種“心跳”示血。反正棋傍,zookeeper只要能收到報文,就能知道客戶端還活著难审。
服務端收到報文后瘫拣,會更新session信息
這里帶大家看看代碼。很簡單告喊,收到報文后麸拄,會調用 SessionTracker.touchSession()
來更新session信息
sessionId是zookeeper為每一個連接分配的唯一id
管理session的難點在哪
zookeeper 管理session的需求分析
這里我們思考下zookeeper對于session管理的需求是什么?
1黔姜、zookeeper需要有個地方存session
2拢切、當客戶端一段時間沒發(fā)生心跳時,zookeeper要能感知到
第一個問題比較簡單秆吵,java提供了各式各樣的集合淮椰,無論是Map或者是List理論上都能存session。由于我們一般是通過sessionId來找到關聯的Session的纳寂,因此使用Map更合適點主穗。Zookeeper也是這么做的
protected final ConcurrentHashMap<Long, SessionImpl> sessionsById
這里sessionsById 就是zookeeper管理Session使用的容器
第二個問題看起來也很簡單,客戶端不是會發(fā)送心跳么毙芜?我給每一個session記錄一個上一個報文到達時間忽媒,一旦收到新的報文,我就更新這個時間腋粥。然后我再不斷地掃描每個session是否很久沒收到報文不就行了晦雨?
如何檢測Session失效?
按照上面的說法隘冲,收到一個報文闹瞧,我就更新Session的“上次報文”字段。假設session失效時間是4秒展辞,我就每隔4秒掃描一次session的集合夹抗,找出那個超過4秒沒有新的報文的session不就行了?
像上圖一樣纵竖,假設sessionTimeout是4秒,現在已經11點50分05秒了杏愤,理論上每個session上一個報文時間應該大于11點50分01秒靡砌。很明顯,session1失效了珊楼。
定時任務的選型
既然要定時掃描通殃,我們就需要跑一個定時任務。jdk本身也提供了很多的定時任務方案。不知道定時任務的同學可以參考這篇文章Java中定時任務的6種實現方式画舌,你知道幾種堕担? - 掘金
既然如此,我們完全可以使用jdk自帶的定時任務曲聂,定時去掃描這個集合啊霹购,這樣不就能很輕易的找到失效的session了么?
這里有兩個問題:
1朋腋、每一個客戶端和服務端的連接齐疙,sessionTimeout都是可配的。我們例子是4秒沒收到報文旭咽,就認為連接失效贞奋。實際超時時間可能有1秒、2秒穷绵、3秒.... 所以如果用定時任務來實現轿塔,我們可能需要啟動不止一個定時任務。
2仲墨、jdk提供的定時任務不夠靈活勾缭,什么意思呢。比如我設置的sessionTimeout是4秒宗收,現在是11點漫拭。然后我在11點00分02秒就收到了一個心跳,那么下次檢測時間應該變成11點00分06秒混稽。而jdk的定時任務限定了采驻,只能每隔4秒檢測一次。比如:11點匈勋、11點00分04秒礼旅、11點00分08秒、這樣檢測下去洽洁。而如果用jdk的定時任務痘系,我們只能簡單的隔一段時間,檢測一次饿自。這里可以仔細體會下兩者的差異汰翠。
想一想這些問題,是不是發(fā)現想實現一個高效的session管理機制是不是沒那么簡單昭雌。接下來我們看看复唤,zookeeper是如何巧妙地實現session的管理。
zookeeper session管理機制
接下來就要介紹zookeeper的session管理機制了烛卧。
1佛纫、通過expiryMap存儲過期時間與session集合的對應關系
首先,zookeeper內部有一個expiryMap
非常簡單,Key是過期時間呈宇,value是一個Set好爬,里面放了一個個的Session。
舉個例子:S1在11點50分02秒的key下面甥啄,表示如果11點50分02秒前沒收到新的報文存炮,就認為S1過期了。
2型豁、當收到某條連接的報文時僵蛛,更新expiryMap
拿上圖的S1為例,它的sessionTimeout是4秒迎变,在11點50分01秒收到報文充尉。那么理論上下個session檢測時間會是 11點50分05秒。
這里要說下expiryMap的第一個特征衣形,它的key并不是隨意一個時間驼侠。它會間隔一個固定的時間叫做 expirationInterval
,數值上它等于zookeeper的配置tickTime
(默認配的2秒)
所以說谆吴,這里計算出11點50分05秒后倒源,它會round下,round到11點50分06秒
如圖所示:
S1在截止時間前更新了session句狼,我們就要把它從舊的桶里移除笋熬,挪到新的桶里。
3腻菇、循環(huán)檢測ExpiryMap
有個SessionTracker
線程會循環(huán)檢測這個expiryMap
胳螟,找到最近的那個key對應的session集合,把他們全部都過期掉筹吐。
就像上面的例子糖耸,一旦時間到了11點50分02秒,就把對應的session全部過期掉丘薛。
小結下
1嘉竟、如果收到報文,會把session放到下一個過期桶里洋侨。
2舍扰、SessionTracker會按次序,不斷地取出過期的桶希坚,把桶里的session全部過期掉(過期會刪除臨時節(jié)點边苹,當然還有其他一系列操作)
3、zookeeper底層使用了非常簡單的Map就實現了非常高效的Session管理機制吏够。
session管理機制源碼分析
接下來我們來看看源碼
1、server端的心跳續(xù)約
//org.apache.zookeeper.server.ExpiryQueue#update
public Long update(E elem, int timeout) {
//1、除了上面我們介紹的ExpiryMap锅知,zookeeper內部還有一個elemMap播急,用于存放 Session -> 過期時間
Long prevExpiryTime = elemMap.get(elem);
long now = Time.currentElapsedTime();
//2、收到心跳后售睹,我們會計算session應該更新到哪個桶里
Long newExpiryTime = roundToNextInterval(now + timeout);
//桶不變桩警,就不用更新expiryMap了
if (newExpiryTime.equals(prevExpiryTime)) {
// No change, so nothing to update
return null;
}
// First add the elem to the new expiry time bucket in expiryMap.
//3. 找到新的桶,插入進去
Set<E> set = expiryMap.get(newExpiryTime);
if (set == null) {
// Construct a ConcurrentHashSet using a ConcurrentHashMap
set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
// Put the new set in the map, but only if another thread
// hasn't beaten us to it
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
if (existingSet != null) {
set = existingSet;
}
}
set.add(elem);
// Map the elem to the new expiry time. If a different previous
// mapping was present, clean up the previous expiry bucket.
prevExpiryTime = elemMap.put(elem, newExpiryTime);
//4. 從舊的桶里移除
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
Set<E> prevSet = expiryMap.get(prevExpiryTime);
if (prevSet != null) {
prevSet.remove(elem);
}
}
return newExpiryTime;
}
[圖片上傳失敗...(image-6f8444-1704619490775)]
其實就是這幅圖
1昌妹、放入一個:Session -> 過期時間 的Map中
2捶枢、收到心跳后,我們會計算session應該更新到哪個桶里
3飞崖、找到新的桶烂叔,插入進去
4、把session從舊的桶里移除
2固歪、SessionTracker不斷地輪訓蒜鸡,找到過期的Session集合,然后都過期掉
//org.apache.zookeeper.server.SessionTrackerImpl#run
@Override
public void run() {
try {
while (running) {
//1. 這個其實就是不斷地輪訓下一個要檢測的key
// 比如按我們的例子牢裳,應該是11點50分02秒檢測一次逢防、11點50分04秒檢測一次、11點50分06秒檢測一次...
// 這里的waitTime就是找到下次檢測需要等待多久蒲讯,比如現在是11點50分01秒了忘朝,這個waitTime就是1秒
// 如果是11點50分02秒了,waitTime就是0判帮,我們要開始把過期的session都失效掉了
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
//2. 取出過期的Session集合局嘁,全部都expire掉,如果session都及時發(fā)送了心跳了脊另,這里就會拿到一個空的集合
for (SessionImpl s : sessionExpiryQueue.poll()) {
ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
其實就是這個圖