Apache Spark is a fast, general-purpose computation engine designed for large-scale data processing. Spark is a Hadoop MapReduce-like general parallel framework open-sourced by the UC Berkeley AMP Lab. It offers the same advantages as Hadoop MapReduce, but unlike MapReduce it can keep intermediate job output in memory instead of reading and writing HDFS, which makes it much better suited to iterative algorithms such as those used in data mining and machine learning.
Spark provides a faster, more general data-processing platform. Compared with Hadoop, Spark can run programs up to 100x faster in memory, or 10x faster on disk. Last year Spark won the 100 TB Daytona GraySort contest against Hadoop, finishing three times faster while using only a tenth of the machines, and it has also become the fastest open-source engine for sorting data at the petabyte scale.
Spark offers Scala, Java, Python, and R APIs; this article uses the Python environment throughout.
Download
Download page: http://spark.apache.org/downloads.html
As shown in the figure below, choose the latest Spark release. The pre-built package can be run directly after download without compiling it yourself. When not running against a cluster, you can run it without installing a Hadoop environment. Once the options are selected, click the link in option 4 to download.
Installation
1. Extracting the downloaded archive completes the installation. Windows users may run into problems if Spark sits under a path that contains spaces, so install it to a path without spaces, such as C:\spark.
2. Spark is written in Scala, so a matching version of the JDK must be installed for it to work. Install the JDK to a path without spaces as well.
3. On startup Spark locates winutils.exe through HADOOP_HOME, so the matching version of that environment needs to be downloaded.
1. Download the Windows build of winutils.
Since our Spark package was built against Hadoop 2.7, download the 2.7 version.
2. Configure the environment variables.
Add a user variable HADOOP_HOME pointing to the directory where winutils was extracted, then append %HADOOP_HOME%\bin to the system Path variable.
4. Copy the pyspark folder under Spark's python directory into your Python site-packages directory, or install it with the following command:
pip install PySpark
Configuration on Linux is similar. A quick way to verify the setup is shown below.
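As a sanity check (an illustrative snippet, not part of the official setup), you can confirm from Python that the pyspark package imports and that HADOOP_HOME points at the winutils directory:
import os
import pyspark

# Should print the installed PySpark version, e.g. 2.2.0
print(pyspark.__version__)
# Should print the directory whose bin folder contains winutils.exe
print(os.environ.get("HADOOP_HOME"))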
Testing
In the conf directory, copy log4j.properties.template to log4j.properties; this file controls the logging settings. Then find the following line:
log4j.rootCategory=INFO, console
and change it to the setting below to lower the log level, so that only warnings and more severe messages are shown:
log4j.rootCategory=WARN, console
Running the pyspark command in Spark's bin folder starts the PySpark shell. You can also set environment variables for Spark, in the same way as the Hadoop setup above.
Run the following code in the shell as a test; it counts the number of lines in a file:
lines = sc.textFile(r"E:\Documents\Desktop\s.txt")
lines.count()
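Still inside the shell, a slightly longer sketch (illustrative only) shows the usual pattern of a transformation followed by an action on the same RDD:
# Split each line into words, then peek at the first five
words = lines.flatMap(lambda line: line.split())
words.take(5)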
To test a standalone application connecting to Spark, save the following code as demo.py and run it with the command spark-submit demo.py.
# coding:utf-8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    # Connect to the local master; the application name is Demo
    conf = SparkConf().setMaster('local').setAppName('Demo')
    sc = SparkContext(conf=conf)
    # Count the lines containing 'mape' and print the first one
    lines = sc.textFile(r"E:\Documents\Desktop\s.txt")
    plines = lines.filter(lambda line: 'mape' in line)
    print(plines.count())
    print(plines.first())
    sc.stop()
In Spark 2.0 and later you only need to create a SparkSession; SparkConf, SparkContext, and SQLContext are all wrapped inside SparkSession, so the code can also be written as follows:
from pyspark.sql import SparkSession
# Connect to the local master; the application name is Demo
spark = SparkSession.builder.master("local").appName("Demo").config("spark.some.config.option", "some-value").getOrCreate()
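With the session in hand, the earlier line-count test can be reproduced through the SparkContext that the session exposes (a sketch using the same sample file path as above):
# The session wraps a SparkContext, available as spark.sparkContext
lines = spark.sparkContext.textFile(r"E:\Documents\Desktop\s.txt")
print(lines.filter(lambda line: 'mape' in line).count())
spark.stop()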
The output of submitting the application to Spark is shown in the figure below:
Troubleshooting
Several kinds of exceptions came up while installing and debugging Spark on Windows; here is a summary of the ones I ran into.
Hadoop winutils not found
This exception was caused by not having configured the winutils environment at first. The key message is:
Failed to locate the winutils binary in the hadoop binary path
F:\spark-2.2.0\bin>pyspark
Python 3.5.2 |Anaconda 4.2.0 (64-bit)| (default, Jul 5 2016, 11:41:13) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
17/10/25 16:13:59 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:379)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:394)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:387)
at org.apache.hadoop.hive.conf.HiveConf$ConfVars.findHadoopBinary(HiveConf.java:2327)
at org.apache.hadoop.hive.conf.HiveConf$ConfVars.<clinit>(HiveConf.java:365)
at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:105)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at py4j.reflection.CurrentThreadClassLoadingStrategy.classForName(CurrentThreadClassLoadingStrategy.java:40)
at py4j.reflection.ReflectionUtil.classForName(ReflectionUtil.java:51)
at py4j.reflection.TypeUtil.forName(TypeUtil.java:243)
at py4j.commands.ReflectionCommand.getUnknownMember(ReflectionCommand.java:175)
at py4j.commands.ReflectionCommand.execute(ReflectionCommand.java:87)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
17/10/25 16:14:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Wrong Hadoop winutils version
The winutils binary I first downloaded was built for Hadoop 2.0, which does not match the Hadoop 2.7 version Spark was compiled against, so the code threw an exception; switching to the correct version fixed it. The key messages are:
Caused by: org.apache.spark.sql.AnalysisException: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String;I)V;
pyspark.sql.utils.IllegalArgumentException: "Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':"
F:\spark-2.2.0\bin>pyspark
Python 3.5.2 |Anaconda 4.2.0 (64-bit)| (default, Jul 5 2016, 11:41:13) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
Traceback (most recent call last):
File "F:\spark-2.2.0\python\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "F:\spark-2.2.0\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.sessionState.
: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1053)
at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:130)
at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:130)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:129)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.AnalysisException: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String;I)V;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:193)
at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:105)
at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:93)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:35)
at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:289)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1050)
... 16 more
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String;I)V
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode(NativeIO.java:524)
at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:478)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:532)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:509)
at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:305)
at org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3679)
at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:597)
at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:191)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:264)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:362)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:266)
at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:66)
at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:65)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
... 25 more
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "F:\spark-2.2.0\bin\..\python\pyspark\shell.py", line 45, in <module>
spark = SparkSession.builder\
File "F:\spark-2.2.0\python\pyspark\sql\session.py", line 179, in getOrCreate
session._jsparkSession.sessionState().conf().setConfString(key, value)
File "F:\spark-2.2.0\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__
File "F:\spark-2.2.0\python\pyspark\sql\utils.py", line 79, in deco
raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: "Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':"
Access permission exception
Because Windows handles permissions differently from Linux, Spark fails on startup on Windows when it cannot write to the tmp folder due to permission problems. Starting cmd with administrator privileges and creating a tmp folder in the root of the corresponding drive beforehand solved the problem (an alternative command-line fix is noted after the trace below). The key message is:
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: ---------
F:\spark-2.2.0\bin>pyspark
Python 3.5.2 |Anaconda 4.2.0 (64-bit)| (default, Jul 5 2016, 11:41:13) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
Traceback (most recent call last):
File "F:\spark-2.2.0\python\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "F:\spark-2.2.0\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.sessionState.
: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1053)
at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:130)
at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:130)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:129)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: ---------;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:193)
at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:105)
at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:93)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:35)
at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:289)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1050)
... 16 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: ---------
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:191)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:264)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:362)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:266)
at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:66)
at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:65)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
... 25 more
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: ---------
at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:612)
at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
... 39 more
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "F:\spark-2.2.0\bin\..\python\pyspark\shell.py", line 45, in <module>
spark = SparkSession.builder\
File "F:\spark-2.2.0\python\pyspark\sql\session.py", line 179, in getOrCreate
session._jsparkSession.sessionState().conf().setConfString(key, value)
File "F:\spark-2.2.0\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__
File "F:\spark-2.2.0\python\pyspark\sql\utils.py", line 79, in deco
raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: "Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':"
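If creating the folder by hand is not enough, a commonly suggested fix (assuming winutils.exe sits under %HADOOP_HOME%\bin and the scratch directory resolves to C:\tmp\hive on your machine) is to grant the directory its permissions explicitly with winutils:
%HADOOP_HOME%\bin\winutils.exe chmod -R 777 C:\tmp\hive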