HDFS balance策略詳解

【前言】線上長(zhǎng)時(shí)間運(yùn)行的大規(guī)模Hadoop集群陆赋,各個(gè)datanode節(jié)點(diǎn)磁盤空間使用率經(jīng)常會(huì)出現(xiàn)分布不均衡的情況撞鹉,尤其在新增和下架節(jié)點(diǎn)痊银、或者人為干預(yù)副本數(shù)量的時(shí)候哩簿。節(jié)點(diǎn)空間使用率不均勻會(huì)導(dǎo)致計(jì)算引擎頻繁在跨節(jié)點(diǎn)拷貝數(shù)據(jù)(A節(jié)點(diǎn)上運(yùn)行的task所需數(shù)據(jù)在其它節(jié)點(diǎn)上),引起不必要的耗時(shí)和帶寬躏升。另外辩棒,當(dāng)部分節(jié)點(diǎn)空間使用率很高但未滿(90%左右)時(shí),分配在該節(jié)點(diǎn)上的task會(huì)存在任務(wù)失敗的風(fēng)險(xiǎn)膨疏。因此一睁,引入balance策略使集群中的節(jié)點(diǎn)空間使用率均勻分布必不可少。

一. balancer命令詳解

hdfs --config /hadoop-client/conf balancer
-threshold  10                    \\集群平衡的條件佃却,datanode間磁盤使用率相差閾值者吁,區(qū)間選擇:0~100
-policy datanode                  \\默認(rèn)為datanode,datanode級(jí)別的平衡策略
-exclude  -f  /tmp/ip1.txt        \\默認(rèn)為空双霍,指定該部分ip不參與balance砚偶, -f:指定輸入為文件
-include  -f  /tmp/ip2.txt        \\默認(rèn)為空,只允許該部分ip參與balance洒闸,-f:指定輸入為文件
 -idleiterations  5               \\迭代次數(shù)染坯,默認(rèn)為 5

hdfs balance時(shí)datanode之間數(shù)據(jù)遷移的帶寬設(shè)置(/hadoop-client/conf/hdfs-site.xml, 修改需重啟hdfs):

<property>
    <name>dfs.datanode.balance.bandwidthPerSec</name>
    <value>6250000</value>
</property>
<備注:6250000 / (1024 * 1024) = 6M/s>

動(dòng)態(tài)增大帶寬(不需重啟,需要切換到hdfs用戶丘逸,不可設(shè)置太大单鹿,會(huì)占用mapreduce任務(wù)的帶寬):

hdfs dfsadmin -fs hdfs://${active-namenode-hostname}:8020 -setBalancerBandwidth 104857600

balance腳本在滿足以下任何一個(gè)條件都會(huì)自動(dòng)退出:

 * The cluster is balanced;
 * No block can be moved;
 * No block has been moved for specified consecutive iterations (5 by default);
 * An IOException occurs while communicating with the namenode;
 * Another balancer is running.

二. 源碼解析

源碼路徑:org.apache.hadoop.hdfs.server.balancer

統(tǒng)計(jì)需要balance的datanode:

  private boolean shouldIgnore(DatanodeInfo dn) {
    // ignore decommissioned nodes (忽略已經(jīng)下架的datanode)
    final boolean decommissioned = dn.isDecommissioned();
    // ignore decommissioning nodes(忽略正在下架的datanode)
    final boolean decommissioning = dn.isDecommissionInProgress();
    // ignore nodes in exclude list (忽略參數(shù):-exclude配置的datanode)
    final boolean excluded = Util.isExcluded(excludedNodes, dn);
    // ignore nodes not in the include list (if include list is not empty)
    // (如果參數(shù):-include不為空,忽略不在include列表里的datanode)
    final boolean notIncluded = !Util.isIncluded(includedNodes, dn);

    if (decommissioned || decommissioning || excluded || notIncluded) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Excluding datanode " + dn
            + ": decommissioned=" + decommissioned
            + ", decommissioning=" + decommissioning
            + ", excluded=" + excluded
            + ", notIncluded=" + notIncluded);
      }
      return true;
    }
    return false;
  }

集群平均使用率(計(jì)算公式):average = totalUsedSpaces * 100 / totalCapacities
totalUsedSpaces:各datanode已使用空間(dfsUsed深纲,不包含non dfsUsed)相加仲锄;
totalCapacities:各datanode總空間(DataNode配置的服務(wù)器磁盤目錄)相加;

  void initAvgUtilization() {
    for(StorageType t : StorageType.asList()) {
      final long capacity = totalCapacities.get(t);
      if (capacity > 0L) {
        final double avg  = totalUsedSpaces.get(t)*100.0/capacity;
        avgUtilizations.set(t, avg);
      }
    }
  }

單個(gè)datanode使用率:utilization = dfsUsed * 100.0 / capacity
dfsUsed:當(dāng)前datanode dfs(dfsUsed湃鹊,不包含non dfsUsed)已使用空間儒喊;
capacity:當(dāng)前datanode(DataNode配置的服務(wù)器磁盤目錄)總空間;

    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
      long capacity = 0L;
      long dfsUsed = 0L;
      for(StorageReport s : r.getStorageReports()) {
        if (s.getStorage().getStorageType() == t) {
          capacity += s.getCapacity();
          dfsUsed += s.getDfsUsed();
        }
      }
      return capacity == 0L? null: dfsUsed*100.0/capacity;
    }

單個(gè)datanode使用率與集群平均使用率差值:utilizationDiff = utilization - average
單個(gè)datanode utilizationDiff與閾值的差值: thresholdDiff = |utilizationDiff| - threshold

需要遷移或者可以遷入的空間:maxSize2Move = |utilizationDiff| * capacity

可以遷入的空間計(jì)算:Math.min(remaining, maxSizeToMove)
需要遷移的空間計(jì)算:Math.min(max, maxSizeToMove)
remaining:datanode節(jié)點(diǎn)剩余空間
max:默認(rèn)單個(gè)datanode單次balance迭代可以遷移的最大空間限制币呵,缺省10G)
默認(rèn)迭代次數(shù)為5怀愧,即運(yùn)行一次balance腳本,單個(gè)datanode可以最大遷移的空間為:5*10G = 50G

    for(DatanodeStorageReport r : reports) {
      final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
      final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
      for(StorageType t : StorageType.getMovableTypes()) {
        final Double utilization = policy.getUtilization(r, t);
        if (utilization == null) { // datanode does not have such storage type 
          continue;
        }
        
        final double average = policy.getAvgUtilization(t);
        if (utilization >= average && !isSource) {
          LOG.info(dn + "[" + t + "] has utilization=" + utilization
              + " >= average=" + average
              + " but it is not specified as a source; skipping it.");
          continue;
        }

        final double utilizationDiff = utilization - average;
        final long capacity = getCapacity(r, t);
        final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
        final long maxSize2Move = computeMaxSize2Move(capacity,
            getRemaining(r, t), utilizationDiff, maxSizeToMove);

        final StorageGroup g;
        if (utilizationDiff > 0) {
          final Source s = dn.addSource(t, maxSize2Move, dispatcher);
          if (thresholdDiff <= 0) { // within threshold
            aboveAvgUtilized.add(s);
          } else {
            overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
            overUtilized.add(s);
          }
          g = s;
        } else {
          g = dn.addTarget(t, maxSize2Move);
          if (thresholdDiff <= 0) { // within threshold
            belowAvgUtilized.add(g);
          } else {
            underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
            underUtilized.add(g);
          }
        }
        dispatcher.getStorageGroupMap().put(g);
      }

差值判斷后datanode的保存隊(duì)列:

overUtilized:utilizationDiff > 0 && thresholdDiff > 0        <使用率超過(guò)平均值余赢,且差值大于閾值>
aboveAvgUtilized:utilizationDiff > 0 && thresholdDiff <= 0   <使用率超過(guò)平均值芯义,且差值小于等于閾值>
belowAvgUtilized:utilizationDiff < 0 && thresholdDiff <= 0   <使用率低于平均值,且差值小于等于閾值>
underUtilized:utilizationDiff > 0 && thresholdDiff > 0       <使用率低于平均值妻柒,且差值大于等于閾值>

需要遷移數(shù)據(jù)的節(jié)點(diǎn)(Source類型): overUtilized, aboveAvgUtilized
能夠遷入數(shù)據(jù)的節(jié)點(diǎn)(Target類型): underUtilized, belowAvgUtilized

數(shù)據(jù)遷移配對(duì)(原則:1. 優(yōu)先為同機(jī)架扛拨,其次為其它機(jī)架; 2. 一對(duì)多配對(duì)):
第一步[Source -> Target]:each overUtilized datanode => one or more underUtilized datanodes
第二步[Source -> Target]:match each remaining overutilized datanode => one or more belowAvgUtilized datanodes
第三步[Target -> Source]:each remaining underutilized datanode (step 1未和overUtilized匹配過(guò)) => one or more aboveAvgUtilized datanodes

  /** Decide all <source, target> pairs according to the matcher. */
  private void chooseStorageGroups(final Matcher matcher) {
    /* first step: match each overUtilized datanode (source) to
     * one or more underUtilized datanodes (targets).
     */
    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
    chooseStorageGroups(overUtilized, underUtilized, matcher);
    
    /* match each remaining overutilized datanode (source) to 
     * below average utilized datanodes (targets).
     * Note only overutilized datanodes that haven't had that max bytes to move
     * satisfied in step 1 are selected
     */
    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
    chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);

    /* match each remaining underutilized datanode (target) to 
     * above average utilized datanodes (source).
     * Note only underutilized datanodes that have not had that max bytes to
     * move satisfied in step 1 are selected.
     */
    LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
    chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
  }

構(gòu)建每一對(duì)<source, target>時(shí),需要計(jì)算當(dāng)前可以遷移或者遷入的空間大小举塔。
dispatcher創(chuàng)建dispatchExecutor線程池執(zhí)行數(shù)據(jù)遷移調(diào)度绑警。

  private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
    long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
    final Task task = new Task(target, size);
    source.addTask(task);
    target.incScheduledSize(task.getSize());
    dispatcher.add(source, target);
    LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
        + source.getDisplayName() + " to " + target.getDisplayName());
  }

【結(jié)語(yǔ)】
1. 對(duì)于一些大型的HDFS集群(隨時(shí)可能擴(kuò)容或下架服務(wù)器),balance腳本需要作為后臺(tái)常駐進(jìn)程啤贩;
2. 根據(jù)官方建議待秃,腳本需要部署在相對(duì)空閑的服務(wù)器上;
3. 停止腳本通過(guò)kill進(jìn)程實(shí)現(xiàn)(建議不kill痹屹,后臺(tái)運(yùn)行完會(huì)自動(dòng)停止章郁,多次執(zhí)行同時(shí)也只會(huì)有一個(gè)線程存在,其它自動(dòng)失斨狙堋)暖庄;

針對(duì)datanode存儲(chǔ)維護(hù),可以針對(duì)以下幾個(gè)方向進(jìn)行優(yōu)化:
* 通過(guò)參數(shù)(threshold)增加迭代次數(shù)楼肪,以增加datanode允許遷移的數(shù)據(jù)培廓;   
* 通過(guò)參數(shù)(exclude, include)設(shè)計(jì)合理的允許進(jìn)行balance策略的服務(wù)器,比如將使用率最低(20%)和最高(20%)的進(jìn)行balance策略;
* 通過(guò)參數(shù)(threshold )設(shè)計(jì)合理的閾值;
<備注:理想狀態(tài)能夠通過(guò)程序自動(dòng)發(fā)現(xiàn)調(diào)整參數(shù)春叫,無(wú)需人為介入>

博客主頁(yè):http://www.reibang.com/u/e97bb429f278

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末肩钠,一起剝皮案震驚了整個(gè)濱河市泣港,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌价匠,老刑警劉巖当纱,帶你破解...
    沈念sama閱讀 217,542評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異踩窖,居然都是意外死亡坡氯,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門洋腮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)箫柳,“玉大人,你說(shuō)我怎么就攤上這事啥供∶趸校” “怎么了?”我有些...
    開封第一講書人閱讀 163,912評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵伙狐,是天一觀的道長(zhǎng)坪稽。 經(jīng)常有香客問(wèn)我,道長(zhǎng)鳞骤,這世上最難降的妖魔是什么窒百? 我笑而不...
    開封第一講書人閱讀 58,449評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮豫尽,結(jié)果婚禮上篙梢,老公的妹妹穿的比我還像新娘。我一直安慰自己美旧,他們只是感情好渤滞,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,500評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著榴嗅,像睡著了一般妄呕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上嗽测,一...
    開封第一講書人閱讀 51,370評(píng)論 1 302
  • 那天绪励,我揣著相機(jī)與錄音,去河邊找鬼唠粥。 笑死疏魏,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的晤愧。 我是一名探鬼主播大莫,決...
    沈念sama閱讀 40,193評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼官份!你這毒婦竟也來(lái)了只厘?” 一聲冷哼從身側(cè)響起烙丛,我...
    開封第一講書人閱讀 39,074評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎羔味,沒想到半個(gè)月后蜀变,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,505評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡介评,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,722評(píng)論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了爬舰。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片们陆。...
    茶點(diǎn)故事閱讀 39,841評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖情屹,靈堂內(nèi)的尸體忽然破棺而出坪仇,到底是詐尸還是另有隱情,我是刑警寧澤垃你,帶...
    沈念sama閱讀 35,569評(píng)論 5 345
  • 正文 年R本政府宣布椅文,位于F島的核電站,受9級(jí)特大地震影響惜颇,放射性物質(zhì)發(fā)生泄漏皆刺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,168評(píng)論 3 328
  • 文/蒙蒙 一凌摄、第九天 我趴在偏房一處隱蔽的房頂上張望羡蛾。 院中可真熱鬧,春花似錦锨亏、人聲如沸痴怨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)浪藻。三九已至,卻和暖如春乾翔,著一層夾襖步出監(jiān)牢的瞬間爱葵,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工反浓, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留钧惧,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,962評(píng)論 2 370
  • 正文 我出身青樓勾习,卻偏偏與公主長(zhǎng)得像浓瞪,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子巧婶,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,781評(píng)論 2 354