業(yè)務(wù)場(chǎng)景
所謂秒殺顽频,從業(yè)務(wù)角度看藤肢,是短時(shí)間內(nèi)多個(gè)用戶“爭(zhēng)搶”資源,這里的資源在大部分秒殺場(chǎng)景里是商品糯景;將業(yè)務(wù)抽象嘁圈,技術(shù)角度看,秒殺就是多個(gè)線程對(duì)資源進(jìn)行操作蟀淮,所以實(shí)現(xiàn)秒殺最住,就必須控制線程對(duì)資源的爭(zhēng)搶,既要保證高效并發(fā)怠惶,也要保證操作的正確涨缚。
一些可能的實(shí)現(xiàn)
剛才提到過(guò),實(shí)現(xiàn)秒殺的關(guān)鍵點(diǎn)是控制線程對(duì)資源的爭(zhēng)搶策治,根據(jù)基本的線程知識(shí)脓魏,可以不加思索的想到下面的一些方法:
1、秒殺在技術(shù)層面的抽象應(yīng)該就是一個(gè)方法通惫,在這個(gè)方法里可能的操作是將商品庫(kù)存-1茂翔,將商品加入用戶的購(gòu)物車等等,在不考慮緩存的情況下應(yīng)該是要操作數(shù)據(jù)庫(kù)的履腋。那么最簡(jiǎn)單直接的實(shí)現(xiàn)就是在這個(gè)方法上加上synchronized關(guān)鍵字珊燎,通俗的講就是鎖住整個(gè)方法;
2遵湖、鎖住整個(gè)方法這個(gè)策略簡(jiǎn)單方便悔政,但是似乎有點(diǎn)粗暴⊙泳桑可以稍微優(yōu)化一下谋国,只鎖住秒殺的代碼塊,比如寫(xiě)數(shù)據(jù)庫(kù)的部分迁沫;
3芦瘾、既然有并發(fā)問(wèn)題闷盔,那我就讓他“不并發(fā)”,將所有的線程用一個(gè)隊(duì)列管理起來(lái)旅急,使之變成串行操作,自然不會(huì)有并發(fā)問(wèn)題牡整。
上面所述的方法都是有效的藐吮,但是都不好。為什么逃贝?第一和第二種方法本質(zhì)上是“加鎖”谣辞,但是鎖粒度依然比較高。什么意思沐扳?試想一下泥从,如果兩個(gè)線程同時(shí)執(zhí)行秒殺方法,這兩個(gè)線程操作的是不同的商品,從業(yè)務(wù)上講應(yīng)該是可以同時(shí)進(jìn)行的沪摄,但是如果采用第一二種方法躯嫉,這兩個(gè)線程也會(huì)去爭(zhēng)搶同一個(gè)鎖,這其實(shí)是不必要的杨拐。第三種方法也沒(méi)有解決上面說(shuō)的問(wèn)題祈餐。
那么如何將鎖控制在更細(xì)的粒度上呢?可以考慮為每個(gè)商品設(shè)置一個(gè)互斥鎖哄陶,以和商品ID相關(guān)的字符串為唯一標(biāo)識(shí)帆阳,這樣就可以做到只有爭(zhēng)搶同一件商品的線程互斥,不會(huì)導(dǎo)致所有的線程互斥屋吨。分布式鎖恰好可以幫助我們解決這個(gè)問(wèn)題蜒谤。
何為分布式鎖
分布式鎖是控制分布式系統(tǒng)之間同步訪問(wèn)共享資源的一種方式。在分布式系統(tǒng)中至扰,常常需要協(xié)調(diào)他們的動(dòng)作鳍徽。如果不同的系統(tǒng)或是同一個(gè)系統(tǒng)的不同主機(jī)之間共享了一個(gè)或一組資源,那么訪問(wèn)這些資源的時(shí)候渊胸,往往需要互斥來(lái)防止彼此干擾來(lái)保證一致性旬盯,在這種情況下,便需要使用到分布式鎖翎猛。
我們來(lái)假設(shè)一個(gè)最簡(jiǎn)單的秒殺場(chǎng)景:數(shù)據(jù)庫(kù)里有一張表胖翰,column分別是商品ID,和商品ID對(duì)應(yīng)的庫(kù)存量切厘,秒殺成功就將此商品庫(kù)存量-1∪龋現(xiàn)在假設(shè)有1000個(gè)線程來(lái)秒殺兩件商品,500個(gè)線程秒殺第一個(gè)商品疫稿,500個(gè)線程秒殺第二個(gè)商品培他。我們來(lái)根據(jù)這個(gè)簡(jiǎn)單的業(yè)務(wù)場(chǎng)景來(lái)解釋一下分布式鎖鹃两。
通常具有秒殺場(chǎng)景的業(yè)務(wù)系統(tǒng)都比較復(fù)雜,承載的業(yè)務(wù)量非常巨大舀凛,并發(fā)量也很高俊扳。這樣的系統(tǒng)往往采用分布式的架構(gòu)來(lái)均衡負(fù)載。那么這1000個(gè)并發(fā)就會(huì)是從不同的地方過(guò)來(lái)猛遍,商品庫(kù)存就是共享的資源馋记,也是這1000個(gè)并發(fā)爭(zhēng)搶的資源,這個(gè)時(shí)候我們需要將并發(fā)互斥管理起來(lái)懊烤。這就是分布式鎖的應(yīng)用梯醒。
而key-value存儲(chǔ)系統(tǒng),如redis腌紧,因?yàn)槠湟恍┨匦匀紫埃菍?shí)現(xiàn)分布式鎖的重要工具。
具體的實(shí)現(xiàn)
先來(lái)看看一些redis的基本命令:
SETNX key value
如果key不存在壁肋,就設(shè)置key對(duì)應(yīng)字符串value号胚。在這種情況下,該命令和SET一樣墩划。當(dāng)key已經(jīng)存在時(shí)涕刚,就不做任何操作。SETNX是”SET if Not eXists”乙帮。
expire KEY seconds
設(shè)置key的過(guò)期時(shí)間杜漠。如果key已過(guò)期,將會(huì)被自動(dòng)刪除察净。
del KEY
刪除key
由于筆者的實(shí)現(xiàn)只用到這三個(gè)命令驾茴,就只介紹這三個(gè)命令,更多的命令以及redis的特性和使用氢卡,可以參考redis官網(wǎng)锈至。
需要考慮的問(wèn)題
1、用什么操作redis译秦?幸虧redis已經(jīng)提供了jedis客戶端用于java應(yīng)用程序峡捡,直接調(diào)用jedis API即可。
2筑悴、怎么實(shí)現(xiàn)加鎖们拙?“鎖”其實(shí)是一個(gè)抽象的概念,將這個(gè)抽象概念變?yōu)榫唧w的東西阁吝,就是一個(gè)存儲(chǔ)在redis里的key-value對(duì)砚婆,key是于商品ID相關(guān)的字符串來(lái)唯一標(biāo)識(shí),value其實(shí)并不重要突勇,因?yàn)橹灰@個(gè)唯一的key-value存在装盯,就表示這個(gè)商品已經(jīng)上鎖坷虑。
3、如何釋放鎖埂奈?既然key-value對(duì)存在就表示上鎖迄损,那么釋放鎖就自然是在redis里刪除key-value對(duì)。
4账磺、阻塞還是非阻塞海蔽?筆者采用了阻塞式的實(shí)現(xiàn),若線程發(fā)現(xiàn)已經(jīng)上鎖绑谣,會(huì)在特定時(shí)間內(nèi)輪詢鎖。
5拗引、如何處理異常情況借宵?比如一個(gè)線程把一個(gè)商品上了鎖,但是由于各種原因矾削,沒(méi)有完成操作(在上面的業(yè)務(wù)場(chǎng)景里就是沒(méi)有將庫(kù)存-1寫(xiě)入數(shù)據(jù)庫(kù))壤玫,自然沒(méi)有釋放鎖,這個(gè)情況筆者加入了鎖超時(shí)機(jī)制哼凯,利用redis的expire命令為key設(shè)置超時(shí)時(shí)長(zhǎng)欲间,過(guò)了超時(shí)時(shí)間redis就會(huì)將這個(gè)key自動(dòng)刪除,即強(qiáng)制釋放鎖(可以認(rèn)為超時(shí)釋放鎖是一個(gè)異步操作断部,由redis完成猎贴,應(yīng)用程序只需要根據(jù)系統(tǒng)特點(diǎn)設(shè)置超時(shí)時(shí)間即可)。
talk is cheap, show me the code
在代碼實(shí)現(xiàn)層面蝴光,注解有并發(fā)的方法和參數(shù)她渴,通過(guò)動(dòng)態(tài)代理獲取注解的方法和參數(shù),在代理中加鎖蔑祟,執(zhí)行完被代理的方法后釋放鎖趁耗。
幾個(gè)注解定義:
cachelock是方法級(jí)的注解,用于注解會(huì)產(chǎn)生并發(fā)問(wèn)題的方法:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
String lockedPrefix() default "";//redis 鎖key的前綴
long timeOut() default 2000;//輪詢鎖的時(shí)間
int expireTime() default 1000;//key在redis里存在的時(shí)間疆虚,1000S
}
lockedObject是參數(shù)級(jí)的注解苛败,用于注解商品ID等基本類型的參數(shù):
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
//不需要值
}
LockedComplexObject也是參數(shù)級(jí)的注解,用于注解自定義類型的參數(shù):
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
String field() default "";//含有成員變量的復(fù)雜對(duì)象中需要加鎖的成員變量径簿,如一個(gè)商品對(duì)象的商品ID
}
CacheLockInterceptor實(shí)現(xiàn)InvocationHandler接口罢屈,在invoke方法中獲取注解的方法和參數(shù),在執(zhí)行注解的方法前加鎖牍帚,執(zhí)行被注解的方法后釋放鎖:
public class CacheLockInterceptor implements InvocationHandler{
public static int ERROR_COUNT = 0;
private Object proxied;
public CacheLockInterceptor(Object proxied) {
this.proxied = proxied;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
CacheLock cacheLock = method.getAnnotation(CacheLock.class);
//沒(méi)有cacheLock注解儡遮,pass
if(null == cacheLock){
System.out.println("no cacheLock annotation");
return method.invoke(proxied, args);
}
//獲得方法中參數(shù)的注解
Annotation[][] annotations = method.getParameterAnnotations();
//根據(jù)獲取到的參數(shù)注解和參數(shù)列表獲得加鎖的參數(shù)
Object lockedObject = getLockedObject(annotations,args);
String objectValue = lockedObject.toString();
//新建一個(gè)鎖
RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
//加鎖
boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
if(!result){//取鎖失敗
ERROR_COUNT += 1;
throw new CacheLockException("get lock fail");
}
try{
//加鎖成功,執(zhí)行方法
return method.invoke(proxied, args);
}finally{
lock.unlock();//釋放鎖
}
}
/**
*
* @param annotations
* @param args
* @return
* @throws CacheLockException
*/
private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
if(null == args || args.length == 0){
throw new CacheLockException("方法參數(shù)為空暗赶,沒(méi)有被鎖定的對(duì)象");
}
if(null == annotations || annotations.length == 0){
throw new CacheLockException("沒(méi)有被注解的參數(shù)");
}
//不支持多個(gè)參數(shù)加鎖鄙币,只支持第一個(gè)注解為lockedObject或者lockedComplexObject的參數(shù)
int index = -1;//標(biāo)記參數(shù)的位置指針
for(int i = 0;i < annotations.length;i++){
for(int j = 0;j < annotations[i].length;j++){
if(annotations[i][j] instanceof LockedComplexObject){//注解為L(zhǎng)ockedComplexObject
index = i;
try {
return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
} catch (NoSuchFieldException | SecurityException e) {
throw new CacheLockException("注解對(duì)象中沒(méi)有該屬性" + ((LockedComplexObject)annotations[i][j]).field());
}
}
if(annotations[i][j] instanceof LockedObject){
index = i;
break;
}
}
//找到第一個(gè)后直接break肃叶,不支持多參數(shù)加鎖
if(index != -1){
break;
}
}
if(index == -1){
throw new CacheLockException("請(qǐng)指定被鎖定參數(shù)");
}
return args[index];
}
}
最關(guān)鍵的RedisLock類中的lock方法和unlock方法:
/**
* 加鎖
* 使用方式為:
* lock();
* try {
* executeMethod();
* } finally {
* unlock();
* }
* @param timeout timeout的時(shí)間范圍內(nèi)輪詢鎖
* @param expire 設(shè)置鎖超時(shí)時(shí)間
* @return 成功 or 失敗
*/
public boolean lock(long timeout,int expire){
long nanoTime = System.nanoTime();
timeout *= MILLI_NANO_TIME;
try {
//在timeout的時(shí)間范圍內(nèi)不斷輪詢鎖
while (System.nanoTime() - nanoTime < timeout) {
//鎖不存在的話,設(shè)置鎖并設(shè)置鎖過(guò)期時(shí)間十嘿,即加鎖
if (this.redisClient.setnx(this.key, LOCKED) == 1) {
this.redisClient.expire(key, expire);//設(shè)置鎖過(guò)期時(shí)間是為了在沒(méi)有釋放
//鎖的情況下鎖過(guò)期后消失因惭,不會(huì)造成永久阻塞
this.lock = true;
return this.lock;
}
System.out.println("出現(xiàn)鎖等待");
//短暫休眠,避免可能的活鎖
Thread.sleep(3, RANDOM.nextInt(30));
}
} catch (Exception e) {
throw new RuntimeException("locking error",e);
}
return false;
}
public void unlock() {
try {
if(this.lock){
redisClient.delKey(key);//直接刪除
}
} catch (Throwable e) {
}
}
上述的代碼是框架性的代碼绩衷,現(xiàn)在來(lái)講解如何使用上面的簡(jiǎn)單框架來(lái)寫(xiě)一個(gè)秒殺函數(shù)蹦魔。
先定義一個(gè)接口,接口里定義了一個(gè)秒殺方法:
public interface SeckillInterface {
/**
*現(xiàn)在暫時(shí)只支持在接口方法上注解
*/
//cacheLock注解可能產(chǎn)生并發(fā)的方法
@CacheLock(lockedPrefix="TEST_PREFIX")
public void secKill(String userID,@LockedObject Long commidityID);//最簡(jiǎn)單的秒殺方法咳燕,參數(shù)是用戶ID和商品ID勿决。可能有多個(gè)線程爭(zhēng)搶一個(gè)商品招盲,所以商品ID加上LockedObject注解
}
上述SeckillInterface接口的實(shí)現(xiàn)類低缩,即秒殺的具體實(shí)現(xiàn):
public class SecKillImpl implements SeckillInterface{
static Map<Long, Long> inventory ;
static{
inventory = new HashMap<>();
inventory.put(10000001L, 10000l);
inventory.put(10000002L, 10000l);
}
@Override
public void secKill(String arg1, Long arg2) {
//最簡(jiǎn)單的秒殺,這里僅作為demo示例
reduceInventory(arg2);
}
//模擬秒殺操作曹货,姑且認(rèn)為一個(gè)秒殺就是將庫(kù)存減一咆繁,實(shí)際情景要復(fù)雜的多
public Long reduceInventory(Long commodityId){
inventory.put(commodityId,inventory.get(commodityId) - 1);
return inventory.get(commodityId);
}
}
模擬秒殺場(chǎng)景,1000個(gè)線程來(lái)爭(zhēng)搶兩個(gè)商品:
@Test
public void testSecKill(){
int threadCount = 1000;
int splitPoint = 500;
CountDownLatch endCount = new CountDownLatch(threadCount);
CountDownLatch beginCount = new CountDownLatch(1);
SecKillImpl testClass = new SecKillImpl();
Thread[] threads = new Thread[threadCount];
//起500個(gè)線程顶籽,秒殺第一個(gè)商品
for(int i= 0;i < splitPoint;i++){
threads[i] = new Thread(new Runnable() {
public void run() {
try {
//等待在一個(gè)信號(hào)量上玩般,掛起
beginCount.await();
//用動(dòng)態(tài)代理的方式調(diào)用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill("test", commidityId1);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threads[i].start();
}
//再起500個(gè)線程慧起,秒殺第二件商品
for(int i= splitPoint;i < threadCount;i++){
threads[i] = new Thread(new Runnable() {
public void run() {
try {
//等待在一個(gè)信號(hào)量上龟梦,掛起
beginCount.await();
//用動(dòng)態(tài)代理的方式調(diào)用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill("test", commidityId2);
//testClass.testFunc("test", 10000001L);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threads[i].start();
}
long startTime = System.currentTimeMillis();
//主線程釋放開(kāi)始信號(hào)量阿逃,并等待結(jié)束信號(hào)量正塌,這樣做保證1000個(gè)線程做到完全同時(shí)執(zhí)行馅袁,保證測(cè)試的正確性
beginCount.countDown();
try {
//主線程等待結(jié)束信號(hào)量
endCount.await();
//觀察秒殺結(jié)果是否正確
System.out.println(SecKillImpl.inventory.get(commidityId1));
System.out.println(SecKillImpl.inventory.get(commidityId2));
System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
System.out.println("total cost " + (System.currentTimeMillis() - startTime));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
在正確的預(yù)想下灶体,應(yīng)該每個(gè)商品的庫(kù)存都減少了500谆扎,在多次試驗(yàn)后霉撵,實(shí)際情況符合預(yù)想镰吆。如果不采用鎖機(jī)制帘撰,會(huì)出現(xiàn)庫(kù)存減少499,498的情況万皿。
這里采用了動(dòng)態(tài)代理的方法摧找,利用注解和反射機(jī)制得到分布式鎖ID,進(jìn)行加鎖和釋放鎖操作牢硅。當(dāng)然也可以直接在方法進(jìn)行這些操作蹬耘,采用動(dòng)態(tài)代理也是為了能夠?qū)㈡i操作代碼集中在代理中,便于維護(hù)减余。
通常秒殺場(chǎng)景發(fā)生在web項(xiàng)目中综苔,可以考慮利用spring的AOP特性將鎖操作代碼置于切面中,當(dāng)然AOP本質(zhì)上也是動(dòng)態(tài)代理。
小結(jié)
這篇文章從業(yè)務(wù)場(chǎng)景出發(fā)如筛,從抽象到實(shí)現(xiàn)闡述了如何利用redis實(shí)現(xiàn)分布式鎖堡牡,完成簡(jiǎn)單的秒殺功能,也記錄了筆者思考的過(guò)程杨刨,希望能給閱讀到本篇文章的人一些啟發(fā)晤柄。
源碼:https://github.com/lsfire/redisframework
https://blog.csdn.net/huangpeigui/java/article/details/90611454