為什么要用分布式鎖
- 數(shù)據(jù)庫樂觀鎖
- redis分布式鎖
- zookeeper分布式鎖
使用分布式鎖的場景
實現(xiàn)分布式鎖的方案
必要條件
1.互斥杨伙。
同一時刻,多臺服務(wù)器的多個請求只允許一個請求加鎖成功
2.避免死鎖传藏。
加鎖的客戶端在持有鎖期間由于宕機網(wǎng)絡(luò)延遲等原因沒有主動解鎖,也能保證鎖會釋放,不應(yīng)想其他請求獲取鎖成功
3.解鈴還須系鈴人
加鎖和解鎖的客戶端要保持一致毯侦。
數(shù)據(jù)庫樂觀鎖實現(xiàn)或字段唯一性約束
此處代碼省略
redis分布式鎖實現(xiàn)
/**
* 嘗試獲取鎖
* @param key
* @param value 為了滿足解鈴還須系鈴人西壮,此處傳入requestId,標識哪個客戶端加的鎖〗芯可以用 UUID.randomUUID().toString()生成
* @param expireTime 過期時間 避免鎖持有者后續(xù)發(fā)生崩潰而未解鎖 造成死鎖
* @param unit 過期時間單位
* @return 是否加鎖成功
*/
public static boolean tryLock(String key, String value, Long expireTime, TimeUnit unit) {
return redisService.setIfAbsent(key, value, expireTime, unit);
}
public<K, V> boolean setIfAbsent(final K key,V value,Long expireTime,TimeUnit unit) {
boolean result = false;
try {
ValueOperations<K, V> operations = redisTemplate.opsForValue();
//原子操作
result = operations.setIfAbsent(key,value,expireTime,unit);
} catch (Exception e) {
logger.error("setIfAbsent error: key {}, value {},expireTime {}",key,value,expireTime,e);
}
return result;
}
執(zhí)行上面的方法款青,如果當前鎖(key)不能存在,那么就進行加鎖操作同時對鎖設(shè)置有效期返回true霍狰。 value是加鎖客戶端的標識抡草。如果當前鎖(key)已存在,不做任何操作返回false蔗坯。
注意:加鎖和設(shè)置時間要是一條命令康震,保證原子性。不能兩條命令分開做宾濒。如果加鎖后客戶端突然崩潰腿短,導(dǎo)致鎖沒有設(shè)置過期時間,將會發(fā)生死鎖
/**
* 解鎖
* @param key
* @param value 此處傳入requestId绘梦,請求標識
* @return 是否釋放鎖成功
*/
public static boolean unLock(String key,String value) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
return redisService.execute(key,value,script);
}
Lua腳本作用:首先獲取鎖對應(yīng)的value值橘忱,檢查是否與requestId相等,如果相等則刪除鎖(解鎖)卸奉。
zookepeer分布式鎖實現(xiàn)
基于臨時有序節(jié)點的特性實現(xiàn)和wahcher機制實現(xiàn)钝诚。 001 002 003
為了避免驚群效應(yīng),每個節(jié)點只監(jiān)聽比自己大的前一個節(jié)點即可榄棵。否則會帶來巨大的性能開銷凝颇。
//就不用zkCliend費勁去寫了,curator 可以非常方便的使用zk分布式鎖
@Override
public void lock() throws DistributedLockException {
try{
interProcessMutex.acquire();
} catch (Exception e){
throw new DistributedLockException("加鎖異常: ", e);
}
}
@Override
public boolean tryLock(Long time, TimeUnit timeUnit) throws DistributedLockException {
try {
return interProcessMutex.acquire(time, timeUnit);
} catch (Exception e) {
throw new DistributedLockException("加鎖異常: ", e);
}
}
@Override
public void unlock() throws DistributedLockException {
try {
interProcessMutex.release();
} catch (Exception e) {
throw new DistributedLockException("釋放鎖異常: ", e);
}
}
三者比較:
數(shù)據(jù)庫:
1.這把鎖強依賴數(shù)據(jù)庫的可用性疹鳄,數(shù)據(jù)庫是一個單點拧略,一旦數(shù)據(jù)庫掛掉,會導(dǎo)致業(yè)務(wù)系統(tǒng)不可用瘪弓。
2.這把鎖沒有失效時間垫蛆,一旦解鎖操作失敗,就會導(dǎo)致鎖記錄一直在數(shù)據(jù)庫中杠茬,其他線程無法再獲得到鎖月褥。
3.這把鎖只能是非阻塞的弛随,因為數(shù)據(jù)的insert操作,一旦插入失敗就會直接報錯。沒有獲得鎖的線程并不會進入排隊隊列扰法,要想再次獲得鎖就要再次觸發(fā)獲得鎖操作密任。
4.這把鎖是非重入的,同一個線程在沒有釋放鎖之前無法再次獲得該鎖。因為數(shù)據(jù)中數(shù)據(jù)已經(jīng)存在了走贪。
5.數(shù)據(jù)庫是寶貴的系統(tǒng)資源佛猛,考慮是否會影響 正常業(yè)務(wù)的使用。
redis:
1.失效時間我設(shè)置多長時間為好坠狡?如何設(shè)置的失效時間太短继找,方法沒等執(zhí)行完,鎖就自動釋放了逃沿,那么就會產(chǎn)生并發(fā)問題婴渡。如果設(shè)置的時間太長,其他獲取鎖的線程就可能要平白的多等一段時間凯亮。這個問題使用數(shù)據(jù)庫實現(xiàn)分布式鎖同樣存在
2.非阻塞边臼?while重復(fù)執(zhí)行。
3.非可重入假消?在一個線程獲取到鎖之后柠并,把當前主機信息和線程信息保存起來,下次再獲取之前先檢查自己是不是當前鎖的擁有者富拗。
4.可以使用緩存來代替數(shù)據(jù)庫來實現(xiàn)分布式鎖臼予,這個可以提供更好的性能,同時啃沪,很多緩存服務(wù)都是集群部署的瘟栖,可以避免單點問題。并且很多緩存服務(wù)都提供了可以用來實現(xiàn)分布式鎖的方法谅阿,比如Tair的put方法半哟,redis的setnx方法等。并且签餐,這些緩存服務(wù)也都提供了對數(shù)據(jù)的過期自動刪除的支持寓涨,可以直接設(shè)置超時時間來控制鎖的釋放。
性能好氯檐,實現(xiàn)起來較為方便戒良。
通過超時時間來控制鎖的失效時間并不是十分的靠譜。
zk:
1.鎖無法釋放冠摄?使用Zookeeper可以有效的解決鎖無法釋放的問題糯崎,因為在創(chuàng)建鎖的時候,客戶端會在ZK中創(chuàng)建一個臨時節(jié)點河泳,一旦客戶端獲取到鎖之后突然掛掉(Session連接斷開)沃呢,那么這個臨時節(jié)點就會自動刪除掉。其他客戶端就可以再次獲得鎖拆挥。
2.非阻塞鎖薄霜?使用Zookeeper可以實現(xiàn)阻塞的鎖,客戶端可以通過在ZK中創(chuàng)建順序節(jié)點,并且在節(jié)點上綁定監(jiān)聽器惰瓜,一旦節(jié)點有變化否副,Zookeeper會通知客戶端,客戶端可以檢查自己創(chuàng)建的節(jié)點是不是當前所有節(jié)點中序號最小的崎坊,如果是备禀,那么自己就獲取到鎖,便可以執(zhí)行業(yè)務(wù)邏輯了奈揍。
3.不可重入痹届?使用Zookeeper也可以有效的解決不可重入的問題,客戶端在創(chuàng)建節(jié)點的時候打月,把當前客戶端的主機信息和線程信息直接寫入到節(jié)點中队腐,下次想要獲取鎖的時候和當前最小的節(jié)點中的數(shù)據(jù)比對一下就可以了。如果和自己的信息一樣奏篙,那么自己直接獲取到鎖柴淘,如果不一樣就再創(chuàng)建一個臨時的順序節(jié)點,參與排隊秘通。
4.單點問題为严?使用Zookeeper可以有效的解決單點問題,ZK是集群部署的肺稀,只要集群中有半數(shù)以上的機器存活第股,就可以對外提供服務(wù)。
Curator提供的InterProcessMutex是分布式鎖的實現(xiàn)话原。acquire方法用戶獲取鎖夕吻,release方法用于釋放鎖。
5.使用ZK實現(xiàn)的分布式鎖好像完全符合了本文開頭我們對一個分布式鎖的所有期望繁仁。但是涉馅,其實并不是,Zookeeper實現(xiàn)的分布式鎖其實存在一個缺點黄虱,那就是性能上可能并沒有緩存服務(wù)那么高稚矿。因為每次在創(chuàng)建鎖和釋放鎖的過程中,都要動態(tài)創(chuàng)建捻浦、銷毀瞬時節(jié)點來實現(xiàn)鎖功能晤揣。ZK中創(chuàng)建和刪除節(jié)點只能通過Leader服務(wù)器來執(zhí)行,然后將數(shù)據(jù)同不到所有的Follower機器上朱灿。
6.其實昧识,使用Zookeeper也有可能帶來并發(fā)問題,只是并不常見而已母剥≈团担考慮這樣的情況形导,由于網(wǎng)絡(luò)抖動环疼,客戶端可ZK集群的session連接斷了习霹,那么zk以為客戶端掛了,就會刪除臨時節(jié)點炫隶,這時候其他客戶端就可以獲取到分布式鎖了淋叶。就可能產(chǎn)生并發(fā)問題。這個問題不常見是因為zk有重試機制伪阶,一旦zk集群檢測不到客戶端的心跳煞檩,就會重試,Curator客戶端支持多種重試策略栅贴。多次重試之后還不行的話才會刪除臨時節(jié)點斟湃。(所以,選擇一個合適的重試策略也比較重要檐薯,要在鎖的粒度和并發(fā)之間找一個平衡凝赛。)
總結(jié)
上面幾種方式,哪種方式都無法做到完美坛缕。就像CAP一樣墓猎,在復(fù)雜性、可靠性赚楚、性能等方面無法同時滿足毙沾,所以,根據(jù)不同的應(yīng)用場景選擇最適合自己的才是王道宠页。
從理解的難易程度角度(從低到高)
數(shù)據(jù)庫 > 緩存 > Zookeeper
從實現(xiàn)的復(fù)雜性角度(從低到高)
Zookeeper >= 緩存 > 數(shù)據(jù)庫
從性能角度(從高到低)
緩存 > Zookeeper >= 數(shù)據(jù)庫
從可靠性角度(從高到低)
Zookeeper > 緩存 > 數(shù)據(jù)庫
坑:
如圖所示左胞,curator 加鎖成功后leases下回創(chuàng)建臨時節(jié)點,
在釋放鎖后举户,此目錄并不會刪掉罩句。
因此需要一個定時任務(wù),定時清理目錄敛摘。
public class LockBackGroundConf {
/** 執(zhí)行頻率, 默認一小時一次, 單位秒 */
private Long frequency = 60*60L;
/** 刪除幾天前的數(shù)據(jù), 默認1天前的數(shù)據(jù), 單位秒 */
private Long beforeTime = 24*60*60L;
}
public class LockBackGroundThread extends Thread{
private Logger logger = LoggerFactory.getLogger(getClass());
CuratorFramework client;
protected LockBackGroundThread(CuratorFramework client){
this.client = client;
this.setDaemon(true);
this.setName("ZkMutexDistributedLock---background");
}
@Override
public synchronized void run() {
super.run();
try {
while (true){
//TODO 后期可以通過配置中心 配置
LockBackGroundConf conf = new LockBackGroundConf();
deleteInvalidNode(conf);
// 默認一小時執(zhí)行一次(配置中心可配)
Thread.currentThread().wait(conf.getFrequency()*1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void deleteInvalidNode(LockBackGroundConf conf) throws Exception{
String projectDir = ZkMutexDistributedLockFactory.lockPath + ZkMutexDistributedLockFactory.projectName;
Stat exitDir = client.checkExists().forPath(projectDir.substring(0, projectDir.length()-1));
if(exitDir == null){
logger.error("根目錄尚未創(chuàng)建门烂,本次清理結(jié)束--" + projectDir);
return;
}
List<String> paths = client.getChildren().forPath(projectDir.substring(0, projectDir.length()-1));
Date date = new Date();
paths.forEach(currPath -> {
try{
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(projectDir + currPath);
// 默認刪除一天前無效的數(shù)據(jù)。 子節(jié)點為0兄淫,說明當前節(jié)點無效
if(stat.getMtime()<(date.getTime() - (conf.getBeforeTime()*1000)) && stat.getNumChildren() == 0){
// 只刪除空目錄
client.delete().forPath(projectDir + currPath);
logger.info("刪除路徑: " + projectDir + currPath);
}
}catch (Exception e){
logger.error("刪除節(jié)點失敗: ", e);
}
});
}
初始化zk客戶端的時候屯远,啟動后臺線程清理空目錄。
private static synchronized void init() {
if(client==null){
String IPAndPort = PropertiesReader.getProperties("zkConfig").getProperty("lockServers");
String projectName = ProjectUtils.PROJECT_NAME.toLowerCase();
if(StringUtils.isEmpty(IPAndPort) || StringUtils.isEmpty(projectName)){
logger.error("zk鎖啟動失敗缺少配置--IP和端口號/項目名");
throw new RuntimeException("zk鎖啟動異常--缺少配置--IP和端口號/項目名");
}
ZkMutexDistributedLockFatory.projectName = projectName+"/";
client = CuratorFrameworkFactory.builder().connectString(IPAndPort).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
// 啟動后臺線程
LockBackGroundThread backGroundThread = new LockBackGroundThread(client);
backGroundThread.start();
}
}
其他
分布式鎖的各種實現(xiàn)見仁見智捕虽,在適當?shù)膱鼍斑x擇合適的實現(xiàn)即可慨丐。在開發(fā)中,我們可以講分布式鎖的實現(xiàn)封裝在公共模塊泄私,對專注于業(yè)務(wù)開發(fā)的 程序員大兄弟們 屏蔽 底層實現(xiàn)的差異房揭,讓他們用最簡單的方式备闲,就可以讓某一方法實現(xiàn)分布式鎖的效果,沒錯正是自定義注解+AOP的形式捅暴。
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CustomerLock {
/**
* lock key
* eg #arg.id
*
* @return
*/
String lockKey();
/** 后綴
* @return
*/
String lockSuffix() default "";
/** 前綴
* @return
*/
String lockPrefix() default "";
/** 分割符
* @return
*/
String separator() default "#";
/** 實現(xiàn)類對應(yīng)的名稱 默認使用redis
* @return
*/
String lockType() default "";
/**
* 是否使用嘗試鎖恬砂。
*/
boolean tryLock() default false;
/**
* 最長等待時間。
* 該字段只有當tryLock()返回true才有效蓬痒。
*/
int waitTime() default 0;
/**
* 鎖超時時間泻骤。
* 超時時間過后,鎖自動釋放梧奢。
* 建議:
* 盡量縮簡需要加鎖的邏輯狱掂。
*/
int leaseTime() default 30;
TimeUnit timeUnit() default TimeUnit.SECONDS;
}
@Component
@Aspect
@EnableAspectJAutoProxy
public class DistributedLockAspect {
public static final Logger logger = LoggerFactory.getLogger(DistributedLockAspect.class);
@Pointcut("@annotation(com.gpmall.commons.lock.annotation.CustomerLock)")
public void distributedLockPointcut() {
}
@Around("distributedLockPointcut()")
public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
//組成key
//切點所在的類
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
final String lockKey = getLockKey(method, pjp.getArgs());
return startLock(lockKey, pjp, method);
}
private Object startLock(final String lockKey, ProceedingJoinPoint pjp, Method method) throws Throwable {
CustomerLock annotation = method.getAnnotation(CustomerLock.class);
boolean tryLock = annotation.tryLock();
if (tryLock) {
return tryLock(pjp, annotation, lockKey);
} else {
return lock(pjp, annotation, lockKey);
}
}
private Object lock(ProceedingJoinPoint pjp, CustomerLock annotation, String lockKey) throws Throwable {
int leaseTime = annotation.leaseTime();
TimeUnit timeUnit = annotation.timeUnit();
String type = annotation.lockType();
DistributedLock distributedLock = getByType(type);
try {
distributedLock.lock(lockKey, timeUnit, leaseTime);
return pjp.proceed();
} finally {
distributedLock.unlock(lockKey);
}
}
private Object tryLock(ProceedingJoinPoint pjp, CustomerLock customerLock, String lockKey) throws Throwable {
int leaseTime = customerLock.leaseTime();
int waitTime = customerLock.waitTime();
TimeUnit timeUnit = customerLock.timeUnit();
String type = customerLock.lockType();
DistributedLock distributedLock = getByType(type);
try {
if (waitTime == 0) {
if (distributedLock.tryLock(lockKey)) {
return pjp.proceed();
}
} else {
distributedLock.tryLock(lockKey, timeUnit, waitTime, leaseTime);
return pjp.proceed();
}
} finally {
distributedLock.unlock(lockKey);
}
return null;
}
/**
* 生成分布式鎖key
*
* @param method
* @param args
* @return
*/
public String getLockKey(Method method, Object[] args) {
Objects.requireNonNull(method);
CustomerLock annotation = method.getAnnotation(CustomerLock.class);
String lockKey = parseKey(annotation.lockKey(), method, args),
separator = annotation.separator(),
prefix = annotation.lockPrefix(),
suffix = annotation.lockSuffix();
if (StringUtils.isBlank(lockKey)) {
throw new IllegalArgumentException(String.format("lock [%s] is error", lockKey));
}
StringBuilder keyGenerator = new StringBuilder();
if (StringUtils.isNotBlank(prefix)) {
keyGenerator.append(prefix).append(separator);
}
keyGenerator.append(lockKey.trim());
if (StringUtils.isNotBlank(suffix)) {
keyGenerator.append(separator).append(suffix);
}
lockKey = keyGenerator.toString().trim();
// key不允許為空
if (StringUtils.isBlank(lockKey)) {
throw new IllegalArgumentException("Can't get or generate lock accurately!");
}
logger.info("generator lock_key [" + lockKey + "]");
return lockKey;
}
/**
* 獲取緩存的key
* key 定義在注解上,支持SPEL表達式
*/
private String parseKey(String key, Method method, Object[] args) {
//獲取被攔截方法參數(shù)名列表(使用Spring支持類庫)
LocalVariableTableParameterNameDiscoverer u =
new LocalVariableTableParameterNameDiscoverer();
String[] paraNameArr = u.getParameterNames(method);
//使用SPEL進行key的解析
ExpressionParser parser = new SpelExpressionParser();
//SPEL上下文
StandardEvaluationContext context = new StandardEvaluationContext();
//把方法參數(shù)放入SPEL上下文中
for (int i = 0; i < paraNameArr.length; i++) {
context.setVariable(paraNameArr[i], args[i]);
}
return parser.parseExpression(key).getValue(context, String.class);
}
//通過 dubbo-SPI 的設(shè)計 選擇分布式鎖 實現(xiàn)
private DistributedLock getByType(String type) {
return (DistributedLock) ExtensionLoader.getExtensionLoader(DistributedLock.class).getExtension(type);
}
通過 dubbo-SPI 的設(shè)計 選擇分布式鎖 實現(xiàn),默認redis 了解dubbo-SPI
@LockSpi("redis")
public interface DistributedLock {
void lock(String key) throws DistributedLockException;
boolean tryLock(String key) throws DistributedLockException;
void lock(String lockKey, TimeUnit unit, int timeout) throws DistributedLockException;
/**
* 嘗試獲取鎖
*
* @param lockKey
* @param unit 時間單位
* @param waitTime 最多等待時間
* @param leaseTime 上鎖后自動釋放鎖時間
* @return
*/
boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) throws DistributedLockException;
/**
* 釋放鎖
* @param lockKey
* @throws DistributedLockException
*/
void unlock(String lockKey) throws DistributedLockException;
}
使用
@Override
@CustomerLock(lockKey = "#request.tradeNo",lockType = "zookeeper", tryLock = true)
public PaymentResponse execPay(PaymentRequest request) {
PaymentResponse paymentResponse=new PaymentResponse();
try {
……
}
}
開發(fā)過程中 通過spi的方式 實現(xiàn)分布式鎖策略亲轨,當業(yè)務(wù)小伙伴 不滿足于現(xiàn)有的策略 想要拓展分布式鎖時趋惨,只需要實現(xiàn)DistributedLock 接口 然后在 META-INF/lock 下建立com.gpmall.commons.lock.DistributedLock文件,配置一下即可惦蚊,例如:
工具包開發(fā)者都是通過spi搞的器虾, 業(yè)務(wù)開發(fā)者也當然也可以通過spi去拓展, 就像dubbo官網(wǎng)文檔上所說养筒,平等對待三方包一樣曾撤。大家都一樣,不搞特殊~~