1、什么是 kafka connect叁征?
??Kafka Connect 是一種用于在 kafka 和其他系統(tǒng)之間可擴展全闷、可靠的流式傳輸數(shù)據(jù)的工具漂问。它使得能夠快速定義將大量數(shù)據(jù)集合移入和移出 kafka 的連接器變得簡單蔼水。
??Kafka Connect 可以獲取整個數(shù)據(jù)庫或從應(yīng)用程序服務(wù)器收集指標到 kafka 主題震糖,使數(shù)據(jù)可用于低延遲的流處理。
??導(dǎo)出作業(yè)可以將數(shù)據(jù)從 kafka topic 傳輸?shù)蕉未鎯筒樵兿到y(tǒng)趴腋,或者傳遞到批處理系統(tǒng)以進行離線分析试伙。
2、功能
- kafka connector 通用框架于样,提供統(tǒng)一的集成 API
- 同時支持分布式模式和單機模式
- 自動化的 offset 管理,開發(fā)人員不必擔心錯誤處理的影響
- rest 接口潘靖,用來查看和管理 kafka connectors
3穿剖、概念
Connectors:通過管理任務(wù)來處理數(shù)據(jù)流的高級抽象
Tasks:數(shù)據(jù)寫入 kafka 和從 kafka 讀出的實現(xiàn)
Workers:運行 connectors 和 tasks 的進程
Converters:kafka connect 和其他存儲系統(tǒng)直接發(fā)送和接收數(shù)據(jù)之間轉(zhuǎn)換數(shù)據(jù)
??Connector 決定了數(shù)據(jù)要從哪里復(fù)制過來以及數(shù)據(jù)應(yīng)該寫到哪里去,一個 connector 實例是一個需要負責在 kafka 和其他系統(tǒng)之間復(fù)制數(shù)據(jù)的邏輯作業(yè)卦溢,connector plugin 是 jar 文件糊余,實現(xiàn)了 kafka 定義的一些接口來完成特定的任務(wù)。
??Task 是 kafka connect 數(shù)據(jù)模型的主角单寂,每一個 connector 都會協(xié)調(diào)一系列的 task 去執(zhí)行任務(wù)贬芥,connector 可以把一項工作分割成許多的 task,然后再把 task 分發(fā)到各個 worker 中去執(zhí)行(分布式模式下)宣决,task 不自己保存自己的狀態(tài)信息蘸劈,而是交給特定的 kafka 主題去保存(config.storage.topic 和 status.storage.topic)。在分布式模式下有一個概念叫做任務(wù)再平衡(Task Rebalancing)尊沸,當一個 connector 第一次提交到集群時威沫,所有的 worker 都會做一個 task rebalancing 從而保證每一個 worker 都運行了差不多數(shù)量的工作,而不是所有的工作壓力都集中在某個 worker 進程中洼专,而當每個進程掛了之后也會執(zhí)行 task rebalance棒掠。
??Connectors 和 Tasks 都是邏輯工作單位,必須安排在進程中執(zhí)行屁商,而在 kafka connect 中烟很,這些進程就是 workers,分別有兩種 worker:standalone、distributed雾袱。生產(chǎn)中 distributed worker 表現(xiàn)很棒恤筛,因為它提供了可擴展性以及自動容錯的功能,可以用一個 group.id 來啟動很多 worker 進程谜酒,在有效的 worker 進程中它們會自動地去協(xié)調(diào)執(zhí)行 connector 和 task叹俏,如果新加或者掛了一個 worker,其他的 worker 會檢測到然后再重新分配 connector 和 task僻族。
??Converter 會把 bytes 數(shù)據(jù)轉(zhuǎn)換為 kafka connect 內(nèi)部的格式粘驰,也可以把 kafka connect 內(nèi)部存儲格式的數(shù)據(jù)變成 bytes,converter 對 connector 來說是解耦的述么,所以其他的 connector 都可以重用蝌数。例如使用了 avro converter,那么 jdbc connector 可以寫 avro 格式的數(shù)據(jù)到 kafka度秘,同時 hfds connector 也可以從 kafka 中讀出 avro 格式的數(shù)據(jù)顶伞。
4、實戰(zhàn)
??啟動 confluent
cd /app/confluent/bin
./confluent local start
??使用 standalone 模式啟動
# 啟動 kafka connect
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
connector1.properties [connector2.properties]
??在 $CONFLUENT_HOME/etc/kafka 下存在很多配置文件
??其中 connect-standalone.properties 是啟動 connect 服務(wù)組件自身的配置剑梳,內(nèi)容如下:
# kafka 服務(wù)
bootstrap.servers=localhost:9092
# 轉(zhuǎn)換器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 是否啟用轉(zhuǎn)換器
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 偏移量存儲文件名
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# 插件路徑
plugin.path=/usr/share/java,/app/confluent/share/confluent-hub-components
# 默認端口為8083唆貌,需要修改端口時啟動以下配置
# rest.port=8084
(1)標準 connect
啟動一個帶 FileSource 的 Connect
??connect-file-source.properties 是一個 source connect 的模板配置,啟用該配置就能夠從指定文件中復(fù)制數(shù)據(jù)到 kafka 中垢乙,其默認的配置如下:
# connect 的名字
name=local-file-source
# 將文件讀取到數(shù)據(jù)流中
connector.class=FileStreamSource
# 工作線程是 1 個
tasks.max=1
# 讀取的文件名為 test.txt
file=test.txt
# 復(fù)制到的主題為 connect-test
topic=connect-test
??啟動 connect
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties
??結(jié)果報錯 Java 內(nèi)存不足
??關(guān)閉虛擬機锨咙,加大內(nèi)存,重啟服務(wù)器和 confluent追逮,再次啟動 connect酪刀,報錯 8083 端口已被綁定
??修改 connect-standalone.properties 配置中的端口為 8084 再啟動,新的報錯:不存在 source 配置文件中的指定的文件钮孵,在啟動路徑下創(chuàng)建文件骂倘,日志恢復(fù)正常
echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt
??可以通過 kafka tools 看到新增了主題 connect-test,寫入了3條數(shù)據(jù)
??往文件中寫入數(shù)據(jù)巴席,會報告又成功提交一次偏移量
# 寫數(shù)據(jù)
/app/confluent# echo -e "foo1\nbar1\n" >> test.txt
# 日志
INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
...
??然后可以看到主題中多了3條數(shù)據(jù)
啟動帶 FileSource 和 FileSink 的 Connect
??connect-file-sink.properties 是一個 source connect 的模板配置历涝,啟用該配置就能夠從指定文件中復(fù)制數(shù)據(jù)到 kafka 中,其默認的配置如下:
# connect 的名字
name=local-file-sink
# 從數(shù)據(jù)流中讀取數(shù)據(jù)到文件中
connector.class=FileStreamSink
# 工作線程是 1 個
tasks.max=1
# 寫入的文件是 test.sink.txt
file=test.sink.txt
# 讀取數(shù)據(jù)的主題是 connect-test
topics=connect-test
??啟動 connect
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
??可以看到自動創(chuàng)建了 test.sink.txt 文件
??同時可以看到 consumer 中多了一個 connect-local-file-sink 漾唉,偏移量為6(即已將6條數(shù)據(jù)都 sink 到了文件中)
(2)REST API
??使用 Rest API 必須啟動分布式模式睬关,通過 Rest API 可以管理集群中的 connect 服務(wù),默認端口是 8083毡证。
GET /connectors - 返回所有正在運行的connector名电爹。
POST /connectors - 新建一個connector;請求體必須是json格式并且需要包含name字段和config字段,name是connectors的名字料睛,config是json格式丐箩,必須包含connector的配置信息摇邦。
GET /connectors/{name} - 獲取指定connector的信息。
GET /connectors/{name}/config - 獲取指定connector的配置信息屎勘。
??在分布式模式下施籍,有兩種方式來配置 connector,第一種是類似 standalone 模式一樣概漱,寫好配置文件丑慎,然后在啟動時指定
$CONFLUENT_HOME/bin/connect-distributed \
$CONFLUENT_HOME/etc/kafka/connect-distributed.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
??另外一種方式更加靈活,就是直接通過 Rest API 來對 connector 配置進行增刪查瓤摧。
??查看 connectors
??添加 connectors
??查看某個 connector
??這里指定的文件是相對路徑娃承,所以要在 $CONFLUENT_HOME/bin 路徑下創(chuàng)建一個 test-distributed.txt 文件
cd $CONFLUENT_HOME/bin
echo -e "foo\nbar\n" > test-distributed.txt
??可以看到出現(xiàn)了 connect-distributed 主題
??添加 sink
??從服務(wù)器可以看到產(chǎn)生了 sink 文件
??刪除 connector
??再次往 test-distributed.txt 文件中追加數(shù)據(jù)轴脐,可以看到 connect-distributed 主題中的數(shù)據(jù)增加了,source connector 依然在工作,但是 sink connector 已經(jīng)停止了呛哟,所以 test-distributed.sink.txt 文件中數(shù)據(jù)不再從主題中復(fù)制民轴。
【注意】
??如果要在腳本中處理涝开,發(fā)起HTTP請求怀吻,可以使用 curl 工具,將請求的配置在 json 文件中给赞,如:
curl -d @$CONFLUENT_HOME/connect-file-sink.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
創(chuàng)建帶有 Convert 的 connector
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-transformation.txt",
"topic": "connect-transformation",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field": "line",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "data_source",
"transforms.InsertSource.static.value": "test-file-source"
}
}
??添加 connector(由于跟上述實驗 name 一致机打,所以需要先刪除或者換個 name)
??創(chuàng)建 test-transformation.txt 文件,可以看到自動創(chuàng)建了 connect-transformation 主題
??添加 sink
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test-transformation.sink.txt",
"topics": "connect-transformation"
}
}
??可以看到 sink 自動生成了 test-transformation.sink.txt 文件片迅,并且內(nèi)容不是 source 過來的原始數(shù)據(jù)残邀,而是經(jīng)過 convertor 處理后的帶格式的數(shù)據(jù)
(3)MySQL Source、ESSink
??演示將數(shù)據(jù)從 MySQL 復(fù)制到 kafka 中障涯,再通過 kafka 將數(shù)據(jù)下沉到 ElasticSearch。這里 MySQL 是數(shù)據(jù)源膳汪,所以需要支持 MySQL 的 source connector唯蝶,ES 是目標數(shù)據(jù)系統(tǒng),所以需要支持 ES 的 sink connectors遗嗽,可以從 https://www.confluent.io/hub/ 下載粘我。
MySQL
??MySQL 下載插件搜索關(guān)鍵字 "JDBC",可以看到提供了在線安裝的腳本和離線安裝的包下載痹换。
??MySQL 環(huán)境準備
# 安裝 MySQL
sudo apt-get install mysql-server
# 安裝 Confluent 插件
confluent-hub install confluentinc/kafka-connect-jdbc:10.4.1
# 將 MySQL 驅(qū)動上傳到 confluent 目錄
# mv mysql.jar /app/confluent/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
【注意】下載下來的 jdbc connector 插件征字,在處理 mysql 時需要相應(yīng)的驅(qū)動,而插件不帶驅(qū)動娇豫,實際采集數(shù)據(jù)時會報錯匙姜,這時需要將驅(qū)動 jar 包拷貝到插件庫目錄中。
??數(shù)據(jù)準備冯痢,創(chuàng)建用戶并授權(quán)氮昧,用該用戶創(chuàng)建數(shù)據(jù)庫框杜、表和插入數(shù)據(jù)
grant all on *.* to hyh@'localhost' identified by 'hyh';
create database studentsDB;
use studentsDB;
create table students (rollno int primary key auto_increment, name varchar(30), marks varchar(30));
insert into students (name, marks) values ('James', 35);
??創(chuàng)建 source 配置文件(connect-mysql-source.properties),內(nèi)容如下:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=hyh&password=hyh
mode=incrementing
# 表中的自增列字段
incrementing.column.name=rollno
# 表會被采集到的 topic 名前綴袖肥,比如表名叫 students咪辱,對應(yīng)的 topic 就為 test-mysql-jdbc-students
topic.prefix=test-mysql-jdbc-
??啟動 mysql source connector
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
$CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties
??可以看到啟動之后,開啟了 JDBC source task椎组,然后執(zhí)行了查詢的 SQL油狂,最后提交和刷新的偏移量
??與此同時,可以看到 kafka 中新增了一個 topic test-mysql-jdbc-students
??里面有一條數(shù)據(jù)寸癌,如果此時往表中再插入兩條數(shù)據(jù)专筷,可以看到數(shù)據(jù)變成了3條
ElasticSearch
??ES 下載插件搜索關(guān)鍵字 "ElasticSearch" ,可以看到有 ElasticSearch Sink Connector灵份、ElasticSearch Source Connector仁堪,注意有些插件是支持 source、sink填渠,有些是分開兩個插件弦聂。
??ES 環(huán)境準備
tar -zxvf elasticsearch-7.6.0-linux-x86_64.tar.gz -C /app
mv /app/elasticsearch-7.6.0 /app/elasticsearch
# 配置環(huán)境變量
export ES_HOME=/app/elasticsearch
export PATH=${ES_HOME}/bin:$PATH
# 安裝 Confluent 插件
confluent-hub install confluentinc/kafka-connect-elasticsearch:13.0.0
??啟動 ES
cd /app/elasticsearch
.bin/elasticsearch
??報錯不能以root用戶啟動
??創(chuàng)建用戶用戶組es,并修改 es 安裝目錄所屬用戶和組
chown -R es:es elasticsearch/
??再次啟動看到以下日志即正常
??配置 sink 配置文件(connect-es-sink.properties)氛什,內(nèi)容如下:
name=test-sink-elastic
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
connection.url=http://localhost:9200
topics=test-mysql-jdbc-students
key.ignore=true
type.name=kafka-connect
??啟動 ES sink connector
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
$CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties \
$CONFLUENT_HOME/etc/kafka/connect-es-sink.properties
??訪問 es 9092 端口查詢數(shù)據(jù)莺葫,可以查到有三條數(shù)據(jù)
# 查詢命令
curl -H "Content-Type: application/json" -X GET http://localhost:9200/test-mysql-jdbc-students/_search
# 查到的結(jié)果
{
"took": 121,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "test-mysql-jdbc-students",
"_type": "_doc",
"_id": "test-mysql-jdbc-students+0+0",
"_score": 1,
"_source": {
"rollno": 1,
"name": "James",
"marks": "35"
}
},
{
"_index": "test-mysql-jdbc-students",
"_type": "_doc",
"_id": "test-mysql-jdbc-students+0+1",
"_score": 1,
"_source": {
"rollno": 2,
"name": "James2",
"marks": "36"
}
},
{
"_index": "test-mysql-jdbc-students",
"_type": "_doc",
"_id": "test-mysql-jdbc-students+0+2",
"_score": 1,
"_source": {
"rollno": 3,
"name": "James3",
"marks": "37"
}
}
]
}
}
??往數(shù)據(jù)庫插入一條新的數(shù)據(jù)
insert into students (name, marks) values ('James4', 38);
??可以看到 es 側(cè)接收到了這條數(shù)據(jù)