一個(gè)job的map階段并行度由客戶端在提交job時(shí)決定
客戶端對map階段并行度的規(guī)劃基本邏輯為:
一、將待處理的文件進(jìn)行邏輯切片(根據(jù)處理數(shù)據(jù)文件的大小篡悟,劃分多個(gè)split),然后每一個(gè)split分配一個(gè)maptask并行處理實(shí)例
二、具體切片規(guī)劃是由FileInputFormat實(shí)現(xiàn)類的getSplits()方法完成
切分規(guī)則如下:
1.簡單地按照文件的內(nèi)容長度進(jìn)行切片
2.切片大小默認(rèn)是datanode的切塊大小128M
3.切片時(shí)不是考慮一個(gè)整體數(shù)據(jù)集纺座,而是針對每一個(gè)文件單獨(dú)切片
比如待處理數(shù)據(jù)有兩個(gè)文件:
file1.txt 200M
file2.txt 50M
經(jīng)過FileInputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:
file1.txt.split1– 0~128M —–maptask
file1.txt.split2– 128M~200M —–maptask
file2.txt.split1– 0~50M —–maptask
三貌嫡、如何改變切片大斜茸ぁ(參數(shù)設(shè)置)
源碼是通過這個(gè)方法來規(guī)劃切片大小的
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
minsize:默認(rèn)值:1;配置參數(shù): mapreduce.input.fileinputformat.split.minsize
maxsize:默認(rèn)值:Long.MAXValue岛抄; 配置參數(shù):mapreduce.input.fileinputformat.split.maxsize
blocksize:hdfs切片大小
調(diào)整切片大小結(jié)論:
maxsize(切片最大值):
參數(shù)如果調(diào)得比blocksize小别惦,則會(huì)讓切片變小,而且就等于配置的這個(gè)參數(shù)的值
minsize (切片最小值):
參數(shù)調(diào)的比blockSize大夫椭,則可以讓切片變得比blocksize還大
控制map個(gè)數(shù)的核心源碼
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//getFormatMinSplitSize 默認(rèn)返回1掸掸,getMinSplitSize 為用戶設(shè)置的最小分片數(shù), 如果用戶設(shè)置的大于1,則為用戶設(shè)置的最小分片數(shù)
long maxSize = getMaxSplitSize(job);
//getMaxSplitSize為用戶設(shè)置的最大分片數(shù)蹭秋,默認(rèn)最大為long 9223372036854775807L
long splitSize = computeSplitSize(blockSize, minSize,
maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
由上述代碼可以看出在
maxSize默認(rèn)等于long(長整形)
blockSize默認(rèn)在hadoop2.0之后為128M
minSize默認(rèn)等于1
因此默認(rèn)的切片大小splitSize等于128M也就是說等于塊大小
一個(gè)切片對應(yīng)于一個(gè)map任務(wù)扰付,因此在默認(rèn)情況下一個(gè)塊對應(yīng)于一個(gè)map任務(wù)。
要想人為控制map的個(gè)數(shù)可以從minSize和MaxSize入手仁讨。
想要增加map的個(gè)數(shù)羽莺,可以將maxSize調(diào)整小于blockSize;想要減小map的個(gè)數(shù)洞豁,可以調(diào)整minSize>blockSize盐固。
具體調(diào)整可以在job配置中增加如下配置
FileInputFormat.setMinInputSplitSize(job, 301349250);//設(shè)置minSize
FileInputFormat.setMaxInputSplitSize(job, 10000);//設(shè)置maxSize
在實(shí)驗(yàn)中,
測試 文件大小 297M(311349250)
塊大小128M
測試代碼
FileInputFormat.setMinInputSplitSize(job, 301349250);
FileInputFormat.setMaxInputSplitSize(job, 10000);
測試后Map個(gè)數(shù)為1丈挟,由上面分片公式算出分片大小為301349250, 比 311349250小刁卜, 理論應(yīng)該為兩個(gè)map, 這是為什么呢?在上源碼
while (bytesRemaining / splitSize > 1.1D) {
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);
splits.add(makeSplit(path, length - bytesRemaining,
splitSize, blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
可以看出只要剩余的文件大小不超過分片大小的1.1倍曙咽, 則會(huì)分到一個(gè)分片中蛔趴,避免開兩個(gè)MAP, 其中一個(gè)運(yùn)行數(shù)據(jù)太小例朱,浪費(fèi)資源孝情。
總結(jié),分片過程大概為洒嗤,先遍歷目標(biāo)文件咧叭,過濾部分不符合要求的文件, 然后添加到列表烁竭,然后按照文件名來切分分片 (大小為前面計(jì)算分片大小的公式, 最后有個(gè)文件尾可能合并菲茬,其實(shí)常寫網(wǎng)絡(luò)程序的都知道), 然后添加到分片列表派撕,然后每個(gè)分片讀取自身對應(yīng)的部分給MAP處理