1. Introduction
With DataStream-based joins, Flink can only join two streams within the same window. In practice, however, data is often out of order or delayed, so the two streams progress at different rates and related records can fall into different windows, where they can no longer be joined.
On KeyedStream, Flink provides the interval join: intervalJoin connects two KeyedStreams and joins elements that have the same key and whose timestamps lie within a time interval relative to each other.
2. Code example
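The example joins the order-item stream with the order stream on the order id to obtain the member id from the order stream.
Here ds1 is the order-item stream and ds2 is the order stream. Each is keyed by order id with keyBy(), giving two KeyedStreams, and intervalJoin is then applied to them.
The two arguments of between(), lowerBound and upperBound, control which elements of the right stream each element of the left stream can be joined with, namely: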
leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
In other words, a left element may arrive up to |lowerBound| later than its right partner (when lowerBound is negative) and up to upperBound earlier (when upperBound is positive).
// ds1: order-item stream, ds2: order stream (both of JSONObject), joined on the order id
DataStream<OrderItemBean> ds = ds1.keyBy(jo -> jo.getString("fk_tgou_order_id"))
        .intervalJoin(ds2.keyBy(jo -> jo.getString("id")))
        // an order item joins orders whose timestamp lies in [item.ts - 5 ms, item.ts + 5 ms]
        .between(Time.milliseconds(-5), Time.milliseconds(5))
        .process(new ProcessJoinFunction<JSONObject, JSONObject, OrderItemBean>() {
            @Override
            public void processElement(JSONObject joItem, JSONObject joOrder, Context context,
                                       Collector<OrderItemBean> collector) throws Exception {
                String order_id = joItem.getString("fk_tgou_order_id");
                String item_id = joItem.getString("activity_to_product_id");
                // df: a date formatter defined outside this snippet
                String create_time = df.format(joItem.getLong("create_time"));
                String member_id = joOrder.getString("fk_member_id"); // taken from the order stream
                Double price = joItem.getDouble("price");
                Integer quantity = joItem.getInteger("quantity");
                collector.collect(new OrderItemBean(order_id, item_id, create_time, member_id, price, quantity));
            }
        });
// schema and produceConfig (serialization schema and Kafka producer properties) are defined outside this snippet
ds.map(JSON::toJSONString).addSink(new FlinkKafkaProducer010<String>("berkeley-order-item", schema, produceConfig));
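The condition that between() enforces can be checked on its own. The plain-Java sketch below is illustrative only: IntervalPredicate and matches are hypothetical names, not Flink API, and both bounds are treated as inclusive (the default).

```java
// Hypothetical helper, not a Flink API: checks the between() contract
// leftTs + lowerBound <= rightTs <= leftTs + upperBound (both bounds inclusive).
final class IntervalPredicate {
    static boolean matches(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return leftTs + lowerBound <= rightTs && rightTs <= leftTs + upperBound;
    }

    public static void main(String[] args) {
        long lower = -5, upper = 5; // same bounds as between(Time.milliseconds(-5), Time.milliseconds(5))
        // Right element 5 ms before the left one: still joins (the left side may be up to 5 ms late).
        System.out.println(matches(100, 95, lower, upper));  // true
        // Right element 5 ms after the left one: still joins (the left side may be up to 5 ms early).
        System.out.println(matches(100, 105, lower, upper)); // true
        // Right element 6 ms after: outside the interval.
        System.out.println(matches(100, 106, lower, upper)); // false
    }
}
```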
3. Interval Join source code
<1> Interval join requires the time characteristic to be EventTime.
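<2> After intervalJoin() and between() are called on the two KeyedStreams, process() is called next.
process() takes a user-defined ProcessJoinFunction as its argument; the three type parameters of ProcessJoinFunction are the element type of the left stream, the element type of the right stream, and the element type of the output stream.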
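<3> Under the hood, intervalJoin connects the two KeyedStreams into a ConnectedStreams, which lets the two streams share keyed state; for intervalJoin this means that elements with the same key from both streams can access each other's buffered data.
The ConnectedStreams is then keyed again with keyBy(keySelector1, keySelector2), so both inputs are partitioned by the same join key.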
<4> The operator executed on the ConnectedStreams is IntervalJoinOperator.
Two flags control whether the lower and upper bounds are inclusive; by default both are.
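IntervalJoined offers lowerBoundExclusive() and upperBoundExclusive() to make a bound exclusive. As far as I can tell from IntervalJoinOperator's constructor, an exclusive bound is converted into an inclusive one by moving it 1 ms inward, so the rest of the operator only handles inclusive bounds. A plain-Java sketch of that normalization (BoundNormalizer is a hypothetical name, not Flink code):

```java
// Hypothetical sketch: turn possibly-exclusive bounds into inclusive ones by
// moving an exclusive bound one millisecond toward the inside of the interval.
final class BoundNormalizer {
    static long[] normalize(long lowerBound, boolean lowerInclusive,
                            long upperBound, boolean upperInclusive) {
        long lower = lowerInclusive ? lowerBound : lowerBound + 1L;
        long upper = upperInclusive ? upperBound : upperBound - 1L;
        return new long[] {lower, upper};
    }

    public static void main(String[] args) {
        // between(-5 ms, 5 ms) with an exclusive lower bound: effectively [-4, 5].
        long[] b = normalize(-5, false, 5, true);
        System.out.println(b[0] + " " + b[1]); // -4 5
    }
}
```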
a. initializeState()
It initializes two MapState objects, leftBuffer and rightBuffer,
which buffer the elements of the two streams: the Long key is a timestamp, and the List<BufferEntry<T1>> (List<BufferEntry<T2>> for the right side) holds all elements with that timestamp.
b. processElement1() and processElement2()
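These methods handle arriving elements: when an element arrives on the left stream, for example, the operator looks up the right stream's buffer for elements within the bounds, and vice versa. Both delegate to processElement(): processElement1() passes (lowerBound, upperBound) with isLeft = true, while processElement2() passes the negated, swapped bounds (-upperBound, -lowerBound) with isLeft = false.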
private <THIS, OTHER> void processElement(
        final StreamRecord<THIS> record,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
        final long relativeLowerBound,
        final long relativeUpperBound,
        final boolean isLeft) throws Exception {
    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();
    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                "interval stream joins need to have timestamps meaningful timestamps.");
    }
    if (isLate(ourTimestamp)) {
        return;
    }
    addToBuffer(ourBuffer, ourValue, ourTimestamp);
    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
        final long timestamp = bucket.getKey();
        if (timestamp < ourTimestamp + relativeLowerBound ||
                timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }
        for (BufferEntry<OTHER> entry: bucket.getValue()) {
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }
    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}
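Because processElement2() calls processElement() with (-upperBound, -lowerBound), one range check serves both directions. The plain-Java check below (RelativeBounds is a hypothetical class, not Flink code) compares the left-side and right-side views over a grid of timestamps for between(1, 5) and confirms they accept exactly the same pairs.

```java
// Hypothetical check, not Flink code: the range test used in processElement,
// applied from the left side with (lowerBound, upperBound) and from the right
// side with (-upperBound, -lowerBound), accepts the same (left, right) pairs.
final class RelativeBounds {
    // Same condition as processElement: the buffered timestamp must lie in
    // [ourTimestamp + relativeLowerBound, ourTimestamp + relativeUpperBound].
    static boolean inRange(long ourTs, long otherTs, long relLower, long relUpper) {
        return otherTs >= ourTs + relLower && otherTs <= ourTs + relUpper;
    }

    static boolean fromLeft(long leftTs, long rightTs, long lower, long upper) {
        return inRange(leftTs, rightTs, lower, upper);
    }

    static boolean fromRight(long leftTs, long rightTs, long lower, long upper) {
        return inRange(rightTs, leftTs, -upper, -lower);
    }

    public static void main(String[] args) {
        long lower = 1, upper = 5;
        for (long left = 0; left <= 20; left++) {
            for (long right = 0; right <= 20; right++) {
                if (fromLeft(left, right, lower, upper) != fromRight(left, right, lower, upper)) {
                    throw new AssertionError("mismatch at " + left + ", " + right);
                }
            }
        }
        System.out.println("both sides agree");
    }
}
```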
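(1) Read the element's value and timestamp. An element without a timestamp (Long.MIN_VALUE) causes a FlinkException. Then check whether the element is late: if its timestamp is smaller than the current watermark, it is late and is dropped without being joined against the other stream.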
private boolean isLate(long timestamp) {
    long currentWatermark = internalTimerService.currentWatermark();
    return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
}
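The same rule as a standalone sketch (LatenessCheck is a hypothetical name): before the first watermark the timer service reports Long.MIN_VALUE, so nothing counts as late, and because the comparison is strict, an element whose timestamp equals the watermark is still processed.

```java
// Hypothetical stand-in for the operator's isLate(): an element is late only
// if a watermark has been seen (not Long.MIN_VALUE) and its timestamp is
// strictly smaller than that watermark.
final class LatenessCheck {
    static boolean isLate(long timestamp, long currentWatermark) {
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    public static void main(String[] args) {
        System.out.println(isLate(5, Long.MIN_VALUE)); // false: no watermark yet
        System.out.println(isLate(9, 10));             // true: behind the watermark
        System.out.println(isLate(10, 10));            // false: equal is not late
    }
}
```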
(2) Add the element to its own stream's MapState buffer, keyed by the element's timestamp.
addToBuffer(ourBuffer, ourValue, ourTimestamp);

private static <T> void addToBuffer(
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
        final T value,
        final long timestamp) throws Exception {
    List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
    if (elemsInBucket == null) {
        elemsInBucket = new ArrayList<>();
    }
    elemsInBucket.add(new BufferEntry<>(value, false));
    buffer.put(timestamp, elemsInBucket);
}
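A plain-Java approximation of addToBuffer, assuming a HashMap in place of MapState and raw values in place of BufferEntry (TimestampBuffer is a hypothetical name, not Flink code). Elements with the same timestamp share one bucket.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory version of addToBuffer: a HashMap stands in for
// MapState<Long, List<BufferEntry<T>>>, and plain values stand in for BufferEntry.
final class TimestampBuffer<T> {
    private final Map<Long, List<T>> buckets = new HashMap<>();

    void add(T value, long timestamp) {
        List<T> bucket = buckets.get(timestamp);
        if (bucket == null) {
            bucket = new ArrayList<>();
        }
        bucket.add(value);
        // Write the list back, mirroring buffer.put(timestamp, elemsInBucket).
        buckets.put(timestamp, bucket);
    }

    List<T> get(long timestamp) {
        return buckets.getOrDefault(timestamp, new ArrayList<>());
    }

    int bucketCount() {
        return buckets.size();
    }

    public static void main(String[] args) {
        TimestampBuffer<String> buffer = new TimestampBuffer<>();
        buffer.add("item-a", 10L);
        buffer.add("item-b", 10L);
        buffer.add("item-c", 12L);
        System.out.println(buffer.get(10L));      // [item-a, item-b]
        System.out.println(buffer.bucketCount()); // 2
    }
}
```

The put-back is harmless for a HashMap, but it matters in the real operator: with a serializing backend such as RocksDB, the list returned by get() is a deserialized copy, so only the put() actually stores the change.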
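(3) Iterate over the other stream's MapState. For every bucket with ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound,
pass each buffered element together with the arriving one to the ProcessJoinFunction. Here ourTimestamp is the timestamp of the arriving element, and timestamp is that of the buffered element it is joined with.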
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
    final long timestamp = bucket.getKey();
    if (timestamp < ourTimestamp + relativeLowerBound ||
            timestamp > ourTimestamp + relativeUpperBound) {
        continue;
    }
    for (BufferEntry<OTHER> entry: bucket.getValue()) {
        if (isLeft) {
            collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
        } else {
            collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
        }
    }
}
The corresponding collect() method:
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
    userFunction.processElement(left, right, context, collector);
}
It sets the result's timestamp to the larger of the two input timestamps and then calls the user's processElement().
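Step (3) and the collect() call can be simulated together without Flink. In this sketch (BufferProbe is a hypothetical class), a TreeMap of strings plays the other side's MapState, and each match is reported as left+right@resultTimestamp, where the result timestamp is the larger of the two, as in collect().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of step (3), not Flink code: scan every bucket of the
// other side's buffer, skip buckets outside [ourTs + relLower, ourTs + relUpper],
// and emit each match with the larger of the two timestamps, as collect() does.
final class BufferProbe {
    static List<String> probe(String ourValue, long ourTs,
                              Map<Long, List<String>> otherBuffer,
                              long relLower, long relUpper, boolean isLeft) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, List<String>> bucket : otherBuffer.entrySet()) {
            long ts = bucket.getKey();
            if (ts < ourTs + relLower || ts > ourTs + relUpper) {
                continue;
            }
            long resultTs = Math.max(ourTs, ts);
            for (String other : bucket.getValue()) {
                // Always report (left, right), whichever side the new element came from.
                String pair = isLeft ? ourValue + "+" + other : other + "+" + ourValue;
                out.add(pair + "@" + resultTs);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> rightBuffer = new TreeMap<>();
        rightBuffer.put(8L, List.of("r8"));
        rightBuffer.put(12L, List.of("r12"));
        rightBuffer.put(20L, List.of("r20"));
        // A left element at 10 with between(-5, 5): matches r8 and r12, not r20.
        System.out.println(probe("l10", 10, rightBuffer, -5, 5, true)); // [l10+r8@10, l10+r12@12]
    }
}
```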
(4) Register the cleanup timer
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
if (isLeft) {
    internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
    internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
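The cleanup time is the arriving element's timestamp plus relativeUpperBound when relativeUpperBound is positive, and the element's own timestamp otherwise. Once the watermark reaches that time, the timer fires and onEventTime() removes the corresponding bucket.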
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
    long timerTimestamp = timer.getTimestamp();
    String namespace = timer.getNamespace();
    logger.trace("onEventTime @ {}", timerTimestamp);
    switch (namespace) {
        case CLEANUP_NAMESPACE_LEFT: {
            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
            logger.trace("Removing from left buffer @ {}", timestamp);
            leftBuffer.remove(timestamp);
            break;
        }
        case CLEANUP_NAMESPACE_RIGHT: {
            long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", timestamp);
            rightBuffer.remove(timestamp);
            break;
        }
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
    }
}
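The timer timestamp registered in processElement and the bucket key recomputed in onEventTime are inverses of each other. The plain-Java check below (CleanupRoundTrip is a hypothetical name, not Flink code) reproduces both formulas, remembering that the right side registers with relativeUpperBound = -lowerBound, and verifies that every timer maps back to the bucket that registered it for negative, zero, and positive bounds.

```java
// Hypothetical check, not Flink code: the timestamp computed in onEventTime
// is exactly the bucket key that registered the timer, for both buffers.
final class CleanupRoundTrip {
    // Left side: relativeUpperBound = upperBound.
    static long leftTimer(long ts, long upperBound) {
        return (upperBound > 0L) ? ts + upperBound : ts;
    }

    // Right side: relativeUpperBound = -lowerBound.
    static long rightTimer(long ts, long lowerBound) {
        long relativeUpperBound = -lowerBound;
        return (relativeUpperBound > 0L) ? ts + relativeUpperBound : ts;
    }

    // Same expressions as the two cases in onEventTime.
    static long leftKeyFromTimer(long timerTs, long upperBound) {
        return (upperBound <= 0L) ? timerTs : timerTs - upperBound;
    }

    static long rightKeyFromTimer(long timerTs, long lowerBound) {
        return (lowerBound <= 0L) ? timerTs + lowerBound : timerTs;
    }

    public static void main(String[] args) {
        long ts = 10;
        for (long lower = -5; lower <= 5; lower++) {
            for (long upper = lower; upper <= 5; upper++) {
                if (leftKeyFromTimer(leftTimer(ts, upper), upper) != ts
                        || rightKeyFromTimer(rightTimer(ts, lower), lower) != ts) {
                    throw new AssertionError("bounds " + lower + ", " + upper);
                }
            }
        }
        System.out.println("timers map back to their buckets");
    }
}
```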
Cleanup logic:
Suppose the arriving element has a timestamp of 10 s and between() was called with 1 s and 5 s,
i.e. upperBound = 5 s and lowerBound = 1 s.
Then left timestamp + 1 s <= right timestamp <= left timestamp + 5 s, or equivalently right timestamp - 5 s <= left timestamp <= right timestamp - 1 s.
a. If the element arrives on the left stream, processElement1() is called.
Here relativeUpperBound = 5 and relativeLowerBound = 1. Since relativeUpperBound > 0, the cleanup time is 10 + 5 = 15 s.
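When the timer at 15 s fires, the left-stream data is removed. To see why this is safe, look at the range of left timestamps that a right element at 15 s must look up:
10 s <= left timestamp <= 14 s. Any right element still to come once the watermark passes 15 s has a later timestamp and needs only left timestamps above 10 s, so the left data at 10 s can be removed.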
b. If the element arrives on the right stream, processElement2() is called.
Here relativeUpperBound = -lowerBound = -1 and relativeLowerBound = -upperBound = -5. Since relativeUpperBound < 0, the cleanup time is the element's own timestamp, 10 s.
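When the timer at 10 s fires, the right-stream data is removed. Look at the range of right timestamps that a left element at 10 s must look up:
11 s <= right timestamp <= 15 s. Later left elements need even later right timestamps, so the right data at 10 s is no longer needed and can be removed.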
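To tie the pieces together, here is a single-key, plain-Java simulation of the buffering, probing, late-data check, and timer-driven cleanup walked through above, using the 10 s example with between(1 s, 5 s) (units are seconds). It is an illustrative sketch, not the Flink operator: IntervalJoinSim is a hypothetical class, TreeMaps stand in for MapState, and a TreeMap of timers stands in for the event-time timer service, firing every timer whose time is at or below the new watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key simulation of IntervalJoinOperator's logic, for illustration only.
final class IntervalJoinSim {
    final long lowerBound;
    final long upperBound;
    final Map<Long, List<String>> leftBuffer = new TreeMap<>();
    final Map<Long, List<String>> rightBuffer = new TreeMap<>();
    final TreeMap<Long, List<String>> timers = new TreeMap<>(); // timer time -> "LEFT"/"RIGHT"
    final List<String> output = new ArrayList<>();
    long watermark = Long.MIN_VALUE;

    IntervalJoinSim(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void left(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void right(String value, long ts) {
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts, Map<Long, List<String>> ours,
                         Map<Long, List<String>> other, long relLower, long relUpper, boolean isLeft) {
        if (watermark != Long.MIN_VALUE && ts < watermark) {
            return; // late element: dropped without being buffered or joined
        }
        ours.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (Map.Entry<Long, List<String>> bucket : other.entrySet()) {
            long t = bucket.getKey();
            if (t < ts + relLower || t > ts + relUpper) {
                continue;
            }
            for (String o : bucket.getValue()) {
                output.add(isLeft ? value + "+" + o : o + "+" + value);
            }
        }
        long cleanup = relUpper > 0 ? ts + relUpper : ts;
        timers.computeIfAbsent(cleanup, k -> new ArrayList<>()).add(isLeft ? "LEFT" : "RIGHT");
    }

    // Fire every timer whose time is <= the new watermark, as onEventTime() would.
    void advanceWatermark(long wm) {
        watermark = wm;
        while (!timers.isEmpty() && timers.firstKey() <= wm) {
            Map.Entry<Long, List<String>> timer = timers.pollFirstEntry();
            long timerTs = timer.getKey();
            for (String ns : timer.getValue()) {
                if (ns.equals("LEFT")) {
                    leftBuffer.remove(upperBound <= 0 ? timerTs : timerTs - upperBound);
                } else {
                    rightBuffer.remove(lowerBound <= 0 ? timerTs + lowerBound : timerTs);
                }
            }
        }
    }

    public static void main(String[] args) {
        IntervalJoinSim sim = new IntervalJoinSim(1, 5); // between(1 s, 5 s)
        sim.left("L10", 10);      // cleanup timer at 10 + 5 = 15 (LEFT)
        sim.right("R10", 10);     // needs left in [5, 9]: no match; timer at 10 (RIGHT)
        sim.right("R12", 12);     // needs left in [7, 11]: joins L10; timer at 12 (RIGHT)
        sim.advanceWatermark(10); // fires the timer at 10: R10 removed
        sim.right("R9", 9);       // 9 < watermark 10: late, dropped
        sim.advanceWatermark(15); // fires 12 (R12) and 15 (L10): both buffers empty
        System.out.println(sim.output);                                     // [L10+R12]
        System.out.println(sim.leftBuffer.size() + sim.rightBuffer.size()); // 0
    }
}
```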