MapWithStateDStream
- MapWithStateDStream is the DStream returned by the mapWithState operator; besides the standard DStream operations it exposes:
def stateSnapshots(): DStream[(KeyType, StateType)]
- MapWithStateDStream is a sealed abstract class, so all of its implementations are visible in the same source file;
- MapWithStateDStreamImpl is the only implementation of MapWithStateDStream;
What the sealed keyword does: a trait or class marked sealed can only be extended inside the file that defines it. This tells the Scala compiler the complete set of subtypes, so when it checks a pattern match it can warn at compile time about any case you forgot to handle, reducing programming errors. For example:
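A minimal, hedged illustration (not from Spark's source) of how sealed enables exhaustiveness checking:

// Hypothetical sealed hierarchy, defined entirely in one file.
sealed abstract class Shape
case class Circle(radius: Double) extends Shape
case class Square(side: Double) extends Shape

object Area {
  // Because Shape is sealed, the compiler knows every subtype; if another case class
  // is added to this file and this match is not updated, it warns that the match
  // may not be exhaustive.
  def of(shape: Shape): Double = shape match {
    case Circle(r)    => math.Pi * r * r
    case Square(side) => side * side
  }
}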
MapWithStateDStreamImpl
- MapWithStateDStreamImpl is internal (private to Spark Streaming), and its parent is a key-value DStream;
- Its implementation delegates to an internal `InternalMapWithStateDStream`;
- Its slideDuration and dependencies values are both taken from the internalStream member, as the sketch below shows.
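Based on the points above, the delegation looks roughly like the following simplified sketch; it mirrors the structure of MapWithStateDStreamImpl but is not a verbatim copy of Spark's source.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StateSpecImpl, Time}
import org.apache.spark.streaming.dstream.{DStream, InternalMapWithStateDStream, MapWithStateDStream}

// Simplified sketch of the delegation described above (not Spark's exact source).
private[streaming] class MapWithStateDStreamImpl[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    dataStream: DStream[(K, V)],
    spec: StateSpecImpl[K, V, S, E])
  extends MapWithStateDStream[K, V, S, E](dataStream.context) {

  private val internalStream =
    new InternalMapWithStateDStream[K, V, S, E](dataStream, spec)

  // Both values are taken straight from the internal stream.
  override def slideDuration: Duration = internalStream.slideDuration
  override def dependencies: List[DStream[_]] = List(internalStream)

  // The mapped values carried by each MapWithStateRDDRecord are the output of this DStream.
  override def compute(validTime: Time): Option[RDD[E]] =
    internalStream.getOrCompute(validTime).map { _.flatMap(_.mappedData) }

  // State snapshots are read out of the state maps carried by the internal stream.
  override def stateSnapshots(): DStream[(K, S)] =
    internalStream.flatMap { _.stateMap.getAll().map { case (k, s, _) => (k, s) } }
}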
InternalMapWithStateDStream
- InternalMapWithStateDStream is what actually implements MapWithStateDStreamImpl;
- It extends DStream[MapWithStateRDDRecord[K, S, E]] and uses the MEMORY_ONLY storage level by default;
- It uses the partitioner configured on the StateSpec (a HashPartitioner by default) as its partitioner;
- It forces checkpointing (override val mustCheckpoint = true); if checkpointDuration is not set, one is derived from the slideDuration. The configuration sketch below shows how these knobs are set from user code.
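Both the partitioner and the timeout that feed this stream come from the user-facing StateSpec builder. A minimal configuration sketch, assuming a hypothetical counting function trackEvent and an existing StreamingContext named ssc:

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{Minutes, State, StateSpec}

// Hypothetical mapping function: keeps a running count per key.
def trackEvent(key: String, value: Option[Int], state: State[Long]): Long = {
  val newCount = state.getOption().getOrElse(0L) + value.getOrElse(0)
  state.update(newCount)
  newCount
}

val spec = StateSpec
  .function(trackEvent _)
  .partitioner(new HashPartitioner(8)) // picked up by InternalMapWithStateDStream
  .timeout(Minutes(30))                // enables the timeout handling described below

// Because mustCheckpoint = true, a checkpoint directory must also be configured, e.g.
// ssc.checkpoint("/tmp/checkpoint")   // directory is an assumption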
InternalMapWithStateDStream.compute()
/** Method that generates an RDD for the given time */
// Generates the RDD for the given batch time: turns the state operation into a MapWithStateRDD
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
  // Get the previous state or create a new empty state RDD
  val prevStateRDD = getOrCompute(validTime - slideDuration) match {
    case Some(rdd) =>
      if (rdd.partitioner != Some(partitioner)) {
        // If the RDD is not partitioned the right way, let us repartition it using the
        // partition index as the key. This is to ensure that state RDD is always partitioned
        // before creating another state RDD using it
        MapWithStateRDD.createFromRDD[K, V, S, E](
          rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
      } else {
        rdd
      }
    case None =>
      MapWithStateRDD.createFromPairRDD[K, V, S, E](
        spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
        partitioner,
        validTime
      )
  }

  // Compute the new state RDD with previous state RDD and partitioned data RDD
  // Even if there is no data RDD, use an empty one to create a new state RDD
  val dataRDD = parent.getOrCompute(validTime).getOrElse {
    context.sparkContext.emptyRDD[(K, V)]
  }
  val partitionedDataRDD = dataRDD.partitionBy(partitioner)
  val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
    (validTime - interval).milliseconds
  }

  Some(new MapWithStateRDD(
    prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}
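To make the timeout arithmetic above concrete, a small, hedged example with made-up numbers (a 1-minute timeout, as would be configured via StateSpec.timeout):

import org.apache.spark.streaming.{Minutes, Time}

val validTime = Time(1000000L)     // example batch time, in ms
val timeoutInterval = Minutes(1)   // what spec.getTimeoutInterval() would return

// Same arithmetic as in compute(): keys not updated since this instant are timed out.
val timeoutThresholdTime = (validTime - timeoutInterval).milliseconds
// 1000000 - 60000 == 940000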
Next, let's look at the MapWithStateRDD.createFromPairRDD method:
def createFromPairRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    pairRDD: RDD[(K, S)],
    partitioner: Partitioner,
    updateTime: Time): MapWithStateRDD[K, V, S, E] = {

  // Convert the pairRDD into an RDD of MapWithStateRDDRecord, one record per partition
  val stateRDD = pairRDD.partitionBy(partitioner).mapPartitions({ iterator =>
    val stateMap = StateMap.create[K, S](SparkEnv.get.conf)
    iterator.foreach { case (key, state) => stateMap.put(key, state, updateTime.milliseconds) }
    Iterator(MapWithStateRDDRecord(stateMap, Seq.empty[E]))
  }, preservesPartitioning = true)

  val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner)

  val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None

  new MapWithStateRDD[K, V, S, E](
    stateRDD, emptyDataRDD, noOpFunc, updateTime, None)
}
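createFromPairRDD is the path taken on the very first batch when an initial state RDD has been supplied through StateSpec.initialState. A minimal, hedged usage sketch (the key/state types, the seed values, and the ssc variable are assumptions):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{State, StateSpec}

// Hypothetical pre-seeded state: counts per user id (ssc is an existing StreamingContext).
val initialCounts: RDD[(String, Long)] =
  ssc.sparkContext.parallelize(Seq(("user-1", 10L), ("user-2", 3L)))

val spec = StateSpec
  .function((key: String, value: Option[Int], state: State[Long]) => {
    val count = state.getOption().getOrElse(0L) + value.getOrElse(0)
    state.update(count)
    count
  })
  .initialState(initialCounts) // handed to createFromPairRDD on the first batch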
MapWithStateRDD
- It extends RDD; its dependencies are prevStateRDD and partitionedDataRDD:
extends RDD[MapWithStateRDDRecord[K, S, E]](
    partitionedDataRDD.sparkContext,
    List(
      new OneToOneDependency[MapWithStateRDDRecord[K, S, E]](prevStateRDD),
      new OneToOneDependency(partitionedDataRDD))
  )
Its compute() logic:
override def compute(
    partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {

  val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
  val prevStateRDDIterator = prevStateRDD.iterator(
    stateRDDPartition.previousSessionRDDPartition, context)
  val dataIterator = partitionedDataRDD.iterator(
    stateRDDPartition.partitionedDataRDDPartition, context)

  val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
  val newRecord = MapWithStateRDDRecord.updateRecordWithData(
    prevRecord,
    dataIterator,
    mappingFunction,
    batchTime,
    timeoutThresholdTime,
    removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
  )
  Iterator(newRecord)
}
It relies mainly on MapWithStateRDDRecord.updateRecordWithData, which returns an Iterator of records in which stateMap stores the state of every key and mappedData stores the values returned by the mapping function:
// Create a new state map by cloning the previous one (if it exists) or by creating an empty one
// (this is the key -> state mapping that stores the state of every key)
val newStateMap = prevRecord.map { _.stateMap.copy() }.getOrElse { new EmptyStateMap[K, S]() }

// Collects the values returned by the mapping function
val mappedData = new ArrayBuffer[E]
// Wrapper implementation around State
val wrappedState = new StateImpl[S]()

// Call the mapping function on each record in the data iterator, and accordingly
// update the states touched, and collect the data returned by the mapping function
dataIterator.foreach { case (key, value) =>
  wrappedState.wrap(newStateMap.get(key))
  val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
  if (wrappedState.isRemoved) {
    newStateMap.remove(key)
  } else if (wrappedState.isUpdated
      || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
    newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
  }
  mappedData ++= returned
}

// Get the timed out state records, call the mapping function on each and collect the
// data returned (the user can define timeout handling; every timed-out key is passed to
// the mapping function one last time and then removed)
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
    wrappedState.wrapTimingOutState(state)
    val returned = mappingFunction(batchTime, key, None, wrappedState)
    mappedData ++= returned
    newStateMap.remove(key)
  }
}

MapWithStateRDDRecord(newStateMap, mappedData)
}
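The isRemoved/isUpdated flags checked above are set indirectly by the user's mapping function through the State handle. A minimal, hedged mapping function exercising the three paths (the key/value types and the threshold of 10 are assumptions):

import org.apache.spark.streaming.{State, Time}

// Hypothetical mapping function: running count per key, dropping a key at 10,
// and reporting a timed-out key one last time.
def countEvents(batchTime: Time, key: String, value: Option[Int],
                state: State[Long]): Option[(String, Long)] = {
  if (state.isTimingOut()) {
    // Reached from the timeout branch above, where value is None
    state.getOption().map(count => (key, count))
  } else {
    val count = state.getOption().getOrElse(0L) + value.getOrElse(0)
    if (count >= 10) {
      state.remove()      // sets isRemoved, so the key is dropped from newStateMap
      Some((key, count))
    } else {
      state.update(count) // sets isUpdated, so the key is (re)stored in newStateMap
      Some((key, count))
    }
  }
}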
StateMap
/** Internal interface for defining the map that keeps track of sessions. */
private[streaming] abstract class StateMap[K, S] extends Serializable {

  /** Get the state for a key if it exists */
  def get(key: K): Option[S]

  /** Get all the keys and states whose updated time is older than the given threshold time */
  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]

  /** Get all the keys and states in this map. */
  def getAll(): Iterator[(K, S, Long)]

  /** Add or update state */
  def put(key: K, state: S, updatedTime: Long): Unit

  /** Remove a key */
  def remove(key: K): Unit

  /**
   * Shallow copy `this` map to create a new state map.
   * Updates to the new map should not mutate `this` map.
   */
  def copy(): StateMap[K, S]

  def toDebugString(): String = toString()
}
- Located at org.apache.spark.streaming.util.StateMap;
- It is the class that stores Spark Streaming's state information;
- Two implementations are provided by default: EmptyStateMap and OpenHashMapBasedStateMap (a usage sketch of the interface follows below);
- The underlying OpenHashMap is a HashMap that supports nullable keys; it is more than 5x faster than the JDK's default HashMap, but users need to be careful when dealing with 0.0/0/0L/non-existent values;
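StateMap itself is private[streaming], so it cannot be called from application code; the following hedged sketch only illustrates how the interface above is exercised by MapWithStateRDD (the keys, states, and timestamps are made up):

import org.apache.spark.SparkEnv
import org.apache.spark.streaming.util.StateMap

// Illustrative only: mirrors what createFromPairRDD / updateRecordWithData do internally.
val stateMap = StateMap.create[String, Long](SparkEnv.get.conf)
stateMap.put("user-1", 5L, 1000L)   // key, state, update time in ms
stateMap.put("user-2", 7L, 2000L)

stateMap.get("user-1")              // Some(5)
stateMap.getByTime(1500L)           // keys last updated before 1500 ms: only user-1
val snapshot = stateMap.copy()      // shallow copy; updates to snapshot must not mutate stateMap
stateMap.remove("user-2")           // key is gone from subsequent gets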
Demo
object SparkStatefulRunner {
  /**
   * Aggregates User Sessions using Stateful Streaming transformations.
   *
   * Usage: SparkStatefulRunner <hostname> <port>
   * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: SparkRunner <hostname> <port>")
      System.exit(1)
    }

    val sparkConfig = loadConfigOrThrow[SparkConfiguration]("spark")

    val sparkContext = new SparkContext(sparkConfig.sparkMasterUrl, "Spark Stateful Streaming")
    val ssc = new StreamingContext(sparkContext, Milliseconds(4000))
    ssc.checkpoint(sparkConfig.checkpointDirectory)

    val stateSpec =
      StateSpec
        .function(updateUserEvents _)
        .timeout(Minutes(sparkConfig.timeoutInMinutes))

    ssc
      .socketTextStream(args(0), args(1).toInt)
      .map(deserializeUserEvent)
      .filter(_ != UserEvent.empty)
      .mapWithState(stateSpec)
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          rdd.foreach(maybeUserSession => maybeUserSession.foreach {
            userSession =>
              // Store user session here
              println(userSession)
          })
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }

  def deserializeUserEvent(json: String): (Int, UserEvent) = {
    json.decodeEither[UserEvent] match {
      case \/-(userEvent) =>
        (userEvent.id, userEvent)
      case -\/(error) =>
        println(s"Failed to parse user event: $error")
        (UserEvent.empty.id, UserEvent.empty)
    }
  }

  def updateUserEvents(key: Int,
                       value: Option[UserEvent],
                       state: State[UserSession]): Option[UserSession] = {
    def updateUserSessions(newEvent: UserEvent): Option[UserSession] = {
      val existingEvents: Seq[UserEvent] =
        state
          .getOption()
          .map(_.userEvents)
          .getOrElse(Seq[UserEvent]())

      val updatedUserSessions = UserSession(newEvent +: existingEvents)

      updatedUserSessions.userEvents.find(_.isLast) match {
        case Some(_) =>
          state.remove()
          Some(updatedUserSessions)
        case None =>
          state.update(updatedUserSessions)
          None
      }
    }

    value match {
      case Some(newEvent) => updateUserSessions(newEvent)
      case _ if state.isTimingOut() => state.getOption()
    }
  }
}
References:
- spark-stateful-example: https://github.com/YuvalItzchakov/spark-stateful-example
- An article analyzing stateful streaming: http://www.reibang.com/p/261636f397b8
- Databricks example: https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html