Flume HDFS Sink常用配置深度解讀

一般使用hdfs sink都會采用滾動生成文件的方式二跋,hdfs sink滾動生成文件的策略有:

  • 基于時間
  • 基于文件大小
  • 基于hdfs文件副本數(shù)(一般要規(guī)避這種情況)
  • 基于event數(shù)量
  • 基于文件閑置時間

下面將詳細講解這些策略的配置以及原理

基于時間策略

配置項:hdfs.rollInterval
默認值:30秒
說明:如果設(shè)置為0表示禁用這個策略
原理:
org.apache.flume.sink.hdfs.BucketWriter.append方法中打開一個文件淫茵,都會調(diào)用open方法蓖议,如果設(shè)置了hdfs.rollInterval脉让,那么hdfs.rollInterval秒之內(nèi)只要其他策略沒有關(guān)閉文件,文件會在hdfs.rollInterval秒之后關(guān)閉空繁。

// if time-based rolling is enabled, schedule the roll
if (rollInterval > 0) {
  Callable<Void> action = new Callable<Void>() {
    public Void call() throws Exception {
      LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
          bucketPath, rollInterval);
      try {
        // Roll the file and remove reference from sfWriters map.
        close(true);
      } catch (Throwable t) {
        LOG.error("Unexpected error", t);
      }
      return null;
    }
  };
  timedRollFuture = timedRollerPool.schedule(action, rollInterval,
      TimeUnit.SECONDS);
}

基于文件大小和event數(shù)量策略

配置項:

  1. 文件大小策略:hdfs.rollSize
  2. event數(shù)量策略:hdfs.rollCount

默認值:

  1. 文件大小策略:1024字節(jié)
  2. event數(shù)量策略:10

說明:如果設(shè)置為0表示禁用這些策略
原理:
這2種策略都是在org.apache.flume.sink.hdfs.BucketWriter.shouldRotate方法中進行判斷的落塑,只要doRotate的值為true,那么當前文件就會關(guān)閉榨惠,即滾動到下一個文件奋早。

private boolean shouldRotate() {
  boolean doRotate = false;

//判斷文件副本數(shù)
  if (writer.isUnderReplicated()) {
    this.isUnderReplicated = true;
    doRotate = true;
  } else {
    this.isUnderReplicated = false;
  }

//判斷event數(shù)量
  if ((rollCount > 0) && (rollCount <= eventCounter)) {
    LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
    doRotate = true;
  }

//判斷文件大小
  if ((rollSize > 0) && (rollSize <= processSize)) {
    LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
    doRotate = true;
  }

  return doRotate;
}

注意:如果同時配置了時間策略和文件大小策略,那么會先判斷時間赠橙,如果時間沒到再判斷其他的條件耽装。

基于hdfs文件副本數(shù)

配置項:hdfs.minBlockReplicas
默認值:和hdfs的副本數(shù)一致
原理:
從上面的代碼中可以看到,判斷副本數(shù)的關(guān)鍵方法是writer.isUnderReplicated()期揪,即

public boolean isUnderReplicated() {
  try {
    int numBlocks = getNumCurrentReplicas();
    if (numBlocks == -1) {
      return false;
    }
    int desiredBlocks;
    if (configuredMinReplicas != null) {
      desiredBlocks = configuredMinReplicas;
    } else {
      desiredBlocks = getFsDesiredReplication();
    }
    return numBlocks < desiredBlocks;
  } catch (IllegalAccessException e) {
    logger.error("Unexpected error while checking replication factor", e);
  } catch (InvocationTargetException e) {
    logger.error("Unexpected error while checking replication factor", e);
  } catch (IllegalArgumentException e) {
    logger.error("Unexpected error while checking replication factor", e);
  }
  return false;
}

也就是說掉奄,如果當前正在寫的文件的副本數(shù)小于hdfs.minBlockReplicas,此方法返回true横侦,其他情況都返回false挥萌。假設(shè)這個方法返回true,那么看一下會發(fā)生什么事情枉侧。
首先就是上面代碼提到的shouldRotate方法肯定返回的是true引瀑。再繼續(xù)跟蹤,下面的代碼是關(guān)鍵

// check if it's time to rotate the file
if (shouldRotate()) {
  boolean doRotate = true;

  if (isUnderReplicated) {
    if (maxConsecUnderReplRotations > 0 &&
        consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
      doRotate = false;
      if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
        LOG.error("Hit max consecutive under-replication rotations ({}); " +
            "will not continue rolling files under this path due to " +
            "under-replication", maxConsecUnderReplRotations);
      }
    } else {
      LOG.warn("Block Under-replication detected. Rotating file.");
    }
    consecutiveUnderReplRotateCount++;
  } else {
    consecutiveUnderReplRotateCount = 0;
  }

  if (doRotate) {
    close();
    open();
  }
}

這里maxConsecUnderReplRotations是固定的值30榨馁,也就是說憨栽,文件滾動生成了30個之后,就不會再滾動了,因為將doRotate設(shè)置為了false屑柔。所以屡萤,從這里可以看到,如果isUnderReplicated方法返回的是true掸宛,可能會導(dǎo)致文件的滾動和預(yù)期的不一致死陆。規(guī)避這個問題的方法就是將hdfs.minBlockReplicas設(shè)置為1,一般hdfs的副本數(shù)肯定都是大于等于1的唧瘾,所以isUnderReplicated方法一定會返回false措译。
所以一般情況下,要規(guī)避這種情況饰序,避免影響文件的正常滾動领虹。

基于文件閑置時間策略

配置項:hdfs.idleTimeout
默認值:0
說明:默認啟動這個功能
這種策略很簡單,如果文件在hdfs.idleTimeout秒的時間里都是閑置的求豫,沒有任何數(shù)據(jù)寫入塌衰,那么當前文件關(guān)閉,滾動到下一個文件蝠嘉。

public synchronized void flush() throws IOException, InterruptedException {
  checkAndThrowInterruptedException();
  if (!isBatchComplete()) {
    doFlush();

    if (idleTimeout > 0) {
      // if the future exists and couldn't be cancelled, that would mean it has already run
      // or been cancelled
      if (idleFuture == null || idleFuture.cancel(false)) {
        Callable<Void> idleAction = new Callable<Void>() {
          public Void call() throws Exception {
            LOG.info("Closing idle bucketWriter {} at {}", bucketPath,
                     System.currentTimeMillis());
            if (isOpen) {
              close(true);
            }
            return null;
          }
        };
        idleFuture = timedRollerPool.schedule(idleAction, idleTimeout,
            TimeUnit.SECONDS);
      }
    }
  }
}

注:本文使用的源碼版本為flume-1.7.0

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末最疆,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子是晨,更是在濱河造成了極大的恐慌肚菠,老刑警劉巖舔箭,帶你破解...
    沈念sama閱讀 206,013評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件罩缴,死亡現(xiàn)場離奇詭異,居然都是意外死亡层扶,警方通過查閱死者的電腦和手機箫章,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,205評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來镜会,“玉大人檬寂,你說我怎么就攤上這事〈帘恚” “怎么了桶至?”我有些...
    開封第一講書人閱讀 152,370評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長匾旭。 經(jīng)常有香客問我镣屹,道長,這世上最難降的妖魔是什么价涝? 我笑而不...
    開封第一講書人閱讀 55,168評論 1 278
  • 正文 為了忘掉前任女蜈,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘伪窖。我一直安慰自己逸寓,他們只是感情好,可當我...
    茶點故事閱讀 64,153評論 5 371
  • 文/花漫 我一把揭開白布覆山。 她就那樣靜靜地躺著竹伸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪簇宽。 梳的紋絲不亂的頭發(fā)上佩伤,一...
    開封第一講書人閱讀 48,954評論 1 283
  • 那天,我揣著相機與錄音晦毙,去河邊找鬼生巡。 笑死,一個胖子當著我的面吹牛见妒,可吹牛的內(nèi)容都是我干的孤荣。 我是一名探鬼主播,決...
    沈念sama閱讀 38,271評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼须揣,長吁一口氣:“原來是場噩夢啊……” “哼盐股!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起耻卡,我...
    開封第一講書人閱讀 36,916評論 0 259
  • 序言:老撾萬榮一對情侶失蹤疯汁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后卵酪,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體幌蚊,經(jīng)...
    沈念sama閱讀 43,382評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,877評論 2 323
  • 正文 我和宋清朗相戀三年溃卡,在試婚紗的時候發(fā)現(xiàn)自己被綠了溢豆。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 37,989評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡瘸羡,死狀恐怖漩仙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情犹赖,我是刑警寧澤队他,帶...
    沈念sama閱讀 33,624評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站峻村,受9級特大地震影響麸折,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜雀哨,卻給世界環(huán)境...
    茶點故事閱讀 39,209評論 3 307
  • 文/蒙蒙 一磕谅、第九天 我趴在偏房一處隱蔽的房頂上張望私爷。 院中可真熱鬧,春花似錦膊夹、人聲如沸衬浑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,199評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽工秩。三九已至,卻和暖如春进统,著一層夾襖步出監(jiān)牢的瞬間助币,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,418評論 1 260
  • 我被黑心中介騙來泰國打工螟碎, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留眉菱,地道東北人。 一個月前我還...
    沈念sama閱讀 45,401評論 2 352
  • 正文 我出身青樓掉分,卻偏偏與公主長得像俭缓,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子酥郭,可洞房花燭夜當晚...
    茶點故事閱讀 42,700評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理华坦,服務(wù)發(fā)現(xiàn),斷路器不从,智...
    卡卡羅2017閱讀 134,599評論 18 139
  • 博客原文 翻譯作品惜姐,水平有限,如有錯誤椿息,煩請留言指正歹袁。原文請見 官網(wǎng)英文文檔 引言 概述 Apache Flume...
    rabbitGYK閱讀 11,447評論 13 34
  • 介紹 概述 Apache Flume是為有效收集聚合和移動大量來自不同源到中心數(shù)據(jù)存儲而設(shè)計的可分布,可靠的撵颊,可用...
    ximengchj閱讀 3,513評論 0 13
  • 介紹 概述 Apache Flume是一個分布式的宇攻,可靠的惫叛,高可用的系統(tǒng)倡勇,用于高效地從多個不同的數(shù)據(jù)源收集,匯總及...
    steanxy閱讀 1,052評論 0 1
  • 名字里有月嘉涌,網(wǎng)名是小太陽妻熊,但是我卻深深的愛上了星星。 曾經(jīng)渴望自己活成太陽的樣子仑最,越開心就越嗨扔役,將溫暖帶去...
    大太陽咪咪笑閱讀 122評論 0 0