Flink 使用之?dāng)?shù)據(jù)源

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

Flink內(nèi)置數(shù)據(jù)源

Text file

讀取磁盤或者HDFS中的文件作為數(shù)據(jù)源彪薛。
唯一的參數(shù)file path可以指定:

  • file:///path/to/file.txt
  • hdfs:///path/to/file.txt

注意:

  1. 如果不填寫前綴file://或者hdfs://,默認(rèn)為file://
  2. 使用Flink讀取HDFS文件系統(tǒng)傀缩,需要去官網(wǎng)下載對應(yīng)Pre-bundled Hadoop包案铺。這里給出的鏈接是適用于Hadoop 2.8.3折晦。之后將這個jar復(fù)制到flink安裝位置的lib目錄中货抄。
val stream = env.readTextFile("/path/to/file.txt")

socketTextStream

使用socket作為數(shù)據(jù)源。但不推薦socket在生產(chǎn)環(huán)境中作為數(shù)據(jù)源蒲跨。原因如下:

  • socket無狀態(tài)译断,也不能replay。故無法保證數(shù)據(jù)精準(zhǔn)投送或悲。
  • socket數(shù)據(jù)源并行度只能是1孙咪,無法很好利用并發(fā)處理性能。

SocketTextStream適合用于debug或者是測試用途巡语。

val stream = env.socketTextStream("localhost", 9000)

fromElements

將一系列元素作為數(shù)據(jù)源翎蹈。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3);

fromCollection

和fromElements方法類似,不同的是該方法接收一個集合對象男公,而不是可變參數(shù)荤堪。如下所示:

val stream = env.fromCollection(Array(1, 2, 3))

Kafka 數(shù)據(jù)源

該數(shù)據(jù)源用于接收Kafka的數(shù)據(jù)。
使用Kafka數(shù)據(jù)源之前需要先確定Kafka的版本枢赔,引入對應(yīng)的Kafka Connector以來澄阳。對應(yīng)關(guān)系如下所示。

Kafka 版本 Maven 依賴
0.8.x flink-connector-kafka-0.8_2.11
0.9.x flink-connector-kafka-0.9_2.11
0.10.x flink-connector-kafka-0.10_2.11
0.11.x flink-connector-kafka-0.11_2.11
1.0 以上 flink-connector-kafka_2.11

引入Maven依賴踏拜。以flink-connector-kafka_2.11為例碎赢,添加以下依賴到pom.xml文件:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

在集群中運行時,為了減少提交jar包的大小速梗,需要將該依賴設(shè)置為provided肮塞。然后把此依賴包復(fù)制到Flink各個節(jié)點安裝位置的lib目錄中襟齿。

一個簡單的使用例子如下:

// 設(shè)置Kafka屬性
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.100.128:9092")
properties.setProperty("group.id", "test")

// 創(chuàng)建Kafka數(shù)據(jù)源,其中test為topic名稱
val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)

DeserializationSchema

DeserializationSchema用于將接收到的二進制數(shù)據(jù)轉(zhuǎn)換為Java或Scala對象枕赵。Kafka Connector提供了如下4種DeserializationSchema:

  • TypeInformationSerializationSchema:使用Flink的TypeInformation反序列化蕊唐。如果上游數(shù)據(jù)也是通過Flink TypeInformation序列化后寫入的,這里使用此schema最為合適烁设。
  • JsonDeserializationSchema :將獲取的數(shù)據(jù)轉(zhuǎn)換為JSON格式替梨。這里有一個坑,如果發(fā)送過來的數(shù)據(jù)不是合法的JSON格式装黑,數(shù)據(jù)源會拋出異常導(dǎo)致TaskManager重啟副瀑。如果需要對不合法的JSON數(shù)據(jù)容錯,需要實現(xiàn)自定義的DeserializationSchema恋谭。
  • AvroDeserializationSchema:讀取Avro格式的數(shù)據(jù)糠睡。
  • SimpleStringSchema:轉(zhuǎn)換接收到的數(shù)據(jù)為字符串。

自定義DeserializationSchema

所有的Schema需要實現(xiàn)DeserializationSchema疚颊。該接口源碼如下所示:

@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Deserializes the byte message.
     *
     * @param message The message, as a byte array.
     *
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(byte[] message) throws IOException;

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);
}

方法解釋:

  • deserialize:將二進制消息轉(zhuǎn)換為某類型消息狈孔。
  • isEndOfStream:表示是否是最后一條數(shù)據(jù)。

以SimpleStringSchema為例展示下怎么編寫自定義的DeserializationSchema材义。
相關(guān)代碼如下:

@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
    // SerializationSchema接口的方法省略
    @Override
    public String deserialize(byte[] message) {
        return new String(message, charset);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }
    // ...
}

起始位置屬性配置

使用示例:

myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

方法解釋:

  • setStartFromEarliest:從最早兒元素開始消費
  • setStartFromLatest:從最近的元素開始消費
  • setStartFromTimestamp:從指定時間戳的數(shù)據(jù)開始消費
  • setStartFromGroupOffsets:這是默認(rèn)的配置均抽。從消費組的offset開始消費。必須配置group.id配置項其掂。

Topic和分區(qū)感知

Topic感知

可以使用如下構(gòu)造函數(shù)創(chuàng)建FlinkKafkaConsumer:

FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) 

和指定topic名稱不同的是油挥,這里傳入的是一個正則表達式。所有名稱匹配該正則表達式的topic都會被訂閱款熬。如果配置了分區(qū)感知(配置flink.partition-discovery.interval-millis為非負(fù)數(shù))深寥,Job啟動之后kafka新創(chuàng)建的topic如果匹配該正則,也會被訂閱到贤牛。

分區(qū)感知

在Job運行過程中如果kafka新創(chuàng)建了partition惋鹅,F(xiàn)link可以動態(tài)感知到,然后對其中數(shù)據(jù)進行消費殉簸。整個過程仍然可以保證exactly once語義闰集。

默認(rèn)情況分區(qū)感知是禁用的。如果要開啟分區(qū)感知喂链,可以設(shè)置flink.partition-discovery.interval-millis返十,即分區(qū)感知觸發(fā)時間間隔。

實現(xiàn)自定義數(shù)據(jù)源

自定義數(shù)據(jù)源需要實現(xiàn)Flink提供的SourceFunction接口椭微。

SourceFunction接口的定義如下:

@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();
}

run方法

run方法為數(shù)據(jù)源向下游發(fā)送數(shù)據(jù)的主要邏輯洞坑。編寫套路為:

  • 不斷調(diào)用循環(huán)發(fā)送數(shù)據(jù)。
  • 使用一個狀態(tài)變量控制循環(huán)的執(zhí)行蝇率。當(dāng)cancel方法執(zhí)行后必須能夠跳出循環(huán)迟杂,停止發(fā)送數(shù)據(jù)刽沾。
  • 使用SourceContext的collect等方法將元素發(fā)送至下游。
  • 如果使用Checkpoint排拷,在SourceContext collect數(shù)據(jù)的時候必須加鎖侧漓。防止checkpoint操作和發(fā)送數(shù)據(jù)操作同時進行。

cancel方法:

cancel方法在數(shù)據(jù)源停止的時候調(diào)用监氢。cancel方法必須能夠控制run方法中的循環(huán)布蔗,停止循環(huán)的運行。并做一些狀態(tài)清理操作浪腐。

SourceContext類

SourceContext在SourceFunction中使用纵揍,用于向下游發(fā)送數(shù)據(jù),或者是發(fā)送watermark议街。
SourceContext的方法包括:

  • collect:向下游發(fā)送數(shù)據(jù)泽谨。有如下三種情況:
    • 如果使用ProcessingTime,該元素不攜帶timestamp特漩。
    • 如果使用IngestionTime吧雹,元素使用系統(tǒng)當(dāng)前時間作為timestamp。
    • 如果使用EventTime涂身,元素不攜帶timestamp雄卷。需要在數(shù)據(jù)流后續(xù)為元素指定timestamp(assignTimestampAndWatermark)。
  • collectWithTimestamp:向下游發(fā)送帶有timestamp的數(shù)據(jù)访得。和collect方法一樣也有如下三種情況:
    • 如果使用ProcessingTime龙亲,timestamp會被忽略
    • 如果使用IngestionTime陕凹,使用系統(tǒng)時間覆蓋timestamp
    • 如果使用EventTime悍抑,使用指定的timestamp
  • emitWatermark:向下游發(fā)送watermark。watermark也包含一個timestamp杜耙。向下游發(fā)送watermark意味著所有在watermark的timestamp之前的數(shù)據(jù)已經(jīng)到齊搜骡。如果在watermark之后,收到了timestamp比該watermark的timestamp小的元素佑女,該元素會被認(rèn)為遲到记靡,將會被系統(tǒng)忽略,或者進入到旁路輸出(side output)团驱。
  • markAsTemporarilyIdle:標(biāo)記此數(shù)據(jù)源暫時閑置摸吠。該數(shù)據(jù)源暫時不會發(fā)送任何數(shù)據(jù)和watermark。僅對IngestionTime和EventTime生效嚎花。下游任務(wù)前移watermark的時候?qū)⒉粫俚却粯?biāo)記為閑置的數(shù)據(jù)源的watermark寸痢。

CheckpointedFunction

如果數(shù)據(jù)源需要保存狀態(tài),那么就需要實現(xiàn)CheckpointedFunction中的相關(guān)方法紊选。
CheckpointedFunction包含如下方法:

  • snapshotState:保存checkpoint的時候調(diào)用啼止。需要在此方法中編寫狀態(tài)保存邏輯
  • initializeState:在數(shù)據(jù)源創(chuàng)建或者是從checkpoint恢復(fù)的時候調(diào)用道逗。此方法包含數(shù)據(jù)源的狀態(tài)恢復(fù)邏輯。

樣例

Flink官方給出的樣板Source献烦。這個數(shù)據(jù)源會發(fā)送0-999到下游系統(tǒng)滓窍。代碼如下所示:

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
    private long count = 0L;
    // 使用一個volatile類型變量控制run方法內(nèi)循環(huán)的運行
    private volatile boolean isRunning = true;

    // 保存數(shù)據(jù)源狀態(tài)的變量
    private transient ListState<Long> checkpointedCount;

    public void run(SourceContext<T> ctx) {
        while (isRunning && count < 1000) {
            // this synchronized block ensures that state checkpointing,
            // internal state updates and emission of elements are an atomic operation
            // 此處必須要加鎖,防止在checkpoint過程中巩那,仍然發(fā)送數(shù)據(jù)
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    public void cancel() {
        // 設(shè)置isRunning為false吏夯,終止run方法內(nèi)循環(huán)的運行
        isRunning = false;
    }

    public void initializeState(FunctionInitializationContext context) {
        // 獲取存儲狀態(tài)
        this.checkpointedCount = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));

        // 如果數(shù)據(jù)源是從失敗中恢復(fù),則讀取count的值即横,恢復(fù)數(shù)據(jù)源count狀態(tài)
        if (context.isRestored()) {
            for (Long count : this.checkpointedCount.get()) {
                this.count = count;
            }
        }
    }

    public void snapshotState(FunctionSnapshotContext context) {
        // 保存數(shù)據(jù)到狀態(tài)變量
        this.checkpointedCount.clear();
        this.checkpointedCount.add(count);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末锦亦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子令境,更是在濱河造成了極大的恐慌杠园,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件舔庶,死亡現(xiàn)場離奇詭異抛蚁,居然都是意外死亡,警方通過查閱死者的電腦和手機惕橙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門瞧甩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人弥鹦,你說我怎么就攤上這事肚逸。” “怎么了彬坏?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵朦促,是天一觀的道長。 經(jīng)常有香客問我栓始,道長务冕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任幻赚,我火速辦了婚禮禀忆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘落恼。我一直安慰自己箩退,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布佳谦。 她就那樣靜靜地躺著戴涝,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上喊括,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天胧瓜,我揣著相機與錄音,去河邊找鬼郑什。 笑死府喳,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蘑拯。 我是一名探鬼主播钝满,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼申窘!你這毒婦竟也來了弯蚜?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤剃法,失蹤者是張志新(化名)和其女友劉穎碎捺,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贷洲,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡收厨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了优构。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诵叁。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖钦椭,靈堂內(nèi)的尸體忽然破棺而出拧额,到底是詐尸還是另有隱情,我是刑警寧澤彪腔,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布侥锦,位于F島的核電站,受9級特大地震影響漫仆,放射性物質(zhì)發(fā)生泄漏捎拯。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一盲厌、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧祸泪,春花似錦吗浩、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春阀湿,著一層夾襖步出監(jiān)牢的瞬間赶熟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工陷嘴, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留映砖,地道東北人。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓灾挨,卻偏偏與公主長得像邑退,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子劳澄,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 本章節(jié)是關(guān)于在event time上執(zhí)行的程序的地技。想獲取更多關(guān)于event time,processing tim...
    寫B(tài)ug的張小天閱讀 23,783評論 0 12
  • 本章節(jié)是關(guān)于在event time上執(zhí)行的程序秒拔。有關(guān)event time, processing time, an...
    尼小摩閱讀 9,126評論 0 9
  • 基于flink-1.8.1 基于flink官網(wǎng) 概述 實時計算中莫矗,數(shù)據(jù)時間比較敏感。有eventTime和proc...
    李小李的路閱讀 2,364評論 0 3
  • 我在UITableView(六)數(shù)據(jù)更新(增砂缩、刪趣苏、改)中新增方法的數(shù)據(jù)寫的是固定數(shù)據(jù),不能自己添加梯轻。所以在這篇會用...
    飛翔的道長閱讀 469評論 0 7
  • 為什么對待父母平和如此重要對待父母平和不是外在的一種表現(xiàn)食磕,而是內(nèi)心的一種狀態(tài)。如果內(nèi)心是不接納的喳挑,不管外在表現(xiàn)的多...
    我是一凡閱讀 284評論 0 1