1. 簡要說明
基于spark 2.3.1版本學習spark基礎(chǔ)知識及整體框架。本文首先以python版為主進行描述盯桦,后期會主要針對scala版本進行詳細講解乏奥。
2. spark學習環(huán)境搭建
- spark安裝包下載地址
http://spark.apache.org/downloads.html
https://archive.apache.org/dist/spark/
作者使用的為spark-2.3.1版本為例進行測試與學習嬉挡。(之所以不選擇最新版本丰泊,大家都懂的,最新版本不穩(wěn)定隙轻,會有很多坑要踩埠帕,索性選擇相對穩(wěn)定的版本)
- 環(huán)境設(shè)置
1. 如想設(shè)置為全局環(huán)境變量,則可配置到bashrc_profile中
2. 僅為開發(fā)調(diào)試玖绿,直接進入到下載安裝包spark-2.3.1-bin-hadoop2.7/bin下指定相關(guān)操作的命令即可敛瓷。
- 啟動spark
啟動python版本spark 客戶端命令(./pyspark)
Python 2.7.10 (default, Aug 17 2018, 19:45:58)
[GCC 4.2.1 Compatible Apple LLVM 10.0.0 (clang-1000.0.42)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
2019-02-11 17:57:49 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Python version 2.7.10 (default, Aug 17 2018 19:45:58)
SparkSession available as 'spark'.
>>>
到此為止spark學習調(diào)試的環(huán)境基本搭建完成。
3. 核心概念介紹
首先斑匪,
每個spark應用都由一個驅(qū)動器程序發(fā)起集群上的各種并行操作呐籽。shell終端本身即為實際的驅(qū)動器程序。shell啟動時自動創(chuàng)建了一個SparkContext對象蚀瘸,其變量叫sc狡蝶,所以以下的操作都可以基于sc做操作。-
其次贮勃,驅(qū)動器一般管理多個執(zhí)行器(executor)節(jié)點贪惹。即在集群模式下執(zhí)行action操作時,不同的節(jié)點會統(tǒng)計不同部分的數(shù)據(jù)(計算結(jié)果)寂嘉。由于我們在本地模式下執(zhí)行操作馍乙,所以所有的執(zhí)行任務(wù)都會在單節(jié)點上運行。
- 最后垫释,可通過向spark API傳遞函數(shù),亦可操作相應的集群上撑瞧。需要對lambda操作熟悉棵譬。如:
>>> lines = sc.textFile("README.md")
>>> lines.filter(lambda line: "Python" in line)
PythonRDD[4] at RDD at PythonRDD.scala:49
>>> lines.filter(lambda line: "Python" in line).count()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Library/Python/2.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError
報錯原因為:spark默認是從hdfs上都文件的,想要讀取本地文件需要增加file://前綴预伺。即:
lambda形式:
lines = sc.textFile("file:///spark-2.3.1-bin-hadoop2.7/README.md")
pyline = lines.filter(lambda line: "Scala" in line)
pyline.count()
函數(shù)形式:
def hasScala(line):
return "Scala" in line
pythonLines = lines.filter(hasPython)
- 獨立應用之運行方式
Java和Scala中订咸,只需要添加Maven依賴,編輯器會自動下載依賴的包酬诀。但 python程序運行需要使用spark自帶的spark-submit腳本來運行脏嚷。(腳本中已經(jīng)幫我們引入了python程序的spark依賴)
例如:
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import logging
from pyspark import SparkConf, SparkContext
logging.basicConfig(level=logging.ERROR)
conf = SparkConf().setMaster("local").setAppName("myapp")
sc = SparkContext(conf=conf)
contents = sc.textFile("file://absfilepath")
res = contents.filter(lambda line: "Python" in line)
print "*" * 10,res.count()
sc.stop()
運行方式如:
spark-submit test.py
運行spark-submit時會出現(xiàn)很繁瑣不易識別的INFO信息,如何過濾掉INFO信息呢瞒御?
注意:將rootCategory等級修改為WARN或者ERROR即可父叙。
方法如下:
修改日志過濾等級:【conf/log4j.properties】
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n