Structured Streaming integrates with Kafka version 0.10 and above to read data from and write data to Kafka.
Linking
For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.2.0
For Python applications, you need to add the above library and its dependencies when deploying your application. See the Deploying section below for details.
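For example, with sbt the dependency above could be declared as follows; this is a minimal sketch and assumes scalaVersion is set to a 2.11.x release so that the _2.11 artifact is resolved:
// build.sbt (sketch) -- the spark-sql dependency itself is assumed to be declared elsewhere
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"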
Reading Data from Kafka
Creating a Kafka Source for Streaming Queries
Scala:
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Java:
// Subscribe to 1 topic
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
Python:
# Subscribe to 1 topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Creating a Kafka Source for Batch Queries
If you have a use case that is better suited to batch processing, you can create a Dataset/DataFrame for a defined range of offsets.
Scala:
// Subscribe to 1 topic, by default reading from the earliest to the latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets for each
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, reading from the earliest to the latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Java:
// Subscribe to 1 topic, by default reading from the earliest to the latest offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics, specifying explicit Kafka offsets for each
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern, reading from the earliest to the latest offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
Python:
# Subscribe to 1 topic, by default reading from the earliest to the latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics, specifying explicit Kafka offsets for each
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern, reading from the earliest to the latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Each row in the source has the following schema:
Column | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
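Besides the decoded key and value, the metadata columns in this schema can be selected directly. A small Scala sketch, reusing the df from the streaming examples above:
// Keep the Kafka metadata columns next to the decoded payload
val withMeta = df.selectExpr(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)",
  "topic", "partition", "offset", "timestamp")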
The following options must be set for the Kafka source for both batch and streaming queries.
Option | Value | Meaning |
---|---|---|
assign | json string {"topicA":[0,1],"topicB":[2,4]} | Specific TopicPartitions to consume. Only one of the "assign", "subscribe" or "subscribePattern" options can be specified for a Kafka source. |
subscribe | A comma-separated list of topics | The topic list to subscribe to. Only one of the "assign", "subscribe" or "subscribePattern" options can be specified for a Kafka source. |
subscribePattern | Java regex string | The pattern used to subscribe to topic(s). Only one of the "assign", "subscribe" or "subscribePattern" options can be specified for a Kafka source. |
kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration |
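The "assign" option is not shown in the examples above; a streaming source pinned to specific partitions could look roughly like the following Scala sketch, where the topic names and partition numbers are placeholders:
// Consume only partitions 0 and 1 of topicA and partition 2 of topicB
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("assign", """{"topicA":[0,1],"topicB":[2]}""")
  .load()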
The following configurations are optional:
Option | Value | Default | Query Type | Meaning |
---|---|---|---|---|
startingOffsets | "earliest", "latest" (streaming only), or json string """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}""" | "latest" for streaming, "earliest" for batch | streaming and batch | The start point when a query is started: either "earliest" (from the earliest offsets), "latest" (from the latest offsets), or a json string specifying a starting offset for each TopicPartition. In the json, -2 can be used to refer to earliest and -1 to latest. Note: for batch queries, latest is not allowed. |
endingOffsets | latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | batch | The end point of a batch query: either "latest", i.e. the latest offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1 refers to latest, and -2 (earliest) is not allowed. |
failOnDataLoss | true or false | true | streaming query | Whether to fail the query when it is possible that data has been lost (e.g. topics are deleted, or offsets are out of range). This may be a false alarm; you can disable it when it does not work as you expected. Batch queries will always fail if they fail to read any data from the provided offsets due to lost data. |
kafkaConsumer.pollTimeoutMs | long | 512 | streaming and batch | The timeout in milliseconds for executors to poll data from Kafka. |
fetchOffset.numRetries | int | 3 | streaming and batch | Number of times to retry before giving up fetching Kafka offsets. |
fetchOffset.retryIntervalMs | long | 10 | streaming and batch | Milliseconds to wait before retrying to fetch Kafka offsets. |
maxOffsetsPerTrigger | long | none | streaming and batch | Rate limit on the maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. |
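To make some of the optional settings concrete, a streaming read that caps the volume of each trigger and does not fail on data loss might look like this Scala sketch (the values are arbitrary):
// Process at most 10000 offsets per trigger in total, and only warn instead of failing on data loss
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("maxOffsetsPerTrigger", "10000")
  .option("failOnDataLoss", "false")
  .load()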
Writing Data to Kafka
Here we discuss support for writing the output of Streaming Queries and Batch Queries to Apache Kafka. Note that Apache Kafka only supports at-least-once write semantics, so when writing either streaming or batch query output to Kafka, some records may be duplicated. This can happen, for example, if Kafka needs to retry a message that was not acknowledged by a Broker, even though that Broker actually received and wrote the message. Because of these Kafka write semantics, Structured Streaming cannot prevent such duplicates from occurring. However, if writing the query output succeeds, you can assume it was written at least once. A possible solution for removing the duplicates is to introduce a unique primary key in the records, which allows de-duplication to be performed when reading.
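For instance, if each record carries a unique identifier in its value, the duplicates produced by retried writes could be dropped when reading the data back. A rough Scala sketch, assuming the value string itself acts as that unique key:
// Sketch: treat the decoded value as the unique key and drop duplicate records on read
val deduped = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .dropDuplicates("value")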
The DataFrame being written to Kafka must have the following columns in its schema:
Column | Type |
---|---|
key (optional) | string or binary |
value (required) | string or binary |
topic (*optional) | string |
* Note: the topic column is required if the "topic" configuration option is not specified.
The value column is the only required column. If a key column is not specified, a null-valued key column will be added automatically. If a topic column exists, its value is used as the topic when writing the given row to Kafka, unless the "topic" configuration option is set; if it is set, the configuration option overrides the topic column.
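As an illustration, a DataFrame without string or binary key/value columns can be shaped into this schema before writing. The Scala sketch below assumes a hypothetical input DataFrame df with an id column, and serializes each whole row into a JSON string used as the message value:
import org.apache.spark.sql.functions.{col, struct, to_json}
// Sketch: build the key and value columns expected by the Kafka sink
val toWrite = df.select(
  col("id").cast("string").as("key"),               // hypothetical id column used as the message key
  to_json(struct(df.columns.map(col): _*)).as("value"))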
The following options must be set for the Kafka sink for both batch and streaming queries:
Option | Value | Meaning |
---|---|---|
kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration |
The following configurations are optional:
Option | Value | Default | Query Type | Meaning |
---|---|---|---|---|
topic | string | none | streaming and batch | Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that exists in the data. |
Creating a Kafka Sink for Streaming Queries
Scala:
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
Java:
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start();
Python:
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
Writing the Output of Batch Queries to Kafka
Scala:
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
Java:
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save();
Python:
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
Kafka Specific Configurations
Kafka's own configurations can be set via DataStreamReader.option with a kafka. prefix, e.g. stream.option("kafka.bootstrap.servers", "host:port"). For possible Kafka parameters, see the Kafka Consumer Config docs for parameters related to reading data, and the Kafka Producer Config docs for parameters related to writing data.
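For example, other consumer properties can be forwarded the same way. The Scala sketch below passes the Kafka consumer's session.timeout.ms setting through the kafka. prefix (the value is arbitrary):
// Any option prefixed with "kafka." is handed to the underlying Kafka consumer
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("kafka.session.timeout.ms", "30000")
  .option("subscribe", "topic1")
  .load()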
Note that the following Kafka parameters cannot be set; the Kafka source or sink will throw an exception if they are:
group.id: Kafka source will create a unique group id for each query automatically.
auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
key.serializer: Keys are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
value.serializer: values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the values into either strings or byte arrays.
enable.auto.commit: Kafka source doesn’t commit any offset.
interceptor.classes: Kafka source always reads keys and values as byte arrays. It's not safe to use ConsumerInterceptor as it may break the query.
Deploying
As with any Spark application, spark-submit is used to launch your application. spark-sql-kafka-0-10_2.11 and its dependencies can be added directly to spark-submit using --packages, such as:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ...
See the Application Submission Guide for more details about submitting applications with external dependencies.