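From the data perspective, ReceivedBlockTracker records the metadata for the entire Spark Streaming application.
From the scheduling perspective, DStreamGraph and JobGenerator are the core of Spark Streaming scheduling. They record how far scheduling has progressed, and that state is tied to the application's business logic.
When ReceivedBlockTracker receives block metadata, it calls addBlock. addBlock first writes the event to the write-ahead log (WAL) on persistent storage, if the WAL is enabled, and only then adds the block to its in-memory queue.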
ReceivedBlockTracker:
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
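The "log first, then update memory" ordering in addBlock can be illustrated with a small self-contained sketch. `BlockInfo`, `SimpleLog`, and `MiniBlockTracker` are hypothetical stand-ins, not Spark classes; the "log" is an in-memory buffer that can be told to fail, so the example runs without HDFS or a checkpoint directory.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for Spark's ReceivedBlockInfo.
final case class BlockInfo(streamId: Int, blockId: String)

// Hypothetical write-ahead log: keeps records in memory and can simulate a failed write.
final class SimpleLog(var failWrites: Boolean = false) {
  val records = mutable.ArrayBuffer.empty[String]
  def write(record: String): Unit = {
    if (failWrites) throw new java.io.IOException("simulated WAL failure")
    records += record
  }
}

final class MiniBlockTracker(log: SimpleLog) {
  private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

  // Same ordering as addBlock: persist the event first and touch the in-memory
  // queue only if the write succeeded; any non-fatal error yields false.
  def addBlock(info: BlockInfo): Boolean =
    try {
      log.write(s"BlockAddition(${info.streamId},${info.blockId})")
      synchronized {
        unallocated.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
      }
      true
    } catch {
      case NonFatal(_) => false
    }

  def pending(streamId: Int): Seq[BlockInfo] =
    synchronized { unallocated.get(streamId).map(_.toList).getOrElse(Nil) }
}
```

If the log write fails, the block never reaches the queue, so memory never holds anything that a restarted driver could not rebuild from the log.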
ReceivedBlockTracker:
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
// ReceivedBlocks not yet allocated to any batch
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
// ReceivedBlocks already allocated, keyed by batch time
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
private val writeAheadLogOption = createWriteAheadLog()
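allocateBlocksToBatch takes every block that is still unallocated and assigns it to the batch identified by batchTime, i.e. the data the current batch interval (batchDuration) must process. The result is recorded in the timeToAllocatedBlocks data structure.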
ReceivedBlockTracker:
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      ...
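The allocation step can be sketched in isolation: drain every stream's queue into the batch, and ignore batch times that are not newer than the last allocated one. This is a simplified, hypothetical model: block ids are Strings, batch times are plain Long milliseconds, and the WAL write that the real method performs first is only marked by a comment.

```scala
import scala.collection.mutable

// Hypothetical simplification of ReceivedBlockTracker's allocation bookkeeping.
final class MiniAllocator(streamIds: Seq[Int]) {
  private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[String]]
  private val timeToAllocated = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]
  private var lastAllocatedBatchTime: Option[Long] = None

  def add(streamId: Int, blockId: String): Unit = synchronized {
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
  }

  // Drain every stream's queue into the given batch, but only if this batch time
  // is newer than the last one allocated (mirrors the lastAllocatedBatchTime guard).
  def allocate(batchTime: Long): Unit = synchronized {
    if (lastAllocatedBatchTime.forall(batchTime > _)) {
      val streamIdToBlocks = streamIds.map { id =>
        id -> unallocated.get(id).map(_.dequeueAll(_ => true).toList).getOrElse(Nil)
      }.toMap
      // (The real tracker writes a BatchAllocationEvent to the WAL before this point.)
      timeToAllocated.put(batchTime, streamIdToBlocks)
      lastAllocatedBatchTime = Some(batchTime)
    }
  }

  def blocksOf(batchTime: Long): Map[Int, Seq[String]] =
    synchronized { timeToAllocated.getOrElse(batchTime, Map.empty) }
}
```

Usage: after adding b1 and b3 to stream 0 and b2 to stream 1, `allocate(2000L)` moves them all into batch 2000; a later `allocate(1000L)` is a no-op because 1000 is not newer than 2000.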
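Time is a case class that records a point in time in milliseconds. It overloads the comparison and arithmetic operators, and its companion object provides an implicit Ordering.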
case class Time(private val millis: Long) {
  def milliseconds: Long = millis
  def < (that: Time): Boolean = (this.millis < that.millis)
  def <= (that: Time): Boolean = (this.millis <= that.millis)
  def > (that: Time): Boolean = (this.millis > that.millis)
  def >= (that: Time): Boolean = (this.millis >= that.millis)
  def + (that: Duration): Time = new Time(millis + that.milliseconds)
  def - (that: Time): Duration = new Duration(millis - that.millis)
  def - (that: Duration): Time = new Time(millis - that.milliseconds)
  // Java-friendlier versions of the above.
  def less(that: Time): Boolean = this < that
  def lessEq(that: Time): Boolean = this <= that
  def greater(that: Time): Boolean = this > that
  def greaterEq(that: Time): Boolean = this >= that
  def plus(that: Duration): Time = this + that
  def minus(that: Time): Duration = this - that
  def minus(that: Duration): Time = this - that
  def floor(that: Duration): Time = {
    val t = that.milliseconds
    new Time((this.millis / t) * t)
  }
  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }
  def isMultipleOf(that: Duration): Boolean =
    (this.millis % that.milliseconds == 0)
  def min(that: Time): Time = if (this < that) this else that
  def max(that: Time): Time = if (this > that) this else that
  def until(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }
  def to(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }
  override def toString: String = (millis.toString + " ms")
}
object Time {
  implicit val ordering = Ordering.by((time: Time) => time.millis)
}
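To see how the batch-grid arithmetic behaves, here is a trimmed re-declaration of a few Time operations. `SimpleTime` and `SimpleDuration` are hypothetical copies made only so the example runs without Spark on the classpath; the `floor(d, zero)` formula is the same as Time.floor(that, zeroTime) above.

```scala
// Hypothetical, trimmed re-declarations of Spark's Duration and Time.
final case class SimpleDuration(milliseconds: Long)

final case class SimpleTime(millis: Long) {
  def +(d: SimpleDuration): SimpleTime = SimpleTime(millis + d.milliseconds)
  def -(that: SimpleTime): SimpleDuration = SimpleDuration(millis - that.millis)
  def isMultipleOf(d: SimpleDuration): Boolean = millis % d.milliseconds == 0

  // Round down onto the batch grid anchored at `zero`, as Time.floor(that, zeroTime) does.
  def floor(d: SimpleDuration, zero: SimpleTime): SimpleTime =
    SimpleTime(((millis - zero.millis) / d.milliseconds) * d.milliseconds + zero.millis)

  // All grid points from this (inclusive) to `end` (exclusive), stepping by `step`.
  def until(end: SimpleTime, step: SimpleDuration): Seq[SimpleTime] =
    millis.until(end.millis).by(step.milliseconds).map(SimpleTime(_))
}
```

For example, with a zero time of 12000 ms and 1-second batches, a timestamp of 15750 ms floors to 15000 ms, because integer division discards the 750 ms remainder.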
Tracing where the Time object comes from: the batchTime argument of ReceiverTracker.allocateBlocksToBatch is supplied by JobGenerator.generateJobs.
ReceiverTracker:
/** Allocate all unallocated blocks to the given batch. */
def allocateBlocksToBatch(batchTime: Time): Unit = {
  if (receiverInputStreams.nonEmpty) {
    receivedBlockTracker.allocateBlocksToBatch(batchTime)
  }
}
JobGenerator.generateJobs, in turn, is invoked when the timer posts a GenerateJobs message to the event loop.
JobGenerator:
/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
JobGenerator:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
JobGenerator:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
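The message flow above (timer posts GenerateJobs, processEvent dispatches it, generateJobs allocates blocks, generates jobs, and finally posts DoCheckpoint) can be modeled with a small queue-based loop. This is a hypothetical, single-threaded sketch: Spark's EventLoop runs on its own background thread, whereas here events are queued and drained explicitly so the order is deterministic. The event and class names are simplified stand-ins.

```scala
import scala.collection.mutable

// Hypothetical, simplified versions of JobGenerator's events.
sealed trait GeneratorEvent
final case class GenerateJobs(time: Long) extends GeneratorEvent
final case class DoCheckpoint(time: Long) extends GeneratorEvent

final class MiniJobGenerator {
  private val queue = mutable.Queue.empty[GeneratorEvent]
  val trace = mutable.ArrayBuffer.empty[String]

  def post(event: GeneratorEvent): Unit = queue += event

  // Dispatch by pattern matching, like processEvent.
  private def processEvent(event: GeneratorEvent): Unit = event match {
    case GenerateJobs(t) => generateJobs(t)
    case DoCheckpoint(t) => trace += s"checkpoint@$t"
  }

  private def generateJobs(t: Long): Unit = {
    trace += s"allocate@$t"  // stands in for receiverTracker.allocateBlocksToBatch(time)
    trace += s"generate@$t"  // stands in for graph.generateJobs(time)
    post(DoCheckpoint(t))    // posted at the end, as in generateJobs
  }

  def drain(): Unit = while (queue.nonEmpty) processEvent(queue.dequeue())
}
```

Because DoCheckpoint is posted back onto the same queue, it runs after any GenerateJobs messages that were already waiting, not immediately after job generation.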
The time carried by GenerateJobs is the timer's nextTime. After each callback the timer executes nextTime += period, and this period is ssc.graph.batchDuration.milliseconds.
RecurringTimer:
private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}
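The loop of "wait until nextTime, fire the callback, advance by period" can be simulated deterministically. `MiniRecurringTimer` is a hypothetical model: instead of blocking on a real clock via waitTillTime, it just advances a simulated clock, and `runUntil` is an invented helper that fires every interval up to a given time.

```scala
import scala.collection.mutable

// Hypothetical, deterministic model of RecurringTimer's firing loop.
final class MiniRecurringTimer(period: Long, callback: Long => Unit) {
  private var nextTime = 0L
  private var prevTime = -1L
  var simulatedNow = 0L

  def start(startTime: Long): Long = { nextTime = startTime; nextTime }

  private def triggerActionForNextInterval(): Unit = {
    simulatedNow = math.max(simulatedNow, nextTime) // stands in for clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
  }

  // Fire every interval whose trigger time is <= endTime.
  def runUntil(endTime: Long): Unit =
    while (nextTime <= endTime) triggerActionForNextInterval()

  def lastFired: Long = prevTime
}
```

Started at 13000 ms with a 1000 ms period and run until 15500 ms, the callback receives 13000, 14000, and 15000: each batch time is exactly one period after the previous one, independent of how long the callback took.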
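The initial value of nextTime is the startTime passed to start, which is the return value of RecurringTimer.getStartTime: the smallest multiple of period that is strictly greater than the current time, i.e. (floor(now / period) + 1) * period.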
RecurringTimer:
/**
 * Start at the given start time.
 */
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}
JobGenerator:
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
RecurringTimer:
/**
 * Get the time when this timer will fire if it is started right now.
 * The time will be a multiple of this timer's period and more than
 * current system time.
 */
def getStartTime(): Long = {
  (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
}
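A worked version of the start-time arithmetic, using the same formula as getStartTime but with the clock reading passed in explicitly. The object and method names are hypothetical, chosen so the numbers can be checked offline; `zeroTimeFor` mirrors the `startTime - graph.batchDuration` value that startFirstTime hands to graph.start.

```scala
// Hypothetical helpers reproducing RecurringTimer.getStartTime's formula.
object StartTimeMath {
  // Smallest multiple of `period` strictly greater than `nowMillis`.
  def startTimeFor(nowMillis: Long, period: Long): Long =
    (math.floor(nowMillis.toDouble / period) + 1).toLong * period

  // The value startFirstTime passes to graph.start: one period before the first batch.
  def zeroTimeFor(nowMillis: Long, period: Long): Long =
    startTimeFor(nowMillis, period) - period
}
```

With now = 12345 ms and 1-second batches, floor(12.345) + 1 = 13, so the first GenerateJobs fires at 13000 ms and graph.start receives 12000 ms. If now is already an exact multiple (13000 ms), the timer still waits a full period and starts at 14000 ms.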
period is the Duration we pass when constructing a StreamingContext with new StreamingContext(...). It is stored once as the DStreamGraph's batchDuration.
DStreamGraph:
def setBatchDuration(duration: Duration) {
  this.synchronized {
    require(batchDuration == null,
      s"Batch duration already set as $batchDuration. Cannot set it again.")
    batchDuration = duration
  }
}
StreamingContext:
private[streaming] val graph: DStreamGraph = {
  if (isCheckpointPresent) {
    cp_.graph.setContext(this)
    cp_.graph.restoreCheckpointData()
    cp_.graph
  } else {
    require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
    val newGraph = new DStreamGraph()
    newGraph.setBatchDuration(batchDur_)
    newGraph
  }
}
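The set-once behavior of setBatchDuration can be reproduced with a tiny guard. `MiniGraph` is a hypothetical stand-in that stores the duration as an Option of milliseconds instead of a nullable Duration; the require check plays the same role as in DStreamGraph.

```scala
// Hypothetical sketch of DStreamGraph's set-once batch duration.
final class MiniGraph {
  private var batchDurationMs: Option[Long] = None

  def setBatchDuration(ms: Long): Unit = synchronized {
    // require's message is by-name, so .get is only evaluated when the check fails.
    require(batchDurationMs.isEmpty,
      s"Batch duration already set as ${batchDurationMs.get} ms. Cannot set it again.")
    batchDurationMs = Some(ms)
  }

  def batchDuration: Option[Long] = synchronized(batchDurationMs)
}
```

A second call throws IllegalArgumentException and leaves the original value in place, which is why the timer period derived from it stays fixed for the lifetime of the graph.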
ReceivedBlockTracker also cleans up expired metadata by removing it from its HashMap. Here, too, the cleanup is first written to the WAL on disk and only then applied in memory.
StreamingContext:
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {
  /**
   * Create a StreamingContext using an existing SparkContext.
   * @param sparkContext existing SparkContext
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
  }
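The cleanup of expired metadata follows the same "log first, then mutate memory" rule. The sketch below is hypothetical: `MiniCleaner` and its `cleanupOldBatches` are illustrative names, batch times are Long milliseconds, and the WAL is an in-memory buffer of strings.

```scala
import scala.collection.mutable

// Hypothetical sketch: drop allocation records older than a threshold, logging the
// cleanup event before touching the in-memory map.
final class MiniCleaner(log: mutable.ArrayBuffer[String]) {
  val timeToAllocatedBlocks = mutable.HashMap.empty[Long, Seq[String]]

  def cleanupOldBatches(thresholdTime: Long): Unit = synchronized {
    val oldTimes = timeToAllocatedBlocks.keys.filter(_ < thresholdTime).toSeq.sorted
    if (oldTimes.nonEmpty) {
      log += s"BatchCleanup(${oldTimes.mkString(",")})" // write-ahead first
      timeToAllocatedBlocks --= oldTimes                  // then update memory
    }
  }
}
```

With batches at 1000, 2000, and 3000 ms and a threshold of 2500 ms, one cleanup record covering 1000 and 2000 is logged and only batch 3000 remains in memory.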
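Because the creation, consumption (allocation), and destruction (cleanup) of block metadata are all recorded in the WAL, the tracker's state can be recovered from the log after a failure. The source shows that the WAL mechanism is only used when a checkpoint directory has been set.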
ReceiverTracker:
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
  private val listenerBus = ssc.scheduler.listenerBus
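Recovery works by replaying the logged events in order. The sketch below models the three kinds of records described above (block added, batch allocated, old batches cleaned up) with hypothetical event types and rebuilds the in-memory maps from them. It is a simplified model, not Spark's recovery code: the allocation event carries the allocated blocks, and replaying it clears the unallocated queues.

```scala
import scala.collection.mutable

// Hypothetical event types for the three kinds of tracker log records.
sealed trait TrackerEvent
final case class BlockAdded(streamId: Int, blockId: String) extends TrackerEvent
final case class BatchAllocated(time: Long, blocks: Map[Int, List[String]]) extends TrackerEvent
final case class BatchesCleaned(times: Seq[Long]) extends TrackerEvent

final class RecoveredState {
  val unallocated = mutable.HashMap.empty[Int, mutable.Queue[String]]
  val allocated = mutable.HashMap.empty[Long, Map[Int, List[String]]]

  // Re-apply each logged event in order to rebuild the in-memory maps after a restart.
  def replay(events: Seq[TrackerEvent]): this.type = {
    events.foreach {
      case BlockAdded(id, block) =>
        unallocated.getOrElseUpdate(id, mutable.Queue.empty[String]) += block
      case BatchAllocated(t, blocks) =>
        allocated(t) = blocks
        unallocated.values.foreach(_.clear()) // those blocks now belong to batch t
      case BatchesCleaned(ts) =>
        allocated --= ts
    }
    this
  }
}
```

Replaying "add b1, add b2, allocate both to 1000, add b3, clean 1000" leaves no allocated batches and only b3 waiting, which is exactly the state the driver had before it failed.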
The log directory used for the WAL is derived from the checkpoint directory that is passed in:
ReceivedBlockTracker:
/** Optionally create the write ahead log manager only if the feature is enabled */
private def createWriteAheadLog(): Option[WriteAheadLog] = {
  checkpointDirOption.map { checkpointDir =>
    val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
    WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
  }
}
Here a folder named receivedBlockMetadata is created under the checkpoint directory to hold the records written to the WAL.
ReceivedBlockTracker:
private[streaming] object ReceivedBlockTracker {
  def checkpointDirToLogDir(checkpointDir: String): String = {
    new Path(checkpointDir, "receivedBlockMetadata").toString
  }
}
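The "no checkpoint directory, no WAL" rule falls out of the Option.map pattern in createWriteAheadLog. In this hypothetical sketch, `java.nio.file.Paths` stands in for Hadoop's `Path`, and `WalDirs` with its methods are illustrative names; only the receivedBlockMetadata folder name comes from the source.

```scala
import java.nio.file.Paths

// Hypothetical sketch of createWriteAheadLog / checkpointDirToLogDir.
object WalDirs {
  def checkpointDirToLogDir(checkpointDir: String): String =
    Paths.get(checkpointDir, "receivedBlockMetadata").toString

  // None (no checkpoint dir) maps to None (no WAL); Some(dir) maps to the log directory.
  def walDirFor(checkpointDirOption: Option[String]): Option[String] =
    checkpointDirOption.map(checkpointDirToLogDir)

  def isWriteAheadLogEnabled(checkpointDirOption: Option[String]): Boolean =
    walDirFor(checkpointDirOption).nonEmpty
}
```

Since ReceiverTracker passes `Option(ssc.checkpointDir)`, a null checkpoint directory becomes None and the WAL is simply never created.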
ReceivedBlockTracker:
/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}
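writeToLog serializes the record into a ByteBuffer, returns false on a non-fatal error, and returns true when the WAL is disabled. A hypothetical sketch of that contract: `serialize` uses plain Java serialization as a stand-in for Spark's Utils.serialize, and the "log" is an optional in-memory buffer of ByteBuffers.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical sketch of writeToLog's success/failure contract.
object WalWriter {
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def writeToLog(log: Option[mutable.ArrayBuffer[ByteBuffer]], record: AnyRef): Boolean =
    log match {
      case Some(buffer) =>
        try { buffer += ByteBuffer.wrap(serialize(record)); true }
        catch { case NonFatal(_) => false } // e.g. NotSerializableException
      case None => true // WAL disabled: nothing to write, treated as success
    }
}
```

Treating "disabled" as success is what lets addBlock and allocateBlocksToBatch run unchanged when no checkpoint directory is configured.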
doCheckpoint checkpoints the current state of the DStreamGraph and JobGenerator. It is invoked through the DoCheckpoint message that generateJobs posts at its very end.
JobGenerator:
/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
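The guard in doCheckpoint means not every batch is checkpointed: only batches whose offset from the graph's zero time is a whole multiple of the checkpoint interval. A hypothetical sketch with Long milliseconds (it omits the shouldCheckpoint flag, assuming checkpointing is configured):

```scala
// Hypothetical sketch of doCheckpoint's time condition.
object CheckpointSchedule {
  def shouldCheckpointAt(timeMs: Long, zeroTimeMs: Long, checkpointIntervalMs: Long): Boolean =
    (timeMs - zeroTimeMs) % checkpointIntervalMs == 0

  // Batch times zero + batch, zero + 2*batch, ... that would actually be checkpointed.
  def checkpointTimes(zeroTimeMs: Long, batchMs: Long, batches: Int, checkpointIntervalMs: Long): Seq[Long] =
    (1 to batches).map(i => zeroTimeMs + i * batchMs)
      .filter(t => shouldCheckpointAt(t, zeroTimeMs, checkpointIntervalMs))
}
```

Assuming a zero time of 12000 ms, 1-second batches, and a 5-second checkpoint interval, the first twelve batches (13000 to 24000 ms) produce checkpoints only at 17000 and 22000 ms.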
JobGenerator:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
JobGenerator:
/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
Summary:
ReceivedBlockTracker makes its data fault tolerant through the WAL.
DStreamGraph and JobGenerator are made fault tolerant through checkpointing.
Note:
Source material: DT_大数据梦工厂 (DT Big Data Dream Factory), "Spark发行版本定制" (customized Spark release) course.