Spark向量化讀取Parquet文件源碼

原文:https://animeshtrivedi.github.io/spark-parquet-reading

Spark 如何讀取Parquet文件

Apache Parquet 是一種流行的列式存儲(chǔ)格式,它把數(shù)據(jù)存儲(chǔ)為一堆文件盼理。
Spark讀取parquet依賴以下API:

val parquetFileDF = spark.read.parquet("test.parquet")

test.parquet文件格式為<int, Array[Byte]>。

關(guān)鍵對(duì)象

在 Spark SQL 中,各種操作都在各自的類中實(shí)現(xiàn),其名稱都以Exec作為后綴甥材。

1.DataSourceScanExec類掌管的是對(duì)數(shù)據(jù)源的讀取实撒。讀取Parquet文件的相關(guān)代碼從這里開(kāi)始,在ParquetFileFormat類中結(jié)束仍侥。

2.ParquetFileFormat中有一個(gè)buildReader函數(shù),返回一個(gè)(PartitionedFile => Iterator[InternalRow])鸳君。此函數(shù)中生成了一個(gè)迭代器:

val iter = new RecordReaderIterator(parquetReader)

這里parquetReader是一個(gè)VectorizedParquetRecordReader农渊。RecordReaderIterator包裝了一個(gè)scala迭代器,以Hadoop RecordReader<K,V>風(fēng)格或颊。它由 VectorizedParquetRecordReader(及其基類 SpecificParquetRecordReaderBase<Object>)實(shí)現(xiàn)砸紊。

  1. VectorizedParquetRecordReader做了什么?根據(jù)文件中的comment:一個(gè)專門(mén)的RecordReader囱挑,直接使用Parquet column API 讀入InternalRows或ColumnarBatches醉顽,基于parquet-mr的ColumnReader。VectorizedParquetRecordReader 對(duì)象分配后平挑,調(diào)用initialize(split, hadoopAttemptContext)函數(shù)和initBatch(partitionSchema, file.partitionValues)函數(shù)游添。
  • initialize調(diào)用父類SpecificParquetRecordReaderBase的initialize函數(shù)。在這個(gè)函數(shù)中通熄,會(huì)讀取文件schema唆涝,推斷請(qǐng)求的schema,并且實(shí)例化一個(gè)ParquetFileReader的讀取器唇辨。在initialize結(jié)束時(shí)廊酣,我們知道讀取的InputFileSplit中有多少行,這存儲(chǔ)在totalRawCount變量中助泽。
  • initBatch主要工作是分配columnarBatch對(duì)象啰扛,后面會(huì)詳細(xì)討論嚎京。

4.VectorizedParquetRecordReader 中 RecordReader 接口的實(shí)現(xiàn)需要多加關(guān)注,它在使用步驟 2 中的迭代器時(shí)調(diào)用的是什么隐解?在調(diào)用nextKeyValue()時(shí)鞍帝,該函數(shù)首先調(diào)用了resultBatch(),然后調(diào)用nextBatch()煞茫。請(qǐng)記住帕涌,我們總是在Batch Mode下操作(returnColumnarBatch 設(shè)置為 true),nextBatch用數(shù)據(jù)填充columnarBatch续徽,且這個(gè)變量會(huì)在getCurrentValue函數(shù)中返回蚓曼。getCurrentKey 在 SpecificParquetRecordReaderBase 的基類中實(shí)現(xiàn),且始終返回null钦扭。

現(xiàn)在纫版,我們知道了迭代器中返回了什么變量。從這開(kāi)始有兩個(gè)方向客情,首先我們描述ColumnarBatch是怎么被Parquet數(shù)據(jù)填充其弊。然后我們描述誰(shuí)使用了步驟2中生成的iter迭代器。

ColumnarBatch 如何被填充膀斋?

在 VectorizedParquetRecordReader.nextBatch() 函數(shù)中梭伐,如果尚未讀取所有行,則調(diào)用 checkEndOfRowGroup() 函數(shù)仰担。然后糊识,checkEndOfRowGroup 函數(shù)讀取一個(gè)rowGroup(可以將rowGroup視為以列格式存儲(chǔ)的一定數(shù)量行的集合),然后為requestedSchema 中的每個(gè)請(qǐng)求列分配一個(gè)VectorizedColumnReader 對(duì)象摔蓝。VectorizedColumnReader 構(gòu)造函數(shù)接受一個(gè) ColumnDescriptor(可以在schema中找到)和一個(gè) PageReader(可以從 rowGroup 中找到赂苗,一個(gè) Parquet API 調(diào)用)。
另外贮尉,missingColumns是確實(shí)列的一個(gè)bitmap(可能是缺失的列或 Spark 不打算讀取的列)哑梳。然后,在nextBatch中調(diào)用readBatch(num, columnarBatch.column(i))绘盟,會(huì)在之前checkEndOfRowGroup(基本上是每列)函數(shù)分配的所有VectorizedColumnReader對(duì)象上調(diào)用。(因此悯仙,ColumnarBatch 和 ColumnVector 只是 VectorizedColumnReader 使用的原始內(nèi)存)龄毡。所以在 readBatch 中,傳遞了行數(shù)和 ColumnVector(存儲(chǔ)在 ColumnarBatch 中)锡垄。什么是ColumnVector沦零?我們可以將其視為一個(gè)類型數(shù)組,由 rowId 索引货岭。

/**
 * An interface representing in-memory columnar data in Spark. This interface defines the main APIs
 * to access the data, as well as their batched versions. The batched versions are considered to be
 * faster and preferable whenever possible.
 *
 * Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
 * in this ColumnVector.
 *
 * Spark only calls specific `get` method according to the data type of this {@link ColumnVector},
 * e.g. if it's int type, Spark is guaranteed to only call {@link #getInt(int)} or
 * {@link #getInts(int, int)}.
 *
 * ColumnVector supports all the data types including nested types. To handle nested types,
 * ColumnVector can have children and is a tree structure. Please refer to {@link #getStruct(int)},
 * {@link #getArray(int)} and {@link #getMap(int)} for the details about how to implement nested
 * types.
 *
 * ColumnVector is expected to be reused during the entire data loading process, to avoid allocating
 * memory again and again.
 *
 * ColumnVector is meant to maximize CPU efficiency but not to minimize storage footprint.
 * Implementations should prefer computing efficiency over storage efficiency when design the
 * format. Since it is expected to reuse the ColumnVector instance while loading data, the storage
 * footprint is negligible.
 */
@Evolving
public abstract class ColumnVector implements AutoCloseable {

總之路操,原始數(shù)據(jù)存儲(chǔ)在 ColumnVector 中疾渴,ColumnVector 本身存儲(chǔ)在 ColumnBatch 對(duì)象中。ColumnVector 是在 readBatch 函數(shù)中作為存儲(chǔ)空間傳遞的屯仗。 在 readBatch 函數(shù)內(nèi)部搞坝,它首先調(diào)用 readPage() 函數(shù),該函數(shù)查看我們正在讀取哪個(gè)版本的 parquet 文件(v1 或 v2魁袜,我不知道區(qū)別)桩撮,然后初始化一堆對(duì)象,即 defColumn: VectorizedRleValuesReader峰弹、replicationLevelColumn:ValuesReaderIntIterator店量、definitionLevelColumn:ValuesReaderIntIterator 和 dataColumn:VectorizedRleValuesReader。這些變量中的 ValuesReaderIntIterator 來(lái)自 parquet-mr鞠呈,而 VectorizedRleValuesReader 來(lái)自 Spark融师。接下來(lái),有一堆 read[Type]Batch() 函數(shù)被調(diào)用蚁吝,這些函數(shù)又調(diào)用 defColumn.read[Type]s() 函數(shù)旱爆。 (這里的 [Type] 是一些類型,如 Int灭将、Short疼鸟、Binary 等)。 在 VectorizedRleValuesReader 上的這些函數(shù)中庙曙,數(shù)據(jù)被讀取空镜、解碼(可能來(lái)自 RLE),然后插入到此處傳遞的 ColumnVector 中捌朴。

Scala[ColumnBatch] 迭代器在哪里被消費(fèi)吴攒?

迭代器根據(jù) reader 是否處于批處理模式返回兩種不同的類型,code如下:

  @Override
  public Object getCurrentValue() {
    if (returnColumnarBatch) return columnarBatch;
    return columnarBatch.getRow(batchIdx - 1);
  }

其中砂蔽,columnarBatch的類型是ColumnarBatch洼怔,columnarBatch.getRow 返回一個(gè) ColumnarBatch.Row 類型的嵌套類。這個(gè)迭代器以某種方式傳遞給wholestage code generation左驾。消費(fèi)這個(gè)迭代器并且實(shí)例化UnsafeRow的code示例如下:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator scan_input;
/* 009 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 011 */   private long scan_scanTime1;
/* 012 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 013 */   private int scan_batchIdx;
/* 014 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance0;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance1;
/* 016 */   private UnsafeRow scan_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter scan_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     scan_input = inputs[0];
/* 028 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 029 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 030 */     scan_scanTime1 = 0;
/* 031 */     scan_batch = null;
/* 032 */     scan_batchIdx = 0;
/* 033 */     scan_colInstance0 = null;
/* 034 */     scan_colInstance1 = null;
/* 035 */     scan_result = new UnsafeRow(2);
/* 036 */     this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */     this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   private void scan_nextBatch() throws java.io.IOException {
/* 042 */     long getBatchStart = System.nanoTime();
/* 043 */     if (scan_input.hasNext()) {
/* 044 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 045 */       scan_numOutputRows.add(scan_batch.numRows());
/* 046 */       scan_batchIdx = 0;
/* 047 */       scan_colInstance0 = scan_batch.column(0);
/* 048 */       scan_colInstance1 = scan_batch.column(1);
/* 049 */
/* 050 */     }
/* 051 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 052 */   }
/* 053 */
/* 054 */   protected void processNext() throws java.io.IOException {
/* 055 */     if (scan_batch == null) {
/* 056 */       scan_nextBatch();
/* 057 */     }
/* 058 */     while (scan_batch != null) {
/* 059 */       int numRows = scan_batch.numRows();
/* 060 */       while (scan_batchIdx < numRows) {
/* 061 */         int scan_rowIdx = scan_batchIdx++;
/* 062 */         boolean scan_isNull = scan_colInstance0.isNullAt(scan_rowIdx);
/* 063 */         int scan_value = scan_isNull ? -1 : (scan_colInstance0.getInt(scan_rowIdx));
/* 064 */         boolean scan_isNull1 = scan_colInstance1.isNullAt(scan_rowIdx);
/* 065 */         byte[] scan_value1 = scan_isNull1 ? null : (scan_colInstance1.getBinary(scan_rowIdx));
/* 066 */         scan_holder.reset();
/* 067 */
/* 068 */         scan_rowWriter.zeroOutNullBytes();
/* 069 */
/* 070 */         if (scan_isNull) {
/* 071 */           scan_rowWriter.setNullAt(0);
/* 072 */         } else {
/* 073 */           scan_rowWriter.write(0, scan_value);
/* 074 */         }
/* 075 */
/* 076 */         if (scan_isNull1) {
/* 077 */           scan_rowWriter.setNullAt(1);
/* 078 */         } else {
/* 079 */           scan_rowWriter.write(1, scan_value1);
/* 080 */         }
/* 081 */         scan_result.setTotalSize(scan_holder.totalSize());
/* 082 */         append(scan_result);
/* 083 */         if (shouldStop()) return;
/* 084 */       }
/* 085 */       scan_batch = null;
/* 086 */       scan_nextBatch();
/* 087 */     }
/* 088 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 089 */     scan_scanTime1 = 0;
/* 090 */   }
/* 091 */ }

在scan_nextBatch方法中镣隶,我們通過(guò)調(diào)用next()讀取一個(gè)新的ColumnarBatch。然后我們獲取ColumnVectors對(duì)象(變量 scan_colInstance0/scan_colInstance1)诡右。通過(guò)numRows()方法安岂,我們可以得到ColumnarBatch的行數(shù),通過(guò)調(diào)用ColumnVector對(duì)象的get[Type](rowId: Int)獲取最終的值帆吻。
這些值在BufferHolder和UnsafeRowWriter對(duì)象的幫助下表示為UnsafeRow:

/* 035 */     scan_result = new UnsafeRow(2);
/* 036 */     this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */     this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末域那,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子猜煮,更是在濱河造成了極大的恐慌次员,老刑警劉巖败许,帶你破解...
    沈念sama閱讀 212,686評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異淑蔚,居然都是意外死亡市殷,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,668評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)束倍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)被丧,“玉大人,你說(shuō)我怎么就攤上這事绪妹∩穑” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,160評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵邮旷,是天一觀的道長(zhǎng)黄选。 經(jīng)常有香客問(wèn)我,道長(zhǎng)婶肩,這世上最難降的妖魔是什么办陷? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,736評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮律歼,結(jié)果婚禮上民镜,老公的妹妹穿的比我還像新娘。我一直安慰自己险毁,他們只是感情好制圈,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,847評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著畔况,像睡著了一般鲸鹦。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上跷跪,一...
    開(kāi)封第一講書(shū)人閱讀 50,043評(píng)論 1 291
  • 那天馋嗜,我揣著相機(jī)與錄音,去河邊找鬼吵瞻。 笑死葛菇,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的橡羞。 我是一名探鬼主播熟呛,決...
    沈念sama閱讀 39,129評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼尉姨!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起吗冤,我...
    開(kāi)封第一講書(shū)人閱讀 37,872評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤又厉,失蹤者是張志新(化名)和其女友劉穎九府,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體覆致,經(jīng)...
    沈念sama閱讀 44,318評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡侄旬,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,645評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了煌妈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片儡羔。...
    茶點(diǎn)故事閱讀 38,777評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖璧诵,靈堂內(nèi)的尸體忽然破棺而出汰蜘,到底是詐尸還是另有隱情,我是刑警寧澤之宿,帶...
    沈念sama閱讀 34,470評(píng)論 4 333
  • 正文 年R本政府宣布族操,位于F島的核電站,受9級(jí)特大地震影響比被,放射性物質(zhì)發(fā)生泄漏色难。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,126評(píng)論 3 317
  • 文/蒙蒙 一等缀、第九天 我趴在偏房一處隱蔽的房頂上張望枷莉。 院中可真熱鬧,春花似錦尺迂、人聲如沸笤妙。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,861評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)危喉。三九已至,卻和暖如春州疾,著一層夾襖步出監(jiān)牢的瞬間辜限,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,095評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工严蓖, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留薄嫡,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,589評(píng)論 2 362
  • 正文 我出身青樓颗胡,卻偏偏與公主長(zhǎng)得像毫深,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子毒姨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,687評(píng)論 2 351