一遥昧、背景
在上一篇文章中,我們使用 Canal Admin 搭建了Canal Server 集群狱窘,在這篇文章中隐轩,我們使用上篇文章的基礎(chǔ)窗慎,將消息發(fā)送到kafka消息隊(duì)列中图云。
二惯悠、需要修改的地方
以下 配置文件的修改,都是在 Canal Admin 上修改的竣况。
1克婶、canal.properties 配置文件修改
1、修改canal.serverMode的值
2丹泉、修改kafka配置
2鸠补、修改 instance.propertios 配置文件
3、canal發(fā)消息到mq性能優(yōu)化
影響性能的幾個(gè)參數(shù):
-
canal.instance.memory.rawEntry = true
(表示是否需要提前做序列化嘀掸,非flatMessage場(chǎng)景需要設(shè)置為true) -
canal.mq.flatMessage = false
(false代表二進(jìn)制協(xié)議,true代表使用json格式规惰,二進(jìn)制協(xié)議有更好的性能) -
canal.mq.dynamicTopic
(動(dòng)態(tài)topic配置定義睬塌,可以針對(duì)不同表設(shè)置不同的topic,在flatMessage模式下可以提升并行效率) -
canal.mq.partitionsNum/canal.mq.partitionHash
(分區(qū)配置歇万,對(duì)寫入性能有反作用揩晴,不過(guò)可以提升消費(fèi)端的吞吐)
參考鏈接:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance
三、kafka接收消息
1贪磺、canal 發(fā)送過(guò)來(lái)的消息
/**
* canal 發(fā)送過(guò)來(lái)的消息
*
* @author huan.fu 2021/9/2 - 下午4:06
*/
@Getter
@Setter
@ToString
public class CanalMessage {
/**
* 測(cè)試得出 同一個(gè)事物下產(chǎn)生多個(gè)修改硫兰,這個(gè)id的值是一樣的。
*/
private Integer id;
/**
* 數(shù)據(jù)庫(kù)或schema
*/
private String database;
/**
* 表名
*/
private String table;
/**
* 主鍵字段名
*/
private List<String> pkNames;
/**
* 是否是ddl語(yǔ)句
*/
private Boolean isDdl;
/**
* 類型:INSERT/UPDATE/DELETE
*/
private String type;
/**
* binlog executeTime, 執(zhí)行耗時(shí)
*/
private Long es;
/**
* dml build timeStamp, 同步時(shí)間
*/
private Long ts;
/**
* 執(zhí)行的sql,dml sql為空
*/
private String sql;
/**
* 數(shù)據(jù)列表
*/
private List<Map<String, Object>> data;
/**
* 舊數(shù)據(jù)列表,用于update,size和data的size一一對(duì)應(yīng)
*/
private List<Map<String, Object>> old;
}
2寒锚、監(jiān)聽(tīng)消息
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "customer", groupId = "canal-kafka-springboot-001", concurrency = "5")
public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {
log.info(Thread.currentThread().getName() + ":" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接收到kafka消息,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value());
CanalMessage canalMessage = JSON.parseObject(record.value(), CanalMessage.class);
log.info("\r=================================");
log.info("接收到的原始 canal message為: {}", record.value());
log.info("轉(zhuǎn)換成Java對(duì)象后轉(zhuǎn)換成Json為 : {}", JSON.toJSONString(canalMessage));
ack.acknowledge();
}
}
3劫映、獲取消息
四、MQ配置相關(guān)的參數(shù)
參數(shù)名 | 參數(shù)說(shuō)明 | 默認(rèn)值 |
---|---|---|
canal.mq.servers | kafka為bootstrap.servers rocketMQ中為nameserver列表 | 127.0.0.1:6667 |
canal.mq.retries | 發(fā)送失敗重試次數(shù) | 0 |
canal.mq.batchSize | kafka為ProducerConfig.BATCH_SIZE_CONFIG rocketMQ無(wú)意義 |
16384 |
canal.mq.maxRequestSize | kafka為ProducerConfig.MAX_REQUEST_SIZE_CONFIG rocketMQ無(wú)意義 |
1048576 |
canal.mq.lingerMs | kafka為ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建議將該值調(diào)大, 如: 200 rocketMQ無(wú)意義 |
1 |
canal.mq.bufferMemory | kafka為ProducerConfig.BUFFER_MEMORY_CONFIG rocketMQ無(wú)意義 |
33554432 |
canal.mq.acks | kafka為ProducerConfig.ACKS_CONFIG rocketMQ無(wú)意義 |
all |
canal.mq.kafka.kerberos.enable | kafka為ProducerConfig.ACKS_CONFIG rocketMQ無(wú)意義 |
false |
canal.mq.kafka.kerberos.krb5FilePath | kafka kerberos認(rèn)證 rocketMQ無(wú)意義 | ../conf/kerberos/krb5.conf |
canal.mq.kafka.kerberos.jaasFilePath | kafka kerberos認(rèn)證 rocketMQ無(wú)意義 | ../conf/kerberos/jaas.conf |
canal.mq.producerGroup | kafka無(wú)意義 rocketMQ為ProducerGroup名 | Canal-Producer |
canal.mq.accessChannel | kafka無(wú)意義 rocketMQ為channel模式刹前,如果為aliyun則配置為cloud | local |
--- | --- | --- |
canal.mq.vhost= | rabbitMQ配置 | 無(wú) |
canal.mq.exchange= | rabbitMQ配置 | 無(wú) |
canal.mq.username= | rabbitMQ配置 | 無(wú) |
canal.mq.password= | rabbitMQ配置 | 無(wú) |
canal.mq.aliyunuid= | rabbitMQ配置 | 無(wú) |
--- | --- | --- |
canal.mq.canalBatchSize | 獲取canal數(shù)據(jù)的批次大小 | 50 |
canal.mq.canalGetTimeout | 獲取canal數(shù)據(jù)的超時(shí)時(shí)間 | 100 |
canal.mq.parallelThreadSize | mq數(shù)據(jù)轉(zhuǎn)換并行處理的并發(fā)度 | 8 |
canal.mq.flatMessage | 是否為json格式 如果設(shè)置為false,對(duì)應(yīng)MQ收到的消息為protobuf格式 需要通過(guò)CanalMessageDeserializer進(jìn)行解碼 | false |
--- | --- | --- |
canal.mq.topic | mq里的topic名 | 無(wú) |
canal.mq.dynamicTopic | mq里的動(dòng)態(tài)topic規(guī)則, 1.1.3版本支持 | 無(wú) |
canal.mq.partition | 單隊(duì)列模式的分區(qū)下標(biāo)泳赋, | 1 |
canal.mq.partitionsNum | 散列模式的分區(qū)數(shù) | 無(wú) |
canal.mq.partitionHash | 散列規(guī)則定義 庫(kù)名.表名 : 唯一主鍵,比如mytest.person: id 1.1.3版本支持新語(yǔ)法喇喉,見(jiàn)下文 |
參考文檔:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
五祖今、MQ接收binlog代碼
https://gitee.com/huan1993/spring-cloud-parent/tree/master/canal/canal-kafka-consumer
六、參考文章
1拣技、canal 發(fā)送binlog到mq中性能測(cè)試.
2千诬、canal發(fā)送消息到kafka中