CanalAdapter同步ES7
預(yù)準(zhǔn)備
- Mysql5.7
- ElasticSearch7.12.1集群或單擊(7.x +)
- 一個(gè)數(shù)據(jù)庫(kù)和一個(gè)表結(jié)構(gòu)
1.下載
canal.deployer-1.1.5.tar.gz對(duì)應(yīng)的是canal的server端珍语,負(fù)責(zé)訂閱并解析Mysql-Binlog
canal.adapter-1.1.5.tar.gz對(duì)應(yīng)的是適配器蝶缀,負(fù)責(zé)將server的binlog轉(zhuǎn)換并發(fā)送給對(duì)應(yīng)的應(yīng)用
canal.admin-1.1.5.tar.gz一個(gè)可視化webui可以不安裝
額外需要下載v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)
分別解壓縮后敛滋,將v1.1.5-alpha-2解壓縮文件夾下plugin文件夾中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替換掉release版本的plugin文件的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar
2. 開(kāi)啟Mysql-Binlog
進(jìn)入mysql終端執(zhí)行
show variables like 'log_bin';
如果是ON就代表已經(jīng)開(kāi)啟蔬崩。
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
3. 修改配置
- 先修改canal.deployer的配置
參考官方中文文檔: https://github.com/alibaba/canal/wiki/QuickStart - 修改canal.adapter配置 conf/application.yml
更多請(qǐng)參考官方中文文檔:https://github.com/alibaba/canal/tree/master/client-adapter
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# 省略.... 默認(rèn)即可
srcDataSources: # 注意打開(kāi)注釋?zhuān)缓笠蜕厦鎸?duì)齊否則會(huì)報(bào)錯(cuò)(但是卻又不告訴你是格式問(wèn)題..)
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
username: root
password: 123456
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: es7 # 這里要改為es7 他會(huì)去es7下去找配置
key: exampleKey #這里是全量導(dǎo)入時(shí)的taskid
hosts: es1:9201,es2:9202,es3:9203 # 我這里是集群,如果單機(jī)寫(xiě)一個(gè)就行ip:port
properties:
mode: rest # restful 模式
cluster.name: es-cluster # 集群名字 GET _cat/health?v 可以看到對(duì)應(yīng)的名字
- 修改es7下模板文件 conf/es7/mytest_user.yml
dataSourceKey: defaultDS #和上面canal.conf.srcDataSources.defaultDS要一樣
outerAdapterKey: exampleKey #和上面canal.conf.canalAdapters.instance.groups.outerAdapters.key要一樣
destination: example #和上面canal.conf.canalAdapters.instance要一樣
groupId: g1 #和上面canal.conf.canalAdapters.instance.groups.groupId要一樣
esMapping:
_index: test #索引名稱(chēng)
_id: _id #documentid
_type: _doc # type
# upsert: true
# pk: id
sql: "select a.id as _id, a.name_cn, a.name_en, a.email
from user a" # 查詢(xún)的sql返回的結(jié)構(gòu)要使用as別名和es的filed對(duì)應(yīng)
# objFields:
# _labels: array:;
commitBatch: 3000 #批量提交數(shù)量
4. 創(chuàng)建對(duì)應(yīng)的索引
PUT test
{
"settings":{
"number_of_shards":1,
"number_of_replicas":2
},
"mappings":{
"properties":{
"name_cn" : {
"type" : "keyword"
},
"name_en" : {
"type" : "keyword"
},
"email" : {
"type" : "keyword"
}
}
}
}
5. 啟動(dòng)
均在對(duì)應(yīng)目錄下 /bin/startup.sh
- 先啟動(dòng)deployer,查看/logs/canal/canal.log 無(wú)異常后再查看/logs/example/example.log均無(wú)異常則啟動(dòng)成功
- 啟動(dòng)adapter,查看/logs/adapter/adapter.log 無(wú)異常則啟動(dòng)成功
6.首次全量導(dǎo)入
curl http://localhost:8081/etl/es7/exampleKey/mytest_user.yml -X POST
//執(zhí)行完對(duì)應(yīng)在/logs/adapter/adapter.log會(huì)有批量導(dǎo)入成功的日志
start etl to import data to index: test
數(shù)據(jù)全量導(dǎo)入完成, 一共導(dǎo)入 1585 條數(shù)據(jù), 耗時(shí): 643
/**
* ETL curl http://127.0.0.1:8081/etl/rdb/oracle1/mytest_user.yml -X POST
*
* @param type 類(lèi)型 hbase, es
* @param key adapter key
* @param task 任務(wù)名對(duì)應(yīng)配置文件名 mytest_user.yml
* @param params etl where條件參數(shù), 為空全部導(dǎo)入,匹配etlCondition中的參數(shù),使用;分號(hào)多個(gè)值
*/
@PostMapping("/etl/{type}/{key}/{task}")
public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
@RequestParam(name = "params", required = false) String params)
7.增量添加更新刪除自動(dòng)同步
- 如果以上操作在程序運(yùn)行期間無(wú)任何異常則自動(dòng)就開(kāi)啟了同步添吗,可以手動(dòng)添加更新刪除一個(gè)記錄試試。
- 如果出現(xiàn)提示表異常份名,可能是因?yàn)閹?kù)太多某些庫(kù)有問(wèn)題
修改/deployer/conf/example/instance.properties的匹配策略
#只同步test庫(kù)的全部表數(shù)據(jù)
canal.instance.filter.regex=test\\..*
如果解決不了則建議去看官方的issue碟联,https://github.com/alibaba/canal/issues妓美。