Spark Core源碼精讀計劃#28:磁盤存儲DiskStore

目錄

前言

在上一篇文章中杆查,我們認識了Spark管理磁盤塊的組件DiskBlockManager扮惦,本文接著來看真正負責磁盤存儲的組件DiskStore,以及與它相關的BlockData亲桦。這部分內容會涉及到一點與Java NIO相關的東西崖蜜,看官需要稍微注意一下。

磁盤存儲DiskStore

構造方法與屬性成員

代碼#28.1 - o.a.s.storage.DiskStore類的構造方法與屬性成員

private[spark] class DiskStore(
    conf: SparkConf,
    diskManager: DiskBlockManager,
    securityManager: SecurityManager) extends Logging {

  private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
  private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
    Int.MaxValue.toString)
  private val blockSizes = new ConcurrentHashMap[BlockId, Long]()

  // ......
}

DiskStore接受3個構造方法參數(shù)客峭,分別是SparkConf豫领、DiskBlockManager和SecurityManager的實例,其中SecurityManager用于提供對數(shù)據(jù)加密的支持舔琅。3個屬性字段的含義如下:

  • minMemoryMapBytes:使用內存映射(memory map)讀取文件的最小閾值等恐,由配置項spark.storage.memoryMapThreshold指定,默認值2M。當磁盤中的文件大小超過該值時课蔬,就不會直接讀取囱稽,而用內存映射文件來讀取,提高效率二跋。
  • maxMemoryMapBytes:使用內存映射讀取文件的最大閾值战惊,由配置項spark.storage.memoryMapLimitForTests指定。它是個測試參數(shù)扎即,默認值為不限制吞获。
  • blockSizes:維護塊ID與其對應大小之間的映射關系的ConcurrentHashMap。

寫入塊

寫入塊的邏輯由put()方法來實現(xiàn)谚鄙。

代碼#28.2 - o.a.s.storage.DiskStore.put()/contains()方法

  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
    if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
    }
    logDebug(s"Attempting to put block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val out = new CountingWritableChannel(openForWrite(file))
    var threwException: Boolean = true
    try {
      writeFunc(out)
      blockSizes.put(blockId, out.getCount)
      threwException = false
    } finally {
      try {
        out.close()
      } catch {
        case ioe: IOException =>
          if (!threwException) {
            threwException = true
            throw ioe
          }
      } finally {
         if (threwException) {
          remove(blockId)
        }
      }
    }
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
  }

  def contains(blockId: BlockId): Boolean = {
    val file = diskManager.getFile(blockId.name)
    file.exists()
  }

put()方法首先調用contains()方法檢查塊是否已經(jīng)以文件的形式寫入了各拷,只有沒有寫入才會繼續(xù)操作。然后襟锐,調用DiskBlockManager.getFile()方法打開塊ID對應的文件撤逢,然后獲取該文件的WritableByteChannel(NIO中的寫通道,表示可以通過調用write()方法向文件寫入數(shù)據(jù))粮坞。最后蚊荣,調用參數(shù)中傳入的writeFunc函數(shù),操作WritableByteChannel將數(shù)據(jù)寫入莫杈,并將塊ID與其對應的字節(jié)數(shù)加入blockSizes映射互例。

接下來看一看代碼#28.2中調用的openForWrite()方法。

代碼#28.3 - o.a.s.storage.DiskStore.openForWrite()方法

  private def openForWrite(file: File): WritableByteChannel = {
    val out = new FileOutputStream(file).getChannel()
    try {
      securityManager.getIOEncryptionKey().map { key =>
        CryptoStreamUtils.createWritableChannel(out, conf, key)
      }.getOrElse(out)
    } catch {
      case e: Exception =>
        Closeables.close(out, true)
        file.delete()
        throw e
    }
  }

可見筝闹,該方法就是通過文件對象構造了文件輸出流FileOutputStream媳叨,然后獲取它對應的Channel對象用于寫數(shù)據(jù)。特別地关顷,如果I/O需要加密糊秆,就需要另外調用CryptoStreamUtils.createWritableChannel()方法包裝,本文就不涉及了议双。至于CountingWritableChannel痘番,也只是基于WritableByteChannel接口擴展出來的一個簡單類,增加了統(tǒng)計字節(jié)數(shù)的方法平痰,代碼也就不再列出汞舱。

寫入字節(jié)

代碼#28.4 - o.a.s.storage.DiskStore.putBytes()方法

  def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    put(blockId) { channel =>
      bytes.writeFully(channel)
    }
  }

可見,該方法除了塊ID外宗雇,還需要傳入封裝在ChunkedByteBuffer中的數(shù)據(jù)昂芜。調用上述put()方法時,傳入的writeFunc函數(shù)調用了ChunkedByteBuffer.writeFully()方法赔蒲,負責將數(shù)據(jù)以一定的Chunk大小分塊寫入WritableByteChannel泌神。

讀取字節(jié)

代碼#28.5 - o.a.s.storage.DiskStore.getBytes()方法

  def getBytes(blockId: BlockId): BlockData = {
    val file = diskManager.getFile(blockId.name)
    val blockSize = getSize(blockId)

    securityManager.getIOEncryptionKey() match {
      case Some(key) =>
        new EncryptedBlockData(file, blockSize, conf, key)
      case _ =>
        new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
    }
  }

這段代碼很簡單良漱,但可以注意到,在加密環(huán)境下和非加密環(huán)境下返回的結果是不同的腻扇,前者是EncryptedBlockData對象债热,后者是DiskBlockData對象,而它們都是BlockData的子類幼苛。顧名思義窒篱,BlockData就是對磁盤塊數(shù)據(jù)的具體封裝,下面選擇最常見的DiskBlockData來看一看舶沿。

磁盤塊數(shù)據(jù)DiskBlockData

這個類是定義在DiskStore下方的私有類墙杯,比較短,因此直接全貼在下面括荡。

代碼#28.6 - o.a.s.storage.DiskBlockData類

private class DiskBlockData(
    minMemoryMapBytes: Long,
    maxMemoryMapBytes: Long,
    file: File,
    blockSize: Long) extends BlockData {
  override def toInputStream(): InputStream = new FileInputStream(file)

  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)

  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
    Utils.tryWithResource(open()) { channel =>
      var remaining = blockSize
      val chunks = new ListBuffer[ByteBuffer]()
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxMemoryMapBytes)
        val chunk = allocator(chunkSize.toInt)
        remaining -= chunkSize
        JavaUtils.readFully(channel, chunk)
        chunk.flip()
        chunks += chunk
      }
      new ChunkedByteBuffer(chunks.toArray)
    }
  }

  override def toByteBuffer(): ByteBuffer = {
    require(blockSize < maxMemoryMapBytes,
      s"can't create a byte buffer of size $blockSize" +
      s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
    Utils.tryWithResource(open()) { channel =>
      if (blockSize < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(blockSize.toInt)
        JavaUtils.readFully(channel, buf)
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, 0, file.length)
      }
    }
  }

  override def size: Long = blockSize

  override def dispose(): Unit = {}

  private def open() = new FileInputStream(file).getChannel
}

很久之前也已經(jīng)大概說過高镐,BlockData特征只是定義了塊數(shù)據(jù)的轉化方式,具體的細節(jié)則留給各個實現(xiàn)類畸冲。我們具體看看toChunkedByteBuffer()和toByteBuffer()這兩個方法嫉髓。

轉化為ChunkedByteBuffer

Utils.tryWithResource()方法實際上就是Java中try-with-resources的Scala實現(xiàn),因為Scala中并沒有這個語法糖邑闲。

toChunkedByteBuffer()方法會將文件轉化為輸入流FileInputStream算行,并獲取其ReadableFileChannel,再調用JavaUtils.readFully()方法將從Channel中取得的數(shù)據(jù)填充到ByteBuffer中苫耸。每個ByteBuffer即為一個Chunk州邢,所有Chunk的數(shù)組形成最終的ChunkedByteBuffer乳绕。關于ChunkedByteBuffer在文章#21簡要提到過烙博,之后會很快寫一篇番外文章專門講解它,因為有點意思飞涂。

轉化為ByteBuffer

toByteBuffer()方法會檢查塊大小是否小于spark.storage.memoryMapThreshold(終于出現(xiàn)了)嫌褪。如果小于的話呀枢,就會采用與toChunkedByteBuffer()相同的方式直接填充ByteBuffer。反之笼痛,就調用ReadableFileChannel.map()方法將數(shù)據(jù)映射到MappedByteBuffer中硫狞,即進程的虛擬內存中。不過晃痴,考慮到內存映射的應用場景的話,2MB的閾值可能有點胁坪觥(保守)了倘核,一點碎碎念,請勿在意即彪。

總結

本文研究了Spark磁盤存儲類DiskStore的具體實現(xiàn)紧唱,主要是寫入塊/字節(jié)以及讀取字節(jié)的方法活尊。另外,DiskStore讀取的字節(jié)會用BlockData來封裝漏益,因此也順便了解了一下DiskBlockData的一點細節(jié)蛹锰。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市绰疤,隨后出現(xiàn)的幾起案子铜犬,更是在濱河造成了極大的恐慌,老刑警劉巖轻庆,帶你破解...
    沈念sama閱讀 212,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件癣猾,死亡現(xiàn)場離奇詭異,居然都是意外死亡余爆,警方通過查閱死者的電腦和手機纷宇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蛾方,“玉大人像捶,你說我怎么就攤上這事∽椋” “怎么了拓春?”我有些...
    開封第一講書人閱讀 158,369評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長五芝。 經(jīng)常有香客問我痘儡,道長,這世上最難降的妖魔是什么枢步? 我笑而不...
    開封第一講書人閱讀 56,799評論 1 285
  • 正文 為了忘掉前任沉删,我火速辦了婚禮,結果婚禮上醉途,老公的妹妹穿的比我還像新娘矾瑰。我一直安慰自己,他們只是感情好隘擎,可當我...
    茶點故事閱讀 65,910評論 6 386
  • 文/花漫 我一把揭開白布殴穴。 她就那樣靜靜地躺著,像睡著了一般货葬。 火紅的嫁衣襯著肌膚如雪采幌。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,096評論 1 291
  • 那天震桶,我揣著相機與錄音休傍,去河邊找鬼。 笑死蹲姐,一個胖子當著我的面吹牛磨取,可吹牛的內容都是我干的人柿。 我是一名探鬼主播,決...
    沈念sama閱讀 39,159評論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼忙厌,長吁一口氣:“原來是場噩夢啊……” “哼凫岖!你這毒婦竟也來了?” 一聲冷哼從身側響起逢净,我...
    開封第一講書人閱讀 37,917評論 0 268
  • 序言:老撾萬榮一對情侶失蹤哥放,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后汹胃,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體婶芭,經(jīng)...
    沈念sama閱讀 44,360評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,673評論 2 327
  • 正文 我和宋清朗相戀三年着饥,在試婚紗的時候發(fā)現(xiàn)自己被綠了犀农。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,814評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡宰掉,死狀恐怖呵哨,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情轨奄,我是刑警寧澤孟害,帶...
    沈念sama閱讀 34,509評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站挪拟,受9級特大地震影響挨务,放射性物質發(fā)生泄漏。R本人自食惡果不足惜玉组,卻給世界環(huán)境...
    茶點故事閱讀 40,156評論 3 317
  • 文/蒙蒙 一谎柄、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧惯雳,春花似錦朝巫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至潮孽,卻和暖如春揪荣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背往史。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評論 1 267
  • 我被黑心中介騙來泰國打工变逃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人怠堪。 一個月前我還...
    沈念sama閱讀 46,641評論 2 362
  • 正文 我出身青樓揽乱,卻偏偏與公主長得像,于是被迫代替她去往敵國和親粟矿。 傳聞我的和親對象是個殘疾皇子凰棉,可洞房花燭夜當晚...
    茶點故事閱讀 43,728評論 2 351