Apache Kafka Connector

此連接器可訪問(wèn)由Apache Kafka提供的事件流清钥。

Flink提供特殊的Kafka連接器屎债,用于從/到Kafka主題讀取和寫入數(shù)據(jù)窑邦。 Flink Kafka Consumer集成了Flink的檢查點(diǎn)機(jī)制,以提供一次性處理語(yǔ)義魁淳。為了達(dá)到這個(gè)目的飘诗,F(xiàn)link并不完全依靠Kafka的消費(fèi)者群體偏移跟蹤,而是跟蹤和檢查點(diǎn)內(nèi)部的偏移界逛。

請(qǐng)為您的用例和環(huán)境選擇一個(gè)包(maven artifact id)和類名昆稿。對(duì)于大多數(shù)用戶來(lái)說(shuō),F(xiàn)linkKafkaConsumer08(flink-connector-kafka的一部分)是適當(dāng)?shù)摹?/p>

Maven Dependency Supported since Consumer and Producer Class name Kafka version Notes
flink-connector-kafka-0.8_2.11 1.0.0 FlinkKafkaConsumer08 FlinkKafkaProducer08 0.8.x 在內(nèi)部使用Kafka 的SimpleConsumer API息拜。偏移量是通過(guò)Flink提交給ZK的溉潭。
flink-connector-kafka-0.9_2.11 1.0.0 FlinkKafkaConsumer09 FlinkKafkaProducer09 0.9.x 使用新的Consumer API Kafka。
flink-connector-kafka-0.10_2.11 1.2.0 FlinkKafkaConsumer010 FlinkKafkaProducer010 0.10.x 此連接器支持帶有時(shí)間戳的Kafka消息少欺,用于生成和使用喳瓣。
flink-connector-kafka-0.11_2.11 1.4.0 FlinkKafkaConsumer011 FlinkKafkaProducer011 0.11.x 由于0.11.x Kafka不支持scala 2.10。此連接器支持Kafka事務(wù)性消息傳遞赞别,為生產(chǎn)者提供一次語(yǔ)義畏陕。

導(dǎo)入maven庫(kù):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
  <version>1.7-SNAPSHOT</version>
</dependency>

Kafka Consumer

我們需要編寫一個(gè)Kafka Consumer,通過(guò)Flink計(jì)算引擎從Kafka相應(yīng)的Topic中讀取數(shù)據(jù)仿滔。在Flink中惠毁,我們可以通過(guò)FlinkKafkaConsumer08來(lái)實(shí)現(xiàn),這個(gè)類提供了讀取一個(gè)或者多個(gè)Kafka Topic的機(jī)制崎页。它的構(gòu)造函數(shù)接收以下幾個(gè)參數(shù):

  1. topic的名字鞠绰,可以是String(用于讀取一個(gè)Topic)List(用于讀取多個(gè)Topic);
  2. 可以提供一個(gè)DeserializationSchema / KeyedDeserializationSchema用于反系列化Kafka中的字節(jié)數(shù)組飒焦;
  3. Kafka consumer的一些配置信息洞豁,而且我們必須指定bootstrap.servers、zookeeper.connect(這個(gè)屬性僅僅在Kafka 0.8中需要)和group.id屬性。

使用FlinkKafkaConsumer08類吧丈挟,初始化如下:

    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "master.bigdata:9092,slave1.bigdata:9092,slave2.bigdata:9092")
    kafkaProps.setProperty("zookeeper.connect", "spmaster.bigdata:2181,spslave1.bigdata:2181,spslave2.bigdata:2181")
    kafkaProps.setProperty("group.id", "nealy_group")

    val kafkaConsumer = new FlinkKafkaConsumer08[String]("train_appevent_topic", new SimpleStringSchema(), kafkaProps)
    kafkaConsumer.print()

上面的例子中使用到SimpleStringSchema來(lái)反系列化message,這個(gè)類是實(shí)現(xiàn)了DeserializationSchema接口志电,并重寫了T deserialize(byte[] message)函數(shù)曙咽,DeserializationSchema接口僅提供了反系列化data的接口,所以如果我們需要反系列化key挑辆,我們需要使用KeyedDeserializationSchema的子類例朱。KeyedDeserializationSchema接口提供了T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)方法,可以反系列化kafka消息的data和key鱼蝉。

為了方便使用洒嗤,F(xiàn)link內(nèi)部提供了一序列的schemas:

  1. TypeInformationSerializationSchemaTypeInformationKeyValueSerializationSchema,它可以根據(jù)Flink的TypeInformation信息來(lái)推斷出需要選擇的schemas魁亦。 此模式是其他通用序列化方法的高性能Flink替代方案渔隶。

  2. JsonDeserializationSchemaJSONKeyValueDeserializationSchema 將序列化的JSON轉(zhuǎn)換為ObjectNode對(duì)象,訪問(wèn)字段可以使用 objectNode.get(“field”)作為(Int / String / ...)(),KeyValue objectNode包含一個(gè)“key”和“value”字段洁奈,其中包含所有字段间唉,以及一個(gè)可選的“metadata”字段,用于獲取offset/partition/topic的信息利术。

  3. AvroDeserializationSchema它使用靜態(tài)模式來(lái)讀取使用Avro格式序列化的數(shù)據(jù)呈野。它可以從Avro生成的類(AvroDeserializationSchema.forSpecific(...))中推斷出模式,也可以GenericRecords 使用手動(dòng)提供的模式(with AvroDeserializationSchema.forGeneric(...))印叁。此反序列化架構(gòu)要求序列化記錄不包含嵌入式架構(gòu)被冒。

  • 還有一個(gè)可用的模式版本,可以在Confluent Schema Registry中查找編寫器的模式(用于編寫記錄的 模式)轮蜕。使用這些反序列化模式記錄將使用從模式注冊(cè)表中檢索的模式進(jìn)行讀取昨悼,并轉(zhuǎn)換為靜態(tài)提供的模式(通過(guò)ConfluentRegistryAvroDeserializationSchema.forGeneric(...)ConfluentRegistryAvroDeserializationSchema.forSpecific(...))。

要使用此反序列化模式肠虽,必須添加以下POM依賴項(xiàng):

<!-- AvroDeserializationSchema  模式-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.7-SNAPSHOT</version>
</dependency>
<!-- ConfluentRegistryAvroDeserializationSchema  模式-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-confluent-registry</artifactId>
  <version>1.7-SNAPSHOT</version>
</dependency>

Kafka Consumer 開始位置配置

Flink Kafka Consumer允許配置Kafka分區(qū)的起始位置幔戏。

val kafkaConsumer = new FlinkKafkaConsumer08[String]("train_appevent_topic", new SimpleStringSchema(), kafkaProps)
kafkaConsumer.setStartFromEarliest()     // 從最早的記錄開始
kafkaConsumer.setStartFromLatest()       // 從最新的記錄開始
kafkaConsumer.setStartFromTimestamp(時(shí)間戳)    // 從指定的時(shí)代時(shí)間戳(毫秒)開始   kafka 0.10.X
kafkaConsumer.setStartFromGroupOffsets() // 默認(rèn)的行為

kafkaConsumer.print()

Flink Kafka Consumer的所有版本都具有上述明確的起始位置配置方法。

  • setStartFromGroupOffsets() : 默認(rèn)行為税课; 從group.id Kafka代理(或Zookeeper for Kafka 0.8)中的消費(fèi)者組(在消費(fèi)者屬性中設(shè)置)提交的偏移量開始讀取分區(qū)闲延。如果找不到分區(qū)的偏移量,t將使用屬性中的auto.offset.rese進(jìn)行設(shè)置韩玩。

  • setStartFromEarliest() / setStartFromLatest():從最早/最新記錄開始垒玲。在這種模式,Kafka中的已提交偏移將被忽略找颓。

  • setStartFromTimestamp(long):從指定的時(shí)間戳開始消費(fèi)合愈。對(duì)于每個(gè)分區(qū),將從時(shí)間戳大于或等于指定時(shí)間戳作為起始位置,開始消費(fèi)佛析。在此模式下益老,Kafka中的已提交偏移將被忽略,不會(huì)用作起始位置寸莫。

您還可以指定消費(fèi)者應(yīng)從每個(gè)分區(qū)開始的確切偏移量:
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)

val kafkaConsumer = new FlinkKafkaConsumer010[String]("train_trajectory_appevent", new SimpleStringSchema(), kafkaProps)
kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets)

上述的代碼配置了myTopic的partition 0,1,2在被Flink job消費(fèi)的起始位置捺萌。假設(shè)myTopic總共有5個(gè)partition,那么剩下的兩個(gè)partition沒(méi)有被配置具體的offset的起始位膘茎,所以Flink會(huì)對(duì)這兩個(gè)partition的采用默認(rèn)的offset起始位的配置(setStartFromGroupOffsets)桃纯。

注意,如果你在這個(gè)job中配置了enableCheckpointing() 或者從某個(gè)savepoint來(lái)啟動(dòng)這個(gè)job披坏,那么起始位會(huì)優(yōu)先從savepoint或者checkpoint中獲取态坦。

Kafka Consumers和Fault Tolerance

如果我們啟用了Flink的Checkpint機(jī)制,那么Flink Kafka Consumer將會(huì)從指定的Topic中消費(fèi)消息棒拂,然后定期地將Kafka offsets信息伞梯、狀態(tài)信息以及其他的操作信息進(jìn)行Checkpint。所以着茸,如果Flink作業(yè)出故障了壮锻,F(xiàn)link將會(huì)從最新的Checkpint中恢復(fù),并且從上一次偏移量開始讀取Kafka中消費(fèi)消息涮阔。

我們需要在程序中配置 Flink Kafkaconsumer的容錯(cuò)機(jī)制:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(1000)    // 默認(rèn) 500毫秒

還有一點(diǎn)需要注意的是猜绣,F(xiàn)link只有在task slot的數(shù)量足夠的情況下才可以成功的重啟job,所以如果job是因?yàn)門askManager down掉(或者無(wú)法連接到集群)導(dǎo)致task slot不足而失敗敬特,那么必須要恢復(fù)增加足夠的task slot才能讓job重啟掰邢。而Flink on YARN 支持自動(dòng)的重啟丟失的YARN containers。

Kafka Consumers 發(fā)現(xiàn)新增Topic和分區(qū)

動(dòng)態(tài)發(fā)現(xiàn)分區(qū)

Flink Kafka Consumer支持動(dòng)態(tài)創(chuàng)建的Kafka分區(qū),并可以準(zhǔn)確的保證exactly-once 消費(fèi)伟阔。當(dāng)在Job運(yùn)行時(shí),發(fā)現(xiàn)有新增的分區(qū)辣之,將從最可能早的偏移量中開始消費(fèi)。

默認(rèn)情況下皱炉,禁用發(fā)現(xiàn)分區(qū)怀估。要啟用它,可以在提供的屬性配置中flink.partition-discovery.interval-millis設(shè)置非負(fù)值的時(shí)間間隔合搅。
限制 如果使用Flink 1.3.x之前版本的 Savepoint 恢復(fù)運(yùn)行時(shí)不能啟用分區(qū)發(fā)現(xiàn)多搀。如果啟用,則將恢復(fù)失敗并出現(xiàn)異常灾部。在這種情況下康铭,為了使用分區(qū)發(fā)現(xiàn),請(qǐng)首先在Flink 1.3.x中使用savepoint 赌髓,然后再?gòu)闹谢謴?fù)从藤。

動(dòng)態(tài)發(fā)現(xiàn)Topic

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)

val stream = env.addSource(myConsumer)

在上面的示例中催跪,當(dāng)作業(yè)開始運(yùn)行時(shí),消費(fèi)者將Topic名稱 test-topic-,與指定的正則表達(dá)式匹配所有主題(以單個(gè)數(shù)字開頭并以單個(gè)數(shù)字結(jié)尾)夷野。

為了讓消費(fèi)者在作業(yè)開始運(yùn)行后懊蒸,可以發(fā)現(xiàn)動(dòng)態(tài)創(chuàng)建的主題。請(qǐng)將其屬性flink.partition-discovery.interval-millis設(shè)置為非負(fù)值悯搔。允許消費(fèi)者發(fā)現(xiàn)新的topic的分區(qū)榛鼎, 也可以匹配指定的正則表達(dá)式。

offset提交行為的配置

Flink KafkaConsumer允許配置向 Kafka brokers(或者向Zookeeper)提交offset的行為鳖孤。需要注意的是,F(xiàn)link Kafka Consumer并不依賴于這些提交回Kafka或Zookeeper的offset來(lái)保證容錯(cuò)抡笼。這些被提交的offset只是意味著Flink將消費(fèi)的狀態(tài)暴露在外以便于監(jiān)控苏揣。

  • Checkpointingdisabled: 如果禁用了檢查點(diǎn), Flink Kafka Consumer依賴于它使用的具體的Kafka client的自動(dòng)定期提交offset的行為推姻,相應(yīng)的設(shè)置是 Kafka properties中的 enable.auto.commit (或者 auto.commit.enable 對(duì)于Kafka 0.8) 以及 auto.commit.interval.ms平匈。

  • Checkpointingenabled: 如果啟用了檢查點(diǎn),F(xiàn)link Kafka Consumer會(huì)將offset存到checkpoint中藏古,當(dāng)checkpoint 處于completed的狀態(tài)時(shí)增炭。這保證了在Kafka brokers中的committed offsetcheckpointed states中的offset保持一致。通過(guò)調(diào)用setCommitOffsetOnCheckpoints(boolean)來(lái)調(diào)整 offset自動(dòng)提交是否開啟(默認(rèn)情況下是true拧晕,即開啟自動(dòng)提交)隙姿。請(qǐng)注意,在這種情況下厂捞,配置在properties 中的offset的定時(shí)自動(dòng)提交行為將會(huì)被忽略输玷。

容錯(cuò)機(jī)制

發(fā)生錯(cuò)誤的情況下,F(xiàn)link會(huì)如何處理呢靡馁?在finally塊中記錄最后消費(fèi)到的offset再向JobManager提交checkpoint嗎欲鹏?在通常情況下,比如發(fā)生了手動(dòng)cancel或者userCode的異常時(shí)臭墨,這么做沒(méi)有問(wèn)題赔嚎。可是如果是因?yàn)槠渌?如Full GC)使得TaskManagerhung住了胧弛,甚至是機(jī)器掛了尤误,那么這個(gè)時(shí)候就不能通過(guò)finally 塊來(lái)保證exactly-once了。Flink依賴的是帶barrier的checkpointing機(jī)制來(lái)解決容錯(cuò)的問(wèn)題叶圃。

我們通過(guò)下面一副圖來(lái)簡(jiǎn)述這種機(jī)制:



barrier可以理解為checkpoint之間的分隔符袄膏,在它之前的data屬于前一個(gè)checkpoint,而在它之后的data屬于另一個(gè)checkpoint掺冠。同時(shí)沉馆,barrier會(huì)由source(如FlinkKafkaConsumer)發(fā)起码党,并混在數(shù)據(jù)中,同數(shù)據(jù)一樣傳輸給下一級(jí)的operator斥黑,直到sink為止揖盘。假設(shè)我們的Streaming Job只有一個(gè)source、一個(gè)map operator 以及一個(gè)sink锌奴,屬于barrier所分隔的checkpoint 的數(shù)據(jù)已經(jīng)被處理完畢并sink兽狭,而barrier還處于source和map operator之間,barrier 還處于map和sink之間鹿蜀。由于barrier已經(jīng)被sink收到箕慧,那么說(shuō)明checkpoint已經(jīng)完成了(這個(gè)checkpoint的狀態(tài)為completed并被存到了state backend中),它之前的數(shù)據(jù)已經(jīng)被處理完畢并sink茴恰。

但是由于sink還沒(méi)有收到barrier颠焦,那么所有之前之后的數(shù)據(jù)都會(huì)被緩存在sink的Input Buffer中,也就是說(shuō)這部分?jǐn)?shù)據(jù)雖然已經(jīng)經(jīng)過(guò)source消費(fèi)并經(jīng)過(guò)map處理了往枣,但是還是沒(méi)有寫入目的地伐庭。所以如果Job在這個(gè)時(shí)候失敗了,最后一個(gè)成功committed的checkpoint是checkpoint分冈,所以FlinkKafkaConsumer從checkpoint中恢復(fù)出相應(yīng)的partitionoffset就可以了圾另。

我們注意到,雖然之后的部分?jǐn)?shù)據(jù)和之后的所有數(shù)據(jù)雖然已經(jīng)被source消費(fèi)雕沉,但是都沒(méi)有被sink集乔,這部分?jǐn)?shù)據(jù)會(huì)被FlinkKafkaConsumer“重復(fù)”消費(fèi),我們并沒(méi)有丟失任何的數(shù)據(jù)也沒(méi)有重復(fù)寫入任何數(shù)據(jù)蘑秽,保證了exactly-once饺著。

  1. 在配置了checkpointingenable的情況下,F(xiàn)linkKafkaConsumer08在開始消費(fèi)數(shù)據(jù)之前肠牲,會(huì)優(yōu)先從checkpoint中恢復(fù)出被消費(fèi)的partition的offset幼衰,如果沒(méi)有從checkpoint中恢復(fù)某些partition的offset,它會(huì)從Zookeeper中恢復(fù)缀雳,若從Zookeeper中仍然沒(méi)有恢復(fù)渡嚣,它會(huì)根據(jù)配置的offset起始行為來(lái)配置起始o(jì)ffset。

2.FlinkKafkaConsumer08通過(guò)Kafka的低級(jí)API和Flink帶barrier的輕量級(jí)checkpoint機(jī)制保證了在高吞吐量的情況下的exactly-once肥印。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末识椰,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子深碱,更是在濱河造成了極大的恐慌腹鹉,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,744評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件敷硅,死亡現(xiàn)場(chǎng)離奇詭異功咒,居然都是意外死亡愉阎,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門力奋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)榜旦,“玉大人,你說(shuō)我怎么就攤上這事景殷〗δ兀” “怎么了?”我有些...
    開封第一講書人閱讀 163,105評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵猿挚,是天一觀的道長(zhǎng)咐旧。 經(jīng)常有香客問(wèn)我,道長(zhǎng)绩蜻,這世上最難降的妖魔是什么休偶? 我笑而不...
    開封第一講書人閱讀 58,242評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮辜羊,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘词顾。我一直安慰自己八秃,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,269評(píng)論 6 389
  • 文/花漫 我一把揭開白布肉盹。 她就那樣靜靜地躺著昔驱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪上忍。 梳的紋絲不亂的頭發(fā)上骤肛,一...
    開封第一講書人閱讀 51,215評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音窍蓝,去河邊找鬼腋颠。 笑死,一個(gè)胖子當(dāng)著我的面吹牛吓笙,可吹牛的內(nèi)容都是我干的淑玫。 我是一名探鬼主播,決...
    沈念sama閱讀 40,096評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼面睛,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼絮蒿!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起叁鉴,我...
    開封第一講書人閱讀 38,939評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤土涝,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后幌墓,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體但壮,經(jīng)...
    沈念sama閱讀 45,354評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡冀泻,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,573評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了茵肃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片腔长。...
    茶點(diǎn)故事閱讀 39,745評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖验残,靈堂內(nèi)的尸體忽然破棺而出捞附,到底是詐尸還是另有隱情,我是刑警寧澤您没,帶...
    沈念sama閱讀 35,448評(píng)論 5 344
  • 正文 年R本政府宣布鸟召,位于F島的核電站,受9級(jí)特大地震影響氨鹏,放射性物質(zhì)發(fā)生泄漏欧募。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,048評(píng)論 3 327
  • 文/蒙蒙 一仆抵、第九天 我趴在偏房一處隱蔽的房頂上張望跟继。 院中可真熱鬧,春花似錦镣丑、人聲如沸舔糖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)金吗。三九已至,卻和暖如春趣竣,著一層夾襖步出監(jiān)牢的瞬間摇庙,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工遥缕, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留卫袒,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,776評(píng)論 2 369
  • 正文 我出身青樓单匣,卻偏偏與公主長(zhǎng)得像玛臂,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子封孙,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,652評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容