問題定位:Flink水位線不觸發(fā)問題

Flink水位線不觸發(fā)問題

窗口計算時遇到好幾次水位線不觸發(fā)的情況导俘,簡單總結下。

首先,介紹下Flink的事件時間(EventTime)和水位線(Watermarks)的概念肉康。

一、處理時間

如果要構造一個實時的流式應用灼舍,或早或晚都會接觸到EventTime這個概念『鸷停現(xiàn)實場景中也會遇到消息亂序到達,這里會介紹到為什么需要事件時間和如何去處理亂序到達的數(shù)據(jù)骑素。
ProcessingTime是Flink系統(tǒng)處理這條消息的時間炫乓,EventTime可以理解成是這條消息真實發(fā)生的時間。
舉個例子,創(chuàng)建一個SlidingWindow厢岂,窗口大小為10秒光督,步長為5秒。關于窗口的更多概念塔粒,可以參考Flink官方文檔——Windows结借。

案例1:消息都按時到達

val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
    .keyBy(0)
    .timeWindow(Time.seconds(10), Time.seconds(5))
    .sum(1)
counts.print
senv.execute("ProcessingTime processing example")

如果source中有三條消息,對應的事件時間分別為13秒卒茬、13秒和16秒:
[站外圖片上傳中...(image-1e2304-1550219427900)]
它們會落到正確的窗口上船老,如下圖所示。13秒產(chǎn)生的兩條消息會落到窗口1[5s-15s]和窗口2[10s-20s]上圃酵,16秒產(chǎn)生的消息會落到窗口2[10s-20s]和窗口3[15s-25s]上柳畔。最后窗口fire掉時,三個窗口的count值分別為:(a,2), (a,3) and (a,1) 郭赐,和預期一致薪韩。

案例2:消息delay

其中一條13秒產(chǎn)生的消息晚到了6s,那按上面的代碼邏輯捌锭,這些消息會落到下面的窗口中:


延遲的消息會落到窗口2[10s-20s]和窗口3[15s-25s]上俘陷。這看起來對窗口2沒有影響,因為結果都是3观谦,但窗口3的結果卻不一致了拉盾。

二、事件時間

因此此處我們采用事件時間豁状,這里水位線的事件時間為當前系統(tǒng)的時間捉偏,當然你可以改成數(shù)據(jù)中的某個時間。

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    e.split(",")(1).toLong 
  }
  override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis)
  }
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
                .assignTimestampsAndWatermarks(new TimestampExtractor) 
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)
counts.print
senv.execute("EventTime processing example")

結果如下圖所示:
[站外圖片上傳中...(image-b2feb1-1550219427900)]
結果看起來好很多泻红,窗口2和3都正確了夭禽,但窗口1卻丟了一條數(shù)據(jù)。
Flink沒有將delay的數(shù)據(jù)分配給窗口3是因為現(xiàn)在是檢查消息的事件時間谊路,因此不會放入窗口三中驻粟。而沒有分配給窗口1的原因是delayed的消息到達系統(tǒng)的時間是19秒,窗口1已經(jīng)fire掉了凶异。此處就需要watermarks了。

三挤巡、水位線(水邮1颉)

我認為水位線是很重要和有趣的一個概念,我這里會大概描述下矿卑,如果想了解更多可以看Google一場精彩的talk喉恋,也可以看這個dataArtisans的blog。水位線簡單理解就是一個timestamp,當Flink收到這個水印時轻黑,F(xiàn)link理解會收到來自這個時間點之后的消息糊肤,也可以理解成告訴Flink運行到哪個事件時間了。
在這個案例中氓鄙,其實就是告訴Flink一條消息可以遲到多久馆揉。
我現(xiàn)在設置水位線為現(xiàn)在的事件提前5秒,相當于告訴Flink我的消息可以遲到五秒抖拦。

override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis - 5000)
  }

結果變成下圖所示:

[站外圖片上傳中...(image-6dfb6e-1550219427900)]

四升酣、允許延遲(Lateness)

如果采用“watermark - delay”,如果水位線不超過window_length + delay是不會被fire掉的态罪,所以此刻可以采用allowedLateness方法噩茄。在window_end_time + allowed lateness之前,F(xiàn)link都不會丟棄這條數(shù)據(jù)复颈。
當消息到達時绩聘,F(xiàn)link會提取它的時間,然后判斷它是否在有效的延遲時間內(nèi)耗啦,然后去判斷是否fire掉窗口凿菩。
但是通過這種途徑,一個窗口有可能被fire掉多次芹彬,如果需要exactly once processing的話蓄髓,需要保證sink是冪等的。

五舒帮、水位線怎么不觸發(fā)会喝?

數(shù)據(jù)一直有序得進來,為什么沒有窗口被fire掉玩郊?沒有結果產(chǎn)出肢执?

case1:提取時間失敗

筆者和上游約定好了數(shù)據(jù)格式,extractTimestamp中提取的是某個字段為事件時間译红。研究數(shù)據(jù)發(fā)現(xiàn)約定好的字段突然不發(fā)了预茄。

case2:提取時間有了,但是照樣失敗

上游按約定發(fā)了該字段后侦厚,系統(tǒng)在測試環(huán)境運行了一段時間耻陕,又沒有結果產(chǎn)出了。
調(diào)試發(fā)現(xiàn)提取時間正常刨沦,checkAndGetNextWatermark也正常诗宣,但是為什么窗口沒被fire掉呢。
因為時間的format變了想诅,由到毫秒的timestamp變成了yyyymmddHHmmss召庞,需要轉(zhuǎn)成timestamp岛心。

case3:一切正常,窗口不fire

EventTimeTrigger.java
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

看了下EventTimeTrigger的源碼篮灼,只有當當前的水位線越過窗口忘古,即時間大于窗口的endTime才會觸發(fā)Fire的操作。我們的處理流程沒有觸發(fā)诅诱,那就說明我們的水位線沒有更新到合適的值髓堪。調(diào)試后發(fā)現(xiàn)當前的水位線一直停留在初始的最小的long值。

BoundedOutOfOrdernessTimestampExtractor.java
@Override
public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}

debug發(fā)現(xiàn)lastEmittedWatermark確實有更新逢艘,這說明這個地方是觸發(fā)了Watermark的值旦袋。但是debug的過程中,發(fā)現(xiàn)時不時會出現(xiàn)初始值的水位線它改。

SystemProcessingTimeService.java
TimestampsAndPeriodicWatermarksOperator.java
@Override
public void onProcessingTime(long timestamp) throws Exception {
    // register next timer
    Watermark newWatermark = userFunction.getCurrentWatermark();
    if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
        currentWatermark = newWatermark.getTimestamp();
        // emit watermark
        output.emitWatermark(newWatermark);
    }
    long now = getProcessingTimeService().getCurrentProcessingTime();
    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

TimestampsAndPeriodicWatermarksOperator會做判斷:如果新的水位線小于當前的水位線疤孕,就不會更新了。

終于央拖,順著StreamInputProcessor–>StatusWatermarkValve理了下來祭阀,看見這樣的處理邏輯:

StreamInputProcessor.java
StatusWatermarkValve.java
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
    long newMinWatermark = Long.MAX_VALUE;
    // determine new overall watermark by considering only watermark-aligned channels across all channels
    for (InputChannelStatus channelStatus : channelStatuses) {
        if (channelStatus.isWatermarkAligned) {
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
    }
    // we acknowledge and output the new overall watermark if it is larger than the last output watermark
    if (newMinWatermark > lastOutputWatermark) {
        lastOutputWatermark = newMinWatermark;
        outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
    }
}

這里會將所有的channel status的水位線做個匯總:取最小的水位線。那是不是問題出在這里鲜戒?后面debug了下看看专控,確實,這個地方有的channel status下的水位線一直是最小的long值那個不正常的水位線遏餐,進而導致整體的水位線發(fā)送不出去伦腐。

那么為什么會出現(xiàn)這種情況呢,百思不得其解失都。

當 [window_start_time,window_end_time) 有數(shù)據(jù)柏蘑,watermark Time大于等于window_end_time時,會觸發(fā)window trigger粹庞。

因為之前運行都是正常的咳焚,檢查了數(shù)據(jù)也沒問題。去翻改動庞溜,有影響的可能就是改了一些算子的并行度革半。

assigntimestampandwatermarks和map的并行度一樣了就不能生成水位線了?

于是修改了assigntimestampandwatermarks的并行度流码,window正常fire掉了又官。

分析原因:
Flink source用到了FlinkKafkaConsumer010,沒有指定KafkaPartitioner的話漫试,會通過FixedPartitioner來給出默認的partitioner方法:

public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
        return partitions[this.parallelInstanceId % partitions.length];
    }

parallelInstanceId代表著Flink consumer程序的并行度ID六敬,假如FlinkKafkaConsumer010的并行度是12,那么這12個線程的ID分別是1-12.

parallelInstances代表著總的并行度商虐,即12觉阅。

partitions是一個kafka partition的數(shù)組,kafka的topic的partitions是4(因為性能測試秘车,換到只有一個節(jié)點的kafka)典勇。

Flink partition的規(guī)則,就是Flink的并行度ID除以kafka partition length取余叮趴。

因此kafka編號為1-4的partition分別對應source node的1-4的partition割笙,那么source node5-12的partition就為空了。

默認的partition策略是按照Flink的并行度ID與kafka中partition的數(shù)量取余的方法分配的眯亦,而與數(shù)據(jù)本身沒有關系伤溉。

source node的partition為5-12的接收不到數(shù)據(jù),

Watermark的生成是數(shù)據(jù)驅(qū)動的妻率,只有當TimestampAndWatermarkAssigner”看到”數(shù)據(jù)時乱顾,watermark才會生效。

而map和assigntimestampandwatermarks并發(fā)度一樣的話宫静,這八個partition的watermark不會修改走净,所以會出現(xiàn)watermark的初始值一直存在的情況。

當assigntimestampandwatermarks的并行度修改后孤里,事件會被shuffled across(洗牌)伏伯,因此到了TimestampAndWatermarkAssigner不會有空的partition存在了。

以上捌袜。

五 實際定位

  1. 問題:發(fā)現(xiàn)程序沒有輸出说搅,沒有任何報錯。
  2. 可以先去任務的Appmaster上看watermark運行情況.來定位是否是watermarke的問題
image-20181219103115949
  1. 如果是顯示no watermark 虏等,按照上述的case 進行診斷弄唧。
  2. 在assignTimestampsAndWatermarks 后setParallelism(2) 。具體并行度根據(jù)實際需求設置博其,在這里kafkatopic只有2個分區(qū)套才,并行度設置為2
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市慕淡,隨后出現(xiàn)的幾起案子背伴,更是在濱河造成了極大的恐慌,老刑警劉巖峰髓,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件傻寂,死亡現(xiàn)場離奇詭異,居然都是意外死亡携兵,警方通過查閱死者的電腦和手機疾掰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來徐紧,“玉大人静檬,你說我怎么就攤上這事炭懊。” “怎么了拂檩?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵侮腹,是天一觀的道長。 經(jīng)常有香客問我稻励,道長父阻,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任望抽,我火速辦了婚禮加矛,結果婚禮上,老公的妹妹穿的比我還像新娘煤篙。我一直安慰自己斟览,他們只是感情好,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布舰蟆。 她就那樣靜靜地躺著趣惠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪身害。 梳的紋絲不亂的頭發(fā)上味悄,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機與錄音塌鸯,去河邊找鬼侍瑟。 笑死,一個胖子當著我的面吹牛丙猬,可吹牛的內(nèi)容都是我干的涨颜。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼茧球,長吁一口氣:“原來是場噩夢啊……” “哼庭瑰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起抢埋,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤弹灭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后揪垄,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體穷吮,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年饥努,在試婚紗的時候發(fā)現(xiàn)自己被綠了捡鱼。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡酷愧,死狀恐怖驾诈,靈堂內(nèi)的尸體忽然破棺而出缠诅,到底是詐尸還是另有隱情,我是刑警寧澤乍迄,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布滴铅,位于F島的核電站,受9級特大地震影響就乓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拱烁,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一生蚁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧戏自,春花似錦邦投、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至猛们,卻和暖如春念脯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背弯淘。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工绿店, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人庐橙。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓假勿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親态鳖。 傳聞我的和親對象是個殘疾皇子转培,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容