Balancer.runOneIteration() -> Dispatcher.dispatchAndCheckContinue()
Dispatcher.dispatchAndCheckContinue() -> dispatchBlockMoves()
1. dispatchBlockMoves()
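Block moves are handled per source. For each source, a thread selects the blocks to move and sends a request to the proxy source to initiate each move. The process is flow-controlled: if too many unconfirmed block moves are outstanding, block selection is blocked.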
final long bytesLastMoved = getBytesMoved();
// a java.util.concurrent.Future is used to wait for a task and retrieve its result
final Future<?>[] futures = new Future<?>[sources.size()];
// sources is a HashSet<Source>
final Iterator<Source> i = sources.iterator();
for (int j = 0; j < futures.length; j++) {
  final Source s = i.next();
  // dispatchExecutor is an ExecutorService; ExecutorService.submit() returns a Future
  futures[j] = dispatchExecutor.submit(new Runnable() {
    @Override
    public void run() {
      s.dispatchBlocks();
    }
  });
}
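The submit-and-collect pattern above can be tried outside Hadoop. What follows is only a minimal sketch under stated assumptions: `DispatchSketch`, `FakeSource` and `dispatchAll` are hypothetical names invented for illustration, not Hadoop classes, and the "move" just adds to a counter. It shows one task submitted per source and the caller blocking on every `Future` before reading the total.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

class DispatchSketch {
  /** Hypothetical stand-in for a source node; not the Hadoop Source class. */
  static final class FakeSource {
    private final long bytes;

    FakeSource(long bytes) {
      this.bytes = bytes;
    }

    /** Pretends to move blocks by adding this source's bytes to the shared counter. */
    void dispatchBlocks(AtomicLong bytesMoved) {
      bytesMoved.addAndGet(bytes);
    }
  }

  /** Submits one task per source, waits for all of them, and returns the bytes moved. */
  static long dispatchAll(List<FakeSource> sources, int threads) {
    final AtomicLong bytesMoved = new AtomicLong();
    final ExecutorService executor = Executors.newFixedThreadPool(threads);
    try {
      final List<Future<?>> futures = new ArrayList<>();
      for (final FakeSource s : sources) {
        // submit(Runnable) returns a Future<?> that completes when run() returns
        futures.add(executor.submit(() -> s.dispatchBlocks(bytesMoved)));
      }
      for (Future<?> f : futures) {
        try {
          f.get(); // blocks until this source's task has finished
        } catch (ExecutionException e) {
          System.err.println("dispatch task failed: " + e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the flag and stop waiting
          break;
        }
      }
    } finally {
      executor.shutdown();
    }
    return bytesMoved.get();
  }
}
```

Calling `DispatchSketch.dispatchAll(sources, 2)` with three fake sources of 10, 20 and 12 bytes returns 42 once all three tasks have completed. The counter is an `AtomicLong` because the tasks update it from different pool threads.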
2. dispatchBlocks()
This method repeats the following steps: first it chooses a block to move, then it sends a request to the proxy source.
// Time.monotonicNow() is based on System.nanoTime() but returns milliseconds
final long startTime = Time.monotonicNow();
this.blocksToReceive = 2 * getScheduledSize();
boolean isTimeUp = false;
int noPendingMoveIteration = 0;
while (!isTimeUp && getScheduledSize() > 0
    && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
  final PendingMove p = chooseNextMove();
  if (p != null) {
    // Reset no pending move counter
    noPendingMoveIteration = 0;
    executePendingMove(p);
    continue;
  }
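The excerpt stops before the part of the loop that handles the case where no move was chosen. The general shape, a loop bounded both by elapsed time and by a count of idle iterations, can be sketched on its own. This is an illustration under assumptions, not the Hadoop code: `BoundedLoopSketch`, `drain`, `maxMillis` and `maxIdle` are hypothetical names, and the clock is passed in as a `LongSupplier` so the behaviour is deterministic.

```java
import java.util.Deque;
import java.util.function.LongSupplier;

class BoundedLoopSketch {
  /** Millisecond reading derived from System.nanoTime(), in the spirit of a monotonic clock. */
  static long monotonicNowMillis() {
    return System.nanoTime() / 1_000_000L;
  }

  /**
   * Takes items from the queue until the time budget is used up or the
   * loop has been idle for maxIdle consecutive iterations.
   * Returns how many items were processed.
   */
  static int drain(Deque<Long> pending, long maxMillis, int maxIdle, LongSupplier clock) {
    final long startTime = clock.getAsLong();
    boolean isTimeUp = false;
    int idleIterations = 0;
    int executed = 0;
    while (!isTimeUp && idleIterations < maxIdle) {
      final Long next = pending.poll(); // null when nothing is available
      if (next != null) {
        idleIterations = 0; // progress was made, so reset the idle counter
        executed++;
        continue;
      }
      idleIterations++;
      if (clock.getAsLong() - startTime > maxMillis) {
        isTimeUp = true;
      }
    }
    return executed;
  }
}
```

In real use the clock argument would be `BoundedLoopSketch::monotonicNowMillis`. A monotonic source is preferable to `System.currentTimeMillis()` for measuring elapsed time because it is not affected by wall-clock adjustments.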
3. chooseNextMove()
for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
  final Task task = i.next();
  final DDatanode target = task.target.getDDatanode();
  final PendingMove pendingBlock = new PendingMove(this, task.target);
  if (target.addPendingBlock(pendingBlock)) {
    // target is not busy, so do a tentative block allocation
    if (pendingBlock.chooseBlockAndProxy()) {
      long blockSize = pendingBlock.block.getNumBytes();
      incScheduledSize(-blockSize);
      task.size -= blockSize;
      if (task.size == 0) {
        i.remove();
      }
      return pendingBlock;
    } else {
      // cancel the tentative move
      target.removePendingBlock(pendingBlock);
    }
  }
}
return null;
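The key idea in chooseNextMove() is a tentative reservation: a slot on the target is taken first, and released again if no suitable block can be found. The sketch below reproduces only that reserve-then-roll-back pattern. Everything in it is an assumption made for illustration: `TentativeMoveSketch`, `Target`, `Move`, `Task` and the queue of candidate block sizes are hypothetical and much simpler than the Hadoop classes, and the sketch drops a task when its remaining size is `<= 0` rather than exactly 0.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

class TentativeMoveSketch {
  /** Hypothetical target that accepts a limited number of concurrent moves. */
  static final class Target {
    final String name;
    final int maxConcurrentMoves;
    final List<Move> pending = new ArrayList<>();

    Target(String name, int maxConcurrentMoves) {
      this.name = name;
      this.maxConcurrentMoves = maxConcurrentMoves;
    }

    /** Reserves a slot; returns false when the target is busy. */
    boolean addPending(Move m) {
      return pending.size() < maxConcurrentMoves && pending.add(m);
    }

    boolean removePending(Move m) {
      return pending.remove(m);
    }
  }

  /** Hypothetical pending move; choosing a block just takes the next candidate size. */
  static final class Move {
    final Target target;
    long blockSize = -1;

    Move(Target target) {
      this.target = target;
    }

    boolean chooseBlock(Deque<Long> candidates) {
      final Long size = candidates.poll();
      if (size == null) {
        return false; // no candidate block available
      }
      blockSize = size;
      return true;
    }
  }

  static final class Task {
    final Target target;
    long size; // bytes still to be scheduled for this target

    Task(Target target, long size) {
      this.target = target;
      this.size = size;
    }
  }

  /** Returns the first move that could be reserved and filled, or null if none. */
  static Move chooseNextMove(List<Task> tasks, Deque<Long> candidates) {
    for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
      final Task task = i.next();
      final Move move = new Move(task.target);
      if (task.target.addPending(move)) {
        if (move.chooseBlock(candidates)) {
          task.size -= move.blockSize;
          if (task.size <= 0) {
            i.remove(); // this task is fully scheduled
          }
          return move;
        }
        task.target.removePending(move); // cancel the tentative reservation
      }
    }
    return null;
  }
}
```

Reserving before choosing means a busy target is skipped cheaply, and the rollback guarantees that a failed attempt never leaves a slot occupied.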