Data Pipelines & ETL

Flink一個最常見的use case就是做ETL。

1. Stateless Transformation

無狀態(tài)的轉(zhuǎn)換最基礎(chǔ)的操作就是map和flatMap.
map操作執(zhí)行的是一對一的轉(zhuǎn)換蜕企,即對于每個stream中的元素都會輸出一個轉(zhuǎn)換后的元素趾疚。

public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}

而flatMap通過一個Collector接收輸出痕惋,所以輸出的元素數(shù)量可以與輸入的不一致。

public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
        if (valid.filter(taxiRide)) {
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}

2. Keyed Streams

  • KeyBy()
    根據(jù)元素的某個屬性進(jìn)行分區(qū),就像group by一樣,通常這會導(dǎo)致昂貴的網(wǎng)絡(luò)交換插佛,序列化以及反序列化
  • Keys are computed
    也可以將多個屬性的計算結(jié)果作為key, 但為了在需要的時候重新計算key要保證每次計算的結(jié)果都是相同的
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat));
  • Aggregations on Keyed Streams
import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

        @Override
        public void flatMap(EnrichedRide ride,
                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
            if (!ride.isStart) {
                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                Minutes duration = rideInterval.toDuration().toStandardMinutes();
                out.collect(new Tuple2<>(ride.startCell, duration));
            }
        }
    });

minutesByStartCell
  .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
  .maxBy(1) // duration
  .print();

先將stream按照startCell分組婶熬,再對每組partition做聚合運(yùn)算剑勾。上例會實(shí)時更新每個startCell的max duration

  • Implicit State
    在上例中程序維護(hù)了一個implicit的state, 即每個key的max duration.在這個例子中state很簡單,但在實(shí)際生產(chǎn)中赵颅,我們最好一個時間窗口內(nèi)保存state虽另,而非在整個stream中。以避免state過大性含。

3. Stateful Transformations

  • Rich Functions
    rich functions, 如RichFlatMapFunction洲赵,包含了額外的方法,如:
    open(Configuration c): 只在operator初始化時調(diào)用一次,可以用來加載靜態(tài)數(shù)據(jù)叠萍,或建立與外部服務(wù)的連接
    close():
    getRuntimeContext(): 可以創(chuàng)建或獲取由Flink管理的state

  • Keyed State的例子

private static class Event {
    public final String key;
    public final long timestamp;
    ...
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  
    env.addSource(new EventSource())
        .keyBy(e -> e.key)
        .flatMap(new Deduplicator())
        .print();
  
    env.execute();
}

public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
    ValueState<Boolean> keyHasBeenSeen;

    @Override
    public void open(Configuration conf) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
        keyHasBeenSeen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (keyHasBeenSeen.value() == null) {
            out.collect(event);
            keyHasBeenSeen.update(true);
        }
    }
}

Flink支持多種類型的keyed state, 本例中使用的是最簡單的valueState. 對于每一個key, flink會維護(hù)一個對象芝发。程序剛啟動時,調(diào)用open()方法苛谷,還沒有event, 也就沒有key. 后面event出現(xiàn)調(diào)用flatMap時辅鲸,可以獲取到key,就可以用于在flink的state后端做判斷腹殿。
部署到分布式集群上時独悴,會有很多個Deduplicator 實(shí)例,每一個都只對整個keyspace上互不相關(guān)的state負(fù)責(zé)锣尉,因此當(dāng)你看見一個valueState時刻炒,要明白這不止代表一個Boolean對象,而是一個分布式的共享的key-value store.

  • Clearing State
    如果例子中的key是無界的自沧,我們就需要手動清理state, 這通過clear()方法實(shí)現(xiàn)
keyHasBeenSeen.clear();

你可以指定一個Timer來執(zhí)行這個操作坟奥,或者指定valueState的Time-To-Live參數(shù)

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
  • Non-keyed State
    有時候我們也會需要管理non-keyed的state, 這通常稱作operator state

4. Connected Stream

一個operator可以有兩個及以上的source, 其中一個是data, 另一個可以是rules, thresholds或者其他參數(shù)等。也可以用作Streaming joins.



要注意的是兩個連接在一起的stream必須要有兼容的key.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env
        .fromElements("DROP", "IGNORE")
        .keyBy(x -> x);

    DataStream<String> streamOfWords = env
        .fromElements("Apache", "DROP", "Flink", "IGNORE")
        .keyBy(x -> x);
  
    control
        .connect(streamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;
      
    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }
      
    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }
      
    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}

對element執(zhí)行flatMap1還是flatMap2是根據(jù)兩個stream connect的順序決定的拇厢。本例中control.connect(streamOfWords) 所以connect走flatMap1爱谁,dataStream走flatMap2. 但是你是沒有辦法控制flatMap1和flatMap2執(zhí)行的順序的,因?yàn)閮蓚€stream是競爭的關(guān)系孝偎,完全由Flink運(yùn)行時決定的访敌。所以如果順序或者執(zhí)行時間很重要的情境下,最好先將events緩存在flink state中衣盾,或者通過InputSelectable 接口指定執(zhí)行的順序寺旺。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市雨效,隨后出現(xiàn)的幾起案子迅涮,更是在濱河造成了極大的恐慌,老刑警劉巖徽龟,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件叮姑,死亡現(xiàn)場離奇詭異,居然都是意外死亡据悔,警方通過查閱死者的電腦和手機(jī)传透,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來极颓,“玉大人朱盐,你說我怎么就攤上這事〔ぢ。” “怎么了兵琳?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵狂秘,是天一觀的道長。 經(jīng)常有香客問我躯肌,道長者春,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任清女,我火速辦了婚禮钱烟,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嫡丙。我一直安慰自己拴袭,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布曙博。 她就那樣靜靜地躺著拥刻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪父泳。 梳的紋絲不亂的頭發(fā)上泰佳,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天,我揣著相機(jī)與錄音尘吗,去河邊找鬼。 笑死浇坐,一個胖子當(dāng)著我的面吹牛睬捶,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播近刘,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼擒贸,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了觉渴?” 一聲冷哼從身側(cè)響起介劫,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎案淋,沒想到半個月后座韵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡踢京,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年誉碴,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瓣距。...
    茶點(diǎn)故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡黔帕,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蹈丸,到底是詐尸還是另有隱情成黄,我是刑警寧澤呐芥,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站奋岁,受9級特大地震影響思瘟,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜厦取,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一潮太、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧虾攻,春花似錦铡买、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至漂坏,卻和暖如春景埃,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背顶别。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工谷徙, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人驯绎。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓完慧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親剩失。 傳聞我的和親對象是個殘疾皇子屈尼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容