Flink 寫入數(shù)據(jù)到 Kafka

Flink 寫入數(shù)據(jù)到Kafka


前言

通過Flink官網(wǎng)可以看到Flink里面就默認支持了不少sink脱茉,比如也支持Kafka sink connector(FlinkKafkaProducer),那么這篇文章我們就來看看如何將數(shù)據(jù)寫入到Kafka纤掸。

準備

Flink里面支持Kafka 0.8阐肤、0.9凫佛、0.10、0.11.

這里我們需要安裝下Kafka孕惜,請對應(yīng)添加對應(yīng)的Flink Kafka connector依賴的版本愧薛,這里我們使用的是0.11 版本:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

目前我們先看下本地Kafka是否有這個student-write topic呢?需要執(zhí)行下這個命令:

?  kafka_2.11-0.10.2.0 ./bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
lambda-pipeline-topic
metrics
my-topic
my-topic-thread1
my-topic-thread2
qb_ad
qbad
qbad_test
student
topic1
wikipedia
wikipedia_stream

如果等下我們的程序運行起來后衫画,再次執(zhí)行這個命令出現(xiàn)student-write topic毫炉,那么證明我的程序確實起作用了,已經(jīng)將其他集群的Kafka數(shù)據(jù)寫入到本地Kafka了削罩。

程序代碼

public class FlinkSinkToKafka {

    private static final String READ_TOPIC = "student";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "student-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                READ_TOPIC,   //這個 kafka topic 需要和上面的工具類的 topic 一致
                new SimpleStringSchema(),
                props)).setParallelism(1);
        student.print();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "student-write");

        student.addSink(new FlinkKafkaProducer011<>(
                "localhost:9092",
                "student-write",
                new SimpleStringSchema()
        )).name("flink-connectors-kafka")
                .setParallelism(5);

        env.execute("flink learning connectors kafka");
    }
}

運行結(jié)果

運行flink程序之后再次查看topic瞄勾,發(fā)現(xiàn)多了student-write這個topic

?  kafka_2.11-0.10.2.0 ./bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
lambda-pipeline-topic
metrics
my-topic
my-topic-thread1
my-topic-thread2
qb_ad
qbad
qbad_test
student
student-write
topic1
wikipedia
wikipedia_stream

查看topic student-write

?  kafka_2.11-0.10.2.0 ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic student-write
Topic:student-write PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: student-write    Partition: 0    Leader: 0   Replicas: 0 Isr: 0

IDEA打印如下:

2> {"age":20,"id":2,"name":"itzzy2","password":"password2"}
6> {"age":24,"id":6,"name":"itzzy6","password":"password6"}
2> {"age":28,"id":10,"name":"itzzy10","password":"password10"}
6> {"age":32,"id":14,"name":"itzzy14","password":"password14"}
2> {"age":36,"id":18,"name":"itzzy18","password":"password18"}
6> {"age":40,"id":22,"name":"itzzy22","password":"password22"}
2> {"age":44,"id":26,"name":"itzzy26","password":"password26"}
6> {"age":48,"id":30,"name":"itzzy30","password":"password30"}
2> {"age":52,"id":34,"name":"itzzy34","password":"password34"}
6> {"age":56,"id":38,"name":"itzzy38","password":"password38"}

查看topic信息

?  kafka_2.11-0.10.2.0 ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic student
student:0:0
?  kafka_2.11-0.10.2.0 ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic student_write
student_write:0:0

查看日志logs

?  kafka_2.11-0.10.2.0 ll /tmp/kafka-logs/student-write-0
total 0
-rw-r--r--  1 zzy  wheel  10485760 Jan 29 12:03 00000000000000000000.index
-rw-r--r--  1 zzy  wheel         0 Jan 29 12:03 00000000000000000000.log
-rw-r--r--  1 zzy  wheel  10485756 Jan 29 12:03 00000000000000000000.timeindex
?  kafka_2.11-0.10.2.0 ll /tmp/kafka-logs/student-0
total 0
-rw-r--r--  1 zzy  wheel  10485760 Jan 29 12:03 00000000000000000000.index
-rw-r--r--  1 zzy  wheel         0 Jan 29 12:03 00000000000000000000.log
-rw-r--r--  1 zzy  wheel  10485756 Jan 29 12:03 00000000000000000000.timeindex

分析

上面代碼我們使用 Flink Kafka Producer 只傳了三個參數(shù):brokerList、topicId弥激、serializationSchema(序列化)


其實也可以傳入多個參數(shù)進去进陡,現(xiàn)在有的參數(shù)用的是默認參數(shù),因為這個內(nèi)容比較多微服,后面可以抽出一篇文章單獨來講趾疚。

總結(jié)

本篇文章寫了Flink讀取Kafka集群的數(shù)據(jù),然后寫入到本地的Kafka上以蕴。

附上kafka生產(chǎn)者代碼

public class KafkaUtils {
    private static final String broker_list = "localhost:9092";
    private static final String topic = "student-1";  //kafka topic 需要和 flink 程序用同一個 topic

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        KafkaProducer producer = new KafkaProducer<String, String>(props);//老版本producer已廢棄
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

        try {
            for (int i = 1; i <= 100; i++) {
                Student student = new Student(i, "itzzy" + i, "password" + i, 18 + i);
                ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
                producer.send(record);
                System.out.println("發(fā)送數(shù)據(jù): " + JSON.toJSONString(student));
            }
            Thread.sleep(3000);
        }catch (Exception e){

        }

        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {

    private int id;
    private String name;
    private String password;
    private int age;

}

參考:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末糙麦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子丛肮,更是在濱河造成了極大的恐慌喳资,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件腾供,死亡現(xiàn)場離奇詭異仆邓,居然都是意外死亡,警方通過查閱死者的電腦和手機伴鳖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門节值,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人榜聂,你說我怎么就攤上這事搞疗。” “怎么了须肆?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵匿乃,是天一觀的道長桩皿。 經(jīng)常有香客問我,道長幢炸,這世上最難降的妖魔是什么泄隔? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮宛徊,結(jié)果婚禮上佛嬉,老公的妹妹穿的比我還像新娘。我一直安慰自己闸天,他們只是感情好暖呕,可當(dāng)我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著苞氮,像睡著了一般湾揽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上笼吟,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天钝腺,我揣著相機與錄音,去河邊找鬼赞厕。 笑死艳狐,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的皿桑。 我是一名探鬼主播毫目,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼诲侮!你這毒婦竟也來了镀虐?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤沟绪,失蹤者是張志新(化名)和其女友劉穎刮便,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绽慈,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡恨旱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了坝疼。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片搜贤。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖钝凶,靈堂內(nèi)的尸體忽然破棺而出仪芒,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布掂名,位于F島的核電站据沈,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏饺蔑。R本人自食惡果不足惜锌介,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望膀钠。 院中可真熱鬧掏湾,春花似錦裹虫、人聲如沸肿嘲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽雳窟。三九已至,卻和暖如春匣屡,著一層夾襖步出監(jiān)牢的瞬間封救,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工捣作, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留誉结,地道東北人。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓券躁,卻偏偏與公主長得像惩坑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子也拜,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容