Watermark作用
- watermark是用于處理亂序事件的峻厚,而正確的處理亂序事件屋谭,通常用watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。
- 我們知道胚委,流處理從事件產(chǎn)生竹椒,到流經(jīng)source朦促,再到operator膝晾,中間是有一個(gè)過(guò)程和時(shí)間的。雖然大部分情況下务冕,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的血当,但是也不排除由于網(wǎng)絡(luò)、背壓等原因禀忆,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說(shuō)late element)臊旭。
- 但是對(duì)于late element,我們又不能無(wú)限期的等下去箩退,必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后离熏,必須觸發(fā)window去進(jìn)行計(jì)算了。這個(gè)特別的機(jī)制戴涝,就是watermark滋戳。
- watermark基礎(chǔ)知識(shí):Flink--EventTime中WaterMark知識(shí)點(diǎn)掃盲
Window的劃分
- Window的設(shè)定無(wú)關(guān)數(shù)據(jù)本身钻蔑,而是系統(tǒng)定義好的。
- window是flink中劃分?jǐn)?shù)據(jù)一個(gè)基本單位奸鸯,window的劃分方式是固定的咪笑,默認(rèn)會(huì)根據(jù)自然時(shí)間劃分window,并且劃分方式是前閉后開(kāi)娄涩。
- window示例:
window劃分 |
第一個(gè) |
第二個(gè) |
第三個(gè) |
3s |
[00:00:00~00:00:03) |
[00:00:03~00:00:06) |
[00:00:06~00:00:09) |
5s |
[00:00:00~00:00:05) |
[00:00:05~00:00:10) |
[00:00:10~00:00:15) |
10s |
[00:00:00~00:00:10) |
[00:00:10~00:00:20) |
[00:00:20~00:00:30) |
1min |
[00:00:00~00:01:00) |
[00:01:00~00:02:00) |
[00:02:00~00:03:00) |
Watermark分配方式
Watermark默認(rèn)更新時(shí)間
- 詳見(jiàn)源碼解釋
- 在非processing time的模式下钝满,默認(rèn)是200ms;
// --------------------------------------------------------------------------------------------
// Time characteristic
// --------------------------------------------------------------------------------------------
/**
* Sets the time characteristic for all streams create from this environment, e.g., processing
* time, event time, or ingestion time.
*
* <p>If you set the characteristic to IngestionTime of EventTime this will set a default
* watermark update interval of 200 ms. If this is not applicable for your application
* you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
*
* @param characteristic The time characteristic.
*/
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
Periodic Watermarks跟蹤
- 因?yàn)镻eriodic Watermarks允許設(shè)定一個(gè)最大亂序時(shí)間申窘,這種情況應(yīng)用最多弯蚜。
package github.yahuili1128.watermark;
import github.yahuili1128.pojo.MockUpModel;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.TreeSet;
import static github.yahuili1128.connector.SourceKafka010.getMockUpkafka010;
/**
* @Description : 從kafka中讀取數(shù)據(jù),練習(xí)watermark
* @Author : LiYahui
* @Date : 2019-08-06 11:45
* @Version : V1.0
*/
public class PeriodicWatermarkTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<MockUpModel> mockUpkafka010 = getMockUpkafka010(env).name("kafka source");
SingleOutputStreamOperator<String> result = mockUpkafka010.filter(line -> line.gender.equals("male"))
.assignTimestampsAndWatermarks(new GetWateramrk()).keyBy(line -> line.gender)
.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowTest()).name("watermark print");
result.print();
env.execute(PeriodicWatermarkTest.class.getSimpleName());
}
public static class WindowTest implements WindowFunction<MockUpModel, String, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow window, Iterable<MockUpModel> input, Collector<String> out)
throws Exception {
TreeSet<Long> set = new TreeSet<>();
// 元素個(gè)數(shù)
int size = Iterables.size(input);
Iterator<MockUpModel> eles = input.iterator();
while (eles.hasNext()) {
set.add(eles.next().timestamp);
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//(code剃法,窗口內(nèi)元素個(gè)數(shù)碎捺,窗口內(nèi)最早元素的時(shí)間,窗口內(nèi)最晚元素的時(shí)間贷洲,窗口自身開(kāi)始時(shí)間收厨,窗口自身結(jié)束時(shí)間)
String first = sdf.format(set.first());
String last = sdf.format(set.last());
String start = sdf.format(window.getStart());
String end = sdf.format(window.getEnd());
// 調(diào)試使用
out.collect("event.key:" + key + ",window中元素個(gè)數(shù):" + size + ",window第一個(gè)元素時(shí)間戳:" + first + ",window最后一個(gè)元素時(shí)間戳:"
+ last + ",window開(kāi)始時(shí)間戳:" + start + ",window結(jié)束時(shí)間戳:" + end + ",窗口內(nèi)所有的時(shí)間戳:" + set.toString());
}
}
public static class GetWateramrk implements AssignerWithPeriodicWatermarks<MockUpModel> {
// 定義最大延遲 2s
private final long maxOutOfOrderness = 5000L;
private long currentMaxTimestamp;
private Watermark watermark;
// 將時(shí)間戳信息格式化,調(diào)試學(xué)習(xí)
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Nullable
@Override
public Watermark getCurrentWatermark() {
watermark = new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness);
return watermark;
}
@Override
public long extractTimestamp(MockUpModel element, long previousElementTimestamp) {
// 獲取event中的時(shí)間戳
long timestamp = element.getTimestamp();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
// 將所有的時(shí)間信息打印
System.out.println("--->>> event.key:" + element.gender + " | event中timestamp:" + timestamp + "| " + sdf
.format(timestamp) + "| currentMaxTimestamp:" + currentMaxTimestamp + "| " + sdf
.format(currentMaxTimestamp) + "| watermark" + watermark.toString());
// 返回event中的時(shí)間戳
return timestamp;
}
}
}
--->>> event.key:male | event中timestamp:1565082033747| 2019-08-06 17:00:33.747| currentMaxTimestamp:1565082033747| 2019-08-06 17:00:33.747| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082036758| 2019-08-06 17:00:36.758| currentMaxTimestamp:1565082036758| 2019-08-06 17:00:36.758| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082038778| 2019-08-06 17:00:38.778| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082037791| 2019-08-06 17:00:37.791| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082038803| 2019-08-06 17:00:38.803| currentMaxTimestamp:1565082038803| 2019-08-06 17:00:38.803| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082039815| 2019-08-06 17:00:39.815| currentMaxTimestamp:1565082039815| 2019-08-06 17:00:39.815| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082041839| 2019-08-06 17:00:41.839| currentMaxTimestamp:1565082041839| 2019-08-06 17:00:41.839| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082044850| 2019-08-06 17:00:44.850| currentMaxTimestamp:1565082044850| 2019-08-06 17:00:44.850| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082046869| 2019-08-06 17:00:46.869| currentMaxTimestamp:1565082046869| 2019-08-06 17:00:46.869| watermarkWatermark @ 1565082039850
8> event.key:male,window中元素個(gè)數(shù):6,window第一個(gè)元素時(shí)間戳:2019-08-06 17:00:33.747,window最后一個(gè)元素時(shí)間戳:2019-08-06 17:00:39.815,window開(kāi)始時(shí)間戳:2019-08-06 17:00:30.000,window結(jié)束時(shí)間戳:2019-08-06 17:00:40.000,窗口內(nèi)所有的時(shí)間戳:[1565082033747, 1565082036758, 1565082037791, 1565082038778, 1565082038803, 1565082039815]
--->>> event.key:male | event中timestamp:1565082047880| 2019-08-06 17:00:47.880| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082041869
--->>> event.key:male | event中timestamp:1565082046890| 2019-08-06 17:00:46.890| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082042880
--->>> event.key:male | event中timestamp:1565082049900| 2019-08-06 17:00:49.900| currentMaxTimestamp:1565082049900| 2019-08-06 17:00:49.900| watermarkWatermark @ 1565082042880
--->>> event.key:male | event中timestamp:1565082051921| 2019-08-06 17:00:51.921| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082044900
--->>> event.key:male | event中timestamp:1565082050934| 2019-08-06 17:00:50.934| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082046921
--->>> event.key:male | event中timestamp:1565082051945| 2019-08-06 17:00:51.945| currentMaxTimestamp:1565082051945| 2019-08-06 17:00:51.945| watermarkWatermark @ 1565082046921
--->>> event.key:male | event中timestamp:1565082052955| 2019-08-06 17:00:52.955| currentMaxTimestamp:1565082052955| 2019-08-06 17:00:52.955| watermarkWatermark @ 1565082046945
--->>> event.key:male | event中timestamp:1565082055965| 2019-08-06 17:00:55.965| currentMaxTimestamp:1565082055965| 2019-08-06 17:00:55.965| watermarkWatermark @ 1565082047955
8> event.key:male,window中元素個(gè)數(shù):6,window第一個(gè)元素時(shí)間戳:2019-08-06 17:00:41.839,window最后一個(gè)元素時(shí)間戳:2019-08-06 17:00:49.900,window開(kāi)始時(shí)間戳:2019-08-06 17:00:40.000,window結(jié)束時(shí)間戳:2019-08-06 17:00:50.000,窗口內(nèi)所有的時(shí)間戳:[1565082041839, 1565082044850, 1565082046869, 1565082046890, 1565082047880, 1565082049900]
--->>> event.key:male | event中timestamp:1565082056983| 2019-08-06 17:00:56.983| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082050965
--->>> event.key:male | event中timestamp:1565082055993| 2019-08-06 17:00:55.993| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082051983
--->>> event.key:male | event中timestamp:1565082059005| 2019-08-06 17:00:59.005| currentMaxTimestamp:1565082059005| 2019-08-06 17:00:59.005| watermarkWatermark @ 1565082051983
--->>> event.key:male | event中timestamp:1565082061028| 2019-08-06 17:01:01.028| currentMaxTimestamp:1565082061028| 2019-08-06 17:01:01.028| watermarkWatermark @ 1565082054005
--->>> event.key:male | event中timestamp:1565082062036| 2019-08-06 17:01:02.036| currentMaxTimestamp:1565082062036| 2019-08-06 17:01:02.036| watermarkWatermark @ 1565082056028
- 為什么watermark會(huì)出現(xiàn)-5000
- AssignerWithPeriodicWatermarks子類(lèi)是每隔一段時(shí)間執(zhí)行的,這個(gè)具體由ExecutionConfig.setAutoWatermarkInterval設(shè)置优构,默認(rèn)是200ms诵叁,之所以會(huì)出現(xiàn)-5000時(shí)因?yàn)槟銢](méi)有數(shù)據(jù)進(jìn)入窗口,當(dāng)然一直都是-5000钦椭,但是getCurrentWatermark方法不是在執(zhí)行extractTimestamp后才執(zhí)行拧额。
結(jié)論
- window的觸發(fā)要符合以下幾個(gè)條件:
- watermark時(shí)間 >= window_end_time
- 在[window_start_time,window_end_time)中有數(shù)據(jù)存在;
- 同時(shí)滿足了以上2個(gè)條件彪腔,window才會(huì)觸發(fā)侥锦。
- watermark是一個(gè)全局的值,不是某一個(gè)key下的值德挣,所以即使不是同一個(gè)key的數(shù)據(jù)恭垦,其warmark也會(huì)增加.
- 這個(gè)部分的知識(shí)點(diǎn)需要細(xì)細(xì)的理解一下;