什么是分布式鎖
分布式鎖是用于解決分布式系統(tǒng)控制協(xié)調(diào)任務(wù)用的,保證分布式服務(wù)中寒锚,能達(dá)到控制任務(wù)的目的。
常見(jiàn)的分布式鎖
- 基于zookeeper
- 基于etcd
- 基于consul
- 基于redis
為啥用redis實(shí)現(xiàn)分布式鎖
首先因?yàn)楹?jiǎn)單违孝,redis中有個(gè)命令是setnx, 這個(gè)命令就能起到分布式鎖的作用刹前,已經(jīng)存在key就會(huì)設(shè)置失敗。
其次redis是一般項(xiàng)目中都有的組件雌桑,如果采用其他的喇喉,可能會(huì)增加成本。
用go開(kāi)發(fā)基于redis的分布式鎖
大致功能如下
- 設(shè)置過(guò)期時(shí)間
防止單點(diǎn)服務(wù)異常校坑,鎖不釋放 - 自動(dòng)續(xù)租
防止鎖超時(shí)問(wèn)題拣技,如設(shè)置過(guò)期時(shí)間2秒,但是中間的任務(wù)如執(zhí)行幾條sql語(yǔ)句用了5秒耍目,那么這個(gè)鎖就可能是無(wú)效了膏斤,很可能被其他服務(wù)給占用,導(dǎo)致sql任務(wù)的異常出現(xiàn)邪驮。 當(dāng)鎖住2/3過(guò)期時(shí)間還未過(guò)期的莫辨,我們可以對(duì)他續(xù)租。 這樣鎖就還是有效的耕捞。 - redisClient可以使用全局的衔掸,也可以有用戶傳入
首先我們用到最常見(jiàn)的go-redis庫(kù)
go get github.com/go-redis/redis/v8
核心代碼就200行烫幕。
全局redisClient實(shí)現(xiàn)
package goredislock
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
var GlobalRedisClient *redis.Client
var redisClientOnce sync.Once
func NewRedisClient(addr string) *redis.Client {
redisClientOnce.Do(func() {
GlobalRedisClient = redis.NewClient(&redis.Options{
Network: "tcp",
Addr: addr,
Password: "", //密碼
DB: 0, // redis數(shù)據(jù)庫(kù)
//連接池容量及閑置連接數(shù)量
PoolSize: 15, // 連接池?cái)?shù)量
MinIdleConns: 10, //好比最小連接數(shù)
//超時(shí)
DialTimeout: 5 * time.Second, //連接建立超時(shí)時(shí)間
ReadTimeout: 3 * time.Second, //讀超時(shí)俺抽,默認(rèn)3秒, -1表示取消讀超時(shí)
WriteTimeout: 3 * time.Second, //寫(xiě)超時(shí)较曼,默認(rèn)等于讀超時(shí)
PoolTimeout: 4 * time.Second, //當(dāng)所有連接都處在繁忙狀態(tài)時(shí)磷斧,客戶端等待可用連接的最大等待時(shí)長(zhǎng),默認(rèn)為讀超時(shí)+1秒。
//閑置連接檢查包括IdleTimeout弛饭,MaxConnAge
IdleCheckFrequency: 60 * time.Second, //閑置連接檢查的周期冕末,默認(rèn)為1分鐘,-1表示不做周期性檢查侣颂,只在客戶端獲取連接時(shí)對(duì)閑置連接進(jìn)行處理档桃。
IdleTimeout: 5 * time.Minute, //閑置超時(shí)
MaxConnAge: 0 * time.Second, //連接存活時(shí)長(zhǎng),從創(chuàng)建開(kāi)始計(jì)時(shí)憔晒,超過(guò)指定時(shí)長(zhǎng)則關(guān)閉連接藻肄,默認(rèn)為0,即不關(guān)閉存活時(shí)長(zhǎng)較長(zhǎng)的連接
//命令執(zhí)行失敗時(shí)的重試策略
MaxRetries: 0, // 命令執(zhí)行失敗時(shí)拒担,最多重試多少次嘹屯,默認(rèn)為0即不重試
MinRetryBackoff: 8 * time.Millisecond, //每次計(jì)算重試間隔時(shí)間的下限,默認(rèn)8毫秒从撼,-1表示取消間隔
MaxRetryBackoff: 512 * time.Millisecond, //每次計(jì)算重試間隔時(shí)間的上限州弟,默認(rèn)512毫秒,-1表示取消間隔
})
pong, err := GlobalRedisClient.Ping(context.Background()).Result()
if err != nil {
log.Fatal(fmt.Errorf("redis connect error:%s", err))
}
log.Println(pong)
})
return GlobalRedisClient
}
分布式鎖實(shí)現(xiàn)
package goredislock
import (
"context"
"log"
"time"
"github.com/go-redis/redis/v8"
)
const defaultExpireTime = time.Second * 10
// 分布式鎖
type Locker struct {
key string // redis key
unlock bool // 是否已經(jīng)解鎖 低零,解鎖則不用續(xù)租
incrScript *redis.Script // lua script
option options // 可選項(xiàng)
}
type Options func(o *options)
type options struct {
expire time.Duration // expiration time ,默認(rèn)是10秒
redisClient *redis.Client // redis 實(shí)例婆翔, 可以傳, 不傳就得用全局的
ctx context.Context
}
// 續(xù)租的時(shí)候 獲得和設(shè)置過(guò)期原子操作.
const incrLua = `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('expire', KEYS[1],ARGV[2])
else
return '0'
end`
func NewLocker(key string, opts ...Options) *Locker {
var lock = &Locker{
key: key,
incrScript: redis.NewScript(incrLua),
}
for _, opt := range opts {
opt(&lock.option)
}
// 沒(méi)設(shè)置過(guò)期時(shí)間
if lock.option.expire == 0 {
lock.option.expire = defaultExpireTime
}
// 未設(shè)置redis 實(shí)例
if lock.option.redisClient == nil {
lock.option.redisClient = GlobalRedisClient
}
// 未設(shè)置context
if lock.option.ctx == nil {
lock.option.ctx = context.Background()
}
return lock
}
// 過(guò)期選項(xiàng)
func WithExpire(expire time.Duration) Options {
return func(o *options) {
o.expire = expire
}
}
// redisClient 選項(xiàng),可以每次都傳毁兆,如果沒(méi)傳浙滤,就用全局都
func WithRedisClient(redisClient *redis.Client) Options {
return func(o *options) {
o.redisClient = redisClient
}
}
func WithContext(ctx context.Context) Options {
return func(o *options) {
o.ctx = ctx
}
}
// 第一個(gè)返回:返回鎖,方便鏈?zhǔn)讲僮?// 第二個(gè) 返回結(jié)果
func (this *Locker) Lock() (*Locker, bool) {
boolcmd := this.option.redisClient.SetNX(context.Background(), this.key, "1", this.option.expire)
if ok, err := boolcmd.Result(); err != nil || !ok {
return this, false
}
this.expandLockTime()
return this, true
}
// 續(xù)租
func (this *Locker) expandLockTime() {
sleepTime := this.option.expire * 2 / 3
go func() {
for {
time.Sleep(sleepTime)
if this.unlock {
break
}
this.resetExpire()
}
}()
}
// 重新設(shè)置過(guò)期時(shí)間
func (this *Locker) resetExpire() {
cmd := this.incrScript.Run(this.option.ctx, this.option.redisClient, []string{this.key}, 1, this.option.expire.Seconds())
v, err := cmd.Result()
log.Printf("key=%s ,續(xù)期結(jié)果:%v,%v\n", this.key, err, v)
}
// 釋放鎖 干完活后釋放鎖
func (this *Locker) Unlock() {
this.unlock = true
this.option.redisClient.Del(this.option.ctx, this.key)
}
倉(cāng)庫(kù)地址: https://github.com/cr-mao/goredislock