canal+kafka+go處理Mysql數(shù)據(jù)庫(kù)增量信息

注意:canal使用java寫的托慨,需要安裝jdk環(huán)境

canal介紹安裝

說(shuō)明

  • canal是阿里開(kāi)源的監(jiān)控?cái)?shù)據(jù)庫(kù)增量信息的框架球散,支持的數(shù)據(jù)庫(kù)包括mysql.
  • 主要原理是canal是模擬MySQL的從機(jī)(slave), 發(fā)送指令獲取master的增量信息
  • 主要用途是實(shí)現(xiàn)跨區(qū)域MySQL數(shù)據(jù)備份處理
  • 需要配置MySQL的參數(shù)和具體的詳細(xì)說(shuō)明七芭,參考canal官網(wǎng)https://github.com/alibaba/canal
  • 本人安裝的版本是v1.1.4 , 下載鏈接https://github.com/alibaba/canal/releases下載的文件是canal.deployer-1.1.4.tar.gz

安裝說(shuō)明

  • 采用的是單節(jié)點(diǎn)的方式,集群可以參考官網(wǎng)。環(huán)境為centos7, 阿里云服務(wù)器
  • 將下載好的壓縮包解壓到指定的目錄中,會(huì)有4個(gè)文件夾
    image.png
  • bin是執(zhí)行的二進(jìn)制文件亦渗,conf為配置文件,logs為日志文件
  • 打開(kāi)conf/example/instance.properties文件配置數(shù)據(jù)庫(kù)和kafka信息
    image.png
    image.png
    image.png
  • 注意:過(guò)濾表數(shù)據(jù)的配置 canal.instance.filter.regex 和 按照表名定義kafka主題的canal.instance.filter.regex配置是支持表達(dá)式的汁尺。例如:.*\..*是表示所有庫(kù)的所有表法精,test\..*是test庫(kù)的所有表,test.table1表示test庫(kù)的table1表
  • 配置canal信息:打開(kāi)conf目錄下的canal.properties文件
    image.png
    image.png
    image.png
  • canal是支持連接池和kafka集群的,可以參考官網(wǎng)
  • 進(jìn)入到bin目錄下搂蜓,執(zhí)行./startup.sh bin目錄下生成canal.pid狼荞,查看端口有11110-11112說(shuō)明啟動(dòng)成功,前提是先啟動(dòng)kafka
  • 關(guān)閉canal是./stop.sh ,canal.pid文件將被刪除

kafka安裝

  • 從官網(wǎng)下載壓縮文件帮碰,我安裝的版本是kafka_2.13-2.6.0.tgz
  • 解壓到指定的目錄
  • kafka是依賴zookeeper的粘秆,可以使用文件中已經(jīng)配置好的zookeeper,也可以單獨(dú)安裝
  • kafka的執(zhí)行文件在bin目錄下收毫,提供一下幾個(gè)命令
### 開(kāi)啟本地zookeeper指令(使用kafka已經(jīng)配置好的)
./zookeeper-server-start.sh config/zookeeper.properties

### 開(kāi)啟kafka指令
./kafka-server-start.sh config/server.properties

### 查看指定topic信息, 例如查看topic為 example
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic example

### 監(jiān)聽(tīng)指定topic消息客戶端
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic example

### 發(fā)送消息到指定topic客戶端
./kafka-console-producer.sh --topic=test --broker-list 127.0.0.1:9092

### 刪除指定topic
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092  --delete --topic example

  • 說(shuō)明9092是kafka的端口,2181是zookeeper的端口殷勘〈嗽伲可以通過(guò)指令 netstat -tunlp查看
  • kafka遇到的坑:本人的是在阿里云上不是的單節(jié)點(diǎn)kafka,如果在其他的地方通過(guò)公網(wǎng)去訪問(wèn)kafka是不行的玲销,即便開(kāi)啟了端口映射输拇,也不可以。參照網(wǎng)上的做法是:為該機(jī)器的ip申請(qǐng)一個(gè)域名贤斜,通過(guò)配置文件配置域名就可以在其他的通過(guò)域名加9092端口就可以訪問(wèn)了策吠,配置文件說(shuō)明,打開(kāi)config/server.properties
    image.png
  • 如果要關(guān)閉kafka必須先關(guān)閉kafka服務(wù)再關(guān)閉zookeeper服務(wù)瘩绒,關(guān)閉指令再bin目錄下

操作的go代碼

consumer.go

package consumer

import (
    "context"
    "github.com/Shopify/sarama"
)

type ConsumeTopic struct {
    //一個(gè)消費(fèi)者組里包含幾個(gè)消費(fèi)者
    ConsumeNum int
    //消費(fèi)者組監(jiān)聽(tīng)的主題
    Topics []string
    //回調(diào)的Handler, 需要調(diào)用者自己實(shí)現(xiàn)
    Callback sarama.ConsumerGroupHandler
}

type consumer struct {
    //kafka地址集合: 例如[]string{域名:9092, ip:9093, ...}
    //addressSet []string
    //消費(fèi)者組綁定主題: key為group主題id
    consumerGroupTopic map[string]ConsumeTopic
    //consumer配置
    consumerConfig *sarama.Config
    client         sarama.Client

    consumerGroup []sarama.ConsumerGroup
}

//創(chuàng)建消費(fèi)者對(duì)象
//addressSet: kafka地址集合
//consumerGroupTopic: 消費(fèi)者組信息
//consumerConfig: 消費(fèi)者配置信息猴抹,如果為空就采用默認(rèn)的配置
func CreateNewConsumer(addressSet []string, consumerGroupTopic map[string]ConsumeTopic, consumerConfig *sarama.Config) *consumer {
    consumer := consumer{
        //addressSet:         addressSet,
        consumerGroupTopic: consumerGroupTopic,
        consumerConfig:     consumerConfig,
    }
    //啟用默認(rèn)配置
    if consumer.consumerConfig == nil {
        consumer.consumerConfig = sarama.NewConfig()
        consumer.consumerConfig.Consumer.Return.Errors = false
        consumer.consumerConfig.Version = sarama.V2_6_0_0
        consumer.consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
    }

    client, err := sarama.NewClient(addressSet, consumer.consumerConfig)
    if err != nil {
        panic(err)
    }
    consumer.client = client

    //開(kāi)始
    consumer.init()
    return &consumer
}

//初始化
func (c *consumer) init() {
    for groupId, v := range c.consumerGroupTopic {
        consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, c.client)
        if err != nil {
            panic(err)
        }

        for i := 0; i < v.ConsumeNum; i++ {
            go c.consume(&consumerGroup, c.consumerGroupTopic[groupId].Topics, c.consumerGroupTopic[groupId].Callback)
        }

        c.consumerGroup = append(c.consumerGroup, consumerGroup)
    }
}

func (c *consumer) consume(group *sarama.ConsumerGroup, topics []string, consumerGroupHandler sarama.ConsumerGroupHandler) {
    ctx := context.Background()
    for {
        err := (*group).Consume(ctx, topics, consumerGroupHandler)
        if err != nil {
            panic(err)
        }
    }
}

//關(guān)閉
func (c *consumer) Close() {
    for _, v := range c.consumerGroup {
        v.Close()
    }
    c.client.Close()
}

consumer_test.go

package consumer

import (
    "fmt"
    "github.com/Shopify/sarama"
    "os"
    "os/signal"
    "testing"
)

type consumerGroupHandler struct {
    name string
}

func (h consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n",
            h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
        // 手動(dòng)確認(rèn)消息
        sess.MarkMessage(msg, "")
    }
    return nil
}

func TestCreateNewConsumer(t *testing.T) {
    cgt := map[string]ConsumeTopic{}
    cgt["id01"] = ConsumeTopic{
        ConsumeNum: 2,
        Topics:     []string{"read_book.wode"},
        Callback:   consumerGroupHandler{},
    }
    cClient := CreateNewConsumer([]string{"你的域名.com:9092"}, cgt, nil)

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    select {
    case <-signals:
    }
    cClient.Close()
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市锁荔,隨后出現(xiàn)的幾起案子蟀给,更是在濱河造成了極大的恐慌,老刑警劉巖阳堕,帶你破解...
    沈念sama閱讀 216,496評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件跋理,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡恬总,警方通過(guò)查閱死者的電腦和手機(jī)前普,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)壹堰,“玉大人拭卿,你說(shuō)我怎么就攤上這事∽号裕” “怎么了记劈?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,632評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)并巍。 經(jīng)常有香客問(wèn)我目木,道長(zhǎng),這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,180評(píng)論 1 292
  • 正文 為了忘掉前任刽射,我火速辦了婚禮军拟,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘誓禁。我一直安慰自己懈息,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,198評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布摹恰。 她就那樣靜靜地躺著辫继,像睡著了一般。 火紅的嫁衣襯著肌膚如雪俗慈。 梳的紋絲不亂的頭發(fā)上姑宽,一...
    開(kāi)封第一講書(shū)人閱讀 51,165評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音闺阱,去河邊找鬼炮车。 笑死,一個(gè)胖子當(dāng)著我的面吹牛酣溃,可吹牛的內(nèi)容都是我干的瘦穆。 我是一名探鬼主播,決...
    沈念sama閱讀 40,052評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼赊豌,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼扛或!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起碘饼,我...
    開(kāi)封第一講書(shū)人閱讀 38,910評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤告喊,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后派昧,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體黔姜,經(jīng)...
    沈念sama閱讀 45,324評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,542評(píng)論 2 332
  • 正文 我和宋清朗相戀三年蒂萎,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了秆吵。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,711評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡五慈,死狀恐怖纳寂,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情泻拦,我是刑警寧澤毙芜,帶...
    沈念sama閱讀 35,424評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站争拐,受9級(jí)特大地震影響腋粥,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,017評(píng)論 3 326
  • 文/蒙蒙 一隘冲、第九天 我趴在偏房一處隱蔽的房頂上張望闹瞧。 院中可真熱鬧,春花似錦展辞、人聲如沸奥邮。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,668評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)洽腺。三九已至,卻和暖如春覆旱,著一層夾襖步出監(jiān)牢的瞬間已脓,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,823評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工通殃, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人厕宗。 一個(gè)月前我還...
    沈念sama閱讀 47,722評(píng)論 2 368
  • 正文 我出身青樓画舌,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親已慢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子曲聂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,611評(píng)論 2 353