目錄
- 前言
- Spark streaming接收Kafka數(shù)據(jù)
- Spark向kafka中寫入數(shù)據(jù)
- Spark streaming+Kafka應(yīng)用
- Spark streaming+Kafka調(diào)優(yōu)
- 參考文獻(xiàn)
前言
在WeTest輿情項(xiàng)目中,需要對(duì)每天千萬(wàn)級(jí)的游戲評(píng)論信息進(jìn)行詞頻統(tǒng)計(jì),在生產(chǎn)者一端巍实,我們將數(shù)據(jù)按照每天的拉取時(shí)間存入了Kafka當(dāng)中,而在消費(fèi)者一端丸边,我們利用了spark streaming從kafka中不斷拉取數(shù)據(jù)進(jìn)行詞頻統(tǒng)計(jì)纬朝。本文首先對(duì)spark streaming嵌入kafka的方式進(jìn)行歸納總結(jié),之后簡(jiǎn)單闡述Spark streaming+kafka在輿情項(xiàng)目中的應(yīng)用俄讹,最后將自己在Spark Streaming+kafka的實(shí)際優(yōu)化中的一些經(jīng)驗(yàn)進(jìn)行歸納總結(jié)。(如有任何紕漏歡迎補(bǔ)充來(lái)踩踪蹬,我會(huì)第一時(shí)間改正v)
Spark streaming接收Kafka數(shù)據(jù)
用spark streaming流式處理kafka中的數(shù)據(jù),第一步當(dāng)然是先把數(shù)據(jù)接收過(guò)來(lái)疚漆,轉(zhuǎn)換為spark streaming中的數(shù)據(jù)結(jié)構(gòu)Dstream。接收數(shù)據(jù)的方式有兩種:1.利用Receiver接收數(shù)據(jù),2.直接從kafka讀取數(shù)據(jù)狡耻。
基于Receiver的方式
這種方式利用接收器(Receiver)來(lái)接收kafka中的數(shù)據(jù),其最基本是使用Kafka高階用戶API接口沼头。對(duì)于所有的接收器,從kafka接收來(lái)的數(shù)據(jù)會(huì)存儲(chǔ)在spark的executor中,之后spark streaming提交的job會(huì)處理這些數(shù)據(jù)毡庆。如下圖:
在使用時(shí)亚铁,我們需要添加相應(yīng)的依賴包:
<dependency><!-- Spark Streaming Kafka -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.3</version>
</dependency>
而對(duì)于Scala的基本使用方式如下:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
還有幾個(gè)需要注意的點(diǎn):
- 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關(guān)的站粟,所以如果我們加大每個(gè)topic的partition數(shù)量,僅僅是增加線程來(lái)處理由單一Receiver消費(fèi)的主題。但是這并沒(méi)有增加Spark在處理數(shù)據(jù)上的并行度。
- 對(duì)于不同的Group和topic我們可以使用多個(gè)Receiver創(chuàng)建不同的Dstream來(lái)并行接收數(shù)據(jù)翘单,之后可以利用union來(lái)統(tǒng)一成一個(gè)Dstream柬唯。
- 如果我們啟用了Write Ahead Logs復(fù)制到文件系統(tǒng)如HDFS失晴,那么storage level需要設(shè)置成 StorageLevel.MEMORY_AND_DISK_SER书在,也就是
KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
直接讀取方式
在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式?jīng)]有receiver這一層制轰,其會(huì)周期性的獲取Kafka中每個(gè)topic的每個(gè)partition中的最新offsets,之后根據(jù)設(shè)定的maxRatePerPartition來(lái)處理每個(gè)batch。其形式如下圖:
這種方法相較于Receiver方式的優(yōu)勢(shì)在于:
- 簡(jiǎn)化的并行:在Receiver的方式中我們提到創(chuàng)建多個(gè)Receiver之后利用union來(lái)合并成一個(gè)Dstream的方式提高數(shù)據(jù)傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對(duì)應(yīng)的并行讀取Kafka數(shù)據(jù)业岁,這種映射關(guān)系也更利于理解和優(yōu)化棍好。
- 高效:在Receiver的方式中,為了達(dá)到0數(shù)據(jù)丟失需要將數(shù)據(jù)存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數(shù)據(jù),浪費(fèi)!而第二種方式不存在這個(gè)問(wèn)題,只要我們Kafka的數(shù)據(jù)保留時(shí)間足夠長(zhǎng),我們都能夠從Kafka進(jìn)行數(shù)據(jù)恢復(fù)。
- 精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統(tǒng)的從Kafka中讀取數(shù)據(jù)的方式汛聚,但由于Spark Streaming消費(fèi)的數(shù)據(jù)和Zookeeper中記錄的offset不同步瞄桨,這種方式偶爾會(huì)造成數(shù)據(jù)重復(fù)消費(fèi)芯侥。而第二種方式唉工,直接使用了簡(jiǎn)單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進(jìn)行記錄继谚,消除了這種不一致性济瓢。
以上主要是對(duì)官方文檔[1]的一個(gè)簡(jiǎn)單翻譯,詳細(xì)內(nèi)容大家可以直接看下官方文檔這里不再贅述纽帖。
不同于Receiver的方式宠漩,是從Zookeeper中讀取offset值,那么自然zookeeper就保存了當(dāng)前消費(fèi)的offset值懊直,那么如果重新啟動(dòng)開(kāi)始消費(fèi)就會(huì)接著上一次offset值繼續(xù)消費(fèi)扒吁。而在Direct的方式中,我們是直接從kafka來(lái)讀數(shù)據(jù)室囊,那么offset需要自己記錄雕崩,可以利用checkpoint、數(shù)據(jù)庫(kù)或文件記錄或者回寫到zookeeper中進(jìn)行記錄融撞。這里我們給出利用Kafka底層API接口盼铁,將offset及時(shí)同步到zookeeper中的通用類,我將其放在了github上:
Spark streaming+Kafka demo
示例中KafkaManager是一個(gè)通用類尝偎,而KafkaCluster是kafka源碼中的一個(gè)類饶火,由于包名權(quán)限的原因我把它單獨(dú)提出來(lái)鹏控,ComsumerMain簡(jiǎn)單展示了通用類的使用方法,在每次創(chuàng)建KafkaStream時(shí)肤寝,都會(huì)先從zooker中查看上次的消費(fèi)記錄offsets当辐,而每個(gè)batch處理完成后,會(huì)同步offsets到zookeeper中鲤看。
Spark向kafka中寫入數(shù)據(jù)
上文闡述了Spark如何從Kafka中流式的讀取數(shù)據(jù)缘揪,下面我整理向Kafka中寫數(shù)據(jù)。與讀數(shù)據(jù)不同义桂,Spark并沒(méi)有提供統(tǒng)一的接口用于寫入Kafka找筝,所以我們需要使用底層Kafka接口進(jìn)行包裝。
最直接的做法我們可以想到如下這種方式:
input.foreachRDD(rdd =>
// 不能在這里創(chuàng)建KafkaProducer
rdd.foreachPartition(partition =>
partition.foreach{
case x:String=>{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String,String](props)
val message=new ProducerRecord[String, String]("output",null,x)
producer.send(message)
}
}
)
)
但是這種方式缺點(diǎn)很明顯慷吊,對(duì)于每個(gè)partition的每條記錄袖裕,我們都需要?jiǎng)?chuàng)建KafkaProducer,然后利用producer進(jìn)行輸出操作罢浇,注意這里我們并不能將KafkaProducer的新建任務(wù)放在foreachPartition外邊陆赋,因?yàn)镵afkaProducer是不可序列化的(not serializable)沐祷。顯然這種做法是不靈活且低效的嚷闭,因?yàn)槊織l記錄都需要建立一次連接。如何解決呢赖临?
- 首先胞锰,我們需要將KafkaProducer利用lazy val的方式進(jìn)行包裝如下:
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object KafkaSink {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
- 之后我們利用廣播變量的形式,將KafkaProducer廣播到每一個(gè)executor兢榨,如下:
// 廣播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", Conf.brokers)
p.setProperty("key.serializer", classOf[StringSerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
log.warn("kafka producer init done!")
ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
這樣我們就能在每個(gè)executor中愉快的將數(shù)據(jù)輸入到kafka當(dāng)中:
//輸出到kafka
segmentedStream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
rdd.foreach(record => {
kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
// do something else
})
}
})
Spark streaming+Kafka應(yīng)用
WeTest輿情監(jiān)控對(duì)于每天爬取的千萬(wàn)級(jí)游戲玩家評(píng)論信息都要實(shí)時(shí)的進(jìn)行詞頻統(tǒng)計(jì)嗅榕,對(duì)于爬取到的游戲玩家評(píng)論數(shù)據(jù),我們會(huì)生產(chǎn)到Kafka中吵聪,而另一端的消費(fèi)者我們采用了Spark Streaming來(lái)進(jìn)行流式處理凌那,首先利用上文我們闡述的Direct方式從Kafka拉取batch,之后經(jīng)過(guò)分詞吟逝、統(tǒng)計(jì)等相關(guān)處理帽蝶,回寫到DB上(至于Spark中DB的回寫方式可參考我之前總結(jié)的博文:Spark踩坑記——數(shù)據(jù)庫(kù)(Hbase+Mysql)),由此高效實(shí)時(shí)的完成每天大量數(shù)據(jù)的詞頻統(tǒng)計(jì)任務(wù)块攒。
Spark streaming+Kafka調(diào)優(yōu)
Spark streaming+Kafka的使用中励稳,當(dāng)數(shù)據(jù)量較小,很多時(shí)候默認(rèn)配置和使用便能夠滿足情況囱井,但是當(dāng)數(shù)據(jù)量大的時(shí)候驹尼,就需要進(jìn)行一定的調(diào)整和優(yōu)化,而這種調(diào)整和優(yōu)化本身也是不同的場(chǎng)景需要不同的配置庞呕。
合理的批處理時(shí)間(batchDuration)
幾乎所有的Spark Streaming調(diào)優(yōu)文檔都會(huì)提及批處理時(shí)間的調(diào)整新翎,在StreamingContext初始化的時(shí)候,有一個(gè)參數(shù)便是批處理時(shí)間的設(shè)定。如果這個(gè)值設(shè)置的過(guò)短料祠,即個(gè)batchDuration所產(chǎn)生的Job并不能在這期間完成處理骆捧,那么就會(huì)造成數(shù)據(jù)不斷堆積,最終導(dǎo)致Spark Streaming發(fā)生阻塞髓绽。而且敛苇,一般對(duì)于batchDuration的設(shè)置不會(huì)小于500ms,因?yàn)檫^(guò)小會(huì)導(dǎo)致SparkStreaming頻繁的提交作業(yè)顺呕,對(duì)整個(gè)streaming造成額外的負(fù)擔(dān)枫攀。在平時(shí)的應(yīng)用中,根據(jù)不同的應(yīng)用場(chǎng)景和硬件配置株茶,我設(shè)在1~10s之間来涨,我們可以根據(jù)SparkStreaming的可視化監(jiān)控界面,觀察Total Delay來(lái)進(jìn)行batchDuration的調(diào)整启盛,如下圖:
合理的Kafka拉取量(maxRatePerPartition重要)
對(duì)于Spark Streaming消費(fèi)kafka中數(shù)據(jù)的應(yīng)用場(chǎng)景蹦掐,這個(gè)配置是非常關(guān)鍵的,配置參數(shù)為:spark.streaming.kafka.maxRatePerPartition僵闯。這個(gè)參數(shù)默認(rèn)是沒(méi)有上線的卧抗,即kafka當(dāng)中有多少數(shù)據(jù)它就會(huì)直接全部拉出。而根據(jù)生產(chǎn)者寫入Kafka的速率以及消費(fèi)者本身處理數(shù)據(jù)的速度鳖粟,同時(shí)這個(gè)參數(shù)需要結(jié)合上面的batchDuration社裆,使得每個(gè)partition拉取在每個(gè)batchDuration期間拉取的數(shù)據(jù)能夠順利的處理完畢,做到盡可能高的吞吐量向图,而這個(gè)參數(shù)的調(diào)整可以參考可視化監(jiān)控界面中的Input Rate和Processing Time泳秀,如下圖:
緩存反復(fù)使用的Dstream(RDD)
Spark中的RDD和SparkStreaming中的Dstream,如果被反復(fù)的使用榄攀,最好利用cache()嗜傅,將該數(shù)據(jù)流緩存起來(lái),防止過(guò)度的調(diào)度資源造成的網(wǎng)絡(luò)開(kāi)銷檩赢÷类郑可以參考觀察Scheduling Delay參數(shù),如下圖:
設(shè)置合理的GC
長(zhǎng)期使用Java的小伙伴都知道漠畜,JVM中的垃圾回收機(jī)制币他,可以讓我們不過(guò)多的關(guān)注與內(nèi)存的分配回收,更加專注于業(yè)務(wù)邏輯憔狞,JVM都會(huì)為我們搞定蝴悉。對(duì)JVM有些了解的小伙伴應(yīng)該知道,在Java虛擬機(jī)中瘾敢,將內(nèi)存分為了初生代(eden generation)拍冠、年輕代(young generation)尿这、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費(fèi)一定時(shí)間的庆杜,尤其是老年代的GC回收射众,需要對(duì)內(nèi)存碎片進(jìn)行整理,通常采用標(biāo)記-清楚的做法晃财。同樣的在Spark程序中叨橱,JVM GC的頻率和時(shí)間也是影響整個(gè)Spark效率的關(guān)鍵因素。在通常的使用中建議:
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
設(shè)置合理的CPU資源數(shù)
CPU的core數(shù)量断盛,每個(gè)executor可以占用一個(gè)或多個(gè)core罗洗,可以通過(guò)觀察CPU的使用率變化來(lái)了解計(jì)算資源的使用情況,例如钢猛,很常見(jiàn)的一種浪費(fèi)是一個(gè)executor占用了多個(gè)core伙菜,但是總的CPU使用率卻不高(因?yàn)橐粋€(gè)executor并不總能充分利用多核的能力),這個(gè)時(shí)候可以考慮讓么個(gè)executor占用更少的core命迈,同時(shí)worker下面增加更多的executor贩绕,或者一臺(tái)host上面增加更多的worker來(lái)增加并行執(zhí)行的executor的數(shù)量,從而增加CPU利用率壶愤。但是增加executor的時(shí)候需要考慮好內(nèi)存消耗淑倾,因?yàn)橐慌_(tái)機(jī)器的內(nèi)存分配給越多的executor,每個(gè)executor的內(nèi)存就越小公你,以致出現(xiàn)過(guò)多的數(shù)據(jù)spill over甚至out of memory的情況踊淳。
設(shè)置合理的parallelism
partition和parallelism假瞬,partition指的就是數(shù)據(jù)分片的數(shù)量陕靠,每一次task只能處理一個(gè)partition的數(shù)據(jù),這個(gè)值太小了會(huì)導(dǎo)致每片數(shù)據(jù)量太大脱茉,導(dǎo)致內(nèi)存壓力剪芥,或者諸多executor的計(jì)算能力無(wú)法利用充分;但是如果太大了則會(huì)導(dǎo)致分片太多琴许,執(zhí)行效率降低税肪。在執(zhí)行action類型操作的時(shí)候(比如各種reduce操作),partition的數(shù)量會(huì)選擇parent RDD中最大的那一個(gè)榜田。而parallelism則指的是在RDD進(jìn)行reduce類操作的時(shí)候益兄,默認(rèn)返回?cái)?shù)據(jù)的paritition數(shù)量(而在進(jìn)行map類操作的時(shí)候,partition數(shù)量通常取自parent RDD中較大的一個(gè)箭券,而且也不會(huì)涉及shuffle净捅,因此這個(gè)parallelism的參數(shù)沒(méi)有影響)。所以說(shuō)辩块,這兩個(gè)概念密切相關(guān)蛔六,都是涉及到數(shù)據(jù)分片的荆永,作用方式其實(shí)是統(tǒng)一的。通過(guò)spark.default.parallelism可以設(shè)置默認(rèn)的分片數(shù)量国章,而很多RDD的操作都可以指定一個(gè)partition參數(shù)來(lái)顯式控制具體的分片數(shù)量具钥。
在SparkStreaming+kafka的使用中,我們采用了Direct連接方式液兽,前文闡述過(guò)Spark中的partition和Kafka中的Partition是一一對(duì)應(yīng)的骂删,我們一般默認(rèn)設(shè)置為Kafka中Partition的數(shù)量。
使用高性能的算子
這里參考了美團(tuán)技術(shù)團(tuán)隊(duì)的博文四啰,并沒(méi)有做過(guò)具體的性能測(cè)試桃漾,其建議如下:
- 使用reduceByKey/aggregateByKey替代groupByKey
- 使用mapPartitions替代普通map
- 使用foreachPartitions替代foreach
- 使用filter之后進(jìn)行coalesce操作
- 使用repartitionAndSortWithinPartitions替代repartition與sort類操作
使用Kryo優(yōu)化序列化性能
這個(gè)優(yōu)化原則我本身也沒(méi)有經(jīng)過(guò)測(cè)試,但是好多優(yōu)化文檔有提到拟逮,這里也記錄下來(lái)撬统。
在Spark中,主要有三個(gè)地方涉及到了序列化:
- 在算子函數(shù)中使用到外部變量時(shí)敦迄,該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸(見(jiàn)“原則七:廣播大變量”中的講解)恋追。
- 將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD,Student是自定義類型)罚屋,所有自定義類型對(duì)象苦囱,都會(huì)進(jìn)行序列化。因此這種情況下脾猛,也要求自定義的類必須實(shí)現(xiàn)Serializable接口撕彤。
- 使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER),Spark會(huì)將RDD中的每個(gè)partition都序列化成一個(gè)大的字節(jié)數(shù)組猛拴。
對(duì)于這三種出現(xiàn)序列化的地方羹铅,我們都可以通過(guò)使用Kryo序列化類庫(kù),來(lái)優(yōu)化序列化和反序列化的性能愉昆。Spark默認(rèn)使用的是Java的序列化機(jī)制职员,也就是ObjectOutputStream/ObjectInputStream API來(lái)進(jìn)行序列化和反序列化。但是Spark同時(shí)支持使用Kryo序列化庫(kù)跛溉,Kryo序列化類庫(kù)的性能比Java序列化類庫(kù)的性能要高很多焊切。官方介紹,Kryo序列化機(jī)制比Java序列化機(jī)制芳室,性能高10倍左右专肪。Spark之所以默認(rèn)沒(méi)有使用Kryo作為序列化類庫(kù),是因?yàn)镵ryo要求最好要注冊(cè)所有需要進(jìn)行序列化的自定義類型堪侯,因此對(duì)于開(kāi)發(fā)者來(lái)說(shuō)嚎尤,這種方式比較麻煩。
以下是使用Kryo的代碼示例抖格,我們只要設(shè)置序列化類诺苹,再注冊(cè)要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型咕晋、作為RDD泛型類型的自定義類型等):
// 創(chuàng)建SparkConf對(duì)象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設(shè)置序列化器為KryoSerializer收奔。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊(cè)要序列化的自定義類型掌呜。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
結(jié)果
經(jīng)過(guò)種種調(diào)試優(yōu)化,我們最終要達(dá)到的目的是坪哄,Spark Streaming能夠?qū)崟r(shí)的拉取Kafka當(dāng)中的數(shù)據(jù)质蕉,并且能夠保持穩(wěn)定,如下圖所示:
當(dāng)然不同的應(yīng)用場(chǎng)景會(huì)有不同的圖形翩肌,這是本文詞頻統(tǒng)計(jì)優(yōu)化穩(wěn)定后的監(jiān)控圖模暗,我們可以看到Processing Time這一柱形圖中有一Stable的虛線,而大多數(shù)Batch都能夠在這一虛線下處理完畢念祭,說(shuō)明整體Spark Streaming是運(yùn)行穩(wěn)定的兑宇。