Spring-data-redis + redis 分布式鎖(二)

分布式鎖的解決方式

  1. 基于數(shù)據(jù)庫表做樂觀鎖谋减,用于分布式鎖厨剪。(適用于小并發(fā))
  2. 使用memcached的add()方法,用于分布式鎖麦向。
  3. 使用memcached的cas()方法,用于分布式鎖客叉。(不常用)
  4. 使用redis的setnx()诵竭、expire()方法,用于分布式鎖兼搏。
  5. 使用redis的setnx()卵慰、get()、getset()方法向族,用于分布式鎖呵燕。
  6. 使用redis的watch、multi件相、exec命令再扭,用于分布式鎖氧苍。(不常用)
  7. 使用zookeeper,用于分布式鎖泛范。(不常用)

這里主要介紹第四種和第五種:

前文提供的兩種方式其實都有些問題让虐,要么是死鎖,要么是依賴服務(wù)器時間同步罢荡。從Redis 2.6.12 版本開始赡突, SET 命令可以通過參數(shù)來實現(xiàn)和 SETNX 、 SETEX 和 PSETEX 三個命令的效果区赵。這樣我們的可以將加鎖操作用一個set命令來實現(xiàn)惭缰,直接是原子性操作,既沒有死鎖的風(fēng)險笼才,也不依賴服務(wù)器時間同步漱受,可以完美解決這兩個問題。
在redis文檔上有詳細說明:
http://doc.redisfans.com/string/set.html

使用redis的SET resource-name anystring NX EX max-lock-time 方式骡送,用于分布式鎖

原理

命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現(xiàn)鎖的簡單方法昂羡。

客戶端執(zhí)行以上的命令:

  • 如果服務(wù)器返回 OK ,那么這個客戶端獲得鎖摔踱。
  • 如果服務(wù)器返回 NIL 虐先,那么客戶端獲取鎖失敗,可以在稍后再重試派敷。
  • 設(shè)置的過期時間到達之后蛹批,鎖將自動釋放。

可以通過以下修改膀息,讓這個鎖實現(xiàn)更健壯:

  • 不使用固定的字符串作為鍵的值般眉,而是設(shè)置一個不可猜測(non-guessable)的長隨機字符串,作為口令串(token)潜支。
  • 不使用 DEL 命令來釋放鎖,而是發(fā)送一個 Lua 腳本柿汛,這個腳本只在客戶端傳入的值和鍵的口令串相匹配時冗酿,才對鍵進行刪除。
    這兩個改動可以防止持有過期鎖的客戶端誤刪現(xiàn)有鎖的情況出現(xiàn)络断。

以下是一個簡單的解鎖腳本示例:

if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end

可能存在的問題

要保證redis支持eval命令

具體實現(xiàn)

鎖具體實現(xiàn)RedisLock:


import io.lettuce.core.RedisFuture;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.async.RedisScriptingAsyncCommands;
import io.lettuce.core.api.async.RedisStringAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Redis分布式鎖
 * 使用 SET resource-name anystring NX EX max-lock-time 實現(xiàn)
 * <p>
 * 該方案在 Redis 官方 SET 命令頁有詳細介紹裁替。
 * http://doc.redisfans.com/string/set.html
 * <p>
 * 在介紹該分布式鎖設(shè)計之前,我們先來看一下在從 Redis 2.6.12 開始 SET 提供的新特性貌笨,
 * 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX]弱判,其中:
 * <p>
 * EX seconds — 以秒為單位設(shè)置 key 的過期時間;
 * PX milliseconds — 以毫秒為單位設(shè)置 key 的過期時間锥惋;
 * NX — 將key 的值設(shè)為value 昌腰,當(dāng)且僅當(dāng)key 不存在开伏,等效于 SETNX。
 * XX — 將key 的值設(shè)為value 遭商,當(dāng)且僅當(dāng)key 存在固灵,等效于 SETEX。
 * <p>
 * 命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現(xiàn)鎖的簡單方法劫流。
 * <p>
 * 客戶端執(zhí)行以上的命令:
 * <p>
 * 如果服務(wù)器返回 OK 巫玻,那么這個客戶端獲得鎖。
 * 如果服務(wù)器返回 NIL 祠汇,那么客戶端獲取鎖失敗仍秤,可以在稍后再重試。
 *
 * @author yuhao.wangwang
 */
public class RedisLock {

    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);

    private RedisTemplate redisTemplate;

    /**
     * 將key 的值設(shè)為value 可很,當(dāng)且僅當(dāng)key 不存在徒扶,等效于 SETNX。
     */
    public static final String NX = "NX";

    /**
     * seconds — 以秒為單位設(shè)置 key 的過期時間根穷,等效于EXPIRE key seconds
     */
    public static final String EX = "EX";

    /**
     * 調(diào)用set后的返回值
     */
    public static final String OK = "OK";

    /**
     * 默認請求鎖的超時時間(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 默認鎖的有效時間(s)
     */
    public static final int EXPIRE = 60;

    /**
     * 解鎖的lua腳本
     */
    public static final String UNLOCK_LUA;

    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call(\"del\",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
    }

    /**
     * 鎖標(biāo)志對應(yīng)的key
     */
    private String lockKey;

    /**
     * 記錄到日志的鎖標(biāo)志對應(yīng)的key
     */
    private String lockKeyLog = "";

    /**
     * 鎖對應(yīng)的值
     */
    private String lockValue;

    /**
     * 鎖的有效時間(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 請求鎖的超時時間(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 鎖標(biāo)記
     */
    private volatile boolean locked = false;

    final Random random = new Random();

    /**
     * 使用默認的鎖過期時間和請求鎖的超時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 使用默認的請求鎖的超時時間姜骡,指定鎖的過期時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過期時間(單位:秒)
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int expireTime) {
        this(redisTemplate, lockKey);
        this.expireTime = expireTime;
    }

    /**
     * 使用默認的鎖的過期時間,指定請求鎖的超時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param timeOut       請求鎖的超時時間(單位:毫秒)
     */
    public RedisLock(StringRedisTemplate redisTemplate, String lockKey, long timeOut) {
        this(redisTemplate, lockKey);
        this.timeOut = timeOut;
    }

    /**
     * 鎖的過期時間和請求鎖的超時時間都是用指定的值
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過期時間(單位:秒)
     * @param timeOut       請求鎖的超時時間(單位:毫秒)
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) {
        this(redisTemplate, lockKey, expireTime);
        this.timeOut = timeOut;
    }

    /**
     * 嘗試獲取鎖 超時返回
     *
     * @return
     */
    public boolean tryLock() {
        // 生成隨機key
        lockValue = UUID.randomUUID().toString();
        // 請求鎖超時時間屿良,納秒
        long timeout = timeOut * 1000000;
        // 系統(tǒng)當(dāng)前時間圈澈,納秒
        long nowTime = System.nanoTime();
        while ((System.nanoTime() - nowTime) < timeout) {
            if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) {
                locked = true;
                // 上鎖成功結(jié)束請求
                return true;
            }

            // 每次請求等待一段時間
            seleep(10, 50000);
        }
        return locked;
    }

    /**
     * 嘗試獲取鎖 立即返回
     *
     * @return 是否成功獲得鎖
     */
    public boolean lock() {
        lockValue = UUID.randomUUID().toString();
        //不存在則添加 且設(shè)置過期時間(單位ms)
        String result = set(lockKey, lockValue, expireTime);
        locked = OK.equalsIgnoreCase(result);
        return locked;
    }

    /**
     * 以阻塞方式的獲取鎖
     *
     * @return 是否成功獲得鎖
     */
    public boolean lockBlock() {
        lockValue = UUID.randomUUID().toString();
        while (true) {
            //不存在則添加 且設(shè)置過期時間(單位ms)
            String result = set(lockKey, lockValue, expireTime);
            if (OK.equalsIgnoreCase(result)) {
                locked = true;
                return locked;
            }

            // 每次請求等待一段時間
            seleep(10, 50000);
        }
    }

    /**
     * 解鎖
     * <p>
     * 可以通過以下修改,讓這個鎖實現(xiàn)更健壯:
     * <p>
     * 不使用固定的字符串作為鍵的值尘惧,而是設(shè)置一個不可猜測(non-guessable)的長隨機字符串康栈,作為口令串(token)。
     * 不使用 DEL 命令來釋放鎖喷橙,而是發(fā)送一個 Lua 腳本啥么,這個腳本只在客戶端傳入的值和鍵的口令串相匹配時,才對鍵進行刪除贰逾。
     * 這兩個改動可以防止持有過期鎖的客戶端誤刪現(xiàn)有鎖的情況出現(xiàn)悬荣。
     */
    public Object unlock() {
        // 只有加鎖成功并且鎖還有效才去釋放鎖
        // 只有加鎖成功并且鎖還有效才去釋放鎖
        if (locked) {
            try {
                return redisTemplate.execute((RedisConnection connection) -> {
                    Object nativeConnection = connection.getNativeConnection();
                    Long result = 0L;

                    List<String> keys = new ArrayList<>();
                    keys.add(lockKey);
                    List<String> values = new ArrayList<>();
                    values.add(lockValue);

                    // jedis集群模式
                    if (nativeConnection instanceof JedisCluster) {
                        result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    } else if (nativeConnection instanceof Jedis) {
                        // jedis單機模式
                        result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    } else if (nativeConnection instanceof RedisScriptingAsyncCommands) {
                        // lettuce
                        try {
                            List<byte[]> bkeys = keys.stream().map(key -> redisTemplate.getKeySerializer().serialize(lockKey)).collect(Collectors.toList());
                            List<byte[]> bargs = values.stream().map(arg -> redisTemplate.getValueSerializer().serialize(lockValue)).collect(Collectors.toList());
                            RedisFuture<Long> future = ((RedisScriptingAsyncCommands) nativeConnection).eval(UNLOCK_LUA, ScriptOutputType.INTEGER, bkeys.toArray(new byte[0][0]), bargs.toArray(new byte[0][0]));
                            result = future.get(1, TimeUnit.SECONDS);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    } else {
                        return redisTemplate.delete(lockKey);
                    }

                    if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) {
                        logger.debug("Redis分布式鎖,解鎖{}失敻斫!氯迂!解鎖時間:{}", lockKeyLog, System.currentTimeMillis());
                    }

                    locked = result == 0;
                    return result == 1;
                });
            } catch (Throwable e) {
                logger.warn("Redis不支持EVAL命令,使用降級方式解鎖:{}", e.getMessage());
                String value = this.get(lockKey, String.class);
                if (lockValue.equals(value)) {
                    redisTemplate.delete(lockKey);
                    return true;
                }
                return false;
            }
        }

        return true;
    }

    /**
     * 獲取鎖狀態(tài)
     *
     * @return
     * @Title: isLock
     * @author yuhao.wang
     */
    public boolean isLock() {

        return locked;
    }

    /**
     * 重寫redisTemplate的set方法
     * <p>
     * 命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現(xiàn)鎖的簡單方法言缤。
     * <p>
     * 客戶端執(zhí)行以上的命令:
     * <p>
     * 如果服務(wù)器返回 OK 嚼蚀,那么這個客戶端獲得鎖。
     * 如果服務(wù)器返回 NIL 管挟,那么客戶端獲取鎖失敗轿曙,可以在稍后再重試。
     *
     * @param key     鎖的Key
     * @param value   鎖里面的值
     * @param seconds 過去時間(秒)
     * @return
     */
    private String set(final String key, final String value, final long seconds) {
        Assert.isTrue(!StringUtils.isEmpty(key), "key不能為空");

        return (String) redisTemplate.execute(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                String result = null;
                // jedis
                if (nativeConnection instanceof JedisCommands) {
                    result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds);
                } else if (nativeConnection instanceof RedisStringAsyncCommands) {
                    // lettuce
                    try {
                        byte[] serializeKey = redisTemplate.getKeySerializer().serialize(key);
                        byte[] serializeValue = redisTemplate.getValueSerializer().serialize(value);
                        RedisFuture<String> future = ((RedisStringAsyncCommands) nativeConnection).set(serializeKey, serializeValue, SetArgs.Builder.nx().ex(seconds));
                        result = future.get(1, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                } else {
                    //  默認方式
                    boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, seconds, TimeUnit.SECONDS);
                    if (flag) {
                        redisTemplate.expire(key, seconds, TimeUnit.SECONDS);
                        result = OK;
                    }
                }

                if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) {
                    logger.info("獲取鎖{}的時間:{}", lockKeyLog, System.currentTimeMillis());
                }

                return result;
            }
        });
    }

    /**
     * 獲取redis里面的值
     *
     * @param key    key
     * @param aClass class
     * @return T
     */
    private <T> T get(final String key, Class<T> aClass) {
        Assert.isTrue(!StringUtils.isEmpty(key), "key不能為空");
        return (T) redisTemplate.execute((RedisConnection connection) -> {
            Object nativeConnection = connection.getNativeConnection();
            Object result = null;
            if (nativeConnection instanceof JedisCommands) {
                result = ((JedisCommands) nativeConnection).get(key);
            }
            return (T) result;
        });
    }


    /**
     * @param millis 毫秒
     * @param nanos  納秒
     * @Title: seleep
     * @Description: 線程等待時間
     * @author yuhao.wang
     */
    private void seleep(long millis, int nanos) {
        try {
            Thread.sleep(millis, random.nextInt(nanos));
        } catch (InterruptedException e) {
            logger.info("獲取分布式鎖休眠被中斷:", e);
        }
    }

    public String getLockKeyLog() {
        return lockKeyLog;
    }

    public void setLockKeyLog(String lockKeyLog) {
        this.lockKeyLog = lockKeyLog;
    }

    public int getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(int expireTime) {
        this.expireTime = expireTime;
    }

    public long getTimeOut() {
        return timeOut;
    }

    public void setTimeOut(long timeOut) {
        this.timeOut = timeOut;
    }
}

調(diào)用方式:

public void redisLock3(int i) {
    RedisLock redisLock3 = new RedisLock(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500);
    try {
        long now = System.currentTimeMillis();
        if (redisLock3.tryLock()) {
            logger.info("=" + (System.currentTimeMillis() - now));
            // TODO 獲取到鎖要執(zhí)行的代碼塊
            logger.info("j:" + j++);
        } else {
            logger.info("k:" + k++);
        }
    } catch (Exception e) {
        logger.info(e.getMessage(), e);
    } finally {
        redisLock2.unlock();
    }
}

對于這種種redis實現(xiàn)分布式鎖的方案還是有一個問題:就是你獲取鎖后執(zhí)行業(yè)務(wù)邏輯的代碼只能在redis鎖的有效時間之內(nèi),因為导帝,redis的key到期后會自動清除守谓,這個鎖就算釋放了。所以這個鎖的有效時間一定要結(jié)合業(yè)務(wù)做好評估舟扎。

源碼: https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-data-redis-distributed-lock 工程

參考:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末分飞,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子睹限,更是在濱河造成了極大的恐慌譬猫,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件羡疗,死亡現(xiàn)場離奇詭異染服,居然都是意外死亡,警方通過查閱死者的電腦和手機叨恨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進店門柳刮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人痒钝,你說我怎么就攤上這事秉颗。” “怎么了送矩?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵蚕甥,是天一觀的道長。 經(jīng)常有香客問我栋荸,道長菇怀,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任晌块,我火速辦了婚禮爱沟,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘匆背。我一直安慰自己呼伸,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布靠汁。 她就那樣靜靜地躺著蜂大,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蝶怔。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天兄墅,我揣著相機與錄音踢星,去河邊找鬼。 笑死隙咸,一個胖子當(dāng)著我的面吹牛沐悦,可吹牛的內(nèi)容都是我干的成洗。 我是一名探鬼主播,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼藏否,長吁一口氣:“原來是場噩夢啊……” “哼瓶殃!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起副签,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤遥椿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后淆储,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冠场,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年本砰,在試婚紗的時候發(fā)現(xiàn)自己被綠了碴裙。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡点额,死狀恐怖舔株,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情还棱,我是刑警寧澤载慈,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站诱贿,受9級特大地震影響娃肿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜珠十,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一料扰、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧焙蹭,春花似錦晒杈、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至撰豺,卻和暖如春粪般,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背污桦。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工亩歹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓小作,卻偏偏與公主長得像亭姥,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子顾稀,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 分布式鎖的解決方式 基于數(shù)據(jù)庫表做樂觀鎖达罗,用于分布式鎖。(適用于小并發(fā)) 使用memcached的add()方法静秆,...
    xiaolyuh閱讀 15,515評論 6 17
  • 引題 比如在同一個節(jié)點上粮揉,兩個線程并發(fā)的操作A的賬戶,都是取錢诡宗,如果不加鎖滔蝉,A的賬戶可能會出現(xiàn)負數(shù),正確的方式是對...
    阿康8182閱讀 4,800評論 0 75
  • (轉(zhuǎn)自:https://blog.csdn.net/ugg/article/details/41894947/ ...
    WY長河閱讀 519評論 0 0
  • 目前實現(xiàn)分布式鎖的方式主要有數(shù)據(jù)庫塔沃、Redis和Zookeeper三種蝠引,本文主要闡述利用Redis的相關(guān)命令來實現(xiàn)...
    Aldeo閱讀 2,078評論 0 6
  • 《稗》 文/愚木 你看不見我纖瘦的身軀 我以為你是一只蝌蚪 在水里游在夢里游 被風(fēng)壓彎的腰枝 飛去了黃昏后 陽光驅(qū)...
    愚木的簡書閱讀 513評論 3 2