Flink 使用介紹相關文檔目錄
Job提交出現(xiàn)異常:No ExecutorFactory found to execute the application
詳細的報錯如下所示:
Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1931)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1836)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
at com.paultech.CEPProblem.main(CEPProblem.java:73)
原因分析:
經過排查舵鳞,發(fā)現(xiàn)是從Flink 1.11開始,flink-streaming-java中的flink-clients依賴被移除啦粹。運行的時候需要添加這個依賴恰响,如下所示:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
其中scala.binary.version和flink.version屬性修改為實際項目中使用的版本
StackOverflow中相關問題鏈接:https://stackoverflow.com/questions/63032060/upgraded-flink-from-1-10-to-1-11-met-error-no-executorfactory-found-to-execute
Flink CEP 作業(yè)執(zhí)行結果異常
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<TemperatureEvent> input = env.fromElements(new TemperatureEvent(1, "Device01", 22.0),
new TemperatureEvent(1, "Device01", 27.1), new TemperatureEvent(2, "Device01", 28.1),
new TemperatureEvent(1, "Device01", 22.2), new TemperatureEvent(3, "Device01", 22.1),
new TemperatureEvent(1, "Device02", 22.3), new TemperatureEvent(4, "Device02", 22.1),
new TemperatureEvent(1, "Device02", 22.4), new TemperatureEvent(5, "Device02", 22.7),
new TemperatureEvent(1, "Device02", 27.0), new TemperatureEvent(6, "Device02", 30.0));
Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
.subtype(TemperatureEvent.class)
.where(new SimpleCondition<TemperatureEvent>() {
@Override
public boolean filter(TemperatureEvent subEvent) {
if (subEvent.getTemperature() >= 26.0) {
return true;
}
return false;
}
}).where(new SimpleCondition<TemperatureEvent>() {
@Override
public boolean filter(TemperatureEvent subEvent) {
if (subEvent.getMachineName().equals("Device02")) {
return true;
}
return false;
}
}).within(Time.seconds(10));
DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
.select(
new RichPatternSelectFunction<TemperatureEvent, Alert>() {
/**
*/
private static final long serialVersionUID = 1L;
@Override
public void open(Configuration parameters) throws Exception {
System.out.println(getRuntimeContext().getUserCodeClassLoader());
}
@Override
public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
}
});
patternStream.print();
env.execute("CEP on Temperature Sensor");
TemperatureEvent 和 Alert類為Java bean,不再貼出它們的代碼
這段代碼在1.12版本之前執(zhí)行會輸出:
Alert{message='Temperature Rise Detected: [TemperatureEvent{id=1, machineName='Device02', temperature=27.0}] on machine name: [TemperatureEvent{id=1, machineName='Device02', temperature=27.0}]'}
Alert{message='Temperature Rise Detected: [TemperatureEvent{id=6, machineName='Device02', temperature=30.0}] on machine name: [TemperatureEvent{id=6, machineName='Device02', temperature=30.0}]'}
但是在Flink 1.12版本中運行关筒,patternStream.print()
這一行沒有任何輸出描孟。debug了CepOperator
一直到StreamOneInputProcessor
驶睦,發(fā)現(xiàn)均沒有元素輸入,甚是奇怪匿醒。
第一反應可能是Time characteristic設置的問題场航。于是在創(chuàng)建env的后面添加了一行:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
然后發(fā)現(xiàn)這個調用在Flink 1.12版本中被廢棄了。原因是Flink 1.12中的流處理默認的時間特征改為TimeCharacteristic.EventTime
廉羔,也就是說從之前默認的processing time改為了event time溉痢。這就是問題所在。對于默認的event time,F(xiàn)link需要等待到10秒內的元素都到齊(下一個元素的event time為10秒后)孩饼,才會打印出結果髓削。然而,10秒鐘之后的元素還沒有到來時Flink 程序就退出了镀娶,因此不會有任何輸出立膛。
要解決這個問題,我們需要知道1.12版本中CEP設置時間特征為ProcessingTime的方式梯码。嘗試使用前邊提到的配置方法無效宝泵。
如何在CEP處理流程中使用ProcessingTime呢?我們翻閱PatternStream
的源代碼轩娶,發(fā)現(xiàn)有如下兩個方法:
/**
* Sets the time characteristic to processing time.
*/
public PatternStream<T> inProcessingTime() {
return new PatternStream<>(builder.inProcessingTime());
}
/**
* Sets the time characteristic to event time.
*/
public PatternStream<T> inEventTime() {
return new PatternStream<>(builder.inEventTime());
}
顯然鲁猩,這兩個方法分別是用來設置ProcessingTime和EventTime的。將本篇開始的代碼修改罢坝,如下所示:
DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
.inProcessingTime() // 注意廓握,這里是關鍵,顯式指定使用processing time
.select(
new RichPatternSelectFunction<TemperatureEvent, Alert>() {
/**
*/
private static final long serialVersionUID = 1L;
@Override
public void open(Configuration parameters) throws Exception {
System.out.println(getRuntimeContext().getUserCodeClassLoader());
}
@Override
public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
}
});
再次運行嘁酿,發(fā)現(xiàn)打印出了預期結果隙券。