Preface:
Kafka Connect requires Kafka version 0.9.0.x or later.
Kafka ships with two Connect launcher scripts, connect-standalone.sh (standalone mode) and connect-distributed.sh (distributed mode); both are located in the bin directory of the Kafka installation.
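You can confirm the scripts are present by listing the installation's bin directory, e.g.:
ls bin/connect-*.sh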
1. Importing data from an external file into Kafka
For now only connect-standalone.sh is covered; data imported with this script is published to the Kafka cluster and can then be consumed like any other messages.
For example, suppose the data to import lives in a file named testkafka.txt with the following contents:
yesy
alipay
wenxin
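To reproduce the example, the input file can be created like this (run from the Kafka installation directory, since the source config below references the file by a relative path):
printf 'yesy\nalipay\nwenxin\n' > testkafka.txt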
The import command is: bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
The configuration file connect-standalone.properties contains the following:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
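A practical note on offset.storage.file.filename: in standalone mode the worker records how far each source connector has read into this file, so a restarted worker resumes where it left off rather than re-importing the file. To replay the whole file while testing, stop the worker and remove the offset file first, e.g.:
rm -f /tmp/connect.offsets  # resets offsets for every connector tracked by this standalone worker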
The connect-file-source.properties configuration is shown below; file sets the name of the file to import, and topic sets the Kafka topic to write to:
name=local-file-source-test
connector.class=FileStreamSource
tasks.max=1
file=testkafka.txt
topic=connect-test
transforms=MakeMap
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=data
Given the key.converter and value.converter settings above, the records land in Kafka as JSON; the HoistField transform configured as MakeMap additionally wraps each raw line in a single field named data. Concretely:
{"data":"yesy"}
{"data":"alipay"}
{"data":"wenxin"}
2. Exporting data from Kafka to a file
The export command is: bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
The connect-standalone.properties file is the same one used for the import.
The connect-file-sink.properties configuration is shown below; file sets the file the exported data is written to, and topics sets the Kafka topic(s) to read from:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sinkp.txt
topics=connect-test
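Once the sink connector has caught up, the exported data can be inspected directly, e.g.:
cat test.sinkp.txt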
Note that FileStreamSink writes the string form of each record value, so the output is the toString of the deserialized map rather than strict JSON:
{data=alipay}
{data=wenxin}
{data=yesy}
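As a usage note, connect-standalone.sh accepts any number of connector property files, so the source and sink can run in a single worker process, forming a complete file-to-Kafka-to-file pipeline:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties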