DRF算法主要目的是按按照主資源進(jìn)行公平分配
考慮一個(gè)有9個(gè)cpu和18GB的系統(tǒng)凌停,有兩個(gè)用戶:用戶A的每個(gè)任務(wù)都請求(1CPU饿凛,4GB)資源;用戶B的每個(gè)任務(wù)都請求(3CPU瞬欧,1GB)資源颜凯。如何為這種情況構(gòu)建一個(gè)公平分配策略?
假設(shè)A任務(wù)分配X個(gè)實(shí)例暑竟,B任務(wù)分配Y個(gè)實(shí)例斋射,有
A任務(wù)每個(gè)實(shí)例需要的cup占總資源的比例為1/9,內(nèi)存占總資源的比例為4/18但荤,4/18>1/9,所以A的主資源為內(nèi)存罗岖,同理可得B的主資源為CPU,現(xiàn)在讓A任務(wù)分配的主資源內(nèi)存和B任務(wù)分配的主資源CPU 公平腹躁,則由:
通過三個(gè)方程桑包,解得X=3,Y=2
即A類任務(wù)可以啟動3個(gè) 纺非,占用資源為(3,12)
B類任務(wù)可以啟動2個(gè)哑了,占用資源為(6,2)
hadoop對DRF的應(yīng)用
hadoop 2.7 中使用DominantResourceCalculator實(shí)現(xiàn)了cup赘方、memory這兩種主資源的公平調(diào)度
代碼如下
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util.resource;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* A {@link ResourceCalculator} which uses the concept of
* <em>dominant resource</em> to compare multi-dimensional resources.
*
* Essentially the idea is that the in a multi-resource environment,
* the resource allocation should be determined by the dominant share
* of an entity (user or queue), which is the maximum share that the
* entity has been allocated of any resource.
*
* In a nutshell, it seeks to maximize the minimum dominant share across
* all entities.
*
* For example, if user A runs CPU-heavy tasks and user B runs
* memory-heavy tasks, it attempts to equalize CPU share of user A
* with Memory-share of user B.
*
* In the single resource case, it reduces to max-min fairness for that resource.
*
* See the Dominant Resource Fairness paper for more details:
* www.cs.berkeley.edu/~matei/papers/2011/nsdi_drf.pdf
*/
@Private
@Unstable
public class DominantResourceCalculator extends ResourceCalculator {
@Override
public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
if (lhs.equals(rhs)) {
return 0;
}
if (isInvalidDivisor(clusterResource)) {
//除數(shù)為0,cup和memory一大一小
if ((lhs.getMemory() < rhs.getMemory() && lhs.getVirtualCores() > rhs
.getVirtualCores())
|| (lhs.getMemory() > rhs.getMemory() && lhs.getVirtualCores() < rhs
.getVirtualCores())) {
return 0;
} else if (lhs.getMemory() > rhs.getMemory()
|| lhs.getVirtualCores() > rhs.getVirtualCores()) {
return 1;
} else if (lhs.getMemory() < rhs.getMemory()
|| lhs.getVirtualCores() < rhs.getVirtualCores()) {
return -1;
}
}
//比較主資源
float l = getResourceAsValue(clusterResource, lhs, true);
float r = getResourceAsValue(clusterResource, rhs, true);
if (l < r) {
return -1;
} else if (l > r) {
return 1;
} else {
//主資源相等弱左,比較副資源占比
l = getResourceAsValue(clusterResource, lhs, false);
r = getResourceAsValue(clusterResource, rhs, false);
if (l < r) {
return -1;
} else if (l > r) {
return 1;
}
}
return 0;
}
/**
* Use 'dominant' for now since we only have 2 resources - gives us a slight
* performance boost.
*
* Once we add more resources, we'll need a more complicated (and slightly
* less performant algorithm).
*/
protected float getResourceAsValue(
Resource clusterResource, Resource resource, boolean dominant) {
// Just use 'dominant' resource
return (dominant) ?
Math.max(
(float)resource.getMemory() / clusterResource.getMemory(),
(float)resource.getVirtualCores() / clusterResource.getVirtualCores()
)
:
Math.min(
(float)resource.getMemory() / clusterResource.getMemory(),
(float)resource.getVirtualCores() / clusterResource.getVirtualCores()
);
}
@Override
public int computeAvailableContainers(Resource available, Resource required) {
return Math.min(
available.getMemory() / required.getMemory(),
available.getVirtualCores() / required.getVirtualCores());
}
@Override
public float divide(Resource clusterResource,
Resource numerator, Resource denominator) {
return
getResourceAsValue(clusterResource, numerator, true) /
getResourceAsValue(clusterResource, denominator, true);
}
@Override
public boolean isInvalidDivisor(Resource r) {
if (r.getMemory() == 0.0f || r.getVirtualCores() == 0.0f) {
return true;
}
return false;
}
@Override
public float ratio(Resource a, Resource b) {
return Math.max(
(float)a.getMemory()/b.getMemory(),
(float)a.getVirtualCores()/b.getVirtualCores()
);
}
@Override
public Resource divideAndCeil(Resource numerator, int denominator) {
return Resources.createResource(
divideAndCeil(numerator.getMemory(), denominator),
divideAndCeil(numerator.getVirtualCores(), denominator)
);
}
@Override
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) {
int normalizedMemory = Math.min(
roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()),
stepFactor.getMemory()),
maximumResource.getMemory());
int normalizedCores = Math.min(
roundUp(
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
stepFactor.getVirtualCores()),
maximumResource.getVirtualCores());
return Resources.createResource(normalizedMemory,
normalizedCores);
}
@Override
public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource(
roundUp(r.getMemory(), stepFactor.getMemory()),
roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
);
}
@Override
public Resource roundDown(Resource r, Resource stepFactor) {
return Resources.createResource(
roundDown(r.getMemory(), stepFactor.getMemory()),
roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())
);
}
@Override
public Resource multiplyAndNormalizeUp(Resource r, double by,
Resource stepFactor) {
return Resources.createResource(
roundUp(
(int)Math.ceil(r.getMemory() * by), stepFactor.getMemory()),
roundUp(
(int)Math.ceil(r.getVirtualCores() * by),
stepFactor.getVirtualCores())
);
}
@Override
public Resource multiplyAndNormalizeDown(Resource r, double by,
Resource stepFactor) {
return Resources.createResource(
roundDown(
(int)(r.getMemory() * by),
stepFactor.getMemory()
),
roundDown(
(int)(r.getVirtualCores() * by),
stepFactor.getVirtualCores()
)
);
}
}