A Scala Standalone Application
在Spark shell中運(yùn)行了一個(gè)小程序之后潦刃,你可能想要把它打包成自包含應(yīng)用,這樣就可以多次運(yùn)行了腺晾。
示例19-1. 使用Spark找出最高氣溫的Scala應(yīng)用
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}
object MaxTemperature {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Max Temperature")
val sc = new SparkContext(conf)
sc.textFile(args(0))
.map(_.split("\t"))
.filter(rec => (rec(1) != "9999" && rec(2).matches("[01459]")))
.map(rec => (rec(0).toInt, rec(1).toInt))
.reduceByKey((a, b) => Math.max(a, b))
.saveAsTextFile(args(1))
}
}
運(yùn)行獨(dú)立程序時(shí),沒(méi)有shell為我們提供SparkContext夸溶,我們需要自己創(chuàng)建矾策。我們用一個(gè)SparkConf來(lái)創(chuàng)建這個(gè)實(shí)例。SparkConf可以用來(lái)向應(yīng)用中傳遞多個(gè)Spark屬性带欢,這里我們僅僅設(shè)置應(yīng)用的名字运授。
還有一些別的微小變化。首先是我們使用命令行參數(shù)來(lái)指定輸入和輸出路徑乔煞。另外還使用了方法鏈來(lái)避免為每一個(gè)RDD創(chuàng)建中間變量,這樣程序更緊湊柒室,如果需要的話渡贾,我們?nèi)匀豢梢栽赟cala IDE中查看每次轉(zhuǎn)變(transformation)的類(lèi)型信息。
并非所有的Spark定義的transformation都可用于RDD類(lèi)本身雄右。在本例中空骚,reduceByKey()(僅僅在鍵值對(duì)的RDD上起作用)實(shí)際上定義在PairRDDFunctions類(lèi)中,但我們能用下面的import來(lái)讓Scala隱含地把RDD[(Int, Int)]轉(zhuǎn)為PairRDDFunctions:
import org.apache.spark.SparkContext._
這個(gè)import不同于Spark使用的隱式轉(zhuǎn)型函數(shù)擂仍,因此理所當(dāng)然地值得包含在程序中囤屹。
這一次我們使用spark-submit來(lái)運(yùn)行這個(gè)程序,把包含編譯后的Scala程序的JAR包作為參數(shù)傳入逢渔,接著傳入命令行參數(shù)(輸入輸出路徑):
% spark-submit --class MaxTemperature --master local \
spark-examples.jar input/ncdc/micro-tab/sample.txt output
% cat output/part-*
(1950,22)
(1949,111)
我們還指定了兩個(gè)選項(xiàng):--class 告訴Spark應(yīng)用類(lèi)的名字肋坚,--master 指定job的運(yùn)行方式,local值告訴Spark在本地機(jī)器的單個(gè)JVM中運(yùn)行肃廓,在“Executors and Cluster Managers”一節(jié)我們將會(huì)學(xué)到在集群中運(yùn)行的選項(xiàng)智厌。接下來(lái),我們看看怎樣用Java語(yǔ)言來(lái)使用Spark盲赊。
A Java Example
Spark是使用Scala實(shí)現(xiàn)的铣鹏,Scala是基于JVM的語(yǔ)言,可以和Java完美集成哀蘑。同樣的例子用Java來(lái)表達(dá)诚卸,很直接葵第,也很啰嗦(使用Java 8的lambda表達(dá)式可以使這個(gè)版本更緊湊)。
示例19-2. 使用Spark找出最高氣溫的Java應(yīng)用
public class MaxTemperatureSpark {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperatureSpark <input path> <output path>");
System.exit(-1);
}
SparkConf conf = new SparkConf();
JavaSparkContext sc = new JavaSparkContext("local", "MaxTemperatureSpark", conf);
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<String[]> records = lines.map(new Function<String, String[]>() {
@Override public String[] call(String s) {
return s.split("\t");
}
});
JavaRDD<String[]> filtered = records.filter(new Function<String[], Boolean>() {
@Override public Boolean call(String[] rec) {
return rec[1] != "9999" && rec[2].matches("[01459]");
}
});
JavaPairRDD<Integer, Integer> tuples = filtered.mapToPair(
new PairFunction<String[], Integer, Integer>() {
@Override public Tuple2<Integer, Integer> call(String[] rec) {
return new Tuple2<Integer, Integer>(
Integer.parseInt(rec[0]), Integer.parseInt(rec[1]));
}
});
JavaPairRDD<Integer, Integer> maxTemps = tuples.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer i1, Integer i2) {
return Math.max(i1, i2);
}
});
maxTemps.saveAsTextFile(args[1]);
}
}
在Spark的Java API中合溺,一個(gè)RDD由JavaRDD的實(shí)例表示卒密,在鍵值對(duì)RDD的特殊情況下是JavaPairRDD 。這兩個(gè)類(lèi)都實(shí)現(xiàn)了JavaRDDLike接口辫愉,該接口中可以找到操作RDD的大多數(shù)方法栅受。
運(yùn)行這個(gè)程序和運(yùn)行Scala版本一樣,除了類(lèi)名字是MaxTemperatureSpark 恭朗。
A Python Example
Spark也支持Python語(yǔ)言屏镊,API叫做PySpark。由于Python語(yǔ)言有l(wèi)ambda表達(dá)式痰腮,例子程序非常接近Scala的版本而芥。
示例19-3. 使用Spark找出最高氣溫的Python應(yīng)用
form pyspark import SparkContext
import re, sys
sc = SparkContext("local", "Max Temperature")
sc.textFile(sys.argv[1]) \
.map(lambda s: s.split("\t")) \
.filter(lambda rec: (rec[1] != "9999" and re.match("[01459]", rec[2]))) \
.map(lambda rec: (int(rec[0]), int(rec[1]))) \
.reduceByKey(max) \
.saveAsTextFile(sys.argv[2])
注意到在reduceByKey()的轉(zhuǎn)變中,我們可以使用Python語(yǔ)言內(nèi)建的max函數(shù)膀值。
需要留意的重點(diǎn)是棍丐,這個(gè)程序是用CPython寫(xiě)的,Spark會(huì)創(chuàng)建一個(gè)Python子進(jìn)程來(lái)執(zhí)行用戶的Python代碼(在啟動(dòng)程序launcher和在集群上運(yùn)行用戶任務(wù)的executor上)沧踏。兩個(gè)進(jìn)程間使用socket通訊來(lái)傳遞RDD分區(qū)數(shù)據(jù)歌逢。
要運(yùn)行這個(gè)程序,只需指定Python文件即可:
% spark-submit --master local \
ch19-spark/src/main/python/MaxTemperature.py \
input/ncdc/micro-tab/sample.txt output
還可以使用pyspark命令翘狱,以交互模式運(yùn)行Spark和Python秘案。