Spark自定義累加器的實(shí)現(xiàn)
Java版本:
package com.luoxuehuan.sparkproject.spark;
import org.apache.spark.AccumulatorParam;
/**
*
* @author lxh
* implements AccumulatorParam<String>
* String格式 進(jìn)行分布式計(jì)算
* 也可以用自己的model 臼寄,但必須是可以序列化的众雷!
* 然后基于這種特殊的數(shù)據(jù)格式冒签,可以實(shí)現(xiàn)自己復(fù)雜的分布式計(jì)算邏輯
*
* 各個(gè)task 分布式在運(yùn)行债蜜,可以根據(jù)你需求挠锥,task給Accumulator傳入不同的值阳掐。
*
* 根據(jù)不同的值洽腺,去做復(fù)雜的邏輯。
*/
public class SessionAggrAccumulator implements AccumulatorParam<String> {
private static final long serialVersionUID = 1L;
/**
* Zoro方法淡诗,其實(shí)主要用于數(shù)據(jù)的初始化
* 那么骇塘,我們這里,就返回一個(gè)值韩容,就是初始化中款违,所有范圍區(qū)間的數(shù)量,多少0
*
* 各個(gè)范圍區(qū)間的統(tǒng)計(jì)數(shù)量的拼接群凶,還是采用|分割插爹。
*/
@Override
public String zero(String v) {
return Constants.SESSION_COUNT + "=0|"
+ Constants.TIME_PERIOD_1s_3s + "=0|"
+ Constants.TIME_PERIOD_4s_6s + "=0|"
+ Constants.TIME_PERIOD_7s_9s + "=0|"
+ Constants.TIME_PERIOD_10s_30s + "=0|"
+ Constants.TIME_PERIOD_30s_60s + "=0|"
+ Constants.STEP_PERIOD_60 + "=0";
}
/**
* 這兩個(gè)方法可以理解為一樣的。
* 這兩個(gè)方法请梢,其實(shí)主要就是實(shí)現(xiàn)赠尾,v1可能就是我們初始化的那個(gè)連接串
* v2,就是我們?cè)诒闅vsession的時(shí)候毅弧,判斷出某個(gè)session對(duì)應(yīng)的區(qū)間气嫁,然后會(huì)用Constants.TIME_PERIOD_1s_3s
* 所以,我們够坐,要做的事情就是
* 在v1中寸宵,找到v2對(duì)應(yīng)的value,累加1元咙,然后再更新回連接串里面去
*/
@Override
public String addInPlace(String v1, String v2) {
return add(v1, v2);
}
@Override
public String addAccumulator(String v1, String v2) {
return add(v1, v2);
}
/**
* session統(tǒng)計(jì)計(jì)算邏輯梯影。
* @param v1 連接串
* @param v2 范圍區(qū)間
* @return 更新以后的連接串
*/
private String add(String v1,String v2){
//校驗(yàn):v1位空的話,直接返回v2
if(StringUtils.isEmpty(v1)) {
return v2;
}
// 使用StringUtils工具類蛾坯,從v1中光酣,提取v2對(duì)應(yīng)的值疏遏,并累加1
String oldValue = StringUtils.getFieldFromConcatString(v1, "\\|", v2);
if(oldValue != null) {
// 將范圍區(qū)間原有的值脉课,累加1
int newValue = Integer.valueOf(oldValue) + 1;
// 使用StringUtils工具類,將v1中财异,v2對(duì)應(yīng)的值倘零,設(shè)置成新的累加后的值
return StringUtils.setFieldInConcatString(v1, "\\|", v2, String.valueOf(newValue));
}
return v1;
}
}
Scala版本
package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
/**
* Created by lxh on 2016/6/30.
*/
object BroadcastAccumulatorStreaming {
/**
* 聲明一個(gè)廣播和累加器!
*/
private var broadcastList:Broadcast[List[String]] = _
private var accumulator:Accumulator[Int] = _
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
val sc = new SparkContext(sparkConf)
/**
* duration是ms
*/
val ssc = new StreamingContext(sc,Duration(2000))
// broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")
/**
* 獲取數(shù)據(jù)戳寸!
*/
val lines = ssc.socketTextStream("localhost",9999)
/**
* 1.flatmap把行分割成詞呈驶。
* 2.map把詞變成tuple(word,1)
* 3.reducebykey累加value
* (4.sortBykey排名)
* 4.進(jìn)行過(guò)濾。 value是否在累加器中疫鹊。
* 5.打印顯示袖瞻。
*/
val words = lines.flatMap(line => line.split(" "))
val wordpair = words.map(word => (word,1))
wordpair.filter(record => {broadcastList.value.contains(record._1)})
val pair = wordpair.reduceByKey(_+_)
/**
* 這個(gè)pair 是PairDStream<String, Integer>
* 查看這個(gè)id是否在黑名單中司致,如果是的話,累加器就+1
*/
/* pair.foreachRDD(rdd => {
rdd.filter(record => {
if (broadcastList.value.contains(record._1)) {
accumulator.add(1)
return true
} else {
return false
}
})
})*/
val filtedpair = pair.filter(record => {
if (broadcastList.value.contains(record._1)) {
accumulator.add(record._2)
true
} else {
false
}
}).print
println("累加器的值"+accumulator.value)
// pair.filter(record => {broadcastList.value.contains(record._1)})
/* val keypair = pair.map(pair => (pair._2,pair._1))*/
/**
* 如果DStream自己沒(méi)有某個(gè)算子操作嗅蔬。就通過(guò)轉(zhuǎn)化transform蜓席!
*/
/* keypair.transform(rdd => {
rdd.sortByKey(false)//TODO
})*/
pair.print()
ssc.start()
ssc.awaitTermination()
}
}
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者