Background
Our existing cluster runs Flink 1.10.1, and we want to upgrade to the latest community release, Flink 1.11.1.
Pitfalls encountered
No hostname could be resolved for ip address
The detailed community mailing-list discussion is here:
http://apache-flink.147419.n8.nabble.com/Flink-1-11-submit-job-timed-out-td4982.html
When a job is submitted, the JM floods its log with "No hostname could be resolved for ip address xxxx". Each xxxx IP is an internal IP that Kubernetes assigned to a Flink TM, and because of this error the JM eventually times out.
kubectl run -i -t busybox --image=busybox --restart=Never
From inside this pod, a reverse DNS lookup of the Flink TM's IP fails:
/ # nslookup 10.47.96.2
Server: 10.96.0.10
Address: 10.96.0.10:53
** server can't find 2.96.47.10.in-addr.arpa: NXDOMAIN
Yet resolving the JM's IP succeeds:
/ # nslookup 10.34.128.8
Server: 10.96.0.10
Address: 10.96.0.10:53
8.128.34.10.in-addr.arpa name = 10-34-128-8.flink-jobmanager.flink-test.svc.cluster.local
The only difference is that the JM sits behind a Service.
The problem was fixed by adding the optional taskmanager-query-state-service.yaml provided in the community docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
That said, in our exchanges with the community so far they have not been able to reproduce this problem, and it is still under discussion.
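For context, the taskmanager-query-state-service.yaml referenced above is essentially a Service whose selector matches the TaskManager pods; once the TM pods sit behind a Service, the reverse lookup succeeds just as it does for the JM. A minimal sketch follows — the name, labels, and port are assumptions and must be aligned with your actual deployment:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
    - name: query-state
      port: 6125   # queryable-state proxy port used in the Flink docs
  # The selector must match the labels set on the TaskManager pods (assumed here).
  selector:
    app: flink
    component: taskmanager
```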
Watermark changes in the new version
In the new version, watermark generation is defined by this interface:
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the
     * event timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
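To see the two callbacks in action, here is a runnable sketch of a bounded-out-of-orderness generator built on this contract. `WatermarkOutput` below is a minimal stand-in so the snippet runs without Flink on the classpath; the real types live in `org.apache.flink.api.common.eventtime`:

```java
import java.util.ArrayList;
import java.util.List;

public class BoundedOutOfOrdernessSketch {

    // Stand-in for Flink's WatermarkOutput: just records the emitted watermark timestamps.
    public static class WatermarkOutput {
        public final List<Long> emitted = new ArrayList<>();
        public void emitWatermark(long watermarkTs) { emitted.add(watermarkTs); }
    }

    public static class Generator {
        private final long maxOutOfOrderness = 3_500; // 3.5 seconds
        private long currentMaxTimestamp = Long.MIN_VALUE + 3_500 + 1;

        // onEvent only tracks the highest timestamp seen so far; it emits nothing.
        public void onEvent(long eventTimestamp, WatermarkOutput output) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }

        // onPeriodicEmit publishes the highest seen timestamp minus the bound
        // (minus 1, so an event exactly on the bound still counts as non-late).
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(currentMaxTimestamp - maxOutOfOrderness - 1);
        }
    }

    public static void main(String[] args) {
        WatermarkOutput out = new WatermarkOutput();
        Generator gen = new Generator();
        gen.onEvent(10_000, out);
        gen.onEvent(7_000, out);   // a late event does not move the max back
        gen.onPeriodicEmit(out);
        System.out.println(out.emitted); // prints "[6499]"
    }
}
```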
In user code, the strategy is attached like this:
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
Compared with the old API, the extractTimestamp call that extracted the event-time timestamp is gone:
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
If you migrate exactly as shown above, every record's timestamp ends up as Long.MIN_VALUE. The correct usage is:
dataStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> event.f1));

or, with an anonymous class instead of a lambda:

dataStream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                    @Override
                    public long extractTimestamp(StationLog element, long recordTimestamp) {
                        return element.getCallTime(); // the field carrying the event time
                    }
                }));
If you have a custom WatermarkGenerator, it can be plugged in like this:

dataStream.assignTimestampsAndWatermarks(
        ((WatermarkStrategy<StationLog>) (ctx) -> new BoundOutOrdernessStrategy(60, 60))
                .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                    @Override
                    public long extractTimestamp(StationLog element, long recordTimestamp) {
                        return element.getCallTime(); // the field carrying the event time
                    }
                }));
Utility class

public class WatermarkStrategys {

    public static <T extends TimeEvent> WatermarkStrategy<T> forBoundOutOfOrderness(long futureOutMs, long maxOutOfOrderMs) {
        return ((WatermarkStrategy<T>) (ctx) -> new BoundOutOrdernessStrategy(futureOutMs, maxOutOfOrderMs))
                .withTimestampAssigner((SerializableTimestampAssigner<T>) (element, recordTimeStamp) -> element.getEventTimeMs());
    }
}

public interface TimeEvent {
    long getEventTimeMs();
}
Flink 1.11: job fails to run in IDEA
Community discussion:
http://apache-flink.147419.n8.nabble.com/flink1-11-idea-td4576.html
After upgrading the job's dependencies from 1.10.1 to 1.11.0, running it in IDEA fails with:
Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java)
Solution:
Try adding the following dependency:
groupId: org.apache.flink
artifactId: flink-clients_${scala.binary.version}
Root cause
In Flink 1.11 the client code was decoupled from the DataStream API modules, so flink-streaming-java no longer pulls in flink-clients transitively; without flink-clients on the classpath, no ExecutorFactory can be discovered and job submission fails with the error above.
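The failure mode is easy to reproduce with plain Java SPI, which is the mechanism Flink's DefaultExecutorServiceLoader relies on. The ExecutorFactory interface below is a stand-in, not Flink's real class; the point is that ServiceLoader silently finds nothing when no jar on the classpath registers a provider:

```java
import java.util.ServiceLoader;

public class SpiDemo {

    // Stand-in for Flink's executor factory SPI interface.
    public interface ExecutorFactory {}

    public static void main(String[] args) {
        // No jar on the classpath ships a META-INF/services entry for this
        // interface, so the loader discovers no implementations at all —
        // which is exactly what "No ExecutorFactory found" reports.
        ServiceLoader<ExecutorFactory> loader = ServiceLoader.load(ExecutorFactory.class);
        boolean found = loader.iterator().hasNext();
        System.out.println(found); // prints "false"
    }
}
```

Adding flink-clients puts a provider back on the classpath, so the lookup succeeds again.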