[Preface] In large Hadoop clusters that run in production for a long time, disk usage across DataNodes often becomes unevenly distributed, especially after nodes are added or decommissioned, or after replica counts are changed by hand. Uneven usage forces compute engines to copy data across nodes frequently (a task running on node A needs data stored on other nodes), which wastes time and network bandwidth. In addition, when some nodes are heavily used but not yet full (around 90%), tasks scheduled on those nodes risk failing. A balancing strategy that evens out space usage across the cluster is therefore essential.
I. The balancer command in detail
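hdfs --config /hadoop-client/conf balancer \
    -threshold 10 \
    -policy datanode \
    -exclude -f /tmp/ip1.txt \
    -include -f /tmp/ip2.txt \
    -idleiterations 5
Options:
-threshold 10: the balancing condition; the allowed difference, in percentage points, between a DataNode's disk utilization and the cluster average (accepted range 1~100, default 10)
-policy datanode: the balancing policy; the default, datanode, balances at DataNode level (the alternative is blockpool)
-exclude -f /tmp/ip1.txt: empty by default; the listed hosts do not take part in balancing (-f means the argument is a file)
-include -f /tmp/ip2.txt: empty by default; only the listed hosts take part in balancing (-f means the argument is a file)
-idleiterations 5: the maximum number of consecutive iterations in which no block is moved before the balancer exits (default 5)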
Bandwidth used for data movement between DataNodes during balancing (set in /hadoop-client/conf/hdfs-site.xml; changes take effect only after restarting HDFS):
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>6250000</value>
</property>
<Note: 6250000 / (1024 * 1024) ≈ 5.96 MB/s, i.e. about 6 MB/s>
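To raise the bandwidth dynamically (no restart needed; run it as the hdfs user; do not set it too high, or balancing will take bandwidth away from MapReduce jobs; the new value is not persisted, so DataNodes fall back to dfs.datanode.balance.bandwidthPerSec after a restart):
hdfs dfsadmin -fs hdfs://<active-namenode-hostname>:8020 -setBalancerBandwidth 104857600
<Note: 104857600 / (1024 * 1024) = 100 MB/s>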
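Both bandwidth settings take a plain bytes-per-second number, so it is easy to misplace a factor of 1024. The helper below is a hypothetical sketch (BandwidthMath is not part of Hadoop) that converts between bytes/s and MiB/s for the two values used above.

```java
import java.util.Locale;

// Hypothetical helper (not part of Hadoop): converts the bytes-per-second values
// used by dfs.datanode.balance.bandwidthPerSec and -setBalancerBandwidth.
class BandwidthMath {
    static final long MIB = 1024L * 1024L;

    /** Bytes per second -> MiB per second. */
    static double toMiBPerSec(long bytesPerSec) {
        return (double) bytesPerSec / MIB;
    }

    /** Whole MiB per second -> the bytes-per-second value the setting expects. */
    static long fromMiBPerSec(long mibPerSec) {
        return mibPerSec * MIB;
    }

    public static void main(String[] args) {
        // Locale.ROOT keeps '.' as the decimal separator regardless of system locale.
        System.out.println(String.format(Locale.ROOT, "6250000 B/s = %.2f MiB/s", toMiBPerSec(6_250_000L)));
        System.out.println(String.format(Locale.ROOT, "%d B/s = 100 MiB/s", fromMiBPerSec(100)));
    }
}
```

Running main prints "6250000 B/s = 5.96 MiB/s" and "104857600 B/s = 100 MiB/s".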
The balancer exits automatically as soon as any of the following conditions holds:
* The cluster is balanced;
* No block can be moved;
* No block has been moved for specified consecutive iterations (5 by default);
* An IOException occurs while communicating with the namenode;
* Another balancer is running.
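The "consecutive iterations" condition is easy to misread as a total iteration limit. The following is a simplified, hypothetical model of the outer loop (BalancerLoopSketch and its -1 "balanced" convention are assumptions, not Hadoop code): any iteration that moves data resets the idle counter, so only back-to-back idle iterations lead to an exit.

```java
import java.util.function.LongSupplier;

// Simplified, hypothetical model of the balancer's outer loop; not Hadoop source.
class BalancerLoopSketch {
    enum Exit { BALANCED, NO_MOVE_PROGRESS }

    /**
     * Runs iterations until the cluster is balanced or too many consecutive
     * iterations move nothing.
     *
     * @param runOneIteration   returns the bytes moved in one iteration, or -1 once
     *                          the cluster is balanced (a convention of this sketch)
     * @param maxIdleIterations plays the role of -idleiterations (default 5)
     */
    static Exit run(LongSupplier runOneIteration, int maxIdleIterations) {
        int idle = 0;
        while (true) {
            long moved = runOneIteration.getAsLong();
            if (moved < 0) {
                return Exit.BALANCED;
            }
            // Any progress resets the counter; only consecutive idle iterations count.
            idle = (moved > 0) ? 0 : idle + 1;
            if (idle >= maxIdleIterations) {
                return Exit.NO_MOVE_PROGRESS;
            }
        }
    }
}
```

For example, the sequence 5, 0, 0, 3, 0, 0, 0, 0, 0 with maxIdleIterations = 5 exits with NO_MOVE_PROGRESS only after the fifth zero that follows the 3.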
II. Source code analysis
Package: org.apache.hadoop.hdfs.server.balancer
Selecting the DataNodes that take part in balancing (nodes for which shouldIgnore returns true are skipped):
private boolean shouldIgnore(DatanodeInfo dn) {
// ignore decommissioned nodes
final boolean decommissioned = dn.isDecommissioned();
// ignore decommissioning nodes
final boolean decommissioning = dn.isDecommissionInProgress();
// ignore nodes in exclude list (set by -exclude)
final boolean excluded = Util.isExcluded(excludedNodes, dn);
// ignore nodes not in the include list (if include list is not empty)
// (the include list is set by -include)
final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
if (decommissioned || decommissioning || excluded || notIncluded) {
if (LOG.isTraceEnabled()) {
LOG.trace("Excluding datanode " + dn
+ ": decommissioned=" + decommissioned
+ ", decommissioning=" + decommissioning
+ ", excluded=" + excluded
+ ", notIncluded=" + notIncluded);
}
return true;
}
return false;
}
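The four ignore rules can be exercised in isolation. This is a hypothetical sketch: DnInfo is a stand-in for DatanodeInfo, and hosts are matched by exact string, which is simpler than Hadoop's Util helpers. The key detail it shows is that an empty include list means every node is included.

```java
import java.util.Set;

// Hypothetical stand-in for DatanodeInfo: only the fields shouldIgnore() looks at.
final class DnInfo {
    final String host;
    final boolean decommissioned;
    final boolean decommissioning;

    DnInfo(String host, boolean decommissioned, boolean decommissioning) {
        this.host = host;
        this.decommissioned = decommissioned;
        this.decommissioning = decommissioning;
    }
}

class IgnoreFilterSketch {
    static boolean shouldIgnore(DnInfo dn, Set<String> excludedNodes, Set<String> includedNodes) {
        boolean excluded = excludedNodes.contains(dn.host);
        // An empty include list means "every node is included".
        boolean notIncluded = !includedNodes.isEmpty() && !includedNodes.contains(dn.host);
        return dn.decommissioned || dn.decommissioning || excluded || notIncluded;
    }
}
```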
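Cluster average utilization (computed separately for each storage type): average = totalUsedSpaces * 100 / totalCapacities
totalUsedSpaces: the sum of used space over all participating DataNodes (dfsUsed only, excluding non-DFS used);
totalCapacities: the sum of total capacity over all participating DataNodes (the disk directories configured for each DataNode);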
void initAvgUtilization() {
for(StorageType t : StorageType.asList()) {
final long capacity = totalCapacities.get(t);
if (capacity > 0L) {
final double avg = totalUsedSpaces.get(t)*100.0/capacity;
avgUtilizations.set(t, avg);
}
}
}
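A worked version of the average, as a sketch only: the two-value StorageType enum is a hypothetical subset of Hadoop's, and plain maps replace the balancer's internal counters. As in initAvgUtilization, a storage type with zero total capacity gets no average at all.

```java
import java.util.EnumMap;
import java.util.Map;

class AvgUtilizationSketch {
    // Hypothetical subset of org.apache.hadoop.fs.StorageType.
    enum StorageType { DISK, SSD }

    /** Per storage type: sum(dfsUsed) * 100 / sum(capacity); zero-capacity types are left out. */
    static Map<StorageType, Double> average(Map<StorageType, Long> totalUsed,
                                            Map<StorageType, Long> totalCapacity) {
        Map<StorageType, Double> avg = new EnumMap<>(StorageType.class);
        for (StorageType t : StorageType.values()) {
            long capacity = totalCapacity.getOrDefault(t, 0L);
            if (capacity > 0L) {
                avg.put(t, totalUsed.getOrDefault(t, 0L) * 100.0 / capacity);
            }
        }
        return avg;
    }
}
```

With 30,000 GB used out of 100,000 GB of DISK capacity and no SSD capacity, the result is DISK = 30.0 and no SSD entry.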
Utilization of a single DataNode (per storage type): utilization = dfsUsed * 100.0 / capacity
dfsUsed: DFS space used on this DataNode (dfsUsed only, excluding non-DFS used);
capacity: total capacity of this DataNode (the disk directories configured for it);
Double getUtilization(DatanodeStorageReport r, final StorageType t) {
long capacity = 0L;
long dfsUsed = 0L;
for(StorageReport s : r.getStorageReports()) {
if (s.getStorage().getStorageType() == t) {
capacity += s.getCapacity();
dfsUsed += s.getDfsUsed();
}
}
return capacity == 0L? null: dfsUsed*100.0/capacity;
}
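Difference between a DataNode's utilization and the cluster average: utilizationDiff = utilization - average
Difference between that gap and the threshold: thresholdDiff = |utilizationDiff| - threshold
Space the node should send out or may take in (utilizationDiff is in percentage points, hence the division by 100): maxSize2Move = |utilizationDiff| * capacity / 100
For a target (utilizationDiff < 0), this is first limited by free space: maxSize2Move = Math.min(remaining, maxSize2Move)
For both sources and targets, the result is then capped: maxSize2Move = Math.min(max, maxSize2Move)
remaining: free space left on the DataNode;
max: the maximum amount a single DataNode may move in one balancer iteration (10 GB by default, configurable via dfs.balancer.max-size-to-move);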
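The 10 GB cap applies per iteration, not per run: the balancer keeps iterating until the cluster is balanced or one of the exit conditions above is met, and -idleiterations (default 5) only counts consecutive iterations in which nothing moved. A single run therefore has no fixed 5 * 10 GB = 50 GB limit per DataNode.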
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
for(StorageType t : StorageType.getMovableTypes()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
continue;
}
final double average = policy.getAvgUtilization(t);
if (utilization >= average && !isSource) {
LOG.info(dn + "[" + t + "] has utilization=" + utilization
+ " >= average=" + average
+ " but it is not specified as a source; skipping it.");
continue;
}
final double utilizationDiff = utilization - average;
final long capacity = getCapacity(r, t);
final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
final long maxSize2Move = computeMaxSize2Move(capacity,
getRemaining(r, t), utilizationDiff, maxSizeToMove);
final StorageGroup g;
if (utilizationDiff > 0) {
final Source s = dn.addSource(t, maxSize2Move, dispatcher);
if (thresholdDiff <= 0) { // within threshold
aboveAvgUtilized.add(s);
} else {
overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
overUtilized.add(s);
}
g = s;
} else {
g = dn.addTarget(t, maxSize2Move);
if (thresholdDiff <= 0) { // within threshold
belowAvgUtilized.add(g);
} else {
underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
underUtilized.add(g);
}
}
dispatcher.getStorageGroupMap().put(g);
}
}
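The size calculation can be checked with concrete numbers. This sketch follows the formulas above; the class name and the GB constant are hypothetical, and it is not copied from Hadoop.

```java
class MaxSize2MoveSketch {
    static final long GB = 1024L * 1024 * 1024;

    /** percentage is in percentage points, e.g. 12.5 means 12.5% of capacity. */
    static long percentage2bytes(double percentage, long capacity) {
        if (percentage < 0) {
            throw new IllegalArgumentException("percentage < 0: " + percentage);
        }
        return (long) (percentage * capacity / 100.0);
    }

    static long computeMaxSize2Move(long capacity, long remaining,
                                    double utilizationDiff, long max) {
        long size = percentage2bytes(Math.abs(utilizationDiff), capacity);
        if (utilizationDiff < 0) {
            // Target: it cannot receive more than its free space.
            size = Math.min(remaining, size);
        }
        // Source or target: never more than the per-iteration cap.
        return Math.min(max, size);
    }
}
```

On a 1000 GB node with max = 10 GB: a source 15 points above average could send 150 GB but is capped at 10 GB; a target 0.5 points below average gets 5 GB; a target 20 points below average with only 3 GB free gets 3 GB.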
After the comparison, each DataNode storage group is placed in one of four lists:
overUtilized: utilizationDiff > 0 && thresholdDiff > 0 <utilization above average, by more than the threshold>
aboveAvgUtilized: utilizationDiff > 0 && thresholdDiff <= 0 <utilization above average, by no more than the threshold>
belowAvgUtilized: utilizationDiff <= 0 && thresholdDiff <= 0 <utilization at or below average, by no more than the threshold>
underUtilized: utilizationDiff <= 0 && thresholdDiff > 0 <utilization below average, by more than the threshold>
Nodes that send data out (Source): overUtilized, aboveAvgUtilized
Nodes that can receive data (Target): underUtilized, belowAvgUtilized
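The four-way split mirrors the if/else branches in the loop above. The sketch below (ClassifySketch and Bucket are hypothetical names) reproduces only that decision; it ignores the "not specified as a source" skip and the byte accounting.

```java
class ClassifySketch {
    enum Bucket { OVER_UTILIZED, ABOVE_AVG, BELOW_AVG, UNDER_UTILIZED }

    static Bucket classify(double utilization, double average, double threshold) {
        double utilizationDiff = utilization - average;
        double thresholdDiff = Math.abs(utilizationDiff) - threshold;
        if (utilizationDiff > 0) {
            // Sources.
            return thresholdDiff <= 0 ? Bucket.ABOVE_AVG : Bucket.OVER_UTILIZED;
        }
        // Targets; a node exactly at the average lands here.
        return thresholdDiff <= 0 ? Bucket.BELOW_AVG : Bucket.UNDER_UTILIZED;
    }
}
```

With average = 60 and threshold = 10: 75% is overUtilized, 65% and 70% are aboveAvgUtilized, 55% and 60% are belowAvgUtilized, and 45% is underUtilized.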
Pairing sources with targets (principles: 1. prefer nodes on the same rack, then nodes on other racks, so the three steps below run once per matcher; 2. pairing is one-to-many):
Step 1 [Source -> Target]: each overUtilized datanode => one or more underUtilized datanodes
Step 2 [Source -> Target]: each remaining overUtilized datanode => one or more belowAvgUtilized datanodes
Step 3 [Target -> Source]: each remaining underUtilized datanode (one whose receivable space was not filled in step 1) => one or more aboveAvgUtilized datanodes
/** Decide all <source, target> pairs according to the matcher. */
private void chooseStorageGroups(final Matcher matcher) {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
chooseStorageGroups(overUtilized, underUtilized, matcher);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
/* match each remaining underutilized datanode (target) to
* above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
}
When each <source, target> pair is built, the amount scheduled is the smaller of what the source can still send and what the target can still receive.
The dispatcher then runs the scheduled block moves on its dispatchExecutor thread pool.
private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
final Task task = new Task(target, size);
source.addTask(task);
target.incScheduledSize(task.getSize());
dispatcher.add(source, target);
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+ source.getDisplayName() + " to " + target.getDisplayName());
}
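The one-to-many pairing and the min(source, target) rule can be seen together in a greedy sketch. PairingSketch is hypothetical: it ignores racks, matchers, and actual block selection, and represents each scheduled task as a "source->target:size" string.

```java
import java.util.ArrayList;
import java.util.List;

class PairingSketch {
    // Hypothetical, simplified storage group: a name and bytes still available to move.
    static final class Group {
        final String name;
        long available;

        Group(String name, long available) {
            this.name = name;
            this.available = available;
        }
    }

    /** Greedily pairs each source with one or more targets, scheduling min(source, target) each time. */
    static List<String> pair(List<Group> sources, List<Group> targets) {
        List<String> tasks = new ArrayList<>();
        for (Group s : sources) {
            for (Group t : targets) {
                if (s.available == 0) break;      // this source is fully scheduled
                if (t.available == 0) continue;   // this target is already full
                long size = Math.min(s.available, t.available);
                s.available -= size;
                t.available -= size;
                tasks.add(s.name + "->" + t.name + ":" + size);
            }
        }
        return tasks;
    }
}
```

With sources A = 10, B = 4 and targets X = 6, Y = 6, the tasks are A->X:6, A->Y:4, B->Y:2, and B keeps 2 units unscheduled for a later step or iteration. Calling pair once per list combination (overUtilized/underUtilized, then overUtilized/belowAvgUtilized, and so on) would loosely imitate the three steps.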
[Conclusion]
1. For large HDFS clusters, where servers may be added or decommissioned at any time, the balancer should run as a long-lived background process;
2. As officially recommended, run the balancer on a relatively idle server;
3. The balancer is stopped by killing its process. Killing it is usually unnecessary: it stops on its own when it finishes, and if it is started several times, only one instance runs while the others fail immediately;
For ongoing DataNode storage maintenance, consider tuning in the following directions:
* Lower -threshold so that the balancer runs more iterations and each DataNode is allowed to move more data in total;
* Use -exclude and -include to choose a sensible set of servers to balance, e.g. only the 20% of nodes with the lowest usage and the 20% with the highest;
* Choose a sensible -threshold value;
<Note: ideally, a program would detect imbalance and adjust these parameters automatically, without manual intervention>
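As a first step toward that kind of automation, the "lowest 20% and highest 20%" selection can be computed from per-host utilization figures. This is a hypothetical sketch: how the utilization map is collected is left open, and writing the returned hosts one per line into the file passed to -include -f is an assumption about how you would use it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class IncludeListSketch {
    /** Hosts in the bottom and top `percent` of nodes by utilization (at least one per side). */
    static List<String> pickExtremes(Map<String, Double> utilizationByHost, int percent) {
        List<Map.Entry<String, Double>> sorted = new ArrayList<>(utilizationByHost.entrySet());
        sorted.sort(Map.Entry.comparingByValue());   // ascending utilization
        int n = sorted.size();
        int k = (n * percent + 99) / 100;            // integer ceiling of n * percent / 100
        List<String> hosts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            if (i < k || i >= n - k) {               // each index is added at most once
                hosts.add(sorted.get(i).getKey());
            }
        }
        return hosts;
    }
}
```

With five hosts at 10%, 30%, 50%, 70%, and 90% and percent = 20, the result is the 10% host followed by the 90% host.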