概述
無論是基本的簡單轉(zhuǎn)換和聚合而芥,還是基于窗口的計算律罢,都是針對一條流上的數(shù)據(jù)進行處理的。而在實際應(yīng)用中棍丐,可能需要將不同來源的數(shù)據(jù)連接合并在一起處理误辑,也有可能需要將一條流拆分開,所以經(jīng)常會有對多條流進行處理的場景歌逢。
簡單劃分的話巾钉,多流轉(zhuǎn)換可以分為“分流”和“合流”兩大類。目前分流的操作一般是通過側(cè)輸出流(side output)來實現(xiàn)秘案,而合流的算子比較豐富砰苍,根據(jù)不同的需求可以調(diào)用 union潦匈、connect、join 以及 coGroup 等接口進行連接合并操作赚导。
一茬缩、分流
所謂“分流”,就是將一條數(shù)據(jù)流拆分成完全獨立的兩條辟癌、甚至多條流寒屯。也就是基于一個DataStream,得到完全平等的多個子 DataStream黍少,如圖所示寡夹。一般來說,我們會定義一些篩選條件厂置,將符合條件的數(shù)據(jù)揀選出來放到對應(yīng)的流里菩掏。
1.1 簡單實現(xiàn)
其實根據(jù)條件篩選數(shù)據(jù)的需求,本身非常容易實現(xiàn):只要針對同一條流多次獨立調(diào)用.filter()方法進行篩選昵济,就可以得到拆分之后的流了智绸。
例如,我們可以將電商網(wǎng)站收集到的用戶行為數(shù)據(jù)進行一個拆分访忿,根據(jù)類型(type)的不同瞧栗,分為“Mary”的瀏覽數(shù)據(jù)、“Bob”的瀏覽數(shù)據(jù)等等海铆。那么代碼就可以這樣實現(xiàn):
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SplitStreamByFilter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env
.addSource(new ClickSource());
// 篩選Mary的瀏覽行為放入MaryStream流中
DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.user.equals("Mary");
}
});
// 篩選Bob的購買行為放入BobStream流中
DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.user.equals("Bob");
}
});
// 篩選其他人的瀏覽行為放入elseStream流中
DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return !value.user.equals("Mary") && !value.user.equals("Bob") ;
}
});
MaryStream.print("Mary pv");
BobStream.print("Bob pv");
elseStream.print("else pv");
env.execute();
}
}
這種實現(xiàn)非常簡單迹恐,但代碼顯得有些冗余——我們的處理邏輯對拆分出的三條流其實是一樣的,卻重復(fù)寫了三次卧斟,這明顯是不夠高效的殴边。
在早期的版本中,DataStream API 中提供了一個.split()方法珍语,專門用來將一條流“切分” 成多個锤岸。它的基本思路其實就是按照給定的篩選條件,給數(shù)據(jù)分類“蓋戳”板乙;然后基于這條蓋戳之后的流是偷,分別揀選想要的“戳”就可以得到拆分后的流。這樣我們就不必再對流進行復(fù)制了亡驰。不過這種方法有一個缺陷:因為只是“蓋戳”揀選晓猛,所以無法對數(shù)據(jù)進行轉(zhuǎn)換,分流后的數(shù)據(jù)類型必須跟原始流保持一致凡辱。這就極大地限制了分流操作的應(yīng)用場景〗渲埃現(xiàn)在 split 方法已經(jīng)淘汰掉了。
1.2 側(cè)輸出流
在 Flink 1.13 版本中透乾,已經(jīng)棄用了.split()方法洪燥,取而代之的是直接用處理函數(shù)(process function)的側(cè)輸出流(side output)磕秤。
處理函數(shù)本身可以認為是一個轉(zhuǎn)換算子,它的輸出類型是單一的捧韵,處理之后得到的仍然是一個 DataStream市咆;而側(cè)輸出流則不受限制,可以任意自定義輸出數(shù)據(jù)再来,它們就像從“主流”上分叉出的“支流”蒙兰。盡管看起來主流和支流有所區(qū)別,不過實際上它們都是某種類型的 DataStream芒篷,所以本質(zhì)上還是平等的搜变。利用側(cè)輸出流就可以很方便地實現(xiàn)分流操作,而且得到的多條DataStream 類型可以不同针炉,這就給我們的應(yīng)用帶來了極大的便利挠他。
關(guān)于處理函數(shù)中側(cè)輸出流的用法,簡單來說篡帕,只需要調(diào)用上下文 ctx 的.output()方法殖侵,就可以輸出任意類型的數(shù)據(jù)了。而側(cè)輸出流的標記和提取镰烧,都離不開一個“輸出標簽”(OutputTag)拢军,它就相當于 split()分流時的“戳”,指定了側(cè)輸出流的 id 和類型怔鳖。
import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import scala.Tuple3;
import java.time.Duration;
/**
* @Author: huangyibo
* @Date: 2022/7/19 0:34
* @Description:
*/
public class SideOutputStreamTest {
public static void main(String[] args) throws Exception {
//創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//設(shè)置生成水位線的時間間隔
env.getConfig().setAutoWatermarkInterval(100);
//亂序流的Watermark生成
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
// 插入水位線的邏輯 設(shè)置 watermark 延遲時間骤竹,2 秒
.assignTimestampsAndWatermarks(
// 針對亂序流插入水位線快鱼,延遲時間設(shè)置為 2s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
// 抽取時間戳的邏輯
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
);
OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary"){};
OutputTag<Tuple3<String, String, Long>> boboTag = new OutputTag<Tuple3<String, String, Long>>("Bobo"){};
SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
if ("Mary".equals(event.getUser())) {
context.output(maryTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
} else if ("Bobo".equals(event.getUser())) {
context.output(boboTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
} else {
out.collect(event);
}
}
});
processStream.print("else");
processStream.getSideOutput(maryTag).print("Mary");
processStream.getSideOutput(boboTag).print("Bobo");
env.execute();
}
}
二隘道、基本合流操作
2.1 聯(lián)合(Union)
最簡單的合流操作世落,就是直接將多條流合在一起魏铅,叫作流的“聯(lián)合”(union)昌犹,如圖所示。聯(lián)合操作要求必須流中的數(shù)據(jù)類型必須相同览芳,合并之后的新流會包括所有流中的元素斜姥, 數(shù)據(jù)類型不變。
在代碼中沧竟,我們只要基于DataStream 直接調(diào)用.union()方法铸敏,傳入其他DataStream 作為參數(shù),就可以實現(xiàn)流的聯(lián)合了悟泵;得到的依然是一個DataStream:
stream1.union(stream2, stream3, ...)
注意:union()的參數(shù)可以是多個 DataStream杈笔,所以聯(lián)合操作可以實現(xiàn)多條流的合并。
在事件時間語義下糕非,水位線是時間的進度標志蒙具;不同的流中可能水位線的進展快慢完全不同球榆,所以對于合流之后的水位線,也是要以最小的那個為準禁筏,這樣才可以保證所有流都不會再傳來之前的數(shù)據(jù)持钉。換句話說,多流合并時處理的時效性是以最慢的那個流為準的篱昔。
import com.yibo.flink.datastream.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Author: huangyibo
* @Date: 2022/7/31 16:13
* @Description:
*/
public class UnionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("192.168.111.188", 7777)
.map(data -> {
String[] field = data.split(",");
return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
})
);
stream1.print("stream1");
SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("192.168.111.189", 7777)
.map(data -> {
String[] field = data.split(",");
return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
})
);
stream2.print("stream2");
// 合并兩條流
stream1.union(stream2)
.process(new ProcessFunction<Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
out.collect("水位線:" + ctx.timerService().currentWatermark());
}
})
.print();
env.execute();
}
}
2.2 連接(Connect)
流的聯(lián)合雖然簡單每强,不過受限于數(shù)據(jù)類型不能改變,靈活性大打折扣州刽,所以實際應(yīng)用較少出現(xiàn)空执。除了聯(lián)合(union),F(xiàn)link 還提供了另外一種方便的合流操作——連接(connect)怀伦。顧名思義脆烟,這種操作就是直接把兩條流像接線一樣對接起來。
2.2.1 連接流(ConnectedStreams)
為了處理更加靈活房待,連接操作允許流的數(shù)據(jù)類型不同邢羔。但我們知道一個 DataStream 中的數(shù)據(jù)只能有唯一的類型,所以連接得到的并不是 DataStream 桑孩, 而是一個“ 連接流”(ConnectedStreams)拜鹤。
連接流可以看成是兩條流形式上的“統(tǒng)一”,被放在了一個同一個流中流椒;事實上內(nèi)部仍保持各自的數(shù)據(jù)形式不變敏簿,彼此之間是相互獨立的。要想得到新的DataStream宣虾, 還需要進一步定義一個“同處理”(co-process)轉(zhuǎn)換操作惯裕,用來說明對于不同來源、不同類型 的數(shù)據(jù)绣硝,怎樣分別進行處理轉(zhuǎn)換蜻势、得到統(tǒng)一的輸出類型。兩條流可以保持各自的數(shù)據(jù)類型鹉胖、處理方式也可以不同握玛,不過最終還是會統(tǒng)一到同一個 DataStream 中。
在代碼實現(xiàn)上甫菠,需要分為兩步:首先基于一條 DataStream 調(diào)用.connect()方法挠铲,傳入另外一條DataStream 作為參數(shù),將兩條流連接起來寂诱,得到一個 ConnectedStreams拂苹;然后再調(diào)用同處理方法得到 DataStream。這里可以的調(diào)用的同處理方法有.map()/.flatMap()痰洒,以及.process()方法醋寝。
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/**
* @Author: huangyibo
* @Date: 2022/7/31 16:15
* @Description:
*/
public class ConnectTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Integer> stream1 = env.fromElements(1, 2, 3);
DataStream<Long> stream2 = env.fromElements(1L, 2L, 3L);
ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {
@Override
public String map1(Integer value) {
return "Integer: " + value;
}
@Override
public String map2(Long value) {
return "Long: " + value;
}
});
result.print();
env.execute();
}
}
上面的代碼中搞挣,ConnectedStreams 有兩個類型參數(shù),分別表示內(nèi)部包含的兩條流各自的數(shù)據(jù)類型音羞;因此調(diào)用.map()方法時傳入的不再是一個簡單的 MapFunction囱桨, 而是一個 CoMapFunction,表示分別對兩條流中的數(shù)據(jù)執(zhí)行 map 操作嗅绰。
這個接口有三個類型參數(shù)舍肠,依次表示第一條流、第二條流窘面,以及合并后的流中的數(shù)據(jù)類型翠语。需要實現(xiàn)的方法也非常直白:.map1()就是對第一條流中數(shù)據(jù)的 map 操作,.map2()則是針對第二條流财边。這里我們將一條 Integer 流和一條Long 流合并肌括,轉(zhuǎn)換成 String 輸出。所以當遇到第一條流輸入的整型值時酣难, 調(diào)用.map1()谍夭;而遇到第二條流輸入的長整型數(shù)據(jù)時,調(diào)用.map2():最終都轉(zhuǎn)換為字符串輸出憨募, 合并成了一條字符串流紧索。
值得一提的是,ConnectedStreams 也可以直接調(diào)用.keyBy()進行按鍵分區(qū)的操作菜谣,得到的還是一個ConnectedStreams:
connectedStreams.keyBy(keySelector1, keySelector2);
這里傳入兩個參數(shù) keySelector1 和 keySelector2珠漂,是兩條流中各自的鍵選擇器;當然也可以直接傳入鍵的位置值(keyPosition)尾膊,或者鍵的字段名(field)媳危,這與普通的 keyBy 用法完全一致。ConnectedStreams 進行 keyBy 操作冈敛,其實就是把兩條流中 key 相同的數(shù)據(jù)放到了一起济舆, 然后針對來源的流再做各自處理,這在一些場景下非常有用莺债。另外,我們也可以在合并之前就將兩條流分別進行 keyBy签夭,得到的 KeyedStream 再進行連接(connect)操作齐邦,效果是一樣的。要注意兩條流定義的鍵的類型必須相同第租,否則會拋出異常措拇。
兩條流的連接(connect),與聯(lián)合(union)操作相比慎宾,最大的優(yōu)勢就是可以處理不同類型的流的合并丐吓,使用更靈活浅悉、應(yīng)用更廣泛。當然它也有限制券犁,就是合并流的數(shù)量只能是 2术健,而 union 可以同時進行多條流的合并。這也非常容易理解:union 限制了類型不變粘衬,所以直接合并沒有問題荞估。
2.2.2 CoProcessFunction
對于連接流ConnectedStreams 的處理操作,需要分別定義對兩條流的處理轉(zhuǎn)換稚新,因此接口中就會有兩個相同的方法需要實現(xiàn)勘伺,用數(shù)字“1” “2”區(qū)分,在兩條流中的數(shù)據(jù)到來時分別調(diào)用褂删。我們把這種接口叫作“協(xié)同處理函數(shù)”(co-process function)飞醉。與 CoMapFunction 類似,如果是調(diào)用.flatMap()就需要傳入一個 CoFlatMapFunction屯阀,需要實現(xiàn)flatMap1()缅帘、flatMap2()兩個方法;而調(diào)用.process()時蹲盘,傳入的則是一個CoProcessFunction股毫。
抽象類 CoProcessFunction 在源碼中定義如下:
public abstract class CoProcessFunction<IN1,IN2,OUT> extends AbstractRichFunction {
...
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
public abstract class Context {...}
...
}
可以看到,很明顯 CoProcessFunction 也是“處理函數(shù)”家族中的一員召衔,用法非常相似铃诬。它需要實現(xiàn)的就是 processElement1()、processElement2()兩個方法苍凛,在每個數(shù)據(jù)到來時趣席, 會根據(jù)來源的流調(diào)用其中的一個方法進行處理。CoProcessFunction 同樣可以通過上下文 ctx 來訪問 timestamp醇蝴、水位線宣肚,并通過 TimerService 注冊定時器;另外也提供了.onTimer()方法悠栓,用于定義定時觸發(fā)的處理操作霉涨。
下面是 CoProcessFunction 的一個具體示例:我們可以實現(xiàn)一個實時對賬的需求,也就是app 的支付操作和第三方的支付操作的一個雙流 Join惭适。App 的支付事件和第三方的支付事件將會互相等待 5 秒鐘笙瑟,如果等不來對應(yīng)的支付事件,那么就輸出報警信息癞志。程序如下:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
/**
* @Author: huangyibo
* @Date: 2022/7/31 16:24
* @Description: 實時對賬
*/
public class BillCheckExample {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 來自app的支付日志
SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(
Tuple3.of("order-1", "app", 1000L),
Tuple3.of("order-2", "app", 2000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
return element.f2;
}
})
);
// 來自第三方支付平臺的支付日志
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(
Tuple4.of("order-1", "third-party", "success", 3000L),
Tuple4.of("order-3", "third-party", "success", 4000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
@Override
public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
return element.f3;
}
})
);
// 檢測同一支付單在兩條流中是否匹配往枷,不匹配就報警
appStream.connect(thirdpartStream)
.keyBy(data -> data.f0, data -> data.f0)
.process(new OrderMatchResult())
.print();
env.execute();
}
// 自定義實現(xiàn)CoProcessFunction
public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String> {
// 定義狀態(tài)變量,用來保存已經(jīng)到達的事件
private ValueState<Tuple3<String, String, Long>> appEventState;
private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;
@Override
public void open(Configuration parameters) throws Exception {
appEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
);
thirdPartyEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
);
}
@Override
public void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
// 看另一條流中事件是否來過
if (thirdPartyEventState.value() != null){
out.collect("對賬成功:" + value + " " + thirdPartyEventState.value());
// 清空狀態(tài)
thirdPartyEventState.clear();
} else {
// 更新狀態(tài)
appEventState.update(value);
// 注冊一個5秒后的定時器,開始等待另一條流的事件
ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
}
}
@Override
public void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
if (appEventState.value() != null){
out.collect("對賬成功:" + appEventState.value() + " " + value);
// 清空狀態(tài)
appEventState.clear();
} else {
// 更新狀態(tài)
thirdPartyEventState.update(value);
// 注冊一個5秒后的定時器错洁,開始等待另一條流的事件
ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 定時器觸發(fā)秉宿,判斷狀態(tài),如果某個狀態(tài)不為空屯碴,說明另一條流中事件沒來
if (appEventState.value() != null) {
out.collect("對賬失斆枘馈:" + appEventState.value() + " " + "第三方支付平臺信息未到");
}
if (thirdPartyEventState.value() != null) {
out.collect("對賬失敗:" + thirdPartyEventState.value() + " " + "app信息未到");
}
appEventState.clear();
thirdPartyEventState.clear();
}
}
}
2.2.3 廣播連接流(BroadcastConnectedStream)
關(guān)于兩條流的連接窿锉,還有一種比較特殊的用法:DataStream 調(diào)用.connect()方法時酌摇,傳入的參數(shù)也可以不是一個DataStream,而是一個“廣播流”(BroadcastStream)嗡载,這時合并兩條流得到的就變成了一個“廣播連接流”(BroadcastConnectedStream)窑多。
這種連接方式往往用在需要動態(tài)定義某些規(guī)則或配置的場景。因為規(guī)則是實時變動的洼滚,所以我們可以用一個單獨的流來獲取規(guī)則數(shù)據(jù)埂息;而這些規(guī)則或配置是對整個應(yīng)用全局有效的,所以不能只把這數(shù)據(jù)傳遞給一個下游并行子任務(wù)處理遥巴,而是要“廣播”(broadcast)給所有的并行子任務(wù)千康。而下游子任務(wù)收到廣播出來的規(guī)則,會把它保存成一個狀態(tài)铲掐,這就是所謂的“廣播狀態(tài)”(broadcast state)拾弃。
廣播狀態(tài)底層是用一個“映射”(map)結(jié)構(gòu)來保存的。在代碼實現(xiàn)上摆霉,可以直接調(diào)用 DataStream的.broadcast()方法豪椿,傳入一個“映射狀態(tài)描述器”(MapStateDescriptor)說明狀態(tài)的名稱和類型,就可以得到規(guī)則數(shù)據(jù)的“廣播流”(BroadcastStream):
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
接下來我們就可以將要處理的數(shù)據(jù)流携栋,與這條廣播流進行連接(connect)搭盾,得到的就是所謂的“廣播連接流”(BroadcastConnectedStream)⊥裰В基于BroadcastConnectedStream 調(diào)用.process()方法鸯隅,就可以同時獲取規(guī)則和數(shù)據(jù),進行動態(tài)處理了向挖。
這里既然調(diào)用了.process()方法蝌以,當然傳入的參數(shù)也應(yīng)該是處理函數(shù)大家族中一員——如果對數(shù)據(jù)流調(diào)用過keyBy 進行了按鍵分區(qū),那么要傳入的就是KeyedBroadcastProcessFunction何之; 如果沒有按鍵分區(qū)跟畅,就傳入BroadcastProcessFunction。
DataStream<String> output = stream
.connect(ruleBroadcastStream)
.process(new BroadcastProcessFunction<>() {...});
BroadcastProcessFunction 與 CoProcessFunction 類似帝美,同樣是一個抽象類,需要實現(xiàn)兩個方法,針對合并的兩條流中元素分別定義處理操作悼潭。區(qū)別在于這里一條流是正常處理數(shù)據(jù)庇忌,而另一條流則是要用新規(guī)則來更新廣播狀態(tài),所以對應(yīng)的兩個方法叫作.processElement()以及.processBroadcastElement()舰褪。源碼中定義如下:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
...
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
...
}
三皆疹、基于時間的合流——雙流聯(lián)結(jié)(Join)
對于兩條流的合并,很多情況我們并不是簡單地將所有數(shù)據(jù)放在一起占拍,而是希望根據(jù)某個字段的值將它們聯(lián)結(jié)起來略就,“配對”去做處理。例如用傳感器監(jiān)控火情時晃酒,我們需要將大量溫度傳感器和煙霧傳感器采集到的信息表牢,按照傳感器 ID 分組、再將兩條流中數(shù)據(jù)合并起來贝次,如果同時超過設(shè)定閾值就要報警崔兴。
我們發(fā)現(xiàn),這種需求與關(guān)系型數(shù)據(jù)庫中表的 join 操作非常相近蛔翅。事實上敲茄,F(xiàn)link 中兩條流的 connect 操作,就可以通過 keyBy 指定鍵進行分組后合并山析,實現(xiàn)了類似于 SQL 中的 join 操作堰燎;另外 connect 支持處理函數(shù),可以使用自定義狀態(tài)和TimerService 靈活實現(xiàn)各種需求笋轨,其實已經(jīng)能夠處理雙流合并的大多數(shù)場景秆剪。
不過處理函數(shù)是底層接口,所以盡管connect 能做的事情多翩腐,但在一些具體應(yīng)用場景下還是顯得太過抽象了鸟款。比如,如果我們希望統(tǒng)計固定時間內(nèi)兩條流數(shù)據(jù)的匹配情況茂卦,那就需要設(shè)置定時器何什、自定義觸發(fā)邏輯來實現(xiàn)——其實這完全可以用窗口(window)來表示。為了更方便地實現(xiàn)基于時間的合流操作等龙,F(xiàn)link 的 DataStrema API 提供了兩種內(nèi)置的 join 算子处渣,以及coGroup 算子。
注:SQL 中 join 一般會翻譯為“連接”蛛砰;這里為了區(qū)分不同的算子罐栈,一般的合流操作connect 翻譯為“連接”,而把 join 翻譯為“聯(lián)結(jié)”泥畅。
3.1 窗口聯(lián)結(jié)(Window Join)
基于時間的操作荠诬,最基本的當然就是時間窗口了。之前已經(jīng)介紹過 Window API 的用法,主要是針對單一數(shù)據(jù)流在某些時間段內(nèi)的處理計算柑贞。
Flink 為這種場景專門提供了一個窗口聯(lián)結(jié)(window join)算子方椎,可以定義時間窗口,并將兩條流中共享一個公共鍵(key)的數(shù)據(jù)放在窗口中進行配對處理钧嘶。
3.1.1 窗口聯(lián)結(jié)的調(diào)用
窗口聯(lián)結(jié)在代碼中的實現(xiàn)棠众,首先需要調(diào)用DataStream 的.join()方法來合并兩條流,得到一個 JoinedStreams 有决;接著通過.where() 和.equalTo() 方法指定兩條流中聯(lián)結(jié)的 key闸拿; 然后通過.window()開窗口,并調(diào)用.apply()傳入聯(lián)結(jié)窗口函數(shù)進行處理計算书幕。通用調(diào)用形式如下:
stream1.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
上面代碼中.where()的參數(shù)是鍵選擇器(KeySelector)新荤,用來指定第一條流中的 key;而.equalTo()傳入的 KeySelector 則指定了第二條流中的 key按咒。兩者相同的元素迟隅,如果在同一窗口中,就可以匹配起來励七,并通過一個“聯(lián)結(jié)函數(shù)”(JoinFunction)進行處理了智袭。
這里.window()傳入的就是窗口分配器,之前講到的三種時間窗口都可以用在這里:滾動窗口(tumbling window)掠抬、滑動窗口(sliding window)和會話窗口(session window)吼野。而后面調(diào)用.apply()可以看作實現(xiàn)了一個特殊的窗口函數(shù)。注意這里只能調(diào)用.apply()两波,沒有其他替代的方法瞳步。
傳入的 JoinFunction 也是一個函數(shù)類接口,使用時需要實現(xiàn)內(nèi)部的.join()方法腰奋。這個方法有兩個參數(shù)单起,分別表示兩條流中成對匹配的數(shù)據(jù)。JoinFunction 在源碼中的定義如下:
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT join(IN1 first, IN2 second) throws Exception;
}
這里需要注意劣坊,JoinFunciton 并不是真正的“窗口函數(shù)”嘀倒,它只是定義了窗口函數(shù)在調(diào)用時對匹配數(shù)據(jù)的具體處理邏輯。
當然局冰,既然是窗口計算测蘑,在.window()和.apply()之間也可以調(diào)用可選 API 去做一些自定義, 比如用.trigger()定義觸發(fā)器康二,用.allowedLateness()定義允許延遲時間碳胳,等等。
3.1.2 窗口聯(lián)結(jié)的處理流程
JoinFunction 中的兩個參數(shù)沫勿,分別代表了兩條流中的匹配的數(shù)據(jù)挨约。
兩條流的數(shù)據(jù)到來之后味混,首先會按照 key 分組、進入對應(yīng)的窗口中存儲诫惭;當?shù)竭_窗口結(jié)束時間時惜傲,算子會先統(tǒng)計出窗口內(nèi)兩條流的數(shù)據(jù)的所有組合,也就是對兩條流中的數(shù)據(jù)做一個笛卡爾積(相當于表的交叉連接贝攒,cross join),然后進行遍歷时甚,把每一對匹配的數(shù)據(jù)隘弊,作為參數(shù) (first,second)傳入 JoinFunction 的.join()方法進行計算處理荒适,得到的結(jié)果直接輸出如圖所示梨熙。所以窗口中每有一對數(shù)據(jù)成功聯(lián)結(jié)匹配,JoinFunction 的.join()方法就會被調(diào)用一次刀诬,并輸出一個結(jié)果咽扇。
除了 JoinFunction,在.apply()方法中還可以傳入 FlatJoinFunction陕壹,用法非常類似质欲,只是內(nèi)部需要實現(xiàn)的.join()方法沒有返回值。結(jié)果的輸出是通過收集器(Collector)來實現(xiàn)的糠馆,所以對于一對匹配數(shù)據(jù)可以輸出任意條結(jié)果嘶伟。
其實仔細觀察可以發(fā)現(xiàn),窗口 join 的調(diào)用語法和我們熟悉的 SQL 中表的 join 非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
這句 SQL 中 where 子句的表達又碌,等價于 inner join … on,所以本身表示的是兩張表基于 id 的“內(nèi)連接”(inner join)九昧。而 Flink 中的 window join,同樣類似于 inner join毕匀。也就是說铸鹰,最后處理輸出的,只有兩條流中數(shù)據(jù)按 key 配對成功的那些皂岔;如果某個窗口中一條流的數(shù)據(jù)沒有任何另一條流的數(shù)據(jù)匹配蹋笼,那么就不會調(diào)用 JoinFunction 的.join()方法,也就沒有任何輸出了凤薛。
3.1.3 窗口聯(lián)結(jié)實例
在電商網(wǎng)站中姓建,往往需要統(tǒng)計用戶不同行為之間的轉(zhuǎn)化,這就需要對不同的行為數(shù)據(jù)流缤苫, 按照用戶 ID 進行分組后再合并速兔,以分析它們之間的關(guān)聯(lián)。如果這些是以固定時間周期(比如1 小時)來統(tǒng)計的活玲,那我們就可以使用窗口 join 來實現(xiàn)這樣的需求涣狗。
下面是一段示例代碼:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @Author: huangyibo
* @Date: 2022/7/31 16:52
* @Description: 基于窗口的join
*/
public class WindowJoinTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Long>> stream1 = env
.fromElements(
Tuple2.of("a", 1000L),
Tuple2.of("b", 1000L),
Tuple2.of("a", 2000L),
Tuple2.of("b", 2000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
DataStream<Tuple2<String, Long>> stream2 = env
.fromElements(
Tuple2.of("a", 3000L),
Tuple2.of("b", 3000L),
Tuple2.of("a", 4000L),
Tuple2.of("b", 4000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
stream1
.join(stream2)
.where(r -> r.f0)
.equalTo(r -> r.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
@Override
public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
return left + "=>" + right;
}
})
.print();
env.execute();
}
}
輸出結(jié)果是:
(a,1000)=>(a,3000)
(a,1000)=>(a,4000)
(a,2000)=>(a,3000)
(a,2000)=>(a,4000)
(b,1000)=>(b,3000)
(b,1000)=>(b,4000)
(b,2000)=>(b,3000)
(b,2000)=>(b,4000)
可以看到谍婉,窗口的聯(lián)結(jié)是笛卡爾積。
3.2 間隔聯(lián)結(jié)(Interval Join)
在有些場景下镀钓,我們要處理的時間間隔可能并不是固定的穗熬。比如,在交易系統(tǒng)中丁溅,需要實時地對每一筆交易進行核驗唤蔗,保證兩個賬戶轉(zhuǎn)入轉(zhuǎn)出數(shù)額相等,也就是所謂的“實時對賬”窟赏。兩次轉(zhuǎn)賬的數(shù)據(jù)可能寫入了不同的日志流妓柜,它們的時間戳應(yīng)該相差不大,所以我們可以考慮只統(tǒng)計一段時間內(nèi)是否有出賬入賬的數(shù)據(jù)匹配涯穷。這時顯然不應(yīng)該用滾動窗口或滑動窗口來處理棍掐,因為匹配的兩個數(shù)據(jù)有可能剛好“卡在”窗口邊緣兩側(cè),于是窗口內(nèi)就都沒有匹配了拷况;會話窗口雖然時間不固定作煌,但也明顯不適合這個場景。 基于時間的窗口聯(lián)結(jié)已經(jīng)無能為力了赚瘦。
為了應(yīng)對這樣的需求粟誓,F(xiàn)link 提供了一種叫作“間隔聯(lián)結(jié)”(interval join)的合流操作。顧名思義起意,間隔聯(lián)結(jié)的思路就是針對一條流的每個數(shù)據(jù)努酸,開辟出其時間戳前后的一段時間間隔, 看這期間是否有來自另一條流的數(shù)據(jù)匹配杜恰。
3.2.1 間隔聯(lián)結(jié)的原理
間隔聯(lián)結(jié)具體的定義方式是获诈,我們給定兩個時間點,分別叫作間隔的“上界”(upperBound)和“下界”(lowerBound)心褐;于是對于一條流(不妨叫作 A)中的任意一個數(shù)據(jù)元素 a舔涎,就可以開辟一段時間間隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的時間戳為中心,下至下界點逗爹、上至上界點的一個閉區(qū)間:我們就把這段時間作為可以匹配另一條流數(shù)據(jù)的“窗口”范圍亡嫌。所以對于另一條流(不妨叫B)中的數(shù)據(jù)元素 b,如果它的時間戳落在了這個區(qū)間范圍內(nèi)掘而,a 和b 就可以成功配對挟冠,進而進行計算輸出結(jié)果。所以匹配的條件為:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
這里需要注意袍睡,做間隔聯(lián)結(jié)的兩條流 A 和 B知染,也必須基于相同的 key;下界 lowerBound應(yīng)該小于等于上界upperBound斑胜,兩者都可正可負控淡;間隔聯(lián)結(jié)目前只支持事件時間語義嫌吠。
下方的流 A 去間隔聯(lián)結(jié)上方的流 B,所以基于 A 的每個數(shù)據(jù)元素掺炭,都可以開辟一個間隔區(qū)間辫诅。我們這里設(shè)置下界為-2 毫秒,上界為 1 毫秒涧狮。于是對于時間戳為 2 的 A 中元素炕矮,它的可匹配區(qū)間就是[0, 3],流 B 中有時間戳為 0、1 的兩個元素落在這個范圍內(nèi)者冤,所以就可以得到匹配數(shù)據(jù)對(2, 0)和(2, 1)吧享。同樣地,A 中時間戳為 3 的元素譬嚣,可匹配區(qū)間為[1, 4],B 中只有時間戳為 1 的一個數(shù)據(jù)可以匹配钞它,于是得到匹配數(shù)據(jù)對(3, 1)拜银。
所以我們可以看到,間隔聯(lián)結(jié)同樣是一種內(nèi)連接(inner join)遭垛。與窗口聯(lián)結(jié)不同的是尼桶,interval join 做匹配的時間段是基于流中數(shù)據(jù)的,所以并不確定锯仪;而且流 B 中的數(shù)據(jù)可以不只在一個區(qū)間內(nèi)被匹配泵督。
3.2.2 間隔聯(lián)結(jié)的調(diào)用
間隔聯(lián)結(jié)在代碼中,是基于 KeyedStream 的聯(lián)結(jié)(join)操作庶喜。DataStream 在keyBy 得到KeyedStream 之后小腊,可以調(diào)用.intervalJoin()來合并兩條流,傳入的參數(shù)同樣是一個 KeyedStream久窟, 兩者的 key 類型應(yīng)該一致秩冈;得到的是一個 IntervalJoin 類型。后續(xù)的操作同樣是完全固定的: 先通過.between()方法指定間隔的上下界斥扛,再調(diào)用.process()方法入问,定義對匹配數(shù)據(jù)對的處理操作。調(diào)用.process()需要傳入一個處理函數(shù)稀颁,這是處理函數(shù)家族的最后一員:“處理聯(lián)結(jié)函數(shù)”ProcessJoinFunction芬失。
通用調(diào)用形式如下:
stream1
.keyBy(<KeySelector>)
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(left + "," + right);
}
});
可以看到,抽象類 ProcessJoinFunction 就像是 ProcessFunction 和 JoinFunction 的結(jié)合匾灶,內(nèi)部同樣有一個抽象方法.processElement()棱烂。與其他處理函數(shù)不同的是,它多了一個參數(shù)阶女,這自然是因為有來自兩條流的數(shù)據(jù)垢啼。參數(shù)中 left 指的就是第一條流中的數(shù)據(jù)窜锯,right 則是第二條流中與它匹配的數(shù)據(jù)。每當檢測到一組匹配芭析,就會調(diào)用這里的.processElement()方法锚扎,經(jīng)處理轉(zhuǎn)換之后輸出結(jié)果。
3.2.3 間隔聯(lián)結(jié)實例
在電商網(wǎng)站中馁启,某些用戶行為往往會有短時間內(nèi)的強關(guān)聯(lián)驾孔。我們這里舉一個例子,我們有兩條流惯疙,一條是下訂單的流翠勉,一條是瀏覽數(shù)據(jù)的流。我們可以針對同一個用戶霉颠,來做這樣一個聯(lián)結(jié)对碌。也就是使用一個用戶的下訂單的事件和這個用戶的最近十分鐘的瀏覽數(shù)據(jù)進行一個聯(lián)結(jié)查詢。
import com.yibo.flink.datastream.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* @Author: huangyibo
* @Date: 2022/7/31 17:19
* @Description: 基于間隔的join
*/
public class IntervalJoinTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
Tuple3.of("Mary", "order-1", 5000L),
Tuple3.of("Alice", "order-2", 5000L),
Tuple3.of("Bob", "order-3", 20000L),
Tuple3.of("Alice", "order-4", 20000L),
Tuple3.of("Cary", "order-5", 51000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
return element.f2;
}
})
);
SingleOutputStreamOperator<Event> clickStream = env.fromElements(
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Alice", "./prod?id=200", 3500L),
new Event("Bob", "./prod?id=2", 2500L),
new Event("Alice", "./prod?id=300", 36000L),
new Event("Bob", "./home", 30000L),
new Event("Bob", "./prod?id=1", 23000L),
new Event("Bob", "./prod?id=3", 33000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
})
);
orderStream.keyBy(data -> data.f0)
.intervalJoin(clickStream.keyBy(data -> data.getUser()))
.between(Time.seconds(-5), Time.seconds(10))
.process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
@Override
public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
out.collect(right + " => " + left);
}
})
.print();
env.execute();
}
}
3.3. 窗口同組聯(lián)結(jié)(Window CoGroup)
除窗口聯(lián)結(jié)和間隔聯(lián)結(jié)之外蒿偎,F(xiàn)link 還提供了一個“窗口同組聯(lián)結(jié)”(window coGroup)操作朽们。它的用法跟window join 非常類似,也是將兩條流合并之后開窗處理匹配的元素诉位,調(diào)用時只需要將.join()換為.coGroup()就可以了骑脱。
stream1.coGroup(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.apply(<CoGroupFunction>)
與 window join 的區(qū)別在于, 調(diào)用.apply() 方法定義具體操作時苍糠, 傳入的是CoGroupFunction的實例叁丧。這也是一個函數(shù)類接口,源碼中定義如下:
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable { void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out)throws Exception;
內(nèi)部的.coGroup()方法岳瞭,有些類似于 FlatJoinFunction 中.join()的形式拥娄,同樣有三個參數(shù), 分別代表兩條流中的數(shù)據(jù)以及用于輸出的收集器(Collector)瞳筏。不同的是条舔,這里的前兩個參數(shù)不再是單獨的每一組“配對”數(shù)據(jù)了,而是傳入了可遍歷的數(shù)據(jù)集合乏矾。也就是說孟抗,現(xiàn)在不會再去計算窗口中兩條流數(shù)據(jù)集的笛卡爾積,而是直接把收集到的所有數(shù)據(jù)一次性傳入钻心,至于要怎樣配對完全是自定義的凄硼。這樣.coGroup()方法只會被調(diào)用一次,而且即使一條流的數(shù)據(jù)沒有任何另一條流的數(shù)據(jù)匹配捷沸,也可以出現(xiàn)在集合中摊沉、當然也可以定義輸出結(jié)果了。
所以能夠看出痒给,coGroup 操作比窗口的 join 更加通用说墨,不僅可以實現(xiàn)類似 SQL 中的“內(nèi)連接”(inner join)超升,也可以實現(xiàn)左外連接(left outer join)昌阿、右外連接(right outer join)和全外連接(full outer join)。事實上,窗口 join 的底層病苗,也是通過 coGroup 來實現(xiàn)的儡率。
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* @Author: huangyibo
* @Date: 2022/7/31 17:23
* @Description: 基于窗口的join
*/
public class CoGroupTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Long>> stream1 = env
.fromElements(
Tuple2.of("a", 1000L),
Tuple2.of("b", 1000L),
Tuple2.of("a", 2000L),
Tuple2.of("b", 2000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
DataStream<Tuple2<String, Long>> stream2 = env
.fromElements(
Tuple2.of("a", 3000L),
Tuple2.of("b", 3000L),
Tuple2.of("a", 4000L),
Tuple2.of("b", 4000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
stream1
.coGroup(stream2)
.where(r -> r.f0)
.equalTo(r -> r.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {
collector.collect(iter1 + "=>" + iter2);
}
})
.print();
env.execute();
}
}
輸出:
[(a,1000), (a,2000)]=>[(a,3000), (a,4000)]
[(b,1000), (b,2000)]=>[(b,3000), (b,4000)]
參考:
https://blog.csdn.net/mengxianglong123/article/details/123896237
https://blog.csdn.net/weixin_43495317/article/details/125744516
https://blog.csdn.net/weixin_43788859/article/details/125609885