A Look at Flink's BoundedOutOfOrdernessTimestampExtractor

This article takes a look at Flink's BoundedOutOfOrdernessTimestampExtractor.

BoundedOutOfOrdernessTimestampExtractor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java

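package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

/**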
 * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with
 * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can
 * help reduce the number of elements that are ignored due to lateness when computing the final result for a
 * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time
 * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp.
 * */
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}
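  • The abstract class BoundedOutOfOrdernessTimestampExtractor implements the extractTimestamp and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface, and declares the abstract method extractTimestamp(T element) for subclasses to implement.
  • The constructor takes a maxOutOfOrderness parameter (negative values are rejected with a RuntimeException). It is the maximum amount of time an element is allowed to lag behind the largest event time seen so far: the emitted watermark trails that maximum by exactly this amount, so an element that lags by more may already be behind the watermark and is ignored as late when window results are computed.
  • extractTimestamp(T, long) calls the subclass's extractTimestamp(T) to obtain the timestamp and, if it is greater than currentMaxTimestamp, updates currentMaxTimestamp. getCurrentWatermark first computes potentialWM = currentMaxTimestamp - maxOutOfOrderness. If potentialWM >= lastEmittedWatermark (equivalently, currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness, meaning lastEmittedWatermark has fallen too far behind), it advances lastEmittedWatermark to potentialWM. Finally it returns new Watermark(lastEmittedWatermark), so the watermark never moves backwards.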
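To see this bookkeeping in isolation, here is a minimal plain-Java sketch that mirrors the two fields and two methods of the class without any Flink dependency. The class name BoundedOutOfOrdernessSketch and the methods onElement/currentWatermark are hypothetical, timestamps are assumed to be plain milliseconds, and the sequence in main is made-up data.

```java
/**
 * Hypothetical plain-Java sketch of the watermark bookkeeping in
 * BoundedOutOfOrdernessTimestampExtractor (no Flink dependency).
 * Timestamps are plain longs in milliseconds.
 */
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Same seed as Flink: the first potential watermark is exactly Long.MIN_VALUE.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Mirrors extractTimestamp(T, long): remember the largest timestamp seen so far. */
    public long onElement(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Mirrors getCurrentWatermark(): trail the max by maxOutOfOrderness, never move backwards. */
    public long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch s = new BoundedOutOfOrdernessSketch(3_000);
        System.out.println(s.currentWatermark()); // -9223372036854775808 (Long.MIN_VALUE): no element yet
        s.onElement(10_000);
        System.out.println(s.currentWatermark()); // 7000
        s.onElement(8_000); // out of order: the max stays 10000
        System.out.println(s.currentWatermark()); // still 7000
        s.onElement(15_000);
        System.out.println(s.currentWatermark()); // 12000
    }
}
```

With maxOutOfOrderness = 3000 ms, the out-of-order element 8000 does not move the watermark; it stays at 7000 until a newer maximum (15000) arrives.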

Example

    public static void main(String[] args) throws Exception {

        final int popThreshold = 20; // threshold for popular places

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // poll periodic watermark assigners (getCurrentWatermark) every 1000 ms
        env.getConfig().setAutoWatermarkInterval(1000);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());

        popularPlaces.print();

        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }

    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

        public TaxiRideTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            }
            else {
                return ride.endTime.getMillis();
            }
        }
    }
  • This example uses an AssignerWithPeriodicWatermarks: env.getConfig().setAutoWatermarkInterval(1000) sets the watermark interval to 1000 ms, and assignTimestampsAndWatermarks registers TaxiRideTSExtractor, a subclass of BoundedOutOfOrdernessTimestampExtractor, as the assigner.
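The effect of a periodic interval can be sketched with a small simulation, assuming each element's processing-time arrival is known. Everything here is hypothetical (PeriodicWatermarkSimulation, lateEvents, the event data): the watermark is recomputed only at each interval tick, the way a periodic assigner is polled, and an element counts as late under a simplified rule (event time at or before the current watermark). In a real window operator, whether such an element is dropped also depends on whether the windows it belongs to have already fired.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation of a bounded-out-of-orderness assigner that is
 * polled every intervalMillis of (simulated) processing time.
 */
public class PeriodicWatermarkSimulation {

    /**
     * events[i] = {arrivalMillis, eventTimeMillis}, ordered by arrival.
     * Returns the event times of elements at or behind the current watermark on arrival.
     */
    public static List<Long> lateEvents(long[][] events, long intervalMillis, long maxOutOfOrderness) {
        long currentMax = Long.MIN_VALUE + maxOutOfOrderness; // same seed as the extractor
        long watermark = Long.MIN_VALUE;
        long nextTick = intervalMillis;
        List<Long> late = new ArrayList<>();
        for (long[] e : events) {
            long arrival = e[0];
            long eventTime = e[1];
            // Run every periodic tick that fell at or before this element's arrival.
            while (nextTick <= arrival) {
                watermark = Math.max(watermark, currentMax - maxOutOfOrderness);
                nextTick += intervalMillis;
            }
            // Simplified lateness rule: event time not after the current watermark.
            if (eventTime <= watermark) {
                late.add(eventTime);
            }
            currentMax = Math.max(currentMax, eventTime);
        }
        return late;
    }

    public static void main(String[] args) {
        long[][] events = {
            {100, 10_000}, {600, 12_000}, {1_200, 11_000}, {1_500, 8_500}, {2_300, 9_500}
        };
        System.out.println(lateEvents(events, 1_000, 3_000));
    }
}
```

Run as-is it prints [8500]: that element is 3500 ms behind the maximum of 12000, more than the 3000 ms bound, while 11000 and 9500 are out of order but within the bound.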

Summary

  • To simplify development, Flink ships several built-in Pre-defined Timestamp Extractors / Watermark Emitters; BoundedOutOfOrdernessTimestampExtractor is one of them.
  • The abstract class BoundedOutOfOrdernessTimestampExtractor implements the extractTimestamp and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface, and declares the abstract method extractTimestamp(T element) for subclasses to implement.
  • Its constructor takes maxOutOfOrderness, the maximum time an element may lag behind the largest event time seen so far; elements that lag by more can fall behind the watermark and are ignored as late when window results are computed.
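The constructor also uses maxOutOfOrderness to seed currentMaxTimestamp with Long.MIN_VALUE + maxOutOfOrderness. A likely reason, sketched below with hypothetical names (InitialWatermarkDemo, firstWatermark): getCurrentWatermark subtracts maxOutOfOrderness, so seeding with plain Long.MIN_VALUE would overflow in Java's two's-complement arithmetic and yield a huge positive watermark before any element has been seen.

```java
/**
 * Hypothetical demo of why currentMaxTimestamp is seeded with
 * Long.MIN_VALUE + maxOutOfOrderness rather than plain Long.MIN_VALUE.
 */
public class InitialWatermarkDemo {

    /** getCurrentWatermark() computes currentMaxTimestamp - maxOutOfOrderness. */
    public static long firstWatermark(long seed, long maxOutOfOrderness) {
        return seed - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long maxOutOfOrderness = 5_000; // e.g. Time.seconds(5).toMilliseconds()
        // Naive seed: the subtraction wraps around to a huge positive value.
        System.out.println(firstWatermark(Long.MIN_VALUE, maxOutOfOrderness));
        // Flink's seed: the first watermark is exactly Long.MIN_VALUE.
        System.out.println(firstWatermark(Long.MIN_VALUE + maxOutOfOrderness, maxOutOfOrderness));
    }
}
```

The first line prints 9223372036854770808 (wrapped around), the second -9223372036854775808, which is Long.MIN_VALUE.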

