分布式鎖的實現(xiàn)原理
#上面的命令的意思是設(shè)置lock_mjw的值為1 過期時間10秒鐘,set時如果lock_mjw已經(jīng)有值腻扇,
#設(shè)置失敗,并返回0,如果沒有值沙庐,設(shè)置成功鲤妥,并返回1.
#EX設(shè)置過期時間 NX:設(shè)置值時判斷key是否已經(jīng)有值佳吞,如果有值不做修改并返回0,如果沒有值設(shè)置成功棉安,返回1.
#為什么加過期時間底扳,因為如果不加過期時間,某個業(yè)務(wù)系統(tǒng)獲取鎖成功后贡耽,還未釋放鎖的情況下
#掛掉了衷模,就會導(dǎo)致lock_mjw的值永遠存在,從而導(dǎo)致其他業(yè)務(wù)系統(tǒng)和線程永遠無法獲取鎖成功蒲赂。
#加了過期時間及時業(yè)務(wù)系統(tǒng)在未釋放鎖的情況下掛掉了阱冶,過一段時間lock_mjw會被清理掉,其他線程或應(yīng)用也可以獲取到該所滥嘴。
192.168.29.134:7001> set lock_mjw 1 EX 10 NX
上面的分布式鎖還不夠完美木蹬,會存在什么問題呢?請看下圖:
上圖的問題是若皱,如果鎖在還沒有執(zhí)行完業(yè)務(wù)代碼時就已經(jīng)失效镊叁;并發(fā)訪問的線程2就會在線程1還沒執(zhí)行
完業(yè)務(wù)代碼就已經(jīng)獲取鎖成功,會導(dǎo)致鎖起不了鎖的作用走触,為了解決以上問題晦譬,需要對lock_mjw時間進行續(xù)約,直接上代碼:
@Controller
@Api(tags = "RedisTestController", description = "redis測試")
@RequestMapping("/redis")
public class RedisTestController {
@Autowired
private RedisLockUtil redisLockUtil;
@RequestMapping(value = "/send5",method = RequestMethod.POST)
@ApiOperation("發(fā)送消息到消息隊列")
@ResponseBody
@ApiImplicitParam(name = "key", value = "鍵", defaultValue = "mju",paramType = "query")
public CommonResult<String> send5(@RequestParam String key) throws InterruptedException {
try{
redisLockUtil.lock(key);
System.out.println("成功獲取鎖..正在執(zhí)行業(yè)務(wù)代碼....");
Thread.sleep(5000L);
}finally {
System.out.println("業(yè)務(wù)代碼執(zhí)行完畢互广,釋放鎖....");
redisLockUtil.unlock(key);
}
return CommonResult.success("馬軍偉","返回正確");
}
}
package com.luban.mall.search.controller;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
public class RedisLockUtil {
private static final String PREX_LOCK="lock_";
private static AtomicBoolean isStart = new AtomicBoolean(false);
private byte[] lock = new byte[1];
@Autowired
private JedisPool jedisPool;
@Autowired
private JedisCluster cluster;
/**
* 加鎖
* @param key
*/
@SneakyThrows
public void lock(String key){
//循環(huán)嘗試加鎖直到加鎖成功
while(true){
//嘗試加鎖
if(tryLock(key)){
break;
}else{
Thread.sleep(100l);
}
}
}
/**
* 嘗試加鎖
* @param key
* @return
*/
private boolean tryLock(String key) {
//設(shè)置lock_key的值到redis中敛腌,失效時間20秒,設(shè)置時,如果lock_key不存在像樊,會設(shè)置成功返回:ok,
//如果已存在值夸溶,則設(shè)置失敗,返回null
String ok = cluster.set(PREX_LOCK + key, "1", "NX", "EX", 5);
if (ok == null) {
return false;
} else {
//設(shè)置值成功凶硅,相當(dāng)于獲取鎖成功缝裁,獲取鎖成功后需要定時對鎖的過期時間進行續(xù)約,防止鎖過期
//需要開一個守護線程(主線程完成后足绅,守護線程也會跟著結(jié)束)來做這個事兒
renewalTime(key);
return true;
}
}
private void renewalTime(String key) {
//把鎖記錄到redis的set集合lock_中
cluster.sadd(PREX_LOCK,PREX_LOCK+key);
if(isStart.get()){
return;
}
synchronized (lock){
if(isStart.get()){
return;
}
isStart.set(true);
Thread t = new Thread(){
@SneakyThrows
@Override
public void run() {
while (true){
//從redis的set集合lock_中獲取生成的鎖
Set<String> locks = cluster.smembers(PREX_LOCK);
for(String redlock: locks){
Long ttl = cluster.ttl(redlock);
if(ttl==null || ttl<0){
cluster.srem(PREX_LOCK,PREX_LOCK+key);
}else if(ttl<5){
//續(xù)約
cluster.expire(redlock, ttl.intValue()+1);
System.out.println("續(xù)約:"+redlock+":"+ttl.intValue()+1);
}
}
Thread.sleep(1000l);
}
}
};
t.setDaemon(true);
t.start();
}
}
/**
* 釋放鎖
* @param key
*/
public void unlock(String key) {
//刪除鎖
cluster.del(PREX_LOCK+key);
//從鎖集合中移除
cluster.srem(PREX_LOCK,PREX_LOCK+key);
}
}
package com.luban.mall.search.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.HashSet;
import java.util.Set;
@Configuration
@PropertySource(value={"classpath:jedis.properties"})
public class RedisConfiguration {
@Value("${redis.node.maxTotal}")
private Integer maxTotal;
@Value("${redis.node.host}")
private String host;
@Value("${redis.node.port}")
private Integer port;
@Value("${redis.node.password}")
private String passwd;
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String,Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
/*
* 序列化后會產(chǎn)生java類型說明捷绑,如果不需要用“Jackson2JsonRedisSerializer”
* 和“ObjectMapper ”配合效果更好
*/
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean //這個注解注入工廠的名稱是方法名
public JedisPool jedisPool(){
JedisPoolConfig jedisPoolConfig = jedisPoolConfig();
return new JedisPool(jedisPoolConfig, host, port, 2000, passwd, 0, (String)null);
}
public JedisPoolConfig jedisPoolConfig(){ //這個是修改redis性能的時候需要的對象
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(maxTotal);
return jedisPoolConfig;
}
@Bean
public JedisCluster jedisCluster(){
Set<HostAndPort> set = new HashSet<HostAndPort>();
set.add(new HostAndPort("192.168.29.134",7001));
set.add(new HostAndPort("192.168.29.134",7002));
set.add(new HostAndPort("192.168.29.134",7003));
set.add(new HostAndPort("192.168.29.134",7004));
set.add(new HostAndPort("192.168.29.134",7005));
set.add(new HostAndPort("192.168.29.134",7006));
JedisCluster cluster = new JedisCluster(set,5000,5000,20,"123456",jedisPoolConfig());
return cluster;
}
}
#jedis.properties
#資源池中最大連接數(shù)
redis.node.maxTotal=10
redis.node.host=192.168.29.134
redis.node.port=6379
redis.node.password=123456
#資源池中允許的最大空閑連接數(shù)
redis.node.maxIdle=10
#資源池確保的最少空閑連接數(shù)
redis.node.minIdle=0
#當(dāng)資源池用盡后,調(diào)用者是否要等待氢妈。只有當(dāng)值為true時粹污,下面的maxWaitMillis才會生效。
redis.node.blockWhenExhausted=true
#當(dāng)資源池連接用盡后首量,調(diào)用者的最大等待時間(單位為毫秒)
redis.node.maxWaitMillis=5000
#向資源池借用連接時是否做連接有效性檢測(ping)壮吩。檢測到的無效連接將會被移除。
#業(yè)務(wù)量很大時候建議設(shè)置為false加缘,減少一次ping的開銷鸭叙。
redis.node.testOnBorrow=false
#向資源池歸還連接時是否做連接有效性檢測(ping)。檢測到無效連接將會被移除拣宏。
#業(yè)務(wù)量很大時候建議設(shè)置為false沈贝,減少一次ping的開銷。
redis.node.testOnReturn=false
#是否開啟JMX監(jiān)控
redis.node.jmxEnabled=false
建議開啟勋乾,請注意應(yīng)用本身也需要開啟宋下。
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.1.10.RELEASE</version>
<scope>compile</scope>
</dependency>
<!--redis客戶端-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
單機情況下redis節(jié)點掛掉的情況下會存在鎖失效的問題,
為解決這個問題:可以在不同節(jié)點上加多個鎖,其中有一個存在鎖定狀態(tài)就認為是鎖定狀態(tài)辑莫,
這就會解決單機版redis鎖失效問題学歧。
利用redisson框架實現(xiàn)高可用分布式鎖
@Configuration
@PropertySource(value={"classpath:jedis.properties"})
public class RedisConfiguration {
@Bean
public Redisson redisson(){
Config config= new Config();
config.useClusterServers()
.addNodeAddress("redis://192.168.29.134:7001")
.addNodeAddress("redis://192.168.29.134:7002")
.addNodeAddress("redis://192.168.29.134:7003")
.addNodeAddress("redis://192.168.29.134:7004")
.addNodeAddress("redis://192.168.29.134:7005")
.addNodeAddress("redis://192.168.29.134:7006")
.setPassword("123456");
Redisson redisson = (Redisson) Redisson.create(config);
return redisson;
}
}
package com.luban.mall.search.controller;
import org.redisson.Redisson;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RedisLockUtil2 {
@Autowired
private Redisson redisson;
public void lock(String key){
RLock lock1 = redisson.getLock(key+0);
RLock lock2 = redisson.getLock(key+1);
RLock lock3 = redisson.getLock(key+2);
RedissonRedLock lock = new RedissonRedLock(lock1,lock2,lock3);
lock.lock();
}
public void unlock(String key){
RLock lock1 = redisson.getLock(key+0);
RLock lock2 = redisson.getLock(key+1);
RLock lock3 = redisson.getLock(key+2);
RedissonRedLock lock = new RedissonRedLock(lock1,lock2,lock3);
lock.unlock();
}
}
@RequestMapping(value = "/send6",method = RequestMethod.POST)
@ApiOperation("發(fā)送消息到消息隊列")
@ResponseBody
@ApiImplicitParams({
@ApiImplicitParam(name = "key", value = "鍵", defaultValue = "mju",paramType = "query"),
@ApiImplicitParam(name = "flag", value = "鍵", defaultValue = "1",paramType = "query")
})
public CommonResult<String> send6(@RequestParam String key,@RequestParam String flag) throws InterruptedException {
try{
redisLockUtil2.lock(key);
System.out.println("成功獲取鎖..正在執(zhí)行業(yè)務(wù)代碼....key:"+key+"----flag:"+flag);
Thread.sleep(50000L);
}finally {
System.out.println("業(yè)務(wù)代碼執(zhí)行完畢,釋放鎖....");
redisLockUtil2.unlock(key);
}
return CommonResult.success("馬軍偉","返回正確");
}
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.7</version>
</dependency>