Redis-Shake【二】 Sync功能實(shí)現(xiàn)簡(jiǎn)介

Redis-Shake【一】簡(jiǎn)要介紹
Redis-Shake【二】 Sync功能實(shí)現(xiàn)簡(jiǎn)介

上一節(jié)簡(jiǎn)要介紹了一下Redis-Shake的使用尽楔,接下來詳細(xì)說一下Redis-Shake的sync模式逝她。

sync模式的基本原理是讓Redis-Shake模擬成一個(gè)redis slave,直接對(duì)源redis實(shí)例執(zhí)行sync/psync命令气破,該模式主要包含兩部分:全量同步(full)和增量同步(increment)。


Redis-Shake sync模式原理圖.png

sync模式的入口函數(shù)泽疆,遍歷所有的AddressList要门,創(chuàng)建SyncNode對(duì)象,使用多線程的方式批量執(zhí)行dbSyncer的sync方法冠蒋,開始同步邏輯

// 主要代碼在redis-shake/sync.go文件中羽圃,入口函數(shù)是Main()
func (cmd *CmdSync) Main() {
    type syncNode struct {
        id             int
        source         string
        sourcePassword string
        target         []string
        targetPassword string
    }

    // source redis number
    total := utils.GetTotalLink()
    syncChan := make(chan syncNode, total)
    cmd.dbSyncers = make([]*dbSyncer, total)
        // 遍歷SourceAddress ,每一個(gè)SourceAddress對(duì)應(yīng)一個(gè)syncNode對(duì)象抖剿,
    for i, source := range conf.Options.SourceAddressList {
        var target []string
        if conf.Options.TargetType == conf.RedisTypeCluster {
            target = conf.Options.TargetAddressList
        } else {
            // round-robin pick
            pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList))
            target = []string{conf.Options.TargetAddressList[pick]}
        }

        nd := syncNode{
            id:             i,
            source:         source,
            sourcePassword: conf.Options.SourcePasswordRaw,
            target:         target,
            targetPassword: conf.Options.TargetPasswordRaw,
        }
        syncChan <- nd
    }

    var wg sync.WaitGroup
    wg.Add(len(conf.Options.SourceAddressList))
    // SourceRdbParallel用來設(shè)置多個(gè)數(shù)據(jù)源時(shí)朽寞,同時(shí)最多可以處理多少個(gè)Redis數(shù)據(jù)源
    for i := 0; i < int(conf.Options.SourceRdbParallel); i++ {
        go func() {
            for {
                nd, ok := <-syncChan
                if !ok {
                    break
                }

                ds := NewDbSyncer(nd.id, nd.source, nd.sourcePassword, nd.target, nd.targetPassword,
                    conf.Options.HttpProfile+i)
                cmd.dbSyncers[nd.id] = ds
                log.Infof("routine[%v] starts syncing data from %v to %v with http[%v]",
                    ds.id, ds.source, ds.target, ds.httpProfilePort)
                // run in routine
                go ds.sync()

                // wait full sync done 全量階段處理完時(shí)會(huì)close waitFull這個(gè)channel
                <-ds.waitFull

                wg.Done()
            }
        }()
    }

    wg.Wait()
    close(syncChan)

    // never quit because increment syncing is still running
    select {}
}

dbSyncer.sync 對(duì)源Redis執(zhí)行Sync/Psync命令,并依次執(zhí)行全量和增量同步

func (ds *dbSyncer) sync() {
    var sockfile *os.File
    if len(conf.Options.SockFileName) != 0 {
        sockfile = utils.OpenReadWriteFile(conf.Options.SockFileName)
        defer sockfile.Close()
    }
        
    // base.Status用于標(biāo)示sync所處的階段斩郎,waitfull full incr
    base.Status = "waitfull"
    var input io.ReadCloser
    var nsize int64
    //執(zhí)行sync/psync命令脑融,獲取連接
    if conf.Options.Psync {
        input, nsize = ds.sendPSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, conf.Options.SourceTLSEnable)
    } else {
        input, nsize = ds.sendSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, conf.Options.SourceTLSEnable)
    }
    defer input.Close()

    ...

    reader := bufio.NewReaderSize(input, utils.ReaderBufferSize)

    // sync rdb 全量同步階段
    base.Status = "full"
    ds.syncRDBFile(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, nsize, conf.Options.TargetTLSEnable)

    // sync increment 增量同步階段
    base.Status = "incr"
    close(ds.waitFull)
    ds.syncCommand(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, conf.Options.TargetTLSEnable)
}

全量同步階段支持并發(fā)寫入目標(biāo)Redis,通過對(duì)目標(biāo)Redis執(zhí)行restore命令來實(shí)現(xiàn)key數(shù)據(jù)的寫入

func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64, tlsEnable bool) {
    // pipe是從源Redis接收缩宜、解析RDB文件的管道
    pipe := utils.NewRDBLoader(reader, &ds.rbytes, base.RDBPipeSize)
    wait := make(chan struct{})
    // 全量階段寫入目標(biāo)Redis的操作是可以并發(fā)執(zhí)行的肘迎,可以通過Parallel設(shè)置并發(fā)數(shù)
    go func() {
        // 全部寫入Redis成功之后通過wait channel
        defer close(wait)
        var wg sync.WaitGroup
        wg.Add(conf.Options.Parallel)
        for i := 0; i < conf.Options.Parallel; i++ {
            go func() {
                defer wg.Done()
                c := utils.OpenRedisConn(target, auth_type, passwd, conf.Options.TargetType == conf.RedisTypeCluster,
                    tlsEnable)
                defer c.Close()
                var lastdb uint32 = 0
                // 獲取源redis解析到數(shù)據(jù)
                for e := range pipe {
                    //執(zhí)行過濾DB的邏輯,對(duì)應(yīng)配置文件中的FilterDBBlacklist锻煌、FilterDBWhitelist
                    if filter.FilterDB(int(e.DB)) {
                        // db filter
                        ds.ignore.Incr()
                    } else {
                        ds.nentry.Incr()

                        ...
                        // 執(zhí)行過濾Key邏輯妓布,對(duì)應(yīng)FilterKeyBlacklist、FilterKeyWhitelist配置
                        if filter.FilterKey(string(e.Key)) == true {
                            // 1. judge if not pass filter key
                            ds.ignore.Incr()
                            continue
                        } else {
                            slot := int(utils.KeyToSlot(string(e.Key)))
                            if filter.FilterSlot(slot) == true {
                                // 2. judge if not pass filter slot
                                ds.ignore.Incr()
                                continue
                            }
                        }

                        log.Debugf("dbSyncer[%v] start restoring key[%s] with value length[%v]", ds.id, e.Key, len(e.Value))
                        // 對(duì)目標(biāo)Redis執(zhí)行Restore命令把當(dāng)前key寫入到目標(biāo)redis
                        utils.RestoreRdbEntry(c, e)
                        log.Debugf("dbSyncer[%v] restore key[%s] ok", ds.id, e.Key)
                    }
                }
            }()
        }

        wg.Wait()
    }()

    var stat *syncerStat
        
    for done := false; !done; {
        select {
        case <-wait:
            done = true
        case <-time.After(time.Second):
        }
        stat = ds.Stat()
        var b bytes.Buffer
        // fmt.Fprintf(&b, "dbSyncer[%v] total=%s - %12d [%3d%%]  entry=%-12d",
        fmt.Fprintf(&b, "dbSyncer[%v] total = %s - %12s [%3d%%]  entry=%-12d",
            ds.id, utils.GetMetric(nsize), utils.GetMetric(stat.rbytes), 100*stat.rbytes/nsize, stat.nentry)
        if stat.ignore != 0 {
            fmt.Fprintf(&b, "  ignore=%-12d", stat.ignore)
        }
        log.Info(b.String())
        metric.GetMetric(ds.id).SetFullSyncProgress(ds.id, uint64(100*stat.rbytes/nsize))
    }
    log.Infof("dbSyncer[%v] sync rdb done", ds.id)
}

增量寫入階段

func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type, passwd string, tlsEnable bool) {
    readeTimeout := time.Duration(10) * time.Minute
    writeTimeout := time.Duration(10) * time.Minute
    isCluster := conf.Options.TargetType == conf.RedisTypeCluster
// 打開目標(biāo)Redis的連接
    c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, readeTimeout, writeTimeout, isCluster, tlsEnable)
    defer c.Close()

    ds.sendBuf = make(chan cmdDetail, conf.Options.SenderCount)
    ds.delayChannel = make(chan *delayNode, conf.Options.SenderDelayChannelSize)
    var sendId, recvId, sendMarkId atomic2.Int64 // sendMarkId is also used as mark the sendId in sender routine
  // 開啟一個(gè)協(xié)程宋梧,定期從Source中獲取當(dāng)前redis-shake模擬的slave的SlaveOffset值
    go func() {
        ...
        srcConn := utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType, ds.sourcePassword,
            readeTimeout, writeTimeout, false, conf.Options.SourceTLSEnable)
        ticker := time.NewTicker(10 * time.Second)
        for range ticker.C {
            offset, err := utils.GetFakeSlaveOffset(srcConn)
            if err != nil {
                ...
            } else {
                // 更新sourceOffset metric數(shù)據(jù)
                if ds.sourceOffset, err = strconv.ParseInt(offset, 10, 64); err != nil {
                    log.Errorf("dbSyncer[%v] Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s",
                        ds.id, conf.Options.Id, err.Error())
                }
            }
        }
    }()
    // 開啟一個(gè)協(xié)程匣沼,用于處理目標(biāo)Redis返回的寫入成功的命令,主要用于統(tǒng)計(jì)successCount捂龄、successCountTotal释涛、delay
    go func() {
        var node *delayNode
        for {
            reply, err := c.Receive()

            recvId.Incr()
            id := recvId.Get() // receive id

            // print debug log of receive reply
            log.Debugf("receive reply-id[%v]: [%v], error:[%v]", id, reply, err)

            if conf.Options.Metric == false {
                continue
            }

            if err == nil {
                metric.GetMetric(ds.id).AddSuccessCmdCount(ds.id, 1)
            } else {
                metric.GetMetric(ds.id).AddFailCmdCount(ds.id, 1)
                if utils.CheckHandleNetError(err) {
                    log.Panicf("dbSyncer[%v] Event:NetErrorWhileReceive\tId:%s\tError:%s",
                        ds.id, conf.Options.Id, err.Error())
                } else {
                    log.Panicf("dbSyncer[%v] Event:ErrorReply\tId:%s\tCommand: [unknown]\tError: %s",
                        ds.id, conf.Options.Id, err.Error())
                }
            }

            if node == nil {
                // non-blocking read from delay channel
                select {
                case node = <-ds.delayChannel:
                default:
                    // it's ok, channel is empty
                }
            }

            if node != nil {
                if node.id == id {
                    metric.GetMetric(ds.id).AddDelay(uint64(time.Now().Sub(node.t).Nanoseconds()) / 1000000) // ms
                    node = nil
                } else if node.id < id {
                    log.Panicf("dbSyncer[%v] receive id invalid: node-id[%v] < receive-id[%v]",
                        ds.id, node.id, id)
                }
            }
        }
    }()
//開啟一個(gè)攜程,用于接收源Redis的增量Redis命令
    go func() {
        var (
            lastdb        int32 = 0
            bypass              = false
            isselect            = false
            scmd          string
            argv, newArgv [][]byte
            err           error
            reject        bool
        )

        decoder := redis.NewDecoder(reader)

        log.Infof("dbSyncer[%v] Event:IncrSyncStart\tId:%s\t", ds.id, conf.Options.Id)

        for {
            ignorecmd := false
            isselect = false
            resp := redis.MustDecodeOpt(decoder)

            if scmd, argv, err = redis.ParseArgs(resp); err != nil {
                log.PanicErrorf(err, "dbSyncer[%v] parse command arguments failed", ds.id)
            } else {
                metric.GetMetric(ds.id).AddPullCmdCount(ds.id, 1)

                ...

                if scmd != "ping" {
                    if strings.EqualFold(scmd, "select") {
                        if len(argv) != 1 {
                            log.Panicf("dbSyncer[%v] select command len(args) = %d", ds.id, len(argv))
                        }
                        s := string(argv[0])
                        n, err := strconv.Atoi(s)
                        if err != nil {
                            log.PanicErrorf(err, "dbSyncer[%v] parse db = %s failed", ds.id, s)
                        }
                        bypass = filter.FilterDB(n)
                        isselect = true
                    } else if filter.FilterCommands(scmd) {
                        ignorecmd = true
                    }
                    if bypass || ignorecmd {
                        ds.nbypass.Incr()
                        // ds.SyncStat.BypassCmdCount.Incr()
                        metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
                        log.Debugf("dbSyncer[%v] ignore command[%v]", ds.id, scmd)
                        continue
                    }
                }
                // 過濾出寫命令倦沧,同時(shí)基于FilterKeyWhitelist枢贿、FilterKeyBlacklist對(duì)key做過濾
                newArgv, reject = filter.HandleFilterKeyWithCommand(scmd, argv)
                if bypass || ignorecmd || reject {
                    ds.nbypass.Incr()
                    metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
                    log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd)
                    continue
                }
            }

            ...
            // 把過濾、解析之后的數(shù)據(jù)放入到sendBuf channel中
            ds.sendBuf <- cmdDetail{Cmd: scmd, Args: newArgv}
        }
    }()
// 開啟一個(gè)協(xié)程把sendBuf channel中的命令寫入到目標(biāo)Redis中
    go func() {
        var noFlushCount uint
        var cachedSize uint64

        for item := range ds.sendBuf {
            length := len(item.Cmd)
            data := make([]interface{}, len(item.Args))
            for i := range item.Args {
                data[i] = item.Args[i]
                length += len(item.Args[i])
            }
            // 對(duì)目標(biāo)Redis執(zhí)行命令
            err := c.Send(item.Cmd, data...)
            ...
        }
    }()
 // 阻塞當(dāng)前協(xié)程刀脏。定時(shí)打印統(tǒng)計(jì)數(shù)據(jù)
    for lstat := ds.Stat(); ; {
        time.Sleep(time.Second)
        nstat := ds.Stat()
        var b bytes.Buffer
        fmt.Fprintf(&b, "dbSyncer[%v] sync: ", ds.id)
        fmt.Fprintf(&b, " +forwardCommands=%-6d", nstat.forward-lstat.forward)
        fmt.Fprintf(&b, " +filterCommands=%-6d", nstat.nbypass-lstat.nbypass)
        fmt.Fprintf(&b, " +writeBytes=%d", nstat.wbytes-lstat.wbytes)
        log.Info(b.String())
        lstat = nstat
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末局荚,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌耀态,老刑警劉巖轮傍,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異首装,居然都是意外死亡创夜,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門仙逻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來驰吓,“玉大人,你說我怎么就攤上這事系奉∶史。” “怎么了?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵缺亮,是天一觀的道長翁涤。 經(jīng)常有香客問我,道長萌踱,這世上最難降的妖魔是什么葵礼? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮并鸵,結(jié)果婚禮上鸳粉,老公的妹妹穿的比我還像新娘。我一直安慰自己园担,他們只是感情好赁严,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著粉铐,像睡著了一般疼约。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蝙泼,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天程剥,我揣著相機(jī)與錄音,去河邊找鬼汤踏。 笑死织鲸,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的溪胶。 我是一名探鬼主播搂擦,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼哗脖!你這毒婦竟也來了瀑踢?” 一聲冷哼從身側(cè)響起扳还,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎橱夭,沒想到半個(gè)月后氨距,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡棘劣,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年俏让,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片茬暇。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡首昔,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出糙俗,到底是詐尸還是另有隱情勒奇,我是刑警寧澤,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布臼节,位于F島的核電站,受9級(jí)特大地震影響珊皿,放射性物質(zhì)發(fā)生泄漏网缝。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一蟋定、第九天 我趴在偏房一處隱蔽的房頂上張望粉臊。 院中可真熱鬧,春花似錦驶兜、人聲如沸扼仲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽屠凶。三九已至,卻和暖如春肆资,著一層夾襖步出監(jiān)牢的瞬間矗愧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來泰國打工郑原, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留唉韭,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓犯犁,卻偏偏與公主長得像属愤,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子酸役,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 一住诸、Redis主從復(fù)制 主從復(fù)制:主節(jié)點(diǎn)負(fù)責(zé)寫數(shù)據(jù)驾胆,從節(jié)點(diǎn)負(fù)責(zé)讀數(shù)據(jù),主節(jié)點(diǎn)定期把數(shù)據(jù)同步到從節(jié)點(diǎn)保證數(shù)據(jù)的一致性...
    愛情小傻蛋閱讀 965評(píng)論 0 0
  • 1.1 資料 只壳,最好的入門小冊(cè)子俏拱,可以先于一切文檔之前看,免費(fèi)吼句。 作者Antirez的博客锅必,Antirez維護(hù)的R...
    JefferyLcm閱讀 17,056評(píng)論 1 51
  • 本篇就一下方面展開分析 如何使用主從復(fù)制? 主從復(fù)制的原理(重點(diǎn)是全量復(fù)制和部分復(fù)制惕艳、以及心跳機(jī)制) 實(shí)際應(yīng)用中需...
    lucode閱讀 994評(píng)論 0 5
  • 1.主從同步原理像MySQL一樣搞隐,Redis是支持主從同步的,而且也支持一主多從以及多級(jí)從結(jié)構(gòu)远搪。主從結(jié)構(gòu)劣纲,一是為了...
    碼出高效閱讀 2,182評(píng)論 0 1
  • 團(tuán)隊(duì)管理向來是一個(gè)組織最為頭痛的問題,尤其到了我們九零進(jìn)入職場(chǎng)谁鳍,好像管理我們這些年輕人越來越困難癞季,從我進(jìn)入教育機(jī)構(gòu)...
    多瀚Sean閱讀 117評(píng)論 0 0