A Look at Distinct Aggregation in the Flink Table API

This article takes a look at Distinct Aggregation in the Flink Table API.

Examples

//Distinct can be applied to GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation.
Table orders = tableEnv.scan("Orders");
// Distinct aggregation on group by
Table groupByDistinctResult = orders
    .groupBy("a")
    .select("a, b.sum.distinct as d");
// Distinct aggregation on time window group by
Table groupByWindowDistinctResult = orders
    .window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w")
    .select("a, b.sum.distinct as d");
// Distinct aggregation on over window
Table result = orders
    .window(Over
        .partitionBy("a")
        .orderBy("rowtime")
        .preceding("UNBOUNDED_RANGE")
        .as("w"))
    .select("a, b.avg.distinct over w, b.max over w, b.min over w");

// User-defined aggregate functions can also be used with the DISTINCT modifier
// (MyUdagg is a user-defined AggregateFunction whose definition is not shown here)
tableEnv.registerFunction("myUdagg", new MyUdagg());
Table udaggDistinctResult = orders
    .groupBy("users")
    .select("users, myUdagg.distinct(points) as myDistinctResult");
  • Distinct Aggregation can be used with both built-in and user-defined aggregate functions, and it is supported in GroupBy Aggregation, GroupBy Window Aggregation, and Over Window Aggregation.
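To make the semantics of b.sum.distinct concrete, here is a minimal plain-Java sketch that does not depend on Flink. The names DistinctSum and DistinctSumAccumulator are hypothetical, and Flink's own implementation may differ in detail. The idea shown is that each distinct value contributes to the result only once, and that keeping a value-to-count map lets a retracted row remove a value only when its last copy is gone.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical accumulator for a simplified model of SUM(DISTINCT b).
// This is an illustration only, not Flink's internal distinct accumulator.
class DistinctSumAccumulator {
    final Map<Long, Long> counts = new HashMap<>(); // value -> number of rows carrying it
    long sum;                                        // sum over the distinct values
}

// Hypothetical distinct-sum aggregate following the accumulate/retract/getValue pattern.
class DistinctSum {
    public DistinctSumAccumulator createAccumulator() {
        return new DistinctSumAccumulator();
    }

    public void accumulate(DistinctSumAccumulator acc, long value) {
        // Map.merge returns the updated count; 1 means the value was not present before
        long count = acc.counts.merge(value, 1L, Long::sum);
        if (count == 1L) {
            acc.sum += value; // first occurrence: the value joins the distinct set
        }
    }

    public void retract(DistinctSumAccumulator acc, long value) {
        Long count = acc.counts.get(value);
        if (count == null) {
            return; // nothing to retract for a value that was never accumulated
        }
        if (count == 1L) {
            acc.counts.remove(value);
            acc.sum -= value; // last occurrence left: the value leaves the distinct set
        } else {
            acc.counts.put(value, count - 1);
        }
    }

    public long getValue(DistinctSumAccumulator acc) {
        return acc.sum;
    }
}
```

For rows with b = 1, 2, 2, 3 the sketch yields 6 where a plain SUM would give 8; retracting one of the two 2s leaves the result at 6, and retracting the second one brings it down to 4.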

AggregateFunction

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/functions/AggregateFunction.scala

/**
  * Base class for User-Defined Aggregates.
  *
  * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
  * methods. An [[AggregateFunction]] needs at least three methods:
  *  - createAccumulator,
  *  - accumulate, and
  *  - getValue.
  *
  *  There are a few other methods that can be optional to have:
  *  - retract,
  *  - merge, and
  *  - resetAccumulator
  *
  * All these methods must be declared publicly, not static and named exactly as the names
  * mentioned above. The methods createAccumulator and getValue are defined in the
  * [[AggregateFunction]] functions, while other methods are explained below.
  *
  *
  * {{{
  * Processes the input values and update the provided accumulator instance. The method
  * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
  * requires at least one accumulate() method.
  *
  * @param accumulator           the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  *
  * def accumulate(accumulator: ACC, [user defined inputs]): Unit
  * }}}
  *
  *
  * {{{
  * Retracts the input values from the accumulator instance. The current design assumes the
  * inputs are the values that have been previously accumulated. The method retract can be
  * overloaded with different custom types and arguments. This function must be implemented for
  * datastream bounded over aggregate.
  *
  * @param accumulator           the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  *
  * def retract(accumulator: ACC, [user defined inputs]): Unit
  * }}}
  *
  *
  * {{{
  * Merges a group of accumulator instances into one accumulator instance. This function must be
  * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which will keep the merged aggregate results. It should
  *                     be noted that the accumulator may contain the previous aggregated
  *                     results. Therefore user should not replace or clean this instance in the
  *                     custom merge method.
  * @param its          an [[java.lang.Iterable]] pointed to a group of accumulators that will be
  *                     merged.
  *
  * def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit
  * }}}
  *
  *
  * {{{
  * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
  * dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which needs to be reset
  *
  * def resetAccumulator(accumulator: ACC): Unit
  * }}}
  *
  *
  * @tparam T   the type of the aggregation result
  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute an aggregation result.
  *             AggregateFunction represents its state using accumulator, thereby the state of the
  *             AggregateFunction must be put into the accumulator.
  */
abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
  /**
    * Creates and init the Accumulator for this [[AggregateFunction]].
    *
    * @return the accumulator with the initial value
    */
  def createAccumulator(): ACC

  /**
    * Called every time when an aggregation result should be materialized.
    * The returned value could be either an early and incomplete result
    * (periodically emitted as data arrive) or the final result of the
    * aggregation.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @return the aggregation result
    */
  def getValue(accumulator: ACC): T

  /**
    * Returns true if this AggregateFunction can only be applied in an OVER window.
    *
    * @return true if the AggregateFunction requires an OVER window, false otherwise.
    */
  def requiresOver: Boolean = false

  /**
    * Returns the TypeInformation of the AggregateFunction's result.
    *
    * @return The TypeInformation of the AggregateFunction's result or null if the result type
    *         should be automatically inferred.
    */
  def getResultType: TypeInformation[T] = null

  /**
    * Returns the TypeInformation of the AggregateFunction's accumulator.
    *
    * @return The TypeInformation of the AggregateFunction's accumulator or null if the
    *         accumulator type should be automatically inferred.
    */
  def getAccumulatorType: TypeInformation[ACC] = null
}
  • AggregateFunction extends UserDefinedFunction and has two type parameters: T, the type of the aggregation result, and ACC, the type of the accumulator. It defines createAccumulator, getValue, requiresOver, getResultType, and getAccumulatorType; of these, subclasses must implement createAccumulator and getValue.
  • The accumulate method is not declared in AggregateFunction, but every subclass must define and implement at least one; it takes the accumulator (ACC) followed by the user-defined inputs and returns void. The retract, merge, and resetAccumulator methods are optional and only need to be defined when the use case requires them. All of these methods must be public, non-static, and named exactly as listed.
  • For bounded OVER aggregates on a DataStream, retract must be implemented; it takes ACC followed by the user-defined inputs and returns void. For session-window grouping aggregates on a DataStream and for grouping aggregates on a DataSet, merge must be implemented; it takes ACC and a java.lang.Iterable<ACC> and returns void. For grouping aggregates on a DataSet, resetAccumulator must be implemented; it takes ACC and returns void.
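The contract above can be illustrated with a plain-Java sketch of an average aggregate. AvgSketch and AvgAccumulator are hypothetical names. To keep the sketch compilable without Flink on the classpath, the class does not extend AggregateFunction; in a real job it would extend AggregateFunction<Double, AvgAccumulator> and be registered with the table environment. Its methods follow the names and parameter shapes the Scaladoc prescribes.

```java
// Hypothetical accumulator (the ACC type): holds all state needed to compute the result.
class AvgAccumulator {
    long sum;
    long count;
}

// Hypothetical stand-in for a user-defined AggregateFunction<Double, AvgAccumulator>.
class AvgSketch {
    // Required: create and initialize the accumulator.
    public AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    // Required by convention (not declared in the base class): fold one input into acc.
    public void accumulate(AvgAccumulator acc, long value) {
        acc.sum += value;
        acc.count++;
    }

    // Optional: undo a previous accumulate; needed for bounded OVER aggregates on streams.
    public void retract(AvgAccumulator acc, long value) {
        acc.sum -= value;
        acc.count--;
    }

    // Optional: merge other accumulators into acc; needed for session windows and DataSet
    // grouping. acc may already hold results, so it is updated in place, never replaced.
    public void merge(AvgAccumulator acc, Iterable<AvgAccumulator> others) {
        for (AvgAccumulator other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    // Optional: reset acc to its initial state; needed for DataSet grouping.
    public void resetAccumulator(AvgAccumulator acc) {
        acc.sum = 0L;
        acc.count = 0L;
    }

    // Required: materialize the (possibly early) result; null when no rows were seen.
    public Double getValue(AvgAccumulator acc) {
        return acc.count == 0 ? null : (double) acc.sum / acc.count;
    }
}
```

Accumulating 2 and 4 gives 3.0; retracting 4 gives 2.0; merging in an accumulator that saw 6 and 10 then gives 18 / 3 = 6.0. Note that merge adds into the existing accumulator instead of overwriting it, as the Scaladoc for merge requires.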

Summary

  • Distinct Aggregation in the Table API works with both built-in and user-defined aggregate functions and is supported in GroupBy Aggregation, GroupBy Window Aggregation, and Over Window Aggregation.
  • AggregateFunction extends UserDefinedFunction and is parameterized by T (the result type) and ACC (the accumulator type). It defines createAccumulator, getValue, requiresOver, getResultType, and getAccumulatorType; subclasses must implement createAccumulator and getValue.
  • Subclasses must also define at least one accumulate method, which takes ACC plus the user-defined inputs and returns void. retract, merge, and resetAccumulator are optional and depend on the use case: retract (ACC plus inputs) is required for bounded OVER aggregates on a DataStream, merge (ACC and java.lang.Iterable<ACC>) for session-window grouping aggregates on a DataStream and grouping aggregates on a DataSet, and resetAccumulator (ACC) for grouping aggregates on a DataSet; all three return void.
