鎖是開發(fā)過程中十分常見的工具假消,在處理高并發(fā)請求的時候和訂單數(shù)據(jù)的時候往往需要鎖來幫助我們保證數(shù)據(jù)的安全萍嬉。
場景1.前端點(diǎn)擊太快揽乱,導(dǎo)致后端重復(fù)調(diào)用接口。兩次調(diào)用一個接口弦讽,這樣就會產(chǎn)生同一個請求執(zhí)行了兩次污尉,而從用戶的角度出發(fā),他是因?yàn)樘ǘc(diǎn)了兩次往产,他的目標(biāo)是執(zhí)行一次請求被碗。
場景2.對于高并發(fā)場景,我們往往需要引入分布式緩存仿村,來加快整個系統(tǒng)的響應(yīng)速度锐朴。但是緩存是有失效機(jī)制的,如果某一時刻緩存失效奠宜,而此時有大量的請求過來包颁,那么所有的請求會瞬間直接打到DB上,那么這么大的并發(fā)量压真,DB可能是扛不住的娩嚼。那么這里需要引入一個保護(hù)機(jī)制。當(dāng)發(fā)生“緩存擊穿”的時候加鎖滴肿,從而保護(hù)DB不被拖垮岳悟。
看完了上面的場景,其實(shí)分布式鎖的場景一直在我們身邊泼差。說分布式鎖之前贵少,應(yīng)該先說一下java提供的鎖,比較能單機(jī)解決的并發(fā)問題堆缘,沒必要引入分布式的解決方案滔灶。
java提供了兩種內(nèi)置的鎖的實(shí)現(xiàn),一種是由JVM實(shí)現(xiàn)的synchronized和JDK提供的Lock吼肥,當(dāng)你的應(yīng)用是單機(jī)或者說單進(jìn)程應(yīng)用時录平,可以使用synchronized或Lock來實(shí)現(xiàn)鎖。
但是缀皱,當(dāng)你的應(yīng)用涉及到多機(jī)斗这、多進(jìn)程共同完成時,例如現(xiàn)在的互聯(lián)網(wǎng)架構(gòu)啤斗,一般都是分布式的RPC框架來支撐表箭,那么這樣你的Server有多個,由于負(fù)載均衡的路由規(guī)則隨機(jī)钮莲,相同的請求可能會打到不同的Server上進(jìn)行處理免钻,那么這時候就需要一個全局鎖來實(shí)現(xiàn)多個線程(不同的進(jìn)程)之間的同步彼水。
實(shí)現(xiàn)全局的鎖需要依賴一個第三方系統(tǒng),此系統(tǒng)需要滿足高可用伯襟、一致性比較強(qiáng)同時能應(yīng)付高并發(fā)的請求猿涨。
常見的處理辦法有三種:數(shù)據(jù)庫、緩存姆怪、分布式協(xié)調(diào)系統(tǒng)叛赚。數(shù)據(jù)庫和緩存是比較常用的,但是分布式協(xié)調(diào)系統(tǒng)是不常用的稽揭。
數(shù)據(jù)庫實(shí)現(xiàn)分布式鎖####
利用DB來實(shí)現(xiàn)分布式鎖俺附,有兩種方案。兩種方案各有好壞溪掀,但是總體效果都不是很好事镣。但是實(shí)現(xiàn)還是比較簡單的。
- 利用主鍵唯一規(guī)則:
我們知道數(shù)據(jù)庫是有唯一主鍵規(guī)則的揪胃,主鍵不能重復(fù)璃哟,對于重復(fù)的主鍵會拋出主鍵沖突異常。
了解JDK reentrantlock的人都知道喊递,reentrantlock是利用了OS的CAS特性實(shí)現(xiàn)的鎖随闪。主要是維護(hù)一個全局的狀態(tài),每次競爭鎖都會CAS修改鎖的狀態(tài)骚勘,修改成功之后就占用了鎖铐伴,失敗的加入到同步隊(duì)列中,等待喚醒俏讹。
其實(shí)這和分布式鎖實(shí)現(xiàn)方案基本是一致的当宴,首先我們利用主鍵唯一規(guī)則,在爭搶鎖的時候向DB中寫一條記錄泽疆,這條記錄主要包含鎖的id户矢、當(dāng)前占用鎖的線程名、重入的次數(shù)和創(chuàng)建時間等殉疼,如果插入成功表示當(dāng)前線程獲取到了鎖梯浪,如果插入失敗那么證明鎖被其他人占用,等待一會兒繼續(xù)爭搶株依,直到爭搶到或者超時為止。
這里我主要寫了一個簡單的實(shí)現(xiàn):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 利用mysql實(shí)現(xiàn)可重入分布式鎖
*/
public class MysqlprimaryLock {
private static Connection connection;
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
String url = "jdbc:mysql://10.0.0.212:3308/dbwww_lock?user=lock_admin&password=lock123";
try {
connection = DriverManager.getConnection(url);
} catch (SQLException e) {
e.printStackTrace();
}
}
/**
* 加鎖
* @param lockID
*/
public void lock(String lockID) {
acquire(lockID);
}
/**
* 獲取鎖
* @param lockID
* @return
*/
public boolean acquire(String lockID) {
String sql = "insert into test_lock('id','count','thName','addtime') VALUES (?,?,?,?)";
while (true) {
try {
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, lockID);
statement.setInt(2, 1);
statement.setLong(1, System.currentTimeMillis());
boolean ifsucess = statement.execute();//如果成功延窜,那么就是獲取到了鎖
if (ifsucess)
return true;
} catch (SQLException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
}
/**
* 超時獲取鎖
* @param lockID
* @param timeOuts
* @return
* @throws InterruptedException
*/
public boolean acquire(String lockID, long timeOuts) throws InterruptedException {
String sql = "insert into test_lock('id','count','thName','addtime') VALUES (?,?,?,?)";
long futureTime = System.currentTimeMillis() + timeOuts;
long ranmain = timeOuts;
long timerange = 500;
while (true) {
CountDownLatch latch = new CountDownLatch(1);
try {
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, lockID);
statement.setInt(2, 1);
statement.setLong(1, System.currentTimeMillis());
boolean ifsucess = statement.execute();//如果成功恋腕,那么就是獲取到了鎖
if (ifsucess)
return true;
} catch (SQLException e) {
e.printStackTrace();
}
latch.await(timerange, TimeUnit.MILLISECONDS);
ranmain = futureTime - System.currentTimeMillis();
if (ranmain <= 0)
break;
if (ranmain < timerange) {
timerange = ranmain;
}
continue;
}
return false;
}
/**
* 釋放鎖
* @param lockID
* @return
* @throws SQLException
*/
public boolean unlock(String lockID) throws SQLException {
String sql = "DELETE from test_lock where id = ?";
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, lockID);
boolean ifsucess = statement.execute();
if (ifsucess)
return true;
return false;
}
}
這里是利用主鍵沖突規(guī)則,加入了id','count','thName','addtime'逆瑞,count主要是為了重入計數(shù)荠藤,thName為了判斷占用鎖的線程伙单,addtime是記錄占用時間。上面代碼沒有實(shí)現(xiàn)重入的邏輯哈肖。
重入主要實(shí)現(xiàn)思路是吻育,在每次獲取鎖之前去取當(dāng)前鎖的信息,如果鎖的線程是當(dāng)前線程淤井,那么更新鎖的count+1布疼,并且執(zhí)行鎖之后的邏輯。如果不是當(dāng)前鎖币狠,那么進(jìn)行重試游两。釋放的時候也要進(jìn)行count-1,最后減到0時漩绵,刪除鎖標(biāo)識釋放鎖贱案。
優(yōu)點(diǎn):實(shí)現(xiàn)簡單
缺點(diǎn):沒有超時保護(hù)機(jī)制,mysql存在單點(diǎn)止吐,并發(fā)量大的時候請求量太大宝踪、沒有線程喚醒機(jī)制,用異常去控制邏輯多少優(yōu)點(diǎn)惡心碍扔。
對于超時保護(hù):如果可能瘩燥,可以采用定時任務(wù)去掃描超過一定閾值的鎖,并刪除蕴忆。但是也會存在颤芬,鎖住的任務(wù)執(zhí)行時間很長,刪除鎖會導(dǎo)致并發(fā)問題套鹅。所以需要對超時時間有一個很好的預(yù)估站蝠。
對于單點(diǎn)問題:有條件可以搞一個主從,但是為了一個鎖來搞一個主從是不是優(yōu)點(diǎn)浪費(fèi)卓鹿?同時主從切換的時候系統(tǒng)不可用菱魔,這也是一個問題。
并發(fā)量大的時候請求量太大:因?yàn)檫@種實(shí)現(xiàn)方式是沒有鎖的喚醒機(jī)制的吟孙,不像reentrantlock在同步隊(duì)列中的節(jié)點(diǎn)澜倦,可以通過喚醒來避免多次的循環(huán)請求。但是分布式環(huán)境數(shù)據(jù)庫這種鎖的實(shí)現(xiàn)是不能做到喚醒的杰妓。所以只能將獲取鎖的時間間隔調(diào)高藻治,避免死循環(huán)給系統(tǒng)和DB帶來的巨大壓力。這樣也犧牲了系統(tǒng)的吞吐量巷挥,因?yàn)榭倳幸欢ǖ拈g隔鎖是空閑的桩卵。
用異常去控制邏輯多少優(yōu)點(diǎn)惡心:就不說了,每次失敗都拋異常.....
- 利用Mysql行鎖的特性:
Mysql是有表鎖、頁鎖和行鎖的機(jī)制的雏节,可以利用這個機(jī)制來實(shí)現(xiàn)鎖胜嗓。這里盡量使用行鎖,它的吞吐量是最高的钩乍。
/**
* 超時獲取鎖
* @param lockID
* @param timeOuts
* @return
* @throws InterruptedException
*/
public boolean acquireByUpdate(String lockID, long timeOuts) throws InterruptedException, SQLException {
String sql = "SELECT id from test_lock where id = ? for UPDATE ";
long futureTime = System.currentTimeMillis() + timeOuts;
long ranmain = timeOuts;
long timerange = 500;
connection.setAutoCommit(false);
while (true) {
CountDownLatch latch = new CountDownLatch(1);
try {
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, lockID);
statement.setInt(2, 1);
statement.setLong(1, System.currentTimeMillis());
boolean ifsucess = statement.execute();//如果成功辞州,那么就是獲取到了鎖
if (ifsucess)
return true;
} catch (SQLException e) {
e.printStackTrace();
}
latch.await(timerange, TimeUnit.MILLISECONDS);
ranmain = futureTime - System.currentTimeMillis();
if (ranmain <= 0)
break;
if (ranmain < timerange) {
timerange = ranmain;
}
continue;
}
return false;
}
/**
* 釋放鎖
* @param lockID
* @return
* @throws SQLException
*/
public void unlockforUpdtate(String lockID) throws SQLException {
connection.commit();
}
利用for update加顯式的行鎖,這樣就能利用這個行級的排他鎖來實(shí)現(xiàn)分布式鎖了寥粹,同時unlock的時候只要釋放commit這個事務(wù)变过,就能達(dá)到釋放鎖的目的。
優(yōu)點(diǎn):實(shí)現(xiàn)簡單
缺點(diǎn):連接池爆滿和事務(wù)超時的問題單點(diǎn)的問題排作,單點(diǎn)問題牵啦,行鎖升級為表鎖的問題,并發(fā)量大的時候請求量太大妄痪、沒有線程喚醒機(jī)制哈雏。
連接池爆滿和事務(wù)超時的問題單點(diǎn)的問題:利用事務(wù)進(jìn)行加鎖的時候,query需要占用數(shù)據(jù)庫連接衫生,在行鎖的時候連接不釋放裳瘪,這就會導(dǎo)致連接池爆滿。同時由于事務(wù)是有超時時間的罪针,過了超時時間自動回滾彭羹,會導(dǎo)致鎖的釋放,這個超時時間要把控好泪酱。
對于單點(diǎn)問題:同上派殷。
并發(fā)量大的時候請求量太大:同上。
行鎖升級為表鎖的問題:Mysql行鎖默認(rèn)需要走索引墓阀,如果不走索引會導(dǎo)致鎖表毡惜,如果可以,在sql中可以強(qiáng)制指定索引斯撮。
緩存分布式鎖####
緩存實(shí)現(xiàn)分布式鎖還是比較常見的经伙,因?yàn)榫彺姹容^輕量,并且緩存的響應(yīng)快勿锅、吞吐高帕膜。最重要的是還有自動失效的機(jī)制來保證鎖一定能釋放。
緩存的分布式鎖主要通過Redis實(shí)現(xiàn)溢十,當(dāng)然其他的緩存也是可以的垮刹。關(guān)于緩存有兩種實(shí)現(xiàn)吧:
- 基于SetNX實(shí)現(xiàn):
setNX是Redis提供的一個原子操作,如果指定key存在张弛,那么setNX失敗荒典,如果不存在會進(jìn)行Set操作并返回成功宗挥。我們可以利用這個來實(shí)現(xiàn)一個分布式的鎖,主要思路就是种蝶,set成功表示獲取鎖,set失敗表示獲取失敗瞒大,失敗后需要重試。
具體看下代碼:
import redis.clients.jedis.Jedis;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Redis分布式鎖
*/
public class RedisLockTest {
private Jedis jedisCli = new Jedis("localhost",6381);
private int expireTime = 1;
/**
* 獲取鎖
* @param lockID
* @return
*/
public boolean lock(String lockID){
while(true){
long returnFlag = jedisCli.setnx(lockID,"1");
if (returnFlag == 1){
System.out.println(Thread.currentThread().getName() + " get lock....");
return true;
}
System.out.println(Thread.currentThread().getName() + " is trying lock....");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
}
}
/**
* 超時獲取鎖
* @param lockID
* @param timeOuts
* @return
*/
public boolean lock(String lockID,long timeOuts){
long current = System.currentTimeMillis();
long future = current + timeOuts;
long timeStep = 500;
CountDownLatch latch = new CountDownLatch(1);
while(future > current){
long returnFlag = jedisCli.setnx(lockID,"1");
if (returnFlag == 1){
System.out.println(Thread.currentThread().getName() + " get lock....");
jedisCli.expire(lockID,expireTime);
return true;
}
System.out.println(Thread.currentThread().getName() + " is trying lock....");
try {
latch.await(timeStep, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
current = current + timeStep;
}
return false;
}
public void unlock(String lockId){
long flag = jedisCli.del(lockId);
if (flag>0){
System.out.println(Thread.currentThread().getName() + " release lock....");
}else {
System.out.println(Thread.currentThread().getName() + " release lock fail....");
}
}
/**
* 線程工廠,命名線程
*/
public static class MyThreadFactory implements ThreadFactory{
public static AtomicInteger count = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
count.getAndIncrement();
Thread thread = new Thread(r);
thread.setName("Thread-lock-test "+count);
return thread;
}
}
public static void main(String args[]){
final String lockID = "test1";
Runnable task = () ->{
RedisLockTest testCli = new RedisLockTest();
testCli.lock(lockID);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
testCli.unlock(lockID);
};
MyThreadFactory factory = new MyThreadFactory();
ExecutorService services = Executors.newFixedThreadPool(10);
for (int i = 0;i<3;i++)
services.execute(factory.newThread(task));
}
}
看看結(jié)果:
pool-1-thread-3 is trying lock....
pool-1-thread-2 get lock....
pool-1-thread-1 is trying lock....
pool-1-thread-3 is trying lock....
pool-1-thread-2 release lock....
pool-1-thread-1 get lock....
pool-1-thread-3 is trying lock....
pool-1-thread-1 release lock....
pool-1-thread-3 get lock....
pool-1-thread-3 release lock....
可以看到,幾個線程很好的進(jìn)行了同步阅畴。
這種方式也是有優(yōu)點(diǎn)和缺點(diǎn):
優(yōu)點(diǎn):實(shí)現(xiàn)簡單定页,吞吐量十分客觀,對于高并發(fā)情況應(yīng)付自如酗电,自帶超時保護(hù)魄藕,對于網(wǎng)絡(luò)抖動的情況也可以利用超時刪除策略保證不會阻塞所有流程。
缺點(diǎn):單點(diǎn)問題撵术、沒有線程喚醒機(jī)制背率、網(wǎng)絡(luò)抖動可能會引起鎖刪除失敗。
對單點(diǎn)問題:因?yàn)閞edis一般都是單實(shí)例使用嫩与,那么對于單點(diǎn)問題寝姿,可以做一個主從。當(dāng)然主從切換的時候也是不可用的划滋,因?yàn)橹鲝耐绞钱惒降亩赡軙l(fā)問題。如果對于主從還是不能保證可靠性的話处坪,可以上Redis集群根资,對于Redis集群,因?yàn)槭褂昧祟愐恢滦訦ash算法同窘,雖然不能避免節(jié)點(diǎn)下線的并發(fā)問題(當(dāng)前的任務(wù)沒有執(zhí)行完玄帕,其他任務(wù)就開始執(zhí)行),但是能保證Redis是可用的塞椎。可用性的問題是出了問題之后的備選方案桨仿,如果我們系統(tǒng)天天都出問題還玩毛啊,對于突發(fā)情況犧牲一兩個請求還是沒問題的案狠。
對于線程喚醒機(jī)制:分布式鎖大多都是這樣輪訓(xùn)獲取鎖的服傍,所以控制住你的重試頻率,也不會導(dǎo)致負(fù)載特別高的骂铁〈盗悖可能就是吞吐量低點(diǎn)而已。
對于鎖刪除失斃帧:分布式鎖基本都有這個問題灿椅,可以對key設(shè)置失效時間。這個超時時間需要把控好,過大那么系統(tǒng)吞吐量低茫蛹,很容易導(dǎo)致超時操刀。如果過小那么會有并發(fā)問題,部分耗時時間比較長的任務(wù)就要遭殃了婴洼。
基于Zookeeper的分布式鎖####
Zookeeper是一個分布式一致性協(xié)調(diào)框架骨坑,主要可以實(shí)現(xiàn)選主、配置管理和分布式鎖等常用功能柬采,因?yàn)閆ookeeper的寫入都是順序的欢唾,在一個節(jié)點(diǎn)創(chuàng)建之后,其他請求再次創(chuàng)建便會失敗粉捻,同時可以對這個節(jié)點(diǎn)進(jìn)行Watch礁遣,如果節(jié)點(diǎn)刪除會通知其他節(jié)點(diǎn)搶占鎖。
Zookeeper實(shí)現(xiàn)分布式鎖雖然是比較重量級的肩刃,但實(shí)現(xiàn)的鎖功能十分健全祟霍,由于Zookeeper本身需要維護(hù)自己的一致性,所以性能上較Redis還是有一定差距的盈包。
Zookeeper實(shí)現(xiàn)分布式鎖有幾種形式浅碾,后面會單獨(dú)的總結(jié)一下。
對比:####
Mysql實(shí)現(xiàn)比較簡單续语,不需要引入第三個應(yīng)用垂谢,但實(shí)現(xiàn)多少有些重,性能不是很好疮茄。
Redis的話實(shí)現(xiàn)比較簡單滥朱,同時性能很好,引入集群可以提高可用性力试。同時定期失效的機(jī)制可以解決因網(wǎng)絡(luò)抖動鎖刪除失敗的問題徙邻,所以我比較傾向Redis實(shí)現(xiàn)。
Zookeeper實(shí)現(xiàn)是有些重的畸裳,同時我們還需要維護(hù)Zookeeper集群缰犁,實(shí)現(xiàn)起來還是比較復(fù)雜的,實(shí)現(xiàn)不好的話還會引起“羊群效應(yīng)”怖糊。如果不是原有系統(tǒng)就依賴Zookeeper帅容,同時壓力不大的情況下。一般不使用Zookeeper實(shí)現(xiàn)分布式鎖伍伤。