Flink Learning, Part 11: Window & EventTime Examples

The previous post tried Processing Time. This one looks at Event Time and at the thing you must pay attention to in the Event Time scenario: Watermarks.

EventTime & Watermark

Event time programs must specify how to generate Event Time Watermarks, which is the mechanism that signals progress in event time.

In other words, a program based on event time must specify how watermarks are generated.

The following is quoted from "Learning Flink from 0 to 1" and the official documentation:

A stream processor that supports event time needs a way to measure the progress of event time. For example, a window operator that builds hourly windows needs to be notified when event time has passed beyond the end of an hour, so that the operator can close the window in progress.

Event time can progress independently of processing time (measured by wall clocks). For example, in one program the current event time of an operator may trail slightly behind the processing time (accounting for a delay in receiving the events), while both proceed at the same speed. On the other hand, another streaming program might progress through weeks of event time with only a few seconds of processing, by fast-forwarding through some historic data already buffered in a Kafka topic (or another message queue).

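In Flink, the mechanism for measuring progress in event time is the watermark. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream. This means there should be no more elements in the stream with a timestamp t' <= t (i.e., events with timestamps older than or equal to the watermark).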
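The following minimal plain-Java sketch makes the t' <= t rule concrete. It has no Flink dependency, and `WatermarkClock` and its methods are hypothetical names, not Flink API. It models an operator-side clock that only moves forward on watermarks and treats an element as late when its timestamp is not greater than the current watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how an operator interprets Watermark(t); not Flink code.
final class WatermarkClock {
    // Before any watermark arrives, event time is effectively "minus infinity"
    private long currentWatermark = Long.MIN_VALUE;

    // A watermark only ever moves event time forward
    void onWatermark(long t) {
        currentWatermark = Math.max(currentWatermark, t);
    }

    // After Watermark(t), any element with timestamp t' <= t is late
    boolean isLate(long elementTimestamp) {
        return elementTimestamp <= currentWatermark;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkClock clock = new WatermarkClock();
        List<String> log = new ArrayList<>();
        clock.onWatermark(10);
        for (long ts : new long[] {12, 10, 7}) {
            log.add(ts + (clock.isLate(ts) ? " late" : " on time"));
        }
        System.out.println(log); // [12 on time, 10 late, 7 late]
    }
}
```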

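The figure below shows a stream of events with (logical) timestamps and watermarks flowing inline. In this example, the events are in order (with respect to their timestamps), so the watermarks are simply periodic markers in the stream.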

(Figure: stream_watermark_in_order)

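Watermarks are crucial for out-of-order streams, where the events are not ordered by their timestamps, as illustrated below. In general, a watermark is a declaration that, by that point in the stream, all events up to a certain timestamp should have arrived. Once a watermark reaches an operator, the operator can advance its internal event time clock to the value of the watermark.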

(Figure: stream_watermark_out_of_order)
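Here is a rough illustration of what "advancing the internal event time clock" buys a window operator, using a scaled-down version of the hourly-window example above. It is a hypothetical plain-Java model of a tumbling event-time window that counts elements. Elements may arrive out of order. A window [start, end) is emitted once the watermark reaches end - 1, which as far as I know matches when Flink's event-time trigger fires. Late elements and most of Flink's real WindowOperator machinery are left out, and all names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a tumbling event-time window that counts elements; not Flink code.
final class TumblingCountWindow {
    private final long size;                                       // window length in ms
    private final TreeMap<Long, Integer> counts = new TreeMap<>(); // window start -> count
    private long watermark = Long.MIN_VALUE;

    TumblingCountWindow(long size) {
        this.size = size;
    }

    // Put the element into the window [start, start + size) that contains it.
    // Late elements are not handled: they simply reopen a window that fires on the next watermark call.
    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        counts.merge(start, 1, Integer::sum);
    }

    // Advance event time and return "start:count" for every window whose end - 1 <= watermark
    List<String> onWatermark(long t) {
        watermark = Math.max(watermark, t);
        List<String> fired = new ArrayList<>();
        while (!counts.isEmpty() && counts.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, Integer> e = counts.pollFirstEntry();
            fired.add(e.getKey() + ":" + e.getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        TumblingCountWindow w = new TumblingCountWindow(10);
        for (long ts : new long[] {3, 12, 7, 15}) { // out of order
            w.onElement(ts);
        }
        System.out.println(w.onWatermark(9));  // [0:2]
        System.out.println(w.onWatermark(15)); // []
        System.out.println(w.onWatermark(19)); // [10:2]
    }
}
```

The element with timestamp 7 arrives after 12 but still lands in window [0, 10), because that window stays open until the watermark says event time has passed it.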

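My understanding: if the time characteristic in Flink is set to Event Time, watermarks must be set, because they are the marker that tells Flink how far processing has progressed.

Once watermark(time1) has been emitted, all data in the stream whose timestamp time2 is <= time1 is considered to have arrived, whether the stream is ordered or out of order. This is a declaration rather than a guarantee: if the watermark was too optimistic, such elements can still show up later as late data.

Who generates the watermarks? The job code running in Flink (a source function or a timestamp assigner), not the external data source itself.

Does every element have its own watermark? It can be 1:1, but it doesn't have to be. Choose based on your needs and the actual situation.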

It is possible to generate a watermark on every single event. However, because each watermark causes some computation downstream, an excessive number of watermarks degrades performance.

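Watermarks in Parallel Streams

Watermarks are generated at, or directly after, source functions. Each parallel subtask of a source function usually generates its watermarks independently. These watermarks define the event time at that particular parallel source.

As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.

Some operators consume multiple input streams, for example a union, or operators following a keyBy(…) or partition(…) function. Such an operator's current event time is the minimum of its input streams' event times. As its input streams update their event times, so does the operator.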
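The "minimum of the inputs" rule can be sketched in a few lines of plain Java. This is a hypothetical model, not Flink's internal code, of an operator with several input channels. It remembers the latest watermark per input and forwards a new watermark only when the minimum across inputs moves forward.

```java
import java.util.Arrays;

// Hypothetical model of an operator with several inputs (e.g. after keyBy or union); not Flink code.
final class MultiInputWatermark {
    private final long[] inputWatermarks;           // latest watermark seen per input
    private long currentEventTime = Long.MIN_VALUE; // the operator's own event time

    MultiInputWatermark(int inputs) {
        inputWatermarks = new long[inputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one input and return the watermark to forward downstream,
    // or null if the operator's event time did not advance
    Long onWatermark(int input, long t) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], t);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        if (min > currentEventTime) {
            currentEventTime = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        MultiInputWatermark op = new MultiInputWatermark(2);
        System.out.println(op.onWatermark(0, 29)); // null: input 1 has not reported yet
        System.out.println(op.onWatermark(1, 14)); // 14
        System.out.println(op.onWatermark(1, 35)); // 29
    }
}
```

The first call returns null because input 1 has not reported anything yet, so the operator's event time cannot advance. A single slow input holds back the whole operator.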

The figure below shows an example of events and watermarks flowing through parallel streams, and operators tracking event time.

(Figure: flink_parallel_streams_watermarks)

From the figure, event time originates at the sources, and so do the watermarks.

Data flows from the source through a map transformation and is then processed in a window.

I haven't figured out the rest of the figure yet...

About Timestamps and Watermarks

In order to work with event time, Flink needs to know the events’ timestamps, meaning each element in the stream needs to have its event timestamp assigned. This is usually done by accessing/extracting the timestamp from some field in the element.

Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about progress in event time.

There are two ways to assign timestamps and generate watermarks:

  1. Directly in the data stream source
  2. Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted

Attention: Both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
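Both values are plain epoch milliseconds, so converting an event's time field is a one-liner in Java. Below is a minimal sketch using only `java.time` and `java.util.Date`. `parseEventTime` is a hypothetical helper. The sketch assumes that `UrlInfo.getCurrentTime()`, used later in this post, returns a `java.util.Date` (or a subclass such as `java.sql.Timestamp`), whose `getTime()` is already in epoch milliseconds.

```java
import java.time.Instant;
import java.util.Date;

final class EpochMillis {
    // Hypothetical helper: ISO-8601 UTC string -> milliseconds since 1970-01-01T00:00:00Z
    static long parseEventTime(String iso) {
        return Instant.parse(iso).toEpochMilli();
    }

    public static void main(String[] args) {
        long fromString = parseEventTime("2019-01-01T00:00:00Z");
        // Date.getTime() uses the same unit, so a Date field can be returned as the timestamp directly
        long fromDate = new Date(fromString).getTime();
        System.out.println(fromString + " " + fromDate); // 1546300800000 1546300800000
    }
}
```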

In short: with event time, Flink must know the timestamp of every element in the stream, which usually comes from one of the element's fields. Assigning timestamps goes hand in hand with generating watermarks. Both can be done either directly in the data source or by a timestamp assigner, which in Flink also acts as the watermark generator.

Assigning Directly in the Data Source

Stream sources can directly assign timestamps to the elements they produce, and they can also emit watermarks. When this is done, no timestamp assigner is needed. Note that if a timestamp assigner is used, any timestamps and watermarks provided by the source will be overwritten.

To assign a timestamp to an element in the source directly, the source must use the collectWithTimestamp(...) method on the SourceContext. To generate watermarks, the source must call the emitWatermark(Watermark) function.

For example, the MySQL data source with Spring from an earlier post is implemented like this:

    @Override
    public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
        log.info("------query ");

        if(urlInfoManager == null){
            init();
        }

        List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
        // Emit sequentially: SourceContext is not designed for concurrent calls from parallelStream()
        urlInfoList.forEach(urlInfo -> sourceContext.collect(urlInfo));
    }

To attach a timestamp, call collectWithTimestamp. To generate a watermark, call emitWatermark.

The modified version:

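    @Override
    public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
        log.info("------query ");

        if(urlInfoManager == null){
            init();
        }

        List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
        // Emit sequentially: SourceContext is not designed for concurrent calls from parallelStream()
        urlInfoList.forEach(urlInfo -> {
            // Use the element's own event time, falling back to the current time if it is missing
            long eventTime = urlInfo.getCurrentTime() == null
                    ? System.currentTimeMillis()
                    : urlInfo.getCurrentTime().getTime();

            // Add the timestamp; this call also emits the element, so no separate collect() is needed
            sourceContext.collectWithTimestamp(urlInfo, eventTime);

            // Generate a watermark; assumes queryAll() returns rows in ascending time order
            // (eventTime - 1 so that a following row with the same timestamp is not late)
            sourceContext.emitWatermark(new Watermark(eventTime - 1));
        });
    }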

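Note the two added calls: here, both a timestamp and a watermark are produced for every element. That works, but as quoted above, a very large number of watermarks degrades performance.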

Assigning via Timestamp Assigners / Watermark Generators

Timestamp assigners take a stream and produce a new stream with timestamped elements and watermarks. If the original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.

Timestamp assigners are usually specified immediately after the data source, but it is not strictly required to do so. A common pattern, for example, is to parse (MapFunction) and filter (FilterFunction) before the timestamp assigner. In any case, the timestamp assigner needs to be specified before the first operation on event time (such as the first window operation). As a special case, when using Kafka as the source of a streaming job, Flink allows the specification of a timestamp assigner / watermark emitter inside the source (or consumer) itself. More information on how to do so can be found in the Kafka Connector documentation.

In short: a timestamp assigner turns a stream into one whose elements carry timestamps and watermarks, and it overwrites any timestamps or watermarks the stream already had. It is usually specified right after the data source, but it doesn't have to be. A common pattern is to parse and filter first, as long as the assigner comes before the first operation that uses event time.

First, an example:

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the time characteristic before building the pipeline
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        MysqlDSWithSpringForFlink streamSource = new MysqlDSWithSpringForFlink();

        DataStreamSource<UrlInfo> dataStreamSource = env.addSource(streamSource);

        // Keep only Baidu URLs, then assign timestamps and watermarks
        SingleOutputStreamOperator<UrlInfo> withTimestampAndWatermarkStream =
                dataStreamSource
                        .filter((FilterFunction<UrlInfo>) o -> o.getDomain() == UrlInfo.BAIDU)
                        .assignTimestampsAndWatermarks(new MyTimestampAndWatermarkAssigner());

        // Sink the timestamped stream, not the raw source, so the output reflects the filter and assigner
        withTimestampAndWatermarkStream.addSink(new PrintSinkFunction<>());

        env.execute("mysql Datasource with pool and spring");
    }

As you can see, assignTimestampsAndWatermarks is called right after filter here.

With Periodic Watermarks (emitting watermarks periodically)

AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).

The interval (every n milliseconds) in which the watermark will be generated is defined via ExecutionConfig.setAutoWatermarkInterval(...). The assigner’s getCurrentWatermark() method will be called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous watermark.

To generate watermarks periodically rather than for every element, implement the AssignerWithPeriodicWatermarks interface. The interval is given in milliseconds and is set with ExecutionConfig.setAutoWatermarkInterval.
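Below is a minimal plain-Java sketch of the contract quoted above. The names are hypothetical and this is not Flink's implementation. Every `interval` ms of processing time the assigner's watermark function is polled, and a value is emitted only if it is non-null and larger than the last emitted one. With `setAutoWatermarkInterval(300)`, as in the example below, `interval` would be 300.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical model of the periodic-watermark contract; not Flink's implementation.
final class PeriodicEmitter {
    // Poll getCurrentWatermark every `interval` ms of processing time, from `interval` up to `until`,
    // and collect only the watermarks that would actually be emitted
    static List<Long> simulate(long interval, long until, LongFunction<Long> getCurrentWatermark) {
        List<Long> emitted = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (long now = interval; now <= until; now += interval) {
            Long wm = getCurrentWatermark.apply(now);
            // Emitted only if non-null and larger than the previous watermark
            if (wm != null && wm > last) {
                emitted.add(wm);
                last = wm;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // A made-up assigner whose watermark only moves in 500 ms steps
        List<Long> out = simulate(200, 1000, now -> (now / 500) * 500);
        System.out.println(out); // [0, 500, 1000]
    }
}
```

Polling every 200 ms but only emitting three watermarks shows why a short interval does not necessarily flood the job: repeated values are dropped.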

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // setStreamTimeCharacteristic(EventTime) resets the auto-watermark interval to 200 ms,
        // so configure the interval after this call
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Set the watermark interval (ms)
        ExecutionConfig config = env.getConfig();
        config.setAutoWatermarkInterval(300);

        MysqlDSWithSpringForFlink streamSource = new MysqlDSWithSpringForFlink();
        DataStreamSource<UrlInfo> dataStreamSource = env.addSource(streamSource);

        SingleOutputStreamOperator<UrlInfo> withTimestampAndWatermarkStream =
                dataStreamSource
                        .filter((FilterFunction<UrlInfo>) o -> o.getDomain() == UrlInfo.BAIDU)
                        .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator());

        // Sink the timestamped stream rather than the raw source
        withTimestampAndWatermarkStream.addSink(new PrintSinkFunction<>());

        env.execute("mysql Datasource with pool and spring");
    }

Here the watermark interval is set through ExecutionConfig, and a TimeLagWatermarkGenerator is added after filter. Its code (taken from the official docs, slightly modified) is:

import myflink.model.UrlInfo;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<UrlInfo> {

    private final long maxTimeLag = 5000; // 5 seconds

    // Called for every element: use the element's own event time as its timestamp
    @Override
    public long extractTimestamp(UrlInfo element, long previousElementTimestamp) {
        return element.getCurrentTime().getTime();
    }

    // Called every autoWatermarkInterval ms (300 ms in the job above)
    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
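The TimeLagWatermarkGenerator above ties the watermark to the machine clock, so it ignores the events' own timestamps. The same docs page also describes a variant that tracks the largest event timestamp seen so far and subtracts a fixed bound. Flink also ships `BoundedOutOfOrdernessTimestampExtractor` for that pattern. The following is a plain-Java sketch of the idea with no Flink types. `BoundedLagWatermarks` is a hypothetical name, and in a real job the same logic would sit inside an `AssignerWithPeriodicWatermarks<UrlInfo>`.

```java
// Hypothetical plain-Java model of a bounded-out-of-orderness watermark generator.
final class BoundedLagWatermarks {
    private final long maxOutOfOrderness; // how late (in ms) an element may arrive
    private long currentMaxTimestamp;

    BoundedLagWatermarks(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start here so the first watermark is Long.MIN_VALUE instead of underflowing
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Counterpart of extractTimestamp(): remember the largest event time seen
    long extractTimestamp(long eventTime) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        return eventTime;
    }

    // Counterpart of getCurrentWatermark(): trail the largest timestamp by a fixed bound
    long getCurrentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedLagWatermarks gen = new BoundedLagWatermarks(5000);
        for (long ts : new long[] {10_000, 14_000, 12_000}) {
            gen.extractTimestamp(ts);
        }
        // 14000 - 5000; the out-of-order element at 12000 does not pull the watermark back
        System.out.println(gen.getCurrentWatermark()); // 9000
    }
}
```

Unlike the time-lag version, this watermark only advances when data arrives, so replaying old data (for example from Kafka) still produces meaningful event-time progress.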

With Punctuated Watermarks (emitted only when certain events signal it)

To generate watermarks whenever a certain event indicates that a new watermark might be generated, use AssignerWithPunctuatedWatermarks. For this class Flink will first call the extractTimestamp(...) method to assign the element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method on that element.

The checkAndGetNextWatermark(...) method is passed the timestamp that was assigned in the extractTimestamp(...) method, and can decide whether it wants to generate a watermark. Whenever the checkAndGetNextWatermark(...) method returns a non-null watermark, and that watermark is larger than the latest previous watermark, that new watermark will be emitted.
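The call sequence just described can be modeled in plain Java. This hypothetical sketch is not Flink code. For each element it extracts the timestamp first, then asks for a watermark, and emits only non-null values that are larger than the last one. The `marker` flag stands in for the `hasWatermarkMarker()` check used in the assigner below.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the punctuated-watermark call sequence; not Flink code.
final class PunctuatedSimulation {
    // A made-up event type: a timestamp plus a flag saying "a watermark may follow me"
    static final class Event {
        final long timestamp;
        final boolean marker;

        Event(long timestamp, boolean marker) {
            this.timestamp = timestamp;
            this.marker = marker;
        }
    }

    static long extractTimestamp(Event e) {
        return e.timestamp;
    }

    // Return a watermark only for marker events, otherwise null
    static Long checkAndGetNextWatermark(Event e, long extractedTimestamp) {
        return e.marker ? extractedTimestamp : null;
    }

    // Per element: extract first, then check; emit only non-null, strictly larger watermarks
    static List<Long> run(List<Event> events) {
        List<Long> emitted = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (Event e : events) {
            long ts = extractTimestamp(e);
            Long wm = checkAndGetNextWatermark(e, ts);
            if (wm != null && wm > last) {
                emitted.add(wm);
                last = wm;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(100, false));
        events.add(new Event(200, true));
        events.add(new Event(150, true)); // smaller than the last watermark: not emitted
        events.add(new Event(300, true));
        System.out.println(run(events)); // [200, 300]
    }
}
```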

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        MysqlDSWithSpringForFlink streamSource = new MysqlDSWithSpringForFlink();

        DataStreamSource<UrlInfo> dataStreamSource = env.addSource(streamSource);

        SingleOutputStreamOperator<UrlInfo> withTimestampAndWatermarkStream =
                dataStreamSource
                        .filter((FilterFunction<UrlInfo>) o -> o.getDomain() == UrlInfo.BAIDU)
                        .assignTimestampsAndWatermarks(new PunctuatedAssigner());

        // Sink the timestamped stream rather than the raw source
        withTimestampAndWatermarkStream.addSink(new PrintSinkFunction<>());

        env.execute("mysql Datasource with pool and spring");
    }

import myflink.model.UrlInfo;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<UrlInfo> {

    @Override
    public long extractTimestamp(UrlInfo element, long previousElementTimestamp) {
        return element.getCurrentTime().getTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(UrlInfo lastElement, long extractedTimestamp) {
        // Emit a watermark at this element's timestamp only if the element carries a marker;
        // hasWatermarkMarker() must exist on UrlInfo (it mirrors MyEvent in the docs' example)
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}

Kafka

When using Apache Kafka as a data source, each Kafka partition may have a simple event time pattern (ascending timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel, interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka’s consumer clients work).

In that case, you can use Flink’s Kafka-partition-aware watermark generation. Using that feature, watermarks are generated inside the Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.

For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the ascending timestamps watermark generator will result in perfect overall watermarks.
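The following simplified plain-Java model shows why per-partition tracking helps. It is hypothetical, not Flink code, and it uses each partition's largest timestamp as that partition's watermark. Two partitions are each ascending on their own, but the consumer interleaves them. A single global "ascending" tracker sees violations, while per-partition trackers merged by taking the minimum stay consistent.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: one global ascending tracker vs. per-partition trackers merged by minimum.
final class PartitionWatermarks {
    private final Map<Integer, Long> maxPerPartition = new HashMap<>();
    private long globalMax = Long.MIN_VALUE;
    private int globalViolations = 0; // elements that break the "ascending" assumption globally

    void onRecord(int partition, long timestamp) {
        if (timestamp < globalMax) {
            globalViolations++;
        }
        globalMax = Math.max(globalMax, timestamp);
        maxPerPartition.merge(partition, timestamp, Math::max);
    }

    // Merged watermark: the slowest partition decides (minimum over partitions)
    long mergedWatermark() {
        return maxPerPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    int globalViolations() {
        return globalViolations;
    }

    public static void main(String[] args) {
        PartitionWatermarks w = new PartitionWatermarks();
        // Partition 0: 10, 20; partition 1: 3, 5; each ascending, but consumed interleaved
        int[] partitions = {0, 1, 0, 1};
        long[] timestamps = {10, 3, 20, 5};
        for (int i = 0; i < partitions.length; i++) {
            w.onRecord(partitions[i], timestamps[i]);
        }
        System.out.println(w.globalViolations()); // 2
        System.out.println(w.mergedWatermark());  // 5
    }
}
```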

The illustrations below show how to use the per-Kafka-partition watermark generation, and how watermarks propagate through the streaming dataflow in that case.

Kafka topics have multiple partitions, and each partition may follow its own event time pattern. On the consumer side, several partitions are consumed in parallel, so events from different partitions are interleaved and the per-partition patterns are lost.

In this case, you can use Flink's Kafka-partition-aware watermark generation, as in the following code:

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
                "testjin", // topic
                new SimpleStringSchema(),
                properties);

        // Assign timestamps/watermarks on the consumer itself: watermarks are then generated per
        // Kafka partition and merged, instead of after the partitions have been interleaved
        consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
            @Override
            public long extractAscendingTimestamp(String element) {
                // Records are still raw JSON strings at this point
                return JSON.parseObject(element, UrlInfo.class).getCurrentTime().getTime();
            }
        });

        SingleOutputStreamOperator<UrlInfo> urlInfoStream = env.addSource(consumer).setParallelism(1)
                // map: convert one data stream into another, here String --> UrlInfo
                .map(string -> JSON.parseObject(string, UrlInfo.class));

        // Add a sink (print here) so the stream's output is actually consumed
        urlInfoStream.print();

        env.execute("save url to db");
    }

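Note that AscendingTimestampExtractor is used here, i.e., a timestamp assigner for ascending timestamps. This fits the case where timestamps are strictly ascending within each Kafka partition.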

References:

http://www.54tianzhisheng.cn/2018/12/11/Flink-time/

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html
