概要
目前Spark官方提供Java,Scala,Python三種語言的API奔滑。因?yàn)镾park是用Scala開發(fā)宗雇,而Scala本身是基于JVM的語言,所以Scala和Java的API完整穩(wěn)定稽莉;Python相對(duì)不太完整处硬,有些新特性剛開始不支持,但Python語言簡單明了蛙紫,且省去了編譯打包拍屑,用起稍微能方便一些。
本文件將介紹在IDEA+Maven環(huán)境下使用Java和Scala的開發(fā)Spark和SparkStreaming程序的過程坑傅。包括開發(fā)工具安裝僵驰、配置、scala&java混合項(xiàng)目創(chuàng)建唁毒、樣例代碼開發(fā)硝拧、運(yùn)行麸恍、打包。詳細(xì)API介紹不在本文范圍,請(qǐng)查閱官方文檔暂题。
參考資料
- 官方文檔:http://spark.apache.org/docs/ :英文友鼻,所有版本的都在這里面忧设。最權(quán)威也比較全面。
- 漢化的官方文檔 http://ifeve.com/spark/ :v1.6官方文檔漢化版崔泵。1.3.0到1.6之間API之間變化不大,可以參考猪瞬。
開發(fā)工具&&環(huán)境
本小節(jié)介紹IDEA憎瘸、scala插件的安裝,如果您已經(jīng)安裝好了IDEA & Scala插件陈瘦,請(qǐng)直接跳過這一節(jié)幌甘。
安裝IDEA
https://www.jetbrains.com/idea/選擇社區(qū)版即可,免費(fèi)的痊项!
IDEA自帶maven锅风,所以不用再單獨(dú)下載安裝了:),也可以不用IDEA自帶的鞍泉,安裝完成后在在“setting”->"maven"中設(shè)置一下即可
注意:
JDK是必須的啦皱埠,而且是版本要1.8+哦
maven構(gòu)建時(shí)會(huì)根據(jù)pom.xml中的配置從網(wǎng)絡(luò)倉庫中下載依賴包,所以要聯(lián)網(wǎng)咖驮,網(wǎng)速要好_
安裝Scala插件
安裝完成后打開IDEA,選擇"configure"-> "Plugins"

搜索"scala"沒有結(jié)果边器,點(diǎn)擊"Search in repositories"

在搜索結(jié)果中選擇"scala" 選擇“install”安裝完成后需要重啟

最新社區(qū)版的IDEA在安裝完成后的初始界面就已經(jīng)提供了"scala插件"的安裝選擇,直接選擇即可
scala插件是在用scala開發(fā)Spark程序時(shí)所需要托修,如果只是用java開發(fā)忘巧,可以不用安裝,考慮到有時(shí)候會(huì)看scala代碼睦刃,有這個(gè)插件還是方便很多
Spark WordCount
本小節(jié)通過用Scala和Java8分別實(shí)現(xiàn)一個(gè)簡單的統(tǒng)計(jì)單詞個(gè)數(shù)的程序?yàn)槔庾欤来谓榻B工程創(chuàng)建、編碼眯勾、測試運(yùn)行枣宫、打包的完整過程婆誓。
創(chuàng)建工程
-
新建工程
"create new project" -> "maven" 吃环,如下圖
填寫相關(guān)信息
創(chuàng)建scala代碼目錄
IDEA的maven工程會(huì)默認(rèn)創(chuàng)建java代碼的目錄,scala代碼目錄需要手工創(chuàng)建洋幻,在"main"目錄下新建“scala”目錄 郁轻,如下圖

- 將main/scala添加至源代碼目錄
"File"->"project structure"(快捷鍵:ALT+CTRL+SHIFT+S) ->"Modules"->" main/scala" 右鍵單擊,選擇 "Sources" 添加至源代碼目錄如下圖
聲明依賴
使用maven的好處在于只需要在pom.xml聲明依賴文留,后續(xù)工作maven會(huì)自動(dòng)處理好唯,而不需要我們手工下載每個(gè)依賴包添加到classpath中,此項(xiàng)目中我們需要在pom.xml中聲明scala庫燥翅、scala編譯插件及spark的依賴骑篙,在pom.xml中<project>
標(biāo)簽中添加以下內(nèi)容
<dependencies>
<!--scala項(xiàng)目需要-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.10.4</version>
<scope>compile</scope>
</dependency>
<!--spark程序依賴-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!--scala項(xiàng)目需要-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<!--[maven lifecycle](http://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html)-->
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
注意:開發(fā)依賴的spark版本和scala的版本要兼容,<artifactId>spark-core_${scala.binary.version}</artifactId>森书,${scala.binary.version}要和spark編譯的scala版本一致靶端,官方發(fā)布二進(jìn)制的都是2.10
代碼開發(fā)
樣例代碼以統(tǒng)計(jì)文件中的單詞個(gè)數(shù)為例谎势,單詞間以空格分開,計(jì)數(shù)不區(qū)分大小寫杨名。
待統(tǒng)計(jì)文件內(nèi)容很簡單如下x.txt脏榆,只有三行:
Java vs Scala
java8 is good
scala is better
scala 版代碼
在scala目錄上右鍵單擊,選擇“NEW”->"Scala Class" 如下圖:

然后在彈出的對(duì)話框中輸入類名"WordCount"台谍,選擇“Kind”為"object" 默認(rèn)為class
//WordCount.scala
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WorkCount").setMaster("local[2]")
val sc = new SparkContext(conf)
//從文件中創(chuàng)建RDD
val rdd = sc.textFile("e:/x.txt")
//文件中的單詞用空格區(qū)分
rdd.flatMap(_.split("\\s+"))
.map(w => (w, 1))
.reduceByKey(_+_)
.foreach(println)
}
}
java 版代碼
功能和scala一樣须喂,用JAVA8實(shí)現(xiàn)代碼也簡潔了很多,限了需要聲明類型外趁蕊,幾乎和scala一樣坞生。
右鍵單擊“main/java”新建Java類 JWordCount
//JWordCount.java
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("JWordCount");
conf.setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd = sc.textFile("e:/x.txt");
rdd.flatMap(line-> Arrays.asList(line.split("\\s+")))
.mapToPair(w -> new Tuple2<String,Integer>(w.toLowerCase(),1))
.reduceByKey((a,b)->a+b)
.foreach(w-> System.out.println(w));
}
Scala和Java程序運(yùn)行方式一樣,在左側(cè)"project"窗口(或在編輯窗口)中右鍵單擊文件掷伙,選擇“Run 'WordCount'”(快捷鍵:Ctrl+Shift+F10)恨胚,可以分別運(yùn)行一下,以下是scala版運(yùn)行的結(jié)果:

src/main/java和src/main/scala下分別放java和scala代碼炎咖≡吲荩可以用java實(shí)現(xiàn)基礎(chǔ)代碼,在scala中調(diào)用乘盼。
一般開發(fā)時(shí)master不會(huì)寫死升熊,會(huì)由參數(shù)傳進(jìn)去,這種方式可以邊開發(fā)邊測試绸栅,不需要部署一套spark集群级野,提高開發(fā)效率。
在IDEA中可以通過"Run"->"Editor Configurations"->"添加 or 選擇 主類"-> "program arguments" 來添加程序參數(shù)粹胯,以方便測試蓖柔,如果開發(fā)環(huán)境可以直接連接spark集群,可以直接傳入master的地址风纠,提交至集群中運(yùn)行况鸣。
打包
程序開發(fā)完畢不管最終是以spark on yarn 還是spark standalone方式運(yùn)行,都需要首先要將開發(fā)的程序以及依賴打成jar包竹观,然后一般會(huì)通過spark-submit
這個(gè)腳本來提交至集群中運(yùn)行镐捧。在IDEA+Maven的環(huán)境下可以用maven來打包,也可以用IDEA來打包臭增,各有各的優(yōu)點(diǎn)懂酱,maven功能強(qiáng)大靈活,可以實(shí)現(xiàn)一些復(fù)雜的流程和功能誊抛,且可以脫離IDEA運(yùn)行在命令行中列牺,可以和其它自動(dòng)化工具方便集成,但強(qiáng)大功能配置起來比較麻煩拗窃。IDEA自身的打包相對(duì)簡單瞎领,對(duì)日常開發(fā)足夠用了蔓榄。
不管是用java還是scala開發(fā)的spark程序,提交到集群時(shí)默刚,spark本身及其依賴是不需要打包到程序中的甥郑,也就是說要打入程序包中的是除spark以及其依賴之外的包是需要打入程序包中的。像本文的例子程序只依賴scala和spark本身荤西,spark依賴scala澜搅,所以只需要打包開發(fā)的程序即可,不需要打入其它依賴包邪锌,用IDEA或Maven打包都很方便勉躺,下面分別介紹兩種打包方式。
maven
打開"maven project": "view"->"tool windows"->"maven project"

在"maven project" 中選擇 "lifecycle"->package 右單擊選擇"Run Maven Build" 運(yùn)行結(jié)束后觅丰,工程中的scala的java都會(huì)被編碼打包饵溅。工程目錄下的target/下會(huì)有jar包生成,如下圖。

上面其實(shí)是在執(zhí)行maven的命令
mvn package
妇萄,如果你本地安裝好了maven,可以直接在命令行下到pom.xml所在的目錄中執(zhí)行各種maven命令蜕企,而不用每次都要在IDEA中執(zhí)行。maven更多內(nèi)容和各種NB的插件可以問狗哥或度娘冠句!
IDEA打包
IDEA要稍顯復(fù)雜轻掩,要多點(diǎn)幾次鼠標(biāo),需要先創(chuàng)建一個(gè)artifacts然后在每次需要打包時(shí)在build artifacts中選擇這個(gè)artifacts執(zhí)行即可懦底。
- 創(chuàng)建一個(gè)artifacts
"File"->"project structure"(快捷鍵:ALT+CTRL+SHIFT+S) ->"artifacts"->選擇"+" ->"jar"->"empty"
在"Name"中填入jar名稱 唇牧,"Output directory"為輸入jar包的路徑,然后在"available elements"中右單擊'helloss compile output'選擇'pack into Output Root'聚唐,點(diǎn)'OK'保存丐重,如下圖
'helloss compile output'只是你當(dāng)前工程中源碼編譯出來的class。如果要打入其它依賴包杆查,也在此選擇加入即可

-
打包:
“build”->"build artifacts"->"helloss"(你起的名字)->"build"即可
完整代碼見:https://github.com/longforfreedom/hellospark
生產(chǎn)環(huán)境中要提交到集群中運(yùn)行時(shí)一般會(huì)用spark-submit來提交運(yùn)行扮惦,類似以下語句:spark-submit --master yarn-client --num-executors 10 --executor-memory 20g --executor-cores 10 --class "WordCount" helloss-1.0-SNAPSHOT.jar
集群部署方式不一樣 --master 后面的參數(shù)不一樣,部分參數(shù)也會(huì)有一些不同根灯, 更多信息可以參考:http://spark.apache.org/docs/1.6.2/submitting-applications.html 径缅,程序的部署和運(yùn)行監(jiān)控后續(xù)會(huì)有單獨(dú)進(jìn)行介紹掺栅。
一般情況開發(fā)的spark程序不會(huì)以local方式正式運(yùn)行烙肺,但能以這樣方式運(yùn)行對(duì)于開發(fā)、測試非常方便氧卧,需要注意的是有些情況local方式運(yùn)行正確桃笙,但在集群中不一定能正確運(yùn)行。因?yàn)橐詌ocal方式運(yùn)行時(shí)Spark的所有角色(Driver,Master,Worker,Executor)f是在本地的同一個(gè)JVM中沙绝,以多個(gè)線程運(yùn)行搏明,具體的任務(wù)執(zhí)行是一個(gè)或多個(gè)線程鼠锈,而集群中運(yùn)行時(shí)是不同機(jī)器不同的JVM中運(yùn)行,需要注意并發(fā)問題星著。
以上介紹完了IDEA+Maven環(huán)境下用scala和java各開發(fā)了一個(gè)簡單單詞計(jì)數(shù)Spark程序的完整過程购笆,包括開發(fā)環(huán)境搭建,工程創(chuàng)建虚循,代碼開發(fā)同欠,以及測試運(yùn)行,打包横缔。在這個(gè)程序中數(shù)據(jù)源來自文件铺遂,程序運(yùn)行時(shí)需要處理的數(shù)據(jù)已確定,數(shù)據(jù)處理完畢茎刚,程序結(jié)束襟锐。但是如果數(shù)據(jù)是動(dòng)態(tài)的,源源不斷的膛锭,比如來自socket或消息隊(duì)列中時(shí)粮坞,要簡單及時(shí)的處理這些數(shù)據(jù)時(shí)就需要引入流處理了,下面介紹用spark streaming從kafka中統(tǒng)計(jì)單詞個(gè)數(shù)的示例程序初狰。
SparkStreaming && Kafka WordCount
運(yùn)行環(huán)境
如果您不是很了解kafka或著手頭沒有可以使用的kafka集群捞蚂,可以用以下方式快速搭建一個(gè)測試環(huán)境。
本文環(huán)境為windows+VMware(Centos)跷究,kafka是在vmware下的centos中運(yùn)行,centos的hostname:vm-centos-00姓迅,IP:192.168.99.130
kafka也可以直接在windows中運(yùn)行,運(yùn)行bin/windows下的啟動(dòng)腳本即可俊马。
注意:如果kafka和消費(fèi)者和服務(wù)器以及zookeeper沒有在同一臺(tái)機(jī)器上時(shí)丁存,需要將kafka server和zookeeper的hostname加到消費(fèi)者機(jī)器的hosts文件中。比如本文中柴我,需要在windows的C:\WINDOWS\System32\drivers\etc\hosts文件中添加一條記錄192.168.99.130 vm-centos-00
否則消費(fèi)時(shí)會(huì)出錯(cuò)
- 下載kafka:
wget http://mirror.bit.edu.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
- 解壓:
tar zxvf kafka_2.10-0.9.0.0.tgz
- 啟動(dòng)kafka:
cd kafka_2.10-0.9.0.0
## 啟動(dòng)zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
## 新開一個(gè)終端窗口解寝,啟動(dòng)kafka
bin/kafka-server-start.sh config/server.properties
- 創(chuàng)建topic:
bin/kafka-topics.sh --create --zookeeper vm-centos-00:2181 --replication-factor 1 --partitions 3 --topic helloss
- 啟動(dòng)生產(chǎn)者
bin/kafka-console-producer.sh --broker-list vm-centos-00:9092 --topic helloss
- 啟動(dòng)消費(fèi)者
bin/kafka-console-consumer.sh --zookeeper vm-centos-00:2181 --from-beginning --topic helloss
可以在生產(chǎn)者窗戶中輸入消息,在消費(fèi)者窗口中查看艘儒。測試無誤后可以進(jìn)入下一步
創(chuàng)建工程
這一步和spark程序一樣聋伦,為方便起間,本文直接在之前spark程序工程中添加代碼
添加依賴
需要添加scala庫界睁、scala編譯插件是必須的觉增。SparkStreaming和以及與Kafka的集成依賴包也需要引入,在前面spark程序的項(xiàng)目基礎(chǔ)上翻斟,在pom.xml中添加以下內(nèi)容
由于spark streaming依賴spark core逾礁,所以在pom.xml中添加spark streaming后,可以不用顯式聲明spark core的依賴访惜,spark core的依賴會(huì)自動(dòng)加進(jìn)來
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<!--與kafka集成時(shí)需要-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.1</version>
</dependency>
代碼開發(fā)
在scala目錄上右鍵單擊嘹履,選擇“NEW”->"Scala Class",新增一個(gè)名為SSWordCount的object腻扇。代碼如下
scala
//SSWordCount.scala
object SSWordCount {
def main(args: Array[String]): Unit = {
//方便起間,程序中寫死以local方式運(yùn)行
val sparkConf = new SparkConf().setAppName("SSWordCount").setMaster("local[2]")
//每10秒鐘統(tǒng)計(jì)一次接收到單詞數(shù)
val ssc = new StreamingContext(sparkConf, Seconds(10))
val topicMap = Map("helloss"-> 1)
val messages = KafkaUtils.createStream(ssc,"vm-centos-00:2181","ss-group",topicMap)
val r = messages.map(_._2).flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_)
//只打印10記錄砾嫉,實(shí)際中一般會(huì)保存到HDFS幼苛,Redis,Kafka中
//spark streaming需要一個(gè)Output Operations來觸發(fā)執(zhí)行,否則再多的Transformations也不會(huì)執(zhí)行
r.print(10)
//啟動(dòng)Streaming程序
ssc.start()
ssc.awaitTermination()
}
}
//JSSWordCount.java
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("JSSWordCount");
conf.setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20));
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("helloss",1);
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, "vm-centos-00:2181","ss-group",topicMap);
JavaPairDStream<String, Integer> r = messages.map(x -> x._2())
.flatMap(line -> Arrays.asList(line.split("\\s+")))
.mapToPair(w -> new Tuple2<String, Integer>(w.toLowerCase(), 1))
.reduceByKey((a, b) -> a + b);
r.print(10);
jssc.start();
jssc.awaitTermination();
}
完整代碼見:https://github.com/longforfreedom/hellospark
運(yùn)行程序后焕刮,在前面打開的消費(fèi)者窗口中輸入消息

在IDEA中觀察輸出情況蚓峦,可以看到類似如下輸出:

可以通過Spark Web UIhttp://localhost:4040/來監(jiān)控流處理程序運(yùn)行情況,比如延遲多少批次济锄,已處理完成多少個(gè)批次等等,如下圖所示

打包暑椰、部署運(yùn)行和spark程序沒有區(qū)別,但需要注意的是spark程序處理結(jié)束后會(huì)自動(dòng)退出荐绝,釋放資源一汽。而spark streaming處理的是連續(xù)不斷的數(shù)據(jù),程序不會(huì)退出低滩,即使kafka中沒有數(shù)據(jù)也不會(huì)釋放資源召夹,更不會(huì)退出,真到人為結(jié)束(出錯(cuò)了當(dāng)然就結(jié)束了:( )
結(jié)束
本文只是簡單的介紹了開發(fā)工具安裝恕沫、配置监憎,并通過兩個(gè)簡單的例子介紹了IDEA+Maven環(huán)境下使用Java8和Scala的開發(fā)spark和spark streaming程序的過程。Spark婶溯、Spark Streaming以及Kafka涉及很多知識(shí)點(diǎn)鲸阔,詳細(xì)的部署方式以及參數(shù)設(shè)置,運(yùn)行監(jiān)控等后續(xù)會(huì)慢慢整理介紹迄委。
后續(xù)有更新會(huì)在github先更新