我之前寫了一篇 canal server TCP 模式部署筆記 http://www.reibang.com/p/23ae08609e2d. 今天把它改造成了 Kafka 模式,整個過程還是很簡單的雷滋,需要提前準備 zookeeper 環(huán)境 http://www.reibang.com/p/558c591469b0 以及 Kafka 環(huán)境 https://kafka.apachecn.org/quickstart.html
1. 修改 conf/canal.properties
canal.zkServers = 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
canal.mq.flatMessage = false
2. 修改 conf/content/instance.properties
# 如果你之前按照官方文檔搭建的 TCP 模式的 canal server
# 那么 canal.instance.filter.regex = .\*\\\\..\* 需要改成 canal.instance.filter.regex = .*\\..*
# 不符合 dynamicTopic 規(guī)則的 binlog 推送到默認 topic : schemas
canal.mq.topic = schemas
# content database 的 binlog 推送到 content topic
canal.mq.dynamicTopic = content
# topic 分區(qū)
canal.mq.partition = 0
3. 補充
1. Kafka 模式與 TCP 模式的 canal server 啟動過程是沒有區(qū)別的不撑,都是根據(jù)規(guī)則確定 dump binlog 的位點信息,具體分析可以參考 http://www.reibang.com/p/23ae08609e2d
2. Kafka 模式與 TCP 模式最大的區(qū)別在于 dump binlog 的時機
- 在 TCP 模式下 canal server dump binlog 是由 canal client 請求觸發(fā)的晤斩,如果沒有 canal client 那么 canal server 的位點一直是滯后于 mysql server 的
- 在 Kafka 模式下 canal server 是定時主動 dump binlog, 在獲取到新的 binlog 之后將其推送到 Kafka 對應的 topic, 如果 topic 不存在則自動創(chuàng)建 topic, 然后更新 canal server 存儲的位點信息 last position
canal client 通過自己維護的 topic/partition/offset 信息訂閱 Kafka 數(shù)據(jù)焕檬,定時主動查詢 Kafka; 整個過程中 canal server 與 canal client 沒有任何交互