注意:canal使用java寫的托慨,需要安裝jdk環(huán)境
canal介紹安裝
說(shuō)明
- canal是阿里開(kāi)源的監(jiān)控?cái)?shù)據(jù)庫(kù)增量信息的框架球散,支持的數(shù)據(jù)庫(kù)包括mysql.
- 主要原理是canal是模擬MySQL的從機(jī)(slave), 發(fā)送指令獲取master的增量信息
- 主要用途是實(shí)現(xiàn)跨區(qū)域MySQL數(shù)據(jù)備份處理
- 需要配置MySQL的參數(shù)和具體的詳細(xì)說(shuō)明七芭,參考canal官網(wǎng)https://github.com/alibaba/canal
- 本人安裝的版本是v1.1.4 , 下載鏈接https://github.com/alibaba/canal/releases下載的文件是canal.deployer-1.1.4.tar.gz
安裝說(shuō)明
- 采用的是單節(jié)點(diǎn)的方式,集群可以參考官網(wǎng)。環(huán)境為centos7, 阿里云服務(wù)器
-
將下載好的壓縮包解壓到指定的目錄中,會(huì)有4個(gè)文件夾
- bin是執(zhí)行的二進(jìn)制文件亦渗,conf為配置文件,logs為日志文件
-
打開(kāi)conf/example/instance.properties文件配置數(shù)據(jù)庫(kù)和kafka信息
- 注意:過(guò)濾表數(shù)據(jù)的配置 canal.instance.filter.regex 和 按照表名定義kafka主題的canal.instance.filter.regex配置是支持表達(dá)式的汁尺。例如:.*\..*是表示所有庫(kù)的所有表法精,test\..*是test庫(kù)的所有表,test.table1表示test庫(kù)的table1表
-
配置canal信息:打開(kāi)conf目錄下的canal.properties文件
- canal是支持連接池和kafka集群的,可以參考官網(wǎng)
- 進(jìn)入到bin目錄下搂蜓,執(zhí)行./startup.sh bin目錄下生成canal.pid狼荞,查看端口有11110-11112說(shuō)明啟動(dòng)成功,前提是先啟動(dòng)kafka
- 關(guān)閉canal是./stop.sh ,canal.pid文件將被刪除
kafka安裝
- 從官網(wǎng)下載壓縮文件帮碰,我安裝的版本是kafka_2.13-2.6.0.tgz
- 解壓到指定的目錄
- kafka是依賴zookeeper的粘秆,可以使用文件中已經(jīng)配置好的zookeeper,也可以單獨(dú)安裝
- kafka的執(zhí)行文件在bin目錄下收毫,提供一下幾個(gè)命令
### 開(kāi)啟本地zookeeper指令(使用kafka已經(jīng)配置好的)
./zookeeper-server-start.sh config/zookeeper.properties
### 開(kāi)啟kafka指令
./kafka-server-start.sh config/server.properties
### 查看指定topic信息, 例如查看topic為 example
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic example
### 監(jiān)聽(tīng)指定topic消息客戶端
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic example
### 發(fā)送消息到指定topic客戶端
./kafka-console-producer.sh --topic=test --broker-list 127.0.0.1:9092
### 刪除指定topic
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic example
- 說(shuō)明9092是kafka的端口,2181是zookeeper的端口殷勘〈嗽伲可以通過(guò)指令 netstat -tunlp查看
-
kafka遇到的坑:本人的是在阿里云上不是的單節(jié)點(diǎn)kafka,如果在其他的地方通過(guò)公網(wǎng)去訪問(wèn)kafka是不行的玲销,即便開(kāi)啟了端口映射输拇,也不可以。參照網(wǎng)上的做法是:為該機(jī)器的ip申請(qǐng)一個(gè)域名贤斜,通過(guò)配置文件配置域名就可以在其他的通過(guò)域名加9092端口就可以訪問(wèn)了策吠,配置文件說(shuō)明,打開(kāi)config/server.properties
- 如果要關(guān)閉kafka必須先關(guān)閉kafka服務(wù)再關(guān)閉zookeeper服務(wù)瘩绒,關(guān)閉指令再bin目錄下
操作的go代碼
consumer.go
package consumer
import (
"context"
"github.com/Shopify/sarama"
)
type ConsumeTopic struct {
//一個(gè)消費(fèi)者組里包含幾個(gè)消費(fèi)者
ConsumeNum int
//消費(fèi)者組監(jiān)聽(tīng)的主題
Topics []string
//回調(diào)的Handler, 需要調(diào)用者自己實(shí)現(xiàn)
Callback sarama.ConsumerGroupHandler
}
type consumer struct {
//kafka地址集合: 例如[]string{域名:9092, ip:9093, ...}
//addressSet []string
//消費(fèi)者組綁定主題: key為group主題id
consumerGroupTopic map[string]ConsumeTopic
//consumer配置
consumerConfig *sarama.Config
client sarama.Client
consumerGroup []sarama.ConsumerGroup
}
//創(chuàng)建消費(fèi)者對(duì)象
//addressSet: kafka地址集合
//consumerGroupTopic: 消費(fèi)者組信息
//consumerConfig: 消費(fèi)者配置信息猴抹,如果為空就采用默認(rèn)的配置
func CreateNewConsumer(addressSet []string, consumerGroupTopic map[string]ConsumeTopic, consumerConfig *sarama.Config) *consumer {
consumer := consumer{
//addressSet: addressSet,
consumerGroupTopic: consumerGroupTopic,
consumerConfig: consumerConfig,
}
//啟用默認(rèn)配置
if consumer.consumerConfig == nil {
consumer.consumerConfig = sarama.NewConfig()
consumer.consumerConfig.Consumer.Return.Errors = false
consumer.consumerConfig.Version = sarama.V2_6_0_0
consumer.consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
}
client, err := sarama.NewClient(addressSet, consumer.consumerConfig)
if err != nil {
panic(err)
}
consumer.client = client
//開(kāi)始
consumer.init()
return &consumer
}
//初始化
func (c *consumer) init() {
for groupId, v := range c.consumerGroupTopic {
consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, c.client)
if err != nil {
panic(err)
}
for i := 0; i < v.ConsumeNum; i++ {
go c.consume(&consumerGroup, c.consumerGroupTopic[groupId].Topics, c.consumerGroupTopic[groupId].Callback)
}
c.consumerGroup = append(c.consumerGroup, consumerGroup)
}
}
func (c *consumer) consume(group *sarama.ConsumerGroup, topics []string, consumerGroupHandler sarama.ConsumerGroupHandler) {
ctx := context.Background()
for {
err := (*group).Consume(ctx, topics, consumerGroupHandler)
if err != nil {
panic(err)
}
}
}
//關(guān)閉
func (c *consumer) Close() {
for _, v := range c.consumerGroup {
v.Close()
}
c.client.Close()
}
consumer_test.go
package consumer
import (
"fmt"
"github.com/Shopify/sarama"
"os"
"os/signal"
"testing"
)
type consumerGroupHandler struct {
name string
}
func (h consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n",
h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
// 手動(dòng)確認(rèn)消息
sess.MarkMessage(msg, "")
}
return nil
}
func TestCreateNewConsumer(t *testing.T) {
cgt := map[string]ConsumeTopic{}
cgt["id01"] = ConsumeTopic{
ConsumeNum: 2,
Topics: []string{"read_book.wode"},
Callback: consumerGroupHandler{},
}
cClient := CreateNewConsumer([]string{"你的域名.com:9092"}, cgt, nil)
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
}
cClient.Close()
}