One of the most common use cases for Flink is ETL (extract, transform, load).
1. Stateless Transformations
The most basic stateless transformations are map and flatMap.
map performs a one-to-one transformation: for every element in the stream, it emits exactly one transformed element.
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}
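flatMap, by contrast, emits its results through a Collector, so each input can produce zero, one, or many output elements, and the number of outputs does not have to match the number of inputs.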
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
        // Emit one EnrichedRide for rides inside NYC, and nothing otherwise
        if (valid.filter(taxiRide)) {
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}
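To see the one-to-one versus one-to-many contract in isolation, here is a plain-JDK analogy using java.util.stream, whose map and flatMap behave the same way per element. This is a sketch, not Flink's DataStream API: integer ride IDs stand in for TaxiRide, and the hypothetical "even ID means valid" check stands in for NYCFilter.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MapVsFlatMap {
    // map: exactly one output per input (like Enrichment)
    static List<String> enrichAll(List<Integer> rideIds) {
        return rideIds.stream()
                .map(id -> "enriched-" + id)
                .collect(Collectors.toList());
    }

    // flatMap: zero or more outputs per input (like NYCEnrichment).
    // Hypothetical validity rule: only even ride IDs are "valid".
    static List<String> enrichValid(List<Integer> rideIds) {
        return rideIds.stream()
                .flatMap(id -> id % 2 == 0
                        ? Stream.of("enriched-" + id)   // emit one element
                        : Stream.<String>empty())       // emit nothing
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4);
        System.out.println(enrichAll(ids));   // [enriched-1, enriched-2, enriched-3, enriched-4]
        System.out.println(enrichValid(ids)); // [enriched-2, enriched-4]
    }
}
```

flatMap expresses filtering by returning an empty result for an input, which is why NYCEnrichment can drop invalid rides while a map-based Enrichment cannot.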
2. Keyed Streams
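- keyBy()
Partitions the stream by an attribute of each element, much like GROUP BY in SQL. This usually requires an expensive network shuffle, plus serialization and deserialization of every element.
- Keys are computed
A key does not have to be a single field: it can be computed from several attributes. Because Flink may recompute the key whenever it needs it, the computation must be deterministic, so that the same element always yields the same key.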
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat));
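A rough way to see why the key must be deterministic: records are routed to parallel instances by hashing the key, so the same element must always land in the same place. The plain-Java sketch below assumes a simplified hash-modulo-parallelism routing (Flink actually hashes keys into key groups, which are then assigned to subtasks), and gridCell is a hypothetical stand-in for GeoUtils.mapToGridCell.

```java
import java.util.Objects;

class KeyRouting {
    // Simplified routing model: hash the key, then take it modulo the parallelism.
    // (Flink's real scheme goes through key groups; this only illustrates the idea.)
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(Objects.hashCode(key), parallelism);
    }

    // Hypothetical deterministic key: depends only on the record's own fields
    static String gridCell(double lon, double lat) {
        return (int) Math.floor(lon * 100) + ":" + (int) Math.floor(lat * 100);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int first = partitionFor(gridCell(-73.98, 40.75), parallelism);
        int again = partitionFor(gridCell(-73.98, 40.75), parallelism);
        System.out.println(first == again); // true: same input, same instance
    }
}
```

A key selector based on, say, Math.random() or the current time would break this, because a recomputed key could send the same element to a different instance holding different state.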
- Aggregations on Keyed Streams
import org.joda.time.Interval;
import org.joda.time.Minutes;

// For each ride-end event, emit (startCell, ride duration in minutes)
DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
        @Override
        public void flatMap(EnrichedRide ride,
                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
            // isStart == false: this is a ride-end event, so the duration is known
            if (!ride.isStart) {
                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                Minutes duration = rideInterval.toDuration().toStandardMinutes();
                out.collect(new Tuple2<>(ride.startCell, duration));
            }
        }
    });

minutesByStartCell
    .keyBy(value -> value.f0) // f0 = startCell
    .maxBy(1)                 // f1 = duration
    .print();
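This first partitions the stream by startCell and then aggregates within each key. For every incoming ride, maxBy(1) emits the record with the longest duration seen so far for that startCell, so the output is a continuously updated per-cell maximum.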
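- Implicit State
In the example above, Flink maintains implicit state for you: the maximum duration seen so far for every distinct key. Here that state is small, but it grows with the number of distinct keys, and if the key space is unbounded, so is the state. In production it is therefore usually better to aggregate over finite time windows rather than over the entire stream.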
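The behavior of keyBy(...).maxBy(1), including its implicit state, can be modeled in plain Java. This single-threaded sketch is not Flink's implementation: it uses hypothetical rides with integer minutes instead of Joda Minutes, keeps a HashMap from startCell to the record with the largest duration so far, and emits that record for every input, replacing it only on a strictly larger value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RollingMaxBy {
    static final class Ride {
        final int startCell;
        final int minutes;

        Ride(int startCell, int minutes) {
            this.startCell = startCell;
            this.minutes = minutes;
        }

        @Override
        public String toString() {
            return "(" + startCell + ", " + minutes + " min)";
        }
    }

    // Emits one record per input: the record with the longest duration seen so far
    // for that input's key. The map plays the role of Flink's implicit per-key state.
    static List<Ride> rollingMaxBy(List<Ride> input) {
        Map<Integer, Ride> maxPerCell = new HashMap<>();
        List<Ride> out = new ArrayList<>();
        for (Ride r : input) {
            Ride current = maxPerCell.get(r.startCell);
            // Replace only on a strictly larger value, so on ties the earlier record is kept
            if (current == null || r.minutes > current.minutes) {
                current = r;
                maxPerCell.put(r.startCell, r);
            }
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Ride> rides = List.of(
                new Ride(1, 10), new Ride(2, 5), new Ride(1, 7), new Ride(1, 12));
        // Prints (1, 10 min), (2, 5 min), (1, 10 min), (1, 12 min), one per line
        rollingMaxBy(rides).forEach(System.out::println);
    }
}
```

The map gains one entry per distinct startCell and never shrinks, which is the unbounded-state concern described above.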
3. Stateful Transformations
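- Rich Functions
Rich functions, such as RichFlatMapFunction, add several methods to the basic function interfaces, including:
open(Configuration c): called once when the operator is initialized; useful for loading static data or opening a connection to an external service
close(): called once when the operator shuts down; useful for releasing such resources
getRuntimeContext(): gives access to the runtime context, through which you can create and access state managed by Flink
An example of keyed state: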
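import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

private static class Event {
    public final String key;
    public final long timestamp;
    ...
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(new EventSource())
        .keyBy(e -> e.key)            // the state in Deduplicator is scoped to this key
        .flatMap(new Deduplicator())
        .print();

    env.execute();
}

public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
    // One Boolean per key, stored in Flink's state backend
    ValueState<Boolean> keyHasBeenSeen;

    @Override
    public void open(Configuration conf) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
        keyHasBeenSeen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        // null means no event with the current key has been seen yet
        if (keyHasBeenSeen.value() == null) {
            out.collect(event);
            keyHasBeenSeen.update(true);
        }
    }
}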
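Flink supports several types of keyed state; this example uses the simplest one, ValueState. For each key, Flink maintains one object in this state. When the job starts, open() is called before any event has arrived, so there is no key yet: open() only obtains a handle to the state. Later, each time flatMap() is called, the key of the current event is in context, and keyHasBeenSeen.value() looks up the value for that key in Flink's state backend.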
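When the job is deployed to a distributed cluster, there will be many Deduplicator instances, each responsible for a disjoint subset of the entire keyspace. So when you see a ValueState, understand that it does not represent a single Boolean but a distributed, sharded key-value store.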
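The Deduplicator's semantics, and the idea that ValueState is sharded by key across instances, can be modeled in plain Java. In this sketch each model instance owns a HashMap playing the role of ValueState<Boolean> for the keys routed to it. Routing by key hash modulo parallelism is a simplification, checkpointing and the real state backend are ignored, and the event keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedDedupModel {
    // Model of one Deduplicator instance; the map stands in for ValueState<Boolean>,
    // holding entries only for the keys routed to this instance.
    static class Deduplicator {
        final Map<String, Boolean> keyHasBeenSeen = new HashMap<>();

        void flatMap(String key, List<String> out) {
            // Like keyHasBeenSeen.value(): the lookup is scoped to the current key
            if (keyHasBeenSeen.get(key) == null) {
                out.add(key);
                keyHasBeenSeen.put(key, true); // like keyHasBeenSeen.update(true)
            }
        }
    }

    // Routes each event to one instance by key hash (simplified key partitioning)
    static List<String> run(List<String> keys, List<Deduplicator> instances) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            int idx = Math.floorMod(key.hashCode(), instances.size());
            instances.get(idx).flatMap(key, out);
        }
        return out;
    }

    static List<Deduplicator> newInstances(int parallelism) {
        List<Deduplicator> list = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            list.add(new Deduplicator());
        }
        return list;
    }

    public static void main(String[] args) {
        List<Deduplicator> instances = newInstances(3);
        System.out.println(run(List.of("a", "b", "a", "c", "b"), instances)); // [a, b, c]
        // Each key's flag lives in exactly one instance: the maps are disjoint shards
        instances.forEach(d -> System.out.println(d.keyHasBeenSeen.keySet()));
    }
}
```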
- Clearing State
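If the keyspace is unbounded, the state grows without limit, so state for keys that are no longer needed has to be cleared explicitly with clear():
keyHasBeenSeen.clear();
You can call clear() from a timer (timers are available in a KeyedProcessFunction, not in a plain RichFlatMapFunction), or configure a time-to-live (TTL) on the state descriptor so that Flink expires the state automatically: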
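import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    // State expires 1 second after it was created or last written
    .newBuilder(Time.seconds(1))
    // Only creation and writes refresh the TTL; reads do not
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    // Never return an expired value, even if it has not been cleaned up yet
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

// TTL is enabled on the descriptor, before the state is created from it
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);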
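The combined effect of OnCreateAndWrite and NeverReturnExpired on a single key can be sketched in plain Java with an explicit clock. TtlValueModel is a hypothetical model, not Flink's API; it assumes a value counts as expired once the full TTL has elapsed since the last write.

```java
class TtlValueModel {
    // Hypothetical model of one key's ValueState<String> with a TTL
    private final long ttlMillis;
    private String value;
    private long lastWriteMillis;

    TtlValueModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // OnCreateAndWrite: only creating/updating the value refreshes its timestamp
    void update(String v, long nowMillis) {
        value = v;
        lastWriteMillis = nowMillis;
    }

    // NeverReturnExpired: once the TTL has elapsed, behave as if no value were set.
    // Reads deliberately do not touch lastWriteMillis.
    String value(long nowMillis) {
        if (value == null || nowMillis - lastWriteMillis >= ttlMillis) {
            return null;
        }
        return value;
    }

    public static void main(String[] args) {
        TtlValueModel state = new TtlValueModel(1000); // like Time.seconds(1)
        state.update("hello", 0);
        System.out.println(state.value(500));  // hello
        System.out.println(state.value(999));  // hello
        System.out.println(state.value(1500)); // null: expired, the reads did not extend it
    }
}
```

With UpdateType.OnReadAndWrite instead, the read at 999 ms would have refreshed the TTL, and the value would still be visible at 1500 ms.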
- Non-keyed State
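Sometimes you also need to manage non-keyed state; this is usually called operator state. User functions rarely need it; it is typically used in sources and sinks.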
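4. Connected Streams
Sometimes an operator needs two inputs instead of one: one stream carries the data, and the other carries rules, thresholds, or other parameters that control how the data is processed. Connected streams can also be used to implement streaming joins.
Note that the two connected streams must be keyed in compatible ways, so that elements with the same key from both streams reach the same parallel instance and share the same keyed state. In the example below, both streams are keyed by the word itself.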
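import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Control stream: words that should be filtered out
    DataStream<String> control = env
        .fromElements("DROP", "IGNORE")
        .keyBy(x -> x);

    // Data stream, keyed the same way so matching words meet in the same instance
    DataStream<String> streamOfWords = env
        .fromElements("Apache", "DROP", "Flink", "IGNORE")
        .keyBy(x -> x);

    control
        .connect(streamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    // Per-key flag: has this word been blocked by the control stream?
    private ValueState<Boolean> blocked;

    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }

    // Elements of the first connected stream (control)
    @Override
    public void flatMap1(String controlValue, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }

    // Elements of the second connected stream (streamOfWords)
    @Override
    public void flatMap2(String dataValue, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(dataValue);
        }
    }
}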
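Whether an element is handled by flatMap1 or flatMap2 is determined by the order in which the two streams are connected. Here the call is control.connect(streamOfWords), so elements of control go to flatMap1 and elements of streamOfWords go to flatMap2. However, you cannot control the order in which flatMap1 and flatMap2 are invoked: the two inputs race against each other, and the Flink runtime decides which element is consumed next. Where timing or ordering matters, buffer events in Flink-managed state until the application is ready to process them, or, as a last resort, implement a custom operator with the InputSelectable interface to exert limited control over which input is read.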
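The race described above can be reproduced in plain Java by feeding the same elements to a model of ControlFunction in two different interleavings. ControlModel and the {input, value} encoding are hypothetical: a HashSet stands in for the per-key blocked flag, and the list order stands in for whatever order the runtime happens to choose.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ConnectedOrderModel {
    // Plain-Java stand-in for ControlFunction: the set holds the keys that are blocked
    static class ControlModel {
        final Set<String> blocked = new HashSet<>();
        final List<String> out = new ArrayList<>();

        // Like flatMap1: an element of the control stream blocks its key
        void flatMap1(String controlValue) {
            blocked.add(controlValue);
        }

        // Like flatMap2: a data element passes only if its key is not blocked yet
        void flatMap2(String dataValue) {
            if (!blocked.contains(dataValue)) {
                out.add(dataValue);
            }
        }
    }

    // Each event is {input, value}: "1" = control stream, "2" = data stream.
    // The list order models one possible interleaving chosen by the runtime.
    static List<String> run(List<String[]> interleaving) {
        ControlModel f = new ControlModel();
        for (String[] e : interleaving) {
            if (e[0].equals("1")) {
                f.flatMap1(e[1]);
            } else {
                f.flatMap2(e[1]);
            }
        }
        return f.out;
    }

    public static void main(String[] args) {
        // Control element first: DROP is blocked before the data arrives
        System.out.println(run(List.of(
                new String[]{"1", "DROP"}, new String[]{"2", "Apache"},
                new String[]{"2", "DROP"})));  // [Apache]
        // Data first: the same DROP slips through
        System.out.println(run(List.of(
                new String[]{"2", "Apache"}, new String[]{"2", "DROP"},
                new String[]{"1", "DROP"})));  // [Apache, DROP]
    }
}
```

Both runs receive exactly the same elements; only the interleaving differs, which is why order-sensitive logic should not rely on which input arrives first.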