怎么預防redis的緩存擊穿
緩存穿透
緩存穿透是指查詢一個一定不存在的數(shù)據(jù),由于緩存是不命中時被動寫的驻呐,并且出于容錯考慮灌诅,如果從存儲層查不到數(shù)據(jù)則不寫入緩存,這將導致這個不存在的數(shù)據(jù)每次請求都要到存儲層去查詢含末,失去了緩存的意義猜拾。在流量大時,可能DB就掛掉了答渔,要是有人利用不存在的key頻繁攻擊我們的應用关带,這就是漏洞。
解決方案
1.接口層增加校驗,如用戶鑒權(quán)校驗宋雏,id做基礎校驗芜飘,id<=0的直接攔截;
2. 有很多種方法可以有效地解決緩存穿透問題磨总,最常見的則是采用布隆過濾器嗦明,將所有可能存在的數(shù)據(jù)哈希到一個足夠大的bitmap中,一個一定不存在的數(shù)據(jù)會被 這個bitmap攔截掉蚪燕,從而避免了對底層存儲系統(tǒng)的查詢壓力娶牌。另外也有一個更為簡單粗暴的方法(我們采用的就是這種),如果一個查詢返回的數(shù)據(jù)為空(不管是數(shù) 據(jù)不存在馆纳,還是系統(tǒng)故障)诗良,我們?nèi)匀话堰@個空結(jié)果進行緩存,但它的過期時間會很短鲁驶,最長不超過五分鐘鉴裹。
bloomfilter就類似于一個hash set,用于快速判某個元素是否存在于集合中钥弯,其典型的應用場景就是快速判斷一個key是否存在于某容器径荔,不存在就直接返回。布隆過濾器的關(guān)鍵就在于hash算法和容器大小脆霎,下面先來簡單的實現(xiàn)下看看效果总处,我這里用guava實現(xiàn)的布隆過濾器:
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
</dependencies>
public class BloomFilterTest {
private static final int capacity = 1000000;
private static final int key = 999998;
private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), capacity);
static {
for (int i = 0; i < capacity; i++) {
bloomFilter.put(i);
}
}
public static void main(String[] args) {
/*返回計算機最精確的時間,單位微妙*/
long start = System.nanoTime();
if (bloomFilter.mightContain(key)) {
System.out.println("成功過濾到" + key);
}
long end = System.nanoTime();
System.out.println("布隆過濾器消耗時間:" + (end - start));
int sum = 0;
for (int i = capacity + 20000; i < capacity + 30000; i++) {
if (bloomFilter.mightContain(i)) {
sum = sum + 1;
}
}
System.out.println("錯判率為:" + sum);
}
}
成功過濾到999998
布隆過濾器消耗時間:215518
錯判率為:318
可以看到睛蛛,100w個數(shù)據(jù)中只消耗了約0.2毫秒就匹配到了key鹦马,速度足夠快。然后模擬了1w個不存在于布隆過濾器中的key玖院,匹配錯誤率為318/10000菠红,也就是說第岖,出錯率大概為3%难菌,跟蹤下BloomFilter的源碼發(fā)現(xiàn)默認的容錯率就是0.03:
public static <T> BloomFilter<T> create(Funnel<T> funnel, int expectedInsertions /* n */) {
return create(funnel, expectedInsertions, 0.03); // FYI, for 3%, we always get 5 hash functions
}
我們項目中很多都是變化的數(shù)據(jù),所以空數(shù)據(jù)很快就會增加蔑滓。
緩存雪崩
緩存雪崩是指在我們設置緩存時采用了相同的過期時間郊酒,導致緩存在某一時刻同時失效,請求全部轉(zhuǎn)發(fā)到DB键袱,DB瞬時壓力過重雪崩燎窘。
解決方案
緩存失效時的雪崩效應對底層系統(tǒng)的沖擊非常可怕蹄咖。大多數(shù)系統(tǒng)設計者考慮用加鎖或者隊列的方式保證緩存的單線 程(進程)寫褐健,從而避免失效時大量的并發(fā)請求落到底層存儲系統(tǒng)上。這里分享一個簡單方案就時講緩存失效時間分散開,比如我們可以在原有的失效時間基礎上增加一個隨機值蚜迅,比如1-5分鐘隨機舵匾,這樣每一個緩存的過期時間的重復率就會降低,就很難引發(fā)集體失效的事件谁不。
緩存擊穿
對于一些設置了過期時間的key坐梯,如果這些key可能會在某些時間點被超高并發(fā)地訪問,是一種非成才粒“熱點”的數(shù)據(jù)吵血。這個時候,需要考慮一個問題:緩存被“擊穿”的問題偷溺,這個和緩存雪崩的區(qū)別在于這里針對某一key緩存蹋辅,前者則是很多key。
緩存在某個時間點過期的時候挫掏,恰好在這個時間點對這個Key有大量的并發(fā)請求過來晕翠,這些請求發(fā)現(xiàn)緩存過期一般都會從后端DB加載數(shù)據(jù)并回設到緩存,這個時候大并發(fā)的請求可能會瞬間把后端DB壓垮砍濒。
解決方案
1.使用互斥鎖(mutex key)
業(yè)界比價普遍的一種做法淋肾,即根據(jù)key獲取value值為空時,鎖上爸邢,從數(shù)據(jù)庫中l(wèi)oad數(shù)據(jù)后再釋放鎖樊卓。若其它線程獲取鎖失敗,則等待一段時間后重試杠河。這里要注意碌尔,分布式環(huán)境中要使用分布式鎖,單機的話用普通的鎖(synchronized券敌、Lock)就夠了唾戚。
業(yè)界比較常用的做法,是使用mutex。簡單地來說蠢熄,就是在緩存失效的時候(判斷拿出來的值為空)挖息,不是立即去load db,而是先使用緩存工具的某些帶成功操作返回值的操作(比如Redis的SETNX或者Memcache的ADD)去set一個mutex key募书,當操作返回成功時,再進行l(wèi)oad db的操作并回設緩存测蹲;否則莹捡,就重試整個get緩存的方法。
SETNX扣甲,是「SET if Not eXists」的縮寫篮赢,也就是只有不存在的時候才設置,可以利用它來實現(xiàn)鎖的效果。在redis2.6.1之前版本未實現(xiàn)setnx的過期時間启泣,所以這里給出兩種版本代碼參考:
//2.6.1前單機版本鎖
String get(String key) {
String value = redis.get(key);
if (value == null) {
if (redis.setnx(key_mutex, "1")) {
// 3 min timeout to avoid mutex holder crash
redis.expire(key_mutex, 3 * 60)
value = db.get(key);
redis.set(key, value);
redis.delete(key_mutex);
} else {
//其他線程休息50毫秒后重試
Thread.sleep(50);
get(key);
}
}
}
最新版本代碼:
public String get(key) {
String value = redis.get(key);
if (value == null) { //代表緩存值過期
//設置3min的超時媒咳,防止del操作失敗的時候,下次緩存過期一直不能load db
if (redis.setnx(key_mutex, 1, 3 * 60) == 1) { //代表設置成功
value = db.get(key);
redis.set(key, value, expire_secs);
redis.del(key_mutex);
} else { //這個時候代表同時候的其他線程已經(jīng)load db并回設到緩存了种远,這時候重試獲取緩存值即可
sleep(50);
get(key); //重試
}
} else {
return value;
}
}
public String getWithLock(String key, Jedis jedis, String lockKey, String uniqueId, long expireTime) {
// 通過key獲取value
String value = redisService.get(key);
if (StringUtil.isEmpty(value)) {
// 分布式鎖涩澡,詳細可以參考https://blog.csdn.net/fanrenxiang/article/details/79803037
//封裝的tryDistributedLock包括setnx和expire兩個功能,在低版本的redis中不支持
try {
boolean locked = redisService.tryDistributedLock(jedis, lockKey, uniqueId, expireTime);
if (locked) {
value = userService.getById(key);
redisService.set(key, value);
redisService.del(lockKey);
return value;
} else {
// 其它線程進來了沒獲取到鎖便等待50ms后重試
Thread.sleep(50);
getWithLock(key, jedis, lockKey, uniqueId, expireTime);
}
} catch (Exception e) {
log.error("getWithLock exception=" + e);
return value;
} finally {
redisService.releaseDistributedLock(jedis, lockKey, uniqueId);
}
}
return value;
}
這樣做思路比較清晰坠敷,也從一定程度上減輕數(shù)據(jù)庫壓力妙同,但是鎖機制使得邏輯的復雜度增加,吞吐量也降低了膝迎,有點治標不治本粥帚。
2. "提前"使用互斥鎖(mutex key):
在value內(nèi)部設置1個超時值(timeout1), timeout1比實際的memcache timeout(timeout2)小。當從cache讀取到timeout1發(fā)現(xiàn)它已經(jīng)過期時候限次,馬上延長timeout1并重新設置到cache芒涡。然后再從數(shù)據(jù)庫加載數(shù)據(jù)并設置到cache中。偽代碼如下:
v = memcache.get(key);
if (v == null) {
if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {
value = db.get(key);
memcache.set(key, value);
memcache.delete(key_mutex);
} else {
sleep(50);
retry();
}
} else {
if (v.timeout <= now()) {
if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {
// extend the timeout for other threads
v.timeout += 3 * 60 * 1000;
memcache.set(key, v, KEY_TIMEOUT * 2);
// load the latest value from db
v = db.get(key);
v.timeout = KEY_TIMEOUT;
memcache.set(key, value, KEY_TIMEOUT * 2);
memcache.delete(key_mutex);
} else {
sleep(50);
retry();
}
}
}
3卖漫、接口限流與熔斷费尽、降級
重要的接口一定要做好限流策略,防止用戶惡意刷接口羊始,同時要降級準備旱幼,當接口中的某些服務不可用時候,進行熔斷突委,失敗快速返回機制柏卤。