spark streaming stateful DStream 持久保存RDD/有狀態(tài)的內(nèi)存

在面向流處理的分布式計(jì)算中,經(jīng)常會(huì)有這種需求,希望需要處理的某個(gè)數(shù)據(jù)集能夠不隨著流式數(shù)據(jù)的流逝而消失。

以spark streaming為例固灵,就是希望有個(gè)數(shù)據(jù)集能夠在當(dāng)前批次中更新,再下個(gè)批次后又可以繼續(xù)訪問(wèn)劫流。一個(gè)最簡(jiǎn)單的實(shí)現(xiàn)是在driver的內(nèi)存中巫玻,我們可以自行保存一個(gè)大的內(nèi)存結(jié)構(gòu)。這種hack的方式就是我們無(wú)法利用spark提供的分布式計(jì)算的能力困介。

對(duì)此大审,spark streaming提供了stateful streaming, 可以創(chuàng)建一個(gè)有狀態(tài)的DStream蘸际,我們可以操作一個(gè)跨越不同批次的RDD座哩。

1 updateStateByKey

該方法提供了這樣的一種機(jī)制: 維護(hù)了一個(gè)可以跨越不同批次的RDD, 姑且成為StateRDD粮彤,在每個(gè)批次遍歷StateRDD的所有數(shù)據(jù)根穷,對(duì)每條數(shù)據(jù)執(zhí)行update方法。當(dāng)update方法返回None時(shí)导坟,淘汰StateRDD中的該條數(shù)據(jù)屿良。

具體接口如下:

/**

* Return a new "state" DStream where the state for each key is updated by applying

* the given function on the previous state of the key and the new values of each key.

* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.

* @param updateFunc State update function. If `this` function returns None, then

*                   corresponding state key-value pair will be eliminated.

* @param numPartitions Number of partitions of each RDD in the new DStream.

* @tparam S State type

*/

def updateStateByKey[S: ClassTag](

    updateFunc: (Seq[V], Option[S]) => Option[S],

    numPartitions: Int

  ): DStream[(K, S)] = ssc.withScope {

  updateStateByKey(updateFunc, defaultPartitioner(numPartitions))

}

即用戶需要實(shí)現(xiàn)一個(gè)updateFunc的函數(shù),該函數(shù)的參數(shù):

Seq[V] 該批次中相同key的數(shù)據(jù)惫周,以Seq數(shù)組形式傳遞

Option[S] 歷史狀態(tài)中的數(shù)據(jù)

返回值: 返回需要保持的歷史狀態(tài)數(shù)據(jù)尘惧,為None時(shí)表示刪除該數(shù)據(jù)

def updateStateFunc(lines: Seq[Array[String]], state: Option[Array[String]]): Option[Array[String]] = {...}

這種做法簡(jiǎn)單清晰明了,但是其中有一些可以優(yōu)化的地方:

a) 如果DRDD增長(zhǎng)到比較大的時(shí)候递递,而每個(gè)進(jìn)入的批次數(shù)據(jù)量相比并不大喷橙,此時(shí)每次都需要遍歷DRDD,無(wú)論該批次中是否有數(shù)據(jù)需要更新DRDD登舞。這種情況有的時(shí)候可能會(huì)引發(fā)性能問(wèn)題贰逾。

b) 需要用戶自定義數(shù)據(jù)的淘汰機(jī)制。有的時(shí)候顯得不是那么方便菠秒。

c) 返回的類型需要和緩存中的類型相同疙剑。類型不能發(fā)生改變。

2 mapWithState

該接口是對(duì)updateSateByKey的改良,解決了updateStateFunc中可以優(yōu)化的地方:

/**

* :: Experimental ::

* Return a [[MapWithStateDStream]] by applying a function to every key-value element of

* `this` stream, while maintaining some state data for each unique key. The mapping function

* and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this

* transformation can be specified using [[StateSpec]] class. The state data is accessible in

* as a parameter of type [[State]] in the mapping function.

*

* Example of using `mapWithState`:

* {{{

*    // A mapping function that maintains an integer state and return a String

*    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {

*      // Use state.exists(), state.get(), state.update() and state.remove()

*      // to manage state, and return the necessary string

*    }

*

*    val spec = StateSpec.function(mappingFunction).numPartitions(10)

*

*    val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)

* }}}

*

* @param spec          Specification of this transformation

* @tparam StateType    Class type of the state data

* @tparam MappedType   Class type of the mapped data

*/

@Experimental

def mapWithState[StateType: ClassTag, MappedType: ClassTag](

    spec: StateSpec[K, V, StateType, MappedType]

  ): MapWithStateDStream[K, V, StateType, MappedType] = {

  new MapWithStateDStreamImpl[K, V, StateType, MappedType](

    self,

    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]

  )

}

其中spec封裝了用戶自定義的函數(shù)言缤,用以更新緩存數(shù)據(jù):

mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType

實(shí)現(xiàn)樣例如下:

val mappingFunc = (k: String, line: Option[Array[String]], state: State[Array[String]]) => {...}

參數(shù)分別代表:

數(shù)據(jù)的key: k

RDD中的每行數(shù)據(jù): line

state: 緩存數(shù)據(jù)

當(dāng)對(duì)state調(diào)用remove方法時(shí)嚼蚀,該數(shù)據(jù)會(huì)被刪除。

注意管挟,如果數(shù)據(jù)超時(shí)驰坊,不要調(diào)用remove方法,因?yàn)閟park會(huì)在mappingFunc后自動(dòng)調(diào)用remove哮独。

a) 與updateStateByKey 每次都要遍歷緩存數(shù)據(jù)不同拳芙,mapWithState每次遍歷每個(gè)批次中的數(shù)據(jù),更新緩存中的數(shù)據(jù)皮璧。對(duì)于緩存數(shù)據(jù)較大的情況來(lái)說(shuō)舟扎,性能會(huì)有較大提升。

b) 提供了內(nèi)置的超時(shí)機(jī)制悴务,當(dāng)數(shù)據(jù)一定時(shí)間內(nèi)沒(méi)有更新時(shí)睹限,淘汰相應(yīng)數(shù)據(jù)。

注意讯檐,當(dāng)有數(shù)據(jù)到來(lái)或者有超時(shí)發(fā)生時(shí)羡疗,mappingFunc都會(huì)被調(diào)用。

3 checkpointing

通常情況下别洪,在一個(gè)DStream鐘叨恨,對(duì)RDD的各種轉(zhuǎn)換而依賴的數(shù)據(jù)都是來(lái)自于當(dāng)前批次中。但是當(dāng)在進(jìn)行有狀態(tài)的transformations時(shí)挖垛,包括updateStateByKey/reduceByKeyAndWindow 痒钝、mapWithSate,還會(huì)依賴于以前批次的數(shù)據(jù)痢毒,RDD的容錯(cuò)機(jī)制送矩,在異常情況需要重新計(jì)算RDD時(shí),需要以前批次的RDD信息哪替。如果這個(gè)依賴的鏈路過(guò)長(zhǎng)栋荸,會(huì)需要大量的內(nèi)存,即使有些RDD的數(shù)據(jù)在內(nèi)存中凭舶,不需要計(jì)算晌块。此時(shí)spark通過(guò)checkpoint來(lái)打破依賴鏈路。checkpoint會(huì)生成一個(gè)新的RDD到hdfs中库快,該RDD是計(jì)算后的結(jié)果集摸袁,而沒(méi)有對(duì)之前的RDD依賴。

此時(shí)一定要啟用checkpointing义屏,以進(jìn)行周期性的RDD Checkpointing

在StateDstream在實(shí)現(xiàn)RDD的compute方法時(shí)靠汁,就是將之前的PreStateRDD與當(dāng)前批次中依賴的ParentRDD進(jìn)行合并蜂大。

而checkpoint的實(shí)現(xiàn)是將上述合并的RDD寫(xiě)入HDFS中。

現(xiàn)在checkpoint的實(shí)現(xiàn)中蝶怔,數(shù)據(jù)寫(xiě)入hdfs的過(guò)程是由一個(gè)固定的線程池異步完成的奶浦。一種存在的風(fēng)險(xiǎn)是上次checkpoint的數(shù)據(jù)尚未完成,此次又來(lái)了新的要寫(xiě)的checkpoint數(shù)據(jù)踢星,會(huì)加大集群的負(fù)載澳叉,可能會(huì)引發(fā)一系列的問(wèn)題。

4 checkpoint周期設(shè)置:

對(duì)mapWithStateByKey/updateStateByKey返回的DStream可以調(diào)用checkpoint方法設(shè)置checkpoint的周期沐悦。注意傳遞的時(shí)間只能是批次時(shí)間的整數(shù)倍成洗。

另外,對(duì)于mapWithState而言藏否,checkpoint執(zhí)行時(shí)瓶殃,才會(huì)進(jìn)行數(shù)據(jù)的刪除。 State.remove方法只是設(shè)置狀態(tài)副签,標(biāo)記為刪除遥椿,數(shù)據(jù)并不會(huì)真的刪除。 SnapShot方法還是可以獲取得到淆储。

參考:
[1] https://halfvim.github.io/2016/06/19/Checkpointing-in-Spark-Streaming/
[2] http://asyncified.io/2016/07/31/exploring-stateful-streaming-with-apache-spark/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末冠场,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子本砰,更是在濱河造成了極大的恐慌碴裙,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,542評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件灌具,死亡現(xiàn)場(chǎng)離奇詭異青团,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)咖楣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)芦昔,“玉大人诱贿,你說(shuō)我怎么就攤上這事」径校” “怎么了珠十?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,912評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)凭豪。 經(jīng)常有香客問(wèn)我焙蹭,道長(zhǎng),這世上最難降的妖魔是什么嫂伞? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,449評(píng)論 1 293
  • 正文 為了忘掉前任孔厉,我火速辦了婚禮拯钻,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘撰豺。我一直安慰自己粪般,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,500評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布污桦。 她就那樣靜靜地躺著亩歹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪凡橱。 梳的紋絲不亂的頭發(fā)上小作,一...
    開(kāi)封第一講書(shū)人閱讀 51,370評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音稼钩,去河邊找鬼躲惰。 笑死,一個(gè)胖子當(dāng)著我的面吹牛变抽,可吹牛的內(nèi)容都是我干的础拨。 我是一名探鬼主播,決...
    沈念sama閱讀 40,193評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼绍载,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼诡宗!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起击儡,我...
    開(kāi)封第一講書(shū)人閱讀 39,074評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤塔沃,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后阳谍,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體蛀柴,經(jīng)...
    沈念sama閱讀 45,505評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,722評(píng)論 3 335
  • 正文 我和宋清朗相戀三年矫夯,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鸽疾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,841評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡训貌,死狀恐怖制肮,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情递沪,我是刑警寧澤豺鼻,帶...
    沈念sama閱讀 35,569評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站款慨,受9級(jí)特大地震影響儒飒,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜檩奠,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,168評(píng)論 3 328
  • 文/蒙蒙 一桩了、第九天 我趴在偏房一處隱蔽的房頂上張望附帽。 院中可真熱鬧,春花似錦圣猎、人聲如沸士葫。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,783評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)慢显。三九已至,卻和暖如春欠啤,著一層夾襖步出監(jiān)牢的瞬間荚藻,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,918評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工洁段, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留应狱,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,962評(píng)論 2 370
  • 正文 我出身青樓祠丝,卻偏偏與公主長(zhǎng)得像疾呻,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子写半,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,781評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容