之前一段時間通過SparkStreaming+Kafka處理上網(wǎng)日志數(shù)據(jù)的一些記錄诽嘉,做個備忘侧但。
KafkaUtils.createDirectStream or KafkaUtils.createStream
-
createDirectStream:Direct DStream方式由kafka的SimpleAPI實(shí)現(xiàn) ,比較靈活廉赔,可以自行指定起始的offset印蓖,性能較createStream高,
SparkStreaming讀取時在其內(nèi)自行維護(hù)offset但不會自動提交到zk中,如果要監(jiān)控offset情況遵馆,需要自己實(shí)現(xiàn)鲸郊。
spark-streaming-kafka-0-10中已經(jīng)實(shí)現(xiàn)offset自動提交zk中
- createStream:采用了Receiver DStream方式由kafka的high-level API實(shí)現(xiàn)
最新的實(shí)現(xiàn)中createDirectStream也可以提交offset了spark-streaming-kafka-0-10http://spark.apache.org/docs/latest/streaming-kafka-integration.html但要求 kafka是0.10.0及以后。
createDirectStream中的offset
createDirectStream不會自動提交offset到zk中货邓,不能方便的監(jiān)控數(shù)據(jù)消費(fèi)情況
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
.transform(rdd => {
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (offset <- offsets) {
val topicAndPartition = TopicAndPartition(offset.topic, offset.partition)
//保存offset至zk可redis中方便監(jiān)控
//commitOffset(kafkaParams,groupId, Map(topicAndPartition -> offset.untilOffset))
}
rdd
})
如果可以只是用來監(jiān)控消費(fèi)情況在transform中轉(zhuǎn)換成HasOffsetRanges取出offset保存到zk中即可秆撮,
"rdd.asInstanceOf[HasOffsetRanges].offsetRanges" 如果已經(jīng)經(jīng)過其它Transformations或output操作之后此rdd已經(jīng)不是KafkaRDD,再轉(zhuǎn)換會報錯!换况!
另外還有一個控制能更強(qiáng)的createDirectStream方法职辨,可以指定fromOffsets和messageHandler
def createDirectStream(
ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
)
可以將offset保存在zk或redis等外部存儲中方便監(jiān)控盗蟆,然后下次啟動時再從中讀取
分區(qū)partition
Kafka中的partition和Spark中的partition是不同的概念,但createDirectStream方式時topic的總partition數(shù)量和Spark和partition數(shù)量相等舒裤。
```
//KafkaRDD.getPartitions
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}
```
partition中數(shù)據(jù)分布不均會導(dǎo)致有些任務(wù)快有些任務(wù)慢喳资,影響整體性能,可以根據(jù)實(shí)際情況做repartition腾供,單個topic比較容易實(shí)現(xiàn)partition中數(shù)據(jù)分布均勻仆邓,但如果同一個程序中需要同時處理多個topic的話,可以考慮能否合并成一個topic伴鳖,增加partition數(shù)量节值,不過topic很多時間會和其它系統(tǒng)共用,所以可能不容易合并榜聂,這情況只能做repartition搞疗。雖然repartition會消耗一些時間,但總的來說须肆,如果數(shù)據(jù)分布不是很均勻的話repartition還是值得匿乃,repartition之后各任務(wù)處理數(shù)據(jù)量基本一樣,而且Locality_level會變成“PROCESS_LOCAL”
P莘汀扳埂!使用flume加載到kafka的使用默認(rèn)配置十有八九分布不勻
檢查點(diǎn)
代碼:
Object SparkApp(){
def gnStreamContext(chkdir:String,batchDuration: Duration,partitions:Int)={
val conf = new SparkConf().setAppName("GnDataToHive") //.setMaster("local[2]")
val ssc = new StreamingContext(conf, batchDuration)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
...........
...........
...........
val terminfos = ssc.sparkContext.broadcast(ttis)
ssc.checkpoint(chkdir)
ssc
}
def main(args: Array[String]): Unit = {
val chkdir="hdfs://xxxxx/chkpoint/chkpoint-1"
val chkssc = StreamingContext.getOrCreate(chkdir,()=>gnStreamContext(chkdir,Seconds(args(0).toInt),args(1).toInt))
chkssc.start()
chkssc.awaitTermination()
}
}
offset會在保存至檢查點(diǎn)中,下次啟動會繼續(xù)接著讀取但是以下問題需要注意:
kafka中數(shù)通常保存周期都不會太長瘤礁,都有清理周期阳懂,如果記錄的offset對應(yīng)數(shù)據(jù)已經(jīng)被清理,從檢查點(diǎn)恢復(fù)時程序會一直報錯柜思。
如果程序邏輯發(fā)生變化岩调,需要先刪除檢查點(diǎn),否則不管數(shù)據(jù)還是邏輯都會從舊檢查點(diǎn)恢復(fù)赡盘。
限流
可以用spark.streaming.kafka.maxRatePerPartition指定每個批次從每個partition中每秒中最大拉取的數(shù)據(jù)量号枕,比如將值設(shè)為1000的話,每秒鐘最多從每個partition中拉取1000條數(shù)據(jù)陨享,如果batchDuration設(shè)為1分鐘的話葱淳,則每個批次最多從每個partition中拉取60000條數(shù)據(jù)。
此值要設(shè)置合理抛姑,太小有可能導(dǎo)致資源浪費(fèi)赞厕,但kafka中的數(shù)據(jù)消費(fèi)不完,太多又達(dá)不到限流的目的
具體代碼見:
DirectKafkaInputDStream.maxMessagesPerPartition
DirectKafkaInputDStream.clamp
```
// limits the maximum number of messages per partition
protected def clamp(
leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
maxMessagesPerPartition.map { mmp =>
leaderOffsets.map { case (tp, lo) =>
tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
}
}.getOrElse(leaderOffsets)
}
```
spark-submit提交時帶上即可:--conf spark.streaming.kafka.maxRatePerPartition=10000
貌似只能在createDirectStream中起作用定硝,在createStream方式中沒看到有類似設(shè)置
hdfs輸出文件名:
寫入hdfs時默認(rèn)目錄名格式為:"prefix-TIME_IN_MS.suffix"皿桑,每個目錄下的文件名為"part-xxxx"。
如果只想自定義目錄名可以通過foreachRDD,調(diào)用RDD的saveAsXXX dstream.foreachRDD(rdd=>rdd.saveAsxxxx(""))
如果需要自定義輸出的文件名诲侮,需要自定義一個FileOutputFormat的子類镀虐,修改getRecordWriter方法中的name即可,然后調(diào)用saveAsHadoopFile[MyTextOutputFormat[NullWritable, Text]]
沟绪。
外部數(shù)據(jù)關(guān)聯(lián)
某些情況下載關(guān)聯(lián)外部數(shù)據(jù)進(jìn)行關(guān)聯(lián)或計算刮便。
- 外部數(shù)據(jù)放在redis中,在
mapPartitions
或foreachRDD.foreachPartitions
中關(guān)聯(lián) - 外部數(shù)據(jù)以broadcast變量形式做關(guān)聯(lián)
其它
- 日志:提交作業(yè)時spark-submit默認(rèn)會讀取$SPARK_HOME/conf/log4j.properties如果需要自定義可以在提交作業(yè)時可以帶上 --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file://xx/xx/log4j.properties