spark上傳附件颓屑、加載py文件
1. 第一種方式: 在spark-submit中加載
spark-submit \
--queue xxx \
--archives ch_cut.zip#ch_cut \
--py-files label.py \
test.py
2. 第二種方式: 在py腳本中加載
sc.addFile("/user/data/py_module/normal", recursive=True) # 添加文件夾
spark加載自定義python環(huán)境
1. 打包anaconda環(huán)境
# 1. 創(chuàng)建虛擬環(huán)境
conda create -p ~/anaconda_test --copy -y -q python=2.7
# 2. 激活環(huán)境安裝包
source activate /home/xiaoming/anaconda_test
# 3. 開始打包環(huán)境
zip -p -r anaconda_test.zip anaconda_test
2. spark-submit傳入python環(huán)境路徑棕叫,腳本中配置python環(huán)境
(1)spark-submit傳入python環(huán)境路徑
PYSPARK_DRIVER_PYTHON=~/anaconda_test/bin/python \
spark-submit \
--queue xxx \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./mypython/anaconda_test/bin/python \
-- conf spark.port.maxRetries=300 \
--archives anaconda_test.zip#anaconda_test \
test.py xxx xxx
(2)py腳本中配置Python環(huán)境
spark = SparkSession.builder \
.appName("test.py") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
sc.pythonExec = spark.conf.get("spark.yarn.appMasterEnv.PYSPARK_PYTHON")
spark-submit 常用參數(shù)配置
PYSPARK_DEIVER_PYTHON=~/anaconda_test/bin/python \
spark-submit \
--queue xxx \
--name test.py \
--deploy-mode client \
--master yarn \
--driver-memory 4g \
--executor-memory 12g \
--num-executors 100 \
--executor-cores 2 \
--archives ./anaconda_test.zip#mypython \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./mypython/anaconda_test/bin/python \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=200 \
--conf spark.default.parallelism=9600 \
--conf spark.port.maxRetries=100 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
test.py
參數(shù)說明:
-
--queue
:設(shè)置隊(duì)列名稱仗岸; -
--name
:設(shè)置application名稱允耿; -
--deploy-name
:driver進(jìn)程部署模式借笙,一般為client或cluster,默認(rèn)為client较锡; -
--master
:集群資源管理业稼,一般設(shè)置為yarn; -
--driver-memory
:driver進(jìn)程使用的內(nèi)存蚂蕴; -
--executor-memory
:executor進(jìn)程使用的內(nèi)存低散; -
--num-executors
:設(shè)置spark作業(yè)總共要用多少個(gè)executor進(jìn)程來執(zhí)行; -
--executor-cores
:每個(gè)executor進(jìn)程使用的cpu核數(shù)骡楼; -
--archives
:需要傳送到executor的附件熔号; -
spark.yarn.appMasterEnv.PYSPARK_PYTHON=xxx
:設(shè)置使用的Python環(huán)境; -
spark.dynamicAllocation.enabled=true
:是否動(dòng)態(tài)分布資源鸟整; -
spark.default.parallelism=9600
:用于設(shè)置每個(gè)stage默認(rèn)task線程數(shù)量引镊; -
spark.port.maxRetries=100
:端口最大重試次數(shù); -
spark.storage.memoryFraction=0.5
:設(shè)置rdd持久化數(shù)據(jù)在executor內(nèi)存中占的比例吃嘿,默認(rèn)為0.6祠乃; -
spark.shuffle.memoryFraction=0.3
:設(shè)置shuffle過程中一個(gè)task拉取到上個(gè)stage的task的輸出后,進(jìn)行聚合操作時(shí)能夠使用的Executor的內(nèi)存比例兑燥;
Spark UI簡(jiǎn)介
1. Jobs頁(yè)面: 此頁(yè)面可以看到當(dāng)前spark啟動(dòng)了多少個(gè)job亮瓷;
2. Stages頁(yè)面: 此頁(yè)面可以看到每個(gè)job,啟動(dòng)了多少個(gè)stage降瞳;job是根據(jù)spark任務(wù)中有多少個(gè)action操作得到的嘱支;每個(gè)job由多個(gè)stage組成,是根據(jù)shuffle操作數(shù)得到的挣饥;
3. Storage頁(yè)面:所有代碼中cache, persist等操作可以在這里看到除师,可以看到當(dāng)前使用了多少緩存;
4. Environment頁(yè)面:此頁(yè)面展示了spark所依賴的環(huán)境扔枫,比如jdk汛聚,lib等;spark任務(wù)所設(shè)置的參數(shù)短荐;
5. Executors頁(yè)面:spark任務(wù)使用的資源匯總倚舀;
spark 查找異常數(shù)據(jù)技巧
- try...except... 將錯(cuò)誤捕獲,并在except中返回忍宋;
- 然后使用rdd.take(5)痕貌,查看錯(cuò)誤數(shù)據(jù);
def get_json(line):
try:
item = json.loads(line)
return False
except:
return line
pyspark使用C++模塊
current_dir = os.getcwd()
script_dir = os.path.split(os.path.realpath(__file__))[0] # 獲取當(dāng)前腳本路徑
os.chdir(script_dir)
lib = CDLL("./xxx.so") # 加載so文件
os.chdir(current_dir)
yarn殺死spark任務(wù)
yarn application -kill application_1428487296152_25597
spark文件壓縮存儲(chǔ)
1. bzip2壓縮格式: org.apache.hadoop.io.compress.BZip2Codec
糠排;特點(diǎn):壓縮率最高舵稠,壓縮解壓速度慢,支持split;
2. snappy壓縮格式: org.apache.hadoop.io.compress.SnappyCodec
哺徊;特點(diǎn):json文本壓縮率38.25%室琢,壓縮和解壓時(shí)間短;
3. gzip壓縮格式: org.apache.hadoop.io.compress.GzipCodec
唉工; 特點(diǎn):壓縮率高研乒,壓縮和解壓速度快,不支持split淋硝;json文本壓縮率23.5%雹熬,適合使用率低,長(zhǎng)期存儲(chǔ)的文件谣膳;
sc.textFile讀取多個(gè)目錄
第一種方式:
sc.textFile("xxx,xxx,xxxx")
第二種方式:
from datetime import datetime
all_rdd = sc.parallelize([])
start_day = datetime.strptime("20200801", "%Y%m%d") # 開始時(shí)間
ndays = 10 # 往前多少天
for i in range(ndays):
now_day = start_day + datetime.timedelat(days=-i)
now_day_str = now_day.strftime("%Y%m%d")
input_path = os.path.join("/home/test", now_day_str)
rdd = sc.textFile(input_path)
all_rdd = all_rdd.union(rdd)
參考資料
- Spark Configuration - 官方文檔參數(shù)說明
http://spark.apache.org/docs/latest/configuration.html - Spark性能優(yōu)化指南——基礎(chǔ)篇
- Spark性能優(yōu)化指南——高級(jí)篇
- [看圖說話] 基于Spark UI性能優(yōu)化與調(diào)試——初級(jí)篇
https://cloud.tencent.com/developer/article/1021744 - spark & 文件壓縮
https://blog.csdn.net/lsshlsw/article/details/51992569