Redis-Shake【一】簡要介紹
Redis-Shake【二】Sync功能實現簡介
上一節簡要介紹了一下Redis-Shake的使用,接下來詳細說一下Redis-Shake的sync模式。
sync模式的基本原理是讓Redis-Shake模擬成一個redis slave,直接對源redis實例執行sync/psync命令。該模式主要包含兩部分:全量同步(full)和增量同步(increment)。
Redis-Shake sync模式原理圖.png
sync模式的入口函數:遍歷所有的SourceAddressList,為每個源地址創建一個syncNode對象,使用多協程(goroutine)的方式批量執行dbSyncer的sync方法,開始同步邏輯。
// Main is the entry point of sync mode; per the article this code lives in
// redis-shake/sync.go.
//
// It queues one syncNode per entry of conf.Options.SourceAddressList, starts
// conf.Options.SourceRdbParallel worker goroutines that each drive one
// dbSyncer at a time, waits until every source has finished its full sync,
// and then blocks forever while the incremental syncs keep running in the
// goroutines started by the workers.
func (cmd *CmdSync) Main() {
	// syncNode describes one source -> target link handed to a worker.
	type syncNode struct {
		id             int
		source         string
		sourcePassword string
		target         []string
		targetPassword string
	}

	// source redis number
	total := utils.GetTotalLink()
	syncChan := make(chan syncNode, total)
	cmd.dbSyncers = make([]*dbSyncer, total)

	// Walk the source addresses; each one gets its own syncNode.
	for i, source := range conf.Options.SourceAddressList {
		var target []string
		if conf.Options.TargetType == conf.RedisTypeCluster {
			// A cluster target receives the whole target address list.
			target = conf.Options.TargetAddressList
		} else {
			// round-robin pick
			pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList))
			target = []string{conf.Options.TargetAddressList[pick]}
		}

		nd := syncNode{
			id:             i,
			source:         source,
			sourcePassword: conf.Options.SourcePasswordRaw,
			target:         target,
			targetPassword: conf.Options.TargetPasswordRaw,
		}
		syncChan <- nd
	}

	var wg sync.WaitGroup
	wg.Add(len(conf.Options.SourceAddressList))

	// SourceRdbParallel caps how many source Redis instances are handled at
	// the same time when several sources are configured.
	for i := 0; i < int(conf.Options.SourceRdbParallel); i++ {
		// The worker index is passed as an argument instead of being captured:
		// before Go 1.22 the loop variable is shared by all iterations, so a
		// closure reading i races with the loop and usually observes its final
		// value, which would hand every worker the same HttpProfile offset.
		go func(worker int) {
			// The loop ends once syncChan is closed and drained.
			for nd := range syncChan {
				ds := NewDbSyncer(nd.id, nd.source, nd.sourcePassword, nd.target, nd.targetPassword,
					conf.Options.HttpProfile+worker)
				cmd.dbSyncers[nd.id] = ds
				log.Infof("routine[%v] starts syncing data from %v to %v with http[%v]",
					ds.id, ds.source, ds.target, ds.httpProfilePort)

				// run in routine
				go ds.sync()

				// wait full sync done: ds.sync closes waitFull once the full
				// phase has been processed
				<-ds.waitFull
				wg.Done()
			}
		}(i)
	}

	wg.Wait()
	close(syncChan)

	// never quit because increment syncing is still running
	select {}
}
dbSyncer.sync 對源Redis執行sync/psync命令,並依次執行全量和增量同步。
// sync drives one source link from start to finish: it sends SYNC or PSYNC to
// the source (selected by conf.Options.Psync), runs the full phase with
// syncRDBFile, closes ds.waitFull, and then runs the incremental phase with
// syncCommand on the same buffered reader.
func (ds *dbSyncer) sync() {
var sockfile *os.File
if len(conf.Options.SockFileName) != 0 {
sockfile = utils.OpenReadWriteFile(conf.Options.SockFileName)
defer sockfile.Close()
}
// base.Status marks which phase the sync is in: "waitfull", "full" or "incr".
base.Status = "waitfull"
var input io.ReadCloser
var nsize int64
// Issue the sync/psync command and obtain the connection to read from.
// nsize is handed to syncRDBFile below, where it is the total used for the
// progress percentage (presumably the RDB payload size — confirm in the helpers).
if conf.Options.Psync {
input, nsize = ds.sendPSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, conf.Options.SourceTLSEnable)
} else {
input, nsize = ds.sendSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, conf.Options.SourceTLSEnable)
}
defer input.Close()
...
reader := bufio.NewReaderSize(input, utils.ReaderBufferSize)
// sync rdb: full synchronisation phase
base.Status = "full"
ds.syncRDBFile(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, nsize, conf.Options.TargetTLSEnable)
// sync increment: incremental synchronisation phase
base.Status = "incr"
// Closing waitFull releases the worker in Main that is blocked on <-ds.waitFull.
close(ds.waitFull)
ds.syncCommand(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, conf.Options.TargetTLSEnable)
}
全量同步階段支持並發寫入目標Redis,通過對目標Redis執行restore命令來實現key數據的寫入。
// syncRDBFile runs the full-sync phase. It starts conf.Options.Parallel writer
// goroutines that consume entries from the RDB loader, apply the DB/key/slot
// filters and restore the remaining entries into the target, while the calling
// goroutine logs progress about once per second until all writers have finished.
//
// reader is the buffered stream from the source; target, auth_type, passwd and
// tlsEnable are passed to utils.OpenRedisConn; nsize is the total used as the
// denominator of the progress percentage.
func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64, tlsEnable bool) {
// pipe delivers the entries received from the source Redis and parsed out of the RDB stream.
pipe := utils.NewRDBLoader(reader, &ds.rbytes, base.RDBPipeSize)
wait := make(chan struct{})
// Writes to the target Redis during the full phase may run concurrently;
// conf.Options.Parallel sets the number of writers.
go func() {
// Once every writer has returned, closing wait tells the progress loop below to stop.
defer close(wait)
var wg sync.WaitGroup
wg.Add(conf.Options.Parallel)
for i := 0; i < conf.Options.Parallel; i++ {
go func() {
defer wg.Done()
c := utils.OpenRedisConn(target, auth_type, passwd, conf.Options.TargetType == conf.RedisTypeCluster,
tlsEnable)
defer c.Close()
var lastdb uint32 = 0
// Consume the entries parsed from the source redis.
for e := range pipe {
// DB filtering, corresponding to the FilterDBBlacklist / FilterDBWhitelist configuration.
if filter.FilterDB(int(e.DB)) {
// db filter
ds.ignore.Incr()
} else {
ds.nentry.Incr()
...
// Key filtering, corresponding to the FilterKeyBlacklist / FilterKeyWhitelist configuration.
if filter.FilterKey(string(e.Key)) == true {
// 1. judge if not pass filter key
ds.ignore.Incr()
continue
} else {
slot := int(utils.KeyToSlot(string(e.Key)))
if filter.FilterSlot(slot) == true {
// 2. judge if not pass filter slot
ds.ignore.Incr()
continue
}
}
log.Debugf("dbSyncer[%v] start restoring key[%s] with value length[%v]", ds.id, e.Key, len(e.Value))
// Write the current key into the target Redis (the article describes this as a restore command).
utils.RestoreRdbEntry(c, e)
log.Debugf("dbSyncer[%v] restore key[%s] ok", ds.id, e.Key)
}
}
}()
}
wg.Wait()
}()
var stat *syncerStat
// Report progress until wait is closed; the statistics are printed one final
// time after done is set because the body below still runs on that iteration.
for done := false; !done; {
select {
case <-wait:
done = true
case <-time.After(time.Second):
}
stat = ds.Stat()
var b bytes.Buffer
// fmt.Fprintf(&b, "dbSyncer[%v] total=%s - %12d [%3d%%] entry=%-12d",
// NOTE(review): nsize is an integer divisor here and in SetFullSyncProgress
// below; confirm the callers can never pass 0.
fmt.Fprintf(&b, "dbSyncer[%v] total = %s - %12s [%3d%%] entry=%-12d",
ds.id, utils.GetMetric(nsize), utils.GetMetric(stat.rbytes), 100*stat.rbytes/nsize, stat.nentry)
if stat.ignore != 0 {
fmt.Fprintf(&b, " ignore=%-12d", stat.ignore)
}
log.Info(b.String())
metric.GetMetric(ds.id).SetFullSyncProgress(ds.id, uint64(100*stat.rbytes/nsize))
}
log.Infof("dbSyncer[%v] sync rdb done", ds.id)
}
增量寫入階段
// syncCommand runs the incremental phase. It opens one connection to the
// target and starts four goroutines:
//   - one that polls the source every 10 seconds for the fake slave offset,
//   - one that receives the target's replies and updates metrics,
//   - one that decodes commands from reader, filters them and queues them on
//     ds.sendBuf,
//   - one that sends the queued commands to the target.
//
// The calling goroutine then loops forever, logging per-second statistics, so
// this method does not return.
func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type, passwd string, tlsEnable bool) {
readeTimeout := time.Duration(10) * time.Minute
writeTimeout := time.Duration(10) * time.Minute
isCluster := conf.Options.TargetType == conf.RedisTypeCluster
// Open the connection to the target Redis.
c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, readeTimeout, writeTimeout, isCluster, tlsEnable)
defer c.Close()
ds.sendBuf = make(chan cmdDetail, conf.Options.SenderCount)
ds.delayChannel = make(chan *delayNode, conf.Options.SenderDelayChannelSize)
var sendId, recvId, sendMarkId atomic2.Int64 // sendMarkId is also used as mark the sendId in sender routine
// Start a goroutine that periodically fetches, from the source, the slave
// offset of the fake slave that redis-shake is acting as.
go func() {
...
srcConn := utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType, ds.sourcePassword,
readeTimeout, writeTimeout, false, conf.Options.SourceTLSEnable)
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
offset, err := utils.GetFakeSlaveOffset(srcConn)
if err != nil {
...
} else {
// Update ds.sourceOffset with the parsed offset value.
if ds.sourceOffset, err = strconv.ParseInt(offset, 10, 64); err != nil {
log.Errorf("dbSyncer[%v] Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s",
ds.id, conf.Options.Id, err.Error())
}
}
}
}()
// Start a goroutine that handles the replies returned by the target Redis;
// it maintains the success/fail command counters and the delay metric.
go func() {
var node *delayNode
for {
reply, err := c.Receive()
recvId.Incr()
id := recvId.Get() // receive id
// print debug log of receive reply
log.Debugf("receive reply-id[%v]: [%v], error:[%v]", id, reply, err)
// NOTE(review): with metrics disabled this continue also skips the error
// handling below, so a receive error is only visible in the debug log.
if conf.Options.Metric == false {
continue
}
if err == nil {
metric.GetMetric(ds.id).AddSuccessCmdCount(ds.id, 1)
} else {
metric.GetMetric(ds.id).AddFailCmdCount(ds.id, 1)
if utils.CheckHandleNetError(err) {
log.Panicf("dbSyncer[%v] Event:NetErrorWhileReceive\tId:%s\tError:%s",
ds.id, conf.Options.Id, err.Error())
} else {
log.Panicf("dbSyncer[%v] Event:ErrorReply\tId:%s\tCommand: [unknown]\tError: %s",
ds.id, conf.Options.Id, err.Error())
}
}
if node == nil {
// non-blocking read from delay channel
select {
case node = <-ds.delayChannel:
default:
// it's ok, channel is empty
}
}
if node != nil {
// A delay node is matched against the reply carrying the same id; the
// elapsed time since node.t is recorded in milliseconds.
if node.id == id {
metric.GetMetric(ds.id).AddDelay(uint64(time.Now().Sub(node.t).Nanoseconds()) / 1000000) // ms
node = nil
} else if node.id < id {
log.Panicf("dbSyncer[%v] receive id invalid: node-id[%v] < receive-id[%v]",
ds.id, node.id, id)
}
}
}
}()
// Start a goroutine that receives the incremental Redis commands from the source Redis.
go func() {
var (
lastdb int32 = 0
bypass = false
isselect = false
scmd string
argv, newArgv [][]byte
err error
reject bool
)
decoder := redis.NewDecoder(reader)
log.Infof("dbSyncer[%v] Event:IncrSyncStart\tId:%s\t", ds.id, conf.Options.Id)
for {
ignorecmd := false
isselect = false
resp := redis.MustDecodeOpt(decoder)
if scmd, argv, err = redis.ParseArgs(resp); err != nil {
log.PanicErrorf(err, "dbSyncer[%v] parse command arguments failed", ds.id)
} else {
metric.GetMetric(ds.id).AddPullCmdCount(ds.id, 1)
...
if scmd != "ping" {
// A select command updates bypass, which persists across iterations and
// therefore applies to the commands that follow until the next select.
if strings.EqualFold(scmd, "select") {
if len(argv) != 1 {
log.Panicf("dbSyncer[%v] select command len(args) = %d", ds.id, len(argv))
}
s := string(argv[0])
n, err := strconv.Atoi(s)
if err != nil {
log.PanicErrorf(err, "dbSyncer[%v] parse db = %s failed", ds.id, s)
}
bypass = filter.FilterDB(n)
isselect = true
} else if filter.FilterCommands(scmd) {
ignorecmd = true
}
if bypass || ignorecmd {
ds.nbypass.Incr()
// ds.SyncStat.BypassCmdCount.Incr()
metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
log.Debugf("dbSyncer[%v] ignore command[%v]", ds.id, scmd)
continue
}
}
// Keep only write commands and filter their keys according to
// FilterKeyWhitelist / FilterKeyBlacklist.
newArgv, reject = filter.HandleFilterKeyWithCommand(scmd, argv)
if bypass || ignorecmd || reject {
ds.nbypass.Incr()
metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd)
continue
}
}
...
// Queue the filtered and parsed command on the sendBuf channel.
ds.sendBuf <- cmdDetail{Cmd: scmd, Args: newArgv}
}
}()
// Start a goroutine that writes the commands from the sendBuf channel to the target Redis.
go func() {
var noFlushCount uint
var cachedSize uint64
for item := range ds.sendBuf {
// length accumulates the byte length of the command name plus all arguments.
length := len(item.Cmd)
data := make([]interface{}, len(item.Args))
for i := range item.Args {
data[i] = item.Args[i]
length += len(item.Args[i])
}
// Send the command to the target Redis.
err := c.Send(item.Cmd, data...)
...
}
}()
// Block the current goroutine and print the statistics deltas once per second.
for lstat := ds.Stat(); ; {
time.Sleep(time.Second)
nstat := ds.Stat()
var b bytes.Buffer
fmt.Fprintf(&b, "dbSyncer[%v] sync: ", ds.id)
fmt.Fprintf(&b, " +forwardCommands=%-6d", nstat.forward-lstat.forward)
fmt.Fprintf(&b, " +filterCommands=%-6d", nstat.nbypass-lstat.nbypass)
fmt.Fprintf(&b, " +writeBytes=%d", nstat.wbytes-lstat.wbytes)
log.Info(b.String())
lstat = nstat
}
}