Flink 升級1.12版本的坑

Flink 使用介紹相關文檔目錄

Flink 使用介紹相關文檔目錄

Job提交出現(xiàn)異常:No ExecutorFactory found to execute the application

詳細的報錯如下所示:

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
    at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1931)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1836)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
    at com.paultech.CEPProblem.main(CEPProblem.java:73)

原因分析:

經過排查舵鳞,發(fā)現(xiàn)是從Flink 1.11開始,flink-streaming-java中的flink-clients依賴被移除啦粹。運行的時候需要添加這個依賴恰响,如下所示:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

其中scala.binary.version和flink.version屬性修改為實際項目中使用的版本

StackOverflow中相關問題鏈接:https://stackoverflow.com/questions/63032060/upgraded-flink-from-1-10-to-1-11-met-error-no-executorfactory-found-to-execute

Flink CEP 作業(yè)執(zhí)行結果異常

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TemperatureEvent> input = env.fromElements(new TemperatureEvent(1, "Device01", 22.0),
        new TemperatureEvent(1, "Device01", 27.1), new TemperatureEvent(2, "Device01", 28.1),
        new TemperatureEvent(1, "Device01", 22.2), new TemperatureEvent(3, "Device01", 22.1),
        new TemperatureEvent(1, "Device02", 22.3), new TemperatureEvent(4, "Device02", 22.1),
        new TemperatureEvent(1, "Device02", 22.4), new TemperatureEvent(5, "Device02", 22.7),
        new TemperatureEvent(1, "Device02", 27.0), new TemperatureEvent(6, "Device02", 30.0));


Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
        .subtype(TemperatureEvent.class)
        .where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                if (subEvent.getTemperature() >= 26.0) {
                    return true;
                }
                return false;
            }
        }).where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                if (subEvent.getMachineName().equals("Device02")) {
                    return true;
                }
                return false;
            }
        }).within(Time.seconds(10));

DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
        .select(
                new RichPatternSelectFunction<TemperatureEvent, Alert>() {
                    /**

                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println(getRuntimeContext().getUserCodeClassLoader());
                    }

                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                        return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
                    }
                });

patternStream.print();

env.execute("CEP on Temperature Sensor");

TemperatureEvent 和 Alert類為Java bean,不再貼出它們的代碼

這段代碼在1.12版本之前執(zhí)行會輸出:

Alert{message='Temperature Rise Detected: [TemperatureEvent{id=1, machineName='Device02', temperature=27.0}] on machine name: [TemperatureEvent{id=1, machineName='Device02', temperature=27.0}]'}

Alert{message='Temperature Rise Detected: [TemperatureEvent{id=6, machineName='Device02', temperature=30.0}] on machine name: [TemperatureEvent{id=6, machineName='Device02', temperature=30.0}]'}

但是在Flink 1.12版本中運行关筒,patternStream.print()這一行沒有任何輸出描孟。debug了CepOperator一直到StreamOneInputProcessor驶睦,發(fā)現(xiàn)均沒有元素輸入,甚是奇怪匿醒。

第一反應可能是Time characteristic設置的問題场航。于是在創(chuàng)建env的后面添加了一行:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

然后發(fā)現(xiàn)這個調用在Flink 1.12版本中被廢棄了。原因是Flink 1.12中的流處理默認的時間特征改為TimeCharacteristic.EventTime廉羔,也就是說從之前默認的processing time改為了event time溉痢。這就是問題所在。對于默認的event time,F(xiàn)link需要等待到10秒內的元素都到齊(下一個元素的event time為10秒后)孩饼,才會打印出結果髓削。然而,10秒鐘之后的元素還沒有到來時Flink 程序就退出了镀娶,因此不會有任何輸出立膛。

要解決這個問題,我們需要知道1.12版本中CEP設置時間特征為ProcessingTime的方式梯码。嘗試使用前邊提到的配置方法無效宝泵。

如何在CEP處理流程中使用ProcessingTime呢?我們翻閱PatternStream的源代碼轩娶,發(fā)現(xiàn)有如下兩個方法:

/**
 * Sets the time characteristic to processing time.
 */
public PatternStream<T> inProcessingTime() {
    return new PatternStream<>(builder.inProcessingTime());
}

/**
 * Sets the time characteristic to event time.
 */
public PatternStream<T> inEventTime() {
    return new PatternStream<>(builder.inEventTime());
}

顯然鲁猩,這兩個方法分別是用來設置ProcessingTime和EventTime的。將本篇開始的代碼修改罢坝,如下所示:

DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
        .inProcessingTime() // 注意廓握,這里是關鍵,顯式指定使用processing time
        .select(
                new RichPatternSelectFunction<TemperatureEvent, Alert>() {
                    /**

                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println(getRuntimeContext().getUserCodeClassLoader());
                    }

                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                        return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
                    }
                });

再次運行嘁酿,發(fā)現(xiàn)打印出了預期結果隙券。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市闹司,隨后出現(xiàn)的幾起案子娱仔,更是在濱河造成了極大的恐慌,老刑警劉巖游桩,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件牲迫,死亡現(xiàn)場離奇詭異,居然都是意外死亡借卧,警方通過查閱死者的電腦和手機盹憎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來铐刘,“玉大人陪每,你說我怎么就攤上這事×常” “怎么了檩禾?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長疤祭。 經常有香客問我盼产,道長,這世上最難降的妖魔是什么勺馆? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任戏售,我火速辦了婚禮啦辐,結果婚禮上,老公的妹妹穿的比我還像新娘蜈项。我一直安慰自己,他們只是感情好续挟,可當我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布紧卒。 她就那樣靜靜地躺著,像睡著了一般诗祸。 火紅的嫁衣襯著肌膚如雪跑芳。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天直颅,我揣著相機與錄音博个,去河邊找鬼。 笑死功偿,一個胖子當著我的面吹牛盆佣,可吹牛的內容都是我干的。 我是一名探鬼主播械荷,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼共耍,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了吨瞎?” 一聲冷哼從身側響起痹兜,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎颤诀,沒想到半個月后字旭,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡崖叫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年遗淳,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片心傀。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡洲脂,死狀恐怖,靈堂內的尸體忽然破棺而出剧包,到底是詐尸還是另有隱情恐锦,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布疆液,位于F島的核電站一铅,受9級特大地震影響,放射性物質發(fā)生泄漏堕油。R本人自食惡果不足惜潘飘,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一肮之、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧卜录,春花似錦戈擒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至丑瞧,卻和暖如春柑土,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背绊汹。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工稽屏, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人西乖。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓狐榔,卻偏偏與公主長得像,于是被迫代替她去往敵國和親获雕。 傳聞我的和親對象是個殘疾皇子荒叼,可洞房花燭夜當晚...
    茶點故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內容