描述
- 數據在KeyBy后對key按hash結果進入對應的KeyedProcessFunction subtask
- 同個key的數據的所有數據都會進入同一個KeyedProcessFunction subtask
- 不同key的數據也有可能進入同一個KeyedProcessFunction subtask
- 同一個KeyedProcessFunction subtask中所有數據共享普通變量
- 同一個KeyedProcessFunction subtask中同一個key的數據共享狀態(tài)變量囱修,不同key的數據不共享狀態(tài)變量
- 不同KeyedProcessFunction subtask中所有變量均不共享
- 提供了RuntimeContext的使用
- 提供了Watermark和ProcessingTime的訪問
- 提供了timerService的使用渣淤,當數據即將觸發(fā)定時器時兆蕉,先執(zhí)行processElement函數封救,再執(zhí)行onTimer函數
- 提供了側輸出流的使用
輸入
KeyedStream
輸出
DataStream
KeyedProcessFunction
聲明一個自定義KeyedProcessFunction類
class MyProcessFunction(自定義類參數) extends KeyedProcessFunction[key數據類型, input數據類型, output數據類型] {
// 必須實現processElement方法
override def processElement(value: input數據類型, ctx: KeyedProcessFunction[key數據類型, input數據類型, output數據類型]#Context, out: Collector[output數據類型]): Unit = {
...
}
}
// 使用
dataStream
.keyBy(...)
.process(new MyProcessFunction(...))
// 在processFunction中使用狀態(tài)一個valueState
lazy val myState: ValueState[Long] = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("myState", classOf[Long])
)
// 在processFunction中使用timerService計時器功能
// 聲明一個基于eventTime的計時器, 當該task中的數據eventTime到達觸發(fā)時間戳時专钉,就會調用onTimer方法捺典,并清除該計時器娇斩。 ctx在processElement方法和onTimer方法中均能使用
ctx.timerService().registerEventTimeTimer(觸發(fā)時間戳茅主,單位毫秒)
// 聲明一個基于processTime的計時器部念,當processTime到達觸發(fā)時間戳時弃酌,該task會調用onTimer方法,并清除計時器
ctx.timerService().registerProcessingTimeTimer(觸發(fā)時間戳儡炼,單位毫秒)
// 手動刪除一個eventTime計時器妓湘,需要指定計時器對應的時間戳
ctx.timerService().deleteEventTimeTimer(計時器觸發(fā)時間戳)
// 手動刪除一個processTime計時器,需要指定計時器對應的時間戳
ctx.timerService().deleteProcessingTimeTimer(計時器觸發(fā)時間戳)
// 實現onTimer方法
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[key數據類型, input數據類型, output數據類型]#OnTimerContext, out: Collector[output數據類型]): Unit = {
...
//其中timestamp為聲明計時器時指定的eventTime或processTime
}
// 訪問task中的watermark乌询,watermark與eventTime對齊
ctx.timerService().currentWatermark()
// 訪問task中的processTime
ctx.timerService().currentProcessingTime()
// 使用側輸出流
ctx.output(new OutputTag[output數據類型]("定義測輸出流id"), output value)