基礎(chǔ)理論
- redisson是使用java實現(xiàn)的操作redis的一個工具姐军,redisson可以作為spring-data的底層實現(xiàn),通過redisTemplate封裝的api來使用饼丘,redisson利用redis中的hash數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)獲取鎖屡穗、鎖重入和釋放鎖等。
- redisson包含了各種分布式鎖的實現(xiàn)其中包括了汛骂,可重入鎖、公平鎖、讀寫鎖担敌、聯(lián)鎖桃犬、紅鎖等敢伸。java中常用的鎖redisson都有分布式的實現(xiàn)方案尾序。
- redisson實現(xiàn)的可重入鎖原理跟java中的ReentrantLock類似,通過redis的hash數(shù)據(jù)來獲取鎖和鎖的可重入,redis的發(fā)布訂閱消息實現(xiàn)了線程阻塞和重試獲取鎖。
源碼和流程
獲取鎖原理解析
- 如下圖使用redisson加鎖后會在redis中創(chuàng)建一個hash類型的數(shù)據(jù)秸侣,其中redisKey是order,key是線程的線程id+線程獲取鎖的次數(shù),value是線程獲取鎖的次數(shù)味榛。
- redisson獲取鎖的實現(xiàn)是通過lua腳本來實現(xiàn)的具體實現(xiàn)如下
//判斷rediskey是否存在巾表,如果不存在則表示鎖沒有被其它線程獲取
"if (redis.call('exists', KEYS[1]) == 0) then " +
//創(chuàng)建命名為order的hash數(shù)據(jù)鞠苟,并且把線程id作為key考榨,1作為value存入hash中
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//重置redis過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//返回nil在java中就是null
"return nil; " +
"end; " +
//到這一步了則表示鎖已經(jīng)被獲取了接下來判斷獲取鎖的線程是否是當前線程
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//如果獲取鎖成功散休,代表獲取鎖次數(shù)的value+1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//重置redisKey的有效期
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//到了這一步則表示獲取鎖已經(jīng)失敗了扔嵌,最后返回redisKey有效期的剩余時間
"return redis.call('pttl', KEYS[1]);"
3.源碼解析
@Test
void method1() throws InterruptedException {
// 嘗試獲取鎖
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("獲取鎖失敗 .... 1");
return;
}
try {
log.info("獲取鎖成功 .... 1");
method2();
log.info("開始執(zhí)行業(yè)務 ... 1");
} finally {
log.warn("準備釋放鎖 .... 1");
lock.unlock();
}
}
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//1.執(zhí)行l(wèi)ua腳本并且根據(jù)返回的結(jié)果ttl判斷獲取鎖是否成功
//2.如果獲取鎖成功并且leaseTime(鎖釋放時間)為-1則開啟看門狗
//刷新鎖的過期時間防止鎖過期失效
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
//計算剩余的鎖等待時間如果過期了直接返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
//訂閱鎖釋放消息宜狐,訂閱成功后線程會被阻塞在這里等待其它線程釋放鎖
//并且發(fā)布消息俭驮,等待是有時間限制的
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
//超過鎖等待時間訂閱的消息還未發(fā)布直接取消訂閱并且返回false
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
//計算剩余的鎖等待時間并且判斷等待時間是否<=0如果等待時間
//用完了直接返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
//重新嘗試獲取鎖根據(jù)返回的ttl判斷鎖是否獲取成功
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
//判斷剩余的鎖等待超時時間是否清零了如果清零了返回false
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// 比較ttl和time誰時間少崭倘,時間少的作為第二次訂閱消息的
//等待超時時間
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
//判斷剩余時間是否超時
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//阻塞線程等到lua腳本返回的ttl的值
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//leaseTime鎖過期時間 如果!= -1 不需要開啟看門狗執(zhí)行后直接返回即可
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//執(zhí)行釋放鎖和發(fā)布鎖釋放消息的lua腳本
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//等待lua腳本執(zhí)行結(jié)束
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
//判斷如果獲取鎖成功坞淮,設(shè)置看門狗
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//ExpirationEntry map存放這所有的看門狗定時任務主要作用是
//1.循環(huán)執(zhí)行定時任務時判斷鎖是否被釋放,鎖釋放時會把map中的key刪除凌盯。
//2.釋放鎖時刪除map中的看門狗定時任務
//getEntryName() -> e7b433e7-8f44-46c8-b98a-c59f487b6136:order
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
private void renewExpiration() {
//獲取看門狗定時任務判斷鎖是否被釋放
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//執(zhí)行定時任務
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//重置redisKey的時間
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
//遞歸調(diào)用定時任務,保障鎖未釋放期間redisKey不會過期
if (res) {
// reschedule itself
renewExpiration();
}
});
}
//internalLockLeaseTime / 3 = 10s 默認10s刷一次
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
釋放鎖原理
- redisson釋放鎖的實現(xiàn)也是通過lua腳本來實現(xiàn)的具體實現(xiàn)如下
//檢查鎖是否是自己的
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
//如果鎖不是自己的直接返回nil
"return nil;" +
"end; " +
//釋放一次鎖之后返回剩余的數(shù)量
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果counter>0表示鎖還未被完全釋放
"if (counter > 0) then " +
//重置鎖的有效期
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//counter == 0 表示鎖被完全釋放刪除redisKey
"redis.call('del', KEYS[1]); " +
//發(fā)布消息給還在等待獲取鎖的線程
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
- 源碼解析
@Test
void method1() throws InterruptedException {
// 嘗試獲取鎖
lock.lock();
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("獲取鎖失敗 .... 1");
return;
}
try {
log.info("獲取鎖成功 .... 1");
method2();
log.info("開始執(zhí)行業(yè)務 ... 1");
} finally {
log.warn("準備釋放鎖 .... 1");
lock.unlock();
}
}
@Override
public void unlock() {
try {
//阻塞等待釋放鎖的流程執(zhí)行
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
//執(zhí)行釋放鎖的lua腳本并且發(fā)布鎖釋放消息
RFuture<Boolean> future = unlockInnerAsync(threadId);
//等待lua腳本執(zhí)行完并且執(zhí)行下面的代碼
future.onComplete((opStatus, e) -> {
//取消看門狗定時任務
cancelExpirationRenewal(threadId);
//出現(xiàn)異常拋出異常
if (e != null) {
result.tryFailure(e);
return;
}
//lua腳本執(zhí)行返回的結(jié)果opStatus如果是null釋放鎖失敗
//拋出業(yè)務異常
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
//鎖釋放成功
result.trySuccess(null);
});
return result;
}
void cancelExpirationRenewal(Long threadId) {
//從EXPIRATION_RENEWAL_MAP中取出看門狗定時任務判斷定時任務是否為空
//如果定時任務為空直接停止
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
//刪除EXPIRATION_RENEWAL_MAP中的定時任務
if (threadId != null) {
task.removeThreadId(threadId);
}
//取消定時任務
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
獲取鎖和釋放鎖流程
如下圖左側(cè)的流程圖是獲取redisson鎖的過程右側(cè)的流程圖是釋放redisson鎖的流程圖