Spark Streaming Source Code Walkthrough: Driver Fault Tolerance and Safety

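On the data side, ReceivedBlockTracker records the metadata for the entire Spark Streaming application.

On the scheduling side, DStreamGraph and JobGenerator are the core of Spark Streaming scheduling. They record how far scheduling has progressed, and this state is tied to the business logic.

After ReceivedBlockTracker receives block metadata, it calls addBlock, which first writes the event to disk (the write ahead log) and only then writes it to memory.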

ReceivedBlockTracker:

/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
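The "log first, memory second" ordering in addBlock can be illustrated in isolation. The following is only a minimal sketch, not Spark's implementation: BlockInfo, MiniBlockTracker, the in-memory `log` buffer (standing in for the on-disk write ahead log) and the `failWrites` switch are all hypothetical names introduced for illustration.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for ReceivedBlockInfo: just a stream id and a block id.
final case class BlockInfo(streamId: Int, blockId: String)

// Minimal tracker: `log` plays the role of the on-disk write ahead log.
// `failWrites` is an illustrative switch that simulates a failing log write.
class MiniBlockTracker(failWrites: Boolean = false) {
  val log = mutable.ArrayBuffer.empty[String]
  private val queues = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

  private def writeToLog(record: String): Boolean =
    try {
      if (failWrites) throw new java.io.IOException("simulated log failure")
      log += record
      true
    } catch {
      case NonFatal(_) => false
    }

  // Step 1: persist the event. Step 2: only on success, update in-memory state.
  def addBlock(info: BlockInfo): Boolean = {
    val written = writeToLog(s"ADD ${info.streamId} ${info.blockId}")
    if (written) synchronized {
      queues.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
    }
    written
  }

  // Snapshot of the blocks not yet allocated to any batch for one stream.
  def unallocated(streamId: Int): Seq[BlockInfo] = synchronized {
    queues.get(streamId).map(_.toList).getOrElse(Nil)
  }
}
```

Because the in-memory queue is touched only after the log write succeeds, memory should never hold a block that the log does not know about, which is what makes recovery from the log possible.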

ReceivedBlockTracker:

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

// ReceivedBlocks not yet allocated to a batch
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

// ReceivedBlocks already allocated to a batch
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]

private val writeAheadLogOption = createWriteAheadLog()

Based on batchTime, the data to be processed in the current batch duration is allocated into the timeToAllocatedBlocks data structure.

ReceivedBlockTracker:

/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      ...
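The draining step in allocateBlocksToBatch relies on dequeueAll with an always-true predicate, which empties each queue and returns its contents. A small sketch of just that step follows; AllocationSketch and the use of String as the block type are assumptions made for illustration, and the log write and the lastAllocatedBatchTime guard are left out.

```scala
import scala.collection.mutable

object AllocationSketch {
  // Drain every per-stream queue into an immutable snapshot for one batch.
  def allocate(queues: mutable.Map[Int, mutable.Queue[String]]): Map[Int, Seq[String]] =
    queues.keys.toSeq.map { streamId =>
      // dequeueAll removes and returns all elements matching the predicate
      (streamId, queues(streamId).dequeueAll(_ => true).toList)
    }.toMap
}
```

After one call every queue is empty, so a second call for a later batch only sees blocks that arrived in between.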

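Time is a case class that records a point in time. It overloads the comparison and arithmetic operators, and its companion object provides an implicit Ordering.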

case class Time(private val millis: Long) {

  def milliseconds: Long = millis

  def < (that: Time): Boolean = (this.millis < that.millis)

  def <= (that: Time): Boolean = (this.millis <= that.millis)

  def > (that: Time): Boolean = (this.millis > that.millis)

  def >= (that: Time): Boolean = (this.millis >= that.millis)

  def + (that: Duration): Time = new Time(millis + that.milliseconds)

  def - (that: Time): Duration = new Duration(millis - that.millis)

  def - (that: Duration): Time = new Time(millis - that.milliseconds)

  // Java-friendlier versions of the above.

  def less(that: Time): Boolean = this < that

  def lessEq(that: Time): Boolean = this <= that

  def greater(that: Time): Boolean = this > that

  def greaterEq(that: Time): Boolean = this >= that

  def plus(that: Duration): Time = this + that

  def minus(that: Time): Duration = this - that

  def minus(that: Duration): Time = this - that

  def floor(that: Duration): Time = {
    val t = that.milliseconds
    new Time((this.millis / t) * t)
  }

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

  def isMultipleOf(that: Duration): Boolean =
    (this.millis % that.milliseconds == 0)

  def min(that: Time): Time = if (this < that) this else that

  def max(that: Time): Time = if (this > that) this else that

  def until(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }

  def to(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }

  override def toString: String = (millis.toString + " ms")

}

object Time {
  implicit val ordering = Ordering.by((time: Time) => time.millis)
}
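To make the arithmetic behind floor and until concrete, here is a small sketch that applies the same formulas to plain Long millisecond values. TimeArithmetic is a hypothetical helper introduced only for this illustration; it is not part of Spark.

```scala
object TimeArithmetic {
  // Same formula as Time.floor(that): round down to a multiple of the duration.
  def floor(millis: Long, durationMs: Long): Long = (millis / durationMs) * durationMs

  // Same formula as Time.floor(that, zeroTime): round down relative to zeroTime.
  def floor(millis: Long, durationMs: Long, zeroMs: Long): Long =
    ((millis - zeroMs) / durationMs) * durationMs + zeroMs

  // Same range as Time.until(that, interval): start inclusive, end exclusive.
  def until(startMs: Long, endMs: Long, intervalMs: Long): Seq[Long] =
    (startMs until endMs by intervalMs).toList
}
```

For example, flooring 12345 ms to a 1000 ms duration gives 12000 ms, while flooring it relative to a zero time of 500 ms gives 11500 ms.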

Tracing the Time object: the batchTime argument of ReceiverTracker's allocateBlocksToBatch method is passed in by JobGenerator's generateJobs method.

ReceiverTracker:

/** Allocate all unallocated blocks to the given batch. */
def allocateBlocksToBatch(batchTime: Time): Unit = {
  if (receiverInputStreams.nonEmpty) {
    receivedBlockTracker.allocateBlocksToBatch(batchTime)
  }
}

JobGenerator's generateJobs method is in turn invoked when the timer posts a GenerateJobs message.

JobGenerator:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

JobGenerator:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

JobGenerator:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

The time argument carried by GenerateJobs is nextTime. After each callback nextTime += period, where period is ssc.graph.batchDuration.milliseconds.

RecurringTimer:

private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}

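The initial value of nextTime is the startTime passed to the start method, which is the return value of RecurringTimer's getStartTime method: (floor(current time / period) + 1) * period, i.e. the first multiple of period that is strictly later than the current time.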

RecurringTimer:

/**
 * Start at the given start time.
 */
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}

JobGenerator:

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

RecurringTimer:

/**
 * Get the time when this timer will fire if it is started right now.
 * The time will be a multiple of this timer's period and more than
 * current system time.
 */
def getStartTime(): Long = {
  (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
}
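A worked example may help with the start time formula. The sketch below reuses the same expression as getStartTime but takes the clock reading as a parameter, then lists the first few callback times produced by repeatedly adding the period. TimerSketch and firingTimes are hypothetical names used only for this illustration.

```scala
object TimerSketch {
  // Same formula as RecurringTimer.getStartTime, with the clock reading passed in.
  def startTime(nowMs: Long, periodMs: Long): Long =
    (math.floor(nowMs.toDouble / periodMs) + 1).toLong * periodMs

  // The first `n` callback times: nextTime starts at startTime and grows by period.
  def firingTimes(nowMs: Long, periodMs: Long, n: Int): Seq[Long] =
    Iterator.iterate(startTime(nowMs, periodMs))(_ + periodMs).take(n).toList
}
```

With a clock reading of 12345 ms and a period of 1000 ms, the start time is 13000 ms and the following callbacks fall at 14000 ms and 15000 ms. If the clock reads exactly 13000 ms, the start time is 14000 ms, since the result is always strictly later than the current time.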

The period value is the Duration that we pass in when constructing the StreamingContext with new StreamingContext.

DStreamGraph:

def setBatchDuration(duration: Duration) {
  this.synchronized {
    require(batchDuration == null,
      s"Batch duration already set as $batchDuration. Cannot set it again.")
    batchDuration = duration
  }
}

StreamingContext:

private[streaming] val graph: DStreamGraph = {
  if (isCheckpointPresent) {
    cp_.graph.setContext(this)
    cp_.graph.restoreCheckpointData()
    cp_.graph
  } else {
    require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
    val newGraph = new DStreamGraph()
    newGraph.setBatchDuration(batchDur_)
    newGraph
  }
}

ReceivedBlockTracker also cleans up expired metadata by removing it from the HashMap. Here too, the event is first written to disk and only then applied in memory.

StreamingContext:

class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {

  /**
   * Create a StreamingContext using an existing SparkContext.
   * @param sparkContext existing SparkContext
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
  }

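The creation, consumption and deletion of metadata are all recorded in the WAL, so after a failure the state can be recovered from the log. The source code shows that the WAL mechanism is only used when a checkpoint directory has been set.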
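Recovery from such a log amounts to replaying the recorded events in order. The following is a simplified sketch of that idea and not Spark's recovery code: the event types BlockAdded, BatchAllocated and BatchesCleaned are hypothetical, and unlike the real BatchAllocationEvent the allocation event here does not carry the allocated blocks itself but reuses whatever is pending at replay time.

```scala
import scala.collection.mutable

// Hypothetical, simplified log events mirroring the three kinds of metadata change.
sealed trait TrackerEvent
final case class BlockAdded(streamId: Int, blockId: String) extends TrackerEvent
final case class BatchAllocated(batchTime: Long) extends TrackerEvent
final case class BatchesCleaned(batchTimes: Seq[Long]) extends TrackerEvent

object ReplaySketch {
  type Blocks = Map[Int, List[String]]

  // Rebuild (unallocated blocks, allocated batches) by applying events in log order.
  def recover(log: Seq[TrackerEvent]): (Blocks, Map[Long, Blocks]) = {
    val unallocated = mutable.Map.empty[Int, List[String]]
    val allocated = mutable.Map.empty[Long, Blocks]
    log.foreach {
      case BlockAdded(streamId, blockId) =>
        unallocated(streamId) = unallocated.getOrElse(streamId, Nil) :+ blockId
      case BatchAllocated(batchTime) =>
        // Everything pending so far belongs to this batch.
        allocated(batchTime) = unallocated.toMap
        unallocated.clear()
      case BatchesCleaned(batchTimes) =>
        batchTimes.foreach(t => allocated.remove(t))
    }
    (unallocated.toMap, allocated.toMap)
  }
}
```

Replaying the same log always yields the same state, which is why every change has to reach the log before it reaches memory.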

ReceiverTracker:

class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
  private val listenerBus = ssc.scheduler.listenerBus

The checkpoint directory passed in is used to create the log directory for the WAL.

ReceivedBlockTracker:

/** Optionally create the write ahead log manager only if the feature is enabled */
private def createWriteAheadLog(): Option[WriteAheadLog] = {
  checkpointDirOption.map { checkpointDir =>
    val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
    WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
  }
}

Here a folder named receivedBlockMetadata is created under the checkpoint directory to hold the records written by the WAL.

ReceivedBlockTracker:

private[streaming] object ReceivedBlockTracker {
  def checkpointDirToLogDir(checkpointDir: String): String = {
    new Path(checkpointDir, "receivedBlockMetadata").toString
  }
}

ReceivedBlockTracker:

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}

The current state of the DStream and JobGenerator is checkpointed by doCheckpoint. This method is invoked through the DoCheckpoint message that generateJobs posts as its last step.

JobGenerator:

/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
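The condition in doCheckpoint means that not every batch triggers a checkpoint, only those whose distance from zeroTime is a multiple of the checkpoint interval. A small sketch of that test on plain millisecond values follows; CheckpointSchedule is a hypothetical helper, and the shouldCheckpoint flag is left out.

```scala
object CheckpointSchedule {
  // Same test as doCheckpoint: elapsed time since zeroTime is a multiple of the interval.
  def shouldCheckpointAt(timeMs: Long, zeroTimeMs: Long, checkpointMs: Long): Boolean =
    (timeMs - zeroTimeMs) % checkpointMs == 0
}
```

Assuming a zero time of 12000 ms, a batch every 1000 ms and a checkpoint interval of 2000 ms, the batches at 14000, 16000 and 18000 ms pass the test, while 13000, 15000 and 17000 ms do not.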

Summary:

ReceivedBlockTracker achieves fault tolerance for its data through the WAL.

DStreamGraph and JobGenerator achieve fault tolerance for their data through checkpointing.


Note:

Source material: DT_大數據夢工廠 (Spark distribution customization)

