Versions used this time:
kafka_2.12-2.7.0
Apache Flink 1.12.0
Debezium 1.3.1
Everything runs locally; download the packages ahead of time.
Throughout this post, {flink-1.12.0} and the like denote the path where the corresponding package is unpacked.
Prerequisite: the MySQL binlog must be enabled; search for how to enable it on your platform if you haven't already.
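For reference, a minimal my.cnf fragment that satisfies this prerequisite might look like the following (the server-id value is arbitrary; adjust values for your install):
[mysqld]
server-id        = 1          # any id unique within the replication topology
log_bin          = mysql-bin  # turn the binary log on
binlog_format    = ROW        # Debezium requires row-based logging
binlog_row_image = FULL       # record full row images in each event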
- First, start ZooKeeper:
${kafka_2.12-2.7.0} % bin/zookeeper-server-start.sh config/zookeeper.properties
Then start Kafka:
${kafka_2.12-2.7.0} % bin/kafka-server-start.sh config/server.properties
- Download debezium-connector-mysql-1.3.1.Final-plugin.tar.gz from the Debezium site, extract it, and place the jars under ${kafka_2.12-2.7.0}/lib as well as under a directory of your own, e.g. /Users/XXX/connect.
Then edit the Connect configuration:
${kafka_2.12-2.7.0} % vi config/connect-distributed.properties
Uncomment the last property and set it to:
plugin.path=/Users/XXX/connect
Start Kafka Connect by running:
${kafka_2.12-2.7.0} % bin/connect-distributed.sh config/connect-distributed.properties
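To confirm the worker actually picked up the Debezium plugin from plugin.path, you can ask Connect for its loaded plugins; io.debezium.connector.mysql.MySqlConnector should appear in the response:
curl -H "Accept:application/json" localhost:8083/connector-plugins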
Once the steps above succeed,
create a topic for the connector:
${kafka_2.12-2.7.0} % bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic b2c_flink
List the Kafka topics:
${kafka_2.12-2.7.0} % bin/kafka-topics.sh --list --zookeeper localhost:2181
Deleting topics is unrelated to this walkthrough and recorded only for reference.
Delete a Kafka topic:
${kafka_2.12-2.7.0} % bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic b2c_flink
Test the Connect worker with curl:
curl -H "Accept:application/json" localhost:8083/
which returns a JSON result:
{
"version": "2.7.0",
"commit": "448719dc99a19793",
"kafka_cluster_id": "p-c8Qz4STr2C2LRzy-xB0g"
}
Next, send a POST request to register the connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors -d '{ "name": "connector_demo", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "184054", "database.server.name": "big_data", "database.include.list": "big_data", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "b2c_flink","include.schema.changes": "true"} }'
The JSON, spelled out:
{
  "name": "connector_demo",                                           # connector name; must be unique
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",  # connector class; see the Debezium docs
    "tasks.max": "1",
    "database.hostname": "localhost",                                 # database host
    "database.port": "3306",                                          # port
    "database.user": "root",                                          # user
    "database.password": "123456",                                    # password
    "database.server.id": "184054",                                   # unique id for this connector
    "database.server.name": "big_data",                               # logical server name; becomes the prefix of the Kafka topics
    "database.include.list": "big_data",                              # list of databases to capture
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "b2c_flink",                      # history topic name
    "include.schema.changes": "true"
  }
}
After sending the request, a JSON response comes back:
HTTP/1.1 201 Created
Date: Thu, 31 Dec 2020 04:54:13 GMT
Location: http://localhost:8083/connectors/connector_demo
Content-Type: application/json
Content-Length: 507
Server: Jetty(9.4.33.v20201020)
A few other useful calls:
curl -H "Accept:application/json" localhost:8083/connectors/ # list all connectors
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/connector_demo
# read a connector's configuration; connectors/XXXX is the "name" from the JSON config above
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/connector_demo # delete the connector
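Connect also exposes a status endpoint, handy for checking that the connector and its task are actually in the RUNNING state:
curl -H "Accept:application/json" localhost:8083/connectors/connector_demo/status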
With all of the above done, Debezium is now reading change data out of MySQL.
You can consume the data with kafka-console-consumer.sh:
${kafka_2.12-2.7.0} % bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic b2c_flink --from-beginning
The output, however, turns out to be very verbose.
Listing the Kafka topics again at this point reveals quite a few new ones.
This time the local MySQL database is named big_data, and the connector's server name was also set to big_data.
Listing the topics now shows:
b2c_flink
big_data
big_data.big_data.save_result
big_data.big_data.test_result
big_data.big_data.save_result and big_data.big_data.test_result are the per-table topics for the two tables used in this walkthrough.
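Each per-table topic carries Debezium change events. For orientation, a heavily trimmed UPDATE event from big_data.big_data.test_result looks roughly like this (the values are illustrative, and the surrounding "schema" block added by Kafka Connect's JSON converter is omitted):
{
  "payload": {
    "before": { "id": 1, "test_result": "old value" },
    "after":  { "id": 1, "test_result": "new value" },
    "source": { "db": "big_data", "table": "test_result" },
    "op": "u",               # "c" = create, "u" = update, "d" = delete
    "ts_ms": 1609390453000
  }
}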
Now for the main event: Flink.
First, start the Flink cluster (still local):
${flink-1.12.0} $ bin/start-cluster.sh
Open localhost:8081 and check that the Dashboard comes up; if it does, you're set.
Since several jobs will be running, I had already edited
vi conf/flink-conf.yaml
changing:
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1
Jars Flink needs: flink-sql-connector-kafka_2.11-1.12.0.jar, mysql-connector-java-5.1.30.jar, flink-connector-jdbc_2.12-1.12.0.jar
All of these were downloaded from the official download pages.
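Assuming the usual setup where the SQL client loads connector jars from Flink's lib directory, dropping them in (and then restarting the cluster) looks like:
cp flink-sql-connector-kafka_2.11-1.12.0.jar ${flink-1.12.0}/lib/
cp flink-connector-jdbc_2.12-1.12.0.jar ${flink-1.12.0}/lib/
cp mysql-connector-java-5.1.30.jar ${flink-1.12.0}/lib/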
Then launch the SQL client:
${flink-1.12.0} $ bin/sql-client.sh embedded
The SQL used:
-- Source table, reading the topic Debezium produces
CREATE TABLE test_result (
  id BIGINT,
  test_result STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'big_data.big_data.test_result',   -- the topic created by Debezium
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',                  -- use Flink's built-in Debezium format
  'debezium-json.schema-include' = 'true'      -- events carry the schema wrapper
);
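Before wiring up the sink, it's worth sanity-checking that events flow, e.g. with a plain query in the SQL client (this opens a streaming result view; press Q to quit):
Flink SQL> SELECT * FROM test_result;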
-- Target table, written straight into MySQL
CREATE TABLE save_result (
  id BIGINT,
  test_result STRING,
  PRIMARY KEY (id) NOT ENFORCED   -- required here; without a primary key the job errors out
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/big_data',   -- local database
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'save_result'
);
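Note that the JDBC connector writes into a physical MySQL table, so save_result must already exist on the MySQL side. A matching DDL (the VARCHAR length is an assumption) would be:
CREATE TABLE save_result (
  id BIGINT PRIMARY KEY,
  test_result VARCHAR(255)
);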
With both tables defined, submit the job directly:
Flink SQL> INSERT INTO save_result SELECT * FROM test_result;
At this point the Flink Dashboard shows a running job.
After INSERTs, UPDATEs, and DELETEs on the source table, checking the target table shows the data being added or removed in real time, as in the sketch below.
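For instance, statements like these against the source table in MySQL (the values are illustrative, assuming the physical columns match the Flink schema of id and test_result) should each be reflected in save_result within moments:
INSERT INTO test_result VALUES (1, 'first run');
UPDATE test_result SET test_result = 'updated' WHERE id = 1;
DELETE FROM test_result WHERE id = 1;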
And with that, a real-time pipeline is up and running.
A full day of tinkering finally produced results... not easy... plenty of errors and jar hunting along the way.
Next I'll look into connecting Oracle in the same real-time fashion and post an update then.