寫作不易,點贊收藏關注一鍵三連国旷,以便下次再看稠炬,感謝支持~
前兩篇文章咱們聊到了如何采用SQL數(shù)據(jù)庫及Zookeeper實現(xiàn)相應的分布式鎖陶缺。
初識分布式鎖(一)
初識分布式鎖(二):ZooKeeper分布式鎖原理淺析及實戰(zhàn)案例
今天咱們再來聊聊如何采用redis實現(xiàn)相應的分布式鎖,以及這種實現(xiàn)與前兩種方式實現(xiàn)的差異性舔株。
Redis常見命令
在介紹分布式鎖之前莺琳,我們先來了解一下redis的常用命令:
1、SET key value [EX seconds] [PX milliseconds] [NX|XX]载慈,將字符串值 value
關聯(lián)到 key
惭等。如果 key
已經(jīng)持有其他值, SET就覆寫舊值办铡,無視類型辞做。從 Redis 2.6.12 版本開始琳要, SET命令的行為可以通過一系列參數(shù)來修改:
-
EX second
:設置鍵的過期時間為second
秒。SET key value EX second
效果等同于SETEX key second value
秤茅。 -
PX millisecond
:設置鍵的過期時間為millisecond
毫秒稚补。SET key value PX millisecond
效果等同于PSETEX key millisecond value
。 -
NX
:只在鍵不存在時框喳,才對鍵進行設置操作课幕。SET key value NX
效果等同于SETNX key value
。 -
XX
:只在鍵已經(jīng)存在時五垮,才對鍵進行設置操作乍惊。
2、EXPIRE key seconds放仗,為給定 key
設置生存時間润绎,當 key
過期時(生存時間為 0
),它會被自動刪除匙监。
3凡橱、SETEX key seconds value,將值 value
關聯(lián)到 key
亭姥,并將 key
的生存時間設為 seconds
(以秒為單位)。
這個命令類似于以下兩個命令:
SET key value
EXPIRE key seconds # 設置生存時間
SETEX命令與SET + EXPIRE命令的區(qū)別主要在于顾稀,SETEX命令可以保持原子性达罗,而SET+EXPIRE屬于兩條命令,難以保持其原子性静秆。
4粮揉、DEL key [key ...],刪除給定的一個或多個 key
抚笔。
5扶认、SETNX key value,將 key
的值設為 value
殊橙,當且僅當 key
不存在辐宾。若給定的 key
已經(jīng)存在,則 SETNX 不做任何動作膨蛮。
分布式鎖最關鍵的主要幾個命令我都羅列在上面了~如果還有不清楚或者沒有提及的命令叠纹,可以點開這個文章進行查找。
Lua腳本
緊接著還需要介紹一個redis里面比較不常見的內(nèi)容敞葛,lua腳本誉察。
一般我們需要操作redis的時候,都是需要進入到redis客戶端惹谐,通過一個一個的命令進行編輯輸入持偏,從而完成相應的redis操作驼卖。
這樣的方式操作起來相對方便,而且都是及時反饋鸿秆,在命令數(shù)量較少款慨、操作簡單的時候十分友好。
但是如果當需要執(zhí)行的命令很多谬莹、而且命令可能有前后依賴的時候檩奠,那么采用這樣一個個命令輸入的方式就顯得十分不友好了。
為此附帽,redis特意引入了lua腳本埠戳,用戶可以向服務器發(fā)送 lua 腳本來執(zhí)行自定義動作,獲取腳本的響應數(shù)據(jù)蕉扮。
而且另外一個特點是整胃,Redis 服務器會單線程原子性執(zhí)行 lua 腳本,保證 lua 腳本在處理的過程中不會被任意其它請求打斷喳钟。這個也是lua腳本相比較于單條命令不斷執(zhí)行的優(yōu)勢之一屁使。
分布式鎖原理淺析
redis實現(xiàn)分布式鎖,主要有兩種方式:1奔则、基于redis命令實現(xiàn)蛮寂;2、基于lua腳本實現(xiàn)易茬。
基于redis命令實現(xiàn)
實現(xiàn)的邏輯主要梳理如下:
- 當線程進入程序時候酬蹋,采用SETNX命令往緩存中設置key值,如果設置成功抽莱,證明此時加鎖成功范抓。
- 當線程退出程序的時候,采用DEL命令將key值刪除食铐,從而實現(xiàn)解鎖匕垫。
SETNX key value # 加鎖
# 實現(xiàn)相應的業(yè)務代碼邏輯
DEL key value # 解鎖
但是這樣明顯存在一個問題,如果一個線程在加鎖期間虐呻,因為某些特殊原因掛掉了象泵,沒有進行解鎖,此時就會產(chǎn)生【死鎖】铃慷,從而嚴重影響整個系統(tǒng)的性能单芜。
因此在加鎖后我們還需要采用EXPIRE命令,為相應的KEY值添加上過期時間從而避免死鎖的產(chǎn)生犁柜。
SETNX key value # 加鎖
EXPIRE key seconds # 設置過期時間
# 實現(xiàn)相應的業(yè)務代碼邏輯
DEL key value # 解鎖
問題是不是到此就解決了呢洲鸠?顯然并沒有!
之前我們說過由于加鎖及設置過期時間的代碼是兩個命令,而redis在執(zhí)行兩個命令的時候并不能保證原子性扒腕,因此又可能出現(xiàn)在執(zhí)行SETNX命令的時候绢淀,出現(xiàn)宕機,這樣還是出現(xiàn)了死鎖瘾腰!
因此皆的,在redis,對set命令進行了拓展蹋盆,我們可以將上述的代碼替換成下述的代碼费薄。
SET key value EX seconds NX # 設置鎖的超時時間,且當key存在時直接返回栖雾。
# 實現(xiàn)相應的業(yè)務代碼邏輯
DEL key value # 解鎖
盡管如此楞抡,鎖重入仍是個難題,因為我們采用了NX參數(shù)析藕,因此難以實現(xiàn)鎖的重入召廷;
基于lua腳本實現(xiàn)
相反,得益于lua腳本的執(zhí)行時的原子性账胧,lua腳本能較好的解決上述的種種問題竞慢。
用lua腳本實現(xiàn)的加鎖代碼大致流程如下所示:
lua腳本實現(xiàn)解鎖的主要流程如下所示:
更詳細的代碼解析,在Redisson源碼淺析中我們會分析到治泥。
但需要注意的一點是筹煮,鎖的過期時間設定是一門難題,設置時間長了车摄,鎖久久不釋放影響性能寺谤;設置短了沼填,業(yè)務代碼還沒執(zhí)行完鎖就釋放了凌彬,沒法限制其他線程的代碼執(zhí)行东揣。比較巧妙的是,現(xiàn)有的框架里面已經(jīng)有使用守護線程的方式(看門狗)來自動延長過期時間意狠,從而簡化使用的門檻。
代碼實戰(zhàn)
這次代碼實戰(zhàn)疮胖,我們采用Redission實現(xiàn)分布式鎖环戈,其實redission框架對分布式鎖的封裝相對完善,只需要很少的代碼就可以實現(xiàn)對應分布式加鎖及解鎖澎灸。
首先院塞,我們寫一個配置類,用于加載我們對應的容器到spring中性昭,這里需要注意的一點是拦止,@Bean注解會默認使用方法名作為容器名字,要確保咱們的方法名與要加載的容器名字一致,當然也可以使用@Bean(value = "redissionClient")來顯式的指定容器的名字汹族。
@Configuration
public class RedisConfig {
//這里在application.yml中填寫你對應的redis的ip:port
@Value("${redis.address}")
private String redisAddress;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress(redisAddress);
return Redisson.create(config);
}
}
在將對應的容器注入到Spring的框架后萧求,我們調(diào)用redission的關鍵方法getLock獲取對應的鎖。緊接著可以對這個鎖調(diào)用相應的tryLock方法進行上鎖顶瞒,這里的上鎖是個多態(tài)方法夸政,主要區(qū)別如下所示:
// 不填寫參數(shù),即時獲取鎖榴徐,如果鎖不可用則直接返回false守问。
boolean tryLock();
// 在給定時間內(nèi)獲取對應的鎖(如果線程沒有被中斷)
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
這里我們采用簡單的方法實現(xiàn),直接采用tryLock()改造對應的代碼內(nèi)容坑资,改造后的代碼如下:
@Resource
RedissionClient redissionClient;
public Boolean deductProduct(ProductPO productPO){
//首先獲取分布式的鎖
RLock lock = redissonClient.getLock("deductProduct");
try{
LOGGER.info("分布式鎖加鎖!");
//嘗試對redis的分布式鎖進行加鎖
boolean success = lock.tryLock(30, TimeUnit.SECONDS);
if (!success){
//加鎖失敗耗帕,直接返回
return false;
}
LOGGER.info("查找商品的數(shù)據(jù)為 :"+ JSON.toJSONString(productPO));
Example example = new Example(ProductPO.class);
Example.Criteria criteria = example.createCriteria();
criteria.andEqualTo("skuId", productPO.getSkuId());
List<ProductPO> productPOS = productMapper.selectByExample(example);
if (CollectionUtils.isEmpty(productPOS)){
throw new RuntimeException("當前商品不存在");
}
for (ProductPO selectProductPO: productPOS){
//對對應的sku進行數(shù)量扣減
Integer number = selectProductPO.getNumber();
LOGGER.info("當前商品的數(shù)量為:"+number);
if (number<=0){
//小于等于0時,不進行扣減
continue;
}
selectProductPO.setNumber(number-productPO.getNumber());
productMapper.updateByPrimaryKey(selectProductPO);
}
}finally {
//最后一定記得釋放鎖資源
LOGGER.info("分布式鎖釋放盐茎!");
lock.unlock();
}
return true;
}
隨后運行咱們的代碼就可以得到相應的結(jié)果啦:
源碼淺析
加鎖源碼
對tryLock()兴垦,即加鎖的代碼進行分析。
boolean success = lock.tryLock();
深入到關鍵的源碼層面字柠,其主要代碼如下:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Boolean> acquiredFuture;
if (leaseTime != -1) {
/*關鍵代碼*/
acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
/*關鍵代碼*/
acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
//如果成功獲取鎖
if (acquired) {
if (leaseTime != -1) {
// 明確指定了租約時間探越,則更新類相應的租約時間即可
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 否則將當前的ThreadId保存到一個相應的ConcurrentMap中,
// 開啟守護線程窑业,定期刷新對應線程ID持有鎖的過期時間钦幔。避免出現(xiàn)鎖過期被釋放的問題
scheduleExpirationRenewal(threadId);
}
}
return acquired;
});
return new CompletableFutureWrapper<>(f);
}
獲取鎖的命令中,可以看到比較關鍵的代碼是tryLockInnerAsync常柄。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//這個命令的邏輯相對清晰鲤氢,首先判斷當前的key值是否存在
//==0則代表哈希的key不存在,則此時新增哈希的key及field對象
//==1則代表哈希key及對應的field對象存在西潘,刷新其過期時間卷玉,同時會返回其剩余的超時時間。
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
我們追入相應的代碼中可以看到喷市,Redission底層的源碼是采用lua腳本的方式執(zhí)行的相种。其中有一些關鍵的參數(shù)及命令列舉如下:
KEYS[1],是Collections.singletonList(getName())品姓,表示分布式鎖的key寝并,即REDLOCK_KEY;
ARGV[1],是internalLockLeaseTime腹备,即鎖的租約時間衬潦,默認30s;
ARGV[2],是getLockName(threadId)植酥,是獲取鎖時set的唯一值镀岛,即UUID+threadId
"pexpire",為設置鍵的超時時間,對一個已經(jīng)存在的鍵重復使用會刷新過期時間哎媚。
"hincrby"喇伯,則是對哈希對象中某個field對象進行原子增加或減少。
"pttl"拨与,則是返回當前鍵的過期時間稻据。
"exists",判斷當前的key值是否存在买喧。
值得關注的還有scheduleExpirationRenewal里的源碼:
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
//往key對應的哈希結(jié)構(gòu)中添加新的ThreadId
oldEntry.addThreadId(threadId);
} else {
//同上
entry.addThreadId(threadId);
try {
renewExpiration();//開啟守護線程捻悯,關鍵代碼
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//啟動一個定時任務
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId(); // 獲取對應key的第一個線程Id
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId); // 在此處采用腳本的方式更新對應的過期時間
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
//如果更新成功,那么此時重復進入此方法淤毛,再次更新今缚。
renewExpiration();
} else {
//否則取消更新
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
由此分析下來,整個加鎖的流程就相對清晰了低淡。流程主要為:
1姓言、首先判斷是否存在這個鍵
- 返回0則代表哈希的key不存在,則此時新增哈希的key及field對象蔗蹋;
- 返回1則代表哈希key及對應的field對象存在何荚,刷新其過期時間,同時會返回其剩余的超時時間猪杭。
2餐塘、如果加鎖成功了,根據(jù)租約時間會有不同的策略皂吮。
- 如果指定了過期時間戒傻,那么不會開啟守護線程,而是任由鎖超時后自動釋放
- 如果沒有指定過期時間蜂筹,那么此時會開啟一個守護線程需纳,持續(xù)去更新對應線程ID的redis鎖時間。
解鎖源碼
解鎖的關鍵代碼主要如下:
@Override
public RFuture<Void> unlockAsync(long threadId) {
/*解鎖關鍵代碼*/
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
//解鎖成功后艺挪,需要解鎖對應的watchDog機制候齿,即關閉掉對應的自動延時機制。
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
追入unlockInnerAsync中進行查看:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
//如果當前不存在對應的分布式鎖闺属,直接返回
//否則將對應的key/field對象的計數(shù)-1(針對重入鎖)
//如果此時計數(shù)>0,就再次刷新相應鎖過期時間
//否則直接刪除鎖周霉,并向?qū)念l道通知掂器。
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
主要的相關參數(shù)羅列如下:
- KEYS[1],是getRawName()俱箱,表示分布式鎖的key;
- KEYS[2]国瓮,是getChannelName(),這里會將分布式的key與固定的前綴進行組合,用于將解鎖的消息發(fā)送到特定的頻道乃摹。
- ARGV[1]禁漓,是LockPubSub.UNLOCK_MESSAGE,即發(fā)送的消息類型孵睬,此處為【解鎖】;
- ARGV[2]播歼,是internalLockLeaseTime,即鎖的租約時間掰读,默認30s;
- ARGV[3]秘狞,是getLockName(threadId),是獲取鎖時set的唯一值蹈集,即UUID+threadId烁试。
- Publish,命令用于將信息發(fā)送到指定的頻道拢肆。(Ps:關于redis發(fā)布訂閱的介紹减响,可以看這里)
看到這里我產(chǎn)生了一些疑惑,為啥我們前置都沒有進行消息的監(jiān)聽郭怪,這里卻做了解鎖消息的廣播呢支示?為此我又查閱了一遍源碼,發(fā)現(xiàn)原來tryLock()不會去監(jiān)聽相應的頻道消息移盆,但是tryLock(long waitTime, long leaseTime, TimeUnit unit)方法悼院,卻會監(jiān)聽對應的消息。
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
...
//監(jiān)聽相應的消息
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
...
}
在進行了訂閱之后咒循,線程會進入自旋据途。只有當其余線程釋放了占用的鎖并會廣播了解鎖消息后,監(jiān)聽器接收到解鎖消息叙甸,并釋放信號量颖医,才會會喚醒阻塞在這里的其余線程。(不禁大喊一聲:“作者牛逼裆蒸∪巯簦”)
由此梳理下來,整個解鎖流程也相對清晰了僚祷。主要為:
1佛致、首先判斷是否存在這個鍵(lua腳本)
- 不存在,直接返回辙谜。
- 存在俺榆,鎖線程計數(shù)-1。
2装哆、如果計數(shù)扣減成功罐脊,根據(jù)計數(shù)會有不同的策略定嗓。
- 鎖線程計數(shù)大于0,意味著此時鎖處于重入狀態(tài)萍桌,刷新過期時間并退出宵溅。
- 如果計數(shù)小于等于0,刪除對應的鎖上炎,同時發(fā)送廣播消息提醒其余的鎖進行爭搶恃逻。
3、最后反症,解鎖成功后辛块,還需要暫停相應的【看門狗機制】,關閉相應的自動延時任務铅碍。
優(yōu)劣性分析
優(yōu)勢
1润绵、基于緩存實現(xiàn),性能較好胞谈,
2尘盼、lua腳本方式實現(xiàn),拓展性好烦绳,可支持鎖重入卿捎、訂閱/發(fā)布等多個功能。
3径密、現(xiàn)有框架已實現(xiàn)午阵,開箱即用。
4享扔、支持watchDog自動延時功能底桂。
缺點
1、不保持高一致性惧眠,不能保證分布式下每個redis中保存的內(nèi)容每時每刻完全一致籽懦。因此讀寫時,需要讀取同一個redis實例氛魁。
2暮顺、而且在主從情況下,往主redis實例寫入后秀存,主實例還沒來得及同步到從實例就掛掉了捶码,導致了從實例可以再次進行加鎖,出現(xiàn)了多個服務器同時加鎖的情況或链,有興趣的可以進一步了解REDLOCK算法宙项。