一杠袱、otter介紹
????阿里巴巴B2B公司,因?yàn)闃I(yè)務(wù)的特性窝稿,賣(mài)家主要集中在國(guó)內(nèi)霞掺,買(mǎi)家主要集中在國(guó)外,所以衍生出了杭州和美國(guó)異地機(jī)房的需求讹躯,同時(shí)為了提升用戶(hù)體驗(yàn)菩彬,整個(gè)機(jī)房的架構(gòu)為雙A,兩邊均可寫(xiě)潮梯,由此誕生了otter這樣一個(gè)產(chǎn)品骗灶。
????otter第一版本可追溯到04~05年,此次外部開(kāi)源的版本為第4版秉馏,開(kāi)發(fā)時(shí)間從2011年7月份一直持續(xù)到現(xiàn)在耙旦,目前阿里巴巴B2B內(nèi)部的本地/異地機(jī)房的同步需求基本全上了otte4。
目前同步規(guī)模:
- 同步數(shù)據(jù)量6億
- 文件同步1.5TB(2000w張圖片)
- 涉及200+個(gè)數(shù)據(jù)庫(kù)實(shí)例之間的同步
- 80+臺(tái)機(jī)器的集群規(guī)模
Otter項(xiàng)目地址:https://github.com/alibaba/otter
Otter文檔地址:https://github.com/alibaba/otter/wiki
二萝究、基礎(chǔ)概念
Pipeline:從源端到目標(biāo)端的整個(gè)過(guò)程描述免都,主要由一些同步映射過(guò)程組成;
Channel:同步通道帆竹,單向同步中一個(gè)Pipeline組成绕娘,在雙向同步中有兩個(gè)Pipeline組成;
DataMediaPair:根據(jù)業(yè)務(wù)表定義映射關(guān)系栽连,比如源表和目標(biāo)表险领,字段映射侨舆,字段組等;
DataMedia: 抽象的數(shù)據(jù)介質(zhì)概念绢陌,可以理解為數(shù)據(jù)表/mq隊(duì)列定義挨下;
DataMediaSource: 抽象的數(shù)據(jù)介質(zhì)源信息,補(bǔ)充描述DateMedia脐湾;
ColumnPair: 定義字段映射關(guān)系臭笆;
ColumnGroup: 定義字段映射組;
Node: 處理同步過(guò)程的工作節(jié)點(diǎn)秤掌,對(duì)應(yīng)一個(gè)jvm耗啦;
基礎(chǔ)概念直接的關(guān)系如圖:
三、架構(gòu)設(shè)計(jì)
下圖是關(guān)于Otter運(yùn)行原理圖:
根據(jù)上圖里面關(guān)鍵幾個(gè)元素進(jìn)行介紹
- db : 數(shù)據(jù)源以及需要同步到的庫(kù)
- Canal : 用戶(hù)獲取數(shù)據(jù)庫(kù)增量日志
- manager : 配置同步規(guī)則設(shè)置數(shù)據(jù)源同步源等
- zookeeper : 協(xié)調(diào)node進(jìn)行協(xié)調(diào)工作
- node : 負(fù)責(zé)任務(wù)處理机杜,即根據(jù)任務(wù)配置對(duì)數(shù)據(jù)源進(jìn)行解析并同步到目標(biāo)數(shù)據(jù)庫(kù)的操作帜讲。
原理描述:
基于Canal開(kāi)源產(chǎn)品,獲取數(shù)據(jù)庫(kù)增量日志數(shù)據(jù)椒拗。
a. 開(kāi)源鏈接地址:http://github.com/alibaba/canal
b. 推薦一個(gè)講Canal源碼的博客:http://www.tianshouzhi.com/api/tutorials/canal/380-
典型管理系統(tǒng)架構(gòu)似将,manager(web管理)+node(工作節(jié)點(diǎn))
a. manager運(yùn)行時(shí)推送同步配置到node節(jié)點(diǎn)
b. node節(jié)點(diǎn)將同步狀態(tài)反饋到manager上
基于zookeeper,解決分布式狀態(tài)調(diào)度的蚀苛,允許多node節(jié)點(diǎn)之間協(xié)同工作.
流程:
- 定義數(shù)據(jù)源
- 定義數(shù)據(jù)介質(zhì)
- 建立映射規(guī)則
- 建立字段映射
- 建立字段組
四在验、代碼結(jié)構(gòu)
工程結(jié)構(gòu):
包含三部分:Share | Node | Manager。 其中Share是Node和Manager共享的子系統(tǒng)堵未,并不是獨(dú)立部署的節(jié)點(diǎn)腋舌。Node和Manager是獨(dú)立部署的。
Node:一個(gè)獨(dú)立部署的節(jié)點(diǎn)渗蟹,比如兩個(gè)機(jī)房需要做通訊块饺,則每個(gè)機(jī)房至少要部署一個(gè)Node節(jié)點(diǎn)(不考慮HA的話),數(shù)據(jù)同步的過(guò)程實(shí)際上都發(fā)生在Node之間
Manager:管理的節(jié)點(diǎn)雌芽,邏輯上只有一個(gè)(一個(gè)Manager管理多個(gè)Node節(jié)點(diǎn))授艰,如果不考慮HA的話。負(fù)責(zé)管理同步的數(shù)據(jù)定義世落,包括數(shù)據(jù)源淮腾、Channel、PipeLine屉佳、數(shù)據(jù)映射等谷朝,各個(gè)Node節(jié)點(diǎn)從Manager處獲取并執(zhí)行這些信息。另外還有監(jiān)控等信息武花。
Manger各個(gè)子系統(tǒng)說(shuō)明:
- biz 對(duì)系統(tǒng)數(shù)據(jù)加載(即初始化時(shí)候執(zhí)行SQL初始化的系統(tǒng)表數(shù)據(jù))
- deployer 本地啟動(dòng)
- web 配置管理的webUI
Node各個(gè)子系統(tǒng)的說(shuō)明:
- Common:公共內(nèi)容定義
- Canal: Canal的封裝圆凰,Otter采用的是Embed的方式引入Canal(Canal有Embed和獨(dú)立運(yùn)行兩種模式)
- Deployer:內(nèi)置Jetty的啟動(dòng)
- etl: S.E.T.L 調(diào)度、處理的實(shí)現(xiàn)髓堪,是Otter最復(fù)雜送朱、也是最核心的部分。(Select干旁、Extract驶沼、Transform、Load)
SETL過(guò)程功能說(shuō)明如圖:
Share各個(gè)子系統(tǒng)的說(shuō)明:
- Common: 公共內(nèi)容定義
- Arbitrate: 用于Manager與Node之間争群、Node與Node之間的調(diào)度回怜、S.E.T.L幾個(gè)過(guò)程的調(diào)度等;
- Communication 數(shù)據(jù)傳輸?shù)牡讓踊槐。蠈拥腜ipe玉雾、一些調(diào)度等都是依賴(lài)于Communication的, 簡(jiǎn)單點(diǎn)說(shuō)它負(fù)責(zé)點(diǎn)對(duì)點(diǎn)的Event發(fā)送和接收
- etl:實(shí)際上并不負(fù)責(zé)ETL的具體實(shí)現(xiàn)轻要,只是一些接口&數(shù)據(jù)結(jié)構(gòu)的定義而已复旬,具體的實(shí)現(xiàn)在Node里面。
五冲泥、基本操作
這部分操作參考官網(wǎng):https://github.com/alibaba/otter/wiki
1. Manger和Node的配置和部署
2. 配置任務(wù)
六驹碍、本地調(diào)試環(huán)境搭建
1. 代碼下載和導(dǎo)入idea
環(huán)境搭建:
- 進(jìn)入$otter_home目錄
- 執(zhí)行:mvn clean install
- 導(dǎo)入maven項(xiàng)目。如果eclipse下報(bào)"Missing artifact com.oracle:ojdbc14:jar:10.2.0.3.0"凡恍,修改
{user.dir}/lib/ojdbc14-10.2.0.3.0.jar"為絕對(duì)路徑志秃,比如"d:/lib/ojdbc14-10.2.0.3.0.jar"
導(dǎo)入idea之后的結(jié)構(gòu)如圖:
本地調(diào)試環(huán)境搭建:
-
- 配置Manager
- 1.1 打開(kāi)Manger下的deployer模塊下的otter.properties文件
-
1.2 修改如下屬性:
圖片.png
圖片.png- 1.3 運(yùn)行OtterManagerLauncher.java 啟動(dòng)Manager
- 1.4 訪問(wèn)http://localhost:8080,配置集群
- 1.4 訪問(wèn)http://localhost:8080嚼酝,配置Node,ip為本機(jī)ip
-
配置Node
2.1 打開(kāi)node下的deployer模塊下的otter.properties文件
-
2.2 修改屬性浮还,這個(gè)node要連接去那個(gè)mananger,我們是本機(jī)調(diào)試寫(xiě)本機(jī)Ip,或者127.0.0.1
圖片.png -
2.3 運(yùn)行OtterLauncher.java時(shí)候設(shè)置運(yùn)行參數(shù)-Dnid=id闽巩,這個(gè)id為manager钧舌,web界面中配置node對(duì)應(yīng)的id。
如圖:
圖片.png 2.4 運(yùn)行OtterLauncher.java
2.5 啟動(dòng)完成之后可以在manager的web界面看到node已經(jīng)啟動(dòng)
-
啟動(dòng)
- 3.1 配置同步任務(wù)
- 3.2 啟動(dòng)同步任務(wù)涎跨,修改數(shù)據(jù)源的表延刘,可以查看輸出端的表是否有數(shù)據(jù)同步過(guò)去了。
七六敬、改造支持kafka的代碼介紹
修改分為三大部分:
1. manager端的改造
- 修改頁(yè)面 addDataSource.vm 添加 Kafka 選項(xiàng)碘赖;
- dbCheck.js 引用了 Hello.js,這里應(yīng)用了dwr技術(shù),通過(guò)js去調(diào)用后端java代碼外构。
配置文件manager中的biz模塊下的otter-manager-service.xml文件:
<bean id="dataSourceChecker" class="com.alibaba.otter.manager.biz.utils.DataSourceChecker">
<property name="dataMediaSourceService">
<ref bean="dataMediaSourceService" />
</property>
<property name="dataSourceCreator">
<ref bean="dataSourceCreator" />
</property>
<dwr:remote javascript="Hello">
<dwr:include method="check" />
<dwr:include method="checkMap" />
<dwr:include method="checkNamespaceTables" />
</dwr:remote>
</bean>
dbCheck.js調(diào)用DataSourceChecker.java中的方法去校驗(yàn)數(shù)據(jù)庫(kù)是否可用普泡,增加kafka的校驗(yàn)方法。
dwr簡(jiǎn)單教程
editDataSource.vm頁(yè)面類(lèi)型選項(xiàng)添加 Kafka 類(lèi)型
DataSourceList.java中增加kafka數(shù)據(jù)源處理
addDataMedia.vm頁(yè)面中的配置kafka的主題時(shí)如何驗(yàn)證主題是否存在审编?
addDataMedia.vm頁(yè)面中的kafka的主題無(wú)法驗(yàn)證是否存在
-
addColumnPair.vm中如果存在kafka的數(shù)據(jù)源時(shí)候撼班,列直接復(fù)制mysql表的列。
數(shù)據(jù)來(lái)源:AddColumnPairGroup.java中的execute方法調(diào)用buildColumnPairFromDataMedia方法垒酬。
在kafka中沒(méi)有列的概念砰嘁,要把數(shù)據(jù)源的列直接賦值給kafka件炉,這一步操作需要在幾個(gè)類(lèi)中進(jìn)行:
com.alibaba.otter.manager.web.home.module.screen.AddColumnPair
com.alibaba.otter.manager.web.home.module.screen.AddColumnPairGroup
- ColumnPairAction.java 中的doSave方法中kafka沒(méi)有所謂的列名,這里要處理一下進(jìn)行保存矮湘。
- 添加 Kafka 數(shù)據(jù)源類(lèi)型
com.alibaba.otter.shared.common.model.config.data.kafka.KafkaDataMedia.java
com.alibaba.otter.shared.common.model.config.data.kafka.KafkaMediaSource.java
到這里斟冕,前端的改造基本完成∶逖簦可以通過(guò)manager的web界面新建出kafka的數(shù)據(jù)源磕蛇。
2. node端的改造 selector部分
- com.alibaba.otter.node.etl.select.selector.MessageParser類(lèi)負(fù)責(zé)解析數(shù)據(jù)對(duì)象解析,將對(duì)應(yīng)canal送出來(lái)的Entry對(duì)象解析為otter使用的內(nèi)部對(duì)象十办。這里對(duì)配置的數(shù)據(jù)源做了一個(gè)檢查:源和目標(biāo)的庫(kù)名表名是否是一致秀撇。這里需要排除kafka數(shù)據(jù)源做判斷。
3. node端的改造 transform部分
- node中的T部分工作的代碼在com.alibaba.otter.node.etl.transform包下向族。
- 新增com.alibaba.otter.node.etl.transform.transformer.NoStructTransformer.java類(lèi)處理RowData轉(zhuǎn)到Kafka需要的數(shù)據(jù)格式呵燕。
- 在com.alibaba.otter.node.etl.transform.OtterTransformerFactory類(lèi)中的lookup方法中增加針對(duì)KafkaDataMedia的處理。
- 在node中的etl模塊中的otter-node-transform.xml增加NoStructTransformer的注入配置件相。
4. node端的改造 load部分
- 增加Kafka數(shù)據(jù)源的方言處理類(lèi):com.alibaba.otter.node.etl.common.db.dialect.kafka.KafkaDialect虏等,構(gòu)建kafka的producer來(lái)發(fā)送消息。
- com.alibaba.otter.node.etl.load.loader.db.DbLoadAction類(lèi)中的doCall方法增加對(duì)kafka方言的判斷和處理适肠。
- kafka分區(qū)有序霍衫,所以在發(fā)送binlog數(shù)據(jù)時(shí)候需要根據(jù)數(shù)據(jù)情況設(shè)置好分區(qū)的方式,可以通過(guò)設(shè)定分區(qū)字段侯养,然后自定義分區(qū)去完成
至此整體改造完成敦跌。
可以通過(guò)如下步驟進(jìn)行驗(yàn)證:
- manager啟動(dòng)
- manager中配置mysql=>kafka的任務(wù)
- 啟動(dòng)node
- manager中啟動(dòng)同步任務(wù)
- mysql中修改數(shù)據(jù)
- 在kafka的customer中查看是否有消息發(fā)送過(guò)來(lái)