Flink Series - Real-Time Data Warehouse: Writing Aggregated Statistics to Redis in Practice (Part 7)

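Sometimes the idea behind the classic wordcount example turns out to be really handy, although the example copied straight from the official docs is far from enough for day-to-day requirements. Suppose we have the following requirement: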

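1. On a per-day basis, count the number of items sold by each department in each hour, and write the results to Redis in real time, using the date as the composite key.
Note: this requirement is a bit of a trap. A plain tumbling or sliding window will not satisfy it, because the requester wants the result to be recomputed at least once per second.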

Sample source data:

{"id":"399","name":"fei niu - 399","sal":283366,"dept":"人事部","ts":1615194501416}
{"id":"398","name":"tang tang - 398","sal":209935,"dept":"燒錢部","ts":1615194501416}
{"id":"395","name":"tang tang - 395","sal":51628,"dept":"帥哥部","ts":1615194501404}
{"id":"400","name":"fei fei - 400","sal":45782,"dept":"燒錢部","ts":1615194501420}
{"id":"401","name":"fei fei - 401","sal":389162,"dept":"帥哥部","ts":1615194501424}
{"id":"402","name":"tang tang - 402","sal":127889,"dept":"人事部","ts":1615194501428}

[Figure: computed results]
Project structure
[Figure: project structure]
Code implementation
1. Create the App main class. The main logic is as follows:
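import java.util.Date;
import java.util.Properties;

// Imports for Steps 1-3 below are included as well
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import scala.Tuple3;

import com.nfdw.entity.Employees;
import com.nfdw.function.EmpByKeyCountAgg;
import com.nfdw.function.EmpByKeyWindow;
import com.nfdw.utils.MyAddRedisSink;
import com.nfdw.utils.MyDateUtils;
import com.nfdw.utils.MyJsonUtils;
import com.nfdw.utils.RedisUtil2;

public class App {

    // Singleton Redis helper; a static field is initialized once per JVM (i.e. per TaskManager)
    private static RedisUtil2 redisUtil2 = RedisUtil2.getInstance();

    public static void main(String[] args) throws Exception {

        // GetStreamExecutionEnvironment is a project helper (not shown in this article)
        StreamExecutionEnvironment env = GetStreamExecutionEnvironment.getEnv();

        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "cdh101:9092");
        prop.setProperty("group.id", "cloudera_mirrormaker");
        // Deserialization is done by SimpleStringSchema, so no (de)serializer property is needed
        FlinkKafkaConsumer011<String> myConsumer =
                new FlinkKafkaConsumer011<>("luchangyin", new SimpleStringSchema(), prop);

        //myConsumer.setStartFromGroupOffsets(); // default: resume from the last committed group offset
        //myConsumer.setStartFromEarliest();     // start from the earliest record in the topic
        myConsumer.setStartFromLatest();         // start from the latest record

        // Read the Kafka stream
        DataStreamSource<String> dataStream = env.addSource(myConsumer);

        //dataStream.print();   // {"id":"226","name":"tang tang - 226","sal":280751,"dept":"美女部","ts":1615191802523}

        // ------------ Step 1: parse the JSON and aggregate



        // ------------ Step 2: build the custom Redis conditionKey and write the results to Redis



        // ------------ Step 3: end the pipeline with a no-op sink


        env.execute("wo xi huan ni");
    }

}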

Step 1: parse the JSON and aggregate
// Tuple3 is scala.Tuple3 here, matching EmpByKeyWindow
DataStream<Tuple3<String,String, String>> result = dataStream.map(new MapFunction<String, Employees>() {

            @Override
            public Employees map(String s) throws Exception {
                Employees emp = MyJsonUtils.str2JsonObj(s);
                emp.setEmpStartTime(new Date(emp.getTs()));
                // Hour bucket such as "2021-03-08 16"
                emp.setDt(MyDateUtils.getDate2Hour2(emp.getEmpStartTime()));
                return emp; // Employees(eId=239, eName=tang tang - 239, eSal=286412.0, eDept=人事部, ts=1615191376732, empStartTime=Mon Mar 08 16:16:16 GMT+08:00 2021, dt=2021-03-08 16)
            }
        }).keyBy(new KeySelector<Employees, Tuple2<String,String>>() {
            @Override
            public Tuple2<String, String> getKey(Employees key) throws Exception {
                // Key = (hour bucket, department)
                return new Tuple2<>(key.getDt(),key.getEDept());
            }
        })
            // One-day tumbling window; the -8h offset aligns it to midnight in UTC+8
            .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
            //.window(TumblingProcessingTimeWindows.of(Time.hours(1)))
            // Fire every second without purging, so the running count is emitted continuously
            .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
            .aggregate(new EmpByKeyCountAgg(), new EmpByKeyWindow());

        //result.print(); // (2021-03-08 16,帥哥部,62)
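The `Time.hours(-8)` offset is what makes the one-day window start at midnight in UTC+8 (China Standard Time) instead of midnight UTC. The stdlib-only sketch below reproduces the window-start arithmetic `timestamp - (timestamp - offset + windowSize) % windowSize`, which, as far as I know, is what Flink's `TimeWindow.getWindowStartWithOffset` computes. The class and method names are hypothetical, and a record's `ts` is used as a stand-in for the current processing time.

```java
import java.time.Instant;
import java.time.ZoneOffset;

public class WindowStartDemo {

    static final long HOUR_MS = 60L * 60 * 1000;
    static final long DAY_MS = 24 * HOUR_MS;

    /** Start of the tumbling window that contains the timestamp (hypothetical helper). */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long ts = 1615194501416L; // "ts" of record id=399 in the sample data

        long noOffset = windowStart(ts, 0, DAY_MS);
        long cst = windowStart(ts, -8 * HOUR_MS, DAY_MS);

        // Without an offset the daily window starts at 00:00 UTC, i.e. 08:00 in UTC+8
        System.out.println(Instant.ofEpochMilli(noOffset).atZone(ZoneOffset.ofHours(8))); // 2021-03-08T08:00+08:00
        // With Time.hours(-8) it starts at 00:00 in UTC+8
        System.out.println(Instant.ofEpochMilli(cst).atZone(ZoneOffset.ofHours(8)));      // 2021-03-08T00:00+08:00
    }
}
```

Without the offset, records from 00:00 to 08:00 local time would still be counted in the previous day's window.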

Here we implement the two functions passed to aggregate() ourselves, to count the records per key.
EmpByKeyCountAgg class:

package com.nfdw.function;

import com.nfdw.entity.Employees;
import org.apache.flink.api.common.functions.AggregateFunction;

/** Aggregate function implementing COUNT: adds one for every incoming record.
 *       type parameters: in, acc, out
 * */
public class EmpByKeyCountAgg implements AggregateFunction<Employees,Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Employees employees, Long aLong) {
        return aLong + 1;
    }

    @Override
    public Long getResult(Long aLong) {
        return aLong;
    }

    @Override
    public Long merge(Long aLong, Long acc1) {
        return aLong + acc1;
    }

}

EmpByKeyWindow class:

package com.nfdw.function;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
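import scala.Tuple3; // Scala tuple, hence the _1()/_3() accessors used in Step 2

/** Emits the window result: attaches the key to the pre-aggregated count.
 *      type parameters: in, out, key, window
 * */
public class EmpByKeyWindow implements WindowFunction<Long, Tuple3<String,String, String>, Tuple2<String,String>, TimeWindow> {

    /**
     *     strTuple2: the window key, i.e. (hour bucket, department)
     *     timeWindow: the window
     *     iterable: the result of the aggregate function, i.e. a single count value
     *     output: Tuple3(hour bucket, department, count)
     */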
    @Override
    public void apply(Tuple2<String, String> strTuple2, TimeWindow timeWindow, Iterable<Long> iterable, Collector<Tuple3<String, String, String>> collector) throws Exception {
        collector.collect(new Tuple3<String,String, String>(strTuple2.f0,strTuple2.f1, iterable.iterator().next().toString()));
    }

}
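To make the combination of `aggregate(new EmpByKeyCountAgg(), new EmpByKeyWindow())` and `ContinuousProcessingTimeTrigger` concrete, here is a stdlib-only simulation (hypothetical class, no Flink involved) of what happens inside one daily window: every record increments a per-key counter, and every firing emits the current count without clearing it, because the trigger fires rather than fires-and-purges. For brevity all keys share one map here, whereas Flink keeps separate window state per key. The department names are ASCII placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ContinuousCountSimulation {

    // "hour|dept" -> accumulator (plays the role of the Long in EmpByKeyCountAgg)
    private final Map<String, Long> accumulators = new LinkedHashMap<>();

    /** Like AggregateFunction.add: one more record for this key. */
    public void add(String hour, String dept) {
        accumulators.merge(hour + "|" + dept, 1L, Long::sum);
    }

    /**
     * Like a trigger FIRE: returns a snapshot of the current counts
     * (what EmpByKeyWindow would wrap into tuples) and keeps the accumulators.
     */
    public Map<String, Long> fire() {
        return new LinkedHashMap<>(accumulators);
    }

    public static void main(String[] args) {
        ContinuousCountSimulation sim = new ContinuousCountSimulation();
        sim.add("2021-03-08 16", "HR");
        sim.add("2021-03-08 16", "HR");
        sim.add("2021-03-08 16", "Sales");
        System.out.println(sim.fire()); // first 1 s firing
        sim.add("2021-03-08 16", "HR");
        System.out.println(sim.fire()); // next firing: the HR count has grown to 3
    }
}
```

This is why each key's count in Redis keeps growing through the day instead of restarting every second.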

Step 2: build the custom Redis conditionKey and write the results to Redis
DataStream<String> redisSink = result.map(new MapFunction<Tuple3<String, String, String>, String>() {
            @Override
            public String map(Tuple3<String, String, String> str) throws Exception {
                //new Tuple2<String, String>(str.f0.substring(11),str.f2);

                // "2021-03-08 16" -> ZSET key "index_emp_20210308", member "16"
                String[] myDate = str._1().split(" ");
                String additionalKey = "index_emp_"+ myDate[0].replaceAll("-","");
                // Note: the member is only the hour, so departments reporting in the same
                // hour overwrite each other's score; append str._2() (the department) if needed
                String key = myDate[1];
                double value = Double.valueOf(str._3());
                redisUtil2.zset(additionalKey, key, value);

                return additionalKey +" , "+ key +" , "+ value+ " written to Redis successfully...";
            }
        });

        // redisSink.print(); // index_emp_20210308 , 16 , 54.0 written to Redis successfully...
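The key derivation in Step 2 can be pulled out into small pure functions, which makes it easy to test without Redis. In the stdlib-only sketch below (hypothetical class and method names), the ZSET key follows the same rule as Step 2, while the member format `hour_dept` is an assumption: it keeps one score per department and hour, so departments no longer share a single member.

```java
public class RedisKeyDemo {

    /** ZSET key per day: "2021-03-08 16" -> "index_emp_20210308" (same rule as Step 2). */
    static String zsetKey(String dateHour) {
        String day = dateHour.split(" ")[0];
        return "index_emp_" + day.replace("-", "");
    }

    /** ZSET member: hour plus department (assumed format), e.g. "16_HR". */
    static String zsetMember(String dateHour, String dept) {
        String hour = dateHour.split(" ")[1];
        return hour + "_" + dept;
    }

    public static void main(String[] args) {
        String dateHour = "2021-03-08 16";
        System.out.println(zsetKey(dateHour));          // index_emp_20210308
        System.out.println(zsetMember(dateHour, "HR")); // 16_HR
    }
}
```

Inside the map function this would become something like `redisUtil2.zset(zsetKey(str._1()), zsetMember(str._1(), str._2()), value)`. If you would rather have one ZSET per department, append the department to the key instead of the member.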

Create RedisUtil2, the helper class for Redis operations:

package com.nfdw.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.Set;

public class RedisUtil2 {
//    private static final Logger log = LoggerFactory.getLogger(RedisUtil.class);

    // volatile is required for double-checked locking to be safe under the Java memory model
    private static volatile RedisUtil2 instance;

    private static JedisPool jedisPool = RedisPoolUtil2.getPool();

    private RedisUtil2() {
    }

    /**
     * Singleton via double-checked locking
     *
     * @return the shared RedisUtil2 instance
     */
    public static RedisUtil2 getInstance() {
        if (instance == null) {
            synchronized (RedisUtil2.class) {
                if (instance == null) {
                    instance = new RedisUtil2();
                }
            }
        }
        return instance;
    }

    // ZADD additionalKey value key: sets the score of member "key" in sorted set "additionalKey"
    public void zset(String additionalKey, String key, Double value) {
        Jedis jedis = jedisPool.getResource();
        try {
            jedis.zadd(additionalKey, value, key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeJedis(jedis);
        }

    }

    // Returns all members of the sorted set, ordered by score ascending
    public Set<String> myZrange(String additionalKey) {
        Jedis jedis = jedisPool.getResource();
        Set<String> result = null;
        try {
            result = jedis.zrange(additionalKey, 0, -1);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeJedis(jedis);
        }
        return result;
    }

    /**
     * Common helper: release the Jedis connection (close() returns it to the pool)
     *
     * @param jedis
     */
    private void closeJedis(Jedis jedis) {
        if (jedis != null) {
            jedis.close();
        }
    }

}
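`RedisUtil2` relies on double-checked locking, which is only safe when the `instance` field is `volatile`. A lazily created singleton can also be written with the initialization-on-demand holder idiom, where the JVM's class-initialization guarantees do the locking for you. Below is a stdlib-only sketch with a hypothetical class name; the Jedis pool itself is left out.

```java
public final class LazySingleton {

    private LazySingleton() {
        // A real helper would create its JedisPool here (omitted in this stdlib-only sketch)
    }

    /** The nested class is not initialized until getInstance() first touches it. */
    private static final class Holder {
        static final LazySingleton INSTANCE = new LazySingleton();
    }

    public static LazySingleton getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) throws InterruptedException {
        LazySingleton[] seen = new LazySingleton[2];
        Thread t1 = new Thread(() -> seen[0] = getInstance());
        Thread t2 = new Thread(() -> seen[1] = getInstance());
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(seen[0] == seen[1]); // true: both threads got the same object
    }
}
```

The holder idiom needs neither `synchronized` nor `volatile`, so there is nothing to forget when the class is copied around.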

Step 3: end the pipeline with a no-op sink
redisSink.addSink(new MyAddRedisSink());

Next, we implement the MyAddRedisSink class:

package com.nfdw.utils;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

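// Despite its name, this sink only prints each record: the Redis write already happened in Step 2's map()
public class MyAddRedisSink extends RichSinkFunction<String> {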

    @Override
    public void invoke(String value, Context context) throws Exception {
        super.invoke(value, context);
        System.out.println(" sink :"+value);
    }
}

That covers the main code. What remains are the date/time utility class, the JSON parsing utility class, and the pom file.
MyJsonUtils class:

package com.nfdw.utils;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.nfdw.entity.Employees;

public class MyJsonUtils {

    // Gson instances are thread-safe, so one shared instance can be reused for every record
    private static final Gson GSON = new GsonBuilder().create();

    public static Employees str2JsonObj(String str){
        return GSON.fromJson(str, Employees.class);
    }

}

MyDateUtils class:

package com.nfdw.utils;

import java.text.SimpleDateFormat;
import java.util.Date;

public class MyDateUtils {

    /** Formats a Date as an hour bucket "yyyy-MM-dd HH", e.g. "2021-03-08 16".
     *  SimpleDateFormat is not thread-safe, so a new instance is created on every call;
     *  it uses the JVM's default time zone (GMT+8 in the author's environment). */
    public static String getDate2Hour2(Date date){
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH");
        String dateStr = null;
        try {
            dateStr = df.format(date);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return dateStr;
    }

}
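Because `getDate2Hour2` depends on the JVM's default time zone, the same job can produce different hour buckets on machines configured differently. A possible alternative, sketched with `java.time` under the assumption that the data should always be bucketed in `Asia/Shanghai` (UTC+8), pins the zone explicitly and shares one immutable, thread-safe formatter. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class HourBucketDemo {

    // DateTimeFormatter is immutable and thread-safe; the zone is pinned (assumption: UTC+8 data)
    private static final DateTimeFormatter HOUR_FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH").withZone(ZoneId.of("Asia/Shanghai"));

    /** Epoch millis -> "yyyy-MM-dd HH", independent of the JVM's default time zone. */
    static String toHourBucket(long epochMillis) {
        return HOUR_FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // ts from the log comment in Step 1, where dt was 2021-03-08 16
        System.out.println(toHourBucket(1615191376732L)); // 2021-03-08 16
    }
}
```

In Step 1 this would allow `emp.setDt(HourBucketDemo.toHourBucket(emp.getTs()))` without going through `java.util.Date` at all.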

Employees class:

package com.nfdw.entity;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Date;

@Data
@Accessors(chain = true)
public class Employees {

    // {"id":"619","name":"fei fei - 619","sal":306875,"dept":"人事部","ts":1615187714251}
    @SerializedName(value = "id")
    private String eId = "";
    @SerializedName(value = "name")
    private String eName = "";
    @SerializedName(value = "sal")
    private double eSal = 0;
    @SerializedName(value = "dept")
    private String eDept = "";
    @SerializedName(value = "ts")
    private long ts = 0L;

    // Jackson annotation: Gson (used by MyJsonUtils) ignores it
    @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private Date empStartTime;

    private String dt = "";

}

The pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>MyFirstBigScreen</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>TestMyCountDemon</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.10.1</flink.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>

        <!-- redis -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <!-- Scala suffix aligned with flink-streaming-java_2.11 to avoid mixing Scala versions -->
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

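Because we build the conditionKey ourselves (the Redis key changes with the date of each record), and the Redis sink available for Flink does not support this out of the box, we have to handle it on our own. Besides the approach above, you can also modify the connector's source code; see this article: https://my.oschina.net/u/4596020/blog/4517377
That wraps up the example. The code can be copied and used as-is; I hope it helps.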

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.