今天仍然處于感冒狀態(tài),打開電腦隨便寫一篇忆首,然后滾回床上休息。
我們都知道详幽,在HDFS中不宜存儲大量的小文件。所謂小文件唇聘,就是大小遠小于dfs.block.size
的文件。如果有大量小文件的話剥险,會浪費block,使元數(shù)據(jù)增加表制,擠占寶貴的NameNode內(nèi)存控乾。另外,大文件能夠發(fā)揮磁盤順序讀寫的優(yōu)勢蜕衡,小文件會產(chǎn)生很多隨機讀寫,性能下降久脯。
在我們的數(shù)倉體系中镶骗,有一部分業(yè)務(wù)的日志數(shù)據(jù)來源是RocketMQ。我們編寫了Spark Streaming程序作為consumer鼎姊,將這些日志下沉到以天分區(qū)的Hive外部表中相赁,批次間隔(batch duration)為1分鐘。久而久之唤衫,產(chǎn)生了很多小文件绵脯。直覺上講可以通過增長batch duration來減少輸出,但這肯定是下下策蛆挫。
實在更不動了,明天繼續(xù)吧(╯‵□′)╯︵┻━┻
感覺稍微好了一些瞧剖,繼續(xù)寫。我們用兩種方法合并解決該問題抓于,十分有效,下面簡要敘述下怕品。
利用coalesce()和repartition()算子
在真正落盤之前巾遭,可以對RDD做如下兩種操作之一:
rdd.coalesce(1, true)
rdd.repartition(1)
Spark Streaming在將結(jié)果輸出到HDFS時是按分區(qū)來的,分區(qū)越多恢总,產(chǎn)生的小文件自然也越多。coalesce()算子就用來為RDD重新分區(qū)纹安,其源碼如下砂豌,位于RDD類中。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
該算子主要參數(shù)有兩個:numPartitions表示目標分區(qū)數(shù)塔粒,shuffle表示重分區(qū)過程中是否Shuffle筐摘。
如果shuffle參數(shù)為true的話,會從一個隨機分區(qū)開始咖熟,利用HashPartitioner將所有數(shù)據(jù)重新均勻分布到numPartitions個分區(qū)上,返回一個由CoalescedRDD包裝的ShuffleRDD郭赐,父子RDD之間為寬依賴确沸。如果shuffle參數(shù)為false,就直接返回CoalescedRDD罗捎,其內(nèi)部就只是簡單地將多個分區(qū)的數(shù)據(jù)flatMap之后合并為一個分區(qū),父子RDD之間為窄依賴坎匿。
由上面的分析可知,若numPartitions大于原分區(qū)數(shù)替蔬,那么shuffle參數(shù)一定要設(shè)為true才可以。若numPartitions小于原分區(qū)數(shù)驻粟,那么又有兩種情況要考慮:
分區(qū)數(shù)之間的比例不太懸殊凶异。比如原有1000個分區(qū),減少到200個分區(qū)剩彬,這時可以將shuffle設(shè)為false,因為子RDD中的一個分區(qū)只對應(yīng)父RDD的5個分區(qū)沃饶,壓力不大轻黑。
分區(qū)數(shù)之間的比例懸殊。比如原有500個分區(qū)氓鄙,減少到1個分區(qū),就要將shuffle設(shè)為true升酣,保證生成CoalescedRDD之前的操作有足夠的并行度蟋座,防止Executor出現(xiàn)單點問題。這也就是本節(jié)開頭的做法了向臀。
repartition()算子是借助coalesce()實現(xiàn)的诸狭,就是shuffle參數(shù)默認為true的版本。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
這種方法非常簡單芹彬,只需要一句話就可以使每批次輸出只有一個文件叉庐。不過它會增加批次處理時長,如果數(shù)據(jù)量巨大,可能會造成數(shù)據(jù)堆積肢执,因此需要觀察之后再使用译红。
利用copyMerge()方法
Hadoop的FileUtil工具類中提供了copyMerge()方法,它專門用來將一個HDFS目錄下的所有文件合并成一個文件并輸出侦厚,其源碼如下。
public static boolean copyMerge(FileSystem srcFS, Path srcDir,
FileSystem dstFS, Path dstFile,
boolean deleteSource,
Configuration conf, String addString) throws IOException {
dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
if (!srcFS.getFileStatus(srcDir).isDirectory())
return false;
OutputStream out = dstFS.create(dstFile);
try {
FileStatus contents[] = srcFS.listStatus(srcDir);
Arrays.sort(contents);
for (int i = 0; i < contents.length; i++) {
if (contents[i].isFile()) {
InputStream in = srcFS.open(contents[i].getPath());
try {
IOUtils.copyBytes(in, out, conf, false);
if (addString!=null)
out.write(addString.getBytes("UTF-8"));
} finally {
in.close();
}
}
}
} finally {
out.close();
}
if (deleteSource) {
return srcFS.delete(srcDir, true);
} else {
return true;
}
}
我們就可以寫一個簡單的程序诗宣,通過調(diào)用copyMerge()方法合并Hive外部表對應(yīng)分區(qū)的文件想诅,并且按照分區(qū)的時間粒度(天、小時等)調(diào)度裁眯。源數(shù)據(jù)的文件夾可以通過參數(shù)來指定讳癌,并且設(shè)置deleteSource參數(shù)為true,就能在合并完成后刪除原來的小文件晌坤。需要注意的是,為了避免將當前正在寫入的文件也合并進去它改,調(diào)度需要有一點延時商乎。