作業(yè)腳本采用Python語言編寫握爷,Spark為Python開發(fā)者提供了一個API-----PySpark嚷掠,利用PySpark可以很方便的連接Hive
下面是準(zhǔn)備要查詢的HiveSQL
select
sum(o.sale_price)
,sum(case when cate_id2 in(16,18) then o.sale_price else 0 end )
,sum(CASE WHEN cate_id2 in(13,15,17,19,20,21,22,156) THEN o.sale_price else 0 end )
FROM dw.or_order_item_total o
join dw.cd_item_total i on o.item_id = i.item_id and i.ds ='2018-03-31'
WHERE o.ds = '2018-03-31' and substr(o.ord_tm,1,7) ='2018-03'
;
下面是準(zhǔn)備提交的Python腳本
#!/usr/bin/python
#-*-coding:utf-8 -*-
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
import sys
def test():
reload(sys)
sys.setdefaultencoding( "utf-8" )
conf = SparkConf().setMaster("yarn-client").setAppName("My App")
sc = SparkContext(conf = conf)
hive_context = HiveContext(sc)
hive_context.sql(''' select
sum(o.sale_price)
,sum(case when cate_id2 in(16,18) then o.sale_price else NULL end )
,sum(CASE WHEN cate_id2 in(13,15,17,19,20,21,22,156) THEN o.sale_price else NULL end )
FROM dw.or_order_item_total o
join dw.cd_item_total i on o.item_id = i.item_id and i.ds ='2018-03-31'
WHERE o.ds = '2018-03-31' and substr(o.ord_tm,1,7) ='2018-03' ''').show()
if __name__ == '__main__':
test()
腳本開頭指定utf8編碼,否則腳本內(nèi)如果有unicode字符時,運(yùn)行會報錯
任何Spark程序都是由SparkContext開始的
SparkContext的初始化需要一個SparkConf對象尚蝌,SparkConf包含了Spark集群配置的各種參數(shù)。
在SparkConf里指定采用yarn-client模式充尉,然后指定作業(yè)名
pyspark不支持yarn-cluster模式
要連接Hive的話飘言,要先獲得HiveContext,初始化HiveContext時將上面得到的SparkContext傳進(jìn)去
之后就可以將SQL傳進(jìn) hive_context.sql(‘’‘ ’‘’)中驼侠,最后調(diào)用show()顯示結(jié)果
注意hive_context.sql(‘’‘ ’‘’)姿鸿,括面是三個單引號,python中用三個單引號包裹多行的字符串倒源,所以SQL粘貼進(jìn)來的時候要與這三個單引號有一個空格的距離苛预,SQL末尾的分號要去掉,case when里面的then和else數(shù)據(jù)類型要一樣笋熬,目前遇到的坑就這么多
寫完腳本不能用python直接運(yùn)行的方式热某,要用spark-submit提交,下面是提交命令
/usr/local/spark/bin/spark-submit \
--master yarn-client \
--executor-memory 29G \
--num-executors 10 \
--executor-cores 2 \
--conf spark.default.parallelism=200 \
/home/dwetl/sunwenxue/sun0403/spark_app_demo.py
其中各個參數(shù)解釋如下
1.num-executors
- 參數(shù)說明:該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個Executor進(jìn)程來執(zhí)行。Driver在向YARN集群管理器申請資源時胳螟,YARN集群管理器會盡可能按照你的設(shè)置來在集群的各個工作節(jié)點(diǎn)上昔馋,啟動相應(yīng)數(shù)量的Executor進(jìn)程。這個參數(shù)非常之重要糖耸,如果不設(shè)置的話秘遏,默認(rèn)只會給你啟動少量的Executor進(jìn)程,此時你的Spark作業(yè)的運(yùn)行速度是非常慢的嘉竟。
- 參數(shù)調(diào)優(yōu)建議:每個Spark作業(yè)的運(yùn)行一般設(shè)置50~100個左右的Executor進(jìn)程比較合適垄提,設(shè)置太少或太多的Executor進(jìn)程都不好榔袋。設(shè)置的太少,無法充分利用集群資源铡俐;設(shè)置的太多的話凰兑,大部分隊(duì)列可能無法給予充分的資源。
2.executor-memory
- 參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進(jìn)程的內(nèi)存审丘。Executor內(nèi)存的大小吏够,很多時候直接決定了Spark作業(yè)的性能,而且跟常見的JVM OOM異常滩报,也有直接的關(guān)聯(lián)锅知。
- 參數(shù)調(diào)優(yōu)建議:每個Executor進(jìn)程的內(nèi)存設(shè)置4G8G較為合適。但是這只是一個參考值脓钾,具體的設(shè)置還是得根據(jù)不同部門的資源隊(duì)列來定售睹。可以看看自己團(tuán)隊(duì)的資源隊(duì)列的最大內(nèi)存限制是多少可训,num-executors乘以executor-memory昌妹,是不能超過隊(duì)列的最大內(nèi)存量的。此外握截,如果你是跟團(tuán)隊(duì)里其他人共享這個資源隊(duì)列飞崖,那么申請的內(nèi)存量最好不要超過資源隊(duì)列最大總內(nèi)存的1/31/2,避免你自己的Spark作業(yè)占用了隊(duì)列所有的資源谨胞,導(dǎo)致別的同學(xué)的作業(yè)無法運(yùn)行固歪。
3.executor-cores
- 參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進(jìn)程的CPU core數(shù)量。這個參數(shù)決定了每個Executor進(jìn)程并行執(zhí)行task線程的能力胯努。因?yàn)槊總€CPU core同一時間只能執(zhí)行一個task線程牢裳,因此每個Executor進(jìn)程的CPU core數(shù)量越多,越能夠快速地執(zhí)行完分配給自己的所有task線程叶沛。
- 參數(shù)調(diào)優(yōu)建議:Executor的CPU core數(shù)量設(shè)置為2~4個較為合適贰健。同樣得根據(jù)不同部門的資源隊(duì)列來定,可以看看自己的資源隊(duì)列的最大CPU core限制是多少恬汁,再依據(jù)設(shè)置的Executor數(shù)量伶椿,來決定每個Executor進(jìn)程可以分配到幾個CPU core。同樣建議氓侧,如果是跟他人共享這個隊(duì)列脊另,那么num-executors * executor-cores不要超過隊(duì)列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學(xué)的作業(yè)運(yùn)行约巷。
4.spark.default.parallelism
- 參數(shù)說明:該參數(shù)用于設(shè)置每個stage的默認(rèn)task數(shù)量偎痛。這個參數(shù)極為重要,如果不設(shè)置可能會直接影響你的Spark作業(yè)性能独郎。
- 參數(shù)調(diào)優(yōu)建議:Spark作業(yè)的默認(rèn)task數(shù)量為500~1000個較為合適踩麦。很多同學(xué)常犯的一個錯誤就是不去設(shè)置這個參數(shù)枚赡,那么此時就會導(dǎo)致Spark自己根據(jù)底層HDFS的block數(shù)量來設(shè)置task的數(shù)量,默認(rèn)是一個HDFS block對應(yīng)一個task谓谦。通常來說贫橙,Spark默認(rèn)設(shè)置的數(shù)量是偏少的(比如就幾十個task),如果task數(shù)量偏少的話反粥,就會導(dǎo)致你前面設(shè)置好的Executor的參數(shù)都前功盡棄卢肃。試想一下,無論你的Executor進(jìn)程有多少個才顿,內(nèi)存和CPU有多大莫湘,但是task只有1個或者10個,那么90%的Executor進(jìn)程可能根本就沒有task執(zhí)行郑气,也就是白白浪費(fèi)了資源幅垮!因此Spark官網(wǎng)建議的設(shè)置原則是,設(shè)置該參數(shù)為num-executors * executor-cores的2~3倍較為合適尾组,比如Executor的總CPU core數(shù)量為300個忙芒,那么設(shè)置1000個task是可以的,此時可以充分地利用Spark集群的資源演怎。
結(jié)論
結(jié)論:在SparkSQL占用資源比Hive略少的情況下,查詢同樣的SQL避乏,Hive花了115s爷耀,SparkSQL花了83秒,證明SparkSQL至少有20%性能提升
除了編寫腳本然后提交的方式拍皮,pyspark也支持在linux命令行里直接輸入 pyspark --master yarn-client 的方式歹叮,進(jìn)入pyspark 命令行,方便的調(diào)試與查詢
參考鏈接:
https://blog.csdn.net/chenjieit619/article/details/53421080
http://lxw1234.com/archives/2015/08/448.htm
http://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html
http://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html