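1. Distributed locks
When we develop a single-machine application and need to synchronize concurrent code, we usually rely on synchronized or Lock to coordinate threads. Once the application is deployed in a distributed fashion, however, we need a higher-level locking mechanism that synchronizes code across processes. That brings us to the distributed lock implementations in common use today, shown in the figure below:
Each of these implementations has its own strengths and weaknesses, compared as follows: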
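2. Introduction to Curator's distributed locks
Today we focus on the ZooKeeper-based distributed lock solution, Curator. As you get to know Curator, you will be pleasantly surprised to find that it offers much more than a distributed lock implementation. I won't cover the rest here; I'll record it in separate blog posts. Interested readers can download the project and read it for themselves: apache/curator
For now, let's look at the lock types Curator provides:
- InterProcessMutex: distributed re-entrant exclusive lock
- InterProcessSemaphoreMutex: distributed exclusive lock
- InterProcessReadWriteLock: distributed read-write lock
- InterProcessMultiLock: a container that manages multiple locks as a single entity
Next, we take InterProcessMutex as the example and walk through how this distributed re-entrant exclusive lock is implemented.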
3. Tracing the InterProcessMutex code
I. Acquiring the lock
1). Instantiating InterProcessMutex:
// Stepping into: InterProcessMutex.java
/**
 * @param client client
 * @param path the path to lock
 */
public InterProcessMutex(CuratorFramework client, String path)
{
    this(client, path, new StandardLockInternalsDriver());
}
/**
 * @param client client
 * @param path the path to lock
 * @param driver lock driver
 */
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
{
    this(client, path, LOCK_NAME, 1, driver);
}
Parameters shared by both constructors:
- client: the ZooKeeper client implemented by Curator
- path: the ZooKeeper path to lock, i.e. the parent node of the ephemeral nodes created later
Looking at the two constructors above, the first simply calls the second, passing in a default StandardLockInternalsDriver object, the standard lock driver class (its role is covered later). In other words, InterProcessMutex also lets you pass in your own lock driver class as an extension point.
// Stepping into: InterProcessMutex.java
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
    basePath = PathUtils.validatePath(path);
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
// Stepping into: LockInternals.java
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases)
{
    this.driver = driver;
    this.lockName = lockName;
    this.maxLeases = maxLeases;
    this.client = client.newWatcherRemoveCuratorFramework();
    this.basePath = PathUtils.validatePath(path);
    this.path = ZKPaths.makePath(path, lockName);
}
Following the constructor code to the end, it does two things: it validates the path argument and it instantiates a LockInternals object.
2). The locking method, acquire:
Once the InterProcessMutex object has been instantiated, we call acquire() to try to take the lock:
// Stepping into: InterProcessMutex.java
/**
 * Acquire the mutex - blocking until it's available. Note: the same thread
 * can call acquire re-entrantly. Each call to acquire must be balanced by a call
 * to {@link #release()}
 *
 * @throws Exception ZK errors, connection interruptions
 */
@Override
public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}
/**
 * Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread
 * can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call
 * to {@link #release()}
 *
 * @param time time to wait
 * @param unit time unit
 * @return true if the mutex was acquired, false if not
 * @throws Exception ZK errors, connection interruptions
 */
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
    return internalLock(time, unit);
}
- acquire(): takes no arguments. The call blocks until the lock is obtained, or throws an exception if the ZooKeeper connection is interrupted.
- acquire(long time, TimeUnit unit): takes a timeout and its unit. If the caller is still blocked once that time has elapsed, the method returns false.
Compare the two and pick whichever suits your business logic. In general, though, I recommend the latter: passing a timeout avoids a build-up of ephemeral nodes and blocked threads.
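To make the recommended calling pattern concrete, here is a minimal sketch of "acquire with a timeout, release in finally". So that it runs without a ZooKeeper server, it uses a hypothetical TimedLock interface that only mirrors the two method shapes shown above (acquire(long, TimeUnit) and release()), backed by an in-memory ReentrantLock. TimedLock, LocalTimedLock and LockRunner are names I made up for illustration; they are not part of Curator.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical interface mirroring the shape of acquire(long, TimeUnit) / release(). */
interface TimedLock {
    boolean acquire(long time, TimeUnit unit) throws Exception;

    void release() throws Exception;
}

/** In-memory stand-in (an assumption of this sketch) so it runs without ZooKeeper. */
final class LocalTimedLock implements TimedLock {
    private final ReentrantLock delegate = new ReentrantLock();

    @Override
    public boolean acquire(long time, TimeUnit unit) throws InterruptedException {
        return delegate.tryLock(time, unit);
    }

    @Override
    public void release() {
        delegate.unlock();
    }
}

final class LockRunner {
    /** Runs the task only if the lock is obtained within the timeout; returns whether it ran. */
    static boolean runWithLock(TimedLock lock, long time, TimeUnit unit, Runnable task) throws Exception {
        if (!lock.acquire(time, unit)) {
            return false; // timed out: the caller decides whether to retry or give up
        }
        try {
            task.run();
            return true;
        } finally {
            lock.release(); // each successful acquire is balanced by exactly one release
        }
    }

    public static void main(String[] args) throws Exception {
        TimedLock lock = new LocalTimedLock();
        boolean ran = runWithLock(lock, 3, TimeUnit.SECONDS,
                () -> System.out.println("in critical section"));
        System.out.println("ran = " + ran);
    }
}
```

With a real InterProcessMutex the structure would be the same: check the boolean returned by acquire(time, unit), and call release() only on the path where it returned true.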
3). Lock re-entrancy:
// Stepping into: InterProcessMutex.java
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
    return false;
}
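This code is what makes the lock re-entrant. Every InterProcessMutex instance holds a threadData object of type ConcurrentMap, keyed by thread object with LockData as the value. It checks whether threadData has an entry for the current thread. If it does, the thread already holds the lock and may re-enter it, so the lockCount in its lockData is incremented; if not, the thread goes on to contend for the lock.
When internals.attemptLock returns a lockPath that is not null, the thread has successfully obtained the lock, so a new LockData object is created and stored in threadData.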
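The bookkeeping described above can be modelled in isolation. The following is a simplified sketch, not Curator's code: the ZooKeeper call is replaced by a counter (remoteAcquires), the lock path is a made-up placeholder, and the class name ReentrantBookkeeping is my own. It only shows that repeated acquire() calls from the same thread touch the "remote" lock once, and that the remote release happens when the count drops back to zero.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of the per-thread bookkeeping; ZooKeeper is replaced by counters. */
final class ReentrantBookkeeping {
    private static final class LockData {
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        LockData(String lockPath) {
            this.lockPath = lockPath;
        }
    }

    private final ConcurrentMap<Thread, LockData> threadData = new ConcurrentHashMap<>();

    /** How many times the simulated remote lock was really taken or released. */
    final AtomicInteger remoteAcquires = new AtomicInteger();
    final AtomicInteger remoteReleases = new AtomicInteger();

    boolean acquire() {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data != null) {
            data.lockCount.incrementAndGet(); // re-entry: no remote call needed
            return true;
        }
        remoteAcquires.incrementAndGet(); // stand-in for internals.attemptLock(...)
        threadData.put(current, new LockData("/hypothetical/lock-path"));
        return true;
    }

    void release() {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data == null) {
            throw new IllegalMonitorStateException("You do not own the lock");
        }
        if (data.lockCount.decrementAndGet() > 0) {
            return; // still held by an outer acquire
        }
        remoteReleases.incrementAndGet(); // stand-in for internals.releaseLock(...)
        threadData.remove(current);
    }

    /** Number of times the calling thread currently holds the lock. */
    int holdCount() {
        LockData data = threadData.get(Thread.currentThread());
        return data == null ? 0 : data.lockCount.get();
    }

    public static void main(String[] args) {
        ReentrantBookkeeping lock = new ReentrantBookkeeping();
        lock.acquire();
        lock.acquire();
        System.out.println("holds=" + lock.holdCount() + ", remoteAcquires=" + lock.remoteAcquires.get());
        lock.release();
        lock.release();
        System.out.println("holds=" + lock.holdCount() + ", remoteReleases=" + lock.remoteReleases.get());
    }
}
```

Because the map is keyed by Thread, the re-entrancy is per thread within one JVM; a different thread in the same process still has to go through the remote acquisition.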
4). Contending for the lock:
Now for the main event. The attemptLock method is the core of it all, so let's go straight to the code:
// Stepping into: LockInternals.java
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long startMillis = System.currentTimeMillis();
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;
    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while ( !isDone )
    {
        isDone = true;
        try
        {
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }
    if ( hasTheLock )
    {
        return ourPath;
    }
    return null;
}
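The shape of the while loop in attemptLock (assume one pass, and go around again only when a specific exception occurs and the retry policy agrees) can be sketched on its own. This is a minimal model under my own assumptions: RetryDecision is a hypothetical stand-in for the client's retry policy, and NodeMissingException stands in for KeeperException.NoNodeException. Neither name exists in Curator.

```java
import java.util.concurrent.Callable;

final class RetryLoopSketch {
    /** Hypothetical stand-in for a retry policy: decides whether another attempt is allowed. */
    interface RetryDecision {
        boolean allowRetry(int retryCount, long elapsedMillis);
    }

    /** Stand-in for KeeperException.NoNodeException in this sketch. */
    static final class NodeMissingException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    /** Runs body once; repeats only after NodeMissingException and only while the policy allows. */
    static <T> T attempt(Callable<T> body, RetryDecision policy) throws Exception {
        long startMillis = System.currentTimeMillis();
        int retryCount = 0;
        T result = null;
        boolean isDone = false;
        while (!isDone) {
            isDone = true; // optimistic: a single pass is the normal case
            try {
                result = body.call();
            } catch (NodeMissingException e) {
                if (policy.allowRetry(retryCount++, System.currentTimeMillis() - startMillis)) {
                    isDone = false; // go around again
                } else {
                    throw e; // retries exhausted: surface the failure
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String path = attempt(() -> {
            if (calls[0]++ < 2) {
                throw new NodeMissingException(); // fail the first two attempts
            }
            return "/hypothetical/lock-path";
        }, (retryCount, elapsedMillis) -> retryCount < 3);
        System.out.println(path + " after " + calls[0] + " attempts");
    }
}
```

Any other exception thrown by the body propagates immediately, which matches the original code catching only the one exception type.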
Three points deserve attention here:
- 1. The while loop
Normally this loop finishes after a single pass. When a NoNodeException is thrown, however, the lock acquisition is retried a limited number of times, as allowed by the ZooKeeper client's retry policy.
- 2. driver.createsTheLock
As the name suggests, the driver's createsTheLock method creates the lock, i.e. it creates an ephemeral sequential node under the specified ZooKeeper path. Note: at this point a node has merely been created; it does not mean the thread already holds the lock.
// Stepping into: StandardLockInternalsDriver.java
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}
- 3. internalLockLoop
This decides whether the caller can hold the lock. If it cannot, it goes into wait until it is woken up.
// Stepping into: LockInternals.java
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    boolean haveTheLock = false;
    boolean doDelete = false;
    try
    {
        if ( revocable.get() != null )
        {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }
        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
        {
            List<String> children = getSortedChildren();
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() )
            {
                haveTheLock = true;
            }
            else
            {
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized(this)
                {
                    try
                    {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true; // timed out - delete our node
                                break;
                            }
                            wait(millisToWait);
                        }
                        else
                        {
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}
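Whew, another big slab of code. Fine, let's go through it piece by piece and pick out the important parts.
- The while loop
If you called the no-argument acquire method, this loop has no time limit and may run indefinitely. As long as the ZooKeeper client is in the STARTED state and the current thread has not yet obtained the lock, another iteration begins.
- getSortedChildren
This method is fairly simple: it fetches the list of all child nodes and sorts it in ascending order by the 10-digit number at the end of each node name. As mentioned above, the nodes being created are sequential nodes. Examples of generated nodes look like this: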
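To illustrate the ordering, here is a small sketch that sorts child names by the digits following the lock name. The node names in it are invented for illustration (real ones carry a uuid prefix and a server-assigned sequence), and SequenceSort is my own helper, not Curator's getSortedChildren. Since the sequence is zero-padded to a fixed width, comparing the suffixes as strings gives the same order as comparing them as numbers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class SequenceSort {
    /** Returns the part of the node name after the last occurrence of lockName (the sequence digits). */
    static String sequenceOf(String nodeName, String lockName) {
        int index = nodeName.lastIndexOf(lockName);
        return index >= 0 ? nodeName.substring(index + lockName.length()) : nodeName;
    }

    /** Returns a new list sorted by sequence suffix; the input list is left untouched. */
    static List<String> sortBySequence(List<String> children, String lockName) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing((String name) -> sequenceOf(name, lockName)));
        return sorted;
    }

    public static void main(String[] args) {
        // Hypothetical child names: made-up prefix + "lock-" + 10-digit sequence.
        List<String> children = Arrays.asList(
                "_c_bbbb-lock-0000000012",
                "_c_aaaa-lock-0000000003",
                "_c_cccc-lock-0000000007");
        for (String child : sortBySequence(children, "lock-")) {
            System.out.println(child);
        }
    }
}
```

Sorting on the suffix rather than on the whole name matters because the random prefix would otherwise dominate the comparison.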
- driver.getsTheLock
// Stepping into: StandardLockInternalsDriver.java
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
    int ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);
    boolean getsTheLock = ourIndex < maxLeases;
    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
    return new PredicateResults(pathToWatch, getsTheLock);
}
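This decides whether the lock can be held. The rule is ourIndex < maxLeases; since InterProcessMutex passes maxLeases = 1, it comes down to whether the node we just created is first in the child list obtained in the previous step.
If it is, the lock can be held, so getsTheLock = true is wrapped in a PredicateResults and returned.
If it is not, some other contender got there first, so getsTheLock = false, and we also need the name of the ephemeral node just ahead of ours, pathToWatch. (Note that pathToWatch plays a key role later on.)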
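The predicate is easy to try out on a plain list. Below is a minimal sketch that reproduces the same arithmetic without any Curator types. LockPredicate and its Result class are names I introduced, the child names are hypothetical, and an IllegalStateException stands in for whatever validateOurIndex throws when our node is missing.

```java
import java.util.Arrays;
import java.util.List;

final class LockPredicate {
    /** Result holder; pathToWatch is null when the lock is granted. */
    static final class Result {
        final boolean getsTheLock;
        final String pathToWatch;

        Result(boolean getsTheLock, String pathToWatch) {
            this.getsTheLock = getsTheLock;
            this.pathToWatch = pathToWatch;
        }
    }

    /** sortedChildren must already be ordered by sequence number. */
    static Result evaluate(List<String> sortedChildren, String ourNode, int maxLeases) {
        int ourIndex = sortedChildren.indexOf(ourNode);
        if (ourIndex < 0) {
            // stand-in for validateOurIndex(...) in the original code
            throw new IllegalStateException("Node not found: " + ourNode);
        }
        boolean getsTheLock = ourIndex < maxLeases;
        // the node to watch sits maxLeases positions ahead of us in the sorted list
        String pathToWatch = getsTheLock ? null : sortedChildren.get(ourIndex - maxLeases);
        return new Result(getsTheLock, pathToWatch);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("lock-0000000001", "lock-0000000002", "lock-0000000003");
        Result first = evaluate(children, "lock-0000000001", 1);
        Result third = evaluate(children, "lock-0000000003", 1);
        System.out.println("first: getsTheLock=" + first.getsTheLock);
        System.out.println("third: getsTheLock=" + third.getsTheLock + ", watch=" + third.pathToWatch);
    }
}
```

With maxLeases = 1 the third node watches the second one, which is exactly the "watch only your predecessor" behaviour. A larger maxLeases lets the first several nodes through at once, with each waiter watching the node maxLeases places ahead of it.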
- synchronized(this)
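This block belongs to the logic that runs after losing the contest for the lock. So what should the thread do at this point?
First it adds a watcher, and the watched path is exactly the pathToWatch returned by the previous step, joined onto basePath + "/". In other words, the current thread watches for changes to the node just ahead of its own, not to every node under the parent. Then comes the neat part: wait(millisToWait). The thread gives up the CPU and enters the waiting state until it is woken up.
What follows is natural enough: when the watched node changes, the thread is woken from the waiting state and starts a new round of contention for the lock.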
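The wait-and-wake handshake can be tried without ZooKeeper. The sketch below is my own simplified model: a second thread plays the watcher callback, and PredecessorWait with its predecessorGone flag is something I introduced for illustration. The original code keeps no such flag; after waking it simply re-reads the children and re-evaluates. The flag is here so the standalone example is robust against spurious wakeups.

```java
final class PredecessorWait {
    private boolean predecessorGone = false;

    /** Called from the (simulated) watcher thread when the watched node changes. */
    synchronized void onPredecessorChanged() {
        predecessorGone = true;
        notifyAll(); // wake every thread waiting on this monitor
    }

    /** Waits up to timeoutMillis; returns true if woken by the watcher, false on timeout. */
    synchronized boolean await(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!predecessorGone) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // timed out; the real code deletes its own node at this point
            }
            wait(remaining); // releases the monitor while waiting
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        PredecessorWait waiter = new PredecessorWait();
        Thread watcher = new Thread(() -> {
            try {
                Thread.sleep(100); // pretend the predecessor releases the lock after 100 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            waiter.onPredecessorChanged();
        });
        watcher.start();
        System.out.println("woken by watcher: " + waiter.await(5000));
        watcher.join();
    }
}
```

Recomputing the remaining time on every pass mirrors how the original subtracts the elapsed time from millisToWait before each wait.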
With that, we have covered the whole process of contending for the lock.
II. Releasing the lock
Compared with the lengthy discussion of acquisition above, the release logic is very simple.
// Stepping into: InterProcessMutex.java
/**
 * Perform one release of the mutex if the calling thread is the same thread that acquired it. If the
 * thread had made multiple calls to acquire, the mutex will still be held when this method returns.
 *
 * @throws Exception ZK errors, interruptions, current thread does not own the lock
 */
@Override
public void release() throws Exception
{
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
     */
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }
    int newLockCount = lockData.lockCount.decrementAndGet();
    if ( newLockCount > 0 )
    {
        return;
    }
    if ( newLockCount < 0 )
    {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try
    {
        internals.releaseLock(lockData.lockPath);
    }
    finally
    {
        threadData.remove(currentThread);
    }
}
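- Decrement the re-entrancy count until it reaches 0.
- Release the lock, i.e. remove the watchers and delete the node that was created.
- Remove the current thread's cached entry from threadData.
III. The lock driver class
At the start we mentioned StandardLockInternalsDriver, the standard lock driver class, and noted that we can pass in a custom one as an extension.
So let's look at the interface methods it provides: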
// Stepping into: LockInternalsDriver.java
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception;
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception;
// Stepping into: LockInternalsSorter.java
public String fixForSorting(String str, String lockName);
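- getsTheLock: decides whether the lock has been obtained.
- createsTheLock: creates an ephemeral sequential node under the specified ZooKeeper path.
- fixForSorting: prepares a node name for sorting. In the StandardLockInternalsDriver implementation, it extracts the trailing sequence number of the ephemeral node, which is then used for ordering.
With this class we can try implementing our own locking behaviour: for example, the strategy that decides who gets the lock can be changed, and the way the child node list is sorted can be customized...
4. Summary of how InterProcessMutex works
InterProcessMutex implements a distributed lock by creating ephemeral sequential nodes under a given ZooKeeper path. Every thread (including threads in other processes) that wants the same lock must first create a node under the same path, with a node name made up of a uuid plus an increasing sequence number. Whether the lock has been obtained is decided by checking whether the thread's own sequence number comes first among all child nodes. When acquisition fails, the thread adds a watcher on the node just ahead of its own and then enters the waiting state. It stays there until the watcher event wakes it up, or until the timeout expires and it returns without the lock.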
5. Closing words
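A book I have been reading recently, 《從Paxos到Zookeeper 分布式一致性原理與實(shí)踐》 (From Paxos to ZooKeeper: Distributed Consistency Principles and Practice), also describes a ZooKeeper-based exclusive lock. The rough idea is to rely on the fact that a ZooKeeper node cannot be created twice to decide whether the lock has been obtained. Compared with that approach, InterProcessMutex is more flexible, and because each waiter watches only the node just ahead of it, the finer-grained watch scope brings better performance.
If this article has been of any use to you, I count myself very fortunate!
You are welcome to repost this article; please credit the source and the author.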