Spark實例-自定義聚合函數(shù)

Spark自定義聚合函數(shù)時,需要實現(xiàn)UserDefinedAggregateFunction中8個方法:

  • inputSchema:輸入的數(shù)據(jù)類型
  • bufferSchema:中間聚合處理時,需要處理的數(shù)據(jù)類型
  • dataType:函數(shù)的返回類型
  • deterministic:是否是確定的
  • initialize:為每個分組的數(shù)據(jù)初始化
  • update:每個分組衙传,有新的值進來時,如何進行分組的聚合計算
  • merge:由于Spark是分布式的憎妙,所以一個分組的數(shù)據(jù)搂擦,可能會在不同的節(jié)點上進行局部聚合,就是update类嗤,但是最后一個分組,在各節(jié)點上的聚合值辨宠,要進行Merge遗锣,也就是合并
  • evaluate:一個分組的聚合值,如何通過中間的聚合值嗤形,最后返回一個最終的聚合值
    實例代碼:
package com.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
  * Created by Administrator on 2017/3/13.
  * 用戶自定義聚合函數(shù)
  */
class StrCountUDAF extends  UserDefinedAggregateFunction{
  //輸入的數(shù)據(jù)類型
  override def inputSchema: StructType = {
    StructType(Array(
      StructField("str",StringType,true)
    ))
  }
  //中間聚合處理時精偿,所處理的數(shù)據(jù)類型
  override def bufferSchema: StructType = {
    StructType(Array(
      StructField("count",IntegerType,true)
    ))
  }
  //函數(shù)的返回類型
  override def dataType: DataType = {
    IntegerType
  }

  override def deterministic: Boolean = {
    true
  }
  //為每個分組的數(shù)據(jù)初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0)=0
  }
  //指的是,每個分組赋兵,有新的值進來時笔咽,如何進行分組的聚合計算
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0)=buffer.getAs[Int](0)+1
  }
  //由于Spark是分布式的,所以一個分組的數(shù)據(jù)霹期,可能會在不同的節(jié)點上進行局部聚合叶组,就是update
  //但是最后一個分組,在各節(jié)點上的聚合值历造,要進行Merge甩十,也就是合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0)=buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }
  //一個分組的聚合值船庇,如何通過中間的聚合值,最后返回一個最終的聚合值
  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0)
  }
}

  • 聚合函數(shù)的使用
package com.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object UDAF extends App{
  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("DailyUVFunction")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  //導(dǎo)入隱式轉(zhuǎn)化
  import sqlContext.implicits._
  //構(gòu)造用戶的訪問數(shù)據(jù)枣氧,并創(chuàng)建DataFrame
  val names=Array("tom","yangql","mary","test","test")
  val namesRDD = sc.parallelize(names)
  //將RDD轉(zhuǎn)換為DataFram
  val namesRowRDD=namesRDD.map(name=>Row(name))
  val structType=StructType(Array(
    StructField("name",StringType,true)
  ))
  val namesDF=sqlContext.createDataFrame(namesRowRDD,structType)
  //注冊表
  namesDF.createOrReplaceTempView("names")
  //定義和注冊自定義函數(shù)
  sqlContext.udf.register("strCount",new StrCountUDAF)
  //使用自定義函數(shù)
  val df=sqlContext.sql("select name,strCount(name)  from names group by name")
  df.show()
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末溢十,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子达吞,更是在濱河造成了極大的恐慌张弛,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件酪劫,死亡現(xiàn)場離奇詭異吞鸭,居然都是意外死亡,警方通過查閱死者的電腦和手機覆糟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門刻剥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人滩字,你說我怎么就攤上這事造虏。” “怎么了麦箍?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵漓藕,是天一觀的道長。 經(jīng)常有香客問我挟裂,道長享钞,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任诀蓉,我火速辦了婚禮栗竖,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘渠啤。我一直安慰自己狐肢,他們只是感情好,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布沥曹。 她就那樣靜靜地躺著份名,像睡著了一般。 火紅的嫁衣襯著肌膚如雪架专。 梳的紋絲不亂的頭發(fā)上同窘,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天玄帕,我揣著相機與錄音部脚,去河邊找鬼。 笑死裤纹,一個胖子當著我的面吹牛委刘,可吹牛的內(nèi)容都是我干的丧没。 我是一名探鬼主播,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼锡移,長吁一口氣:“原來是場噩夢啊……” “哼呕童!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起淆珊,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤夺饲,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后施符,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體往声,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年戳吝,在試婚紗的時候發(fā)現(xiàn)自己被綠了浩销。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡听哭,死狀恐怖慢洋,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情陆盘,我是刑警寧澤普筹,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站礁遣,受9級特大地震影響斑芜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜祟霍,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一杏头、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧沸呐,春花似錦醇王、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至呼渣,卻和暖如春棘伴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背屁置。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工焊夸, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蓝角。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓阱穗,卻偏偏與公主長得像饭冬,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子揪阶,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容