一般使用hdfs sink都會采用滾動生成文件的方式二跋,hdfs sink滾動生成文件的策略有:
- 基于時間
- 基于文件大小
- 基于hdfs文件副本數(shù)(一般要規(guī)避這種情況)
- 基于event數(shù)量
- 基于文件閑置時間
下面將詳細講解這些策略的配置以及原理
基于時間策略
配置項:hdfs.rollInterval
默認值:30秒
說明:如果設(shè)置為0表示禁用這個策略
原理:
在org.apache.flume.sink.hdfs.BucketWriter.append
方法中打開一個文件淫茵,都會調(diào)用open
方法蓖议,如果設(shè)置了hdfs.rollInterval脉让,那么hdfs.rollInterval秒之內(nèi)只要其他策略沒有關(guān)閉文件,文件會在hdfs.rollInterval秒之后關(guān)閉空繁。
// if time-based rolling is enabled, schedule the roll
if (rollInterval > 0) {
Callable<Void> action = new Callable<Void>() {
public Void call() throws Exception {
LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
bucketPath, rollInterval);
try {
// Roll the file and remove reference from sfWriters map.
close(true);
} catch (Throwable t) {
LOG.error("Unexpected error", t);
}
return null;
}
};
timedRollFuture = timedRollerPool.schedule(action, rollInterval,
TimeUnit.SECONDS);
}
基于文件大小和event數(shù)量策略
配置項:
- 文件大小策略:hdfs.rollSize
- event數(shù)量策略:hdfs.rollCount
默認值:
- 文件大小策略:1024字節(jié)
- event數(shù)量策略:10
說明:如果設(shè)置為0表示禁用這些策略
原理:
這2種策略都是在org.apache.flume.sink.hdfs.BucketWriter.shouldRotate
方法中進行判斷的落塑,只要doRotate
的值為true,那么當前文件就會關(guān)閉榨惠,即滾動到下一個文件奋早。
private boolean shouldRotate() {
boolean doRotate = false;
//判斷文件副本數(shù)
if (writer.isUnderReplicated()) {
this.isUnderReplicated = true;
doRotate = true;
} else {
this.isUnderReplicated = false;
}
//判斷event數(shù)量
if ((rollCount > 0) && (rollCount <= eventCounter)) {
LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
doRotate = true;
}
//判斷文件大小
if ((rollSize > 0) && (rollSize <= processSize)) {
LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
doRotate = true;
}
return doRotate;
}
注意:如果同時配置了時間策略和文件大小策略,那么會先判斷時間赠橙,如果時間沒到再判斷其他的條件耽装。
基于hdfs文件副本數(shù)
配置項:hdfs.minBlockReplicas
默認值:和hdfs的副本數(shù)一致
原理:
從上面的代碼中可以看到,判斷副本數(shù)的關(guān)鍵方法是writer.isUnderReplicated()
期揪,即
public boolean isUnderReplicated() {
try {
int numBlocks = getNumCurrentReplicas();
if (numBlocks == -1) {
return false;
}
int desiredBlocks;
if (configuredMinReplicas != null) {
desiredBlocks = configuredMinReplicas;
} else {
desiredBlocks = getFsDesiredReplication();
}
return numBlocks < desiredBlocks;
} catch (IllegalAccessException e) {
logger.error("Unexpected error while checking replication factor", e);
} catch (InvocationTargetException e) {
logger.error("Unexpected error while checking replication factor", e);
} catch (IllegalArgumentException e) {
logger.error("Unexpected error while checking replication factor", e);
}
return false;
}
也就是說掉奄,如果當前正在寫的文件的副本數(shù)小于hdfs.minBlockReplicas,此方法返回true
横侦,其他情況都返回false
挥萌。假設(shè)這個方法返回true
,那么看一下會發(fā)生什么事情枉侧。
首先就是上面代碼提到的shouldRotate
方法肯定返回的是true
引瀑。再繼續(xù)跟蹤,下面的代碼是關(guān)鍵
// check if it's time to rotate the file
if (shouldRotate()) {
boolean doRotate = true;
if (isUnderReplicated) {
if (maxConsecUnderReplRotations > 0 &&
consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
doRotate = false;
if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
LOG.error("Hit max consecutive under-replication rotations ({}); " +
"will not continue rolling files under this path due to " +
"under-replication", maxConsecUnderReplRotations);
}
} else {
LOG.warn("Block Under-replication detected. Rotating file.");
}
consecutiveUnderReplRotateCount++;
} else {
consecutiveUnderReplRotateCount = 0;
}
if (doRotate) {
close();
open();
}
}
這里maxConsecUnderReplRotations
是固定的值30榨馁,也就是說憨栽,文件滾動生成了30個之后,就不會再滾動了,因為將doRotate
設(shè)置為了false
屑柔。所以屡萤,從這里可以看到,如果isUnderReplicated
方法返回的是true
掸宛,可能會導(dǎo)致文件的滾動和預(yù)期的不一致死陆。規(guī)避這個問題的方法就是將hdfs.minBlockReplicas設(shè)置為1,一般hdfs的副本數(shù)肯定都是大于等于1的唧瘾,所以isUnderReplicated
方法一定會返回false
措译。
所以一般情況下,要規(guī)避這種情況饰序,避免影響文件的正常滾動领虹。
基于文件閑置時間策略
配置項:hdfs.idleTimeout
默認值:0
說明:默認啟動這個功能
這種策略很簡單,如果文件在hdfs.idleTimeout秒的時間里都是閑置的求豫,沒有任何數(shù)據(jù)寫入塌衰,那么當前文件關(guān)閉,滾動到下一個文件蝠嘉。
public synchronized void flush() throws IOException, InterruptedException {
checkAndThrowInterruptedException();
if (!isBatchComplete()) {
doFlush();
if (idleTimeout > 0) {
// if the future exists and couldn't be cancelled, that would mean it has already run
// or been cancelled
if (idleFuture == null || idleFuture.cancel(false)) {
Callable<Void> idleAction = new Callable<Void>() {
public Void call() throws Exception {
LOG.info("Closing idle bucketWriter {} at {}", bucketPath,
System.currentTimeMillis());
if (isOpen) {
close(true);
}
return null;
}
};
idleFuture = timedRollerPool.schedule(idleAction, idleTimeout,
TimeUnit.SECONDS);
}
}
}
}
注:本文使用的源碼版本為flume-1.7.0