在日常開發(fā)中有很多地方都有類似扣減庫存的操作币厕,比如電商系統(tǒng)中的商品庫存母谎,抽獎系統(tǒng)中的獎品庫存等。
解決方案
- 使用mysql數(shù)據(jù)庫倔监,使用一個字段來存儲庫存,每次扣減庫存去更新這個字段菌仁。
- 還是使用數(shù)據(jù)庫浩习,但是將庫存分層多份存到多條記錄里面,扣減庫存的時候路由一下济丘,這樣子增大了并發(fā)量谱秽,但是還是避免不了大量的去訪問數(shù)據(jù)庫來更新庫存。
- 將庫存放到redis使用redis的incrby特性來扣減庫存摹迷。
分析
在上面的第一種和第二種方式都是基于數(shù)據(jù)來扣減庫存疟赊。
基于數(shù)據(jù)庫單庫存
第一種方式在所有請求都會在這里等待鎖,獲取鎖有去扣減庫存峡碉。在并發(fā)量不高的情況下可以使用近哟,但是一旦并發(fā)量大了就會有大量請求阻塞在這里,導(dǎo)致請求超時鲫寄,進而整個系統(tǒng)雪崩吉执;而且會頻繁的去訪問數(shù)據(jù)庫疯淫,大量占用數(shù)據(jù)庫資源,所以在并發(fā)高的情況下這種方式不適用戳玫。
基于數(shù)據(jù)庫多庫存
第二種方式其實是第一種方式的優(yōu)化版本熙掺,在一定程度上提高了并發(fā)量,但是在還是會大量的對數(shù)據(jù)庫做更新操作大量占用數(shù)據(jù)庫資源咕宿。
基于數(shù)據(jù)庫來實現(xiàn)扣減庫存還存在的一些問題:
- 用數(shù)據(jù)庫扣減庫存的方式币绩,扣減庫存的操作必須在一條語句中執(zhí)行,不能先selec在update府阀,這樣在并發(fā)下會出現(xiàn)超扣的情況缆镣。如:
update number set x=x-1 where x > 0
MySQL自身對于高并發(fā)的處理性能就會出現(xiàn)問題,一般來說试浙,MySQL的處理性能會隨著并發(fā)thread上升而上升董瞻,但是到了一定的并發(fā)度之后會出現(xiàn)明顯的拐點,之后一路下降川队,最終甚至?xí)葐蝨hread的性能還要差。
當(dāng)減庫存和高并發(fā)碰到一起的時候睬澡,由于操作的庫存數(shù)目在同一行固额,就會出現(xiàn)爭搶InnoDB行鎖的問題,導(dǎo)致出現(xiàn)互相等待甚至死鎖煞聪,從而大大降低MySQL的處理性能斗躏,最終導(dǎo)致前端頁面出現(xiàn)超時異常。
基于redis
針對上述問題的問題我們就有了第三種方案昔脯,將庫存放到緩存啄糙,利用redis的incrby特性來扣減庫存,解決了超扣和性能問題云稚。但是一旦緩存丟失需要考慮恢復(fù)方案隧饼。比如抽獎系統(tǒng)扣獎品庫存的時候,初始庫存=總的庫存數(shù)-已經(jīng)發(fā)放的獎勵數(shù)静陈,但是如果是異步發(fā)獎燕雁,需要等到MQ消息消費完了才能重啟redis初始化庫存,否則也存在庫存不一致的問題鲸拥。
基于redis實現(xiàn)扣減庫存的具體實現(xiàn)
- 我們使用redis的lua腳本來實現(xiàn)扣減庫存
- 由于是分布式環(huán)境下所以還需要一個分布式鎖來控制只能有一個服務(wù)去初始化庫存
- 需要提供一個回調(diào)函數(shù)拐格,在初始化庫存的時候去調(diào)用這個函數(shù)獲取初始化庫存
初始化庫存回調(diào)函數(shù)(IStockCallback )
/**
* 獲取庫存回調(diào)
* @author yuhao.wang
*/
public interface IStockCallback {
/**
* 獲取庫存
* @return
*/
int getStock();
}
扣減庫存服務(wù)(StockService)
/**
* 扣庫存
*
* @author yuhao.wang
*/
@Service
public class StockService {
Logger logger = LoggerFactory.getLogger(StockService.class);
/**
* 不限庫存
*/
public static final long UNINITIALIZED_STOCK = -3L;
/**
* Redis 客戶端
*/
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 執(zhí)行扣庫存的腳本
*/
public static final String STOCK_LUA;
static {
/**
*
* @desc 扣減庫存Lua腳本
* 庫存(stock)-1:表示不限庫存
* 庫存(stock)0:表示沒有庫存
* 庫存(stock)大于0:表示剩余庫存
*
* @params 庫存key
* @return
* -3:庫存未初始化
* -2:庫存不足
* -1:不限庫存
* 大于等于0:剩余庫存(扣減之后剩余的庫存)
* redis緩存的庫存(value)是-1表示不限庫存,直接返回1
*/
StringBuilder sb = new StringBuilder();
sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));");
sb.append(" local num = tonumber(ARGV[1]);");
sb.append(" if (stock == -1) then");
sb.append(" return -1;");
sb.append(" end;");
sb.append(" if (stock >= num) then");
sb.append(" return redis.call('incrby', KEYS[1], 0 - num);");
sb.append(" end;");
sb.append(" return -2;");
sb.append("end;");
sb.append("return -3;");
STOCK_LUA = sb.toString();
}
/**
* @param key 庫存key
* @param expire 庫存有效時間,單位秒
* @param num 扣減數(shù)量
* @param stockCallback 初始化庫存回調(diào)函數(shù)
* @return -2:庫存不足; -1:不限庫存; 大于等于0:扣減庫存之后的剩余庫存
*/
public long stock(String key, long expire, int num, IStockCallback stockCallback) {
long stock = stock(key, num);
// 初始化庫存
if (stock == UNINITIALIZED_STOCK) {
RedisLock redisLock = new RedisLock(redisTemplate, key);
try {
// 獲取鎖
if (redisLock.tryLock()) {
// 雙重驗證刑赶,避免并發(fā)時重復(fù)回源到數(shù)據(jù)庫
stock = stock(key, num);
if (stock == UNINITIALIZED_STOCK) {
// 獲取初始化庫存
final int initStock = stockCallback.getStock();
// 將庫存設(shè)置到redis
redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
// 調(diào)一次扣庫存的操作
stock = stock(key, num);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
redisLock.unlock();
}
}
return stock;
}
/**
* 加庫存(還原庫存)
*
* @param key 庫存key
* @param num 庫存數(shù)量
* @return
*/
public long addStock(String key, int num) {
return addStock(key, null, num);
}
/**
* 加庫存
*
* @param key 庫存key
* @param expire 過期時間(秒)
* @param num 庫存數(shù)量
* @return
*/
public long addStock(String key, Long expire, int num) {
boolean hasKey = redisTemplate.hasKey(key);
// 判斷key是否存在捏浊,存在就直接更新
if (hasKey) {
return redisTemplate.opsForValue().increment(key, num);
}
Assert.notNull(expire,"初始化庫存失敗,庫存過期時間不能為null");
RedisLock redisLock = new RedisLock(redisTemplate, key);
try {
if (redisLock.tryLock()) {
// 獲取到鎖后再次判斷一下是否有key
hasKey = redisTemplate.hasKey(key);
if (!hasKey) {
// 初始化庫存
redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
redisLock.unlock();
}
return num;
}
/**
* 獲取庫存
*
* @param key 庫存key
* @return -1:不限庫存; 大于等于0:剩余庫存
*/
public int getStock(String key) {
Integer stock = (Integer) redisTemplate.opsForValue().get(key);
return stock == null ? -1 : stock;
}
/**
* 扣庫存
*
* @param key 庫存key
* @param num 扣減庫存數(shù)量
* @return 扣減之后剩余的庫存【-3:庫存未初始化; -2:庫存不足; -1:不限庫存; 大于等于0:扣減庫存之后的剩余庫存】
*/
private Long stock(String key, int num) {
// 腳本里的KEYS參數(shù)
List<String> keys = new ArrayList<>();
keys.add(key);
// 腳本里的ARGV參數(shù)
List<String> args = new ArrayList<>();
args.add(Integer.toString(num));
long result = redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
Object nativeConnection = connection.getNativeConnection();
// 集群模式和單機模式雖然執(zhí)行腳本的方法一樣撞叨,但是沒有共同的接口金踪,所以只能分開執(zhí)行
// 集群模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
}
// 單機模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
}
return UNINITIALIZED_STOCK;
}
});
return result;
}
}
調(diào)用
/**
* @author yuhao.wang
*/
@RestController
public class StockController {
@Autowired
private StockService stockService;
@RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public Object stock() {
// 商品ID
long commodityId = 1;
// 庫存ID
String redisKey = "redis_key:stock:" + commodityId;
long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
return stock >= 0;
}
/**
* 獲取初始的庫存
*
* @return
*/
private int initStock(long commodityId) {
// TODO 這里做一些初始化庫存的操作
return 1000;
}
@RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public Object getStock() {
// 商品ID
long commodityId = 1;
// 庫存ID
String redisKey = "redis_key:stock:" + commodityId;
return stockService.getStock(redisKey);
}
@RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public Object addStock() {
// 商品ID
long commodityId = 2;
// 庫存ID
String redisKey = "redis_key:stock:" + commodityId;
return stockService.addStock(redisKey, 2);
}
}
源碼: https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-stock-redis 工程
參考: