1. 定義
1.1 block
block是物理塊跨扮,文件存放到HDFS上后,會將大文件按照每塊128MB的大小切分验毡,存放到不同的DataNode上衡创。
1.2 split
split是邏輯上的分片,在MapReduce中Map開始之前晶通,會將輸入文件按照指定大小切分為多個小片璃氢,每一部分對應(yīng)一個Map Task,默認(rèn)split的大小與block的大小相同狮辽,為128MB一也。
2. 參數(shù)設(shè)置
2.1 block默認(rèn)配置在hdfs-site.xml中【$HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-x.y.z.jar
】
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>
默認(rèn)block的大小參數(shù)配置以字節(jié)為單位(例如134217728,128 MB)喉脖。
也可以使用如128k椰苟,512m,1g等為單位(不區(qū)分大小寫)树叽。
</description>
</property>
2.2 split大小由minSize
舆蝴、maxSize
、blockSize
決定
- long minSize = 1
在FileInputFormat.getSplits
方法中题诵,minSize的賦值:// 取getFormatMinSplitSize()洁仗、getMinSplitSize(job)的最大值 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); protected long getFormatMinSplitSize() { return 1; } public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); }
- long maxSize = Long.MAX_VALUE
在FileInputFormat.getSplits
方法中,maxSize的賦值:long maxSize = getMaxSplitSize(job); public static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE); } public long getLong(String name, long defaultValue) { // 用戶自定義了參數(shù) String valueString = getTrimmed(name); if (valueString == null) // 返回默認(rèn)值Long.MAX_VALUE return defaultValue; String hexString = getHexDigits(valueString); if (hexString != null) { return Long.parseLong(hexString, 16); } return Long.parseLong(valueString); }
- long blockSize = 128MB
2.3 結(jié)論
在FileInputFormat.getSplits
方法中對文件進(jìn)行了Split:
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
image.png
split與block的對應(yīng)關(guān)系可以是多對一性锭,默認(rèn)一對一:
- 如果blockSize < maxSize && blockSize < minSize赠潦,那么split就是blockSize【一對一】
- 如果blockSize < maxSize && blockSize > minSize,那么split就是minSize
- 如果blockSize > maxSize && maxSize > minSize草冈,那么split就是maxSize【多對一】
如果blockSize > maxSize && maxSize < minSize祭椰,那么split就是minSize
3. 分片主要源碼
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
// 分片最小值臭家,默認(rèn)為 1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// 分片最大值,默認(rèn)為 LONG.MAX_VALUE
long maxSize = getMaxSplitSize(job);
// 生成分片列表
List<InputSplit> splits = new ArrayList<InputSplit>();
// 列出文件狀態(tài)
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
// 獲得文件路徑和大小
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
// 獲得block塊的位置信息
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 判斷文件是否可以分片
if (isSplitable(job, path)) {
// 獲得文件blockSize大小方淤,默認(rèn)128MB
long blockSize = file.getBlockSize();
// 獲得split大小钉赁,默認(rèn)128MB
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// 文件剩余大小
long bytesRemaining = length;
// 當(dāng)剩余大小大于split大小的1.1倍時,進(jìn)行分片
// private static final double SPLIT_SLOP = 1.1;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// 獲得block塊的索引位置
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
// 分片
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
// 原文件減去已經(jīng)分片大小
bytesRemaining -= splitSize;
}
// 判斷文件是否已經(jīng)完成分片携茂,如果還有剩余你踩,則將剩余部分作為一個分片
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
// 分片,封裝分片信息讳苦,文件路徑带膜,分片起始位置,分片大小鸳谜,對應(yīng)block塊所在的hosts列表膝藕,對應(yīng)block塊緩存副本所在的hosts列表
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
return new FileSplit(file, start, length, hosts, inMemoryHosts);
}
寫在最后
上面說到的,當(dāng)剩余大小大于split大小的1.1倍時咐扭,進(jìn)行分片
private static final double SPLIT_SLOP = 1.1;
我還沒有想出問什么是1.1倍芭挽,我猜想是為了減少一些分片數(shù)量,比如這種情況蝗肪?
這種