前幾天寫完了MapReduce的小例子之后身腻,今天再來學(xué)習(xí)Spark的例子代碼就通透了。
MapReduce分為Map和Reduce部分露该,而Spark實(shí)際在上邊寫代碼方面就簡(jiǎn)單一些宪肖,實(shí)際上就是RDD的處理了永淌,那么RDD是啥膨处?
Spark的核心數(shù)據(jù)模型是RDD, Spark將常用的大數(shù)據(jù)操作都轉(zhuǎn)化成為RDD的子類(RDD是抽象類见秤,具體操作由各子類實(shí)現(xiàn),如MappedRDD真椿、Shuffled RDD)鹃答。
說人話就是Spark對(duì)數(shù)據(jù)的操作都是通過RDD來進(jìn)行的,例如讀取文件突硝,文件處理测摔,統(tǒng)計(jì)文字個(gè)數(shù)這一系列的操作都是RDD完成。
咱們接下來看看java如何寫Spark的代碼的狞换。
一避咆、Spark例子代碼
通過以下代碼可以很容易的看出來舟肉,沒有那么多的Map修噪,Reduce以及輸入輸出的格式指定查库,代碼邏輯簡(jiǎn)單了,但是難點(diǎn)是在于lambda表達(dá)式的寫法黄琼,寫的很容易樊销,能讀懂,但是再讓我寫一次脏款,我可能還是不會(huì)寫围苫。。撤师。以后有機(jī)會(huì)重點(diǎn)學(xué)習(xí)下這部分剂府。
引入maven
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
主方法
package com.sparkwordcount;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import java.util.Arrays;
public class SparkMain {
public static void main(String[] args) throws Exception {
// 設(shè)定appName(為本腳本取個(gè)名字)
String appName = "testSpark";
// 設(shè)定spark master(默認(rèn)支持local)
String master = "local";
// 處理的源文件,輸出的結(jié)果剃盾,這個(gè)文件是咱們前幾天在MapReduce中的文件
String filePath = "/test/input/testFile.txt";
String outputPath = "/test/output/testSpartResult";
// 初始化Spark環(huán)境腺占,為后邊運(yùn)行讀取環(huán)境配置
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
// 讀取文件并處理
JavaRDD<String> lines = sc.textFile(filePath);
// 將每一行通過空格截取成新的RDD
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
// 將所有的文字組成鍵值對(duì),并對(duì)不同的key進(jìn)行計(jì)數(shù)
JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
// 循環(huán)輸出每一個(gè)字的出現(xiàn)次數(shù)
counts.foreach(s -> System.out.println(s._1()+","+s._2()));
// 持久化到內(nèi)存和硬盤中痒谴,能夠?yàn)楹笃谛碌某绦蚍奖阕x取
counts.persist(StorageLevel.MEMORY_AND_DISK());
// 輸出成文本到指定目錄
counts.saveAsTextFile(outputPath);
}
}
二衰伯、打包
設(shè)定Artifacts打包
執(zhí)行Build Artifaces打包
會(huì)在指定目錄生成jar包SparkWordCount.jar
三、上傳到docker并運(yùn)行
#復(fù)制文件到docker中
docker cp /Users/XuesongBu/Documents/git_code/SparkWordCount/out/artifacts/SparkWordCount_jar/SparkWordCount.jar master:/usr/local
#進(jìn)入docker
docker exec -it master bash
#進(jìn)入Spark目錄
cd /usr/local/spark-3.0.3-bin-hadoop2.7
#提交到Spark執(zhí)行
./bin/spark-submit \
--class com.sparkwordcount.SparkMain \
--master local \
../SparkWordCount.jar \
100
#輸出积蔚,其實(shí)還有別的很多數(shù)據(jù)意鲸,咱們忽視吧,太多了
tFile,1
Hello,1
dd,2
ddd,1
242343,1
123,1
tes,1
sdfs,1
43252,1
world,1
df,2
3434s,1
dfdsf,1
#通過hadoop查看文件命令查看本次執(zhí)行的輸出的文件結(jié)果
hadoop fs -cat /test/output/testSpartResult/*
(tFile,1)
(Hello,1)
(dd,2)
(ddd,1)
(242343,1)
(123,1)
(tes,1)
(sdfs,1)
(43252,1)
(world,1)
(df,2)
(3434s,1)
(dfdsf,1)
四尽爆、總結(jié)
這就是一個(gè)簡(jiǎn)單的Spark的小例子怎顾,這只是個(gè)入門,其實(shí)更復(fù)雜的是針對(duì)大數(shù)據(jù)統(tǒng)計(jì)的算法漱贱,我寫出來的一切實(shí)際都是CRUD杆勇,都是利用工具進(jìn)行的簡(jiǎn)單的操作,算法才是最重要的饱亿。
大家有什么不懂得可以在評(píng)論回復(fù)我蚜退,我來給大家詳細(xì)解答。
謝各位的閱讀彪笼,謝謝您動(dòng)動(dòng)手指點(diǎn)贊钻注,萬分感謝各位。另外以下是我之前寫過的文章配猫,感興趣的可以點(diǎn)進(jìn)去繼續(xù)閱讀幅恋。
歷史文章
Hadoop系列-入門安裝
Hadoop系列-HDFS命令
Hadoop系列-Hive安裝
Hadoop系列-Hive數(shù)據(jù)庫常見SQL命令
Hadoop系列-HBase數(shù)據(jù)庫
Hadoop系列-HBase數(shù)據(jù)庫(二)
Hadoop系列-HBase數(shù)據(jù)庫JAVA篇
Hadoop系列-Spark安裝以及HelloWorld
Hadoop系列-MapReduce小例子
Hadoop系列-Spark小例子
JAVA面試匯總(五)數(shù)據(jù)庫(一)
JAVA面試匯總(五)數(shù)據(jù)庫(二)
JAVA面試匯總(五)數(shù)據(jù)庫(三)
JAVA面試匯總(四)JVM(一)
JAVA面試匯總(四)JVM(二)
JAVA面試匯總(四)JVM(三)
JAVA面試匯總(三)集合(一)
JAVA面試匯總(三)集合(二)
JAVA面試匯總(三)集合(三)
JAVA面試匯總(三)集合(四)
JAVA面試匯總(二)多線程(一)
JAVA面試匯總(二)多線程(二)
JAVA面試匯總(二)多線程(三)
JAVA面試匯總(二)多線程(四)
JAVA面試匯總(二)多線程(五)
JAVA面試匯總(二)多線程(六)
JAVA面試匯總(二)多線程(七)
JAVA面試匯總(一)Java基礎(chǔ)知識(shí)