如何基于redis分布式鎖實現(xiàn)“秒殺”

業(yè)務(wù)場景

所謂秒殺蕉汪,從業(yè)務(wù)角度看,是短時間內(nèi)多個用戶“爭搶”資源逞怨,這里的資源在大部分秒殺場景里是商品者疤;將業(yè)務(wù)抽象,技術(shù)角度看叠赦,秒殺就是多個線程對資源進(jìn)行操作驹马,所以實現(xiàn)秒殺除秀,就必須控制線程對資源的爭搶糯累,既要保證高效并發(fā),也要保證操作的正確册踩。

一些可能的實現(xiàn)

剛才提到過泳姐,實現(xiàn)秒殺的關(guān)鍵點是控制線程對資源的爭搶,根據(jù)基本的線程知識暂吉,可以不加思索的想到下面的一些方法:

1胖秒、秒殺在技術(shù)層面的抽象應(yīng)該就是一個方法,在這個方法里可能的操作是將商品庫存-1慕的,將商品加入用戶的購物車等等阎肝,在不考慮緩存的情況下應(yīng)該是要操作數(shù)據(jù)庫的。那么最簡單直接的實現(xiàn)就是在這個方法上加上synchronized關(guān)鍵字肮街,通俗的講就是鎖住整個方法风题;

2、鎖住整個方法這個策略簡單方便嫉父,但是似乎有點粗暴沛硅。可以稍微優(yōu)化一下熔号,只鎖住秒殺的代碼塊稽鞭,比如寫數(shù)據(jù)庫的部分;

3引镊、既然有并發(fā)問題朦蕴,那我就讓他“不并發(fā)”,將所有的線程用一個隊列管理起來弟头,使之變成串行操作吩抓,自然不會有并發(fā)問題。

上面所述的方法都是有效的赴恨,但是都不好疹娶。為什么?第一和第二種方法本質(zhì)上是“加鎖”伦连,但是鎖粒度依然比較高雨饺。什么意思钳垮?試想一下,如果兩個線程同時執(zhí)行秒殺方法额港,這兩個線程操作的是不同的商品,從業(yè)務(wù)上講應(yīng)該是可以同時進(jìn)行的饺窿,但是如果采用第一二種方法,這兩個線程也會去爭搶同一個鎖移斩,這其實是不必要的肚医。第三種方法也沒有解決上面說的問題。

那么如何將鎖控制在更細(xì)的粒度上呢向瓷?可以考慮為每個商品設(shè)置一個互斥鎖肠套,以和商品ID相關(guān)的字符串為唯一標(biāo)識,這樣就可以做到只有爭搶同一件商品的線程互斥猖任,不會導(dǎo)致所有的線程互斥你稚。分布式鎖恰好可以幫助我們解決這個問題。

何為分布式鎖

分布式鎖是控制分布式系統(tǒng)之間同步訪問共享資源的一種方式朱躺。在分布式系統(tǒng)中入宦,常常需要協(xié)調(diào)他們的動作。如果不同的系統(tǒng)或是同一個系統(tǒng)的不同主機(jī)之間共享了一個或一組資源室琢,那么訪問這些資源的時候乾闰,往往需要互斥來防止彼此干擾來保證一致性,在這種情況下盈滴,便需要使用到分布式鎖涯肩。

我們來假設(shè)一個最簡單的秒殺場景:數(shù)據(jù)庫里有一張表,column分別是商品ID巢钓,和商品ID對應(yīng)的庫存量病苗,秒殺成功就將此商品庫存量-1。現(xiàn)在假設(shè)有1000個線程來秒殺兩件商品症汹,500個線程秒殺第一個商品硫朦,500個線程秒殺第二個商品。我們來根據(jù)這個簡單的業(yè)務(wù)場景來解釋一下分布式鎖背镇。

通常具有秒殺場景的業(yè)務(wù)系統(tǒng)都比較復(fù)雜咬展,承載的業(yè)務(wù)量非常巨大,并發(fā)量也很高瞒斩。這樣的系統(tǒng)往往采用分布式的架構(gòu)來均衡負(fù)載破婆。那么這1000個并發(fā)就會是從不同的地方過來,商品庫存就是共享的資源胸囱,也是這1000個并發(fā)爭搶的資源祷舀,這個時候我們需要將并發(fā)互斥管理起來。這就是分布式鎖的應(yīng)用。

而key-value存儲系統(tǒng)裳扯,如redis抛丽,因為其一些特性,是實現(xiàn)分布式鎖的重要工具饰豺。

具體的實現(xiàn)

先來看看一些redis的基本命令:

SETNX key value

如果key不存在铺纽,就設(shè)置key對應(yīng)字符串value。在這種情況下哟忍,該命令和SET一樣。當(dāng)key已經(jīng)存在時陷寝,就不做任何操作锅很。SETNX是”SET if Not eXists”。

expire KEY seconds

設(shè)置key的過期時間凤跑。如果key已過期爆安,將會被自動刪除。

del KEY

刪除key

由于筆者的實現(xiàn)只用到這三個命令仔引,就只介紹這三個命令扔仓,更多的命令以及redis的特性和使用,可以參考redis官網(wǎng)咖耘。

需要考慮的問題

1翘簇、用什么操作redis?幸虧redis已經(jīng)提供了jedis客戶端用于java應(yīng)用程序儿倒,直接調(diào)用jedis API即可版保。

2、怎么實現(xiàn)加鎖夫否?“鎖”其實是一個抽象的概念彻犁,將這個抽象概念變?yōu)榫唧w的東西,就是一個存儲在redis里的key-value對凰慈,key是于商品ID相關(guān)的字符串來唯一標(biāo)識汞幢,value其實并不重要,因為只要這個唯一的key-value存在微谓,就表示這個商品已經(jīng)上鎖森篷。

3、如何釋放鎖豺型?既然key-value對存在就表示上鎖疾宏,那么釋放鎖就自然是在redis里刪除key-value對。

4触创、阻塞還是非阻塞坎藐?筆者采用了阻塞式的實現(xiàn),若線程發(fā)現(xiàn)已經(jīng)上鎖,會在特定時間內(nèi)輪詢鎖岩馍。

5碉咆、如何處理異常情況?比如一個線程把一個商品上了鎖蛀恩,但是由于各種原因疫铜,沒有完成操作(在上面的業(yè)務(wù)場景里就是沒有將庫存-1寫入數(shù)據(jù)庫),自然沒有釋放鎖双谆,這個情況筆者加入了鎖超時機(jī)制壳咕,利用redis的expire命令為key設(shè)置超時時長,過了超時時間redis就會將這個key自動刪除顽馋,即強制釋放鎖(可以認(rèn)為超時釋放鎖是一個異步操作谓厘,由redis完成,應(yīng)用程序只需要根據(jù)系統(tǒng)特點設(shè)置超時時間即可)寸谜。

talk is cheap,show me the code

在代碼實現(xiàn)層面竟稳,注解有并發(fā)的方法和參數(shù),通過動態(tài)代理獲取注解的方法和參數(shù)熊痴,在代理中加鎖他爸,執(zhí)行完被代理的方法后釋放鎖。

幾個注解定義:

cachelock是方法級的注解果善,用于注解會產(chǎn)生并發(fā)問題的方法:

@Target(ElementType.METHOD)

@Retention(RetentionPolicy.RUNTIME)

@Documented

public @interface CacheLock {

String lockedPrefix() default "";//redis 鎖key的前綴

long timeOut() default 2000;//輪詢鎖的時間

int expireTime() default 1000;//key在redis里存在的時間诊笤,1000S

}

lockedObject是參數(shù)級的注解,用于注解商品ID等基本類型的參數(shù):

@Target(ElementType.PARAMETER)

@Retention(RetentionPolicy.RUNTIME)

@Documented

public @interface LockedObject {

//不需要值

}

LockedComplexObject也是參數(shù)級的注解巾陕,用于注解自定義類型的參數(shù):

@Target(ElementType.PARAMETER)

@Retention(RetentionPolicy.RUNTIME)

@Documented

public @interface LockedComplexObject {

String field() default "";//含有成員變量的復(fù)雜對象中需要加鎖的成員變量盏混,如一個商品對象的商品ID

}

CacheLockInterceptor實現(xiàn)InvocationHandler接口,在invoke方法中獲取注解的方法和參數(shù)惜论,在執(zhí)行注解的方法前加鎖许赃,執(zhí)行被注解的方法后釋放鎖:

public class CacheLockInterceptor implements InvocationHandler{

public static int ERROR_COUNT = 0;

private Object proxied;

public CacheLockInterceptor(Object proxied) {

this.proxied = proxied;

}

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

CacheLock cacheLock = method.getAnnotation(CacheLock.class);

//沒有cacheLock注解,pass

if(null == cacheLock){

System.out.println("no cacheLock annotation");

return method.invoke(proxied, args);

}

//獲得方法中參數(shù)的注解

Annotation[][] annotations = method.getParameterAnnotations();

//根據(jù)獲取到的參數(shù)注解和參數(shù)列表獲得加鎖的參數(shù)

Object lockedObject = getLockedObject(annotations,args);

String objectValue = lockedObject.toString();

//新建一個鎖

RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);

//加鎖

boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());

if(!result){//取鎖失敗

ERROR_COUNT += 1;

throw new CacheLockException("get lock fail");

}

try{

//加鎖成功馆类,執(zhí)行方法

return method.invoke(proxied, args);

}finally{

lock.unlock();//釋放鎖

}

}

/**

*

* @param annotations

* @param args

* @return

* @throws CacheLockException

*/

private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{

if(null == args || args.length == 0){

throw new CacheLockException("方法參數(shù)為空混聊,沒有被鎖定的對象");

}

if(null == annotations || annotations.length == 0){

throw new CacheLockException("沒有被注解的參數(shù)");

}

//不支持多個參數(shù)加鎖,只支持第一個注解為lockedObject或者lockedComplexObject的參數(shù)

int index = -1;//標(biāo)記參數(shù)的位置指針

for(int i = 0;i < annotations.length;i++){

for(int j = 0;j < annotations[i].length;j++){

if(annotations[i][j] instanceof LockedComplexObject){//注解為LockedComplexObject

index = i;

try {

return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());

} catch (NoSuchFieldException | SecurityException e) {

throw new CacheLockException("注解對象中沒有該屬性" + ((LockedComplexObject)annotations[i][j]).field());

}

}

if(annotations[i][j] instanceof LockedObject){

index = i;

break;

}

}

//找到第一個后直接break乾巧,不支持多參數(shù)加鎖

if(index != -1){

break;

}

}

if(index == -1){

throw new CacheLockException("請指定被鎖定參數(shù)");

}

return args[index];

}

}

最關(guān)鍵的RedisLock類中的lock方法和unlock方法:

/**

* 加鎖

* 使用方式為:

* lock();

* try{

* executeMethod();

* }finally{

* unlock();

* }

* @param timeout timeout的時間范圍內(nèi)輪詢鎖

* @param expire 設(shè)置鎖超時時間

* @return 成功 or 失敗

*/

public boolean lock(long timeout,int expire){

long nanoTime = System.nanoTime();

timeout *= MILLI_NANO_TIME;

try {

//在timeout的時間范圍內(nèi)不斷輪詢鎖

while (System.nanoTime() - nanoTime < timeout) {

//鎖不存在的話句喜,設(shè)置鎖并設(shè)置鎖過期時間,即加鎖

if (this.redisClient.setnx(this.key, LOCKED) == 1) {

this.redisClient.expire(key, expire);//設(shè)置鎖過期時間是為了在沒有釋放

//鎖的情況下鎖過期后消失沟于,不會造成永久阻塞

this.lock = true;

return this.lock;

}

System.out.println("出現(xiàn)鎖等待");

//短暫休眠咳胃,避免可能的活鎖

Thread.sleep(3, RANDOM.nextInt(30));

}

} catch (Exception e) {

throw new RuntimeException("locking error",e);

}

return false;

}

public void unlock() {

try {

if(this.lock){

redisClient.delKey(key);//直接刪除

}

} catch (Throwable e) {

}

}

上述的代碼是框架性的代碼,現(xiàn)在來講解如何使用上面的簡單框架來寫一個秒殺函數(shù)旷太。

先定義一個接口展懈,接口里定義了一個秒殺方法:

public interface SeckillInterface {

/**

*現(xiàn)在暫時只支持在接口方法上注解

*/

//cacheLock注解可能產(chǎn)生并發(fā)的方法

@CacheLock(lockedPrefix="TEST_PREFIX")

public void secKill(String userID,@LockedObject Long commidityID);//最簡單的秒殺方法销睁,參數(shù)是用戶ID和商品ID〈嫜拢可能有多個線程爭搶一個商品冻记,所以商品ID加上LockedObject注解

}

上述SeckillInterface接口的實現(xiàn)類,即秒殺的具體實現(xiàn):

public class SecKillImpl implements SeckillInterface{

static Map<Long, Long> inventory ;

static{

inventory = new HashMap<>();

inventory.put(10000001L, 10000l);

inventory.put(10000002L, 10000l);

}

@Override

public void secKill(String arg1, Long arg2) {

//最簡單的秒殺来惧,這里僅作為demo示例

reduceInventory(arg2);

}

//模擬秒殺操作冗栗,姑且認(rèn)為一個秒殺就是將庫存減一,實際情景要復(fù)雜的多

public Long reduceInventory(Long commodityId){

inventory.put(commodityId,inventory.get(commodityId) - 1);

return inventory.get(commodityId);

}

}

模擬秒殺場景供搀,1000個線程來爭搶兩個商品:

@Test

public void testSecKill(){

int threadCount = 1000;

int splitPoint = 500;

CountDownLatch endCount = new CountDownLatch(threadCount);

CountDownLatch beginCount = new CountDownLatch(1);

SecKillImpl testClass = new SecKillImpl();

Thread[] threads = new Thread[threadCount];

//起500個線程隅居,秒殺第一個商品

for(int i= 0;i < splitPoint;i++){

threads[i] = new Thread(new Runnable() {

public void run() {

try {

//等待在一個信號量上,掛起

beginCount.await();

//用動態(tài)代理的方式調(diào)用secKill方法

SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),

new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));

proxy.secKill("test", commidityId1);

endCount.countDown();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

});

threads[i].start();

}

//再起500個線程葛虐,秒殺第二件商品

for(int i= splitPoint;i < threadCount;i++){

threads[i] = new Thread(new Runnable() {

public void run() {

try {

//等待在一個信號量上胎源,掛起

beginCount.await();

//用動態(tài)代理的方式調(diào)用secKill方法

SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),

new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));

proxy.secKill("test", commidityId2);

//testClass.testFunc("test", 10000001L);

endCount.countDown();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

});

threads[i].start();

}

long startTime = System.currentTimeMillis();

//主線程釋放開始信號量,并等待結(jié)束信號量挡闰,這樣做保證1000個線程做到完全同時執(zhí)行,保證測試的正確性

beginCount.countDown();

try {

//主線程等待結(jié)束信號量

endCount.await();

//觀察秒殺結(jié)果是否正確

System.out.println(SecKillImpl.inventory.get(commidityId1));

System.out.println(SecKillImpl.inventory.get(commidityId2));

System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);

System.out.println("total cost " + (System.currentTimeMillis() - startTime));

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

在正確的預(yù)想下掰盘,應(yīng)該每個商品的庫存都減少了500摄悯,在多次試驗后,實際情況符合預(yù)想愧捕。如果不采用鎖機(jī)制奢驯,會出現(xiàn)庫存減少499,498的情況次绘。

這里采用了動態(tài)代理的方法瘪阁,利用注解和反射機(jī)制得到分布式鎖ID,進(jìn)行加鎖和釋放鎖操作邮偎。當(dāng)然也可以直接在方法進(jìn)行這些操作管跺,采用動態(tài)代理也是為了能夠?qū)㈡i操作代碼集中在代理中,便于維護(hù)禾进。

通常秒殺場景發(fā)生在web項目中豁跑,可以考慮利用spring的AOP特性將鎖操作代碼置于切面中,當(dāng)然AOP本質(zhì)上也是動態(tài)代理泻云。

小結(jié)

這篇文章從業(yè)務(wù)場景出發(fā)艇拍,從抽象到實現(xiàn)闡述了如何利用redis實現(xiàn)分布式鎖,完成簡單的秒殺功能宠纯,也記錄了筆者思考的過程卸夕,希望能給閱讀到本篇文章的人一些啟發(fā)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末婆瓜,一起剝皮案震驚了整個濱河市快集,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖碍讨,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件治力,死亡現(xiàn)場離奇詭異,居然都是意外死亡勃黍,警方通過查閱死者的電腦和手機(jī)宵统,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來覆获,“玉大人马澈,你說我怎么就攤上這事∨ⅲ” “怎么了痊班?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長摹量。 經(jīng)常有香客問我涤伐,道長,這世上最難降的妖魔是什么缨称? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任凝果,我火速辦了婚禮,結(jié)果婚禮上睦尽,老公的妹妹穿的比我還像新娘器净。我一直安慰自己,他們只是感情好当凡,可當(dāng)我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布山害。 她就那樣靜靜地躺著,像睡著了一般沿量。 火紅的嫁衣襯著肌膚如雪浪慌。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天朴则,我揣著相機(jī)與錄音眷射,去河邊找鬼。 笑死佛掖,一個胖子當(dāng)著我的面吹牛妖碉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播芥被,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼欧宜,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了拴魄?” 一聲冷哼從身側(cè)響起冗茸,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤席镀,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后夏漱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體豪诲,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年挂绰,在試婚紗的時候發(fā)現(xiàn)自己被綠了屎篱。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡葵蒂,死狀恐怖交播,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情践付,我是刑警寧澤秦士,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站永高,受9級特大地震影響隧土,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜命爬,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一曹傀、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧遇骑,春花似錦卖毁、人聲如沸揖曾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽炭剪。三九已至练链,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間奴拦,已是汗流浹背媒鼓。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留错妖,地道東北人绿鸣。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像暂氯,于是被迫代替她去往敵國和親潮模。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容