Apache Flink是什么竹宋?
Apache Flink 是一個框架和分布式處理引擎敞贡,用于在無邊界和有邊界數(shù)據(jù)流上進行有狀態(tài)的計算。Flink 能在所有常見集群環(huán)境中運行哗总,并能以內(nèi)存速度和任意規(guī)模進行計算。
Flink 主要包括 DataStream API倍试、DataSet API讯屈、Table API、SQL县习、Graph API 和 FlinkML 等′棠福現(xiàn)在 Flink 也有自己的生態(tài)圈,涉及離線數(shù)據(jù)處理躁愿、實時數(shù)據(jù)處理叛本、SQL 操作、圖計算和機器學(xué)習(xí)庫等彤钟。
Flink 的重要特點
(1)事件驅(qū)動型(Event-driven)
事件驅(qū)動型應(yīng)用是一類具有狀態(tài)的應(yīng)用来候,它從一個或多個事件流提取數(shù)據(jù),并根據(jù)到來的事件觸發(fā)計算逸雹、狀態(tài)更新或其他外部動作营搅。比較典型的就是以 kafka 為代表的消息隊列幾乎都是事件驅(qū)動型應(yīng)用。
與之不同的就是 SparkStreaming 微批次梆砸,如圖:
事件驅(qū)動型:
(2)流與批的世界觀
批處理的特點是有界转质、持久、大量辫樱,非常適合需要訪問全套記錄才能完成的計算工作峭拘,一般用于離線統(tǒng)計。
流處理的特點是無界狮暑、實時, 無需針對整個數(shù)據(jù)集執(zhí)行操作鸡挠,而是對通過系統(tǒng)傳輸?shù)拿總€數(shù)據(jù)項執(zhí)行操作,一般用于實時統(tǒng)計搬男。 在 spark 的世界觀中拣展,一切都是由批次組成的,離線數(shù)據(jù)是一個大批次缔逛,而實時數(shù)據(jù)是由一個一個無限的小批次組成的备埃。
而在 flink 的世界觀中,一切都是由流組成的褐奴,離線數(shù)據(jù)是有界限的流按脚,實時數(shù)據(jù)是一個沒有界限的流,這就是所謂的有界流和無界流敦冬。
無界數(shù)據(jù)流:無界數(shù)據(jù)流有一個開始但是沒有結(jié)束辅搬,它們不會在生成時終止并 提供數(shù)據(jù),必須連續(xù)處理無界流脖旱,也就是說必須在獲取后立即處理 event堪遂。對于無界數(shù)據(jù)流我們無法等待所有數(shù)據(jù)都到達介蛉,因為輸入是無界的,并且在任何時間點都不會完成溶褪。處理無界數(shù)據(jù)通常要求以特定順序(例如事件發(fā)生的順序)獲取 event币旧,以便能夠推斷結(jié)果完整性。
有界數(shù)據(jù)流:有界數(shù)據(jù)流有明確定義的開始和結(jié)束猿妈,可以在執(zhí)行任何計算之前通過獲取所有數(shù)據(jù)來處理有界流吹菱,處理有界流不需要有序獲取,因為可以始終對有界數(shù)據(jù)集進行排序彭则,有界流的處理也稱為批處理毁葱。
(3) 分層 api
最底層級的抽象僅僅提供了有狀態(tài)流,它將通過過程函數(shù)(Process Function) 被嵌入到 DataStream API 中贰剥。底層過程函數(shù)(Process Function) 與 DataStream API 相集成,使其可以對某些特定的操作進行底層的抽象筷频,它允許用戶可以自由地處理 來自一個或多個數(shù)據(jù)流的事件蚌成,并使用一致的容錯的狀態(tài)。除此之外凛捏,用戶可以注冊事件時間并處理時間回調(diào)担忧,從而使程序可以處理復(fù)雜的計算。 實際上坯癣,大多數(shù)應(yīng)用并不需要上述的底層抽象瓶盛,而是針對核心 API(Core APIs) 進行編程,比如 DataStream API(有界或無界流數(shù)據(jù))以及 DataSet API(有界數(shù)據(jù) 集)示罗。這些 API 為數(shù)據(jù)處理提供了通用的構(gòu)建模塊惩猫,比如由用戶定義的多種形式的
轉(zhuǎn)換(transformations),連接(joins)蚜点,聚合(aggregations)轧房,窗口操作(windows) 等等。DataSet API 為有界數(shù)據(jù)集提供了額外的支持绍绘,例如循環(huán)與迭代奶镶。這些 API處理的數(shù)據(jù)類型以類(classes)的形式由各自的編程語言所表示。
Table API 是以表為中心的聲明式編程陪拘,其中表可能會動態(tài)變化(在表達流數(shù)據(jù)時)厂镇。Table API 遵循(擴展的)關(guān)系模型:表有二維數(shù)據(jù)結(jié)構(gòu)(schema)(類似于關(guān)系數(shù)據(jù)庫中的表),同時 API 提供可比較的操作左刽,例如 select捺信、project、join悠反、group-by残黑、aggregate 等馍佑。Table API 程序聲明式地定義了什么邏輯操作應(yīng)該執(zhí)行,而不是準確地確定這些操作代碼的看上去如何梨水。
盡管 Table API 可以通過多種類型的用戶自定義函數(shù)(UDF)進行擴展拭荤,其仍不如核心 API 更具表達能力,但是使用起來卻更加簡潔(代碼量更少)疫诽。除此之外舅世,Table API 程序在執(zhí)行之前會經(jīng)過內(nèi)置優(yōu)化器進行化。你可以在表與DataStream/DataSet 之間無縫切換奇徒,以允許程序?qū)?Table API 與DataStream 以及 DataSet 混合使用雏亚。Flink 提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與Table API 類似摩钙,但是是以 SQL 查詢表達式的形式表現(xiàn)程序罢低。SQL 抽象與 Table API交互密切,同時 SQL 查詢可以直接在 Table API 定義的表上執(zhí)行胖笛。
Flink 幾大模塊
- ? Flink Table & SQL
- ? Flink Gelly(圖計算)
- ? Flink CEP(復(fù)雜事件處理)
Flink應(yīng)用場景
- 電信市場營銷
- 數(shù)據(jù)報表网持,廣告投放,業(yè)務(wù)流程需求
- 物聯(lián)網(wǎng)
- 傳感器實時采集和顯示长踊、實時報警功舀、交通運輸業(yè)
- 電信業(yè)
- 基站流量調(diào)配
- 銀行金融
- 實時計算和通知推送,實時檢測異常行為
Flink部署
Flink standalone模式安裝部署身弊,首先到官網(wǎng)!下載頁面下載辟汰,解壓安裝
進入conf目錄打開flink-conf.yaml 進行編輯
進入bin目錄 ,單機模式下使用Standalone 模式
.\start-cluster.bat
打開瀏覽器訪問 http://locahost:8081 對 flink 集群和任務(wù)進行監(jiān)控管理
Flink 架構(gòu)
Flink運行時架構(gòu)主要包括四個不同的組件阱佛,它們會在運行流處理應(yīng)用程序時協(xié)同工作:作業(yè)管理器(JobManager)帖汞、資源管理器(ResourceManager)、任務(wù)管理器(TaskManager)凑术,以及分發(fā)器(Dispatcher)涨冀。因為Flink是用Java和Scala實現(xiàn)的,所以所有組件都會運行在Java虛擬機上麦萤。每個組件的職責(zé)如下:
作業(yè)管理器(JobManager)
- 控制一個應(yīng)用程序執(zhí)行的主進程鹿鳖,也就是說,每個應(yīng)用程序都會被一個不同的JobManager 所控制執(zhí)行壮莹。
- JobManager 會先接收到要執(zhí)行的應(yīng)用程序翅帜,這個應(yīng)用程序會包括:作業(yè)圖(JobGraph)、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有的類命满、庫和其它資源的JAR包涝滴。
- JobManager 會把JobGraph轉(zhuǎn)換成一個物理層面的數(shù)據(jù)流圖,這個圖被叫做“執(zhí)行圖”(ExecutionGraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)歼疮。
- JobManager 會向資源管理器(ResourceManager)請求執(zhí)行任務(wù)必要的資源杂抽,也就是任務(wù)管理器(TaskManager)上的插槽(slot)。一旦它獲取到了足夠的資源韩脏,就會將執(zhí)行圖分發(fā)到真正運行它們的 TaskManager上缩麸。而在運行過程中,JobManager會負責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點(checkpoints)的協(xié)調(diào)。
任務(wù)管理器(TaskManager)
- Flink中的工作進程桃纯。通常在Flink中會有多個TaskManager運行,每一個TaskManager都包含了一定數(shù)量的插槽(slots)其爵。插槽的數(shù)量限制了TaskManager能夠執(zhí)行的任務(wù)數(shù)量。
- 啟動之后,TaskManager會向資源管理器注冊它的插槽;收到資源管理器的指令后刃唐,TaskManager就會將一個或者多個插槽提供給JobManager調(diào)用。JobManager就可以向插槽分配任務(wù)(tasks)來執(zhí)行了界轩。
- 在執(zhí)行過程中唁桩,一個TaskManager可以跟其它運行同一應(yīng)用程序的TaskManager交換數(shù)據(jù)。
資源管理器(ResourceManager)
- 主要負責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot)耸棒,TaskManger 插槽是Flink中定義的處理資源單元。
- Flink為不同的環(huán)境和資源管理工具提供了不同資源管理器报辱,比如YARN与殃、Mesos、K8s碍现,以及standalone部署幅疼。
- 當JobManager申請插槽資源時,ResourceManager會將有空閑插槽的TaskManager分配給JobManager昼接。如果ResourceManager沒有足夠的插槽來滿足JobManager的請求爽篷,它還可以向資源提供平臺發(fā)起會話,以提供啟動TaskManager進程的容器慢睡。
分發(fā)器(Dispatcher)
- 可以跨作業(yè)運行逐工,它為應(yīng)用提交提供了REST接口。
- 當一個應(yīng)用被提交執(zhí)行時漂辐,分發(fā)器就會啟動并將應(yīng)用移交給一個JobManager泪喊。
- Dispatcher也會啟動一個Web UI,用來方便地展示和監(jiān)控作業(yè)執(zhí)行的信息髓涯。
- Dispatcher在架構(gòu)中可能并不是必需的袒啼,這取決于應(yīng)用提交運行的方式。
任務(wù)提交流程
這是從一個較為高層級的視角,來看應(yīng)用中各組件的交互協(xié)作蚓再。如果部署的集群環(huán)境不同(例如YARN滑肉,Mesos,Kubernetes摘仅,standalone等)靶庙,其中一些步驟可以被省略,或是有些組件會運行在同一個JVM進程中实檀。
命令行提交job
bin/flink run -c <入口類> -p <并行度> <jar包路徑> <啟動參數(shù)>
$ bin/flink run -c ** WordCount -p 3 **.jar --host localhost --port 7777
Job has been submitted with JobID 33a5d1f00688a362837830f0b85fd75e
取消任務(wù)
bin/flink cancel <Job的ID>
Yarn模式任務(wù)提交流程
- Flink任務(wù)提交后惶洲,Client向HDFS上傳Flink的Jar包和配置
- 之后客戶端向Yarn ResourceManager提交任務(wù),ResourceManager分配Container資源并通知對應(yīng)的NodeManager啟動ApplicationMaster
- ApplicationMaster啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境膳犹,去啟動JobManager恬吕,之后JobManager向Flink自身的RM進行申請資源,自身的RM向Yarn 的ResourceManager申請資源(因為是yarn模式须床,所有資源歸yarn RM管理)啟動TaskManager
- Yarn ResourceManager分配Container資源后铐料,由ApplicationMaster通知資源所在節(jié)點的NodeManager啟動TaskManager
-
NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動TaskManager,TaskManager啟動后向JobManager發(fā)送心跳包豺旬,并等待JobManager向其分配任務(wù)钠惩。
創(chuàng)建項目
搭建maven項目
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0
創(chuàng)建一個簡單工程去坐單詞數(shù)量統(tǒng)計的工程,根據(jù)空格統(tǒng)計單詞數(shù)量
public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split(" ");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
WordCount
public class WordCount {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements("").name("in-memory-input");
DataStream<Tuple2<String, Integer>> counts =
// The text lines read from the source are split into words
// using a user-defined function. The tokenizer, implemented below,
// will output each word as a (2-tuple) containing (word, 1)
text.flatMap(new Tokenizer())
.name("tokenizer")
// keyBy groups tuples based on the "0" field, the word.
// Using a keyBy allows performing aggregations and other
// stateful transformations over data on a per-key basis.
// This is similar to a GROUP BY clause in a SQL query.
.keyBy(value -> value.f0)
// For each key, we perform a simple sum of the "1" field, the count.
// If the input data stream is bounded, sum will output a final count for
// each word. If it is unbounded, it will continuously output updates
// each time it sees a new instance of each word in the stream.
.sum(1)
.name("counter");
counts.print().name("print-sink");
env.execute("WordCount");
}
}
程序與數(shù)據(jù)流
1.所有的Flink程序都是由三部分組成的: Source 、Transformation 和 Sink族阅。
-
Source 負責(zé)讀取數(shù)據(jù)源篓跛,Transformation 利用各種算子進行處理加工,Sink 負責(zé)輸出
- 在運行時坦刀,F(xiàn)link上運行的程序會被映射成“邏輯數(shù)據(jù)流”(dataflows)愧沟,它包含了這三部分
- 每一個dataflow以一個或多個sources開始以一個或多個sinks結(jié)束。dataflow類似于任意的有向無環(huán)圖(DAG)
-
在大部分情況下鲤遥,程序中的轉(zhuǎn)換運算(transformations)跟dataflow中的算子(operator)是一一對應(yīng)的關(guān)系
任務(wù)調(diào)度原理
- 客戶端不是運行時和程序執(zhí)行的一部分沐寺,但它用于準備并發(fā)送dataflow(JobGraph)給Master(JobManager),然后盖奈,客戶端斷開連接或者維持連接以等待接收計算結(jié)果混坞。而Job Manager會產(chǎn)生一個執(zhí)行圖(Dataflow Graph)
- 當 Flink 集群啟動后,首先會啟動一個 JobManger 和一個或多個的 TaskManager钢坦。由 Client 提交任務(wù)給 JobManager究孕,JobManager 再調(diào)度任務(wù)到各個 TaskManager 去執(zhí)行,然后 TaskManager 將心跳和統(tǒng)計信息匯報給 JobManager爹凹。TaskManager 之間以流的形式進行數(shù)據(jù)的傳輸蚊俺。上述三者均為獨立的 JVM 進程。
- Client 為提交 Job 的客戶端逛万,可以是運行在任何機器上(與 JobManager 環(huán)境連通即可)泳猬。提交 Job 后批钠,Client 可以結(jié)束進程(Streaming的任務(wù)),也可以不結(jié)束并等待結(jié)果返回得封。
- JobManager 主要負責(zé)調(diào)度 Job 并協(xié)調(diào) Task 做 checkpoint埋心,職責(zé)上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后忙上,會生成優(yōu)化后的執(zhí)行計劃拷呆,并以 Task 的單元調(diào)度到各個 TaskManager 去執(zhí)行。
-
TaskManager 在啟動的時候就設(shè)置好了槽位數(shù)(Slot)疫粥,每個 slot 能啟動一個 Task茬斧,Task 為線程。從 JobManager 處接收需要部署的 Task梗逮,部署啟動后项秉,與自己的上游建立 Netty 連接,接收數(shù)據(jù)并處理慷彤。
TaskManger與Slots與parallelism
- Flink 中每一個 TaskManager 都是一個JVM進程娄蔼,它可能會在獨立的線程上執(zhí)行一個或多個子任務(wù)
- 為了控制一個 TaskManager 能接收多少個 task, TaskManager 通過 task slot 來進行控制(一個 TaskManager 至少有一個 slot)
- 圖中每個Task Manager中的Slot為3個底哗,那么兩個Task Manager一共有六個Slot, 而這6個Slot代表著Task Manager最大的并發(fā)執(zhí)行能力岁诉,一共能可以執(zhí)行6個task進行同時執(zhí)行
- Slot是靜態(tài)概念,代表著Task Manager具有的并發(fā)執(zhí)行能力跋选,可以通過參數(shù)taskmanager.numberOfTaskSlots進行配置
- 為了控制一個 TaskManager 能接收多少個 task涕癣, TaskManager 通過 task slot 來進行控制(一個 TaskManager 至少有一個 slot)
- 圖中Source和Map是一個Task,且并行度(我們設(shè)置的setParallelism())都為1前标,指這個task任務(wù)的并行能力為1坠韩,只占用一個Slot資源
————————————————
版權(quán)聲明:本文為CSDN博主「SmallScorpion」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議候生,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_40180229/article/details/106321149
- 在第二張圖中為Flink的共享子任務(wù)绽昼,如果一個TaskManager一個slot唯鸭,那將意味著每個task group運行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味著更多的subtask可以共享同一個JVM硅确。而在同一個JVM進程中的task將共享TCP連接(基于多路復(fù)用)和心跳消息目溉。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),因此這減少了每個task的負載菱农。
-
并行度parallelism是動態(tài)概念缭付,即TaskManager運行程序時實際使用的并發(fā)能力,可以通過參數(shù)parallelism.default進行配置循未。
-
也就是說陷猫,假設(shè)一共有3個TaskManager,每一個TaskManager中的分配3個TaskSlot,也就是每個TaskManager可以接收3個task绣檬,一共9個TaskSlot足陨,如果我們設(shè)置parallelism.default=1,即運行程序默認的并行度為1娇未,9個TaskSlot只用了1個墨缘,有8個空閑,因此零抬,設(shè)置合適的并行度才能提高效率镊讼。
- 一個特定算子的 子任務(wù)(subtask)的個數(shù)被稱之為其并行度(parallelism),我們可以對單獨的每個算子進行設(shè)置并行度平夜,也可以直接用env設(shè)置全局的并行度蝶棋,更可以在頁面中去指定并行度。
-
最后褥芒,由于并行度是實際Task Manager處理task 的能力嚼松,而一般情況下,一個 stream 的并行度锰扶,可以認為就是其所有算子中最大的并行度献酗,則可以得出在設(shè)置Slot時,在所有設(shè)置中的最大設(shè)置的并行度大小則就是所需要設(shè)置的Slot的數(shù)量坷牛。
資料來源
https://blog.csdn.net/qq_40180229/article/details/106321149
https://zhuanlan.zhihu.com/p/138107079