Three questions about RDDs
1. How exactly are the RDDs generated?
2. When they actually execute, is there any difference at the runtime level from RDDs built directly on Spark Core?
3. How do we deal with the RDDs after they have run? RDDs are produced continuously with every batch duration, and memory cannot hold all of these objects. Once the job for each batch duration has finished executing its RDDs, how to manage the RDDs that already exist is a real question.
The full lifecycle of RDD generation:
ForEachDStream does not necessarily trigger the execution of a job by itself; it causes a job to be generated, but the job is actually produced by the timer.
Operating on a DStream is in fact operating on RDDs, because a DStream is a template for a series of RDDs, and each DStream depends on the DStream before it. Thanks to this back-to-front dependency, the earlier RDDs can be derived by tracing backwards (backtracking).
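To make the walkthrough concrete, here is a minimal driver sketch (host and port are just placeholders) that produces exactly the DStream lineage examined below: socketTextStream creates a SocketInputDStream, map creates a MappedDStream on top of it, and print registers a ForEachDStream as an output stream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LineageSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LineageSketch").setMaster("local[2]")
    // Every batch duration (here 10 seconds) the timer fires and a new batch of RDDs is generated.
    val ssc = new StreamingContext(conf, Seconds(10))

    // socketTextStream creates a SocketInputDStream (a ReceiverInputDStream).
    val lines = ssc.socketTextStream("localhost", 9999)
    // map creates a MappedDStream that depends on the SocketInputDStream.
    val upper = lines.map(_.toUpperCase)
    // print registers a ForEachDStream as an output stream; no job runs yet.
    upper.print()

    ssc.start()
    ssc.awaitTermination()
  }
}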
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {
Source code
DStream
/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
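Note that print and foreachRDD only build and register a ForEachDStream; nothing executes at this point. As a conceptual, simplified sketch of the other side (names such as BatchTime, SimpleJob, OutputStream and SimpleGraph are made up for illustration, not the real Spark classes): on every timer tick the scheduler asks each registered output stream to generate a Job for that batch time.

import scala.collection.mutable.ArrayBuffer

case class BatchTime(millis: Long)
case class SimpleJob(time: BatchTime, run: () => Unit)

trait OutputStream {
  // Plays the role of ForEachDStream.generateJob(time).
  def generateJob(time: BatchTime): Option[SimpleJob]
}

class SimpleGraph {
  private val outputStreams = ArrayBuffer[OutputStream]()

  // Plays the role of DStream.register(), which adds the output stream to the graph.
  def register(os: OutputStream): Unit = outputStreams += os

  // Plays the role of DStreamGraph.generateJobs(time), driven by the timer once per batch.
  def generateJobs(time: BatchTime): Seq[SimpleJob] =
    outputStreams.flatMap(_.generateJob(time)).toSeq
}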
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
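getOrCompute is the heart of RDD generation: generatedRDDs memoizes one RDD per batch time, and compute(time) is called only when that batch has no entry yet. A minimal sketch of the same get-or-compute memoization pattern, with a hypothetical MemoizedGenerator class standing in for a DStream:

import scala.collection.mutable

class MemoizedGenerator[V](compute: Long => Option[V]) {
  // Plays the role of DStream.generatedRDDs: one value per batch time, filled in lazily.
  private val generated = mutable.HashMap[Long, V]()

  def getOrCompute(time: Long): Option[V] =
    generated.get(time).orElse {
      val value = compute(time)                    // like calling this DStream's compute(time)
      value.foreach(v => generated.put(time, v))   // cache it so jobs for the same batch reuse it
      value
    }
}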
/** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
private[streaming] def isTimeValid(time: Time): Boolean = {
  if (!isInitialized) {
    throw new SparkException(this + " has not been initialized")
  } else if (time <= zeroTime || !(time - zeroTime).isMultipleOf(slideDuration)) {
    logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
      " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
    false
  } else {
    logDebug("Time " + time + " is valid")
    true
  }
}
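A quick worked example of the validity check, assuming zeroTime is 0 ms and slideDuration is 10 seconds (both values are just illustrative): only times that are a positive whole multiple of the slide duration after zeroTime are accepted.

import org.apache.spark.streaming.{Seconds, Time}

object TimeValiditySketch extends App {
  val zeroTime = Time(0L)          // assumed start time of the graph
  val slideDuration = Seconds(10)  // assumed batch/slide interval

  def isValid(time: Time): Boolean =
    time > zeroTime && (time - zeroTime).isMultipleOf(slideDuration)

  println(isValid(Time(20000L)))   // true: 20 s is exactly 2 slide durations after zeroTime
  println(isValid(Time(15000L)))   // false: 15 s is not a multiple of 10 s
}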
SocketInputDStream extends ReceiverInputDStream
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {
ReceiverInputDStream
/**
 * Generates RDDs with blocks received by the receiver of this stream.
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
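Which branch above is taken depends on whether the receiver write ahead log is enabled. A minimal configuration sketch, with a placeholder checkpoint path, that turns on the flag consulted by WriteAheadLogUtils.enableReceiverLog:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WALConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WALConfigSketch")
      .setMaster("local[2]")
      // This is the flag consulted by WriteAheadLogUtils.enableReceiverLog above.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    // The write ahead log lives under the checkpoint directory, so one must be set.
    ssc.checkpoint("/tmp/streaming-checkpoint")  // placeholder path, adjust for your cluster
  }
}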
MappedDStream
private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}
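MappedDStream.compute shows how the template turns into real RDDs: for a given batch time it asks the parent for that batch's RDD and simply calls RDD.map on it, so each batch's RDD DAG mirrors the DStream graph. A minimal Spark Core illustration of what one such batch looks like, with parallelize standing in for the BlockRDD of that batch (the data here is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

object OneBatchSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("OneBatchSketch").setMaster("local[2]"))

    // Stand-in for the BlockRDD that the input DStream would hand back for one batch.
    val batchRdd = sc.parallelize(Seq("a", "b", "c"))

    // Stand-in for what MappedDStream.compute does for that batch: parentRDD.map(mapFunc).
    val mapped = batchRdd.map(_.toUpperCase)

    // The output operation's action is what finally triggers execution on Spark Core.
    mapped.collect().foreach(println)

    sc.stop()
  }
}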
ForEachDStream
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
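ForEachDStream.compute returns None; the work is carried entirely by the Job returned from generateJob, and the RDD action inside foreachFunc is what finally launches Spark Core jobs when the scheduler runs that Job. For comparison, the user-facing way to supply foreachFunc directly is the public foreachRDD operator; a minimal fragment, reusing the `upper` DStream from the driver sketch near the top of this post:

upper.foreachRDD { (rdd, time) =>
  // This closure becomes the foreachFunc held by a ForEachDStream; the action
  // inside it (count) is what actually launches Spark Core jobs for this batch.
  println(s"Batch $time contains ${rdd.count()} records")
}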
Note:
Material from DT_大數(shù)據(jù)夢(mèng)工廠 (Spark release customization).