Flink知識(shí)點(diǎn)

1savepoint checkpoint savepoint是checkpoint的一種特殊方式,手動(dòng)保存,其實(shí)就是指針卵渴。savepoint不會(huì)過(guò)期 不會(huì)覆蓋 除非手動(dòng)刪除

2.jobmanager相當(dāng)于resource manager 一般生產(chǎn)會(huì)有2個(gè) 做ha

3.OperatorChain的優(yōu)點(diǎn):
(1)減少線程切換
(2)減少序列化與反序列化
(3)減少數(shù)據(jù)在緩沖區(qū)的交換
(4)減少延遲并且提高吞吐能力

OperatorChain組成條件:
(1)沒(méi)有禁用Chain
(2)上下游算子并行度一致
(3)下游算子的入度為1(也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有其他節(jié)點(diǎn)的輸入)
(4)上下游算子在同一個(gè)slot group
(5)下游節(jié)點(diǎn)的chain策略為always(可以與上下游鏈接,map未巫、flatmap迎卤、filter等默認(rèn)是always)
(6)上有節(jié)點(diǎn)的chain策略為always或head(只能與下游鏈接它匕,不能與上有鏈接,source默認(rèn)是head)
(7)上下游算子之間沒(méi)有數(shù)據(jù)shuffle(數(shù)據(jù)分區(qū)方式是forward)

禁用OperatorChain幾種方式:
(1)DataStream的算子操作后調(diào)用startNewChain算子
(2)DataStream調(diào)用disableChaining來(lái)關(guān)閉Chain
(3)StreamExecutionEnvironment.getExecutionEnvironment.disableOperatorChaining() 全局關(guān)閉
(4) DataStream.slotSharingGroup("name") 設(shè)置新的slotgrop名稱
(5)改變并行度

4.共享slot
(1)flink集群需要的任務(wù)槽與作業(yè)中使用的最高并行度正好相同(前提吩跋,保持默認(rèn)SlotSharingGroup)寞射。也就是說(shuō)我們不需要再去
計(jì)算一個(gè)程序總共會(huì)起多少個(gè)task了
(2)適當(dāng)設(shè)置sltSharingGroup可以減少每個(gè)slot運(yùn)行的線程數(shù),從而整體上減少機(jī)器的負(fù)載

5.slot與parallelism的關(guān)系
(1)默認(rèn)task slot數(shù)與join中task的最高并行度一致

6.累加器和計(jì)數(shù)器
(1)計(jì)數(shù)器是最簡(jiǎn)單的累加器
(2)內(nèi)置累加器有IntCounter,LongCounter,DoubleCounter
(3)Histogram 柱狀圖

7.控制延遲
默認(rèn)情況下锌钮,流中的元素并不會(huì)一個(gè)一個(gè)的在網(wǎng)絡(luò)中傳輸(這會(huì)導(dǎo)致不必要的網(wǎng)絡(luò)流量消耗)
桥温,而是緩存起來(lái),緩存的大小可以在Flink的配置文件梁丘、ExecutinEnvironment侵浸、設(shè)置某個(gè)算子
進(jìn)行配置(默認(rèn)100ms)
這樣控制的 好處:提高吞吐 壞處:增加了延遲

如何把握平衡
(1)為了最大吞吐量,可以設(shè)置setBufferTimeout(-1)氛谜,這會(huì)移出timeout機(jī)制掏觉,緩存中的數(shù)據(jù)
一滿就會(huì)被發(fā)送,不建議用,假如一條信息4 5個(gè)小時(shí)才來(lái)這時(shí)候延遲會(huì)非常高,會(huì)等整個(gè)buffer滿了再處理
(2)為了最小延遲值漫,可以將超時(shí)設(shè)置為接近0的數(shù)(例如5或者10ms)
(3)緩存的超時(shí)不要設(shè)置0澳腹,因?yàn)闀?huì)帶來(lái)一些性能的損耗

8.min minby max maxby
min和minby的區(qū)別是min返回一個(gè)最小值,而minby返回的是其字段中包含的最小元素

9.interval join
在給定周期內(nèi)杨何,按照指定key對(duì)兩個(gè)KeyedStream進(jìn)行join操作酱塔,把符合join條件的兩個(gè)
event拉倒一起,然后怎么處理由用戶自己定義
場(chǎng)景:把一定時(shí)間內(nèi)的相關(guān)的分組數(shù)據(jù)拉成一個(gè)寬表

10connect 和union
connect之后是connectedStreams,會(huì)對(duì)兩個(gè)流的數(shù)據(jù)應(yīng)用不同的處理方法危虱,并且雙流之間可以
共享狀態(tài)(比如計(jì)數(shù))羊娃。這在第一個(gè)流的輸入會(huì)影響第二個(gè)流時(shí)會(huì)非常有用。
union合并多個(gè)流埃跷,新的流包含所有流的數(shù)據(jù)
union是DataStream->DataStream
connect只能連接兩個(gè)流蕊玷,而union可以連接多余兩個(gè)流
connect兩個(gè)流類型可以不一致,而union連接的流類型必須一致

11.assignTimestampsAndWatermarks
含義:提取記錄中的時(shí)間戳作為Event time,主要在window操作中發(fā)揮作用弥雹,不設(shè)置默認(rèn)就是
Processing time
限制: 只有基于event time構(gòu)建window時(shí)才有用
使用場(chǎng)景:當(dāng)你需要使用event time來(lái)創(chuàng)建window時(shí)垃帅,用來(lái)指定如何獲取event的時(shí)間戳

12.算子之間傳遞數(shù)據(jù)的方式
(1)One-to-one streams 保持元素的分區(qū)和順序
(2)重新分區(qū)的方式 ,重新分區(qū)策略取決于使用的算子 keyby缅糟、broadcast挺智、rebalance

dataStream.shuffle() 按均勻分布隨機(jī)劃分元素,網(wǎng)絡(luò)開(kāi)銷往往比較大
dataStream.rebalance() 循環(huán)對(duì)元素進(jìn)行分區(qū)窗宦,為每各分區(qū)創(chuàng)建相等負(fù)載赦颇,解決數(shù)據(jù)傾斜時(shí)非常有用
dataStream.rescale() 跟rebalance類似,但不是全局的赴涵,通過(guò)輪詢調(diào)度將元素從上游的task一個(gè)子集發(fā)送到下游task的一個(gè)子集
dataStream.broadcast() 將元素廣播到每個(gè)分區(qū)上

13.三個(gè)時(shí)間的比較
一媒怯、EventTime
(1)事件生成的時(shí)間,在進(jìn)入Flink之氣就存在髓窜,可以從event的字段中抽取
(2)必須指定watermarks的生產(chǎn)方式
(3)優(yōu)勢(shì):確定性扇苞,亂序欺殿、延時(shí)、或者數(shù)據(jù)重放等情況鳖敷,都能給出正確結(jié)果
(4)弱點(diǎn):處理無(wú)序事件時(shí)性能和延遲受到影響

二脖苏、IngerstTime
(1)事件進(jìn)入flink的時(shí)間,即source里獲取的當(dāng)前系統(tǒng)時(shí)間定踱,后續(xù)統(tǒng)一使用該時(shí)間
(2)不需要指定watermarks的生產(chǎn)范式(自動(dòng)生成)
(3)弱點(diǎn):不能處理無(wú)序事件和延遲數(shù)據(jù)

三棍潘、ProcessingTime
(1)執(zhí)行操作的機(jī)器的當(dāng)前系統(tǒng)時(shí)間(每個(gè)算子都不一樣)
(2)不需要流和機(jī)器之間的協(xié)調(diào)
(3)優(yōu)勢(shì):最佳的性能和最低的延遲
(4)弱點(diǎn):不確定性,容易受到各種因素影響(event產(chǎn)生的速度崖媚、到達(dá)flink的速度亦歉、算子之間傳輸速度),壓根不管順序和延遲

比較
性能:ProcessingTime>IngestTime>EventTime
延遲:ProcessingTime<IngestTime<EventTime
確定性:EventTime>IngestTime>ProcessIngTime

不設(shè)置time類型畅哑,默認(rèn)是processingTime
通過(guò) env.setStreamTimeCharacteristic()方法設(shè)置time類型

14.watermark
(1)通常情況下肴楷,watermark在source函數(shù)中生成,但也可以在source后任何階段荠呐,如果指定多次
后嗎指定的會(huì)覆蓋前面的值赛蔫。source的每個(gè)sub task獨(dú)立生成水位線。
(2)watermark通過(guò)操作時(shí)會(huì)推進(jìn)算子操作時(shí)的event time,同時(shí)會(huì)為下游生成一個(gè)新的watermark
(3)多輸入operator(union直秆、keyby濒募、partition)的當(dāng)前event time是其輸入流event time最小值

15.timestamp/watermark兩種生成方式
方式1:直接在source function中生成
方式2:timestamp assigner / watermark generator

timestamp和watermark都是采用毫秒

16.兩種watermark
一鞭盟、周期性 watermark
(1)基于時(shí)間
(2)ExecutionConfig.setAutoWatermarkInterval(msec) (默認(rèn)200ms圾结,設(shè)置watermarker發(fā)生的周期)
(3)實(shí)現(xiàn)AssignerWithPeriodicWatermarks接口

二、間斷的watermark
(1)基于某些時(shí)間出發(fā)watermark的生產(chǎn)和發(fā)送(由用戶代碼實(shí)現(xiàn)齿诉,例如遇到特殊情況)
(2)實(shí)現(xiàn)AssignerWithPeriodicWatermarks接口

17.處理延遲數(shù)據(jù)
方式一:allowedLateness(),設(shè)定最大延遲時(shí)間筝野,觸發(fā)被延遲,不宜設(shè)置太大
方式二:sideOutputTag,提供了延遲數(shù)據(jù)獲取的一種方式粤剧,這樣就不會(huì)丟棄數(shù)據(jù)了,延遲數(shù)據(jù)單獨(dú)處理歇竟。

18.windows分類(是否keyby決定了大分類)
一.Keyed Windows(在已經(jīng)安裝keyby分組的基礎(chǔ)上(KeyedStream),再構(gòu)建多任務(wù)并行window)
stream.keyBy().window()
二.Non-Keyed Windwos(在未分組的DataStream上構(gòu)建單任務(wù)Window,并行度是1,API都帶ALL后綴)
stream.windowAll()

19.window窗口生命周期
創(chuàng)建:當(dāng)屬于第一個(gè)元素到達(dá)時(shí)就會(huì)創(chuàng)建該窗口
銷毀:當(dāng)時(shí)間(event/process time)超過(guò)窗口的結(jié)束時(shí)間戳+用戶指定的延遲時(shí)(allowedLateness<time>),窗口將會(huì)移除

20.觸發(fā)器
(1)觸發(fā)器決定了一個(gè)窗口何時(shí)可以被窗口函數(shù)處理(條件滿足時(shí)觸發(fā)并發(fā)出信號(hào))
(2)每一個(gè)WindowAssigner都有一個(gè)默認(rèn)的觸發(fā)器抵恋,如果默認(rèn)觸發(fā)器不滿足需要可以通過(guò)trigger()來(lái)指定

觸發(fā)器有5個(gè)方法來(lái)允許觸發(fā)器處理不同的事件(trigger)
onElement()方法每個(gè)元素被添加到窗口是調(diào)用
onEvenTime() 當(dāng)一個(gè)已注冊(cè)的事件時(shí)間計(jì)時(shí)器啟動(dòng)時(shí)調(diào)用
onProcessingTime 當(dāng)一個(gè)已注冊(cè)的處理時(shí)間計(jì)時(shí)器啟動(dòng)時(shí)調(diào)用
onMerge 與狀態(tài)觸發(fā)器相關(guān)焕议, 當(dāng)使用session window時(shí)兩個(gè)觸發(fā)器對(duì)應(yīng)的窗口合并,合并兩個(gè)觸發(fā)器的狀態(tài)
clear相應(yīng)窗口被清除時(shí)觸發(fā)

21驅(qū)逐器
evictor是可選的弧关,WindowAssigner默認(rèn)沒(méi)有evictor
evictor能夠在Trigger觸發(fā)之后以及在應(yīng)用窗口函數(shù)執(zhí)行前和/或后從窗口中刪除無(wú)用的元素盅安,類似filter作用
evictBefore在窗口之前應(yīng)用
evictAfter在窗口后應(yīng)用

22.如何允許延遲
(1)當(dāng)處理event-time的windwo時(shí),可能會(huì)出現(xiàn)元素晚到的情況世囊,即flink用來(lái)跟蹤event-time進(jìn)度的
watermark已經(jīng)過(guò)了元素所屬窗口的最后時(shí)間别瞭,屬于當(dāng)前窗口的數(shù)據(jù)才到達(dá))
(2)默認(rèn)情況下,當(dāng)watermark已經(jīng)過(guò)了窗口的最后時(shí)間時(shí)株憾,晚到的元素會(huì)被丟棄
(3)Flink允許為窗口操作指定一個(gè)最大允許延時(shí)時(shí)長(zhǎng)蝙寨,Allowed lateness指定晒衩,默認(rèn)情況是0
(4)水位線已過(guò)了窗口最后時(shí)間才來(lái)的元素,如果還在未到窗口最后時(shí)間加延遲時(shí)間墙歪,任然可以在窗口中計(jì)算

特例:在使用GlobalWindows(全局window)听系,不會(huì)考慮延遲,因?yàn)榇翱诘慕Y(jié)束時(shí)間戳是Long.MAX_VALUE

23.CoGROUP虹菲、Join跛锌、Connect
一、cogroup:
(1)側(cè)重于group,是對(duì)同一個(gè)key上的兩組集合進(jìn)行操作
(2)CoGroup的作用和join基本相同届惋,但有一點(diǎn)不一樣的是髓帽,如果未能找到新到來(lái)的數(shù)據(jù)與另一個(gè)流
在window中存在的匹配數(shù)據(jù),仍會(huì)將其輸出
(3)只能在window中用

二 脑豹、join
(1)對(duì)同一個(gè)key上的每對(duì)元素進(jìn)行操作
(2)類似inner join
(3)按照條件分別取出兩個(gè)流能匹配的元素郑藏,返回個(gè)下游處理
(4)join是cogroup的特例
(5)只能在window中用

三、connect
(1)沒(méi)有匹配條件瘩欺,兩個(gè)流分別處理流各種的邏輯必盖。

24.Process Function
Flink提供三層API
(1)SQL/Table Api
(2)DataStream API
(3)ProcessFunction

不要跟ProcessWindowFunction混為一談
ProcessFunction是一個(gè)低階的流處理做操,它可以訪問(wèn)流處理程序的基礎(chǔ)構(gòu)建模塊
(1)事件(event 流元素)
(2)狀態(tài)(state容錯(cuò)性俱饿,一致性歌粥,僅在keyed stream中)
(3)定時(shí)器(event time和processint time,僅在keyed stream中)

25.Connectors
Connectors是數(shù)據(jù)進(jìn)出flink一套接口和實(shí)現(xiàn),是source和sink的統(tǒng)稱
數(shù)據(jù)進(jìn)出flink方式不止connectors拍埠,還有:(1)Async I/O 異步IO (2)Queryable State

自定義source
(1)實(shí)現(xiàn)SourceFunction 非并行失驶,并行度為1
(2)實(shí)現(xiàn)ParallelSourceFunction
(3)繼承RichParallelSourceFunction

kafka-connector:
(1)基于Kafka的partition機(jī)制,F(xiàn)link實(shí)現(xiàn)了并行化數(shù)據(jù)切分
(2)Flink可以效仿Kafka的topic枣购,和sink數(shù)據(jù)到kafka
(3)出現(xiàn)失敗嬉探,flink協(xié)調(diào)kafka來(lái)恢復(fù)應(yīng)用(通過(guò)設(shè)置kafka的offset)

FlinkKafkaConsumer消費(fèi)模式
(1)setStartFromEarliest 從隊(duì)列頭開(kāi)始消費(fèi),最早的記錄
(2)setStartFromLatest 從隊(duì)列為開(kāi)始消費(fèi)棉圈,最新的記錄
(3)setStartFromGroupOffsets 默認(rèn)值涩堤,從當(dāng)前消費(fèi)組記錄偏移量開(kāi)始消費(fèi)
(4)setStartFromSpecificOffsets(Map<TopicPartition,Long>)從指定位置開(kāi)始消費(fèi)
(5)setStartFromTimestamp(Long) 從指定時(shí)間戳愛(ài)是消費(fèi)

FlinkKafkaSource的容錯(cuò)性
(1)env.enableCheckpointing() 啟動(dòng)檢查點(diǎn)
(2)如果flink啟用了檢查點(diǎn),將會(huì)周期性的checkpoint其kafka的偏移量
(3)保證了僅一次消費(fèi)
(4)如果作業(yè)失敗分瘾,flink將程序恢復(fù)到最新檢查點(diǎn)的狀態(tài)胎围,并從檢查點(diǎn)中存儲(chǔ)的偏移量開(kāi)始消費(fèi)Kafka中的記錄
(5)內(nèi)部實(shí)現(xiàn)checkpointFunction
(6)內(nèi)部保存ListState<Tuple2<KafkaTopicPartition,Long>>

不同情況下消費(fèi)位置分析
(1)第一次啟動(dòng),無(wú)savepoint(常規(guī)情況) 德召。 由消費(fèi)者模式?jīng)Q定
(2)通過(guò)savepoint啟動(dòng)(應(yīng)用升級(jí)白魂,比如加大并行度)。 由savepoint記錄的offset決定
(3)有checkpoint,失敗后氏捞,job恢復(fù)的情況碧聪。 由checkpoint的snapshot中記錄的offset決定
(4)無(wú)checkpoint,失敗后,job恢復(fù)的情況液茎。 由消費(fèi)者模式?jīng)Q定

動(dòng)態(tài)Partition discovery
(1)Flink Kafka Consumer支持動(dòng)態(tài)發(fā)現(xiàn)Kafka分區(qū)逞姿,且能保證exactly-once
(2)默認(rèn)禁止動(dòng)態(tài)發(fā)現(xiàn)分區(qū)辞嗡,把flink.partition-discover.interval-milllis設(shè)置大于0即可開(kāi)啟
properties.setProperty("flink.partition-discover.interval-milllis","30000")

26.Sate 狀態(tài)
Flink的狀態(tài):一般指一個(gè)具體的task/operator某時(shí)刻在內(nèi)存中的的狀態(tài)(例如某屬性的值)
注意:State和checkpointing不要搞混
checkpoint 則表示了一個(gè)flink job ,在一個(gè)特定時(shí)一份全局狀態(tài)快照滞造,即包含了一個(gè)job下所有task/operator某時(shí)刻的狀態(tài)

狀態(tài)的錯(cuò)用
一.增量計(jì)算
(1)聚合操作
(2)機(jī)器學(xué)習(xí)訓(xùn)練模型迭代運(yùn)算時(shí)保持當(dāng)前模型
二.容錯(cuò)
(1)job故障重啟
(2)flink程序升級(jí)

狀態(tài)分類
(1)Operator State 每個(gè)流普通的Operator的狀態(tài)
(2)Keyed State Keyed Streaming的狀態(tài)
(3)特殊的:Broadcast State(1.5開(kāi)始)

Keyed State支持的數(shù)據(jù)結(jié)構(gòu)
(1)ValueState
(2)ListState
(3)ReducingState
(4)AggregatingState
(5)FoldingState
(6)MapState

注意:
(1)狀態(tài)不一定存儲(chǔ)在內(nèi)部续室,可能駐留在磁盤(pán)或其他地方
(2)狀態(tài)是使用RunntimContext方法的,因此只能在Rich函數(shù)中訪問(wèn)

Keyed State和Operator State,可以以兩種形式存在:原始狀態(tài)和托管狀態(tài)
managerd(托管狀態(tài)): 如ValueState,ListState,MapState等谒养,通過(guò)框架接口來(lái)更新和管理挺狰,不需要序列化
raw(原始狀態(tài)):原始狀態(tài)是由用戶自行管理的具體數(shù)據(jù)結(jié)構(gòu),如checkpoint的時(shí)候买窟,使用byte[]來(lái)讀寫(xiě)狀態(tài)內(nèi)容丰泊,需要序列化

通常在DataStream上的狀態(tài)推薦使用托管狀態(tài),當(dāng)用戶自定義operator時(shí)始绍,會(huì)使用到原始狀態(tài)

Keyed State TTL
任何類型的keyed都可以設(shè)置TTL瞳购。如果設(shè)置TTL已配置,且狀態(tài)值已過(guò)期亏推,則將以最佳方式清理
所有State collection都支持條目級(jí)別TTL学赛,即list、map中的條目獨(dú)立expire
用法:
val ttlConfig=StateTtlConfig.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build()
val descripe = new ValueStateDescriptor("avgState", TypeInformation.of(new TypeHint[(Long, Long)] {}))
descripe.enableTimeToLive(ttlConfig)

TTL相關(guān)配置
一.Refresh策略(默認(rèn)onCreateAndWrite):設(shè)置如何更新keyedState的最后訪問(wèn)時(shí)間
StateTtlConfig.UpdateType.Disabled 禁用ttl
StateTtlConfig.UpdateType.OnCreateAndWrite 每次寫(xiě)操作均更新State的最后訪問(wèn)時(shí)間(Create吞杭、Update)
StateTtlConfig.UpdateType.OnReadAndWrite 每次讀寫(xiě)操作均更新State的最后訪問(wèn)時(shí)間
二.狀可見(jiàn)性(默認(rèn)是NeverReturnExpired):設(shè)置是否返回過(guò)期的值(過(guò)期尚未處理盏浇,此時(shí)正好被訪問(wèn))
StateTtlConfig.StateVisibility.NeverReturnExpired 用不返回過(guò)期的值
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp 可以返回過(guò)期但尚未清理的狀態(tài)值

TTL注意事項(xiàng)
(1)啟用TTL增加后端狀態(tài)存儲(chǔ)消耗
(2)原來(lái)沒(méi)啟用TTL,后來(lái)啟用TTL做恢復(fù)會(huì)將導(dǎo)致兼容性失敗和statmigrationexception(反之一樣)
(3)TTL配置不是檢查或保存點(diǎn)的一部分

26.Broadcast state使用套路(三步)
(1)創(chuàng)建dataStream
(2)創(chuàng)建BroadcastedStream:創(chuàng)建規(guī)則流/配置流(低吞吐)并廣播
(3)連接兩個(gè)流進(jìn)行計(jì)算 connect芽狗,proccess(BroadcastProcessFunction and keyedBroadcastProcessFunction)

27.checkpoint狀態(tài)容錯(cuò)
(1)有了狀態(tài)自然需要狀態(tài)容錯(cuò)绢掰,否則就失去意義了,flink狀態(tài)容錯(cuò)機(jī)制就是checkpoint
(2)checkpoint是通過(guò)分布式snapshot實(shí)現(xiàn)的译蒂,沒(méi)有特殊聲明時(shí)snapshot和checkpoint和back-up是一個(gè)意思

特點(diǎn):(1)異步 (2)全量和增量都可以設(shè)置 (3)Barrier機(jī)制 (4)失敗情況下可回滾到最近成功一次的checkpoint (5)周期性

使用checkpoint前置條件:
(1)在一定時(shí)間內(nèi)可回溯的datasource 例如:kafka曼月、rabiitma谊却、hdfs
(2)可持久化存儲(chǔ)state的存儲(chǔ)系統(tǒng),通常使用分布式文件系統(tǒng)柔昼,一般是hdfs,s3,nfs

checkmode:一般選擇EXACTLY_ONCE,除非場(chǎng)景要求極低會(huì)選擇AT_LEAST_ONCE(幾毫秒)

checkpoint高級(jí)選項(xiàng)值保留策略
默認(rèn)情況下檢查點(diǎn)不會(huì)被保留炎辨,僅用于從故障中恢復(fù)作業(yè)捕透。可以啟用外部持久化檢查點(diǎn)碴萧,同事指定保留策略
checkpointConfg.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
(1)CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION 在作業(yè)被取消時(shí)保留檢查點(diǎn)乙嘀。這種情況取消后必須手動(dòng)清除檢查點(diǎn)
(2)CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION 在作業(yè)被取消(cancel)時(shí)會(huì)刪除檢查點(diǎn),等于不啟用破喻。

setCheckpointTimeout 設(shè)置超時(shí)時(shí)間虎谢,超過(guò)時(shí)間沒(méi)有完成checkpoint則被終止
setMinPauseBetweenCheckpoints 最小間隔,上一個(gè)checkpoint完成最少等待多久發(fā)出下一個(gè)checkpoint請(qǐng)求
setMaxConcurrentCheckpoints 指定運(yùn)行中多少并行度進(jìn)行checkpoint

使用checkpoint第二部:選擇合適的State Backed
(1)默認(rèn)State保存在taskmanager的內(nèi)存中
(2)checkpoint機(jī)制會(huì)持久化所有狀態(tài)的一致性快照
快照保存由State Backend來(lái)決定,目前flink自帶三個(gè)State Backed:
(1)MemoryStateBackend(默認(rèn))
(2)FsStateBackend
(3)RocksDBStateBackend

一曹质、MemoryStateBackend
(1)MemoryStateBackend是一個(gè)內(nèi)部狀態(tài)backend,用于維護(hù)Java堆上的狀態(tài)婴噩。Key/value狀態(tài)和窗口運(yùn)算符包含存儲(chǔ)值和計(jì)時(shí)器的哈希表
(2)Checkpoint時(shí)擎场,MemoryStateBackend會(huì)對(duì)state做一次快照,并像jobManager發(fā)送checkpoint確認(rèn)完成的消息中帶上此快照數(shù)據(jù),然后快照會(huì)存儲(chǔ)在JobManager的堆內(nèi)存中
(3)MemoeyStateBackend默認(rèn)開(kāi)啟異步方式進(jìn)行快照几莽,推薦使用異步避免阻塞迅办。如果要阻塞可以傳false,如下
val memoryStateBackend:StateBackend=new MemoryStateBackend(1010241024,false)
env.setStateBackend(memoryStateBackend)
(4)限制:?jiǎn)蝹€(gè)state默認(rèn)5mb,可以在MemoryStateBackend的構(gòu)造函數(shù)指定。不論如何設(shè)置章蚣,State大小無(wú)法大于akka.framesize(JobManager和TaskManager之間發(fā)送的最大消息的大小默認(rèn)10mb)站欺。Job Manager必須有足夠內(nèi)存
(5)適用場(chǎng)景:本地開(kāi)發(fā)和測(cè)試 小狀態(tài)job,如只使用Map FlatMap Fliter或Kaka Consumer

二、FsStateBackend
(1)FsStateBackend需要配置一個(gè)文件系統(tǒng)URL來(lái)纤垂,如hdfs://namenode:8080/flink/checkpoint
(2)FsStateBackend在TaskManager的內(nèi)存中持有正在處理的數(shù)據(jù)矾策。checkpoint時(shí)將state snapshot寫(xiě)入文件系統(tǒng)目錄下的文件中。
(3)FsStateBackend默認(rèn)開(kāi)啟異步方式進(jìn)行快照峭沦,構(gòu)造方法如下
val stateBackend:StateBackend=new FsStateBackend("hdfs://namenode:9000/flink/checkpoint",false)
env.setStateBackend(stateBackend)
(4)適用場(chǎng)景:大狀態(tài)蝴韭、長(zhǎng)窗口、大鍵/值狀態(tài)的job

三熙侍、RocksDBStateBackend
(1)RocksDBStateBackend需要配置一個(gè)文件系統(tǒng)的URL榄鉴。如hdfs://namenode:8080/flink/checkpoint
(2)RocksDBStateBackend運(yùn)行中的數(shù)據(jù)保存在RockDB數(shù)據(jù)庫(kù)中,默認(rèn)情況下存儲(chǔ)在TaskManager數(shù)據(jù)目錄中蛉抓。
在Checkpoint時(shí)庆尘,整個(gè)RocksDB數(shù)據(jù)庫(kù)將被checkpointed到配置的文件系統(tǒng)和目錄中
(3)RocksDBSateBackend 始終是異步
(4)RocksDB JNI API是基于Byte[],因此key和value最大支持2^31個(gè)字節(jié)(2GB)
(5)適用場(chǎng)景:超大窗口巷送,超大狀態(tài)驶忌,大鍵/值狀態(tài)的job
(6)只有RockDBStateBackend支持增量checkpoint
(7)狀態(tài)保存在數(shù)據(jù)塊中,只受可用磁盤(pán)空間量限制笑跛,但開(kāi)銷更大(讀/寫(xiě)需要反序列化與序列化)付魔,吞吐收到限制
使用需要導(dǎo)包:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_{scala.binary.version}</artifactId> <version>{flink.version}</version>
</dependency>
val stateBackend:StateBackend=new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoint",true)
env.setStateBackend(stateBackend)

配置重啟策略
Flink支持不同的重啟策略,這些策略控制在出現(xiàn)故障時(shí)如何重新啟動(dòng)job
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
(1)如果沒(méi)用啟動(dòng)checkpoint,則使用無(wú)重啟方案
(2)如果啟用了checkpoint,但是沒(méi)有配重啟方案飞蹂,則使用固定延遲策略几苍,嘗試次數(shù)是Integer.MAX_VALUE

28.Savepoint
概念:手工觸發(fā),通過(guò)checkpointing機(jī)制創(chuàng)建的streaming job的一致性快照稱之為Savepoint
Savepoint由兩部分組成:
(1)數(shù)據(jù)目錄:穩(wěn)定存儲(chǔ)上的目錄
(2)元數(shù)據(jù)文件:指向數(shù)據(jù)目錄屬于當(dāng)前Savepoint的數(shù)據(jù)文件的指針

直接觸發(fā)Savepoint(想象你要為數(shù)據(jù)庫(kù)做個(gè)備份)
bin/flink savepoint :jobId [ 目錄](méi)

直接觸發(fā)savepoint(flink on yarn)
bin/flink savepoint :jobid [目錄](méi) -yid :yarnApplicationId

Cancel Job with Savepoint
bin/flink cncel -s [目錄](méi) :jobid

從指定Savepoint恢復(fù)job
bin/flink run -s :savepointPath

從指定Savepoint恢復(fù)job
bin/flin run -s:savepointPath -n

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末陈哑,一起剝皮案震驚了整個(gè)濱河市妻坝,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌惊窖,老刑警劉巖刽宪,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異界酒,居然都是意外死亡圣拄,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)毁欣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)庇谆,“玉大人赁遗,你說(shuō)我怎么就攤上這事∽迕” “怎么了岩四?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)哥攘。 經(jīng)常有香客問(wèn)我剖煌,道長(zhǎng),這世上最難降的妖魔是什么逝淹? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任耕姊,我火速辦了婚禮,結(jié)果婚禮上栅葡,老公的妹妹穿的比我還像新娘茉兰。我一直安慰自己,他們只是感情好欣簇,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布规脸。 她就那樣靜靜地躺著,像睡著了一般熊咽。 火紅的嫁衣襯著肌膚如雪莫鸭。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,031評(píng)論 1 285
  • 那天横殴,我揣著相機(jī)與錄音被因,去河邊找鬼。 笑死衫仑,一個(gè)胖子當(dāng)著我的面吹牛梨与,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播文狱,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼粥鞋,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了如贷?” 一聲冷哼從身側(cè)響起陷虎,我...
    開(kāi)封第一講書(shū)人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎杠袱,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體窝稿,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡楣富,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了伴榔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纹蝴。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡庄萎,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出塘安,到底是詐尸還是另有隱情糠涛,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布兼犯,位于F島的核電站忍捡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏切黔。R本人自食惡果不足惜砸脊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望纬霞。 院中可真熱鬧凌埂,春花似錦、人聲如沸诗芜。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)伏恐。三九已至挨下,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間脐湾,已是汗流浹背臭笆。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留秤掌,地道東北人愁铺。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像闻鉴,于是被迫代替她去往敵國(guó)和親茵乱。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345