Problem 1
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Table sink 'default_catalog.default_database.fs_table' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[id, channel], select=[id, channel, COUNT(*) AS cnt])
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
Solution
1. Watch the dependencies: flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12 should not both appear in pom.xml at the same time.
<!-- used by the Flink DataStream (operator) API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- used by the Flink SQL API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
2. Flink loads table factories through the SPI mechanism, but the fat jar we build does not contain the META-INF/services directory. Create that directory yourself and make sure it is packaged into the jar.
Use the maven-shade-plugin to package the files under META-INF/services into the jar; the plugin configuration is below.
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.0</version>
    <configuration>
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
        <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>META-INF/spring.handlers</resource>
            </transformer>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.fxc.rpc.impl.member.MemberProvider</mainClass>
            </transformer>
            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>META-INF/spring.schemas</resource>
            </transformer>
        </transformers>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>
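If you would rather have the shade plugin merge the SPI files already shipped by the connector jars, instead of maintaining them by hand, maven-shade-plugin also provides a ServicesResourceTransformer that can be added to the <transformers> block above (shown here as an optional sketch):
<!-- optional: merges META-INF/services entries from all shaded dependencies -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>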
Contents of META-INF/services/org.apache.flink.table.factories.TableFactory:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Contents of META-INF/services/org.apache.flink.table.factories.Factory:
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
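For reference, a typical place to keep these hand-written SPI files in a Maven project so they end up in the jar (layout is a sketch, not mandated by Flink):
src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory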
Problem 2
Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
Unable to mount volumes for pod "my-second-cluster-taskmanager-1-2_flink(e6164522-e2f6-11eb-b173-eeeeeeeeeeee)": timeout expired waiting for volumes to attach or mount for pod "flink"/"my-second-cluster-taskmanager-1-2". list of unmounted volumes=[hadoop-config-volume]. list of unattached volumes=[hadoop-config-volume flink-config-volume default-token-qbpfh]
MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-my-second-cluster" not found
Solution
1. Since Flink 1.11 you can simply set
export HADOOP_CONF_DIR
on the Flink client machine (the machine that submits the job; it needs a kubectl client and a Flink distribution) and then start the job with flink run-application. The Flink client will automatically ship the Hadoop configuration to the JobManager and TaskManager pods through a ConfigMap and add it to their classpath.
In flink-1.13.0/bin/config.sh, add a line:
export HADOOP_CONF_DIR=/var/lib/jenkins/flink-k8s/file/hadoopconf
The directory contains:
$ ls hadoopconf/
core-site.xml hdfs-site.xml yarn-site.xml
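With HADOOP_CONF_DIR exported, a submission sketch looks roughly like the following (the container image and job jar are placeholders, not taken from this setup):
export HADOOP_CONF_DIR=/var/lib/jenkins/flink-k8s/file/hadoopconf
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-second-cluster \
    -Dkubernetes.container.image=<your-flink-image> \
    local:///opt/flink/usrlib/my-job.jar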
2. The Dockerfile used to build the image also needs to set it:
COPY hadoopconf/*.xml $FLINK_HOME/hadoopconf/
ENV HADOOP_CONF_DIR=$FLINK_HOME/hadoopconf/
3. The Hadoop jar dependency may be broken.
Download (or build yourself) the Flink Pre-bundled Hadoop 2.7.5 jar from https://flink.apache.org/downloads.html
and put the jar into $FLINK_HOME/lib.
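Putting steps 2 and 3 together, a minimal Dockerfile sketch (the base image tag and the exact pre-bundled Hadoop jar file name are assumptions):
FROM flink:1.13.0-scala_2.12
# Hadoop client configuration shipped inside the image (step 2)
COPY hadoopconf/*.xml $FLINK_HOME/hadoopconf/
ENV HADOOP_CONF_DIR=$FLINK_HOME/hadoopconf/
# pre-bundled Hadoop jar so Flink can handle the 'hdfs' scheme (step 3)
COPY flink-shaded-hadoop-2-uber-2.7.5-10.0.jar $FLINK_HOME/lib/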
Problem 3
Caused by: org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: No task slot
allocated for job ID 7b1c33d4ebbe8d47623c52d33b838f4a and allocation ID 56306de50eed492c961f884c131b2f9e.
Solution
Simply increase the TaskManager memory: -Dtaskmanager.memory.process.size=1024m
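As a sketch, the option can be passed when submitting the job (the other flags are the same as in the earlier submission example):
./bin/flink run-application \
    --target kubernetes-application \
    -Dtaskmanager.memory.process.size=1024m \
    -Dkubernetes.cluster-id=my-second-cluster \
    local:///opt/flink/usrlib/my-job.jar
The same key can also be set permanently in conf/flink-conf.yaml as taskmanager.memory.process.size: 1024m.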
Problem 4
java.net.UnknownHostException: xxx-hadoop
Solution
Edit the coredns ConfigMap among the k8s components and redeploy the coredns pods, adding the hostname mappings to the hosts block:
.:53 {
    errors
    health
    hosts {
        ip0 hostname0
        ip1 hostname1
        ...
    }
    ...
}
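A sketch of the kubectl steps, assuming CoreDNS runs in the kube-system namespace as a Deployment named coredns:
# edit the Corefile in the coredns ConfigMap and add the hosts entries
kubectl -n kube-system edit configmap coredns
# restart the coredns pods so they reload the configuration
kubectl -n kube-system rollout restart deployment coredns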
Problem 5
Usage problem with the flink command-line option -C.
Passing the Maven repository URL http://maven.aliyun.com/nexus/content/groups/public/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.0/flink-sql-connector-kafka_2.12-1.13.0.jar directly, the jar could not be pulled, and we verified this was not a network connectivity problem.
Solution
Serve the jar from a Linux httpd service instead: put it under /var/www/html/jar/ and it is then pulled automatically.
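-C (--classpath) adds a URL to the user-code classpath of every node in the cluster, so the URL must be reachable from all JobManager and TaskManager pods. A hedged sketch using the httpd server above (host name, main class, and job jar are placeholders):
./bin/flink run \
    -C http://<httpd-host>/jar/flink-sql-connector-kafka_2.12-1.13.0.jar \
    -c com.example.MyJob my-job.jar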