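0. Background
DistCp (Distributed Copy) is a tool that ships with Apache Hadoop. It currently exists in two versions, DistCp1 and DistCp2. It is a high-performance tool for large-scale copying within a cluster or between clusters. It uses Map/Reduce for file distribution, error handling and recovery, and reporting. It takes a list of files and directories as the input to the map tasks, and each task copies part of the files in the source list.
The first version of DistCp copies data in parallel with MapReduce, turning the whole copy into a map-only job to speed it up. Because DistCp is essentially a MapReduce job and must keep the blocks of each file in order, its smallest unit of work is a file: a file cannot be split into parts that several tasks copy in parallel, and the finest possible assignment is one file per task.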
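1. Usage examples
1.1 Options
Flag | Description | Notes |
---|---|---|
-i | Ignore failures | This option keeps more accurate statistics about the copy than the default case. It also preserves the logs of failed copy operations, which can be used for debugging. Finally, a failing map does not cause the whole job to fail before all splits have been attempted. |
-log <logdir> | Write logs to <logdir> | DistCp logs every attempt to copy every file and emits the log as map output. If a map fails, this log is not retained when the map is re-executed. |
-m <num_maps> | Maximum number of simultaneous copies | Sets the number of maps used to copy data; the default is 20. Note that more maps do not necessarily mean higher throughput. |
-overwrite | Overwrite the destination | If a map fails and -i is not specified, all files in that split are recopied, not only the ones that failed. |
-update | Overwrite if the source and destination sizes differ | This is not a "sync" operation. The only criterion for overwriting is whether the source and destination file sizes match; if they differ, the source file replaces the destination file. |
-strategy {dynamic/uniformsize} | Choose the copy strategy used by DistCp. | uniformsize is the default; it splits the listing so that each map copies roughly the same number of bytes. If "dynamic" is specified, DynamicInputFormat is used, which assigns files to maps at run time so that faster maps end up copying more files. |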
... | ... | ... |
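The interaction between -overwrite and -update described in the table can be sketched as a small decision function. This is a minimal illustration under stated assumptions, not DistCp's actual code: the class `CopyDecisionSketch`, the `Action` enum and the `decide` method are hypothetical names, and the rule is deliberately reduced to the size comparison the table describes. It also assumes that, with neither flag set, a file that already exists at the destination is left alone.

```java
// Hypothetical sketch: names and structure are illustrative, not DistCp's own classes.
class CopyDecisionSketch {
    enum Action { COPY, SKIP }

    /**
     * Decides what to do with one source file.
     *
     * @param sourceLen length of the source file in bytes
     * @param targetLen length of the existing target file, or -1 if there is none
     * @param overwrite true if -overwrite was given
     * @param update    true if -update was given
     */
    static Action decide(long sourceLen, long targetLen, boolean overwrite, boolean update) {
        if (targetLen < 0) {
            return Action.COPY;          // nothing at the destination yet
        }
        if (overwrite) {
            return Action.COPY;          // -overwrite: always replace the destination
        }
        if (update) {
            // -update: replace only when the sizes differ (simplified to the table's rule)
            return sourceLen != targetLen ? Action.COPY : Action.SKIP;
        }
        return Action.SKIP;              // assumed default: keep the existing destination file
    }

    public static void main(String[] args) {
        System.out.println(decide(100, -1, false, false));   // no target yet
        System.out.println(decide(100, 100, false, true));   // -update, same size
        System.out.println(decide(100, 80, false, true));    // -update, sizes differ
        System.out.println(decide(100, 100, true, false));   // -overwrite
    }
}
```

Keeping the decision in one pure function makes it easy to see that -update never replaces a file whose size already matches, which is why it should not be mistaken for a full synchronization.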
1.2 Copying a file
hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo
This copies foo/bar on nn1 to the path bar/foo on nn2.
1.3 Copying directories
hadoop distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
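This copies the files under the directories source/first and source/second on nn1 into the target directory on nn2.
2. Source code analysis
2.1 High-level flow
2.2 Detailed analysis
2.2.1 Main call graph
2.2.2 Step-by-step analysis
- Shell entry point
Open the hadoop launcher script: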
......
elif [ "$COMMAND" = "distcp" ] ; then
CLASS=org.apache.hadoop.tools.DistCp
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
......
# Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
export CLASSPATH=$CLASSPATH
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
;;
......
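- The main method of DistCp
DistCp is a Tool/ToolRunner application, and a Tool must implement the run method. The main method does three things: it constructs a DistCp instance, creates a Cleanup object and registers it with ShutdownHookManager, and finally calls ToolRunner.run to perform the copy.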
public class DistCp extends Configured implements Tool {
......
public int run(String[] argv) {
......
}
......
......
public static void main(String argv[]) {
int exitCode;
try {
DistCp distCp = new DistCp();
Cleanup CLEANUP = new Cleanup(distCp);
ShutdownHookManager.get().addShutdownHook(CLEANUP,
SHUTDOWN_HOOK_PRIORITY);
exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
}
catch (Exception e) {
LOG.error("Couldn't complete DistCp operation: ", e);
exitCode = DistCpConstants.UNKNOWN_ERROR;
}
System.exit(exitCode);
}
......
}
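- The run method
OptionsParser is DistCp's own argument-parsing utility class. It parses the command-line arguments into inputOptions, an object of type DistCpOptions, which holds settings such as the common overwrite = false. Every option has a specific default: for example, the number of maps defaults to 20 and the default copy strategy is uniform-size.
setTargetPathExists(): takes the target path from the parsed options, checks whether it already exists, and records the result.
execute(): the core method that performs the copy.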
@Override
public int run(String[] argv) {
if (argv.length < 1) {
OptionsParser.usage();
return DistCpConstants.INVALID_ARGUMENT;
}
try {
inputOptions = (OptionsParser.parse(argv));
setTargetPathExists();
LOG.info("Input Options: " + inputOptions);
} catch (Throwable e) {
LOG.error("Invalid arguments: ", e);
System.err.println("Invalid arguments: " + e.getMessage());
OptionsParser.usage();
return DistCpConstants.INVALID_ARGUMENT;
}
try {
execute();
} catch (InvalidInputException e) {
LOG.error("Invalid input: ", e);
return DistCpConstants.INVALID_ARGUMENT;
} catch (DuplicateFileException e) {
LOG.error("Duplicate files in input path: ", e);
return DistCpConstants.DUPLICATE_INPUT;
} catch (AclsNotSupportedException e) {
LOG.error("ACLs not supported on at least one file system: ", e);
return DistCpConstants.ACLS_NOT_SUPPORTED;
} catch (XAttrsNotSupportedException e) {
LOG.error("XAttrs not supported on at least one file system: ", e);
return DistCpConstants.XATTRS_NOT_SUPPORTED;
} catch (Exception e) {
LOG.error("Exception encountered ", e);
return DistCpConstants.UNKNOWN_ERROR;
}
return DistCpConstants.SUCCESS;
}
- execute()
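execute() calls createAndSubmitJob() to create the MR job, prepare the data, set the input format and submit the job to the Hadoop cluster; if inputOptions.shouldBlock() is true it then waits for the job to finish. The main functionality therefore lives in createAndSubmitJob(), and the other classes in the project essentially exist to serve this method.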
public Job execute() throws Exception {
Job job = createAndSubmitJob();
if (inputOptions.shouldBlock()) {
waitForJobCompletion(job);
}
return job;
}
public Job createAndSubmitJob() throws Exception {
assert inputOptions != null;
assert getConf() != null;
Job job = null;
try {
synchronized(this) {
//Don't cleanup while we are setting up.
metaFolder = createMetaFolderPath();
jobFS = metaFolder.getFileSystem(getConf());
job = createJob();
}
if (inputOptions.shouldUseDiff()) {
if (!DistCpSync.sync(inputOptions, getConf())) {
inputOptions.disableUsingDiff();
}
}
createInputFileListing(job);
job.submit();
submitted = true;
} finally {
if (!submitted) {
cleanup();
}
}
String jobID = job.getJobID().toString();
job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
LOG.info("DistCp job-id: " + jobID);
return job;
}
- metaFolder
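metaFolder is a Path. It holds the metadata that DistCp needs, namely the list of all source directories and files to copy. This data is stored as key/value pairs in a file named fileList.seq: the key is the relative path of the source file as a Text, and the value records the source file's org.apache.hadoop.fs.FileStatus, the class Hadoop provides to describe an HDFS file (in the CopyMapper shown later it arrives as a CopyListingFileStatus). The fileList.seq in the meta folder is ultimately passed to the Mapper of the MR job as its input.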
private Path createMetaFolderPath() throws Exception {
Configuration configuration = getConf();
Path stagingDir = JobSubmissionFiles.getStagingDir(
new Cluster(configuration), configuration);
Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
if (LOG.isDebugEnabled())
LOG.debug("Meta folder location: " + metaFolderPath);
configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());
return metaFolderPath;
}
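To make the key/value listing described above more concrete, here is a small stand-alone sketch of how a relative-path key can be turned into a target path, mirroring the string concatenation of targetWorkPath and relPath that appears in CopyMapper.map(). It is only an illustration under simplifying assumptions: `ListingSketch`, `relativePath`, `targetPath` and `plan` are hypothetical names, paths are plain strings rather than Hadoop Path objects, and the rules DistCp really uses to compute relative paths (which depend on the options and on whether the target exists) are not reproduced.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: plain strings stand in for Hadoop Path / Text objects.
class ListingSketch {

    /** Key of a listing entry: the file's path relative to the source root. */
    static String relativePath(String sourceRoot, String file) {
        if (file.equals(sourceRoot)) {
            return "";
        }
        if (!file.startsWith(sourceRoot + "/")) {
            throw new IllegalArgumentException(file + " is not under " + sourceRoot);
        }
        return file.substring(sourceRoot.length());
    }

    /** Target path as the mapper builds it: target work path + relative path. */
    static String targetPath(String targetWorkPath, String relPath) {
        return targetWorkPath + relPath;
    }

    /** Maps every relative-path key to the target path it would be copied to. */
    static Map<String, String> plan(String sourceRoot, String targetWorkPath, List<String> files) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String file : files) {
            String rel = relativePath(sourceRoot, file);
            result.put(rel, targetPath(targetWorkPath, rel));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("/source/first/a.txt", "/source/first/sub/b.txt");
        plan("/source/first", "/target", files)
            .forEach((rel, target) -> System.out.println(rel + " -> " + target));
    }
}
```

Storing only the relative path as the key is what lets each mapper rebuild the destination path on its own, without any shared state beyond the listing file.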
- job = createJob()
This is where the MR job is created.
private Job createJob() throws IOException {
String jobName = "distcp";
String userChosenName = getConf().get(JobContext.JOB_NAME);
if (userChosenName != null)
jobName += ": " + userChosenName;
Job job = Job.getInstance(getConf());
job.setJobName(jobName);
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setJarByClass(CopyMapper.class);
configureOutputFormat(job);
job.setMapperClass(CopyMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(CopyOutputFormat.class);
job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
job.getConfiguration().set(JobContext.NUM_MAPS,
String.valueOf(inputOptions.getMaxMaps()));
if (inputOptions.getSslConfigurationFile() != null) {
setupSSLConfig(job);
}
inputOptions.appendToConf(job.getConfiguration());
return job;
}
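Two lines deserve particular attention. job.setInputFormatClass selects how work is distributed among the map tasks; there are two choices, UniformSizeInputFormat and DynamicInputFormat. UniformSizeInputFormat distributes the work evenly: the listing is split so that each of the configured maps copies roughly the same number of bytes. DynamicInputFormat distributes the work dynamically: how much each map copies depends on how fast that map runs. job.setMapperClass sets the actual logic of the map task.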
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setMapperClass(CopyMapper.class);
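The difference between the two strategies can be sketched with a small simulation. This is not the code of UniformSizeInputFormat or DynamicInputFormat; `SplitStrategySketch`, `uniformSplits` and `dynamicChunkCounts` are hypothetical names, and the per-worker speeds are made-up inputs. The first method cuts an ordered list of file sizes into contiguous groups of roughly equal bytes without ever dividing a file. The second models workers that each take the next chunk of files as soon as they are free.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the two distribution ideas; not Hadoop code.
class SplitStrategySketch {

    /**
     * Uniform-size idea: cut the ordered listing into contiguous groups of
     * roughly equal bytes. The number of groups may differ slightly from numMaps.
     */
    static List<List<Long>> uniformSplits(long[] fileSizes, int numMaps) {
        long total = 0;
        for (long size : fileSizes) {
            total += size;
        }
        long bytesPerSplit = (total + numMaps - 1) / numMaps;   // ceiling division
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            // Close the current group if this file would push it over the budget.
            if (!current.isEmpty() && currentBytes + size > bytesPerSplit) {
                splits.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);           // a file is never divided between groups
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }

    /**
     * Dynamic idea: each worker takes the next chunk when it becomes free,
     * so a worker that needs less time per chunk ends up with more chunks.
     */
    static int[] dynamicChunkCounts(int numChunks, double[] secondsPerChunk) {
        int workers = secondsPerChunk.length;
        double[] freeAt = new double[workers];   // time at which each worker is idle again
        int[] taken = new int[workers];
        for (int chunk = 0; chunk < numChunks; chunk++) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w;                    // earliest-free worker takes the chunk
                }
            }
            taken[next]++;
            freeAt[next] += secondsPerChunk[next];
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(uniformSplits(new long[] {100, 20, 30, 50, 40, 60}, 3));
        int[] counts = dynamicChunkCounts(6, new double[] {1.0, 2.0});
        System.out.println(counts[0] + " chunks for the fast worker, " + counts[1] + " for the slow one");
    }
}
```

The uniform variant fixes the assignment before the job starts, so one slow map can hold up the whole job; the dynamic variant trades some bookkeeping for the ability to shift work toward faster maps.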
- Source code of the CopyMapper class
This is where the copy is actually performed; the methods to look at are setup() and map().
public void setup(Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
PRESERVE_STATUS.getConfigLabel()));
targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path targetFinalPath = new Path(conf.get(
DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
targetFS = targetFinalPath.getFileSystem(conf);
if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) {
overWrite = true; // When target is an existing file, overwrite it.
}
if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
initializeSSLConf(context);
}
}
public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
Context context) throws IOException, InterruptedException {
Path sourcePath = sourceFileStatus.getPath();
if (LOG.isDebugEnabled())
LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
targetFS.getWorkingDirectory()) + relPath.toString());
EnumSet<DistCpOptions.FileAttribute> fileAttributes
= getFileAttributeSettings(context);
final boolean preserveRawXattrs = context.getConfiguration().getBoolean(
DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
final String description = "Copying " + sourcePath + " to " + target;
context.setStatus(description);
LOG.info(description);
try {
CopyListingFileStatus sourceCurrStatus;
FileSystem sourceFS;
try {
sourceFS = sourcePath.getFileSystem(conf);
final boolean preserveXAttrs =
fileAttributes.contains(FileAttribute.XATTR);
sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
sourceFS.getFileStatus(sourcePath),
fileAttributes.contains(FileAttribute.ACL),
preserveXAttrs, preserveRawXattrs);
} catch (FileNotFoundException e) {
throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
}
FileStatus targetStatus = null;
try {
targetStatus = targetFS.getFileStatus(target);
} catch (FileNotFoundException ignore) {
if (LOG.isDebugEnabled())
LOG.debug("Path could not be found: " + target, ignore);
}
if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
throw new IOException("Can't replace " + target + ". Target is " +
getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
}
if (sourceCurrStatus.isDirectory()) {
createTargetDirsWithRetry(description, target, context);
return;
}
FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
if (action == FileAction.SKIP) {
LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
+ " to " + target);
updateSkipCounters(context, sourceCurrStatus);
context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
} else {
copyFileWithRetry(description, sourceCurrStatus, target, context,
action, fileAttributes);
}
DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
fileAttributes, preserveRawXattrs);
} catch (IOException exception) {
handleFailures(exception, sourceFileStatus, target, context);
}
}
- copyFileWithRetry
This method performs the copy. At the lowest level it relies on ordinary Java input and output streams to do a point-to-point copy, namely the call copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE, context); inside copyToFile.
private void copyFileWithRetry(String description,
FileStatus sourceFileStatus, Path target, Context context,
FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
throws IOException {
long bytesCopied;
try {
bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
action).execute(sourceFileStatus, target, context, fileAttributes);
} catch (Exception e) {
context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
" --> " + target, e);
}
incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
incrementCounter(context, Counter.COPY, 1);
}
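The stream-based copy mentioned above boils down to a buffered read/write loop. The following is a minimal sketch of that general technique, not the body of DistCp's copyBytes: `StreamCopySketch`, its `copyBytes` signature and the `copyAll` helper are hypothetical, and the real method additionally deals with offsets, progress reporting and Hadoop-specific streams, none of which are modelled here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a buffered stream copy; not DistCp's implementation.
class StreamCopySketch {

    /** Copies everything from in to out through a fixed-size buffer; returns the byte count. */
    static long copyBytes(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) >= 0) {   // read() returns -1 at end of stream
            out.write(buffer, 0, read);           // write only the bytes actually read
            total += read;
        }
        return total;
    }

    /** Convenience wrapper for in-memory data, used here only to demonstrate the loop. */
    static byte[] copyAll(byte[] data, int bufferSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            copyBytes(new ByteArrayInputStream(data), out, bufferSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = "hello distcp".getBytes(StandardCharsets.UTF_8);
        byte[] copy = copyAll(data, 4);
        System.out.println(new String(copy, StandardCharsets.UTF_8));
    }
}
```

Returning the number of bytes copied fits how copyFileWithRetry uses the result: it feeds the BYTESCOPIED counter, which can then be compared with BYTESEXPECTED.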
At this point the copy job has been configured, submitted and executed, which means the distcp command has finished and the cross-cluster file copy is complete.