Streamx在CDH6.3上部署Flink SQL任務(wù)

背景

公司平臺各端產(chǎn)生各種埋點(diǎn)日志见剩,通過自研系統(tǒng)加工落到kafka中,需要將kafka的json格式消息消費(fèi)落到數(shù)倉中溯职,之前的方案是通過flink datastream落到hbase蒂萎,再轉(zhuǎn)存到hive中,此方法需要編寫java代碼样傍,實(shí)現(xiàn)成本編寫代碼2天,加部署調(diào)試1-2天铺遂,實(shí)現(xiàn)成本偏高衫哥。因此,決定采用streamx通過flink sql實(shí)現(xiàn)實(shí)時落庫hive數(shù)倉目的襟锐。

由于streamx暫不支持sql方式切換hive catalog撤逢,所以在此利用streamx jar包能力,外層包裹一層env環(huán)境粮坞,內(nèi)部執(zhí)行flink sql蚊荣。此文章便完成此目標(biāo),達(dá)到日志落hive倉庫并調(diào)優(yōu)記錄莫杈。

Streamx漂亮的任務(wù)界面


image.png

基于streamx編寫flink sql代碼如下

-- -- 開啟 mini-batch 指定是否啟用小批量優(yōu)化 
SET table.exec.mini-batch.enabled=true;
-- -- mini-batch的時間間隔互例,即作業(yè)需要額外忍受的延遲
SET table.exec.mini-batch.allow-latency=60s;
-- -- 一個 mini-batch 中允許最多緩存的數(shù)據(jù)
SET table.exec.mini-batch.size=50;
SET table.exec.sink.not-null-enforcer=drop;
SET table.exec.state.ttl=3666000;
-- -- 設(shè)置時區(qū)
set table.local-time-zone=Asia/Shanghai;

set table.dynamic-table-options.enabled=TRUE;
CREATE CATALOG bigdataTeHive WITH (
    'type' = 'hive',
    'default-database' = 'ods',
    'hive-conf-dir' = '/etc/hive/conf'
);


SET table.sql-dialect=default;
create table kafka_event_tracking (
 `timestamp` BIGINT
, data ROW(
           `project` string
            , data_decode ROW(
                       distinct_id STRING
            )
            , ip STRING
             , `timestamp` bigint
            , created_at bigint
            , updated_at BIGINT
            )
, `project` as cast(data.`project` as string)
, distinct_id as cast(data.data_decode.distinct_id as string)
, created_at as cast(data.created_at as bigint)
, updated_at as cast(data.updated_at as bigint)
, ts AS to_timestamp(FROM_UNIXTIME(`timestamp`/1000))
, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
with (
    'connector' = 'kafka',
    'topic' = 'events-tracking',
    'properties.bootstrap.servers' = '172.18.5.15:9092,172.18.5.16:9092,172.18.5.17:9092',
    'properties.group.id' = 'events-tracking_pro',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.zookeeper.connect' = '172.18.5.10:2181/kafka,172.18.5.15:2181/kafka,172.18.5.16:2181/kafka'
);
  
 

 INSERT INTO bigdataTeHive.ods.s_et_event_tracking_kf/*+ OPTIONS(
  'format'='orc',
  'auto-compaction' = 'true',
  'compaction.file-size' = '256MB',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'partition.time-extractor.kind'='custom',
  'partition.time-extractor.class'='com.phillip.bigdata.skyrocks.sink.DSHourTimeExtractor',
  'partition.time-extractor.timestamp-pattern'='$ds $hr:00:00') */
SELECT `timestamp`
     , `project`
     , distinct_id
     , `timestamp `
     , created_at
     , updated_at
     , from_unixtime(`timestamp` / 1000, 'yyyyMMdd') ds
     , from_unixtime(`timestamp` / 1000, 'HH')       hr
FROM kafka_event_tracking
where `timestamp` >= 1640188800000
;
基于任務(wù),編寫以上flink sql代碼筝闹,半天搞定媳叨,其中利用kafka-sql-connector編寫event_tracking原始消息表,再編寫落庫到hive ods表的dml語句关顷,在hive ods表中糊秆,添加sql hint語法,注釋flink sql streaming到hive的參數(shù)配置议双,自定義customer提取分區(qū)方式痘番,編寫jar包方式實(shí)現(xiàn)com.phillip.bigdata.skyrocks.sink.DSHourTimeExtractor。

flink sql main內(nèi)容

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.InputStream;
import java.util.Map;
import java.util.Objects;

/**
 * @author phillip2019
 */
public class AppMain {
    private static final Logger logger = LoggerFactory.getLogger(AppMain.class);

    public static volatile Boolean DEBUG = Boolean.FALSE;
    public static final String SQL = "sql";
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        ParameterTool parameters = ParameterTool.fromArgs(args);
        if (!parameters.has(SQL)) {
            System.err.print("--sql xxx.yml parameter is missing, this parameter is required");
            System.exit(-1);
        }

        if (!parameters.has("debug")) {
            System.err.print("--debug parameter is missing, this parameter is required");
            System.exit(-1);
        }
        DEBUG = parameters.getBoolean("debug");
        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        blinkStreamEnv.setParallelism(1);
        String catalogName = "bigdataTeHive";
        String defaultDatabase = "ods";
        String hiveConfDir = "/etc/hive/conf";
        if (!DEBUG) {
            blinkStreamEnv.setParallelism(parameters.getInt("p", 4));
            // 半小時觸發(fā)一次ck, 模式為精確一次 (這是默認(rèn)值)
            blinkStreamEnv.enableCheckpointing(parameters.getInt("checkpointInterval", 3600000), CheckpointingMode.EXACTLY_ONCE);
            CheckpointConfig checkpointConfig = blinkStreamEnv.getCheckpointConfig();
            checkpointConfig.setCheckpointTimeout(3600000);
            checkpointConfig.setForceUnalignedCheckpoints(true);
            StateBackend stateBackend = new FsStateBackend(parameters.get("checkpointDir", "hdfs:///user/flink/checkpoints"), true);
            checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // 同一時間只允許一個checkpoint進(jìn)行
            checkpointConfig.setMaxConcurrentCheckpoints(1);
            // 允許兩個連續(xù)的checkpoint錯誤
            checkpointConfig.setTolerableCheckpointFailureNumber(2);
            blinkStreamEnv.setStateBackend(stateBackend);
        }

        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);

        StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(blinkStreamEnv, blinkStreamSettings);
        Configuration blinkStreamTableConfiguration = blinkStreamTableEnv.getConfig().getConfiguration();
        // 開啟 mini-batch 指定是否啟用小批量優(yōu)化
        blinkStreamTableConfiguration.setString("table.exec.mini-batch.enabled", "true");
        // mini-batch的時間間隔平痰,即作業(yè)需要額外忍受的延遲
        blinkStreamTableConfiguration.setString("table.exec.mini-batch.allow-latency", "60s");
        //  一個 mini-batch 中允許最多緩存的數(shù)據(jù)
        blinkStreamTableConfiguration.setString("table.exec.mini-batch.size", "50");
        blinkStreamTableConfiguration.setString("table.exec.sink.not-null-enforcer", "drop");
        blinkStreamTableConfiguration.setString("table.exec.state.ttl", "3666000");
        // 設(shè)置時區(qū)
        blinkStreamTableConfiguration.setString("table.local-time-zone", "Asia/Shanghai");
        blinkStreamTableEnv.registerCatalog(catalogName, hive);
        blinkStreamTableEnv.useCatalog(catalogName);
        blinkStreamTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        InputStream sqlInputStream = AppMain.class.getResourceAsStream(parameters.getRequired(SQL));
        if (Objects.isNull(sqlInputStream)) {
            System.err.println(String.format("The file is empty, please check if the file exists, the file path is: %s", parameters.getRequired(SQL)));
            System.exit(-1);
        }
        Yaml sqlYaml = new Yaml();
        Map<String, String> sqlContentMap = sqlYaml.load(sqlInputStream);
        logger.info("Sql content: {}", sqlContentMap);

        blinkStreamTableEnv.executeSql(sqlContentMap.get("kafkaSql"));
        blinkStreamTableEnv.createStatementSet()
                .addInsertSql(sqlContentMap.get("insertSql")).execute();
    }
}

上述代碼就是簡單創(chuàng)建stream執(zhí)行環(huán)境汞舱,其中由于要接入hive catalog,因此注冊環(huán)境為hive catalog name宗雇,執(zhí)行相關(guān)set參數(shù)設(shè)置兵拢,解析sql 模板中上訴sql文件,并執(zhí)行逾礁,就將核心代碼編寫完成了说铃。

接下來考慮整個項(xiàng)目依賴访惜,由于需要將flink jar包通過streamx部署到CDH 6.3,flink 1.12.1中腻扇,因此項(xiàng)目依賴特別復(fù)雜债热,再次踩坑無數(shù),最終形成如下pom.xml依賴腳本幼苛。

項(xiàng)目依賴pom.xml

<!--
 Copyright (c) 2019 The StreamX Project
 <p>
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements. See the NOTICE file
 distributed with this work for additional information
 regarding copyright ownership. The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License. You may obtain a copy of the License at
 <p>
 http://www.apache.org/licenses/LICENSE-2.0
 <p>
 Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.phillip.bigdata</groupId>
    <artifactId>jinwei</artifactId>
    <version>1.0.0</version>

    <packaging>jar</packaging>
    <name>jinwei</name>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink112.version>1.12.0</flink112.version>
        <flink113.version>1.13.0</flink113.version>
        <flink114.version>1.14.0</flink114.version>
        <streamx.version>1.2.1</streamx.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <log4j.version>2.17.0</log4j.version>

        <streamx.version>1.0.0</streamx.version>
        <hive.version>2.1.0</hive.version>
        <hadoop.version>2.7.5</hadoop.version>
        <slf4j.version>1.7.15</slf4j.version>

        <main.class>com.phillip.bigdata.jingwei.AppMain</main.class>
        <jar-scope>compile</jar-scope>
    </properties>

    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <layout>default</layout>
        </repository>
        <repository>
            <id>jboss</id>
            <name>JBoss Repository</name>
            <url>http://repository.jboss.com/maven2/</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
            </releases>
            <snapshots>
                <enabled>false</enabled>
                <checksumPolicy>warn</checksumPolicy>
            </snapshots>
        </repository>
        <!-- 指定倉庫位置窒篱,依次為local、aliyun舶沿、apache和cloudera倉庫 -->
        <repository>
            <id>local-public</id>
            <url>http://172.18.8.15:8080/repository/public/</url>
        </repository>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <!-- 發(fā)布maven私服 -->
    <distributionManagement>
        <repository>
            <id>public</id>
            <name>public</name>
            <url>http://172.18.8.15:8080/repository/public/</url>
        </repository>
        <snapshotRepository>
            <id>snapshots</id>
            <name>local-snapshots</name>
            <url>http://172.18.8.15:8080/repository/snapshots/</url>
        </snapshotRepository>
    </distributionManagement>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink112.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-uber_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink112.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-2.2.0_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink112.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink112.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <!-- Hive Dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>${jar-scope}</scope>
            <exclusions>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>org.apache.logging.log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>org.apache.logging.log4j</artifactId>
                    <groupId>log4j-1.2-api</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>org.apache.logging.log4j</artifactId>
                    <groupId>log4j-slf4j-impl</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-1.2-api</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-api</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-core</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-slf4j-impl</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-web</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>asm</artifactId>
                    <groupId>asm</groupId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>${hadoop.version}-10.0</version>
            <scope>${jar-scope}</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.philil.bigdata</groupId>
            <artifactId>sky_rocks</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>

        <!--定制kafka版本-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.6.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.yaml/snakeyaml -->
        <dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
            <version>1.26</version>
        </dependency>

        <!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath Hadoop is logging to log4j! -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
            <scope>${jar-scope}</scope>
        </dependency>

        <!-- Add the two required logback dependencies -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
            <scope>${jar-scope}</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>${jar-scope}</scope>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <id>pro</id>
            <properties>
                <jar-scope>provided</jar-scope>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
    </profiles>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <relocations combine.children="append">
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>com.google.my-common</shadedPattern>
                                </relocation>
                                <!--替換老版本kafka-->
<!--                                <relocation>-->
<!--                                    <pattern>org.apache.kafka</pattern>-->
<!--                                    <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>-->
<!--                                </relocation>-->
                                <relocation>
                                    <pattern>com.fasterxml.jackson</pattern>
                                    <shadedPattern>org.apache.flink.jackson.shaded.com.fasterxml.jackson</shadedPattern>
                                </relocation>
                            </relocations>
                            <filters>
                                <filter>
                                    <!-- DO not copy the signatures in the META-INF folder.
                                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <artifact>org.apache.kafka:*</artifact>
                                    <excludes>
                                        <exclude>kafka/kafka-version.properties</exclude>
                                        <exclude>LICENSE</exclude>
                                        <!-- Does not contain anything relevant.
                                            Cites a binary dependency on jersey, but this is neither reflected in the
                                            dependency graph, nor are any jersey files bundled. -->
                                        <exclude>NOTICE</exclude>
                                        <exclude>common/**</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                    <!--                                        <mainClass>${main.class}</mainClass>-->
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${main.class}</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

上訴pom.xml中墙杯,注意相應(yīng)依賴的執(zhí)行范圍。線上相應(yīng)依賴都存在括荡,需要將scope更改成provided高镐,避免再次上傳同名不同版本包,導(dǎo)致版本沖突畸冲。此外嫉髓,重點(diǎn)關(guān)注shaded模塊,添加shaded轉(zhuǎn)義google common包邑闲,jackson加工成flink shaded jackson包名算行,更新jackson版本,避免沖突導(dǎo)致flink消息消費(fèi)失敗苫耸。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末州邢,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子褪子,更是在濱河造成了極大的恐慌量淌,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件褐筛,死亡現(xiàn)場離奇詭異,居然都是意外死亡叙身,警方通過查閱死者的電腦和手機(jī)渔扎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來信轿,“玉大人晃痴,你說我怎么就攤上這事〔坪觯” “怎么了倘核?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長即彪。 經(jīng)常有香客問我紧唱,道長活尊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任漏益,我火速辦了婚禮蛹锰,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘绰疤。我一直安慰自己铜犬,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布轻庆。 她就那樣靜靜地躺著癣猾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪余爆。 梳的紋絲不亂的頭發(fā)上纷宇,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天,我揣著相機(jī)與錄音龙屉,去河邊找鬼呐粘。 笑死,一個胖子當(dāng)著我的面吹牛转捕,可吹牛的內(nèi)容都是我干的作岖。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼五芝,長吁一口氣:“原來是場噩夢啊……” “哼痘儡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起枢步,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤沉删,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后醉途,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體矾瑰,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年隘擎,在試婚紗的時候發(fā)現(xiàn)自己被綠了殴穴。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡货葬,死狀恐怖采幌,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情休傍,我是刑警寧澤,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布磨取,位于F島的核電站人柿,受9級特大地震影響寝衫,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜慰毅,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望汹胃。 院中可真熱鬧,春花似錦着饥、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至轨奄,卻和暖如春孟害,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背挪拟。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工挨务, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人玉组。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓谎柄,卻偏偏與公主長得像,于是被迫代替她去往敵國和親惯雳。 傳聞我的和親對象是個殘疾皇子朝巫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容