Background
Every client of the company's platform produces various tracking logs, which are processed by an in-house system and written to Kafka; the JSON messages in Kafka then need to be consumed into the data warehouse. The previous approach used a Flink DataStream job to land the data in HBase and then move it into Hive. That required hand-written Java code: roughly two days of coding plus another one to two days of deployment and debugging, which is too expensive. We therefore decided to use StreamX with Flink SQL to land the data into the Hive warehouse in real time.
Since StreamX does not yet support switching the Hive catalog in pure SQL mode, we fall back on StreamX's JAR-job capability: an outer Java layer sets up the execution environment, and the Flink SQL is executed inside it. This article records how we landed the logs into the Hive warehouse this way, together with the tuning done along the way.
StreamX's clean job management UI (screenshot)
The Flink SQL written on StreamX is as follows:
-- Enable mini-batch optimization
SET table.exec.mini-batch.enabled=true;
-- Mini-batch interval, i.e. the extra latency the job is willing to tolerate
SET table.exec.mini-batch.allow-latency=60s;
-- Maximum number of records buffered in one mini-batch
SET table.exec.mini-batch.size=50;
SET table.exec.sink.not-null-enforcer=drop;
SET table.exec.state.ttl=3666000;
-- Set the local time zone
SET table.local-time-zone=Asia/Shanghai;
SET table.dynamic-table-options.enabled=true;
CREATE CATALOG bigdataTeHive WITH (
'type' = 'hive',
'default-database' = 'ods',
'hive-conf-dir' = '/etc/hive/conf'
);
SET table.sql-dialect=default;
create table kafka_event_tracking (
`timestamp` BIGINT
, data ROW(
`project` string
, data_decode ROW(
distinct_id STRING
)
, ip STRING
, `timestamp` bigint
, created_at bigint
, updated_at BIGINT
)
, `project` as cast(data.`project` as string)
, distinct_id as cast(data.data_decode.distinct_id as string)
, created_at as cast(data.created_at as bigint)
, updated_at as cast(data.updated_at as bigint)
, ts AS to_timestamp(FROM_UNIXTIME(`timestamp`/1000))
, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
with (
'connector' = 'kafka',
'topic' = 'events-tracking',
'properties.bootstrap.servers' = '172.18.5.15:9092,172.18.5.16:9092,172.18.5.17:9092',
'properties.group.id' = 'events-tracking_pro',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.zookeeper.connect' = '172.18.5.10:2181/kafka,172.18.5.15:2181/kafka,172.18.5.16:2181/kafka'
);
INSERT INTO bigdataTeHive.ods.s_et_event_tracking_kf/*+ OPTIONS(
'format'='orc',
'auto-compaction' = 'true',
'compaction.file-size' = '256MB',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file',
'partition.time-extractor.kind'='custom',
'partition.time-extractor.class'='com.phillip.bigdata.skyrocks.sink.DSHourTimeExtractor',
'partition.time-extractor.timestamp-pattern'='$ds $hr:00:00') */
SELECT `timestamp`
, `project`
, distinct_id
, `timestamp`
, created_at
, updated_at
, from_unixtime(`timestamp` / 1000, 'yyyyMMdd') ds
, from_unixtime(`timestamp` / 1000, 'HH') hr
FROM kafka_event_tracking
where `timestamp` >= 1640188800000
;
With the job set up, writing the Flink SQL above took about half a day. The Kafka SQL connector defines the raw events-tracking source table, and a DML statement then writes it into the Hive ODS table. On the Hive side, SQL hints (enabled by table.dynamic-table-options.enabled) carry the streaming-to-Hive parameters, and partition time is extracted by a custom extractor ('partition.time-extractor.kind'='custom') implemented in a separate JAR as com.phillip.bigdata.skyrocks.sink.DSHourTimeExtractor; a sketch of such an extractor follows.
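The ds (yyyyMMdd) and hr (HH) partition columns do not follow the yyyy-MM-dd layout that Flink's default partition-time extractor expects, which is presumably why a custom class is plugged in here. The original DSHourTimeExtractor is not reproduced in this article; the code below is only a minimal sketch of what such an extractor could look like, assuming the Flink 1.12 org.apache.flink.table.filesystem.PartitionTimeExtractor interface and the ds/hr partition layout from the hint above.
package com.phillip.bigdata.skyrocks.sink;

import org.apache.flink.table.filesystem.PartitionTimeExtractor;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
 * Illustrative sketch only: converts the partition values ds=yyyyMMdd, hr=HH
 * into a LocalDateTime so that 'sink.partition-commit.trigger'='partition-time'
 * can compare the partition time against the watermark.
 */
public class DSHourTimeExtractor implements PartitionTimeExtractor {

    private static final DateTimeFormatter DS_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // Partition layout is ds=yyyyMMdd/hr=HH, matching the
        // 'partition.time-extractor.timestamp-pattern'='$ds $hr:00:00' hint above.
        String ds = partitionValues.get(partitionKeys.indexOf("ds"));
        String hr = partitionValues.get(partitionKeys.indexOf("hr"));
        return LocalDateTime.of(LocalDate.parse(ds, DS_FORMAT), LocalTime.of(Integer.parseInt(hr), 0));
    }
}
With 'sink.partition-commit.trigger'='partition-time', a partition such as ds=20211223/hr=10 is committed to the metastore (plus a _SUCCESS file) once the watermark passes the extracted partition time plus the configured 'sink.partition-commit.delay' of one hour.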
The Flink SQL main class
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
/**
* @author phillip2019
*/
public class AppMain {
private static final Logger logger = LoggerFactory.getLogger(AppMain.class);
public static volatile Boolean DEBUG = Boolean.FALSE;
public static final String SQL = "sql";
public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME", "hdfs");
ParameterTool parameters = ParameterTool.fromArgs(args);
if (!parameters.has(SQL)) {
System.err.print("--sql xxx.yml parameter is missing, this parameter is required");
System.exit(-1);
}
if (!parameters.has("debug")) {
System.err.print("--debug parameter is missing, this parameter is required");
System.exit(-1);
}
DEBUG = parameters.getBoolean("debug");
StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
blinkStreamEnv.setParallelism(1);
String catalogName = "bigdataTeHive";
String defaultDatabase = "ods";
String hiveConfDir = "/etc/hive/conf";
if (!DEBUG) {
blinkStreamEnv.setParallelism(parameters.getInt("p", 4));
// Checkpoint once per hour by default (3600000 ms); EXACTLY_ONCE is the default mode
blinkStreamEnv.enableCheckpointing(parameters.getInt("checkpointInterval", 3600000), CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = blinkStreamEnv.getCheckpointConfig();
checkpointConfig.setCheckpointTimeout(3600000);
checkpointConfig.setForceUnalignedCheckpoints(true);
StateBackend stateBackend = new FsStateBackend(parameters.get("checkpointDir", "hdfs:///user/flink/checkpoints"), true);
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Allow only one checkpoint in flight at a time
checkpointConfig.setMaxConcurrentCheckpoints(1);
// Tolerate two consecutive checkpoint failures
checkpointConfig.setTolerableCheckpointFailureNumber(2);
blinkStreamEnv.setStateBackend(stateBackend);
}
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(blinkStreamEnv, blinkStreamSettings);
Configuration blinkStreamTableConfiguration = blinkStreamTableEnv.getConfig().getConfiguration();
// Enable mini-batch optimization
blinkStreamTableConfiguration.setString("table.exec.mini-batch.enabled", "true");
// Mini-batch interval, i.e. the extra latency the job is willing to tolerate
blinkStreamTableConfiguration.setString("table.exec.mini-batch.allow-latency", "60s");
// Maximum number of records buffered in one mini-batch
blinkStreamTableConfiguration.setString("table.exec.mini-batch.size", "50");
blinkStreamTableConfiguration.setString("table.exec.sink.not-null-enforcer", "drop");
blinkStreamTableConfiguration.setString("table.exec.state.ttl", "3666000");
// Set the local time zone
blinkStreamTableConfiguration.setString("table.local-time-zone", "Asia/Shanghai");
blinkStreamTableEnv.registerCatalog(catalogName, hive);
blinkStreamTableEnv.useCatalog(catalogName);
blinkStreamTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
InputStream sqlInputStream = AppMain.class.getResourceAsStream(parameters.getRequired(SQL));
if (Objects.isNull(sqlInputStream)) {
System.err.println(String.format("The sql file could not be read, please check that it exists on the classpath, the file path is: %s", parameters.getRequired(SQL)));
System.exit(-1);
}
Yaml sqlYaml = new Yaml();
Map<String, String> sqlContentMap = sqlYaml.load(sqlInputStream);
logger.info("Sql content: {}", sqlContentMap);
blinkStreamTableEnv.executeSql(sqlContentMap.get("kafkaSql"));
blinkStreamTableEnv.createStatementSet()
.addInsertSql(sqlContentMap.get("insertSql")).execute();
}
}
The code above simply creates the streaming execution environment. Because the job needs the Hive catalog, it registers a HiveCatalog under the configured catalog name and switches to it, applies the SET-style parameters through the table configuration, parses the SQL statements out of the YAML template passed via --sql, and executes them; that completes the core code.
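For completeness, the --sql argument points to a YAML classpath resource whose keys match the two lookups in the code (kafkaSql and insertSql); each key holds exactly one statement, since executeSql and addInsertSql each accept a single statement. The file name below is hypothetical, and the statement bodies are simply the DDL and DML from the SQL section, abbreviated here:
# event_tracking.yml (hypothetical name), bundled as a classpath resource and passed via --sql
kafkaSql: |
  CREATE TABLE kafka_event_tracking (
    -- columns, computed columns and watermark as in the SQL section above
    ...
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'events-tracking',
    'format' = 'json',
    ...
  )
insertSql: |
  INSERT INTO bigdataTeHive.ods.s_et_event_tracking_kf /*+ OPTIONS(...) */
  SELECT ...
  FROM kafka_event_tracking
  WHERE `timestamp` >= 1640188800000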
Next comes the project's dependency setup. Because the JAR is deployed through StreamX onto CDH 6.3 with Flink 1.12.1, the dependency tree is quite involved and we hit plenty of pitfalls along the way; the pom.xml below is what finally worked.
Project dependencies: pom.xml
<!--
Copyright (c) 2019 The StreamX Project
<p>
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
<p>
http://www.apache.org/licenses/LICENSE-2.0
<p>
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.phillip.bigdata</groupId>
<artifactId>jinwei</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>jinwei</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink112.version>1.12.0</flink112.version>
<flink113.version>1.13.0</flink113.version>
<flink114.version>1.14.0</flink114.version>
<streamx.version>1.2.1</streamx.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<log4j.version>2.17.0</log4j.version>
<hive.version>2.1.0</hive.version>
<hadoop.version>2.7.5</hadoop.version>
<slf4j.version>1.7.15</slf4j.version>
<main.class>com.phillip.bigdata.jingwei.AppMain</main.class>
<jar-scope>compile</jar-scope>
</properties>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
<layout>default</layout>
</repository>
<repository>
<id>jboss</id>
<name>JBoss Repository</name>
<url>http://repository.jboss.com/maven2/</url>
<releases>
<enabled>true</enabled>
<updatePolicy>daily</updatePolicy>
</releases>
<snapshots>
<enabled>false</enabled>
<checksumPolicy>warn</checksumPolicy>
</snapshots>
</repository>
<!-- Repository locations, in order: local, aliyun, apache, and cloudera -->
<repository>
<id>local-public</id>
<url>http://172.18.8.15:8080/repository/public/</url>
</repository>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<!-- Deploy to the internal Maven repositories -->
<distributionManagement>
<repository>
<id>public</id>
<name>public</name>
<url>http://172.18.8.15:8080/repository/public/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>local-snapshots</name>
<url>http://172.18.8.15:8080/repository/snapshots/</url>
</snapshotRepository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink112.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-uber_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink112.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-2.2.0_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink112.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink112.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>${jar-scope}</scope>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-1.2-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-core</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-web</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>asm</artifactId>
<groupId>asm</groupId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>${hadoop.version}-10.0</version>
<scope>${jar-scope}</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.philil.bigdata</groupId>
<artifactId>sky_rocks</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
<!-- Custom Kafka client version -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.yaml/snakeyaml -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.26</version>
</dependency>
<!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath Hadoop is logging to log4j! -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${slf4j.version}</version>
<scope>${jar-scope}</scope>
</dependency>
<!-- Add the two required logback dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
<scope>${jar-scope}</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>${jar-scope}</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>pro</id>
<properties>
<jar-scope>provided</jar-scope>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<relocations combine.children="append">
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>com.google.my-common</shadedPattern>
</relocation>
<!-- Relocation for the older Kafka version (currently disabled) -->
<!-- <relocation>-->
<!-- <pattern>org.apache.kafka</pattern>-->
<!-- <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>-->
<!-- </relocation>-->
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.flink.jackson.shaded.com.fasterxml.jackson</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
<!-- DO not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<!--zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<artifact>org.apache.kafka:*</artifact>
<excludes>
<exclude>kafka/kafka-version.properties</exclude>
<exclude>LICENSE</exclude>
<!-- Does not contain anything relevant.
Cites a binary dependency on jersey, but this is neither reflected in the
dependency graph, nor are any jersey files bundled. -->
<exclude>NOTICE</exclude>
<exclude>common/**</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
<!-- <mainClass>${main.class}</mainClass>-->
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${main.class}</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
In the pom.xml above, pay attention to the scope of each dependency: anything already present on the cluster should be marked provided, so that a second copy of the same artifact with a different version is not uploaded and does not cause conflicts (the pro profile flips jar-scope to provided for exactly this purpose). Also note the shade configuration: it relocates the Google common (Guava) classes, relocates Jackson under the Flink-shaded Jackson package name, and updates the Jackson version, avoiding conflicts that would otherwise break the Flink job's message consumption.