本篇我們使用mysql實現(xiàn)一個分布式鎖樱哼。
環(huán)境:mysql8,navicat,maven,springboot2.3.11,mybatis-plus
分布式鎖的功能
1,分布式鎖使用者位于不同的機(jī)器中良姆,鎖獲取成功之后谴垫,才可以對共享資源進(jìn)行操作
2,鎖具有重入的功能:即一個使用者可以多次獲取某個鎖
3,獲取鎖有超時的功能:即在指定的時間內(nèi)去嘗試獲取鎖,超過了超時時間沟娱,如果還未獲取成功,則返回獲取失敗
4,能夠自動容錯葛作,比如:A機(jī)器獲取鎖lock1之后醒第,在釋放鎖lock1之前,A機(jī)器掛了进鸠,導(dǎo)致鎖lock1未釋放稠曼,結(jié)果會lock1一直被A機(jī)器占有著,遇到這種情況時客年,分布式鎖要能夠自動解決霞幅,可以這么做:持有鎖的時候可以加個持有超時時間,超過了這個時間還未釋放的量瓜,其他機(jī)器將有機(jī)會獲取鎖
預(yù)備技能:樂觀鎖
通常我們修改表中一條數(shù)據(jù)過程如下:
t1:select獲取記錄R1
t2:對R1進(jìn)行編輯
t3:update R1
我們來看一下上面的過程存在的問題:
如果A司恳、B兩個線程同時執(zhí)行到t1,他們倆看到的R1的數(shù)據(jù)一樣绍傲,然后都對R1進(jìn)行編輯扔傅,然后去執(zhí)行t3,最終2個線程都會更新成功烫饼,后面一個線程會把前面一個線程update的結(jié)果給覆蓋掉猎塞,這就是并發(fā)修改數(shù)據(jù)存在的問題。
我們可以在表中新增一個版本號杠纵,每次更新數(shù)據(jù)時候?qū)姹咎栕鳛闂l件荠耽,并且每次更新時候版本號+1,過程優(yōu)化一下比藻,如下:
t1:打開事務(wù)start transaction
t2:select獲取記錄R1,聲明變量v=R1.version
t3:對R1進(jìn)行編輯
t4:執(zhí)行更新操作
update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
t5:t4中的update會返回影響的行數(shù)铝量,我們將其記錄在count中,然后根據(jù)count來判斷提交還是回滾
if(count==1){
//提交事務(wù)
commit;
}else{
//回滾事務(wù)
rollback;
}
上面重點(diǎn)在于步驟t4银亲,當(dāng)多個線程同時執(zhí)行到t1慢叨,他們看到的R1是一樣的,但是當(dāng)他們執(zhí)行到t4的時候务蝠,數(shù)據(jù)庫會對update的這行記錄加鎖拍谐,確保并發(fā)情況下排隊執(zhí)行,所以只有第一個的update會返回1请梢,其他的update結(jié)果會返回0赠尾,然后后面會判斷count是否為1,進(jìn)而對事務(wù)進(jìn)行提交或者回滾毅弧。可以通過count的值知道修改數(shù)據(jù)是否成功了当窗。
上面這種方式就樂觀鎖够坐。我們可以通過樂觀鎖的方式確保數(shù)據(jù)并發(fā)修改過程中的正確性。
使用mysql實現(xiàn)分布式鎖
我們創(chuàng)建一個分布式鎖表,如下
DROP TABLE IF EXISTS t_lock;
create table t_lock(
lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖唯一標(biāo)志',
request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來標(biāo)識請求對象的',
lock_count INT NOT NULL DEFAULT 0 COMMENT '當(dāng)前上鎖次數(shù)',
timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時時間',
version INT NOT NULL DEFAULT 0 COMMENT '版本號元咙,每次更新+1'
)COMMENT '鎖信息表';
java代碼如下
mapper接口
package com.shiguiwu.springmybatis.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import org.springframework.stereotype.Repository;
/**
* @description: 鎖mapper
* @author: stone
* @date: Created by 2021/5/30 11:12
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.mapper
*/
@Repository
public interface LockMapper extends BaseMapper<LockModel> {
}
鎖對象model
package com.shiguiwu.springmybatis.lock.model;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.Version;
import lombok.Data;
/**
* @description: 鎖模型
* @author: stone
* @date: Created by 2021/9/10 11:13
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.lock.model
*/
@Data
@TableName("t_lock")
public class LockModel {
/**
* 鎖的唯一值
*/
@TableId
private String lockKey;
/**
* 請求id,同一個線程里請求id一樣
*/
private String requestId;
//鎖次數(shù)
private Integer lockCount;
//鎖超時
private Long timeout;
//樂觀鎖版本
@Version
private Integer version;
}
鎖接口
package com.shiguiwu.springmybatis.lock;
/**
* @description: 鎖接口
* @author: stone
* @date: Created by 2021/9/10 11:40
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.lock
*/
public interface ILock<T> {
/**
* 獲取分布式鎖,支持重入
* @param lockKey 鎖可以
* @param lockTimeout 持有鎖的有效時間梯影,防止死鎖
* @param getTimeout 獲取鎖超時時間,
* @return 是否鎖成功
*/
public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception;
/**
* 解鎖
* @param lockKey 鎖key
*
*/
public void unlock(String lockKey);
/**
* 重置鎖對象
* @param t 鎖對象
* @return 返回鎖記錄
*/
public int restLock(T t);
}
鎖的實現(xiàn)代碼如下
package com.shiguiwu.springmybatis.lock;
import cn.hutool.core.util.StrUtil;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import com.shiguiwu.springmybatis.mapper.LockMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @description: mysql實現(xiàn)分布式鎖
* @author: stone
* @date: Created by 2021/9/10 11:09
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.lock
*/
@Component
@Slf4j
public class MysqlLock implements ILock<LockModel>{
static ThreadLocal<String> requestIds = new ThreadLocal<>();
@Autowired
private LockMapper lockMapper;
public String getRequestId() {
String requestId = requestIds.get();
if (StrUtil.isBlank(requestId)) {
requestId = UUID.randomUUID().toString();
requestIds.set(requestId);
}
log.info("獲取到的requestId===> {}", requestId);
return requestId;
}
/**
* 獲取鎖
* @param lockKey 鎖可以
* @param lockTimeout 持有鎖的有效時間庶香,防止死鎖
* @param getTimeout 獲取鎖超時時間甲棍,
* @return
*/
@Override
public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception {
log.info(" lock start =======================> {}",lockKey);
//從local中獲取 請求id
String requestId = this.getRequestId();
//獲取鎖的結(jié)果
boolean lockResult = false;
//開始時間
long startTime = System.currentTimeMillis();
while (true) {
LockModel lockModel = lockMapper.selectById(lockKey);
if (Objects.nonNull(lockModel)) {
//獲取鎖對象的請求id
String reqId = lockModel.getRequestId();
//如果是空,表示改鎖未被占有
if (StrUtil.isBlank(reqId)) {
//馬上占有它
//設(shè)置請求id
lockModel.setRequestId(requestId);
//設(shè)置鎖次數(shù)
lockModel.setLockCount(1);
//設(shè)置超時時間赶掖,防止死鎖
lockModel.setTimeout(System.currentTimeMillis() + lockTimeout);
if (lockMapper.updateById(lockModel) == 1) {
lockResult = true;
break;
}
}
//如果request_id和表中request_id一樣表示鎖被當(dāng)前線程持有者感猛,此時需要加重入鎖
else if (requestId.equals(reqId)) {
//可重入鎖
lockModel.setTimeout(System.currentTimeMillis() + lockTimeout);
//設(shè)置獲取初次
lockModel.setLockCount(lockModel.getLockCount() + 1);
if (lockMapper.updateById(lockModel) == 1) {
lockResult = true;
break;
}
}
//不為空,也不相等奢赂,說明是其他線程占有
else {
//鎖不是自己的陪白,并且已經(jīng)超時了,則重置鎖膳灶,繼續(xù)重試
if (lockModel.getTimeout() < System.currentTimeMillis()) {
//未超時咱士,繼續(xù)重試
this.restLock(lockModel);
}
//如果未超時,休眠100毫秒轧钓,繼續(xù)重試
else {
if (startTime + getTimeout > System.currentTimeMillis()) {
TimeUnit.MILLISECONDS.sleep(100);
}
else {
//防止長時間阻塞
break;
}
}
}
}
//如果是空序厉,就插入一個鎖,重新嘗試獲取鎖
else {
lockModel = new LockModel();
//設(shè)置鎖key
lockModel.setLockKey(lockKey);
lockMapper.insert(lockModel);
}
}
log.info(" lock end =======================> {}",lockKey);
return lockResult;
}
/**
* 釋放鎖
* @param lockKey 鎖key
*/
@Override
public void unlock(String lockKey) {
LockModel lockModel = lockMapper.selectById(lockKey);
//獲取當(dāng)前線程的請求id
String reqId = this.getRequestId();
//獲取鎖次數(shù)
int count = 0;
//當(dāng)前線程requestId和庫中request_id一致 && lock_count>0,表示可以釋放鎖
if (Objects.nonNull(lockModel)
&& reqId.equals(lockModel.getRequestId())
&& (count = lockModel.getLockCount()) > 0) {
if (count == 1) {
//重置鎖
this.restLock(lockModel);
}
//重入鎖的問題,鎖的次數(shù)減一
else {
lockModel.setLockCount(lockModel.getLockCount() - 1);
//更新次數(shù)
lockMapper.updateById(lockModel);
}
}
}
/**
* 重置鎖
* @param lockModel 鎖對象
* @return 更新條數(shù)
*/
@Override
public int restLock(LockModel lockModel) {
lockModel.setLockCount(0);
lockModel.setRequestId("");
lockModel.setTimeout(0L);
return lockMapper.updateById(lockModel);
}
}
上面代碼中實現(xiàn)了文章開頭列的分布式鎖的所有功能毕箍,大家可以認(rèn)真研究下獲取鎖的方法:lock脂矫,釋放鎖的方法:unlock。
測試用例
package com.shiguiwu.springmybatis;
import com.shiguiwu.springmybatis.lock.ILock;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @description: 鎖測試
* @author: stone
* @date: Created by 2021/9/10 15:32
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis
*/
@SpringBootTest
@Slf4j
public class LockApplicationTests {
@Autowired
private ILock<LockModel> mysqlLock;
////測試重復(fù)獲取和重復(fù)釋放
@Test
public void testRepeat() throws Exception {
for (int i = 0; i < 10; i++) {
mysqlLock.lock("key1", 10000L, 1000);
}
for (int i = 0; i < 10; i++) {
mysqlLock.unlock("key1");
}
}
// //獲取之后不釋放霉晕,超時之后被thread1獲取
@Test
public void testTimeout() throws Exception {
String lockKey = "key2";
mysqlLock.lock(lockKey, 5000L, 1000);
Thread thread1 = new Thread(() -> {
try {
mysqlLock.lock(lockKey, 5000L, 7000);
} catch (Exception e) {
e.printStackTrace();
} finally {
mysqlLock.unlock(lockKey);
}
}, "thread1");
thread1.start();
thread1.join();
}
}
test1方法測試了重入鎖的效果庭再。
test2測試了主線程獲取鎖之后一直未釋放,持有鎖超時之后被thread1獲取到了
留給大家一個問題
上面分布式鎖還需要考慮一個問題:比如A機(jī)會獲取了key1的鎖牺堰,并設(shè)置持有鎖的超時時間為10秒拄轻,但是獲取鎖之后,執(zhí)行了一段業(yè)務(wù)操作伟葫,業(yè)務(wù)操作耗時超過10秒了恨搓,此時機(jī)器B去獲取鎖時可以獲取成功的,此時會導(dǎo)致A筏养、B兩個機(jī)器都獲取鎖成功了斧抱,都在執(zhí)行業(yè)務(wù)操作,這種情況應(yīng)該怎么處理渐溶?大家可以思考一下然后留言辉浦,我們一起討論一下。