《從0到1學習Flink》—— Data Source 介紹 轉(zhuǎn)自微信公眾號:zhisheng

前言

Data Sources 是什么呢着撩?就字面意思其實就可以知道:數(shù)據(jù)來源夜矗。

Flink 做為一款流式計算框架点骑,它可用來做批處理,即處理靜態(tài)的數(shù)據(jù)集谍夭、歷史的數(shù)據(jù)集黑滴;也可以用來做流處理,即實時的處理些實時數(shù)據(jù)流紧索,實時的產(chǎn)生數(shù)據(jù)流結(jié)果袁辈,只要數(shù)據(jù)源源不斷的過來,F(xiàn)link 就能夠一直計算下去珠漂,這個 Data Sources 就是數(shù)據(jù)的來源地晚缩。

Flink 中你可以使用?StreamExecutionEnvironment.addSource(sourceFunction)?來為你的程序添加數(shù)據(jù)來源。

Flink 已經(jīng)提供了若干實現(xiàn)好了的 source functions媳危,當然你也可以通過實現(xiàn) SourceFunction 來自定義非并行的 source 或者實現(xiàn) ParallelSourceFunction 接口或者擴展 RichParallelSourceFunction 來自定義并行的 source荞彼,

Flink

StreamExecutionEnvironment 中可以使用以下幾個已實現(xiàn)的 stream sources,

總的來說可以分為下面幾大類:

基于集合

1待笑、fromCollection(Collection) - 從 Java 的 Java.util.Collection 創(chuàng)建數(shù)據(jù)流鸣皂。集合中的所有元素類型必須相同。

2暮蹂、fromCollection(Iterator, Class) - 從一個迭代器中創(chuàng)建數(shù)據(jù)流寞缝。Class 指定了該迭代器返回元素的類型。

3仰泻、fromElements(T …) - 從給定的對象序列中創(chuàng)建數(shù)據(jù)流荆陆。所有對象類型必須相同。

1

2

3

4

5

6

7

8

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> input = env.fromElements(

newEvent(1,"barfoo",1.0),

newEvent(2,"start",2.0),

newEvent(3,"foobar",3.0),

...

);

4集侯、fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中創(chuàng)建并行數(shù)據(jù)流被啼。Class 指定了該迭代器返回元素的類型帜消。

5、generateSequence(from, to) - 創(chuàng)建一個生成指定區(qū)間范圍內(nèi)的數(shù)字序列的并行數(shù)據(jù)流趟据。

基于文件

1、readTextFile(path) - 讀取文本文件粘衬,即符合 TextInputFormat 規(guī)范的文件跪腹,并將其作為字符串返回屯阀。

1

2

3

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream text = env.readTextFile("file:///path/to/file");

2逗栽、readFile(fileInputFormat, path) - 根據(jù)指定的文件輸入格式讀取文件(一次)彼宠。

3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個方法內(nèi)部調(diào)用的方法。它根據(jù)給定的 fileInputFormat 和讀取路徑讀取文件。根據(jù)提供的 watchType,這個 source 可以定期(每隔 interval 毫秒)監(jiān)測給定路徑的新數(shù)據(jù)(FileProcessingMode.PROCESS_CONTINUOUSLY)错洁,或者處理一次路徑對應(yīng)文件的數(shù)據(jù)并退出(FileProcessingMode.PROCESS_ONCE)膊存。你可以通過 pathFilter 進一步排除掉需要處理的文件。

1

2

3

4

5

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(

myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY,100,

? ? ? ? FilePathFilter.createDefaultFilter(), typeInfo);

實現(xiàn):

在具體實現(xiàn)上,F(xiàn)link 把文件讀取過程分為兩個子任務(wù),即目錄監(jiān)控和數(shù)據(jù)讀取。每個子任務(wù)都由單獨的實體實現(xiàn)摆霉。目錄監(jiān)控由單個非并行(并行度為1)的任務(wù)執(zhí)行咳秉,而數(shù)據(jù)讀取由并行運行的多個任務(wù)執(zhí)行蝌以。后者的并行性等于作業(yè)的并行性。單個目錄監(jiān)控任務(wù)的作用是掃描目錄(根據(jù) watchType 定期掃描或僅掃描一次)跟畅,查找要處理的文件并把文件分割成切分片(splits)咽筋,然后將這些切分片分配給下游 reader。reader 負責讀取數(shù)據(jù)徊件。每個切分片只能由一個 reader 讀取奸攻,但一個 reader 可以逐個讀取多個切分片蒜危。

重要注意:

如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY,則當文件被修改時睹耐,其內(nèi)容將被重新處理辐赞。這會打破“exactly-once”語義,因為在文件末尾附加數(shù)據(jù)將導(dǎo)致其所有內(nèi)容被重新處理硝训。

如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_ONCE响委,則 source 僅掃描路徑一次然后退出,而不等待 reader 完成文件內(nèi)容的讀取捎迫。當然 reader 會繼續(xù)閱讀晃酒,直到讀取所有的文件內(nèi)容。關(guān)閉 source 后就不會再有檢查點窄绒。這可能導(dǎo)致節(jié)點故障后的恢復(fù)速度較慢贝次,因為該作業(yè)將從最后一個檢查點恢復(fù)讀取。

基于 Socket:

socketTextStream(String hostname, int port) - 從 socket 讀取彰导。元素可以用分隔符切分蛔翅。

1

2

3

4

5

6

7

8

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env

.socketTextStream("localhost",9999)// 監(jiān)聽 localhost 的 9999 端口過來的數(shù)據(jù)

.flatMap(newSplitter())

.keyBy(0)

.timeWindow(Time.seconds(5))

.sum(1);

這個在?《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運行簡單程序入門?文章里用的就是基于 Socket 的 Word Count 程序。

自定義:

addSource - 添加一個新的 source function位谋。例如山析,你可以 addSource(new FlinkKafkaConsumer011<>(…)) 以從 Apache Kafka 讀取數(shù)據(jù)

說下上面幾種的特點吧

1、基于集合:有界數(shù)據(jù)集掏父,更偏向于本地測試用

2笋轨、基于文件:適合監(jiān)聽文件修改并讀取其內(nèi)容

3、基于 Socket:監(jiān)聽主機的 host port赊淑,從 Socket 中獲取數(shù)據(jù)

4爵政、自定義 addSource:大多數(shù)的場景數(shù)據(jù)都是無界的,會源源不斷的過來陶缺。比如去消費 Kafka 某個 topic 上的數(shù)據(jù)钾挟,這時候就需要用到這個 addSource,可能因為用的比較多的原因吧饱岸,F(xiàn)link 直接提供了 FlinkKafkaConsumer011 等類可供你直接使用掺出。你可以去看看 FlinkKafkaConsumerBase 這個基礎(chǔ)類,它是 Flink Kafka 消費的最根本的類苫费。

1

2

3

4

5

6

7

8

9

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<KafkaEvent> input = env

.addSource(

newFlinkKafkaConsumer011<>(

parameterTool.getRequired("input-topic"),//從參數(shù)中獲取傳進來的 topic

newKafkaEventSchema(),

parameterTool.getProperties())

.assignTimestampsAndWatermarks(newCustomWatermarkExtractor()));

Flink 目前支持如下圖里面常見的 Source:

如果你想自己自定義自己的 Source 呢汤锨?

那么你就需要去了解一下 SourceFunction 接口了,它是所有 stream source 的根接口百框,它繼承自一個標記接口(空接口)Function闲礼。

SourceFunction 定義了兩個接口方法:

1、run : 啟動一個 source,即對接一個外部數(shù)據(jù)源然后 emit 元素形成 stream(大部分情況下會通過在該方法里運行一個 while 循環(huán)的形式來產(chǎn)生 stream)位仁。

2柑贞、cancel : 取消一個 source,也即將 run 中的循環(huán) emit 元素的行為終止聂抢。

正常情況下钧嘶,一個 SourceFunction 實現(xiàn)這兩個接口方法就可以了。其實這兩個接口方法也固定了一種實現(xiàn)模板琳疏。

比如有决,實現(xiàn)一個 XXXSourceFunction,那么大致的模板是這樣的:(直接拿 FLink 源碼的實例給你看看)

最后

本文主要講了下 Flink 的常見 Source 有哪些并且簡單的提了下如何自定義 Source空盼。

關(guān)注我

轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/10/28/flink-sources/

微信公眾號:zhisheng

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末书幕,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子揽趾,更是在濱河造成了極大的恐慌台汇,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件篱瞎,死亡現(xiàn)場離奇詭異苟呐,居然都是意外死亡,警方通過查閱死者的電腦和手機俐筋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門牵素,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人澄者,你說我怎么就攤上這事笆呆。” “怎么了粱挡?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵赠幕,是天一觀的道長。 經(jīng)常有香客問我抱怔,道長劣坊,這世上最難降的妖魔是什么嘀倒? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任屈留,我火速辦了婚禮,結(jié)果婚禮上测蘑,老公的妹妹穿的比我還像新娘灌危。我一直安慰自己,他們只是感情好碳胳,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布勇蝙。 她就那樣靜靜地躺著,像睡著了一般挨约。 火紅的嫁衣襯著肌膚如雪味混。 梳的紋絲不亂的頭發(fā)上产雹,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天,我揣著相機與錄音翁锡,去河邊找鬼蔓挖。 笑死,一個胖子當著我的面吹牛馆衔,可吹牛的內(nèi)容都是我干的瘟判。 我是一名探鬼主播,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼角溃,長吁一口氣:“原來是場噩夢啊……” “哼拷获!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起减细,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤匆瓜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后未蝌,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體陕壹,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年树埠,在試婚紗的時候發(fā)現(xiàn)自己被綠了糠馆。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡怎憋,死狀恐怖又碌,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情绊袋,我是刑警寧澤毕匀,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站癌别,受9級特大地震影響皂岔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜展姐,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一躁垛、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧圾笨,春花似錦教馆、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春悲敷,著一層夾襖步出監(jiān)牢的瞬間究恤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工后德, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留丁溅,地道東北人。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓探遵,卻偏偏與公主長得像窟赏,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子箱季,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容