基于Kafka+Flink+Redis的電商大屏實(shí)時計算案例

前言
一年一度的雙11又要到了,阿里的雙11銷量大屏可以說是一道特殊的風(fēng)景線。實(shí)時大屏(real-time dashboard)正在被越來越多的企業(yè)采用,用來及時呈現(xiàn)關(guān)鍵的數(shù)據(jù)指標(biāo)讳嘱。并且在實(shí)際操作中闯袒,肯定也不會僅僅計算一兩個維度虎敦。由于Flink的“真·流式計算”這一特點(diǎn),它比Spark Streaming要更適合大屏應(yīng)用政敢。本文從筆者的實(shí)際工作經(jīng)驗(yàn)抽象出簡單的模型其徙,并簡要敘述計算流程(當(dāng)然大部分都是源碼)。

數(shù)據(jù)格式與接入
簡化的子訂單消息體如下喷户。

{
"userId": 234567,
"orderId": 2902306918400,
"subOrderId": 2902306918401,
"siteId": 10219,
"siteName": "site_blabla",
"cityId": 101,
"cityName": "北京市",
"warehouseId": 636,
"merchandiseId": 187699,
"price": 299,
"quantity": 2,
"orderStatus": 1,
"isNewOrder": 0,
"timestamp": 1572963672217
}
由于訂單可能會包含多種商品唾那,故會被拆分成子訂單來表示,每條JSON消息表示一個子訂單⊥食ⅲ現(xiàn)在要按照自然日來統(tǒng)計以下指標(biāo)闹获,并以1秒的刷新頻率呈現(xiàn)在大屏上:

每個站點(diǎn)(站點(diǎn)ID即siteId)的總訂單數(shù)、子訂單數(shù)河哑、銷量與GMV避诽;
當(dāng)前銷量排名前N的商品(商品ID即merchandiseId)與它們的銷量。
由于大屏的最大訴求是實(shí)時性璃谨,等待遲到數(shù)據(jù)顯然不太現(xiàn)實(shí)沙庐,因此我們采用處理時間作為時間特征,并以1分鐘的頻率做checkpointing。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);

然后訂閱Kafka的訂單消息作為數(shù)據(jù)源轨功。

Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
DataStream<String> sourceStream = env
  .addSource(new FlinkKafkaConsumer011<>(
    ORDER_EXT_TOPIC_NAME,                        // topic
    new SimpleStringSchema(),                    // deserializer
    consumerProps                                // consumer properties
  ))
  .setParallelism(PARTITION_COUNT)
  .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
  .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);

給帶狀態(tài)的算子設(shè)定算子ID(通過調(diào)用uid()方法)是個好習(xí)慣旭斥,能夠保證Flink應(yīng)用從保存點(diǎn)重啟時能夠正確恢復(fù)狀態(tài)現(xiàn)場。為了盡量穩(wěn)妥古涧,F(xiàn)link官方也建議為每個算子都顯式地設(shè)定ID垂券,參考官方文檔。

接下來將JSON數(shù)據(jù)轉(zhuǎn)化為POJO羡滑,JSON框架采用FastJSON菇爪。

DataStream<SubOrderDetail> orderStream = sourceStream
  .map(message -> JSON.parseObject(message, SubOrderDetail.class))
  .name("map_sub_order_detail").uid("map_sub_order_detail");

JSON已經(jīng)是預(yù)先處理好的標(biāo)準(zhǔn)化格式,所以POJO類SubOrderDetail的寫法可以通過Lombok極大地簡化柒昏。如果JSON的字段有不規(guī)范的凳宙,那么就需要手寫Getter和Setter,并用@JSONField注解來指明职祷。

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SubOrderDetail implements Serializable {
private static final long serialVersionUID = 1L;

private long userId;
private long orderId;
private long subOrderId;
private long siteId;
private String siteName;
private long cityId;
private String cityName;
private long warehouseId;
private long merchandiseId;
private long price;
private long quantity;
private int orderStatus;
private int isNewOrder;
private long timestamp;
}
統(tǒng)計站點(diǎn)指標(biāo)
將子訂單流按站點(diǎn)ID分組氏涩,開1天的滾動窗口,并同時設(shè)定ContinuousProcessingTimeTrigger觸發(fā)器有梆,以1秒周期觸發(fā)計算是尖。注意處理時間的時區(qū)問題,這是老生常談了泥耀。

WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
  .keyBy("siteId")
  .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));

接下來寫個聚合函數(shù)饺汹。

DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
  .aggregate(new OrderAndGmvAggregateFunc())
  .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");

public static final class OrderAndGmvAggregateFunc
implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {
private static final long serialVersionUID = 1L;

@Override
public OrderAccumulator createAccumulator() {
  return new OrderAccumulator();
}

@Override
public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {
  if (acc.getSiteId() == 0) {
    acc.setSiteId(record.getSiteId());
    acc.setSiteName(record.getSiteName());
  }
  acc.addOrderId(record.getOrderId());
  acc.addSubOrderSum(1);
  acc.addQuantitySum(record.getQuantity());
  acc.addGmv(record.getPrice() * record.getQuantity());
  return acc;
}

@Override
public OrderAccumulator getResult(OrderAccumulator acc) {
  return acc;
}

@Override
public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {
  if (acc1.getSiteId() == 0) {
    acc1.setSiteId(acc2.getSiteId());
    acc1.setSiteName(acc2.getSiteName());
  }
  acc1.addOrderIds(acc2.getOrderIds());
  acc1.addSubOrderSum(acc2.getSubOrderSum());
  acc1.addQuantitySum(acc2.getQuantitySum());
  acc1.addGmv(acc2.getGmv());
  return acc1;
}

}
累加器類OrderAccumulator的實(shí)現(xiàn)很簡單,看源碼就大概知道它的結(jié)構(gòu)了痰催,因此不再多廢話兜辞。唯一需要注意的是訂單ID可能重復(fù),所以需要用名為orderIds的HashSet來保存它夸溶。HashSet應(yīng)付我們目前的數(shù)據(jù)規(guī)模還是沒太大問題的逸吵,如果是海量數(shù)據(jù),就考慮換用HyperLogLog吧蜘醋。

接下來就該輸出到Redis供呈現(xiàn)端查詢了胁塞。這里有個問題:一秒內(nèi)有數(shù)據(jù)變化的站點(diǎn)并不多,而ContinuousProcessingTimeTrigger每次觸發(fā)都會輸出窗口里全部的聚合數(shù)據(jù)压语,這樣做了很多無用功,并且還會增大Redis的壓力编检。所以胎食,我們在聚合結(jié)果后再接一個ProcessFunction,代碼如下允懂。

DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream
  .keyBy(0)
  .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))
  .name("process_site_gmv_changed").uid("process_site_gmv_changed");

public static final class OutputOrderGmvProcessFunc
extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {
private static final long serialVersionUID = 1L;

private MapState<Long, OrderAccumulator> state;

@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
    "state_site_order_gmv",
    Long.class,
    OrderAccumulator.class)
  );
}

@Override
public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
  long key = value.getSiteId();
  OrderAccumulator cachedValue = state.get(key);

  if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {
    JSONObject result = new JSONObject();
    result.put("site_id", value.getSiteId());
    result.put("site_name", value.getSiteName());
    result.put("quantity", value.getQuantitySum());
    result.put("orderCount", value.getOrderIds().size());
    result.put("subOrderCount", value.getSubOrderSum());
    result.put("gmv", value.getGmv());
    out.collect(new Tuple2<>(key, result.toJSONString());
    state.put(key, value);
  }
}

@Override
public void close() throws Exception {
  state.clear();
  super.close();
}

}
說來也簡單厕怜,就是用一個MapState狀態(tài)緩存當(dāng)前所有站點(diǎn)的聚合數(shù)據(jù)。由于數(shù)據(jù)源是以子訂單為單位的,因此如果站點(diǎn)ID在MapState中沒有緩存粥航,或者緩存的子訂單數(shù)與當(dāng)前子訂單數(shù)不一致琅捏,表示結(jié)果有更新,這樣的數(shù)據(jù)才允許輸出递雀。

最后就可以安心地接上Redis Sink了柄延,結(jié)果會被存進(jìn)一個Hash結(jié)構(gòu)里。

// 看官請自己構(gòu)造合適的FlinkJedisPoolConfig
FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
siteResultStream
  .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
  .name("sink_redis_site_gmv").uid("sink_redis_site_gmv")
  .setParallelism(1);

public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {
private static final long serialVersionUID = 1L;
private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";

@Override
public RedisCommandDescription getCommandDescription() {
  return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);
}

@Override
public String getKeyFromData(Tuple2<Long, String> data) {
  return String.valueOf(data.f0);
}

@Override
public String getValueFromData(Tuple2<Long, String> data) {
  return data.f1;
}

@Override
public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {
  return Optional.of(
    HASH_NAME_PREFIX +
    new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +
    "SITES"
  );
}

}
商品Top N
我們可以直接復(fù)用前面產(chǎn)生的orderStream缀程,玩法與上面的GMV統(tǒng)計大同小異搜吧。這里用1秒滾動窗口就可以了。

WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
  .keyBy("merchandiseId")
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));

DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
  .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
  .name("aggregate_merch_sales").uid("aggregate_merch_sales")
  .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));

聚合函數(shù)與窗口函數(shù)的實(shí)現(xiàn)更加簡單了杨凑,最終返回的是商品ID與商品銷量的二元組滤奈。

public static final class MerchandiseSalesAggregateFunc
implements AggregateFunction<SubOrderDetail, Long, Long> {
private static final long serialVersionUID = 1L;

@Override
public Long createAccumulator() {
  return 0L;
}

@Override
public Long add(SubOrderDetail value, Long acc) {
  return acc + value.getQuantity();
}

@Override
public Long getResult(Long acc) {
  return acc;
}

@Override
public Long merge(Long acc1, Long acc2) {
  return acc1 + acc2;
}

}

public static final class MerchandiseSalesWindowFunc
implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {
private static final long serialVersionUID = 1L;

@Override
public void apply(
  Tuple key,
  TimeWindow window,
  Iterable<Long> accs,
  Collector<Tuple2<Long, Long>> out) throws Exception {
  long merchId = ((Tuple1<Long>) key).f0;
  long acc = accs.iterator().next();
  out.collect(new Tuple2<>(merchId, acc));
}

}
既然數(shù)據(jù)最終都要落到Redis,那么我們完全沒必要在Flink端做Top N的統(tǒng)計撩满,直接利用Redis的有序集合(zset)就行了蜒程,商品ID作為field,銷量作為分?jǐn)?shù)值伺帘,簡單方便昭躺。不過flink-redis-connector項(xiàng)目中默認(rèn)沒有提供ZINCRBY命令的實(shí)現(xiàn)(必須再吐槽一次),我們可以自己加曼追,步驟參照之前寫過的那篇加SETEX的命令的文章窍仰,不再贅述。RedisMapper的寫法如下礼殊。

public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";

@Override
public RedisCommandDescription getCommandDescription() {
  return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
}

@Override
public String getKeyFromData(Tuple2<Long, Long> data) {
  return String.valueOf(data.f0);
}

@Override
public String getValueFromData(Tuple2<Long, Long> data) {
  return String.valueOf(data.f1);
}

@Override
public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {
  return Optional.of(
    ZSET_NAME_PREFIX +
    new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +
    "MERCHANDISE"
  );
}

}
后端取數(shù)時驹吮,用ZREVRANGE命令即可取出指定排名的數(shù)據(jù)了。只要數(shù)據(jù)規(guī)模不是大到難以接受晶伦,并且有現(xiàn)成的Redis碟狞,這個方案完全可以作為各類Top N需求的通用實(shí)現(xiàn)。

The End
大屏的實(shí)際呈現(xiàn)需要保密婚陪,截圖自然是沒有的族沃。以下是提交執(zhí)行時Flink Web UI給出的執(zhí)行計劃(實(shí)際有更多的統(tǒng)計任務(wù),不止3個Sink)泌参。通過復(fù)用源數(shù)據(jù)脆淹,可以在同一個Flink job內(nèi)實(shí)現(xiàn)更多統(tǒng)計需求。

關(guān)注我的公眾號沽一,后臺回復(fù)【JAVAPDF】獲取200頁面試題盖溺!
5萬人關(guān)注的大數(shù)據(jù)成神之路,不來了解一下嗎铣缠?
5萬人關(guān)注的大數(shù)據(jù)成神之路烘嘱,真的不來了解一下嗎昆禽?
5萬人關(guān)注的大數(shù)據(jù)成神之路,確定真的不來了解一下嗎蝇庭?

歡迎您關(guān)注《大數(shù)據(jù)成神之路》
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末醉鳖,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子哮内,更是在濱河造成了極大的恐慌盗棵,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件牍蜂,死亡現(xiàn)場離奇詭異漾根,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)鲫竞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進(jìn)店門辐怕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人从绘,你說我怎么就攤上這事寄疏。” “怎么了僵井?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵陕截,是天一觀的道長。 經(jīng)常有香客問我批什,道長农曲,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任驻债,我火速辦了婚禮乳规,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘合呐。我一直安慰自己暮的,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布淌实。 她就那樣靜靜地躺著冻辩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪拆祈。 梳的紋絲不亂的頭發(fā)上恨闪,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天,我揣著相機(jī)與錄音放坏,去河邊找鬼凛剥。 笑死,一個胖子當(dāng)著我的面吹牛轻姿,可吹牛的內(nèi)容都是我干的犁珠。 我是一名探鬼主播,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼互亮,長吁一口氣:“原來是場噩夢啊……” “哼犁享!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起豹休,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤炊昆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后威根,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凤巨,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年洛搀,在試婚紗的時候發(fā)現(xiàn)自己被綠了敢茁。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡留美,死狀恐怖彰檬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情谎砾,我是刑警寧澤逢倍,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站景图,受9級特大地震影響较雕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜挚币,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一亮蒋、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧忘晤,春花似錦宛蚓、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至闰蛔,卻和暖如春痕钢,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背序六。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工任连, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人例诀。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓随抠,卻偏偏與公主長得像裁着,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子拱她,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容