Flink-Streaming-overview

Flink中的流應(yīng)用就是在數(shù)據(jù)流上應(yīng)用各種轉(zhuǎn)化(如:filter,update state,difine window,aggregation)苔巨。數(shù)據(jù)流有各種數(shù)據(jù)源創(chuàng)建而來(lái)(如:消息隊(duì)列,socket流掐禁,文件等)。結(jié)果輸出到sink它改,如寫入文件或者標(biāo)準(zhǔn)輸出。Flink程序可以在多種上下文中運(yùn)行,standalone疆拘,內(nèi)置在其他應(yīng)用中等。應(yīng)用可以在本地JVM中執(zhí)行寂曹,也可以在集群的許多機(jī)器中執(zhí)行哎迄。

示例程序


下面的程序是一個(gè)完成的應(yīng)用,它演示了如何在web soscke上使用window統(tǒng)計(jì)5秒內(nèi)的字?jǐn)?shù)隆圆。你可以復(fù)制代碼然后在你本地運(yùn)行漱挚。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

運(yùn)行應(yīng)用前,先使用 netcat 在命令行中開(kāi)啟輸入流:

nc -lk 9999

輸入一些單詞就會(huì)返回新的結(jié)果渺氧。這些單詞或作為字?jǐn)?shù)統(tǒng)計(jì)應(yīng)用的輸入旨涝。如果你想看到統(tǒng)計(jì)值大于1,可以在5秒內(nèi)一遍又一遍的輸入相同的單詞(如果你做不到侣背,可以增加window的大邪谆)

數(shù)據(jù)源 Data Source


Source指的是你的程序從哪里讀取它的輸入。你可以使用 StreamExecutionEnvironment.addSource(sourceFunction)在你的程序中添加數(shù)據(jù)源秃踩。Flink自帶了一些實(shí)現(xiàn)好的數(shù)據(jù)源函數(shù)衬鱼,淡然你可以實(shí)現(xiàn) SourceFunction 來(lái)實(shí)現(xiàn)自定義的非并行的source或者實(shí)現(xiàn) ParallelSourceFunction 接口或者繼承 RichParallelSourceFunction 來(lái)實(shí)現(xiàn)并行的source。
StreamExecutionEnvironment有一些實(shí)現(xiàn)定義好的數(shù)據(jù)源方法:

基于文件的數(shù)據(jù)源:

  • readTextFile(path) - 讀取text文件憔杨。也就是說(shuō)使用 TextInputFormat 一行一行的讀取數(shù)據(jù)鸟赫。
  • readFile(fileInputFormat,path) - 使用給定的 input format讀取文件
  • readFile(fileInputFormat,path,watchType,interval,pathFilter,typeInfo) - 這個(gè)方法在flink內(nèi)部,被上面的兩個(gè)方法所調(diào)用消别。它使用給定的fileInputFomat讀取path中的文件抛蚤。根據(jù) watchType 的值,數(shù)據(jù)源會(huì)定期(interval 毫秒)監(jiān)控path中的新數(shù)據(jù)(FileProcessingMode.PROCESS_CONTINUOUSLY)寻狂,或者僅對(duì)當(dāng)前path下的文件進(jìn)行一次處理岁经,然后退出(FileProcessingMode.PROCESS_ONCE)。使用 pathFilter 蛇券,用戶可以排除不想要處理的文件缀壤。
    實(shí)現(xiàn):
    在內(nèi)部,F(xiàn)link將讀取文件分為兩個(gè)子任務(wù)纠亚,分別叫做 目錄監(jiān)控 與 數(shù)據(jù)讀取塘慕。每個(gè)任務(wù)都是單獨(dú)運(yùn)行的。目錄監(jiān)控是一個(gè)單線程的任務(wù)蒂胞,而數(shù)據(jù)讀取任務(wù)可以是多線程的并發(fā)任務(wù)图呢。數(shù)據(jù)讀取任務(wù)的并發(fā)度取決于job的并發(fā)度。目錄監(jiān)控的功能在于定期監(jiān)控目錄,發(fā)現(xiàn)需要被處理的文件蛤织,將它們分片 split 然后指定分片給下游的reader赴叹。reader會(huì)進(jìn)行實(shí)際讀取數(shù)據(jù)的操作。每一個(gè)split僅會(huì)被一個(gè)reader讀取指蚜,而一個(gè)reader可能會(huì)讀取多個(gè)split(依次讀绕蚯伞)。
    重要說(shuō)明:
    1.如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY姚炕,當(dāng)文件被修改后摊欠,它的內(nèi)容會(huì)被全部重新進(jìn)行處理。這會(huì)破壞“精確一次”的語(yǔ)義柱宦,因?yàn)橄蛭募凶芳訑?shù)據(jù)些椒,會(huì)導(dǎo)致整個(gè)文件進(jìn)行重新處理。
    2.如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_ONCE掸刊,source只會(huì)掃描path一次然后退出免糕,而不會(huì)等到reader讀取所有數(shù)據(jù)完畢后再退出。當(dāng)然忧侧,reader會(huì)繼續(xù)進(jìn)行數(shù)據(jù)讀取石窑,直到所有文件內(nèi)容都讀取完畢。關(guān)閉source會(huì)導(dǎo)致之后不會(huì)再有checkpoint蚓炬。這將導(dǎo)致故障恢復(fù)時(shí)松逊,需要等待更長(zhǎng)的時(shí)間,因?yàn)閖ob會(huì)從上次checkpoint處會(huì)進(jìn)行重新讀取數(shù)據(jù)肯夏。

基于socket:

  • socketTextStream - 從socket讀取數(shù)據(jù)经宏。數(shù)據(jù)可以被 分隔符delimiter 分隔開(kāi)

基于集合:

  • fromCollection(Collection) - 從java集合中創(chuàng)造數(shù)據(jù)流。所有集合中的數(shù)據(jù)必須是同樣的類型
  • fromCollection(Iterator,Class) - 從iterator中創(chuàng)造數(shù)據(jù)流驯击。class參數(shù)指定了iterator返回的數(shù)據(jù)的類型
  • fromElements(T ...) - 從給定的對(duì)象序列中創(chuàng)造數(shù)據(jù)流烁兰。所有對(duì)象必須是相同的類型
  • fromParallelCollection(SplitableIterator , Class) - 從iterator中并行的創(chuàng)造數(shù)據(jù)流。class參數(shù)指定了iterator返回的數(shù)據(jù)的類型
  • generateSequence(from,to) - 使用給定的interval并行的生成數(shù)字序列

自定義:

  • addSource - 使用source function徊都。如沪斟,從Kafka中讀取數(shù)據(jù),你可以使用 addSource(new FlinkKafkaConsumer08<>(...)).

DataStream Transformations


查閱 operator 文檔

Data Sink


Data Sink讀取數(shù)據(jù)流暇矫,并將它們寫入到file,socket,其他系統(tǒng)或者打印它們主之。Flink自帶了一些output format,它們被封裝到一些操作符中:

  • writeAsText() / TextOutputFormat - 將數(shù)據(jù)作為一整行string李根,寫入文件槽奕。通過(guò)調(diào)用數(shù)據(jù)的toString方法
  • writeAsCsv(...) / CsvOutputFormat - 將 tuple 以逗號(hào)分隔,寫入文件朱巨。行與行以及field之間的分隔符可以自定義。每一個(gè)field的值枉长,是通過(guò)調(diào)用toString方法獲取的
  • pring() / pringToErr() - 將數(shù)據(jù)的toString方法的值打印到口紅紙條冀续∏矸恚可以選擇前綴,在打印輸出內(nèi)容前先打印前綴洪唐。這能夠區(qū)分不同的print的內(nèi)容钻蹬。如果并發(fā)度大于1,輸出同樣會(huì)打印一個(gè)task的標(biāo)識(shí)符凭需。
  • writeUsingOutputFormat() / FileOutputFormat - 使用自定義文件輸出的基類與方法问欠。支持自定義的 對(duì)象-字節(jié) 的轉(zhuǎn)化。
  • writeToSocket - 根據(jù) SerializationSchema 將數(shù)據(jù)寫入socket
  • addSink - 調(diào)用傳入的自定義 sink function粒蜈。Flink通過(guò)實(shí)現(xiàn) sink function可以與其他系統(tǒng)連接起來(lái)(如kafka)

注意的是 write*() 方法主要用于調(diào)試的目的顺献。它們沒(méi)有參與flink的checkpoint過(guò)程,這就意味著使用這些函數(shù)是“at-least-once”至少一次語(yǔ)義枯怖。數(shù)據(jù)如何寫入目標(biāo)系統(tǒng)是由OutputFormat決定的注整,也就是說(shuō)發(fā)送到OutputFormat的數(shù)據(jù)并不一定會(huì)立即寫入目標(biāo)系統(tǒng)(如批量寫入情況)。因此度硝,在遇到故障時(shí)肿轨,這些數(shù)據(jù)有可能會(huì)丟失。
為了穩(wěn)定地蕊程,精確一致的將流數(shù)據(jù)寫入問(wèn)加你系統(tǒng)椒袍,建議使用 flink-connector-filesystem。當(dāng)然藻茂,如果自定義了sink function驹暑,通過(guò) addSink 添加該自定義的sink,也可以參與flink的checkpoint過(guò)程捌治,保持 exactly-once 語(yǔ)義岗钩。

Iterator


迭代流程序?qū)崿F(xiàn)了step function,并且內(nèi)置在 IterativeStream中肖油。由于DataStream程序可能不會(huì)停止兼吓,因此iteration中不會(huì)有最大數(shù)量限制。你需要定義流中的哪些數(shù)據(jù)需要繼續(xù)迭代森枪,哪些數(shù)據(jù)可以發(fā)送到下游的操作符视搏,你可以使用split或者filter實(shí)現(xiàn)县袱。下面我們使用 filter 來(lái)演示。首先式散,我們定義一個(gè) IterativeStream :

IterativeStream<Integer> iteration = input.iterate();

然后,我們定義在循環(huán)中,需要對(duì)數(shù)據(jù)流做哪些操作(下面我們就簡(jiǎn)單的使用map作為演示)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

為了定義迭代器何時(shí)關(guān)閉编饺,可以調(diào)用 IterativeStream 的 closeWith(feedbackStream) 方法。傳入 closeWith() 的數(shù)據(jù)流會(huì)再次進(jìn)入迭代器透且,放到迭代器的head。一個(gè)常用的模式是秽誊,使用filter將流的一部分重新放入迭代器,而另一部分下發(fā)到下游操作符這些filter可以定義“終止”的邏輯锅论,也就是一個(gè)數(shù)據(jù)可以不再進(jìn)入迭代器,而是被轉(zhuǎn)發(fā)到下游操作符棍厌。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

例如,下面的程序就是對(duì)數(shù)據(jù)進(jìn)行減1操作耘纱,直到為0:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

Execution Parameter 執(zhí)行參數(shù)


StreamExecutionEnvironment 包括 ExecutionConfig 毕荐,它允許設(shè)置運(yùn)行時(shí)所需的job配置束析。
請(qǐng)參閱 execution configuration 獲取更多參數(shù)的解釋。下面的參數(shù)僅屬于 DataStream API:

  • setAutoWatermarkInterval(long milliseconds) : 設(shè)置 watermark 發(fā)射的間隔憎亚。你可以公共 long getAutoWatermartkInterval() 獲取當(dāng)前的值员寇。

故障容忍

查閱 State & Checkpointing

控制延遲


默認(rèn)情況下,數(shù)據(jù)在網(wǎng)絡(luò)間傳輸時(shí)第美,并不是一個(gè)一個(gè)的傳輸(造成不必要的網(wǎng)絡(luò)擁堵)蝶锋,而是緩存后一起傳輸。buffer的大小可以在Flink 的配置文件中配置什往。盡管這種方式可以優(yōu)化吞吐率扳缕,但是當(dāng)輸入流的速度不夠快時(shí),會(huì)造成延遲問(wèn)題别威。為了平衡吞吐率和延遲躯舔,你可以使用 env.setBufferTimeout(timeoutMillis) 來(lái)設(shè)置最大等待時(shí)間。超過(guò)這個(gè)時(shí)間后省古,即便buffer沒(méi)有填滿粥庄,也要發(fā)出去。默認(rèn)值為100ms豺妓。

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

為了最大化吞吐率惜互,設(shè)置 setBufferTimeout(-1) 會(huì)移除超時(shí)設(shè)置布讹,僅當(dāng)buffer填滿后才發(fā)送。為了最小化延遲训堆,設(shè)置超時(shí)的值接近0(如 5 或 10 毫秒)炒事。應(yīng)該避免設(shè)置值為0,因?yàn)檫@會(huì)導(dǎo)致服務(wù)性能下降蔫慧。

調(diào)試 Debugging

在提交任務(wù)到分布式集群運(yùn)行前,最好確認(rèn)程序可以按預(yù)期運(yùn)行权薯。因此姑躲,實(shí)現(xiàn)一個(gè)數(shù)據(jù)分析應(yīng)用,通常是一個(gè)增量的過(guò)程:檢查結(jié)果盟蚣,調(diào)試黍析,優(yōu)化。
Flink提供了本地IDE調(diào)試的功能屎开,簡(jiǎn)化了數(shù)據(jù)分析應(yīng)用的開(kāi)發(fā)阐枣。包括加載測(cè)試數(shù)據(jù),收集結(jié)果數(shù)據(jù)奄抽。這一部分會(huì)顯示如何簡(jiǎn)化flink程序的開(kāi)發(fā)蔼两,便于測(cè)試調(diào)試程序。

本地運(yùn)行環(huán)境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();
加載測(cè)試數(shù)據(jù)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

注意:需要提供數(shù)據(jù)烈性逞度,iterator要實(shí)現(xiàn) Serializable额划。不能并發(fā)執(zhí)行

迭代Sink
import org.apache.flink.streaming.experimental.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末俊戳,一起剝皮案震驚了整個(gè)濱河市抑胎,隨后出現(xiàn)的幾起案子阿逃,更是在濱河造成了極大的恐慌腔稀,老刑警劉巖焊虏,帶你破解...
    沈念sama閱讀 219,366評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诵闭,死亡現(xiàn)場(chǎng)離奇詭異澎嚣,居然都是意外死亡易桃,警方通過(guò)查閱死者的電腦和手機(jī)晤郑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門造寝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)诫龙,“玉大人签赃,你說(shuō)我怎么就攤上這事分尸。” “怎么了括丁?”我有些...
    開(kāi)封第一講書人閱讀 165,689評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵史飞,是天一觀的道長(zhǎng)构资。 經(jīng)常有香客問(wèn)我吐绵,道長(zhǎng)河绽,這世上最難降的妖魔是什么耙饰? 我笑而不...
    開(kāi)封第一講書人閱讀 58,925評(píng)論 1 295
  • 正文 為了忘掉前任苟跪,我火速辦了婚禮蔓涧,結(jié)果婚禮上元暴,老公的妹妹穿的比我還像新娘茉盏。我一直安慰自己枢冤,他們只是感情好掏导,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布趟咆。 她就那樣靜靜地躺著值纱,像睡著了一般虐唠。 火紅的嫁衣襯著肌膚如雪惰聂。 梳的紋絲不亂的頭發(fā)上搓幌,一...
    開(kāi)封第一講書人閱讀 51,727評(píng)論 1 305
  • 那天溉愁,我揣著相機(jī)與錄音,去河邊找鬼撤蟆。 笑死家肯,一個(gè)胖子當(dāng)著我的面吹牛盟猖,可吹牛的內(nèi)容都是我干的寝贡。 我是一名探鬼主播圃泡,決...
    沈念sama閱讀 40,447評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼颇蜡,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼辆亏!你這毒婦竟也來(lái)了扮叨?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,349評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤碍沐,失蹤者是張志新(化名)和其女友劉穎累提,沒(méi)想到半個(gè)月后磁浇,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,820評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡无虚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評(píng)論 3 337
  • 正文 我和宋清朗相戀三年骑科,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了咆爽。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片斗埂。...
    茶點(diǎn)故事閱讀 40,127評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡凫海,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出漾稀,到底是詐尸還是另有隱情,我是刑警寧澤尸折,帶...
    沈念sama閱讀 35,812評(píng)論 5 346
  • 正文 年R本政府宣布实夹,位于F島的核電站粒梦,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏缴淋。R本人自食惡果不足惜泄朴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評(píng)論 3 331
  • 文/蒙蒙 一叼旋、第九天 我趴在偏房一處隱蔽的房頂上張望夫植。 院中可真熱鬧油讯,春花似錦、人聲如沸沈跨。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,017評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)锭亏。三九已至,卻和暖如春戴已,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背糖儡。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,142評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工休玩, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人永部。 一個(gè)月前我還...
    沈念sama閱讀 48,388評(píng)論 3 373
  • 正文 我出身青樓苔埋,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親组橄。 傳聞我的和親對(duì)象是個(gè)殘疾皇子玉工,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容