// Test data (測試數據):
// User Behavior Data from Taobao for Recommendation
import lombok.Data;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.io.File;
import java.net.URL;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@Data
public class HotItems {
/**
 * Builds and runs the "Hot Items" streaming job.
 *
 * <p>Reads the classpath resource {@code User.csv} as a stream of {@link UserBehavior}
 * records, keeps only "pv" events, counts them per item in 60-minute windows that slide
 * every 5 minutes, and prints the top 3 items for each window end.
 *
 * @param args command-line arguments (unused)
 * @throws IllegalStateException if {@code User.csv} is not found on the classpath
 * @throws Exception if the resource path cannot be converted or the job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Use a global parallelism of 1 so that results printed to the console are not
    // interleaved; per the original author, changing it does not affect correctness.
    env.setParallelism(1);

    // Resolve the input CSV from the classpath. getResource returns null when the
    // resource is missing, so fail with a clear message instead of an opaque NPE.
    URL fileUrl = HotItems.class.getClassLoader().getResource("User.csv");
    if (fileUrl == null) {
        throw new IllegalStateException("Resource User.csv not found on the classpath");
    }
    Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));

    // Extract the TypeInformation of UserBehavior; it is expected to be a PojoTypeInfo.
    PojoTypeInfo<UserBehavior> pojoType =
            (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);
    // The field order obtained through Java reflection is not deterministic, so the
    // order of the columns in the file is specified explicitly.
    String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
    // Create the PojoCsvInputFormat that maps CSV columns onto UserBehavior fields.
    PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);

    env.createInput(csvInput, pojoType)
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                @Override
                public long extractAscendingTimestamp(UserBehavior userBehavior) {
                    // The raw data is in seconds; convert it to milliseconds.
                    return userBehavior.timestamp * 1000;
                }
            })
            .filter(new FilterFunction<UserBehavior>() {
                @Override
                public boolean filter(UserBehavior userBehavior) throws Exception {
                    // Keep click ("pv") events only; constant-first equals tolerates
                    // a record whose behavior field is null.
                    return "pv".equals(userBehavior.behavior);
                }
            })
            .keyBy("itemId")
            .timeWindow(Time.minutes(60), Time.minutes(5))
            .aggregate(new CountAgg(), new WindowResultFunction())
            .keyBy("windowEnd")
            .process(new TopNHotItems(3))
            .print();

    env.execute("Hot Items Job");
}
/**
 * Computes the top-N most clicked items of a window. The stream is keyed by the window-end
 * timestamp and the output is the TopN ranking formatted as a printable string.
 */
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
    /** Maximum number of items listed in each emitted ranking. */
    private final int topSize;

    /**
     * Stores the item/click-count records; the TopN computation is triggered once all
     * records of the same window have been collected.
     */
    private ListState<ItemViewCount> itemState;

    /**
     * @param topSize maximum number of items to include in each ranking
     */
    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    /**
     * Obtains the list state used to buffer incoming {@link ItemViewCount} records.
     *
     * @param parameters configuration forwarded to the superclass
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    /**
     * Buffers one record and registers the timer that will emit the ranking.
     *
     * @param input     per-item click count of one window
     * @param context   gives access to the timer service
     * @param collector unused here; output is produced in {@link #onTimer}
     */
    @Override
    public void processElement(
            ItemViewCount input,
            Context context,
            Collector<String> collector) throws Exception {
        // Every record is saved into state.
        itemState.add(input);
        // Register an event-time timer at windowEnd + 1; when it fires, all item records
        // belonging to the windowEnd window are considered collected.
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }

    /**
     * Sorts the buffered records by click count (descending) and emits the first
     * {@code topSize} of them as one formatted string.
     *
     * @param timestamp time the timer was registered for, i.e. windowEnd + 1
     * @param ctx       timer context (unused)
     * @param out       receives the formatted ranking
     */
    @Override
    public void onTimer(
            long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Gather every buffered item click count.
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // Clear the state early to release space.
        itemState.clear();

        // Sort by click count, largest first. Long.compare is used instead of casting the
        // long difference to int, which truncates and can yield the wrong sign (or a
        // false tie) when the counts differ by 2^31 or more.
        allItems.sort((left, right) -> Long.compare(right.viewCount, left.viewCount));

        // Format the ranking as a String so it is easy to print.
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        // The timer fires at windowEnd + 1, so subtract 1 to show the window end.
        result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n");
        for (int i = 0; i < allItems.size() && i < topSize; i++) {
            ItemViewCount currentItem = allItems.get(i);
            // Example line (rank is zero-based): No0: 商品ID=12224 瀏覽量=2413
            result.append("No").append(i).append(":")
                    .append(" 商品ID=").append(currentItem.itemId)
                    .append(" 瀏覽量=").append(currentItem.viewCount)
                    .append("\n");
        }
        result.append("====================================\n\n");

        // Throttle the output to simulate results scrolling in real time. Note that this
        // blocks the calling thread for one second per emitted ranking.
        Thread.sleep(1000);
        out.collect(result.toString());
    }
}
/**
 * Click count of a single item within one window (the output type of the window operation).
 */
public static class ItemViewCount {
    /** Item ID. */
    public long itemId;
    /** End timestamp of the window the count belongs to. */
    public long windowEnd;
    /** Number of clicks counted for the item. */
    public long viewCount;

    /**
     * Creates a record populated with the given values.
     *
     * @param itemId    item ID
     * @param windowEnd window end timestamp
     * @param viewCount click count of the item
     * @return a new {@code ItemViewCount} carrying the three values
     */
    public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
        final ItemViewCount entry = new ItemViewCount();
        entry.viewCount = viewCount;
        entry.windowEnd = windowEnd;
        entry.itemId = itemId;
        return entry;
    }
}
/**
 * Emits the result of a window: wraps the aggregated count together with the window key
 * and the window end into an {@link ItemViewCount}.
 */
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
    /**
     * @param key             key of the window, i.e. the itemId wrapped in a one-field tuple
     * @param window          the window being emitted
     * @param aggregateResult result of the aggregate function, i.e. the count value
     * @param collector       receives the resulting {@link ItemViewCount}
     */
    @Override
    public void apply(
            Tuple key,
            TimeWindow window,
            Iterable<Long> aggregateResult,
            Collector<ItemViewCount> collector) throws Exception {
        // The key is cast to a single-field tuple holding the item ID.
        @SuppressWarnings("unchecked")
        Tuple1<Long> itemKey = (Tuple1<Long>) key;
        // Only the first element of the aggregate result is read.
        long viewCount = aggregateResult.iterator().next();
        ItemViewCount summary = ItemViewCount.of(itemKey.f0, window.getEnd(), viewCount);
        collector.collect(summary);
    }
}
/**
* User behavior record: one row of the input CSV. Fields are public and are populated by
* name; the column order used when reading the file is the one listed in main's fieldOrder.
*/
@Data
public static class UserBehavior {
public long userId; // user ID
public long itemId; // item ID
public int categoryId; // item category ID
public String behavior; // user behavior, one of ("pv", "buy", "cart", "fav")
public long timestamp; // time the behavior occurred, in seconds
}
/**
 * COUNT aggregate function: the accumulator grows by one for every record added.
 */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    /** @return a fresh accumulator starting at zero */
    @Override
    public Long createAccumulator() {
        return Long.valueOf(0L);
    }

    /**
     * @param userBehavior the record being counted (its contents are not inspected)
     * @param acc          the count so far
     * @return the count so far plus one
     */
    @Override
    public Long add(UserBehavior userBehavior, Long acc) {
        long incremented = acc + 1L;
        return incremented;
    }

    /** @return the accumulated count, unchanged */
    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    /** @return the sum of the two partial counts */
    @Override
    public Long merge(Long acc1, Long acc2) {
        return Long.sum(acc1, acc2);
    }
}
}