curator分布式鎖,大概過程:
創(chuàng)建臨時有序節(jié)點(diǎn)上煤,排序祟绊,最先創(chuàng)建節(jié)點(diǎn)的獲取到鎖牧抽,其他節(jié)點(diǎn)監(jiān)聽前一個節(jié)點(diǎn)刪除事件。當(dāng)監(jiān)聽到時扬舒,則重新進(jìn)行排序讲坎,index最小的獲取到鎖。
public class lockService {
@Autowired
private ZookeeperDao dao;
InterProcessMutex lock = new InterProcessMutex(new CuratorClient().getClient(),"/lock");
public void loclMethod(){
try {
//獲取鎖
//一直等待鎖
//lock.acquire();
//嘗試獲取鎖,如果在指定時間獲取鎖,則返回true
if (lock.acquire(1000, TimeUnit.SECONDS)){
int check = dao.check();
if (check > 0){
System.out.println("售出");
dao.des(--check);
}else {
System.out.println("沒有庫存");
}
}
//Thread.sleep(50);
} catch (Exception e) {
System.out.println(e);
}finally {
try {
//釋放鎖
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
interProcessMutex.acquire(1000, TimeUnit.SECONDS)
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
private final LockInternals internals;
private final String basePath;
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
private static class LockData
{
final Thread owningThread;
final String lockPath;
//分布式鎖重入次數(shù)
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
private static final String LOCK_NAME = "lock-";
@Override
public void acquire() throws Exception
{ //獲取鎖瓮栗,一直等待
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
//獲取鎖,等待指定時間
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{ //獲取當(dāng)前線程
Thread currentThread = Thread.currentThread();
//從ConcurrentMap<Thread, LockData>中獲取當(dāng)前線程的鎖數(shù)據(jù)弥激,不為空微服,則直接獲取鎖
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{ //實(shí)現(xiàn)可重入
// 統(tǒng)計重入次數(shù)
lockData.lockCount.incrementAndGet();
return true;
}
//ConcurrentMap<Thread, LockData>中沒有當(dāng)前線程的所數(shù)據(jù)缨历,則當(dāng)前線程嘗試去獲取鎖
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{ //獲取到鎖辛孵,封裝鎖數(shù)據(jù)
LockData newLockData = new LockData(currentThread, lockPath);
//保存到ConcurrentMap<Thread, LockData>緩存中
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
}
嘗試獲取鎖
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
public class LockInternals
{
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
//無線等待時millisToWait 為null
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//節(jié)點(diǎn)數(shù)據(jù)
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
//當(dāng)前重試獲取鎖次數(shù)
int retryCount = 0;
// 在Zookeeper中創(chuàng)建的臨時順序節(jié)點(diǎn)的路徑觉吭,相當(dāng)于一把待激活的分布式鎖
// 激活條件:同級目錄子節(jié)點(diǎn)鲜滩,名稱排序最小(排隊(duì)榜聂,公平鎖)
String ourPath = null;
// 是否已經(jīng)持有分布式鎖
boolean hasTheLock = false;
// 是否已經(jīng)完成嘗試獲取分布式鎖的操作
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{ //StandardLockInternalsDriver须肆,創(chuàng)建有序臨時節(jié)點(diǎn)
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//是否獲取當(dāng)前鎖
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{ //成功獲取鎖
return ourPath;
}
return null;
}
//是否獲取當(dāng)鎖
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{ //是否擁有分布式鎖
boolean haveTheLock = false;
//是否需要刪除子節(jié)點(diǎn)
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
//循環(huán)嘗試獲取鎖
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{ //排序節(jié)點(diǎn)
List<String> children = getSortedChildren();
// 獲取前面自己創(chuàng)建的臨時順序子節(jié)點(diǎn)的名稱
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// StandardLockInternalsDriver
//判斷是否回去鎖,沒有獲取返回監(jiān)聽路徑
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{ //獲得鎖
haveTheLock = true;
}
else
{ //沒有所得到鎖泄隔,監(jiān)聽上一個臨時順序節(jié)點(diǎn)
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{ //上一個臨時順序節(jié)點(diǎn)如果被刪除拒贱,會喚醒當(dāng)前線程繼續(xù)競爭鎖
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{ //獲取鎖超時,標(biāo)記刪除之前創(chuàng)建的臨時順序節(jié)點(diǎn)
doDelete = true;
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
//創(chuàng)建有序臨時節(jié)點(diǎn)
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
}
StandardLockInternalsDriver.getsTheLock(client, children, sequenceNodeName, maxLeases);
public class StandardLockInternalsDriver implements LockInternalsDriver
{
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{ // 之前創(chuàng)建的臨時順序節(jié)點(diǎn)在排序后的子節(jié)點(diǎn)列表中的索引
int ourIndex = children.indexOf(sequenceNodeName);
// 校驗(yàn)之前創(chuàng)建的臨時順序節(jié)點(diǎn)是否有效
validateOurIndex(sequenceNodeName, ourIndex);
//maxLeases 初始化為1
//ourIndex 為0 表示是當(dāng)前排序的節(jié)點(diǎn)里最先創(chuàng)建出節(jié)點(diǎn)的連接佛嬉,也就是越早創(chuàng)建節(jié)點(diǎn)的越早獲取到鎖
boolean getsTheLock = ourIndex < maxLeases;
//獲取到鎖則不需要監(jiān)聽逻澳,沒有獲取到鎖,則監(jiān)聽前一個節(jié)點(diǎn)的刪除事件
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
// 返回獲取鎖的結(jié)果暖呕,交由上層繼續(xù)處理(添加監(jiān)聽等操作)
return new PredicateResults(pathToWatch, getsTheLock);
}
//校驗(yàn)節(jié)點(diǎn)有效性
static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException
{
if ( ourIndex < 0 )
{ // 由于會話過期或連接丟失等原因斜做,該線程創(chuàng)建的臨時順序節(jié)點(diǎn)被Zookeeper服務(wù)端刪除,往外拋出NoNodeException
throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);
}
}
}
釋放鎖
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
@Override
public void release() throws Exception
{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{ // 無法從映射表中獲取鎖信息湾揽,不持有鎖
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{ // 鎖是可重入的,初始值為1钝腺,原子-1到0抛姑,鎖才釋放
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{ //釋放鎖
internals.releaseLock(lockData.lockPath);
}
finally
{ //從映射中刪除當(dāng)前線程信息
threadData.remove(currentThread);
}
}
}
public class LockInternals
{
final void releaseLock(String lockPath) throws Exception
{ //刪除監(jiān)聽
client.removeWatchers();
revocable.set(null);
//刪除節(jié)點(diǎn)
deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception
{
try
{ // 后臺不斷嘗試刪除
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
}