鏈接:http://shiyanjun.cn/archives/1855.html
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink(有關于狀態(tài)的序列化)
開啟
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
env.setStateBackend(new FsStateBackend("hdfs://home/wangxiaotong/data/flink/checkpoints"));
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
上面調用enableExternalizedCheckpoints設置為ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink處理程序被cancel后露氮,會保留Checkpoint數(shù)據(jù)钠至,以便根據(jù)實際需要恢復到指定的Checkpoint處理。上面代碼配置了執(zhí)行Checkpointing的時間間隔為1分鐘驮审。
保存多個Checkpoint
默認情況下,如果設置了Checkpoint選項吉执,則Flink只保留最近成功生成的1個Checkpoint疯淫,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復戳玫。但是熙掺,如果我們希望保留多個Checkpoint,并能夠根據(jù)實際需要選擇其中一個進行恢復咕宿,這樣會更加靈活币绩,比如,我們發(fā)現(xiàn)最近4個小時數(shù)據(jù)記錄處理有問題府阀,希望將整個狀態(tài)還原到4小時之前缆镣。
Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中试浙,添加如下配置董瞻,指定最多需要保存Checkpoint的個數(shù):
state.checkpoints.num-retained: 20
在HDFS的相應文件夾下面會產(chǎn)生多個checkpoint文件。
從Checkpoint進行恢復
如果Flink程序異常失敗田巴,或者最近一段時間內數(shù)據(jù)處理錯誤钠糊,我們可以將程序從某一個Checkpoint點,比如chk-860進行回放固额,執(zhí)行如下命令:
bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
從上面我們可以看到眠蚂,前面Flink Job的ID為582e17d2cc343e6c56255d111bae0191,所有的Checkpoint文件都在以Job ID為名稱的目錄里面斗躏,當Job停掉后逝慧,重新從某個Checkpoint點(chk-860)進行恢復時昔脯,重新生成Job ID(這里是11bbc5d9933e4ff7e25198a760e9792e),而對應的Checkpoint編號會從該次運行基于的編號繼續(xù)連續(xù)生成:chk-861笛臣、chk-862云稚、chk-863等等。
Flink Savepoint
Savepoint會在Flink Job之外存儲自包含(self-contained)結構的Checkpoint沈堡,它使用Flink的Checkpointing機制來創(chuàng)建一個非增量的Snapshot静陈,里面包含Streaming程序的狀態(tài),并將Checkpoint的數(shù)據(jù)存儲到外部存儲系統(tǒng)中诞丽。
Flink程序中包含兩種狀態(tài)數(shù)據(jù)鲸拥,一種是用戶定義的狀態(tài)(User-defined State),他們是基于Flink的Transformation函數(shù)來創(chuàng)建或者修改得到的狀態(tài)數(shù)據(jù)僧免;另一種是系統(tǒng)狀態(tài)(System State)刑赶,他們是指作為Operator計算一部分的數(shù)據(jù)Buffer等狀態(tài)數(shù)據(jù),比如在使用Window Function時懂衩,在Window內部緩存Streaming數(shù)據(jù)記錄撞叨。為了能夠在創(chuàng)建Savepoint過程中,唯一識別對應的Operator的狀態(tài)數(shù)據(jù)浊洞,F(xiàn)link提供了API來為程序中每個Operator設置ID牵敷,這樣可以在后續(xù)更新/升級程序的時候,可以在Savepoint數(shù)據(jù)中基于Operator ID來與對應的狀態(tài)信息進行匹配法希,從而實現(xiàn)恢復枷餐。當然,如果我們不指定Operator ID铁材,F(xiàn)link也會我們自動生成對應的Operator狀態(tài)ID尖淘。
而且奕锌,強烈建議手動為每個Operator設置ID著觉,即使未來Flink應用程序可能會改動很大,比如替換原來的Operator實現(xiàn)惊暴、增加新的Operator饼丘、刪除Operator等等,至少我們有可能與Savepoint中存儲的Operator狀態(tài)對應上辽话。另外肄鸽,保存的Savepoint狀態(tài)數(shù)據(jù),畢竟是基于當時程序及其內存數(shù)據(jù)結構生成的油啤,所以如果未來Flink程序改動比較大典徘,尤其是對應的需要操作的內存數(shù)據(jù)結構都變化了,可能根本就無法從原來舊的Savepoint正確地恢復益咬。
下面逮诲,我們以Flink官網(wǎng)文檔中給定的例子,來看下如何設置Operator ID,代碼如下所示:
DataStream<String> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
創(chuàng)建Savepoint
創(chuàng)建一個Savepoint梅鹦,需要指定對應Savepoint目錄裆甩,有兩種方式來指定:
一種是,需要配置Savepoint的默認路徑齐唆,需要在Flink的配置文件conf/flink-conf.yaml中嗤栓,添加如下配置,設置Savepoint存儲目錄箍邮,例如如下所示:
state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints
另一種是茉帅,在手動執(zhí)行savepoint命令的時候,指定Savepoint存儲目錄锭弊,命令格式如下所示:
bin/flink savepoint :jobId [:targetDirectory]
例如担敌,正在運行的Flink Job對應的ID為40dcc6d2ba90f13930abce295de8d038,使用默認state.savepoints.dir配置指定的Savepoint目錄廷蓉,執(zhí)行如下命令:
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038
可以看到全封,在目錄hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0下面生成了ID為40dcc6d2ba90f13930abce295de8d038的Job的Savepoint數(shù)據(jù)。
為正在運行的Flink Job指定一個目錄存儲Savepoint數(shù)據(jù)桃犬,執(zhí)行如下命令:
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints
可以看到刹悴,在目錄 hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f下面生成了ID為40dcc6d2ba90f13930abce295de8d038的Job的Savepoint數(shù)據(jù)。
從Savepoint恢復
現(xiàn)在攒暇,我們可以停掉Job 40dcc6d2ba90f13930abce295de8d038土匀,然后通過Savepoint命令來恢復Job運行,命令格式如下所示:
bin/flink run -s :savepointPath [:runArgs]
以上面保存的Savepoint為例形用,恢復Job運行就轧,執(zhí)行如下命令:
bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar
可以看到,啟動一個新的Flink Job田度,ID為cdbae3af1b7441839e7c03bab0d0eefd妒御。
Savepoint目錄結構
下面,我們看一下Savepoint目錄下面存儲內容的結構镇饺,如下所示:
hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r-- 3 hadoop supergroup 4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r-- 3 hadoop supergroup 4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r-- 3 hadoop supergroup 4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r-- 3 hadoop supergroup 4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r-- 3 hadoop supergroup 4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a
如上面列出的HDFS路徑中乎莉,11bbc5是Flink Job ID字符串前6個字符,后面bd967f90709b是隨機生成的字符串奸笤,然后savepoint-11bbc5-bd967f90709b作為存儲此次Savepoint數(shù)據(jù)的根目錄惋啃,最后savepoint-11bbc5-bd967f90709b目錄下面_metadata文件包含了Savepoint的元數(shù)據(jù)信息,其中序列化包含了savepoint-11bbc5-bd967f90709b目錄下面其它文件的路徑监右,這些文件內容都是序列化的狀態(tài)信息边灭。
使用EventTime與WaterMark
WaterMark通過數(shù)據(jù)源或水印生成器插入到流中。
下面健盒,我們通過實際編程實踐绒瘦,來說明一些需要遵守的基本原則宠互,以便在開發(fā)中進行合理設置。
在開發(fā)Flink流數(shù)據(jù)處理程序時椭坚,需要指定Time Notion予跌,F(xiàn)link API提供了TimeCharacteristic枚舉類,內部定義了3種Time Notion(參考上面說明)善茎。設置Time Notion的示例代碼券册,如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
上面,我們指定了基于TimeCharacteristic.EventTime來進行數(shù)據(jù)處理垂涯。如果我們沒有顯式指定TimeCharacteristic烁焙,默認使用TimeCharacteristic.ProcessTime。
基于EventTime的數(shù)據(jù)處理耕赘,需要對進入的數(shù)據(jù)元素指派時間戳骄蝇,并且指定如何生成WaterMark,這樣才能通過WaterMark來機制控制輸入數(shù)據(jù)的完整性(事件到達)操骡,以便觸發(fā)對指定Window進行計算九火。有兩種方式實現(xiàn)時間戳指派和生成WaterMark:
- 在Flink程序一開始調用assignTimestampsAndWatermarks()進行指派
- 在Source Operator中直接指派
下面,我們會基于這兩種方式進行編碼實現(xiàn):
調用assignTimestampsAndWatermarks()進行指派
TimeWindow的大小設置為1分鐘(60000ms)册招,允許延遲到達時間設置為50秒(50000ms)岔激,并且為了模擬流數(shù)據(jù)元素事件時間早于當前處理系統(tǒng)的系統(tǒng)時間,設置延遲時間為2分鐘(120000ms)是掰。
我們自定義實現(xiàn)了一個用來模擬的Source Operator虑鼎,代碼如下所示:
class StringLineEventSource(val latenessMillis: Long) extends RichParallelSourceFunction[String] {
val LOG = LoggerFactory.getLogger(classOf[StringLineEventSource])
@volatile private var running = true
val channelSet = Seq("a", "b", "c", "d")
val behaviorTypes = Seq(
"INSTALL", "OPEN", "BROWSE", "CLICK",
"PURCHASE", "CLOSE", "UNINSTALL")
val rand = Random
override def run(ctx: SourceContext[String]): Unit = {
val numElements = Long.MaxValue
var count = 0L
while (running && count < numElements) {
val channel = channelSet(rand.nextInt(channelSet.size))
val event = generateEvent()
LOG.debug("Event: " + event)
val ts = event(0)
val id = event(1)
val behaviorType = event(2)
ctx.collect(Seq(ts, channel, id, behaviorType).mkString("\t"))
count += 1
TimeUnit.MILLISECONDS.sleep(5L)
}
}
private def generateEvent(): Seq[String] = {
// simulate 10 seconds lateness
val ts = Instant.ofEpochMilli(System.currentTimeMillis)
.minusMillis(latenessMillis)
.toEpochMilli
val id = UUID.randomUUID().toString
val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
// (ts, id, behaviorType)
Seq(ts.toString, id, behaviorType)
}
override def cancel(): Unit = running = false
}
流數(shù)據(jù)中的數(shù)據(jù)元素為字符串記錄行的格式,包含字段:事件時間键痛、渠道炫彩、用戶編號、用戶行為類型絮短。這里江兢,我們直接調用SourceContext.collect()方法,將數(shù)據(jù)元素發(fā)送到下游進行處理戚丸。
在Flink程序中划址,通過調用stream: DataStream[T]的assignTimestampsAndWatermarks()進行時間戳的指派扔嵌,并生成WaterMark限府。然后,基于Keyed Window生成Tumbling Window(不存在Window重疊)來操作數(shù)據(jù)記錄痢缎。最后胁勺,將計算結果輸出到Kafka中去。
對應的實現(xiàn)代碼独旷,如下所示:
def main(args: Array[String]): Unit = {
val params = ParameterTool.fromArgs(args)
checkParams(params)
val sourceLatenessMillis = params.getRequired("source-lateness-millis").toLong
maxLaggedTimeMillis = params.getLong("window-lagged-millis", DEFAULT_MAX_LAGGED_TIME)
val windowSizeMillis = params.getRequired("window-size-millis").toLong
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 設置為TimeCharacteristic.EventTime
val stream: DataStream[String] = env.addSource(new StringLineEventSource(sourceLatenessMillis))
// create a Kafka producer for Kafka 0.9.x
val kafkaProducer = new FlinkKafkaProducer09(
params.getRequired("window-result-topic"),
new SimpleStringSchema, params.getProperties
)
stream
.setParallelism(1)
.assignTimestampsAndWatermarks( // 指派時間戳署穗,并生成WaterMark
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(maxLaggedTimeMillis)) {
override def extractTimestamp(element: String): Long = {
element.split("\t")(0).toLong
}
})
.setParallelism(2)
.map(line => {
// ts, channel, id, behaviorType
val a = line.split("\t")
val channel = a(1)
((channel, a(3)), 1L)
})
.setParallelism(3)
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis))) // 使用Keyed Window
.process(new EventTimeWindowReduceFunction())
.setParallelism(4)
.map(t => {
val windowStart = t._1
val windowEnd = t._2
val channel = t._3
val behaviorType = t._4
val count = t._5
Seq(windowStart, windowEnd, channel, behaviorType, count).mkString("\t")
})
.setParallelism(3)
.addSink(kafkaProducer)
.setParallelism(3)
env.execute(getClass.getSimpleName)
}
上面寥裂,我們使用了Flink內建實現(xiàn)的BoundedOutOfOrdernessTimestampExtractor來指派時間戳和生成WaterMark。這里案疲,我們實現(xiàn)了從事件記錄中提取時間戳的邏輯封恰,實際生成WaterMark的邏輯使用BoundedOutOfOrdernessTimestampExtractor提供的默認邏輯,在getCurrentWatermark()方法中褐啡。我們來看下BoundedOutOfOrdernessTimestampExtractor的實現(xiàn)诺舔,代碼如下所示:
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
private long lastEmittedWatermark = Long.MIN_VALUE;
private final long maxOutOfOrderness;
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " +
"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; // 初始設置當前最大事件時間戳
}
public long getMaxOutOfOrdernessInMillis() {
return maxOutOfOrderness;
}
public abstract long extractTimestamp(T element);
@Override
public final Watermark getCurrentWatermark() {
long potentialWM = currentMaxTimestamp - maxOutOfOrderness; // 當前最大事件時間戳,減去允許最大延遲到達時間
if (potentialWM >= lastEmittedWatermark) { // 檢查上一次emit的WaterMark時間戳备畦,如果比lastEmittedWatermark大則更新其值
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) { // 檢查新到達的數(shù)據(jù)元素的事件時間低飒,用currentMaxTimestamp記錄下當前最大的
currentMaxTimestamp = timestamp;
}
return timestamp;
}
}
可以看到,在getCurrentWatermark()和extractTimestamp()方法中懂盐,lastEmittedWatermark是WaterMark中的時間戳褥赊,計算它時,總是根據(jù)當前進入Flink處理系統(tǒng)的數(shù)據(jù)元素的最大的事件時間currentMaxTimestamp莉恼,然后再減去一個maxOutOfOrderness(外部配置的支持最大延遲到達的時間)拌喉,也就說,這里面實現(xiàn)的WaterMark中的時間戳序列是非嚴格單調遞增的俐银。
我們實現(xiàn)的Flink程序為EventTimeTumblingWindowAnalytics司光,提交到Flink集群運行,執(zhí)行如下命令:
bin/flink run --class org.shirdrn.flink.windowing.EventTimeTumblingWindowAnalytics flink-demo-assembly-0.0.1-SNAPSHOT.jar \
--window-result-topic windowed-result-topic \
--zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
--bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
--source-lateness-millis 120000 \
--window-lagged-millis 50000 \
--window-size-millis 60000
在Source Operator中直接指派
和上面我們最終期望的邏輯基本保持一致悉患,我們把指派時間戳和生成WaterMark的邏輯残家,提取出來放到Source Operator實現(xiàn)中,對應的關鍵代碼片段售躁,如下所示:
var lastEmittedWatermark = Long.MinValue
var currentMaxTimestamp = Long.MinValue + maxLaggedTimeMillis
... ...
ctx.collectWithTimestamp(Seq(ts, channel, id, behaviorType).mkString("\t"), ts.toLong)
ctx.emitWatermark(getCurrentWatermark(ts.toLong))
... ...
private def getCurrentWatermark(ts: Long): Watermark = {
if (ts > currentMaxTimestamp) {
currentMaxTimestamp = ts
}
val watermarkTs = currentMaxTimestamp - maxLaggedTimeMillis
if (watermarkTs >= lastEmittedWatermark) {
lastEmittedWatermark = watermarkTs
}
new Watermark(lastEmittedWatermark)
}
需要在Flink程序的main方法中坞淮,將外部配置的與WaterMark生成相關的參數(shù)值,傳到Source Operator實現(xiàn)類中陪捷,如下所示:
val stream: DataStream[String] = env.addSource(
new StringLineEventSourceWithTsAndWaterMark(sourceLatenessMillis, maxLaggedTimeMillis))
同時回窘,把前面調用assignTimestampsAndWatermarks()的方法去掉即可。
編譯后市袖,提交到Flink集群運行啡直,可以查看輸出結果,和前面類似苍碟,輸出結果正是我們所期望的酒觅。
Keyed Window與Non-Keyed Window
鏈接:http://shiyanjun.cn/archives/1775.html
從編程API上看,Keyed Window編程結構微峰,可以直接對輸入的stream按照Key進行操作舷丹,輸入的stream中識別Key,即輸入stream中的每個數(shù)據(jù)元素哪一部分是作為Key來關聯(lián)這個數(shù)據(jù)元素的蜓肆,這樣就可以對stream中的數(shù)據(jù)元素基于Key進行相關計算操作颜凯,如keyBy谋币,可以根據(jù)Key進行分組(相同的Key必然可以分到同一組中去)。如果輸入的stream中沒有Key症概,比如就是一條日志記錄信息蕾额,那么無法對其進行keyBy操作。而對于Non-Keyed Window編程結構來說彼城,無論輸入的stream具有何種結構(比如是否具有Key)凡简,它都認為是無結構的,不能對其進行keyBy操作精肃,而且如果使用Non-Keyed Window函數(shù)操作秤涩,就會對該stream進行分組(具體如何分組依賴于我們選擇的WindowAssigner,它負責將stream中的每個數(shù)據(jù)元素指派到一個或多個Window中)司抱,指派到一個或多個Window中筐眷,然后后續(xù)應用到該stream上的計算都是對Window中的這些數(shù)據(jù)元素進行操作。
從計算上看习柠,Keyed Window編程結構會將輸入的stream轉換成Keyed stream匀谣,邏輯上會對應多個Keyed stream,每個Keyed stream會獨立進行計算资溃,這就使得多個Task可以對Windowing操作進行并行處理武翎,具有相同Key的數(shù)據(jù)元素會被發(fā)到同一個Task中進行處理。而對于Non-Keyed Window編程結構溶锭,Non-Keyed stream邏輯上將不能split成多個stream宝恶,所有的Windowing操作邏輯只能在一個Task中進行處理,也就是說計算并行度為1趴捅。
在實際編程過程中垫毙,我們可以看到DataStream的API也有對應的方法timeWindow()和timeWindowAll(),他們也分別對應著Keyed Window和Non-Keyed Window拱绑。
sate
轉載自過往記憶(https://www.iteblog.com/)
鏈接:https://www.iteblog.com/archives/2417.html
note:Operator States的數(shù)據(jù)結構不像Keyed States豐富综芥,現(xiàn)在只支持List
用戶可以根據(jù)自己的需求選擇,如果數(shù)據(jù)量較小猎拨,可以存放到MemoryStateBackend和FsStateBackend中膀藐,如果數(shù)據(jù)量較大,可以放到RockDB中红省。
WindowFunction (Legacy)
窗口的處理函數(shù)额各,但是獲取的contextual信息少,也沒有一些先進的特征类腮,比如per-window keyed state臊泰。未來將會被拋棄。
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
時間窗口的機制
Tumbling windows are aligned to epoch, i.e, 1970-01-01T00:00:00. So with a tumbling window of 10 seconds 2017-07-03T20:19:35Z would to into the window from [2017-07-03T20:19:30Z, 2017-07-03T20:19:40Z) and 2017-07-03T20:22:30Z would into [2017-07-03T20:22:30Z, 2017-07-03T20:22:40Z).
35的事件觸發(fā)的窗口是[30-40)的窗口蚜枢。
MapStateDescriptor
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Word> descriptor =
new MapStateDescriptor<>(
"words", // the state name
BasicTypeInfo.STRING_TYPE_INFO, // type information
TypeInformation.of(new TypeHint<Word>() {})
); // default value of the state, if nothing was set
// sum = getRuntimeContext().getState(descriptor);
}
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Word> descriptor =
new MapStateDescriptor<>(
"words", // the state name
String.class, // type information
Word.class
); // default value of the state, if nothing was set
// sum = getRuntimeContext().getState(descriptor);
}
設計草稿
時間缸逃、窗口、水印
鏈接:https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams(好
)
講了窗口厂抽,時間需频,水印的概念,還有時間窗口計算后的時間戳應該怎么賦值筷凤。
state昭殉、fault tolence
Streams and Operations on Streams
鏈接:https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
TaskManager故障恢復
JobManager可以通過配置高可用的zookeeper來保證故障恢復,TaskManager怎么故障恢復能藐守?
Keep in mind that in In stand alone mode a TM process that has exited
won't be automatically restarted though.
實踐所得:在standalone集群模式下挪丢,殺死一個TaskManager,那么機器上的這個TaskManager不會重啟卢厂,會使用其他機器剩余可用的TaskManager中slots來運行失敗的task乾蓬。
Slots
A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. No need to calculate how many tasks (with varying parallelism) a program contains in total.
flink使用的slot數(shù)目是最高的并行度數(shù)目。
alignment
alignment的影響
Asynchronous State Snapshots
只有RockDB里面才有
各種state的例子
鏈接:https://www.slideshare.net/tillrohrmann/fault-tolerance-and-job-recovery-in-apache-flink
state backend
同步和異步的checkpoint區(qū)別
異步
鏈接:http://www.reibang.com/p/a2a0dade97b8
我們注意到上面描述的機制意味著當 operator 向后端存儲快照時慎恒,會停止處理輸入的數(shù)據(jù)任内。這種同步操作會在每次快照創(chuàng)建時引入延遲。
我們完全可以在存儲快照時融柬,讓 operator 繼續(xù)處理數(shù)據(jù)死嗦,讓快照存儲在后臺異步運行。為了做到這一點粒氧,operator 必須能夠生成一個后續(xù)修改不影響之前狀態(tài)的狀態(tài)對象越除。例如 RocksDB 中使用的寫時復制( copy-on-write )類型的數(shù)據(jù)結構。
接收到輸入的 barrier 時外盯,operator異步快照復制出的狀態(tài)(注:checkpoint的同步部分廊敌,復制狀態(tài)可能會花費較多的時間,這也是為什么checkpoint同步部分時間很長的原因)
门怪。然后立即發(fā)射 barrier 到輸出流骡澈,繼續(xù)正常的流處理。一旦后臺異步快照完成掷空,它就會向 checkpoint coordinator(JobManager)確認 checkpoint 完成±吲梗現(xiàn)在 checkpoint 完成的充分條件是:所有 sink 接收到了 barrier,所有有狀態(tài) operator 都確認完成了狀態(tài)備份(可能會比 sink 接收到 barrier 晚)坦弟。
RocksDBStateBackend 模式對于較大的 Key 進行更新操作時序列化和反序列化耗時很多护锤。可以考慮使用 FsStateBackend 模式替代酿傍。