許多場(chǎng)景中,數(shù)據(jù)一致性是一個(gè)比較重要的話題,在單機(jī)環(huán)境中混卵,我們可以通過Java提供的并發(fā)API來解決;而在分布式環(huán)境(會(huì)遇到網(wǎng)絡(luò)故障窖张、消息重復(fù)幕随、消息丟失等各種問題)下要復(fù)雜得多,常見的解決方案是分布式事務(wù)宿接、分布式鎖等赘淮。
本文主要探討如何利用Zookeeper來實(shí)現(xiàn)分布式鎖。
關(guān)于分布式鎖
分布式鎖是控制分布式系統(tǒng)之間同步訪問共享資源的一種方式睦霎。
在實(shí)現(xiàn)分布式鎖的過程中需要注意的:
鎖的可重入性(遞歸調(diào)用不應(yīng)該被阻塞梢卸、避免死鎖)
鎖的超時(shí)(避免死鎖、死循環(huán)等意外情況)
鎖的阻塞(保證原子性等)
鎖的特性支持(阻塞鎖副女、可重入鎖蛤高、公平鎖、聯(lián)鎖碑幅、信號(hào)量戴陡、讀寫鎖)
在使用分布式鎖時(shí)需要注意:
分布式鎖的開銷(分布式鎖一般能不用就不用,有些場(chǎng)景可以用樂觀鎖代替)
加鎖的粒度(控制加鎖的粒度沟涨,可以優(yōu)化系統(tǒng)的性能)
加鎖的方式
以下是幾種常見的實(shí)現(xiàn)分布式鎖的方案及其優(yōu)缺點(diǎn)恤批。
基于數(shù)據(jù)庫(kù)
1. 基于數(shù)據(jù)庫(kù)表
最簡(jiǎn)單的方式可能就是直接創(chuàng)建一張鎖表,當(dāng)我們要鎖住某個(gè)方法或資源時(shí)裹赴,我們就在該表中增加一條記錄喜庞,想要釋放鎖的時(shí)候就刪除這條記錄诀浪。給某字段添加唯一性約束,如果有多個(gè)請(qǐng)求同時(shí)提交到數(shù)據(jù)庫(kù)的話延都,數(shù)據(jù)庫(kù)會(huì)保證只有一個(gè)操作可以成功雷猪,那么我們就可以認(rèn)為操作成功的那個(gè)線程獲得了該方法的鎖,可以執(zhí)行方法體內(nèi)容晰房。
會(huì)引入數(shù)據(jù)庫(kù)單點(diǎn)春宣、無(wú)失效時(shí)間、不阻塞嫉你、不可重入等問題月帝。
2. 基于數(shù)據(jù)庫(kù)排他鎖
如果使用的是MySql的InnoDB引擎,在查詢語(yǔ)句后面增加for update
幽污,數(shù)據(jù)庫(kù)會(huì)在查詢過程中(須通過唯一索引查詢)給數(shù)據(jù)庫(kù)表增加排他鎖嚷辅,我們可以認(rèn)為獲得排它鎖的線程即可獲得分布式鎖,通過 connection.commit() 操作來釋放鎖距误。
會(huì)引入數(shù)據(jù)庫(kù)單點(diǎn)簸搞、不可重入、無(wú)法保證一定使用行鎖(部分情況下MySQL自動(dòng)使用表鎖而不是行鎖)准潭、排他鎖長(zhǎng)時(shí)間不提交導(dǎo)致占用數(shù)據(jù)庫(kù)連接等問題趁俊。
3. 數(shù)據(jù)庫(kù)實(shí)現(xiàn)分布式鎖總結(jié)
優(yōu)點(diǎn):
- 直接借助數(shù)據(jù)庫(kù),容易理解刑然。
缺點(diǎn):
會(huì)引入更多的問題寺擂,使整個(gè)方案變得越來越復(fù)雜
操作數(shù)據(jù)庫(kù)需要一定的開銷,有一定的性能問題
使用數(shù)據(jù)庫(kù)的行級(jí)鎖并不一定靠譜泼掠,尤其是當(dāng)我們的鎖表并不大的時(shí)候
基于緩存
相比較于基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)分布式鎖的方案來說怔软,基于緩存來實(shí)現(xiàn)在性能方面會(huì)表現(xiàn)的更好一點(diǎn)。目前有很多成熟的緩存產(chǎn)品择镇,包括Redis挡逼、memcached、tair等腻豌。
這里以Redis為例舉出幾種實(shí)現(xiàn)方法:
1. 基于 redis 的 setnx()家坎、expire() 方法做分布式鎖
setnx 的含義就是 SET if Not Exists
,其主要有兩個(gè)參數(shù) setnx(key, value)
吝梅。該方法是原子的虱疏,如果 key 不存在,則設(shè)置當(dāng)前 key 成功憔涉,返回 1订框;如果當(dāng)前 key 已經(jīng)存在,則設(shè)置當(dāng)前 key 失敗兜叨,返回 0穿扳。
expire 設(shè)置過期時(shí)間,要注意的是 setnx 命令不能設(shè)置 key 的超時(shí)時(shí)間国旷,只能通過 expire() 來對(duì) key 設(shè)置矛物。
2. 基于 redis 的 setnx()、get()跪但、getset()方法做分布式鎖
getset 這個(gè)命令主要有兩個(gè)參數(shù) getset(key履羞,newValue)
,該方法是原子的屡久,對(duì) key 設(shè)置 newValue 這個(gè)值忆首,并且返回 key 原來的舊值。
3. 基于 Redlock 做分布式鎖
Redlock 是 Redis 的作者 antirez 給出的集群模式的 Redis 分布式鎖被环,它基于 N 個(gè)完全獨(dú)立的 Redis 節(jié)點(diǎn)(通常情況下 N 可以設(shè)置成 5)
4. 基于 redisson 做分布式鎖
redisson 是 redis 官方的分布式鎖組件
基于緩存實(shí)現(xiàn)分布式鎖總結(jié)
優(yōu)點(diǎn):
- 性能好
缺點(diǎn):
實(shí)現(xiàn)中需要考慮的因素太多
通過超時(shí)時(shí)間來控制鎖的失效時(shí)間并不是十分的靠譜
基于Zookeeper
大致思想為:每個(gè)客戶端對(duì)某個(gè)方法加鎖時(shí)糙及,在 Zookeeper 上與該方法對(duì)應(yīng)的指定節(jié)點(diǎn)的目錄下,生成一個(gè)唯一的臨時(shí)有序節(jié)點(diǎn)筛欢。判斷是否獲取鎖的方式很簡(jiǎn)單浸锨,只需要判斷有序節(jié)點(diǎn)中序號(hào)最小的一個(gè)。當(dāng)釋放鎖的時(shí)候版姑,只需將這個(gè)臨時(shí)節(jié)點(diǎn)刪除即可柱搜。同時(shí),其可以避免服務(wù)宕機(jī)導(dǎo)致的鎖無(wú)法釋放剥险,而產(chǎn)生的死鎖問題
Zookeeper實(shí)現(xiàn)分布式鎖總結(jié)
優(yōu)點(diǎn):
有效的解決單點(diǎn)問題聪蘸,不可重入問題,非阻塞問題以及鎖無(wú)法釋放的問題
實(shí)現(xiàn)較為簡(jiǎn)單
缺點(diǎn):
性能上不如使用緩存實(shí)現(xiàn)的分布式鎖表制,因?yàn)槊看卧趧?chuàng)建鎖和釋放鎖的過程中宇姚,都要?jiǎng)討B(tài)創(chuàng)建、銷毀臨時(shí)節(jié)點(diǎn)來實(shí)現(xiàn)鎖功能
需要對(duì)Zookeeper的原理有所了解
Zookeeper 如何實(shí)現(xiàn)分布式鎖夫凸?
下面講如何實(shí)現(xiàn)排他鎖和共享鎖浑劳,以及如何解決羊群效應(yīng)。
排他鎖
排他鎖夭拌,又稱寫鎖或獨(dú)占鎖魔熏。如果事務(wù)T1對(duì)數(shù)據(jù)對(duì)象O1加上了排他鎖,那么在整個(gè)加鎖期間鸽扁,只允許事務(wù)T1對(duì)O1進(jìn)行讀取或更新操作蒜绽,其他任務(wù)事務(wù)都不能對(duì)這個(gè)數(shù)據(jù)對(duì)象進(jìn)行任何操作,直到T1釋放了排他鎖桶现。
排他鎖核心是保證當(dāng)前有且僅有一個(gè)事務(wù)獲得鎖躲雅,并且鎖釋放之后,所有正在等待獲取鎖的事務(wù)都能夠被通知到骡和。
Zookeeper 的強(qiáng)一致性特性相赁,能夠很好地保證在分布式高并發(fā)情況下節(jié)點(diǎn)的創(chuàng)建一定能夠保證全局唯一性相寇,即Zookeeper將會(huì)保證客戶端無(wú)法重復(fù)創(chuàng)建一個(gè)已經(jīng)存在的數(shù)據(jù)節(jié)點(diǎn)∨タ疲可以利用Zookeeper這個(gè)特性唤衫,實(shí)現(xiàn)排他鎖。
定義鎖:通過Zookeeper上的數(shù)據(jù)節(jié)點(diǎn)來表示一個(gè)鎖
獲取鎖:客戶端通過調(diào)用
create
方法創(chuàng)建表示鎖的臨時(shí)節(jié)點(diǎn)绵脯,可以認(rèn)為創(chuàng)建成功的客戶端獲得了鎖佳励,同時(shí)可以讓沒有獲得鎖的節(jié)點(diǎn)在該節(jié)點(diǎn)上注冊(cè)Watcher監(jiān)聽,以便實(shí)時(shí)監(jiān)聽到lock節(jié)點(diǎn)的變更情況釋放鎖:以下兩種情況都可以讓鎖釋放
當(dāng)前獲得鎖的客戶端發(fā)生宕機(jī)或異常蛆挫,那么Zookeeper上這個(gè)臨時(shí)節(jié)點(diǎn)就會(huì)被刪除
正常執(zhí)行完業(yè)務(wù)邏輯赃承,客戶端主動(dòng)刪除自己創(chuàng)建的臨時(shí)節(jié)點(diǎn)
基于Zookeeper實(shí)現(xiàn)排他鎖流程:
共享鎖
共享鎖,又稱讀鎖悴侵。如果事務(wù)T1對(duì)數(shù)據(jù)對(duì)象O1加上了共享鎖瞧剖,那么當(dāng)前事務(wù)只能對(duì)O1進(jìn)行讀取操作,其他事務(wù)也只能對(duì)這個(gè)數(shù)據(jù)對(duì)象加共享鎖畜挨,直到該數(shù)據(jù)對(duì)象上的所有共享鎖都被釋放筒繁。
共享鎖與排他鎖的區(qū)別在于,加了排他鎖之后巴元,數(shù)據(jù)對(duì)象只對(duì)當(dāng)前事務(wù)可見毡咏,而加了共享鎖之后,數(shù)據(jù)對(duì)象對(duì)所有事務(wù)都可見逮刨。
定義鎖:通過Zookeeper上的數(shù)據(jù)節(jié)點(diǎn)來表示一個(gè)鎖呕缭,是一個(gè)類似于
/lockpath/[hostname]-請(qǐng)求類型-序號(hào)
的臨時(shí)順序節(jié)點(diǎn)獲取鎖:客戶端通過調(diào)用
create
方法創(chuàng)建表示鎖的臨時(shí)順序節(jié)點(diǎn),如果是讀請(qǐng)求修己,則創(chuàng)建/lockpath/[hostname]-R-序號(hào)
節(jié)點(diǎn)恢总,如果是寫請(qǐng)求則創(chuàng)建/lockpath/[hostname]-W-序號(hào)
節(jié)點(diǎn)判斷讀寫順序:大概分為4個(gè)步驟
1)創(chuàng)建完節(jié)點(diǎn)后,獲取
/lockpath
節(jié)點(diǎn)下的所有子節(jié)點(diǎn)睬愤,并對(duì)該節(jié)點(diǎn)注冊(cè)子節(jié)點(diǎn)變更的Watcher監(jiān)聽2)確定自己的節(jié)點(diǎn)序號(hào)在所有子節(jié)點(diǎn)中的順序
3.1)對(duì)于讀請(qǐng)求:1. 如果沒有比自己序號(hào)更小的子節(jié)點(diǎn)片仿,或者比自己序號(hào)小的子節(jié)點(diǎn)都是讀請(qǐng)求,那么表明自己已經(jīng)成功獲取到了共享鎖尤辱,同時(shí)開始執(zhí)行讀取邏輯 2. 如果有比自己序號(hào)小的子節(jié)點(diǎn)有寫請(qǐng)求砂豌,那么等待 3.
3.2)對(duì)于寫請(qǐng)求,如果自己不是序號(hào)最小的節(jié)點(diǎn)光督,那么等待
4)接收到Watcher通知后阳距,重復(fù)步驟1)
釋放鎖:與排他鎖邏輯一致
基于Zookeeper實(shí)現(xiàn)共享鎖流程:
羊群效應(yīng)
在實(shí)現(xiàn)共享鎖的 “判斷讀寫順序” 的第1個(gè)步驟是:創(chuàng)建完節(jié)點(diǎn)后,獲取 /lockpath
節(jié)點(diǎn)下的所有子節(jié)點(diǎn)结借,并對(duì)該節(jié)點(diǎn)注冊(cè)子節(jié)點(diǎn)變更的Watcher監(jiān)聽筐摘。這樣的話,任何一次客戶端移除共享鎖之后,Zookeeper將會(huì)發(fā)送子節(jié)點(diǎn)變更的Watcher通知給所有機(jī)器咖熟,系統(tǒng)中將有大量的 “Watcher通知” 和 “子節(jié)點(diǎn)列表獲取” 這個(gè)操作重復(fù)執(zhí)行圃酵,然后所有節(jié)點(diǎn)再判斷自己是否是序號(hào)最小的節(jié)點(diǎn)(寫請(qǐng)求)或者判斷比自己序號(hào)小的子節(jié)點(diǎn)是否都是讀請(qǐng)求(讀請(qǐng)求),從而繼續(xù)等待下一次通知球恤。
然而辜昵,這些重復(fù)操作很多都是 “無(wú)用的”荸镊,實(shí)際上每個(gè)鎖競(jìng)爭(zhēng)者只需要關(guān)注序號(hào)比自己小的那個(gè)節(jié)點(diǎn)是否存在即可
當(dāng)集群規(guī)模比較大時(shí)咽斧,這些 “無(wú)用的” 操作不僅會(huì)對(duì)Zookeeper造成巨大的性能影響和網(wǎng)絡(luò)沖擊,更為嚴(yán)重的是躬存,如果同一時(shí)間有多個(gè)客戶端釋放了共享鎖张惹,Zookeeper服務(wù)器就會(huì)在短時(shí)間內(nèi)向其余客戶端發(fā)送大量的事件通知–這就是所謂的 “羊群效應(yīng)“。
改進(jìn)后的分布式鎖實(shí)現(xiàn):
具體實(shí)現(xiàn)如下:
客戶端調(diào)用
create
方法創(chuàng)建一個(gè)類似于/lockpath/[hostname]-請(qǐng)求類型-序號(hào)
的臨時(shí)順序節(jié)點(diǎn)客戶端調(diào)用
getChildren
方法獲取所有已經(jīng)創(chuàng)建的子節(jié)點(diǎn)列表(這里不注冊(cè)任何Watcher)
讀請(qǐng)求:向比自己序號(hào)小的最后一個(gè)寫請(qǐng)求節(jié)點(diǎn)注冊(cè)Watcher監(jiān)聽
寫請(qǐng)求:向比自己序號(hào)小的最后一個(gè)節(jié)點(diǎn)注冊(cè)Watcher監(jiān)聽
如果無(wú)法獲取任何共享鎖岭洲,那么調(diào)用
exist
來對(duì)比自己小的那個(gè)節(jié)點(diǎn)注冊(cè)Watcher等待Watcher監(jiān)聽宛逗,繼續(xù)進(jìn)入步驟2
Zookeeper羊群效應(yīng)改進(jìn)前后Watcher監(jiān)聽圖
基于Curator客戶端實(shí)現(xiàn)分布式鎖
Apache Curator是一個(gè)Zookeeper的開源客戶端,它提供了Zookeeper各種應(yīng)用場(chǎng)景(Recipe盾剩,如共享鎖服務(wù)雷激、master選舉、分布式計(jì)數(shù)器等)的抽象封裝告私,接下來將利用Curator提供的類來實(shí)現(xiàn)分布式鎖屎暇。
Curator提供的跟分布式鎖相關(guān)的類有5個(gè),分別是:
Shared Reentrant Lock 可重入鎖
Shared Lock 共享不可重入鎖
Shared Reentrant Read Write Lock 可重入讀寫鎖
Shared Semaphore 信號(hào)量
Multi Shared Lock 多鎖
關(guān)于錯(cuò)誤處理:還是強(qiáng)烈推薦使用ConnectionStateListener處理連接狀態(tài)的改變驻粟。當(dāng)連接LOST時(shí)你不再擁有鎖根悼。
可重入鎖
Shared Reentrant Lock,全局可重入鎖蜀撑,所有客戶端都可以請(qǐng)求挤巡,同一個(gè)客戶端在擁有鎖的同時(shí),可以多次獲取酷麦,不會(huì)被阻塞矿卑。它是由類 InterProcessMutex
來實(shí)現(xiàn),它的主要方法:
// 構(gòu)造方法
public InterProcessMutex(CuratorFramework client, String path)
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
// 通過acquire獲得鎖,并提供超時(shí)機(jī)制:
public void acquire() throws Exception
public boolean acquire(long time, TimeUnit unit) throws Exception
// 撤銷鎖
public void makeRevocable(RevocationListener<InterProcessMutex> listener)
public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
定義一個(gè) FakeLimitedResource 類來模擬一個(gè)共享資源沃饶,該資源一次只能被一個(gè)線程使用母廷,直到使用結(jié)束,下一個(gè)線程才能使用绍坝,否則會(huì)拋出異常
public class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
// 模擬只能單線程操作的資源
public void use() throws InterruptedException {
if (!inUse.compareAndSet(false, true)) {
// 在正確使用鎖的情況下徘意,此異常不可能拋出
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
Thread.sleep((long) (100 * Math.random()));
} finally {
inUse.set(false);
}
下面的代碼將創(chuàng)建 N 個(gè)線程來模擬分布式系統(tǒng)中的節(jié)點(diǎn),系統(tǒng)將通過 InterProcessMutex 來控制對(duì)資源的同步使用轩褐;每個(gè)節(jié)點(diǎn)都將發(fā)起10次請(qǐng)求椎咧,完成 請(qǐng)求鎖--訪問資源--再次請(qǐng)求鎖--釋放鎖--釋放鎖
的過程;客戶端通過 acquire
請(qǐng)求鎖,通過 release
釋放鎖勤讽,獲得幾把鎖就要釋放幾把鎖蟋座;這個(gè)共享資源一次只能被一個(gè)線程使用,如果控制同步失敗脚牍,將拋異常向臀。
public class SharedReentrantLockTest {
private static final String lockPath = "/testZK/sharedreentrantlock";
private static final Integer clientNums = 5;
final static FakeLimitedResource resource = new FakeLimitedResource(); // 共享的資源
private static CountDownLatch countDownLatch = new CountDownLatch(clientNums);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < clientNums; i++) {
String clientName = "client#" + i;
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework client = ZKUtils.getClient();
client.start();
Random random = new Random();
try {
final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// 每個(gè)客戶端請(qǐng)求10次共享資源
for (int j = 0; j < 10; j++) {
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(j + ". " + clientName + " 不能得到互斥鎖");
}
try {
System.out.println(j + ". " + clientName + " 已獲取到互斥鎖");
resource.use(); // 使用資源
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(j + ". " + clientName + " 不能再次得到互斥鎖");
}
System.out.println(j + ". " + clientName + " 已再次獲取到互斥鎖");
lock.release(); // 申請(qǐng)幾次鎖就要釋放幾次鎖
} finally {
System.out.println(j + ". " + clientName + " 釋放互斥鎖");
lock.release(); // 總是在finally中釋放
}
Thread.sleep(random.nextInt(100));
}
} catch (Throwable e) {
System.out.println(e.getMessage());
} finally {
CloseableUtils.closeQuietly(client);
System.out.println(clientName + " 客戶端關(guān)閉!");
countDownLatch.countDown();
}
}
}).start();
}
countDownLatch.await();
System.out.println("結(jié)束诸狭!");
}
}
控制臺(tái)打印日志券膀,可以看到對(duì)資源的同步訪問控制成功,并且鎖是可重入的
0. client#3 已獲取到互斥鎖
0. client#3 已再次獲取到互斥鎖
0. client#3 釋放互斥鎖
0. client#1 已獲取到互斥鎖
0. client#1 已再次獲取到互斥鎖
0. client#1 釋放互斥鎖
0. client#2 已獲取到互斥鎖
0. client#2 已再次獲取到互斥鎖
0. client#2 釋放互斥鎖
0. client#0 已獲取到互斥鎖
0. client#0 已再次獲取到互斥鎖
0. client#0 釋放互斥鎖
0. client#4 已獲取到互斥鎖
0. client#4 已再次獲取到互斥鎖
0. client#4 釋放互斥鎖
1. client#1 已獲取到互斥鎖
1. client#1 已再次獲取到互斥鎖
1. client#1 釋放互斥鎖
2. client#1 已獲取到互斥鎖
2. client#1 已再次獲取到互斥鎖
2. client#1 釋放互斥鎖
1. client#4 已獲取到互斥鎖
1. client#4 已再次獲取到互斥鎖
1. client#4 釋放互斥鎖
1. client#3 已獲取到互斥鎖
1. client#3 已再次獲取到互斥鎖
1. client#3 釋放互斥鎖
1. client#2 已獲取到互斥鎖
1. client#2 已再次獲取到互斥鎖
1. client#2 釋放互斥鎖
2. client#4 已獲取到互斥鎖
2. client#4 已再次獲取到互斥鎖
2. client#4 釋放互斥鎖
....
....
client#2 客戶端關(guān)閉驯遇!
9. client#0 已獲取到互斥鎖
9. client#0 已再次獲取到互斥鎖
9. client#0 釋放互斥鎖
9. client#3 已獲取到互斥鎖
9. client#3 已再次獲取到互斥鎖
9. client#3 釋放互斥鎖
client#0 客戶端關(guān)閉芹彬!
8. client#4 已獲取到互斥鎖
8. client#4 已再次獲取到互斥鎖
8. client#4 釋放互斥鎖
9. client#4 已獲取到互斥鎖
9. client#4 已再次獲取到互斥鎖
9. client#4 釋放互斥鎖
client#3 客戶端關(guān)閉和橙!
client#4 客戶端關(guān)閉耕拷!
結(jié)束!
同時(shí)在程序運(yùn)行期間查看Zookeeper節(jié)點(diǎn)樹末秃,可以發(fā)現(xiàn)每一次請(qǐng)求的鎖實(shí)際上對(duì)應(yīng)一個(gè)臨時(shí)順序節(jié)點(diǎn)
[zk: localhost:2181(CONNECTED) 42] ls /testZK/sharedreentrantlock
[leases, _c_208d461b-716d-43ea-ac94-1d2be1206db3-lock-0000001659, locks, _c_64b19dba-3efa-46a6-9344-19a52e9e424f-lock-0000001658, _c_cee02916-d7d5-4186-8867-f921210b8815-lock-0000001657]
不可重入鎖
Shared Lock 與 Shared Reentrant Lock 相似陡叠,但是不可重入玩郊。這個(gè)不可重入鎖由類 InterProcessSemaphoreMutex 來實(shí)現(xiàn),使用方法和上面的類類似枉阵。
將上面程序中的 InterProcessMutex 換成不可重入鎖 InterProcessSemaphoreMutex译红,如果再運(yùn)行上面的代碼,結(jié)果就會(huì)發(fā)現(xiàn)線程被阻塞在第二個(gè) acquire
上岭妖,直到超時(shí)临庇,也就是此鎖不是可重入的。
控制臺(tái)輸出日志
0. client#2 已獲取到互斥鎖
0. client#1 不能得到互斥鎖
0. client#4 不能得到互斥鎖
0. client#0 不能得到互斥鎖
0. client#3 不能得到互斥鎖
client#1 客戶端關(guān)閉昵慌!
client#4 客戶端關(guān)閉假夺!
client#3 客戶端關(guān)閉!
client#0 客戶端關(guān)閉斋攀!
0. client#2 釋放互斥鎖
0. client#2 不能再次得到互斥鎖
client#2 客戶端關(guān)閉已卷!
結(jié)束!
把第二個(gè)獲取鎖的代碼注釋淳蔼,程序才能正常執(zhí)行
0. client#1 已獲取到互斥鎖
0. client#1 釋放互斥鎖
0. client#2 已獲取到互斥鎖
0. client#2 釋放互斥鎖
0. client#0 已獲取到互斥鎖
0. client#0 釋放互斥鎖
0. client#4 已獲取到互斥鎖
0. client#4 釋放互斥鎖
0. client#3 已獲取到互斥鎖
0. client#3 釋放互斥鎖
1. client#1 已獲取到互斥鎖
1. client#1 釋放互斥鎖
1. client#2 已獲取到互斥鎖
1. client#2 釋放互斥鎖
....
....
9. client#4 已獲取到互斥鎖
9. client#4 釋放互斥鎖
9. client#0 已獲取到互斥鎖
client#2 客戶端關(guān)閉侧蘸!
9. client#0 釋放互斥鎖
9. client#1 已獲取到互斥鎖
client#0 客戶端關(guān)閉!
client#4 客戶端關(guān)閉鹉梨!
9. client#1 釋放互斥鎖
9. client#3 已獲取到互斥鎖
client#1 客戶端關(guān)閉讳癌!
9. client#3 釋放互斥鎖
client#3 客戶端關(guān)閉!
結(jié)束存皂!
可重入讀寫鎖
Shared Reentrant Read Write Lock晌坤,可重入讀寫鎖逢艘,一個(gè)讀寫鎖管理一對(duì)相關(guān)的鎖,一個(gè)負(fù)責(zé)讀操作骤菠,另外一個(gè)負(fù)責(zé)寫操作它改;讀操作在寫鎖沒被使用時(shí)可同時(shí)由多個(gè)進(jìn)程使用,而寫鎖在使用時(shí)不允許讀(阻塞)商乎;此鎖是可重入的央拖;一個(gè)擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進(jìn)入寫鎖鹉戚,這也意味著寫鎖可以降級(jí)成讀鎖鲜戒, 比如 請(qǐng)求寫鎖 --->讀鎖 ---->釋放寫鎖
;從讀鎖升級(jí)成寫鎖是不行的崩瓤。
可重入讀寫鎖主要由兩個(gè)類實(shí)現(xiàn):InterProcessReadWriteLock袍啡、InterProcessMutex踩官,使用時(shí)首先創(chuàng)建一個(gè) InterProcessReadWriteLock 實(shí)例却桶,然后再根據(jù)你的需求得到讀鎖或者寫鎖,讀寫鎖的類型是 InterProcessMutex蔗牡。
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < clientNums; i++) {
final String clientName = "client#" + i;
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework client = ZKUtils.getClient();
client.start();
final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);
final InterProcessMutex readLock = lock.readLock();
final InterProcessMutex writeLock = lock.writeLock();
try {
// 注意只能先得到寫鎖再得到讀鎖颖系,不能反過來!1缭健嘁扼!
if (!writeLock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(clientName + " 不能得到寫鎖");
}
System.out.println(clientName + " 已得到寫鎖");
if (!readLock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(clientName + " 不能得到讀鎖");
}
System.out.println(clientName + " 已得到讀鎖");
try {
resource.use(); // 使用資源
} finally {
System.out.println(clientName + " 釋放讀寫鎖");
readLock.release();
writeLock.release();
}
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
CloseableUtils.closeQuietly(client);
countDownLatch.countDown();
}
}
}).start();
}
countDownLatch.await();
System.out.println("結(jié)束!");
}
}
控制臺(tái)打印日志
client#1 已得到寫鎖
client#1 已得到讀鎖
client#1 釋放讀寫鎖
client#2 已得到寫鎖
client#2 已得到讀鎖
client#2 釋放讀寫鎖
client#0 已得到寫鎖
client#0 已得到讀鎖
client#0 釋放讀寫鎖
client#4 已得到寫鎖
client#4 已得到讀鎖
client#4 釋放讀寫鎖
client#3 已得到寫鎖
client#3 已得到讀鎖
client#3 釋放讀寫鎖
結(jié)束黔攒!
信號(hào)量
Shared Semaphore趁啸,一個(gè)計(jì)數(shù)的信號(hào)量類似JDK的 Semaphore,JDK中 Semaphore 維護(hù)的一組許可(permits)督惰,而Cubator中稱之為租約(Lease)不傅。有兩種方式可以決定 semaphore 的最大租約數(shù),第一種方式是由用戶給定的 path 決定赏胚,第二種方式使用 SharedCountReader 類访娶。如果不使用 SharedCountReader,沒有內(nèi)部代碼檢查進(jìn)程是否假定有10個(gè)租約而進(jìn)程B假定有20個(gè)租約觉阅。所以所有的實(shí)例必須使用相同的 numberOfLeases 值.
信號(hào)量主要實(shí)現(xiàn)類有:
InterProcessSemaphoreV2 - 信號(hào)量實(shí)現(xiàn)類
Lease - 租約(單個(gè)信號(hào))
SharedCountReader - 計(jì)數(shù)器崖疤,用于計(jì)算最大租約數(shù)量
調(diào)用 acquire
會(huì)返回一個(gè)租約對(duì)象,客戶端必須在 finally 中 close 這些租約對(duì)象典勇,否則這些租約會(huì)丟失掉劫哼。但是,如果客戶端session由于某種原因比如crash丟掉割笙,那么這些客戶端持有的租約會(huì)自動(dòng)close权烧,這樣其它客戶端可以繼續(xù)使用這些租約。租約還可以通過下面的方式返還:
public void returnLease(Lease lease)
public void returnAll(Collection<Lease> leases)
注意一次你可以請(qǐng)求多個(gè)租約,如果 Semaphore 當(dāng)前的租約不夠豪嚎,則請(qǐng)求線程會(huì)被阻塞搔驼。同時(shí)還提供了超時(shí)的重載方法。
public Lease acquire() throws Exception
public Collection<Lease> acquire(int qty) throws Exception
public Lease acquire(long time, TimeUnit unit) throws Exception
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
一個(gè)Demo程序如下
public class SharedSemaphoreTest {
private static final int MAX_LEASE = 10;
private static final String PATH = "/testZK/semaphore";
private static final FakeLimitedResource resource = new FakeLimitedResource();
public static void main(String[] args) throws Exception {
CuratorFramework client = ZKUtils.getClient();
client.start();
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
Collection<Lease> leases = semaphore.acquire(5);
System.out.println("獲取租約數(shù)量:" + leases.size());
Lease lease = semaphore.acquire();
System.out.println("獲取單個(gè)租約");
resource.use(); // 使用資源
// 再次申請(qǐng)獲取5個(gè)leases侈询,此時(shí)leases數(shù)量只剩4個(gè)舌涨,不夠,將超時(shí)
Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
System.out.println("獲取租約扔字,如果超時(shí)將為null:" + leases2);
System.out.println("釋放租約");
semaphore.returnLease(lease);
// 再次申請(qǐng)獲取5個(gè)囊嘉,這次剛好夠
leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
System.out.println("獲取租約,如果超時(shí)將為null:" + leases2);
System.out.println("釋放集合中的所有租約");
semaphore.returnAll(leases);
semaphore.returnAll(leases2);
client.close();
System.out.println("結(jié)束!");
}
}
控制臺(tái)打印日志
獲取租約數(shù)量:5
獲取單個(gè)租約
獲取租約革为,如果超時(shí)將為null:null
釋放租約
獲取租約扭粱,如果超時(shí)將為null:[org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@3108bc, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@370736d9, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@5f9d02cb, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@63753b6d, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@6b09bb57]
釋放集合中的所有租約
結(jié)束!
注意:上面所講的4種鎖都是公平鎖(fair)。從ZooKeeper的角度看震檩,每個(gè)客戶端都按照請(qǐng)求的順序獲得鎖琢蛤,相當(dāng)公平。
多鎖
Multi Shared Lock 是一個(gè)鎖的容器抛虏。當(dāng)調(diào)用 acquire
博其,所有的鎖都會(huì)被 acquire
,如果請(qǐng)求失敗迂猴,所有的鎖都會(huì)被 release
慕淡。同樣調(diào)用 release
時(shí)所有的鎖都被 release
(失敗被忽略)》谢伲基本上峰髓,它就是組鎖的代表,在它上面的請(qǐng)求釋放操作都會(huì)傳遞給它包含的所有的鎖息尺。
主要涉及兩個(gè)類:
InterProcessMultiLock - 對(duì)所對(duì)象實(shí)現(xiàn)類
InterProcessLock - 分布式鎖接口類
它的構(gòu)造函數(shù)需要包含的鎖的集合携兵,或者一組 ZooKeeper 的 path,用法和 Shared Lock 相同
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
public InterProcessMultiLock(List<InterProcessLock> locks)
一個(gè)Demo程序如下
public class MultiSharedLockTest {
private static final String lockPath1 = "/testZK/MSLock1";
private static final String lockPath2 = "/testZK/MSLock2";
private static final FakeLimitedResource resource = new FakeLimitedResource();
public static void main(String[] args) throws Exception {
CuratorFramework client = ZKUtils.getClient();
client.start();
InterProcessLock lock1 = new InterProcessMutex(client, lockPath1); // 可重入鎖
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2); // 不可重入鎖
// 組鎖掷倔,多鎖
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("不能獲取多鎖");
}
System.out.println("已獲取多鎖");
System.out.println("是否有第一個(gè)鎖: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二個(gè)鎖: " + lock2.isAcquiredInThisProcess());
try {
resource.use(); // 資源操作
} finally {
System.out.println("釋放多個(gè)鎖");
lock.release(); // 釋放多鎖
}
System.out.println("是否有第一個(gè)鎖: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二個(gè)鎖: " + lock2.isAcquiredInThisProcess());
client.close();
System.out.println("結(jié)束!");
}
}
寫在最后
歡迎大家關(guān)注我的公眾號(hào)【風(fēng)平浪靜如碼】眉孩,海量Java相關(guān)文章,學(xué)習(xí)資料都會(huì)在里面更新勒葱,整理的資料也會(huì)放在里面浪汪。
覺得寫的還不錯(cuò)的就點(diǎn)個(gè)贊,加個(gè)關(guān)注唄凛虽!點(diǎn)關(guān)注死遭,不迷路,持續(xù)更新?Q教丁钉迷!