[spark streaming] 狀態(tài)管理 updateStateByKey&mapWithState

前言
SparkStreaming 7*24 小時不間斷的運行约啊,有時需要管理一些狀態(tài)饵逐,比如wordCount劲阎,每個batch的數(shù)據(jù)不是獨立的而是需要累加的拙徽,這時就需要sparkStreaming來維護一些狀態(tài)刨沦,目前有兩種方案updateStateByKey&mapWithState,mapWithState是spark1.6新加入的保存狀態(tài)的方案膘怕,官方聲稱有10倍性能提升想诅。
updateStateByKey
先上一個示例:
def updateFunction(currValues:Seq[Int],preValue:Option[Int]): Option[Int] = {
val currValueSum = currValues.sum
//上面的Int類型都可以用對象類型替換
Some(currValueSum + preValue.getOrElse(0)) //當前值的和加上歷史值
}
kafkaStream.map(r => (r._2,1)).updateStateByKey(updateFunction _)

這里的updateFunction方法就是需要我們自己去實現(xiàn)的狀態(tài)跟新的邏輯,currValues就是當前批次的所有值岛心,preValue是歷史維護的狀態(tài)来破,updateStateByKey返回的是包含歷史所有狀態(tài)信息的DStream,下面我們來看底層是怎么實現(xiàn)狀態(tài)的管理的忘古,通過跟蹤源碼看到最核心的實現(xiàn)方法:
private [this] def computeUsingPreviousRDD(
batchTime: Time,
parentRDD: RDD[(K, V)],
prevStateRDD: RDD[(K, S)]) = {
// Define the function for the mapPartition operation on cogrouped RDD;
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
val i = iterator.map { t =>
val itr = t._2._2.iterator
val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
}
updateFuncLocal(batchTime, i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)
}

可以看到是將parentRDD和preStateRDD進行co-group徘禁,然后將finalFunc方法作用于每個Partition,看到finalFunc方法的實現(xiàn)里面(t._1, t._2._1.toSeq, headOption)這樣的形式髓堪,(key送朱,currValues,preValue)這不就是和我們需要自己實現(xiàn)的updateFun類似的結(jié)構(gòu)嗎干旁,是的沒錯驶沼,我們的方法已經(jīng)被包裝了一次:
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
): DStream[(K, S)] = ssc.withScope {
val cleanedUpdateF = sparkContext.clean(updateFunc)
val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
}
updateStateByKey(newUpdateFunc, partitioner, true)
}

可以知道每次調(diào)用updateStateByKey都會將舊的狀態(tài)RDD和當前batch的RDD進行co-group來得到一個新的狀態(tài)RDD,即使真正需要跟新的數(shù)據(jù)只有1條也需要將兩個RDD進行cogroup争群,所有的數(shù)據(jù)都會被計算一遍回怜,而且隨著狀態(tài)的不斷增加,運行速度會越來越慢换薄。
為了解決這一問題玉雾,mapWithState應運而生。
mapWithState
先來個示例:
val initialRDD = ssc.sparkContext.parallelize(List(String, Int))
//自定義mappingFunction轻要,累加單詞出現(xiàn)的次數(shù)并更新狀態(tài)
val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {
val sum = count.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
//調(diào)用mapWithState進行管理流數(shù)據(jù)的狀態(tài)
kafkaStream.map(r => (r._2,1)).mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)).print()

這里的initialRDD就是初始化狀態(tài)复旬,updateStateByKey也有對應的API。這里的mappingFun也是需要我們自己實現(xiàn)的狀態(tài)跟新邏輯伦腐,調(diào)用state.update()就是對狀態(tài)的跟新赢底,output就是通過mapWithState后返回的DStream中的數(shù)據(jù)形式失都。注意這里不是直接傳入的mappingFunc函數(shù)柏蘑,而是一個StateSpec 的對象,其實也是對函數(shù)的一個包裝而已粹庞。接下來我們跟蹤源碼看看是怎么實現(xiàn)狀態(tài)的管理的咳焚,會創(chuàng)建一個MapWithStateDStreamImpl實例:
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {
new MapWithStateDStreamImpl[K, V, StateType, MappedType](
self,
spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
)
}

當然是要看看其compute方法是怎么實現(xiàn)的:
private val internalStream =
new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)

override def compute(validTime: Time): Option[RDD[MappedType]] = {
internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}

compute方法又把處理邏輯給了internalStream:InternalMapWithStateDStream,繼續(xù)看InternalMapWithStateDStream的compute方法主要處理邏輯:
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
// Get the previous state or create a new empty state RDD
val prevStateRDD = getOrCompute(validTime - slideDuration) match {
case Some(rdd) =>
if (rdd.partitioner != Some(partitioner)) {
// If the RDD is not partitioned the right way, let us repartition it using the
// partition index as the key. This is to ensure that state RDD is always partitioned
// before creating another state RDD using it
MapWithStateRDD.createFromRDD[K, V, S, E](
rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
} else {
rdd
}
case None =>
MapWithStateRDD.createFromPairRDD[K, V, S, E](
spec.getInitialStateRDD().getOrElse(new EmptyRDD(K, S)),
partitioner,
validTime
)
}

// Compute the new state RDD with previous state RDD and partitioned data RDD
// Even if there is no data RDD, use an empty one to create a new state RDD
val dataRDD = parent.getOrCompute(validTime).getOrElse {
  context.sparkContext.emptyRDD[(K, V)]
}
val partitionedDataRDD = dataRDD.partitionBy(partitioner)
val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
  (validTime - interval).milliseconds
}
Some(new MapWithStateRDD(
  prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))

}

先后獲取prevStateRDD和parentRDD庞溜,并且保證使用的是同樣的partitioner革半,接著以兩個rdd為參數(shù)碑定、自定義的mappingFunction函數(shù)、以及key的超時時間等為參數(shù)又創(chuàng)建了MapWithStateRDD又官,該RDD繼承了RDD[MapWithStateRDDRecord[K, S, E]]延刘,MapWithStateRDD中的數(shù)據(jù)都是MapWithStateRDDRecord對象,每個分區(qū)對應一個對象來保存狀態(tài)(這就是為什么兩個RDD需要用同一個Partitioner)六敬,看看MapWithStateRDD的compute方法:
override def compute(
partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {

val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
val prevStateRDDIterator = prevStateRDD.iterator(
  stateRDDPartition.previousSessionRDDPartition, context)
val dataIterator = partitionedDataRDD.iterator(
  stateRDDPartition.partitionedDataRDDPartition, context)

val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
val newRecord = MapWithStateRDDRecord.updateRecordWithData(
  prevRecord,
  dataIterator,
  mappingFunction,
  batchTime,
  timeoutThresholdTime,
  removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
)
Iterator(newRecord)

}

拿到prevStateRDD和parentRDD對應分區(qū)的迭代器碘赖,接著獲取了prevStateRDD的一條數(shù)據(jù),這個分區(qū)也只有一條MapWithStateRDDRecord類型的數(shù)據(jù)外构,維護了對應分區(qū)所有數(shù)據(jù)狀態(tài)普泡,接著調(diào)用了最核心的方法來跟新狀態(tài),最后返回了只包含一條數(shù)據(jù)的迭代器审编,我們來看看是怎么這個核心的計算邏輯:
def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
dataIterator: Iterator[(K, V)],
mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
batchTime: Time,
timeoutThresholdTime: Option[Long],
removeTimedoutData: Boolean
): MapWithStateRDDRecord[K, S, E] = {
// Create a new state map by cloning the previous one (if it exists) or by creating an empty one
val newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMapK, S }

val mappedData = new ArrayBuffer[E]
val wrappedState = new StateImpl[S]()

// Call the mapping function on each record in the data iterator, and accordingly
// update the states touched, and collect the data returned by the mapping function
dataIterator.foreach { case (key, value) =>
  wrappedState.wrap(newStateMap.get(key))
  val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
  if (wrappedState.isRemoved) {
    newStateMap.remove(key)
  } else if (wrappedState.isUpdated
      || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
    newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
  }
  mappedData ++= returned
}

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
    wrappedState.wrapTimingOutState(state)
    val returned = mappingFunction(batchTime, key, None, wrappedState)
    mappedData ++= returned
    newStateMap.remove(key)
  }
}

MapWithStateRDDRecord(newStateMap, mappedData)

}

先copy了原來的狀態(tài)撼班,接著定義了兩個變量,mappedData是最終要返回的結(jié)果垒酬,wrappedState可以看成是對state的包裝砰嘁,添加了一些額外的方法。
接著遍歷當前批次的數(shù)據(jù)勘究,從狀態(tài)中取出key對應的原來的state般码,并根據(jù)自定義的函數(shù)來對state進行跟新,這里涉及到state的remove&update&timeout來對newStateMap進行跟新操作乱顾,并將有跟新的狀態(tài)加入到了mappedData中板祝。
若有設置超時時間,則還會對超時了的key進行移除走净,也會加入到mappedData中券时,最終通過新的狀態(tài)對象newStateMap和需返回的mappedData數(shù)組構(gòu)建了MapWithStateRDDRecord對象來返回。
而在前面提到的MapWithStateDStreamImpl實例的compute方法中:
override def compute(validTime: Time): Option[RDD[MappedType]] = {
internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}

調(diào)用的就是這個mappedData數(shù)據(jù)伏伯。
我們發(fā)現(xiàn)返回的都是有update的數(shù)據(jù)橘洞,若要獲取所有的狀態(tài)在mapWithState之后調(diào)用stateSnapshots即可。若要清除某個key的狀態(tài)说搅,可在自定義的方法中調(diào)用state.remove()炸枣。
總結(jié)

updateStateByKey底層是將preSateRDD和parentRDD進行co-group,然后對所有數(shù)據(jù)都將經(jīng)過自定義的mapFun函數(shù)進行一次計算弄唧,即使當前batch只有一條數(shù)據(jù)也會進行這么復雜的計算适肠,大大的降低了性能,并且計算時間會隨著維護的狀態(tài)的增加而增加候引。
mapWithstate底層是創(chuàng)建了一個MapWithStateRDD侯养,存的數(shù)據(jù)是MapWithStateRDDRecord對象,一個Partition對應一個MapWithStateRDDRecord對象澄干,該對象記錄了對應Partition所有的狀態(tài)逛揩,每次只會對當前batch有的數(shù)據(jù)進行跟新柠傍,而不會像updateStateByKey一樣對所有數(shù)據(jù)計算。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末辩稽,一起剝皮案震驚了整個濱河市惧笛,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌逞泄,老刑警劉巖徐紧,帶你破解...
    沈念sama閱讀 221,430評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異炭懊,居然都是意外死亡并级,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評論 3 398
  • 文/潘曉璐 我一進店門侮腹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嘲碧,“玉大人,你說我怎么就攤上這事父阻∮” “怎么了?”我有些...
    開封第一講書人閱讀 167,834評論 0 360
  • 文/不壞的土叔 我叫張陵加矛,是天一觀的道長履婉。 經(jīng)常有香客問我,道長斟览,這世上最難降的妖魔是什么毁腿? 我笑而不...
    開封第一講書人閱讀 59,543評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮苛茂,結(jié)果婚禮上已烤,老公的妹妹穿的比我還像新娘。我一直安慰自己妓羊,他們只是感情好胯究,可當我...
    茶點故事閱讀 68,547評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著躁绸,像睡著了一般裕循。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上净刮,一...
    開封第一講書人閱讀 52,196評論 1 308
  • 那天剥哑,我揣著相機與錄音,去河邊找鬼庭瑰。 笑死星持,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的弹灭。 我是一名探鬼主播督暂,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼穷吮!你這毒婦竟也來了逻翁?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,671評論 0 276
  • 序言:老撾萬榮一對情侶失蹤捡鱼,失蹤者是張志新(化名)和其女友劉穎八回,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體驾诈,經(jīng)...
    沈念sama閱讀 46,221評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡缠诅,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,303評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了乍迄。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片管引。...
    茶點故事閱讀 40,444評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖闯两,靈堂內(nèi)的尸體忽然破棺而出褥伴,到底是詐尸還是另有隱情,我是刑警寧澤漾狼,帶...
    沈念sama閱讀 36,134評論 5 350
  • 正文 年R本政府宣布重慢,位于F島的核電站,受9級特大地震影響逊躁,放射性物質(zhì)發(fā)生泄漏似踱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,810評論 3 333
  • 文/蒙蒙 一稽煤、第九天 我趴在偏房一處隱蔽的房頂上張望屯援。 院中可真熱鬧,春花似錦念脯、人聲如沸狞洋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吉懊。三九已至,卻和暖如春假勿,著一層夾襖步出監(jiān)牢的瞬間借嗽,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評論 1 272
  • 我被黑心中介騙來泰國打工转培, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留恶导,地道東北人。 一個月前我還...
    沈念sama閱讀 48,837評論 3 376
  • 正文 我出身青樓浸须,卻偏偏與公主長得像惨寿,于是被迫代替她去往敵國和親邦泄。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,455評論 2 359

推薦閱讀更多精彩內(nèi)容