0 相關源碼
掌握Flink中三種常用的Time處理方式,掌握Flink中滾動窗口以及滑動窗口的使用播急,了解Flink中的watermark纱烘。
Flink 在流處理工程中支持不同的時間概念。
1 處理時間(Processing time)
執(zhí)行相應算子操作的機器的系統時間.
當流程序在處理時間運行時厘唾,所有基于時間的 算子操作(如時間窗口)將使用運行相應算子的機器的系統時鐘介杆。每小時處理時間窗口將包括在系統時鐘指示整個小時之間到達特定算子的所有記錄鹃操。
例如,如果應用程序在上午9:15開始運行春哨,則第一個每小時處理時間窗口將包括在上午9:15到上午10:00之間處理的事件荆隘,下一個窗口將包括在上午10:00到11:00之間處理的事件
處理時間是最簡單的時間概念,不需要流和機器之間的協調
它提供最佳性能和最低延遲赴背。但是椰拒,在分布式和異步環(huán)境中晶渠,處理時間不提供確定性,因為它容易受到記錄到達系統的速度(例如從消息隊列)到記錄在系統內的算子之間流動的速度的影響燃观。和停電(調度或其他)褒脯。
2 事件時間(Event time)
每個單獨的事件在其生產設備上發(fā)生的時間.
此時間通常在進入Flink之前內置在記錄中,并且可以從每個記錄中提取該事件時間戳缆毁。
在事件時間番川,時間的進展取決于數據,而不是任何掛鐘脊框。
事件時間程序必須指定如何生成事件時間水印颁督,這是表示事件時間進度的機制.
在一個完美的世界中,事件時間處理將產生完全一致和確定的結果浇雹,無論事件何時到達沉御,或者順序.
但是,除非事件已知按順序到達(按時間戳)昭灵,否則事件時間處理會在等待無序事件時產生一些延遲吠裆。由于只能等待一段有限的時間,因此限制了確定性事件時間應用程序的可能性虎锚。
假設所有數據都已到達硫痰,算子操作將按預期運行,即使在處理無序或延遲事件或重新處理歷史數據時也會產生正確且一致的結果窜护。
例如,每小時事件時間窗口將包含帶有落入該小時的事件時間戳的所有記錄非春,無論它們到達的順序如何柱徙,或者何時處理它們。(有關更多信息奇昙,請參閱有關遲發(fā)事件的部分护侮。)
請注意,有時當事件時間程序實時處理實時數據時储耐,它們將使用一些處理時間 算子操作羊初,以確保它們及時進行。
3 攝取時間(Ingestion time)
事件進入Flink的時間.
在源算子處什湘,每個記錄將源的當前時間作為時間戳长赞,并且基于時間的算子操作(如時間窗口)引用該時間戳。
在概念上位于事件時間和處理時間之間闽撤。
- 與處理時間相比 得哆,它成本稍微高一些,但可以提供更可預測的結果哟旗。因為使用穩(wěn)定的時間戳(在源處分配一次)贩据,所以對記錄的不同窗口 算子操作將引用相同的時間戳栋操,而在處理時間中,每個窗口算子可以將記錄分配給不同的窗口(基于本地系統時鐘和任何運輸延誤)
- 與事件時間相比饱亮,無法處理任何無序事件或后期數據矾芙,但程序不必指定如何生成水印。
在內部近上,攝取時間與事件時間非常相似剔宪,但具有自動時間戳分配和自動水印生成函數
4 設置時間特性
Flink DataStream程序的第一部分通常設置基本時間特性
- 顯然,在Flink的流式處理環(huán)境中,默認使用處理時間
該設置定義了數據流源的行為方式(例如,它們是否將分配時間戳)戈锻,以及窗口 算子操作應該使用的時間概念,比如
KeyedStream.timeWindow(Time.seconds(30))歼跟。
以下示例顯示了一個Flink程序,該程序在每小時時間窗口中聚合事件格遭。窗口的行為適應時間特征哈街。
- Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 可選的:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
- Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
stream
.keyBy( _.getUser )
.timeWindow(Time.hours(1))
.reduce( (a, b) => a.add(b) )
.addSink(...)
請注意,為了在事件時間運行此示例拒迅,程序需要使用直接為數據定義事件時間的源并自行發(fā)出水印骚秦,或者程序必須在源之后注入時間戳分配器和水印生成器。這些函數描述了如何訪問事件時間戳璧微,以及事件流表現出的無序程度作箍。
5 Windows
5.1 簡介
Windows是處理無限流的核心。Windows將流拆分為有限大小的“桶”前硫,我們可以在其上應用計算胞得。我們重點介紹如何在Flink中執(zhí)行窗口,以及程序員如何從其提供的函數中獲益最大化屹电。
窗口Flink程序的一般結構如下所示
- 第一個片段指的是被Keys化流
- 而第二個片段指的是非被Keys化流
正如所看到的阶剑,唯一的區(qū)別是keyBy(...)呼吁Keys流和window(...)成為windowAll(...)非被Key化的數據流。這也將作為頁面其余部分的路線圖危号。
Keyed Windows
Non-Keyed Windows
在上面牧愁,方括號(...)中的命令是可選的。這表明Flink允許您以多種不同方式自定義窗口邏輯外莲,以便最適合您的需求猪半。
5.2 窗口生命周期
簡而言之,只要應該屬于此窗口的第一個數據元到達偷线,就會創(chuàng)建一個窗口磨确,當時間(事件或處理時間)超過其結束時間戳加上用戶指定 時,窗口將被完全刪除allowed lateness(請參閱允許的延遲))淋昭。Flink保證僅刪除基于時間的窗口而不是其他類型俐填,例如全局窗口(請參閱窗口分配器)。例如翔忽,使用基于事件時間的窗口策略英融,每5分鐘創(chuàng)建一個非重疊(或翻滾)的窗口盏檐,并允許延遲1分鐘,Flink將創(chuàng)建一個新窗口驶悟,用于間隔12:00和12:05當具有落入此間隔的時間戳的第一個數據元到達時胡野,當水印通過12:06 時間戳時它將刪除它。
此外痕鳍,每個窗口將具有Trigger和一個函數(ProcessWindowFunction硫豆,ReduceFunction, AggregateFunction或FoldFunction)連接到它笼呆。該函數將包含要應用于窗口內容的計算熊响,而Trigger指定窗口被認為準備好應用該函數的條件。
觸發(fā)策略可能類似于“當窗口中的數據元數量大于4”時诗赌,或“當水印通過窗口結束時”汗茄。
觸發(fā)器還可以決定在創(chuàng)建和刪除之間的任何時間清除窗口的內容。在這種情況下铭若,清除僅指窗口中的數據元洪碳,而不是窗口元數據。這意味著仍然可以將新數據添加到該窗口叼屠。
除了上述內容之外瞳腌,您還可以指定一個Evictor,它可以在觸發(fā)器觸發(fā)后以及應用函數之前和/或之后從窗口中刪除數據元镜雨。
5.3 被Keys化與非被Keys化Windows
要指定的第一件事是您的流是否應該鍵入嫂侍。必須在定義窗口之前完成此 算子操作。使用the keyBy(...)將您的無限流分成邏輯被Key化的數據流荚坞。如果keyBy(...)未調用吵冒,則表示您的流不是被Keys化的。
對于被Key化的數據流西剥,可以將傳入事件的任何屬性用作鍵(此處有更多詳細信息)。擁有被Key化的數據流將允許您的窗口計算由多個任務并行執(zhí)行亿汞,因為每個邏輯被Key化的數據流可以獨立于其余任務進行處理瞭空。引用相同Keys的所有數據元將被發(fā)送到同一個并行任務。
在非被Key化的數據流的情況下疗我,您的原始流將不會被拆分為多個邏輯流咆畏,并且所有窗口邏輯將由單個任務執(zhí)行,即并行度為1吴裤。
6 窗口分配器
指定流是否已鍵入后旧找,下一步是定義一個窗口分配器.
窗口分配器定義如何將數據元分配給窗口,這是通過WindowAssigner 在window(...)(對于被Keys化流)或windowAll()(對于非被Keys化流)調用中指定您的選擇來完成的
WindowAssigner
負責將每個傳入數據元分配給一個或多個窗口
Flink帶有預定義的窗口分配器,用于最常見的用例麦牺,即
- 滾動窗口
- 滑動窗口
- 會話窗口
- 全局窗口
還可以通過擴展WindowAssigner類來實現自定義窗口分配器钮蛛。所有內置窗口分配器(全局窗口除外)都根據時間為窗口分配數據元鞭缭,這可以是處理時間或事件時間。請查看我們關于活動時間的部分魏颓,了解處理時間和事件時間之間的差異以及時間戳和水印的生成方式岭辣。
基于時間的窗口具有開始時間戳(包括)和結束時間戳(不包括),它們一起描述窗口的大小甸饱。
在代碼中沦童,Flink在使用TimeWindow基于時間的窗口時使用,該窗口具有查詢開始和結束時間戳的方法maxTimestamp()返回給定窗口的最大允許時間戳
下圖顯示了每個分配者的工作情況叹话。紫色圓圈表示流的數據元偷遗,這些數據元由某個鍵(在這種情況下是用戶1,用戶2和用戶3)劃分驼壶。x軸顯示時間的進度氏豌。
6.1 滾動窗口
一個滾動窗口分配器的每個數據元分配給指定的窗口的窗口大小。滾動窗口具有固定的尺寸辅柴,不重疊.
例如箩溃,如果指定大小為5分鐘的翻滾窗口,則將評估當前窗口碌嘀,并且每五分鐘將啟動一個新窗口涣旨,如下圖所示
以下代碼段顯示了如何使用滾動窗口。
- Java
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
- Scala
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
-
Scala
-
Java
6.2 滑動窗口
該滑動窗口分配器分配元件以固定長度的窗口股冗。與滾動窗口分配器類似霹陡,窗口大小由窗口大小參數配置
附加的窗口滑動參數控制滑動窗口的啟動頻率。因此止状,如果幻燈片小于窗口大小烹棉,則滑動窗口可以重疊。在這種情況下怯疤,數據元被分配給多個窗口浆洗。
例如,您可以將大小為10分鐘的窗口滑動5分鐘集峦。有了這個伏社,你每隔5分鐘就會得到一個窗口,其中包含過去10分鐘內到達的事件塔淤,如下圖所示摘昌。
以下代碼段顯示了如何使用滑動窗口
- Java
DataStream<T> input = ...;
// 滑動 事件時間 窗口
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 滑動 處理時間 窗口
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
- Scala
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
7 窗口函數
定義窗口分配器后,我們需要指定要在每個窗口上執(zhí)行的計算高蜂。這是窗口函數的職責聪黎,窗口函數用于在系統確定窗口準備好進行處理后處理每個(可能是被Keys化的)窗口的數據元
的窗函數可以是一個ReduceFunction,AggregateFunction备恤,FoldFunction或ProcessWindowFunction稿饰。前兩個可以更有效地執(zhí)行锦秒,因為Flink可以在每個窗口到達時遞增地聚合它們的數據元.
ProcessWindowFunction獲取Iterable窗口中包含的所有數據元以及有關數據元所屬窗口的其他元信息。
具有ProcessWindowFunction的窗口轉換不能像其他情況一樣有效地執(zhí)行湘纵,因為Flink必須在調用函數之前在內部緩沖窗口的所有數據元脂崔。這可以通過組合來減輕ProcessWindowFunction與ReduceFunction,AggregateFunction或FoldFunction以獲得兩個窗口元件的增量聚合并且該附加元數據窗口 ProcessWindowFunction接收梧喷。我們將查看每個變體的示例砌左。
7.1 ReduceFunction
指定如何組合輸入中的兩個數據元以生成相同類型的輸出數據元.
Flink使用ReduceFunction來遞增地聚合窗口的數據元.
定義和使用
- Java
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
- Scala
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
原來傳遞進來的數據是字符串,此處我們就使用數值類型铺敌,通過數值類型來演示增量的效果
這里不是等待窗口所有的數據進行一次性處理汇歹,而是數據兩兩處理
-
輸入
-
增量輸出
-
Java
7.2 聚合函數An AggregateFunction是一個通用版本,ReduceFunction它有三種類型:輸入類型(IN)偿凭,累加器類型(ACC)和輸出類型(OUT)产弹。輸入類型是輸入流中數據元的類型,并且AggregateFunction具有將一個輸入數據元添加到累加器的方法弯囊。該接口還具有用于創(chuàng)建初始累加器的方法痰哨,用于將兩個累加器合并到一個累加器中以及用于OUT從累加器提取輸出(類型)。我們將在下面的示例中看到它的工作原理匾嘱。
與之相同ReduceFunction斤斧,Flink將在窗口到達時遞增地聚合窗口的輸入數據元。
一個AggregateFunction可以被定義并這樣使用:
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
- Scala
The accumulator is used to keep a running sum and a count. The [getResult] method
\* computes the average.
\*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator.\_1 + value.\_2, accumulator.\_2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator.\_1 / accumulator.\_2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a.\_1 + b.\_1, a.\_2 + b.\_2)
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
7.3 ProcessWindowFunction
ProcessWindowFunction獲取包含窗口的所有數據元的Iterable霎烙,以及可訪問時間和狀態(tài)信息的Context對象撬讽,這使其能夠提供比其他窗口函數更多的靈活性。這是以性能和資源消耗為代價的悬垃,因為數據元不能以遞增方式聚合游昼,而是需要在內部進行緩沖,直到窗口被認為已準備好進行處理尝蠕。
ProcessWindowFunction外觀簽名如下:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def process(
key: KEY,
context: Context,
elements: Iterable[IN],
out: Collector[OUT])
/**
* The context holding window metadata
*/
abstract class Context {
/**
* Returns the window that is being evaluated.
*/
def window: W
/**
* Returns the current processing time.
*/
def currentProcessingTime: Long
/**
* Returns the current event-time watermark.
*/
def currentWatermark: Long
/**
* State accessor for per-key and per-window state.
*/
def windowState: KeyedStateStore
/**
* State accessor for per-key global state.
*/
def globalState: KeyedStateStore
}
}
該key參數是通過KeySelector為keyBy()調用指定的Keys提取的Keys烘豌。在元組索引鍵或字符串字段引用的情況下,此鍵類型始終是Tuple看彼,您必須手動將其轉換為正確大小的元組以提取鍵字段扇谣。
A ProcessWindowFunction可以像這樣定義和使用:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
該示例顯示了ProcessWindowFunction對窗口中的數據元進行計數的情況。此外闲昭,窗口函數將有關窗口的信息添加到輸出。
注意注意靡挥,使用ProcessWindowFunction簡單的聚合(例如count)是非常低效的