Flink 應(yīng)用剖析

Flink 程序看起來跟普通程序沒什么區(qū)別疚鲤,都是處理數(shù)據(jù)流集歇。每個程序都有幾個相同的基礎(chǔ)部分組成:
1、申請執(zhí)行環(huán)境
2际歼、加載/創(chuàng)造原始數(shù)據(jù)
3鹅心、聲明對于這些數(shù)據(jù)的轉(zhuǎn)換過程
4、聲明存儲這些轉(zhuǎn)換后數(shù)據(jù)的目標(biāo)地址
5颅筋、觸發(fā)程序結(jié)束
接下來會對每個步驟做一個概述输枯,想要了解細(xì)節(jié)的話請參考各自的部分桃熄。
Java DataStream API 所有的核心類都可以在這里找到。

1碉京、申請執(zhí)行環(huán)境

StreamExecutionEnvironment是所有 Flink 程序的基礎(chǔ)缎讼,通過調(diào)用它的以下這幾個靜態(tài)方法可以得到一個執(zhí)行環(huán)境:

StreamExecutionEnvironment.getExecutionEnvironment()

StreamExecutionEnvironment.createLocalEnvironment()

StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)

官方例子的main方法中第一行程序:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

一般來說血崭,直接調(diào)用getExecutionEnvironment()即可,他會根據(jù)上下文自動處理咽瓷。如果你像平時一樣是在本地 IDE 執(zhí)行程序茅姜,他會創(chuàng)建一個本地環(huán)境在你自己的機(jī)器上執(zhí)行代碼月匣。如果你將代碼打包成一個jar包锄开,并且通過命令行(flink run XXX.jar)的方式提交給 Flink 集群,F(xiàn)link 會為你的程序生成一個執(zhí)行環(huán)境头遭,讓他在集群內(nèi)完成執(zhí)行。

2袜香、加載/創(chuàng)造原始數(shù)據(jù)

談到數(shù)據(jù)源的聲明鲫惶,以基于文件讀取型的場景為例欠母,F(xiàn)link 的執(zhí)行環(huán)境支持了多種方法讓程序從文件讀取:

  • 逐行讀取
  • 讀取為 CSV 文件
  • 使用其他提供的源

下面以從txt文件逐行讀取為例猬腰,講解代碼實現(xiàn):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");

首先姑荷,依然是申請執(zhí)行環(huán)境缩擂,之后調(diào)用該執(zhí)行環(huán)境對象的env.readTextFile方法即可得到一個數(shù)據(jù)流。
除了readTestFile()方法懈费,F(xiàn)link 還支持別的方式憎乙,同時也支持其他場景的數(shù)據(jù)源加載叉趣,見官方文檔疗杉,這里僅做列表阵谚,不詳細(xì)解釋:

2.1 文件讀取型數(shù)據(jù)源

  • readTextFile(path):從 text 文件中逐行讀取,讀取進(jìn)來后的事件是 string 型的
  • readFile(fileInputFormat, path):讀取fileInputFormat類型的文檔
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):前面兩個方法的內(nèi)部調(diào)用實現(xiàn)

2.2 Socket 型數(shù)據(jù)源

  • socketTextStream:從 socket 中讀取烟具,各元素以分隔符分開

2.3 集合型

  • fromCollection(Collection):從Java.util.Collection中讀取數(shù)據(jù)生成數(shù)據(jù)流梢什。集合中所有元素必須是同類型。
  • fromCollection(Iterator, Class):從迭代器中讀取數(shù)據(jù)生成數(shù)據(jù)流朝聋。參數(shù)Class用以聲明元素類型
  • fromElements(T ...):從數(shù)組中讀取對象數(shù)據(jù)生成數(shù)據(jù)流嗡午,所有對象必須是同類型。
  • fromParallelCollection(SplittableIterator, Class):Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
  • generateSequence(from, to):在from~to之間并行生成數(shù)據(jù)返回為一個數(shù)組玖翅。

2.4 定制

  • addSource:添加一個新的數(shù)據(jù)源生成方法翼馆。如:從 Kafka 中讀取時,可以這么寫:addSource(new FlinkKafkaConsumer<>(...))金度。具體語法參考connector

3猜极、聲明對于這些數(shù)據(jù)的轉(zhuǎn)換過程

得到源數(shù)據(jù)流后中姜,接下來就是調(diào)用 Flink 的各個 Operators 來對數(shù)據(jù)流中的數(shù)據(jù)做“變身”動作了。以下面的代碼為例:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

這段代碼調(diào)用了map這個算子跟伏,將input流中的所有string型數(shù)據(jù)都轉(zhuǎn)換成int型丢胚,生成一個新的數(shù)據(jù)流賦值給parsed。
當(dāng)然受扳,你可以做更復(fù)雜的“變身”携龟,更多官方支持的算子及其語法介紹點我了解

4勘高、聲明存儲這些轉(zhuǎn)換后數(shù)據(jù)的目標(biāo)地址

當(dāng)你得到一個最終狀態(tài)的數(shù)據(jù)流后峡蟋,接下來要考慮的就是如何把這個數(shù)據(jù)流寫道外部系統(tǒng)了,也就是創(chuàng)建一個 sink华望。最常見的就是打印到控制臺蕊蝗,可以這么實現(xiàn):

parsed.print();

Data sinks 消費數(shù)據(jù)流,并且把他們寫入文件赖舟、sockets蓬戚、外部系統(tǒng),或者直接打印他們宾抓。Flink 內(nèi)置了許多輸出格式子漩。

  • writeAsText() / TextOutputFormat:以string形式按行輸出元素。該string是通過調(diào)用元素的toString()方法得到的石洗。
  • writeAsCsv(...) / CsvOutputFormat:每個元素轉(zhuǎn)換成一個逗號隔開的二元數(shù)組痛单。行于列的分隔符可以配置。列的值同樣是調(diào)用元素的toString()方法得到的劲腿。
  • print() / printToErr():直接打印到標(biāo)準(zhǔn)控制臺旭绒。還可以傳一個前綴參數(shù)加載字符串之前,用以區(qū)分不同的print()調(diào)用焦人。 當(dāng)集群開啟并發(fā)模式且大于1的時候挥吵,還會把產(chǎn)生該數(shù)據(jù)的task的標(biāo)識符作為前綴加到輸出之前。
  • writeUsingOutputFormat() / FileOutputFormat:自定義文件類型花椭,需要支持自定義類型與bytes類型的轉(zhuǎn)換忽匈。
  • writeToSocket:根據(jù)序列化規(guī)則輸出到socket。
  • addSink:調(diào)用外部sink方法矿辽〉ぴ剩可以查看connector了解更多郭厌。

一般來說,write*()方法都是用來掉使用的雕蔽,這些方法輸出的元素不參與 Flink 的 checkpoint 機(jī)制折柠。產(chǎn)生的直接效果就是,這些方法輸出的文檔可能會有丟失或者延遲批狐,而我們無法通過 Flink 的checkpoint 機(jī)制來保證扇售。
因此,為了保證流數(shù)據(jù)的精確送達(dá)嚣艇,建議使用 StreamingFileSink承冰。同時,通過.addSink(...)接口調(diào)用的外部實現(xiàn)也可以參與 Flink 的checkpoint機(jī)制食零,從而保證唯一性送達(dá)困乒。

5、觸發(fā)程序結(jié)束

完成上述所有步驟后贰谣,接下來就是觸發(fā)程序執(zhí)行顶燕,方法就是調(diào)用當(dāng)前執(zhí)行環(huán)境的execute()方法。根據(jù)執(zhí)行環(huán)境的不同冈爹,F(xiàn)link會自動選擇是在本地執(zhí)行涌攻,還是把代碼提交給集群執(zhí)行。
調(diào)用env.execute()方法后频伤,F(xiàn)link 會等待程序執(zhí)行完成恳谎,并根據(jù)執(zhí)行模式不同做不同的后續(xù)處理。
如果你不想等待job執(zhí)行完成憋肖,你可以異步觸發(fā)程序執(zhí)行因痛,這可以通過調(diào)用當(dāng)前執(zhí)行環(huán)境的executeAysnc()方法實現(xiàn)。此時岸更,他會返回給你一個 JobClient鸵膏,之后,你可以通過與 JobClient 交互來得到你提交的 job 的執(zhí)行情況怎炊,如:

final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

最后這一步在幫助理解 Flink 算子的執(zhí)行時機(jī)以及方法極其重要谭企。Flink 程序都是懶執(zhí)行的,即:當(dāng)程序的main()方法被執(zhí)行時评肆,數(shù)據(jù)的加載以及轉(zhuǎn)換工作并不會馬上開始债查,F(xiàn)link 會首先創(chuàng)建一個 dataflow graph,并把他們添加進(jìn)去瓜挽。只有當(dāng)execute()方法被觸發(fā)時盹廷,算子們的處理才真正開始,這與執(zhí)行環(huán)境類型無關(guān)久橙,不管你是在本地執(zhí)行俄占,還是在集群中執(zhí)行管怠,都是如此。

6缸榄、一個完整的例子

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        // 申請執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 聲明數(shù)據(jù)源:從套接字localhost:9999讀取
        // 聲明數(shù)據(jù)源的處理過程:.flat.Map().keyBy().window().sum()
        // 生成最終的結(jié)果數(shù)據(jù)源
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);
        // 配置sink渤弛,將結(jié)果集打印到控制臺
        dataStream.print();
        // 觸發(fā)執(zhí)行
        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

編碼完成后,本地啟動socket輸入端:

nc -lk 9999

然后敲入一些單詞碰凶,之后啟動 job,查看 job 的控制臺輸出鹿驼,是你輸入的這些么欲低?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市畜晰,隨后出現(xiàn)的幾起案子砾莱,更是在濱河造成了極大的恐慌,老刑警劉巖凄鼻,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件腊瑟,死亡現(xiàn)場離奇詭異,居然都是意外死亡块蚌,警方通過查閱死者的電腦和手機(jī)闰非,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來峭范,“玉大人财松,你說我怎么就攤上這事∩纯兀” “怎么了辆毡?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長甜害。 經(jīng)常有香客問我舶掖,道長,這世上最難降的妖魔是什么尔店? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任眨攘,我火速辦了婚禮,結(jié)果婚禮上嚣州,老公的妹妹穿的比我還像新娘期犬。我一直安慰自己,他們只是感情好避诽,可當(dāng)我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布龟虎。 她就那樣靜靜地躺著,像睡著了一般沙庐。 火紅的嫁衣襯著肌膚如雪鲤妥。 梳的紋絲不亂的頭發(fā)上佳吞,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天,我揣著相機(jī)與錄音棉安,去河邊找鬼底扳。 笑死,一個胖子當(dāng)著我的面吹牛贡耽,可吹牛的內(nèi)容都是我干的衷模。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼蒲赂,長吁一口氣:“原來是場噩夢啊……” “哼阱冶!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起滥嘴,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤木蹬,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后若皱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體镊叁,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年走触,在試婚紗的時候發(fā)現(xiàn)自己被綠了晦譬。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡互广,死狀恐怖蛔添,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情兜辞,我是刑警寧澤迎瞧,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站逸吵,受9級特大地震影響凶硅,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜扫皱,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一足绅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧韩脑,春花似錦氢妈、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春加缘,著一層夾襖步出監(jiān)牢的瞬間鸭叙,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工拣宏, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留沈贝,地道東北人。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓勋乾,卻偏偏與公主長得像宋下,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子辑莫,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,446評論 2 348