Create the ES index:
PUT <index_name>
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2
  }
}
PUT /<index_name>/_mapping/<type_name>
{
  "properties": {
    "custNo": {
      "type": "text"
    },
    "ifEver": {
      "type": "text"
    },
    "ifApp": {
      "type": "text"
    }
  }
}
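To double-check that the mapping was applied, read it back. A minimal check, assuming the index was created as canal with type testTable so that it lines up with the _index/_type used in the adapter mapping further down:

GET /canal/_mapping/testTable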
canal-server:
- conf/canal.properties
The main thing to configure here is the delivery mode, which is tcp (canal.serverMode = tcp):
#################################################
######### common argument #############
#################################################
#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.id = 1
canal.ip = 172.16.2.74
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers = 172.16.2.74:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
#################################################
######### destinations #############
#################################################
canal.destinations = example2
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =
- conf/xxxx/instance.properties (xxxx is the instance name; the adapter configuration below must reference the same name. Mine is called example2.)
The key settings here are which database canal listens to (canal.instance.master.address) together with its username and password, and which tables in that database are captured (canal.instance.filter.regex):
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=172.16.2.207:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=byxf1qaz
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=dcp.dcp_db_config
#canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
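For reference, canal.instance.filter.regex is a comma-separated list of Perl-style regexes matched against schema.table. A few common forms (the second table name is only a hypothetical example):

# a single table
canal.instance.filter.regex=dcp.dcp_db_config
# several explicit tables
canal.instance.filter.regex=dcp.dcp_db_config,dcp.dcp_other_table
# every table under the dcp schema
canal.instance.filter.regex=dcp\\..*
# every table in every schema
canal.instance.filter.regex=.*\\..*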
canal-adapter:
- conf/application.yml
1.1 Configure the source data sources to sync from (srcDataSources).
1.2 Configure the mapping between canal-server instances and the concrete "consumer" adapters (canalAdapters); multiple mappings can be declared here.
    example2 is the instance I started on the admin-server, consumed by the es adapter in consumer group g1.
1.3 Multiple "consumers" can be configured under outerAdapters.
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
srcDataSources:
  defaultDS:
    url: jdbc:mysql://172.16.2.207:3306/dcp?useUnicode=true
    username: root
    password: byxf1qaz
canalAdapters:
- instance: example2
  groups:
  - groupId: g1
    outerAdapters:
    - name: logger
    - name: es
      hosts: 172.16.2.74:9300
      properties:
        cluster.name: okami-application # ES cluster name
# - instance: example # canal instance Name or mq topic name
# groups:
# - groupId: g1
# outerAdapters:
# - name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
# - name: es
# hosts: 127.0.0.1:9300
# properties:
# cluster.name: elasticsearch
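Note that hosts: 172.16.2.74:9300 points at the Elasticsearch transport port (9300), not the HTTP port (9200). If your application.yml also contains the canal.conf mode and canalServerHost keys from the default template, mode should stay tcp and canalServerHost should point at canal.ip:canal.port from canal.properties (172.16.2.74:11111 here). Once the adapter is running, a full re-import of one mapping can be triggered over its REST interface; a minimal sketch, assuming the default server.port 8081 and an adapter version that exposes the /etl endpoint (the file name must match the es mapping file described below):

POST http://127.0.0.1:8081/etl/es/xxx.yml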
- conf/es/xxx.yml (the file name can be anything; it does not have to match anything else)
dataSourceKey: defaultDS
destination: example2
groupId: g1
esMapping:
  _index: canal        # ES index name
  _type: testTable     # ES type (table) name
  _id: custNo
  sql: "select db_code as custNo, db_name as ifApp, db_user as ifEver from dcp.dcp_db_config"