package com.ctgu.flink.project;
import com.ctgu.flink.entity.BehaviorChannelCount;
import com.ctgu.flink.entity.MarketingUserBehavior;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
public class Flink_Sql_Marketing {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<MarketingUserBehavior> dataStream = env.addSource(new SimulatedMarketingUserBehaviorSource())
.assignTimestampsAndWatermarks(WatermarkStrategy
.<MarketingUserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp()));
dataStream.filter(data -> !"UNINSTALL".equals(data.getBehavior()))
.keyBy(new KeySelector<MarketingUserBehavior, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(MarketingUserBehavior userBehavior) throws Exception {
return new Tuple2<>(userBehavior.getChannel(), userBehavior.getBehavior());
}
})
// .keyBy(MarketingUserBehavior::getBehavior)
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.seconds(1)))
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
.print("分組求和:");
dataStream.filter(data -> !"UNINSTALL".equals(data.getBehavior()))
.map(new MyMapFunction())
.keyBy(data -> data.f0)
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.seconds(1)))
.aggregate(new AverageAggregate1(), new MyWindowFunction())
.print("total:");
env.execute("Table SQL");
System.out.println("耗時: " + (System.currentTimeMillis() - start) / 1000);
}
private static class SimulatedMarketingUserBehaviorSource implements SourceFunction<MarketingUserBehavior> {
boolean running = true;
List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
List<String> channelList = Arrays.asList("app store", "wechat", "tencent", "ali");
Random rand = new Random();
@Override
public void run(SourceContext<MarketingUserBehavior> sourceContext) throws Exception {
while (running) {
long userId = rand.nextLong();
String behavior = behaviorList.get(rand.nextInt(behaviorList.size()));
String channel = channelList.get(rand.nextInt(channelList.size()));
long timestamp = System.currentTimeMillis();
MarketingUserBehavior userBehavior = new MarketingUserBehavior(userId, behavior, channel, timestamp);
System.out.println(userBehavior);
sourceContext.collect(userBehavior);
Thread.sleep(100);
}
}
@Override
public void cancel() {
running = false;
}
}
private static class MyMapFunction extends RichMapFunction<MarketingUserBehavior, Tuple2<String, Long>> {
@Override
public Tuple2<String, Long> map(MarketingUserBehavior userBehavior) throws Exception {
return new Tuple2<>("total", 1L);
}
}
private static class AverageAggregate
implements AggregateFunction<MarketingUserBehavior, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(MarketingUserBehavior userBehavior, Long aLong) {
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
private static class AverageAggregate1
implements AggregateFunction<Tuple2<String, Long>, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Tuple2<String, Long> tuple, Long aLong) {
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
private static class MyWindowFunction
implements WindowFunction<Long, BehaviorChannelCount, String, TimeWindow> {
@Override
public void apply(String key,
TimeWindow timeWindow,
Iterable<Long> iterable,
Collector<BehaviorChannelCount> out) throws Exception {
String windowEnd = new Timestamp(timeWindow.getEnd()).toString();
Long count = iterable.iterator().next();
out.collect(new BehaviorChannelCount(key, key, windowEnd, count));
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Long, BehaviorChannelCount, Tuple2<String, String>, TimeWindow> {
@Override
public void process(Tuple2<String, String> tuple2,
Context context,
Iterable<Long> iterable,
Collector<BehaviorChannelCount> out) throws Exception {
String channel = tuple2.getField(0);
String behavior = tuple2.getField(1);
String windowEnd = new Timestamp(context.window().getEnd()).toString();
Long count = iterable.iterator().next();
out.collect(new BehaviorChannelCount(behavior, channel, windowEnd, count));
}
}
}
Flink-6.Flink 分組求和
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來草冈,“玉大人她奥,你說我怎么就攤上這事≡趵猓” “怎么了哩俭?”我有些...
- 文/不壞的土叔 我叫張陵,是天一觀的道長拳恋。 經(jīng)常有香客問我凡资,道長,這世上最難降的妖魔是什么谬运? 我笑而不...
- 正文 為了忘掉前任隙赁,我火速辦了婚禮,結(jié)果婚禮上吩谦,老公的妹妹穿的比我還像新娘鸳谜。我一直安慰自己,他們只是感情好式廷,可當我...
- 文/花漫 我一把揭開白布咐扭。 她就那樣靜靜地躺著,像睡著了一般滑废。 火紅的嫁衣襯著肌膚如雪蝗肪。 梳的紋絲不亂的頭發(fā)上,一...
- 文/蒼蘭香墨 我猛地睜開眼诱咏,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了缴挖?” 一聲冷哼從身側(cè)響起袋狞,我...
- 正文 年R本政府宣布,位于F島的核電站诉儒,受9級特大地震影響葡缰,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜忱反,卻給世界環(huán)境...
- 文/蒙蒙 一泛释、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧温算,春花似錦怜校、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至巩割,卻和暖如春裙顽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背宣谈。 一陣腳步聲響...
推薦閱讀更多精彩內(nèi)容
- 一、簡單說明 本以為mybatis的example可以搞定group by酸钦,后面看到說不行于是曲線救國,直接查出一...
- mongodb 分組統(tǒng)計: 按deviceId、tenant分組硝拧,統(tǒng)計總記錄條數(shù)径筏、求和workload 刪除sen...
- 最近有個培訓班的小伙伴遇到了這樣的問題滋恬,他想對字符串變量進行分組求和與分組累加,但是他不知道該如何實現(xiàn)抱究,今天我們就...
- 文章詳細地址:https://tengxiaotao.top/blog/7 模型(models.py) 視圖(vi...