解決Spark Streaming寫入HDFS的小文件問題

今天仍然處于感冒狀態(tài),打開電腦隨便寫一篇忆首,然后滾回床上休息。

我們都知道详幽,在HDFS中不宜存儲大量的小文件。所謂小文件唇聘,就是大小遠小于dfs.block.size的文件。如果有大量小文件的話剥险,會浪費block,使元數(shù)據(jù)增加表制,擠占寶貴的NameNode內(nèi)存控乾。另外,大文件能夠發(fā)揮磁盤順序讀寫的優(yōu)勢蜕衡,小文件會產(chǎn)生很多隨機讀寫,性能下降久脯。

在我們的數(shù)倉體系中镶骗,有一部分業(yè)務(wù)的日志數(shù)據(jù)來源是RocketMQ。我們編寫了Spark Streaming程序作為consumer鼎姊,將這些日志下沉到以天分區(qū)的Hive外部表中相赁,批次間隔(batch duration)為1分鐘。久而久之唤衫,產(chǎn)生了很多小文件绵脯。直覺上講可以通過增長batch duration來減少輸出,但這肯定是下下策蛆挫。

實在更不動了,明天繼續(xù)吧(╯‵□′)╯︵┻━┻


感覺稍微好了一些瞧剖,繼續(xù)寫。我們用兩種方法合并解決該問題抓于,十分有效,下面簡要敘述下怕品。

利用coalesce()和repartition()算子

在真正落盤之前巾遭,可以對RDD做如下兩種操作之一:

rdd.coalesce(1, true)
rdd.repartition(1)

Spark Streaming在將結(jié)果輸出到HDFS時是按分區(qū)來的,分區(qū)越多恢总,產(chǎn)生的小文件自然也越多。coalesce()算子就用來為RDD重新分區(qū)纹安,其源碼如下砂豌,位于RDD類中。

  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](
          mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
          new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }

該算子主要參數(shù)有兩個:numPartitions表示目標分區(qū)數(shù)塔粒,shuffle表示重分區(qū)過程中是否Shuffle筐摘。

如果shuffle參數(shù)為true的話,會從一個隨機分區(qū)開始咖熟,利用HashPartitioner將所有數(shù)據(jù)重新均勻分布到numPartitions個分區(qū)上,返回一個由CoalescedRDD包裝的ShuffleRDD郭赐,父子RDD之間為寬依賴确沸。如果shuffle參數(shù)為false,就直接返回CoalescedRDD罗捎,其內(nèi)部就只是簡單地將多個分區(qū)的數(shù)據(jù)flatMap之后合并為一個分區(qū),父子RDD之間為窄依賴坎匿。

由上面的分析可知,若numPartitions大于原分區(qū)數(shù)替蔬,那么shuffle參數(shù)一定要設(shè)為true才可以。若numPartitions小于原分區(qū)數(shù)驻粟,那么又有兩種情況要考慮:

  • 分區(qū)數(shù)之間的比例不太懸殊凶异。比如原有1000個分區(qū),減少到200個分區(qū)剩彬,這時可以將shuffle設(shè)為false,因為子RDD中的一個分區(qū)只對應(yīng)父RDD的5個分區(qū)沃饶,壓力不大轻黑。

  • 分區(qū)數(shù)之間的比例懸殊。比如原有500個分區(qū)氓鄙,減少到1個分區(qū),就要將shuffle設(shè)為true升酣,保證生成CoalescedRDD之前的操作有足夠的并行度蟋座,防止Executor出現(xiàn)單點問題。這也就是本節(jié)開頭的做法了向臀。

repartition()算子是借助coalesce()實現(xiàn)的诸狭,就是shuffle參數(shù)默認為true的版本。

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

這種方法非常簡單芹彬,只需要一句話就可以使每批次輸出只有一個文件叉庐。不過它會增加批次處理時長,如果數(shù)據(jù)量巨大,可能會造成數(shù)據(jù)堆積肢执,因此需要觀察之后再使用译红。

利用copyMerge()方法

Hadoop的FileUtil工具類中提供了copyMerge()方法,它專門用來將一個HDFS目錄下的所有文件合并成一個文件并輸出侦厚,其源碼如下。

  public static boolean copyMerge(FileSystem srcFS, Path srcDir, 
                                  FileSystem dstFS, Path dstFile, 
                                  boolean deleteSource,
                                  Configuration conf, String addString) throws IOException {
    dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);

    if (!srcFS.getFileStatus(srcDir).isDirectory())
      return false;
   
    OutputStream out = dstFS.create(dstFile);
    
    try {
      FileStatus contents[] = srcFS.listStatus(srcDir);
      Arrays.sort(contents);
      for (int i = 0; i < contents.length; i++) {
        if (contents[i].isFile()) {
          InputStream in = srcFS.open(contents[i].getPath());
          try {
            IOUtils.copyBytes(in, out, conf, false);
            if (addString!=null)
              out.write(addString.getBytes("UTF-8"));
                
          } finally {
            in.close();
          } 
        }
      }
    } finally {
      out.close();
    }
    
    if (deleteSource) {
      return srcFS.delete(srcDir, true);
    } else {
      return true;
    }
  }

我們就可以寫一個簡單的程序诗宣,通過調(diào)用copyMerge()方法合并Hive外部表對應(yīng)分區(qū)的文件想诅,并且按照分區(qū)的時間粒度(天、小時等)調(diào)度裁眯。源數(shù)據(jù)的文件夾可以通過參數(shù)來指定讳癌,并且設(shè)置deleteSource參數(shù)為true,就能在合并完成后刪除原來的小文件晌坤。需要注意的是,為了避免將當前正在寫入的文件也合并進去它改,調(diào)度需要有一點延時商乎。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市鲜戒,隨后出現(xiàn)的幾起案子抹凳,更是在濱河造成了極大的恐慌,老刑警劉巖赢底,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件柏蘑,死亡現(xiàn)場離奇詭異粹庞,居然都是意外死亡,警方通過查閱死者的電腦和手機黔攒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門强缘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赃蛛,“玉大人秽五,你說我怎么就攤上這事菊卷∈任牛” “怎么了典勇?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵叮趴,是天一觀的道長。 經(jīng)常有香客問我伤溉,道長妻率,這世上最難降的妖魔是什么乱顾? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任走净,我火速辦了婚禮孤里,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘扭粱。我一直安慰自己震檩,他們只是感情好蜓堕,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布博其。 她就那樣靜靜地躺著,像睡著了一般背伴。 火紅的嫁衣襯著肌膚如雪峰髓。 梳的紋絲不亂的頭發(fā)上傻寂,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天疾掰,我揣著相機與錄音徐紧,去河邊找鬼。 笑死并级,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的稻励。 我是一名探鬼主播呀潭,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼钠署!你這毒婦竟也來了糠聪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤谐鼎,失蹤者是張志新(化名)和其女友劉穎舰蟆,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體狸棍,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡身害,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了草戈。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片塌鸯。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖唐片,靈堂內(nèi)的尸體忽然破棺而出丙猬,到底是詐尸還是另有隱情,我是刑警寧澤茧球,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布庭瑰,位于F島的核電站,受9級特大地震影響抢埋,放射性物質(zhì)發(fā)生泄漏弹灭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一揪垄、第九天 我趴在偏房一處隱蔽的房頂上張望穷吮。 院中可真熱鬧,春花似錦福侈、人聲如沸酒来。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽堰汉。三九已至,卻和暖如春伟墙,著一層夾襖步出監(jiān)牢的瞬間翘鸭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工戳葵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留就乓,地道東北人。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓拱烁,卻偏偏與公主長得像生蚁,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子戏自,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容

  • The Hadoop Distributed Filesystem 1. Why HDFS ? When a da...
    須臾之北閱讀 855評論 0 1
  • HDFS是Hadoop Distribute File System 的簡稱邦投,也就是Hadoop的一個分布式文件系...
    大佛愛讀書閱讀 868評論 0 0
  • 1.背景 HDFS最初是參考谷歌GFS論文原理開發(fā)的一個開源產(chǎn)品,由Lucene開源項目的創(chuàng)始人Doug Cutt...
    架構(gòu)禪話閱讀 1,380評論 0 2
  • 從“涸澤而漁”的字面意思去理解擅笔,而不考慮它引申的意義志衣,談?wù)劇敦懹^長歌》中的一個小故事。很多事情猛们,換個考慮的角度念脯,就...
    殘劍閱讀 611評論 0 1
  • 昨天我參加了一場婚禮绿店,婚禮的主人公是媽媽朋友的兒子。正當我興致勃勃的看新郎和新娘喝交杯酒的時候庐橙,坐在我旁邊的王阿姨...
    韓雅潔閱讀 1,327評論 4 5