首先準(zhǔn)備好hadoop和spark以及scala的環(huán)境
主節(jié)點(diǎn)如下
分節(jié)點(diǎn)如下
然后完成以及idea的安裝以及idea上scala的插件安裝窑多,我們就可以開始編程了定硝。
有以下兩點(diǎn)需要注意的:
1.scala和spark的版本场刑,最好按照推薦安裝,我是用的spark-2.20,之前用scala-2.12.* 出現(xiàn)巨多問題喷舀,例如運(yùn)行任務(wù)時(shí)報(bào)錯(cuò):
java.lang.NoSuchMethodError:scala.Predef$.ArrowAssoc(....)
按照官網(wǎng)上的推薦spark-2.20就最好是用scala-2.11.*砍濒。
2.如果虛擬機(jī)上編程太慢的話,其實(shí)更建議直接在本地用idea編程打包后硫麻,把jar包傳輸?shù)教摂M機(jī)
一爸邢、WordCount
(著重以WordCount編程為重點(diǎn)進(jìn)行練習(xí),后面的例子若有重復(fù)的步驟就簡(jiǎn)單略過)
1.打開idea拿愧,創(chuàng)建scala工程
其中杠河,JDK和Scala SDK就是java和scala的路徑
2.在src文件夾下創(chuàng)建兩個(gè)子目錄,一個(gè)cluster用于跑spark浇辜,另外一個(gè)local用于idea上調(diào)試券敌。(其中out目錄和META-INF創(chuàng)建jar包后自動(dòng)生成的,開始并沒有)然后在兩個(gè)個(gè)文件夾下分別創(chuàng)建scala.class
3.然后要想進(jìn)行spark編程柳洋,我們就得導(dǎo)入spark的相關(guān)包
File → Projecte Structure → Libraries → “+”→ Java → *選擇spark目錄下的jars文件夾*
ps:其實(shí)我們的編程暫時(shí)用不到這個(gè)目錄下的所有包陪白,可以只導(dǎo)入需要的,但就需要花時(shí)間去找膳灶;也可以全部導(dǎo)入咱士,但是整個(gè)工程就會(huì)變得臃腫然后點(diǎn)OK再點(diǎn)OK,回到界面轧钓,我們的相關(guān)包就導(dǎo)入完成了
4.接下來就是正式的編程序厉,我們先上WordCount的代碼
//指在cluster這個(gè)目錄下
package cluster
//導(dǎo)入了spark的SparkConf, SparkContext兩個(gè)類
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: ")
System.exit(1)
}
//實(shí)例化configuration,用于編輯我們?nèi)蝿?wù)的相關(guān)信息毕箍,后面的setAppName可以在控制臺(tái)運(yùn)行的時(shí)候再設(shè)置
val conf = new SparkConf().setAppName("MySparkApp")
// sc是Spark Context弛房,指的是“上下文”,也就是我們運(yùn)行的環(huán)境而柑,需要把conf當(dāng)參數(shù)傳進(jìn)去文捶;
val sc = new SparkContext(conf)
//通過sc獲取一個(gè)(hdfs上的)文本文件,args(0)就是我們控制臺(tái)上傳入的參數(shù)媒咳,local運(yùn)行的話就是傳入本地一個(gè)文本的path
val line = sc.textFile(args(0))
//下面就是wordcount具體的執(zhí)行代碼
line.flatMap(_.split("")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)
sc.stop()
}
}
這就是WordCount的代碼粹排,用于在Spark平臺(tái)上跑。如果需要在idea上測(cè)試的話涩澡,就可以把a(bǔ)rgs(0)具體改為一個(gè)文本文件的path顽耳。比如在工程的目錄下創(chuàng)建data文件夾,把test.txt扔上去妙同,args(0)就可以修改為"data/test.txt"射富;然后把sc設(shè)置為
val spark=new SparkContext("local","SparkPI")
這樣的本地模式,就可以直接在idea上run粥帚。
5.打包成jar文件
File → Projecte Structure → Artifacts → “+” → JAR → From modules with dependencies... ...(這個(gè)選項(xiàng)的意思是把我們引入的所有外部包都封裝起來胰耗,Empty就是不算上我們引入的spark包)
然后Main Class就選擇我們的cluster,local是我們用于本地測(cè)試的芒涡,并且Main Class的名字一定要記住柴灯,后面spark上運(yùn)行是要使用的卖漫。然后點(diǎn)ok就可以創(chuàng)建相關(guān)文件。如果之前就創(chuàng)建了的話弛槐,需要把之前的相關(guān)信息懊亡,也就是工程下的META-INF文件夾刪除才可以成功創(chuàng)建。
回到主界面乎串,然后Build → BuildArtifacts 就可以自行創(chuàng)建jar包了店枣。
6.idea會(huì)把創(chuàng)建的jar包放進(jìn)工程下的out文件夾,我們把它找到叹誉,為了方便鸯两,把jar包放進(jìn)spark目錄下,然后打開长豁,進(jìn)入META-INF文件夾钧唐,把后綴名為.DSA .SF . RSA的文件刪除,
因?yàn)椴橘Y料說某些包的簽名有問題匠襟,會(huì)導(dǎo)致我們?cè)谶\(yùn)行時(shí)找不到主類钝侠,事實(shí)上也確實(shí)是....
7.我們進(jìn)入spark目錄,通過bin文件夾下的spark-submit來提交任務(wù)酸舍,執(zhí)行命令./bin/spark-submit -h來獲得幫助文檔帅韧,我就拿挑幾個(gè)常用的:
8.
基本格式:
Usage: spark-submit [options] [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]
選項(xiàng):
--master MASTER_URL ????????spark://host:port, mesos://host:port, yarn, or local.
--class CLASS_NAME ?????????Your application's main class (for Java / Scala apps).
--deploy-mode DEPLOY_MODE ??Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).
--driver-memory MEM ????????Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--executor-memory MEM ??????Memory per executor (e.g. 1000M, 2G) (Default: 1G).
我們就按上面所說執(zhí)行如下命令
./bin/spark-submit --master yarn-clien --class cluster.WordCount WordCount.jar /tmp/text.txt
--master 就是我們master的url
--class 就是我們打包jar的主類名稱
WordCount.jar 就是jar包名
/tmp/text.txt 是我事先放在hdfs上的測(cè)試文本文件,也就是我們編程中的args(0)參數(shù)
測(cè)試文本text.txt 內(nèi)容如下
執(zhí)行之后我們就可以等他完成啃勉,看到如下的結(jié)果:
對(duì)比下沒有問題忽舟,中間缺失字母的地方應(yīng)該是空格和換行符。
WordCount執(zhí)行完畢淮阐。
二叮阅、Pi
重復(fù)步驟就不再多余贅述了
1.先上cluster上的代碼
package // avoid overflow
import scala.math.random
import org.apache.spark._
object Pi {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
val spark = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid ???overflow
val count = spark.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
args(0)也是我們之后運(yùn)行時(shí)要添加的參數(shù)。如果是在本地模式下泣特,依舊是把sc設(shè)置成本地模式浩姥。
val spark=new SparkContext("local","SparkPI")
如果要設(shè)置幾個(gè)核跑的話就這樣"local[*]",*為你人為設(shè)定的個(gè)數(shù)群扶。其實(shí)也可以在控制臺(tái)上設(shè)置及刻。
2.打成jar包,刪除包下相關(guān)文件竞阐,再mv到spark目錄下
3.啟動(dòng)hadoop,spark后暑劝,照例用spark-submit提交任務(wù)
這個(gè)地方我沒設(shè)置參數(shù)是因?yàn)榇a中自帶了判斷骆莹,沒有參數(shù)這一情況下,他就自己設(shè)置為了2担猛。
if (args.length > 0) args(0).toInt else 2
然后等待結(jié)果
在一堆info中查找結(jié)果還是挺費(fèi)眼睛的幕垦,所以我們可以在代碼中對(duì)打印結(jié)果這個(gè)步驟稍加修改丢氢,增添兩行*號(hào):
再打包運(yùn)行一遍,找起來就應(yīng)該方便多了先改。
三疚察、K-Means
這個(gè)例子我們就嘗試用本地(local)模式來運(yùn)行
1.首先我們得在idea上準(zhǔn)備好測(cè)試的文本和輸出的路徑
我們?cè)诠こ滔聞?chuàng)建data目錄,把測(cè)試文本扔進(jìn)去仇奶,內(nèi)容如下
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2
然后再創(chuàng)建一個(gè)result文件夾用于存放結(jié)果貌嫡,準(zhǔn)備工作做好
2.準(zhǔn)備好代碼,如下:
package local
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
object K_Means {
def main(args: Array[String]) {
//初始化sc
val sc = new SparkContext("local", "Kmeans")
//讀入數(shù)據(jù)
val rdd = sc.textFile("data/Kmeans_data.txt")
//rdd轉(zhuǎn)化该溯,轉(zhuǎn)化成對(duì)應(yīng)的RDD
val data = rdd.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))
//最大迭代次數(shù)
val numIteration = 20
//聚類個(gè)數(shù)
val numClass = 5
//構(gòu)建模型
val model = KMeans.train(data, numClass, numIteration)
//輸出聚類中心
println("Cluster centers:")
for (c <- model.clusterCenters) {
println(" ?" + c.toString)
}
//使用誤差平方之和來評(píng)估數(shù)據(jù)模型
val cost = model.computeCost(data)
println("Within Set Sum of Squared Errors = " + cost)
//使用模型測(cè)試單點(diǎn)數(shù)據(jù)
println("Vectors 0.2 0.2 0.2 is belongs to clusters:" + model.predict(Vectors.dense("0.2 0.2 0.2".split(' ').map(_.toDouble))))
println("Vectors 0.25 0.25 0.25 is belongs to clusters:" + model.predict(Vectors.dense("0.25 0.25 0.25".split(' ').map(_.toDouble))))
println("Vectors 8 8 8 is belongs to clusters:" + model.predict(Vectors.dense("8 8 8".split(' ').map(_.toDouble))))
//交叉評(píng)估1岛抄,只返回結(jié)果
val testdata = rdd.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))
val result1 = model.predict(testdata)
result1.saveAsTextFile("result/kmeanout1/")
//交叉評(píng)估2,返回?cái)?shù)據(jù)集和結(jié)果
val result2 = rdd.map {
line =>
val linevectore = Vectors.dense(line.split(" ").map(_.toDouble))
val prediction = model.predict(linevectore)
line + " " + prediction
}.saveAsTextFile("result/kmeanout2/")
sc.stop()
}
}
我們可以看到代碼中運(yùn)用到了mllib庫(kù)狈茉,專門用于機(jī)器學(xué)習(xí)的算法夫椭。然而我們本地運(yùn)行的重點(diǎn)是如下幾個(gè)地方:
一是我們的sc不再由conf做參數(shù),而是直接運(yùn)用local的本地模式
二是我們讀取文件的地方不再由控制臺(tái)提供氯庆,而是直接由代碼提供蹭秋。路徑為我們創(chuàng)建工程時(shí)就準(zhǔn)備好的測(cè)試數(shù)據(jù),把它轉(zhuǎn)化為rdd然后才能后續(xù)操作堤撵。
三是我們輸出的結(jié)果由saveAsTextFile方法存入本地指定的目標(biāo)文件夾result仁讨,而沒有像之前打印在控制臺(tái)上(其實(shí)也可以打印的)
3.然后我們就開始運(yùn)行Run → Run... →K_Means(主類名稱)
我們可以通過idea的控制臺(tái)看到各種info信息,和我們?cè)趕park上跑的信息一樣粒督。甚至如圖還給了你端口號(hào)陪竿,說明雖然是本地運(yùn)行,但還是啟用了spark平臺(tái)屠橄,也說明我們運(yùn)行是成功的族跛。
4.查看我們運(yùn)行的結(jié)果,也就是我們是之前設(shè)定的輸出路徑
可以看到我們之前什么都沒有的result文件夾下多了兩個(gè)子文件夾(說明在輸出時(shí)不存在的文件夾它會(huì)自行創(chuàng)建)锐墙,里面就包含了我們Kmeans的結(jié)果礁哄,分別打開兩個(gè)文件夾的part-00000
我們得到了我們想要的結(jié)果,Kmeans的本地模式運(yùn)行也就成功結(jié)束了溪北。
如果要轉(zhuǎn)入到spark上運(yùn)行桐绒,也就像之前的,更改sc之拨,然后修改數(shù)據(jù)來源和輸出路徑為控制臺(tái)輸入的參數(shù)就行了茉继。
至此結(jié)束,有任何問題歡迎指出(#^.^#)