利用Zookeeper來實現(xiàn)分布式鎖,主要基于其臨時(或臨時有序)節(jié)點和watch機制.
為什么是臨時節(jié)點?
臨時節(jié)點的特性,在連接斷開的情況下節(jié)點能被刪除,所以即使客戶端發(fā)生故障鎖也能被釋放,避免死鎖的發(fā)生.
為什么是有序節(jié)點?
當然不用有序節(jié)點也是可以實現(xiàn)的.每個客戶端嘗試創(chuàng)建同一個臨時節(jié)點,創(chuàng)建者獲得鎖,創(chuàng)建失敗的客戶端監(jiān)聽這個鎖節(jié)點.
但是當客戶端太多的時候,會形成羊群效應,因為只有一個客戶端能獲取鎖,其他客戶端都因失敗而需要監(jiān)聽這個鎖節(jié)點的刪除事件,當獲得鎖的客戶端完成業(yè)務后釋放鎖即刪除這個鎖節(jié)點時,zk要給所有監(jiān)視的客戶端發(fā)送通知,這樣大量的消息通知可能會造成ZK的阻塞.
在這種場景下,更優(yōu)化的方式是使用有序節(jié)點.
每個未獲得鎖的客戶端只需要監(jiān)聽排在他前面的那個節(jié)點,每次節(jié)點刪除也只需要通知一個客戶端即可.
Curator(Zookeeper的Java客戶端)就是用的臨時有序節(jié)點和watch機制來實現(xiàn)分布式鎖的.
步驟如下(這里本來想畫一張圖的,但畫圖能力有限,還沒有畫出滿意的圖來)
- 每個客戶端基于節(jié)點/mylock創(chuàng)建臨時有序子節(jié)點/mylock/lock-,比如第一個創(chuàng)建的/mylock/lock-0000000000,第二個/mylock/lock-0000000001......
- 客戶端獲取/mylock節(jié)點的子節(jié)點列表并按升序排序,判斷自己創(chuàng)建的節(jié)點是否排在第一個.如果排在第一個則表示獲得鎖,否則監(jiān)聽前一個節(jié)點的刪除事件.
- 獲得鎖的客戶端進行業(yè)務處理.完成后刪除子節(jié)點,釋放鎖.監(jiān)聽該子節(jié)點的客戶端收到通知,嘗試獲取鎖.
針對上述步驟考慮幾個場景
場景1. 比如當前獲得鎖的節(jié)點是/mylock/lock-0000000000,而節(jié)點/mylock/lock-0000000001還沒有對/mylock/lock-0000000000設(shè)置好監(jiān)聽事件的時候/mylock/lock-0000000000節(jié)點刪除了
/mylock/lock-0000000001對應的客戶端對/mylock/lock-0000000000設(shè)置監(jiān)聽的時候,如果該節(jié)點刪除了會拋出一個NoNodeException異常;這個時候可以生吞這個異常重新嘗試獲取鎖.
場景2. 比如當前獲得鎖的節(jié)點是/mylock/lock-0000000000,而節(jié)點/mylock/lock-0000000005對應的客戶端突然宕機了,該節(jié)點被刪除;
這個時候創(chuàng)建/mylock/lock-0000000006節(jié)點的客戶端會收到節(jié)點刪除的通知,然后嘗試獲取鎖,發(fā)現(xiàn)自己獲取不到鎖,則監(jiān)聽/mylock/lock-0000000004子節(jié)點的刪除事件.
編碼實現(xiàn)分布式鎖
看到這才是實現(xiàn)分布式鎖的正確姿勢厘惦!這篇文章才知道原來Spring早就為我們提供了分布式鎖的實現(xiàn)了.不過其實也是用的Curator來實現(xiàn)的啦.看下依賴關(guān)系就知道啦.
重點看下類org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry
Spring封裝之后使用也非常簡單,大概步驟就是這樣:
Lock lock = zookeeperLockRegistry.obtain("lock-xh");
//tryLock()zk內(nèi)部實現(xiàn)的是一個超時接口
if (lock.tryLock()) {
//業(yè)務邏輯
lock.unlock();
}
我這里寫了一個Demo,代碼在github上 sb-learn-distributedlock-zk
模擬2個線程爭搶這個鎖lock-xh
,讓線程B先啟動,線程A休眠一段時間再啟動.然后可以得到線程B搶到了鎖.
通過zkCli.sh
查看鎖節(jié)點,發(fā)現(xiàn)在/SpringIntegration-LockRegistry/lock-xh
節(jié)點下創(chuàng)建了2個臨時有序節(jié)點.
[zk: 10.45.82.76(CONNECTED) 2] ls /SpringIntegration-LockRegistry/lock-xh
[_c_413a0764-4abe-4476-b241-a33f9a4af228-lock-0000000009, _c_11e2b7fb-ca26-4a4c-8832-b3d8e8b741de-lock-0000000008]
其實從這里也能看出來,他是利用了zk的臨時有序節(jié)點來實現(xiàn)的.兩個線程都到這個鎖節(jié)點下創(chuàng)建子節(jié)點.然后按照順序誰排前面誰就獲得了鎖.
注意
我開始編碼的時候,spring-boot-starter-parent
用了2.2.0.BUILD-SNAPSHOT
版本念搬,其依賴的curator
為4.0.1
版本,而我連的zk版本是3.4.11
.所以我測試的時候報錯了:
Caused by: org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /SpringIntegration-LockRegistry/lock-xh/_c_48572564-d1d3-4134-9491-b359d756acc2-lock-
at org.apache.zookeeper.KeeperException.create(KeeperException.java:103)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:1525)
at org.apache.curator.framework.imps.CreateBuilderImpl$17.call(CreateBuilderImpl.java:1181)
at org.apache.curator.framework.imps.CreateBuilderImpl$17.call(CreateBuilderImpl.java:1158)
at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
at org.apache.curator.framework.imps.CreateBuilderImpl.pathInForeground(CreateBuilderImpl.java:1155)
at org.apache.curator.framework.imps.CreateBuilderImpl.protectedPathInForeground(CreateBuilderImpl.java:605)
at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:595)
at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:573)
at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:49)
at org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver.createsTheLock(StandardLockInternalsDriver.java:54)
at org.apache.curator.framework.recipes.locks.LockInternals.attemptLock(LockInternals.java:225)
at org.apache.curator.framework.recipes.locks.InterProcessMutex.internalLock(InterProcessMutex.java:237)
at org.apache.curator.framework.recipes.locks.InterProcessMutex.acquire(InterProcessMutex.java:108)
at org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry$ZkLock.tryLock(ZookeeperLockRegistry.java:300)
... 3 more
是的!要注意服務端zk和客戶端Curator
版本的兼容性,具體請看這里ZooKeeper Version Compatibility
關(guān)于獲取鎖的邏輯,重點看下Curator
的org.apache.curator.framework.recipes.locks.LockInternals
,我這里用的Curator版本4.0.1
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
//省略其他代碼
//創(chuàng)建臨時有序節(jié)點 如果父節(jié)點沒有也同時創(chuàng)建
ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
//阻塞直到獲得鎖,或者等待時間過了退出或者線程中斷退出
hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
//省略其他代碼
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if(this.revocable.get() != null) {
((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
}
//不斷循環(huán)嘗試獲得鎖
while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
//獲取節(jié)點下的所有子節(jié)點并排序
List<String> children = this.getSortedChildren();
//ourPath如/SpringIntegration-LockRegistry/lock-xh/_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000
//basePath如/SpringIntegration-LockRegistry/lock-xh
//得到當前線程創(chuàng)建的有序節(jié)點名稱 比如_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000
String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
//檢查當前節(jié)點是否排在第一個,如果排在第一個則獲得鎖,如果沒有獲得鎖,則尋找需要監(jiān)視的節(jié)點(即有序節(jié)點列表中排在當前節(jié)點前面的那個節(jié)點)
PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
//獲得鎖啦
if(predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
//未獲得鎖,需要對前一個節(jié)點進行監(jiān)視
//得到前一個有序節(jié)點的qu路徑
String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
//設(shè)置監(jiān)視器; 這里有一種場景,即設(shè)置監(jiān)視器的時候可能上一個節(jié)點已經(jīng)被刪除了.對于這種情況,會拋出NoNodeException異常;
//下面直接生吞了這種異常.繼續(xù)循環(huán)嘗試獲得鎖.
//這里使用getData()接口而不是checkExists()是因為,如果前一個子節(jié)點已經(jīng)被刪除了那么會拋出異常而且不會設(shè)置事件監(jiān)聽器,
//而checkExists雖然也可以獲取到節(jié)點是否存在的信息但是同時設(shè)置了監(jiān)聽器,這個監(jiān)聽器其實永遠不會觸發(fā),對于zookeeper來說屬于資源泄露
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
if(millisToWait == null) {
this.wait();
} else {
millisToWait = Long.valueOf(millisToWait.longValue() - (System.currentTimeMillis() - startMillis));
startMillis = System.currentTimeMillis();
if(millisToWait.longValue() > 0L) {
this.wait(millisToWait.longValue());
} else {
doDelete = true;
break;
}
}
} catch (NoNodeException var19) {
;
}
}
}
}
} catch (Exception var21) {
ThreadUtils.checkInterrupted(var21);
doDelete = true;
throw var21;
} finally {
if(doDelete) {
this.deleteOurPath(ourPath);
}
}
return haveTheLock;
}
注意
上面關(guān)于鎖節(jié)點,可能會有點迷糊,為什么中間會有一串隨機數(shù)?
org.apache.curator.framework.imps.CreateBuilderImpl#adjustPath
比如/SpringIntegration-LockRegistry/lock-xh/_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000
是在/SpringIntegration-LockRegistry/lock-xh/lock-的基礎(chǔ)上調(diào)整而來的,分成路徑/SpringIntegration-LockRegistry/lock-xh和節(jié)點lock-
然后節(jié)點lock-前面拼接上"_c_" + protectedId + "-"
比如這里的protectedId=35957bd7-a9e9-4f6f-a9f3-c131b9c3734c
最后拼成的節(jié)點全路徑即:/SpringIntegration-LockRegistry/lock-xh/_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-
最后再創(chuàng)建有序節(jié)點的時候尾巴上補上了有序序列號0000000000
那么我們現(xiàn)在知道了這個節(jié)點的名稱的創(chuàng)建邏輯,那么既然這是串隨機數(shù),我們怎么能保證先創(chuàng)建的節(jié)點就能排在前面呢鸟蜡?
其實原因就在上面源碼中g(shù)etSortedChildren方法,里面排序的時候并不是按照整個節(jié)點名稱比如_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000來排序的,而是按照后面的有序序列號比如0000000000來排序的!
//獲取節(jié)點下的所有子節(jié)點并排序
List<String> children = this.getSortedChildren();
關(guān)于zk實現(xiàn)分布式鎖的學習資料
7 張圖講清楚ZooKeeper分布式鎖實現(xiàn)原理
這才是實現(xiàn)分布式鎖的正確姿勢!