Canal是阿里巴巴集團(tuán)提供的一個(gè)開源產(chǎn)品占贫,能夠通過解析數(shù)據(jù)庫的增量日志,提供增量數(shù)據(jù)的訂閱和消費(fèi)功能掺涛。當(dāng)您需要將MySQL中的增量數(shù)據(jù)同步至ES時(shí)巴席,可通過Canal來實(shí)現(xiàn)大猛。
準(zhǔn)備環(huán)境(以下幾個(gè)的版本是我用的扭倾,不是強(qiáng)制使用):
- JDK(1.8)
- Mysql (5.7.30)
- ElasticSearch (7.7.0)
- Canal Server(1.1.5)
- Canal-adapter(1.1.5)(由于使用的es7,1.1.4及以下不支持es7)
1 mysql
1.1 開啟mysql的binlog寫入功能
執(zhí)行show master staus;如果顯示為空挽绩,則是沒有配置主從膛壹,修改my.cnf并重啟mysql,https://github.com/alibaba/canal/wiki/AdminGuide:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
1.2 創(chuàng)建數(shù)據(jù)庫es_test
1.3 創(chuàng)建表user
CREATE TABLE `user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(256) COLLATE utf8mb4_unicode_ci NOT NULL,
`detail` text COLLATE utf8mb4_unicode_ci NOT NULL,
`age` int(3) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
2 elasticsearch
2.1 創(chuàng)建索引
PUT user
{
"acknowledged" : true,
"shards_acknowledged" : true,
"index" : "user"
}
2.2 創(chuàng)建mapping
PUT user
{
"settings": {
"index": {
"number_of_shards": "5",
"number_of_replicas": "1"
}
},
"mappings": {
"properties": {
"age": {
"type": "integer"
},
"id": {
"type": "long"
},
"name": {
"type": "text",
"analyzer": "ik_smart"
},
"detail": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
執(zhí)行GET /user,查看創(chuàng)建的索引:
GET /user
{
"user" : {
"aliases" : { },
"mappings" : {
"properties" : {
"age" : {
"type" : "integer"
},
"detail" : {
"type" : "text",
"analyzer" : "ik_max_word"
},
"id" : {
"type" : "long"
},
"name" : {
"type" : "text",
"analyzer" : "ik_smart"
}
}
},
"settings" : {
"index" : {
"creation_date" : "1591092311266",
"number_of_shards" : "5",
"number_of_replicas" : "1",
"uuid" : "ZSl8PnqcSriSPtNNyS8qdA",
"version" : {
"created" : "7070099"
},
"provided_name" : "user"
}
}
}
}
3 Canal-server
3.1 下載Canal-deployer,本文使用Canal-deployer 1.1.5版本唉堪。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz
3.2 解壓canal.deployer-1.1.5-SNAPSHOT.tar.gz模聋。
mkdir canal.deployer-1.1.5
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C ./canal.deployer-1.1.5/
3.3 修改conf/example/instance.properties文件。
vim conf/example/instance.properties
配置mysql的相關(guān)信息:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#table regex
canal.instance.filter.regex=es_test\\..*
數(shù)據(jù)表的過濾條件https://github.com/alibaba/canal/wiki/%E5%B8%B8%E8%A7%81%E9%97%AE%E9%A2%98%E8%A7%A3%E7%AD%94
使用:wq命令保存文件并退出vim模式巨坊。
3.4 啟動(dòng)Canal-server撬槽,并查看日志此改。
./bin/startup.sh
cat logs/canal/canal.log
4 Canal-adapter
4.1 下載Canal-adapter,本文使用Canal-adapter1.1.5版本趾撵。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.adapter-1.1.5-SNAPSHOT.tar.gz
4.2 解壓canal.adapter-1.1.5-SNAPSHOT.tar.gz。
mkdir canal.adapter-1.1.5
tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C ./canal.adapter-1.1.5/
4.3 修改conf/application.yml文件。
vim conf/application.yml
修改以下配置:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/es_test?useUnicode=true
username: x
password: x123
- name: logger
key: logger1
- name: es7
key: es71 #這個(gè)很重要占调,必須設(shè)置唯一暂题,不然有坑
hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
security.auth: elastic:123456 # only used for rest mode
cluster.name: xzp-application
配置項(xiàng) | 說明 |
---|---|
canal.conf.canalServerHost | canalDeployer訪問地址。保持默認(rèn)(127.0.0.1:11111)即可究珊。 |
canal.conf.srcDataSources.defaultDS.url | jdbc:mysql://<MySQL地址>:<端口>/<數(shù)據(jù)庫名稱>?useUnicode=true |
canal.conf.srcDataSources.defaultDS.username | MySQL數(shù)據(jù)庫的賬號(hào)名稱 |
canal.conf.srcDataSources.defaultDS.password | MySQL數(shù)據(jù)庫的密碼 |
canal.conf.canalAdapters.groups.outerAdapters.hosts | 定位到name:es的位置薪者,將hosts替換為<ES的地址>:<端口> |
canal.conf.canalAdapters.groups.outerAdapters.mode | 必須設(shè)置為rest。 |
canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth | <ES的賬號(hào)>:<密碼>剿涮。例如elastic:es_password言津。 |
canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name | es集群的名字 |
使用:wq命令保存文件并退出vim模式。
4.4 在conf/es/目錄下新建user.yml,定義MySQL數(shù)據(jù)到ES數(shù)據(jù)的映射字段取试。
vim user.yml
dataSourceKey: defaultDS
outerAdapterKey: es71
destination: example
groupId: g1
esMapping:
_index: user
_type: _doc
_id: _id
upsert: true
# pk: id
sql: "select id as _id,id,name,detail,age from user"
etlCondition: "where id<='{0}'" #etl的條件參數(shù)悬槽,可以將之前沒能同步的數(shù)據(jù)同步,數(shù)據(jù)量大的話可以用logstash
commitBatch: 3000
使用:wq命令保存文件并退出vim模式瞬浓。
配置項(xiàng) | 說明 |
---|---|
esMapping._index | 創(chuàng)建的索引的名稱,user |
esMapping._type | 默認(rèn)_doc即可 |
esMapping._id | 需要同步到ES實(shí)例的文檔的id初婆,可自定義。本文使用_id猿棉。 |
esMapping.sql | SQL語句磅叛,用來查詢需要同步到ES中的字段。本文使用select id as _id,id,name,detail,age from user |
4.5 啟動(dòng)Canal-adapter服務(wù)萨赁,并查看日志弊琴。
./bin/startup.sh
cat logs/adapter/adapter.log
5 adapter管理REST接口
5.1 查詢所有訂閱同步的canal instance或MQ topic
http://ip:8081/destinations
[{"destination":"example","status":"on"}]
5.2 查詢數(shù)據(jù)同步開關(guān)狀態(tài)
查看指定 canal instance/MQ topic 的數(shù)據(jù)同步開關(guān)狀態(tài)
http://ip:8081/syncSwitch/example
{"stauts":"on"}
5.3 打開數(shù)據(jù)同步開關(guān)
針對 example 這個(gè)canal instance/MQ topic 進(jìn)行開關(guān)操作. off代表關(guān)閉, instance/topic下的同步將阻塞或者斷開連接不再接收數(shù)據(jù), on代表開啟
注: 如果在配置文件中配置了 zookeeperHosts 項(xiàng), 則會(huì)使用分布式鎖來控制HA中的數(shù)據(jù)同步開關(guān), 如果是單機(jī)模式則使用本地鎖來控制開關(guān)
PUT http://ip:8081/syncSwitch/example/on
{
"code": 20000,
"message": "實(shí)例: example 開啟同步成功"
}
改為/syncSwitch/example/off是關(guān)閉
{
"code": 20000,
"message": "實(shí)例: example 關(guān)閉同步成功"
}
5.4 手動(dòng)ETL
導(dǎo)入數(shù)據(jù)到指定類型的庫, 如果params參數(shù)為空則全表導(dǎo)入, 參數(shù)對應(yīng)的查詢條件在配置中的etlCondition指定
#/etl/es7/{key}/user.yml
#curl http://ip:8081/etl/es7/es71/user.yml -X POST -d "params=50"
curl http://ip:8081/etl/es7/es71/user.yml -X POST
{
"succeeded": true,
"resultMessage": "導(dǎo)入ES 數(shù)據(jù):59 條"
}
6 出現(xiàn)的問題
6.1 數(shù)據(jù)沒同步
執(zhí)行show master staus;如果顯示為空,則是沒有配置主從杖爽,修改my.cnf并重啟mysql,https://github.com/alibaba/canal/wiki/AdminGuide:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1