本文基于Spark2.1.0版本
雖然很少有生產(chǎn)環(huán)境用Kafka傳遞超過1M消息的場景(因為高吞吐、低延時的要求,Kafka 發(fā)布-訂閱模型中Producer-Broker-Consumer 3方的相關(guān)默認配置都是1M)麦向,但由于手上項目的特殊需求子檀,希望Spark Streaming抽取Kafka數(shù)據(jù)源時,能消費30M-40MB大小的消息陶衅。下面我把相關(guān)配置及源碼提供一下屡立,有需要的同學可以參考。
(本文只涉及ETL過程中數(shù)據(jù)的抽取搀军,不涉及轉(zhuǎn)換和加載過程)
業(yè)務模型如下:
1膨俐,修改Producer-Broker-Consumer 3方配置,使其支持超大消息的傳遞和接收
A:修改Producer的max.request.size罩句,允許生產(chǎn)者發(fā)送超大消息焚刺,默認是1M
vim $KAFKA_HOME/config/producer.properties
修改max.request.size=41943040 ?#40MB
修改Producer的buffer.memory,允許生產(chǎn)者發(fā)送超大消息時的緩沖區(qū)的止,默認是32M
buffer.memory=45000000 # >40MB
B:修改Broker的message.max.bytes檩坚、replica.fetch.max.bytes、max.message.bytes允許Broker傳遞和備份超大消息诅福,默認是1M
vim $KAFKA_HOME/config/server.properties
修改message.max.bytes=41943040? #40MB
修改replica.fetch.max.bytes=41943040? #40MB
max.message.bytes可以不用修改匾委,默認值=message.max.bytes
C:修改Consumer的fetch.message.max.bytes,允許消費者拉取超大消息氓润,默認是1M
vim $KAFKA_HOME/config/consumer.properties
修改fetch.message.max.bytes=41943040? #40MB
上述配置修改完成后赂乐,所有Broker都需要重啟Kafka服務。
2咖气,Spark Streaming用Kafka 0.10.2的new Kafka consumer API 來消費超大消息
3挨措,測試
通過$KAFKA_HOME/bin下的kafka-producer-perf-test.sh充當生產(chǎn)者,發(fā)送40MB左右的消息
./kafka-producer-perf-test.sh --topic wl_test --num-records 1 --record-size 40000000 --throughput? 1 --producer-props bootstrap.servers=wl1:9092 max.request.size=45000000 buffer.memory=45000000
生產(chǎn)者發(fā)送消息后崩溪,通過Kafka Manager觀察浅役,Consumer已經(jīng)完成了消息抽取后的commit,Consumer offset增加伶唯。
driver端輸出消息
web ui能看到消費的具體情況
4觉既,Spark 2.1.0 的Streaming使用Kafka 0.10.2新Consumer API的整合接口好處多多
詳見官網(wǎng):
spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html