利用Canal投遞MySQL Binlog到Kafka

Update:
Canal與Camus的結(jié)合使用嚎于,見http://www.reibang.com/p/4c4213385368

Canal是阿里開源的一個比較有名的Java中間件询件,主要作用是接入數(shù)據(jù)庫(MySQL)的binlog日志,實現(xiàn)數(shù)據(jù)的增量訂閱褒翰、解析與消費盈包,即CDC(Change Data Capture)代乃。近期我們計劃將數(shù)據(jù)倉庫由基于Sqoop的離線按天入庫方式改為近實時入庫旬牲,Canal自然是非常符合需求的。

Canal的模塊設(shè)計精妙,但代碼質(zhì)量低原茅,閱讀起來比較困難吭历。在其GitHub Wiki中詳細(xì)敘述了其設(shè)計思路,值得學(xué)習(xí)擂橘,這里不再贅述晌区,參見:https://github.com/alibaba/canal/wiki/Introduction

在最新的Canal 1.1.x版本中贝室,其新增了對消息隊列的原生支持契讲,通過不算復(fù)雜的配置可以直接將binlog投遞到Kafka或者RocketMQ仿吞,無需再自己寫producer程序(源碼中有現(xiàn)成的CanalKafkaProducer和CanalRocketMQProducer類)滑频。

我們使用目前的穩(wěn)定版本1.1.2小試一下。

Canal最簡單原理示意

前置工作

  • 保證MySQL的binlog-format=ROW
  • 為canal用戶配置MySQL slave的權(quán)限
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

canal.properties設(shè)置

順便還可以復(fù)習(xí)一下Kafka producer的一些配置參數(shù)含義唤冈。

# 默認(rèn)值tcp峡迷,這里改為投遞到Kafka
canal.serverMode = kafka
# Kafka bootstrap.servers,可以不用寫上全部的brokers
canal.mq.servers = 10.10.99.132:9092,10.10.99.133:9092,10.10.99.134:9092,10.10.99.135:9092
# 投遞失敗的重試次數(shù)你虹,默認(rèn)0绘搞,改為2
canal.mq.retries = 2
# Kafka batch.size,即producer一個微批次的大小傅物,默認(rèn)16K夯辖,這里加倍
canal.mq.batchSize = 32768
# Kafka max.request.size,即一個請求的最大大小董饰,默認(rèn)1M蒿褂,這里也加倍
canal.mq.maxRequestSize = 2097152
# Kafka linger.ms,即sender線程在檢查微批次是否就緒時的超時卒暂,默認(rèn)0ms啄栓,這里改為200ms
# 滿足batch.size和linger.ms其中之一,就會發(fā)送消息
canal.mq.lingerMs = 200
# Kafka buffer.memory也祠,緩存大小昙楚,默認(rèn)32M
canal.mq.bufferMemory = 33554432
# 獲取binlog數(shù)據(jù)的批次大小,默認(rèn)50
canal.mq.canalBatchSize = 50
# 獲取binlog數(shù)據(jù)的超時時間诈嘿,默認(rèn)200ms
canal.mq.canalGetTimeout = 200
# 是否將binlog轉(zhuǎn)為JSON格式堪旧。如果為false,就是原生Protobuf格式
canal.mq.flatMessage = true
# 壓縮類型奖亚,官方文檔沒有描述
canal.mq.compressionType = none
# Kafka acks崎场,默認(rèn)all,表示分區(qū)leader會等所有follower同步完才給producer發(fā)送ack
# 0表示不等待ack遂蛀,1表示leader寫入完畢之后直接ack
canal.mq.acks = all
# Kafka消息投遞是否使用事務(wù)
# 主要針對flatMessage的異步發(fā)送和動態(tài)多topic消息投遞進(jìn)行事務(wù)控制來保持和Canal binlog位置的一致性
# flatMessage模式下建議開啟
canal.mq.transaction = true

instance.properties設(shè)置

# 需要接入binlog的表名谭跨,支持正則,但這里手動指定了每張表,注意轉(zhuǎn)義
canal.instance.filter.regex=mall\\.address,mall\\.orders,mall\\.order_product,mall\\.product,mall\\.mall_category,mall\\.mall_comment,mall\\.mall_goods_category,mall\\.mall_goods_info,mall\\.mall_goods_wish,mall\\.mall_new_tags_v2,mall\\.mall_topic,mall\\.mall_topic_goods,mall\\.mall_user_cart_info
# 黑名單
canal.instance.filter.black.regex=
# 消息隊列對應(yīng)topic名
canal.mq.topic=binlog_mall_1
# 發(fā)送到哪一個分區(qū)螃宙,由于下面用hash做分區(qū)蛮瞄,因此不設(shè)
#canal.mq.partition=0
# 根據(jù)正則表達(dá)式做動態(tài)topic,目前采用單topic谆扎,因此也不設(shè)
#canal.mq.dynamicTopic=mall\\..*
# 10個分區(qū)
canal.mq.partitionsNum=10
# 各個表的主鍵挂捅,依照主鍵來做hash分區(qū)
canal.mq.partitionHash=mall.address:address_id,mall.orders:order_id,mall.order_product:order_product_id,mall.product:product_id,mall.mall_category:category_id,mall.mall_comment:comment_id,mall.mall_goods_category:goods_category_id,mall.mall_goods_info:goods_id,mall.mall_goods_wish:id,mall.mall_new_tags_v2:tags_id,mall.mall_topic:topic_id,mall.mall_topic_goods:id,mall.mall_user_cart_info:id

上面的配置相當(dāng)靈活,dynamicTopic選項可以控制單topic還是多topic堂湖,partitionHash選項可以控制單partition還是多partition闲先。

但是binlog是有序的,必須保證它進(jìn)入到消息隊列之后仍然有序无蜂。參照以上的配置伺糠,有以下幾個方法:

  • 單topic單partition:可以嚴(yán)格保證與binlog相同的順序,但效率比較低斥季,TPS只有2~3K训桶。
  • 多topic單partition:由于是按照表劃分topic,因此可以保證表級別的有序性酣倾,但是每個表的熱度不一樣舵揭,對于熱點表仍然會有性能問題。
  • 單/多topic多partition:按照給定的hash方法來劃分partition躁锡,性能無疑是最好的午绳。但必須要多加小心,每個表的hash依據(jù)都必須是其主鍵或者主鍵組映之。只有保證每表每主鍵binlog的順序性拦焚,才能準(zhǔn)確恢復(fù)變動數(shù)據(jù)。

經(jīng)過權(quán)衡惕医,我們采用單topic多partition的方式來處理耕漱。還可以參考:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

Kafka版本兼容性

通過閱讀Canal工程中的pom文件抬伺,得知它集成的Kafka版本為1.1.1螟够,而我們的集群中,之前為了兼容一些老舊業(yè)務(wù)峡钓,采用的Kafka版本為0.8.2妓笙。起初我們做試驗時,消息能夠正常發(fā)送能岩,但topic中始終沒有任何消息寞宫。

這是因為在0.10.2版本之前,Kafka只對客戶端版本有向前兼容性拉鹃,亦即高版本broker能夠處理低版本client的請求辈赋,但低版本broker不能處理高版本client的請求鲫忍。0.10.2版本提出了雙向兼容性(bidirectional compatibility)改進(jìn),低版本broker與高版本client也能兼容了钥屈,但仍然對過時的0.8.x版本沒有支持悟民。

鑒于1.1.1版本producer發(fā)送的消息不能被0.8.2版本的broker解析,后來我們索性將Kafka broker全部升級到了1.0.1(對應(yīng)CDH Kafka版本為3.1.1篷就,是目前最新的)射亏,兼容性問題就解決了。

另外Kafka自帶有命令行工具kafka-broker-api-versions.sh來檢測broker支持的API版本竭业,這里不表智润。

Canal 1.1.2源碼中的一處小bug

一切配置好后,運行bin/startup.sh啟動Canal未辆,觀察canal.log窟绷,發(fā)現(xiàn)瘋狂報空指針異常,如下圖所示鼎姐。

大量NPE

通過仔細(xì)觀察钾麸,發(fā)現(xiàn)對于類型為UPDATE的消息沒有問題更振,但一旦觸發(fā)INSERT就跪掉了炕桨。

繼續(xù)追根溯源,找到com.alibaba.otter.canal.protocol.FlatMessage類中肯腕,有一個messagePartition()方法献宫,顯然是做hash分區(qū)用的,其前半段源碼如下实撒,已經(jīng)改正確了:

    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
                                                 Map<String, String> pkHashConfig) {
        if (partitionsNum == null) {
            partitionsNum = 1;
        }
        FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
        String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
        if (pk == null || flatMessage.getIsDdl()) {
            partitionMessages[0] = flatMessage;
        } else {
            if (flatMessage.getData() != null) {
                int idx = 0;
                for (Map<String, String> row : flatMessage.getData()) {
                    String value = null;
                    if (flatMessage.getOld() != null) {
                        // [!]
                        Map<String, String> o = flatMessage.getOld().get(idx);
                        // String value;
                        // 如果old中有pk值說明主鍵有修改, 以舊的主鍵值hash為準(zhǔn)
                        if (o != null && o.containsKey(pk)) {
                            value = o.get(pk);
                        }
                    }
                    if (value == null) {
                        value = row.get(pk);
                    }
                    if (value == null) {
                        value = "";
                    }
                    int hash = value.hashCode();
                    int pkHash = Math.abs(hash) % partitionsNum;
................

注意上面代碼中打[!]標(biāo)記的地方姊途,原有的代碼根本沒有對flatMessage.getOld()的結(jié)果做空校驗,而INSERT操作恰好又沒有變動之前的記錄信息知态,自然就會產(chǎn)生NPE了捷兰。對于當(dāng)前一個穩(wěn)定版本release而言,代碼中出現(xiàn)低級錯誤實屬不該负敏。

修正這個bug之后贡茅,將canal.protocol模塊重新打成jar包,替換掉原有deployer包中的同名文件其做,問題解決顶考。

附上Canal內(nèi)部Kafka producer類的實現(xiàn)源碼

從中可以看出,producer還沒有啟用事務(wù)性妖泄,也就是說上面的canal.mq.transactions配置項其實是無效的驹沿。

public class CanalKafkaProducer implements CanalMQProducer {
    private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
    private Producer<String, Message> producer;
    private Producer<String, String>  producer2;                                                 // 用于扁平message的數(shù)據(jù)投遞
    private MQProperties              kafkaProperties;

    @Override
    public void init(MQProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", kafkaProperties.getAcks());
        properties.put("compression.type", kafkaProperties.getCompressionType());
        properties.put("retries", kafkaProperties.getRetries());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        if (!kafkaProperties.getFlatMessage()) {
            properties.put("value.serializer", MessageSerializer.class.getName());
            producer = new KafkaProducer<String, Message>(properties);
        } else {
            properties.put("value.serializer", StringSerializer.class.getName());
            producer2 = new KafkaProducer<String, String>(properties);
        }

        // producer.initTransactions();
    }

    @Override
    public void stop() {
        try {
            logger.info("## stop the kafka producer");
            if (producer != null) {
                producer.close();
            }
            if (producer2 != null) {
                producer2.close();
            }
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping kafka producer:", e);
        } finally {
            logger.info("## kafka producer is down.");
        }
    }

    @Override
    public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {

        // producer.beginTransaction();
        if (!kafkaProperties.getFlatMessage()) {
            try {
                ProducerRecord<String, Message> record;
                if (canalDestination.getPartition() != null) {
                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
                        canalDestination.getPartition(),
                        null,
                        message);
                } else {
                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
                }

                producer.send(record).get();

                if (logger.isDebugEnabled()) {
                    logger.debug("Send  message to kafka topic: [{}], packet: {}",
                        canalDestination.getTopic(),
                        message.toString());
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                // producer.abortTransaction();
                callback.rollback();
                return;
            }
        } else {
            // 發(fā)送扁平數(shù)據(jù)json
            List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
            if (flatMessages != null) {
                for (FlatMessage flatMessage : flatMessages) {
                    if (canalDestination.getPartition() != null) {
                        try {
                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                                canalDestination.getTopic(),
                                canalDestination.getPartition(),
                                null,
                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                            producer2.send(record).get();
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                            // producer.abortTransaction();
                            callback.rollback();
                            return;
                        }
                    } else {
                        if (canalDestination.getPartitionHash() != null
                            && !canalDestination.getPartitionHash().isEmpty()) {
                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
                                canalDestination.getPartitionsNum(),
                                canalDestination.getPartitionHash());
                            int length = partitionFlatMessage.length;
                            for (int i = 0; i < length; i++) {
                                FlatMessage flatMessagePart = partitionFlatMessage[i];
                                if (flatMessagePart != null) {
                                    try {
                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                                            canalDestination.getTopic(),
                                            i,
                                            null,
                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
                                        producer2.send(record).get();
                                    } catch (Exception e) {
                                        logger.error(e.getMessage(), e);
                                        // producer.abortTransaction();
                                        callback.rollback();
                                        return;
                                    }
                                }
                            }
                        } else {
                            try {
                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                                    canalDestination.getTopic(),
                                    0,
                                    null,
                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                                producer2.send(record).get();
                            } catch (Exception e) {
                                logger.error(e.getMessage(), e);
                                // producer.abortTransaction();
                                callback.rollback();
                                return;
                            }
                        }
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send flat message to kafka topic: [{}], packet: {}",
                            canalDestination.getTopic(),
                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                    }
                }
            }
        }

        // producer.commitTransaction();
        callback.commit();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蹈胡,隨后出現(xiàn)的幾起案子渊季,更是在濱河造成了極大的恐慌朋蔫,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件却汉,死亡現(xiàn)場離奇詭異斑举,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)病涨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進(jìn)店門富玷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人既穆,你說我怎么就攤上這事赎懦。” “怎么了幻工?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵励两,是天一觀的道長。 經(jīng)常有香客問我囊颅,道長当悔,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任踢代,我火速辦了婚禮盲憎,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘胳挎。我一直安慰自己饼疙,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布慕爬。 她就那樣靜靜地躺著窑眯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪医窿。 梳的紋絲不亂的頭發(fā)上磅甩,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天,我揣著相機(jī)與錄音姥卢,去河邊找鬼卷要。 笑死,一個胖子當(dāng)著我的面吹牛隔显,可吹牛的內(nèi)容都是我干的却妨。 我是一名探鬼主播,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼括眠,長吁一口氣:“原來是場噩夢啊……” “哼彪标!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起掷豺,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤捞烟,失蹤者是張志新(化名)和其女友劉穎薄声,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體题画,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡默辨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了苍息。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片缩幸。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖竞思,靈堂內(nèi)的尸體忽然破棺而出表谊,到底是詐尸還是另有隱情,我是刑警寧澤盖喷,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布爆办,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜换薄,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望跨算。 院中可真熱鬧,春花似錦沾歪、人聲如沸漂彤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至立润,卻和暖如春狂窑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背桑腮。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工泉哈, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人破讨。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓丛晦,卻偏偏與公主長得像,于是被迫代替她去往敵國和親提陶。 傳聞我的和親對象是個殘疾皇子烫沙,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容