背景
CDH 最后一個免費版 6.3.2 發(fā)布一年有余,離線計算核心組件版本停在了 Hadoop 3.0.0睦柴,Hive 2.1.1余指,Spark 2.4.0。隨著 Spark 3.0 的重磅發(fā)布矾瑰,在性能方面又迎來了一次飛躍砖茸,本文將描述把 Spark 3 集成到 CDH 6.3.1(未開啟 Kerberos) 的過程,并使用 Kyuubi 替換 HiveServer2殴穴,實現(xiàn) OLAP凉夯、ETL 等場景下從 HiveQL 到 SparkSQL 的無縫遷移,享受 5x-10x 的性能紅利采幌。
CDH 缺陷修復
[ORC-125] 修復 Hive 不能讀取高版本 ORC 寫入的數(shù)據(jù)
當使用 Hive 讀取由 Presto 或者 Spark 等寫入的 ORC 文件時劲够,會出現(xiàn)以下錯誤。
ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 6
該問題在 ORC 上游被修復 [ORC-125] Correct OrcFile.WriterVersion to correctly use FUTURE休傍。
ORC 最早是 Hive 的一個子項目征绎,在 CDH 6 集成的 Hive 2.1 這個版本里,ORC 還沒有分離出去磨取,所以這個問題要在 Hive 源碼里修復人柿。
我做了一個打包好的修復版本,GitHub 傳送門忙厌,下載更換 /opt/cloudera/parcels/CDH/lib/hive/lib
路徑下的 hive-exec-2.1.1-cdh6.3.1.jar
, hive-orc-2.1.1-cdh6.3.1.jar
即可顷扩。(至少需要更換 Hadoop Client、HiveServer2 節(jié)點,如果你不知道我在說什么,就把所有節(jié)點都換掉)
Spark 3.1
[SPARK-33212] Spark 使用 Hadoop Shaded Client
Hadoop 3.0 提供了 Shaded Client治笨,用于下游項目規(guī)避依賴沖突 [HADOOP-11656] Classpath isolation for downstream clients知举。
Spark 3.2 在 hadoop-3.2
profile 中切換到了 Hadoop Shaded Client
[SPARK-33212] Upgrade to Hadoop 3.2.2 and move to shaded clients for Hadoop 3.x profile辕棚。
該更改不是必須的贪磺,但個人建議從 Spark 主線將該補丁移植到 branch-3.1 使用坯苹,以規(guī)避潛在的依賴沖突参咙。
[CDH-71907] Spark HiveShim 適配 CDH Hive
Spark 通過反射和隔離的類加載器來實現(xiàn)對多版本 Hive Metastore 的支持犀农,詳情參考
Interacting with Different Versions of Hive Metastore - Spark Documentation惰赋。
CDH 6 使用修改過的 Hive 2.1.1 版本,其方法簽名與 Apache 版本有所不同呵哨,故 Spark HiveShim 反射調用會出現(xiàn)找不到方法簽名赁濒,需要手動將 CDH-71907 補丁打到 branch-3.1。
Spark External Shuffle Service 協(xié)議兼容
Spark shuffle 時孟害,mapper 會將數(shù)據(jù)寫入到本地磁盤而非 HDFS拒炎,引入 ESS 后,會將文件信息注冊到 ESS 中挨务,將 mapper 與 reducer 解耦击你。CDH 中,默認會啟用 Spark Yarn External Shuffle Service谎柄,作為 YARN AUX Service 在所有 Yarn Node 上啟動丁侄。
Spark 3 修改了 shuffle 通信協(xié)議,在與 CDH 2.4 版本的 ESS 交互時朝巫,需要設置 spark.shuffle.useOldFetchProtocol=true
鸿摇,否則可能報如下錯誤。[SPARK-29435] Spark 3 doesn't work with older shuffle service
IllegalArgumentException: Unexpected message type: <number>.
Spark 版本遷移指南
如果你有現(xiàn)存的基于 CDH Spark 2.4 的 Spark SQL/Job劈猿,在將其遷移至 Spark 3 版本前户辱,請參閱完整的官方遷移指南。
- Migration Guide: Spark Core - Spark Documentation
- Migration Guide: SQL, Datasets and DataFrame - Spark Documentation
Spark 部署
官方文檔 Running Spark on YARN - Documentation 中提到
To make Spark runtime jars accessible from YARN side, you can specify
spark.yarn.archive
orspark.yarn.jars
. For details please refer to Spark Properties. If neitherspark.yarn.archive
norspark.yarn.jars
is specified, Spark will create a zip file with all jars under$SPARK_HOME/jars
and upload it to the distributed cache.
因此糙臼,無需在 CDH 所有節(jié)點上部署 Spark 3,只需在 Hadoop Client 節(jié)點上部署 Spark 3 即可恩商。
如果你對集群權限管理沒有十分嚴格的要求变逃,請使用 hive 用戶以避免權限問題。
我基于 Spark 3.1.2 制作了一個適配 CDH 6 的版本怠堪, GitHub 傳送門 揽乱,下載解壓至 /opt
,并軟鏈至 /opt/spark3
粟矿。
[hive@cdh-kyuubi]$ ls -l /opt | grep spark
lrwxrwxrwx 1 root root 39 Aug 10 18:46 spark3 -> /opt/spark-3.1.2-cdh6-bin-3.2.2
drwxr-xr-x 13 hive hive 4096 Aug 10 18:46 spark-3.1.2-cdh6-bin-3.2.2
配置 Hadoop凰棉、Hive
CDH 會將配置文件自動分發(fā)到所有節(jié)點 /etc
目錄下,建立軟鏈即可陌粹。
ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/yarn-site.xml /opt/spark3/conf/
ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/
配置 Spark 環(huán)境變量 /opt/spark3/conf/spark-env.sh
#!/usr/bin/env bash
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn:/etc/hive/conf
配置 Spark 默認參數(shù) /opt/spark3/conf/spark-defaults.conf
請參考 Configuration - Spark Documentation 根據(jù)集群環(huán)境實際情況進行微調
spark.authenticate=false
spark.io.encryption.enabled=false
spark.network.crypto.enabled=false
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory
spark.driver.log.dfsDir=/user/spark/driverLogs
spark.driver.log.persistToDfs.enabled=true
spark.files.overwrite=true
spark.files.useFetchCache=false
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.shuffle.useOldFetchProtocol=true
spark.ui.enabled=true
spark.ui.killEnabled=true
spark.yarn.historyServer.address=http://cdh-master2:18088
spark.yarn.historyServer.allowTracking=true
spark.master=yarn
spark.submit.deployMode=cluster
spark.driver.memory=2G
spark.executor.cores=6
spark.executor.memory=8G
spark.executor.memoryOverhead=2G
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=2G
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.schedulerBacklogTimeout=1
spark.sql.cbo.enabled=true
spark.sql.cbo.starSchemaDetection=true
spark.sql.datetime.java8API.enabled=false
spark.sql.sources.partitionOverwriteMode=dynamic
spark.sql.hive.convertMetastoreParquet=false
spark.sql.hive.convertMetastoreParquet.mergeSchema=false
spark.sql.hive.metastore.version=2.1.1
spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/*
spark.sql.orc.mergeSchema=true
spark.sql.parquet.mergeSchema=true
spark.sql.parquet.writeLegacyFormat=true
spark.sql.adaptive.enabled=true
spark.sql.adaptive.forceApply=false
spark.sql.adaptive.logLevel=info
spark.sql.adaptive.advisoryPartitionSizeInBytes=256m
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.coalescePartitions.initialPartitionNum=1024
spark.sql.adaptive.fetchShuffleBlocksInBatch=true
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=128m
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2
spark.sql.autoBroadcastJoinThreshold=-1
驗證 spark-shell 工作正常
[hive@cdh-kyuubi]$ /opt/spark3/bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/08/30 19:53:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/08/30 19:53:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://cdh-master1:4040
Spark context available as 'sc' (master = yarn, app id = application_1615462037335_40099).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2-cdh6
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("show databases").show
+----------------+
| namespace|
+----------------+
| test|
+----------------+
scala>
至此撒犀,Spark 3 on CDH 6 已經(jīng)部署完成,可以像 CDH 自帶的 Spark 2.4 一樣,正常的使用 spark-submit或舞、spark-shell荆姆,但依舊不支持 spark-sql,spark-thriftserver映凳。
相比略顯雞肋的 spark-sql胆筒,spark-thriftserver,Kyuubi 是更好的選擇诈豌,因此我故意移除了這兩個功能仆救。
注意:我提供的構建版,沒有啟用 spark-thriftserver 模塊矫渔,為正常使用 Kyuubi彤蔽,請下載 hive-service-rpc-3.1.2.jar
添加到 /opt/spark3/jars
路徑。
Kyuubi —— 解鎖 Spark SQL 更多場景
簡言之蚌斩,Apache Kyuubi (Incubating) 之于 Spark铆惑,類似 HiveServer2 之于 Hive。
Kyuubi 通過將 Spark 暴露一個與 HiveServer2 完全兼容的 Thrift API送膳,可以兼容現(xiàn)有的 Hive 生態(tài)员魏,如 beeline,hive-jdbc-driver叠聋,HUE撕阎,Superset 等。
可以通過以下文檔來了解 Kyuubi 的架構碌补,Kyuubi 與 HiveServer2 和 Spark Thrift Server 的異同虏束。
- Welcome to Kyuubi’s documentation
- Kyuubi Architecture — Kyuubi documentation
- Kyuubi v.s. HiveServer2 — Kyuubi documentation
- Kyuubi v.s. Spark Thrift JDBC/ODBC Server (STS) — Kyuubi documentation
Kyuubi 部署
Kyuubi 無需任何修改即可適配 CDH 6,下面給出關鍵步驟厦章,詳情可以參考 Deploy Kyuubi engines on Yarn — Kyuubi documentation镇匀。
同樣的,如果你對集群權限管理沒有十分嚴格的要求袜啃,請使用 hive 用戶以避免權限問題汗侵。
解壓部署
下載 kyuubi-1.3.0-incubating-bin.tgz 解壓至 /opt
,并創(chuàng)建軟鏈到 /opt/kyuubi
群发。
[hive@cdh-external opt]$ ls -l /opt | grep kyuubi
lrwxrwxrwx 1 root root 32 Aug 18 17:36 kyuubi -> /opt/kyuubi-1.3.0-incubating-bin
drwxrwxr-x 13 hive hive 4096 Aug 18 17:52 kyuubi-1.3.0-incubating-bin
修改配置
按需修改 /opt/kyuubi/conf/kyuubi-env.sh
#!/usr/bin/env bash
export JAVA_HOME=/usr/java/default
export SPARK_HOME=/opt/spark3
export SPARK_CONF_DIR=${SPARK_HOME}/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
export KYUUBI_PID_DIR=/data/log/service/kyuubi/pid
export KYUUBI_LOG_DIR=/data/log/service/kyuubi/logs
export KYUUBI_WORK_DIR_ROOT=/data/log/service/kyuubi/work
export KYUUBI_MAX_LOG_FILES=10
參考 Kyuubi Configurations System — Kyuubi documentation 按需修改 /opt/kyuubi/conf/kyuubi-defaults.conf
kyuubi.authentication=NONE
kyuubi.engine.share.level=USER
kyuubi.frontend.bind.host=0.0.0.0
kyuubi.frontend.bind.port=10009
kyuubi.ha.zookeeper.quorum=cdh-master1:2181,cdh-master2:2181,cdh-master3:2181
kyuubi.ha.zookeeper.namespace=kyuubi
kyuubi.session.engine.idle.timeout=PT10H
spark.master=yarn
spark.submit.deployMode=cluster
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=20
spark.dynamicAllocation.executorIdleTimeout=60
注意:kyuubi-defaults.conf
中的 spark 配置優(yōu)先級高于 spark-defaults.conf
性能調優(yōu)
How To Use Spark Dynamic Resource Allocation (DRA) in Kyuubi — Kyuubi documentation
How To Use Spark Adaptive Query Execution (AQE) in Kyuubi — Kyuubi documentation
啟動
使用 /opt/kyuubi/bin/kyuubi
在前臺或后臺啟動 Kyuubi Server晰韵。
[hive@cdh-kyuubi]$ /opt/kyuubi/bin/kyuubi --help
Usage: bin/kyuubi command
commands:
start - Run a Kyuubi server as a daemon
run - Run a Kyuubi server in the foreground
stop - Stop the Kyuubi daemon
status - Show status of the Kyuubi daemon
-h | --help - Show this help message
Beeline 連接
使用默認參數(shù)連接 Kyuubi
beeline -u jdbc:hive2://cdh-kyuubi:10009 -n bigdata
使用自定義參數(shù)連接 Kyuubi
beeline -u "jdbc:hive2://cdh-master2:10009/;?spark.driver.memory=8G#spark.app.name=batch_001;kyuubi.engine.share.level=CONNECTION" -n batch
詳細配置參考 Access Kyuubi with Hive JDBC and ODBC Drivers — Kyuubi documentation
HUE 連接
簡單說,在 Cloudera Manager 中修改 HUE 配置項 Hue Service Advanced Configuration Snippet 如下熟妓,即可在 HUE 中開啟 Spark SQL 引擎雪猪。
[desktop]
app_blacklist=zookeeper,hbase,impala,search,sqoop,security
use_new_editor=true
[[interpreters]]
[[[sparksql]]]
name=Spark SQL
interface=hiveserver2
[[[hive]]]
name=Hive
interface=hiveserver2
# other interpreters
...
[spark]
sql_server_host=kyuubi
sql_server_port=10009
詳細配置參考 Getting Started with Kyuubi and Cloudera Hue — Kyuubi documentation
Kyuubi engine 共享級別與應用場景
Kyuubi Spark engine 即一個 Spark driver,通過控制 engine 的共享策略起愈,可以在隔離性和資源利用率上取得平衡只恨。Kyuubi 提供 3 種 engine 共享級別译仗,分別為 CONNECTION,USER(默認)坤次,SERVER古劲。
下面的討論均假設使用 YARN Cluster 模式啟動 Kyuubi Spark engine。
我們首先補充一些時間開銷和 Spark 操作執(zhí)行的信息缰猴。
Kyuubi Spark engine 在冷啟動時产艾,會由 Kyuubi Server 通過 spark-submit
命令向 YARN 提交一個 Spark App,即 engine滑绒,該過程從提交到 Spark driver 啟動約需要 5-6s闷堡,然后 engine 將自己注冊到 Zookeeper,Kyuubi Server 監(jiān)聽 Zookeeper 發(fā)現(xiàn) engine 并建立連接疑故,該過程約 1-2s杠览,如此算來,在 YARN 資源空閑時纵势,整個 engine 冷啟動時間約 6-8s踱阿;Client 連接到存在的 engine,通常耗時 1s 內钦铁。
如果啟動了 Spark 的 executor 動態(tài)伸縮特性软舌,真正執(zhí)行 SQL 任務時,如果資源有富余牛曹,會動態(tài)創(chuàng)建 executor佛点,每個 executor 創(chuàng)建耗時約為 2-3s。
元數(shù)據(jù)黎比、DDL 等操作超营,如獲取 database 列表,CREATE TABLE
語句等阅虫,會在 driver 上執(zhí)行演闭;計算任務如 JOIN
、COUNT()
等颓帝,會由 driver 生成執(zhí)行計劃米碰,在 executor 上執(zhí)行。
在我們的生產(chǎn)環(huán)境中躲履,大概有如下三種使用場景:
- 使用 HUE 進行 ad-hoc 查詢
該場景中,會有多個用戶進行查詢聊闯,一般會運行相對較大的查詢任務工猜,用戶對連接創(chuàng)建時間以及元數(shù)據(jù)加載時間較為敏感,但對查詢結果響應時間有一定的容忍性菱蔬。
在這種場景中篷帅,使用默認的 USER 共享級別史侣,每個用戶只使用一個 Spark engine,配合使用 Spark 的動態(tài)伸縮特性魏身,動態(tài)的創(chuàng)建和銷毀 executor惊橱,在保證用戶之間隔離的基礎上,降低啟動時間和資源占用箭昵。建議的關鍵配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT1H
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=30
spark.dynamicAllocation.executorIdleTimeout=120
- 使用 Beeline 運行批任務 SQL
該場景會使用統(tǒng)一的 batch 賬號提交 SQL 任務税朴,對響應時間敏感度低,但對穩(wěn)定性要求非常要家制,并且應盡可能的最大化利用集群資源正林。
推薦在該場景下將共享級別調整為 CONNECTION,這樣每個 SQL 執(zhí)行將會使用獨立的 Spark driver颤殴,并且 SQL 執(zhí)行完畢后 Spark driver 立即退出觅廓,保障離線任務互不干擾,并且資源及時釋放涵但。建議的關鍵配置如下:
kyuubi.engine.share.level=CONNECTION
spark.dynamicAllocation.enabled=true
# 根據(jù)任務具體資源消耗估算杈绸,從 workflow 整體上提升集群資源利用率
spark.dynamicAllocation.minExecutors=5
spark.dynamicAllocation.maxExecutors=30
- 使用 Superset 進行多數(shù)據(jù)源聯(lián)邦查詢
該場景中,只有一個 service 賬號矮瘟,會定時同時刷新大量的圖表瞳脓,對連接創(chuàng)建時間、查詢結果響應時間芥永、并發(fā)度都有較高的要求篡殷。
該場景中,driver 和 executor 的啟動時間都是不容忽視的埋涧,因此在 ad-hoc 查詢配置的基礎上板辽,應延長 driver、executor 的閑置等待時間棘催,并且設定最小的 executor 本⑾遥活數(shù)量,保證時刻有 executor 常駐醇坝,能快速響應較小的查詢邑跪。建議的關鍵配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT10H
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=6
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.executorIdleTimeout=120
我們可以啟動三個 Kyuubi Server(單機或集群)來分別應對如上的三種場景,但也可以僅啟動一個 Kyuubi Server(單機或集群)來滿足不同的場景呼猪。
一種建議的實踐方式是 Kyuubi Server 配置使用默認的 USER 共享級別画畅,這樣客戶端連接時,會使用默認配置宋距;當 ETL 跑批場景時轴踱,可以通過 beeline 參數(shù)將本次連接共享級別調整為 CONNECTION;類似的谚赎,在 Superset 連接中配置獨立參數(shù)淫僻。
結語
現(xiàn)在去跑一些 SQL诱篷,體驗 Spark SQL 帶來的性能飛躍吧!