一個job的map階段并行度由客戶端在提交job時決定
客戶端對map階段并行度的規(guī)劃基本邏輯為:
一官疲、將待處理的文件進行邏輯切片(根據(jù)處理數(shù)據(jù)文件的大小途凫,劃分多個split)溢吻,然后每一個split分配一個maptask并行處理實例
二、具體切片規(guī)劃是由FileInputFormat實現(xiàn)類的getSplits()方法完成
切分規(guī)則如下:
1.簡單地按照文件的內(nèi)容長度進行切片
2.切片大小默認是datanode的切塊大小128M
3.切片時不是考慮一個整體數(shù)據(jù)集犀盟,而是針對每一個文件單獨切片
比如待處理數(shù)據(jù)有兩個文件:
file1.txt 200M
file2.txt 50M
經(jīng)過FileInputFormat的切片機制運算后阅畴,形成的切片信息如下:
file1.txt.split1– 0~128M —–maptask
file1.txt.split2– 128M~200M —–maptask
file2.txt.split1– 0~50M —–maptask
三迅耘、如何改變切片大小(參數(shù)設(shè)置)
源碼是通過這個方法來規(guī)劃切片大小的
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
minsize:默認值:1纽哥;配置參數(shù): mapreduce.input.fileinputformat.split.minsize
maxsize:默認值:Long.MAXValue春塌; 配置參數(shù):mapreduce.input.fileinputformat.split.maxsize
blocksize:hdfs切片大小
調(diào)整切片大小結(jié)論:
maxsize(切片最大值):
參數(shù)如果調(diào)得比blocksize小累魔,則會讓切片變小,而且就等于配置的這個參數(shù)的值
minsize (切片最小值):
參數(shù)調(diào)的比blockSize大吕世,則可以讓切片變得比blocksize還大
控制map個數(shù)的核心源碼
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//getFormatMinSplitSize 默認返回1梯投,getMinSplitSize 為用戶設(shè)置的最小分片數(shù), 如果用戶設(shè)置的大于1,則為用戶設(shè)置的最小分片數(shù)
long maxSize = getMaxSplitSize(job);
//getMaxSplitSize為用戶設(shè)置的最大分片數(shù)尔艇,默認最大為long 9223372036854775807L
long splitSize = computeSplitSize(blockSize, minSize,
maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
由上述代碼可以看出在
maxSize默認等于long(長整形)
blockSize默認在hadoop2.0之后為128M
minSize默認等于1
因此默認的切片大小splitSize等于128M也就是說等于塊大小
一個切片對應于一個map任務终娃,因此在默認情況下一個塊對應于一個map任務。
要想人為控制map的個數(shù)可以從minSize和MaxSize入手余佛。
想要增加map的個數(shù)窍荧,可以將maxSize調(diào)整小于blockSize;想要減小map的個數(shù)蕊退,可以調(diào)整minSize>blockSize瓤荔。
具體調(diào)整可以在job配置中增加如下配置
FileInputFormat.setMinInputSplitSize(job, 301349250);//設(shè)置minSize
FileInputFormat.setMaxInputSplitSize(job, 10000);//設(shè)置maxSize
在實驗中准潭,
測試 文件大小 297M(311349250)
塊大小128M
測試代碼
FileInputFormat.setMinInputSplitSize(job, 301349250);
FileInputFormat.setMaxInputSplitSize(job, 10000);
測試后Map個數(shù)為1腔丧,由上面分片公式算出分片大小為301349250, 比 311349250小愉粤, 理論應該為兩個map, 這是為什么呢?在上源碼
while (bytesRemaining / splitSize > 1.1D) {
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);
splits.add(makeSplit(path, length - bytesRemaining,
splitSize, blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
可以看出只要剩余的文件大小不超過分片大小的1.1倍如蚜, 則會分到一個分片中影暴,避免開兩個MAP, 其中一個運行數(shù)據(jù)太小撬呢,浪費資源妆兑。
總結(jié),分片過程大概為芯勘,先遍歷目標文件荷愕,過濾部分不符合要求的文件, 然后添加到列表狈癞,然后按照文件名來切分分片 (大小為前面計算分片大小的公式, 最后有個文件尾可能合并茂契,其實常寫網(wǎng)絡(luò)程序的都知道)慨绳, 然后添加到分片列表脐雪,然后每個分片讀取自身對應的部分給MAP處理