4. Hadoop之旅——使用篇

Spark

A. 小文件過多

  • 解決方法:
    使用 SparkContext下newAPIHadoopFile完成數(shù)據(jù)輸入丧枪,指定org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

B. 編程優(yōu)化

Spark程序幾點優(yōu)化

  1. repartition和coalesce

    這兩個方法都可以用在對數(shù)據(jù)的重新分區(qū)中,其中repartition是一個代價很大的操作,它會將所有的數(shù)據(jù)進(jìn)行一次shuffle洼专,然后重新分區(qū)。

    如果你僅僅只是想減少分區(qū)數(shù)凸主,從而達(dá)到減少碎片任務(wù)或者碎片數(shù)據(jù)的目的缴川。使用coalesce就可以實現(xiàn)茉稠,該操作默認(rèn)不會進(jìn)行shuffle。其實repartition只是coalesce的shuffle版本把夸。

    一般我們會在filter算子過濾了大量數(shù)據(jù)后使用它而线。比如將 partition 數(shù)從1000減少到100。這可以減少碎片任務(wù)恋日,降低啟動task的開銷膀篮。

    note1: 如果想查看當(dāng)前rdd的分區(qū)數(shù),在java/scala中可以使用rdd.partitions.size()岂膳,在python中使用rdd.getNumPartitions()誓竿。

    note2: 如果要增加分區(qū)數(shù),只能使用repartition,或者把partition縮減為一個非常小的值谈截,比如說“1”筷屡,也建議使用repartition。

  2. mapPartitions和foreachPartitions

    適當(dāng)使用mapPartitions和foreachPartitions代替map和foreach可以提高程序運(yùn)行速度簸喂。這類操作一次會處理一個partition中的所有數(shù)據(jù)毙死,而不是一條數(shù)據(jù)。

    mapPartition - 因為每次操作是針對partition的娘赴,那么操作中的很多對象和變量都將可以復(fù)用规哲,比如說在方法中使用廣播變量等跟啤。

    foreachPartition - 在和外部數(shù)據(jù)庫交互操作時使用诽表,比如 redis , mysql 等。通過該方法可以避免頻繁的創(chuàng)建和銷毀鏈接隅肥,每個partition使用一個數(shù)據(jù)庫鏈接竿奏,對效率的提升還是非常明顯的。

    note: 此類方法也存在缺陷腥放,因為一次處理一個partition中的所有數(shù)據(jù)泛啸,在內(nèi)存不足的時候,將會遇到OOM的問題秃症。

  3. reduceByKey和aggregateByKey

    使用reduceByKey/aggregateByKey代替groupByKey候址。

    reduceByKey/aggregateByKey會先在map端對本地數(shù)據(jù)按照用戶定義的規(guī)則進(jìn)行一次聚合吕粹,之后再將計算的結(jié)果進(jìn)行shuffle,而groupByKey則會將所以的計算放在reduce階段進(jìn)行(全量數(shù)據(jù)在各個節(jié)點中進(jìn)行了分發(fā)和傳輸)岗仑。所以前者的操作大量的減少shuffle的數(shù)據(jù)匹耕,減少了網(wǎng)絡(luò)IO,提高運(yùn)行效率荠雕。

  4. mapValues

    針對k,v結(jié)構(gòu)的rdd稳其,mapValues直接對value進(jìn)行操作,不對Key造成影響炸卑,可以減少不必要的分區(qū)操作既鞠。

  5. broadcast

    Spark中廣播變量有幾個常見的用法。

    • 實現(xiàn)map-side join

    在需要join操作時盖文,將較小的那份數(shù)據(jù)轉(zhuǎn)化為普通的集合(數(shù)組)進(jìn)行廣播嘱蛋,然后在大數(shù)據(jù)集中使用小數(shù)據(jù)進(jìn)行相應(yīng)的查詢操作,就可以實現(xiàn)map-side join的功能椅寺,避免了join操作的shuffle過程浑槽。在我之前的文章中對此用法有詳細(xì)說明和過程圖解。

    • 使用較大的外部變量

    如果存在較大的外部變量(外部變量可以理解為在driver中定義的變量)返帕,比如說字典數(shù)據(jù)等桐玻。在運(yùn)算過程中,會將這個變量復(fù)制出多個副本荆萤,傳輸?shù)矫總€task之中進(jìn)行執(zhí)行镊靴。如果這個變量的大小有100M或者更大,將會浪費(fèi)大量的網(wǎng)絡(luò)IO链韭,同時偏竟,executor也會因此被占用大量的內(nèi)存,造成頻繁GC敞峭,甚至引發(fā)OOM踊谋。

    因此在這種情況下,我最好提前對該變量進(jìn)行廣播旋讹,它會被事先將副本分發(fā)到每個executor中殖蚕,同一executor中的task則在執(zhí)行時共享該變量。很大程度的減少了網(wǎng)絡(luò)IO開銷以及executor的內(nèi)存使用沉迹。

  6. 復(fù)用RDD

    避免創(chuàng)建一些用處不大的中間RDD(比如從父RDD抽取了某幾個字段形成新的RDD)睦疫,這樣可以減少一些算子操作。

    對多次使用的RDD進(jìn)行緩存操作鞭呕,減少重復(fù)計算蛤育,在下文有說明。

  7. cache和persist

    cache方法等價于persist(StorageLevel.MEMORY_ONLY)

    不要濫用緩存操作。緩存操作非常消耗內(nèi)存瓦糕,緩存前考慮好是否還可以對一些無關(guān)數(shù)據(jù)進(jìn)行過濾底洗。如果你的數(shù)據(jù)在接下來的操作中只使用一次,則不要進(jìn)行緩存咕娄。

    如果需要復(fù)用RDD枷恕,則可以考慮使用緩存操作,將大幅度提高運(yùn)行效率谭胚。緩存也分幾個級別徐块。

    • MEMORY_ONLY

    如果緩存的數(shù)據(jù)量不大或是內(nèi)存足夠,可以使用這種方式灾而,該策略效率是最高的胡控。但是如果內(nèi)存不夠,之前緩存的數(shù)據(jù)則會被清出內(nèi)存旁趟。在spark1.6中昼激,則會直接提示OOM。

    • MEMORY_AND_DISK

    優(yōu)先將數(shù)據(jù)寫入內(nèi)存锡搜,如果內(nèi)存不夠則寫入硬盤橙困。較為穩(wěn)妥的策略,但是如果不是很復(fù)雜的計算耕餐,可能重算的速度比從磁盤中讀取還要快凡傅。

    • MEMORY_ONLY_SER

    會將RDD中的數(shù)據(jù)序列化后存入內(nèi)存,占用更小的內(nèi)存空間肠缔,減少GC頻率夏跷,當(dāng)然,取出數(shù)據(jù)時需要反序列化明未,同樣會消耗資源槽华。

    • MEMORY_AND_DISK_SER

    不再贅述。

    • DISK_ONLY

    該策略類似于checkPoint方法趟妥,把所有的數(shù)據(jù)存入了硬盤猫态,再使用的時候從中讀出。適用于數(shù)據(jù)量很大披摄,重算代價也非常高的操作亲雪。

    • 各種_2結(jié)尾的存儲策略

    實際上是對緩存的數(shù)據(jù)做了一個備份,代價非常高行疏,一般不建議使用匆光。

  • 結(jié)語

    spark的優(yōu)化方法還有很多套像,這篇文章主要從使用的角度講解了常用的優(yōu)化方法酿联,具體的使用方法可以參考博主的其他優(yōu)化文章。

C. SparkSql優(yōu)化配置

  • Caching Data In Memory

Spark SQL can cache tables using an in-memory columnar format by calling spark.cacheTable("tableName") or dataFrame.cache(). Then Spark SQL will scan only required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call spark.uncacheTable("tableName") to remove the table from memory.
Configuration of in-memory caching can be done using the setConf method on SparkSession or by running SET key=value commands using SQL.

- spark.sql.inMemoryColumnarStorage.compressed
default: true
> When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data.

- spark.sql.inMemoryColumnarStorage.batchSize
default: 10000
> Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.
  • spark.sql.files.maxPartitionBytes
    default: 134217728 (128 MB)

The maximum number of bytes to pack into a single partition when reading files.

  • spark.sql.files.openCostInBytes
    default: 4194304 (4 MB)

The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first).

  • spark.sql.autoBroadcastJoinThreshold
    default: 10485760 (10 MB)

Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan
has been run.

  • spark.sql.shuffle.partitions
    default: 200

Configures the number of partitions to use when shuffling data for joins or aggregations.

  • 自動決定join和groupby時reducer的數(shù)量。 如果使用默認(rèn)200配置贞让,可能出現(xiàn)周崭,sparksql的reduce task總是200個的情況,導(dǎo)致insert into hive的文件數(shù)量也是200個喳张,容易造成小文件過多
  • 如果partitions數(shù)目過少续镇,容易出現(xiàn)shuffle內(nèi)存超限。org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle

D. Spark Sql unsupported (Spark 1.6.2)

  • Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL doesn’t support buckets yet.
  • UNION type
  • Unique join
  • Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore
  • File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
  • Hadoop archive
  • Block level bitmap indexes and virtual columns (used to build indexes)
  • Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
  • Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result.
  • Skew data flag: Spark SQL does not follow the skew data flags in Hive.
  • STREAMTABLE hint in join: Spark SQL does not follow the STREAMTABLE hint.
  • Merge multiple small files for query results: if the result output contains multiple small files, Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS metadata. Spark SQL does not support that.

E.spark executor memory

spark.storage.memoryFraction

  • this would allow cache use more of allocated memory
    大多數(shù)情況下销部,cache memory可以松動摸航,在保證運(yùn)行內(nèi)存和shuffle內(nèi)存的情況下,滿足cache的要求舅桩。否則酱虎,可以采用 內(nèi)存+硬盤的 緩存方式,來解決內(nèi)存不夠分配緩存的情況擂涛。

spark.shuffle.memoryFraction=0.5

  • this would allow shuffle use more of allocated memory

spark.yarn.executor.memoryOverhead=1024

  • this is set in MB. Yarn kills executors when its memory usage is larger then (executor-memory + executor.memoryOverhead)

Little more info

  • If you get shuffle not found exception.
    In case of org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
    you should increase spark.shuffle.memoryFraction, for example to 0.5.

  • Most common reason for Yarn killing off my executors was memory usage beyond what it expected. To avoid that you increase spark.yarn.executor.memoryOverhead, I've set it to 1024, even if my executors use only 2-3G of memory.

鏈接

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末读串,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子撒妈,更是在濱河造成了極大的恐慌恢暖,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件狰右,死亡現(xiàn)場離奇詭異杰捂,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)棋蚌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進(jìn)店門琼娘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人附鸽,你說我怎么就攤上這事脱拼。” “怎么了坷备?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵熄浓,是天一觀的道長。 經(jīng)常有香客問我省撑,道長赌蔑,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任竟秫,我火速辦了婚禮娃惯,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘肥败。我一直安慰自己趾浅,他們只是感情好愕提,可當(dāng)我...
    茶點故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著皿哨,像睡著了一般浅侨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上证膨,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天如输,我揣著相機(jī)與錄音,去河邊找鬼央勒。 笑死不见,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的崔步。 我是一名探鬼主播脖祈,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼刷晋!你這毒婦竟也來了盖高?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤眼虱,失蹤者是張志新(化名)和其女友劉穎喻奥,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體捏悬,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡撞蚕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了过牙。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片甥厦。...
    茶點故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖寇钉,靈堂內(nèi)的尸體忽然破棺而出刀疙,到底是詐尸還是另有隱情,我是刑警寧澤扫倡,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布谦秧,位于F島的核電站,受9級特大地震影響撵溃,放射性物質(zhì)發(fā)生泄漏疚鲤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一缘挑、第九天 我趴在偏房一處隱蔽的房頂上張望集歇。 院中可真熱鬧,春花似錦语淘、人聲如沸诲宇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽焕窝。三九已至,卻和暖如春维贺,著一層夾襖步出監(jiān)牢的瞬間它掂,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工溯泣, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留虐秋,地道東北人。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓垃沦,卻偏偏與公主長得像客给,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子肢簿,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容