1 spark sql寫入mysql非常慢
- 有這樣一個業(yè)務(wù)場景:需要將通過Spark處理之后的數(shù)據(jù)寫入MySQL,并在在網(wǎng)頁端進行可視化輸出瘪板。Spark處理之后有大概40萬條數(shù)據(jù),寫入MySQL卻要耗費將近30分鐘,這也太慢了财忽!
- 后來翻看了Spark向JDBC數(shù)據(jù)源寫數(shù)據(jù)的那部分源碼捧杉,雖然源碼中的實現(xiàn)使用的確實是 PreparedStatement 的addBatch()方法和executeBatch()方法陕见,但是我們再去翻看executeBatch()方法的實現(xiàn)后發(fā)現(xiàn),它并不是每次執(zhí)行一批插入糠溜,而是循環(huán)的去執(zhí)行每條insert插入語句淳玩,這就造成只插入一條數(shù)據(jù),而不是一批數(shù)據(jù)非竿,導(dǎo)致大多數(shù)的時間都耗費在了與數(shù)據(jù)庫的交互連接上了
- 解決方法:
jdbc.saas.url=jdbc:mysql://172.25.1.*/saas-hospital?characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true
2 spark sql jdbc并發(fā)分區(qū)
- jdbcDF.rdd.partitions.size # 結(jié)果返回 1蜕着。該操作的并發(fā)度為1,你所有的數(shù)據(jù)都會在一個partition中進行操作红柱,意味著無論你給的資源有多少承匣,只有一個task會執(zhí)行任務(wù),執(zhí)行效率可想而之锤悄,并且在稍微大點的表中進行操作分分鐘就會OOM韧骗。
def jdbc(
url: String,
table: String,
columnName: String, # 根據(jù)該字段分區(qū),需要為整形零聚,比如id等
lowerBound: Long, # 分區(qū)的下界
upperBound: Long, # 分區(qū)的上界
numPartitions: Int, # 分區(qū)的個數(shù)
connectionProperties: Properties): DataFrame
# 指定字段區(qū)間分區(qū)
val predicates =
Array(
"2015-09-16" -> "2015-09-30",
"2015-10-01" -> "2015-10-15",
"2015-10-16" -> "2015-10-31",
"2015-11-01" -> "2015-11-14",
"2015-11-15" -> "2015-11-30",
"2015-12-01" -> "2015-12-15"
).map {
case (start, end) =>
s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
}
// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop)
3 SparkSQL與Parquet格式兼容性
- spark.sql.parquet.writeLegacyFormat 默認是false袍暴。如果設(shè)置為true 數(shù)據(jù)會以spark 1.4和更早的版本的格式寫入些侍。比如,decimal類型的值會被以apache parquet的fixed-length byte array格式寫出政模,該格式是其他系統(tǒng)例如hive岗宣,impala等使用的。如果是false淋样,會使用parquet的新版格式耗式。例如,decimals會以int-based格式寫出趁猴。
4 RDD復(fù)用并序列化存儲
- 必須對多次使用的 RDD 進行持久化刊咳,通過持久化將公共 RDD 的數(shù)據(jù)
緩存到內(nèi)存/磁盤中,之后對于公共 RDD 的計算都會從內(nèi)存/磁盤中直接獲取 RDD 數(shù)據(jù) - RDD 的持久化是可以進行序列化的儡司,當(dāng)內(nèi)存無法將 RDD 的數(shù)據(jù)完整的進行存放的時候娱挨,可以考慮使用序列化的方式減小數(shù)據(jù)體積,將數(shù)據(jù)完整存儲在內(nèi)存中
public static final StorageLevel MEMORY_ONLY_SER_2
public static final StorageLevel MEMORY_AND_DISK_SER_2
5 RDD合理設(shè)置并行度
- Spark 官方推薦枫慷,task 數(shù)量應(yīng)該設(shè)置為 Spark 作業(yè)總 CPU core 數(shù)量的 2~3 倍让蕾。之所以沒有推薦 task 數(shù)量與 CPU core 總數(shù)相等,是因為 task 的執(zhí)行時間不同或听,有的 task 執(zhí)行速度快而有的 task 執(zhí)行速度慢探孝,如果 task 數(shù)量與 CPU core 總數(shù)相等,那么執(zhí)行快的 task 執(zhí)行完成后誉裆,會出現(xiàn) CPU core 空閑的情況顿颅。如果 task 數(shù)量設(shè)置為 CPU core 總數(shù)的 2~3 倍,那么一個task 執(zhí)行完畢后足丢,CPU core 會立刻執(zhí)行下一個 task粱腻,降低了資源的浪費,同時提升了 Spark作業(yè)運行的效率斩跌。
val conf = new SparkConf().set("spark.default.parallelism", "500")
6 廣播大變量
- 默認情況下绍些,task 中的算子中如果使用了外部的變量,每個 task 都會獲取一份變量的復(fù)本耀鸦,這就造成了內(nèi)存的極大消耗柬批。一方面,如果后續(xù)對 RDD 進行持久化袖订,可能就無法將 RDD數(shù)據(jù)存入內(nèi)存氮帐,只能寫入磁盤,磁盤 IO 將會嚴重消耗性能洛姑;另一方面上沐,task 在創(chuàng)建對象的時候,也許會發(fā)現(xiàn)堆內(nèi)存無法存放新創(chuàng)建的對象楞艾,這就會導(dǎo)致頻繁的 GC参咙,GC 會導(dǎo)致工作線程停止龄广,進而導(dǎo)致 Spark 暫停工作一段時間,嚴重影響 Spark 性能昂勒。
7 Kryo 序列化
- Kryo 序列化機制比 Java 序列化機制性能提高 10 倍左右蜀细,Spark 之所以沒有默認使用
Kryo 作為序列化類庫,是因為它不支持所有對象的序列化戈盈,同時 Kryo 需要用戶在使用前注
冊需要序列化的類型,不夠方便谆刨,但從 Spark 2.0.0 版本開始塘娶,簡單類型、簡單類型數(shù)組痊夭、字
符串類型的 Shuffling RDDs 已經(jīng)默認使用 Kryo 序列化方式了
public class MyKryoRegistrator implements KryoRegistrator {
@Override
public void registerClasses(Kryo kryo){
kryo.register(StartupReportLogs.class);
}
}
//創(chuàng)建 SparkConf 對象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用 Kryo 序列化庫刁岸,如果要使用 Java 序列化庫,需要把該行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在 Kryo 序列化庫中注冊自定義的類集合她我,如果要使用 Java 序列化庫虹曙,需要把該行屏蔽掉
conf.set("spark.kryo.registrator", "MyKryoRegistrator");
8 foreachPartition 優(yōu)化數(shù)據(jù)庫操作
- 在生產(chǎn)環(huán)境中,通常使用 foreachPartition 算子來完成數(shù)據(jù)庫的寫入番舆,通過 foreachPartition
算子的特性酝碳,可以優(yōu)化寫數(shù)據(jù)庫的性能。 - 對于我們寫的 function 函數(shù)恨狈,一次處理一整個分區(qū)的數(shù)據(jù)疏哗;
- 對于一個分區(qū)內(nèi)的數(shù)據(jù),創(chuàng)建唯一的數(shù)據(jù)庫連接禾怠;
- 只需要向數(shù)據(jù)庫發(fā)送一次 SQL 語句和多組參數(shù)返奉;
9 repartition 解決 SparkSQL 低并行度問題
- Spark SQL 的并行度不允許用戶自己指定,Spark SQL 自己會默認根據(jù) hive 表對應(yīng)的
HDFS 文件的 split 個數(shù)自動設(shè)置 Spark SQL 所在的那個 stage 的并行度吗氏,用戶自己通
spark.default.parallelism 參數(shù)指定的并行度芽偏,只會在沒 Spark SQL 的 stage 中生效。 -
Spark SQL 這一步的并行度和 task 數(shù)量肯定是沒有辦法去改變了弦讽,但是污尉,對于
Spark SQL 查詢出來的 RDD,立即使用 repartition 算子坦袍,去重新進行分區(qū)十厢,這樣可
以重新分區(qū)為多個 partition,從 repartition 之后的 RDD 操作捂齐,由于不再設(shè)計 Spark
SQL蛮放,因此 stage 的并行度就會等于你手動設(shè)置的值,這樣就避免了 Spark SQL 所在
的 stage 只能用少量的 task 去處理大量數(shù)據(jù)并執(zhí)行復(fù)雜的算法邏輯奠宜。
spark sql重分區(qū)
10 reduceByKey 預(yù)聚合
- 本地聚合后包颁,在 map 端的數(shù)據(jù)量變少瞻想,減少了磁盤 IO,也減少了對磁盤空間的占用娩嚼;
- 本地聚合后蘑险,下一個 stage 拉取的數(shù)據(jù)量變少,減少了網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量岳悟;
- 本地聚合后佃迄,在 reduce 端進行數(shù)據(jù)緩存的內(nèi)存占用減少;
- 本地聚合后贵少,在 reduce 端進行聚合的數(shù)據(jù)量減少呵俏。
- 基于 reduceByKey 的本地聚合特征,我們應(yīng)該考慮使用 reduceByKey 代替其他的 shuffle 算
子滔灶,例如 groupByKey普碎。
11 故障:shuffle file not found
- 原因:Shuffle 操作中,后面 stage 的 task 想要去上一個 stage 的
task 所在的 Executor 拉取數(shù)據(jù)录平,結(jié)果對方正在執(zhí)行 GC麻车,執(zhí)行 GC 會導(dǎo)致 Executor 內(nèi)所有的工作現(xiàn)場全部停止,比如 BlockManager斗这、基于 netty 的網(wǎng)絡(luò)通信等动猬,這就會導(dǎo)致后面的 task拉取數(shù)據(jù)拉取了半天都沒有拉取到,就會報出 shuffle file not found 的錯誤涝影,而第二次再次執(zhí)行就不會再出現(xiàn)這種錯誤枣察。
# 調(diào)整 reduce 端拉取數(shù)據(jù)重試次數(shù)和 reduce 端拉取數(shù)據(jù)時間間隔這兩個參數(shù)
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "60")
.set("spark.shuffle.io.retryWait", "60s")
12 故障:OOM
- Shuffle map端:map 端緩沖的默認配置是 32KB,如果每個 task 處理 640KB 的數(shù)據(jù)燃逻,那么會發(fā)生 640/32 = 20 次溢寫序目,如果每個 task 處理 64000KB 的數(shù)據(jù),機會發(fā)生 64000/32=2000 此溢寫伯襟,這對于性能的影響是非常嚴重的猿涨。
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
- shuffle reduce端: reduce task 的 buffer 緩沖區(qū)大小決定了 reduce task 每次能夠緩沖的數(shù)據(jù)量,也就是每次能夠拉取的數(shù)據(jù)量姆怪。reduce 端數(shù)據(jù)拉取緩沖區(qū)的大小可以通過 park.reducer.maxSizeInFlight 參數(shù)進行設(shè)置叛赚,默認為 48MB
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")
13 Executor 堆外內(nèi)存故障:OOM與task lost
- Spark 作業(yè)處理的數(shù)據(jù)量非常大,達到幾億的數(shù)據(jù)量稽揭,此時運行 Spark作業(yè)會時不時地報錯俺附,例如 shuffle output file cannot find,executor lost溪掀,task lost事镣,out of memory等,這可能是 Executor 的堆外內(nèi)存不太夠用揪胃,導(dǎo)致 Executor 在運行的過程中內(nèi)存溢出璃哟。
- Executor 的堆外內(nèi)存主要用于程序的共享庫氛琢、Perm Space、 線程 Stack 和一些 Memory mapping 等, 或者類 C 方式 allocate object随闪。
- stage 的 task 在運行的時候阳似,可能要從一些 Executor 中去拉取 shuffle map output 文件,但是 Executor 可能已經(jīng)由于內(nèi)存溢出掛掉了铐伴,其關(guān)聯(lián)的 BlockManager 也沒有了撮奏,這就可能會報出 shuffle output file cannot find,executor lost当宴,task lost挽荡,out of memory 等錯誤
# Executor 堆外內(nèi)存的配置需要在 spark-submit :
--conf spark.yarn.executor.memoryOverhead=2048
14 spark數(shù)據(jù)傾斜
1. 數(shù)據(jù)傾斜的表現(xiàn)
- Spark 作業(yè)的大部分 task 都執(zhí)行迅速,只有有限的幾個 task 執(zhí)行的非常慢即供,此時可能出
現(xiàn)了數(shù)據(jù)傾斜,作業(yè)可以運行于微,但是運行得非常慢逗嫡; - Spark 作業(yè)的大部分 task 都執(zhí)行迅速,但是有的 task 在運行過程中會突然報出 OOM株依,
反復(fù)執(zhí)行幾次都在某一個 task 報出 OOM 錯誤驱证,此時可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)無法正
常運行恋腕。
2. 定位數(shù)據(jù)傾斜問題
- 查閱代碼中的 shuffle 算子抹锄,例如 reduceByKey、countByKey荠藤、groupByKey伙单、join 等算子,根據(jù)代碼邏輯判斷此處是否會出現(xiàn)數(shù)據(jù)傾斜哈肖;
- 查看 Spark 作業(yè)的 log 文件吻育,log 文件對于錯誤的記錄會精確到代碼的某一行,可以根據(jù)異常定位到的代碼位置來明確錯誤發(fā)生在第幾個 stage淤井,對應(yīng)的 shuffle 算子是哪一個布疼;
3. 解決方法1 - 過濾產(chǎn)生數(shù)據(jù)傾斜的key
- 在 Spark 作業(yè)中允許丟棄某些數(shù)據(jù),那么可以考慮將可能導(dǎo)致數(shù)據(jù)傾斜的 key 進行
過濾币狠,濾除可能導(dǎo)致數(shù)據(jù)傾斜的 key 對應(yīng)的數(shù)據(jù)
4. 解決方法2 - 聚合原始數(shù)據(jù)
- 如果 Spark 作業(yè)的數(shù)據(jù)來源于 Hive 表游两,那么可以先在 Hive 表中對數(shù)據(jù)進行聚合,例如按照 key 進行分組漩绵,將同一 key 對應(yīng)的所有 value 用一種特殊的格式拼接到一個字符串里去贱案, 尚硅谷大數(shù)據(jù)技術(shù)之 Spark 優(yōu)化 這樣,一個 key 就只有一條數(shù)據(jù)了渐行;之后轰坊,對一個 key 的所有 value 進行處理時铸董,只需要進行 map 操作即可,無需再進行任何的 shuffle 操作肴沫。通過上述方式就避免了執(zhí)行 shuffle 操作粟害,也就不可能會發(fā)生任何的數(shù)據(jù)傾斜問題。
5. 解決方法3 - 提高 shuffle 操作中的 reduce 并行度
- 在大部分的 shuffle 算子中颤芬,都可以傳入一個并行度的設(shè)置參數(shù)悲幅,比如 reduceByKey(500),這個參數(shù)會決定 shuffle 過程中 reduce 端的并行度站蝠,在進行 shuffle 操作的時候汰具,就會對應(yīng)著
創(chuàng)建指定數(shù)量的 reduce task。對于 Spark SQL 中的 shuffle 類語句菱魔,比如 group by留荔、join 等,需要設(shè)置一個參數(shù)澜倦,即 spark.sql.shuffle.partitions聚蝶,該參數(shù)代表了 shuffle read task 的并行度,該值默認是 200藻治,對于很多場景來說都有點過小碘勉。
6. 解決方法4 - 聚合算子:使用隨機 key 實現(xiàn)雙重聚合
- 首先,通過 map 算子給每個數(shù)據(jù)的 key 添加隨機數(shù)前綴桩卵,對 key 進行打散验靡,將原先一
樣的 key 變成不一樣的 key,然后進行第一次聚合雏节,這樣就可以讓原本被一個 task 處理的數(shù)
據(jù)分散到多個 task 上去做局部聚合胜嗓;隨后,去除掉每個 key 的前綴矾屯,再次進行聚合兼蕊。
7. 解決方法5 - join算子:將 reduce join 轉(zhuǎn)換為 map join
- 將較小 RDD 中的數(shù)據(jù)直接通過 collect 算子拉取到 Driver 端的內(nèi)存中來,然后對其創(chuàng)建一個 Broadcast 變量件蚕;接著對另外一個 RDD 執(zhí)行 map 類算子孙技,在算子函數(shù)內(nèi),從 Broadcast 變量中獲取較小 RDD 的全量數(shù)據(jù)排作,與當(dāng)前 RDD 的每一條數(shù)據(jù)按照連接 key 進行比對牵啦,如果連接 key 相同的話,那么就將兩個 RDD 的數(shù)據(jù)用你需要的方式連接起來妄痪。
8. 解決方法6 - join算子:sample 采樣對傾斜 key 單獨進行 join
- 當(dāng)數(shù)據(jù)量非常大時哈雏,可以考慮使用 sample 采樣獲取 10%的數(shù)據(jù),然后分析這 10%的數(shù)
據(jù)中哪個 key 可能會導(dǎo)致數(shù)據(jù)傾斜,然后將這個 key 對應(yīng)的數(shù)據(jù)單獨提取出來裳瘪。
9. 解決方法7 - join算子:使用隨機數(shù)擴容進行 join
- 我們會將原先一樣的 key 通過附加隨機前綴變成不一樣的 key土浸,然后就可以將這些處理
后的“不同 key”分散到多個 task 中去處理,而不是讓一個 task 處理大量的相同 key彭羹。這一種
方案是針對有大量傾斜 key 的情況黄伊,沒法將部分 key 拆分出來進行單獨處理,需要對整個
RDD 進行數(shù)據(jù)擴容派殷,對內(nèi)存資源要求很高还最。