前言
SparkStreaming 7*24 小時不間斷的運行约啊,有時需要管理一些狀態(tài)饵逐,比如wordCount劲阎,每個batch的數(shù)據(jù)不是獨立的而是需要累加的拙徽,這時就需要sparkStreaming來維護一些狀態(tài)刨沦,目前有兩種方案updateStateByKey&mapWithState,mapWithState是spark1.6新加入的保存狀態(tài)的方案膘怕,官方聲稱有10倍性能提升想诅。
updateStateByKey
先上一個示例:
def updateFunction(currValues:Seq[Int],preValue:Option[Int]): Option[Int] = {
val currValueSum = currValues.sum
//上面的Int類型都可以用對象類型替換
Some(currValueSum + preValue.getOrElse(0)) //當前值的和加上歷史值
}
kafkaStream.map(r => (r._2,1)).updateStateByKey(updateFunction _)
這里的updateFunction方法就是需要我們自己去實現(xiàn)的狀態(tài)跟新的邏輯,currValues就是當前批次的所有值岛心,preValue是歷史維護的狀態(tài)来破,updateStateByKey返回的是包含歷史所有狀態(tài)信息的DStream,下面我們來看底層是怎么實現(xiàn)狀態(tài)的管理的忘古,通過跟蹤源碼看到最核心的實現(xiàn)方法:
private [this] def computeUsingPreviousRDD(
batchTime: Time,
parentRDD: RDD[(K, V)],
prevStateRDD: RDD[(K, S)]) = {
// Define the function for the mapPartition operation on cogrouped RDD;
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
val i = iterator.map { t =>
val itr = t._2._2.iterator
val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
}
updateFuncLocal(batchTime, i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)
}
可以看到是將parentRDD和preStateRDD進行co-group徘禁,然后將finalFunc方法作用于每個Partition,看到finalFunc方法的實現(xiàn)里面(t._1, t._2._1.toSeq, headOption)這樣的形式髓堪,(key送朱,currValues,preValue)這不就是和我們需要自己實現(xiàn)的updateFun類似的結(jié)構(gòu)嗎干旁,是的沒錯驶沼,我們的方法已經(jīng)被包裝了一次:
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
): DStream[(K, S)] = ssc.withScope {
val cleanedUpdateF = sparkContext.clean(updateFunc)
val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
}
updateStateByKey(newUpdateFunc, partitioner, true)
}
可以知道每次調(diào)用updateStateByKey都會將舊的狀態(tài)RDD和當前batch的RDD進行co-group來得到一個新的狀態(tài)RDD,即使真正需要跟新的數(shù)據(jù)只有1條也需要將兩個RDD進行cogroup争群,所有的數(shù)據(jù)都會被計算一遍回怜,而且隨著狀態(tài)的不斷增加,運行速度會越來越慢换薄。
為了解決這一問題玉雾,mapWithState應運而生。
mapWithState
先來個示例:
val initialRDD = ssc.sparkContext.parallelize(List(String, Int))
//自定義mappingFunction轻要,累加單詞出現(xiàn)的次數(shù)并更新狀態(tài)
val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {
val sum = count.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
//調(diào)用mapWithState進行管理流數(shù)據(jù)的狀態(tài)
kafkaStream.map(r => (r._2,1)).mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)).print()
這里的initialRDD就是初始化狀態(tài)复旬,updateStateByKey也有對應的API。這里的mappingFun也是需要我們自己實現(xiàn)的狀態(tài)跟新邏輯伦腐,調(diào)用state.update()就是對狀態(tài)的跟新赢底,output就是通過mapWithState后返回的DStream中的數(shù)據(jù)形式失都。注意這里不是直接傳入的mappingFunc函數(shù)柏蘑,而是一個StateSpec 的對象,其實也是對函數(shù)的一個包裝而已粹庞。接下來我們跟蹤源碼看看是怎么實現(xiàn)狀態(tài)的管理的咳焚,會創(chuàng)建一個MapWithStateDStreamImpl實例:
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {
new MapWithStateDStreamImpl[K, V, StateType, MappedType](
self,
spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
)
}
當然是要看看其compute方法是怎么實現(xiàn)的:
private val internalStream =
new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)
override def compute(validTime: Time): Option[RDD[MappedType]] = {
internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}
compute方法又把處理邏輯給了internalStream:InternalMapWithStateDStream,繼續(xù)看InternalMapWithStateDStream的compute方法主要處理邏輯:
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
// Get the previous state or create a new empty state RDD
val prevStateRDD = getOrCompute(validTime - slideDuration) match {
case Some(rdd) =>
if (rdd.partitioner != Some(partitioner)) {
// If the RDD is not partitioned the right way, let us repartition it using the
// partition index as the key. This is to ensure that state RDD is always partitioned
// before creating another state RDD using it
MapWithStateRDD.createFromRDD[K, V, S, E](
rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
} else {
rdd
}
case None =>
MapWithStateRDD.createFromPairRDD[K, V, S, E](
spec.getInitialStateRDD().getOrElse(new EmptyRDD(K, S)),
partitioner,
validTime
)
}
// Compute the new state RDD with previous state RDD and partitioned data RDD
// Even if there is no data RDD, use an empty one to create a new state RDD
val dataRDD = parent.getOrCompute(validTime).getOrElse {
context.sparkContext.emptyRDD[(K, V)]
}
val partitionedDataRDD = dataRDD.partitionBy(partitioner)
val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
(validTime - interval).milliseconds
}
Some(new MapWithStateRDD(
prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}
先后獲取prevStateRDD和parentRDD庞溜,并且保證使用的是同樣的partitioner革半,接著以兩個rdd為參數(shù)碑定、自定義的mappingFunction函數(shù)、以及key的超時時間等為參數(shù)又創(chuàng)建了MapWithStateRDD又官,該RDD繼承了RDD[MapWithStateRDDRecord[K, S, E]]延刘,MapWithStateRDD中的數(shù)據(jù)都是MapWithStateRDDRecord對象,每個分區(qū)對應一個對象來保存狀態(tài)(這就是為什么兩個RDD需要用同一個Partitioner)六敬,看看MapWithStateRDD的compute方法:
override def compute(
partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {
val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
val prevStateRDDIterator = prevStateRDD.iterator(
stateRDDPartition.previousSessionRDDPartition, context)
val dataIterator = partitionedDataRDD.iterator(
stateRDDPartition.partitionedDataRDDPartition, context)
val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
val newRecord = MapWithStateRDDRecord.updateRecordWithData(
prevRecord,
dataIterator,
mappingFunction,
batchTime,
timeoutThresholdTime,
removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
)
Iterator(newRecord)
}
拿到prevStateRDD和parentRDD對應分區(qū)的迭代器碘赖,接著獲取了prevStateRDD的一條數(shù)據(jù),這個分區(qū)也只有一條MapWithStateRDDRecord類型的數(shù)據(jù)外构,維護了對應分區(qū)所有數(shù)據(jù)狀態(tài)普泡,接著調(diào)用了最核心的方法來跟新狀態(tài),最后返回了只包含一條數(shù)據(jù)的迭代器审编,我們來看看是怎么這個核心的計算邏輯:
def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
dataIterator: Iterator[(K, V)],
mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
batchTime: Time,
timeoutThresholdTime: Option[Long],
removeTimedoutData: Boolean
): MapWithStateRDDRecord[K, S, E] = {
// Create a new state map by cloning the previous one (if it exists) or by creating an empty one
val newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMapK, S }
val mappedData = new ArrayBuffer[E]
val wrappedState = new StateImpl[S]()
// Call the mapping function on each record in the data iterator, and accordingly
// update the states touched, and collect the data returned by the mapping function
dataIterator.foreach { case (key, value) =>
wrappedState.wrap(newStateMap.get(key))
val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
if (wrappedState.isRemoved) {
newStateMap.remove(key)
} else if (wrappedState.isUpdated
|| (wrappedState.exists && timeoutThresholdTime.isDefined)) {
newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
}
mappedData ++= returned
}
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState)
mappedData ++= returned
newStateMap.remove(key)
}
}
MapWithStateRDDRecord(newStateMap, mappedData)
}
先copy了原來的狀態(tài)撼班,接著定義了兩個變量,mappedData是最終要返回的結(jié)果垒酬,wrappedState可以看成是對state的包裝砰嘁,添加了一些額外的方法。
接著遍歷當前批次的數(shù)據(jù)勘究,從狀態(tài)中取出key對應的原來的state般码,并根據(jù)自定義的函數(shù)來對state進行跟新,這里涉及到state的remove&update&timeout來對newStateMap進行跟新操作乱顾,并將有跟新的狀態(tài)加入到了mappedData中板祝。
若有設置超時時間,則還會對超時了的key進行移除走净,也會加入到mappedData中券时,最終通過新的狀態(tài)對象newStateMap和需返回的mappedData數(shù)組構(gòu)建了MapWithStateRDDRecord對象來返回。
而在前面提到的MapWithStateDStreamImpl實例的compute方法中:
override def compute(validTime: Time): Option[RDD[MappedType]] = {
internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}
調(diào)用的就是這個mappedData數(shù)據(jù)伏伯。
我們發(fā)現(xiàn)返回的都是有update的數(shù)據(jù)橘洞,若要獲取所有的狀態(tài)在mapWithState之后調(diào)用stateSnapshots即可。若要清除某個key的狀態(tài)说搅,可在自定義的方法中調(diào)用state.remove()炸枣。
總結(jié)
updateStateByKey底層是將preSateRDD和parentRDD進行co-group,然后對所有數(shù)據(jù)都將經(jīng)過自定義的mapFun函數(shù)進行一次計算弄唧,即使當前batch只有一條數(shù)據(jù)也會進行這么復雜的計算适肠,大大的降低了性能,并且計算時間會隨著維護的狀態(tài)的增加而增加候引。
mapWithstate底層是創(chuàng)建了一個MapWithStateRDD侯养,存的數(shù)據(jù)是MapWithStateRDDRecord對象,一個Partition對應一個MapWithStateRDDRecord對象澄干,該對象記錄了對應Partition所有的狀態(tài)逛揩,每次只會對當前batch有的數(shù)據(jù)進行跟新柠傍,而不會像updateStateByKey一樣對所有數(shù)據(jù)計算。