Sometimes the idea behind the WordCount example is genuinely handy, but the example copied straight from the official site falls far short of what we need day to day. Suppose we have the following requirement:
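1. On a per-day basis, count how many items each department sells in each hour, and write the results to Redis in real time, using a composite key built from the date.
Note: this requirement is a bit of a trap. A plain tumbling or sliding window will not satisfy it, because the requester wants the result recomputed at least once per second.
Sample input records from Kafka: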
{"id":"399","name":"fei niu - 399","sal":283366,"dept":"人事部","ts":1615194501416}
{"id":"398","name":"tang tang - 398","sal":209935,"dept":"燒錢部","ts":1615194501416}
{"id":"395","name":"tang tang - 395","sal":51628,"dept":"帥哥部","ts":1615194501404}
{"id":"400","name":"fei fei - 400","sal":45782,"dept":"燒錢部","ts":1615194501420}
{"id":"401","name":"fei fei - 401","sal":389162,"dept":"帥哥部","ts":1615194501424}
{"id":"402","name":"tang tang - 402","sal":127889,"dept":"人事部","ts":1615194501428}
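import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
public class App {
    private static RedisUtil2 redisUtil2 = RedisUtil2.getInstance();
    public static void main(String[] args) throws Exception {
        // GetStreamExecutionEnvironment is the author's own helper (not shown) returning a StreamExecutionEnvironment
        StreamExecutionEnvironment env = GetStreamExecutionEnvironment.getEnv();
        Properties prop = new Properties();
        // Hypothetical placeholder values: replace with your own broker list and consumer group
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("group.id", "emp-count");
        FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>("luchangyin", new SimpleStringSchema(), prop);
        //myConsumer.setStartFromGroupOffsets(); // default: resume from the consumer group's last committed offset
        //myConsumer.setStartFromEarliest();     // start from the earliest data in the topic
        myConsumer.setStartFromLatest();         // start from the latest data
        DataStreamSource<String> dataStream = env.addSource(myConsumer);
        //dataStream.print(); // {"id":"226","name":"tang tang - 226","sal":280751,"dept":"美女部","ts":1615191802523}
        // ------------ Step 1: parse the JSON and aggregate
        // ------------ Step 2: build the custom Redis conditionKey and write the results to Redis
        // ------------ Step 3: end the pipeline with a no-op sink
        env.execute("wo xi huan ni");
    }
}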
Step 1: parse the JSON and aggregate
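// Tuple3 is scala.Tuple3 (as imported in EmpByKeyWindow); Tuple2 is org.apache.flink.api.java.tuple.Tuple2
DataStream<Tuple3<String, String, String>> result = dataStream.map(new MapFunction<String, Employees>() {
    public Employees map(String s) throws Exception {
        Employees emp = MyJsonUtils.str2JsonObj(s);
        emp.setEmpStartTime(new Date(emp.getTs()));
        emp.setDt(MyDateUtils.getDate2Hour2(emp.getEmpStartTime())); // hour key "yyyy-MM-dd HH", required by keyBy
        return emp; // Employees(eId=239, eName=tang tang - 239, eSal=286412.0, eDept=人事部, ts=1615191376732, empStartTime=Mon Mar 08 16:16:16 GMT+08:00 2021, dt=2021-03-08 16)
    }
}).keyBy(new KeySelector<Employees, Tuple2<String, String>>() {
    public Tuple2<String, String> getKey(Employees key) throws Exception {
        return new Tuple2<>(key.getDt(), key.getEDept()); // composite key: (hour, department)
    }
})
  // one-day tumbling window; the -8h offset aligns it to midnight in UTC+8
  .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
  // fire every second without purging, so the running count is re-emitted at least once per second
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
  .aggregate(new EmpByKeyCountAgg(), new EmpByKeyWindow());
//result.print(); // (2021-03-08 16,帥哥部,62)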
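The `Time.hours(-8)` offset matters because day windows are otherwise aligned to midnight UTC. The sketch below is plain Java, and the class name is hypothetical. It reproduces the window-start arithmetic found in Flink 1.x's `TimeWindow.getWindowStartWithOffset`, which is valid here because the timestamps are positive. It applies that arithmetic to the first sample record.

```java
import java.time.Instant;

public class DayWindowOffsetSketch {
    static final long HOUR_MS = 60L * 60 * 1000;
    static final long DAY_MS = 24 * HOUR_MS;

    // Start of the window that contains timestamp, for the given offset and window size.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long ts = 1615194501416L; // "ts" of the first sample record
        // No offset: day windows start at midnight UTC.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, 0, DAY_MS)));
        // Offset -8h: day windows start at midnight UTC+8, i.e. 16:00 UTC of the previous day.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, -8 * HOUR_MS, DAY_MS)));
    }
}
```

For that record the program prints `2021-03-08T00:00:00Z` without the offset and `2021-03-07T16:00:00Z` with it. The second value is midnight on 2021-03-08 in Beijing time, which is the day boundary the requirement asks for.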
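Here we supply our own implementations of the two functions passed to aggregate. One counts the records for each key; the other turns that count into the output tuple.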
The EmpByKeyCountAgg class:
package com.nfdw.function;
import com.nfdw.entity.Employees;
import org.apache.flink.api.common.functions.AggregateFunction;
/** Aggregate function implementing COUNT: adds one for every record
 * Type parameters: in (Employees), acc (Long), out (Long)
 * */
public class EmpByKeyCountAgg implements AggregateFunction<Employees, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }
    @Override
    public Long add(Employees employees, Long aLong) {
        return aLong + 1;
    }
    @Override
    public Long getResult(Long aLong) {
        return aLong;
    }
    @Override
    public Long merge(Long aLong, Long acc1) {
        return aLong + acc1;
    }
}
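The contract of the four methods can be shown without Flink. The plain-Java sketch below uses a hypothetical `Agg` interface that mirrors the methods of `AggregateFunction`:

- The window keeps only the `Long` accumulator per key.
- `add` runs once per record.
- `getResult` is read when the window fires.

As far as I know, `merge` is only invoked for merging window assigners such as session windows. It is therefore not exercised by the tumbling window used here, but it should still be correct.

```java
import java.util.Arrays;
import java.util.List;

public class CountAggSketch {
    // Hypothetical stand-in for Flink's AggregateFunction<IN, ACC, OUT>, with the same four methods.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    // Same counting logic as EmpByKeyCountAgg, but over plain Strings instead of Employees.
    static final Agg<String, Long, Long> COUNT = new Agg<String, Long, Long>() {
        public Long createAccumulator() { return 0L; }
        public Long add(String value, Long acc) { return acc + 1; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    };

    // Folds records into a fresh accumulator: one add() call per record.
    static Long fold(List<String> records) {
        Long acc = COUNT.createAccumulator();
        for (String r : records) {
            acc = COUNT.add(r, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        Long left = fold(Arrays.asList("a", "b", "c"));
        Long right = fold(Arrays.asList("d", "e"));
        // merge() combines two partial accumulators into one.
        System.out.println(COUNT.getResult(COUNT.merge(left, right)));
    }
}
```

Running `main` prints 5. Because the accumulator is a single `Long`, the window state per key stays constant in size no matter how many records arrive during the day.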
The EmpByKeyWindow class:
package com.nfdw.function;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple3;
/** Emits the window result
 * Type parameters: in (Long count), out (Tuple3), key (Tuple2 of hour and department), window (TimeWindow)
 * */
public class EmpByKeyWindow implements WindowFunction<Long, Tuple3<String, String, String>, Tuple2<String, String>, TimeWindow> {
    /**
     * @param strTuple2  the window key, i.e. (hour, department)
     * @param timeWindow the window
     * @param iterable   the result of the aggregate function; with pre-aggregation it holds exactly one value, the count
     * @param collector  emits (hour, department, count)
     */
    @Override
    public void apply(Tuple2<String, String> strTuple2, TimeWindow timeWindow, Iterable<Long> iterable, Collector<Tuple3<String, String, String>> collector) throws Exception {
        collector.collect(new Tuple3<String, String, String>(strTuple2.f0, strTuple2.f1, iterable.iterator().next().toString()));
    }
}
Step 2: build a custom Redis key (the conditionKey) and write the computed results to Redis
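DataStream<String> redisSink = result.map(new MapFunction<Tuple3<String, String, String>, String>() {
    @Override
    public String map(Tuple3<String, String, String> str) throws Exception {
        //new Tuple2<String, String>(str.f0.substring(11),str.f2);
        // str is a scala.Tuple3: _1() = "yyyy-MM-dd HH", _2() = department, _3() = count
        String[] myDate = str._1().split(" ");
        String additionalKey = "index_emp_" + myDate[0].replaceAll("-", ""); // e.g. index_emp_20210308
        String key = myDate[1];                                              // the hour, e.g. 16
        double value = Double.parseDouble(str._3());
        // ZADD: member = hour, score = count. The department is not part of the member,
        // so departments that share an hour overwrite each other's score.
        redisUtil2.zset(additionalKey, key, value);
        return additionalKey + " , " + key + " , " + value + " written to redis...";
    }
});
// redisSink.print(); // index_emp_20210308 , 16 , 54.0 written to redis...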
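The key layout is easy to check in isolation. The plain-Java sketch below reproduces the string handling of the map function above. It replaces Redis with a hypothetical in-memory map so that the ZADD rule is visible: when a member already exists, its score is replaced rather than incremented. Each per-second firing therefore overwrites the hour's count with the latest running total.

```java
import java.util.HashMap;
import java.util.Map;

public class RedisKeySketch {
    // Hypothetical in-memory stand-in for Redis sorted sets: key -> (member -> score).
    static final Map<String, Map<String, Double>> ZSETS = new HashMap<>();

    // Mimics ZADD for one member: inserts it, or replaces the score if it already exists.
    static void zadd(String key, double score, String member) {
        ZSETS.computeIfAbsent(key, k -> new HashMap<>()).put(member, score);
    }

    // Splits "2021-03-08 16" into {"index_emp_20210308", "16"}, as the step 2 map function does.
    static String[] toKeyAndMember(String dayHour) {
        String[] parts = dayHour.split(" ");
        return new String[] {"index_emp_" + parts[0].replace("-", ""), parts[1]};
    }

    public static void main(String[] args) {
        String[] km = toKeyAndMember("2021-03-08 16");
        zadd(km[0], 54.0, km[1]); // an early firing of the day window
        zadd(km[0], 62.0, km[1]); // a later firing replaces the score
        System.out.println(ZSETS);
    }
}
```

This prints `{index_emp_20210308={16=62.0}}`. The same replacement rule explains the caveat noted in the code comment: two departments reporting the same hour collide on the same member. Adding the department to the member (or to the key) would keep their counts apart.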
Create RedisUtil2, a utility class for Redis operations:
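package com.nfdw.utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.Set;
public class RedisUtil2 {
    // private static final Logger log = LoggerFactory.getLogger(RedisUtil.class);
    // volatile is required for double-checked locking to be safe
    private static volatile RedisUtil2 instance;
    // RedisPoolUtil2 is the author's pool factory (not shown)
    private static JedisPool jedisPool = RedisPoolUtil2.getPool();
    private RedisUtil2() {
    }
    /**
     * Double-checked locking guarantees a single instance
     * @return the shared RedisUtil2 instance
     */
    public static RedisUtil2 getInstance() {
        if (instance == null) {
            synchronized (RedisUtil2.class) {
                if (instance == null) {
                    instance = new RedisUtil2();
                }
            }
        }
        return instance;
    }
    /** ZADD: adds the member with the given score, or updates its score if it already exists */
    public void zset(String aditionalKey, String key, Double value) {
        Jedis jedis = jedisPool.getResource();
        try {
            jedis.zadd(aditionalKey, value, key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeJedis(jedis);
        }
    }
    /** Returns all members of the sorted set, in ascending score order */
    public Set<String> myZrange(String aditionalKey) {
        Jedis jedis = jedisPool.getResource();
        Set<String> result = null;
        try {
            result = jedis.zrange(aditionalKey, 0, -1);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeJedis(jedis);
        }
        return result;
    }
    /**
     * Shared helper: release the Jedis connection
     * @param jedis the connection to release
     */
    private void closeJedis(Jedis jedis) {
        if (jedis != null) {
            jedis.close(); // returns a pooled connection to the pool
        }
    }
}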
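Double-checked locking is only safe in Java when the field is `volatile`. Without it, another thread may observe a reference to an object whose constructor has not finished. The self-contained sketch below (hypothetical class name) shows the pattern. It also shows the initialization-on-demand holder idiom, which gets the same lazy, thread-safe behaviour from class loading with no explicit locking; in real code you would pick one of the two.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LazySingletonSketch {
    // volatile: safely publishes the fully constructed object to other threads.
    private static volatile LazySingletonSketch instance;

    private LazySingletonSketch() { }

    public static LazySingletonSketch getInstance() {
        LazySingletonSketch local = instance; // single volatile read on the fast path
        if (local == null) {
            synchronized (LazySingletonSketch.class) {
                local = instance;
                if (local == null) {
                    local = new LazySingletonSketch();
                    instance = local;
                }
            }
        }
        return local;
    }

    // Alternative to getInstance(): the JVM initializes Holder lazily and thread-safely on first use.
    private static final class Holder {
        static final LazySingletonSketch INSTANCE = new LazySingletonSketch();
    }

    public static LazySingletonSketch getViaHolder() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        Callable<LazySingletonSketch> task = LazySingletonSketch::getInstance;
        List<Future<LazySingletonSketch>> results = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            results.add(pool.submit(task));
        }
        LazySingletonSketch first = results.get(0).get();
        for (Future<LazySingletonSketch> f : results) {
            if (f.get() != first) throw new IllegalStateException("two instances");
        }
        pool.shutdown();
        System.out.println("all threads saw the same instance");
    }
}
```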
Step 3: end the pipeline with a no-op sink
redisSink.addSink(new MyAddRedisSink());
Next, we implement the MyAddRedisSink class:
package com.nfdw.utils;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
// Effectively a no-op sink: the Redis write already happened in the step 2 map; this only prints each message
public class MyAddRedisSink extends RichSinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        super.invoke(value, context);
        System.out.println(" sink :" + value);
    }
}
That covers the main code. What remains are the date utility class, the JSON parsing and entity classes, and the pom file.
The MyJsonUtils class:
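package com.nfdw.utils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.nfdw.entity.Employees;
public class MyJsonUtils {
    // Gson instances are thread-safe, so one shared instance is reused for every record
    private static final Gson GSON = new GsonBuilder().create();
    /** Parses one JSON record into an Employees object (field names mapped via @SerializedName) */
    public static Employees str2JsonObj(String str) {
        return GSON.fromJson(str, Employees.class);
    }
}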
The MyDateUtils class:
package com.nfdw.utils;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MyDateUtils {
// public static String getDate2Str(){
// SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
// return sdf.format(new Date());
// }
// public static long getDate2Timestamp(String ds){
//        // Create a SimpleDateFormat instance with the conversion pattern
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Date date = null;
// try {
//            // The input must match the pattern above, otherwise parsing throws an exception
// date = sdf.parse(ds);
// } catch (ParseException e) {
// e.printStackTrace();
// }
// return date.getTime();
// }
// public static String getDate2Hour(String ds){
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH");
// Date date = null;
// String dateStr = null;
// try {
// date = sdf.parse(ds);
// dateStr = df.format(date);
// } catch (Exception e) {
// e.printStackTrace();
// }
// return dateStr;
// }
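    /** Formats a Date as "yyyy-MM-dd HH" (hour granularity) in the JVM's default time zone */
    public static String getDate2Hour2(Date date){
        //SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // A new SimpleDateFormat per call: the class is not thread-safe, so it must not be shared
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH");
        String dateStr = null;
        try {
            // date = sdf.parse(ds);
            dateStr = df.format(date);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return dateStr;
    }
}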
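`getDate2Hour2` formats in the JVM's default time zone, so the hour key is only right when every TaskManager runs in UTC+8. A minimal alternative uses `java.time`, assuming the key should always be in Beijing time (`Asia/Shanghai`). Its `DateTimeFormatter` is immutable and thread-safe, so a single instance can be shared. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class HourKeySketch {
    // Immutable and thread-safe; the zone is pinned so the result does not depend on the JVM default.
    private static final DateTimeFormatter HOUR_FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH").withZone(ZoneId.of("Asia/Shanghai"));

    // Converts an epoch-millisecond timestamp (the "ts" field) into the "yyyy-MM-dd HH" hour key.
    static String toHourKey(long epochMillis) {
        return HOUR_FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(toHourKey(1615191376732L));
    }
}
```

The timestamp 1615191376732 comes from the record in the step 1 comment. For it, this yields `2021-03-08 16`, matching the `dt` value printed there.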
The Employees class:
package com.nfdw.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Date;
// @Data generates the getters/setters used elsewhere (getTs(), getDt(), getEDept(), setDt(), ...)
@Data
@Accessors(chain = true)
public class Employees {
    // {"id":"619","name":"fei fei - 619","sal":306875,"dept":"人事部","ts":1615187714251}
    @SerializedName(value = "id")
    private String eId = "";
    @SerializedName(value = "name")
    private String eName = "";
    @SerializedName(value = "sal")
    private double eSal = 0;
    @SerializedName(value = "dept")
    private String eDept = "";
    @SerializedName(value = "ts")
    private long ts = 0L;
    // Jackson annotation: Gson ignores it, so it only takes effect if the object is serialized with Jackson
    @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private Date empStartTime;
    // hour key "yyyy-MM-dd HH", derived from ts (not present in the JSON)
    private String dt = "";
}
The pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- redis -->
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
Because the conditionKey is custom and Flink's Redis sink does not yet support it, we have to handle it ourselves. Besides the approach above, you can also modify the connector's source code; see this article: https://my.oschina.net/u/4596020/blog/4517377