第一次接觸Spark兼蕊,自己整理了(從網(wǎng)絡(luò)初厚,書籍,同事那里)一些Spark的相關(guān)內(nèi)容當(dāng)做筆記孙技。路過的朋友僅供參考产禾,不能保證說得都對(duì)。
什么是 Spark
簡(jiǎn)單來說绪杏,Spark是一種面向?qū)ο笙掠⒑瘮?shù)式編程語言。Spark能夠像操作本地集合對(duì)象一樣輕松地操作分布式數(shù)據(jù)集蕾久。它具有運(yùn)行速度快势似、易用性好、通用性強(qiáng)和隨處運(yùn)行等特點(diǎn)僧著。
Spark提供了支持Java履因、scala、Python以及R語言的API盹愚。還支持更高級(jí)的工具如:Spark Sql栅迄、Spark Streaming、MLlib皆怕、GraphX等毅舆。
官方介紹 :Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
百度百科:Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計(jì)的快速通用的計(jì)算引擎
Spark 有什么特點(diǎn)
以下摘自百度百科
更快的速度。內(nèi)存中計(jì)算愈腾, 比 Hadoop 快100倍憋活。
易用性。Spark 提供了80多個(gè)高級(jí)運(yùn)算符虱黄。
通用性悦即。Spark 提供了大量的庫,包括SQL橱乱、DataFrames辜梳、MLlib、GraphX泳叠、Spark Streaming作瞄。 開發(fā)者可以在同一個(gè)應(yīng)用程序中無縫組合使用這些庫。
支持多種資源管理器危纫。Spark 支持 Hadoop YARN宗挥,Apache Mesos节预,及其自帶的獨(dú)立集群管理器
更多詳細(xì)介紹可參考:https://blog.csdn.net/xwc35047/article/details/51072145
什么是 RDD
在我使用Spark的過程中,用到最多的對(duì)象就是RDD属韧,比如JavaRDD、JavaPairRDD蛤吓。然后RDD之間又可以互相轉(zhuǎn)化宵喂。這個(gè)RDD是個(gè)啥?
RDD全文是 Resilient Distributed DataSet(彈性·分布式·數(shù)據(jù)集)会傲。
RDD是一個(gè)只讀的锅棕、可分區(qū)的、支持多種來源 淌山、有容錯(cuò)機(jī)制 裸燎、可以被緩存 、支持并行操作的分布式數(shù)據(jù)集泼疑,可以裝載任何你想裝載的數(shù)據(jù)德绿。他的彈性特點(diǎn)體現(xiàn)在RDD的數(shù)據(jù)可以在內(nèi)存與磁盤(外存)靈活交換。
Spark 模型
再來認(rèn)識(shí)一下下面幾個(gè)重要概念
Application退渗。也就是我們編寫完Spark程序移稳,負(fù)責(zé)生成SparkContext。
Job会油。所謂 job个粱,就是由一個(gè) rdd 的 action算子(后面再說action) 觸發(fā)的動(dòng)作,可以簡(jiǎn)單的理解為翻翩,當(dāng)你需要執(zhí)行一個(gè) rdd 的 action 的時(shí)候都许,會(huì)生成一個(gè) job。
Stage嫂冻。stage 是一個(gè) job 的組成單位胶征,就是說,一個(gè) job 會(huì)被切分成 1 個(gè)或多個(gè) stage絮吵,然后各個(gè) stage 會(huì)按照?qǐng)?zhí)行順序依次執(zhí)行弧烤。
Task。stage 下的一個(gè)任務(wù)執(zhí)行單元蹬敲,一般來說暇昂,一個(gè) rdd 有多少個(gè) partition(分區(qū),后面再說partition)伴嗡,就會(huì)有多少個(gè) task急波,因?yàn)槊恳粋€(gè) task 只是處理一個(gè) partition 上的數(shù)據(jù)。
簡(jiǎn)單來說就是以RDD為基準(zhǔn)瘪校,每觸發(fā)一個(gè)action操作澄暮,就會(huì)生成一個(gè)job名段。job內(nèi)部有一個(gè)或多個(gè)stage順序執(zhí)行,組成stage的是一系列task泣懊,即任務(wù)執(zhí)行單元伸辟。
關(guān)于Partition分區(qū)。Spark RDD主要由Dependency馍刮、Partition信夫、Partitioner組成,Partition是其中之一卡啰。一份待處理的原始數(shù)據(jù)會(huì)被按照相應(yīng)的邏輯切分成n份静稻,每份數(shù)據(jù)對(duì)應(yīng)到RDD中的一個(gè)Partition,Partition的數(shù)量決定了task的數(shù)量匈辱,影響著程序的并行度振湾。
關(guān)于Stage。Stage以shuffle和result這兩種類型來劃分亡脸。Spark中有兩類task押搪,一類是shuffleMapTask,一類是resultTask梗掰,第一類task的輸出是shuffle所需數(shù)據(jù)嵌言,第二類task的輸出是result,stage的劃分也以此為依據(jù)及穗,shuffle之前的所有變換是一個(gè)stage摧茴,shuffle之后的操作是另一個(gè)stage。
那么剛剛提到的action又是什么埂陆?我們?cè)賮砹私庖幌翿DD操作算子苛白。
什么是 RDD 操作算子
RDD有兩種操作算子:Transformation(轉(zhuǎn)換) 和 Action(執(zhí)行)
Transformation。即一個(gè)rdd數(shù)據(jù)集經(jīng)過數(shù)據(jù)轉(zhuǎn)換變成一個(gè)新的rdd數(shù)據(jù)集焚虱。常用的Transformation操作有:map购裙、filter、union鹃栽、distinct躏率、groupByKey 等。Transformation 屬于延遲計(jì)算民鼓,當(dāng)觸發(fā)Transformation算子時(shí)rdd并沒有立即進(jìn)行轉(zhuǎn)換薇芝,僅僅是記住了數(shù)據(jù)集的邏輯操作。
Action丰嘉。觸發(fā)Spark作業(yè)的運(yùn)行夯到,真正觸發(fā)轉(zhuǎn)換算子的計(jì)算。常用的操作有:reduce饮亏、collect耍贾、count阅爽、countByKey等等。
什么是 Shuffle
以下摘自官網(wǎng)
Shuffle 即洗牌荐开。以reduceByKey 操作來說付翁,reduceByKey操作生成一個(gè)新的RDD,其中相同key的所有值都組合為一個(gè)元組——key和reduce函數(shù)的結(jié)果晃听。但是胆敞,并非所有相同key的值都必須位于同一個(gè)分區(qū)甚至是同一臺(tái)計(jì)算機(jī)上,然而它們必須位于同一位置才能計(jì)算結(jié)果杂伟。
在Spark中,數(shù)據(jù)通常不會(huì)根據(jù)特定的操作在必要位置進(jìn)行跨分區(qū)分布仍翰。在計(jì)算過程中赫粥,單個(gè)任務(wù)在單個(gè)分區(qū)上運(yùn)行。因此予借,要執(zhí)行reduceByKey任務(wù)的所有數(shù)據(jù)越平,Spark需要執(zhí)行全部操作。它必須從所有分區(qū)中讀取數(shù)據(jù)以找到單個(gè)key的所有值灵迫,然后將各分區(qū)中的值匯總以計(jì)算每個(gè)key的最終結(jié)果 - 這稱為洗牌秦叛。
什么是窄依賴、寬依賴
窄依賴瀑粥。指父RDD的每一個(gè)分區(qū)最多被一個(gè)子RDD的分區(qū)所用挣跋,表現(xiàn)為一個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū),和兩個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD 的分區(qū)狞换。
寬依賴避咆。指子RDD的分區(qū)依賴于父RDD的所有分區(qū)。
舉例
下面通過一個(gè)例子修噪,盡量把我所理解的那部分通過這個(gè)小例子表達(dá)出來查库,不保證說的都對(duì)。
統(tǒng)計(jì)某高中今年參加高考的男生人數(shù)
首先我們需要將數(shù)據(jù)源讀取到Spark RDD中(先不管如何讀然魄怼)樊销,一個(gè)數(shù)據(jù)源只生成一個(gè)rdd。
rdd內(nèi)部會(huì)按照一定的邏輯分割成n個(gè)partition分區(qū)脏款,分區(qū)數(shù)也可以自己指定围苫,如果沒有指定,那么就會(huì)采用默認(rèn)值弛矛。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目够吩,每個(gè)分區(qū)由一個(gè)task 執(zhí)行。
rdd先執(zhí)行filter 操作即Transformation算子丈氓。將參加高考的周循,性別為男性的對(duì)象過濾出來强法。但Transformation 是惰性的,不會(huì)立刻觸發(fā)spark 作業(yè)湾笛。
過濾后的rdd 需要進(jìn)行reduce操作即Action算子饮怯。此時(shí)觸發(fā)spark 作業(yè)。每個(gè)action將生成一個(gè)Job嚎研。
Job 包含stage蓖墅,stage有兩種:shuffle和result,取決于算子的執(zhí)行邏輯临扮。如果一個(gè)job中有寬依賴论矾,即有shuffle操作,shuffle之前的生成一個(gè)shuffle stage杆勇。shuffle之后的生成一個(gè)result stage贪壳。
每個(gè)stage 都是一組task 在執(zhí)行,task 取決于分區(qū)數(shù)蚜退。
reduce 過程將符合條件的學(xué)生數(shù)計(jì)數(shù)并返回闰靴。
代碼示例
package com.yzy.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SparkDemo {
private static String appName = "spark.demo";
private static String master = "local[*]";
public static void main(String[] args) {
JavaSparkContext sc = null;
try {
//初始化JavaSparkContext
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
sc = new JavaSparkContext(conf);
// 生成數(shù)據(jù)源
List<Student> data = getList();
//生成rdd
JavaRDD<Student> rdd = sc.parallelize(data);
//過濾符合條件的數(shù)據(jù)
rdd = rdd.filter(new Function<Student, Boolean>() {
public Boolean call(Student s) throws Exception {
return s.isGaoKao() && s.getSex().equals("男");
}
});
// map && reduce
Student result = rdd.map(new Function<Student, Student>() {
public Student call(Student s) throws Exception {
s.setCount(1);
return s;
}
}).reduce(new Function2<Student, Student, Student>() {
public Student call(Student s1, Student s2) throws Exception {
s1.setCount(s1.getCount() + s2.getCount());
return s1;
}
});
System.out.println("執(zhí)行結(jié)果:" + result.getCount());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (sc != null) {
sc.close();
}
}
}
public static List<Student> getList(){
List<Student> data = new ArrayList<Student>();
data.add(new Student(true,"男", "A"));
data.add(new Student(false,"女", "B"));
data.add(new Student(false,"男", "C"));
data.add(new Student(true,"女", "D"));
data.add(new Student(true,"男", "E"));
data.add(new Student(false,"女", "F"));
data.add(new Student(true,"男", "G"));
return data;
}
static class Student implements Serializable{
private String name;
private boolean gaoKao;
private String sex;
private int count;
public Student(boolean gaoKao, String sex, String name) {
this.gaoKao = gaoKao;
this.sex = sex;
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isGaoKao() {
return gaoKao;
}
public void setGaoKao(boolean gaoKao) {
this.gaoKao = gaoKao;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
}
}
控制臺(tái)輸出
//省略若干行
18/06/22 18:42:25 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 41 ms on localhost (executor driver) (1/4)
18/06/22 18:42:25 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 58 ms on localhost (executor driver) (2/4)
18/06/22 18:42:25 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 44 ms on localhost (executor driver) (3/4)
18/06/22 18:42:25 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 44 ms on localhost (executor driver) (4/4)
18/06/22 18:42:25 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/06/22 18:42:25 INFO DAGScheduler: ResultStage 0 (reduce at SparkDemo.java:44) finished in 0.219 s
執(zhí)行結(jié)果:3
請(qǐng)注意:實(shí)例中的Student 類必須序列化,否則會(huì)報(bào)錯(cuò)钻注!