只是看flink社區(qū)一些文章的總結(jié)识虚,主要是為了理解flink的一些概念和編寫的一些規(guī)約
flink
一.datastreamapi
1.1.單條記錄
filter,map
1.2.windows
時間窗口
1.3.合并流
union把多個數(shù)據(jù)類型相同的合并
join多條數(shù)據(jù)合成一條
connect類型不同的流連起來,這條流就有了多種類型
1.4拆分流
split
2.1.DataStream基本轉(zhuǎn)換
3.1.物理分組
二.flink的客戶端操作
1.1 5種任務(wù)提交方式
1.scala shell
2.sql client
3.commandline
4.restful
5.web
1.2 命令行
bin/flink -h 查看所有命令行
bin/flink run -h 查看某一個命令的參數(shù)
1.3 standalone集群啟動
bin/start-cluster.sh(本地啟動一個flink服務(wù))
可以通過 http://127.0.0.1:8081 訪問
run
命令行執(zhí)行
bin/flink run -d examples/streaming/TopSpeedWindowing.jar
查看任務(wù)列表
bin/flink list -m 127.0.0.1:8081
stop停止任務(wù)
bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb(job id)
取消任務(wù)
bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de
取消任務(wù)。如果在 conf/flink-conf.yaml 里面配置了 state.savepoints.dir,會保存 Savepoint花鹅,否則不會保存 Savepoint着撩。
也可以在停止的時候顯示指定 Savepoint 目錄夜矗。
flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint 29da945b99dea6547c3fbafd57ed8759
取消和停止(流作業(yè))的區(qū)別如下:
cancel()調(diào)用点骑,立即調(diào)用作業(yè)算子的cancel()方法制圈,以盡快取消他們。如果算子在接到cancel()調(diào)用后沒有停止畔况,flink將開始定期中斷算子線程的執(zhí)行,直到所有算子停止為止慧库。
stop()是更加優(yōu)雅的方式跷跪,僅適用于source實現(xiàn)了stoppableFunction接口的作業(yè)。當(dāng)用戶請求停止作業(yè)時齐板,作業(yè)的所有source都將被stop()方法調(diào)用吵瞻,指導(dǎo)所有source正常關(guān)閉時,作業(yè)才會正常結(jié)束甘磨,這種方式橡羞,使作業(yè)正常處理完所有作業(yè)。
Savepoint
bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint
Checkpoint 是增量做的济舆,每次的時間較短卿泽,數(shù)據(jù)量較小,只要在程序里面啟用后會自動觸發(fā)滋觉,用戶無須感知签夭;Checkpoint 是作業(yè) failover 的時候自動使用,不需要用戶指定椎侠。
Savepoint 是全量做的第租,每次的時間較長,數(shù)據(jù)量較大我纪,需要用戶主動去觸發(fā)慎宾。Savepoint 一般用于程序的版本更新(詳見文檔),Bug 修復(fù)浅悉,A/B Test 等場景趟据,需要用戶指定。
通過 -s 參數(shù)從指定的 Savepoint 啟動:
bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar
Modify
修改任務(wù)并行度
bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa
bin/flink modify -p 3 7752ea7b0e7303c780de9d86a5ded3fa
conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 4
state.savepoints.dir: file:///tmp/savepoint
修改后需要重啟集群再重啟任務(wù)
bin/stop-cluster.sh && bin/start-cluster.sh
bin/flink run -d examples/streaming/TopSpeedWindowing.jar
info
用來查看flink任務(wù)的執(zhí)行計劃的
bin/flink info examples/streaming/TopSpeedWindowing.jar
http://flink.apache.org/visualizer/
將命令輸出的json拷貝到這個網(wǎng)站
可以和實際運行的物理計劃作對比
web相關(guān)
在 Flink Dashboard 頁面左側(cè)可以看到有個「Submit new Job」的地方,用戶可以上傳 Jar 包和顯示執(zhí)行計劃和提交任務(wù)。Web 提交功能主要用于新手入門和演示用殖属。
三.window和time
1.1 window將有限流切成有限流
window可以是時間驅(qū)動旗闽,也可以是事件驅(qū)動的
window()的參數(shù)是一個分發(fā)器玄括,負(fù)責(zé)將數(shù)組分發(fā)到正確的window中(一條數(shù)據(jù)可能分發(fā)到多個window中)
2.1常見的幾種assigner
1.tumbling window(窗口間無重復(fù)元素類似set)
2.sliding window(窗口間的元素可能重復(fù))
3.session window(只對一個用戶的數(shù)據(jù)集做時間上的分割)以及global window(對一個用戶的所有數(shù)據(jù)做統(tǒng)計不做時間分割)
4.如果需要自定義的話需要實現(xiàn)class盈电,繼承WindowAssigner
evictor主要用于做一些數(shù)據(jù)的自定義操作
evicBefore 和 evicAfter 兩個方法
分別是執(zhí)行用戶代碼之前和執(zhí)行用戶代碼之后
flink提供了三種evictor
CountEvictor保留指定元素數(shù)
DeltaEvictor通過執(zhí)行用戶給定DeltaFunction以及預(yù)設(shè)的threshold沿癞,判斷是否刪除一個元素
TimeEvictor設(shè)定一個閥值interval,刪除所有不再max_ts-interval范圍內(nèi)的元素躁绸,其中max_ts是窗口內(nèi)時間戳的最大值
trigger
每個windowassigner都自帶一個默認(rèn)的trigger等缀,如果默認(rèn)的不滿足可以自定義枷莉,只要繼承trigger即可
1.onElement() 每次往 window 增加一個元素的時候都會觸發(fā)
2.onEventTime() 當(dāng) event-time timer 被觸發(fā)的時候會調(diào)用
3.onProcessingTime() 當(dāng) processing-time timer 被觸發(fā)的時候會調(diào)用
4.onMerge() 對兩個 trigger 的 state 進(jìn)行 merge 操作
5.clear() window 銷毀的時候被調(diào)用
上面的接口中前三個會返回一個 TriggerResult,TriggerResult 有如下幾種可能的選擇:
- CONTINUE 不做任何事情
- FIRE 觸發(fā) window
- PURGE 清空整個 window 的元素并銷毀窗口
- FIRE_AND_PURGE 觸發(fā)窗口尺迂,然后銷毀窗口
3.1 Time & Watermark
在 Flink 中 Time 可以分為三種
Event-Time 表示事件發(fā)生的時間笤妙,Processing-Time 則表示處理消息的時間(墻上時間),Ingestion-Time 表示進(jìn)入到系統(tǒng)的時間噪裕。
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 設(shè)置使用 ProcessingTime
watermark是為了在亂序的數(shù)據(jù)中保證某個時間段內(nèi)的數(shù)據(jù)已經(jīng)全部得到蹲盘,就是在指定的window后再收集一段時間
但是真實世界中我們沒法得到一個完美的 watermark 數(shù)值 — 要么沒法獲取到,要么耗費太大膳音,因此實際工作中我們會使用近似 watermark — 生成 watermark(t) 之后召衔,還有較小的概率接受到時間戳 t 之前的數(shù)據(jù),在 Flink 中將這些數(shù)據(jù)定義為 “l(fā)ate elements”, 同樣我們可以在 window 中指定是允許延遲的最大時間(默認(rèn)為 0)
4.1 window的內(nèi)部實現(xiàn)
每條數(shù)據(jù)過來之后祭陷,會由 WindowAssigner 分配到對應(yīng)的 Window苍凛,當(dāng) Window 被觸發(fā)之后,會交給 Evictor(如果沒有設(shè)置 Evictor 則跳過)兵志,然后處理 UserFunction醇蝴。而 UserFunction 則是用戶編寫的代碼。
四.狀態(tài)管理
1.1 無狀態(tài)計算的例子
單條輸入包含所有信息可直接計算
1.1.2 有狀態(tài)計算的例子(ps:基本可以的出我要做的訪問統(tǒng)計實際是一種有狀態(tài)的計算了想罕。)
單條輸入僅包含部分信息悠栓,相同輸入可能得到不同的輸出
1.2 flink的狀態(tài)類型
1.2.1 managed state & raw state
Managed State 是 Flink 自動管理的 State,而 Raw State 是原生態(tài) State按价,兩者的區(qū)別如下:
從狀態(tài)管理方式的方式來說闸迷,Managed State 由 Flink Runtime 管理,自動存儲俘枫,自動恢復(fù)腥沽,在內(nèi)存管理上有優(yōu)化;而 Raw State 需要用戶自己管理鸠蚪,需要自己序列化今阳,F(xiàn)link 不知道 State 中存入的數(shù)據(jù)是什么結(jié)構(gòu),只有用戶自己知道茅信,需要最終序列化為可存儲的數(shù)據(jù)結(jié)構(gòu)盾舌。
從狀態(tài)數(shù)據(jù)結(jié)構(gòu)來說,Managed State 支持已知的數(shù)據(jù)結(jié)構(gòu)蘸鲸,如 Value妖谴、List、Map 等。而 Raw State只支持字節(jié)數(shù)組 膝舅,所有狀態(tài)都要轉(zhuǎn)換為二進(jìn)制字節(jié)數(shù)組才可以嗡载。
從推薦使用場景來說,Managed State 大多數(shù)情況下均可使用仍稀,而 Raw State 是當(dāng) Managed State 不夠用時洼滚,比如需要自定義 Operator 時,推薦使用 Raw State技潘。
1.2.3 Keyed State & Operator State
Managed State 分為兩種遥巴,一種是 Keyed State;另外一種是 Operator State享幽。在Flink Stream模型中铲掐,Datastream 經(jīng)過 keyBy 的操作可以變?yōu)?KeyedStream 。
每個 Key 對應(yīng)一個 State值桩,即一個 Operator 實例處理多個 Key摆霉,訪問相應(yīng)的多個 State,并由此就衍生了 Keyed State颠毙。Keyed State 只能用在 KeyedStream 的算子中,即在整個程序中沒有 keyBy 的過程就沒有辦法使用 KeyedStream砂碉。
相比較而言蛀蜜,Operator State 可以用于所有算子,相對于數(shù)據(jù)源有一個更好的匹配方式增蹭,常用于 Source滴某,例如 FlinkKafkaConsumer。相比 Keyed State滋迈,一個 Operator 實例對應(yīng)一個 State霎奢,隨著并發(fā)的改變,Keyed State 中饼灿,State 隨著 Key 在實例間遷移幕侠,比如原來有 1 個并發(fā),對應(yīng)的 API 請求過來碍彭,/api/a 和 /api/b 都存放在這個實例當(dāng)中晤硕;如果請求量變大,需要擴(kuò)容庇忌,就會把 /api/a 的狀態(tài)和 /api/b 的狀態(tài)分別放在不同的節(jié)點舞箍。由于 Operator State 沒有 Key,并發(fā)改變時需要選擇狀態(tài)如何重新分配皆疹。其中內(nèi)置了 2 種分配方式:一種是均勻分配疏橄,另外一種是將所有 State 合并為全量 State 再分發(fā)給每個實例。
在訪問上略就,Keyed State 通過 RuntimeContext 訪問捎迫,這需要 Operator 是一個Rich Function晃酒。Operator State 需要自己實現(xiàn) CheckpointedFunction 或 ListCheckpointed 接口。在數(shù)據(jù)結(jié)構(gòu)上立砸,Keyed State 支持的數(shù)據(jù)結(jié)構(gòu)掖疮,比如 ValueState、ListState颗祝、ReducingState浊闪、AggregatingState 和 MapState;而 Operator State 支持的數(shù)據(jù)結(jié)構(gòu)相對較少螺戳,如 ListState搁宾。
五.容錯機制與故障恢復(fù)
- 狀態(tài)如何恢復(fù)如何保存
肯定是checkpoint和savepoint啊,意料之內(nèi)
這里有一個點就是如果數(shù)據(jù)源不支持重發(fā)的話任務(wù)和狀態(tài)雖然回歸了倔幼,但是數(shù)據(jù)丟失了盖腿,這樣就會導(dǎo)致計算結(jié)果錯誤。
我暫時想到的方案就是題提高chechpoint的頻率损同,減小結(jié)果的誤差
1.2 Checkpoint 通過代碼的實現(xiàn)方法如下:
首先從作業(yè)的運行環(huán)境 env.enableCheckpointing 傳入 1000翩腐,意思是做 2 個 Checkpoint 的事件間隔為 1 秒。Checkpoint 做的越頻繁膏燃,恢復(fù)時追數(shù)據(jù)就會相對減少茂卦,同時 Checkpoint 相應(yīng)的也會有一些 IO 消耗。
接下來是設(shè)置 Checkpoint 的 model组哩,即設(shè)置了 Exactly_Once 語義等龙,表示需要 Barrier 對齊,這樣可以保證消息不會丟失也不會重復(fù)伶贰。
setMinPauseBetweenCheckpoints 是 2 個 Checkpoint 之間最少是要等 500ms蛛砰,也就是剛做完一個 Checkpoint。比如某個 Checkpoint 做了700ms黍衙,按照原則過 300ms 應(yīng)該是做下一個 Checkpoint泥畅,因為設(shè)置了 1000ms 做一次 Checkpoint 的,但是中間的等待時間比較短琅翻,不足 500ms 了涯捻,需要多等 200ms,因此以這樣的方式防止 Checkpoint 太過于頻繁而導(dǎo)致業(yè)務(wù)處理的速度下降望迎。
setCheckpointTimeout 表示做 Checkpoint 多久超時障癌,如果 Checkpoint 在 1min 之內(nèi)尚未完成,說明 Checkpoint 超時失敗辩尊。
setMaxConcurrentCheckpoints 表示同時有多少個 Checkpoint 在做快照涛浙,這個可以根據(jù)具體需求去做設(shè)置。
enableExternalizedCheckpoints 表示下 Cancel 時是否需要保留當(dāng)前的 Checkpoint,默認(rèn) Checkpoint 會在整個作業(yè) Cancel 時被刪除轿亮。Checkpoint 是作業(yè)級別的保存點疮薇。
1.3聊聊可選的狀態(tài)存儲方式
Checkpoint 的存儲,第一種是內(nèi)存存儲我注,即 MemoryStateBackend按咒,構(gòu)造方法是設(shè)置最大的StateSize,選擇是否做異步快照但骨,這種存儲狀態(tài)本身存儲在 TaskManager 節(jié)點也就是執(zhí)行節(jié)點內(nèi)存中的励七,因為內(nèi)存有容量限制,所以單個 State maxStateSize 默認(rèn) 5 M奔缠,且需要注意 maxStateSize <= akka.framesize 默認(rèn) 10 M掠抬。Checkpoint 存儲在 JobManager 內(nèi)存中,因此總大小不超過 JobManager 的內(nèi)存校哎。推薦使用的場景為:本地測試两波、幾乎無狀態(tài)的作業(yè),比如 ETL闷哆、JobManager 不容易掛腰奋,或掛掉影響不大的情況。不推薦在生產(chǎn)場景使用抱怔。
另一種就是在文件系統(tǒng)上的 FsStateBackend 劣坊,構(gòu)建方法是需要傳一個文件路徑和是否異步快照。State 依然在 TaskManager 內(nèi)存中野蝇,但不會像 MemoryStateBackend 有 5 M 的設(shè)置上限讼稚,Checkpoint 存儲在外部文件系統(tǒng)(本地或 HDFS)括儒,打破了總大小 Jobmanager 內(nèi)存的限制绕沈。容量限制上,單 TaskManager 上 State 總量不超過它的內(nèi)存帮寻,總大小不超過配置的文件系統(tǒng)容量乍狐。推薦使用的場景、常規(guī)使用狀態(tài)的作業(yè)固逗、例如分鐘級窗口聚合或 join浅蚪、需要開啟HA的作業(yè)。
還有一種存儲為 RocksDBStateBackend 烫罩,RocksDB 是一個 key/value 的內(nèi)存存儲系統(tǒng)惜傲,和其他的 key/value 一樣,先將狀態(tài)放到內(nèi)存中贝攒,如果內(nèi)存快滿時盗誊,則寫入到磁盤中,但需要注意 RocksDB 不支持同步的 Checkpoint,構(gòu)造方法中沒有同步快照這個選項哈踱。不過 RocksDB 支持增量的 Checkpoint荒适,也是目前唯一增量 Checkpoint 的 Backend,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄开镣,僅需要上傳新生成的 sst 文件即可刀诬。它的 Checkpoint 存儲在外部文件系統(tǒng)(本地或HDFS),其容量限制只要單個 TaskManager 上 State 總量不超過它的內(nèi)存+磁盤邪财,單 Key最大 2G陕壹,總大小不超過配置的文件系統(tǒng)容量即可。推薦使用的場景為:超大狀態(tài)的作業(yè)卧蜓,例如天級窗口聚合帐要、需要開啟 HA 的作業(yè)、最好是對狀態(tài)讀寫性能要求不高的作業(yè)弥奸。
六. table api
1.1
第一榨惠,Table API & SQL 是一種聲明式的 API。用戶只需關(guān)心做什么盛霎,不用關(guān)心怎么做赠橙,比如圖中的 WordCount 例子,只需要關(guān)心按什么維度聚合愤炸,做哪種類型的聚合期揪,不需要關(guān)心底層的實現(xiàn)。
第二规个,高性能凤薛。Table API & SQL 底層會有優(yōu)化器對 query 進(jìn)行優(yōu)化。舉個例子诞仓,假如 WordCount 的例子里寫了兩個 count 操作缤苫,優(yōu)化器會識別并避免重復(fù)的計算,計算的時候只保留一個 count 操作墅拭,輸出的時候再把相同的值輸出兩遍即可活玲,以達(dá)到更好的性能。
第三谍婉,流批統(tǒng)一舒憾。上圖例子可以發(fā)現(xiàn),API 并沒有區(qū)分流和批穗熬,同一套 query 可以流批復(fù)用镀迂,對業(yè)務(wù)開發(fā)來說,避免開發(fā)兩套代碼唤蔗。
第四探遵,標(biāo)準(zhǔn)穩(wěn)定唧瘾。Table API & SQL 遵循 SQL 標(biāo)準(zhǔn),不易變動别凤。API 比較穩(wěn)定的好處是不用考慮 API 兼容性問題饰序。
第五,易理解规哪。語義明確求豫,所見即所得。
table api的編寫步驟
1.生成table(輸入數(shù)據(jù)生成table)
2.寫sql進(jìn)行聚合和統(tǒng)計
生成table的步驟
1.注冊對應(yīng)的tablesource
2.調(diào)用table environement的scan方法獲取table對象诉稍。
注冊source的三種方式
1.descriptor
2.自定義source
3.datastream
輸出table
Table descriptor, 自定義 Table sink 以及輸出成一個 DataStream
當(dāng)我們拿到一個 Table 后蝠嘉,調(diào)用 groupBy 會返回一個 GroupedTable。GroupedTable 里只有 select 方法杯巨,對 GroupedTable 調(diào)用 select 方法會返回一個 Table蚤告。拿到這個 Table 后,我們可以再調(diào)用 Table 上的方法服爷。圖中其他 Table杜恰,如 OverWindowedTable 也是類似的流程。值得注意的是仍源,引入各個類型的 Table 是為了保證 API 的合法性和便利性心褐,比如 groupBy 之后只有 select 操作是有意義的,在編輯器上可以直接點出來笼踩。
一些對列的增強操作
clumns operation
對列的增加逗爹,替換,刪除和重命名
columns function
選取多列或者反選
七.sql api
1.window aggregation
根據(jù)時間窗口對數(shù)據(jù)的進(jìn)行聚合計算嚎于,并且在到達(dá)結(jié)束時間時進(jìn)行結(jié)果輸出(只輸出一次)
2.group aggregation
無限制掘而,數(shù)據(jù)來一次計算一次,有多次輸出