canal實(shí)現(xiàn)從mysql同步數(shù)據(jù)到es

es建立索引:

PUT 索引庫(kù)名稱(chēng)
{
      "settings": {
          "number_of_shards": 3,
          "number_of_replicas": 2
        }
  }
GET /索引庫(kù)/_mapping/表名
{
  "properties":
  {
    "custNo":
    {
      "type":"text"  
    },
    "ifEver":
    {
      "type":"text"  
    },
    "ifApp":
    {
      "type":"text"  
    }
  }
}

canal-server:

  1. conf/canal.properties

這里主要配置消費(fèi)模式為tcp(canal.serverMode = tcp)

#################################################
#########       common argument     ############# 
#################################################
#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.id = 1
canal.ip = 172.16.2.74
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers = 172.16.2.74:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000


#################################################
#########       destinations        ############# 
#################################################
canal.destinations = examplesss
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########            MQ              #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =
  1. /conf/xxxx/instance.properties xxx為實(shí)例名稱(chēng), 后面adapter配置屬性文件時(shí)需要對(duì)應(yīng)空另。我這里叫做example2

這里主要配置canal要監(jiān)聽(tīng)哪個(gè)數(shù)據(jù)庫(kù)(canal.instance.master.address)及用戶(hù)名密碼,

對(duì)數(shù)據(jù)庫(kù)中的哪張表數(shù)據(jù)進(jìn)行過(guò)濾(canal.instance.filter.regex)

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=172.16.2.207:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=byxf1qaz
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=dcp.dcp_db_config
#canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

Canal-adapter:

  1. Conf/application.yml

    1.1 主要配置要處理的數(shù)據(jù)源信息(srcDataSources)掷贾。

    1.2 配置canal-server中的instance實(shí)例與具體“消費(fèi)者”adapter之間的關(guān)聯(lián)關(guān)系(canalAdapters), 這里可以配置多個(gè)關(guān)聯(lián)關(guān)系蜒茄。

     example2是我在admin-server上起的一個(gè)實(shí)例, 有消費(fèi)組g1中的es消費(fèi)厂捞。
    

    1.3 outAdapters下可以配置多個(gè)“消費(fèi)者”输玷,

  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS: 
      url: jdbc:mysql://172.16.2.207:3306/dcp?useUnicode=true
      username: root
      password: byxf1qaz
  canalAdapters:
  - instance: example2
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es
        hosts: 172.16.2.74:9300
        properties:
          cluster.name: okami-application #es集群名稱(chēng)okami-application
          
  #  - instance: example # canal instance Name or mq topic name
#    groups:
#    - groupId: g1
#      outerAdapters:
#      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300
#        properties:
#          cluster.name: elasticsearch
  1. conf/es/xx.yml xxx名字可以隨意,這里不做匹配
dataSourceKey: defaultDS
destination: example2
groupId: g1
esMapping:
  _index: canal   //es索引庫(kù)名
  _type: testTable  //es中的表名
  _id: custNo
  sql: "select db_code as custNo, db_name as ifApp, db_user as ifEver from dcp.dcp_db_config"
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末靡馁,一起剝皮案震驚了整個(gè)濱河市欲鹏,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌臭墨,老刑警劉巖赔嚎,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異胧弛,居然都是意外死亡尤误,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)叶圃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)袄膏,“玉大人,你說(shuō)我怎么就攤上這事掺冠〕凉荩” “怎么了码党?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)斥黑。 經(jīng)常有香客問(wèn)我揖盘,道長(zhǎng),這世上最難降的妖魔是什么锌奴? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任兽狭,我火速辦了婚禮,結(jié)果婚禮上鹿蜀,老公的妹妹穿的比我還像新娘箕慧。我一直安慰自己,他們只是感情好茴恰,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布颠焦。 她就那樣靜靜地躺著,像睡著了一般往枣。 火紅的嫁衣襯著肌膚如雪伐庭。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 48,970評(píng)論 1 284
  • 那天分冈,我揣著相機(jī)與錄音圾另,去河邊找鬼。 笑死雕沉,一個(gè)胖子當(dāng)著我的面吹牛集乔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蘑秽,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼饺著,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了肠牲?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤靴跛,失蹤者是張志新(化名)和其女友劉穎缀雳,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體梢睛,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡肥印,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了绝葡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片深碱。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖藏畅,靈堂內(nèi)的尸體忽然破棺而出敷硅,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布绞蹦,位于F島的核電站力奋,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏幽七。R本人自食惡果不足惜景殷,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望澡屡。 院中可真熱鬧猿挚,春花似錦、人聲如沸驶鹉。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)梁厉。三九已至辜羊,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間词顾,已是汗流浹背八秃。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留肉盹,地道東北人昔驱。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像上忍,于是被迫代替她去往敵國(guó)和親骤肛。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容