hadoop-distcp源碼篇

0.背景介紹

~~~~DistCp(Distributed Copy)是Apache Hadoop自帶的工具搀突,目前存在兩個版本乏屯,DistCp1和DistCp2病梢。它是用于大規(guī)模集群內(nèi)部或者集群之間的高性能拷貝工具蜓陌。 它使用Map/Reduce實現(xiàn)文件分發(fā)护奈,錯誤處理和恢復,以及報告生成痴奏。 它把文件和目錄的列表作為map任務(wù)的輸入读拆,每個任務(wù)會完成源列表中部分文件的拷貝檐晕。
~~~~DistCp第一版使用了MapReduce并發(fā)拷貝數(shù)據(jù)辟灰,它將整個數(shù)據(jù)拷貝過程轉(zhuǎn)化為一個map-only Job以加快拷貝速度芥喇。由于DistCp本質(zhì)上是一個MapReduce作業(yè)继控,它需要保證文件中各個block的有序性武通,因此它的最小數(shù)據(jù)切分粒度是文件,也就是說尾菇,一個文件不能被切分成不同部分讓多個任務(wù)并行拷貝错沽,最小只能做到一個文件交給一個任務(wù)千埃。

1.使用樣例

1.1參數(shù)介紹

標識 描述 備注
-i 忽略失敗 這個選項會比默認情況提供關(guān)于拷貝的更精確的統(tǒng)計放可, 同時它還將保留失敗拷貝操作的日志,這些日志信息可以用于調(diào)試拾氓。最后咙鞍,如果一個map失敗了续滋,但并沒完成所有分塊任務(wù)的嘗試疲酌,這不會導致整個作業(yè)的失敗。
-log <logdir> 記錄日志到 <logdir> DistCp為每個文件的每次嘗試拷貝操作都記錄日志载绿,并把日志作為map的輸出卢鹦。 如果一個map失敗了,當重新執(zhí)行時這個日志不會被保留揉稚。
-m <num_maps> 同時拷貝的最大數(shù)目 指定了拷貝數(shù)據(jù)時map的數(shù)目余境,默認是20灌诅。請注意并不是map數(shù)越多吞吐量越大。
-overwrite 覆蓋目標 如果一個map失敗并且沒有使用-i選項即舌,不僅僅那些拷貝失敗的文件顽聂,這個分塊任務(wù)中的所有文件都會被重新拷貝紊搪。
-update 如果源和目標的大小不一樣則進行覆蓋 像之前提到的全景,這不是"同步"操作滞伟。 執(zhí)行覆蓋的唯一標準是源文件和目標文件大小是否相同诗良;如果不同鉴裹,則源文件替換目標文件督禽。
-strategy {dynamic/uniformsize} 選擇要在DistCp中使用的復制策略狈惫。 默認情況下胧谈,使用uniformsize(每個map分配均衡的文件數(shù)量)菱肖。如果指定了“dynamic”稳强,則使用DynamicInputFormat(動態(tài)進行map承擔文件數(shù)量的分配,速度快的map分配的文件數(shù)量更多)褒繁。
... ... ...

1.2文件復制

hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo

將nn1上面的foo/bar文件復制導nn2的bar/foo路徑下面澜汤。

1.3文件夾復制

hadoop distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target

將nn1上面文件夾source/first和source/second下面的文件復制到nn2的target文件夾下面俊抵。

2.源碼分析

2.1源碼宏觀流程

distcp宏觀流程圖.jpg

2.2源碼細節(jié)分析

2.2.1細節(jié)主要的調(diào)用圖

distcp.jpg

2.2.2步驟分析

  • shell入口
    進入hadoop文件
......
elif [ "$COMMAND" = "distcp" ] ; then
      CLASS=org.apache.hadoop.tools.DistCp
      CLASSPATH=${CLASSPATH}:${TOOL_PATH}
......
# Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"

export CLASSPATH=$CLASSPATH
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
;;
......
  • distcp的main方法
    Distcp是一個Tool、ToolRunner應(yīng)用谎替,Tool應(yīng)用要求實現(xiàn)run方法钱贯。進入main方法主要有構(gòu)造器構(gòu)造distcp類秩命;建立Cleanup,并加入ShutdownHookManager袄友;最后運行ToolRunner.run執(zhí)行復制功能剧蚣。
public class DistCp extends Configured implements Tool {
    ......
    public int run(String[] argv) {
        ......
    }
    ......
    ......
    public static void main(String argv[]) {
        System.out.println("test");
        int exitCode;
        try {
          DistCp distCp = new DistCp();
          Cleanup CLEANUP = new Cleanup(distCp);
    
          ShutdownHookManager.get().addShutdownHook(CLEANUP,
            SHUTDOWN_HOOK_PRIORITY);
          exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
        }
        catch (Exception e) {
          LOG.error("Couldn't complete DistCp operation: ", e);
          exitCode = DistCpConstants.UNKNOWN_ERROR;
        }
        System.exit(exitCode);
      }
    ......
}
  • run方法
    OptionsParser類是distcp單獨實現(xiàn)的參數(shù)解析工具類鸠按。將輸入?yún)?shù)解析成DistCpOptions inputOptions類型。如常見的參數(shù)overwrite = false等等饶碘。程序中相關(guān)參數(shù)都有特定的默認值待诅,比如map數(shù)量的默認值是20,分配策略類的默認方式是均衡分配等等熊镣。
    setTargetPathExists():從參數(shù)中解析出目標路徑。
    execute():核心執(zhí)行方法募书。
@Override
  public int run(String[] argv) {
    if (argv.length < 1) {
      OptionsParser.usage();
      return DistCpConstants.INVALID_ARGUMENT;
    }
    
    try {
      inputOptions = (OptionsParser.parse(argv));
      setTargetPathExists();
      LOG.info("Input Options: " + inputOptions);
    } catch (Throwable e) {
      LOG.error("Invalid arguments: ", e);
      System.err.println("Invalid arguments: " + e.getMessage());
      OptionsParser.usage();      
      return DistCpConstants.INVALID_ARGUMENT;
    }
    
    try {
      execute();
    } catch (InvalidInputException e) {
      LOG.error("Invalid input: ", e);
      return DistCpConstants.INVALID_ARGUMENT;
    } catch (DuplicateFileException e) {
      LOG.error("Duplicate files in input path: ", e);
      return DistCpConstants.DUPLICATE_INPUT;
    } catch (AclsNotSupportedException e) {
      LOG.error("ACLs not supported on at least one file system: ", e);
      return DistCpConstants.ACLS_NOT_SUPPORTED;
    } catch (XAttrsNotSupportedException e) {
      LOG.error("XAttrs not supported on at least one file system: ", e);
      return DistCpConstants.XATTRS_NOT_SUPPORTED;
    } catch (Exception e) {
      LOG.error("Exception encountered ", e);
      return DistCpConstants.UNKNOWN_ERROR;
    }
    return DistCpConstants.SUCCESS;
  }
  • execute()
    在execute()方法中绪囱,會調(diào)用createAndSubmitJob()創(chuàng)建MR任務(wù),準備數(shù)據(jù)莹捡,設(shè)定數(shù)據(jù)輸入格式,并把任務(wù)提交到hadoop集群運行,最后等待任務(wù)執(zhí)行完畢。于是我們可以看到,主體功能實現(xiàn)就在createAndSubmitJob()這個函數(shù)體里,工程中其它的各個類無非都是為這個函數(shù)接口服務(wù)的。
public Job execute() throws Exception {
    Job job = createAndSubmitJob();

    if (inputOptions.shouldBlock()) {
      waitForJobCompletion(job);
    }
    return job;
  }
public Job createAndSubmitJob() throws Exception {
    assert inputOptions != null;
    assert getConf() != null;
    Job job = null;
    try {
      synchronized(this) {
        //Don't cleanup while we are setting up.
        metaFolder = createMetaFolderPath();
        jobFS = metaFolder.getFileSystem(getConf());
        job = createJob();
      }
      if (inputOptions.shouldUseDiff()) {
        if (!DistCpSync.sync(inputOptions, getConf())) {
          inputOptions.disableUsingDiff();
        }
      }
      createInputFileListing(job);

      job.submit();
      submitted = true;
    } finally {
      if (!submitted) {
        cleanup();
      }
    }

    String jobID = job.getJobID().toString();
    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
    LOG.info("DistCp job-id: " + jobID);

    return job;
  }
  • metaFolder
    一個Path類型费尽。其存放著distcp工具需要的元數(shù)據(jù)信息,也就是所有需要拷貝的源目錄/文件信息列表缘缚。這些數(shù)據(jù)在一個fileList.seq文件中以Key/Value結(jié)構(gòu)進行保存齐媒,其中Key是源文件的Text格式的相對路徑望蜡,而Value則記錄源文件的FileStatus格式的org.apache.hadoop.fs.FileStatus信息状您,這里FileStatus是hadoop已經(jīng)封裝好了的描述HDFS文件信息的類,metafolder目錄中的fileList.seq最終會作為參數(shù)傳遞給MR任務(wù)中的Mapper魁淳。
  private Path createMetaFolderPath() throws Exception {
    Configuration configuration = getConf();
    Path stagingDir = JobSubmissionFiles.getStagingDir(
            new Cluster(configuration), configuration);
    Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
    if (LOG.isDebugEnabled())
      LOG.debug("Meta folder location: " + metaFolderPath);
    configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    
    return metaFolderPath;
  }
  • job = createJob()
    創(chuàng)建MR job的地方。
  private Job createJob() throws IOException {
    String jobName = "distcp";
    String userChosenName = getConf().get(JobContext.JOB_NAME);
    if (userChosenName != null)
      jobName += ": " + userChosenName;
    Job job = Job.getInstance(getConf());
    job.setJobName(jobName);
    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
    job.setJarByClass(CopyMapper.class);
    configureOutputFormat(job);

    job.setMapperClass(CopyMapper.class);
    job.setNumReduceTasks(0);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(CopyOutputFormat.class);
    job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
    job.getConfiguration().set(JobContext.NUM_MAPS,
                  String.valueOf(inputOptions.getMaxMaps()));

    if (inputOptions.getSslConfigurationFile() != null) {
      setupSSLConfig(job);
    }

    inputOptions.appendToConf(job.getConfiguration());
    return job;
  }

重點看這兩行代碼,job.setInputFormatClass主要是設(shè)定job中任務(wù)的分配策略,分為UniformSizeInputFormat和DynamicInputFormat兩種,UniformSizeInputFormat表示均衡分配任務(wù),也就是設(shè)定的map中挑辆,每個map分配同樣的任務(wù)數(shù)魁亦,DynamicInputFormat表示動態(tài)分為任務(wù)書利术,也就是動態(tài)的根據(jù)每個map運行的速度來分為具體map的任務(wù)數(shù);job.setMapperClass主要設(shè)定map任務(wù)的具體邏輯肠虽。

job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setMapperClass(CopyMapper.class)
  • CopyMapper類的源碼
    拷貝工作實際做的地方韩玩,主要看setup()合愈、map()方法
public void setup(Context context) throws IOException, InterruptedException {
    conf = context.getConfiguration();

    syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
    ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
    skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
    overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
    append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
    preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
        PRESERVE_STATUS.getConfigLabel()));

    targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
    Path targetFinalPath = new Path(conf.get(
            DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
    targetFS = targetFinalPath.getFileSystem(conf);

    if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) {
      overWrite = true; // When target is an existing file, overwrite it.
    }

    if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
      initializeSSLConf(context);
    }
  }
  public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
          Context context) throws IOException, InterruptedException {
    Path sourcePath = sourceFileStatus.getPath();

    if (LOG.isDebugEnabled())
      LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);

    Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
                          targetFS.getWorkingDirectory()) + relPath.toString());

    EnumSet<DistCpOptions.FileAttribute> fileAttributes
            = getFileAttributeSettings(context);
    final boolean preserveRawXattrs = context.getConfiguration().getBoolean(
        DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);

    final String description = "Copying " + sourcePath + " to " + target;
    context.setStatus(description);

    LOG.info(description);

    try {
      CopyListingFileStatus sourceCurrStatus;
      FileSystem sourceFS;
      try {
        sourceFS = sourcePath.getFileSystem(conf);
        final boolean preserveXAttrs =
            fileAttributes.contains(FileAttribute.XATTR);
        sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
          sourceFS.getFileStatus(sourcePath),
          fileAttributes.contains(FileAttribute.ACL), 
          preserveXAttrs, preserveRawXattrs);
      } catch (FileNotFoundException e) {
        throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
      }

      FileStatus targetStatus = null;

      try {
        targetStatus = targetFS.getFileStatus(target);
      } catch (FileNotFoundException ignore) {
        if (LOG.isDebugEnabled())
          LOG.debug("Path could not be found: " + target, ignore);
      }

      if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
        throw new IOException("Can't replace " + target + ". Target is " +
            getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
      }

      if (sourceCurrStatus.isDirectory()) {
        createTargetDirsWithRetry(description, target, context);
        return;
      }

      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
      if (action == FileAction.SKIP) {
        LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
                 + " to " + target);
        updateSkipCounters(context, sourceCurrStatus);
        context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
      } else {
        copyFileWithRetry(description, sourceCurrStatus, target, context,
            action, fileAttributes);
      }

      DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
          fileAttributes, preserveRawXattrs);
    } catch (IOException exception) {
      handleFailures(exception, sourceFileStatus, target, context);
    }
  }
  • copyFileWithRetry
    拷貝動作,這個函數(shù)最底層調(diào)用的是常用的Java輸入輸出流的方式,以此方式來完成點對點拷貝。即copyToFile里面的copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE, context);方法。
private void copyFileWithRetry(String description,
      FileStatus sourceFileStatus, Path target, Context context,
      FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
      throws IOException {
    long bytesCopied;
    try {
      bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
          action).execute(sourceFileStatus, target, context, fileAttributes);
    } catch (Exception e) {
      context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
      throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
          " --> " + target, e);
    }
    incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
    incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
    incrementCounter(context, Counter.COPY, 1);
  }

至此多搀,拷貝的job任務(wù)完成設(shè)定、提交以及執(zhí)行灾部,也就意味著distcp命令執(zhí)行完成康铭,即實現(xiàn)了跨集群的文件拷貝。

文本資源來自于互聯(lián)網(wǎng)和書本整理赌髓,僅供學習从藤,有侵權(quán)聯(lián)系刪除。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末锁蠕,一起剝皮案震驚了整個濱河市夷野,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌匿沛,老刑警劉巖扫责,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件榛鼎,死亡現(xiàn)場離奇詭異逃呼,居然都是意外死亡,警方通過查閱死者的電腦和手機者娱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門抡笼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人黄鳍,你說我怎么就攤上這事推姻。” “怎么了框沟?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵藏古,是天一觀的道長增炭。 經(jīng)常有香客問我,道長拧晕,這世上最難降的妖魔是什么隙姿? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮厂捞,結(jié)果婚禮上输玷,老公的妹妹穿的比我還像新娘。我一直安慰自己靡馁,他們只是感情好欲鹏,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著臭墨,像睡著了一般赔嚎。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上裙犹,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天尽狠,我揣著相機與錄音,去河邊找鬼叶圃。 笑死袄膏,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的掺冠。 我是一名探鬼主播沉馆,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼德崭!你這毒婦竟也來了斥黑?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤眉厨,失蹤者是張志新(化名)和其女友劉穎锌奴,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體憾股,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡鹿蜀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了服球。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片茴恰。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖斩熊,靈堂內(nèi)的尸體忽然破棺而出往枣,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布分冈,位于F島的核電站圾另,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏雕沉。R本人自食惡果不足惜盯捌,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蘑秽。 院中可真熱鬧饺著,春花似錦、人聲如沸肠牲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽缀雳。三九已至渡嚣,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間肥印,已是汗流浹背识椰。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留深碱,地道東北人腹鹉。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像敷硅,于是被迫代替她去往敵國和親功咒。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容

  • 先思考問題 我們處在一個大數(shù)據(jù)的時代已經(jīng)是不爭的事實绞蹦,這主要表現(xiàn)在數(shù)據(jù)源多且大力奋,如互聯(lián)網(wǎng)數(shù)據(jù),人們也認識到數(shù)據(jù)里往...
    墻角兒的花閱讀 7,359評論 0 9
  • Zookeeper用于集群主備切換幽七。 YARN讓集群具備更好的擴展性景殷。 Spark沒有存儲能力。 Spark的Ma...
    Yobhel閱讀 7,267評論 0 34
  • Hadoop MapReduce 整個MR的過程可以分解為下面幾步 讀取數(shù)據(jù) Map reduce output ...
    流浪山人閱讀 601評論 0 1
  • 為什么要有Hadoop澡屡? 從計算機誕生到現(xiàn)今猿挚,積累了海量的數(shù)據(jù),這些海量的數(shù)據(jù)有結(jié)構(gòu)化挪蹭、半結(jié)構(gòu)化亭饵、非 結(jié)構(gòu)的數(shù)據(jù)...
    _Levi__閱讀 753評論 1 0
  • Hadoop部署方式 本地模式 偽分布模式(在一臺機器中模擬休偶,讓所有進程在一臺機器上運行) 集群模式 服務(wù)器只是一...
    陳半仙兒閱讀 1,609評論 0 9