Kafka Connect 實戰(zhàn) ---- 入門

前提

首先你需要了解MQ / Kafka相關的知識

本文目標

了解 Kafka Connect 基本概念與功能

什么是Kafka Connect

Kafka Connect 是一款可擴展并且可靠地在 Apache Kafka 和其他系統(tǒng)之間進行數(shù)據(jù)傳輸?shù)墓ぞ摺?可以很簡單的定義 connectors(連接器) 將大量數(shù)據(jù)遷入、遷出Kafka脊另。

例如我現(xiàn)在想要把數(shù)據(jù)從MySQL遷移到ElasticSearch识埋,為了保證高效和數(shù)據(jù)不會丟失袱讹,我們選擇MQ作為中間件保存數(shù)據(jù)幻林。這時候我們需要一個生產(chǎn)者線程愚墓,不斷的從MySQL中讀取數(shù)據(jù)并發(fā)送到MQ杠氢,還需要一個消費者線程消費MQ的數(shù)據(jù)寫到ElasticSearch摘能,這件事情似乎很簡單,不需要任何框架敲街。

但是如果我們想要保證生產(chǎn)者和消費者服務的高可用性团搞,例如重啟后生產(chǎn)者恢復到之前讀取的位置,分布式部署并且節(jié)點宕機后將任務轉移到其他節(jié)點多艇。如果要加上這些的話逻恐,這件事就變得復雜起來了,而Kafka Connect 已經(jīng)為我們造好這些輪子峻黍。

Kafka Connect 如何工作复隆?

image.png

Kafka Connect 特性如下:

  • Kafka 連接器的通用框架:Kafka Connect 標準化了其他數(shù)據(jù)系統(tǒng)與Kafka的集成,從而簡化了連接器的開發(fā)姆涩,部署和管理
  • 支持分布式模式和單機模式部署
  • Rest API:通過簡單的Rest API管理連接器
  • 偏移量管理:針對Source和Sink都有相應的偏移量(Offset)管理方案,程序員無須關心Offset 的提交
  • 分布式模式可擴展的阵面,支持故障轉移

Kafka Connect Concepts

這里簡單介紹下Kafka Connect 的概念與組成
更多細節(jié)請參考 ?? https://docs.confluent.io/platform/current/connect/concepts.html

Connectors

連接器洪鸭,分為兩種 Source(從源數(shù)據(jù)庫拉取數(shù)據(jù)寫入Kafka),Sink(從Kafka消費數(shù)據(jù)寫入目標數(shù)據(jù))

連接器其實并不參與實際的數(shù)據(jù)copy览爵,連接器負責管理Task置鼻。連接器中定義了對應Task的類型蜓竹,對外提供配置選項(用戶創(chuàng)建連接器時需要提供對應的配置信息)。并且連接器還可以決定啟動多少個Task線程俱济。

用戶可以通過Rest API 啟停連接器嘶是,查看連接器狀態(tài)

Confluent 已經(jīng)提供了許多成熟的連接器,傳送門?? https://www.confluent.io/product/connectors/

Task

實際進行數(shù)據(jù)傳輸?shù)膯卧肼担瓦B接器一樣同樣分為 Source和Sink

Task的配置和狀態(tài)存儲在Kafka的Topic中蔚携,config.storage.topicstatus.storage.topic。我們可以隨時啟動誊辉,停止任務亡脑,以提供彈性邀跃、可擴展的數(shù)據(jù)管道

Worker

剛剛我們講的Connectors 和Task 屬于邏輯單元奈偏,而Worker 是實際運行邏輯單元的進程,Worker 分為兩種模式丽涩,單機模式和分布式模式

單機模式:比較簡單裁蚁,但是功能也受限枉证,只有一些特殊的場景會使用到,例如收集主機的日志毡鉴,通常來說更多的是使用分布式模式

分布式模式:為Kafka Connect提供了可擴展和故障轉移秒赤。相同group.id的Worker,會自動組成集群陈瘦。當新增Worker潮售,或者有Worker掛掉時,集群會自動協(xié)調(diào)分配所有的Connector 和 Task(這個過程稱為Rebalance)

Worker 集群

當使用Worker集群時,創(chuàng)建連接器肮帐,或者連接器Task數(shù)量變動時泪姨,都會觸發(fā)Rebalance 以保證集群各個Worker節(jié)點負載均衡。但是當Task 進入Fail狀態(tài)的時候并不會觸發(fā) Rebalance诀黍,只能通過Rest Api 對Task進行重啟

Converters

Kafka Connect 通過 Converter 將數(shù)據(jù)在Kafka(字節(jié)數(shù)組)與Task(Object)之間進行轉換

默認支持以下Converter

  • AvroConverter io.confluent.connect.avro.AvroConverter: 需要使用 Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter: 需要使用 Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter: 需要使用 Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (無需 Schema Registry): 轉換為json結構
  • StringConverter org.apache.kafka.connect.storage.StringConverter: 簡單的字符串格式
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter: 不做任何轉換

Converters 與 Connector 是解耦的眯勾,下圖展示了在Kafka Connect中,Converter 在何時進行數(shù)據(jù)轉換

image.png

Transforms

連接器可以通過配置Transform 實現(xiàn)對單個消息(對應代碼中的Record)的轉換和修改也颤,可以配置多個Transform 組成一個郁轻。例如讓所有消息的topic加一個前綴好唯、sink無法消費source 寫入的數(shù)據(jù)格式,這些場景都可以使用Transform 解決

Transform 如果配置在Source 則在Task之后執(zhí)行蜕提,如果配置在Sink 則在Task之前執(zhí)行

Dead Letter Queue

與其他MQ不同靶端,Kafka 并沒有死信隊列這個功能。但是Kafka Connect提供了這一功能它浅。

當Sink Task遇到無法處理的消息镣煮,會根據(jù)errors.tolerance配置項決定如何處理典唇,默認情況下(errors.tolerance=none) Sink 遇到無法處理的記錄會直接拋出異常胯府,Task進入Fail 狀態(tài)。開發(fā)人員需要根據(jù)Worker的錯誤日志解決問題炎咖,然后重啟Task寒波,才能繼續(xù)消費數(shù)據(jù)

設置 errors.tolerance=all俄烁,Sink Task 會忽略所有的錯誤,繼續(xù)處理粹胯。Worker中不會有任何錯誤日志】雒可以通過配置errors.deadletterqueue.topic.name = <dead-letter-topic-name> 讓無法處理的消息路由到 Dead Letter Topic

快速上手

下面我來實戰(zhàn)一下竹观,如何使用Kafka Connect栈幸,我們先定一個小目標 將MySQL中的全量數(shù)據(jù)同步到Redis


  1. 新建文件 docker-compose.yaml
version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zk
    ports:
      - 2182:2181

  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      # 宿主機ip
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.21:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    depends_on:
      - zookeeper

在終端上執(zhí)行 docker-compose -f docker-compose.yaml up -d 啟動docker容器

準備連接器,這里我是自己寫了一個簡單的連接器??玩焰。下載地址:https://github.com/TavenYin/kafka-connect-example/blob/master/release/kafka-connector-example-bin.jar

# 將連接器上傳到kafka 容器中
docker cp kafka-connector-example-bin.jar kafka:/opt/connectors
  1. 修改配置并啟動Worker
#在配置文件末尾追加 plugin.path=/opt/connectors
vi /opt/kafka/config/connect-distributed.properties

# 啟動Worker
bin/connect-distributed.sh -daemon config/connect-distributed.properties
  1. 準備MySQL

由于我宿主機里已經(jīng)安裝了MySQL昔园,我就直接使用了并炮,使用如下Sql創(chuàng)建表。創(chuàng)建之后隨便造幾條數(shù)據(jù)

CREATE TABLE `test_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ;
  1. 創(chuàng)建連接器

新建 source.json

{
  "name" : "example-source",
  "config" : {
    "connector.class" : "com.github.taven.source.ExampleSourceConnector",
    "tasks.max" : "1",
    "database.url" : "jdbc:mysql://192.168.3.21:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=UTC&rewriteBatchedStatements=true",
    "database.username" : "root",
    "database.password" : "root",
    "database.tables" : "test_user"
  }
}

向Worker 發(fā)送請求荤西,創(chuàng)建連接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

source.json 中邪锌,有一些屬性是Kafka Connect 提供的癌瘾,例如上述文件中 name, connector.class, tasks.max,剩下的屬性可以在開發(fā)Connector 時自定義妇萄。關于Kafka Connect Configuration 相關請閱讀這里 ?? https://docs.confluent.io/platform/current/installation/configuration/connect/index.html

  1. 確認數(shù)據(jù)是否寫入Kafka

首先查看一下Worker中的運行狀態(tài)冠句,如果Task的state = RUNNING幸乒,代表Task沒有拋出任何異常,平穩(wěn)運行

bash-4.4# curl -X GET localhost:8083/connectors/example-source/status
{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},
"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"source"}

查看kafka 中Topic 是否創(chuàng)建

bash-4.4# bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
connect-configs
connect-offsets
connect-status
test_user

這些Topic 都存儲了什么基茵?

  • __consumer_offsets: 記錄所有Kafka Consumer Group的Offset
  • connect-configs: 存儲連接器的配置拱层,對應Connect 配置文件中config.storage.topic
  • connect-offsets: 存儲Source 的Offset,對應Connect 配置文件中offset.storage.topic
  • connect-status: 連接器與Task的狀態(tài)径缅,對應Connect 配置文件中status.storage.topic

查看topic中數(shù)據(jù)烙肺,此時說明MySQL數(shù)據(jù)已經(jīng)成功寫入Kafka

bash-4.4# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_user --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":1,"name":"yyyyyy"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":2,"name":"qqqq"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":3,"name":"eeee"}}

數(shù)據(jù)結構為Json桃笙,可以回顧一下上面我們修改的connect-distributed.properties,默認提供的Converter 為JsonConverter鼠锈,所有的數(shù)據(jù)包含schema 和 payload 兩項是因為配置文件中默認啟動了key.converter.schemas.enable=truevalue.converter.schemas.enable=true兩個選項

  1. 啟動 Sink

新建sink.json

{
  "name" : "example-sink",
  "config" : {
    "connector.class" : "com.github.taven.sink.ExampleSinkConnector",
    "topics" : "test_user, test_order",
    "tasks.max" : "1",
    "redis.host" : "192.168.3.21",
    "redis.port" : "6379",
    "redis.password" : "",
    "redis.database" : "0"
  }
}

創(chuàng)建Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sink.json

然后查看Sink Connector Status购笆,這里我發(fā)現(xiàn)由于我的Redis端口只對localhost開發(fā)虚循,所以這里我的Task Fail了,修改了Redis配置之后铺遂,重啟Task curl -X POST localhost:8083/connectors/example-sink/tasks/0/restart

在確認了Sink Status 為RUNNING 后剪廉,可以確認下Redis中是否有數(shù)據(jù)

關于Kafka Connect Rest api 文檔斗蒋,請參考??https://docs.confluent.io/platform/current/connect/references/restapi.html

  1. 如何查看Sink Offset消費情況

使用命令
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-example-sink

下圖代表 test_user topic 三條數(shù)據(jù)已經(jīng)全部消費

Kafka Connect 高級功能

我們的小目標已經(jīng)達成了∪矗現(xiàn)在兩個Task無事可做妇押,正好借此機會我們來體驗一下可擴展和故障轉移

集群擴展

我啟動了開發(fā)環(huán)境中的Kafka Connect Worker,根據(jù)官方文檔所示通過注冊同一個Kafka 并且使用相同的 group.id=connect-cluster 可以自動組成集群

啟動我開發(fā)環(huán)境中的Kafka Connect俊马,之后檢查兩個連接器狀態(tài)

bash-4.4#  curl -X GET localhost:8083/connectors/example-source/status
{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.23.176.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.23.176.1:8083"}],"type":"source"}bash-4.4#

bash-4.4#  curl -X GET localhost:8083/connectors/example-sink/status
{"name":"example-sink","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"sink"}

觀察worker_id 可以發(fā)現(xiàn)柴我,兩個Connectors 已經(jīng)分別運行在兩個Worker上了

故障轉移

此時我們通過kill pid結束docker中的Worker進程觀察是否宕機之后自動轉移,但是發(fā)現(xiàn)Task并沒有轉移到僅存的Worker中聋伦,Task 狀態(tài)變?yōu)閁NASSIGNED界睁,這是為啥呢?難道是有什么操作錯了逾礁?

在網(wǎng)上查閱了一番得知杨赤,Kafka Connect 的集群擴展與故障轉移機制是通過Kafka Rebalance 協(xié)議實現(xiàn)的(Consumer也是該協(xié)議)疾牲,當Worker節(jié)點宕機時間超過 scheduled.rebalance.max.delay.ms 時,Kafka才會將其踢出集群焰枢。踢出后將該節(jié)點的連接器和任務分配給其他Worker媳维,scheduled.rebalance.max.delay.ms默認值為五分鐘霍转。

后來經(jīng)測試發(fā)現(xiàn),五分鐘之后查看連接器信息低滩,已經(jīng)轉移到存活的Worker節(jié)點了

本來還計劃寫一下如何開發(fā)連接器和Kafka Rebalance岩喷,但是這篇已經(jīng)夠長了,所以計劃后續(xù)更新這兩篇文章

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末婶溯,一起剝皮案震驚了整個濱河市迄委,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌死讹,老刑警劉巖曲梗,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件虏两,死亡現(xiàn)場離奇詭異,居然都是意外死亡笤虫,警方通過查閱死者的電腦和手機祖凫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進店門惠况,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人峦睡,你說我怎么就攤上這事权埠。” “怎么了龙屉?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵转捕,是天一觀的道長漫雷。 經(jīng)常有香客問我降盹,道長,這世上最難降的妖魔是什么价捧? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任涡戳,我火速辦了婚禮渔彰,結果婚禮上,老公的妹妹穿的比我還像新娘宝惰。我一直安慰自己再沧,他們只是感情好,可當我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布淤堵。 她就那樣靜靜地躺著拐邪,像睡著了一般屎即。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上乘陪,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天啡邑,我揣著相機與錄音井赌,去河邊找鬼。 笑死流部,一個胖子當著我的面吹牛纹坐,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播果漾,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼绒障,長吁一口氣:“原來是場噩夢啊……” “哼户辱!你這毒婦竟也來了?” 一聲冷哼從身側響起蒋伦,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤焚鹊,失蹤者是張志新(化名)和其女友劉穎末患,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嚷炉,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡申屹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年隧膏,在試婚紗的時候發(fā)現(xiàn)自己被綠了胞枕。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡决乎,死狀恐怖构诚,靈堂內(nèi)的尸體忽然破棺而出铆惑,到底是詐尸還是另有隱情凳寺,我是刑警寧澤彤侍,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布盏阶,位于F島的核電站闻书,受9級特大地震影響魄眉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜坑律,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一晃择、第九天 我趴在偏房一處隱蔽的房頂上張望宫屠。 院中可真熱鬧,春花似錦抵栈、人聲如沸坤次。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽胰舆。三九已至蹬挤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間倦零,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工蹋嵌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留栽烂,地道東北人恋脚。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓糟描,卻偏偏與公主長得像,于是被迫代替她去往敵國和親躬拢。 傳聞我的和親對象是個殘疾皇子见间,可洞房花燭夜當晚...
    茶點故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容