數(shù)據(jù)傳輸工具 —— Kafka Connect

1、什么是 kafka connect叁征?

??Kafka Connect 是一種用于在 kafka 和其他系統(tǒng)之間可擴展全闷、可靠的流式傳輸數(shù)據(jù)的工具漂问。它使得能夠快速定義將大量數(shù)據(jù)集合移入和移出 kafka 的連接器變得簡單蔼水。

??Kafka Connect 可以獲取整個數(shù)據(jù)庫或從應(yīng)用程序服務(wù)器收集指標到 kafka 主題震糖,使數(shù)據(jù)可用于低延遲的流處理。

??導(dǎo)出作業(yè)可以將數(shù)據(jù)從 kafka topic 傳輸?shù)蕉未鎯筒樵兿到y(tǒng)趴腋,或者傳遞到批處理系統(tǒng)以進行離線分析试伙。


2、功能

  • kafka connector 通用框架于样,提供統(tǒng)一的集成 API
  • 同時支持分布式模式和單機模式
  • 自動化的 offset 管理,開發(fā)人員不必擔心錯誤處理的影響
  • rest 接口潘靖,用來查看和管理 kafka connectors


3穿剖、概念

Connectors:通過管理任務(wù)來處理數(shù)據(jù)流的高級抽象
Tasks:數(shù)據(jù)寫入 kafka 和從 kafka 讀出的實現(xiàn)
Workers:運行 connectors 和 tasks 的進程
Converters:kafka connect 和其他存儲系統(tǒng)直接發(fā)送和接收數(shù)據(jù)之間轉(zhuǎn)換數(shù)據(jù)

??Connector 決定了數(shù)據(jù)要從哪里復(fù)制過來以及數(shù)據(jù)應(yīng)該寫到哪里去,一個 connector 實例是一個需要負責在 kafka 和其他系統(tǒng)之間復(fù)制數(shù)據(jù)的邏輯作業(yè)卦溢,connector plugin 是 jar 文件糊余,實現(xiàn)了 kafka 定義的一些接口來完成特定的任務(wù)。

??Task 是 kafka connect 數(shù)據(jù)模型的主角单寂,每一個 connector 都會協(xié)調(diào)一系列的 task 去執(zhí)行任務(wù)贬芥,connector 可以把一項工作分割成許多的 task,然后再把 task 分發(fā)到各個 worker 中去執(zhí)行(分布式模式下)宣决,task 不自己保存自己的狀態(tài)信息蘸劈,而是交給特定的 kafka 主題去保存(config.storage.topicstatus.storage.topic)。在分布式模式下有一個概念叫做任務(wù)再平衡(Task Rebalancing)尊沸,當一個 connector 第一次提交到集群時威沫,所有的 worker 都會做一個 task rebalancing 從而保證每一個 worker 都運行了差不多數(shù)量的工作,而不是所有的工作壓力都集中在某個 worker 進程中洼专,而當每個進程掛了之后也會執(zhí)行 task rebalance棒掠。

??Connectors 和 Tasks 都是邏輯工作單位,必須安排在進程中執(zhí)行屁商,而在 kafka connect 中烟很,這些進程就是 workers,分別有兩種 worker:standalone、distributed雾袱。生產(chǎn)中 distributed worker 表現(xiàn)很棒恤筛,因為它提供了可擴展性以及自動容錯的功能,可以用一個 group.id 來啟動很多 worker 進程谜酒,在有效的 worker 進程中它們會自動地去協(xié)調(diào)執(zhí)行 connector 和 task叹俏,如果新加或者掛了一個 worker,其他的 worker 會檢測到然后再重新分配 connector 和 task僻族。

??Converter 會把 bytes 數(shù)據(jù)轉(zhuǎn)換為 kafka connect 內(nèi)部的格式粘驰,也可以把 kafka connect 內(nèi)部存儲格式的數(shù)據(jù)變成 bytes,converter 對 connector 來說是解耦的述么,所以其他的 connector 都可以重用蝌数。例如使用了 avro converter,那么 jdbc connector 可以寫 avro 格式的數(shù)據(jù)到 kafka度秘,同時 hfds connector 也可以從 kafka 中讀出 avro 格式的數(shù)據(jù)顶伞。




4、實戰(zhàn)

??啟動 confluent

cd /app/confluent/bin
./confluent local start

??使用 standalone 模式啟動

# 啟動 kafka connect
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
connector1.properties [connector2.properties]

??在 $CONFLUENT_HOME/etc/kafka 下存在很多配置文件

??其中 connect-standalone.properties 是啟動 connect 服務(wù)組件自身的配置剑梳,內(nèi)容如下:

# kafka 服務(wù)
bootstrap.servers=localhost:9092

# 轉(zhuǎn)換器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# 是否啟用轉(zhuǎn)換器
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# 偏移量存儲文件名
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

# 插件路徑
plugin.path=/usr/share/java,/app/confluent/share/confluent-hub-components

# 默認端口為8083唆貌,需要修改端口時啟動以下配置
# rest.port=8084


(1)標準 connect

啟動一個帶 FileSource 的 Connect

??connect-file-source.properties 是一個 source connect 的模板配置,啟用該配置就能夠從指定文件中復(fù)制數(shù)據(jù)到 kafka 中垢乙,其默認的配置如下:

# connect 的名字
name=local-file-source
# 將文件讀取到數(shù)據(jù)流中
connector.class=FileStreamSource
# 工作線程是 1 個
tasks.max=1
# 讀取的文件名為 test.txt
file=test.txt
# 復(fù)制到的主題為 connect-test
topic=connect-test

??啟動 connect

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties

??結(jié)果報錯 Java 內(nèi)存不足

??關(guān)閉虛擬機锨咙,加大內(nèi)存,重啟服務(wù)器和 confluent追逮,再次啟動 connect酪刀,報錯 8083 端口已被綁定

??修改 connect-standalone.properties 配置中的端口為 8084 再啟動,新的報錯:不存在 source 配置文件中的指定的文件钮孵,在啟動路徑下創(chuàng)建文件骂倘,日志恢復(fù)正常

echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt

??可以通過 kafka tools 看到新增了主題 connect-test,寫入了3條數(shù)據(jù)

??往文件中寫入數(shù)據(jù)巴席,會報告又成功提交一次偏移量

# 寫數(shù)據(jù)
/app/confluent# echo -e "foo1\nbar1\n" >> test.txt

# 日志
INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
...

??然后可以看到主題中多了3條數(shù)據(jù)


啟動帶 FileSource 和 FileSink 的 Connect

??connect-file-sink.properties 是一個 source connect 的模板配置历涝,啟用該配置就能夠從指定文件中復(fù)制數(shù)據(jù)到 kafka 中,其默認的配置如下:

# connect 的名字
name=local-file-sink
# 從數(shù)據(jù)流中讀取數(shù)據(jù)到文件中
connector.class=FileStreamSink
# 工作線程是 1 個
tasks.max=1
# 寫入的文件是 test.sink.txt
file=test.sink.txt
# 讀取數(shù)據(jù)的主題是 connect-test
topics=connect-test

??啟動 connect

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

??可以看到自動創(chuàng)建了 test.sink.txt 文件

??同時可以看到 consumer 中多了一個 connect-local-file-sink 漾唉,偏移量為6(即已將6條數(shù)據(jù)都 sink 到了文件中)



(2)REST API

??使用 Rest API 必須啟動分布式模式睬关,通過 Rest API 可以管理集群中的 connect 服務(wù),默認端口是 8083毡证。

GET /connectors - 返回所有正在運行的connector名电爹。
POST /connectors - 新建一個connector;請求體必須是json格式并且需要包含name字段和config字段,name是connectors的名字料睛,config是json格式丐箩,必須包含connector的配置信息摇邦。
GET /connectors/{name} - 獲取指定connector的信息。
GET /connectors/{name}/config - 獲取指定connector的配置信息屎勘。

??在分布式模式下施籍,有兩種方式來配置 connector,第一種是類似 standalone 模式一樣概漱,寫好配置文件丑慎,然后在啟動時指定

$CONFLUENT_HOME/bin/connect-distributed \
  $CONFLUENT_HOME/etc/kafka/connect-distributed.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

??另外一種方式更加靈活,就是直接通過 Rest API 來對 connector 配置進行增刪查瓤摧。

??查看 connectors

??添加 connectors

??查看某個 connector

??這里指定的文件是相對路徑娃承,所以要在 $CONFLUENT_HOME/bin 路徑下創(chuàng)建一個 test-distributed.txt 文件

cd $CONFLUENT_HOME/bin
echo -e "foo\nbar\n" > test-distributed.txt

??可以看到出現(xiàn)了 connect-distributed 主題

??添加 sink

??從服務(wù)器可以看到產(chǎn)生了 sink 文件

??刪除 connector

??再次往 test-distributed.txt 文件中追加數(shù)據(jù)轴脐,可以看到 connect-distributed 主題中的數(shù)據(jù)增加了,source connector 依然在工作,但是 sink connector 已經(jīng)停止了呛哟,所以 test-distributed.sink.txt 文件中數(shù)據(jù)不再從主題中復(fù)制民轴。

【注意】

??如果要在腳本中處理涝开,發(fā)起HTTP請求怀吻,可以使用 curl 工具,將請求的配置在 json 文件中给赞,如:

curl -d @$CONFLUENT_HOME/connect-file-sink.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors


創(chuàng)建帶有 Convert 的 connector

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-transformation.txt",
        "topic": "connect-transformation",
        "transforms": "MakeMap,InsertSource",
        "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.MakeMap.field": "line",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "data_source",
        "transforms.InsertSource.static.value": "test-file-source"
    }
}

??添加 connector(由于跟上述實驗 name 一致机打,所以需要先刪除或者換個 name)

??創(chuàng)建 test-transformation.txt 文件,可以看到自動創(chuàng)建了 connect-transformation 主題

??添加 sink

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-transformation.sink.txt",
        "topics": "connect-transformation"
    }
}

??可以看到 sink 自動生成了 test-transformation.sink.txt 文件片迅,并且內(nèi)容不是 source 過來的原始數(shù)據(jù)残邀,而是經(jīng)過 convertor 處理后的帶格式的數(shù)據(jù)


(3)MySQL Source、ESSink

??演示將數(shù)據(jù)從 MySQL 復(fù)制到 kafka 中障涯,再通過 kafka 將數(shù)據(jù)下沉到 ElasticSearch。這里 MySQL 是數(shù)據(jù)源膳汪,所以需要支持 MySQL 的 source connector唯蝶,ES 是目標數(shù)據(jù)系統(tǒng),所以需要支持 ES 的 sink connectors遗嗽,可以從 https://www.confluent.io/hub/ 下載粘我。

MySQL

??MySQL 下載插件搜索關(guān)鍵字 "JDBC",可以看到提供了在線安裝的腳本和離線安裝的包下載痹换。

??MySQL 環(huán)境準備

# 安裝 MySQL
sudo apt-get install mysql-server

# 安裝 Confluent 插件
confluent-hub install confluentinc/kafka-connect-jdbc:10.4.1

# 將 MySQL 驅(qū)動上傳到 confluent 目錄
# mv mysql.jar /app/confluent/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib

【注意】下載下來的 jdbc connector 插件征字,在處理 mysql 時需要相應(yīng)的驅(qū)動,而插件不帶驅(qū)動娇豫,實際采集數(shù)據(jù)時會報錯匙姜,這時需要將驅(qū)動 jar 包拷貝到插件庫目錄中。

??數(shù)據(jù)準備冯痢,創(chuàng)建用戶并授權(quán)氮昧,用該用戶創(chuàng)建數(shù)據(jù)庫框杜、表和插入數(shù)據(jù)

grant all on *.* to hyh@'localhost' identified by 'hyh';
create database studentsDB;
use studentsDB;
create table students (rollno int primary key auto_increment, name varchar(30), marks varchar(30));
insert into students (name, marks) values ('James', 35);

??創(chuàng)建 source 配置文件(connect-mysql-source.properties),內(nèi)容如下:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=hyh&password=hyh
mode=incrementing
# 表中的自增列字段
incrementing.column.name=rollno
# 表會被采集到的 topic 名前綴袖肥,比如表名叫 students咪辱,對應(yīng)的 topic 就為 test-mysql-jdbc-students
topic.prefix=test-mysql-jdbc-

??啟動 mysql source connector

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties

??可以看到啟動之后,開啟了 JDBC source task椎组,然后執(zhí)行了查詢的 SQL油狂,最后提交和刷新的偏移量

??與此同時,可以看到 kafka 中新增了一個 topic test-mysql-jdbc-students

??里面有一條數(shù)據(jù)寸癌,如果此時往表中再插入兩條數(shù)據(jù)专筷,可以看到數(shù)據(jù)變成了3條



ElasticSearch

??ES 下載插件搜索關(guān)鍵字 "ElasticSearch" ,可以看到有 ElasticSearch Sink Connector灵份、ElasticSearch Source Connector仁堪,注意有些插件是支持 source、sink填渠,有些是分開兩個插件弦聂。

??ES 環(huán)境準備

tar -zxvf elasticsearch-7.6.0-linux-x86_64.tar.gz -C /app
mv /app/elasticsearch-7.6.0 /app/elasticsearch

# 配置環(huán)境變量
export ES_HOME=/app/elasticsearch
export PATH=${ES_HOME}/bin:$PATH

# 安裝 Confluent 插件
confluent-hub install confluentinc/kafka-connect-elasticsearch:13.0.0

??啟動 ES

cd /app/elasticsearch
.bin/elasticsearch

??報錯不能以root用戶啟動

??創(chuàng)建用戶用戶組es,并修改 es 安裝目錄所屬用戶和組

chown -R es:es elasticsearch/

??再次啟動看到以下日志即正常

??配置 sink 配置文件(connect-es-sink.properties)氛什,內(nèi)容如下:

name=test-sink-elastic
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
connection.url=http://localhost:9200
topics=test-mysql-jdbc-students 
key.ignore=true
type.name=kafka-connect

??啟動 ES sink connector

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-es-sink.properties

??訪問 es 9092 端口查詢數(shù)據(jù)莺葫,可以查到有三條數(shù)據(jù)

# 查詢命令
curl -H "Content-Type: application/json" -X GET http://localhost:9200/test-mysql-jdbc-students/_search

# 查到的結(jié)果
{
    "took": 121,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 3,
            "relation": "eq"
        },
        "max_score": 1,
        "hits": [
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+0",
                "_score": 1,
                "_source": {
                    "rollno": 1,
                    "name": "James",
                    "marks": "35"
                }
            },
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+1",
                "_score": 1,
                "_source": {
                    "rollno": 2,
                    "name": "James2",
                    "marks": "36"
                }
            },
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+2",
                "_score": 1,
                "_source": {
                    "rollno": 3,
                    "name": "James3",
                    "marks": "37"
                }
            }
        ]
    }
}

??往數(shù)據(jù)庫插入一條新的數(shù)據(jù)

insert into students (name, marks) values ('James4', 38);

??可以看到 es 側(cè)接收到了這條數(shù)據(jù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市枪眉,隨后出現(xiàn)的幾起案子捺檬,更是在濱河造成了極大的恐慌,老刑警劉巖贸铜,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件堡纬,死亡現(xiàn)場離奇詭異,居然都是意外死亡蒿秦,警方通過查閱死者的電腦和手機烤镐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來棍鳖,“玉大人炮叶,你說我怎么就攤上這事《纱Γ” “怎么了镜悉?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長医瘫。 經(jīng)常有香客問我侣肄,道長,這世上最難降的妖魔是什么醇份? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任茫孔,我火速辦了婚禮叮喳,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘缰贝。我一直安慰自己馍悟,他們只是感情好,可當我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布剩晴。 她就那樣靜靜地躺著锣咒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪赞弥。 梳的紋絲不亂的頭發(fā)上毅整,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天,我揣著相機與錄音绽左,去河邊找鬼悼嫉。 笑死,一個胖子當著我的面吹牛拼窥,可吹牛的內(nèi)容都是我干的戏蔑。 我是一名探鬼主播,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼鲁纠,長吁一口氣:“原來是場噩夢啊……” “哼总棵!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起改含,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤情龄,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后捍壤,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體骤视,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年鹃觉,在試婚紗的時候發(fā)現(xiàn)自己被綠了专酗。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡帜慢,死狀恐怖笼裳,靈堂內(nèi)的尸體忽然破棺而出唯卖,到底是詐尸還是另有隱情粱玲,我是刑警寧澤,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布拜轨,位于F島的核電站抽减,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏橄碾。R本人自食惡果不足惜卵沉,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一颠锉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧史汗,春花似錦琼掠、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至戈毒,卻和暖如春艰猬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背埋市。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工冠桃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人道宅。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓食听,卻偏偏與公主長得像,于是被迫代替她去往敵國和親培己。 傳聞我的和親對象是個殘疾皇子碳蛋,可洞房花燭夜當晚...
    茶點故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容