在場景系統(tǒng)中,通過SparkStream直接消費kafka數(shù)據(jù)互纯,出現(xiàn)處理邏輯耗時在毫秒級瑟幕,但是很多的job delay。
示例代碼如下:
valbrokerList = ConfigLoader.get(KafkaConfig.runDataBrokerListKey)
valtopics = ConfigLoader.get(KafkaConfig.runDataTopic)
valtopicSet = topics.split(",").toSet
valkafkaParams =Map("metadata.broker.list"-> brokerList)
logger.info(s"設備運行數(shù)據(jù)kafka brokerList:$brokerList, topics:$topics")
valssc =newStreamingContext(sparkConf,Seconds(ConfigLoader.get(CommonConfig.batchDuration).toInt))
//采用直接消費的方式,每次只會消費最新的數(shù)據(jù)只盹,對于當前實時業(yè)務適用
valdata = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
data.map(dataParse).print()
代碼邏輯非常簡單辣往,在dataParse方法中也只是進行json的解析,但是一個任務處理都達到4s殖卑,而SparkStream設置為了1s一個批次站削,從而導致越來越多的job等待,如下圖:
從上圖發(fā)現(xiàn)存在stage持續(xù)時間為4s孵稽,故查看其詳細信息许起,發(fā)現(xiàn)當前stage存在很長時間的空閑,如下圖:
故查看executor端日志菩鲜,發(fā)現(xiàn)11:09:09完成task計算后到11:09:12期間园细,executor處于空閑狀態(tài),日志如下:
此時接校,追蹤driver端日志猛频,試圖從driver發(fā)現(xiàn)當前任務在進行怎樣的操作,發(fā)現(xiàn)這段一段日志:
17/09/29 11:09:12 DEBUG scheduler.TaskSetManager: Moving to RACK_LOCAL after waiting for 3000ms蛛勉,在等待3000ms后移動到機架本地模式鹿寻,繼續(xù)追查當前stage啟動時間,找到日志如下:
也就是說诽凌,在9~12這個時間點中毡熏,當前task都在進行一個等待操作,而超時間為3000ms皿淋,超時后執(zhí)行了Moving to RACK_LOCAL操作招刹,并檢測到本地級別的機架本地沒有任務,所以移動到Any級別窝趣。
追蹤源碼試圖找出當前job進行了什么樣的操作疯暑,定位到源碼如下:
從else的判斷條件可知,當(當前時間 - 最新Task啟動時間) > 本地等待時間哑舒,即會答應當前l(fā)og妇拯,繼續(xù)追蹤發(fā)現(xiàn)源碼內容:
至此我們已經找到了上面task等待3s的原因,在設置sparkConf的時候洗鸵,并沒有設置當前三個參數(shù)越锈,則取默認值,但是這個配置又是做什么的呢膘滨?
查找了相關資料甘凭,并向大牛請教后得到這樣的解釋:
spark在消費數(shù)據(jù)時,優(yōu)先采用節(jié)點本地模式火邓,即NODE_LOCAL(節(jié)點本地模式)>RACK_LOCAL(機架本地模式)>ANY(任意)丹弱,這樣在大數(shù)據(jù)量時可以做到減少網(wǎng)絡io德撬,每一批數(shù)據(jù)默認會等待三秒,如果三秒后數(shù)據(jù)所在節(jié)點上依舊沒有啟動task后躲胳,會修改為RACK_LOCAL蜓洪,并且提交任務,失敗后立馬改為ANY模式坯苹。
而基于當前業(yè)務隆檀,SparkStream必須每1s處理一批數(shù)據(jù),并且只給定了一個executor粹湃,所以大部分的節(jié)點上是不存在task的恐仑,如果每批數(shù)據(jù)等待節(jié)點本地啟動task,這樣會導致越來越多的job delay再芋。故只能修改相關參數(shù)的默認值菊霜,跳過wait,直接將模式設置為ANY济赎,修改代碼如下:
valsparkConf =newSparkConf().setAppName("Scene")
sparkConf.set("spark.locality.wait.process","0")
sparkConf.set("spark.locality.wait.node","0")
sparkConf.set("spark.locality.wait.rack","0")
valbrokerList = ConfigLoader.get(KafkaConfig.runDataBrokerListKey)
valtopics = ConfigLoader.get(KafkaConfig.runDataTopic)
valtopicSet = topics.split(",").toSet
valkafkaParams =Map("metadata.broker.list"-> brokerList)
logger.info(s"設備運行數(shù)據(jù)kafka brokerList:$brokerList, topics:$topics")
valssc =newStreamingContext(sparkConf,Seconds(ConfigLoader.get(CommonConfig.batchDuration).toInt))
//采用直接消費的方式,每次只會消費最新的數(shù)據(jù)记某,對于當前實時業(yè)務適用
valdata = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
data.map(dataParse).print()
問題圓滿解決司训。