Mysql+Debezium+kafka+Flink 寫入MySQL 實(shí)時(shí)數(shù)據(jù)

本次使用版本
kafka_2.12-2.70
Apache-Flink 1.12
Debezium 1.3
環(huán)境均為本地啟動(dòng).提前下好各種應(yīng)用包.

本文中涉及到的{kafka_2.12-2.7.0}以及{flink-1.12.0} 均為文件放置的路徑地址

前提需要開啟MySQL bin_log 日志 關(guān)于如何開啟請(qǐng)自行搜索.

  • 首先啟動(dòng)zookeeper
    執(zhí)行命令: ${kafka_2.12-2.7.0}% bin/zookeeper-server-start.sh config/zookeeper.propertieszookeeper.properties
    在啟動(dòng)kafka
    執(zhí)行命令:${kafka_2.12-2.7.0}% bin/kafka-server-start.sh config/server.properties
  • 官網(wǎng)下載Debezium debezium-connector-mysql-1.3.1.Final-plugin.tar 將解壓的包放置${kafka_2.12-2.7.0}/lib 以及自定義一個(gè) /Users/XXX/connect
    修改 ${kafka_2.12-2.7.0} % vi config/connect-distributed.propertieskafka
    將最后一項(xiàng)取消注釋加入:
    plugin.path=/Users/XXX/connect
    執(zhí)行:${kafka_2.12-2.7.0} % bin/connect-distributed.sh config/connect-distributed.properties
    啟動(dòng)kafka connect
    以上步驟正常后
    創(chuàng)建一個(gè)topic連接器
    指令:
${kafka_2.12-2.7.0} % bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic b2c_flink

查看kafka topic

${kafka_2.12-2.7.0} % bin/kafka-topics.sh --list --zookeeper localhost:2181

刪除為記錄與本次操作無關(guān)...
刪除kafka topic

${kafka_2.12-2.7.0} % bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic b2c_flink

使用curl 進(jìn)行測試連接器
curl -H "Accept:application/json" localhost:8083/
會(huì)有一個(gè)json返回結(jié)果

{
"version": "2.7.0",
"commit": "448719dc99a19793",
"kafka_cluster_id": "p-c8Qz4STr2C2LRzy-xB0g"
}

接著發(fā)送一個(gè)POST請(qǐng)求 讓連接器

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors -d '{ "name": "connector_demo", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "184054", "database.server.name": "big_data", "database.include.list": "big_data", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "b2c_flink","include.schema.changes": "true"} }'

JSON 內(nèi)容

{
    "name":"connector_demo", #連接器名稱 唯一別重復(fù)
    "config":{
        "connector.class":"io.debezium.connector.mysql.MySqlConnector", # 使用到的類 參照官網(wǎng)
        "tasks.max":"1",
        "database.hostname":"localhost", # 數(shù)據(jù)庫連接地址
        "database.port":"3306", # 端口
        "database.user":"root", #用戶名
        "database.password":"123456", # 密碼
        "database.server.id":"184054", # 連接器服務(wù)唯一id
        "database.server.name":"big_data", # 連接器名稱 后續(xù)會(huì)出現(xiàn)在kafka中topic內(nèi)
        "database.include.list":"big_data", # 包含的庫列表
        "database.history.kafka.bootstrap.servers":"localhost:9092", 
        "database.history.kafka.topic":"b2c_flink", # topic名稱
        "include.schema.changes":"true"
    }
}

發(fā)送完curl后會(huì)收到一個(gè)json返回值

HTTP/1.1 201 Created
Date: Thu, 31 Dec 2020 04:54:13 GMT
Location: http://localhost:8083/connectors/connector_demo
Content-Type: application/json
Content-Length: 507
Server: Jetty(9.4.33.v20201020)

也可以通過

curl -H "Accept:application/json" localhost:8083/connectors/  #查看所有連接器
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/connector_demo
#讀取連接器對(duì)應(yīng)內(nèi)容 connnecotrs/XXXX為json配置內(nèi)容中的name
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/connector_demo #刪除對(duì)應(yīng)連接器

完成以上后均已可以通過Debezium去讀取MySQL中變化數(shù)據(jù)
可以使用kafka kafka-console-consumer.sh進(jìn)行消費(fèi)數(shù)據(jù)
指令為:

${kafka_2.12-2.7.0} % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic b2c_flink  

但是讀取到后發(fā)現(xiàn)內(nèi)容太多.
這個(gè)時(shí)候在查詢一次kafka內(nèi)的topic會(huì)發(fā)現(xiàn)多出很多topic
本次本地mysql創(chuàng)建的庫為big_data 設(shè)置的連接器名稱也為big_data.
查詢topic后出現(xiàn)

b2c_flink
big_data
big_data.big_data.save_result
big_data.big_data.test_result

列內(nèi)的big_data.big_data.save_result 以及test_result為本次所需使用到的表
接著到了本次重頭Flink
首先啟動(dòng)Flink集群(還為本地)

${flink-1.12.0} $ bin/start-cluster.sh

查看端口localhost:8081 是否能看見Dashboard 能看見即可
因?yàn)榭紤]到任務(wù)多..已提前修改過

vi conf/flink-conf.yaml
其中的
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1

Flink所需要用到的jar包: flink-sql-connector-kafka_2.11-1.12.0.jar mysql-connector-java-5.1.30.jar flink-connector-jdbc_2.12-1.12.0.jar
以上jar包均為從官方地址下載
接著執(zhí)行

${flink-1.12.0} $ bin/sql-client.sh embedded

使用到的SQL

CREATE TABLE test_result (
  id BIGINT,
  test_result STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'big_data.big_data.test_result', #因Debezium創(chuàng)建出的topic 
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'debezium-json', # 采用到Flink 內(nèi)的轉(zhuǎn)換
 'debezium-json.schema-include' = 'true' 
)

# 目標(biāo)表 直接存入mysql中
CREATE TABLE save_result(
    id BIGINT,
    test_result STRING,
    PRIMARY KEY (id) NOT ENFORCED  #因主鍵問題需要設(shè)置否則會(huì)報(bào)錯(cuò)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/big_data', #本地?cái)?shù)據(jù)庫
    'username' = 'root',
    'password' = '123456',
    'table-name' = 'save_result'
)

創(chuàng)建完畢連接后直接輸入

SQL>insert into save_result 
SQL>select * from test_result;

這時(shí)可以在Flink Dashboard看到啟動(dòng)了一個(gè)running 任務(wù)
在原表中insert update 以及 delete數(shù)據(jù)后查看目標(biāo)表均可以發(fā)現(xiàn)已實(shí)時(shí)將數(shù)據(jù)添加或刪除.
至此一個(gè)實(shí)時(shí)數(shù)據(jù)均已使用了
倒騰了一天的東西終于出結(jié)果了...不容易啊..期間各種報(bào)錯(cuò)各種找包...
后續(xù)研究如何跟Oracle連接以及實(shí)時(shí).在進(jìn)行更新

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末涮雷,一起剝皮案震驚了整個(gè)濱河市善涨,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖势誊,帶你破解...
    沈念sama閱讀 207,248評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)嘶是,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蛛碌,“玉大人聂喇,你說我怎么就攤上這事。” “怎么了希太?”我有些...
    開封第一講書人閱讀 153,443評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵克饶,是天一觀的道長。 經(jīng)常有香客問我誊辉,道長矾湃,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,475評(píng)論 1 279
  • 正文 為了忘掉前任堕澄,我火速辦了婚禮邀跃,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘奈偏。我一直安慰自己坞嘀,他們只是感情好躯护,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評(píng)論 5 374
  • 文/花漫 我一把揭開白布惊来。 她就那樣靜靜地躺著,像睡著了一般棺滞。 火紅的嫁衣襯著肌膚如雪裁蚁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,185評(píng)論 1 284
  • 那天继准,我揣著相機(jī)與錄音枉证,去河邊找鬼。 笑死移必,一個(gè)胖子當(dāng)著我的面吹牛室谚,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播崔泵,決...
    沈念sama閱讀 38,451評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼秒赤,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了憎瘸?” 一聲冷哼從身側(cè)響起入篮,我...
    開封第一講書人閱讀 37,112評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎幌甘,沒想到半個(gè)月后潮售,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,609評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡锅风,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評(píng)論 2 325
  • 正文 我和宋清朗相戀三年酥诽,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片皱埠。...
    茶點(diǎn)故事閱讀 38,163評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡肮帐,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出漱逸,到底是詐尸還是另有隱情泪姨,我是刑警寧澤游沿,帶...
    沈念sama閱讀 33,803評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站肮砾,受9級(jí)特大地震影響诀黍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜仗处,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評(píng)論 3 307
  • 文/蒙蒙 一眯勾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧婆誓,春花似錦吃环、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至文留,卻和暖如春好唯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背燥翅。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評(píng)論 1 261
  • 我被黑心中介騙來泰國打工骑篙, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人森书。 一個(gè)月前我還...
    沈念sama閱讀 45,636評(píng)論 2 355
  • 正文 我出身青樓靶端,卻偏偏與公主長得像,于是被迫代替她去往敵國和親凛膏。 傳聞我的和親對(duì)象是個(gè)殘疾皇子杨名,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容