Flink: Writing Data to Kafka
Preface
From the official Flink documentation you can see that Flink ships with quite a few sinks out of the box, including a Kafka sink connector (FlinkKafkaProducer). In this article we will look at how to write data to Kafka with it.
Preparation
Flink supports Kafka 0.8, 0.9, 0.10, and 0.11.
You will need a Kafka installation, and the Flink Kafka connector dependency you add must match its version. Here we use the 0.11 connector:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
First, let's check whether the local Kafka already has a student-write topic. Run:
? kafka_2.11-0.10.2.0 ./bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
lambda-pipeline-topic
metrics
my-topic
my-topic-thread1
my-topic-thread2
qb_ad
qbad
qbad_test
student
topic1
wikipedia
wikipedia_stream
If this command lists a student-write topic once our program has been running, that proves the program worked and has written the data it read from the source Kafka into the local Kafka.
Program Code
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

public class FlinkSinkToKafka {

    private static final String READ_TOPIC = "student";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // consumer config for the source Kafka cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "student-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        // this topic must match the one written by the producer utility class in the appendix
        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                READ_TOPIC,
                new SimpleStringSchema(),
                props)).setParallelism(1);
        student.print();

        // write the stream to the local Kafka; only brokerList, topicId and
        // serializationSchema are passed here, so all other producer settings
        // are left at their defaults (see the Analysis section below)
        student.addSink(new FlinkKafkaProducer011<>(
                "localhost:9092",
                "student-write",
                new SimpleStringSchema()
        )).name("flink-connectors-kafka")
          .setParallelism(5);

        env.execute("flink learning connectors kafka");
    }
}
Results
After running the Flink program, list the topics again; a new student-write topic has appeared:
? kafka_2.11-0.10.2.0 ./bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
lambda-pipeline-topic
metrics
my-topic
my-topic-thread1
my-topic-thread2
qb_ad
qbad
qbad_test
student
student-write
topic1
wikipedia
wikipedia_stream
Describe the student-write topic:
? kafka_2.11-0.10.2.0 ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic student-write
Topic:student-write PartitionCount:1 ReplicationFactor:1 Configs:
Topic: student-write Partition: 0 Leader: 0 Replicas: 0 Isr: 0
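To inspect the actual records rather than just the topic metadata, the console consumer that ships with Kafka can tail the topic (a minimal sketch; adjust the path to your installation):
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic student-write --from-beginning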
The IDEA console prints the following:
2> {"age":20,"id":2,"name":"itzzy2","password":"password2"}
6> {"age":24,"id":6,"name":"itzzy6","password":"password6"}
2> {"age":28,"id":10,"name":"itzzy10","password":"password10"}
6> {"age":32,"id":14,"name":"itzzy14","password":"password14"}
2> {"age":36,"id":18,"name":"itzzy18","password":"password18"}
6> {"age":40,"id":22,"name":"itzzy22","password":"password22"}
2> {"age":44,"id":26,"name":"itzzy26","password":"password26"}
6> {"age":48,"id":30,"name":"itzzy30","password":"password30"}
2> {"age":52,"id":34,"name":"itzzy34","password":"password34"}
6> {"age":56,"id":38,"name":"itzzy38","password":"password38"}
Check the topic offsets:
? kafka_2.11-0.10.2.0 ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic student
student:0:0
? kafka_2.11-0.10.2.0 ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic student-write
student-write:0:0
Check the log directories:
? kafka_2.11-0.10.2.0 ll /tmp/kafka-logs/student-write-0
total 0
-rw-r--r-- 1 zzy wheel 10485760 Jan 29 12:03 00000000000000000000.index
-rw-r--r-- 1 zzy wheel 0 Jan 29 12:03 00000000000000000000.log
-rw-r--r-- 1 zzy wheel 10485756 Jan 29 12:03 00000000000000000000.timeindex
? kafka_2.11-0.10.2.0 ll /tmp/kafka-logs/student-0
total 0
-rw-r--r-- 1 zzy wheel 10485760 Jan 29 12:03 00000000000000000000.index
-rw-r--r-- 1 zzy wheel 0 Jan 29 12:03 00000000000000000000.log
-rw-r--r-- 1 zzy wheel 10485756 Jan 29 12:03 00000000000000000000.timeindex
Analysis
In the code above we passed only three arguments to the Flink Kafka producer: brokerList, topicId, and serializationSchema.
Other constructors accept more arguments; here everything else is left at its default. There is a lot to cover there, so it deserves a separate article.
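As a minimal sketch (assuming the same student stream and imports as above; the retries value is purely illustrative and not from the original code), the 0.11 connector also offers a constructor taking the target topic, the serialization schema, and an explicit Properties object, so producer settings no longer fall back to defaults:
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("retries", "3"); // illustrative value, not from the original

student.addSink(new FlinkKafkaProducer011<>(
        "student-write",            // default target topic
        new SimpleStringSchema(),   // value serialization, as before
        producerProps               // explicit producer config replaces the bare broker list
)).name("flink-connectors-kafka-with-props");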
Summary
This article showed how to read data from a Kafka cluster with Flink and then write it to a local Kafka.
Appendix: the Kafka producer code
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaUtils {

    private static final String broker_list = "localhost:9092";
    // must be the same topic the Flink program reads from (READ_TOPIC = "student" above)
    private static final String topic = "student";

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // the raw-typed KafkaProducer of the old producer API is deprecated; use the typed interface
        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 1; i <= 100; i++) {
                Student student = new Student(i, "itzzy" + i, "password" + i, 18 + i);
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, JSON.toJSONString(student));
                producer.send(record);
                System.out.println("sent record: " + JSON.toJSONString(student));
            }
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace(); // the original swallowed exceptions silently
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// plain POJO; Lombok generates the getters/setters and both constructors
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    private int id;
    private String name;
    private String password;
    private int age;
}