https://medium.com/@atul94/getting-started-with-apache-spark-ad9d59e71f6f
Apache Spark被解釋為“用于大規(guī)模數(shù)據(jù)處理的快速通用引擎”。然而器予,這甚至沒(méi)有開(kāi)始包含它成為大數(shù)據(jù)空間中如此突出的參與者的原因挽懦。Apache Spark是一個(gè)分布式計(jì)算平臺(tái)峻贮,大數(shù)據(jù)公司的采用率一直在以驚人的速度增長(zhǎng)。
Spark Architecture
spark的架構(gòu)如下:
Spark是一個(gè)分布式處理引擎鳞陨,但它沒(méi)有自己的資源分布式存儲(chǔ)和集群管理器。它運(yùn)行在開(kāi)箱即用的集群資源管理器和分布式存儲(chǔ)之上。
Spark核心有兩個(gè)部分:
- 核心API:非結(jié)構(gòu)化API(RDD)欢峰,結(jié)構(gòu)化API(DataFrame,數(shù)據(jù)集)涨共∨μ可用于Scala,Python举反,Java和R.
- 計(jì)算引擎:內(nèi)存管理懊直,任務(wù)調(diào)度,故障恢復(fù)火鼻,與Cluster Manager交互室囊。
注意:我們將在本文末尾看到Java中的Core API實(shí)現(xiàn)。
在核心API之外魁索,Spark提供:
- Spark SQL:通過(guò)類似查詢的SQL與結(jié)構(gòu)化數(shù)據(jù)交互融撞。
- 流:消費(fèi)和處理連續(xù)的數(shù)據(jù)流。
- MLlib:機(jī)器學(xué)習(xí)庫(kù)粗蔚。但是尝偎,我不建議在這里培訓(xùn)深度學(xué)習(xí)模型。
- GraphX:典型的圖處理算法。
以上四種都直接依賴于用于分布式計(jì)算的spark核心API致扯。
Spark的優(yōu)點(diǎn)
- Spark為批處理肤寝,結(jié)構(gòu)化數(shù)據(jù)處理,流媒體等提供了統(tǒng)一的平臺(tái)抖僵。
- 與Hadoop的map-reduce相比鲤看,spark代碼更易于編寫和使用。
- Spark最重要的特性耍群,它抽象了并行編程方面义桂。Spark核心抽象了分布式存儲(chǔ),計(jì)算和并行編程的復(fù)雜性蹈垢。
Apache Spark的主要用例之一是大規(guī)模數(shù)據(jù)處理慷吊。我們創(chuàng)建程序并在spark集群上執(zhí)行它們。
計(jì)劃在集群上的執(zhí)行
主要有兩種方法在spark集群上執(zhí)行程序:
- 互動(dòng)客戶耘婚,如spark-shell罢浇,py-spark,筆記本等沐祷。
- 提交一份工作嚷闭。
大多數(shù)開(kāi)發(fā)過(guò)程都發(fā)生在交互式客戶端上,但是當(dāng)我們必須將我們的應(yīng)用程序投入生產(chǎn)時(shí)赖临,我們使用提交作業(yè)方法胞锰。
對(duì)于長(zhǎng)時(shí)間運(yùn)行的流作業(yè)或定期批處理作業(yè)兢榨,我們打包應(yīng)用程序并將其提交給Spark集群以供執(zhí)行嗅榕。
Spark是一種分布式處理引擎吵聪,遵循主從架構(gòu)凌那。在spark術(shù)語(yǔ)中,master是* Driver(驅(qū)動(dòng)程序)*吟逝,slave是執(zhí)行者(executors)帽蝶。
Driver負(fù)責(zé):
- 分析
- 分布。
- 監(jiān)測(cè)块攒。
- 調(diào)度励稳。
- 在Spark過(guò)程的生命周期內(nèi)保持所有必要的信息。
執(zhí)行程序(executors)僅負(fù)責(zé)執(zhí)行驅(qū)動(dòng)程序(Driver)分配給它們的部分代碼囱井,并將狀態(tài)報(bào)告給驅(qū)動(dòng)程序驹尼。
每個(gè)Spark過(guò)程都有一個(gè)單獨(dú)的Driver和獨(dú)有的執(zhí)行程序(executors)。
執(zhí)行方式
客戶端模式:驅(qū)動(dòng)程序是您提交應(yīng)用程序的本地VM庞呕。默認(rèn)情況下新翎,spark以客戶端模式提交所有應(yīng)用程序。由于驅(qū)動(dòng)程序是整個(gè)Spark過(guò)程中的主節(jié)點(diǎn),因此在生產(chǎn)設(shè)置中料祠,建議不要這樣做骆捧。對(duì)于調(diào)試澎羞,使用客戶端模式更有意義髓绽。
群集模式:驅(qū)動(dòng)程序是群集中的執(zhí)行程序之一。在spark-submit中妆绞,您可以按如下方式傳遞參數(shù):
--deploy-mode cluster
群集資源管理器
Yarn和Mesos是常用的集群管理器顺呕。
Kubernetes是一個(gè)通用的容器協(xié)調(diào)器。
注意:Kubernetes上的Spark不是生產(chǎn)就緒的括饶。
Yarn是最受歡迎的Spark資源管理器株茶,讓我們看看它的內(nèi)在工作:
在客戶端模式應(yīng)用程序中,驅(qū)動(dòng)程序(Driver)是我們的本地VM图焰,用于啟動(dòng)spark應(yīng)用程序:
步驟1:一旦Driver啟動(dòng)Spark會(huì)話請(qǐng)求就轉(zhuǎn)到Y(jié)arn以創(chuàng)建Yarn應(yīng)用程序启盛。
第2步: Yarn Resource Manager創(chuàng)建一個(gè)Application Master。對(duì)于客戶端模式技羔,AM充當(dāng)執(zhí)行程序(executor)啟動(dòng)器僵闯。
步驟3: AM將聯(lián)系Yarn Resource經(jīng)理以請(qǐng)求進(jìn)一步的容器。
步驟4:資源管理器將分配新容器藤滥,AM將在每個(gè)容器中啟動(dòng)執(zhí)行程序(executor)鳖粟。之后,執(zhí)行程序(executor)直接與驅(qū)動(dòng)程序(Driver)通信拙绊。
注意:在群集模式下向图,驅(qū)動(dòng)程序(Driver)在AM中啟動(dòng)。
執(zhí)行程序和內(nèi)存調(diào)整
硬件 - 6個(gè)節(jié)點(diǎn)标沪,每個(gè)節(jié)點(diǎn)16個(gè)內(nèi)核榄攀,64 GB RAM
讓我們從核心數(shù)量開(kāi)始。核心數(shù)表示執(zhí)行程序可以運(yùn)行的并發(fā)任務(wù)金句。研究表明檩赢,任何具有超過(guò)5個(gè)并發(fā)任務(wù)的應(yīng)用程序都會(huì)導(dǎo)致糟糕的表現(xiàn)。因此趴梢,我建議堅(jiān)持5漠畜。
注意:上述數(shù)字來(lái)自執(zhí)行程序的性能,而不是系統(tǒng)具有的核心數(shù)坞靶。因此憔狞,32核系統(tǒng)也將保持不變。
1核心操作系統(tǒng)和Hadoop守護(hù)進(jìn)程需要1 GB RAM彰阴。因此我們留下了63 GB Ram和15 Core瘾敢。
對(duì)于15個(gè)核心,每個(gè)節(jié)點(diǎn)可以有3個(gè)executors。這總共給了我們18個(gè)executors簇抵。AM Container需要1個(gè)executors庆杜。因此我們可以得到17位executors。
到內(nèi)存了碟摆,每個(gè)executors得到63/3 = 21 GB晃财。但是,在計(jì)算完整內(nèi)存請(qǐng)求時(shí)需要考慮很小的開(kāi)銷典蜕。
Formula for that over head = max(384, .07 * spark.executor.memory)
Calculating that overhead = .07 * 21 = 1.47
因此內(nèi)存降至約19 GB断盛。
因此系統(tǒng)變成:
--num-executors 17 --executor- memory 19G --executor-cores 5
注意:如果我們需要更少的內(nèi)存,我們可以減少內(nèi)核數(shù)量以增加執(zhí)行程序(executor)的數(shù)量愉舔。
Spark Core
現(xiàn)在我們來(lái)看一下Spark提供的一些核心API钢猛。Spark需要一個(gè)數(shù)據(jù)結(jié)構(gòu)來(lái)保存數(shù)據(jù)。我們有三個(gè)備選方案RDD轩缤,DataFrame和Dataset命迈。從Spark 2.0開(kāi)始,建議僅使用Dataset和DataFrame火的。這兩個(gè)內(nèi)部編譯到RDD本身壶愤。
這三個(gè)是彈性,分布式卫玖,分區(qū)和不可變的數(shù)據(jù)集合公你。
任務(wù): Spark中最小的工作單元,由執(zhí)行程序(executor)執(zhí)行假瞬。
數(shù)據(jù)集提供兩種類型的操作:
轉(zhuǎn)換:從現(xiàn)有數(shù)據(jù)集創(chuàng)建新數(shù)據(jù)集陕靠。它很懶惰,數(shù)據(jù)仍然是分布式的脱茉。
操作: Action將數(shù)據(jù)返回給驅(qū)動(dòng)程序(driver)剪芥,本質(zhì)上是非分布式的。對(duì)數(shù)據(jù)集的操作會(huì)觸發(fā)作業(yè)琴许。
隨機(jī)和排序:重新分區(qū)數(shù)據(jù)集以對(duì)其執(zhí)行操作税肪。它是spark的抽象,我們不需要為它編寫代碼榜田。這項(xiàng)活動(dòng)需要一個(gè)新階段益兄。
通用行為和轉(zhuǎn)換
1)lit,geq箭券,leq净捅,gt,lt
lit:創(chuàng)建一個(gè)文字值的列辩块』琢可用于與其他列進(jìn)行比較荆永。
geq(大于等于),leq(小于等于)国章,gt(大于)具钥,lt(小于):用于與其他列值進(jìn)行比較。例如:
// [https://gist.githubusercontent.com/atul94/b1c473a3e9ad748776119707c5fec741/raw/b24eb6f00438c244a6919d19e3749f90a0cdb14e/spark_ds_opt_1.java](https://gist.githubusercontent.com/atul94/b1c473a3e9ad748776119707c5fec741/raw/b24eb6f00438c244a6919d19e3749f90a0cdb14e/spark_ds_opt_1.java)
// Filter dataset with GENERIC_COL value >= 0
Dataset<Row> newRowDataset = rowDataset.filter(col(GENERIC_COL).geq(0));
// Filter dataset with GENERIC_COL value != "qwerty"
Dataset<Row> newRowDataset = rowDataset.filter(col(GENERIC_COL).notEqual(lit("QWERTY")));
2)join(加入)
Spark讓我們以各種方式加入數(shù)據(jù)集液兽。將嘗試用示例示例進(jìn)行解釋
// [https://gist.githubusercontent.com/atul94/5451ee7a40c9f5199eb9f4905d754318/raw/d8b2450e23e9a87e5793e9b87852c6a0b3dd09d1/spark_ds_opt_2.java](https://gist.githubusercontent.com/atul94/5451ee7a40c9f5199eb9f4905d754318/raw/d8b2450e23e9a87e5793e9b87852c6a0b3dd09d1/spark_ds_opt_2.java)
new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("value", DataTypes.StringType, true, Metadata.empty()),
};
StructType structType = new StructType(structFields);
List<Row> rowListA = new ArrayList<>();List<Row> rowListB = new ArrayList<>();
rowListA.add(RowFactory.create(1, "A1"));rowListA.add(RowFactory.create(2, "A2"));
rowListA.add(RowFactory.create(3, "A3"));rowListA.add(RowFactory.create(4, "A4"));
rowListB.add(RowFactory.create(3, "A3"));rowListB.add(RowFactory.create(4, "A4"));
rowListB.add(RowFactory.create(4, "A4_1"));rowListB.add(RowFactory.create(5, "A5"));
rowListB.add(RowFactory.create(6, "A6"));
// Create 2 sample dataset
dsA = sparkSession.createDataFrame(rowListA, structType);
dsB = sparkSession.createDataFrame(rowListB, structType);
String[] typesOfJoins = {"inner", "outer", "full", "full_outer", "left",
"left_outer", "right", "right_outer", "left_semi", "left_anti"};
//Print both the dataset
System.out.println("Dataset A");
dsA.show();
System.out.println("Dataset B");
dsB.show();
//Print all possible types of join
for (int i = 0; i < typesOfJoins.length; i++) {
System.out.println(typesOfJoins[i].toUpperCase());
dsA.join(dsB, dsA.col("id").equalTo(dsB.col("id")), typesOfJoins[i]).drop(dsB.col("id")).show();
}
結(jié)果如下:
Dataset A Dataset B
+---+-----+ +---+-----+
| id|value| | id|value|
+---+-----+ +---+-----+
| 1| A1| | 3| A3|
| 2| A2| | 4| A4|
| 3| A3| | 4| A4_1|
| 4| A4| | 5| A5|
+---+-----+ | 6| A6|
+---+-----+
----------------------------------------------------------------------------------------------------------------------------
INNER JOIN OUTER JOIN FULL JOIN
+---+-----+-----+ +---+-----+-----+ +---+-----+-----+
| id|value|value| | id|value|value| | id|value|value|
+---+-----+-----+ +---+-----+-----+ +---+-----+-----+
| 3| A3| A3| | 1| A1| null| | 1| A1| null|
| 4| A4| A4_1| | 2| A2| null| | 2| A2| null|
| 4| A4| A4| | 3| A3| A3| | 3| A3| A3|
+---+-----+-----+ | 4| A4| A4| | 4| A4| A4_1|
| 4| A4| A4_1| | 4| A4| A4|
| 5| null| A5| | 5| null| A5|
| 6| null| A6| | 6| null| A6|
+---+-----+-----+ +---+-----+-----+
FULL_OUTER JOIN LEFT JOIN LEFT_OUTER JOIN
+---+-----+-----+ +---+-----+-----+ +---+-----+-----+
| id|value|value| | id|value|value| | id|value|value|
+---+-----+-----+ +---+-----+-----+ +---+-----+-----+
| 1| A1| null| | 1| A1| null| | 1| A1| null|
| 2| A2| null| | 2| A2| null| | 2| A2| null|
| 3| A3| A3| | 3| A3| A3| | 3| A3| A3|
| 4| A4| A4_1| | 4| A4| A4_1| | 4| A4| A4_1|
| 4| A4| A4| | 4| A4| A4| | 4| A4| A4|
| 5| null| A5| +---+-----+-----+ +---+-----+-----+
| 6| null| A6|
+---+-----+-----+
RIGHT JOIN RIGHT_OUTER JOIN LEFT_SEMI JOIN
+---+-----+-----+ +---+-----+-----+ +---+-----+
| id|value|value| | id|value|value| | id|value|
+---+-----+-----+ +---+-----+-----+ +---+-----+
| 3| A3| A3| | 3| A3| A3| | 3| A3|
| 4| A4| A4| | 4| A4| A4_1| | 4| A4|
| 4| A4| A4_1| | 4| A4| A4| +---+-----+
| 5| null| A5| | 5| null| A5|
| 6| null| A6| | 6| null| A6|
+---+-----+-----+ +---+-----+-----+
LEFT_ANTI JOIN
+---+-----+
| id|value|
+---+-----+
| 1| A1|
| 2| A2|
+---+-----+
3)union(聯(lián)合)
Spark聯(lián)合函數(shù)允許我們?cè)趦蓚€(gè)數(shù)據(jù)集之間建立聯(lián)合骂删。數(shù)據(jù)集應(yīng)該具有相同的模式。
4)window(窗口)
Spark中的基本功能之一抵碟。它允許您基于一組稱為Frame的行計(jì)算表的每個(gè)輸入行的返回值桃漾。
Spark為翻滾窗口坏匪,希望窗口拟逮,滑動(dòng)窗口和延遲窗口提供API。
我們將它用于排名适滓,總和敦迄,普通舊窗口等。一些用例是:
// Window Function different use cases
//Ranking
//Filter top SOME_INTEGER_VAL_COL
WindowSpec w1 = Window.orderBy(col(SOME_COUNT_COL).desc());
inputDataset = inputDataset.select(col(SOME_COL), rank().over(w1).as(RANK_COL)).filter(col(RANK_COL).leq(SOME_INTEGER_VAL));
//Get values over a moving Window
//Helpful for calculating Directed Graphs in data based on time, moving average etc.
mainDataset = mainDataset.groupBy(col(SOME_COL), window(col(DATE_TIME_COL), INTERVAL_STRING))
.agg(collect_list(struct(col(SOME_COL), col(DATE_TIME_COL))).as(SOME_OTHER_COL));
// Above program creates a directed graph based on window
//Getting sum over some col
WindowSpec w2 = Window.groupBy(col(SOME_COL));
rowDataset = rowDataset.select(col(SOME_COL_A), sum(col(SOME_COL_B).over(w2)).as(SOME_COL_C));
其他功能(如lag凭迹,lead等)允許您執(zhí)行其他操作罚屋,使您可以對(duì)數(shù)據(jù)集執(zhí)行復(fù)雜的分析。
但是嗅绸,如果仍需要對(duì)數(shù)據(jù)集執(zhí)行更復(fù)雜的操作脾猛,則可以使用UDF。UDF的使用示例:
//UDF
//Some class to calculate similarity between two Wrapped Array with Schema
public class SimilarityCalculator implements AnalyticsUDF {
private Double varACoff;
private Double varBCoff;
private Double varCCoff;
private Double varDCoff;
public SimilarityCalculator(Double varACoff, Double varBCoff, Double varCCoff, Double varDCoff) {
this.varACoff = varACoff;
this.varBCoff = varBCoff;
this.varCCoff = varCCoff;
this.varDCoff = varDCoff;
}
public Double doCal(WrappedArray<GenericRowWithSchema> varA,
WrappedArray<GenericRowWithSchema> varB) {
int lenA = varA.length();
int lenB = varB.length();
double ans = 0;
double temp;
Map<String, Double> mapA = new HashMap<>();
Map<String, Double> mapB = new HashMap<>();
for (int i = 0; i < lenA; i++) {
mapA.put(varA.apply(i).getString(0), varA.apply(i).getDouble(1));
}
for (int i = 0; i < lenB; i++) {
mapB.put(varB.apply(i).getString(0), varB.apply(i).getDouble(1));
}
for (int i = 0; i < lenA; i++) {
if (mapB.containsKey(varA.apply(i).getString(0))) {
temp = varA.apply(i).getDouble(1) - mapB.get(varA.apply(i).getString(0));
ans += temp * temp;
} else {
ans += varA.apply(i).getDouble(1) * varA.apply(i).getDouble(1);
}
}
for (int i = 0; i < lenB; i++) {
if (!mapA.containsKey(varB.apply(i).getString(0))) {
ans += varB.apply(i).getDouble(1) * varB.apply(i).getDouble(1);
}
}
return ans;
}
public Double doCal(Double varA, Double varB, Double varC, Double varD) {
return Math.sqrt(varA * varACoff + varB * varBCoff + varC * varCCoff + varD * varDCoff);
}
public String getName() {
return "L2DistanceScore";
}
public String getSecondName() {
return "MergeScore";
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// At the Spark Session Level
sparkSession.udf().register(similarityCalc.getName(),
(WrappedArray<GenericRowWithSchema> colA, WrappedArray<GenericRowWithSchema> colB) -> similarityCalc
.doCal(colA, colB), DataTypes.DoubleType);
sparkSession.udf().register(similarityCalc.getSecondName(),
(Double a, Double b, Double c, Double d) -> similarityCalc.doCal(a, b, c, d),
DataTypes.DoubleType);
注意:使用UDF應(yīng)該是最后的手段鱼鸠,因?yàn)樗鼈儧](méi)有針對(duì)Spark進(jìn)行優(yōu)化; 他們可能需要更長(zhǎng)的時(shí)間來(lái)執(zhí)行死刑猛拴。建議在UDF上使用本機(jī)spark函數(shù)。
這只是Apache Spark的冰山一角蚀狰。它的實(shí)用程序擴(kuò)展到各種領(lǐng)域愉昆,不僅限于數(shù)據(jù)分析。觀看這個(gè)空間了解更多麻蹋。
參考
- https://www.youtube.com/watch?v=AYZCpxYVxH4&list=PLkz1SCf5iB4dXiPdFD4hXwheRGRwhmd6K
- https://stackoverflow.com/questions/37871194/how-to-tune-spark-executor-number-cores-and-executor-memory
- https://medium.com/@Farox2q/udfs-vs-map-vs-custom-spark-native-functions-91ab2c154b44
- https://stackoverflow.com/questions/45990633/what-are-the-various-join-types-in-spark