前言
Data Sources 是什么呢着撩?就字面意思其實就可以知道:數(shù)據(jù)來源夜矗。
Flink 做為一款流式計算框架点骑,它可用來做批處理,即處理靜態(tài)的數(shù)據(jù)集谍夭、歷史的數(shù)據(jù)集黑滴;也可以用來做流處理,即實時的處理些實時數(shù)據(jù)流紧索,實時的產(chǎn)生數(shù)據(jù)流結(jié)果袁辈,只要數(shù)據(jù)源源不斷的過來,F(xiàn)link 就能夠一直計算下去珠漂,這個 Data Sources 就是數(shù)據(jù)的來源地晚缩。
Flink 中你可以使用?StreamExecutionEnvironment.addSource(sourceFunction)?來為你的程序添加數(shù)據(jù)來源。
Flink 已經(jīng)提供了若干實現(xiàn)好了的 source functions媳危,當然你也可以通過實現(xiàn) SourceFunction 來自定義非并行的 source 或者實現(xiàn) ParallelSourceFunction 接口或者擴展 RichParallelSourceFunction 來自定義并行的 source荞彼,
StreamExecutionEnvironment 中可以使用以下幾個已實現(xiàn)的 stream sources,
總的來說可以分為下面幾大類:
1待笑、fromCollection(Collection) - 從 Java 的 Java.util.Collection 創(chuàng)建數(shù)據(jù)流鸣皂。集合中的所有元素類型必須相同。
2暮蹂、fromCollection(Iterator, Class) - 從一個迭代器中創(chuàng)建數(shù)據(jù)流寞缝。Class 指定了該迭代器返回元素的類型。
3仰泻、fromElements(T …) - 從給定的對象序列中創(chuàng)建數(shù)據(jù)流荆陆。所有對象類型必須相同。
1
2
3
4
5
6
7
8
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(
newEvent(1,"barfoo",1.0),
newEvent(2,"start",2.0),
newEvent(3,"foobar",3.0),
...
);
4集侯、fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中創(chuàng)建并行數(shù)據(jù)流被啼。Class 指定了該迭代器返回元素的類型帜消。
5、generateSequence(from, to) - 創(chuàng)建一個生成指定區(qū)間范圍內(nèi)的數(shù)字序列的并行數(shù)據(jù)流趟据。
1、readTextFile(path) - 讀取文本文件粘衬,即符合 TextInputFormat 規(guī)范的文件跪腹,并將其作為字符串返回屯阀。
1
2
3
finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream text = env.readTextFile("file:///path/to/file");
2逗栽、readFile(fileInputFormat, path) - 根據(jù)指定的文件輸入格式讀取文件(一次)彼宠。
3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個方法內(nèi)部調(diào)用的方法。它根據(jù)給定的 fileInputFormat 和讀取路徑讀取文件。根據(jù)提供的 watchType,這個 source 可以定期(每隔 interval 毫秒)監(jiān)測給定路徑的新數(shù)據(jù)(FileProcessingMode.PROCESS_CONTINUOUSLY)错洁,或者處理一次路徑對應(yīng)文件的數(shù)據(jù)并退出(FileProcessingMode.PROCESS_ONCE)膊存。你可以通過 pathFilter 進一步排除掉需要處理的文件。
1
2
3
4
5
finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY,100,
? ? ? ? FilePathFilter.createDefaultFilter(), typeInfo);
實現(xiàn):
在具體實現(xiàn)上,F(xiàn)link 把文件讀取過程分為兩個子任務(wù),即目錄監(jiān)控和數(shù)據(jù)讀取。每個子任務(wù)都由單獨的實體實現(xiàn)摆霉。目錄監(jiān)控由單個非并行(并行度為1)的任務(wù)執(zhí)行咳秉,而數(shù)據(jù)讀取由并行運行的多個任務(wù)執(zhí)行蝌以。后者的并行性等于作業(yè)的并行性。單個目錄監(jiān)控任務(wù)的作用是掃描目錄(根據(jù) watchType 定期掃描或僅掃描一次)跟畅,查找要處理的文件并把文件分割成切分片(splits)咽筋,然后將這些切分片分配給下游 reader。reader 負責讀取數(shù)據(jù)徊件。每個切分片只能由一個 reader 讀取奸攻,但一個 reader 可以逐個讀取多個切分片蒜危。
重要注意:
如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY,則當文件被修改時睹耐,其內(nèi)容將被重新處理辐赞。這會打破“exactly-once”語義,因為在文件末尾附加數(shù)據(jù)將導(dǎo)致其所有內(nèi)容被重新處理硝训。
如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_ONCE响委,則 source 僅掃描路徑一次然后退出,而不等待 reader 完成文件內(nèi)容的讀取捎迫。當然 reader 會繼續(xù)閱讀晃酒,直到讀取所有的文件內(nèi)容。關(guān)閉 source 后就不會再有檢查點窄绒。這可能導(dǎo)致節(jié)點故障后的恢復(fù)速度較慢贝次,因為該作業(yè)將從最后一個檢查點恢復(fù)讀取。
socketTextStream(String hostname, int port) - 從 socket 讀取彰导。元素可以用分隔符切分蛔翅。
1
2
3
4
5
6
7
8
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost",9999)// 監(jiān)聽 localhost 的 9999 端口過來的數(shù)據(jù)
.flatMap(newSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
這個在?《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運行簡單程序入門?文章里用的就是基于 Socket 的 Word Count 程序。
addSource - 添加一個新的 source function位谋。例如山析,你可以 addSource(new FlinkKafkaConsumer011<>(…)) 以從 Apache Kafka 讀取數(shù)據(jù)
說下上面幾種的特點吧:
1、基于集合:有界數(shù)據(jù)集掏父,更偏向于本地測試用
2笋轨、基于文件:適合監(jiān)聽文件修改并讀取其內(nèi)容
3、基于 Socket:監(jiān)聽主機的 host port赊淑,從 Socket 中獲取數(shù)據(jù)
4爵政、自定義 addSource:大多數(shù)的場景數(shù)據(jù)都是無界的,會源源不斷的過來陶缺。比如去消費 Kafka 某個 topic 上的數(shù)據(jù)钾挟,這時候就需要用到這個 addSource,可能因為用的比較多的原因吧饱岸,F(xiàn)link 直接提供了 FlinkKafkaConsumer011 等類可供你直接使用掺出。你可以去看看 FlinkKafkaConsumerBase 這個基礎(chǔ)類,它是 Flink Kafka 消費的最根本的類苫费。
1
2
3
4
5
6
7
8
9
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<KafkaEvent> input = env
.addSource(
newFlinkKafkaConsumer011<>(
parameterTool.getRequired("input-topic"),//從參數(shù)中獲取傳進來的 topic
newKafkaEventSchema(),
parameterTool.getProperties())
.assignTimestampsAndWatermarks(newCustomWatermarkExtractor()));
Flink 目前支持如下圖里面常見的 Source:
如果你想自己自定義自己的 Source 呢汤锨?
那么你就需要去了解一下 SourceFunction 接口了,它是所有 stream source 的根接口百框,它繼承自一個標記接口(空接口)Function闲礼。
SourceFunction 定義了兩個接口方法:
1、run : 啟動一個 source,即對接一個外部數(shù)據(jù)源然后 emit 元素形成 stream(大部分情況下會通過在該方法里運行一個 while 循環(huán)的形式來產(chǎn)生 stream)位仁。
2柑贞、cancel : 取消一個 source,也即將 run 中的循環(huán) emit 元素的行為終止聂抢。
正常情況下钧嘶,一個 SourceFunction 實現(xiàn)這兩個接口方法就可以了。其實這兩個接口方法也固定了一種實現(xiàn)模板琳疏。
比如有决,實現(xiàn)一個 XXXSourceFunction,那么大致的模板是這樣的:(直接拿 FLink 源碼的實例給你看看)
本文主要講了下 Flink 的常見 Source 有哪些并且簡單的提了下如何自定義 Source空盼。
轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/10/28/flink-sources/
微信公眾號:zhisheng