分布式鎖:分布式鎖是控制分布式系統(tǒng)之間同步訪問(wèn)共享資源的一種方式沼琉,如果不同的系統(tǒng)化或者同一個(gè)系統(tǒng)的不同主機(jī)之間共享一個(gè)或者一組資源瓶颠,那么在訪問(wèn)這些資源的時(shí)候,往往需要通過(guò)一些互斥手段防止彼此之間的干擾刺桃,以保證一致性粹淋,這種情況就需要使用分布式鎖。
1瑟慈、排他鎖實(shí)現(xiàn)
排他鎖:核心是如何保證當(dāng)前有且僅有一個(gè)事務(wù)獲取到鎖桃移,并且鎖釋放之后,所有正在等待獲取鎖的事務(wù)都能夠被通知到葛碧。
定義和獲取鎖:在zk中借杰,通過(guò)一個(gè)數(shù)據(jù)節(jié)點(diǎn)來(lái)表示一個(gè)鎖(因?yàn)樵趜k中數(shù)據(jù)節(jié)點(diǎn)是唯一的),zk會(huì)保證在所有客戶(hù)端中进泼,最終只有一個(gè)客戶(hù)端能夠創(chuàng)建一個(gè)節(jié)點(diǎn)成功蔗衡,最終只有一個(gè)客戶(hù)端能夠創(chuàng)建成功,那么久可以認(rèn)為這個(gè)客戶(hù)端獲得了鎖乳绕。同時(shí)沒(méi)有獲取到鎖的客戶(hù)端就需要到相應(yīng)節(jié)點(diǎn)上注冊(cè)一個(gè)子節(jié)點(diǎn)變更的watcher監(jiān)聽(tīng)绞惦,以便實(shí)時(shí)的監(jiān)聽(tīng)lock節(jié)點(diǎn)的變更情況。
釋放鎖:由于定義鎖的節(jié)點(diǎn)是一個(gè)臨時(shí)節(jié)點(diǎn)洋措,因此存在兩種情況釋放鎖1济蝉、當(dāng)前獲取鎖的客戶(hù)端機(jī)器發(fā)生宕機(jī)了。zk就會(huì)把該臨時(shí)節(jié)點(diǎn)刪除2、正常執(zhí)行完業(yè)務(wù)邏輯王滤,客戶(hù)端會(huì)主動(dòng)將自己創(chuàng)建的臨時(shí)節(jié)點(diǎn)刪除贺嫂。而無(wú)論什么情況下面刪除了節(jié)點(diǎn),zk都會(huì)通知所有在該節(jié)點(diǎn)上注冊(cè)了子節(jié)點(diǎn)變更watcher將的客戶(hù)端雁乡。這些客戶(hù)端在接收到通知后第喳,會(huì)再次重新發(fā)起分布式鎖獲取,即重復(fù)“獲取鎖過(guò)程”踱稍。
(獲取鎖和釋放鎖流程)
2墩弯、共享鎖實(shí)現(xiàn)
共享鎖:稱(chēng)讀鎖。如果事務(wù)T對(duì)數(shù)據(jù)對(duì)象O加了共享鎖寞射,那么當(dāng)前的事務(wù)只能對(duì)O進(jìn)行讀取操作,其他事務(wù)也只能對(duì)這個(gè)數(shù)據(jù)對(duì)象加共享鎖-直到該數(shù)據(jù)對(duì)象上的所有共享鎖都被釋放锌钮。
定義鎖:同樣是使用zk上的數(shù)據(jù)節(jié)點(diǎn)來(lái)表示一個(gè)鎖桥温,是一個(gè)類(lèi)似“/share_lock/[hostname]-請(qǐng)求類(lèi)型-序號(hào)”的臨時(shí)順序節(jié)點(diǎn)。
獲取鎖:在需要獲取共享鎖時(shí)候梁丘,所有客戶(hù)端都會(huì)到/share_lock這個(gè)節(jié)點(diǎn)下面創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn)侵浸,如果當(dāng)前是讀請(qǐng)求,那么就是例如/share_lock/192.168.0.1-R-0000000001的節(jié)點(diǎn)氛谜;如果是寫(xiě)請(qǐng)求掏觉,那么就會(huì)創(chuàng)建例如/share_lock/192.168.0.1-W-0000000001的節(jié)點(diǎn)。
判斷讀寫(xiě)順序:不同事務(wù)都可以同時(shí)對(duì)一個(gè)數(shù)據(jù)對(duì)象進(jìn)行讀寫(xiě)操作值漫,而更新操作必須在當(dāng)前沒(méi)有任何事務(wù)進(jìn)行讀寫(xiě)操作的情況下面進(jìn)行澳腹。基于這個(gè)原則可以通過(guò)下面四個(gè)步驟來(lái)確定分布式讀寫(xiě)順序:
1杨何、客戶(hù)端調(diào)用create方法創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn)
2酱塔、客戶(hù)端調(diào)用getChildren接口來(lái)獲取所有已經(jīng)創(chuàng)建的子節(jié)點(diǎn)列表(注意這里不注冊(cè)任何的watcher)
3、如果無(wú)法獲取共享鎖危虱,那么就調(diào)用exist來(lái)對(duì)比比自己小的那個(gè)節(jié)點(diǎn)注冊(cè)watcher羊娃。
對(duì)于讀請(qǐng)求:向比自己序號(hào)小的最后一個(gè)寫(xiě)請(qǐng)求節(jié)點(diǎn)注冊(cè)watcher監(jiān)聽(tīng)
對(duì)于寫(xiě)請(qǐng)求:向比自己序號(hào)小的最后一個(gè)節(jié)點(diǎn)注冊(cè)watcher監(jiān)聽(tīng)
4、等待watcher通知埃跷,繼續(xù)進(jìn)入步驟2
釋放鎖:和排他鎖一樣蕊玷。
3、使用curator實(shí)現(xiàn)分布式鎖案例
- 以一個(gè)“流水號(hào)生成“的場(chǎng)景為例”:
沒(méi)加鎖時(shí)候:
public class LockTest {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = format.format(new Date());
System.out.println("生成的訂單號(hào):" + orderNo);
}).start();
}
latch.countDown();
}
}
結(jié)果:
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|160
生成的訂單號(hào):21:33:22|160
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|160
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|160
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|161
加鎖情況:
public class LockTest {
private static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
private static String orderNo = "1";
public static void main(String[] args) {
client.start();
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
latch.await();
//加鎖
mutex.acquire();
} catch (Exception e) {
e.printStackTrace();
}
SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = format.format(new Date());
System.out.println("生成的訂單號(hào):" + orderNo);
try {
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
latch.countDown();
}
}
4弥雹、深入理解curator實(shí)現(xiàn)的分布式鎖
InterProcessMutex:分布式可重入排它鎖
InterProcessSemaphoreMutex:分布式排它鎖
InterProcessReadWriteLock:分布式讀寫(xiě)鎖
InterProcessMultiLock:將多個(gè)鎖作為單個(gè)實(shí)體管理的容器
- 這里重點(diǎn)說(shuō)明InterProcessMutex垃帅,InterProcessMutex鎖是一種可重入鎖,采用介紹的共享鎖實(shí)現(xiàn)原理實(shí)現(xiàn)的剪勿。
(1)實(shí)現(xiàn)流程
1挺智、A創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
2、判斷是否加鎖(是否是第一個(gè)臨時(shí)子節(jié)點(diǎn)),此時(shí)A加鎖成功
3赦颇、B創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
4试幽、判斷是否加鎖成功,此時(shí)B加鎖失斢淞(因?yàn)锳獲取到了鎖)
5纵装、B加鎖失敗后監(jiān)聽(tīng)上一個(gè)節(jié)點(diǎn)的變化(也就是監(jiān)聽(tīng)A節(jié)點(diǎn),加上相應(yīng)的watcher)
6扇苞、A完成邏輯欺殿,釋放鎖,刪除對(duì)應(yīng)臨時(shí)順序節(jié)點(diǎn)
7鳖敷、zk通知客戶(hù)端B的監(jiān)聽(tīng)器脖苏,A客戶(hù)端的臨時(shí)節(jié)點(diǎn)刪除了
8、客戶(hù)端B再次嘗試去獲取鎖定踱,此時(shí)B字節(jié)的順序是最小的棍潘,獲取鎖成功(同時(shí)如果有C重復(fù)步驟4)
(2)源碼解讀
1、InterProcessMutex構(gòu)造函數(shù)
private final String basePath;
private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
private static final String LOCK_NAME = "lock-";
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//1崖媚、創(chuàng)建數(shù)據(jù)容器
this.threadData = Maps.newConcurrentMap();
//2亦歉、驗(yàn)證傳入父路徑的合法性
this.basePath = PathUtils.validatePath(path);
//3、初始化LockInternals(LockInternals是加鎖和解鎖的實(shí)現(xiàn)類(lèi))
this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
...
public class LockInternals {
private final CuratorFramework client;
//臨時(shí)順序子節(jié)點(diǎn)
private final String path;
//父節(jié)點(diǎn)
private final String basePath;
private final LockInternalsDriver driver;
private final String lockName;
private volatile int maxLeases;
....
2畅哑、加鎖
InterProcessMutex.class
//入?yún)榭针瓤{(diào)用該方法后,會(huì)一直堵塞荠呐,直到搶奪到鎖資源赛蔫,或者zookeeper連接中斷后,上拋異常泥张。
public void acquire() throws Exception {
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
//入?yún)魅氤瑫r(shí)時(shí)間以及單位濒募,搶奪時(shí),如果出現(xiàn)堵塞圾结,會(huì)在超過(guò)該時(shí)間后瑰剃,返回false。
public boolean acquire(long time, TimeUnit unit) throws Exception {
return this.internalLock(time, unit);
}
3筝野、鎖的可重入性
private boolean internalLock(long time, TimeUnit unit) throws Exception {
//每個(gè)InterProcessMutex實(shí)例晌姚,都會(huì)持有一個(gè)ConcurrentMap類(lèi)型的threadData對(duì)象,
//以線(xiàn)程對(duì)象作為Key歇竟,以LockData作為Value值挥唠。
//通過(guò)判斷當(dāng)前線(xiàn)程threadData是否有值,如果有焕议,則表示線(xiàn)程可以重入
//該鎖宝磨,于是將lockData的lockCount進(jìn)行累加;如果沒(méi)有,則進(jìn)行鎖的搶奪唤锉。
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData != null) {
lockData.lockCount.incrementAndGet();
return true;
} else {
String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
//internals.attemptLock方法返回lockPath!=null時(shí)世囊,表明了該線(xiàn)程已經(jīng)成功
//持有了這把鎖,于是乎LockData對(duì)象被new了出來(lái)窿祥,并存放到threadData中株憾。
if (lockPath != null) {
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
this.threadData.put(currentThread, newLockData);
return true;
} else {
return false;
}
}
}
4、搶奪鎖
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
long startMillis = System.currentTimeMillis();
Long millisToWait = unit != null ? unit.toMillis(time) : null;
byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
//正常情況下晒衩,這個(gè)循環(huán)會(huì)在下一次結(jié)束嗤瞎。但是當(dāng)出現(xiàn)NoNodeException
//異常時(shí),會(huì)根據(jù)zookeeper客戶(hù)端的重試策略听系,進(jìn)行有限次數(shù)的重新獲取鎖贝奇。
while(!isDone) {
isDone = true;
try {
//創(chuàng)建這個(gè)鎖,即在zookeeper的指定路徑上靠胜,創(chuàng)建一個(gè)臨時(shí)序列節(jié)點(diǎn)掉瞳。
ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
//判斷是否獲取到鎖
hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
} catch (NoNodeException var14) {
if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
throw var14;
}
isDone = false;
}
}
return hasTheLock ? ourPath : null;
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
//添加watcher
if (this.revocable.get() != null) {
((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
}
//如果你一開(kāi)始使用無(wú)參的acquire方法,那么此處的循環(huán)可能就是一個(gè)死
//循環(huán)髓帽。當(dāng)zookeeper客戶(hù)端啟動(dòng)時(shí),并且當(dāng)前線(xiàn)程還沒(méi)有成功獲取到鎖時(shí)脑豹,就會(huì)開(kāi)始新的一輪循環(huán)郑藏。
while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
//,就是獲取到所有子節(jié)點(diǎn)列表瘩欺,并且從小到大根據(jù)節(jié)點(diǎn)名稱(chēng)后10位數(shù)字進(jìn)行排序必盖。
List<String> children = this.getSortedChildren();
String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
//判斷是否可以持有鎖,
PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
//前面字節(jié)點(diǎn)的path
String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
//首先添加一個(gè)watcher監(jiān)聽(tīng)俱饿,而監(jiān)聽(tīng)的地址正是上面一步返回的pathToWatch進(jìn)行basePath + "/" 拼接以后的地址歌粥。 ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
if (millisToWait == null) {
//wait(millisToWait)。線(xiàn)程交出cpu的占用拍埠,進(jìn)入等待狀態(tài)失驶,等到被喚醒。
this.wait();
} else {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
this.wait(millisToWait);
} else {
doDelete = true;
break;
}
}
} catch (NoNodeException var19) {
;
}
}
}
}
} catch (Exception var21) {
ThreadUtils.checkInterrupted(var21);
doDelete = true;
throw var21;
} finally {
if (doDelete) {
this.deleteOurPath(ourPath);
}
}
return haveTheLock;
}
//判斷是否可以持有鎖枣购,判斷規(guī)則:當(dāng)前創(chuàng)建的節(jié)點(diǎn)是否在上一步獲取到的子節(jié)點(diǎn)列表的首位嬉探。
//如果是,說(shuō)明可以持有鎖棉圈,那么getsTheLock = true涩堤,封裝進(jìn)PredicateResults返回。
//如果不是分瘾,說(shuō)明有其他線(xiàn)程早已先持有了鎖胎围,那么getsTheLock = false,此處還需要獲取到自己前一個(gè)臨時(shí)節(jié)點(diǎn)的名稱(chēng)pathToWatch。
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
5白魂、釋放鎖
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
} else {
//減少重入鎖的計(jì)數(shù)汽纤,直到變成0。
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount <= 0) {
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
} else {
try {
//釋放鎖碧聪,即移除移除Watchers & 刪除創(chuàng)建的節(jié)點(diǎn)
this.internals.releaseLock(lockData.lockPath);
} finally {
//從threadData中冒版,刪除自己線(xiàn)程的緩存
this.threadData.remove(currentThread);
}
}
}
}
}
參考資料:
《從Paxos到Zookeeper 分布式一致性原理與實(shí)踐》》
http://www.reibang.com/p/6618471f6e75
https://juejin.im/post/5c01532ef265da61362232ed