spark應(yīng)用開發(fā)-streaming&kafka

之前一段時間通過SparkStreaming+Kafka處理上網(wǎng)日志數(shù)據(jù)的一些記錄诽嘉,做個備忘侧但。

KafkaUtils.createDirectStream or KafkaUtils.createStream

  1. createDirectStream:Direct DStream方式由kafka的SimpleAPI實(shí)現(xiàn) ,比較靈活廉赔,可以自行指定起始的offset印蓖,性能較createStream高,
    SparkStreaming讀取時在其內(nèi)自行維護(hù)offset但不會自動提交到zk中,如果要監(jiān)控offset情況遵馆,需要自己實(shí)現(xiàn)鲸郊。

spark-streaming-kafka-0-10中已經(jīng)實(shí)現(xiàn)offset自動提交zk中

  1. createStream:采用了Receiver DStream方式由kafka的high-level API實(shí)現(xiàn)

最新的實(shí)現(xiàn)中createDirectStream也可以提交offset了spark-streaming-kafka-0-10http://spark.apache.org/docs/latest/streaming-kafka-integration.html但要求 kafka是0.10.0及以后。

createDirectStream中的offset

createDirectStream不會自動提交offset到zk中货邓,不能方便的監(jiān)控數(shù)據(jù)消費(fèi)情況

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
       .transform(rdd => {
       val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
       for (offset <- offsets) {
           val topicAndPartition = TopicAndPartition(offset.topic, offset.partition)
           //保存offset至zk可redis中方便監(jiān)控
           //commitOffset(kafkaParams,groupId, Map(topicAndPartition -> offset.untilOffset))
       }
       rdd
       })

如果可以只是用來監(jiān)控消費(fèi)情況在transform中轉(zhuǎn)換成HasOffsetRanges取出offset保存到zk中即可秆撮,

"rdd.asInstanceOf[HasOffsetRanges].offsetRanges" 如果已經(jīng)經(jīng)過其它Transformations或output操作之后此rdd已經(jīng)不是KafkaRDD,再轉(zhuǎn)換會報錯!换况!

另外還有一個控制能更強(qiáng)的createDirectStream方法职辨,可以指定fromOffsets和messageHandler
def createDirectStream(
ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
)

可以將offset保存在zk或redis等外部存儲中方便監(jiān)控盗蟆,然后下次啟動時再從中讀取

分區(qū)partition

Kafka中的partition和Spark中的partition是不同的概念,但createDirectStream方式時topic的總partition數(shù)量和Spark和partition數(shù)量相等舒裤。
```
//KafkaRDD.getPartitions
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}

    ```

partition中數(shù)據(jù)分布不均會導(dǎo)致有些任務(wù)快有些任務(wù)慢喳资,影響整體性能,可以根據(jù)實(shí)際情況做repartition腾供,單個topic比較容易實(shí)現(xiàn)partition中數(shù)據(jù)分布均勻仆邓,但如果同一個程序中需要同時處理多個topic的話,可以考慮能否合并成一個topic伴鳖,增加partition數(shù)量节值,不過topic很多時間會和其它系統(tǒng)共用,所以可能不容易合并榜聂,這情況只能做repartition搞疗。雖然repartition會消耗一些時間,但總的來說须肆,如果數(shù)據(jù)分布不是很均勻的話repartition還是值得匿乃,repartition之后各任務(wù)處理數(shù)據(jù)量基本一樣,而且Locality_level會變成“PROCESS_LOCAL”

P莘汀扳埂!使用flume加載到kafka的使用默認(rèn)配置十有八九分布不勻

檢查點(diǎn)

代碼:

Object SparkApp(){
def gnStreamContext(chkdir:String,batchDuration: Duration,partitions:Int)={
    val conf = new SparkConf().setAppName("GnDataToHive") //.setMaster("local[2]")
    val ssc = new StreamingContext(conf, batchDuration)
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
    ...........
    ...........
    ...........
    val terminfos = ssc.sparkContext.broadcast(ttis) 
    ssc.checkpoint(chkdir)
    ssc
  }
 def main(args: Array[String]): Unit = {
    val chkdir="hdfs://xxxxx/chkpoint/chkpoint-1"
    val chkssc = StreamingContext.getOrCreate(chkdir,()=>gnStreamContext(chkdir,Seconds(args(0).toInt),args(1).toInt))
    chkssc.start()
    chkssc.awaitTermination()
  }
}

offset會在保存至檢查點(diǎn)中,下次啟動會繼續(xù)接著讀取但是以下問題需要注意:

  1. kafka中數(shù)通常保存周期都不會太長瘤礁,都有清理周期阳懂,如果記錄的offset對應(yīng)數(shù)據(jù)已經(jīng)被清理,從檢查點(diǎn)恢復(fù)時程序會一直報錯柜思。

  2. 如果程序邏輯發(fā)生變化岩调,需要先刪除檢查點(diǎn),否則不管數(shù)據(jù)還是邏輯都會從舊檢查點(diǎn)恢復(fù)赡盘。

限流

可以用spark.streaming.kafka.maxRatePerPartition指定每個批次從每個partition中每秒中最大拉取的數(shù)據(jù)量号枕,比如將值設(shè)為1000的話,每秒鐘最多從每個partition中拉取1000條數(shù)據(jù)陨享,如果batchDuration設(shè)為1分鐘的話葱淳,則每個批次最多從每個partition中拉取60000條數(shù)據(jù)。
此值要設(shè)置合理抛姑,太小有可能導(dǎo)致資源浪費(fèi)赞厕,但kafka中的數(shù)據(jù)消費(fèi)不完,太多又達(dá)不到限流的目的

具體代碼見:
DirectKafkaInputDStream.maxMessagesPerPartition
DirectKafkaInputDStream.clamp

    ```
     // limits the maximum number of messages per partition
      protected def clamp(
        leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
        maxMessagesPerPartition.map { mmp =>
          leaderOffsets.map { case (tp, lo) =>
            tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
          }
        }.getOrElse(leaderOffsets)
      }
    ```

spark-submit提交時帶上即可:--conf spark.streaming.kafka.maxRatePerPartition=10000

貌似只能在createDirectStream中起作用定硝,在createStream方式中沒看到有類似設(shè)置

hdfs輸出文件名:

寫入hdfs時默認(rèn)目錄名格式為:"prefix-TIME_IN_MS.suffix"皿桑,每個目錄下的文件名為"part-xxxx"。
如果只想自定義目錄名可以通過foreachRDD,調(diào)用RDD的saveAsXXX dstream.foreachRDD(rdd=>rdd.saveAsxxxx(""))
如果需要自定義輸出的文件名诲侮,需要自定義一個FileOutputFormat的子類镀虐,修改getRecordWriter方法中的name即可,然后調(diào)用saveAsHadoopFile[MyTextOutputFormat[NullWritable, Text]]沟绪。

外部數(shù)據(jù)關(guān)聯(lián)

某些情況下載關(guān)聯(lián)外部數(shù)據(jù)進(jìn)行關(guān)聯(lián)或計算刮便。

  1. 外部數(shù)據(jù)放在redis中,在mapPartitionsforeachRDD.foreachPartitions中關(guān)聯(lián)
  2. 外部數(shù)據(jù)以broadcast變量形式做關(guān)聯(lián)

其它

  1. 日志:提交作業(yè)時spark-submit默認(rèn)會讀取$SPARK_HOME/conf/log4j.properties如果需要自定義可以在提交作業(yè)時可以帶上 --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file://xx/xx/log4j.properties
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市绽慈,隨后出現(xiàn)的幾起案子诺核,更是在濱河造成了極大的恐慌,老刑警劉巖久信,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異漓摩,居然都是意外死亡裙士,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進(jìn)店門管毙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來腿椎,“玉大人,你說我怎么就攤上這事夭咬】姓ǎ” “怎么了?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵卓舵,是天一觀的道長南用。 經(jīng)常有香客問我,道長掏湾,這世上最難降的妖魔是什么裹虫? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮融击,結(jié)果婚禮上筑公,老公的妹妹穿的比我還像新娘。我一直安慰自己尊浪,他們只是感情好匣屡,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著拇涤,像睡著了一般捣作。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上工育,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天虾宇,我揣著相機(jī)與錄音,去河邊找鬼如绸。 笑死嘱朽,一個胖子當(dāng)著我的面吹牛旭贬,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播搪泳,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼稀轨,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了岸军?” 一聲冷哼從身側(cè)響起奋刽,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎艰赞,沒想到半個月后佣谐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡方妖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年狭魂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片党觅。...
    茶點(diǎn)故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡雌澄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出杯瞻,到底是詐尸還是另有隱情镐牺,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布魁莉,位于F島的核電站睬涧,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏旗唁。R本人自食惡果不足惜宙地,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望逆皮。 院中可真熱鬧宅粥,春花似錦、人聲如沸电谣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽剿牺。三九已至企垦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間晒来,已是汗流浹背钞诡。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人荧降。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓接箫,卻偏偏與公主長得像,于是被迫代替她去往敵國和親朵诫。 傳聞我的和親對象是個殘疾皇子辛友,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容