1. BG
實(shí)習(xí)需求經(jīng)過了測試戳吝,上線后幾天浩销,QA對新需求的測試過程中突然上一版需求中出現(xiàn)了問題,導(dǎo)致數(shù)據(jù)重復(fù)+1听哭。并且該問題復(fù)現(xiàn)時間不固定慢洋,重復(fù)+1次數(shù)不固定,在2~5之間陆盘。
2. 問題分析
經(jīng)過查詢服務(wù)器日志發(fā)現(xiàn)普筹,本該用redis去重只執(zhí)行一次的操作,執(zhí)行了多次隘马。而該部分在上一版需求單獨(dú)測試與上線中并沒有發(fā)現(xiàn)問題太防。
問題定位代碼塊如下(去隱私代碼):
//如果監(jiān)聽到Kafka的消息,那么執(zhí)行以下操作
threadPool.execute(() -> {
if (!redisCheck()) {
//業(yè)務(wù)值+1
}
}
);
private boolean redisCheck(args) {
String key =args;
String uniqueId = (String) redisTemplate.opsForValue().get(key);
if ("key已存在".equals(uniqueId)) {
return true;
} else {
//不存在key
redisTemplate.opsForValue().setIfAbsent(key, "key已存在", 3600, TimeUnit.SECONDS);
}
return false;
}
本質(zhì)上 此處邏輯在監(jiān)聽Kafka的消息酸员,當(dāng)監(jiān)聽到Kafka消息時蜒车,對Kafka的消息解析出參數(shù)args
,再對args
去redis中檢驗(yàn)是否重復(fù)幔嗦,如果不重復(fù)酿愧,則+1操作。
測試環(huán)境下有N臺測試機(jī)器部署應(yīng)用邀泉,M臺Kafka環(huán)境嬉挡。測試環(huán)境的每個測試都有一個唯一的kafka group-id钝鸽。那么在測試的時候,同一個group中的worker庞钢,只有一個worker能拿到這個數(shù)據(jù)拔恰,但每個group都可以拿到同樣的所有數(shù)據(jù)。因此上訴代碼監(jiān)聽Kafka消息部分就會收到多條消息基括,那么就會造成一個并發(fā)的操作仁连。
明明用redis去重了?
注意到并發(fā)操作阱穗,那么實(shí)際上在redisTemplate
的get與set方法之間存在延時(非原子操作)饭冬,這里必然就出現(xiàn)了線程不安全的情況。所以會造成+1操作執(zhí)行多次揪阶。為什么QA之前測試的時候沒出錯情況昌抠,線上也沒有出錯?
QA在測試的時候鲁僚,都是單獨(dú)對當(dāng)前需求分支部署后的應(yīng)用進(jìn)行測試炊苫,這個單獨(dú)的需求分支一般只會部署到一個測試環(huán)境中,那么這個時候在這個需求里面只會存在一個Kafka group冰沙。那么部署的應(yīng)用只會收到一個group下的一個worker發(fā)出的Kafka消息侨艾,固然不會出現(xiàn)多線程對于一個KEY
并發(fā)的情況。對于線上拓挥,線上是只有一個group唠梨,固然線上不會出現(xiàn)這個問題。為什么上個需求上線后在新需求出現(xiàn)了這個問題侥啤?
QA測完需求后交付會把代碼合并到master分支当叭,那么在新需求的測試過程中,所有的test環(huán)境下都是包含master代碼的盖灸,也就是后面的所有的測試環(huán)境的實(shí)例都包含上述的代碼蚁鳖。那么在新需求的測試過程中,test-n環(huán)境下的實(shí)例會收到對應(yīng)的group下的Kafka的消息赁炎,如果有多個實(shí)例同時在線的話醉箕,就會遇到上述的并發(fā)問題。
3. 問題解決
實(shí)際上這個問題是可以不用去解決的徙垫,是測試環(huán)境多個不同的實(shí)例監(jiān)聽不同group的Kafka導(dǎo)致的讥裤。但是原因與解決還是要弄清楚。雖然這是一個并發(fā)問題松邪,但使用普通的加鎖坞琴,如java
中的是無法解決問題的,因?yàn)椴豢赡苤粫幸粋€實(shí)例運(yùn)行在服務(wù)器上逗抑,所以需要分布式鎖剧辐。redis的setnx命令就可避免上面的問題寒亥。
4. 基于redis實(shí)現(xiàn)分布式鎖
分布式鎖特點(diǎn):
- 互斥性:同一時刻只有一個線程持有鎖
- 可重入性:同一節(jié)點(diǎn)上的同一個線程如果獲取鎖之后能再次獲取鎖
- 鎖超時:防止死鎖
- 高性能與高可用:加解鎖都要高效,防止分布式鎖失效
- 具有阻塞與非阻塞性荧关,能及時從阻塞狀態(tài)被喚醒
一般分布式鎖實(shí)現(xiàn)方式:
- 基于數(shù)據(jù)庫
- 基于zookeeper
- 基于redis
主要就是使用redis的SETNX命令(set if not exist) 成功返回1溉奕,否則為0。
SETNX key value 將 key 的值設(shè)為 value 忍啤,當(dāng)且僅當(dāng) key 不存在加勤。 若給定的 key 已經(jīng)存在,則 SETNX 不做任何動作同波。 SETNX 是『SET if Not eXists』(如果不存在鳄梅,則 SET)的簡寫。
返回值: 設(shè)置成功未檩,返回 1 戴尸。 設(shè)置失敗,返回 0 冤狡。
那么基于分布式鎖的特性孙蒙,使用setnx+expire可以實(shí)現(xiàn)(非原子),但如果出現(xiàn)錯誤悲雳,當(dāng)setnx執(zhí)行完后出現(xiàn)問題導(dǎo)致expire操作未執(zhí)行挎峦,那么就會出現(xiàn)鎖無法過期。一種改善方案就是使用Lua腳本來保證原子性(包含setnx和expire兩條指令)(腳本略)
后續(xù)redis中有實(shí)現(xiàn):
SET key value[EX seconds][PX milliseconds][NX|XX]
- EX seconds: 設(shè)定過期時間合瓢,單位為秒
- PX milliseconds: 設(shè)定過期時間坦胶,單位為毫秒
- NX: 僅當(dāng)key不存在時設(shè)置值
- XX: 僅當(dāng)key存在時設(shè)置值
這一條命令即等同于setnx。
Solve: jedis進(jìn)行操作:
String result = template.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
return commands.set(key, "鎖定的資源", "NX", "PX", 3000);
}
});