DRC實踐

DRC簡介

DRC(Data Replication Center)是我在阿里聽過的一個概念横腿,它的業(yè)務域是支持異構數(shù)據(jù)庫實時同步锭亏,數(shù)據(jù)記錄變更訂閱服務诉瓦。為跨域實時同步姐呐、實時增量分發(fā)、異地雙活廉涕、分庫分表等場景提供產品級的解決方案泻云。支持異地多活、大數(shù)據(jù)實時抽取狐蜕、搜索實時更新數(shù)據(jù)宠纯、數(shù)據(jù)表結構重構、多視圖數(shù)據(jù)存儲层释、大屏實時刷新等婆瓜。DRC在阿里服務了數(shù)萬個實時通道,已經成為阿里的基礎設施贡羔,重要性不言而喻廉白。
DRC需要保障數(shù)據(jù)庫的事務一致性,包括DDL(表結構變更)也可以進行同步或過濾乖寒。而DBA天生就在這個坑里猴蹂,絕對不能讓主備不一致、或事務不完整宵统,哪怕只是一條數(shù)據(jù)晕讲。而且DBA迫切希望以后不用通知下游了覆获,讓DRC自動適配主備切換或拆庫马澈。
DRC必須具備的三大特性:1)穩(wěn)定性,所有環(huán)節(jié)必須支持HA弄息; 2)實時性(<1S) 3)一致性痊班,數(shù)據(jù)同步前后必須保證數(shù)據(jù)的一致性。

我們公司對DRC的需求場景:

  • MySQL原生復制
  • 大數(shù)據(jù)實時抽取
  • 搜索實時數(shù)據(jù)
  • 數(shù)據(jù)表結構重構(拆表摹量、合表等)
  • 多視圖數(shù)據(jù)存儲
  • 大屏實時刷新
  • 緩存更新
  • 支持Oracle涤伐、mysql兩種數(shù)據(jù)源數(shù)據(jù)相互轉換

技術選型預研

數(shù)據(jù)同步中間件開源的主要有canal、databus缨称、kettle凝果、otter四種,下面進行簡單的對比說明睦尽。
canal:canal是阿里巴巴旗下的一款開源項目器净,純Java開發(fā)〉狈玻基于數(shù)據(jù)庫增量日志解析山害,提供增量數(shù)據(jù)訂閱&消費纠俭,目前主要支持了MySQL。
databus:2011年在LinkedIn正式進入生產系統(tǒng)浪慌,2013年開源冤荆,Java開發(fā)。databus是一個實時的权纤、可靠的钓简、支持事務的、保持一致性的數(shù)據(jù)變更抓取系統(tǒng)汹想,同canal也是監(jiān)聽mysql的binlog涌庭。 Databus通過挖掘數(shù)據(jù)庫日志的方式,將數(shù)據(jù)庫變更實時欧宜、可靠的從數(shù)據(jù)庫拉取出來坐榆,業(yè)務可以通過定制化client實時獲取變更。
kettle: kettle可以實現(xiàn)從不同數(shù)據(jù)源(excel冗茸、數(shù)據(jù)庫席镀、文本文件等)獲取數(shù)據(jù),然后將數(shù)據(jù)進行整合夏漱、轉換處理豪诲,可以再將數(shù)據(jù)輸出到指定的位置(excel、數(shù)據(jù)庫挂绰、文本文件)等屎篱;是B/S架構,多用于數(shù)倉作業(yè)葵蒂。
otter:阿里巴巴旗下的另一款開源項目交播,始于中美數(shù)據(jù)同步需求,純Java開發(fā)践付∏厥浚可以理解為canal+ETL,對數(shù)據(jù)抽取進行了擴展永高,加入自由門隧土、反查等功能,拓展了已經無法從binlog獲取的數(shù)據(jù)來源命爬。同時提供頁面的ETL編輯配置功能曹傀,方便快速實現(xiàn)帶邏輯的業(yè)務數(shù)據(jù)同步。

Otter的功能更加強大饲宛,滿足DRC所有特性需求皆愉。在otter上進行二次開發(fā)成本是最低的。所以我們公司選擇基于otter進行二次開發(fā),打造內部的DRC系統(tǒng)亥啦。

DRC架構

image.png

數(shù)據(jù)同步過程可以分為Select-->Extract炭剪、Transform-->Load四個過程,也就是上圖中的S翔脱、E奴拦、T、L届吁,通過將這4個步驟進行服務拆分错妖,每個服務都具有自己的線程池。通過S疚沐、L過程的串型暂氯,保證數(shù)據(jù)的一致性,E亮蛔、T過程的并行提升系統(tǒng)處理的性能痴施。

滑動窗口

image.png

說明:

  1. otter通過select模塊串行獲取canal的批數(shù)據(jù),注意是串行獲取究流,每批次獲取到的數(shù)據(jù)辣吃,就會有一個全局標識,otter里稱之為processId.
  2. select模塊獲取到數(shù)據(jù)后芬探,將其傳遞給后續(xù)的ETL模型. 這里E和T模塊會是一個并行處理
  3. 將數(shù)據(jù)最后傳遞到Load時神得,會根據(jù)每批數(shù)據(jù)對應的processId,按照順序進行串行加載偷仿。 ( 比如有一個processId=2的數(shù)據(jù)先到了Load模塊哩簿,但會阻塞等processId=1的數(shù)據(jù)Load完成后才會被執(zhí)行)

簡單一點說,Select/Load模塊會是一個串行機制來保證binlog處理的順序性酝静,Extract/Transform會是一個并行节榜,加速傳輸效率。

并行度
類似于tcp滑動窗口大小形入,比如整個滑動窗口設置了并行度為5時全跨,只有等第一個processId Load完成后缝左,第6個Select才會去獲取數(shù)據(jù)亿遂。

Otter源碼解讀

otter核心model關系圖

image.png

Pipeline:從源端到目標端的整個過程描述,主要由一些同步映射過程組成渺杉∩呤可以對應為一個數(shù)據(jù)庫(當然也可以一個實例上的多個庫配同一個pipeline)。
Channel:同步通道是越,單向同步中一個Pipeline組成耳舅,在雙向同步中由兩個Pipeline組成。一個數(shù)據(jù)庫實例一個Channel,一個channel對應一個canal浦徊。
DataMediaPair:根據(jù)業(yè)務表定義映射關系馏予,比如源表和目標表,字段映射盔性,字段組等霞丧。
DataMedia : 抽象的數(shù)據(jù)介質概念,可以理解為數(shù)據(jù)表/mq隊列定義
DataMediaSource : 抽象的數(shù)據(jù)介質源信息冕香,補充描述DataMedia
ColumnPair : 定義字段映射關系

otter工程結構如下

image.png

包含三部分:Share | Node | Manager蛹尝。 其中Share是Node和Manager共享工程,并不是獨立部署的節(jié)點悉尾。Node和Manager是獨立部署的突那。
Node:獨立部署的節(jié)點,執(zhí)行SETL過程的服務節(jié)點构眯,擁有獨立的JVM愕难,數(shù)據(jù)同步的過程實際上都發(fā)生在Node之間。
Manager:管理的節(jié)點惫霸,邏輯上只有一個(一個Manager管理多個Node節(jié)點)务漩,如果不考慮HA的話。負責管理同步的數(shù)據(jù)定義它褪,包括數(shù)據(jù)源饵骨、Channel、PipeLine茫打、數(shù)據(jù)映射等居触,各個Node節(jié)點從Manager處獲取并執(zhí)行這些信息。另外還有監(jiān)控等信息老赤。

Share各個子系統(tǒng)的說明:

  • Common: 公共內容定義
  • Arbitrate: 用于Manager與Node之間轮洋、Node與Node之間的調度、S.E.T.L幾個過程的調度等抬旺;
  • Communication: 數(shù)據(jù)傳輸?shù)牡讓颖子瑁蠈拥腜ipe、一些調度等都是依賴于Communication的开财,簡單點說它負責點對點的Event發(fā)送和接收汉柒,封裝了dubbo、rmi兩種方式的調用
  • Etl:實際上并不負責ETL的具體實現(xiàn)责鳍,只是一些接口&數(shù)據(jù)結構的定義而已碾褂,包括開放給用戶自定義Extract階段處理邏輯的接口,具體的實現(xiàn)在Node里面历葛。

Node各個子系統(tǒng)的說明:

  • Common:公共內容定義
  • Canal: Canal的封裝正塌,Otter采用的是Embed的方式引入Canal(Canal有Embed和獨立運行兩種模式)
  • Deployer:內置Jetty的啟動
  • Etl: S.E.T.L 調度、處理的實現(xiàn),是Otter最復雜乓诽、也是最核心的部分帜羊。

Manager各個子系統(tǒng)的說明:

  • Biz:管理頁面對應的業(yè)務邏輯實現(xiàn),包含我們公司web工程規(guī)范中的manager鸠天、dal兩個工程的內容逮壁。
  • Web:頁面請求入口,執(zhí)行controller邏輯粮宛。otter采用的是阿里內部的webx框架窥淆。
  • Deployer:內置Jetty的啟動,同時包含頁面的template等

核心類設計

Communication的設計

image.png

比較關鍵的部分圖中已經使用注釋的方式進行了說明巍杈。理解Communication的關鍵在于Event的模式+EndPoint方式進行遠程調用忧饭。

Node-common關于Node節(jié)點管理的機制

image.png

節(jié)點是在Manager上面管理的,但是Node節(jié)點實際上是需要與其他的Node節(jié)點及manager通訊的筷畦,因此NodeList(Group內的其他節(jié)點)的信息在Node節(jié)點是需要相互知道的词裤。 Otter采用的是類似于Lazy+cache的模式管理的。即:
1)真正使用到的時候再考慮去Manager節(jié)點取過來鳖宾;
2)取過來以后暫存到本地內存吼砂,但是伴隨著一個失效機制(失效機制的檢查是不單獨占用線程的,這個同學們可以注意一下鼎文,設計框架的時候需要盡可能做到這一點)

PipeLine設計

image.png

PipeLine主要的操作就是Put/Get渔肩,對于S-->E、T-->L拇惋,還有節(jié)點內部的處理周偎,可以使用基于Memory的PipeLine,對于遠程的節(jié)點數(shù)據(jù)傳輸(比如E-->T的跨節(jié)點傳輸)撑帖,使用的是RPC或者Http蓉坎,這里面需要注意的幾個事項,圖中已經做了說明:

  1. 數(shù)據(jù)傳輸實際上是Pull的模式胡嘿,并不是Push的模式蛉艾,即數(shù)據(jù)準備好以后等待另外一端需要的時候再傳輸;
  2. 數(shù)據(jù)的序列化采用的是ProtoBuf(https://code.google.com/p/protobuf/)衷敌,也可以做加密傳輸扬舒,但是使用的Key是Path肺樟,一般性的安全需求可以滿足鸭丛,但是如果傳輸?shù)臄?shù)據(jù)是非常敏感的眉抬,還是用專線的好;
  3. 壓縮也是在Pipe這一層做掉的瞒爬,具體就不展開了。

SETL中的Select過程

image.png

每個SETL過程的設計基本上都是由xxxTask + OtterXXXFactroy + OtterXXX的設計方式,但是細節(jié)上差別比較大侧但。
Select過程是需要串行的(需要保證順序性)矢空,但是為了盡可能提高效率,將Get和ACK(Canal的滑動窗口)分在兩個線程里面去做禀横,依據(jù)的假定就是絕大多數(shù)數(shù)據(jù)是不需要回滾的屁药,但是一旦回滾了,代價就比較大(Otter的官方文檔有相關的說明)柏锄。Otter采用的是at last once策略酿箭,不丟失一條消息,但是異常場景下可能存在消息重發(fā)趾娃,因為有數(shù)據(jù)庫有主鍵限制缭嫡,對數(shù)據(jù)庫同步沒影響,業(yè)務使用方需要自己保證冪等抬闷。

SETL中的Extract過程

image.png

這里的OtterExtractorFactory與OtterExtractor并不是選擇一個合適的Extractor處理妇蛀,而是搭建成一個職責鏈(但設計上并不完全是,個人覺得設計成職責鏈更合適一些)笤成,每個Extractor順序處理评架。

SETL中的Transform過程

image.png

Transform實際上解決的就是異構數(shù)據(jù)的映射,在Transform這個節(jié)點做相應的轉換炕泳。

SETL中的Load過程

image.png

1)Load過程是并發(fā)執(zhí)行的纵诞,但是受Weight的控制(并非全局的);
2)在Load過程中包含了打標記的過程(與Select過程是呼應的培遵,即Load打的標記會被Select過程所識別挣磨,然后不會同步回去了,這一點官方文檔有相關說明

SETL時序

image.png

我們做的大改造

模型擴展

在支撐業(yè)務重構的數(shù)據(jù)表重構時荤懂,業(yè)務方的需求可以歸為下面4類:

  • 多表中的多條記錄 合成 一個表中的一條記錄(N:1)
  • 一個表中的一條記錄 拆成 多表中的多條記錄(1:N)
  • 單表中的多個字段 合成 一個字段(n:1)
  • 單表中的1個字段 拆成 多個字段(1:n)
    上面分別從“表”茁裙、“列”兩個維度進行“拆”和“合”,數(shù)據(jù)重構其實還有一個維度节仿,就是“行”晤锥,一行記錄拆分成多行、多行記錄合成一行廊宪。
    由于“行”維度的拆分需求比較少矾瘾,這一次沒有對這種需求進行支持。但是可以部分參考“表”箭启、“列”的拆合壕翩,已經在一定層度上支持了。其中一行記錄拆分成多行需要放棄原來的行主鍵(同時update傅寡、delete需要在Extract階段進行主鍵修復)放妈,像
    “一個表中的一條記錄 拆成 多表中的多條記錄”北救,只不過這個“多表”映射為“同一個表”;其中多行記錄合成一行可以參考“多表中的多條記錄 合成 一個表中的一條記錄”思想去做芜抒,Extract階段的反查對象變成自己珍策。

原生的otter設計是為了一個表到另一個表的同步,支持對數(shù)據(jù)的修改宅倒,支持簡單的列名的轉化(支持字段的刪減)攘宙。沒有考慮一個表到多個表的同步,以及字段的新增拐迁。

一個表到多個表的同步支持
otter設計是為了一個表到另一個表的同步蹭劈,始終是一條binlog記錄。而一個表到多個表的同步线召,需要將一條binlog記錄在某個階段進行copy分發(fā)铺韧,我們將這個階段選為Transform階段。在E階段灶搜,各個目標表根據(jù)自己對數(shù)據(jù)的要求進行加工處理祟蚀,E階段取的是各個目標表處理結果的并集。在Transform階段根據(jù)每個目標表的字段需求割卖,各取所需前酿,生成多條記錄。Transform的拆分邏輯如下:

    for (EventData eventData : rowBatch.getDatas()) {
            // 處理eventData
            Long tableId = eventData.getTableId();
            Pipeline pipeline = configClientService.findPipeline(identity.getPipelineId());
            
            List<DataMediaPair> dataMediaPairs = ConfigHelper.findDataMediaPairByMediaId(pipeline, tableId);
            
            List<Object> itemList = new ArrayList<Object>();
            Object item = null;
            
            for (DataMediaPair pair : dataMediaPairs) {
                //每個目標庫數(shù)據(jù)源過濾不屬于自己該處理的數(shù)據(jù)
                if (!pair.getSource().getId().equals(tableId)) { // 過濾tableID不為源的同步
                    continue;
                }
                鹏溯。罢维。。丙挽。肺孵。。
                //每個目標庫只處理路由到自己的數(shù)據(jù)
                    boolean isSelfNameSpace = false;
                    for(String value:ConfigHelper.parseMode(pair.getTarget().getNamespace()).getMultiValue()){
                        if(value.equalsIgnoreCase(slotNode.getDataSourceName())){
                            isSelfNameSpace = true;
                        }
                    }
                    if(isSelfNameSpace==false){
                        continue;
                    }
                    
                    OtterTransformer translate = lookup(pair.getSource(), pair.getTarget());
                    // 進行轉化
                    item = translate.transform(eventData, new OtterTransformerContext(identity, pair, pipeline),slotNode);
                }else{
                    OtterTransformer translate = lookup(pair.getSource(), pair.getTarget());
                    // 進行轉化
                    item = translate.transform(eventData, new OtterTransformerContext(identity, pair, pipeline));
                }
                if(item != null){
                    itemList.add(item);
                }
            }
           
            if (itemList.size() == 0) {
                continue;
            }
            // 合并結果
            merge(identity, result, itemList);

        }

新增字段的支持
原先的otter支持原表到目標表映射過程中的字段刪減和字段內容修改(在Extract階段可以通過嵌入腳本進行字段內容修改)颜阐,對字段的新增沒有支持平窘。
我們通過對字段映射頁面進行擴展,支持手動新增字段凳怨,然后在Extract階段對新增字段進行內容填充瑰艘,完成對新增字段的支持。

image.png

通過在原表增加目標表不存在的字段肤舞,完成“虛擬”字段填入紫新,在后續(xù)步驟完成“虛擬”字段到實字段的映射配置。在Extract階段對“虛擬”增字段進行內容填充李剖,將“虛擬”變成實字段芒率。

分庫分表支持

原生的otter是不支持分庫分表的,分庫分表已經不屬于Otter數(shù)據(jù)同步的業(yè)務域篙顺,但是分庫分表的支持又是大公司數(shù)據(jù)同步過程中不可避免偶芍。也可能是otter開源版本把分庫分表的支持給閹割了充择。
我們公司業(yè)務在改造過程中,涉及單庫單表到分庫分表的數(shù)據(jù)同步需求。
1)我們對DataMediaPair進行了擴展腋寨,支持簡單分庫分表配置聪铺。


image.png

2)我們在transform階段進行了邏輯擴展化焕。當表的轉換映射中目標表是需要分庫分表時萄窜,這時會加載目標表的分庫分表路由器(分庫分表的庫表是通過解析pipeline下面所有目標表配置而來,分表算法由用戶的配置而來)撒桨。

    for (EventData eventData : rowBatch.getDatas()) {
            // 處理eventData
            Long tableId = eventData.getTableId();
            Pipeline pipeline = configClientService.findPipeline(identity.getPipelineId());
            
            List<DataMediaPair> dataMediaPairs = ConfigHelper.findDataMediaPairByMediaId(pipeline, tableId);
            
            List<Object> itemList = new ArrayList<Object>();
            Object item = null;
            
            for (DataMediaPair pair : dataMediaPairs) {
              //每個目標庫數(shù)據(jù)源過濾不屬于自己該處理的數(shù)據(jù)
                if (!pair.getSource().getId().equals(tableId)) { // 過濾tableID不為源的同步
                    continue;
                }
                //如果映射的目標表是分庫分表
                if(true == pair.getIsTargetSharingJDBC()){
                    //根據(jù)管道信息獲取路由器
                    SlotRouter<String> slotRouter = configClientService.findSlotRouterByPipelineId(identity.getPipelineId(), pair.getId());
                    //獲取分表鍵的值
                    List<EventColumn> allColumns = new ArrayList<EventColumn>();
                    allColumns.addAll(eventData.getKeys());
                    allColumns.addAll(eventData.getColumns());
                    String shardValue = null;
                    int shardValueType = 0;
                    //獲取分庫分表路由字段的值
                    for(EventColumn eventColumn : allColumns){
                        if(eventColumn.getColumnName().equalsIgnoreCase(pair.getSharingColumn())){
                            shardValue = eventColumn.getColumnValue();
                            shardValueType = eventColumn.getColumnType();
                            break;
                        }
                    }
                    //如果分庫分表字段為null
                    if(shardValue == null){
                        throw new RuntimeException("分表字段:{"+pair.getSharingColumn()+"}為null查刻,eventData:{"+eventData+"}");
                    }
                    SlotNode slotNode = slotRouter.slotRouter(shardValue,shardValueType);
                    
                    //每個目標庫只處理路由到自己的數(shù)據(jù)
                    boolean isSelfNameSpace = false;
                    for(String value:ConfigHelper.parseMode(pair.getTarget().getNamespace()).getMultiValue()){
                        if(value.equalsIgnoreCase(slotNode.getDataSourceName())){
                            isSelfNameSpace = true;
                        }
                    }
                    if(isSelfNameSpace==false){
                        continue;
                    }
                    
                    OtterTransformer translate = lookup(pair.getSource(), pair.getTarget());
                    // 進行轉化
                    item = translate.transform(eventData, new OtterTransformerContext(identity, pair, pipeline),slotNode);
                }else{
                    OtterTransformer translate = lookup(pair.getSource(), pair.getTarget());
                    // 進行轉化
                    item = translate.transform(eventData, new OtterTransformerContext(identity, pair, pipeline));
                }
                if(item != null){
                    itemList.add(item);
                }
            }
           
            if (itemList.size() == 0) {
                continue;
            }
            // 合并結果
            merge(identity, result, itemList);

        }
        //構建每個映射的路由算法緩存
        slotRouterCache = new RefreshMemoryMirror<String, SlotRouter>(DEFAULT_PERIOD, new ComputeFunction<String, SlotRouter>() {
            public SlotRouter apply(String key, SlotRouter oldValue) {
                if(StringUtils.isBlank(key) || key.split(Pipeline_Pair_Connector).length != 2){
                    return null;
                }
                Long pipelineId = Long.parseLong(key.split(Pipeline_Pair_Connector)[0]);
                Long pairId = Long.parseLong(key.split(Pipeline_Pair_Connector)[1]);
                Pipeline pipeline = findPipeline(pipelineId);
                if(pipeline == null){
                    return null;
                }
                DataMediaPair dataMediaPair = null;
                for(DataMediaPair pair :pipeline.getPairs()){
                    if(pair.getId().equals(pairId)){
                        dataMediaPair = pair;
                    }
                }
                if(dataMediaPair == null){
                    return null;
                }
                //為pipeline下該pair對應目標庫表構建路由器
                Set<SlotNode> slotSet = new TreeSet<SlotNode>();
                String namespace = dataMediaPair.getTarget().getNamespace();
                String tableName = dataMediaPair.getTarget().getName();
                String nameSpacePrefix = ConfigHelper.getPrefix(namespace);
                String tableNamePrefix = ConfigHelper.getPrefix(tableName);
                if(nameSpacePrefix == null || tableNamePrefix == null){
                    return null;
                }
                
                
                for(DataMediaPair pair : pipeline.getPairs()){
                    String namespaceTemp = pair.getTarget().getNamespace();
                    String tableNameTemp = pair.getTarget().getName();
                    String nameSpaceTempPrefix = ConfigHelper.getPrefix(namespaceTemp);
                    String tableNameTempPrefix = ConfigHelper.getPrefix(tableNameTemp);
                    if(nameSpacePrefix.equals(nameSpaceTempPrefix) && tableNamePrefix.equals(tableNameTempPrefix)){
                        ModeValue dataSourceNames = ConfigHelper.parseMode(namespaceTemp);
                        ModeValue tableNames = ConfigHelper.parseMode(tableNameTemp);
                        if(dataSourceNames == null || tableNames == null){
                            continue;
                        }
                        DbMediaSource dbMediaSource = (DbMediaSource) pair.getTarget().getSource(); 
                        for(String dataSourceNameInPair : dataSourceNames.getMultiValue()){
                            for(String tableNameInPair : tableNames.getMultiValue()){
                                SlotNode slotNode = new SlotNode(dataSourceNameInPair,tableNameInPair);
                                slotNode.setUrl(dbMediaSource.getUrl());
                                slotNode.setDriver(dbMediaSource.getDriver());
                                slotNode.setEncode(dbMediaSource.getEncode());
                                slotNode.setGmtCreate(dbMediaSource.getGmtCreate());
                                slotNode.setGmtModified(dbMediaSource.getGmtModified());
                                slotNode.setId(dbMediaSource.getId());
                                slotNode.setName(dbMediaSource.getName());
                                slotNode.setPassword(dbMediaSource.getPassword());
                                slotNode.setProperties(dbMediaSource.getProperties());
                                slotNode.setType(dbMediaSource.getType());
                                slotNode.setUsername(dbMediaSource.getUsername());
                                slotSet.add(slotNode);
                            }
                        }
                    }
                }
                SlotRouter slotRouter = null; 
                //獲取路由算法的參數(shù),生成具體的路由算法
                Integer slotAlgorithm = dataMediaPair.getSlotAlgorithm();
                Long tableBalanceSize = dataMediaPair.getTableBalanceSize();
                if(null== slotAlgorithm || SlotAlgorithmEnum.MODULO_BALANCE.getValue() == slotAlgorithm){
                    slotRouter = new ModuloBalanceSlotRouterBuilder(slotSet).build();
                }else if(SlotAlgorithmEnum.QUOTIENT_BALANCE.getValue()==slotAlgorithm &&  tableBalanceSize != null){
                    slotRouter = new QuotientBalanceSlotRouterBuilder(slotSet,tableBalanceSize).build();
                }else{
                    throw new RuntimeException("目前暫不支持該算法或者算法參數(shù)異常");
                }
                return slotRouter;
            }
        });

自由門集中控制

數(shù)據(jù)庫的binlog也有刪除策略凤类,不可能永久保存所有的binlog穗泵。如何遷移binlog已經不存在的存量數(shù)據(jù)?
otter針對這種場景需求設計了自由門模塊谜疤。詳見otter中的自由門說明 佃延。
自由門的原理如下:
a. 基于otter系統(tǒng)表retl_buffer,插入特定的數(shù)據(jù)夷磕,包含需要同步的表名履肃,pk信息。
b. otter系統(tǒng)感知后會根據(jù)表名和pk提取對應的數(shù)據(jù)(整行記錄)坐桩,和正常的增量同步一起同步到目標庫尺棋。
原先需要在每一個遷移的庫所在實例建立retl.retl_buffer庫表(存量數(shù)據(jù)遷移控制表)。當遷移的庫比較多時绵跷,在多個實例上面分別建立retl庫膘螟,不利于統(tǒng)一控制,同時給庫表元數(shù)據(jù)管理帶來一定的難度碾局。為了后續(xù)DRC的統(tǒng)一快捷運維和減少運維成本荆残,我們對自由門進行集中控制(不同實例上的數(shù)據(jù)遷移由同一個retl.retl_buffer庫表控制)。通過在retl_buffer表上增加channel净当、pipeline兩個字段内斯,區(qū)分retl.retl_buffer庫表中的數(shù)據(jù)屬于不同的庫表。然后在SelectTask階段對數(shù)據(jù)進行分批整理 蚯瞧,每批的管道改成同步管道信息嘿期。(統(tǒng)一控制相對單獨控制存在一個風險點:如果同步的這批存量數(shù)據(jù)在Extract階段后和Load階段前存在源庫數(shù)據(jù)對應記錄的修改,同時修改的增量binlog又比存量同步的數(shù)據(jù)同步更快埋合,存在數(shù)據(jù)老數(shù)據(jù)覆蓋新數(shù)據(jù)的風險备徐,不過這種場景概率極小)

//如果數(shù)據(jù)來自RETL庫RETL_BUFFER表甚颂,將數(shù)據(jù)分批蜜猾,每批的管道改成同步管道信息
if (StringUtils.equalsIgnoreCase(RETL_BUFFER, pipeline.getPairs().get(0).getSource().getName())
            && StringUtils.equalsIgnoreCase(RETL, pipeline.getPairs().get(0).getSource().getNamespace())) {
        Long lastPipeLineId = null;
        Long lastChannelId = null;
        for (EventData data : eventData) {
            // 獲取每一條數(shù)據(jù)對應的pipeline
            EventColumn pipelineColumn = getMatchColumn(data.getColumns(), PIPELINE_ID);
            // 獲取每一條數(shù)據(jù)對應的channelID
            EventColumn channelColumn = getMatchColumn(data.getColumns(), CHANNEL_ID);

            if(pipelineColumn == null || channelColumn == null){
                logger.warn("data from RETL.RETL_BUFFER has no PIPELINE_ID OR CHANNEL_ID,the getKeys are {}",new Object[]{data.getKeys().toArray()});
                continue;
            }
            
            Long pipeLineId = Long.valueOf(pipelineColumn.getColumnValue());
            Long channelId = Long.valueOf(channelColumn.getColumnValue());

            if (pipeLineId == null || channelId == null) {
                continue;
            }
            
            //第一條數(shù)據(jù)秀菱,不發(fā)送
            if (lastPipeLineId == null && lastChannelId == null) {
                lastPipeLineId = pipeLineId;
                lastChannelId = channelId;
                rowBatch.merge(data);
                continue;
            }

            //數(shù)據(jù)管道或通道有變化時,每個管道號數(shù)據(jù)作為一批發(fā)送
            if (pipeLineId != lastPipeLineId || channelId != lastChannelId) {
                // 構造唯一標識
                Identity identity = new Identity();
                identity.setChannelId(lastChannelId);
                identity.setPipelineId(lastPipeLineId);
                identity.setProcessId(etlEventData.getProcessId());
                rowBatch.setIdentity(identity);

                long nextNodeId = etlEventData.getNextNid();
                List<PipeKey> pipeKeys = rowDataPipeDelegate.put(new DbBatch(rowBatch),
                        nextNodeId);
                etlEventData.setDesc(pipeKeys);
                etlEventData.setNumber((long) rowBatch.getDatas().size());
                etlEventData.setFirstTime(startTime); // 使用原始數(shù)據(jù)的第一條
                etlEventData.setBatchId(message.getId());

                if (profiling) {
                    Long profilingEndTime = System.currentTimeMillis();
                    stageAggregationCollector.push(pipelineId, StageType.SELECT,
                            new AggregationItem(profilingStartTime, profilingEndTime));
                }
                arbitrateEventService.selectEvent().single(etlEventData);
                rowBatch = new RowBatch();
            }
            lastPipeLineId = pipeLineId;
            lastChannelId = channelId;
            rowBatch.merge(data);

        }
        if(rowBatch!=null && rowBatch.getDatas() != null && rowBatch.getDatas().size()>0){
         // 構造唯一標識
            Identity identity = new Identity();
            identity.setChannelId(lastChannelId);
            identity.setPipelineId(lastPipeLineId);
            identity.setProcessId(etlEventData.getProcessId());
            rowBatch.setIdentity(identity);

            long nextNodeId = etlEventData.getNextNid();
            List<PipeKey> pipeKeys = rowDataPipeDelegate.put(new DbBatch(rowBatch),
                    nextNodeId);
            etlEventData.setDesc(pipeKeys);
            etlEventData.setNumber((long) rowBatch.getDatas().size());
            etlEventData.setFirstTime(startTime); // 使用原始數(shù)據(jù)的第一條
            etlEventData.setBatchId(message.getId());

            if (profiling) {
                Long profilingEndTime = System.currentTimeMillis();
                stageAggregationCollector.push(pipelineId, StageType.SELECT,
                        new AggregationItem(profilingStartTime, profilingEndTime));
            }
            arbitrateEventService.selectEvent().single(etlEventData);
        }

    }

可以通過下面這個圖來理解:


image.png

參考資料

https://yq.aliyun.com/articles/2350
http://eyuxu.iteye.com/blog/1941894

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末蹭睡,一起剝皮案震驚了整個濱河市衍菱,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌肩豁,老刑警劉巖脊串,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異清钥,居然都是意外死亡琼锋,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門祟昭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缕坎,“玉大人,你說我怎么就攤上這事篡悟∶仗荆” “怎么了?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵搬葬,是天一觀的道長荷腊。 經常有香客問我,道長踩萎,這世上最難降的妖魔是什么停局? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮香府,結果婚禮上董栽,老公的妹妹穿的比我還像新娘。我一直安慰自己企孩,他們只是感情好锭碳,可當我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著勿璃,像睡著了一般擒抛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上补疑,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天歧沪,我揣著相機與錄音,去河邊找鬼莲组。 笑死诊胞,一個胖子當著我的面吹牛,可吹牛的內容都是我干的锹杈。 我是一名探鬼主播撵孤,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼迈着,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了邪码?” 一聲冷哼從身側響起裕菠,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎闭专,沒想到半個月后奴潘,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡喻圃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年萤彩,在試婚紗的時候發(fā)現(xiàn)自己被綠了粪滤。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片斧拍。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖杖小,靈堂內的尸體忽然破棺而出肆汹,到底是詐尸還是另有隱情,我是刑警寧澤予权,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布昂勉,位于F島的核電站,受9級特大地震影響扫腺,放射性物質發(fā)生泄漏岗照。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一笆环、第九天 我趴在偏房一處隱蔽的房頂上張望攒至。 院中可真熱鬧,春花似錦躁劣、人聲如沸迫吐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽志膀。三九已至,卻和暖如春鳖擒,著一層夾襖步出監(jiān)牢的瞬間溉浙,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工蒋荚, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留戳稽,地道東北人。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓圆裕,卻偏偏與公主長得像广鳍,于是被迫代替她去往敵國和親荆几。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內容