Spark之自定義AccumulatorV2

本文介紹如何使用Spark2中自定義累加器來實(shí)現(xiàn)數(shù)據(jù)的統(tǒng)計(jì)低飒。

Spark2.x之后霹崎,之前的的accumulator被廢除胯陋,用AccumulatorV2代替;

1.自定義Accumulator

class StrAccumulator extends AccumulatorV2[String, String] {

  // a=10|b=20
  private var v = ""

  override def isZero: Boolean = v == ""

  override def copy(): AccumulatorV2[String, String] = {
    val newAcc = new StrAccumulator
    newAcc.v = this.v
    newAcc
  }

  override def reset(): Unit = v = ""

  override def add(v: String): Unit = {
    if (v == null || v == "") {
      return this.v
    }

    val oldValue = getFieldFromConcatString(this.v, "\\|", v)
    if (oldValue != null) {
      val newValue = (oldValue.toInt + 1).toString
      this.v = setFieldInConcatString(this.v, "\\|", v, newValue)
    } else {
      if (isZero) {
        this.v = v + "=" + 1
      } else {
        this.v = this.v + "|" + v + "=" + 1
      }
    }

    this.v
  }

  override def merge(other: AccumulatorV2[String, String]): Unit = other match {
    case o: StrAccumulator => v += o.v
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: String = v

  def getFieldFromConcatString(str: String, delimiter: String, field: String): String = {
    val fields = str.split(delimiter)
    for (concatField <- fields) {
      if (concatField.split("=").length == 2) {
        val fieldName = concatField.split("=")(0)
        val fieldValue = concatField.split("=")(1)
        if (fieldName == field)
          return fieldValue
      }
    }
    null
  }

  def setFieldInConcatString(str: String, delimiter: String, field: String, newValue: String): String = {
    val fields = str.split(delimiter)

    var break = false
    for (i <- 0 until fields.length if !break) {
      if (fields(i).split("=")(0) == field) {
        val concatField = field + "=" + newValue
        fields(i) = concatField
        break = true
      }
    }

    fields.mkString("|")
  }
}

2.使用

需求:統(tǒng)計(jì)Session總數(shù)的時(shí)候扒磁,同時(shí)計(jì)算Session的步長

測試數(shù)據(jù)

session-1   1
session-1   2
session-1   3
session-2   1
session-2   2

測試代碼

object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    //創(chuàng)建一個(gè)Config
    val conf = new SparkConf()
      .setAppName("AccumulatorTest")
      .setMaster("local")

    //核心創(chuàng)建SparkContext對象
    val sc = new SparkContext(conf)

    // 注冊累加器
    val strAccumulator = new StrAccumulator
    sc.register(strAccumulator)

    //WordCount
    sc.textFile("D:\\workspaces\\idea\\hadoop\\spark\\data\\session.txt")
      .map(line => {
        val lines = line.split("\t")
        val sessionId = lines(0)
        val pageId = lines(1)

        // 累加統(tǒng)計(jì)
        strAccumulator.add(sessionId)

        (sessionId, 1L)
      })
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .foreach(println)

    println(strAccumulator.value)

    //停止SparkContext對象
    sc.stop()
  }
}

打印結(jié)果

(session-1,123)
(session-2,12)

session-1=3|session-2=2

這樣在統(tǒng)計(jì)Session數(shù)量的同時(shí)庆揪,也計(jì)算了每個(gè)session的步長,當(dāng)然還可以計(jì)算其它屬性妨托。比如每個(gè)Session的會話時(shí)長缸榛,會話時(shí)長區(qū)間統(tǒng)計(jì)吝羞,會話步長區(qū)間統(tǒng)計(jì)等等。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末内颗,一起剝皮案震驚了整個(gè)濱河市钧排,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌均澳,老刑警劉巖恨溜,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異找前,居然都是意外死亡筒捺,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進(jìn)店門纸厉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人五嫂,你說我怎么就攤上這事颗品。” “怎么了沃缘?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵躯枢,是天一觀的道長。 經(jīng)常有香客問我槐臀,道長锄蹂,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任水慨,我火速辦了婚禮得糜,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘晰洒。我一直安慰自己朝抖,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布谍珊。 她就那樣靜靜地躺著治宣,像睡著了一般。 火紅的嫁衣襯著肌膚如雪砌滞。 梳的紋絲不亂的頭發(fā)上侮邀,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天,我揣著相機(jī)與錄音贝润,去河邊找鬼绊茧。 笑死,一個(gè)胖子當(dāng)著我的面吹牛打掘,可吹牛的內(nèi)容都是我干的按傅。 我是一名探鬼主播捉超,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼唯绍!你這毒婦竟也來了拼岳?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤况芒,失蹤者是張志新(化名)和其女友劉穎惜纸,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绝骚,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡耐版,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了压汪。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片粪牲。...
    茶點(diǎn)故事閱讀 38,617評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖止剖,靈堂內(nèi)的尸體忽然破棺而出腺阳,到底是詐尸還是另有隱情,我是刑警寧澤穿香,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布亭引,位于F島的核電站,受9級特大地震影響皮获,放射性物質(zhì)發(fā)生泄漏焙蚓。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一洒宝、第九天 我趴在偏房一處隱蔽的房頂上張望购公。 院中可真熱鬧,春花似錦雁歌、人聲如沸君丁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽绘闷。三九已至,卻和暖如春较坛,著一層夾襖步出監(jiān)牢的瞬間印蔗,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工丑勤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留华嘹,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓法竞,卻偏偏與公主長得像耙厚,于是被迫代替她去往敵國和親强挫。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容