通常情況下裹刮,當(dāng)向Spark操作(如map,reduce)傳遞一個函數(shù)時,它會在一個遠程集群節(jié)點上執(zhí)行斗埂,它會使用函數(shù)中所有變量的副本端衰。這些變量被復(fù)制到所有的機器上叠洗,遠程機器上并沒有被更新的變量會向驅(qū)動程序回傳。在任務(wù)之間使用通用的旅东,支持讀寫的共享變量是低效的灭抑。盡管如此,Spark提供了兩種有限類型的共享變量抵代,廣播變量和累加器腾节。
1.累加器
提供了將工作節(jié)點中的值聚合到驅(qū)動器程序中的簡單語法。累加器的一個常見的用途是在調(diào)試時對作業(yè)的執(zhí)行過程中事件進行計數(shù)。
一個計算空行的例子
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by chh on 2016/5/22.
*/
object Counter {
def main(args :Array[String]): Unit = {
//創(chuàng)建一個scala版本的SparkContext
val conf = new SparkConf().setAppName("Counter").setMaster("local")
val sc = new SparkContext(conf)
val file=sc.textFile("file.txt")
val blankLines =sc.accumulator(0)//創(chuàng)建Accumulator[Int]并初始化為0
val callSigns =file.flatMap(line => {
if(line == ""){
blankLines += 1 //累加器加一
}
line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println(blankLines.value)
}
}
注:
1.累加器的值只有在驅(qū)動器程序中訪問案腺,所以檢查也應(yīng)當(dāng)在驅(qū)動器程序中完成庆冕。
2.對于行動操作中使用的累加器,Spark只會把每個任務(wù)對各累加器的修改應(yīng)用一次劈榨。因此如果想要一個無論在失敗還是在重新計算時候都絕對可靠的累加器访递,必須把它放在foreach()這樣的行動操作中。
而轉(zhuǎn)化操作最好在調(diào)試中使用
2.廣播變量
廣播變量允許程序員將一個只讀的變量緩存在每臺機器上同辣,而不用在任務(wù)之間傳遞變量拷姿。廣播變量可被用于有效地給每個節(jié)點一個大輸入數(shù)據(jù)集的副本。
一個Executor只需要在第一個Task啟動時邑闺,獲得一份Broadcast數(shù)據(jù)跌前,之后的Task都從本節(jié)點的BlockManager中獲取相關(guān)數(shù)據(jù)。
步驟
1.調(diào)用SparkContext.broadcast方法創(chuàng)建一個Broadcast[T]對象陡舅。
任何序列化的類型都可以這么實現(xiàn)。
2.通過value屬性訪問改對象的值(Java之中為value()方法)
3.變量只會被發(fā)送到各個節(jié)點一次伴挚,應(yīng)作為只讀值處理(修改這個值不會影響到別的節(jié)點)
修改為:.廣播變量方式
3.分區(qū)操作
(1)mapPartitions
def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
該函數(shù)和map函數(shù)類似靶衍,只不過映射函數(shù)的參數(shù)由RDD中的每一個元素變成了RDD中每一個分區(qū)的迭代器。如果在映射的過程中需要頻繁創(chuàng)建額外的對象(如數(shù)據(jù)庫連接對象)茎芋,使用mapPartitions要比map高效的多颅眶。
比如,將RDD中的所有數(shù)據(jù)通過JDBC連接寫入數(shù)據(jù)庫田弥,如果使用map函數(shù)涛酗,可能要為每一個元素都創(chuàng)建一個connection,這樣開銷很大偷厦,如果使用mapPartitions商叹,那么只需要針對每一個分區(qū)建立一個connection。
var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分區(qū)
scala> var rdd3 = rdd1.mapPartitions{ x => {
| var result = List[Int]()
| var i = 0
| while(x.hasNext){
| i += x.next()
| }
| result.::(i).iterator
| }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23
//rdd3將rdd1中每個分區(qū)中的數(shù)值累加
scala> rdd3.collect
res65: Array[Int] = Array(3, 12)
scala> rdd3.partitions.size
res66: Int = 2
(2)mapPartitionsWithIndex
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
函數(shù)作用同mapPartitions只泼,不過提供了兩個參數(shù)剖笙,第一個參數(shù)為分區(qū)的索引。
<template>
<div>
<div class="type-item">
<div class="type-image">
</div>
<div class="content">
<h3>個人</h3>
<p>
適合垂直領(lǐng)域?qū)<仪氤⒁庖婎I(lǐng)袖等自媒體人士申請
</p>
</div>
<div class="enter" @click="nextStage('個人')">
<span>入駐</span>
</div>
</div>
<div class="type-item">
<div class="type-image">
</div>
<div class="content">
<h3>媒體</h3>
<p>
面向報紙弥咪、雜志、電視十绑、廣播等其他以內(nèi)容生產(chǎn)為主的組織機構(gòu)
</p>
</div>
<div class="enter" @click="nextStage('媒體')">
<span>入駐</span>
</div>
</div>
<div class="type-item">
<div class="type-image">
</div>
<div class="content">
<h3>企業(yè)/機構(gòu)</h3>
<p>
面向企業(yè)聚至、公司、分支機構(gòu)本橙、社團扳躬、民間組織等機構(gòu)團體
</p>
</div>
<div class="enter" @click="nextStage('企業(yè)/機構(gòu)')">
<span>入駐</span>
</div>
</div>
<div class="type-item">
<div class="type-image">
</div>
<div class="content">
<h3>政府</h3>
<p>
面向國內(nèi)各省市區(qū)的各級黨政機關(guān)
</p>
</div>
<div class="enter" @click="nextStage('政府')">
<span>入駐</span>
</div>
</div>
</div>
</template>
<script>
export default {
data : function(){
return {
}
},
methods: {
nextStage: function(type) {
console.log(type);
window.location.href="#/registor/fill"
}
}
}
</script>
<style scoped>
.type-item {
position: relative;
box-sizing: border-box;
padding: 40px 140px 30px 150px;
border-bottom: 1px solid #cdcdcd;/*no*/
}
.type-item:nth-of-type(1) {
margin-top: 40px;
}
.type-item:nth-of-type(1) .type-image{
background-image: url('../../assets/images/registor/geren.png');
}
.type-item:nth-of-type(2) .type-image{
background-image: url('../../assets/images/registor/meiti.png');
}
.type-item:nth-of-type(3) .type-image{
background-image: url('../../assets/images/registor/qiye.png');
}
.type-item:nth-of-type(4) .type-image{
background-image: url('../../assets/images/registor/zhengfu.png');
}
.type-item:nth-last-of-type(1) {
border-bottom: 0px;
margin-bottom: 80px;
}
.type-item .type-image {
position: absolute;
left: 0px;
width: 118px;
height: 118px;
border-radius: 50%;
background-color: #fdb300;
background-repeat: no-repeat;
background-size: 55px 50px;
background-position: center;
border-bottom: 1px solid #cdcdcd;
}
.type-item .content h3 {
font-size: 34px;
line-height: 45px;
color: #666666;
margin-top: -5px;
}
.type-item .content p {
font-size:26px;
color: #999999;
margin-top: 16px;
}
.type-item .enter {
position: absolute;
box-sizing: border-box;
top: 50%;
-webkit-transform: translate(0, -50%);
transform: translate(0, -50%);
right: 0px;
width: 120px;
font-size: 26px;
color: #666666;
border-radius: 10px;
border: 2px solid #fdb300;
text-align: center;
vertical-align: center;
padding: 15px 0px;
}
</style>
var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分區(qū)
var rdd2 = rdd1.mapPartitionsWithIndex{
(x,iter) => {
var result = List[String]()
var i = 0
while(iter.hasNext){
i += iter.next()
}
result.::(x + "|" + i).iterator
}
}
//rdd2將rdd1中每個分區(qū)的數(shù)字累加,并在每個分區(qū)的累加結(jié)果前面加了分區(qū)索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)