WHY - 分布式鎖
在單進程系統(tǒng)中,當存在多個線程可以同時對某個變量或某塊代碼進行操作時,為保證其結(jié)果的正確性,需要保證同一時間內(nèi)只有一個線程在進行操作,這個過程可以通過加鎖來實現(xiàn)宾舅。由于在單進程中的多線程是可以共享堆內(nèi)存,因此可以簡單的在內(nèi)存中記錄是否加鎖的標記。
但是現(xiàn)在部署一般都是多站點,多進程的情況下,就需要把標記位存儲在一個各個進程都可以看到的地方,這就出現(xiàn)了分布式鎖;
在本次的項目開發(fā)中因為需要不停的掃描數(shù)據(jù)庫的變更情況,為避免多臺站點同時進行,浪費資源,因此需要分布式鎖來鎖定其中一個進程來完成此操作,所以需要用到分布式鎖耕赘。
WHY - Redisson
由于在本次的項目中掃描數(shù)據(jù)庫是一個持續(xù)性的動作,每個5s掃描一次,因此需要獲取鎖之后不斷延長其過期時間,也就是當某個線程獲取鎖后會一直保持并執(zhí)行掃描的動作,直到該站點掛掉后,才會有其他站點重新獲取鎖并執(zhí)行相關(guān)操作;Redisson已經(jīng)封裝好了續(xù)時功能,使用方便,因此選用Rdisson涝影。
分布式鎖的特征:
1-互斥:互斥的是必須的,否則就不叫鎖了;
2-死鎖:如果在一個線程中獲取到鎖,然后掛了,并沒有釋放,這樣會導(dǎo)致其他的進程或線程永遠無法獲取到鎖,這就會造成死鎖.所以分布式鎖必須避免造成死鎖;
3-性能:高并發(fā)分布式系統(tǒng)中,線程互斥等待會成為性能瓶頸,需要好的中間件和實現(xiàn)來保證性能;
4-鎖特性:分布式鎖不能只是加鎖,需要實現(xiàn)一些其他的功能如:鎖判斷,超市設(shè)置,可重入等;
1.鎖的分類以及基本使用方法
1.1可重入鎖
基于Redis的Redisson分布式可重入鎖RLock
RLock lock = redisson.getLock("myTestLock");
// 最常見的使用方法
lock.lock();
// 加鎖以后10秒鐘自動解鎖
// 無需調(diào)用unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);
// 嘗試加鎖岂贩,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
//Reddsion同時還為分布式鎖提供了異步執(zhí)行的相關(guān)方法
RLock lock = redisson.getLock("myTestLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
1.2公平鎖
基于Redis的Redisson分布式可重入公平鎖,它保證了當多個Redisson客戶端線程同時請求加鎖時,優(yōu)先分配給先發(fā)出請求的線程矾削。所有請求線程會在一個隊列中排隊壤玫,當某個線程出現(xiàn)宕機時,Redisson會等待5秒后繼續(xù)下一個線程哼凯,也就是說如果前面有5個線程都處于等待狀態(tài)欲间,那么后面的線程會等待至少25秒。
RLock fairLock = redisson.getFairLock("myTestLock");
// 最常見的使用方法
fairLock.lock();
// 10秒鐘以后自動解鎖
// 無需調(diào)用unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);
// 嘗試加鎖断部,最多等待100秒猎贴,上鎖以后10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
//Redisson同時還為分布式可重入公平鎖提供了異步執(zhí)行的相關(guān)方法:
RLock fairLock = redisson.getFairLock("myTestLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
1.3 聯(lián)鎖
基于Redis的Redisson分布式聯(lián)鎖RedissonMultiLock對象可以將多個RLock對象關(guān)聯(lián)為一個聯(lián)鎖,每個RLock對象實例可以來自于不同的Redisson實例蝴光。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 所有的鎖都上鎖成功才算成功她渴。
lock.lock();
...
lock.unlock();
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 給lock1,lock2蔑祟,lock3加鎖趁耗,如果沒有手動解開的話,10秒鐘后將會自動解開
lock.lock(10, TimeUnit.SECONDS);
// 為加鎖等待100秒時間疆虚,并在加鎖成功10秒鐘后自動解開
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
1.4 紅鎖
基于Redis的Redisson紅鎖RedissonRedLock對象可以用來將多個RLock對象關(guān)聯(lián)為一個紅鎖苛败,每個RLock對象實例可以來自于不同的Redisson實例。
使用方式同上径簿。區(qū)別在與連鎖是所有節(jié)點的鎖加鎖成功才算成功,但紅鎖是大部分節(jié)點加鎖成功即為成功著拭。
紅鎖主要是用來解決什么問題的呢?
為了redis的高可用,一般都會給redis的節(jié)點掛一個slave,然后采用哨兵模式進行主備切換牍帚。但由于Redis的主從復(fù)制(replication)是異步的儡遮,這可能會出現(xiàn)在數(shù)據(jù)同步過程中,master宕機暗赶,slave來不及同步數(shù)據(jù)就被選為master鄙币,從而數(shù)據(jù)丟失。具體流程如下所示:
1.客戶端1從Master獲取了鎖蹂随。
2.Master宕機了十嘿,存儲鎖的key還沒有來得及同步到Slave上。
3.Slave升級為Master岳锁。
4.客戶端2從新的Master獲取到了對應(yīng)同一個資源的鎖绩衷。
為了應(yīng)對這個情形, redis的作者提出了RedLock算法激率,步驟如下(該流程出自官方文檔)咳燕,假設(shè)我們有N個master節(jié)點(官方文檔里將N設(shè)置成5,其實大等于3就行)
1.獲取當前時間(單位是毫秒)乒躺。
2.輪流用相同的key和隨機值在N個節(jié)點上請求鎖招盲,在這一步里,客戶端在每個master上請求鎖時嘉冒,會有一個和總的鎖釋放時間相比小的多的超時時間曹货。比如如果鎖自動釋放時間是10秒鐘咆繁,那每個節(jié)點鎖請求的超時時間可能是5-50毫秒的范圍,這個可以防止一個客戶端在某個宕掉的master節(jié)點上阻塞過長時間顶籽,如果一個master節(jié)點不可用了玩般,我們應(yīng)該盡快嘗試下一個master節(jié)點。
3.客戶端計算第二步中獲取鎖所花的時間礼饱,只有當客戶端在大多數(shù)master節(jié)點上成功獲取了鎖(在這里是3個)壤短,而且總共消耗的時間不超過鎖釋放時間,這個鎖就認為是獲取成功了慨仿。
4.如果鎖獲取成功了久脯,那現(xiàn)在鎖自動釋放時間就是最初的鎖釋放時間減去之前獲取鎖所消耗的時間。
5.如果鎖獲取失敗了镰吆,不管是因為獲取成功的鎖不超過一半(N/2+1)還是因為總消耗時間超過了鎖釋放時間帘撰,客戶端都會到每個master節(jié)點上釋放鎖,即便是那些他認為沒有獲取成功的鎖万皿。
分析:RedLock算法細想一下還存在下面的問題
節(jié)點崩潰重啟摧找,會出現(xiàn)多個客戶端持有鎖
假設(shè)一共有5個Redis節(jié)點:A, B, C, D, E。設(shè)想發(fā)生了如下的事件序列:
1.客戶端1成功鎖住了A, B, C牢硅,獲取鎖成功(但D和E沒有鎖椎旁拧)。
2.節(jié)點C崩潰重啟了减余,但客戶端1在C上加的鎖沒有持久化下來综苔,丟失了。
3.節(jié)點C重啟后位岔,客戶端2鎖住了C, D, E如筛,獲取鎖成功。
這樣抒抬,客戶端1和客戶端2同時獲得了鎖(針對同一資源)杨刨。
為了應(yīng)對節(jié)點重啟引發(fā)的鎖失效問題,redis的作者提出了延遲重啟的概念擦剑,即一個節(jié)點崩潰后妖胀,先不立即重啟它,而是等待一段時間再重啟惠勒,等待的時間大于鎖的有效時間赚抡。采用這種方式,這個節(jié)點在重啟前所參與的鎖都會過期捉撮,它在重啟后就不會對現(xiàn)有的鎖造成影響怕品。這其實也是通過人為補償措施妇垢,降低不一致發(fā)生的概率巾遭。
1.5 讀寫鎖
基于Redis的Redisson分布式可重入讀寫鎖RReadWriteLock允許同時有多個讀鎖和一個寫鎖處于加鎖狀態(tài)肉康。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 10秒鐘以后自動解鎖
// 無需調(diào)用unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒灼舍,上鎖以后10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
1.6 信號量
在上面的分布式可重入鎖中,只有自己持有的鎖才可以解鎖,也就是說其他線程是沒有辦法解不屬于他們的鎖的,但是如果有業(yè)務(wù)需要的話可以使用基于Redis的Redisson的分布式信號量(Semaphore)來實現(xiàn)吼和。
RSemaphore semaphore = redisson.getSemaphore("semaphore");
//設(shè)置共同持有許可證的最大個數(shù)
semaphore.trySetPermits(23);
//申請一個許可證(需要添加try catch 所以一般常用tryAcquire,不傳參數(shù)默認獲取一個,如下面?zhèn)鲄?3,則表示想要申請23個)
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
1.7可過期性信號量
基于Redis的Redisson可過期性信號量(PermitExpirableSemaphore)是在RSemaphore對象的基礎(chǔ)上,為每個信號增加了一個過期時間骑素。
在使用過程中一般較常用的是帶過期時間的信號量,原理類似與車庫停車,車庫滿了就不能停車,車被開走,騰出的車位可以繼續(xù)使用;因此在實際項目中經(jīng)常用來解決分布式限流的問題或限制一項資源最多能夠同時被多少客戶訪問的問題炫乓。
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 獲取一個信號,有效期只有2秒鐘献丑。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
1.8閉鎖
基于Redisson的Redisson分布式閉鎖(CountDownLatch)Java對象RCountDownLatch末捣。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他線程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
2.RedissionLock源碼解讀
2.1RedissionLock繼承關(guān)系
2.2加鎖解鎖源碼分析
看加鎖方法之前先來看下加鎖的流程圖:
先來看下RedissionLock中的Lock方法:
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
從上面的源碼可以看出主要的實現(xiàn)方法在lockInterruptibly中,再來看看這個方法(此方法中主要的思路就是先獲取鎖,如果不成功的話則訂閱釋放鎖的消息,獲得消息前阻塞。得到釋放通知后再去循環(huán)獲取鎖):
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//獲取當前的線程id
long threadId = Thread.currentThread().getId();
//嘗試獲取鎖
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
//獲取成功
return;
}
//異步訂閱redis channel
RFuture<RedissonLockEntry> future = subscribe(threadId);
//阻塞獲取訂閱結(jié)果
//這里會訂閱Channel创橄,當資源可用時可以及時知道箩做,并搶占,防止無效的輪詢而浪費資源
commandExecutor.syncSubscription(future);
//當資源可用用的時候妥畏,循環(huán)去嘗試獲取鎖邦邦,由于多個線程同時去競爭資源,所以這里用了信號量醉蚁,對于同一個資源只允許一個線程獲得鎖燃辖,其它的線程阻塞
try {
while (true) { //循環(huán)判斷直到獲取鎖
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired 如果剩余鎖超時時間==null,則說明獲取成功
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
//在這里其實是使用到了Semaphore來阻塞獲取直到可以獲取當前線程的許可證后才能繼續(xù)當前的while循環(huán)
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
//取消訂閱
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
接下來看下嘗試獲取鎖的源碼 Long ttl = tryAcquire(leaseTime, unit, threadId)
tips:下面方法中使用到了Netty的Future-listener模型,(多線程異步執(zhí)行的時候,有時候還需要知道執(zhí)行結(jié)果或者拿到線程執(zhí)行的返回值,Future就是為了解決這個問題)
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
//如果設(shè)置了鎖的超時時間,則直接調(diào)用tryLockInnerAsync
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//如果沒有設(shè)置鎖的超時時間,則默認超時時間為30s(此時間限制可以在配置文件中修改)
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
//監(jiān)聽Future后如果獲取鎖成功,則當還剩下1/3的超時時間時刷新其過期時間達到續(xù)時的效果
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
接下來終于到了最終獲取鎖的方法tryLockInnerAsync(),點進去看到的源碼如下:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//如果不存在"myTestLock"(getName()獲取的值)這個key值
"if (redis.call('exists', KEYS[1]) == 0) then " +
//則設(shè)置鎖,"myTestLock"為key, "uuid:threadId" 為filed,filed值為1,大概的結(jié)構(gòu)為"myTestLock":{"uuid:threadId" : 1}
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//設(shè)置當前鎖的過期時間為internalLockLeaseTime對應(yīng)的值
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果key存在,filed也存在并且值為1,說明當前鎖被當前線程持有著
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//則把filed對應(yīng)的值做加1處理
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//刷新過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//已經(jīng)被其他線程持有,key存在,但是field不存在,返回當前鎖的剩余超時時間
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
接下來看下解鎖的核心方法unlockInnerAsync:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果key不存在,說明已經(jīng)過期或壓根沒有加過鎖
"if (redis.call('exists', KEYS[1]) == 0) then " +
//則發(fā)送unlockMessage
"redis.call('publish', KEYS[2], ARGV[1]); " +
//返回1,解鎖成功
"return 1; " +
"end;" +
//如果key存在,但是filed(uuid:threadId)不存在,說明當前線程不是該鎖的持有者,無權(quán)解鎖,直接返回nil
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//key存在,filed也存在,說明是當前線程的持有鎖,對filed的值進行減1操作,因為是可重入的,所以不能直接釋放
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果減1操作后,持有數(shù)還是大于0,說明該線程還有其他的地方在持有鎖,刷新過期時間,返回0
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//如果不大于0,則說明該線程不再有持有者,則釋放
"else " +
//刪除key
"redis.call('del', KEYS[1]); " +
//發(fā)送釋放鎖消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
//解鎖成功
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
加鎖和解鎖的核心代碼最終是通過一段lua腳本實現(xiàn),這樣做的目的是為了保證這段復(fù)雜業(yè)務(wù)邏輯執(zhí)行的原子性,因為當lua腳本在執(zhí)行的時候,不會有其他腳本和命令同時執(zhí)行。
2.3續(xù)時操作分析
private void scheduleExpirationRenewal(final long threadId) {
//如果當前鎖已過期則不再執(zhí)行續(xù)時操作,直接返回
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//執(zhí)行續(xù)時操作
RFuture<Boolean> future = renewExpirationAsync(threadId);
//同樣使用Future-Listener獲取執(zhí)行續(xù)時操作的結(jié)果
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself 說明執(zhí)行續(xù)時成功,遞歸調(diào)用
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
執(zhí)行續(xù)時的核心方法renewExpirationAsync
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果key和filed都存在
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//設(shè)置過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}