第一章 初識Flink
大數(shù)據(jù)開發(fā)總體架構(gòu)
數(shù)據(jù)傳輸層:
常用的數(shù)據(jù)傳輸工具有Flume们拙、Sqoop沼死、Kafka。Flume是一個日志收集系統(tǒng)呕童,用于將大量日志數(shù)據(jù)從不同的源進(jìn)行收集漆际、聚合,最終移動到一個集中的數(shù)據(jù)中心進(jìn)行存儲夺饲。Sqoop主要用于將數(shù)據(jù)在關(guān)系型數(shù)據(jù)庫和Hadoop平臺之間進(jìn)行相互轉(zhuǎn)移奸汇。Kafka是一個發(fā)布與訂閱消息系統(tǒng)施符,它可以實時處理大量消息數(shù)據(jù)以滿足各種需求,相當(dāng)于數(shù)據(jù)中轉(zhuǎn)站擂找。
數(shù)據(jù)存儲層:
數(shù)據(jù)可以存儲于分布式文件系統(tǒng)HDFS中戳吝,也可以存儲于分布式數(shù)據(jù)庫HBase中,而HBase的底層實際上還是將數(shù)據(jù)存儲于HDFS中贯涎。此外听哭,為了滿足對大量數(shù)據(jù)的快速檢索與統(tǒng)計,可以使用Elasticsearch作為全文檢索引擎塘雳。
資源管理層:
YARN是大數(shù)據(jù)開發(fā)中常用的資源管理器陆盘,它是一個通用資源(內(nèi)存、CPU)管理系統(tǒng)败明,不僅可以集成于Hadoop中隘马,也可以集成于Flink、Spark等其他大數(shù)據(jù)框架中肩刃。
數(shù)據(jù)計算層:
MapReduce是Hadoop的核心組成部分犀斋,可以結(jié)合Hive通過SQL的方式進(jìn)行數(shù)據(jù)的離線計算,當(dāng)然也可以單獨編寫MapReduce應(yīng)用程序進(jìn)行計算贮懈。Storm用于進(jìn)行數(shù)據(jù)的實時計算旬牲,可以非常容易地實時處理無限的流數(shù)據(jù)。Flink提供了離線計算庫和實時計算庫兩種呢燥,離線計算庫支持FlinkML(機(jī)器學(xué)習(xí))崭添、Gelly(圖計算)、基于Table的關(guān)系操作叛氨,實時計算庫支持CEP(復(fù)雜事件處理)呼渣,同時也支持基于Table的關(guān)系操作。
任務(wù)調(diào)度層:
Oozie是一個用于Hadoop平臺的工作流調(diào)度引擎寞埠,可以使用工作流的方式對編寫好的大數(shù)據(jù)任務(wù)進(jìn)行調(diào)度屁置。若任務(wù)不復(fù)雜,則可以使用Linux系統(tǒng)自帶的Crontab定時任務(wù)進(jìn)行調(diào)度仁连。
業(yè)務(wù)模型層:
對大量數(shù)據(jù)的處理結(jié)果最終需要通過可視化的方式進(jìn)行展示蓝角。可以使用Java饭冬、PHP等處理業(yè)務(wù)邏輯使鹅,查詢結(jié)果數(shù)據(jù)庫,最終結(jié)合ECharts等前端可視化框架展示處理結(jié)果昌抠。
從另一個角度理解Flink在大數(shù)據(jù)開發(fā)架構(gòu)中的位置患朱,如圖。
什么是Flink
Apache Flink是一個框架和分布式處理引擎炊苫,用于對無邊界和有邊界的數(shù)據(jù)流進(jìn)行有狀態(tài)的計算裁厅。Flink被設(shè)計為可以在所有常見集群環(huán)境中運行冰沙,并能以內(nèi)存速度和任意規(guī)模執(zhí)行計算。目前市場上主流的流式計算框架有Apache Storm执虹、Spark Streaming倦淀、Apache Flink等,但能夠同時支持低延遲声畏、高吞吐撞叽、Exactly-Once(收到的消息僅處理一次)的框架只有Apache Flink。
Flink是原生的流處理系統(tǒng)插龄,但也提供了批處理API愿棋,擁有基于流式計算引擎處理批量數(shù)據(jù)的計算能力,真正實現(xiàn)了批流統(tǒng)一均牢。與Spark批處理不同的是糠雨,F(xiàn)link把批處理當(dāng)作流處理中的一種特殊情況。在Flink中徘跪,所有的數(shù)據(jù)都看作流甘邀,是一種很好的抽象,因為這更接近于現(xiàn)實世界垮庐。
Flink的主要優(yōu)勢如下:
同時支持高吞吐松邪、低延遲
Flink是目前開源社區(qū)中唯一同時支持高吞吐、低延遲的分布式流式數(shù)據(jù)處理框架哨查,在每秒處理數(shù)百萬條事件的同時能夠保持毫秒級延遲逗抑。而同類框架Spark Streaming在流式計算中無法做到低延遲保障。Apache Storm可以做到低延遲寒亥,但無法滿足高吞吐的要求邮府。同時滿足高吞吐、低延遲對流式數(shù)據(jù)處理框架是非常重要的溉奕,可以大大提高數(shù)據(jù)處理的性能褂傀。-
支持有狀態(tài)計算
所謂狀態(tài),就是在流式計算過程中將算子(Flink提供了豐富的用于數(shù)據(jù)處理的函數(shù)加勤,這些函數(shù)稱為算子)的中間結(jié)果(需要持續(xù)聚合計算仙辟,依賴后續(xù)的數(shù)據(jù)記錄)保存在內(nèi)存或者文件系統(tǒng)中,等下一個事件進(jìn)入算子后可以從之前的狀態(tài)中獲取中間結(jié)果胸竞,以便計算當(dāng)前的結(jié)果(當(dāng)前結(jié)果的計算可能依賴于之前的中間結(jié)果)欺嗤,從而無須每次都基于全部的原始數(shù)據(jù)來統(tǒng)計結(jié)果参萄,極大地提升了系統(tǒng)性能卫枝。
支持事件時間
時間是流處理框架的一個重要組成部分。目前大多數(shù)框架計算采用的都是系統(tǒng)處理時間(Process Time)讹挎,也就是事件傳輸?shù)接嬎憧蚣芴幚頃r校赤,系統(tǒng)主機(jī)的當(dāng)前時間吆玖。Flink除了支持處理時間外,還支持事件時間(Event Time)马篮,根據(jù)事件本身自帶的時間戳(事件的產(chǎn)生時間)進(jìn)行結(jié)果的計算沾乘,例如窗口聚合、會話計算浑测、模式檢測和基于時間的聚合等翅阵。這種基于事件驅(qū)動的機(jī)制使得事件即使亂序到達(dá),F(xiàn)link也能夠計算出精確的結(jié)果迁央,保證了結(jié)果的準(zhǔn)確性和一致性掷匠。支持高可用性配置
Flink可以與YARN、HDFS岖圈、ZooKeeper等緊密集成讹语,配置高可用,從而可以實現(xiàn)快速故障恢復(fù)蜂科、動態(tài)擴(kuò)容顽决、7×24小時運行流式應(yīng)用等作業(yè)。Flink可以將任務(wù)執(zhí)行的快照保存在存儲介質(zhì)上导匣,當(dāng)需要停機(jī)運維等操作時才菠,下次啟動可以直接從事先保存的快照恢復(fù)原有的計算狀態(tài),使得任務(wù)繼續(xù)按照停機(jī)之前的狀態(tài)運行贡定。提供了不同層級的API
Flink為流處理和批處理提供了不同層級的API鸠儿,每一種API在簡潔性和表達(dá)力上有著不同的側(cè)重,并且針對不同的應(yīng)用場景厕氨,不同層級的API降低了系統(tǒng)耦合度进每,也為用戶構(gòu)建Flink應(yīng)用程序提供了豐富且友好的接口。
Flink的應(yīng)用場景
- 事件驅(qū)動
根據(jù)到來的事件流觸發(fā)計算命斧、狀態(tài)更新或其他外部動作田晚,主要應(yīng)用實例有反欺詐、異常檢測国葬、基于規(guī)則的報警贤徒、業(yè)務(wù)流程監(jiān)控、(社交網(wǎng)絡(luò))Web應(yīng)用等汇四。
傳統(tǒng)應(yīng)用和事件驅(qū)動型應(yīng)用架構(gòu)的區(qū)別如圖:
- 數(shù)據(jù)分析
從原始數(shù)據(jù)中提取有價值的信息和指標(biāo)接奈,這些信息和指標(biāo)數(shù)據(jù)可以寫入外部數(shù)據(jù)庫系統(tǒng)或以內(nèi)部狀態(tài)的形式維護(hù),主要應(yīng)用實例有電信網(wǎng)絡(luò)質(zhì)量監(jiān)控通孽、移動應(yīng)用中的產(chǎn)品更新及實驗評估分析序宦、實時數(shù)據(jù)分析、大規(guī)模圖分析等背苦。
Flink同時支持批量及流式分析應(yīng)用互捌,如圖
- 數(shù)據(jù)管道
數(shù)據(jù)管道和ETL(Extract-Transform-Load潘明,提取-轉(zhuǎn)換-加載)作業(yè)的用途相似,都可以轉(zhuǎn)換秕噪、豐富數(shù)據(jù)钳降,并將其從某個存儲系統(tǒng)移動到另一個。與ETL不同的是腌巾,ETL作業(yè)通常會周期性地觸發(fā)遂填,將數(shù)據(jù)從事務(wù)型數(shù)據(jù)庫復(fù)制到分析型數(shù)據(jù)庫或數(shù)據(jù)倉庫。但數(shù)據(jù)管道是以持續(xù)流模式運行的澈蝙,而非周期性觸發(fā)城菊,它支持從一個不斷生成數(shù)據(jù)的源頭讀取記錄,并將它們以低延遲移動到終點碉克。例如凌唬,監(jiān)控文件系統(tǒng)目錄中的新文件,并將其數(shù)據(jù)寫入事件日志漏麦。
數(shù)據(jù)管道的主要應(yīng)用實例有電子商務(wù)中的實時查詢索引構(gòu)建客税、持續(xù)ETL等。周期性ETL作業(yè)和持續(xù)數(shù)據(jù)管道的對比如圖:
流計算框架對比
當(dāng)前大數(shù)據(jù)領(lǐng)域主流的流式計算框架有Apache Storm撕贞、Spark Streaming更耻、Apache Flink三種。通常將Apache Storm稱為第一代流式計算框架捏膨,Spark Streaming稱為第二代流式計算框架秧均,現(xiàn)在又出現(xiàn)了一種優(yōu)秀的第三代實時計算框架Apache Flink,這三種計算框架的區(qū)別如表:
-
模型
Native:原生流處理。指輸入的數(shù)據(jù)一旦到達(dá)链快,就立即進(jìn)行處理誉己,一次處理一條數(shù)據(jù),如圖
Micro-Batching:微批流處理域蜗。把輸入的數(shù)據(jù)按照預(yù)先定義的時間間隔(例如1秒鐘)分成短小的批量數(shù)據(jù)巨双,流經(jīng)流處理系統(tǒng)進(jìn)行處理,如圖
Storm和Flink使用的是原生流處理霉祸,一次處理一條數(shù)據(jù)筑累,是真正意義的流處理;而Spark Streaming實際上是通過批處理的方式模擬流處理丝蹭,一次處理一批數(shù)據(jù)(小批量)慢宗。
API
Storm只提供了組合式的基礎(chǔ)API;而Spark Streaming和Flink都提供了封裝后的高階函數(shù),例如map()婆廊、filter(),以及一些窗口函數(shù)巫橄、聚合函數(shù)等淘邻,使用這些函數(shù)可以輕松處理復(fù)雜的數(shù)據(jù),構(gòu)建并行應(yīng)用程序湘换。處理次數(shù)
在流處理系統(tǒng)中宾舅,對數(shù)據(jù)的處理有3種級別的語義:At-Most-Once(最多一次)、At-Least-Once(至少一次)彩倚、Exactly-Once(僅一次)筹我。
由此可見,衡量一個流處理系統(tǒng)能力的關(guān)鍵是Exactly-Once帆离。容錯
Storm通過使用ACK(確認(rèn)回執(zhí)蔬蕊,即數(shù)據(jù)接收方接收到數(shù)據(jù)后要向發(fā)送方發(fā)送確認(rèn)回執(zhí),以此來保證數(shù)據(jù)不丟失)機(jī)制來確認(rèn)每一條數(shù)據(jù)是否被成功處理哥谷,當(dāng)處理失敗時岸夯,則重新發(fā)送數(shù)據(jù)。這樣很容易做到保證所有數(shù)據(jù)均被處理们妥,沒有遺漏猜扮,但這種方式不能保證數(shù)據(jù)僅被處理一次,因此存在同一條數(shù)據(jù)重復(fù)處理的情況监婶。
由于Spark Streaming是微批處理旅赢,不是真正意義上的流處理,其容錯機(jī)制的實現(xiàn)相對簡單惑惶。Spark Streaming中的每一批數(shù)據(jù)成為一個RDD(Resilient Distributed Dataset煮盼,分布式數(shù)據(jù)集)。RDD Checkpoint(檢查點)機(jī)制相當(dāng)于對RDD數(shù)據(jù)進(jìn)行快照带污,可以將經(jīng)常使用的RDD快照到指定的文件系統(tǒng)中孕似,例如HDFS。當(dāng)機(jī)器發(fā)生故障導(dǎo)致內(nèi)存或磁盤中的RDD數(shù)據(jù)丟失時刮刑,可以快速從快照中對指定的RDD進(jìn)行恢復(fù)喉祭。
Flink的容錯機(jī)制是基于分布式快照實現(xiàn)的,通過CheckPoint機(jī)制保存流處理作業(yè)某些時刻的狀態(tài)雷绢,當(dāng)任務(wù)異常結(jié)束時泛烙,默認(rèn)從最近一次保存的完整快照處恢復(fù)任務(wù)。狀態(tài)
流處理系統(tǒng)的狀態(tài)管理是非常重要的翘紊,Storm沒有實現(xiàn)狀態(tài)管理蔽氨,Spark Streaming和Flink都實現(xiàn)了狀態(tài)管理。通過狀態(tài)管理可以把程序運行中某一時刻的數(shù)據(jù)結(jié)果保存起來,以便于后續(xù)的計算和故障的恢復(fù)鹉究。延遲
由于Storm和Flink是接收到一條數(shù)據(jù)就立即處理宇立,因此數(shù)據(jù)處理的延遲很低;而Spark Streaming是微批處理自赔,需要形成一小批數(shù)據(jù)才會處理妈嘹,數(shù)據(jù)處理的延遲相對偏高。吞吐量
Storm的吞吐量相對來說較低绍妨,Spark Streaming和Flink的吞吐量則比較高润脸。較高的吞吐量可以提高資源利用率,減小系統(tǒng)開銷他去。
總的來說毙驯,Storm非常適合任務(wù)量小且延遲要求低的應(yīng)用,但要注意Storm的容錯恢復(fù)和狀態(tài)管理都會降低整體的性能水平灾测。如果你要使用Lambda架構(gòu)爆价,并且要集成Spark的各種庫,那么Spark Streaming是一個不錯的選擇媳搪,但是要注意微批處理的局限性以及延遲問題允坚。Flink可以滿足絕大多數(shù)流處理場景,提供了豐富的高階函數(shù)蛾号,并且也針對批處理場景提供了相應(yīng)的API稠项,是非常有前景的一個項目。
Flink主要組件
Flink是由多個組件構(gòu)成的軟件棧鲜结,整個軟件椪乖耍可分為4層,如圖:
(1)存儲層
Flink本身并沒有提供分布式文件系統(tǒng)精刷,因此Flink的分析大多依賴于HDFS拗胜,也可以從HBase和Amazon S3(亞馬遜云存儲服務(wù))等持久層讀取數(shù)據(jù)。
(2)調(diào)度層
Flink自帶一個簡易的資源調(diào)度器怒允,稱為獨立調(diào)度器(Standalone)埂软。若集群中沒有任何資源管理器,則可以使用自帶的獨立調(diào)度器纫事。當(dāng)然勘畔,F(xiàn)link也支持在其他的集群管理器上運行,包括Hadoop YARN丽惶、Apache Mesos等炫七。
(3)計算層
Flink的核心是一個對由很多計算任務(wù)組成的、運行在多個工作機(jī)器或者一個計算集群上的應(yīng)用進(jìn)行調(diào)度钾唬、分發(fā)以及監(jiān)控的計算引擎万哪,為API工具層提供基礎(chǔ)服務(wù)俐筋。
(4)工具層
在Flink Runtime的基礎(chǔ)上娘侍,F(xiàn)link提供了面向流處理(DataStream API)和批處理(DataSet API)的不同計算接口搀继,并在此接口上抽象出了不同的應(yīng)用類型組件庫驶冒,例如基于流處理的CEP(復(fù)雜事件處理庫)、Table&SQL(結(jié)構(gòu)化表處理庫)和基于批處理的Gelly(圖計算庫)的止、FlinkML(機(jī)器學(xué)習(xí)庫)檩坚、Table&SQL(結(jié)構(gòu)化表處理庫)。
Flink編程模型——數(shù)據(jù)集
在Flink的世界觀中冲杀,任何類型的數(shù)據(jù)都可以形成一種事件流效床。例如信用卡交易睹酌、傳感器測量权谁、服務(wù)器日志、網(wǎng)站或移動應(yīng)用程序上的用戶交互記錄等憋沿,所有這些數(shù)據(jù)都可以形成一種流旺芽,因為數(shù)據(jù)都是一條一條產(chǎn)生的。
根據(jù)數(shù)據(jù)流是否有時間邊界辐啄,可將數(shù)據(jù)流分為有界流和無界流采章。有界流產(chǎn)生的數(shù)據(jù)集稱為有界數(shù)據(jù)集,無界流產(chǎn)生的數(shù)據(jù)集稱為無界數(shù)據(jù)集壶辜,如圖
- 有界數(shù)據(jù)集
定義一個數(shù)據(jù)流的開始悯舟,也定義數(shù)據(jù)流的結(jié)束,就會產(chǎn)生有界數(shù)據(jù)集砸民。有界數(shù)據(jù)集的特點是數(shù)據(jù)是靜止不動的抵怎,或者說當(dāng)處理此類數(shù)據(jù)時不考慮數(shù)據(jù)的追加操作。例如岭参,讀取MySQL數(shù)據(jù)庫反惕、文本文件、HDFS系統(tǒng)等存儲介質(zhì)中的數(shù)據(jù)進(jìn)行計算分析演侯。
有界數(shù)據(jù)集具有時間邊界姿染,時間范圍可能是一分鐘,也可能是一天內(nèi)的交易數(shù)據(jù)秒际⌒停可以在讀取所有數(shù)據(jù)后再進(jìn)行計算,對有界數(shù)據(jù)集的處理通常稱為批處理(Batch Processing)娄徊。
批處理的數(shù)據(jù)查詢方式如圖:
- 無界數(shù)據(jù)集
定義一個數(shù)據(jù)流的開始舷嗡,但沒有定義數(shù)據(jù)流的結(jié)束,就會產(chǎn)生無界數(shù)據(jù)集嵌莉。無界數(shù)據(jù)集會無休止地產(chǎn)生新數(shù)據(jù)进萄,是沒有邊界的。例如,實時讀取Kafka中的消息數(shù)據(jù)進(jìn)行計算中鼠、實時日志監(jiān)控等可婶。
對無界數(shù)據(jù)集必須持續(xù)處理,即數(shù)據(jù)被讀取后需要立刻處理援雇,不能等到所有數(shù)據(jù)都到達(dá)再處理矛渴,因為數(shù)據(jù)輸入是無限的,在任何時候輸入都不會完成惫搏。處理無界數(shù)據(jù)集通常要求以特定順序讀取事件(例如事件發(fā)生的順序)具温,以便能夠推斷結(jié)果的完整性。對無界數(shù)據(jù)集的處理被稱為流處理筐赔。
有界數(shù)據(jù)集與無界數(shù)據(jù)集其實是一個相對的概念铣猩,如果每間隔一分鐘、一小時茴丰、一天對數(shù)據(jù)進(jìn)行一次計算达皿,那么認(rèn)為這一段時間的數(shù)據(jù)相對是有界的。有界的流數(shù)據(jù)又可以一條一條地按照順序發(fā)送給計算引擎進(jìn)行處理贿肩,在這種情況下可以認(rèn)為數(shù)據(jù)是相對無界的峦椰。因此,有界數(shù)據(jù)集與無界數(shù)據(jù)集可以相互轉(zhuǎn)換汰规。Flink正是使用這種方式將有界數(shù)據(jù)集與無界數(shù)據(jù)集進(jìn)行統(tǒng)一處理汤功,從而將批處理和流處理統(tǒng)一在一套流式引擎中,能夠同時實現(xiàn)批處理與流處理任務(wù)溜哮。
流處理數(shù)據(jù)查詢方式如圖:
Flink編程模型——編程接口
Flink提供了豐富的數(shù)據(jù)處理接口滔金,并將接口抽象成4層,由下向上分別為Stateful Stream Processing API茬射、DataStream/DataSet API鹦蠕、Table API以及SQL API,開發(fā)者可以根據(jù)具體需求選擇任意一層接口進(jìn)行應(yīng)用開發(fā),如圖:
Stateful Stream Processing API
Flink中處理有狀態(tài)流最底層的接口在抛,使用Stateful Stream Process API接口可以實現(xiàn)非常復(fù)雜的流式計算邏輯钟病,開發(fā)靈活性非常強(qiáng),但是用戶使用成本也相對較高刚梭。DataStream/DataSet API
實際上肠阱,大多數(shù)應(yīng)用程序不需要上述低級抽象,而是針對核心API進(jìn)行編程的朴读,例如DataStream API和DataSet API屹徘。DataStream API用于處理無界數(shù)據(jù)集,即流處理衅金;DataSet API用于處理有界數(shù)據(jù)集噪伊,即批處理簿煌。這兩種API都提供了用于數(shù)據(jù)處理的通用操作,例如各種形式的轉(zhuǎn)換鉴吹、連接姨伟、聚合等。Table API
Table API作為批處理和流處理統(tǒng)一的關(guān)系型API豆励,即查詢在無界實時流或有界批數(shù)據(jù)集上以相同的語義執(zhí)行夺荒,并產(chǎn)生相同的結(jié)果。
Table API構(gòu)建在DataStream/DataSet API之上良蒸,提供了大量編程接口技扼,例如GroupByKey、Join等操作嫩痰,是批處理和流處理統(tǒng)一的關(guān)系型API剿吻,使用起來更加簡潔。使用Table API允許在表與DataStream/DataSet數(shù)據(jù)集之間無縫切換始赎,并且可以將Table API與DataStream/DataSet API混合使用和橙。SQL API
Flink提供的最高級別的抽象是SQL API仔燕。這種抽象在語義和表達(dá)方式上均類似于Table API造垛,但是將程序表示為SQL查詢表達(dá)式。SQL抽象與Table API緊密交互晰搀,并且可以對Table API中定義的表執(zhí)行SQL查詢五辽。
此外,SQL語言具有比較低的學(xué)習(xí)成本外恕,能夠讓數(shù)據(jù)分析人員和開發(fā)人員快速上手杆逗。
Flink編程模型——程序結(jié)構(gòu)
在Hadoop中,實現(xiàn)一個MapReduce應(yīng)用程序需要編寫Map和Reduce兩部分鳞疲;在Storm中罪郊,實現(xiàn)一個Topology需要編寫Spout和Bolt兩部分;同樣尚洽,實現(xiàn)一個Flink應(yīng)用程序也需要同樣的邏輯悔橄。
一個Flink應(yīng)用程序由3部分構(gòu)成,或者說將Flink的操作算子可以分成3部分腺毫,分別為Source癣疟、Transformation和Sink,如圖:
- Source:數(shù)據(jù)源部分潮酒。負(fù)責(zé)讀取指定存儲介質(zhì)中的數(shù)據(jù)睛挚,轉(zhuǎn)為分布式數(shù)據(jù)流或數(shù)據(jù)集,例如readTextFile()急黎、socketTextStream()等算子扎狱。
- Transformation:數(shù)據(jù)轉(zhuǎn)換部分侧到。負(fù)責(zé)對一個或多個數(shù)據(jù)流或數(shù)據(jù)集進(jìn)行各種轉(zhuǎn)換操作,并產(chǎn)生一個或多個輸出數(shù)據(jù)流或數(shù)據(jù)集淤击,例如map()床牧、flatMap()、keyBy()等算子遭贸。
- Sink:數(shù)據(jù)輸出部分戈咳。負(fù)責(zé)將轉(zhuǎn)換后的結(jié)果數(shù)據(jù)發(fā)送到HDFS、文本文件壕吹、MySQL著蛙、Elasticsearch等目的地,例如writeAsText()算子耳贬。
第二章 Flink運行架構(gòu)及原理
1踏堡、Flink運行時架構(gòu)
1.1 Flink運行時架構(gòu)-YARN架構(gòu)
Flink有多種運行模式,可以運行在一臺機(jī)器上咒劲,稱為本地(單機(jī))模式顷蟆;也可以使用YARN或Mesos作為底層資源調(diào)度系統(tǒng)以分布式的方式在集群中運行,稱為Flink On YARN模式(目前企業(yè)中使用最多的模式)腐魂;還可以使用Flink自帶的資源調(diào)度系統(tǒng)帐偎,不依賴其他系統(tǒng),稱為Flink Standalone模式蛔屹。
本地模式通常用于對應(yīng)用程序的簡單測試削樊。
YARN集群總體上是經(jīng)典的主/從(Master/Slave)架構(gòu),主要由ResourceManager兔毒、NodeManager漫贞、ApplicationMaster和Container等幾個組件構(gòu)成,YARN集群架構(gòu)如圖:
ResourceManager
以后臺進(jìn)程的形式運行育叁,負(fù)責(zé)對集群資源進(jìn)行統(tǒng)一管理和任務(wù)調(diào)度迅脐。NodeManager
集群中每個節(jié)點上的資源和任務(wù)管理器,以后臺進(jìn)程的形式運行豪嗽。它會定時向ResourceManager匯報本節(jié)點上的資源(內(nèi)存谴蔑、CPU)使用情況和各個Container的運行狀態(tài),同時會接收并處理來自ApplicationMaster的Container啟動/停止等請求昵骤。Task
應(yīng)用程序具體執(zhí)行的任務(wù)树碱。一個應(yīng)用程序可能有多個任務(wù),例如一個MapReduce程序可以有多個Map任務(wù)和多個Reduce任務(wù)变秦。Container
YARN中資源分配的基本單位成榜,封裝了CPU和內(nèi)存資源的一個容器,相當(dāng)于一個Task運行環(huán)境的抽象蹦玫。ApplicationMaster
應(yīng)用程序管理者主要負(fù)責(zé)應(yīng)用程序的管理赎婚,以后臺進(jìn)程的形式運行刘绣,為應(yīng)用程序向ResourceManager申請資源(CPU、內(nèi)存)挣输,并將資源分配給所管理的應(yīng)用程序的Task纬凤。
YARN集群中應(yīng)用程序的執(zhí)行流程如圖。
客戶端提交應(yīng)用程序到ResourceManager撩嚼。
ResourceManager分配用于運行ApplicationMaster的Container停士,然后與NodeManager通 信,要求它在該Container中啟動ApplicationMaster完丽。ApplicationMaster啟動后恋技,它將負(fù)責(zé)此應(yīng)用程序的整個生命周期。
ApplicationMaster向ResourceManager注冊(注冊后可以通過ResourceManager查看應(yīng)用程序的運行狀態(tài))并請求運行應(yīng)用程序各個Task所需的Container(資源請求是對一些Container的請求)逻族。如果符合條件蜻底,ResourceManager會分配給 ApplicationMaster所需的Container。
ApplicationMaster請求NodeManager使用這些Container來運行應(yīng)用程序的相應(yīng)Task(即將Task發(fā)布到指定的Container中運行)聘鳞。
1.2 Flink運行時架構(gòu)——Standalone架構(gòu)
Flink Standalone模式為經(jīng)典的主從(Master/Slave)架構(gòu)薄辅,資源調(diào)度是Flink自己實現(xiàn)的。集群啟動后抠璃,主節(jié)點上會啟動一個JobManager進(jìn)程站楚,類似YARN集群的ResourceManager,因此主節(jié)點也稱為JobManager節(jié)點鸡典;各個從節(jié)點上會啟動一個TaskManager進(jìn)程源请,類似YARN集群的NodeManager枪芒,因此從節(jié)點也稱為TaskManager節(jié)點彻况。
從Flink 1.6版本開始,將主節(jié)點上的進(jìn)程名稱改為了StandaloneSessionClusterEntrypoint舅踪,從節(jié)點的進(jìn)程名稱改為了TaskManagerRunner纽甘,在這里為了方便使用,仍然沿用之前版本的稱呼抽碌,即JobManager和TaskManager悍赢。Flink Standalone模式的運行架構(gòu)如圖:
Client接收到Flink應(yīng)用程序后,將作業(yè)提交給JobManager货徙。JobManager要做的第一件事就是分配Task(任務(wù))所需的資源左权。完成資源分配后,Task將被JobManager提交給相應(yīng)的TaskManager痴颊,TaskManager會啟動線程開始執(zhí)行赏迟。在執(zhí)行過程中,TaskManager會持續(xù)向JobManager匯報狀態(tài)信息蠢棱,例如開始執(zhí)行锌杀、進(jìn)行中或完成等狀態(tài)甩栈。作業(yè)執(zhí)行完成后,結(jié)果將通過JobManager發(fā)送給Client糕再。
Client
Client是提交作業(yè)的客戶端量没,雖然不是運行時和作業(yè)執(zhí)行時的一部分,但它負(fù)責(zé)準(zhǔn)備和提交作業(yè)到JobManager突想,它可以運行在任何機(jī)器上殴蹄,只要與JobManager環(huán)境連通即可。提交完成后猾担,Client可以斷開連接饶套,也可以保持連接來接收進(jìn)度報告。JobManager
JobManager根據(jù)客戶端提交的應(yīng)用將應(yīng)用分解為子任務(wù)垒探,從資源管理器(YARN等)申請所需的計算資源妓蛮,然后分發(fā)任務(wù)到TaskManager執(zhí)行,并跟蹤作業(yè)的執(zhí)行狀態(tài)等圾叼。TaskManager
TaskManager是Flink集群的工作進(jìn)程蛤克。Task被調(diào)度到TaskManager上執(zhí)行。TaskManager 相互通信夷蚊,只為在后續(xù)的Task之間交換數(shù)據(jù)构挤。Task
Flink中的每一個操作算子稱為一個Task(任務(wù)),例如單詞計數(shù)中使用的flatMap()算子惕鼓、map()算子等筋现。每個Task在一個JVM線程中執(zhí)行。Task Slot
TaskManager為了控制執(zhí)行的Task數(shù)量箱歧,將計算資源(內(nèi)存)劃分為多個Task Slot(任務(wù)槽)矾飞,每個Task Slot代表TaskManager的一份固定內(nèi)存資源,Task則在Task Slot中執(zhí)行呀邢。例如洒沦,具有3個Task Slot的TaskManager會將其管理的內(nèi)存資源分成3等份給每個Task Slot。
每個Task Slot只對應(yīng)一個執(zhí)行線程价淌。
1.3 Flink運行時架構(gòu)——On YARN架構(gòu)
Flink On YARN模式遵循YARN的官方規(guī)范申眼,YARN只負(fù)責(zé)資源的管理和調(diào)度,運行哪種應(yīng)用程序由用戶自己實現(xiàn)蝉衣,因此可能在YARN上同時運行MapReduce程序括尸、Spark程序、Flink程序等。YARN很好地對每一個程序?qū)崿F(xiàn)了資源的隔離,這使得Spark捧存、MapReduce筝闹、Flink等可以運行于同一個集群中戳葵,共享集群存儲資源與計算資源燎孟。
Flink On YARN模式的運行架構(gòu)如圖:
當(dāng)啟動一個Client(客戶端)會話時喇勋,Client首先會上傳Flink應(yīng)用程序JAR包和配置文件到HDFS僚楞。
Client向ResourceManager申請用于運行ApplicationMaster的Container娶眷。
ResourceManager分配用于運行ApplicationMaster的Container似嗤,然后與NodeManager通 信,要求它在該Container中啟動ApplicationMaster(ApplicationMaster與Flink JobManager運行于同一Container中届宠,這樣ApplicationMaster就能知道Flink JobManager的地址)烁落。ApplicationMaster啟動后,它將負(fù)責(zé)此應(yīng)用程序的整個生命周期豌注。另外伤塌,ApplicationMaster還提供了Flink的WebUI服務(wù)。
ApplicationMaster向ResourceManager注冊(注冊后可以通過ResourceManager查看應(yīng)用程序的運行狀態(tài))并請求運行Flink TaskManager所需的Container(資源請求是對一些Container的請求)轧铁。如果符合條件每聪,ResourceManager會分配給 ApplicationMaster所需的Container。ApplicationMaster請求NodeManager使用這些Container來運行Flink TaskManager齿风。各個NodeManager從HDFS中下載Flink JAR包和配置文件药薯。至此,F(xiàn)link相關(guān)任務(wù)就可以運行了救斑。
此外童本,各個運行中的Flink TaskManager會通過RPC協(xié)議向ApplicationMaster匯報自己的狀態(tài)和進(jìn)度。
2脸候、Flink任務(wù)調(diào)度原理
2.1 Flink任務(wù)調(diào)度原理——任務(wù)鏈
Flink中的每一個操作算子稱為一個Task(任務(wù))穷娱,算子的每個具體實例則稱為SubTask(子任務(wù)),SubTask是Flink中最小的處理單元运沦,多個SubTask可能在不同的機(jī)器上執(zhí)行泵额。一個TaskManager進(jìn)程包含一個或多個執(zhí)行線程,用于執(zhí)行SubTask茶袒。TaskManager中的一個Task Slot對應(yīng)一個執(zhí)行線程梯刚,一個執(zhí)行線程可以執(zhí)行一個或多個SubTask,如下圖所示薪寓。
由于每個SubTask只能在一個線程中執(zhí)行,為了能夠減少線程間切換和緩沖的開銷澜共,在降低延遲的同時提高整體吞吐量向叉,F(xiàn)link可以將多個連續(xù)的SubTask鏈接成一個Task在一個線程中執(zhí)行。這種將多個SubTask連在一起的方式稱為任務(wù)鏈嗦董。下方右圖中母谎,一個Source類算子的SubTask和一個map()算子的SubTask連在了一起,組成了任務(wù)鏈京革。
2.2 Flink任務(wù)調(diào)度原理——并行度
Flink應(yīng)用程序可以在分布式集群上并行運行奇唤,其中每個算子的各個并行實例會在單獨的線程中獨立運行幸斥,并且通常情況下會在不同的機(jī)器上運行。
為了充分利用計算資源咬扇,提高計算效率甲葬,可以增加算子的實例數(shù)(SubTask數(shù)量)。一個特定算子的SubTask數(shù)量稱為該算子的并行度懈贺,且任意兩個算子的并行度之間是獨立的经窖,不同算子可能擁有不同的并行度。例如梭灿,將Source算子画侣、map()算子、keyby()/window()/apply()算子的并行度設(shè)置為2堡妒,Sink算子的并行度設(shè)置為1配乱,運行效果如圖:
由于一個Task Slot對應(yīng)一個執(zhí)行線程,因此并行度為2的算子的SubTask將被分配到不同的Task Slot中執(zhí)行皮迟。假設(shè)一個作業(yè)圖(JobGraph)有A宪卿、B、C万栅、D佑钾、E五個算子,其中A烦粒、B休溶、D的并行度為4,C扰她、E的并行度為2兽掰,該作業(yè)在TaskManager中的詳細(xì)數(shù)據(jù)流程可能如圖:
Flink中并行度的設(shè)置有4種級別:算子級別、執(zhí)行環(huán)境(Execution Environment)級別徒役、客戶端(命令行)級別孽尽、系統(tǒng)級別。
- 算子級別
每個算子忧勿、Source和Sink都可以通過調(diào)用setParallelism()方法指定其并行度杉女。例如以下代碼設(shè)置flatMap()算子的并行度為2:
data.flatMap(_.split(" ")).setParallelism(2)
- 執(zhí)行環(huán)境級別
調(diào)用執(zhí)行環(huán)境對象的setParallelism()方法可以指定Flink應(yīng)用程序中所有算子的默認(rèn)并行度,代碼如下:
val env=ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
- 客戶端(命令行)級別
在向集群提交Flink應(yīng)用程序時使用-p選項可以指定并行度鸳吸。例如以下提交命令:
bin/flink run -p 2 WordCount.jar
- 系統(tǒng)級別
影響所有運行環(huán)境的系統(tǒng)級別的默認(rèn)并行度可以在配置文件flink-conf.yaml中的parallelism.default屬性中指定熏挎,默認(rèn)為1。
4種并行度級別的作用順序為:算子級別>執(zhí)行環(huán)境級別>客戶端級別>系統(tǒng)級別晌砾。
2.3 Flink任務(wù)調(diào)度原理——共享Task Slot
默認(rèn)情況下坎拐,F(xiàn)link允許SubTask之間共享Task Slot,即使它們是不同Task(算子)的SubTask,只要它們來自同一個作業(yè)(Job)即可哼勇。在沒有共享Task Slot的情況下都伪,簡單的SubTask(source()、map()等)將會占用和復(fù)雜的SubTask(keyBy()积担、window()等)一樣多的資源陨晶,通過共享Task Slot可以充分利用Task Slot的資源,同時確保繁重的SubTask在TaskManager之間公平地獲取資源磅轻。例如珍逸,將算子并行度從2增加到6,并行效果如圖:
最左側(cè)的Task Slot負(fù)責(zé)作業(yè)的整個管道(Pipeline)聋溜。管道用于連接多個算子谆膳,將一個算子的執(zhí)行結(jié)果輸出給下一個算子。
2.4 Flink任務(wù)調(diào)度原理——數(shù)據(jù)流
一個Flink應(yīng)用程序會被映射成邏輯數(shù)據(jù)流(Dataflow)撮躁,而Dataflow都是以一個或多個Source開始漱病、以一個或多個Sink結(jié)束的,且始終包括Source把曼、Transformation杨帽、Sink三部分。Dataflow描述了數(shù)據(jù)如何在不同算子之間流動嗤军,將這些算子用帶方向的直線連接起來會形成一個關(guān)于計算路徑的有向無環(huán)圖注盈,稱為DAG(Directed Acyclic Graph,有向無環(huán)圖)或Dataflow圖叙赚。各個算子的中間數(shù)據(jù)會被保存在內(nèi)存中老客,如圖:
假設(shè)一個Flink應(yīng)用程序在讀取數(shù)據(jù)后先對數(shù)據(jù)進(jìn)行了map()操作,然后進(jìn)行了keyBy()/window()/apply()操作震叮,最后將計算結(jié)果輸出到了指定的文件中胧砰,則該程序的Dataflow圖如圖:
假設(shè)該程序的Source、map()苇瓣、keyBy()/window()/apply()算子的并行度為2尉间,Sink算子的并行度為1,則該程序的邏輯數(shù)據(jù)流圖击罪、物理(并行)數(shù)據(jù)流圖和Flink優(yōu)化后的數(shù)據(jù)流圖如圖哲嘲。
Flink應(yīng)用程序在執(zhí)行時,為了降低線程開銷外邓,會將多個SubTask連接在一起組成任務(wù)鏈撤蚊,在一個線程中運行。對于物理(并行)數(shù)據(jù)流來說损话,F(xiàn)link執(zhí)行時會對其進(jìn)行優(yōu)化,將Source[1]和map()[1]、Source[2]和map()[2]分別連接成一個任務(wù)丧枪,這是因為Source和map()之間采用了一對一的直連模式光涂,而且沒有任何的重分區(qū),它們之間可以直接通過緩存進(jìn)行數(shù)據(jù)傳遞拧烦,而不需要通過網(wǎng)絡(luò)或序列化(如果不使用任務(wù)鏈忘闻,Source和map()可能在不同的機(jī)器上,它們之間的數(shù)據(jù)傳遞就需要通過網(wǎng)絡(luò))恋博。這種優(yōu)化在很大程度上提升了Flink的執(zhí)行效率齐佳。
2.5 Flink任務(wù)調(diào)度原理——執(zhí)行圖
Flink應(yīng)用程序執(zhí)行時會根據(jù)數(shù)據(jù)流生成多種圖,每種圖對應(yīng)了作業(yè)的不同階段债沮,根據(jù)不同圖的生成順序炼吴,主要分為4層:StreamGraph→JobGraph→ExecutionGraph→物理執(zhí)行圖,如圖疫衩。
StreamGraph:流圖硅蹦。使用DataStream API編寫的應(yīng)用程序生成的最初的圖代表程序的拓?fù)浣Y(jié)構(gòu),描述了程序的執(zhí)行邏輯闷煤。StreamGraph在Flink客戶端中生成童芹,在客戶端應(yīng)用程序最后調(diào)用execute()方法時觸發(fā)StreamGraph的構(gòu)建。
JobGraph:作業(yè)圖鲤拿。所有高級別API都需要轉(zhuǎn)換為JobGraph假褪。StreamGraph經(jīng)過優(yōu)化(例如任務(wù)鏈)后生成了JobGraph,以提高執(zhí)行效率近顷。StreamGraph和JobGraph都是在本地客戶端生成的數(shù)據(jù)結(jié)構(gòu)生音,而JobGraph需要被提交給JobManager進(jìn)行解析。
ExecutionGraph:執(zhí)行圖幕庐。JobManager對JobGraph進(jìn)行解析后生成的并行化執(zhí)行圖是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)久锥。它包含對每個中間數(shù)據(jù)集或數(shù)據(jù)流、每個并行任務(wù)以及它們之間的通信的描述异剥。
物理執(zhí)行圖:JobManager根據(jù)ExecutionGraph對作業(yè)進(jìn)行調(diào)度后瑟由,在各個TaskManager上部署Task后形成的“圖”。物理執(zhí)行圖并不是一個具體的數(shù)據(jù)結(jié)構(gòu)冤寿,而是各個Task分布在不同的節(jié)點上所形成的物理上的關(guān)系表示歹苦。
2.6 Flink任務(wù)調(diào)度原理——執(zhí)行計劃
Flink的優(yōu)化器會根據(jù)數(shù)據(jù)量或集群機(jī)器數(shù)等的不同自動地為程序選擇執(zhí)行策略。因此督怜,準(zhǔn)確地了解Flink如何執(zhí)行編寫的應(yīng)用程序是很有必要的殴瘦。
接下來我們對流處理單詞計數(shù)例子的代碼進(jìn)行更改,從執(zhí)行環(huán)境級別設(shè)置并行度 為2号杠,代碼如下:
val env=StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
//將最后的觸發(fā)任務(wù)執(zhí)行代碼env.execute("StreamWordCount")改為:
println(env.getExecutionPlan)//打印計劃描述
直接在本地運行程序蚪腋,將從控制臺打印應(yīng)用程序的邏輯執(zhí)行計劃對應(yīng)的JSON描述丰歌,JSON字符串內(nèi)容如下:
Flink為執(zhí)行計劃提供了可視化工具,它可以把用JSON格式表示的作業(yè)執(zhí)行計劃以圖的形式展現(xiàn)屉凯,并且其中包含完整的執(zhí)行策略標(biāo)注立帖。
將上述JSON字符串粘貼到可視化工具網(wǎng)址(http://flink.apache.org/visualizer/)提供的文本框中,可將JSON字符串解析為可視化圖悠砚,該可視化圖對應(yīng)的是StreamGraph晓勇,如圖:
將應(yīng)用程序提交到Flink集群后,在Flink的WebUI中還可以看到另一張可視化圖灌旧,即JobGraph绑咱,如圖所示。此處的單詞數(shù)據(jù)來源于HDFS存儲系統(tǒng)枢泰,而不是本地文件描融。
3、Flink數(shù)據(jù)分區(qū)
在Flink中宗苍,數(shù)據(jù)流或數(shù)據(jù)集被劃分成多個獨立的子集稼稿,這些子集分布到了不同的節(jié)點上,而每一個子集稱為分區(qū)(Partition)讳窟。因此可以說让歼,F(xiàn)link中的數(shù)據(jù)流或數(shù)據(jù)集是由若干個分區(qū)組成的。數(shù)據(jù)流或數(shù)據(jù)集與分區(qū)的關(guān)系如圖:
通過將每個記錄分配給一個或多個分區(qū)來把數(shù)據(jù)流或數(shù)據(jù)集劃分為多個分區(qū)丽啡。在運行期間谋右,Task會消費數(shù)據(jù)流或數(shù)據(jù)集的分區(qū)。改變數(shù)據(jù)流或數(shù)據(jù)集分區(qū)方式的轉(zhuǎn)換通常稱為重分區(qū)补箍。
3.1 Flink數(shù)據(jù)分區(qū)——分區(qū)數(shù)量
運行期間改执,每個數(shù)據(jù)記錄將被分配給一個或多個分區(qū),各個分區(qū)中的數(shù)據(jù)可以并行計算坑雅。數(shù)據(jù)是由上游算子的某個實例(SubTask)發(fā)往下游算子的一個或多個實例辈挂,而一個算子實例只負(fù)責(zé)計算一個分區(qū)的數(shù)據(jù)。因此裹粤,分區(qū)的數(shù)量是由下游算子的實例數(shù)量(并行度)決定的终蒂,發(fā)往下游算子的數(shù)據(jù)分區(qū)數(shù)量等于下游算子的實例數(shù)量。
如圖所示遥诉,上游Source算子的并行度為1拇泣,下游map()算子的并行度為2,數(shù)據(jù)由Source發(fā)往map()矮锈,因此將分成兩個分區(qū)霉翔,map()的兩個實例各自執(zhí)行一個分區(qū)的數(shù)據(jù)。
數(shù)據(jù)分區(qū)的一個原則是使得分區(qū)的數(shù)量盡量等于集群節(jié)點CPU的核心數(shù)量苞笨,而前面提到過债朵,算子的并行度應(yīng)盡量等于集群節(jié)點CPU的核心數(shù)量子眶,可見兩者保持一致。
3.2 Flink數(shù)據(jù)分區(qū)——分區(qū)策略
Flink分區(qū)策略決定了一條數(shù)據(jù)如何發(fā)送給下游算子的不同實例葱弟,或者說如何在下游算子不同實例之間進(jìn)行數(shù)據(jù)劃分壹店。程序運行時猜丹,系統(tǒng)會根據(jù)算子的語義和配置的并行度自動選擇數(shù)據(jù)的分區(qū)策略芝加。當(dāng)然也可以在程序中顯式指定分區(qū)策略。
Flink常見的分區(qū)策略如下射窒。
- 轉(zhuǎn)發(fā)策略
在上游算子實例和下游算子實例之間一對一地進(jìn)行數(shù)據(jù)傳輸藏杖。這種策略不會產(chǎn)生重分區(qū)(改變數(shù)據(jù)流或數(shù)據(jù)集分區(qū)方式的轉(zhuǎn)換通常稱為重分區(qū)),且可以避免網(wǎng)絡(luò)傳輸脉顿,以提高傳輸效率蝌麸。假設(shè)上游算子A和下游算子B的并行度都為2,使用轉(zhuǎn)發(fā)策略的效果如圖所示艾疟。
- 廣播策略
上游算子實例的每個數(shù)據(jù)記錄都會發(fā)往下游算子的所有實例来吩。這種策略會把數(shù)據(jù)復(fù)制多份,向下游算子的每個實例發(fā)送一份蔽莱,且涉及網(wǎng)絡(luò)傳輸弟疆,代價較高。使用廣播策略的效果如圖:
- 鍵值策略
根據(jù)數(shù)據(jù)記錄中的鍵對數(shù)據(jù)進(jìn)行重分區(qū)盗冷,鍵相同的數(shù)據(jù)記錄一定會被發(fā)送給下游同一個算子實例怠苔,鍵不同的數(shù)據(jù)記錄可能會被發(fā)送到下游不同的算子實例,也可能會被發(fā)送到下游同一個算子實例仪糖。這種策略要求數(shù)據(jù)記錄的格式為(鍵,值)形式柑司,如圖:
- 隨機(jī)策略
將數(shù)據(jù)記錄進(jìn)行隨機(jī)重分區(qū),數(shù)據(jù)記錄會被均勻分配到下游算子的每個實例锅劝。這種策略可以實現(xiàn)計算任務(wù)的負(fù)載均衡攒驰,如圖:
- 全局策略
將上游所有數(shù)據(jù)記錄發(fā)送到下游第一個算子實例,如圖:
- 自定義策略
如果內(nèi)置的分區(qū)策略不能滿足當(dāng)前需求故爵,則可以在程序中自定義分區(qū)策略玻粪。
第三章 Flink安裝及部署
1. Flink集群搭建
Flink可以在Linux、macOS和Windows上運行稠集。前提條件是集群各節(jié)點提前安裝JDK8以上版本奶段,并配置好SSH免密登錄,因為集群各節(jié)點之間需要相互通信剥纷,F(xiàn)link主節(jié)點需要對其他節(jié)點進(jìn)行遠(yuǎn)程管理和監(jiān)控痹籍。
從Flink官網(wǎng)下載頁面(https://flink.apache.org/downloads.html)下載二進(jìn)制安裝文件,并選擇對應(yīng)的Scala版本晦鞋,此處選擇Apache Flink 1.13.0 for Scala 2.11(Flink版本為1.13.0蹲缠,使用的Scala版本為2.11)棺克。
由于當(dāng)前版本的Flink不包含Hadoop相關(guān)依賴庫,如果需要結(jié)合Hadoop(例如讀取HDFS中的數(shù)據(jù))线定,還需要下載預(yù)先捆綁的Hadoop JAR包娜谊,并將其放置在Flink安裝目錄的lib目錄中。
此處選擇Pre-bundled Hadoop 2.8.3(適用于Hadoop 2.8.3)斤讥,如圖:
接下來使用3個節(jié)點(主機(jī)名分別為centos01纱皆、centos02、centos03)講解Flink各種運行模式的搭建芭商。3個節(jié)點的主機(jī)名與IP的對應(yīng)關(guān)系如表
1.1 Flink本地模式
接下來講解在CentOS 7操作系統(tǒng)中搭建Flink本地模式派草。
- 上傳解壓安裝包
將下載的Flink安裝包flink-1.13.0-bin-scala_2.11.tgz上傳到centos01節(jié)點的/opt/softwares目錄,然后進(jìn)入該目錄,執(zhí)行以下命令將其解壓到目錄/opt/modules中。
$ tar -zxvf flink-1.13.0-bin-scala_2.11.tgz -C /opt/modules/
- 啟動Flink
進(jìn)入Flink安裝目錄窄陡,執(zhí)行以下命令啟動Flink:
$ bin/start-cluster.sh
啟動后,使用jps命令查看Flink的JVM進(jìn)程鉴竭,命令如下:
$ jps
13309 StandaloneSessionClusterEntrypoint
13599 TaskManagerRunner
若出現(xiàn)上述進(jìn)程,則代表啟動成功岸浑。StandaloneSessionClusterEntrypoint為Flink主進(jìn)程搏存,即JobManager;TaskManagerRunner為Flink從進(jìn)程助琐,即TaskManager祭埂。
- 查看WebUI
在瀏覽器中訪問服務(wù)器8081端口即可查看Flink的WebUI,此處訪問地址http://192.168.170.133:8081/兵钮,如圖:
從WebUI中可以看出蛆橡,當(dāng)前本地模式的Task Slot數(shù)量和TaskManager數(shù)量都為1(Task Slot數(shù)量默認(rèn)為1)。
1.2 Flink集群搭建——Standalone模式
Flink Standalone模式的搭建需要在集群的每個節(jié)點都安裝Flink掘譬,集群角色分配如表:
集群搭建的操作步驟如下:
- 上傳解壓安裝包
將下載的Flink安裝包flink-1.13.0-bin-scala_2.11.tgz上傳到centos01節(jié)點的/opt/softwares目錄泰演,然后進(jìn)入該目錄,執(zhí)行以下命令將其解壓到目錄/opt/modules中葱轩。
$ tar -zxvf flink-1.13.0-bin-scala_2.11.tgz -C /opt/modules/
- 修改配置文件
Flink的配置文件都存放于安裝目錄下的conf目錄睦焕,進(jìn)入該目錄,執(zhí)行以下操作靴拱。
(1)修改flink-conf.yaml文件
$ vim conf/flink-conf.yaml
將文件中jobmanager.rpc.address屬性的值改為centos01垃喊,命令如下:
jobmanager.rpc.address: centos01
上述配置表示指定集群主節(jié)點(JobManager)的主機(jī)名(或IP),此處為centos01袜炕。
(2)修改workers文件
workers文件必須包含所有需要啟動的TaskManager節(jié)點的主機(jī)名本谜,且每個主機(jī)名占一行。
執(zhí)行以下命令修改workers文件:
$ vim conf/workers
改為以下內(nèi)容:
centos02
centos03
上述配置表示將centos02和centos03節(jié)點設(shè)置為集群的從節(jié)點(TaskManager節(jié)點)偎窘。
- 復(fù)制Flink安裝文件到其他節(jié)點
在centos01節(jié)點中進(jìn)入/opt/modules/目錄執(zhí)行以下命令乌助,將Flink安裝文件復(fù)制到其他節(jié)點:
$ scp -r flink-1.13.0/ centos02:/opt/modules/
$ scp -r flink-1.13.0/ centos03:/opt/modules/
- 啟動Flink集群
在centos01節(jié)點上進(jìn)入Flink安裝目錄溜在,執(zhí)行以下命令啟動Flink集群:
$ bin/start-cluster.sh
啟動完畢后,分別在各節(jié)點執(zhí)行jps命令他托,查看啟動的Java進(jìn)程掖肋。若各節(jié)點存在以下進(jìn)程,則說明集群啟動成功赏参。
centos01節(jié)點:StandaloneSessionClusterEntrypoint
centos02節(jié)點:TaskManagerRunner
centos03節(jié)點:TaskManagerRunner
- 查看WebUI
集群啟動后志笼,在瀏覽器中訪問JobManager節(jié)點的8081端口即可查看Flink的WebUI,此處訪問地址http://192.168.170.133:8081/登刺,如圖:
從WebUI中可以看出籽腕,當(dāng)前集群總的Task Slot數(shù)量(每個節(jié)點的Task Slot數(shù)量默認(rèn)為1)和TaskManager數(shù)量都為2。
1.3 Flink集群搭建——On YARN模式
Flink On YARN模式的搭建比較簡單纸俭,僅需要在YARN集群的一個節(jié)點上安裝Flink即可,該節(jié)點可作為提交Flink應(yīng)用程序到Y(jié)ARN集群的客戶端南窗。
若要在YARN上運行Flink應(yīng)用揍很,則需要注意以下幾點:
1)Hadoop版本應(yīng)在2.2以上。
2)必須事先確保環(huán)境變量文件中配置了HADOOP_CONF_DIR万伤、YARN_CONF_DIR或者HADOOP_HOME窒悔,F(xiàn)link客戶端會通過該環(huán)境變量讀取YARN和HDFS的配置信息,以便正確加載Hadoop配置以訪問YARN敌买,否則將啟動失敗简珠。
3)需要下載預(yù)先捆綁的Hadoop JAR包,并將其放置在Flink安裝目錄的lib目錄中虹钮,本例使用flink-shaded-hadoop-2-uber-2.8.3-10.0.jar聋庵。具體下載方式見3.1節(jié)的Flink集群搭建。
4)需要提前將HDFS和YARN集群啟動芙粱。
本例使用的Hadoop集群各節(jié)點的角色分配如表:
在Flink On YARN模式中祭玉,根據(jù)作業(yè)的運行方式不同,又分為兩種模式:Flink YARN Session模式和Flink Single Job(獨立作業(yè))模式春畔。
Flink YARN Session模式需要先在YARN中啟動一個長時間運行的Flink集群脱货,也稱為Flink YARN Session集群,該集群會常駐在YARN集群中律姨,除非手動停止振峻。客戶端向Flink YARN Session集群中提交作業(yè)時择份,相當(dāng)于連接到一個預(yù)先存在的扣孟、長期運行的Flink集群,該集群可以接受多個作業(yè)提交缓淹。即使所有作業(yè)完成后哈打,集群(和JobManager)仍將繼續(xù)運行直到手動停止塔逃。該模式下,F(xiàn)link會向YARN一次性申請足夠多的資源料仗,資源永久保持不變湾盗,如果資源被占滿,則下一個作業(yè)無法提交立轧,只能等其中一個作業(yè)執(zhí)行完成后釋放資源格粪,如圖:
擁有一個預(yù)先存在的集群可以節(jié)省大量時間申請資源和啟動TaskManager。作業(yè)可以使用現(xiàn)有資源快速執(zhí)行計算是非常重要的氛改。
Flink Single Job模式不需要提前啟動Flink YARN Session集群帐萎,直接在YARN上提交Flink作業(yè)即可。每一個作業(yè)會根據(jù)自身情況向YARN申請資源胜卤,不會影響其他作業(yè)運行疆导,除非整個YARN集群已無任何資源。并且每個作業(yè)都有自己的JobManager和TaskManager葛躏,相當(dāng)于為每個作業(yè)提供了一個集群環(huán)境澈段,當(dāng)作業(yè)結(jié)束后,對應(yīng)的組件也會同時釋放舰攒。該模式不會額外占用資源败富,使資源利用率達(dá)到最大,在生產(chǎn)環(huán)境中推薦使用這種模式摩窃,如圖:
Flink Single Job模式適合長期運行兽叮、具有高穩(wěn)定性要求且對較長的啟動時間不敏感的大型作業(yè)。
- Flink YARN Session模式操作
(1)啟動Flink YARN Session集群
在啟動HDFS和YARN集群后猾愿,在YARN集群主節(jié)點(此處為centos01節(jié)點)安裝好Flink鹦聪,進(jìn)入Flink主目錄執(zhí)行以下命令,即可啟動Flink YARN Session集群:
$ bin/yarn-session.sh -jm 1024 -tm 2048
上述命令中的參數(shù)-jm表示指定JobManager容器的內(nèi)存大蟹梭啊(單位為MB)椎麦,參數(shù)-tm表示指定TaskManager容器的內(nèi)存大小(單位為MB)材彪。
啟動完畢后观挎,會在啟動節(jié)點(此處為centos01節(jié)點)產(chǎn)生一個名為FlinkYarnSessionCli的進(jìn)程,該進(jìn)程是Flink客戶端進(jìn)程段化;在其中一個NodeManager節(jié)點產(chǎn)生一個名為YarnSessionClusterEntrypoint的進(jìn)程嘁捷,該進(jìn)程是Flink JobManager進(jìn)程。而Flink TaskManager進(jìn)程不會啟動显熏,在后續(xù)向集群提交作業(yè)時才會啟動雄嚣。例如,啟動完畢后查看centos01節(jié)點的進(jìn)程可能如下:
$ jps
7232 NodeManager
6626 NameNode
14422 YarnSessionClusterEntrypoint
14249 FlinkYarnSessionCli
6956 SecondaryNameNode
7116 ResourceManager
17612 Jps
6750 DataNode
此時可以在瀏覽器訪問YARN ResourceManager節(jié)點的8088端口,此處地址為http://192.168.170.133:8088/缓升,在YARN的WebUI中可以查看當(dāng)前Flink應(yīng)用程序(Flink YARN Session集群)的運行狀態(tài)鼓鲁,如圖
從圖中可以看出,一個Flink YARN Session集群實際上就是一個長時間在YARN中運行的應(yīng)用程序(Application)港谊,后面的Flink作業(yè)也會提交到該應(yīng)用程序中骇吭。
(2)提交Flink作業(yè)
接下來向Flink YARN Session集群提交Flink自帶的單詞計數(shù)程序。
首先在HDFS中準(zhǔn)備/input/word.txt文件歧寺,內(nèi)容如下:
hello hadoop
hello java
hello scala
java
然后在Flink客戶端(centos01節(jié)點)中執(zhí)行以下命令燥狰,提交單詞計數(shù)程序到Flink YARN Session集群:
$ bin/flink run ./examples/batch/WordCount.jar \
-input hdfs://centos01:9000/input/word.txt \
-output hdfs://centos01:9000/result.txt
上述命令通過參數(shù)-input指定輸入數(shù)據(jù)目錄,-output指定輸出數(shù)據(jù)目錄斜筐。
在執(zhí)行過程中龙致,查看Flink YARN Session集群的WebUI,如圖:
當(dāng)作業(yè)執(zhí)行完畢后顷链,查看HDFS/result.txt文件中的結(jié)果目代,如圖:
(3)分離模式
如果希望將啟動的Flink YARN Session集群在后臺獨立運行,與Flink客戶端進(jìn)程脫離關(guān)系蕴潦,可以在啟動時添加-d或--detached參數(shù)像啼,表示以分離模式運行作業(yè),即Flink客戶端在啟動Flink YARN Session集群后潭苞,就不再屬于YARN集群的一部分。例如以下代碼:
$ bin/yarn-session.sh -jm 1024 -tm 2048 -d
(4)進(jìn)程綁定
與分離模式相反真朗,當(dāng)使用分離模式啟動Flink YARN Session集群后此疹,如果需要再次將Flink客戶端與Flink YARN Session集群綁定,則使用-id或--applicationId參數(shù)指定Flink YARN Session集群在YARN中對應(yīng)的applicationId即可遮婶,命令格式如下:
$ bin/yarn-session.sh –id [applicationId]
例如蝗碎,將Flink客戶端(執(zhí)行綁定命令的本地客戶端)與applicationId為application_ 1593999118637_0009的Flink YARN Session集群綁定,命令如下:
$ bin/yarn-session.sh -id application_1593999118637_0009
執(zhí)行上述命令后旗扑,在Flink客戶端會產(chǎn)生一個名為FlinkYarnSessionCli的客戶端進(jìn)程蹦骑。此時就可以在Flink客戶端對Flink YARN Session集群進(jìn)行操作,包括執(zhí)行停止命令等臀防。例如執(zhí)行Ctrl+C命令或輸入stop命令即可停止Flink YARN Session集群眠菇。
- Flink Single Job模式操作
Flink Single Job模式可以將單個作業(yè)直接提交到Y(jié)ARN中,每次提交的Flink作業(yè)都是一個獨立的YARN應(yīng)用程序袱衷,應(yīng)用程序運行完畢后釋放資源捎废,這種模式適合批處理應(yīng)用。
例如致燥,在Flink客戶端(centos01節(jié)點)中執(zhí)行以下命令登疗,以Flink Single Job模式提交單詞計數(shù)程序到Y(jié)ARN集群:
$ bin/flink run -m yarn-cluster examples/batch/WordCount.jar \
-input hdfs://centos01:9000/input/word.txt \
-output hdfs://centos01:9000/result.txt
上述命令通過參數(shù)-m指定使用YARN集群(即以Flink Single Job模式提交),-input指定輸入數(shù)據(jù)目錄,-output指定輸出數(shù)據(jù)目錄辐益。
提交完畢后断傲,可以在瀏覽器訪問YARN ResourceManager節(jié)點的8088端口,此處地址為http://192.168.170.133:8088/智政,在YARN的WebUI中可以查看當(dāng)前Flink應(yīng)用程序的運行狀態(tài)认罩,如圖:
2. Flink HA模式
3. Flink命令行界面
4. Flink應(yīng)用提交
5. Flink Shell的使用
第四章 Flink DataStream API
01 基本概念
DataStream API的名稱來自一個特殊的DataStream類,該類用于表示Flink程序中的數(shù)據(jù)集合女仰。你可以將它視為包含重復(fù)項的不可變數(shù)據(jù)集合猜年。這些數(shù)據(jù)可以是有限的,也可以是無限的疾忍,用于處理這些數(shù)據(jù)的API是相同的乔外。
Flink中使用DataSet和DataStream表示數(shù)據(jù)的基本抽象,可以將它們視為包含特定類型的元素集合一罩,類似于常規(guī)Java集合杨幼。但不同的是,集合數(shù)據(jù)不可變聂渊,集合一旦被創(chuàng)建差购,就不能添加或刪除元素。對于DataSet汉嗽,數(shù)據(jù)是有限的欲逃,而對于DataStream,元素的數(shù)量可以是無限的饼暑。
DataSet和DataStream數(shù)據(jù)集都是分布式數(shù)據(jù)集稳析,分布式數(shù)據(jù)集是指:一個數(shù)據(jù)集存儲在不同的服務(wù)器節(jié)點上,每個節(jié)點存儲數(shù)據(jù)集的一部分弓叛。例如彰居,將數(shù)據(jù)集(hello,world,scala,spark,love,spark,happy)存儲在3個節(jié)點上,節(jié)點一存儲(hello,world)撰筷,節(jié)點二存儲(scala,spark,love)陈惰,節(jié)點三存儲(spark,happy),這樣對3個節(jié)點的數(shù)據(jù)可以并行計算毕籽,并且3個節(jié)點的數(shù)據(jù)共同組成了一個DataSet/DataStream抬闯,如圖:
分布式數(shù)據(jù)集類似于HDFS中的文件分塊,不同的塊存儲在不同的節(jié)點上影钉;而并行計算類似于使用MapReduce讀取HDFS中的數(shù)據(jù)并進(jìn)行Map和Reduce操作画髓。Flink包含這兩種功能,并且計算更加靈活平委。
DataSet/DataStream數(shù)據(jù)集的全部或部分可以緩存在內(nèi)存中奈虾,并且可以在多次計算時重用。數(shù)據(jù)也可以持久化到磁盤,具有高效的容錯能力肉微。
DataSet的主要特征如下(DataStream同樣擁有):
數(shù)據(jù)是不可變的匾鸥,但可以將DataSet轉(zhuǎn)換成新的DataSet進(jìn)行操作。
數(shù)據(jù)是可分區(qū)的碉纳。DataSet由很多分區(qū)組成勿负,每個分區(qū)對應(yīng)一個Task任務(wù)來執(zhí)行。
對DataSet進(jìn)行操作劳曹,相當(dāng)于對每個分區(qū)進(jìn)行操作奴愉。
DataSet擁有一系列對分區(qū)進(jìn)行計算的函數(shù),稱為算子(關(guān)于算子將在4.6節(jié)詳細(xì)講解)铁孵。
DataSet之間存在依賴關(guān)系锭硼,可以實現(xiàn)管道化,避免了中間數(shù)據(jù)的存儲蜕劝。
在編程時檀头,可以把DataSet/DataStream看作一個數(shù)據(jù)操作的基本單位,而不必關(guān)心數(shù)據(jù)的分布式特性岖沛,F(xiàn)link會自動將其中的數(shù)據(jù)分發(fā)到集群的各個節(jié)點暑始。Flink中對數(shù)據(jù)的操作主要是對DataSet/DataStream的操作(創(chuàng)建、轉(zhuǎn)換婴削、求值等)廊镜。
02 執(zhí)行模式
DataStream API 支持不同的運行時執(zhí)行模式,可以根據(jù)用例的要求和作業(yè)的特征從中進(jìn)行選擇唉俗。DataStream API比較“經(jīng)典”的執(zhí)行行為稱為“流”執(zhí)行模式期升,主要用于需要連續(xù)增量處理并無限期保持在線的無限作業(yè)。
此外互躬,還有一種“批”處理執(zhí)行模式。該模式以一種類似于MapReduce等批處理框架的方式執(zhí)行作業(yè)颂郎,主要用于具有已知固定輸入并且不會連續(xù)運行的有界作業(yè)吼渡。
當(dāng)需要為最終使用無界源運行的代碼編寫測試時,可以使用“流”模式運行有界作業(yè)乓序。在測試情況下使用有界源會更自然寺酪。
不管配置的執(zhí)行模式如何,在有界輸入上執(zhí)行的DataStream應(yīng)用程序都會產(chǎn)生相同的最終結(jié)果替劈。以流模式執(zhí)行的作業(yè)可能會產(chǎn)生增量更新寄雀,而批作業(yè)最終只會產(chǎn)生一個最終結(jié)果。
通過啟用批執(zhí)行陨献,允許Flink應(yīng)用額外的優(yōu)化盒犹。
執(zhí)行模式可以通過execution.runtime-mode屬性來配置。其有3種可能的值:
- STREAMING:典型的DataStream執(zhí)行模式(默認(rèn))。
- BATCH:在DataStream API上以批處理方式執(zhí)行急膀。
- AUTOMATIC:讓系統(tǒng)根據(jù)數(shù)據(jù)源的有界性來決定沮协。
也可以通過使用bin/flink run命令行參數(shù)配置,或在Flink應(yīng)用程序中創(chuàng)建StreamExecutionEnvironment對象時以編程方式指定卓嫂。
通過命令行配置執(zhí)行模式慷暂,代碼如下:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
上述代碼向Flink集群中提交了單詞計數(shù)程序,并指定使用批執(zhí)行模式晨雳。
在Flink應(yīng)用程序中通過代碼配置執(zhí)行模式行瑞,代碼如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
03 作業(yè)流程
Flink的作業(yè)執(zhí)行流程如圖:
Flink JobManager是Flink集群的主節(jié)點。它包含3個不同的組件:Flink Resource Manager餐禁、Dispatcher血久、運行每個Flink Job的JobMaster。
在一個作業(yè)提交前坠宴,JobManager和TaskManager等進(jìn)程需要先被啟動洋魂。可以在Flink安裝目錄中執(zhí)行bin/start-cluster.sh命令來啟動這些進(jìn)程喜鼓。JobManager和TaskManager被啟動后副砍,TaskManager需要將自己注冊給JobManager中的ResourceManager(資源注冊)。
Flink作業(yè)的具體執(zhí)行流程如下:
- 用戶編寫應(yīng)用程序代碼庄岖,并通過Flink客戶端提交作業(yè)豁翎。程序一般為Java或Scala語言,調(diào)用Flink API構(gòu)建邏輯數(shù)據(jù)流圖隅忿,然后轉(zhuǎn)為作業(yè)圖JobGraph心剥,并附加到StreamExecutionEnvironment中。代碼和相關(guān)配置文件被編譯打包背桐,被提交到JobManager的Dispatcher优烧,形成一個應(yīng)用作業(yè)。
- Dispatcher(JobManager的一個組件)接收到這個作業(yè)链峭,啟動JobManager畦娄,JobManager負(fù)責(zé)本次作業(yè)的各項協(xié)調(diào)工作。
- 接下來JobManager向ResourceManager申請本次作業(yè)所需的資源弊仪。
- JobManager將用戶作業(yè)中的作業(yè)圖JobGraph轉(zhuǎn)化為并行化的物理執(zhí)行圖熙卡,對作業(yè)并行處理并將其子任務(wù)分發(fā)部署到多個TaskManager上執(zhí)行。每個作業(yè)的并行子任務(wù)將在Task Slot中執(zhí)行励饵。至此驳癌,一個Flink作業(yè)就開始執(zhí)行了。
- TaskManager在執(zhí)行計算任務(wù)的過程中可能會與其他TaskManager交換數(shù)據(jù)役听,會使用相應(yīng)的數(shù)據(jù)交換策略颓鲜。同時表窘,TaskManager也會將一些任務(wù)狀態(tài)信息反饋給JobManager,這些信息包括任務(wù)啟動灾杰、運行或終止的狀態(tài)蚊丐、快照的元數(shù)據(jù)等。
04 程序結(jié)構(gòu)
Flink DataStream程序都包含相同的基本部分:
- 獲取執(zhí)行環(huán)境艳吠。
- 加載/創(chuàng)建初始數(shù)據(jù)麦备。
- 對初始數(shù)據(jù)進(jìn)行轉(zhuǎn)換。
- 指定計算結(jié)果的輸出位置昭娩。
- 觸發(fā)程序執(zhí)行凛篙。
StreamExecutionEnvironment是所有Flink流程序的基礎(chǔ)。我們可以在StreamExecutionEnvironment上使用getExecutionEnvironment()創(chuàng)建一個執(zhí)行環(huán)境栏渺。該方法將根據(jù)上下文自動獲取當(dāng)前正確的執(zhí)行環(huán)境呛梆。如果是在IDE中執(zhí)行程序或作為常規(guī)Java程序執(zhí)行,它將創(chuàng)建一個本地環(huán)境磕诊,程序?qū)⒃诒镜貦C(jī)器上執(zhí)行填物。如果將程序打包成一個JAR文件,并通過集群的命令行執(zhí)行它霎终,那么Flink集群管理器將執(zhí)行程序的main方法滞磺,而getExecutionEnvironment()將返回一個集群執(zhí)行環(huán)境。
有多種方法可以為執(zhí)行環(huán)境指定數(shù)據(jù)源莱褒,例如從CSV文件中逐行讀取或從其他數(shù)據(jù)源中讀取击困。按行讀取文本文件中的數(shù)據(jù),代碼如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
上述代碼將得到一個數(shù)據(jù)流,接下來可以在該數(shù)據(jù)流上應(yīng)用轉(zhuǎn)換算子來創(chuàng)建新的派生數(shù)據(jù)流。例如挺峡,將數(shù)據(jù)流中的每個元素轉(zhuǎn)為整數(shù),使用map轉(zhuǎn)換代碼如下:
val input: DataStream [String] = ...
val mapped = input.map { x => x.toInt }
上述代碼通過將原始集合中的每個字符串轉(zhuǎn)換為整數(shù)來創(chuàng)建一個新的數(shù)據(jù)流望薄。
一旦有了包含最終結(jié)果的數(shù)據(jù)流,就可以通過創(chuàng)建接收器將流數(shù)據(jù)寫入外部系統(tǒng)。創(chuàng)建接收器的示例方法代碼如下:
//將數(shù)據(jù)流以字節(jié)數(shù)組的形式寫入Socket
dataStream.writeToSocket()
//將數(shù)據(jù)流寫入標(biāo)準(zhǔn)輸出流(stdout),數(shù)據(jù)流的每個元素將以toString的方式轉(zhuǎn)為字符串
dataStream.print()
完整程序?qū)懲旰笃蟛洌詈笮枰ㄟ^在StreamExecutionEnvironment上調(diào)用execute()來觸發(fā)程序執(zhí)行。根據(jù)執(zhí)行環(huán)境的類型(本地或集群)智末,執(zhí)行將在本地計算機(jī)上觸發(fā)或提交程序以在集群上執(zhí)行。
所有Flink程序都是延遲(惰性)執(zhí)行的:執(zhí)行程序的main()方法時徒河,不會直接進(jìn)行數(shù)據(jù)加載和轉(zhuǎn)換系馆。而是將每個操作添加到數(shù)據(jù)流圖。當(dāng)在執(zhí)行環(huán)境中調(diào)用execute()顯式觸發(fā)執(zhí)行時才會執(zhí)行這些操作顽照。
05 Source數(shù)據(jù)源
5.1 Source數(shù)據(jù)源——基本數(shù)據(jù)源
DataStream API中直接提供了對一些基本數(shù)據(jù)源的支持由蘑,例如文件系統(tǒng)闽寡、Socket連接等;也提供了非常豐富的高級數(shù)據(jù)源連接器(Connector)尼酿,例如Kafka Connector爷狈、Elasticsearch Connector等。用戶也可以實現(xiàn)自定義Connector數(shù)據(jù)源裳擎,以便使Flink能夠與其他外部系統(tǒng)進(jìn)行數(shù)據(jù)交互涎永。
- 文件數(shù)據(jù)源
Flink可以將文件內(nèi)容讀取到系統(tǒng)中,并轉(zhuǎn)換成分布式數(shù)據(jù)集DataStream進(jìn)行處理鹿响。
使用readTextFile(path)方法可以逐行讀取文本文件內(nèi)容羡微,并作為字符串返回,代碼如下:
//第一步:創(chuàng)建流處理的執(zhí)行環(huán)境
val senv=StreamExecutionEnvironment.getExecutionEnvironment
//第二步:讀取流數(shù)據(jù)惶我,創(chuàng)建DataStream
val data:DataStream[String]=senv
.readTextFile("hdfs://centos01:9000/input/words.txt")
- Socket數(shù)據(jù)源
通過監(jiān)聽Socket端口接收數(shù)據(jù)創(chuàng)建DataStream妈倔。例如以下代碼從本地的9999端口接收數(shù)據(jù):
//第一步:創(chuàng)建流處理的執(zhí)行環(huán)境
val senv=StreamExecutionEnvironment.getExecutionEnvironment
//第二步:讀取流數(shù)據(jù),創(chuàng)建DataStream
val data:DataStream[String]=senv.socketTextStream("localhost",9999)
- 集合數(shù)據(jù)源
從java.util.collection集合創(chuàng)建DataStream绸贡。集合中的所有元素必須是相同類型的盯蝴,例如以下代碼:
//第一步:創(chuàng)建流處理的執(zhí)行環(huán)境
val senv=StreamExecutionEnvironment.getExecutionEnvironment
//第二步:讀取流數(shù)據(jù),創(chuàng)建DataStream
val data:DataStream[String]=senv.fromCollection(
List("hello","flink","scala")
)
當(dāng)然听怕,也可以從迭代器中創(chuàng)建DataStream捧挺,例如以下代碼:
//第一步:創(chuàng)建流處理的執(zhí)行環(huán)境
val senv=StreamExecutionEnvironment.getExecutionEnvironment
//第二步:讀取流數(shù)據(jù),創(chuàng)建DataStream
val it = Iterator("hello","flink","scala")
val data:DataStream[String]=senv.fromCollection(it)
還可以直接從元素集合中創(chuàng)建DataStream叉跛,例如以下代碼:
//第一步:創(chuàng)建流處理的執(zhí)行環(huán)境
val senv=StreamExecutionEnvironment.getExecutionEnvironment
//第二步:讀取流數(shù)據(jù)松忍,創(chuàng)建DataStream
val data:DataStream[String]=senv.fromElements("hello","flink","scala")
5.2 Source數(shù)據(jù)源——高級數(shù)據(jù)源
Flink可以從Kafka、Flume筷厘、Kinesis等數(shù)據(jù)源讀取數(shù)據(jù)鸣峭,使用時需要引入第三方依賴庫。例如酥艳,在Maven工程中引入Flink針對Kafka的API依賴庫摊溶,代碼如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.0</version>
</dependency>
然后使用addSource()方法接入Kafka數(shù)據(jù)源,代碼示例如下:
val senv = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer08[String](...)
val stream = senv.addSource(myConsumer)
5.3 Source數(shù)據(jù)源——自定義數(shù)據(jù)源
在Flink中充石,用戶也可以自定義數(shù)據(jù)源莫换,以滿足不同數(shù)據(jù)源的接入需求。自定義數(shù)據(jù)源有3種方式:
1)實現(xiàn)SourceFunction接口定義非并行數(shù)據(jù)源(單線程)骤铃。SourceFunction是Flink中所有流數(shù)據(jù)源的基本接口拉岁。
2)實現(xiàn)ParallelSourceFunction接口定義并行數(shù)據(jù)源。
3)繼承RichParallelSourceFunction抽象類定義并行數(shù)據(jù)源惰爬。該類已經(jīng)實現(xiàn)了ParallelSourceFunction接口喊暖,是實現(xiàn)并行數(shù)據(jù)源的基類,在執(zhí)行時撕瞧,F(xiàn)link Runtime將執(zhí)行與該類源代碼配置的并行度一樣多的并行實例陵叽。
4)繼承RichSourceFunction抽象類定義并行數(shù)據(jù)源狞尔。該類是實現(xiàn)并行數(shù)據(jù)源的基類,該數(shù)據(jù)源可以通過父類AbstractRichFunction的getRuntimeContext()方法訪問上下文信息巩掺,通過父類AbstractRichFunction的open()和close()方法訪問生命周期信息偏序。
數(shù)據(jù)源定義好后,可以使用StreamExecutionEnvironment.addSource(sourceFunction)將數(shù)據(jù)源附加到程序中胖替。這樣就可以將外部數(shù)據(jù)轉(zhuǎn)換為DataStream研儒。
例如,自定義MySQL數(shù)據(jù)源刊殉,讀取MySQL中的表數(shù)據(jù)殉摔,實現(xiàn)步驟如下。
(1)引入數(shù)據(jù)庫驅(qū)動
在Maven工程中引入MySQL數(shù)據(jù)庫連接驅(qū)動的依賴庫记焊,代碼如下:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
(2)創(chuàng)建表
在MySQL數(shù)據(jù)庫中創(chuàng)建一張student表并添加測試數(shù)據(jù)逸月,如圖:
(3)定義樣例類
定義樣例類Student用于存儲數(shù)據(jù),代碼如下:
package flink.demo
object Domain {
case class Student(id: Int, name: String, age: Int)
}
(4)創(chuàng)建JDBC工具類
創(chuàng)建一個JDBC工具類遍膜,用于獲得MySQL數(shù)據(jù)庫連接碗硬,代碼如下:
import java.sql.DriverManager
import java.sql.Connection
/**
* JDBC工具類
*/
object JDBCUtils {
//數(shù)據(jù)庫驅(qū)動類
private val driver = "com.mysql.jdbc.Driver"
//數(shù)據(jù)庫連接地址
private val url = "jdbc:mysql://localhost:3306/student_db"
//數(shù)據(jù)庫賬號
private val username = "root"
//數(shù)據(jù)庫密碼
private val password = "123456"
/**
* 獲得數(shù)據(jù)庫連接
*/
def getConnection(): Connection = {
Class.forName(driver)//加載驅(qū)動
val conn = DriverManager.getConnection(url, username, password)
conn
}
}
(5)創(chuàng)建自定義數(shù)據(jù)源類
創(chuàng)建自定義MySQL數(shù)據(jù)源類MySQLSource,繼承RichSourceFunction類瓢颅,并重寫open()恩尾、run()、cancel()方法挽懦,代碼如下:
import java.sql.{Connection, PreparedStatement}
import flink.demo.Domain.Student
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.configuration.Configuration
/**
* 自定義MySQL數(shù)據(jù)源
*/
class MySQLSource extends RichSourceFunction[Student] {
var conn: Connection = _//數(shù)據(jù)庫連接對象
var ps: PreparedStatement = _//SQL命令執(zhí)行對象
var isRunning=true//是否運行(是否持續(xù)從數(shù)據(jù)源讀取數(shù)據(jù))
/**
* 初始化方法
* @param parameters 存儲鍵/值對的輕量級配置對象
*/
override def open(parameters: Configuration): Unit = {
//獲得數(shù)據(jù)庫連接
conn = JDBCUtils.getConnection
//獲得命令執(zhí)行對象
ps = conn.prepareStatement("select * from student")
}
/**
* 當(dāng)開始從數(shù)據(jù)源讀取元素時翰意,該方法將被調(diào)用
* @param ctx 用于從數(shù)據(jù)源發(fā)射元素
*/
override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
//執(zhí)行查詢
val rs = ps.executeQuery()
//循環(huán)讀取集合中的數(shù)據(jù)并發(fā)射出去
while (isRunning&&rs.next()) {
val student = Student(
rs.getInt("id"),
rs.getString("name"),
rs.getInt("age")
)
//從數(shù)據(jù)源收集一個元素數(shù)據(jù)并發(fā)射出去,而不附加時間戳(默認(rèn)方式)
ctx.collect(student)
}
}
/**
* 取消數(shù)據(jù)源讀取
*/
override def cancel(): Unit = {
this.isRunning=false
}
}
(6)測試程序
創(chuàng)建測試類StreamTest信柿,從自定義數(shù)據(jù)源中讀取流數(shù)據(jù)冀偶,打印到控制臺,代碼如下:
import org.apache.flink.streaming.api.scala.{DataStream, _}
/**
* 測試類
*/
object StreamTest {
def main(args: Array[String]): Unit = {
//創(chuàng)建流處理執(zhí)行環(huán)境
val senv=StreamExecutionEnvironment.getExecutionEnvironment
//從自定義數(shù)據(jù)源中讀取數(shù)據(jù)渔嚷,創(chuàng)建DataStream
val dataStream: DataStream[Domain.Student] = senv.addSource(new MySQLSource)
//打印流數(shù)據(jù)到控制臺
dataStream.print()
//觸發(fā)任務(wù)執(zhí)行进鸠,指定作業(yè)名稱
senv.execute("StreamMySQLSource")
}
}
直接在IDEA中運行上述測試類,控制臺輸出結(jié)果如下:
3> Student(2,李四,22)
2> Student(1,張三,19)
4> Student(3,王五,20)
結(jié)果前面的數(shù)字表示執(zhí)行線程的編號形病。
06 Transformation數(shù)據(jù)轉(zhuǎn)換
在Flink中客年,Transformation(轉(zhuǎn)換)算子就是將一個或多個DataStream轉(zhuǎn)換為新的DataStream,可以將多個轉(zhuǎn)換組合成復(fù)雜的數(shù)據(jù)流(Dataflow)拓?fù)洹?br> 常用的Transformation算子介紹如下漠吻。
- map(func)
map()算子接收一個函數(shù)作為參數(shù)量瓜,并把這個函數(shù)應(yīng)用于DataStream的每個元素,最后將函數(shù)的返回結(jié)果作為結(jié)果DataStream中對應(yīng)元素的值途乃,即將DataStream的每個元素轉(zhuǎn)換成新的元素榔至。
如以下代碼所示,對dataStream1應(yīng)用map()算子欺劳,將dataStream1中的每個元素加一并返回一個名為dataStream2的新DataStream:
val dataStream1=senv.fromCollection(List(1,2,3,4,5,6))
val dataStream2=dataStream1.map(x=>x+1)
- flatMap(func)
與map()算子類似唧取,但是每個傳入該函數(shù)func的DataStream元素會返回0到多個元素,最終會將返回的所有元素合并到一個DataStream划提。
例如以下代碼將集合List轉(zhuǎn)為dataStream1枫弟,然后調(diào)用dataStream1的flatMap()算子將dataStream1的每個元素按照空格分割成多個元素,最終合并所有元素到一個新的DataStream鹏往。
//創(chuàng)建DataStream
val dataStream1=senv.fromCollection(
List("hadoop hello scala","flink hello")
)
//調(diào)用flatMap()算子進(jìn)行運算
val dataStream2=dataStream1.flatMap(_.split(" "))
//打印結(jié)果到控制臺
dataStream2.print()
//觸發(fā)任務(wù)執(zhí)行淡诗,指定作業(yè)名稱
senv.execute("MyJob")
- filter(func)
通過函數(shù)func對源DataStream的每個元素進(jìn)行過濾,并返回一個新的DataStream伊履。
例如以下代碼韩容,過濾出dataStream1中大于3的所有元素,并輸出結(jié)果唐瀑。
val dataStream1=senv.fromCollection(List(1,2,3,4,5,6))
val dataStream2=dataStream1.filter(_>3)
dataStream2.print()
控制臺輸出結(jié)果如下:
1> 4
2> 5
3> 6
-
keyBy()
keyBy()算子主要作用于元素類型是元組或數(shù)組的DataStream上群凶。使用該算子可以將DataStream中的元素按照指定的key(指定的字段)進(jìn)行分組,具有相同key的元素將進(jìn)入同一個分區(qū)中(不進(jìn)行聚合)哄辣,并且不改變原來元素的數(shù)據(jù)結(jié)構(gòu)请梢。例如,根據(jù)元素的形狀對元素進(jìn)行分組力穗,相同形狀的元素將被分配到一起毅弧,可被后續(xù)算子統(tǒng)一處理,如圖:
假設(shè)有兩個同學(xué)zhangsan和lisi当窗,zhangsan的語文和數(shù)學(xué)成績分別為98够坐、78,lisi的語文和數(shù)學(xué)成績分別為88崖面、79元咙。將數(shù)據(jù)集的姓名作為key進(jìn)行keyBy()操作,代碼如下:
val dataStream=senv.fromCollection(
List(("zhangsan",98),("zhangsan",78),("lisi",88),("lisi",79))
)
//使用數(shù)字位置指定嘶朱,按照第一個字段分組
val keyedStream: KeyedStream[(String, Int), Tuple]=dataStream.keyBy(_._1)
keyedStream.print()
控制臺輸出結(jié)果如下:
4> (lisi,88)
4> (lisi,79)
3> (zhangsan,98)
3> (zhangsan,78)
從上述輸出結(jié)果可以看出蛾坯,
同一組的元素被同一個線程執(zhí)行。運行過程如圖疏遏。
keyBy()算子的執(zhí)行對象是DataStream脉课,執(zhí)行結(jié)果則是KeyedStream。KeyedStream實際上是一種特殊的DataStream财异,因為其繼承了DataStream倘零。KeyedStream用來表示根據(jù)指定的key進(jìn)行分組的數(shù)據(jù)流。
- reduce()
reduce()算子主要作用于KeyedStream上戳寸,對KeyedStream數(shù)據(jù)流進(jìn)行滾動聚合呈驶,即將當(dāng)前元素與上一個聚合值進(jìn)行合并,并且發(fā)射出新值疫鹊。該算子的原理與MapReduce中的Reduce類似袖瞻,聚合前后的元素類型保持一致司致,如圖所示。
reduce()算子始終以滾動的方式將兩個元素合并為一個元素聋迎,最終將一組元素合并為單個元素脂矫。reduce()算子可以用于整個數(shù)據(jù)集,也可以用于分組的數(shù)據(jù)集霉晕。該算子的執(zhí)行效率比較高庭再,因為它允許系統(tǒng)使用更有效的執(zhí)行策略。
繼續(xù)對前面兩個同學(xué)zhangsan和lisi的成績進(jìn)行reduce()操作牺堰,求出每個同學(xué)的總成績拄轻,代碼如下:
val reducedDataStream: DataStream[(String, Int)] = keyedStream
.reduce((t1, t2) => {
//聚合規(guī)則:將每一組的第二個字段進(jìn)行累加,第一個字段保持不變伟葫。
//注意聚合后數(shù)據(jù)類型與聚合前保持一致(String, Int)
(t1._1, t1._2 + t2._2)
})
reducedDataStream.print()
- Aggregation
除了reduce()算子外恨搓,其他常用的聚合算子有sum()、max()扒俯、min()等奶卓,這些聚合算子統(tǒng)稱為Aggregation。Aggregation算子作用于KeyedStream上撼玄,并且進(jìn)行滾動聚合夺姑。與keyBy()算子類似,可以使用數(shù)字或字段名稱指定需要聚合的字段掌猛。例如以下代碼:
keyedStream.sum(0);//對第一個字段進(jìn)行求和
keyedStream.sum("key");//對字段key進(jìn)行求和
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
keyBy()算子會將DataStream轉(zhuǎn)換為KeyedStream盏浙,而Aggregation算子會將KeyedStream轉(zhuǎn)換為DataStream。相關(guān)類型轉(zhuǎn)換如圖:
- union()
union()算子用于將兩個或多個數(shù)據(jù)流進(jìn)行合并荔茬,創(chuàng)建一個包含所有數(shù)據(jù)流所有元素的新流(不會去除重復(fù)元素)废膘。如果將一個數(shù)據(jù)流與它本身合并,在結(jié)果流中慕蔚,每個元素會出現(xiàn)兩次丐黄。使用union()算子合并數(shù)據(jù)流,如圖:
union()算子執(zhí)行過程中孔飒,多條數(shù)據(jù)流中的元素會以先進(jìn)先出的方式合并灌闺,無法保證順序,每個輸入的元素都會被發(fā)往下游算子坏瞄。使用union()算子對兩個數(shù)據(jù)流進(jìn)行合并桂对,代碼如下:
//創(chuàng)建數(shù)據(jù)流一
val dataStream1 = senv.fromElements(
(0, 0, 0), (1, 1, 1), (2, 2, 2)
)
//創(chuàng)建數(shù)據(jù)流二
val dataStream2 = senv.fromElements(
(3, 3, 3), (4, 4, 4), (5, 5, 5)
)
//合并兩個數(shù)據(jù)流
val unionDataStream=dataStream1.union(dataStream2)
unionDataStream.print()
07 Sink數(shù)據(jù)輸出
Flink可以使用DataStream API將數(shù)據(jù)流輸出到文件、Socket鸠匀、外部系統(tǒng)等蕉斜。Flink自帶了各種內(nèi)置的輸出格式,說明如下。
writeAsText():將元素轉(zhuǎn)為String類型按行寫入外部輸出宅此。String類型是通過調(diào)用每個元素的toString()方法獲得的机错。
writeToSocket():將元素寫入Socket。
writeAsCsv():將元組寫入以逗號分隔的文本文件父腕。行和字段分隔符是可配置的毡熏。每個字段的值來自對象的toString()方法。
addSink():調(diào)用自定義接收函數(shù)侣诵。Flink可以與其他系統(tǒng)(如Apache Kafka)的連接器集成在一起,這些系統(tǒng)已經(jīng)實現(xiàn)了自定義接收函數(shù)狱窘。
08 分區(qū)策略
數(shù)據(jù)在算子之間流動需要依靠分區(qū)策略(分區(qū)器)杜顺,F(xiàn)link目前內(nèi)置了8種已實現(xiàn)的分區(qū)策略和1種自定義分區(qū)策略。已實現(xiàn)的分區(qū)策略對應(yīng)的API為:
BinaryHashPartitioner
BroadcastPartitioner
ForwardPartitioner
GlobalPartitioner
KeyGroupStreamPartitioner
RebalancePartitioner
RescalePartitioner
ShufflePartitioner
自定義分區(qū)策略的API為CustomPartitionerWrapper蘸炸。
- BinaryHashPartitioner
該分區(qū)策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中躬络,是一種針對BinaryRowData的哈希分區(qū)器。BinaryRowData是RowData的實現(xiàn)搭儒,可以顯著減少Java對象的序列化/反序列化穷当。RowData用于表示結(jié)構(gòu)化數(shù)據(jù)類型,運行時通過Table API或SQL管道傳遞的所有頂級記錄都是RowData的實例淹禾。關(guān)于BinaryHashPartitioner馁菜,此處不做過多講解。
- BroadcastPartitioner
廣播分區(qū)策略將上游數(shù)據(jù)記錄輸出到下游算子的每個并行實例中铃岔,即下游每個分區(qū)都會有上游的所有數(shù)據(jù)汪疮。使用DataStream的broadcast()方法即可設(shè)置該DataStream向下游發(fā)送數(shù)據(jù)時使用廣播分區(qū)策略,Java代碼如下:
dataStream.broadcast()
廣播分區(qū)策略數(shù)據(jù)流圖如下毁习。
- ForwardPartitioner
轉(zhuǎn)發(fā)分區(qū)策略只將元素轉(zhuǎn)發(fā)給本地運行的下游算子的實例智嚷,即將元素發(fā)送到與當(dāng)前算子實例在同一個TaskManager的下游算子實例,而不需要進(jìn)行網(wǎng)絡(luò)傳輸纺且。要求上下游算子并行度一樣盏道,這樣上下游算子可以同屬一個子任務(wù)。
使用DataStream的forward()方法即可設(shè)置該DataStream向下游發(fā)送數(shù)據(jù)時使用轉(zhuǎn)發(fā)分區(qū)策略载碌,Java代碼如下:
dataStream.forward()
轉(zhuǎn)發(fā)分區(qū)策略數(shù)據(jù)流圖如下猜嘱。
- GlobalPartitioner
全局分區(qū)策略將上游所有元素發(fā)送到下游子任務(wù)編號等于0的分區(qū)算子實例上(下游第一個實例)。
使用DataStream的global()方法即可設(shè)置該DataStream向下游發(fā)送數(shù)據(jù)時使用全局分區(qū)策略恐仑,Java代碼如下:
dataStream.global()
全局分區(qū)策略數(shù)據(jù)流圖如下泉坐。
- KeyGroupStreamPartitioner
Key分區(qū)策略根據(jù)元素Key的Hash值輸出到下游算子指定的實例。keyBy()算子底層正是使用的該分區(qū)策略裳仆,底層最終會調(diào)用KeyGroupStreamPartitioner的selectChannel()方法腕让,計算每個Key對應(yīng)的通道索引(通道編號,可理解為分區(qū)編號),根據(jù)通道索引將Key發(fā)送到下游相應(yīng)的分區(qū)中纯丸。
- RebalancePartitioner
平衡分區(qū)策略使用循環(huán)遍歷下游分區(qū)的方式偏形,將上游元素均勻分配給下游算子的每個實例。每個下游算子的實例都具有相等的負(fù)載觉鼻。當(dāng)數(shù)據(jù)流中的元素存在數(shù)據(jù)傾斜時俊扭,使用該策略對性能有很大的提升。
使用DataStream的rebalance()方法即可設(shè)置該DataStream向下游發(fā)送數(shù)據(jù)時使用平衡分區(qū)策略坠陈,Java代碼如下:
dataStream.rebalance()
平衡分區(qū)策略數(shù)據(jù)流圖如下:
- RescalePartitioner
重新調(diào)節(jié)分區(qū)策略基于上下游算子的并行度萨惑,將元素以循環(huán)的方式輸出到下游算子的每個實例。類似于平衡分區(qū)策略仇矾,但又與平衡分區(qū)策略不同庸蔼。
上游算子將元素發(fā)送到下游哪一個算子實例,取決于上游和下游算子的并行度贮匕。例如姐仅,如果上游算子的并行度為2,而下游算子的并行度為4刻盐,那么一個上游算子實例將把元素均勻分配給兩個下游算子實例掏膏,而另一個上游算子實例將把元素均勻分配給另外兩個下游算子實例。相反敦锌,如果下游算子的并行度為2馒疹,而上游算子的并行度為4,那么兩個上游算子實例將分配給一個下游算子實例供屉,而另外兩個上游算子實例將分配給另一個下游算子實例行冰。
假設(shè)上游算子并行度為2,分區(qū)編號為A和B伶丐,下游算子并行度為4悼做,分區(qū)編號為1、2哗魂、3肛走、4,那么A將把數(shù)據(jù)循環(huán)發(fā)送給1和2录别,B則把數(shù)據(jù)循環(huán)發(fā)送給3和4朽色。假設(shè)上游算子并行度為4,編號為A组题、B葫男、C、D崔列,下游算子并行度為2梢褐,編號為1旺遮、2,那么A和B把數(shù)據(jù)發(fā)送給1盈咳,C和D則把數(shù)據(jù)發(fā)送給2耿眉。
使用DataStream的rescale()方法即可設(shè)置該DataStream向下游發(fā)送數(shù)據(jù)時使用重新調(diào)節(jié)分區(qū)策略,Java代碼如下:
dataStream.rescale()
重新調(diào)節(jié)分區(qū)策略數(shù)據(jù)流圖如下:
如果想將元素均勻地輸出到下游算子的每個實例鱼响,以實現(xiàn)負(fù)載均衡鸣剪,同時又不希望使用平衡分區(qū)策略的全局負(fù)載均衡,則可以使用重新調(diào)節(jié)分區(qū)策略丈积。該策略會盡可能避免數(shù)據(jù)在網(wǎng)絡(luò)間傳輸筐骇,而能否避免還取決于TaskManager的Task Slot數(shù)量、上下游算子的并行度等江滨。
- ShufflePartitioner
隨機(jī)分區(qū)策略將上游算子元素輸出到下游算子的隨機(jī)實例中拥褂。元素會被均勻分配到下游算子的每個實例。這種策略可以實現(xiàn)計算任務(wù)的負(fù)載均衡牙寞。
使用DataStream的shuffle()方法即可設(shè)置該DataStream向下游發(fā)送數(shù)據(jù)時使用隨機(jī)分區(qū)策略,Java代碼如下:
dataStream.shuffle()