Flink1.11升級(jí)填坑

背景

現(xiàn)有集群版本是Flink 1.10.1,想要升級(jí)到社區(qū)最新的版本Flink 1.11.1.

踩坑過程

No hostname could be resolved for ip address

詳細(xì)的社區(qū)郵件討論過程如下:

http://apache-flink.147419.n8.nabble.com/Flink-1-11-submit-job-timed-out-td4982.html

在提交作業(yè)的時(shí)候怕磨,JM會(huì)瘋狂刷出大量的日志No hostname could be resolved for ip address xxxx羹幸。該xxxx ip是kubernetes分配給flink TM的內(nèi)網(wǎng)ip,JM由于這個(gè)報(bào)錯(cuò),直接time out仅胞。

kubectl run -i -t busybox --image=busybox --restart=Never

進(jìn)入到pod中反向解析flink TM的ip失敗。

/ # nslookup 10.47.96.2
Server:     10.96.0.10
Address:    10.96.0.10:53

** server can't find 2.96.47.10.in-addr.arpa: NXDOMAIN

而解析JM居然可以成功

/ # nslookup 10.34.128.8
Server:     10.96.0.10
Address:    10.96.0.10:53

8.128.34.10.in-addr.arpa    name = 10-34-128-8.flink-jobmanager.flink-test.svc.cluster.local

唯一的差別就是JM是有service剑辫。

通過添加社區(qū)提供的可選配置解決問題taskmanager-query-state-service.yaml干旧。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html

不過目前跟社區(qū)的溝通中,社區(qū)是沒有遇到這個(gè)問題的妹蔽,該問題還在進(jìn)一步討論中椎眯。

新版本waterMark改動(dòng)

新版的waterMark的生成改為

@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the
     * event timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}

使用方式改為:

dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));

跟舊版本的相比extractTimestamp提取時(shí)間戳的操作不見了。

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

如果按照新版的升級(jí)胳岂,那么數(shù)據(jù)的timeStamp會(huì)變成Long.Min编整。正確的使用方式是

dataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamp)->event.f1));

.assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                    @Override
                    public long extractTimestamp(StationLog element, long recordTimestamp) {
                        return element.getCallTime(); //指定EventTime對(duì)應(yīng)的字段
                    }
                })

如果有自定義,使用方式如下

.assignTimestampsAndWatermarks(((WatermarkStrategy)(ctx)->new BoundOutOrdernessStrategy(60,60)
                .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                    @Override
                    public long extractTimestamp(StationLog element, long recordTimestamp) {
                        return element.getCallTime(); //指定EventTime對(duì)應(yīng)的字段
                    }
                })

工具類

public class WatermarkStrategys{
    public static < T extends TimeEvent> WatermarkStrategy<T> forBoundOutOfOrderness(long futuerOutMs,long maxOutofOrderMs){
        return ((WatermarkStrategy)(ctx)->new BoundOutOrdernessStrategy(futuerOutMs,maxOutofOrderMs))
            .withTimestampAssigner((SerializableTimestampAssigner<T>)(element,recordTimeStamp)-> event.getEventTimeMs())
    }
}

public interface TimeEvent{
    long getEventTimeMs();
}

flink1.11乳丰,idea運(yùn)行失敗

社區(qū)討論見

http://apache-flink.147419.n8.nabble.com/flink1-11-idea-td4576.html

作業(yè)的依賴從1.10.1升級(jí)到1.11.0掌测,在idea運(yùn)行的時(shí)候報(bào)錯(cuò)

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
   at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
   at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment()

解決方法:

嘗試加一下這個(gè)依賴
groupId: org.apache.flink
artifactId: flink-clients_${scala.binary.version}

導(dǎo)致原因

https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.11.html#reversed-dependency-from-flink-streaming-java-to-flink-client-flink-15090

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過簡信或評(píng)論聯(lián)系作者成艘。
  • 序言:七十年代末赏半,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子淆两,更是在濱河造成了極大的恐慌断箫,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,383評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秋冰,死亡現(xiàn)場離奇詭異仲义,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)剑勾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門埃撵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人虽另,你說我怎么就攤上這事暂刘。” “怎么了捂刺?”我有些...
    開封第一講書人閱讀 157,852評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵谣拣,是天一觀的道長。 經(jīng)常有香客問我族展,道長森缠,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,621評(píng)論 1 284
  • 正文 為了忘掉前任仪缸,我火速辦了婚禮贵涵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己宾茂,他們只是感情好瓷马,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著跨晴,像睡著了一般决采。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上坟奥,一...
    開封第一講書人閱讀 49,929評(píng)論 1 290
  • 那天树瞭,我揣著相機(jī)與錄音,去河邊找鬼爱谁。 笑死晒喷,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的访敌。 我是一名探鬼主播凉敲,決...
    沈念sama閱讀 39,076評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼寺旺!你這毒婦竟也來了爷抓?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,803評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤阻塑,失蹤者是張志新(化名)和其女友劉穎蓝撇,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體陈莽,經(jīng)...
    沈念sama閱讀 44,265評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡渤昌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了走搁。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片独柑。...
    茶點(diǎn)故事閱讀 38,716評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖私植,靈堂內(nèi)的尸體忽然破棺而出忌栅,到底是詐尸還是另有隱情,我是刑警寧澤曲稼,帶...
    沈念sama閱讀 34,395評(píng)論 4 333
  • 正文 年R本政府宣布索绪,位于F島的核電站,受9級(jí)特大地震影響躯肌,放射性物質(zhì)發(fā)生泄漏者春。R本人自食惡果不足惜破衔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評(píng)論 3 316
  • 文/蒙蒙 一清女、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧晰筛,春花似錦嫡丙、人聲如沸拴袭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拥刻。三九已至,卻和暖如春父泳,著一層夾襖步出監(jiān)牢的瞬間般哼,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評(píng)論 1 266
  • 我被黑心中介騙來泰國打工惠窄, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蒸眠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,488評(píng)論 2 361
  • 正文 我出身青樓杆融,卻偏偏與公主長得像楞卡,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子脾歇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評(píng)論 2 350