Preface
In the previous article, we looked at DiskBlockManager, the component Spark uses to manage disk blocks. This article moves on to DiskStore, the component actually responsible for disk storage, along with the related BlockData. This material touches on a bit of Java NIO, so readers should keep that in mind.
Disk Storage: DiskStore
Constructor and Member Fields
Code #28.1 - Constructor and member fields of the o.a.s.storage.DiskStore class
private[spark] class DiskStore(
    conf: SparkConf,
    diskManager: DiskBlockManager,
    securityManager: SecurityManager) extends Logging {

  private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
  private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
    Int.MaxValue.toString)
  private val blockSizes = new ConcurrentHashMap[BlockId, Long]()

  // ......
}
DiskStore takes three constructor parameters: instances of SparkConf, DiskBlockManager, and SecurityManager, with SecurityManager providing support for data encryption. The three member fields have the following meanings:
- minMemoryMapBytes: the minimum threshold for reading a file with memory mapping, specified by the configuration item spark.storage.memoryMapThreshold, with a default of 2m. When a file on disk is larger than this value, it is no longer read directly but through a memory-mapped file, which is more efficient.
- maxMemoryMapBytes: the maximum size of a single memory-mapped read, specified by the configuration item spark.storage.memoryMapLimitForTests. It is a testing parameter, and its default value, Int.MaxValue, effectively means no limit.
- blockSizes: a ConcurrentHashMap that maintains the mapping between each block ID and its size.
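As a quick aside, these thresholds are ordinary size-typed configuration values. The following hedged snippet (standalone, not DiskStore internals) shows how SparkConf parses such values into byte counts:

import org.apache.spark.SparkConf

// getSizeAsBytes() parses human-readable sizes ("2m", "1g", ...) into bytes.
val conf = new SparkConf().set("spark.storage.memoryMapThreshold", "4m")
val threshold = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
// threshold == 4 * 1024 * 1024 = 4194304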
Writing a Block
The logic for writing a block is implemented by the put() method.
Code #28.2 - o.a.s.storage.DiskStore.put()/contains() methods
def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
  if (contains(blockId)) {
    throw new IllegalStateException(s"Block $blockId is already present in the disk store")
  }
  logDebug(s"Attempting to put block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val out = new CountingWritableChannel(openForWrite(file))
  var threwException: Boolean = true
  try {
    writeFunc(out)
    blockSizes.put(blockId, out.getCount)
    threwException = false
  } finally {
    try {
      out.close()
    } catch {
      case ioe: IOException =>
        if (!threwException) {
          threwException = true
          throw ioe
        }
    } finally {
      if (threwException) {
        remove(blockId)
      }
    }
  }
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName,
    Utils.bytesToString(file.length()),
    finishTime - startTime))
}

def contains(blockId: BlockId): Boolean = {
  val file = diskManager.getFile(blockId.name)
  file.exists()
}
The put() method first calls contains() to check whether the block has already been written to a file, and proceeds only if it has not. It then calls DiskBlockManager.getFile() to open the file corresponding to the block ID and obtains a WritableByteChannel for it (NIO's writable channel, meaning data can be written to the file by calling its write() method). Finally, it invokes the writeFunc function passed in as a parameter to write data through the WritableByteChannel, and puts the block ID and its byte count into the blockSizes map.
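To make the calling convention concrete, here is a hedged caller-side sketch; diskStore is an assumed, already constructed DiskStore instance, and TestBlockId (a real Spark block ID type) is used purely for illustration:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.TestBlockId

// writeFunc receives the counting channel and may write however it likes;
// put() records the total byte count into blockSizes afterwards.
diskStore.put(TestBlockId("demo")) { channel =>
  val payload = ByteBuffer.wrap("block payload".getBytes(StandardCharsets.UTF_8))
  while (payload.hasRemaining) {
    channel.write(payload)
  }
}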
Next, let's look at the openForWrite() method called in code #28.2.
Code #28.3 - o.a.s.storage.DiskStore.openForWrite() method
private def openForWrite(file: File): WritableByteChannel = {
  val out = new FileOutputStream(file).getChannel()
  try {
    securityManager.getIOEncryptionKey().map { key =>
      CryptoStreamUtils.createWritableChannel(out, conf, key)
    }.getOrElse(out)
  } catch {
    case e: Exception =>
      Closeables.close(out, true)
      file.delete()
      throw e
  }
}
As we can see, this method simply constructs a FileOutputStream from the file object and obtains its corresponding channel for writing data. In particular, if I/O encryption is enabled, the channel must additionally be wrapped by CryptoStreamUtils.createWritableChannel(); that is beyond the scope of this article. As for CountingWritableChannel, it is just a simple extension of the WritableByteChannel interface that adds byte counting, so its code is not listed either.
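Since its code is not listed here, the following minimal sketch shows what such a counting wrapper can look like (an assumption-level sketch; Spark's actual CountingWritableChannel differs in details):

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Delegate every write to the wrapped channel and accumulate how many
// bytes were actually written.
class CountingChannelSketch(sink: WritableByteChannel) extends WritableByteChannel {
  private var count = 0L
  def getCount: Long = count

  override def write(src: ByteBuffer): Int = {
    val written = sink.write(src)
    count += written
    written
  }

  override def isOpen: Boolean = sink.isOpen
  override def close(): Unit = sink.close()
}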
Writing Bytes
Code #28.4 - o.a.s.storage.DiskStore.putBytes() method
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
  put(blockId) { channel =>
    bytes.writeFully(channel)
  }
}
As shown, besides the block ID, this method also takes the data wrapped in a ChunkedByteBuffer. When it calls the put() method above, the writeFunc it passes in invokes ChunkedByteBuffer.writeFully(), which writes the data to the WritableByteChannel in chunks of a certain size.
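As a hedged usage sketch (continuing the assumed diskStore instance from earlier; the array-of-buffers constructor of ChunkedByteBuffer is the same one that appears in code #28.6 below):

import java.nio.ByteBuffer
import org.apache.spark.storage.TestBlockId
import org.apache.spark.util.io.ChunkedByteBuffer

// Wrap raw bytes in a single-chunk ChunkedByteBuffer and hand them to
// putBytes(), which routes through put() above.
val bytes = ByteBuffer.wrap(Array.fill[Byte](16)(1))
diskStore.putBytes(TestBlockId("demo_bytes"), new ChunkedByteBuffer(Array(bytes)))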
Reading Bytes
Code #28.5 - o.a.s.storage.DiskStore.getBytes() method
def getBytes(blockId: BlockId): BlockData = {
  val file = diskManager.getFile(blockId.name)
  val blockSize = getSize(blockId)

  securityManager.getIOEncryptionKey() match {
    case Some(key) =>
      new EncryptedBlockData(file, blockSize, conf, key)
    case _ =>
      new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
  }
}
This code is simple, but note that the result differs between encrypted and unencrypted environments: the former returns an EncryptedBlockData object and the latter a DiskBlockData object, both of which are subclasses of BlockData. As the name suggests, BlockData is the concrete wrapper around disk block data. Below we look at the most common implementation, DiskBlockData.
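Reading the block back is symmetric. Here is a hedged sketch, again with the assumed diskStore instance; whichever subclass getBytes() returns, the BlockData interface offers the same conversions:

import org.apache.spark.storage.TestBlockId

// Fetch the BlockData wrapper, then pick a representation of the contents.
val blockData = diskStore.getBytes(TestBlockId("demo"))
println(s"size on disk: ${blockData.size} bytes")
val in = blockData.toInputStream()
try {
  // ... consume the stream ...
} finally {
  in.close()
}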
磁盤塊數(shù)據(jù)DiskBlockData
這個類是定義在DiskStore下方的私有類墙杯,比較短,因此直接全貼在下面括荡。
代碼#28.6 - o.a.s.storage.DiskBlockData類
private class DiskBlockData(
    minMemoryMapBytes: Long,
    maxMemoryMapBytes: Long,
    file: File,
    blockSize: Long) extends BlockData {

  override def toInputStream(): InputStream = new FileInputStream(file)

  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)

  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
    Utils.tryWithResource(open()) { channel =>
      var remaining = blockSize
      val chunks = new ListBuffer[ByteBuffer]()
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxMemoryMapBytes)
        val chunk = allocator(chunkSize.toInt)
        remaining -= chunkSize
        JavaUtils.readFully(channel, chunk)
        chunk.flip()
        chunks += chunk
      }
      new ChunkedByteBuffer(chunks.toArray)
    }
  }

  override def toByteBuffer(): ByteBuffer = {
    require(blockSize < maxMemoryMapBytes,
      s"can't create a byte buffer of size $blockSize" +
      s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
    Utils.tryWithResource(open()) { channel =>
      if (blockSize < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(blockSize.toInt)
        JavaUtils.readFully(channel, buf)
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, 0, file.length)
      }
    }
  }

  override def size: Long = blockSize

  override def dispose(): Unit = {}

  private def open() = new FileInputStream(file).getChannel
}
很久之前也已經(jīng)大概說過高镐,BlockData特征只是定義了塊數(shù)據(jù)的轉化方式,具體的細節(jié)則留給各個實現(xiàn)類畸冲。我們具體看看toChunkedByteBuffer()和toByteBuffer()這兩個方法嫉髓。
轉化為ChunkedByteBuffer
Utils.tryWithResource() is effectively a Scala implementation of Java's try-with-resources, since Scala lacks that syntactic sugar.
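Its shape is roughly as follows (a sketch under that assumption; the real Utils.tryWithResource may differ slightly in details):

import java.io.Closeable

// Create the resource, run the body, and close the resource no matter what,
// mirroring Java's try-with-resources.
def tryWithResourceSketch[R <: Closeable, T](createResource: => R)(f: R => T): T = {
  val resource = createResource
  try {
    f(resource)
  } finally {
    resource.close()
  }
}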
toChunkedByteBuffer() turns the file into a FileInputStream, obtains its FileChannel, and then calls JavaUtils.readFully() to fill each ByteBuffer with data read from the channel. Each ByteBuffer is one chunk, and the array of all chunks forms the final ChunkedByteBuffer. ChunkedByteBuffer was briefly mentioned in article #21; a dedicated follow-up article on it will come soon, because it is rather interesting.
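Note that the allocator parameter decides where each chunk lives. A hedged one-liner, reusing the hypothetical blockData from the earlier sketch:

import java.nio.ByteBuffer

// Heap-allocated chunks; passing ByteBuffer.allocateDirect _ instead would
// produce off-heap (direct) chunks.
val chunked = blockData.toChunkedByteBuffer(ByteBuffer.allocate _)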
Converting to ByteBuffer
toByteBuffer() checks whether the block size is below spark.storage.memoryMapThreshold (which finally makes its appearance). If it is, the ByteBuffer is filled directly, in the same way as in toChunkedByteBuffer(). Otherwise, FileChannel.map() is called to map the data into a MappedByteBuffer, that is, into the process's virtual memory. That said, given the typical use cases of memory mapping, the 2MB threshold may be a bit conservative; just a passing thought, please don't mind it.
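To make the memory-mapping branch concrete, here is a standalone NIO snippet (illustration only, not Spark code; the file path is made up):

import java.io.RandomAccessFile
import java.nio.channels.FileChannel.MapMode

// Map a file read-only into the process's virtual address space; pages are
// faulted in lazily instead of being copied into a heap buffer up front.
val raf = new RandomAccessFile("/tmp/example.bin", "r")
try {
  val mapped = raf.getChannel.map(MapMode.READ_ONLY, 0, raf.length())
  val first = if (mapped.hasRemaining) mapped.get() else -1
} finally {
  raf.close()
}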
Summary
This article examined the concrete implementation of Spark's disk storage class DiskStore, mainly the methods for writing blocks/bytes and reading bytes. Since the bytes DiskStore reads are wrapped in BlockData, we also took a quick look at some details of DiskBlockData.