Flink水位線不觸發(fā)問題
窗口計算時遇到好幾次水位線不觸發(fā)的情況导俘,簡單總結下。
首先,介紹下Flink的事件時間(EventTime)和水位線(Watermarks)的概念肉康。
一、處理時間
如果要構造一個實時的流式應用灼舍,或早或晚都會接觸到EventTime這個概念『鸷停現(xiàn)實場景中也會遇到消息亂序到達,這里會介紹到為什么需要事件時間和如何去處理亂序到達的數(shù)據(jù)骑素。
ProcessingTime是Flink系統(tǒng)處理這條消息的時間炫乓,EventTime可以理解成是這條消息真實發(fā)生的時間。
舉個例子,創(chuàng)建一個SlidingWindow厢岂,窗口大小為10秒光督,步長為5秒。關于窗口的更多概念塔粒,可以參考Flink官方文檔——Windows结借。
案例1:消息都按時到達
val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
counts.print
senv.execute("ProcessingTime processing example")
如果source中有三條消息,對應的事件時間分別為13秒卒茬、13秒和16秒:
[站外圖片上傳中...(image-1e2304-1550219427900)]
它們會落到正確的窗口上船老,如下圖所示。13秒產(chǎn)生的兩條消息會落到窗口1[5s-15s]和窗口2[10s-20s]上圃酵,16秒產(chǎn)生的消息會落到窗口2[10s-20s]和窗口3[15s-25s]上柳畔。最后窗口fire掉時,三個窗口的count值分別為:(a,2), (a,3) and (a,1) 郭赐,和預期一致薪韩。
案例2:消息delay
其中一條13秒產(chǎn)生的消息晚到了6s,那按上面的代碼邏輯捌锭,這些消息會落到下面的窗口中:
延遲的消息會落到窗口2[10s-20s]和窗口3[15s-25s]上俘陷。這看起來對窗口2沒有影響,因為結果都是3观谦,但窗口3的結果卻不一致了拉盾。
二、事件時間
因此此處我們采用事件時間豁状,這里水位線的事件時間為當前系統(tǒng)的時間捉偏,當然你可以改成數(shù)據(jù)中的某個時間。
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
e.split(",")(1).toLong
}
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis)
}
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new TimestampExtractor)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
counts.print
senv.execute("EventTime processing example")
結果如下圖所示:
[站外圖片上傳中...(image-b2feb1-1550219427900)]
結果看起來好很多泻红,窗口2和3都正確了夭禽,但窗口1卻丟了一條數(shù)據(jù)。
Flink沒有將delay的數(shù)據(jù)分配給窗口3是因為現(xiàn)在是檢查消息的事件時間谊路,因此不會放入窗口三中驻粟。而沒有分配給窗口1的原因是delayed的消息到達系統(tǒng)的時間是19秒,窗口1已經(jīng)fire掉了凶异。此處就需要watermarks了。
三挤巡、水位線(水邮1颉)
我認為水位線是很重要和有趣的一個概念,我這里會大概描述下矿卑,如果想了解更多可以看Google一場精彩的talk喉恋,也可以看這個dataArtisans的blog。水位線簡單理解就是一個timestamp,當Flink收到這個水印時轻黑,F(xiàn)link理解會收到來自這個時間點之后的消息糊肤,也可以理解成告訴Flink運行到哪個事件時間了。
在這個案例中氓鄙,其實就是告訴Flink一條消息可以遲到多久馆揉。
我現(xiàn)在設置水位線為現(xiàn)在的事件提前5秒,相當于告訴Flink我的消息可以遲到五秒抖拦。
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis - 5000)
}
結果變成下圖所示:
[站外圖片上傳中...(image-6dfb6e-1550219427900)]
四升酣、允許延遲(Lateness)
如果采用“watermark - delay”,如果水位線不超過window_length + delay是不會被fire掉的态罪,所以此刻可以采用allowedLateness方法噩茄。在window_end_time + allowed lateness之前,F(xiàn)link都不會丟棄這條數(shù)據(jù)复颈。
當消息到達時绩聘,F(xiàn)link會提取它的時間,然后判斷它是否在有效的延遲時間內(nèi)耗啦,然后去判斷是否fire掉窗口凿菩。
但是通過這種途徑,一個窗口有可能被fire掉多次芹彬,如果需要exactly once processing的話蓄髓,需要保證sink是冪等的。
五舒帮、水位線怎么不觸發(fā)会喝?
數(shù)據(jù)一直有序得進來,為什么沒有窗口被fire掉玩郊?沒有結果產(chǎn)出肢执?
case1:提取時間失敗
筆者和上游約定好了數(shù)據(jù)格式,extractTimestamp中提取的是某個字段為事件時間译红。研究數(shù)據(jù)發(fā)現(xiàn)約定好的字段突然不發(fā)了预茄。
case2:提取時間有了,但是照樣失敗
上游按約定發(fā)了該字段后侦厚,系統(tǒng)在測試環(huán)境運行了一段時間耻陕,又沒有結果產(chǎn)出了。
調(diào)試發(fā)現(xiàn)提取時間正常刨沦,checkAndGetNextWatermark也正常诗宣,但是為什么窗口沒被fire掉呢。
因為時間的format變了想诅,由到毫秒的timestamp變成了yyyymmddHHmmss召庞,需要轉(zhuǎn)成timestamp岛心。
case3:一切正常,窗口不fire
EventTimeTrigger.java
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
看了下EventTimeTrigger的源碼篮灼,只有當當前的水位線越過窗口忘古,即時間大于窗口的endTime才會觸發(fā)Fire的操作。我們的處理流程沒有觸發(fā)诅诱,那就說明我們的水位線沒有更新到合適的值髓堪。調(diào)試后發(fā)現(xiàn)當前的水位線一直停留在初始的最小的long值。
BoundedOutOfOrdernessTimestampExtractor.java
@Override
public final Watermark getCurrentWatermark() {
// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
debug發(fā)現(xiàn)lastEmittedWatermark確實有更新逢艘,這說明這個地方是觸發(fā)了Watermark的值旦袋。但是debug的過程中,發(fā)現(xiàn)時不時會出現(xiàn)初始值的水位線它改。
SystemProcessingTimeService.java
TimestampsAndPeriodicWatermarksOperator.java
@Override
public void onProcessingTime(long timestamp) throws Exception {
// register next timer
Watermark newWatermark = userFunction.getCurrentWatermark();
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
currentWatermark = newWatermark.getTimestamp();
// emit watermark
output.emitWatermark(newWatermark);
}
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
TimestampsAndPeriodicWatermarksOperator會做判斷:如果新的水位線小于當前的水位線疤孕,就不會更新了。
終于央拖,順著StreamInputProcessor–>StatusWatermarkValve理了下來祭阀,看見這樣的處理邏輯:
StreamInputProcessor.java
StatusWatermarkValve.java
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
long newMinWatermark = Long.MAX_VALUE;
// determine new overall watermark by considering only watermark-aligned channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
if (channelStatus.isWatermarkAligned) {
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
}
}
// we acknowledge and output the new overall watermark if it is larger than the last output watermark
if (newMinWatermark > lastOutputWatermark) {
lastOutputWatermark = newMinWatermark;
outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
}
}
這里會將所有的channel status的水位線做個匯總:取最小的水位線。那是不是問題出在這里鲜戒?后面debug了下看看专控,確實,這個地方有的channel status下的水位線一直是最小的long值那個不正常的水位線遏餐,進而導致整體的水位線發(fā)送不出去伦腐。
那么為什么會出現(xiàn)這種情況呢,百思不得其解失都。
當 [window_start_time,window_end_time) 有數(shù)據(jù)柏蘑,watermark Time大于等于window_end_time時,會觸發(fā)window trigger粹庞。
因為之前運行都是正常的咳焚,檢查了數(shù)據(jù)也沒問題。去翻改動庞溜,有影響的可能就是改了一些算子的并行度革半。
assigntimestampandwatermarks和map的并行度一樣了就不能生成水位線了?
于是修改了assigntimestampandwatermarks的并行度流码,window正常fire掉了又官。
分析原因:
Flink source用到了FlinkKafkaConsumer010,沒有指定KafkaPartitioner的話漫试,會通過FixedPartitioner來給出默認的partitioner方法:
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
return partitions[this.parallelInstanceId % partitions.length];
}
parallelInstanceId代表著Flink consumer程序的并行度ID六敬,假如FlinkKafkaConsumer010的并行度是12,那么這12個線程的ID分別是1-12.
parallelInstances代表著總的并行度商虐,即12觉阅。
partitions是一個kafka partition的數(shù)組,kafka的topic的partitions是4(因為性能測試秘车,換到只有一個節(jié)點的kafka)典勇。
Flink partition的規(guī)則,就是Flink的并行度ID除以kafka partition length取余叮趴。
因此kafka編號為1-4的partition分別對應source node的1-4的partition割笙,那么source node5-12的partition就為空了。
默認的partition策略是按照Flink的并行度ID與kafka中partition的數(shù)量取余的方法分配的眯亦,而與數(shù)據(jù)本身沒有關系伤溉。
source node的partition為5-12的接收不到數(shù)據(jù),
Watermark的生成是數(shù)據(jù)驅(qū)動的妻率,只有當TimestampAndWatermarkAssigner”看到”數(shù)據(jù)時乱顾,watermark才會生效。
而map和assigntimestampandwatermarks并發(fā)度一樣的話宫静,這八個partition的watermark不會修改走净,所以會出現(xiàn)watermark的初始值一直存在的情況。
當assigntimestampandwatermarks的并行度修改后孤里,事件會被shuffled across(洗牌)伏伯,因此到了TimestampAndWatermarkAssigner不會有空的partition存在了。
以上捌袜。
五 實際定位
- 問題:發(fā)現(xiàn)程序沒有輸出说搅,沒有任何報錯。
- 可以先去任務的Appmaster上看watermark運行情況.來定位是否是watermarke的問題
- 如果是顯示no watermark 虏等,按照上述的case 進行診斷弄唧。
- 在assignTimestampsAndWatermarks 后setParallelism(2) 。具體并行度根據(jù)實際需求設置博其,在這里kafkatopic只有2個分區(qū)套才,并行度設置為2