This article shows how to implement custom statistics in Spark 2 with a user-defined accumulator.
Since Spark 2.x, the old `Accumulator` API is deprecated and has been replaced by `AccumulatorV2`.
1. Custom Accumulator
import org.apache.spark.util.AccumulatorV2

/**
 * A custom accumulator that keeps per-key counts packed into a single
 * string of the form "key1=count1|key2=count2", e.g. "a=10|b=20".
 */
class StrAccumulator extends AccumulatorV2[String, String] {

  private var v = ""

  override def isZero: Boolean = v == ""

  override def copy(): AccumulatorV2[String, String] = {
    val newAcc = new StrAccumulator
    newAcc.v = this.v
    newAcc
  }

  override def reset(): Unit = v = ""

  // Increment the count for `key` by 1, appending "key=1" if the key is new.
  override def add(key: String): Unit = {
    if (key == null || key.isEmpty) return
    val oldValue = getFieldFromConcatString(v, "\\|", key)
    if (oldValue != null) {
      v = setFieldInConcatString(v, "\\|", key, (oldValue.toInt + 1).toString)
    } else {
      v = if (isZero) key + "=1" else v + "|" + key + "=1"
    }
  }

  // Merge field by field so counts for the same key are summed,
  // e.g. "a=2|b=1" merged with "a=3" yields "a=5|b=1".
  // (A plain string concatenation here would corrupt the encoding.)
  override def merge(other: AccumulatorV2[String, String]): Unit = other match {
    case o: StrAccumulator =>
      for (concatField <- o.v.split("\\|") if concatField.contains("=")) {
        val Array(field, count) = concatField.split("=")
        val oldValue = getFieldFromConcatString(v, "\\|", field)
        if (oldValue != null) {
          v = setFieldInConcatString(v, "\\|", field, (oldValue.toInt + count.toInt).toString)
        } else {
          v = if (isZero) concatField else v + "|" + concatField
        }
      }
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: String = v

  // Look up the value stored for `field` in a "k=v|k=v" string; null if absent.
  def getFieldFromConcatString(str: String, delimiter: String, field: String): String = {
    for (concatField <- str.split(delimiter)) {
      val parts = concatField.split("=")
      if (parts.length == 2 && parts(0) == field) {
        return parts(1)
      }
    }
    null
  }

  // Replace the value stored for `field`, then re-join the fields with "|".
  def setFieldInConcatString(str: String, delimiter: String, field: String, newValue: String): String = {
    val fields = str.split(delimiter)
    var done = false
    for (i <- 0 until fields.length if !done) {
      if (fields(i).split("=")(0) == field) {
        fields(i) = field + "=" + newValue
        done = true
      }
    }
    fields.mkString("|")
  }
}
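The two helper methods implement a tiny string-encoded map: keys and counts packed as `k1=v1|k2=v2`. As a standalone sketch independent of Spark (object and method names here are illustrative, not part of the original code), the lookup and update logic amounts to this:

```scala
// Standalone sketch of the "k=v|k=v" concat-string helpers (names are illustrative).
object ConcatStringDemo {
  // Return the value stored for `field`, or null if the field is absent.
  def getField(str: String, field: String): String = {
    str.split("\\|")
      .map(_.split("="))
      .collectFirst { case Array(k, value) if k == field => value }
      .orNull
  }

  // Replace the value stored for `field`, leaving other fields untouched.
  def setField(str: String, field: String, newValue: String): String = {
    str.split("\\|")
      .map {
        case s if s.split("=")(0) == field => field + "=" + newValue
        case s => s
      }
      .mkString("|")
  }
}
```

For example, `ConcatStringDemo.getField("a=10|b=20", "b")` returns `"20"`, and `ConcatStringDemo.setField("a=10|b=20", "a", "11")` returns `"a=11|b=20"`.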
2. Usage
Requirement: while counting the number of records per session, also compute each session's step length (the number of pages it visited).
Test data
session-1 1
session-1 2
session-1 3
session-2 1
session-2 2
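Before running the Spark job, the expected result can be derived from the five rows above with plain Scala collections (a sanity check added here for illustration, not part of the original code): grouping by session id gives step lengths 3 and 2.

```scala
// Sanity check of the expected step lengths, using plain Scala collections.
object ExpectedSteps {
  val rows = Seq(
    ("session-1", 1), ("session-1", 2), ("session-1", 3),
    ("session-2", 1), ("session-2", 2)
  )
  // Number of pages visited per session, i.e. the session's step length.
  val steps: Map[String, Int] = rows.groupBy(_._1).map { case (k, vs) => k -> vs.size }
}
```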
Test code
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val conf = new SparkConf()
      .setAppName("AccumulatorTest")
      .setMaster("local")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Create and register the custom accumulator
    val strAccumulator = new StrAccumulator
    sc.register(strAccumulator)
    // Count records per session, tracking step length via the accumulator
    sc.textFile("D:\\workspaces\\idea\\hadoop\\spark\\data\\session.txt")
      .map(line => {
        val fields = line.split("\t")
        val sessionId = fields(0)
        // Each record seen for a session adds one step
        strAccumulator.add(sessionId)
        (sessionId, 1L)
      })
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .foreach(println)
    println(strAccumulator.value)
    // Stop the SparkContext
    sc.stop()
  }
}
Output
(session-1,3)
(session-2,2)
session-1=3|session-2=2
This way, while counting records per session, we also obtain each session's step length. Other per-session attributes can be computed the same way, such as session duration, session-duration interval statistics, and step-length interval statistics. One caveat: Spark only guarantees exactly-once application of accumulator updates inside actions; updates made inside a transformation such as map, as above, may be applied more than once if a task is re-executed.
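As a sketch of the step-length interval statistic mentioned above (the bucket boundaries and names here are illustrative assumptions, not from the original), a session's step length can be mapped to an interval label before being fed to the accumulator:

```scala
// Map a step length to an interval label (boundaries are illustrative).
object StepRange {
  def bucket(steps: Int): String =
    if (steps <= 3) "1_3"
    else if (steps <= 6) "4_6"
    else "7_plus"
}
```

The label returned by `bucket` could then be passed to `StrAccumulator.add`, so the accumulator counts how many sessions fall into each interval.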