Structure of this article:
- Block read flow
- Reading from memory
- Reading from disk
- Reading from remote nodes
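1. Block read flow
The get method of BlockManager is the entry point for reading data. It has two branches: a local read and a remote read. The local read goes through getLocalValues, which, depending on the storage level, reads the data with MemoryStore.getValues or DiskStore.getBytes.
The remote read goes through getRemoteValues, which calls fetchBlockSync on the remote data transfer service class BlockTransferService to obtain the data.
The sections below walk through the complete read flow.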
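The two branches can be sketched in a few lines. This is a simplified model rather than Spark's code: `ReadFlowSketch`, `Source`, `localBlocks`, and `remoteBlocks` are hypothetical names, plain maps stand in for the stores and the network, and the local-first ordering is stated here as an assumption about how get combines the two paths.

```scala
object ReadFlowSketch {
  // Hypothetical tag describing where a block was served from.
  sealed trait Source
  case object Local extends Source
  case object Remote extends Source

  // Plain maps stand in for the local stores and for remote executors (assumption).
  val localBlocks: Map[String, Seq[Int]] = Map("rdd_0_0" -> Seq(1, 2, 3))
  val remoteBlocks: Map[String, Seq[Int]] = Map("rdd_0_1" -> Seq(4, 5))

  def getLocalValues(id: String): Option[(Iterator[Int], Source)] =
    localBlocks.get(id).map(v => (v.iterator, Local))

  def getRemoteValues(id: String): Option[(Iterator[Int], Source)] =
    remoteBlocks.get(id).map(v => (v.iterator, Remote))

  // Try the local path first and fall back to the remote path only on a miss.
  def get(id: String): Option[(Iterator[Int], Source)] =
    getLocalValues(id).orElse(getRemoteValues(id))
}
```

A block that exists in neither map yields None, which mirrors the Option-returning shape of the real read methods.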
2. Reading from memory
getLocalValues reads in-memory data differently depending on whether the cached data is stored deserialized. If it is, getLocalValues calls getValues on MemoryStore; if it is not, it calls getBytes on MemoryStore and deserializes the result.
BlockManager # getLocalValues:
if (level.useMemory && memoryStore.contains(blockId)) {
  // Stored deserialized: read the values straight from memory
  val iter: Iterator[Any] = if (level.deserialized) {
    memoryStore.getValues(blockId).get
  } else {
    // Stored serialized: read the bytes, then deserialize them
    serializerManager.dataDeserializeStream(
      blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
  }
  // We need to capture the current taskId in case the iterator completion is triggered
  // from a different thread which does not have TaskContext set; see SPARK-18406 for
  // discussion.
  // Return the data together with the read method and the block size
  val ci = CompletionIterator[Any, Iterator[Any]](iter, {
    releaseLock(blockId, taskAttemptId)
  })
  Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
Inside MemoryStore, both getValues and getBytes look up the in-memory block by its BlockId.
MemoryStore # getValues:
def getValues(blockId: BlockId): Option[Iterator[_]] = {
  val entry = entries.synchronized { entries.get(blockId) }
  entry match {
    case null => None
    case e: SerializedMemoryEntry[_] =>
      throw new IllegalArgumentException("should only call getValues on deserialized blocks")
    case DeserializedMemoryEntry(values, _, _) =>
      val x = Some(values)
      x.map(_.iterator)
  }
}
MemoryStore # getBytes:
def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
  val entry = entries.synchronized { entries.get(blockId) }
  entry match {
    case null => None
    case e: DeserializedMemoryEntry[_] =>
      throw new IllegalArgumentException("should only call getBytes on serialized blocks")
    case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
  }
}
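The contract of the two getters (None for a missing block, an exception when the entry has the wrong form) can be tried out in isolation. The following is a minimal sketch, not Spark's MemoryStore: `MemoryEntrySketch`, `Deserialized`, `Serialized`, and `put` are hypothetical simplifications that drop the size, memory mode, and class tag fields.

```scala
object MemoryEntrySketch {
  // Simplified stand-ins for the two kinds of memory entry (assumption).
  sealed trait Entry
  final case class Deserialized(values: Array[Any]) extends Entry
  final case class Serialized(bytes: Array[Byte]) extends Entry

  private val entries = new java.util.LinkedHashMap[String, Entry](32, 0.75f, true)

  def put(id: String, e: Entry): Unit = entries.synchronized {
    entries.put(id, e)
    ()
  }

  // Only valid for blocks stored in deserialized form.
  def getValues(id: String): Option[Iterator[Any]] = {
    val entry = entries.synchronized { entries.get(id) }
    entry match {
      case null => None
      case _: Serialized =>
        throw new IllegalArgumentException("should only call getValues on deserialized blocks")
      case Deserialized(values) => Some(values.iterator)
    }
  }

  // Only valid for blocks stored in serialized form.
  def getBytes(id: String): Option[Array[Byte]] = {
    val entry = entries.synchronized { entries.get(id) }
    entry match {
      case null => None
      case _: Deserialized =>
        throw new IllegalArgumentException("should only call getBytes on serialized blocks")
      case Serialized(bytes) => Some(bytes)
    }
  }
}
```

Callers are expected to pick the getter that matches the storage level, which is what the level.deserialized check in getLocalValues does.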
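Looking at entries, it turns out to be just a LinkedHashMap, so all data cached in memory is kept in a LinkedHashMap.
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
The third constructor argument (accessOrder = true) puts this LinkedHashMap in access order rather than insertion order: iterating over it yields the least recently accessed entries first. When memory runs short, the blocks that were used least recently are therefore the first candidates to be evicted.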
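The effect of that third constructor argument is easy to check with java.util.LinkedHashMap directly. The sketch below is independent of Spark; `AccessOrderSketch` and `keysAfterTouchingA` are hypothetical names used only for this illustration.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

object AccessOrderSketch {
  // Inserts a, b, c, reads a once, then returns the keys in iteration order.
  def keysAfterTouchingA(accessOrder: Boolean): List[String] = {
    val m = new JLinkedHashMap[String, Int](32, 0.75f, accessOrder)
    m.put("a", 1)
    m.put("b", 2)
    m.put("c", 3)
    m.get("a") // in access-order mode this moves "a" to the end of the iteration order
    val keys = ArrayBuffer.empty[String]
    val it = m.keySet().iterator()
    while (it.hasNext) keys += it.next()
    keys.toList
  }
}
```

With accessOrder = true the key that was just read moves to the back, so an eviction loop that walks the map from the front sees the least recently used entries first; with accessOrder = false the insertion order is kept.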
3. Reading from disk
In getLocalValues, if the storage level uses disk, the getBytes method of DiskStore is called.
BlockManager # getLocalValues:
else if (level.useDisk && diskStore.contains(blockId)) {
  // Read from disk. Data is stored on disk in serialized form, so what is read back is serialized too.
  val diskData = diskStore.getBytes(blockId)
  val iterToReturn: Iterator[Any] = {
    if (level.deserialized) {
      // The storage level is deserialized: deserialize first, then decide based on
      // level.useMemory whether to cache the values in memory
      val diskValues = serializerManager.dataDeserializeStream(
        blockId,
        diskData.toInputStream())(info.classTag)
      maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
    } else {
      // The storage level is serialized: decide whether to cache the serialized bytes in memory
      val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
        .map { _.toInputStream(dispose = false) }
        .getOrElse { diskData.toInputStream() }
      // The returned stream still has to be deserialized
      serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
    }
  }
  val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
    releaseLockAndDispose(blockId, diskData, taskAttemptId)
  })
  // Return the data together with the read method and the block size
  Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
The key part is DiskStore # getBytes:
def getBytes(blockId: BlockId): BlockData = {
  val file = diskManager.getFile(blockId.name)
  val blockSize = getSize(blockId)
  securityManager.getIOEncryptionKey() match {
    case Some(key) =>
      // Encrypted blocks cannot be memory mapped; return a special object that does decryption
      // and provides InputStream / FileRegion implementations for reading the data.
      new EncryptedBlockData(file, blockSize, conf, key)
    case _ =>
      new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
  }
}
3.1 Locating the on-disk storage directory
DiskStore uses DiskBlockManager to manage the mapping between blocks and their files on disk, which is how a block ends up stored in a file.
val file = diskManager.getFile(blockId.name)
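DiskBlockManager derives the root directories in which DiskStore keeps blocks from the configured local root directories. These come from LOCAL_DIRS (YARN mode), or from SPARK_LOCAL_DIRS or spark.local.dir (other modes; the default is System.getProperty("java.io.tmpdir")). Several roots may be configured, separated by commas, and one block directory is created for each of them:
- …/blockmgr-UUID.randomUUID.toString (YARN mode)
- …/spark-UUID.randomUUID.toString/blockmgr-UUID.randomUUID.toString (other modes)
DiskBlockManager also creates conf.getInt("spark.diskStore.subDirectories", 64) subdirectories under each root directory to hold the block files. Each block is hashed by its name to one of these subdirectories, and its file is created there with the block's name as the file name.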
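The name-to-subdirectory mapping can be sketched as follows. This is an illustration under assumptions, not a copy of DiskBlockManager: the object name `SubDirSketch`, the helper `locate`, the exact hash function, and the two-digit hexadecimal subdirectory names are choices made for this sketch.

```scala
import java.io.File

object SubDirSketch {
  // Non-negative hash of the file name. Int.MinValue is mapped to 0 because
  // math.abs(Int.MinValue) is still negative.
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  // Picks a root directory first, then a subdirectory inside it, both from the same hash.
  def locate(localDirs: Array[File], subDirsPerLocalDir: Int, name: String): File = {
    val hash = nonNegativeHash(name)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
    new File(subDir, name)
  }
}
```

Because the location is a pure function of the block name, a reader can find the file again without any lookup table: the same name always maps to the same root and subdirectory.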
For the details, see how the localDirs property of DiskBlockManager is assigned:
private[spark] val localDirs: Array[File] = createLocalDirs(conf)
DiskBlockManager # createLocalDirs :
/**
 * Create local directories for storing block data. These directories are
 * located inside configured local directories and won't
 * be deleted on JVM exit when using the external shuffle service.
 */
private def createLocalDirs(conf: SparkConf): Array[File] = {
  Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
    try {
      val localDir = Utils.createDirectory(rootDir, "blockmgr")
      logInfo(s"Created local directory at $localDir")
      Some(localDir)
    } catch {
      case e: IOException =>
        logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
        None
    }
  }
}
Utils # getConfiguredLocalDirs:
def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
  val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
  if (isRunningInYarnContainer(conf)) {
    // If we are in yarn mode, systems can have different disk layouts so we must set it
    // to what Yarn on this system said was available. Note this assumes that Yarn has
    // created the directories already, and that they are secured so that only the
    // user has access to them.
    getYarnLocalDirs(conf).split(",")
  } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
    conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
  } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
    conf.getenv("SPARK_LOCAL_DIRS").split(",")
  } else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
    // Mesos already creates a directory per Mesos task. Spark should use that directory
    // instead so all temporary files are automatically cleaned up when the Mesos task ends.
    // Note that we don't want this if the shuffle service is enabled because we want to
    // continue to serve shuffle files after the executors that wrote them have already exited.
    Array(conf.getenv("MESOS_DIRECTORY"))
  } else {
    if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
      logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
        "spark.shuffle.service.enabled is enabled.")
    }
    // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
    // configuration to point to a secure directory. So create a subdirectory with restricted
    // permissions under each listed directory.
    conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
  }
}
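The precedence encoded in this chain of conditions can be restated as a pure function. The sketch below is a model for experimentation only: `LocalDirsSketch` and `resolve` are hypothetical, plain maps replace SparkConf and the process environment, the YARN check is reduced to a boolean flag, and LOCAL_DIRS is read directly as the YARN directory list described earlier.

```scala
import java.io.File

object LocalDirsSketch {
  // env models environment variables, conf models Spark configuration entries (assumption).
  def resolve(
      env: Map[String, String],
      conf: Map[String, String],
      inYarnContainer: Boolean,
      tmpDir: String): List[String] = {
    val shuffleServiceEnabled =
      conf.getOrElse("spark.shuffle.service.enabled", "false").toBoolean
    if (inYarnContainer) {
      // YARN decides the directories; user settings are ignored on this path.
      env.getOrElse("LOCAL_DIRS", "").split(",").toList
    } else if (env.contains("SPARK_EXECUTOR_DIRS")) {
      env("SPARK_EXECUTOR_DIRS").split(File.pathSeparator).toList
    } else if (env.contains("SPARK_LOCAL_DIRS")) {
      env("SPARK_LOCAL_DIRS").split(",").toList
    } else if (env.contains("MESOS_DIRECTORY") && !shuffleServiceEnabled) {
      List(env("MESOS_DIRECTORY"))
    } else {
      // Last resort: spark.local.dir, defaulting to the JVM temp directory.
      conf.getOrElse("spark.local.dir", tmpDir).split(",").toList
    }
  }
}
```

One consequence worth noticing is that the SPARK_LOCAL_DIRS environment variable takes precedence over the spark.local.dir setting, and that the Mesos sandbox is skipped when the external shuffle service is enabled.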
3.2 Reading a block from its file
See the DiskBlockData source:
private class DiskBlockData(
    minMemoryMapBytes: Long,
    maxMemoryMapBytes: Long,
    file: File,
    blockSize: Long) extends BlockData {

  override def toInputStream(): InputStream = new FileInputStream(file)

  /**
   * Returns a Netty-friendly wrapper for the block's data.
   *
   * Please see `ManagedBuffer.convertToNetty()` for more details.
   */
  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)

  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
    Utils.tryWithResource(open()) { channel =>
      var remaining = blockSize
      val chunks = new ListBuffer[ByteBuffer]()
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxMemoryMapBytes)
        val chunk = allocator(chunkSize.toInt)
        remaining -= chunkSize
        JavaUtils.readFully(channel, chunk)
        chunk.flip()
        chunks += chunk
      }
      new ChunkedByteBuffer(chunks.toArray)
    }
  }

  override def toByteBuffer(): ByteBuffer = {
    require(blockSize < maxMemoryMapBytes,
      s"can't create a byte buffer of size $blockSize" +
      s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
    Utils.tryWithResource(open()) { channel =>
      if (blockSize < minMemoryMapBytes) {
        // For small files, directly read rather than memory map.
        val buf = ByteBuffer.allocate(blockSize.toInt)
        JavaUtils.readFully(channel, buf)
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, 0, file.length)
      }
    }
  }

  override def size: Long = blockSize

  override def dispose(): Unit = {}

  private def open() = new FileInputStream(file).getChannel
}
DiskBlockData offers several ways to read the data: toInputStream, toChunkedByteBuffer, and toByteBuffer, plus toNetty for handing the file to Netty.
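The chunked read in toChunkedByteBuffer can be reproduced with plain java.nio. The following is a minimal sketch, assuming heap buffers and a caller-supplied chunk limit: `ChunkedReadSketch` and `readChunks` are hypothetical names, and the inner loop plays the role that JavaUtils.readFully plays in the Spark source.

```scala
import java.io.{EOFException, File, FileInputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer

object ChunkedReadSketch {
  // Reads the whole file into buffers of at most maxChunkBytes each.
  def readChunks(file: File, maxChunkBytes: Int): List[ByteBuffer] = {
    val channel = new FileInputStream(file).getChannel
    try {
      var remaining = file.length()
      val chunks = ListBuffer.empty[ByteBuffer]
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxChunkBytes.toLong).toInt
        val chunk = ByteBuffer.allocate(chunkSize)
        // A single read may return fewer bytes than requested, so loop until the buffer is full.
        while (chunk.hasRemaining) {
          if (channel.read(chunk) < 0) throw new EOFException("unexpected end of file")
        }
        chunk.flip() // switch the buffer from writing to reading
        chunks += chunk
        remaining -= chunkSize
      }
      chunks.toList
    } finally {
      channel.close()
    }
  }
}
```

Splitting the data this way keeps each buffer below the size limit of a single ByteBuffer, which is the reason a block is represented as a list of chunks rather than one large buffer.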
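4. Reading from remote nodes
To read data from a remote node, Spark relies on its Netty-based RPC framework. Two classes matter here:
- NettyBlockTransferService: implements the data access interface used by the shuffle and storage modules. When it receives a request to fetch or store data, it sends a message through the Netty RPC framework to the target node, asking it to perform the operation.
- NettyBlockRpcServer: when an Executor starts, it starts an RPC listener, and incoming messages are handed to this class for processing. There are two kinds of message: OpenBlocks for reading data and UploadBlock for writing data.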
4.1 Getting the block locations
The entry point is BlockManager # getRemoteValues, which calls getRemoteBytes. Inside getRemoteBytes, getLocationsAndStatus is called to send a GetLocationsAndStatus message to BlockManagerMasterEndpoint, asking for the locations and status of the block.
/**
 * Get block from remote block managers.
 *
 * This does not acquire a lock on this block in this JVM.
 */
private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val ct = implicitly[ClassTag[T]]
  getRemoteBytes(blockId).map { data =>
    val values =
      serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
    new BlockResult(values, DataReadMethod.Network, data.size)
  }
}
BlockManagerMaster # getLocationsAndStatus:
/** Get locations as well as status of the blockId from the driver */
def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
  driverEndpoint.askSync[Option[BlockLocationsAndStatus]](
    GetLocationsAndStatus(blockId))
}
Once the list of block locations is available, getRemoteBytes in BlockManager calls fetchBlockSync on BlockTransferService.
4.2 Sending an OpenBlocks message to the node that holds the block
fetchBlockSync in BlockTransferService calls fetchBlocks, which is implemented by NettyBlockTransferService.
/**
 * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
 *
 * It is also only available after [[init]] is invoked.
 */
def fetchBlockSync(
    host: String,
    port: Int,
    execId: String,
    blockId: String,
    tempFileManager: TempFileManager): ManagedBuffer = {
  // A monitor for the thread to wait on.
  val result = Promise[ManagedBuffer]()
  fetchBlocks(host, port, execId, Array(blockId),
    new BlockFetchingListener {
      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
        result.failure(exception)
      }
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
        data match {
          case f: FileSegmentManagedBuffer =>
            result.success(f)
          case _ =>
            val ret = ByteBuffer.allocate(data.size.toInt)
            ret.put(data.nioByteBuffer())
            ret.flip()
            result.success(new NioManagedBuffer(ret))
        }
      }
    }, tempFileManager)
  ThreadUtils.awaitResult(result.future, Duration.Inf)
}
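The way fetchBlockSync turns a callback-based API into a blocking call is a general pattern: complete a Promise from the callback and wait on its Future. Below is a minimal sketch of that pattern outside Spark. `FetchSyncSketch`, `Listener`, and `fetchAsync` are hypothetical stand-ins, and a finite timeout is used here instead of Duration.Inf so that the example cannot hang.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object FetchSyncSketch {
  // Hypothetical callback interface standing in for BlockFetchingListener.
  trait Listener {
    def onSuccess(blockId: String, data: Array[Byte]): Unit
    def onFailure(blockId: String, e: Throwable): Unit
  }

  // Hypothetical asynchronous fetch that reports its result from another thread.
  def fetchAsync(blockId: String, listener: Listener): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        if (blockId.startsWith("rdd_")) listener.onSuccess(blockId, blockId.getBytes("UTF-8"))
        else listener.onFailure(blockId, new java.io.IOException(s"block $blockId not found"))
    })
    worker.setDaemon(true)
    worker.start()
  }

  // Blocks the caller until one of the callbacks completes the promise.
  def fetchSync(blockId: String, timeout: Duration = 5.seconds): Array[Byte] = {
    val result = Promise[Array[Byte]]()
    fetchAsync(blockId, new Listener {
      override def onSuccess(blockId: String, data: Array[Byte]): Unit = {
        result.success(data)
        ()
      }
      override def onFailure(blockId: String, e: Throwable): Unit = {
        result.failure(e)
        ()
      }
    })
    Await.result(result.future, timeout)
  }
}
```

A failed fetch surfaces as an exception thrown from fetchSync, because Await.result rethrows the failure stored in the promise.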
NettyBlockTransferService # fetchBlocks:
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener,
    tempFileManager: TempFileManager): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(
          blockIds: Array[String],
          listener: BlockFetchingListener): Unit = {
        // Create a client for the remote node's host and port
        val client = clientFactory.createClient(host, port)
        // Use that client to send the read request to the target node
        new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
          transportConf, tempFileManager).start()
      }
    }
    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}
In fetchBlocks, a TransportClient is created from the remote node's host and port, and that client is used to send the read request to the target node.
The message itself is sent in the start method of OneForOneBlockFetcher.
public void start() {
  if (blockIds.length == 0) {
    throw new IllegalArgumentException("Zero-sized blockIds array");
  }
  client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
      ...
    }
    @Override
    public void onFailure(Throwable e) {
      ...
    }
  });
}
openMessage is of type OpenBlocks.
this.openMessage = new OpenBlocks(appId, execId, blockIds);
4.3 The remote node responds and transfers the requested blocks
The remote node listens for messages. When one arrives, it is matched in NettyBlockRpcServer.
override def receive(
    client: TransportClient,
    rpcMessage: ByteBuffer,
    responseContext: RpcResponseCallback): Unit = {
  val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
  logTrace(s"Received request: $message")
  message match {
    case openBlocks: OpenBlocks =>
      val blocksNum = openBlocks.blockIds.length
      val blocks = for (i <- (0 until blocksNum).view)
        yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
      // Register the ManagedBuffers as a stream so that Netty can transfer them
      val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
      logTrace(s"Registered streamId $streamId with $blocksNum buffers")
      responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)
    case uploadBlock: UploadBlock =>
      // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
      val (level: StorageLevel, classTag: ClassTag[_]) = {
        serializer
          .newInstance()
          .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
          .asInstanceOf[(StorageLevel, ClassTag[_])]
      }
      val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
      val blockId = BlockId(uploadBlock.blockId)
      blockManager.putBlockData(blockId, data, level, classTag)
      responseContext.onSuccess(ByteBuffer.allocate(0))
  }
}
As the source above shows, when the message matches OpenBlocks, getBlockData of BlockManager is called to read the data on that node. Each block read is wrapped in a ManagedBuffer and then sent over the Netty transport channel to the requesting node, which completes the transfer.
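The request handling above follows a simple shape: match on the message type, register a lazy stream of blocks for a read, and answer with a handle that the client uses to pull the chunks. Here is a toy model of that shape. Everything in it is hypothetical (`OpenBlocksSketch`, its message case classes, the in-memory `store`, and `nextChunk`); it only illustrates the idea and does not use Spark's or Netty's classes.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

object OpenBlocksSketch {
  // Hypothetical message types modeled on the two kinds of request.
  sealed trait Message
  final case class OpenBlocks(blockIds: Seq[String]) extends Message
  final case class UploadBlock(blockId: String, data: Array[Byte]) extends Message

  // Handle returned to the client so it can pull the chunks of a registered stream.
  final case class StreamHandle(streamId: Long, numChunks: Int)

  private val store = TrieMap[String, Array[Byte]]()
  private val streams = TrieMap[Long, Iterator[Array[Byte]]]()
  private val nextStreamId = new AtomicLong(0)

  def receive(msg: Message): Option[StreamHandle] = msg match {
    case OpenBlocks(ids) =>
      // The iterator is lazy: no block is looked up until the stream is consumed.
      val blocks = ids.iterator.map { id =>
        store.getOrElse(id, throw new NoSuchElementException(s"block $id not found"))
      }
      val streamId = nextStreamId.getAndIncrement()
      streams.put(streamId, blocks)
      Some(StreamHandle(streamId, ids.length))
    case UploadBlock(id, data) =>
      store.put(id, data)
      None
  }

  // Pulls the next chunk of a previously registered stream.
  def nextChunk(streamId: Long): Array[Byte] = streams(streamId).next()
}
```

Registering a lazy iterator instead of the materialized blocks means that data is read only when the requester actually asks for each chunk, which is also why the Spark code builds its block sequence over a view.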
BlockManager # getBlockData:
/**
 * Interface to get local block data. Throws an exception if the block cannot be found or
 * cannot be read successfully.
 */
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {
    shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
    getLocalBytes(blockId) match {
      case Some(blockData) =>
        new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
      case None =>
        // If this block manager receives a request for a block that it doesn't have then it's
        // likely that the master has outdated block statuses for this block. Therefore, we send
        // an RPC so that this block is marked as being unavailable from this block manager.
        reportBlockStatus(blockId, BlockStatus.empty)
        throw new BlockNotFoundException(blockId.toString)
    }
  }
}