帶著問題去思考
分布式鎖有哪些解決方案掉伏?方案的利弊各自體現(xiàn)在哪里?
基于redis來實現(xiàn)分布式鎖實現(xiàn)原理炕泳,以及需要主要那些問題昭殉?
基于ZooKeeper 的分布式鎖實現(xiàn)原理
背景概要
互聯(lián)網(wǎng)從開始的單體應用隨之發(fā)展成目前的分布式應用棵癣,例如市場上流行的分布式框架Dubbo辕翰、SpringCloud等等
單體應用的優(yōu)勢:維護、集成狈谊、部署簡單喜命,適合小團隊獨立維護,劣勢隨之產(chǎn)生的是可擴展性太差河劝,代碼腐化維護成本的增加渊抄、應用復雜度越高功能越多風險性就越大。
所以為了解決以上問題丧裁,引入了分布式應用护桦,市場上很多大型網(wǎng)站以及應用都是分布式部署的。
分布式系統(tǒng)主要從三方面獲得提升:
- 擴展性:集群擴展性煎娇、地理擴展性二庵、管理擴展性
- 性能:短RT、低延遲缓呛,高吞吐和較低的計算資源占用率催享。
- 可用性:可用性=可用時間/(可用時間+不可用時間),可用性百分比越高哟绊,難度越高 因妙。
分布式家族中包含分布式服務、分布式消息、分布式緩存攀涵、分布式調(diào)度铣耘、分布式數(shù)據(jù)庫、分布式搜索以故、分布式鎖蜗细、分布式事務、分布式計算等等
在分布式場景中數(shù)據(jù)一致性向來是很重要的話題怒详,CAP理論中“任何一個分布式系統(tǒng)都無法同時滿足一致性(Consistency
)炉媒、可用性(Availability
)和分區(qū)容錯性(Partition tolerance
),最多只能同時滿足兩項昆烁。要么CP,要么AP吊骤。
https://www.yuque.com/docs/share/6e6afffe-c348-461e-90bc-711231fbd209?#
在很多場景下,我們是需要保證數(shù)據(jù)一致性静尼,為了滿足一致性問題水援,我們需要很多技術(shù)方案支持,比如分布式鎖分布式事務等等茅郎。本次分享主要針對分布式鎖來進行延伸探討。
分布式鎖的介紹
講到分布式鎖或渤,首先要提到與之對應的線程鎖
- 線程鎖:當某個方法或代碼使用鎖系冗,在同一時刻僅有一個線程執(zhí)行該方法或該代碼段。以保證共享資源安全性薪鹦,線程鎖只在同一個進程【同一個JVM】中才有效掌敬。
- 分布式鎖:當多個進程不在同一個系統(tǒng)中,用分布式鎖控制多個進程對資源的訪問池磁。
到底什么時候需要分布式鎖呢奔害?
總結(jié)來說,當有多個客戶端需要訪問并防止并操作同一個資源地熄,并且還需要保持這個資源的一致性的時候华临,就需要分布式鎖,實現(xiàn)讓多個客戶端互斥的對共享資源進行訪問端考。
eg: 集群部署下秒殺場景雅潭、集群部署下批處理業(yè)務的執(zhí)行等等
考慮因素
- 排他性:分布式部署的應用集群中,同一個方法在同一時間只能被一臺機器上的一個線程執(zhí)行
- 可重入性:同一線程多次獲取鎖避免死鎖
- 阻塞鎖考慮:考慮業(yè)務是否需要
- 高可用:獲取釋放鎖性能佳
- 原子性:加鎖解鎖原子性操作却特,避免多次請求獲得鎖扶供。
常見的分布式鎖實現(xiàn)方案
想要實現(xiàn)分布式鎖,我們就需要借助外部系統(tǒng)實現(xiàn)互斥的功能裂明。常見的有以下幾種方式:
- 基于數(shù)據(jù)庫椿浓;
- 基于redis;
- 基于Zookeeper;
基于數(shù)據(jù)庫
- Mysql:通過唯一索引、通過樂觀鎖version版本扳碍、通過悲觀鎖行鎖 for update實現(xiàn)提岔;
- MongoDB:findAndModify原子性命令,相較Mysql性能要好很多
容易理解左腔,但解決問題的方案相對越來越復雜唧垦,并且數(shù)據(jù)庫需要一定的開銷,性能值得考慮
本次分享不涵蓋基于數(shù)據(jù)庫分布式鎖實現(xiàn)液样,有興趣可以下去自行通過上述方向去擴展振亮。
基于Redis
在實現(xiàn)redis分布式鎖之前,我們先mock一個場景來看看鞭莽,當分布式應用場景下坊秸,我們不用鎖或者用java中線程鎖會出現(xiàn)什么問題呢?
@GetMapping("/kill")
public String kill() {
// 定義商品key值
String key = "goods";
// 獲取商品數(shù)量
Object obj = redisTemplate.opsForValue().get(key);
Integer mount = Integer.valueOf(obj.toString());
// 如果商品被搶完澎怒,直接返回
if (mount < 0 || mount == 0) {
System.out.println("很遺憾褒搔,商品已被搶完");
return "很遺憾,商品已被搶完";
}
// 線程睡眠喷面,目的在于放大錯誤
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 搶到商品后星瘾,將redis的商品數(shù)量減一
mount = --mount;
redisTemplate.opsForValue().set(key, mount.toString());
// 打印,以便觀察
System.out.println(System.currentTimeMillis() + "-" + Thread.currentThread().getName() + ":搶到第" + (mount + 1) + "件商品【kill】");
return "恭喜惧辈,商品搶購成功";
}
原理
最核心的三個命令:setNx琳状、Px(setNx、expire)盒齿、delete
setNx當返回1時代表獲取鎖成功念逞,0則搶鎖失敗
redis原生實現(xiàn)
具體實現(xiàn)
接下來我們所有的實現(xiàn)redis分布式鎖都是基于Spring切面定義來完成。
// 準備工作
@Bean(name="redisTemplate")
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, String> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(redisSerializer);
//value hashmap序列化
template.setHashValueSerializer(redisSerializer);
//key haspmap序列化
template.setHashKeySerializer(redisSerializer);
return template;
}
@Aspect
@Component
public class RedisLockProvider {
@Around("@annotation(lock)")
public Object execute(ProceedingJoinPoint point, RedisLock lock) throws IllegalAccessException, InstantiationException {
Object result = null;
String lockKey = lock.lockKey();
Class<? extends RedisLockStrategy> strategy = lock.strategy();
RedisLockStrategy redisLockStrategy = strategy.newInstance();
try {
int expireTime = lock.expireTime();
if(redisLockStrategy.lock(lockKey, expireTime)){
result = point.proceed();
}
} catch (Throwable throwable) {
redisLockStrategy.unLock(lockKey);
throwable.printStackTrace();
} finally {
redisLockStrategy.unLock(lockKey);
}
return result;
}
}
我們看了不加鎖或者加java鎖的情況边翁,確實會出現(xiàn)數(shù)據(jù)異常的情況翎承,那現(xiàn)在我們通過redis本身的一些特性來實現(xiàn)redis分布式鎖。
public class RedisLockUtil0 implements RedisLockStrategy {
private String TEMP_VALUE = "OK";
private RedisTemplate<String, String> redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
public Boolean lock(String key, int expireTime) {
return redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS);
}
public void unLock(String key){
redisTemplate.delete(key);
}
}
但上述場景中會有一個問題符匾,就是當我獲取鎖的時候叨咖,如果沒搶鎖成功會立刻返回到客戶端通知結(jié)果,也許下一時間正好就能搶到鎖啊胶,所以我們做了自旋的操作芒澜,并設置默認超時時間,這里也可以不設置超時時間:那就是阻塞鎖了创淡,看業(yè)務場景而制定
public class RedisLockUtil1 implements RedisLockStrategy {
private static final long DEFAULT_TIME_OUT = 2000; // 默認超時時間(毫秒) 默認2秒
private String TEMP_VALUE = "OK";
private RedisTemplate<String, String> redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("申請鎖(" + key + ")成功");
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加鎖失敗, 鎖鍵值:" + key, e);
}
return Boolean.FALSE;
}
public void unLock(String key){
log.info("釋放鎖痴晦,key:"+key);
redisTemplate.delete(key);
}
}
上述實現(xiàn)方式,看似邏輯ok琳彩,有同學能看出有什么漏洞嗎誊酌?部凑??我們來測試看下碧浊。
讓我們業(yè)務時間執(zhí)行較短時測試涂邀,發(fā)現(xiàn)業(yè)務邏輯沒太大問題;
讓我們業(yè)務時間稍微變大箱锐,結(jié)果發(fā)現(xiàn)了什么比勉?
當業(yè)務執(zhí)行時間大于默認的自旋超時時間時,會觸發(fā)刪除鎖操作驹止,會出現(xiàn)誤刪的情況浩聋,A業(yè)務的鎖還未執(zhí)行完成,B鎖獲取異畴担或獲取失敗或者自旋時間已經(jīng)超時衣洁,導致誤刪了A業(yè)務的鎖,也就會導致分布式鎖的定義沒有任何意義了抖仅。所以我們在設置鎖的時候坊夫,設置一個屬于該鎖的唯一ID,在刪除鎖的時候要判斷是否屬于自己的鎖。以避免誤刪場景撤卢。
public class RedisLockUtil2 implements RedisLockStrategy {
/**
* 默認超時時間(毫秒) 默認2秒
*/
private static final long DEFAULT_TIME_OUT = 2000;
/**
* 避免誤刪key环凿,在value中賦值UUID
*/
private String TEMP_VALUE = UUID.randomUUID().toString();
private RedisTemplate redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("加鎖成功...");
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加鎖失敗, 鎖鍵值:" + key, e);
}
return Boolean.FALSE;
}
public void unLock(String key){
if(TEMP_VALUE.equals(redisTemplate.opsForValue().get(key))){
log.info("釋放鎖,key:"+key);
redisTemplate.delete(key);
}
}
}
那現(xiàn)在我們模擬一個場景放吩,假設我們業(yè)務的執(zhí)行時間過長的情況下智听,但我們設置的redis過期時間很短,那會出現(xiàn)什么問題呢屎慢??忽洛?我們來測試看下腻惠。
是不是導致A業(yè)務的還未執(zhí)行完成,B業(yè)務卻拿到了本應該屬于A的鎖欲虚,分布式鎖的意思有蕩然無存了集灌,所以我們借鑒了redisson中WatchDog【看門狗】的機制來完善業(yè)務時間大于過期時間的問題。
redisson是用java來實現(xiàn)的分布式框架复哆,稍后我們會介紹redisson是如何基于redis來實現(xiàn)分布式鎖并解決相關問題的欣喧。
public class RedisLockUtil3 implements RedisLockStrategy {
private static final long DEFAULT_TIME_OUT = 2000; // 默認超時時間(毫秒) 默認2秒
private String TEMP_VALUE = UUID.randomUUID().toString();
private RedisTemplate redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
/**
* 初始化任務線程池
*/
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
/**
* 延長過期次數(shù)閾值
*/
private Integer maxRenewTimes = 10;
/**
* 延長過期時間次數(shù)
*/
private AtomicInteger renewTimes = new AtomicInteger(0);
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("申請鎖(" + key + ")成功");
this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加鎖失敗, 鎖鍵值:" + key, e);
}
return Boolean.FALSE;
}
/**
* expireTime/3 頻率去重置過期時間
* @param key
* @param value
* @param expireTime
*/
private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
scheduledExecutor.schedule(()->{
// 延長過期時間失敗直接返回
if(!this.renewExpiration(key, value, expireTime)){
return;
}
// 超過延長次數(shù)閾值直接返回
if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
return;
}
this.scheduleExpirationRenewal(key,value,expireTime);
}, expireTime / 3, TimeUnit.SECONDS);
}
/**
* 重置過期時間
* @param lockKey
* @param lockValue
* @param lockWatchdogTimeout
* @return
*/
private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
String value = (String) redisTemplate.opsForValue().get(lockKey);
if (Objects.isNull(value) || !value.equals(lockValue)) {
return false;
}
log.info("延長過期時間,key:"+lockKey);
return redisTemplate.expire(lockKey, lockWatchdogTimeout, TimeUnit.SECONDS);
}
public void unLock(String key){
if(TEMP_VALUE.equals(redisTemplate.opsForValue().get(key))){
log.info("釋放鎖梯找,key:"+key);
redisTemplate.delete(key);
}
}
}
Lua腳本語言的引入
到此為止唆阿,我們自己實現(xiàn)的分布式方案看似已經(jīng)ok,但其實還是有很大的問題锈锤,譬如在刪除鎖驯鳖、給鎖加長超時時間等操作闲询,我們是先獲取鎖在刪除或者延長超時時間,兩者操作并不是原子性操作浅辙,如果在獲取鎖成功之后扭弧,redis宕機,那么也會出現(xiàn)業(yè)務紊亂记舆,所以我們在redis操作要盡量保證院子性操作鸽捻。
那么我們可以引入Lua腳本語言來支持,Lua腳本的優(yōu)勢泽腮,興趣的同學可以下去學習下lua語言御蒲,大部分游戲開發(fā)都用的lua來實現(xiàn);語法鏈接:https://www.runoob.com/lua/lua-tutorial.html
并且我們國人章亦春 OpenResty 也是基于Lua和Nginx來實現(xiàn)高性能服務端的盛正。
Redis 腳本使用 Lua 解釋器來執(zhí)行腳本删咱。 Redis 2.6 版本通過內(nèi)嵌支持 Lua 環(huán)境。執(zhí)行腳本的常用命令為 EVAL豪筝。腳本命令:https://www.runoob.com/redis/redis-scripting.html
腳本入?yún)ⅲ簁ey個數(shù)痰滋、KEYS[1]、ARGV[1]续崖、ARGV[2]
eg:EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
eval "return redis.call('setNx', KEYS[1], ARGV[1])" 1 wuding WD
eval "return redis.call('get',KEYS[1])" 1 CCC;
接下來我們先測試下redis內(nèi)嵌lua腳本敲街。執(zhí)行成功返回1,否則返回0
@GetMapping("/luaTest")
public String luaTest() {
String script_init = "if redis.call('setNx',KEYS[1],ARGV[1]) == 1 then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
Object initResult1 = redisTemplate.execute(new DefaultRedisScript<Long>(script_init, Long.class), Arrays.asList("AAA"), "aaa","1000");
System.out.println(initResult1);
Object initResult2 = redisTemplate.execute(new DefaultRedisScript<Long>(script_init, Long.class), Arrays.asList("AAA"), "aaa","1000");
System.out.println(initResult2);
String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
Object expireResult = redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList("AAA"),"aaa", "5000");
System.out.println(expireResult);
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
Object aaa = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("AAA"), "aaa");
System.out.println(aaa);
return aaa.toString();
}
完善嵌入lua腳本的redis分布式鎖實現(xiàn)
public class RedisLockUtil4 implements RedisLockStrategy {
private static final long DEFAULT_TIME_OUT = 20000; // 默認超時時間(毫秒) 默認20秒
private String TEMP_VALUE = UUID.randomUUID().toString();
private RedisTemplate redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
private Integer maxRenewTimes = 10;
private AtomicInteger renewTimes = new AtomicInteger(0);
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("redis申請鎖(" + key + ")成功");
this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加鎖失敗, 鎖鍵值:" + key, e);
}
return Boolean.FALSE;
}
private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
scheduledExecutor.schedule(()->{
if(!this.renewExpiration(key, value, expireTime)){
return;
}
if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
return;
}
this.scheduleExpirationRenewal(key,value,expireTime);
}, expireTime / 3, TimeUnit.SECONDS);
}
private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
// 入?yún)⑶杏浂际亲址贤蝗痪蜁愋娃D(zhuǎn)換失敗多艇,scheduledExecutor把異常捕獲掉了看不到錯誤信息
Long expireResult = null;
try {
expireResult = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList(lockKey),lockValue, lockWatchdogTimeout.toString());
} catch (Exception e) {
log.error("執(zhí)行l(wèi)ua腳本出錯!e:"+ e.getCause().getMessage());
return Boolean.FALSE;
}
if(expireResult == 1L){
log.info("延長過期時間像吻,key:"+lockKey);
}
return expireResult == 1L;
}
public void unLock(String key){
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
Long result = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(key), TEMP_VALUE);
if(result == 1L){
log.info("釋放鎖峻黍,key:"+key);
}
}
}
接下來我們再設想一個問題:業(yè)務A調(diào)用業(yè)務B,AB兩者業(yè)務都調(diào)用了分布式鎖,或者A業(yè)務來做個遞歸操作立磁,那么大家猜想下會出現(xiàn)什么問題菠齿??骨饿?我們來測試看下。
把鎖的超時時間設大來進行測試台腥,如果是阻塞鎖會一直阻塞下去宏赘,非阻塞鎖的話,超時時間獲取子方法也沒有執(zhí)行黎侈,業(yè)務邏輯也就會有問題察署。這就是我們可重入性的問題。
可重入鎖指的是可重復可遞歸調(diào)用的鎖峻汉,在外層使用鎖之后箕母,在內(nèi)層仍然可以使用储藐,如果沒有可重入鎖的支持,在第二次嘗試獲得鎖時將會進入死鎖狀態(tài)嘶是。
通俗理解就是:排隊打水钙勃,一個人只能用一個桶來接水,如果你還有一個桶聂喇,只能再去排隊辖源,這就是非重入性,反之希太,只要到排到你克饶,不管你拿幾個桶你都可以來接滿水。
public class RedisLockUtil5 implements RedisLockStrategy {
private static final long DEFAULT_TIME_OUT = 20000; // 默認超時時間(毫秒) 默認2秒
private String TEMP_VALUE = UUID.randomUUID().toString();
private RedisTemplate<String, String> redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
private Integer maxRenewTimes = 10;
private AtomicInteger renewTimes = new AtomicInteger(0);
public Boolean lock(String key){
return lock(key, 10);
}
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
LockInfo lockInfo = ThreadLocalUtil.get();
if(Objects.nonNull(lockInfo)
&& key.equals(lockInfo.getKey())){
lockInfo.getCount().incrementAndGet();
ThreadLocalUtil.put(lockInfo);
// 將threadLocal中的value賦值給當前TEMP_VALUE誊辉,保證可重入性矾湃,保證刪除邏輯正常
TEMP_VALUE = lockInfo.getValue();
// TODO 這里應該重置redis過期時間
log.info("可重入加鎖成功...");
return Boolean.TRUE;
}
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("redis申請鎖(" + key + ")成功");
ThreadLocalUtil.put(new LockInfo(key, TEMP_VALUE, new AtomicInteger(1)));
this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加鎖失敗, 鎖鍵值:" + key, e);
}
return Boolean.FALSE;
}
private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
scheduledExecutor.schedule(()->{
if(!this.renewExpiration(key, value, expireTime)){
return;
}
if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
ThreadLocalUtil.clear();
return;
}
this.scheduleExpirationRenewal(key,value,expireTime);
}, expireTime / 3, TimeUnit.SECONDS);
}
private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
// 入?yún)⑶杏浂际亲址蝗痪蜁愋娃D(zhuǎn)換失敗堕澄,scheduledExecutor把異常捕獲掉了看不到錯誤信息
Long expireResult = null;
try {
expireResult = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList(lockKey),lockValue, lockWatchdogTimeout.toString());
} catch (Exception e) {
log.error("執(zhí)行l(wèi)ua腳本出錯邀跃!e:"+ e.getCause().getMessage());
return Boolean.FALSE;
}
if(expireResult == 1L){
log.info("延長過期時間,key:"+lockKey);
}
return expireResult == 1L;
}
public void unLock(String key){
LockInfo lockInfo = ThreadLocalUtil.get();
if(Objects.nonNull(lockInfo)
&& key.equals(lockInfo.getKey())
&& TEMP_VALUE.equals(lockInfo.getValue())){
lockInfo.getCount().decrementAndGet();
ThreadLocalUtil.put(lockInfo);
log.info("釋放threadLocal鎖蛙紫,key:"+key);
}
if(Objects.nonNull(lockInfo) && lockInfo.getCount().get() <= 0){
try {
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
Long result = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(key), TEMP_VALUE);
if(result == 1L){
log.info("釋放redis鎖拍屑,key:"+key);
}
} finally {
ThreadLocalUtil.clear();
}
}
}
/**
* 也可以將線程id存入redis中去做比較,有興趣可以自行實現(xiàn)
*/
static class ThreadLocalUtil{
private static final ThreadLocal<LockInfo> THREAD_LOCAL = new ThreadLocal<>();
public static LockInfo get() {
return THREAD_LOCAL.get();
}
public static void put(LockInfo lockInfo) {
THREAD_LOCAL.set(lockInfo);
}
public static void clear() {
THREAD_LOCAL.remove();
}
}
@AllArgsConstructor
@Data
static class LockInfo{
private String key;
private String value;
private AtomicInteger count;
}
}
也可以將線程id存入redis中去做比較坑傅,有興趣可以自行實現(xiàn)
并且避免線程id可能重復僵驰,可以把每個進程標識為唯一id作為前綴,有興趣可以自行實現(xiàn)
最終形態(tài)的流程圖
解決了哪些問題
到此為止我們redis分布式鎖就最終實現(xiàn)完成了唁毒。我們回顧下我們解決了那些問題蒜茴?
- 阻塞鎖-回旋操作
- 誤刪鎖的問題
- 業(yè)務執(zhí)行時間大于分布式鎖的過期時間如何處理
- redis命令非原子性問題
- 可重入鎖的問題
Redisson實現(xiàn)
在介紹看門狗機制的時候我們有提到redisson框架,那么接下來我們看下redisson分布式鎖框架具體是如何實現(xiàn)的浆西。首先我們先針對上述mock業(yè)務通過redisson分布式鎖來探究是否會出現(xiàn)上述問題
具體實現(xiàn)
redisson.yml配置:具體配置映射對象在org.redisson.config;
# 單節(jié)點配置
singleServerConfig:
# 連接空閑超時粉私,單位:毫秒
idleConnectionTimeout: 10000
# 連接超時,單位:毫秒
connectTimeout: 10000
# 命令等待超時室谚,單位:毫秒
timeout: 3000
# 命令失敗重試次數(shù),如果嘗試達到 retryAttempts(命令失敗重試次數(shù)) 仍然不能將命令發(fā)送至某個指定的節(jié)點時毡鉴,將拋出錯誤崔泵。
# 如果嘗試在此限制之內(nèi)發(fā)送成功秒赤,則開始啟用 timeout(命令等待超時) 計時。
retryAttempts: 3
# 命令重試發(fā)送時間間隔憎瘸,單位:毫秒
retryInterval: 1500
# # 重新連接時間間隔入篮,單位:毫秒
# reconnectionTimeout: 3000
# # 執(zhí)行失敗最大次數(shù)
# failedAttempts: 3
# 密碼
password:
# 單個連接最大訂閱數(shù)量
subscriptionsPerConnection: 5
# 客戶端名稱
clientName: myRedis
# # 節(jié)點地址
address: redis://127.0.0.1:6379
# 發(fā)布和訂閱連接的最小空閑連接數(shù)
subscriptionConnectionMinimumIdleSize: 1
# 發(fā)布和訂閱連接池大小
subscriptionConnectionPoolSize: 50
# 最小空閑連接數(shù)
connectionMinimumIdleSize: 32
# 連接池大小
connectionPoolSize: 64
# 數(shù)據(jù)庫編號
database: 0
# DNS監(jiān)測時間間隔,單位:毫秒
dnsMonitoringInterval: 5000
# 線程池數(shù)量,默認值: 當前處理核數(shù)量 * 2
threads: 0
# Netty線程池數(shù)量,默認值: 當前處理核數(shù)量 * 2
nettyThreads: 0
# 編碼
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 傳輸模式
transportMode : "NIO"
// 準備工作
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
RedissonClient redisson = Redisson.create(
Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream()));
return redisson;
}
@Aspect
@Component
public class RedissonLockProvider {
@Autowired
private RedissonClient redissonClient;
@Around("@annotation(lock)")
public Object execute(ProceedingJoinPoint point, RedissonLock lock){
Object result = null;
String lockKey = lock.lockKey();
RLock rLock = redissonClient.getLock(lockKey);
rLock.lock();
try {
result = point.proceed();
} catch (Throwable throwable) {
rLock.unlock();
throwable.printStackTrace();
} finally {
rLock.unlock();
}
return result;
}
}
Redisson鎖分類
- 可重入鎖(Reentrant Lock)
- 公平鎖(Fair Lock):也是繼承了可重入鎖的
- 聯(lián)鎖(MultiLock):將多個RLock對象關聯(lián)為一個聯(lián)鎖幌甘,每個實例可以來自于不同Redisson實例潮售。
- 紅鎖(RedLock):繼承聯(lián)鎖痊项。n個master節(jié)點完全獨立,并且沒有主從同步酥诽,此時如果有n / 2 + 1個節(jié)點成功拿到鎖并且大多數(shù)節(jié)點加鎖的總耗時鞍泉,要小于鎖設置的過期時間。so加鎖成功肮帐。
- 讀寫鎖(ReadWriteLock)咖驮、信號量(Semaphore)、可過期性信號量(PermitExpirableSemaphore)训枢、閉鎖(CountDownLatch)
節(jié)點掛掉的時候托修,存在丟失鎖的風險的問題。而現(xiàn)實情況是有一些場景無法容忍的恒界,所以 Redisson 提供了實現(xiàn)了redlock算法睦刃,如果業(yè)務場景可以容忍這種小概率的錯誤,則推薦使用 RedissonLock十酣, 如果無法容忍涩拙,則推薦使用 RedissonRedLock。
源碼探究
大家如果有興趣了解上述鎖的具體實現(xiàn)原理可自行研究婆誓,本次分享主要針對默認redisson實現(xiàn)鎖方案可重入鎖RLock來講解吃环,
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
// 異步的Executor執(zhí)行器
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
// 默認超時時間為30S
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
// redis的訂閱發(fā)布模式
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
// 獲取鎖成功返回null,獲取鎖失敗 && 等待時間還早就頻繁獲取鎖并監(jiān)聽鎖是否被釋放掉
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 嘗試獲取鎖,沒有獲取到鎖洋幻,返回剩余ttl過期時間
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// waitTime超時后郁轻,返回false獲取鎖失敗
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 訂閱分布式鎖,解決通知文留,發(fā)布訂閱模式
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 阻塞等待鎖釋放好唯,等待超時釋放訂閱信息
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 循環(huán)去調(diào)用獲取鎖方法tryAcquire
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
//1、latch其實是個信號量Semaphore燥翅,調(diào)用其tryAcquire方法會讓當前線程阻塞一段時間骑篙,避免了在while循環(huán)中頻繁請求獲取鎖;
//2森书、該Semaphore的release方法靶端,會在訂閱解鎖消息的監(jiān)聽器消息處理方法org.redisson.pubsub.LockPubSub#onMessage調(diào)用;當其他線程釋放了占用的鎖凛膏,會廣播解鎖消息杨名,監(jiān)聽器接收解鎖消息,并釋放信號量猖毫,最終會喚醒阻塞在這里的線程台谍。
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
// 取消解鎖消息的訂閱
unsubscribe(subscribeFuture, threadId);
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
// tryLock()方法最終執(zhí)行邏輯
if (leaseTime != -1) {
// 如果有設置鎖的過期時間,則直接調(diào)用lua吁断,不走看門狗邏輯
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// lock()方法最終執(zhí)行邏輯
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining) {
// 執(zhí)行看門狗邏輯
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
// 將線程id存入hash中的Field趁蕊,用于解決可重入問題
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // key是否不存在
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 不存在的話把賦值Key,Filed為線程id,value為1存儲到hash數(shù)據(jù)結(jié)構(gòu)中
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 并且設置過期時間
"return nil; " + // 返回null
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果可以存在坞生,并且field等于線程id
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 則吧value++1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 重置過期時間
"return nil; " + // 返回null
"end; " +
"return redis.call('pttl', KEYS[1]);", // 返回過期時間
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
// watchDog看門狗邏輯
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 定時任務過期時間 internalLockLeaseTime/3 頻率來延長過期時間為internalLockLeaseTime
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
// 解鎖邏輯
// key+field不存在,直接返回null
// 存在的話掷伙,value--1,判斷value是否大于0是己,大于0重置過期時間,否則刪除key并且發(fā)布刪除訂閱事件
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
基于Zookeeper
zookeeper分布式鎖原理
- 保持獨占:多個客戶端同時過來創(chuàng)建/lock/zk-001節(jié)點任柜,那么有且僅有一個客戶端能創(chuàng)建成功赃泡。換句話說,倘若把Zookeeper上的一個節(jié)點看做是一把鎖乘盼,那么成功創(chuàng)建的客戶端則能保持獨占升熊;
- 控制時序:有一種臨時有序節(jié)點,每個來嘗試獲取鎖的客戶端绸栅,都會在Zookeeper的根目錄下創(chuàng)建一個臨時有序節(jié)點级野,Zookeeper的/lock節(jié)點維護一個序列,序號最小的節(jié)點獲取鎖成功粹胯。
- 監(jiān)聽機制:Watcher機制能原子性的監(jiān)聽Zookeeper上節(jié)點的增刪改操作
基礎理論
zk-znode
zk操作和維護的為一個個數(shù)據(jù)節(jié)點蓖柔,稱為 znode,采用類似文件系統(tǒng)的層級樹狀結(jié)構(gòu)進行管理风纠,如果 znode 節(jié)點包含數(shù)據(jù)則存儲為字節(jié)數(shù)組(byte array)况鸣。同一個節(jié)點多個客戶同時創(chuàng)建,只有一個客戶端會成功竹观,其它客戶端創(chuàng)建時將失敗镐捧。
zk的四種節(jié)點
持久性節(jié)點:節(jié)點創(chuàng)建后將會一直存在
臨時節(jié)點:臨時節(jié)點的生命周期和當前會話綁定,一旦當前會話斷開臨時節(jié)點也會刪除臭增,當然可以主動刪除懂酱。
持久有序節(jié)點:節(jié)點創(chuàng)建一直存在,并且zk會自動為節(jié)點加上一個自增的后綴作為新的節(jié)點名稱誊抛。
臨時有序節(jié)點:保留臨時節(jié)點的特性列牺,并且zk會自動為節(jié)點加上一個自增的后綴作為新的節(jié)點名稱。
zk-watcher
事件監(jiān)聽器是zookeeper中的一個很重要的特性拗窃。
None(-1), 客戶端與服務端成功建立連接
NodeCreated(1),Watcher監(jiān)聽的對應數(shù)據(jù)節(jié)點被創(chuàng)建
NodeDeleted(2),Watcher監(jiān)聽的對應數(shù)據(jù)節(jié)點被刪除
NodeDataChanged(3),Watcher監(jiān)聽的對應數(shù)據(jù)節(jié)點的數(shù)據(jù)內(nèi)容發(fā)生變更
NodeChildrenChanged(4),Wather監(jiān)聽的對應數(shù)據(jù)節(jié)點的子節(jié)點列表發(fā)生變更
DataWatchRemoved(5),
ChildWatchRemoved(6),
_PersistentWatchRemoved _(7);
實現(xiàn)思路
首先方案:同一個節(jié)點只能創(chuàng)建一次瞎领,加鎖時檢查節(jié)點是否exist,不存在則創(chuàng)建節(jié)點,否則監(jiān)聽該節(jié)點的刪除事件随夸,當釋放鎖的時候再次競爭去創(chuàng)建節(jié)點九默。如此帶來的就是當并發(fā)量很高的時候,釋放鎖會喚醒許多客戶端都去競爭逃魄,競爭失敗的客戶端再去休眠荤西,如此反復對系統(tǒng)資源造成了極大的浪費澜搅。
為了規(guī)避以上問題伍俘,我們可以使用有序子節(jié)點的形式來實現(xiàn)分布式鎖邪锌,而且為了規(guī)避客戶端獲取鎖后突然斷線的風險,我們有必要使用臨時有序節(jié)點癌瘾。
多個客戶端競爭鎖資源觅丰,創(chuàng)建多個臨時有序節(jié)點,檢查所屬節(jié)點是否是最小節(jié)點妨退,如若是妇萄,則獲取鎖成功,如若不是咬荷,那就監(jiān)聽自己節(jié)點-1的刪除事件冠句,等待被喚醒。
這種方案在每次釋放鎖時只喚醒一個客戶端幸乒,減少了線程喚醒的代價懦底,提高了效率。
zk原生API實現(xiàn)
接下來我們通過zk原生實現(xiàn)API來實現(xiàn)分布式鎖
public class ZkLockUtil implements Watcher {
public static final String NODE_PATH = "/lock-space-watcher";
private ZooKeeper zk = null;
public ZkLockUtil() throws IOException, KeeperException, InterruptedException {
zk = new ZooKeeper("127.0.0.1:2181", 300000, this);
}
protected CountDownLatch countDownLatch=new CountDownLatch(1);
private String lockPath;
public String createNode(String key){
try {
String node = NODE_PATH +"/"+ key;
//檢測節(jié)點是否存在
Stat stat = zk.exists(node, false);
//父節(jié)點不存在罕扎,則創(chuàng)建父節(jié)點
if(Objects.isNull(stat)){
synchronized (NODE_PATH) {
//父節(jié)點是持久節(jié)點 一層層創(chuàng)建否則會報錯
zk.create(node, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
lockPath = zk.create(node + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("節(jié)點創(chuàng)建成功聚唐,返回值【"+lockPath+"】");
return lockPath;
} catch (KeeperException e1) {
e1.printStackTrace();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
return null;
}
//校驗當前節(jié)點是否為序號最小的節(jié)點
public boolean checkLockPath(String key, String lockPath){
String nodePath = NODE_PATH + "/" + key;
try {
//注冊父節(jié)點監(jiān)聽事件,當父節(jié)點下面的子節(jié)點有變化,就會觸發(fā)Watcher事件
List<String> nodeList = zk.getChildren(nodePath, false);
Collections.sort(nodeList);
int index = nodeList.indexOf( lockPath.substring(nodePath.length()+1));
switch (index){
case -1:{
System.out.println("本節(jié)點已不在了"+lockPath);
return false;
}
case 0:{
System.out.println("獲取鎖成功腔召,子節(jié)點序號【"+lockPath+"】");
return true;
}
default:{
String waitPath = nodeList.get(index - 1);
zk.exists(nodePath+"/"+waitPath, this);
System.out.println(waitPath+"在"+nodeList.get(index)+"點前面,需要等待【"+nodeList.get(index)+"】");
return false;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public boolean lock(String key, Integer waitTime){
//創(chuàng)建獲取鎖的節(jié)點(順序臨時節(jié)點)
String childPath = createNode(key);
boolean flag = true;
AtomicInteger atomicInteger = new AtomicInteger(0);
if(null != childPath){
lockPath = childPath;
try {
//輪詢等待zk獲取鎖的通知
while(flag){
if(checkLockPath(key, childPath)){
//獲取鎖成功
return true;
}
if(null != waitTime && atomicInteger.get() > 0){
// 刪除當前等待節(jié)點
return false;
}
//節(jié)點創(chuàng)建成功杆查, 則等待zk通知
if(null != waitTime){
countDownLatch.await(waitTime, TimeUnit.SECONDS);
atomicInteger.incrementAndGet();
System.out.println("await等待被喚醒~"+waitTime);
}else{
countDownLatch.await();
System.out.println("await等待被喚醒~");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
System.out.println("節(jié)點沒有創(chuàng)建成功,獲取鎖失敗");
}
return false;
}
@Override
public void process(WatchedEvent event) {
//成功連接zk臀蛛,狀態(tài)判斷
if(event.getState() == Event.KeeperState.SyncConnected){
//子節(jié)點有變化
if(event.getType() == Event.EventType.NodeDeleted){
System.out.println("臨時節(jié)點自動刪除");
countDownLatch.countDown();
}
}
}
public void unlock(){
try {
zk.delete(getLockPath(), -1);
if(Objects.nonNull(zk)){
zk.close();
}
} catch (Exception e) {
}
}
public String getLockPath() {
return lockPath;
}
}
上述實現(xiàn)方式雖能更好的理解zk來實現(xiàn)分布式鎖的邏輯亲桦,但本身zk原生實現(xiàn)編碼實現(xiàn)較多,并且很難保證是否有有問題浊仆,不太建議自己編碼來實現(xiàn)zk原生的分布式鎖烙肺,如果有興趣的同學可自行實現(xiàn)可重入鎖的邏輯,上述已經(jīng)分析了很多實現(xiàn)方案氧卧,這兒不在對此深入桃笙。
客戶端Curator實現(xiàn)
接下來我們通過zk的客戶端Curator來實現(xiàn)zk的分布式鎖。
Apache 開源框架Curator是一個比較完善的ZooKeeper客戶端框架沙绝,通過封裝的一套高級API 簡化了ZooKeeper的操作搏明。
Curator鎖分類
可重入互斥鎖 InterProcessMutex
不可重入互斥鎖 InterProcessSemaphoreMutex
讀寫鎖 InterProcessReadWriteLock
集合鎖 InterProcessMultiLock
具體實現(xiàn)
接下來我們通過zk客戶端Curator來實現(xiàn)分布式鎖
InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(ApplicationContextUtil.getBean(CuratorFramework.class), String.format("/lock-space-1/%s", "KILL_LOCK"));
// InterProcessMutex mutex = new InterProcessMutex(ApplicationContextUtil.getBean(CuratorFramework.class), String.format("/lock-space-1/%s", "KILL_LOCK"));
public String kill() throws Exception {
mutex.acquire();
// mutex.acquire(15, TimeUnit.SECONDS);
try{
// 定義商品key值
String key = "goods";
// 獲取商品數(shù)量
Object obj = redisTemplate.opsForValue().get(key);
Integer mount = Integer.valueOf(obj.toString());
// 如果商品被搶完,直接返回
if (mount < 0 || mount == 0) {
System.out.println("很遺憾闪檬,商品已被搶完【kill】");
return "很遺憾星著,商品已被搶完";
}
// 線程睡眠,目的在于放大錯誤
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 搶到商品后粗悯,將redis的商品數(shù)量減一
mount = --mount;
redisTemplate.opsForValue().set(key, mount.toString());
// 打印虚循,以便觀察
System.err.println(System.currentTimeMillis() + "-" + Thread.currentThread().getName() + ":搶到第" + (mount + 1) + "件商品【kill】");
} catch (Exception e) {
mutex.release();
e.printStackTrace();
} finally {
mutex.release();
}
return "恭喜,商品搶購成功";
}
Curator-InterProcessMutex可重入鎖源碼探究
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
basePath = PathUtils.validatePath(path);
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
// LockData存儲當前持有鎖的線程:為了實現(xiàn)可重入
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// 當前鎖 重入++1
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
// 獲取鎖成功放到緩存中
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
...省略部分代碼
// 創(chuàng)建臨時有序節(jié)點
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 執(zhí)行獲取鎖邏輯
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
...省略部分代碼
return ourPath;
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
// 對所有節(jié)點進行排序:從小到大
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 返回當前節(jié)點或者等待節(jié)點的上一個節(jié)點
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
// 上一個節(jié)點路徑信息
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
//設置監(jiān)聽器,getData會判讀前一個節(jié)點是否存在横缔,不存在就會拋出異常從而不會設置監(jiān)聽器
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
// 等待一段時間被喚醒
wait(millisToWait);
}
else
{
// 一直等待被喚醒
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
public void release() throws Exception
{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
// LockData當前線程可重入value--1
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
// value == 0 的時候才真正刪除臨時節(jié)點
internals.releaseLock(lockData.lockPath);
}
finally
{
threadData.remove(currentThread);
}
}
final void releaseLock(String lockPath) throws Exception
{
// 一處所有監(jiān)聽者
client.removeWatchers();
revocable.set(null);
// 刪除臨時節(jié)點
deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception
{
try
{
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
分布式鎖各自有何優(yōu)勢
基于Redis的分布式鎖铺遂,適用于并發(fā)量很大、性能要求很高的茎刚、而可靠性問題可以通過其他方案去彌補的場景襟锐。
基于zk的分布式鎖,適用于高可靠(高可用)而并發(fā)量不是太大的場景膛锭;因為每次在創(chuàng)建鎖和釋放鎖的過程中粮坞,都要動態(tài)創(chuàng)建、銷毀瞬時節(jié)點來實現(xiàn)鎖功能初狰。大家知道莫杈,ZK中創(chuàng)建和刪除節(jié)點只能通過Leader服務器來執(zhí)行,然后Leader服務器還需要將數(shù)據(jù)同不到所有的Follower機器上奢入,這樣頻繁的網(wǎng)絡通信姓迅,性能的短板是非常突出的。
而數(shù)據(jù)庫來實現(xiàn)的分布式鎖俊马,受制于連接池資源丁存、無鎖失效機制、單點等因素柴我,在并發(fā)量較低可靠性不那么強的時候也可以用解寝。
分布式鎖沒有絕對的可靠性,只能通過人為補償機制竟可能的提升鎖可靠性艘儒。