Process Function
ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all (acyclic) streaming applications:
1. events (stream elements)
2. state (fault-tolerant, consistent, only on keyed streams)
3. timers (event time and processing time, only on keyed streams)
ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream.
For fault-tolerant state, ProcessFunction gives access to Flink's keyed state through the RuntimeContext, the same way other stateful functions access keyed state.
Timers allow applications to react to changes in processing time and in event time. Every call to processElement(...) receives a Context object, which gives access to the element's event-time timestamp and to the TimerService. The TimerService can be used to register callbacks for future event-time or processing-time instants. When a timer's time is reached, the onTimer(...) method is called. During this call, all state is again scoped to the key for which the timer was created, which allows timers to manipulate keyed state.
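To make the interplay of per-key state, event-time timers, and onTimer(...) concrete without a Flink cluster, here is a small plain-Java simulation. It is not Flink's API: MiniKeyedRuntime, KeyedLogic, CountThenReport, and advanceWatermark are hypothetical names chosen for illustration. It models two rules from the paragraph above: an event-time timer fires once the watermark reaches its timestamp, and during the callback the state is scoped to the key that registered the timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical per-key logic, loosely mirroring processElement(...) / onTimer(...).
// This is a simulation for illustration, not Flink's ProcessFunction API.
interface KeyedLogic {
    void processElement(String key, long eventTime, MiniKeyedRuntime rt);
    void onTimer(String key, long timerTime, MiniKeyedRuntime rt);
}

class MiniKeyedRuntime {
    private final Map<String, Long> state = new HashMap<>();                 // one value per key
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();   // timer time -> keys
    private final List<String> output = new ArrayList<>();
    private final KeyedLogic logic;
    private String currentKey;                                               // key whose state is in scope

    MiniKeyedRuntime(KeyedLogic logic) { this.logic = logic; }

    // State access is implicitly scoped to the current key.
    Long value() { return state.get(currentKey); }
    void update(long v) { state.put(currentKey, v); }

    // Registering the same (key, time) twice keeps a single timer.
    void registerEventTimeTimer(long time) {
        timers.computeIfAbsent(time, t -> new TreeSet<>()).add(currentKey);
    }

    void emit(String record) { output.add(record); }
    List<String> output() { return output; }

    void processElement(String key, long eventTime) {
        currentKey = key;
        logic.processElement(key, eventTime, this);
    }

    // Fire every timer whose time is <= the new watermark, in time order,
    // re-scoping state to the key that registered each timer.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> e = timers.pollFirstEntry();
            for (String key : e.getValue()) {
                currentKey = key;
                logic.onTimer(key, e.getKey(), this);
            }
        }
    }
}

// Example logic: count events per key and report the count 10 time units after each event.
class CountThenReport implements KeyedLogic {
    public void processElement(String key, long eventTime, MiniKeyedRuntime rt) {
        Long c = rt.value();
        rt.update(c == null ? 1 : c + 1);
        rt.registerEventTimeTimer(eventTime + 10);
    }

    public void onTimer(String key, long timerTime, MiniKeyedRuntime rt) {
        // rt.value() reads the state of the key that registered this timer
        rt.emit(key + "@" + timerTime + "=" + rt.value());
    }
}
```

Feeding a@1, b@2, a@5 and then advancing the watermark to 12 fires the timers at 11 (key a, count 2) and 12 (key b, count 1); the timer at 15 waits until the watermark reaches 15.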
Note: If you want to access keyed state and timers, you have to apply the ProcessFunction to a keyed stream:
stream.keyBy(...).process(new MyProcessFunction())
Low-Level Joins
To realize low-level operations on two inputs, applications can use CoProcessFunction. This function is bound to two different inputs and receives records from them through separate calls to processElement1(...) and processElement2(...).
Implementing a low-level join typically follows this pattern:
1. Create a state object for one input (or both).
2. Update the state upon receiving elements from its input.
3. Upon receiving elements from the other input, probe the state and produce the joined result.
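The three steps above can be sketched in plain Java. This is a hypothetical simulation, not Flink code: CustomerTradeJoin and its record shapes are illustrative names, and the two HashMaps stand in for what would be keyed state (scoped to the customer id) in a real CoProcessFunction. Only the method names processElement1/processElement2 mirror the Flink API described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the low-level join pattern (not Flink code).
class CustomerTradeJoin {
    // Step 1: state for input 1 (current customer name per customer id) ...
    private final Map<String, String> customerState = new HashMap<>();
    // ... and for input 2 (trades that arrived before their customer record)
    private final Map<String, List<String>> pendingTrades = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Input 1: customer records (customerId, name)
    void processElement1(String customerId, String name) {
        // Step 2: update this input's state
        customerState.put(customerId, name);
        // Step 3 for the other side: flush trades that were waiting for this customer
        List<String> waiting = pendingTrades.remove(customerId);
        if (waiting != null) {
            for (String tradeId : waiting) {
                output.add(tradeId + " -> " + name);
            }
        }
    }

    // Input 2: trade records (tradeId, customerId)
    void processElement2(String tradeId, String customerId) {
        // Step 3: probe the other input's state and produce the joined result
        String name = customerState.get(customerId);
        if (name != null) {
            output.add(tradeId + " -> " + name);
        } else {
            // Step 2 for this input: no customer yet, so remember the trade
            pendingTrades.computeIfAbsent(customerId, k -> new ArrayList<>()).add(tradeId);
        }
    }

    List<String> output() { return output; }
}
```

Note that this version joins each trade with whatever customer record happens to be stored when the probe runs, so the result depends on arrival order.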
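For example, you might join customer data to financial trades while keeping state for the customer data. If you care about complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade once the watermark of the customer data stream has passed the time of that trade.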
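The timer-based variant can be sketched as a single-key, plain-Java simulation. Assumptions: WatermarkGatedJoin and its methods are hypothetical names; customer updates are versioned by event time; each buffered trade stands for one event-time timer, and onCustomerWatermark plays the role of the runtime firing those timers. The sketch only models the customer side's watermark; in Flink, a two-input operator's event-time clock is the minimum of both inputs' watermarks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key simulation (one customer), not Flink code.
class WatermarkGatedJoin {
    // customer versions ordered by event time (would be keyed state in Flink)
    private final TreeMap<Long, String> customerVersions = new TreeMap<>();
    // buffered trades: trade time -> trade ids (keyed state plus one timer per trade time in Flink)
    private final TreeMap<Long, List<String>> pendingTrades = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    void onCustomer(long time, String name) { customerVersions.put(time, name); }

    void onTrade(long time, String tradeId) {
        // in Flink this is where ctx.timerService().registerEventTimeTimer(time) would be called
        pendingTrades.computeIfAbsent(time, t -> new ArrayList<>()).add(tradeId);
    }

    // Acts like onTimer for every buffered trade whose time the watermark has passed.
    void onCustomerWatermark(long watermark) {
        while (!pendingTrades.isEmpty() && pendingTrades.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> e = pendingTrades.pollFirstEntry();
            // customer version valid at the trade's time: latest update at or before it
            Map.Entry<Long, String> version = customerVersions.floorEntry(e.getKey());
            String name = version == null ? "<unknown>" : version.getValue();
            for (String tradeId : e.getValue()) {
                output.add(tradeId + " -> " + name);
            }
        }
    }

    List<String> output() { return output; }
}
```

Because the join for a trade is only evaluated after the watermark has passed it, an out-of-order customer update that is older than the trade (time 5 below, arriving after the update at time 15) is still taken into account, and the result no longer depends on arrival order.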
Example
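The following example maintains a count per key and emits a key/count pair whenever a minute passes (in event time) without an update for that key:
1. The count, key, and last-modification timestamp are stored in a ValueState, which is implicitly scoped by key.
2. For each record, the ProcessFunction increments the counter and sets the last-modification timestamp.
3. The function also schedules a callback one minute into the future (in event time).
4. On each callback, it compares the callback's event-time timestamp with the last-modification time of the stored count and emits the key/count pair if they match, that is, if no further update occurred during that minute.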
Note: This simple example could have been implemented with session windows. We use ProcessFunction here to illustrate the basic pattern it provides.
Java code:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function to a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event-time timestamp
        // (ctx.timestamp() returns null if the records carry no timestamps, e.g. in processing time)
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check whether this is an outdated timer or the latest one
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
Scala code:
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// the source data stream
val stream: DataStream[Tuple2[String, String]] = ...

// apply the process function to a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
  * The data type stored in the state
  */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
  * The implementation of the ProcessFunction that maintains the count and timeouts
  */
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

  /** The state that is maintained by this process function */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

  override def processElement(
      value: (String, String),
      ctx: ProcessFunction[(String, String), (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {

    // initialize or retrieve/update the state
    val current: CountWithTimestamp = state.value match {
      case null =>
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, _) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    // write the state back
    state.update(current)

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[(String, String), (String, Long)]#OnTimerContext,
      out: Collector[(String, Long)]): Unit = {

    state.value match {
      // emit only for the latest timer, i.e. no update arrived in the last minute
      case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
        out.collect((key, count))
      case _ =>
    }
  }
}
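To check the onTimer(...) condition by hand, the following plain-Java replay reproduces the example's logic without Flink. CountWithTimeoutTrace and advanceWatermark are hypothetical names; a TreeMap stands in for Flink's timer queue, and a timer fires once the simulated watermark reaches it. The point it shows is that every element registers a new timer, but only the timer matching lastModified + 60000 emits, so earlier (stale) timers are silently ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java replay of CountWithTimeoutFunction's logic (not Flink code).
class CountWithTimeoutTrace {
    static final long TIMEOUT = 60000;

    private final Map<String, long[]> state = new HashMap<>();             // key -> {count, lastModified}
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();  // timer time -> keys
    private final List<String> output = new ArrayList<>();

    // Mirrors processElement: bump the count, record the event time, register a timer.
    void processElement(String key, long eventTime) {
        long[] s = state.computeIfAbsent(key, k -> new long[2]);
        s[0]++;
        s[1] = eventTime;
        timers.computeIfAbsent(eventTime + TIMEOUT, t -> new TreeSet<>()).add(key);
    }

    // Fires due timers in time order and applies onTimer's staleness check.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> e = timers.pollFirstEntry();
            for (String key : e.getValue()) {
                long[] s = state.get(key);
                if (e.getKey() == s[1] + TIMEOUT) {   // only the latest timer for the key emits
                    output.add(key + "=" + s[0]);
                }
            }
        }
    }

    List<String> output() { return output; }
}
```

With a@0, a@30000, b@10000, advancing the watermark to 80000 fires a's timer at 60000 (stale, since a was updated at 30000) and b's timer at 70000 (emits b=1); advancing to 90000 then emits a=2. As in the original example, the state is not cleared after emitting, so a later record for the same key keeps counting from the stored value.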