spark3.x 生產(chǎn)調(diào)優(yōu)筆記

1 spark sql寫入mysql非常慢

  • 有這樣一個業(yè)務(wù)場景:需要將通過Spark處理之后的數(shù)據(jù)寫入MySQL,并在在網(wǎng)頁端進行可視化輸出瘪板。Spark處理之后有大概40萬條數(shù)據(jù),寫入MySQL卻要耗費將近30分鐘,這也太慢了财忽!
  • 后來翻看了Spark向JDBC數(shù)據(jù)源寫數(shù)據(jù)的那部分源碼捧杉,雖然源碼中的實現(xiàn)使用的確實是 PreparedStatement 的addBatch()方法和executeBatch()方法陕见,但是我們再去翻看executeBatch()方法的實現(xiàn)后發(fā)現(xiàn),它并不是每次執(zhí)行一批插入糠溜,而是循環(huán)的去執(zhí)行每條insert插入語句淳玩,這就造成只插入一條數(shù)據(jù),而不是一批數(shù)據(jù)非竿,導(dǎo)致大多數(shù)的時間都耗費在了與數(shù)據(jù)庫的交互連接上了
  • 解決方法:
jdbc.saas.url=jdbc:mysql://172.25.1.*/saas-hospital?characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true

2 spark sql jdbc并發(fā)分區(qū)

  • jdbcDF.rdd.partitions.size # 結(jié)果返回 1蜕着。該操作的并發(fā)度為1,你所有的數(shù)據(jù)都會在一個partition中進行操作红柱,意味著無論你給的資源有多少承匣,只有一個task會執(zhí)行任務(wù),執(zhí)行效率可想而之锤悄,并且在稍微大點的表中進行操作分分鐘就會OOM韧骗。
def jdbc(
  url: String,
  table: String,
  columnName: String,    # 根據(jù)該字段分區(qū),需要為整形零聚,比如id等
  lowerBound: Long,      # 分區(qū)的下界
  upperBound: Long,      # 分區(qū)的上界
  numPartitions: Int,    # 分區(qū)的個數(shù)
  connectionProperties: Properties): DataFrame

# 指定字段區(qū)間分區(qū)
val predicates =
    Array(
      "2015-09-16" -> "2015-09-30",
      "2015-10-01" -> "2015-10-15",
      "2015-10-16" -> "2015-10-31",
      "2015-11-01" -> "2015-11-14",
      "2015-11-15" -> "2015-11-30",
      "2015-12-01" -> "2015-12-15"
    ).map {
      case (start, end) =>
        s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
    }

// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop)

3 SparkSQL與Parquet格式兼容性

  • spark.sql.parquet.writeLegacyFormat 默認是false袍暴。如果設(shè)置為true 數(shù)據(jù)會以spark 1.4和更早的版本的格式寫入些侍。比如,decimal類型的值會被以apache parquet的fixed-length byte array格式寫出政模,該格式是其他系統(tǒng)例如hive岗宣,impala等使用的。如果是false淋样,會使用parquet的新版格式耗式。例如,decimals會以int-based格式寫出趁猴。

4 RDD復(fù)用并序列化存儲

  • 必須對多次使用的 RDD 進行持久化刊咳,通過持久化將公共 RDD 的數(shù)據(jù)
    緩存到內(nèi)存/磁盤中,之后對于公共 RDD 的計算都會從內(nèi)存/磁盤中直接獲取 RDD 數(shù)據(jù)
  • RDD 的持久化是可以進行序列化的儡司,當(dāng)內(nèi)存無法將 RDD 的數(shù)據(jù)完整的進行存放的時候娱挨,可以考慮使用序列化的方式減小數(shù)據(jù)體積,將數(shù)據(jù)完整存儲在內(nèi)存中
    public static final StorageLevel MEMORY_ONLY_SER_2 
    public static final StorageLevel MEMORY_AND_DISK_SER_2 

5 RDD合理設(shè)置并行度

  • Spark 官方推薦枫慷,task 數(shù)量應(yīng)該設(shè)置為 Spark 作業(yè)總 CPU core 數(shù)量的 2~3 倍让蕾。之所以沒有推薦 task 數(shù)量與 CPU core 總數(shù)相等,是因為 task 的執(zhí)行時間不同或听,有的 task 執(zhí)行速度快而有的 task 執(zhí)行速度慢探孝,如果 task 數(shù)量與 CPU core 總數(shù)相等,那么執(zhí)行快的 task 執(zhí)行完成后誉裆,會出現(xiàn) CPU core 空閑的情況顿颅。如果 task 數(shù)量設(shè)置為 CPU core 總數(shù)的 2~3 倍,那么一個task 執(zhí)行完畢后足丢,CPU core 會立刻執(zhí)行下一個 task粱腻,降低了資源的浪費,同時提升了 Spark作業(yè)運行的效率斩跌。
val conf = new SparkConf().set("spark.default.parallelism", "500")

6 廣播大變量

  • 默認情況下绍些,task 中的算子中如果使用了外部的變量,每個 task 都會獲取一份變量的復(fù)本耀鸦,這就造成了內(nèi)存的極大消耗柬批。一方面,如果后續(xù)對 RDD 進行持久化袖订,可能就無法將 RDD數(shù)據(jù)存入內(nèi)存氮帐,只能寫入磁盤,磁盤 IO 將會嚴重消耗性能洛姑;另一方面上沐,task 在創(chuàng)建對象的時候,也許會發(fā)現(xiàn)堆內(nèi)存無法存放新創(chuàng)建的對象楞艾,這就會導(dǎo)致頻繁的 GC参咙,GC 會導(dǎo)致工作線程停止龄广,進而導(dǎo)致 Spark 暫停工作一段時間,嚴重影響 Spark 性能昂勒。

7 Kryo 序列化

  • Kryo 序列化機制比 Java 序列化機制性能提高 10 倍左右蜀细,Spark 之所以沒有默認使用
    Kryo 作為序列化類庫,是因為它不支持所有對象的序列化戈盈,同時 Kryo 需要用戶在使用前注
    冊需要序列化的類型,不夠方便谆刨,但從 Spark 2.0.0 版本開始塘娶,簡單類型、簡單類型數(shù)組痊夭、字
    符串類型的 Shuffling RDDs 已經(jīng)默認使用 Kryo 序列化方式了
public class MyKryoRegistrator implements KryoRegistrator {
     @Override
     public void registerClasses(Kryo kryo){
        kryo.register(StartupReportLogs.class);
     }
 }

//創(chuàng)建 SparkConf 對象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用 Kryo 序列化庫刁岸,如果要使用 Java 序列化庫,需要把該行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
//在 Kryo 序列化庫中注冊自定義的類集合她我,如果要使用 Java 序列化庫虹曙,需要把該行屏蔽掉
conf.set("spark.kryo.registrator", "MyKryoRegistrator");

8 foreachPartition 優(yōu)化數(shù)據(jù)庫操作

  • 在生產(chǎn)環(huán)境中,通常使用 foreachPartition 算子來完成數(shù)據(jù)庫的寫入番舆,通過 foreachPartition
    算子的特性酝碳,可以優(yōu)化寫數(shù)據(jù)庫的性能。
  • 對于我們寫的 function 函數(shù)恨狈,一次處理一整個分區(qū)的數(shù)據(jù)疏哗;
  • 對于一個分區(qū)內(nèi)的數(shù)據(jù),創(chuàng)建唯一的數(shù)據(jù)庫連接禾怠;
  • 只需要向數(shù)據(jù)庫發(fā)送一次 SQL 語句和多組參數(shù)返奉;

9 repartition 解決 SparkSQL 低并行度問題

  • Spark SQL 的并行度不允許用戶自己指定,Spark SQL 自己會默認根據(jù) hive 表對應(yīng)的
    HDFS 文件的 split 個數(shù)自動設(shè)置 Spark SQL 所在的那個 stage 的并行度吗氏,用戶自己通
    spark.default.parallelism 參數(shù)指定的并行度芽偏,只會在沒 Spark SQL 的 stage 中生效。
  • Spark SQL 這一步的并行度和 task 數(shù)量肯定是沒有辦法去改變了弦讽,但是污尉,對于
    Spark SQL 查詢出來的 RDD,立即使用 repartition 算子坦袍,去重新進行分區(qū)十厢,這樣可
    以重新分區(qū)為多個 partition,從 repartition 之后的 RDD 操作捂齐,由于不再設(shè)計 Spark
    SQL蛮放,因此 stage 的并行度就會等于你手動設(shè)置的值,這樣就避免了 Spark SQL 所在
    的 stage 只能用少量的 task 去處理大量數(shù)據(jù)并執(zhí)行復(fù)雜的算法邏輯奠宜。


    spark sql重分區(qū)

10 reduceByKey 預(yù)聚合

  • 本地聚合后包颁,在 map 端的數(shù)據(jù)量變少瞻想,減少了磁盤 IO,也減少了對磁盤空間的占用娩嚼;
  • 本地聚合后蘑险,下一個 stage 拉取的數(shù)據(jù)量變少,減少了網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量岳悟;
  • 本地聚合后佃迄,在 reduce 端進行數(shù)據(jù)緩存的內(nèi)存占用減少;
  • 本地聚合后贵少,在 reduce 端進行聚合的數(shù)據(jù)量減少呵俏。
  • 基于 reduceByKey 的本地聚合特征,我們應(yīng)該考慮使用 reduceByKey 代替其他的 shuffle 算
    子滔灶,例如 groupByKey普碎。

11 故障:shuffle file not found

  • 原因:Shuffle 操作中,后面 stage 的 task 想要去上一個 stage 的
    task 所在的 Executor 拉取數(shù)據(jù)录平,結(jié)果對方正在執(zhí)行 GC麻车,執(zhí)行 GC 會導(dǎo)致 Executor 內(nèi)所有的工作現(xiàn)場全部停止,比如 BlockManager斗这、基于 netty 的網(wǎng)絡(luò)通信等动猬,這就會導(dǎo)致后面的 task拉取數(shù)據(jù)拉取了半天都沒有拉取到,就會報出 shuffle file not found 的錯誤涝影,而第二次再次執(zhí)行就不會再出現(xiàn)這種錯誤枣察。
# 調(diào)整 reduce 端拉取數(shù)據(jù)重試次數(shù)和 reduce 端拉取數(shù)據(jù)時間間隔這兩個參數(shù)
val conf = new SparkConf()
 .set("spark.shuffle.io.maxRetries", "60")
 .set("spark.shuffle.io.retryWait", "60s")

12 故障:OOM

  • Shuffle map端:map 端緩沖的默認配置是 32KB,如果每個 task 處理 640KB 的數(shù)據(jù)燃逻,那么會發(fā)生 640/32 = 20 次溢寫序目,如果每個 task 處理 64000KB 的數(shù)據(jù),機會發(fā)生 64000/32=2000 此溢寫伯襟,這對于性能的影響是非常嚴重的猿涨。
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
  • shuffle reduce端: reduce task 的 buffer 緩沖區(qū)大小決定了 reduce task 每次能夠緩沖的數(shù)據(jù)量,也就是每次能夠拉取的數(shù)據(jù)量姆怪。reduce 端數(shù)據(jù)拉取緩沖區(qū)的大小可以通過 park.reducer.maxSizeInFlight 參數(shù)進行設(shè)置叛赚,默認為 48MB
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")

13 Executor 堆外內(nèi)存故障:OOM與task lost

  • Spark 作業(yè)處理的數(shù)據(jù)量非常大,達到幾億的數(shù)據(jù)量稽揭,此時運行 Spark作業(yè)會時不時地報錯俺附,例如 shuffle output file cannot find,executor lost溪掀,task lost事镣,out of memory等,這可能是 Executor 的堆外內(nèi)存不太夠用揪胃,導(dǎo)致 Executor 在運行的過程中內(nèi)存溢出璃哟。
  • Executor 的堆外內(nèi)存主要用于程序的共享庫氛琢、Perm Space、 線程 Stack 和一些 Memory mapping 等, 或者類 C 方式 allocate object随闪。
  • stage 的 task 在運行的時候阳似,可能要從一些 Executor 中去拉取 shuffle map output 文件,但是 Executor 可能已經(jīng)由于內(nèi)存溢出掛掉了铐伴,其關(guān)聯(lián)的 BlockManager 也沒有了撮奏,這就可能會報出 shuffle output file cannot find,executor lost当宴,task lost挽荡,out of memory 等錯誤
# Executor 堆外內(nèi)存的配置需要在 spark-submit :
--conf spark.yarn.executor.memoryOverhead=2048

14 spark數(shù)據(jù)傾斜

1. 數(shù)據(jù)傾斜的表現(xiàn)

  • Spark 作業(yè)的大部分 task 都執(zhí)行迅速,只有有限的幾個 task 執(zhí)行的非常慢即供,此時可能出
    現(xiàn)了數(shù)據(jù)傾斜,作業(yè)可以運行于微,但是運行得非常慢逗嫡;
  • Spark 作業(yè)的大部分 task 都執(zhí)行迅速,但是有的 task 在運行過程中會突然報出 OOM株依,
    反復(fù)執(zhí)行幾次都在某一個 task 報出 OOM 錯誤驱证,此時可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)無法正
    常運行恋腕。

2. 定位數(shù)據(jù)傾斜問題

  • 查閱代碼中的 shuffle 算子抹锄,例如 reduceByKey、countByKey荠藤、groupByKey伙单、join 等算子,根據(jù)代碼邏輯判斷此處是否會出現(xiàn)數(shù)據(jù)傾斜哈肖;
  • 查看 Spark 作業(yè)的 log 文件吻育,log 文件對于錯誤的記錄會精確到代碼的某一行,可以根據(jù)異常定位到的代碼位置來明確錯誤發(fā)生在第幾個 stage淤井,對應(yīng)的 shuffle 算子是哪一個布疼;

3. 解決方法1 - 過濾產(chǎn)生數(shù)據(jù)傾斜的key

  • 在 Spark 作業(yè)中允許丟棄某些數(shù)據(jù),那么可以考慮將可能導(dǎo)致數(shù)據(jù)傾斜的 key 進行
    過濾币狠,濾除可能導(dǎo)致數(shù)據(jù)傾斜的 key 對應(yīng)的數(shù)據(jù)

4. 解決方法2 - 聚合原始數(shù)據(jù)

  • 如果 Spark 作業(yè)的數(shù)據(jù)來源于 Hive 表游两,那么可以先在 Hive 表中對數(shù)據(jù)進行聚合,例如按照 key 進行分組漩绵,將同一 key 對應(yīng)的所有 value 用一種特殊的格式拼接到一個字符串里去贱案, 尚硅谷大數(shù)據(jù)技術(shù)之 Spark 優(yōu)化 這樣,一個 key 就只有一條數(shù)據(jù)了渐行;之后轰坊,對一個 key 的所有 value 進行處理時铸董,只需要進行 map 操作即可,無需再進行任何的 shuffle 操作肴沫。通過上述方式就避免了執(zhí)行 shuffle 操作粟害,也就不可能會發(fā)生任何的數(shù)據(jù)傾斜問題。

5. 解決方法3 - 提高 shuffle 操作中的 reduce 并行度

  • 在大部分的 shuffle 算子中颤芬,都可以傳入一個并行度的設(shè)置參數(shù)悲幅,比如 reduceByKey(500),這個參數(shù)會決定 shuffle 過程中 reduce 端的并行度站蝠,在進行 shuffle 操作的時候汰具,就會對應(yīng)著
    創(chuàng)建指定數(shù)量的 reduce task。對于 Spark SQL 中的 shuffle 類語句菱魔,比如 group by留荔、join 等,需要設(shè)置一個參數(shù)澜倦,即 spark.sql.shuffle.partitions聚蝶,該參數(shù)代表了 shuffle read task 的并行度,該值默認是 200藻治,對于很多場景來說都有點過小碘勉。

6. 解決方法4 - 聚合算子:使用隨機 key 實現(xiàn)雙重聚合

  • 首先,通過 map 算子給每個數(shù)據(jù)的 key 添加隨機數(shù)前綴桩卵,對 key 進行打散验靡,將原先一
    樣的 key 變成不一樣的 key,然后進行第一次聚合雏节,這樣就可以讓原本被一個 task 處理的數(shù)
    據(jù)分散到多個 task 上去做局部聚合胜嗓;隨后,去除掉每個 key 的前綴矾屯,再次進行聚合兼蕊。

7. 解決方法5 - join算子:將 reduce join 轉(zhuǎn)換為 map join

  • 將較小 RDD 中的數(shù)據(jù)直接通過 collect 算子拉取到 Driver 端的內(nèi)存中來,然后對其創(chuàng)建一個 Broadcast 變量件蚕;接著對另外一個 RDD 執(zhí)行 map 類算子孙技,在算子函數(shù)內(nèi),從 Broadcast 變量中獲取較小 RDD 的全量數(shù)據(jù)排作,與當(dāng)前 RDD 的每一條數(shù)據(jù)按照連接 key 進行比對牵啦,如果連接 key 相同的話,那么就將兩個 RDD 的數(shù)據(jù)用你需要的方式連接起來妄痪。

8. 解決方法6 - join算子:sample 采樣對傾斜 key 單獨進行 join

  • 當(dāng)數(shù)據(jù)量非常大時哈雏,可以考慮使用 sample 采樣獲取 10%的數(shù)據(jù),然后分析這 10%的數(shù)
    據(jù)中哪個 key 可能會導(dǎo)致數(shù)據(jù)傾斜,然后將這個 key 對應(yīng)的數(shù)據(jù)單獨提取出來裳瘪。

9. 解決方法7 - join算子:使用隨機數(shù)擴容進行 join

  • 我們會將原先一樣的 key 通過附加隨機前綴變成不一樣的 key土浸,然后就可以將這些處理
    后的“不同 key”分散到多個 task 中去處理,而不是讓一個 task 處理大量的相同 key彭羹。這一種
    方案是針對有大量傾斜 key 的情況黄伊,沒法將部分 key 拆分出來進行單獨處理,需要對整個
    RDD 進行數(shù)據(jù)擴容派殷,對內(nèi)存資源要求很高还最。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市毡惜,隨后出現(xiàn)的幾起案子拓轻,更是在濱河造成了極大的恐慌,老刑警劉巖经伙,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件扶叉,死亡現(xiàn)場離奇詭異,居然都是意外死亡帕膜,警方通過查閱死者的電腦和手機辜梳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來泳叠,“玉大人,你說我怎么就攤上這事茶宵∥H遥” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵乌庶,是天一觀的道長种蝶。 經(jīng)常有香客問我,道長瞒大,這世上最難降的妖魔是什么螃征? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮透敌,結(jié)果婚禮上盯滚,老公的妹妹穿的比我還像新娘。我一直安慰自己酗电,他們只是感情好魄藕,可當(dāng)我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著撵术,像睡著了一般背率。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天寝姿,我揣著相機與錄音交排,去河邊找鬼。 笑死饵筑,一個胖子當(dāng)著我的面吹牛埃篓,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播翻翩,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼都许,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了嫂冻?” 一聲冷哼從身側(cè)響起胶征,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎桨仿,沒想到半個月后睛低,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡服傍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年钱雷,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吹零。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡罩抗,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出灿椅,到底是詐尸還是另有隱情套蒂,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布茫蛹,位于F島的核電站操刀,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏婴洼。R本人自食惡果不足惜骨坑,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望柬采。 院中可真熱鬧欢唾,春花似錦、人聲如沸粉捻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽杀迹。三九已至亡脸,卻和暖如春押搪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背浅碾。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工大州, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人垂谢。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓厦画,卻偏偏與公主長得像,于是被迫代替她去往敵國和親滥朱。 傳聞我的和親對象是個殘疾皇子根暑,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容