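In Spark, whenever data is handled outside the Partition level, the concept of a Block comes into play. A Block here is not an HDFS block; it is a concept Spark defines internally for data storage. Every Block carries information such as a BlockId and a BlockInfo, so this article introduces those two first.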
1 BlockId
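This code lives in the org.apache.spark.storage package of the spark-core module, which shows that it is grouped with the storage layer. BlockId is a sealed abstract class (it cannot be extended outside its own source file) with a name and a few predicate methods: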
sealed abstract class BlockId {
  /** A globally unique identifier for this Block. Can be used for ser/de. */
  def name: String

  // convenience methods
  def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
  def isRDD: Boolean = isInstanceOf[RDDBlockId]
  def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
  def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]

  override def toString: String = name
}
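BlockId has several case-class subclasses, including RDDBlockId, ShuffleBlockId, and ShuffleIndexBlockId. Each one derives its block ID, which is a string, purely from the arguments passed in, such as rddId, mapId, and reduceId.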
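To make the name derivation concrete, here is a minimal sketch. The Sketch* classes are hypothetical stand-ins written for this article, not the Spark source; the name formats are inferred from the regular expressions in the BlockId object.

```scala
// Simplified stand-ins for Spark's BlockId subclasses (illustrative only).
sealed abstract class SketchBlockId {
  def name: String
  override def toString: String = name
}

// Each name is a pure function of the constructor arguments.
case class SketchRDDBlockId(rddId: Int, splitIndex: Int) extends SketchBlockId {
  override def name: String = "rdd_" + rddId + "_" + splitIndex
}

case class SketchShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
    extends SketchBlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

case class SketchShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
    extends SketchBlockId {
  override def name: String =
    "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
}

object BlockNameDemo {
  def main(args: Array[String]): Unit = {
    println(SketchRDDBlockId(3, 7))             // rdd_3_7
    println(SketchShuffleBlockId(1, 2, 0))      // shuffle_1_2_0
    println(SketchShuffleIndexBlockId(1, 2, 0)) // shuffle_1_2_0.index
  }
}
```

Because the name depends only on the IDs, two instances built from the same arguments always yield the same string, which is what lets a name serve as a lookup key.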
A BlockId object is also defined. It parses a name back into the concrete BlockId, using Scala pattern matching and regular expressions:
object BlockId {
  val RDD = "rdd_([0-9]+)_([0-9]+)".r
  val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
  val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
  val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
  val TASKRESULT = "taskresult_([0-9]+)".r
  val STREAM = "input-([0-9]+)-([0-9]+)".r
  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
  val TEST = "test_(.*)".r

  def apply(name: String): BlockId = name match {
    case RDD(rddId, splitIndex) =>
      RDDBlockId(rddId.toInt, splitIndex.toInt)
    case SHUFFLE(shuffleId, mapId, reduceId) =>
      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
      ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
      ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case BROADCAST(broadcastId, field) =>
      BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
    case TASKRESULT(taskId) =>
      TaskResultBlockId(taskId.toLong)
    case STREAM(streamId, uniqueId) =>
      StreamBlockId(streamId.toInt, uniqueId.toLong)
    case TEMP_LOCAL(uuid) =>
      TempLocalBlockId(UUID.fromString(uuid))
    case TEMP_SHUFFLE(uuid) =>
      TempShuffleBlockId(UUID.fromString(uuid))
    case TEST(value) =>
      TestBlockId(value)
    case _ =>
      throw new UnrecognizedBlockId(name)
  }
}
The regular expressions also show how the different kinds of Block differ in their naming.
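The matching can be tried without a Spark dependency. The sketch below copies three of the patterns from the listing and, instead of building BlockId instances, returns a description string. The describe helper and the BlockNameParsing object are hypothetical names introduced only for this illustration.

```scala
import scala.util.matching.Regex

object BlockNameParsing {
  // Patterns copied from the BlockId object; only three kinds are covered.
  val RDD: Regex = "rdd_([0-9]+)_([0-9]+)".r
  val SHUFFLE: Regex = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  val BROADCAST: Regex = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r

  // Hypothetical helper: a Regex used as a pattern must match the whole
  // string, and each capture group is bound to a variable.
  def describe(name: String): String = name match {
    case RDD(rddId, splitIndex) =>
      "RDD block: rddId=" + rddId + ", splitIndex=" + splitIndex
    case SHUFFLE(shuffleId, mapId, reduceId) =>
      "shuffle block: shuffleId=" + shuffleId + ", mapId=" + mapId +
        ", reduceId=" + reduceId
    case BROADCAST(broadcastId, field) =>
      // The optional suffix keeps its leading underscore, so strip it.
      val suffix = field.stripPrefix("_")
      "broadcast block: broadcastId=" + broadcastId + ", field=" + suffix
    case _ =>
      "unrecognized block name: " + name
  }

  def main(args: Array[String]): Unit = {
    println(describe("rdd_3_7"))
    println(describe("shuffle_1_2_0"))
    println(describe("broadcast_5_piece0"))
    println(describe("not_a_block"))
  }
}
```

Note that "shuffle_1_2_0.index" falls through to the last case in this sketch: the SHUFFLE pattern has to match the entire name, which is why the full listing needs separate SHUFFLE_DATA and SHUFFLE_INDEX patterns.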
2 BlockInfo
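BlockInfo describes the metadata of a Block. Its attributes include:
level: the Block's storage level, i.e. its StorageLevel
classTag: the Block's type
tellMaster: whether the Block needs to be reported to the Master
_size: the Block's size
_readerCount: the number of times the Block has been locked for reading
_writerTask: because a Task must acquire a lock before writing a Block, this field holds the TaskId of the task that acquired the write lock; its initial value is -1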
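The interplay between _readerCount and _writerTask is easier to see in code. The following is a simplified sketch, not Spark's BlockInfo: the field names follow the list above, but level is reduced to a plain String, and the lock helper methods are assumptions placed on the class itself for brevity, with no claim about where Spark implements its locking.

```scala
import scala.reflect.{ClassTag, classTag}

object SketchBlockInfo {
  // Sentinel meaning "no task holds the write lock" (the initial value).
  val NoWriter: Long = -1L
}

// Hypothetical, simplified model of a Block's metadata.
class SketchBlockInfo(
    val level: String, // stand-in for a StorageLevel
    val classTag: ClassTag[_],
    val tellMaster: Boolean) {

  private var _size: Long = 0L
  private var _readerCount: Int = 0
  private var _writerTask: Long = SketchBlockInfo.NoWriter

  def size: Long = _size
  def readerCount: Int = _readerCount
  def writerTask: Long = _writerTask

  // Assumed rule: readers may share the block while nobody is writing.
  def tryLockForReading(): Boolean =
    if (_writerTask == SketchBlockInfo.NoWriter) { _readerCount += 1; true }
    else false

  def unlockRead(): Unit = {
    require(_readerCount > 0, "no read lock is held")
    _readerCount -= 1
  }

  // Assumed rule: a writer needs exclusive access (no readers, no writer).
  def tryLockForWriting(taskId: Long): Boolean =
    if (_readerCount == 0 && _writerTask == SketchBlockInfo.NoWriter) {
      _writerTask = taskId
      true
    } else false

  // Releasing the write lock records the new size and resets the writer.
  def unlockWrite(newSize: Long): Unit = {
    _size = newSize
    _writerTask = SketchBlockInfo.NoWriter
  }
}

object BlockInfoDemo {
  def main(args: Array[String]): Unit = {
    val info = new SketchBlockInfo("MEMORY_ONLY", classTag[String], tellMaster = true)
    println(info.tryLockForWriting(42L)) // true: no readers, no writer yet
    println(info.tryLockForReading())    // false: task 42 is still writing
    info.unlockWrite(newSize = 128L)
    println(info.tryLockForReading())    // true: the write lock was released
  }
}
```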
3 BlockManager
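Each Executor creates one BlockManager, and the one or more Tasks running in that Executor all share it. The Broadcast data we commonly use is stored in the BlockManager once the driver has distributed it.
BlockManager provides all the methods for storing, using, and removing Blocks.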
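As a rough mental model only, the sketch below shows a single store shared by several tasks, with store, read, and remove operations keyed by block name. SketchBlockStore and its method names are hypothetical and are not the BlockManager API; the real class also deals with storage levels, disk, replication, and reporting to the master, all of which are left out here.

```scala
import scala.collection.concurrent.TrieMap

// A toy in-memory stand-in for an executor-wide block store (illustrative).
class SketchBlockStore {
  // A concurrent map, because several tasks share one store.
  private val blocks = TrieMap.empty[String, Array[Byte]]

  // Stores the bytes; returns false if the name is already taken.
  def put(name: String, bytes: Array[Byte]): Boolean =
    blocks.putIfAbsent(name, bytes).isEmpty

  def get(name: String): Option[Array[Byte]] = blocks.get(name)

  // Returns true if a block was actually removed.
  def remove(name: String): Boolean = blocks.remove(name).isDefined
}

object BlockStoreDemo {
  def main(args: Array[String]): Unit = {
    val store = new SketchBlockStore // one per executor in this model

    // "Task A" stores a broadcast piece; "task B" reads it from the same store.
    store.put("broadcast_5_piece0", "lookup-table".getBytes("UTF-8"))
    val seenByTaskB = store.get("broadcast_5_piece0").map(b => new String(b, "UTF-8"))
    println(seenByTaskB) // Some(lookup-table)

    store.remove("broadcast_5_piece0")
    println(store.get("broadcast_5_piece0").isDefined) // false
  }
}
```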