Flink的Process Function(低層次操作)

Process Function(過程函數(shù))

ProcessFunction是一個(gè)低層次的流處理操作罗洗,允許返回所有(無(wú)環(huán)的)流程序的基礎(chǔ)構(gòu)建模塊:
  1群扶、事件(event)(流元素)
  2只锻、狀態(tài)(state)(容錯(cuò)性友瘤,一致性基公,僅在keyed stream中)
  3条篷、定時(shí)器(timers)(event time和processing time需纳, 僅在keyed stream中)

ProcessFunction可以認(rèn)為是能夠訪問到keyed state和timers的FlatMapFunction芦倒,輸入流中接收到的每個(gè)事件都會(huì)調(diào)用它來(lái)處理。

對(duì)于容錯(cuò)性狀態(tài)不翩,ProcessFunction可以通過RuntimeContext來(lái)訪問Flink的keyed state袭异,方法與其他狀態(tài)性函數(shù)訪問keyed state一樣荐虐。

定時(shí)器允許應(yīng)用程序?qū)?code>processing time和event time的變化做出反應(yīng),每次對(duì)processElement(...)的調(diào)用都會(huì)得到一個(gè)Context對(duì)象,該對(duì)象允許訪問元素事件時(shí)間的時(shí)間戳和TimeServer崇裁。TimeServer可以用來(lái)為尚未發(fā)生的event-time或者processing-time注冊(cè)回調(diào)格嘁,當(dāng)定時(shí)器的時(shí)間到達(dá)時(shí)吠式,onTimer(...)方法會(huì)被調(diào)用柱宦。在這個(gè)調(diào)用期間,所有的狀態(tài)都會(huì)限定到創(chuàng)建定時(shí)器的鍵灭必,并允許定時(shí)器操縱鍵控狀態(tài)(keyed states)狞谱。

注意:如果你想訪問一個(gè)鍵控狀態(tài)(keyed state)和定時(shí)器,你需要將ProcessFunction應(yīng)用到一個(gè)鍵控流(keyed stream)中:

stream.keyBy(...).process(new MyProcessFunction())

低層次的Join(Low-Level Joins)

為了在兩個(gè)輸入流中實(shí)現(xiàn)低層次的操作禁漓,應(yīng)用程序可以使用CoProcessFunction跟衅,這個(gè)函數(shù)綁定了兩個(gè)不同的輸入流,并通過分別調(diào)用processElement1(...)processElement2(...)來(lái)獲取兩個(gè)不同輸入流中的記錄播歼。

實(shí)現(xiàn)一個(gè)低層次的join通常按下面的模式進(jìn)行:
  1伶跷、為一個(gè)輸入源(或者兩個(gè)都)創(chuàng)建一個(gè)狀態(tài)(state)
  2、在接收到輸入源中的元素時(shí)更新狀態(tài)(state)
  3秘狞、在接收到另一個(gè)輸入源的元素時(shí)叭莫,探查狀態(tài)并產(chǎn)生連接結(jié)果
例如:你可能將客戶數(shù)據(jù)連接到金融交易中,同時(shí)保存客戶信息的狀態(tài)烁试,如果您關(guān)心在無(wú)序事件的情況下進(jìn)行完整和確定性的連接雇初,則當(dāng)客戶數(shù)據(jù)流的水印已經(jīng)通過交易時(shí),您可以使用計(jì)時(shí)器來(lái)評(píng)估和發(fā)布交易的連接减响。

例子

以下示例維護(hù)了每個(gè)key的計(jì)數(shù)靖诗,并且每一分鐘發(fā)出一個(gè)沒有更新key的key/count對(duì)。
  1支示、count刊橘,key和last-modification-timestamp保存在ValueState中,該ValueState由key隱含地限定颂鸿。
  2伤为、對(duì)于每條記錄,ProcessFunction增加counter的計(jì)數(shù)并設(shè)置最后更新時(shí)間戳
  3、該函數(shù)還會(huì)在未來(lái)一分鐘內(nèi)安排回調(diào)(基于event time)
  4绞愚、在每次回調(diào)時(shí),它會(huì)根據(jù)存儲(chǔ)的count的最后更新時(shí)間來(lái)檢查callback的事件時(shí)間颖医,如果匹配的話位衩,就會(huì)將key/count對(duì)發(fā)出來(lái)

注意:這個(gè)簡(jiǎn)單的例子可以通過session window來(lái)實(shí)現(xiàn),我們這里使用ProcessFunction是為了說(shuō)明它提供的基本模式熔萧。
Java 代碼:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


// 定義源數(shù)據(jù)流
DataStream<Tuple2<String, String>> stream = ...;

// 將 process function 應(yīng)用到一個(gè)鍵控流(keyed stream)中
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 * state中保存的數(shù)據(jù)類型
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 * ProcessFunction的實(shí)現(xiàn)糖驴,用來(lái)維護(hù)計(jì)數(shù)和超時(shí)
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    /** process function維持的狀態(tài) */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        // 獲取當(dāng)前的count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
         // 更新 state 的 count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        // 將state的時(shí)間戳設(shè)置為記錄的分配事件時(shí)間戳
        current.lastModified = ctx.timestamp();

        // write the state back
        // 將狀態(tài)寫回
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        // 從當(dāng)前事件時(shí)間開始計(jì)劃下一個(gè)60秒的定時(shí)器
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        //獲取計(jì)劃定時(shí)器的key的狀態(tài)
        CountWithTimestamp result = state.value();

        // 檢查是否是過時(shí)的定時(shí)器或最新的定時(shí)器
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}

Scala 代碼:

import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.Context
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
import org.apache.flink.util.Collector

//  定義源數(shù)據(jù)流
val stream: DataStream[Tuple2[String, String]] = ...

// 將 process function 應(yīng)用到一個(gè)鍵控流(keyed stream)中
val result: DataStream[Tuple2[String, Long]] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
  *  state中保存的數(shù)據(jù)類型
  */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
  * ProcessFunction的實(shí)現(xiàn),用來(lái)維護(hù)計(jì)數(shù)和超時(shí)
  */
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

  /** process function維持的狀態(tài)  */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


  override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
    // 初始化或者獲取/更新狀態(tài)
 
    val current: CountWithTimestamp = state.value match {
      case null =>
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, lastModified) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    // 將狀態(tài)寫回
    state.update(current)

    // 從當(dāng)前事件時(shí)間開始計(jì)劃下一個(gè)60秒的定時(shí)器
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
    state.value match {
      case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
        out.collect((key, count))
      case _ =>
    }
  }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末佛致,一起剝皮案震驚了整個(gè)濱河市贮缕,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌俺榆,老刑警劉巖感昼,帶你破解...
    沈念sama閱讀 222,464評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異罐脊,居然都是意外死亡定嗓,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,033評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門萍桌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)宵溅,“玉大人,你說(shuō)我怎么就攤上這事上炎∈崖撸” “怎么了?”我有些...
    開封第一講書人閱讀 169,078評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵藕施,是天一觀的道長(zhǎng)寇损。 經(jīng)常有香客問我,道長(zhǎng)铅碍,這世上最難降的妖魔是什么润绵? 我笑而不...
    開封第一講書人閱讀 59,979評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮胞谈,結(jié)果婚禮上尘盼,老公的妹妹穿的比我還像新娘。我一直安慰自己烦绳,他們只是感情好卿捎,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,001評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著径密,像睡著了一般午阵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,584評(píng)論 1 312
  • 那天底桂,我揣著相機(jī)與錄音植袍,去河邊找鬼。 笑死籽懦,一個(gè)胖子當(dāng)著我的面吹牛于个,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播暮顺,決...
    沈念sama閱讀 41,085評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼厅篓,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了捶码?” 一聲冷哼從身側(cè)響起羽氮,我...
    開封第一講書人閱讀 40,023評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎惫恼,沒想到半個(gè)月后档押,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,555評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡尤筐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,626評(píng)論 3 342
  • 正文 我和宋清朗相戀三年汇荐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盆繁。...
    茶點(diǎn)故事閱讀 40,769評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡掀淘,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出油昂,到底是詐尸還是另有隱情革娄,我是刑警寧澤,帶...
    沈念sama閱讀 36,439評(píng)論 5 351
  • 正文 年R本政府宣布冕碟,位于F島的核電站拦惋,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏安寺。R本人自食惡果不足惜厕妖,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,115評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望挑庶。 院中可真熱鬧言秸,春花似錦、人聲如沸迎捺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,601評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)凳枝。三九已至抄沮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背叛买。 一陣腳步聲響...
    開封第一講書人閱讀 33,702評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工砂代, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人率挣。 一個(gè)月前我還...
    沈念sama閱讀 49,191評(píng)論 3 378
  • 正文 我出身青樓泊藕,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親难礼。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,781評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容