前言
在多線程情況下訪問資源,我們需要加鎖來保證業(yè)務(wù)的正常進行,JDK中提供了很多并發(fā)控制相關(guān)的工具包,來保證多線程下可以高效工作,同樣在分布式環(huán)境下,有些互斥操作我們可以借助分布式鎖來實現(xiàn)兩個操作不能同時運行,必須等到另外一個任務(wù)結(jié)束了把鎖釋放了才能獲取鎖然后執(zhí)行,因為跨JVM我們需要一個第三方系統(tǒng)來協(xié)助實現(xiàn)分布式鎖,一般我們可以用
數(shù)據(jù)庫,redis,zookeeper,etcd等來實現(xiàn).
要實現(xiàn)一把分布式鎖,我們需要先分析下這把鎖有哪些特性
1.在分布式集群中,也就是不同的JVM中,相互有沖突的方法,可以是不同JVM相同實例內(nèi)的同一個方法,也可以是不同方法,也就是不同業(yè)務(wù)間的隔離和同一個業(yè)務(wù)操作不能并行運行,而分布式鎖需要保證這兩個方法在同一時間只能有一個運行.
2.這把鎖最好是可重入的,因為不可重入的鎖很容易出現(xiàn)死鎖
3.獲取鎖和釋放鎖的性能要很高
4.支持獲取鎖的時候可以阻塞等待,以及等待時間
5.獲取鎖后支持設(shè)置一個期限,超過這個期限可以自動釋放,防止程序沒有自己釋放的情況
6.這是一把輕量鎖,對業(yè)務(wù)侵入小
7.易用
數(shù)據(jù)庫實現(xiàn)分布式鎖
由于數(shù)據(jù)庫的鎖無能是在性能高可用上都不及其他方式,這里我們簡單介紹下可能的方案
- 1.獲取鎖的時候,往數(shù)據(jù)庫里插入一條記錄,可以根據(jù)方法名作唯一鍵約束,其他線程獲取鎖的時候無法插入所以會等待,釋放鎖的時候刪除,這種方式不支持可重入
- 2.根據(jù)數(shù)據(jù)庫的排他鎖 for update實現(xiàn),當(dāng)commit的時候釋放,這種方式如果鎖不釋放就會一直占有一個connection,而且加鎖導(dǎo)致性能低
- 3.將每一個鎖作為表里的一條記錄,這個記錄加一個狀態(tài),每次獲取鎖的時候都update status = 1 where status = -1,這種類似CAS的方式可以解決排他鎖性能低.但是mysql是一個單點,而且和業(yè)務(wù)系統(tǒng)關(guān)聯(lián),因為兩個業(yè)務(wù)方可能屬于不同系統(tǒng)不同數(shù)據(jù)庫,如果做到不和業(yè)務(wù)關(guān)聯(lián)還需要增加一次RPC請求,將鎖業(yè)務(wù)抽為一個單獨系統(tǒng),不夠輕量
redis的分布式鎖
SET resource_name my_random_value NX PX 30000
- SET NX 只會在key不存在的時候給key賦值,當(dāng)多個進程同時爭搶鎖資源的時候,會下發(fā)多個SET NX只會有一個返回成功,并且SET NX對外是一個原子操作
- PX 設(shè)置過期時間,代表這個key的存活時間,也就是獲取到的鎖只會占有這么長,超過這個時間將會自動釋放
- my_random_value 一般是全局唯一值,這個隨機數(shù)一般可以用時間戳加隨機數(shù),這種方式在多機器實例上可能不唯一,如果需要保證絕對唯一可以采用UUID,但是性能會有影響,這個值的用途會在鎖釋放的時候用到
我們可以看看下面獲取分布式鎖的使用場景,假設(shè)我們釋放鎖,直接del這個key
if (!redisComponent.acquireLock(lockKey) {
LOGGER.warn(">>分布式并發(fā)鎖獲取失敗");
return ;
}
try {
// do business ...
} catch (BusinessException e) {
// exception handler ...
} finally {
redisComponent.releaseLock(lockKey);
}
- 1.進程A獲取到鎖,超時時間為1分鐘
- 2.1分鐘時間到,進程A還沒有處理完,鎖自動釋放了
- 3.進程B獲取到鎖,開始進行業(yè)務(wù)處理
- 4.進程A處理結(jié)束,釋放鎖,這個時候?qū)⑦M程B獲取到的鎖釋放了
- 5.進程C獲取到鎖,開始業(yè)務(wù)處理,進程B還沒有處理結(jié)束,結(jié)果B和C開始并行處理,發(fā)生并發(fā)
為了解決以上問題,我們可以在釋放鎖的時候,判斷下鎖是否存在,這樣進程A在釋放鎖的時候就不會將進程B加的鎖釋放了,
或者通過以下方式,將過期時間做為value存儲在對應(yīng)的key中,釋放鎖的時候,判斷當(dāng)前時間是否小于過期時間,只有小于當(dāng)前時間才處理,我們也可以在進行del操作的時候判斷下對應(yīng)的value是否相等,這個時候就需要在del操作的時候傳人
my_random_value
下面我們看下redis實現(xiàn)分布式鎖java代碼實現(xiàn),我們采用在del的時候判斷下當(dāng)前時間是否小于過期時間
public boolean acquireLock(String lockKey, long expired) {
ShardedJedis jedis = null;
try {
jedis = pool.getResource();
String value = String.valueOf(System.currentTimeMillis() + expired + 1);
int tryTimes = 0;
while (tryTimes++ < 3) {
/*
* 1. 嘗試鎖
* setnx : set if not exist
*/
if (jedis.setnx(lockKey, value).equals(1L)) {
return true;
}
/*
* 2. 已經(jīng)被別的線程鎖住,判斷是否失效
*/
String oldValue = jedis.get(lockKey);
if (StringUtils.isBlank(oldValue)) {
/*
* 2.1 value存的是超時時間努咐,如果為空有2種情況
* 1. 異常數(shù)據(jù)坤检,沒有value 或者 value為空字符
* 2. 鎖恰好被別的線程釋放了
* 此時需要嘗試重新嘗試空扎,為了避免出現(xiàn)情況1時導(dǎo)致死循環(huán)叫挟,只重試3次
*/
continue;
}
Long oldValueL = Long.valueOf(oldValue);
if (oldValueL < System.currentTimeMillis()) {
/*
* 已超時双炕,重新嘗試鎖
*
* Redis:getSet 操作步驟:
* 1.獲取 Key 對應(yīng)的 Value 作為返回值踱蠢,不存在時返回null
* 2.設(shè)置 Key 對應(yīng)的 Value 為傳入的值
* 這里如果返回的 getValue != oldValue 表示已經(jīng)被其它線程重新修改了
*/
String getValue = jedis.getSet(lockKey, value);
return oldValue.equals(getValue);
} else {
// 未超時袖肥,則直接返回失敗
return false;
}
}
return false;
} catch (Throwable e) {
logger.error("acquireLock error", e);
return false;
} finally {
returnResource(jedis);
}
}
/**
* 釋放鎖
*
* @param lockKey
* key
*/
public void releaseLock(String lockKey) {
ShardedJedis jedis = null;
try {
jedis = pool.getResource();
long current = System.currentTimeMillis();
// 避免刪除非自己獲取到的鎖
String value = jedis.get(lockKey);
if (StringUtils.isNotBlank(value) && current < Long.valueOf(value)) {
jedis.del(lockKey);
}
} catch (Throwable e) {
logger.error("releaseLock error", e);
} finally {
returnResource(jedis);
}
}
這種方式?jīng)]有用到剛剛說的my_random_value,我們看下如果我們按以下代碼獲取鎖會有什么問題
if (!redisComponent.acquireLock(lockKey) {
LOGGER.warn(">>分布式并發(fā)鎖獲取失敗");
return ;
}
try {
boolean locked = redisComponent.acquireLock(lockKey);
if(locked)
// do business ...
} catch (BusinessException e) {
// exception handler ...
} finally {
redisComponent.releaseLock(lockKey);
}
同樣這種方式當(dāng)進程A沒有獲取到鎖,之后進程B獲取到鎖,進程A會釋放進程B的鎖,這個時候我們可以借助my_random_value來實現(xiàn)
/**
* 釋放鎖
*
* @param lockKey ,value
*/
public void releaseLock(String lockKey, long oldvalue) {
ShardedJedis jedis = null;
try {
jedis = pool.getResource();
String value = jedis.get(lockKey);
if (StringUtils.isNotBlank(value) && oldvalue == Long.valueOf(value)) {
jedis.del(lockKey);
}
} catch (Throwable e) {
logger.error("releaseLock error", e);
} finally {
returnResource(jedis);
}
}
這種方式需要保存之前獲取鎖時候的value值,并在釋放鎖的帶上value值,不過這種實現(xiàn)方式,value的值為過期時間也不唯一
由于我們用了redis得超時機制來釋放鎖,那么當(dāng)進程在鎖租約到期后還沒有執(zhí)行結(jié)束,那么其他進程獲取到鎖后則會產(chǎn)生并發(fā)寫的情況,這種如果業(yè)務(wù)上需要精確控制,只能用樂觀鎖來控制了,每次寫入數(shù)據(jù)都帶一個鎖的版本,如果下次獲取鎖的時候版本加1,這樣上面那種情況,鎖到期釋放了新的進程獲取到鎖后會使用新的版本號,之前的進程鎖已經(jīng)釋放了如果繼續(xù)使用該鎖則會發(fā)現(xiàn)版本已經(jīng)不對了
zookeeper實現(xiàn)分布式鎖
可以借助zookeeper的順序節(jié)點,在一個父節(jié)點下,所有需要爭搶鎖的資源都去這個目錄下創(chuàng)建一個順序節(jié)點,然后判斷這個臨時順序節(jié)點是否是兄弟節(jié)點中順序最小的,如果是最小的則獲取到鎖,如果不是則監(jiān)聽這個順序最小的節(jié)點的刪除事件,然后在繼續(xù)根據(jù)這個流程獲取最小節(jié)點
public void lock() {
try {
// 創(chuàng)建臨時子節(jié)點
String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(j.join(Thread.currentThread().getName() + myNode, "created"));
// 取出所有子節(jié)點
List<String> subNodes = zk.getChildren(root, false);
TreeSet<String> sortedNodes = new TreeSet<>();
for(String node :subNodes) {
sortedNodes.add(root +"/" +node);
}
String smallNode = sortedNodes.first();
String preNode = sortedNodes.lower(myNode);
if (myNode.equals( smallNode)) {
// 如果是最小的節(jié)點,則表示取得鎖
System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock"));
this.nodeId.set(myNode);
return;
}
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同時注冊監(jiān)聽咪辱。
// 判斷比自己小一個數(shù)的節(jié)點是否存在,如果不存在則無需等待鎖,同時注冊監(jiān)聽
if (stat != null) {
System.out.println(j.join(Thread.currentThread().getName(), myNode,
" waiting for " + root + "/" + preNode + " released lock"));
latch.await();// 等待,這里應(yīng)該一直等待其他線程釋放鎖
nodeId.set(myNode);
latch = null;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void unlock() {
try {
System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock "));
if (null != nodeId) {
zk.delete(nodeId.get(), -1);
}
nodeId.remove();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
當(dāng)然如果我們開發(fā)環(huán)境使用的是etcs也可以用etcd來實現(xiàn)分布式鎖,原理和zookeeper類似