Spark Streaming: MapWithStateDStream

MapWithStateDStream

MapWithStateDStream is the DStream returned by the mapWithState operator; the only extra method it exposes is:

def stateSnapshots(): DStream[(KeyType, StateType)]
  • MapWithStateDStream is a sealed abstract class, so all of its implementations are visible in its source file;
  • MapWithStateDStreamImpl is the only implementation of MapWithStateDStream (a short usage sketch follows this list);
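A short sketch of where this type shows up in user code; pairDStream and trackStateFunc below are hypothetical placeholders, not part of the Spark API:

  // Hypothetical key-value stream and mapping function, for illustration only
  val spec = StateSpec.function(trackStateFunc _)
  val mapped = pairDStream.mapWithState(spec)   // mapped is a MapWithStateDStream

  // Per-batch output: one element per input record, as returned by trackStateFunc
  mapped.print()

  // stateSnapshots(): a DStream holding the complete (key, state) snapshot for every batch
  mapped.stateSnapshots().print()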

What the sealed keyword does:

A trait or class marked sealed can only be extended within the file in which it is declared.
Marking a class sealed tells the Scala compiler the complete set of subtypes, so when it checks a pattern match it knows every possible case and can warn at compile time if any case has been left out, reducing programming errors.
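A minimal sketch of what this buys you; Shape, Circle and Square are made-up classes, not from Spark:

  sealed abstract class Shape
  case class Circle(r: Double) extends Shape
  case class Square(side: Double) extends Shape

  def area(shape: Shape): Double = shape match {
    case Circle(r) => math.Pi * r * r
    // compile-time warning: match may not be exhaustive, it would fail on Square(_)
  }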

MapWithStateDStreamImpl

  • MapWithStateDStreamImpl is internal (private[streaming]); its parent is a key-value DStream;
  • Its implementation is delegated to the InternalMapWithStateDStream class;
  • Its slideDuration and dependencies values are both taken from the internalStream variable (see the abridged excerpt below);
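An abridged excerpt (simplified from Spark's source, some members omitted) showing that delegation:

  private[streaming] class MapWithStateDStreamImpl[
      KeyType: ClassTag, ValueType: ClassTag, StateType: ClassTag, MappedType: ClassTag](
      dataStream: DStream[(KeyType, ValueType)],
      spec: StateSpecImpl[KeyType, ValueType, StateType, MappedType])
    extends MapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream.context) {

    private val internalStream =
      new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)

    override def slideDuration: Duration = internalStream.slideDuration

    override def dependencies: List[DStream[_]] = List(internalStream)

    // the mappedData of every record becomes the output of the mapWithState operator
    override def compute(validTime: Time): Option[RDD[MappedType]] =
      internalStream.getOrCompute(validTime).map { _.flatMap { _.mappedData } }

    // stateSnapshots() exposes the full key -> state map kept by the internal stream
    override def stateSnapshots(): DStream[(KeyType, StateType)] =
      internalStream.flatMap { _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
  }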

InternalMapWithStateDStream

  • InternalMapWithStateDStream is the class on top of which MapWithStateDStreamImpl is implemented;
  • It extends the DStream[MapWithStateRDDRecord[K, S, E]] class and persists with the MEMORY_ONLY storage level by default;
  • It uses the partitioner from the StateSpec, falling back to a HashPartitioner, to partition the state;
  • It forces checkpointing (override val mustCheckpoint = true); if checkpointDuration has not been set, it defaults to a multiple of slideDuration (see the abridged excerpt below);
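An abridged excerpt (simplified from Spark's source) covering the points above:

  private[streaming] class InternalMapWithStateDStream[
      K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
      parent: DStream[(K, V)], spec: StateSpecImpl[K, V, S, E])
    extends DStream[MapWithStateRDDRecord[K, S, E]](parent.context) {

    persist(StorageLevel.MEMORY_ONLY)

    // partitioner from the StateSpec, or a HashPartitioner over the default parallelism
    private val partitioner = spec.getPartitioner().getOrElse(
      new HashPartitioner(ssc.sc.defaultParallelism))

    private val mappingFunction = spec.getFunction()

    override val mustCheckpoint = true

    // enable automatic checkpointing: default to a multiple of the slide duration
    override def initialize(time: Time): Unit = {
      if (checkpointDuration == null) {
        checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
      }
      super.initialize(time)
    }
    ...
  }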

InternalMapWithStateDStream.compute()

  /** Method that generates an RDD for the given time */
  // Generates the RDD for the given batch time: it combines the previous state with this batch's data into a MapWithStateRDD
  override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
    // Get the previous state or create a new empty state RDD
    val prevStateRDD = getOrCompute(validTime - slideDuration) match {
      case Some(rdd) =>
        if (rdd.partitioner != Some(partitioner)) {
          // If the RDD is not partitioned the right way, let us repartition it using the
          // partition index as the key. This is to ensure that the state RDD is always
          // partitioned correctly before another state RDD is created from it
          MapWithStateRDD.createFromRDD[K, V, S, E](
            rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
        } else {
          rdd
        }
      case None =>
        MapWithStateRDD.createFromPairRDD[K, V, S, E](
          spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
          partitioner,
          validTime
        )
    }


    // Compute the new state RDD with the previous state RDD and the partitioned data RDD.
    // Even if there is no data RDD for this batch, use an empty one to create the new state RDD
    val dataRDD = parent.getOrCompute(validTime).getOrElse {
      context.sparkContext.emptyRDD[(K, V)]
    }
    val partitionedDataRDD = dataRDD.partitionBy(partitioner)
    val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
      (validTime - interval).milliseconds
    }
    Some(new MapWithStateRDD(
      prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
  }
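To make the timeout arithmetic concrete (the values below are made up for illustration): when the StateSpec carries a timeout interval, the threshold handed to MapWithStateRDD is simply the batch time minus that interval, and any key whose state was last updated before it becomes eligible for the timeout pass:

  // illustrative values only, assuming org.apache.spark.streaming.{Time, Seconds}
  val validTime = Time(1000000000L)                    // current batch time, in ms
  val interval = Seconds(30)                           // spec.getTimeoutInterval()
  val timeoutThresholdTime = (validTime - interval).milliseconds
  // => 999970000L: state entries last updated before this may be timed out in this batch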

Next, let us look at the MapWithStateRDD.createFromPairRDD method:

def createFromPairRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
      pairRDD: RDD[(K, S)],
      partitioner: Partitioner,
      updateTime: Time): MapWithStateRDD[K, V, S, E] = {
    
    // Turn the pairRDD into one MapWithStateRDDRecord per partition
    val stateRDD = pairRDD.partitionBy(partitioner).mapPartitions ({ iterator =>
      val stateMap = StateMap.create[K, S](SparkEnv.get.conf)
      iterator.foreach { case (key, state) => stateMap.put(key, state, updateTime.milliseconds) }
      Iterator(MapWithStateRDDRecord(stateMap, Seq.empty[E]))
    }, preservesPartitioning = true)

    val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner)

    val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None

    new MapWithStateRDD[K, V, S, E](
      stateRDD, emptyDataRDD, noOpFunc, updateTime, None)
  }
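createFromPairRDD is the path taken when there is no previous state RDD: the pair RDD it receives is either the user-supplied initial state or an EmptyRDD. A sketch of how that initial state is supplied, with a hypothetical initialRDD and countFunc:

  // Hypothetical initial state: pre-load a count for some keys
  val initialRDD = sparkContext.parallelize(Seq(("user-1", 5L), ("user-2", 2L)))

  val spec = StateSpec
    .function(countFunc _)        // placeholder mapping function
    .initialState(initialRDD)     // surfaced to the DStream via spec.getInitialStateRDD()
    .numPartitions(8)             // influences the partitioner used by InternalMapWithStateDStream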

MapWithStateRDD

  • Extends RDD; its dependencies are the prevStateRDD and the partitionedDataRDD:
  extends RDD[MapWithStateRDDRecord[K, S, E]](
    partitionedDataRDD.sparkContext,
    List(
      new OneToOneDependency[MapWithStateRDDRecord[K, S, E]](prevStateRDD),
      new OneToOneDependency(partitionedDataRDD))
  )

Its compute() logic:

 override def compute(
      partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {

    val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
    val prevStateRDDIterator = prevStateRDD.iterator(
      stateRDDPartition.previousSessionRDDPartition, context)
    val dataIterator = partitionedDataRDD.iterator(
      stateRDDPartition.partitionedDataRDDPartition, context)

    val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
    val newRecord = MapWithStateRDDRecord.updateRecordWithData(
      prevRecord,
      dataIterator,
      mappingFunction,
      batchTime,
      timeoutThresholdTime,
      removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
    )
    Iterator(newRecord)
  }

It relies mainly on the MapWithStateRDDRecord.updateRecordWithData method, which produces an Iterator over a single new record: the record's stateMap holds the per-key state, and its mappedData holds the values returned by the mapping function. The core of that method, excerpted:

    // Create a new state map by cloning the previous one (if it exists) or by creating an empty one.
    // It holds the Key -> State mapping, i.e. the current state of every key
    val newStateMap = prevRecord.map { _.stateMap.copy() }.getOrElse { new EmptyStateMap[K, S]() }
    
    // Collects the values returned by the mapping function for this batch
    val mappedData = new ArrayBuffer[E]
    
    // Wrapper exposing the State[S] API to the mapping function
    val wrappedState = new StateImpl[S]()

    // Call the mapping function on each record in the data iterator, update the states
    // touched accordingly, and collect the data returned by the mapping function
    dataIterator.foreach { case (key, value) =>
      wrappedState.wrap(newStateMap.get(key))
      val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated
          || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
      mappedData ++= returned
    }

    // Get the timed-out state records, call the mapping function on each and collect the
    // data returned. The timeout is configured by the user on the StateSpec; every timed-out
    // key gets one final call with value = None and is then removed from the state map
    if (removeTimedoutData && timeoutThresholdTime.isDefined) {
      newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
        wrappedState.wrapTimingOutState(state)
        val returned = mappingFunction(batchTime, key, None, wrappedState)
        mappedData ++= returned
        newStateMap.remove(key)
      }
    }

    MapWithStateRDDRecord(newStateMap, mappedData)
  }
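To connect this back to user code: every call to the mapping function either updates or removes the key's entry in newStateMap, and whatever it returns is appended to mappedData. A minimal sketch of such a function (hypothetical word-count style, not from the original post):

  // Hypothetical running count per key
  def countFunc(batchTime: Time, key: String, value: Option[Int],
                state: State[Long]): Option[(String, Long)] = {
    if (state.isTimingOut()) {
      None                                   // timeout pass: value is None and the key is about to be dropped
    } else {
      val newCount = state.getOption().getOrElse(0L) + value.getOrElse(0)
      state.update(newCount)                 // isUpdated becomes true -> newStateMap.put(...)
      Some((key, newCount))                  // appended to mappedData
    }
  }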

StateMap

/** Internal interface for defining the map that keeps track of sessions. */
private[streaming] abstract class StateMap[K, S] extends Serializable {

  /** Get the state for a key if it exists */
  def get(key: K): Option[S]

  /** Get all the keys and states whose updated time is older than the given threshold time */
  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]

  /** Get all the keys and states in this map. */
  def getAll(): Iterator[(K, S, Long)]

  /** Add or update state */
  def put(key: K, state: S, updatedTime: Long): Unit

  /** Remove a key */
  def remove(key: K): Unit

  /**
   * Shallow copy `this` map to create a new state map.
   * Updates to the new map should not mutate `this` map.
   */
  def copy(): StateMap[K, S]

  def toDebugString(): String = toString()
}
  • Located at org.apache.spark.streaming.util.StateMap;
  • It is the class that stores Spark Streaming's state information;
  • Two implementations are provided out of the box: EmptyStateMap and OpenHashMapBasedStateMap (see the sketch after this list);
  • The latter is built on OpenHashMap, a HashMap implementation that supports nullable keys; it is more than 5x faster than the JDK's default HashMap, but users need to be careful when handling 0.0/0/0L/non-existent values;
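For reference, a sketch of the StateMap companion object (abridged from Spark's source; the exact config key is quoted from memory and worth verifying): create() returns the OpenHashMap-based implementation, with a delta-chain threshold that bounds how many parent maps produced by copy() can be chained before they are compacted:

  private[streaming] object StateMap {
    def empty[K, S]: StateMap[K, S] = new EmptyStateMap[K, S]

    def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
      val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
        DELTA_CHAIN_LENGTH_THRESHOLD)
      new OpenHashMapBasedStateMap[K, S](deltaChainThreshold = deltaChainThreshold)
    }
  }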

Demo

object SparkStatefulRunner {
  /**
    * Aggregates User Sessions using Stateful Streaming transformations.
    *
    * Usage: SparkStatefulRunner <hostname> <port>
    * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: SparkStatefulRunner <hostname> <port>")
      System.exit(1)
    }

    val sparkConfig = loadConfigOrThrow[SparkConfiguration]("spark")

    val sparkContext = new SparkContext(sparkConfig.sparkMasterUrl, "Spark Stateful Streaming")
    val ssc = new StreamingContext(sparkContext, Milliseconds(4000))
    ssc.checkpoint(sparkConfig.checkpointDirectory)

    val stateSpec =
      StateSpec
        .function(updateUserEvents _)
        .timeout(Minutes(sparkConfig.timeoutInMinutes))

    ssc
      .socketTextStream(args(0), args(1).toInt)
      .map(deserializeUserEvent)
      .filter(_ != UserEvent.empty)
      .mapWithState(stateSpec)
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          rdd.foreach(maybeUserSession => maybeUserSession.foreach {
            userSession =>
              // Store user session here
              println(userSession)
          })
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }

  def deserializeUserEvent(json: String): (Int, UserEvent) = {
    json.decodeEither[UserEvent] match {
      case \/-(userEvent) =>
        (userEvent.id, userEvent)
      case -\/(error) =>
        println(s"Failed to parse user event: $error")
        (UserEvent.empty.id, UserEvent.empty)
    }
  }

  def updateUserEvents(key: Int,
                       value: Option[UserEvent],
                       state: State[UserSession]): Option[UserSession] = {
    def updateUserSessions(newEvent: UserEvent): Option[UserSession] = {
      val existingEvents: Seq[UserEvent] =
        state
          .getOption()
          .map(_.userEvents)
          .getOrElse(Seq[UserEvent]())

      val updatedUserSessions = UserSession(newEvent +: existingEvents)

      updatedUserSessions.userEvents.find(_.isLast) match {
        case Some(_) =>
          state.remove()
          Some(updatedUserSessions)
        case None =>
          state.update(updatedUserSessions)
          None
      }
    }

    value match {
      case Some(newEvent) => updateUserSessions(newEvent)
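      // value is None only when the key is timing out: during the timeout pass Spark invokes
      // the mapping function with None, which is exactly the case handled below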
      case _ if state.isTimingOut() => state.getOption()
    }
  }
}
