轉(zhuǎn)自:https://github.com/angryz/my-blog/issues/4
Why 分布式鎖
java.util.concurrent.locks
中包含了 JDK 提供的在多線程情況下對共享資源的訪問控制的一系列工具撒轮,它們可以幫助我們解決進程內(nèi)多線程并發(fā)時的數(shù)據(jù)一致性問題。
但是在分布式系統(tǒng)中,JDK 原生的并發(fā)鎖工具在一些場景就無法滿足我們的要求了晦墙,這就是為什么要使用分布式鎖唉窃。我總結(jié)了一句話梅鹦,分布式鎖是用于解決分布式系統(tǒng)中操作共享資源時的數(shù)據(jù)一致性問題颁糟。
設(shè)計分布式鎖要注意的問題
互斥
分布式系統(tǒng)中運行著多個節(jié)點桑谍,必須確保在同一時刻只能有一個節(jié)點的一個線程獲得鎖瓦盛,這是最基本的一點洗显。
死鎖
分布式系統(tǒng)中,可能產(chǎn)生死鎖的情況要相對復雜一些原环。分布式系統(tǒng)是處在復雜網(wǎng)絡(luò)環(huán)境中的挠唆,當一個節(jié)點獲取到鎖,如果它在釋放鎖之前掛掉了嘱吗,或者因網(wǎng)絡(luò)故障無法執(zhí)行釋放鎖的命令玄组,都會導致其他節(jié)點無法申請到鎖滔驾。
因此分布式鎖有必要設(shè)置時效,確保在未來的一定時間內(nèi)俄讹,無論獲得鎖的節(jié)點發(fā)生了什么問題哆致,最終鎖都能被釋放掉。
性能
對于訪問量大的共享資源患膛,如果針對其獲取鎖時造成長時間的等待摊阀,導致大量節(jié)點阻塞,是絕對不能接受的踪蹬。
所以設(shè)計分布式鎖時要能夠掌握鎖持有者的動態(tài),若判斷鎖持有者處于不活動狀態(tài),要能夠強制釋放其持有的鎖控硼。
此外暇仲,排隊等待鎖的節(jié)點如果不知道鎖何時會被釋放,則只能隔一段時間嘗試獲取一次鎖疚漆,這樣無法保證資源的高效利用酣胀,因此當鎖釋放時,要能夠通知等待隊列愿卸,使一個等待節(jié)點能夠立刻獲得鎖灵临。
重入
考慮到一些應用場景和資源的高效利用,鎖要設(shè)計成可重入的趴荸,就像 JDK 中的 ReentrantLock 一樣儒溉,同一個線程可以重復拿到同一個資源的鎖。
RedissonLock 實現(xiàn)解讀
本文中 Redisson 的代碼版本為 2.2.17-SNAPSHOT发钝。
這里以 lock()
方法為例顿涣,其他一系列方法與其核心實現(xiàn)基本一致。
先來看 lock() 的基本用法
RLock lock = redisson.getLock("foobar"); // 1.獲得鎖對象實例
lock.lock(); // 2.獲取分布式鎖
try {
// do sth.
} finally {
lock.unlock(); // 3.釋放鎖
}
- 通過 RedissonClient 的
getLock()
方法取得一個 RLock 實例酝豪。 -
lock()
方法嘗試獲取鎖涛碑,如果成功獲得鎖,則繼續(xù)往下執(zhí)行孵淘,否則等待鎖被釋放蒲障,然后再繼續(xù)嘗試獲取鎖,直到成功獲得鎖瘫证。 -
unlock()
方法釋放獲得的鎖揉阎,并通知等待的節(jié)點鎖已釋放。
下面來看看 RedissonLock 的具體實現(xiàn)
org.redisson.Redisson#getLock()
@Override
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name, id);
}
這里的 RLock 是繼承自 java.util.concurrent.locks.Lock 的一個 interface背捌,getLock
返回的實際上是其實現(xiàn)類 RedissonLock 的實例毙籽。
來看看構(gòu)造 RedissonLock 的參數(shù)
-
commandExecutor: 與 Redis 節(jié)點通信并發(fā)送指令的真正實現(xiàn)。需要說明一下毡庆,Redisson 缺省的 CommandExecutor 實現(xiàn)是通過
eval
命令來執(zhí)行 Lua 腳本坑赡,所以要求 Redis 的版本必須為 2.6 或以上烙如,否則你可能要自己來實現(xiàn) CommandExecutor。關(guān)于 Redisson 的 CommandExecutor 以后會專門解讀毅否,所以本次就不多說了亚铁。 -
name: 鎖的全局名稱,例如上面代碼中的
"foobar"
搀突,具體業(yè)務中通车睹疲可能使用共享資源的唯一標識作為該名稱。 - id: Redisson 客戶端唯一標識仰迁,實際上就是一個 UUID.randomUUID()甸昏。
org.redisson.RedissonLock#lock()
此處略過前面幾個方法的層層調(diào)用,直接看最核心部分的方法 lockInterruptibly()
徐许,該方法在 RLock 中聲明施蜜,支持對獲取鎖的線程進行中斷操作。在直接使用 lock()
方法獲取鎖時雌隅,最后實際執(zhí)行的是 lockInterruptibly(-1, null)
翻默。
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 1.嘗試獲取鎖
Long ttl = tryAcquire(leaseTime, unit);
// 2.獲得鎖成功
if (ttl == null) {
return;
}
// 3.等待鎖釋放,并訂閱鎖
long threadId = Thread.currentThread().getId();
Future<RedissonLockEntry> future = subscribe(threadId);
get(future);
try {
while (true) {
// 4.重試獲取鎖
ttl = tryAcquire(leaseTime, unit);
// 5.成功獲得鎖
if (ttl == null) {
break;
}
// 6.等待鎖釋放
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 7.取消訂閱
unsubscribe(future, threadId);
}
}
- 首先嘗試獲取鎖恰起,具體代碼下面再看修械,返回結(jié)果是已存在的鎖的剩余存活時間,為
null
則說明沒有已存在的鎖并成功獲得鎖检盼。 - 如果獲得鎖則結(jié)束流程肯污,回去執(zhí)行業(yè)務邏輯。
- 如果沒有獲得鎖吨枉,則需等待鎖被釋放蹦渣,并通過 Redis 的 channel 訂閱鎖釋放的消息,這里的具體實現(xiàn)本文也不深入貌亭,只是簡單提一下 Redisson 在執(zhí)行 Redis 命令時提供了同步和異步的兩種實現(xiàn)柬唯,但實際上同步的實現(xiàn)都是基于異步的,具體做法是使用 Netty 中的異步工具 Future 和 FutureListener 結(jié)合 JDK 中的 CountDownLatch 一起實現(xiàn)圃庭。
- 訂閱鎖的釋放消息成功后锄奢,進入一個不斷重試獲取鎖的循環(huán),循環(huán)中每次都先試著獲取鎖剧腻,并得到已存在的鎖的剩余存活時間拘央。
- 如果在重試中拿到了鎖,則結(jié)束循環(huán)恕酸,跳過第 6 步。
- 如果鎖當前是被占用的蕊温,那么等待釋放鎖的消息袱箱,具體實現(xiàn)使用了 JDK 并發(fā)的信號量工具 Semaphore 來阻塞線程,當鎖釋放并發(fā)布釋放鎖的消息后义矛,信號量的
release()
方法會被調(diào)用发笔,此時被信號量阻塞的等待隊列中的一個線程就可以繼續(xù)嘗試獲取鎖了。 - 在成功獲得鎖后凉翻,就沒必要繼續(xù)訂閱鎖的釋放消息了了讨,因此要取消對 Redis 上相應 channel 的訂閱。
下面著重看看 tryAcquire()
方法的實現(xiàn)制轰,
private Long tryAcquire(long leaseTime, TimeUnit unit) {
// 1.將異步執(zhí)行的結(jié)果以同步的形式返回
return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId()));
}
private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 2.用默認的鎖超時時間去獲取鎖
Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// 成功獲得鎖
if (ttlRemaining == null) {
// 3.鎖過期時間刷新任務調(diào)度
scheduleExpirationRenewal();
}
}
});
return ttlRemainingFuture;
}
<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId,
RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 4.使用 EVAL 命令執(zhí)行 Lua 腳本獲取鎖
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime,
getLockName(threadId));
}
- 上面說過 Redisson 實現(xiàn)的執(zhí)行 Redis 命令都是異步的前计,但是它在異步的基礎(chǔ)上提供了以同步的方式獲得執(zhí)行結(jié)果的封裝。
- 前面提到分布式鎖要確保未來的一段時間內(nèi)鎖一定能夠被釋放垃杖,因此要對鎖設(shè)置超時釋放的時間男杈,在我們沒有指定該時間的情況下,Redisson 默認指定為30秒调俘。
- 在成功獲取到鎖的情況下伶棒,為了避免業(yè)務中對共享資源的操作還未完成,鎖就被釋放掉了彩库,需要定期(鎖失效時間的三分之一)刷新鎖失效的時間肤无,這里 Redisson 使用了 Netty 的 TimerTask、Timeout 工具來實現(xiàn)該任務調(diào)度骇钦。
- 獲取鎖真正執(zhí)行的命令宛渐,Redisson 使用
EVAL
命令執(zhí)行上面的 Lua 腳本來完成獲取鎖的操作:- 如果通過
exists
命令發(fā)現(xiàn)當前 key 不存在,即鎖沒被占用司忱,則執(zhí)行hset
寫入 Hash 類型數(shù)據(jù) key:全局鎖名稱(例如共享資源ID), field:鎖實例名稱(Redisson客戶端ID:線程ID), value:1皇忿,并執(zhí)行pexpire
對該 key 設(shè)置失效時間,返回空值nil
坦仍,至此獲取鎖成功鳍烁。 - 如果通過
hexists
命令發(fā)現(xiàn) Redis 中已經(jīng)存在當前 key 和 field 的 Hash 數(shù)據(jù),說明當前線程之前已經(jīng)獲取到鎖繁扎,因為這里的鎖是可重入的幔荒,則執(zhí)行hincrby
對當前 key field 的值加一,并重新設(shè)置失效時間梳玫,返回空值爹梁,至此重入獲取鎖成功。 - 最后是鎖已被占用的情況提澎,即當前 key 已經(jīng)存在姚垃,但是 Hash 中的 Field 與當前值不同,則執(zhí)行
pttl
獲取鎖的剩余存活時間并返回盼忌,至此獲取鎖失敗积糯。
- 如果通過
以上就是對 lock()
的解讀掂墓,不過在實際業(yè)務中我們可能還會經(jīng)常使用 tryLock()
,雖然兩者有一定差別看成,但核心部分的實現(xiàn)都是相同的君编,另外還有其他一些方法可以支持更多自定義參數(shù),本文中就不一一詳述了川慌。
org.redisson.RedissonLock#unlock()
最后來看鎖的釋放吃嘿,
@Override
public void unlock() {
// 1.通過 EVAL 和 Lua 腳本執(zhí)行 Redis 命令釋放鎖
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime,
getLockName(Thread.currentThread().getId()));
// 2.非鎖的持有者釋放鎖時拋出異常
if (opStatus == null) {
throw new IllegalMonitorStateException(
"attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
// 3.釋放鎖后取消刷新鎖失效時間的調(diào)度任務
if (opStatus) {
cancelExpirationRenewal();
}
}
- 使用
EVAL
命令執(zhí)行 Lua 腳本來釋放鎖:- key 不存在,說明鎖已釋放梦重,直接執(zhí)行
publish
命令發(fā)布釋放鎖消息并返回1
兑燥。 - key 存在,但是 field 在 Hash 中不存在忍饰,說明自己不是鎖持有者贪嫂,無權(quán)釋放鎖,返回
nil
艾蓝。 - 因為鎖可重入力崇,所以釋放鎖時不能把所有已獲取的鎖全都釋放掉,一次只能釋放一把鎖赢织,因此執(zhí)行
hincrby
對鎖的值減一亮靴。 - 釋放一把鎖后,如果還有剩余的鎖于置,則刷新鎖的失效時間并返回
0
茧吊;如果剛才釋放的已經(jīng)是最后一把鎖,則執(zhí)行del
命令刪除鎖的 key八毯,并發(fā)布鎖釋放消息搓侄,返回1
。
- key 不存在,說明鎖已釋放梦重,直接執(zhí)行
- 上面執(zhí)行結(jié)果返回
nil
的情況(即第2中情況)话速,因為自己不是鎖的持有者讶踪,不允許釋放別人的鎖,故拋出異常泊交。 - 執(zhí)行結(jié)果返回
1
的情況乳讥,該鎖的所有實例都已全部釋放,所以不需要再刷新鎖的失效時間廓俭。
總結(jié)
寫了這么多云石,其實最主要的就是上面的兩段 Lua 腳本,基于 Redis 的分布式鎖的設(shè)計完全體現(xiàn)在其中研乒,看完這兩段腳本汹忠,再回顧一下前面的 設(shè)計分布式鎖要注意的問題 就豁然開朗了。
redisson是redis官網(wǎng)推薦的java語言實現(xiàn)分布式鎖的項目。當然宽菜,redisson遠不止分布式鎖奖地,還包括其他一些分布式結(jié)構(gòu)。詳情請移步:https://github.com/mrniko/redisson/wiki
redisson支持4種鏈接redis的方式:
Cluster(集群)
Sentinel servers(哨兵)
Single server(單機)
分布式鎖之redisson
redisson是redis官網(wǎng)推薦的java語言實現(xiàn)分布式鎖的項目赋焕。當然,redisson遠不止分布式鎖仰楚,還包括其他一些分布式結(jié)構(gòu)隆判。詳情請移步:https://github.com/mrniko/redisson/wiki
redisson支持4種鏈接redis的方式:
Cluster(集群)
Sentinel servers(哨兵)
Single server(單機)
下面通過簡單的案例使用redisson的lock。
1僧界、RedissonManager類侨嘀,管理redisson的初始化等操作。
public class RedissonManager {
private static final String RAtomicName = "genId_";
private static Config config = new Config();
private static Redisson redisson = null;
public static void init(){
try {
config.useClusterServers() //這是用的集群server
.setScanInterval(2000) //設(shè)置集群狀態(tài)掃描時間
.setMasterConnectionPoolSize(10000) //設(shè)置連接數(shù)
.setSlaveConnectionPoolSize(10000)
.addNodeAddress("127.0.0.1:6379","127.0.0.1:6380");
redisson = Redisson.create(config);
//清空自增的ID數(shù)字
RAtomicLong atomicLong = redisson.getAtomicLong(RAtomicName);
atomicLong.set(1);
}catch (Exception e){
e.printStackTrace();
}
}
public static Redisson getRedisson(){
return redisson;
}
/** 獲取redis中的原子ID */
public static Long nextID(){
RAtomicLong atomicLong = getRedisson().getAtomicLong(RAtomicName);
atomicLong.incrementAndGet();
return atomicLong.get();
}
}
2捂襟、DistributedRedisLock類咬腕,提供鎖和解鎖方法
public class DistributedRedisLock {
private static Redisson redisson = RedissonManager.getRedisson();
private static final String LOCK_TITLE = "redisLock_";
public static void acquire(String lockName){
String key = LOCK_TITLE + lockName;
RLock mylock = redisson.getLock(key);
mylock.lock(2, TimeUnit.MINUTES); //lock提供帶timeout參數(shù),timeout結(jié)束強制解鎖葬荷,防止死鎖
System.err.println("======lock======"+Thread.currentThread().getName());
}
public static void release(String lockName){
String key = LOCK_TITLE + lockName;
RLock mylock = redisson.getLock(key);
mylock.unlock();
System.err.println("======unlock======"+Thread.currentThread().getName());
}
}
3涨共、測試
private static void redisLock(){
RedissonManager.init(); //初始化
for (int i = 0; i < 100; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
String key = "test123";
DistributedRedisLock.acquire(key);
Thread.sleep(1000); //獲得鎖之后可以進行相應的處理
System.err.println("======獲得鎖后進行相應的操作======");
DistributedRedisLock.release(key);
System.err.println("=============================");
} catch (Exception e) {
e.printStackTrace();
}
}
});
t.start();
}
}
4、測試結(jié)果
======lock======Thread-91
======獲得鎖后進行相應的操作======
======unlock======Thread-91
=============================
======lock======Thread-63
======獲得鎖后進行相應的操作======
======unlock======Thread-63
=============================
======lock======Thread-31
======獲得鎖后進行相應的操作======
======unlock======Thread-31
=============================
======lock======Thread-97
======獲得鎖后進行相應的操作======
======unlock======Thread-97
=============================
======lock======Thread-8
======獲得鎖后進行相應的操作======
======unlock======Thread-8
=============================
從測試結(jié)果可以看出宠漩,結(jié)果還是達到了預期举反,在服務器跑一萬個線程還是能很好運行,感覺還不錯扒吁。
題外話:
在初始化數(shù)據(jù)時候火鼻,最好不要使用static{} 即靜態(tài)塊。因為在多核機器的情況下讀取配置文件雕崩,會拋出java.lang.NoClassDefFoundError: Could not initialize class XXX魁索。
所以最好還是使用init的方式,在啟動程序的時候手動執(zhí)行下盼铁。