- 基于flink-1.8.1
- 基于flink官網(wǎng)
概述
-
實時計算中谴返,數(shù)據(jù)時間比較敏感皂岔。有eventTime和processTime區(qū)分庭瑰,一般來說eventTime是從原始的消息中提取過來的晤锹,processTime是Flink自己提供的驼唱,F(xiàn)link中一個亮點就是可以基于eventTime計算寇损,這個功能很有用凸郑,因為實時數(shù)據(jù)可能會經(jīng)過比較長的鏈路,多少會有延時矛市,并且有很大的不確定性芙沥,對于一些需要精確體現(xiàn)事件變化趨勢的場景中,單純使用processTime顯然是不合理的浊吏。
Flink-eventtime.png
Flink WaterMark介紹
- watermark是一種衡量Event Time進展的機制而昨,它是數(shù)據(jù)本身的一個隱藏屬性。通痴姨铮基于Event Time的數(shù)據(jù)歌憨,自身都包含一個timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件墩衙,通常用watermark機制結合window來實現(xiàn)躺孝。
- 流處理從事件產(chǎn)生享扔,到流經(jīng)source,再到operator植袍,中間是有一個過程和時間的惧眠。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的于个,但是也不排除由于網(wǎng)絡氛魁、背壓等原因,導致亂序的產(chǎn)生(out-of-order或者說late element)厅篓。
- 但是對于late element秀存,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后羽氮,必須觸發(fā)window去進行計算了或链。這個特別的機制,就是watermark档押。
并行流中的watermark
- 在source functions處或者之后生成watermark澳盐,source functions的parallel subtask 通常會獨立的生成watermark;這些watermarks定義了該特定parallel source的事件事件令宿;
- 對于流式處理中的叼耙,當watermarks達到某個算子時,watermark會將event time提前粒没。每當算子將流中的event time提前筛婉,這個算子都會為下游算子生成新的watermark;
- 一些算子會有多個source stream癞松, 例如爽撒,一個union,或者跟隨keyBy(...)或partition(...)函數(shù)的算子响蓉。當前輸入stream中的event time是多個source stream中的event time的最小值匆浙;(
Such an operator’s current event time is the minimum of its input streams’ event times
);由于input stream更新了event time,算子同樣會更新event time厕妖。如下圖所示:
Flink-eventtime.png
生成 Timestamps / Watermarks
分配Timestamps
- 為了處理事件時間,F(xiàn)link需要知道事件的時間戳挑庶,這意味著流中的每個元素都需要分配其事件時間戳言秸。 這通常通過從元素中的某個字段訪問/提取時間戳來完成。
- 時間戳分配與生成watermark密切相關迎捺,watermark告訴系統(tǒng)事件時間的進展举畸。
- 有兩種方法可以分配時間戳并生成水印:
- 直接在數(shù)據(jù)流源中凳枝;
- 通過時間戳分配器/watermatk生成器:在Flink中抄沮,時間戳分配器還定義要發(fā)出的watermark跋核;
- Attention:Both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
Source Functions with Timestamps and Watermarks
- source functions可以直接為它們生成的元素分配時間戳,它們也可以發(fā)出watermark叛买。當完成此操作時砂代,不需要再使用時間戳分配器。請注意率挣,如果使用時間戳分配器刻伊,則將覆蓋源提供的任何時間戳和水印。
- 要直接為source中的元素分配時間戳椒功,源必須在SourceContext上使用collectWithTimestamp(...)方法捶箱。 要生成水印,源必須調用emitWatermark(水佣)功能丁屎。
- 語法:
java
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
scala
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
Timestamp Assigners / Watermark Generators
- Timestamp Assigners獲取stream并生成帶有帶時間戳元素和水印的新流。 如果原始stream中已經(jīng)有時間戳和/或水印旱眯,則時間戳分配器會覆蓋它們晨川。
- 時間戳分配器通常在數(shù)據(jù)源之后立即指定,但并非嚴格要求這樣做键思。 例如础爬,常見的模式是在時間戳分配器之前解析(MapFunction)和過濾(FilterFunction)。 在任何情況下吼鳞,需要在事件時間的第一個操作之前指定時間戳分配器(例如第一個窗口操作)看蚜。 作為一種特殊情況,當使用Kafka作為流式傳輸作業(yè)的源時赔桌,F(xiàn)link允許在源(或消費者)本身內指定時間戳分配器/水印發(fā)射器供炎。 有關如何執(zhí)行此操作的更多信息,請參閱Kafka Connector文檔疾党。
- java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
- scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...)
With Periodic Watermarks周期性watermark
定時提取watermark音诫,這種方式會定時提取更新wartermark。
- AssignerWithPeriodicWatermarks定期分配時間戳并生成水友┪弧(可能取決于流元素竭钝,或純粹基于處理時間)。
- 生成水印的間隔(每n毫秒)由ExecutionConfig.setAutoWatermarkInterval(...)定義雹洗。 每次調用分配器的getCurrentWatermark()方法香罐,如果返回的水印非空且大于前一個水印,則會發(fā)出新的水印时肿。
- 這里我們展示了兩個使用周期性水印生成的時間戳分配器的簡單示例庇茫。 請注意,F(xiàn)link附帶了一個BoundedOutOfOrdernessTimestampExtractor螃成,類似于下面顯示的BoundedOutOfOrdernessGenerator旦签;
- java
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
- scala
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 seconds
var currentMaxTimestamp: Long = _
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxTimeLag = 5000L // 5 seconds
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
With Punctuated Watermarks
伴隨event的到來就提取watermark查坪,就是每一個event到來的時候,就會提取一次Watermark宁炫。這樣的方式當然設置watermark更為精準偿曙,但是當數(shù)據(jù)量大的時候,頻繁的更新wartermark會比較影響性能淋淀。通常情況下采用定時提取就足夠了遥昧。
- 當生成watermark的過程中某個event指示器 可能生成新wateramrk,請使用AssignerWithPunctuatedWatermarks朵纷。 對于此類炭臭,F(xiàn)link將首先調用extractTimestamp(...)方法為元素分配時間戳,然后立即調用該元素上的checkAndGetNextWatermark(...)方法袍辞。
- checkAndGetNextWatermark(...)方法傳遞在extractTimestamp(...)方法中分配的時間戳鞋仍,并可以決定是否要生成watermark。 每當checkAndGetNextWatermark(...)方法返回非空watermark搅吁,并且該watermark大于最新的先前watermark時威创,將發(fā)出該新watermark。
- demo
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
- scala
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}
- 注意:可以在每個事件上生成水印谎懦。 然而肚豺,因為每個水印在下游引起一些計算,所以過多的水印會降低性能.
Timestamps per Kafka Partition
- 當使用Apache Kafka作為數(shù)據(jù)源時界拦,每個Kafka分區(qū)可能具有簡單的事件時間模式(升序時間戳或有界無序)吸申。但是,當從Kafka消費流數(shù)據(jù)時享甸,多個分區(qū)通常并行消費截碴,交錯來自分區(qū)的事件并破壞每個分區(qū)模式(這是Kafka的消費者客戶端工作的固有方式)。
- 在這種情況下蛉威,您可以使用Flink的Kafka分區(qū)感知水印生成日丹。使用該功能,根據(jù)Kafka分區(qū)在Kafka使用者內部生成水印蚯嫌,并且每個分區(qū)水印的合并方式與在流shuffle上合并水印的方式相同哲虾。
- 例如,如果事件時間戳嚴格按每個Kafka分區(qū)升序择示,則使用升序時間戳水印生成器生成每分區(qū)水印將產(chǎn)生完美的整體水印束凑。
- 下圖顯示了如何使用per-Kafka分區(qū)水印生成,以及在這種情況下水印如何通過流數(shù)據(jù)流傳播对妄。
- demo
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}
});
DataStream<MyType> stream = env.addSource(kafkaSource);
- scala
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})
val stream: DataStream[MyType] = env.addSource(kafkaSource)
kafka.png
Pre-defined Timestamp Extractors / Watermark Emitters
- 這部分比較簡單,是兩個類的講解敢朱,詳見官網(wǎng)剪菱;
- 建議大家認真閱讀一下官網(wǎng)摩瞎;