Preface
The annual Double 11 (Singles' Day) is almost here again, and Alibaba's Double 11 sales dashboard is something of a spectacle in its own right. Real-time dashboards are being adopted by more and more companies to surface key data metrics promptly, and in practice you will rarely be computing just one or two dimensions. Thanks to Flink's "true streaming" execution model, it is a better fit for dashboard workloads than Spark Streaming. This article abstracts a simple model from the author's real-world work and briefly walks through the computation flow (which is, of course, mostly source code).
Data Format and Ingestion
A simplified sub-order message body looks like this.
{
  "userId": 234567,
  "orderId": 2902306918400,
  "subOrderId": 2902306918401,
  "siteId": 10219,
  "siteName": "site_blabla",
  "cityId": 101,
  "cityName": "北京市",
  "warehouseId": 636,
  "merchandiseId": 187699,
  "price": 299,
  "quantity": 2,
  "orderStatus": 1,
  "isNewOrder": 0,
  "timestamp": 1572963672217
}
Since an order may contain multiple merchandise items, it gets split into sub-orders; each JSON message represents one sub-order. We now need to compute the following metrics per calendar day and present them on the dashboard at a 1-second refresh rate:
per-site totals (the site ID being siteId) of order count, sub-order count, sales quantity, and GMV;
the current Top N merchandise items by sales quantity (the merchandise ID being merchandiseId), together with their quantities.
Since the dashboard's overriding requirement is real-time freshness, waiting for late data is clearly impractical, so we adopt processing time as the time characteristic and checkpoint at a 1-minute interval.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);
Then subscribe to the Kafka order topic as the data source.
Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
DataStream<String> sourceStream = env
    .addSource(new FlinkKafkaConsumer011<>(
        ORDER_EXT_TOPIC_NAME,      // topic
        new SimpleStringSchema(),  // deserializer
        consumerProps              // consumer properties
    ))
    .setParallelism(PARTITION_COUNT)
    .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
    .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);
Assigning operator IDs to stateful operators (by calling the uid() method) is a good habit: it ensures that a Flink application restarted from a savepoint can restore its operator state correctly. To be extra safe, the Flink documentation also recommends explicitly setting an ID on every operator; see the official docs.
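The kafka.properties loading above goes through the author's ParameterUtil helper, whose source isn't shown in this article. A minimal sketch, assuming it simply reads a properties file off the classpath:

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ParameterUtil {
    // Hypothetical implementation: load a .properties file from the classpath
    public static Properties getFromResourceFile(String fileName) throws IOException {
        Properties props = new Properties();
        try (InputStream in = ParameterUtil.class.getClassLoader().getResourceAsStream(fileName)) {
            props.load(in);
        }
        return props;
    }
}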
Next, convert the JSON data into POJOs, using FastJSON as the JSON library.
DataStream<SubOrderDetail> orderStream = sourceStream
    .map(message -> JSON.parseObject(message, SubOrderDetail.class))
    .name("map_sub_order_detail").uid("map_sub_order_detail");
The JSON is already pre-processed into a standardized format, so the POJO class SubOrderDetail can be greatly simplified with Lombok. If any JSON field doesn't follow bean naming conventions, you would need to hand-write the getter/setter and point FastJSON at it with the @JSONField annotation (a hypothetical example follows the class below).
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SubOrderDetail implements Serializable {
    private static final long serialVersionUID = 1L;

    private long userId;
    private long orderId;
    private long subOrderId;
    private long siteId;
    private String siteName;
    private long cityId;
    private String cityName;
    private long warehouseId;
    private long merchandiseId;
    private long price;
    private long quantity;
    private int orderStatus;
    private int isNewOrder;
    private long timestamp;
}
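For the non-standard case mentioned above, here is a hypothetical example (the snake_case key is invented for illustration): if the upstream ever sent sub_order_id instead of subOrderId, FastJSON's @JSONField annotation on the hand-written accessors would map it onto the property.

import com.alibaba.fastjson.annotation.JSONField;
import java.io.Serializable;

public class SubOrderDetailRaw implements Serializable {
    private static final long serialVersionUID = 1L;

    private long subOrderId;

    // Map the non-standard JSON key "sub_order_id" onto this property
    @JSONField(name = "sub_order_id")
    public long getSubOrderId() { return subOrderId; }

    @JSONField(name = "sub_order_id")
    public void setSubOrderId(long subOrderId) { this.subOrderId = subOrderId; }
}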
Computing Site Metrics
Key the sub-order stream by site ID, open a 1-day tumbling window, and attach a ContinuousProcessingTimeTrigger that fires the computation once per second. Mind the time-zone issue with processing time (an old chestnut): the Time.hours(-8) offset below aligns the day window to local midnight in UTC+8.
WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
    .keyBy("siteId")
    .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));
Next, write the aggregate function.
DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
    .aggregate(new OrderAndGmvAggregateFunc())
    .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");
public static final class OrderAndGmvAggregateFunc
        implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {
    private static final long serialVersionUID = 1L;

    @Override
    public OrderAccumulator createAccumulator() {
        return new OrderAccumulator();
    }

    @Override
    public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {
        if (acc.getSiteId() == 0) {
            acc.setSiteId(record.getSiteId());
            acc.setSiteName(record.getSiteName());
        }
        acc.addOrderId(record.getOrderId());
        acc.addSubOrderSum(1);
        acc.addQuantitySum(record.getQuantity());
        acc.addGmv(record.getPrice() * record.getQuantity());
        return acc;
    }

    @Override
    public OrderAccumulator getResult(OrderAccumulator acc) {
        return acc;
    }

    @Override
    public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {
        if (acc1.getSiteId() == 0) {
            acc1.setSiteId(acc2.getSiteId());
            acc1.setSiteName(acc2.getSiteName());
        }
        acc1.addOrderIds(acc2.getOrderIds());
        acc1.addSubOrderSum(acc2.getSubOrderSum());
        acc1.addQuantitySum(acc2.getQuantitySum());
        acc1.addGmv(acc2.getGmv());
        return acc1;
    }
}
The accumulator class OrderAccumulator is straightforward; its structure is obvious from the source, so no need to belabor it. The only caveat is that order IDs can repeat (one order spans multiple sub-orders), so they are kept in a HashSet named orderIds. A HashSet copes fine with our current data volume; for truly massive data, consider switching to HyperLogLog.
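Since that source isn't reproduced in this article, here is a minimal sketch of OrderAccumulator, inferred from the calls made in OrderAndGmvAggregateFunc above; the author's original may differ in detail.

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

public class OrderAccumulator implements Serializable {
    private static final long serialVersionUID = 1L;

    private long siteId;
    private String siteName;
    // Order IDs can repeat across sub-orders, so a HashSet deduplicates them
    private Set<Long> orderIds = new HashSet<>();
    private long subOrderSum;
    private long quantitySum;
    private long gmv;

    public long getSiteId() { return siteId; }
    public void setSiteId(long siteId) { this.siteId = siteId; }
    public String getSiteName() { return siteName; }
    public void setSiteName(String siteName) { this.siteName = siteName; }
    public Set<Long> getOrderIds() { return orderIds; }
    public long getSubOrderSum() { return subOrderSum; }
    public long getQuantitySum() { return quantitySum; }
    public long getGmv() { return gmv; }

    public void addOrderId(long orderId) { orderIds.add(orderId); }
    public void addOrderIds(Set<Long> ids) { orderIds.addAll(ids); }
    public void addSubOrderSum(long n) { subOrderSum += n; }
    public void addQuantitySum(long n) { quantitySum += n; }
    public void addGmv(long n) { gmv += n; }
}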
Next, the results should be written out to Redis for the presentation layer to query. There is a catch, though: only a handful of sites actually change within any given second, yet ContinuousProcessingTimeTrigger emits all the aggregated data in the window on every firing, which wastes a lot of work and puts extra pressure on Redis. So we attach a ProcessFunction after the aggregation; the code follows.
DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream
    .keyBy(0)
    .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))
    .name("process_site_gmv_changed").uid("process_site_gmv_changed");
public static final class OutputOrderGmvProcessFunc
        extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;

    private MapState<Long, OrderAccumulator> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
            "state_site_order_gmv",
            Long.class,
            OrderAccumulator.class
        ));
    }

    @Override
    public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
        long key = value.getSiteId();
        OrderAccumulator cachedValue = state.get(key);
        // Emit only when this site's aggregate has actually changed
        if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {
            JSONObject result = new JSONObject();
            result.put("site_id", value.getSiteId());
            result.put("site_name", value.getSiteName());
            result.put("quantity", value.getQuantitySum());
            result.put("orderCount", value.getOrderIds().size());
            result.put("subOrderCount", value.getSubOrderSum());
            result.put("gmv", value.getGmv());
            out.collect(new Tuple2<>(key, result.toJSONString()));
            state.put(key, value);
        }
    }

    @Override
    public void close() throws Exception {
        state.clear();
        super.close();
    }
}
It is simple enough: a MapState caches the current aggregates for all sites. Since the source data arrives at sub-order granularity, a result counts as updated only if the site ID isn't cached yet, or the cached sub-order count differs from the current one; only such records are allowed through.
Finally we can safely attach the Redis sink; the results are stored in a Redis hash.
// Construct a FlinkJedisPoolConfig suitable for your environment
// (a minimal sketch follows this block)
FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
siteResultStream
    .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
    .name("sink_redis_site_gmv").uid("sink_redis_site_gmv")
    .setParallelism(1);
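As for the FlinkJedisPoolConfig left to the reader, a minimal sketch using the connector's builder, with placeholder connection values:

FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
    .setHost("127.0.0.1")  // placeholder: your Redis host
    .setPort(6379)         // placeholder: your Redis port
    .setDatabase(0)
    .build();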
public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;
    private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, String> data) {
        return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, String> data) {
        return data.f1;
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {
        return Optional.of(
            HASH_NAME_PREFIX +
            new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +
            "SITES"
        );
    }
}
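On the query side, the presentation layer can then read the whole hash back in one call. A hedged Jedis example, with placeholder connection details; the date string is assumed to match Consts.TIME_DAY_FORMAT, and the key concatenation mirrors GmvRedisMapper above (note the original code puts no separator before "SITES"):

Jedis jedis = new Jedis("127.0.0.1", 6379);  // placeholder connection
String dateStr = "2019-11-06";               // assumed to match Consts.TIME_DAY_FORMAT
Map<String, String> siteGmv = jedis.hgetAll("RT:DASHBOARD:GMV:" + dateStr + "SITES");
siteGmv.forEach((siteId, json) -> System.out.println(siteId + " -> " + json));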
Merchandise Top N
We can directly reuse the orderStream produced earlier; the approach is much the same as the GMV statistics above. A 1-second tumbling window is sufficient here.
WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
    .keyBy("merchandiseId")
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));

DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
    .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
    .name("aggregate_merch_sales").uid("aggregate_merch_sales")
    .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));
The aggregate and window functions are even simpler, ultimately returning a tuple of merchandise ID and sales quantity.
public static final class MerchandiseSalesAggregateFunc
        implements AggregateFunction<SubOrderDetail, Long, Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(SubOrderDetail value, Long acc) {
        return acc + value.getQuantity();
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}

public static final class MerchandiseSalesWindowFunc
        implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(
            Tuple key,
            TimeWindow window,
            Iterable<Long> accs,
            Collector<Tuple2<Long, Long>> out) throws Exception {
        long merchId = ((Tuple1<Long>) key).f0;
        long acc = accs.iterator().next();
        out.collect(new Tuple2<>(merchId, acc));
    }
}
Since the data ends up in Redis anyway, there is no need at all to compute the Top N on the Flink side: just use a Redis sorted set (zset), with the merchandise ID as the member and the sales quantity as the score. Simple and convenient. However, the flink-redis-connector project does not ship an implementation of the ZINCRBY command out of the box (a gripe worth repeating), so we add one ourselves, following the same steps as my earlier article on adding the SETEX command; I won't repeat them here, but a rough sketch follows the RedisMapper below. The RedisMapper looks like this.
public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {
    private static final long serialVersionUID = 1L;
    private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, Long> data) {
        return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, Long> data) {
        return String.valueOf(data.f1);
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {
        return Optional.of(
            ZSET_NAME_PREFIX +
            new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +
            "MERCHANDISE"
        );
    }
}
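For reference, a rough sketch of the connector change, assuming the Bahir flink-redis-connector's layout (a RedisCommand enum, a Jedis-backed commands container, and a switch in RedisSink.invoke()); treat this as an outline rather than a drop-in patch:

// 1. In the RedisCommand enum: add a constant bound to the sorted-set type.
ZINCRBY(RedisDataType.SORTED_SET),

// 2. In the RedisCommandsContainer implementation: add the operation,
//    delegating to Jedis' zincrby(key, increment, member).
public void zincrBy(final String key, final String score, final String element) {
    Jedis jedis = null;
    try {
        jedis = getInstance();
        jedis.zincrby(key, Double.parseDouble(score), element);
    } finally {
        releaseInstance(jedis);
    }
}

// 3. In RedisSink.invoke(): route the new command, mirroring the ZADD case
//    (additionalKey is the zset name, value the score, key the member).
case ZINCRBY:
    this.redisCommandsContainer.zincrBy(this.additionalKey, value, key);
    break;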
On the query side, the backend can use the ZREVRANGE command to pull out the desired ranks directly. As long as the data volume isn't unbearably large and a Redis instance is already at hand, this scheme works perfectly well as a general-purpose implementation for all kinds of Top N requirements.
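A hedged Jedis example of that backend read, with placeholder connection details and the key layout from RankingRedisMapper above (Tuple here is redis.clients.jedis.Tuple, not Flink's):

Jedis jedis = new Jedis("127.0.0.1", 6379);  // placeholder connection
String dateStr = "2019-11-06";               // assumed to match Consts.TIME_DAY_FORMAT
String key = "RT:DASHBOARD:RANKING:" + dateStr + ":MERCHANDISE";
// ZREVRANGE with scores: ranks 0..9 give the Top 10, highest sales first
Set<Tuple> top10 = jedis.zrevrangeWithScores(key, 0, 9);
for (Tuple t : top10) {
    System.out.println(t.getElement() + " -> " + (long) t.getScore());
}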
The End
The actual dashboard is confidential, so no screenshots, naturally. Below is the execution plan shown by the Flink Web UI at submission time (the real job has more statistics tasks, and more than 3 sinks). By reusing the source data, many more statistics requirements can be served within the same Flink job.