Flink中的DataStream程序是實(shí)現(xiàn)數(shù)據(jù)流轉(zhuǎn)換的常規(guī)程序(例如牺蹄,過濾、更新狀態(tài)薄翅、定義窗口沙兰、聚合)。
數(shù)據(jù)流最初是從各種來源(例如翘魄,消息隊(duì)列鼎天、套接字流、文件)創(chuàng)建的暑竟。
結(jié)果通過sink返回斋射,例如,接收可以將數(shù)據(jù)寫入文件或標(biāo)準(zhǔn)輸出(例如命令行終端)但荤。
Flink程序在各種上下文中運(yùn)行罗岖,獨(dú)立運(yùn)行或嵌入到其他程序中。執(zhí)行可以在本地JVM中進(jìn)行腹躁,也可以在許多機(jī)器的集群中進(jìn)行呀闻。
有關(guān)Flink API的基本概念的介紹,請(qǐng)參閱https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/api_concepts.html潜慎。
程序樣例
下面的程序是一個(gè)完整的捡多、可工作的流式窗口關(guān)于字?jǐn)?shù)統(tǒng)計(jì)的應(yīng)用程序示例,它在5秒內(nèi)計(jì)算來自web套接字的字?jǐn)?shù)铐炫。您可以復(fù)制并粘貼代碼以在本地運(yùn)行它垒手。
import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStream.print(); env.execute("Window WordCount"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } }}
要運(yùn)行示例程序,首先從終端使用netcat啟動(dòng)輸入流:
nc-lk9999
只需鍵入一些單詞倒信,然后按回車鍵輸入一個(gè)新單詞科贬。這些將是字?jǐn)?shù)統(tǒng)計(jì)程序的輸入。如果你想看到數(shù)大于1,輸入相同的單詞一遍又一遍在5s大小的窗口輸出。
數(shù)據(jù)源
程序源是程序讀取輸入的地方榜掌∮琶睿可以使用StreamExecutionEnvironment.addSource(sourceFunction)將數(shù)據(jù)源添加到程序中。
Flink附帶了許多預(yù)先實(shí)現(xiàn)的源函數(shù)憎账,而且你可以實(shí)現(xiàn)串行方式SourceFunction接口來實(shí)現(xiàn)自定義的數(shù)據(jù)源套硼。
或通過并行方式ParallelSourceFunction接口或RichParallelSourceFunction來編寫自己的數(shù)據(jù)源。
下面預(yù)定義的數(shù)據(jù)源可以通過StreamExecutionEnvironment來訪問:
基于文件:
? ? 1.readTextFile(path)?-按照TextInputFormat規(guī)范逐行以字符串形式讀取文件或者文件夾胞皱。
????2.readFile(fileInputFormat, path)?- 按照指定的文件輸入格式讀取文件或者文件夾邪意。
????3.readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)?-這是前兩個(gè)方法在內(nèi)部調(diào)用的方法。
它根據(jù)給定的fileInputFormat讀取路徑中的文件反砌。根據(jù)所提供的watchType雾鬼,此源可以定期(每隔一段時(shí)間)監(jiān)視新數(shù)據(jù)的路徑(fileprocessingmode . process_continuous),
或者一次性處理當(dāng)前路徑中的數(shù)據(jù)并退出(FileProcessingMode.PROCESS_ONCE)宴树。使用pathFilter策菜,可以排除不需要處理的文件。
實(shí)現(xiàn):
在底層酒贬,F(xiàn)link將文件讀取過程分成兩個(gè)子任務(wù)做入,即目錄監(jiān)視和數(shù)據(jù)讀取。每個(gè)子任務(wù)都由一個(gè)單獨(dú)的實(shí)體實(shí)現(xiàn)同衣。
監(jiān)視由單個(gè)非并行(parallelism = 1)任務(wù)實(shí)現(xiàn),而讀取由多個(gè)并行運(yùn)行的任務(wù)執(zhí)行壶运。后者的并行度等于作業(yè)并行度耐齐。
單個(gè)監(jiān)視任務(wù)的作用是掃描目錄(定期或僅掃描一次,這取決于watchType)蒋情,查找要處理的文件埠况,將它們劃分為分段,并將這些分段分配給下游的讀取器棵癣。
讀取器將讀取實(shí)際數(shù)據(jù)辕翰。每個(gè)分割只由一個(gè)讀取器讀取,而一個(gè)讀取器可以逐個(gè)讀取多個(gè)分割狈谊。
注意:
如果watchType設(shè)置為FileProcessingMode喜命。當(dāng)一個(gè)文件被修改時(shí),它的內(nèi)容將被完全重新處理河劝。這可能會(huì)打破“只有一次”的語義壁榕,因?yàn)樵谖募┪哺郊訑?shù)據(jù)將導(dǎo)致所有內(nèi)容被重新處理。
如果watchType設(shè)置為FileProcessingMode赎瞎。PROCESS_ONCE牌里,源程序只掃描路徑一次并退出,而不等待讀取器完成文件內(nèi)容的讀取务甥。
當(dāng)然牡辽,程序?qū)⒗^續(xù)閱讀喳篇,直到所有的文件內(nèi)容被讀取。關(guān)閉源將導(dǎo)致在該點(diǎn)之后沒有更多的檢查點(diǎn)态辛。這可能會(huì)導(dǎo)致節(jié)點(diǎn)故障后恢復(fù)較慢麸澜,因?yàn)樽鳂I(yè)將從最后一個(gè)檢查點(diǎn)恢復(fù)讀取。
基于Socket:
? ? ? ?1.socketTextStream?-?從套接字讀取因妙。元素可以用分隔符分隔痰憎。
基于集合:
????1.fromCollection(Collection)?-從Java.util. collection創(chuàng)建一個(gè)數(shù)據(jù)流。集合中的所有元素必須具有相同的類型攀涵。
????2.fromCollection(Iterator, Class)?-從迭代器創(chuàng)建數(shù)據(jù)流铣耘。該類指定迭代器返回的元素的數(shù)據(jù)類型。
????3.fromElements(T ...)?-?從給定的對(duì)象序列創(chuàng)建數(shù)據(jù)流以故。所有對(duì)象必須具有相同的類型蜗细。
????4.fromParallelCollection(SplittableIterator, Class)?-?并行地從迭代器創(chuàng)建數(shù)據(jù)流。該類指定迭代器返回的元素的數(shù)據(jù)類型怒详。
????5.generateSequence(from, to)?-?并行地生成給定區(qū)間內(nèi)的數(shù)字序列炉媒。
基于自定義:
? ? 1.addSource?- 添加一個(gè)新的source函數(shù)。例如昆烁,要從Apache Kafka中讀取數(shù)據(jù)吊骤,可以使用addSource(new FlinkKafkaConsumer08<>(...))。有關(guān)更多細(xì)節(jié)静尼,請(qǐng)參見連接器https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/index.html白粉。
源地址:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html
更多大數(shù)據(jù)技術(shù)請(qǐng)咨詢微信 18310801089