flink使用checkpoint等知識

鏈接:http://shiyanjun.cn/archives/1855.html
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink(有關于狀態(tài)的序列化)

開啟

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);
        env.setStateBackend(new FsStateBackend("hdfs://home/wangxiaotong/data/flink/checkpoints"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

上面調用enableExternalizedCheckpoints設置為ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink處理程序被cancel后露氮,會保留Checkpoint數(shù)據(jù)钠至,以便根據(jù)實際需要恢復到指定的Checkpoint處理。上面代碼配置了執(zhí)行Checkpointing的時間間隔為1分鐘驮审。

保存多個Checkpoint

默認情況下,如果設置了Checkpoint選項吉执,則Flink只保留最近成功生成的1個Checkpoint疯淫,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復戳玫。但是熙掺,如果我們希望保留多個Checkpoint,并能夠根據(jù)實際需要選擇其中一個進行恢復咕宿,這樣會更加靈活币绩,比如,我們發(fā)現(xiàn)最近4個小時數(shù)據(jù)記錄處理有問題府阀,希望將整個狀態(tài)還原到4小時之前缆镣。
Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中试浙,添加如下配置董瞻,指定最多需要保存Checkpoint的個數(shù):
state.checkpoints.num-retained: 20
在HDFS的相應文件夾下面會產(chǎn)生多個checkpoint文件。

從Checkpoint進行恢復

如果Flink程序異常失敗田巴,或者最近一段時間內數(shù)據(jù)處理錯誤钠糊,我們可以將程序從某一個Checkpoint點,比如chk-860進行回放固额,執(zhí)行如下命令:
bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
從上面我們可以看到眠蚂,前面Flink Job的ID為582e17d2cc343e6c56255d111bae0191,所有的Checkpoint文件都在以Job ID為名稱的目錄里面斗躏,當Job停掉后逝慧,重新從某個Checkpoint點(chk-860)進行恢復時昔脯,重新生成Job ID(這里是11bbc5d9933e4ff7e25198a760e9792e),而對應的Checkpoint編號會從該次運行基于的編號繼續(xù)連續(xù)生成:chk-861笛臣、chk-862云稚、chk-863等等。

Flink Savepoint

Savepoint會在Flink Job之外存儲自包含(self-contained)結構的Checkpoint沈堡,它使用Flink的Checkpointing機制來創(chuàng)建一個非增量的Snapshot静陈,里面包含Streaming程序的狀態(tài),并將Checkpoint的數(shù)據(jù)存儲到外部存儲系統(tǒng)中诞丽。

Flink程序中包含兩種狀態(tài)數(shù)據(jù)鲸拥,一種是用戶定義的狀態(tài)(User-defined State),他們是基于Flink的Transformation函數(shù)來創(chuàng)建或者修改得到的狀態(tài)數(shù)據(jù)僧免;另一種是系統(tǒng)狀態(tài)(System State)刑赶,他們是指作為Operator計算一部分的數(shù)據(jù)Buffer等狀態(tài)數(shù)據(jù),比如在使用Window Function時懂衩,在Window內部緩存Streaming數(shù)據(jù)記錄撞叨。為了能夠在創(chuàng)建Savepoint過程中,唯一識別對應的Operator的狀態(tài)數(shù)據(jù)浊洞,F(xiàn)link提供了API來為程序中每個Operator設置ID牵敷,這樣可以在后續(xù)更新/升級程序的時候,可以在Savepoint數(shù)據(jù)中基于Operator ID來與對應的狀態(tài)信息進行匹配法希,從而實現(xiàn)恢復枷餐。當然,如果我們不指定Operator ID铁材,F(xiàn)link也會我們自動生成對應的Operator狀態(tài)ID尖淘。
而且奕锌,強烈建議手動為每個Operator設置ID著觉,即使未來Flink應用程序可能會改動很大,比如替換原來的Operator實現(xiàn)惊暴、增加新的Operator饼丘、刪除Operator等等,至少我們有可能與Savepoint中存儲的Operator狀態(tài)對應上辽话。另外肄鸽,保存的Savepoint狀態(tài)數(shù)據(jù),畢竟是基于當時程序及其內存數(shù)據(jù)結構生成的油啤,所以如果未來Flink程序改動比較大典徘,尤其是對應的需要操作的內存數(shù)據(jù)結構都變化了,可能根本就無法從原來舊的Savepoint正確地恢復益咬。

下面逮诲,我們以Flink官網(wǎng)文檔中給定的例子,來看下如何設置Operator ID,代碼如下所示:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

創(chuàng)建Savepoint

創(chuàng)建一個Savepoint梅鹦,需要指定對應Savepoint目錄裆甩,有兩種方式來指定:
一種是,需要配置Savepoint的默認路徑齐唆,需要在Flink的配置文件conf/flink-conf.yaml中嗤栓,添加如下配置,設置Savepoint存儲目錄箍邮,例如如下所示:
state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints
另一種是茉帅,在手動執(zhí)行savepoint命令的時候,指定Savepoint存儲目錄锭弊,命令格式如下所示:

bin/flink savepoint :jobId [:targetDirectory]
例如担敌,正在運行的Flink Job對應的ID為40dcc6d2ba90f13930abce295de8d038,使用默認state.savepoints.dir配置指定的Savepoint目錄廷蓉,執(zhí)行如下命令:
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038
可以看到全封,在目錄hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0下面生成了ID為40dcc6d2ba90f13930abce295de8d038的Job的Savepoint數(shù)據(jù)。
為正在運行的Flink Job指定一個目錄存儲Savepoint數(shù)據(jù)桃犬,執(zhí)行如下命令:
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints
可以看到刹悴,在目錄 hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f下面生成了ID為40dcc6d2ba90f13930abce295de8d038的Job的Savepoint數(shù)據(jù)。

從Savepoint恢復

現(xiàn)在攒暇,我們可以停掉Job 40dcc6d2ba90f13930abce295de8d038土匀,然后通過Savepoint命令來恢復Job運行,命令格式如下所示:
bin/flink run -s :savepointPath [:runArgs]
以上面保存的Savepoint為例形用,恢復Job運行就轧,執(zhí)行如下命令:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar
可以看到,啟動一個新的Flink Job田度,ID為cdbae3af1b7441839e7c03bab0d0eefd妒御。

Savepoint目錄結構

下面,我們看一下Savepoint目錄下面存儲內容的結構镇饺,如下所示:

hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r--   3 hadoop supergroup       4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r--   3 hadoop supergroup       4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r--   3 hadoop supergroup       4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r--   3 hadoop supergroup       4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r--   3 hadoop supergroup       4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a

如上面列出的HDFS路徑中乎莉,11bbc5是Flink Job ID字符串前6個字符,后面bd967f90709b是隨機生成的字符串奸笤,然后savepoint-11bbc5-bd967f90709b作為存儲此次Savepoint數(shù)據(jù)的根目錄惋啃,最后savepoint-11bbc5-bd967f90709b目錄下面_metadata文件包含了Savepoint的元數(shù)據(jù)信息,其中序列化包含了savepoint-11bbc5-bd967f90709b目錄下面其它文件的路徑监右,這些文件內容都是序列化的狀態(tài)信息边灭。

使用EventTime與WaterMark

WaterMark通過數(shù)據(jù)源或水印生成器插入到流中。
下面健盒,我們通過實際編程實踐绒瘦,來說明一些需要遵守的基本原則宠互,以便在開發(fā)中進行合理設置。
在開發(fā)Flink流數(shù)據(jù)處理程序時椭坚,需要指定Time Notion予跌,F(xiàn)link API提供了TimeCharacteristic枚舉類,內部定義了3種Time Notion(參考上面說明)善茎。設置Time Notion的示例代碼券册,如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

上面,我們指定了基于TimeCharacteristic.EventTime來進行數(shù)據(jù)處理垂涯。如果我們沒有顯式指定TimeCharacteristic烁焙,默認使用TimeCharacteristic.ProcessTime。
基于EventTime的數(shù)據(jù)處理耕赘,需要對進入的數(shù)據(jù)元素指派時間戳骄蝇,并且指定如何生成WaterMark,這樣才能通過WaterMark來機制控制輸入數(shù)據(jù)的完整性(事件到達)操骡,以便觸發(fā)對指定Window進行計算九火。有兩種方式實現(xiàn)時間戳指派和生成WaterMark:

  • 在Flink程序一開始調用assignTimestampsAndWatermarks()進行指派
  • 在Source Operator中直接指派
    下面,我們會基于這兩種方式進行編碼實現(xiàn):

調用assignTimestampsAndWatermarks()進行指派

TimeWindow的大小設置為1分鐘(60000ms)册招,允許延遲到達時間設置為50秒(50000ms)岔激,并且為了模擬流數(shù)據(jù)元素事件時間早于當前處理系統(tǒng)的系統(tǒng)時間,設置延遲時間為2分鐘(120000ms)是掰。
我們自定義實現(xiàn)了一個用來模擬的Source Operator虑鼎,代碼如下所示:

class StringLineEventSource(val latenessMillis: Long) extends RichParallelSourceFunction[String] {
 
  val LOG = LoggerFactory.getLogger(classOf[StringLineEventSource])
  @volatile private var running = true
  val channelSet = Seq("a", "b", "c", "d")
  val behaviorTypes = Seq(
    "INSTALL", "OPEN", "BROWSE", "CLICK",
    "PURCHASE", "CLOSE", "UNINSTALL")
  val rand = Random
 
  override def run(ctx: SourceContext[String]): Unit = {
    val numElements = Long.MaxValue
    var count = 0L
 
    while (running && count < numElements) {
      val channel = channelSet(rand.nextInt(channelSet.size))
      val event = generateEvent()
      LOG.debug("Event: " + event)
      val ts = event(0)
      val id = event(1)
      val behaviorType = event(2)
      ctx.collect(Seq(ts, channel, id, behaviorType).mkString("\t"))
      count += 1
      TimeUnit.MILLISECONDS.sleep(5L)
    }
  }
 
  private def generateEvent(): Seq[String] = {
    // simulate 10 seconds lateness
    val ts = Instant.ofEpochMilli(System.currentTimeMillis)
      .minusMillis(latenessMillis)
      .toEpochMilli
 
    val id = UUID.randomUUID().toString
    val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
    // (ts, id, behaviorType)
    Seq(ts.toString, id, behaviorType)
  }
 
  override def cancel(): Unit = running = false
}

流數(shù)據(jù)中的數(shù)據(jù)元素為字符串記錄行的格式,包含字段:事件時間键痛、渠道炫彩、用戶編號、用戶行為類型絮短。這里江兢,我們直接調用SourceContext.collect()方法,將數(shù)據(jù)元素發(fā)送到下游進行處理戚丸。
在Flink程序中划址,通過調用stream: DataStream[T]的assignTimestampsAndWatermarks()進行時間戳的指派扔嵌,并生成WaterMark限府。然后,基于Keyed Window生成Tumbling Window(不存在Window重疊)來操作數(shù)據(jù)記錄痢缎。最后胁勺,將計算結果輸出到Kafka中去。
對應的實現(xiàn)代碼独旷,如下所示:

def main(args: Array[String]): Unit = {
 
  val params = ParameterTool.fromArgs(args)
  checkParams(params)
  val sourceLatenessMillis = params.getRequired("source-lateness-millis").toLong
  maxLaggedTimeMillis = params.getLong("window-lagged-millis", DEFAULT_MAX_LAGGED_TIME)
  val windowSizeMillis = params.getRequired("window-size-millis").toLong
 
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 設置為TimeCharacteristic.EventTime
 
  val stream: DataStream[String] = env.addSource(new StringLineEventSource(sourceLatenessMillis))
 
  // create a Kafka producer for Kafka 0.9.x
  val kafkaProducer = new FlinkKafkaProducer09(
    params.getRequired("window-result-topic"),
    new SimpleStringSchema, params.getProperties
  )
 
  stream
    .setParallelism(1)
    .assignTimestampsAndWatermarks( // 指派時間戳署穗,并生成WaterMark
       new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(maxLaggedTimeMillis)) {
         override def extractTimestamp(element: String): Long = {
           element.split("\t")(0).toLong
         }
       })
    .setParallelism(2)
    .map(line => {
      // ts, channel, id, behaviorType
      val a = line.split("\t")
      val channel = a(1)
      ((channel, a(3)), 1L)
    })
    .setParallelism(3)
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis))) // 使用Keyed Window
    .process(new EventTimeWindowReduceFunction())
    .setParallelism(4)
    .map(t => {
      val windowStart = t._1
      val windowEnd = t._2
      val channel = t._3
      val behaviorType = t._4
      val count = t._5
      Seq(windowStart, windowEnd, channel, behaviorType, count).mkString("\t")
    })
    .setParallelism(3)
    .addSink(kafkaProducer)
    .setParallelism(3)
 
  env.execute(getClass.getSimpleName)
}

上面寥裂,我們使用了Flink內建實現(xiàn)的BoundedOutOfOrdernessTimestampExtractor來指派時間戳和生成WaterMark。這里案疲,我們實現(xiàn)了從事件記錄中提取時間戳的邏輯封恰,實際生成WaterMark的邏輯使用BoundedOutOfOrdernessTimestampExtractor提供的默認邏輯,在getCurrentWatermark()方法中褐啡。我們來看下BoundedOutOfOrdernessTimestampExtractor的實現(xiàn)诺舔,代碼如下所示:

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
 
    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private final long maxOutOfOrderness;
 
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; // 初始設置當前最大事件時間戳
    }
 
    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }
 
    public abstract long extractTimestamp(T element);
 
    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness; // 當前最大事件時間戳,減去允許最大延遲到達時間
        if (potentialWM >= lastEmittedWatermark) { // 檢查上一次emit的WaterMark時間戳备畦,如果比lastEmittedWatermark大則更新其值
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }
 
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) { // 檢查新到達的數(shù)據(jù)元素的事件時間低飒,用currentMaxTimestamp記錄下當前最大的
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}

可以看到,在getCurrentWatermark()和extractTimestamp()方法中懂盐,lastEmittedWatermark是WaterMark中的時間戳褥赊,計算它時,總是根據(jù)當前進入Flink處理系統(tǒng)的數(shù)據(jù)元素的最大的事件時間currentMaxTimestamp莉恼,然后再減去一個maxOutOfOrderness(外部配置的支持最大延遲到達的時間)拌喉,也就說,這里面實現(xiàn)的WaterMark中的時間戳序列是非嚴格單調遞增的俐银。
我們實現(xiàn)的Flink程序為EventTimeTumblingWindowAnalytics司光,提交到Flink集群運行,執(zhí)行如下命令:

bin/flink run --class org.shirdrn.flink.windowing.EventTimeTumblingWindowAnalytics flink-demo-assembly-0.0.1-SNAPSHOT.jar \
  --window-result-topic windowed-result-topic \
  --zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
  --bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
  --source-lateness-millis 120000 \
  --window-lagged-millis 50000 \
  --window-size-millis 60000

在Source Operator中直接指派

和上面我們最終期望的邏輯基本保持一致悉患,我們把指派時間戳和生成WaterMark的邏輯残家,提取出來放到Source Operator實現(xiàn)中,對應的關鍵代碼片段售躁,如下所示:

  var lastEmittedWatermark = Long.MinValue
  var currentMaxTimestamp = Long.MinValue + maxLaggedTimeMillis
... ...
      ctx.collectWithTimestamp(Seq(ts, channel, id, behaviorType).mkString("\t"), ts.toLong)
      ctx.emitWatermark(getCurrentWatermark(ts.toLong))
... ...
  private def getCurrentWatermark(ts: Long): Watermark = {
    if (ts > currentMaxTimestamp) {
      currentMaxTimestamp = ts
    }
    val watermarkTs = currentMaxTimestamp - maxLaggedTimeMillis
    if (watermarkTs >= lastEmittedWatermark) {
      lastEmittedWatermark = watermarkTs
    }
    new Watermark(lastEmittedWatermark)
  }

需要在Flink程序的main方法中坞淮,將外部配置的與WaterMark生成相關的參數(shù)值,傳到Source Operator實現(xiàn)類中陪捷,如下所示:

val stream: DataStream[String] = env.addSource(
  new StringLineEventSourceWithTsAndWaterMark(sourceLatenessMillis, maxLaggedTimeMillis))

同時回窘,把前面調用assignTimestampsAndWatermarks()的方法去掉即可。
編譯后市袖,提交到Flink集群運行啡直,可以查看輸出結果,和前面類似苍碟,輸出結果正是我們所期望的酒觅。

Keyed Window與Non-Keyed Window

鏈接:http://shiyanjun.cn/archives/1775.html
從編程API上看,Keyed Window編程結構微峰,可以直接對輸入的stream按照Key進行操作舷丹,輸入的stream中識別Key,即輸入stream中的每個數(shù)據(jù)元素哪一部分是作為Key來關聯(lián)這個數(shù)據(jù)元素的蜓肆,這樣就可以對stream中的數(shù)據(jù)元素基于Key進行相關計算操作颜凯,如keyBy谋币,可以根據(jù)Key進行分組(相同的Key必然可以分到同一組中去)。如果輸入的stream中沒有Key症概,比如就是一條日志記錄信息蕾额,那么無法對其進行keyBy操作。而對于Non-Keyed Window編程結構來說彼城,無論輸入的stream具有何種結構(比如是否具有Key)凡简,它都認為是無結構的,不能對其進行keyBy操作精肃,而且如果使用Non-Keyed Window函數(shù)操作秤涩,就會對該stream進行分組(具體如何分組依賴于我們選擇的WindowAssigner,它負責將stream中的每個數(shù)據(jù)元素指派到一個或多個Window中)司抱,指派到一個或多個Window中筐眷,然后后續(xù)應用到該stream上的計算都是對Window中的這些數(shù)據(jù)元素進行操作。
從計算上看习柠,Keyed Window編程結構會將輸入的stream轉換成Keyed stream匀谣,邏輯上會對應多個Keyed stream,每個Keyed stream會獨立進行計算资溃,這就使得多個Task可以對Windowing操作進行并行處理武翎,具有相同Key的數(shù)據(jù)元素會被發(fā)到同一個Task中進行處理。而對于Non-Keyed Window編程結構溶锭,Non-Keyed stream邏輯上將不能split成多個stream宝恶,所有的Windowing操作邏輯只能在一個Task中進行處理,也就是說計算并行度為1趴捅。
在實際編程過程中垫毙,我們可以看到DataStream的API也有對應的方法timeWindow()和timeWindowAll(),他們也分別對應著Keyed Window和Non-Keyed Window拱绑。

sate

轉載自過往記憶(https://www.iteblog.com/)
鏈接:https://www.iteblog.com/archives/2417.html

keyed state.png

keyed state 多種數(shù)據(jù)結構
operator state.png

note:Operator States的數(shù)據(jù)結構不像Keyed States豐富综芥,現(xiàn)在只支持List

多種保存點

多種后端保存

用戶可以根據(jù)自己的需求選擇,如果數(shù)據(jù)量較小猎拨,可以存放到MemoryStateBackend和FsStateBackend中膀藐,如果數(shù)據(jù)量較大,可以放到RockDB中红省。

WindowFunction (Legacy)

窗口的處理函數(shù)额各,但是獲取的contextual信息少,也沒有一些先進的特征类腮,比如per-window keyed state臊泰。未來將會被拋棄。

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key The key for which this window is evaluated.
   * @param window The window that is being evaluated.
   * @param input The elements in the window being evaluated.
   * @param out A collector for emitting elements.
   *
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

時間窗口的機制

Tumbling windows are aligned to epoch, i.e, 1970-01-01T00:00:00. So with a tumbling window of 10 seconds 2017-07-03T20:19:35Z would to into the window from [2017-07-03T20:19:30Z, 2017-07-03T20:19:40Z) and 2017-07-03T20:22:30Z would into [2017-07-03T20:22:30Z, 2017-07-03T20:22:40Z).
35的事件觸發(fā)的窗口是[30-40)的窗口蚜枢。

MapStateDescriptor

public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, Word> descriptor =
                new MapStateDescriptor<>(
                        "words", // the state name
                        BasicTypeInfo.STRING_TYPE_INFO, // type information
                        TypeInformation.of(new TypeHint<Word>() {})
                        ); // default value of the state, if nothing was set
       // sum = getRuntimeContext().getState(descriptor);

    }
public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, Word> descriptor =
                new MapStateDescriptor<>(
                        "words", // the state name
                        String.class, // type information
                        Word.class
                        ); // default value of the state, if nothing was set
       // sum = getRuntimeContext().getState(descriptor);

    }

設計草稿

時間缸逃、窗口、水印

鏈接:https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
講了窗口厂抽,時間需频,水印的概念,還有時間窗口計算后的時間戳應該怎么賦值筷凤。

state昭殉、fault tolence

鏈接:https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing#StatefulStreamProcessing-Stateinstreamingprograms

目錄

Streams and Operations on Streams

鏈接:https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams

TaskManager故障恢復

JobManager可以通過配置高可用的zookeeper來保證故障恢復,TaskManager怎么故障恢復能藐守?


image.png

Keep in mind that in In stand alone mode a TM process that has exited
won't be automatically restarted though.

實踐所得:在standalone集群模式下挪丢,殺死一個TaskManager,那么機器上的這個TaskManager不會重啟卢厂,會使用其他機器剩余可用的TaskManager中slots來運行失敗的task乾蓬。

Slots

image.png

A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. No need to calculate how many tasks (with varying parallelism) a program contains in total.

flink使用的slot數(shù)目是最高的并行度數(shù)目。

alignment

alignment的影響


image.png

Asynchronous State Snapshots

只有RockDB里面才有


image.png

各種state的例子

鏈接:https://www.slideshare.net/tillrohrmann/fault-tolerance-and-job-recovery-in-apache-flink

operator state.png

優(yōu)點.png

image.png

state backend

image.png

同步和異步的checkpoint區(qū)別

image.png

image.png

image.png

image.png

異步

鏈接:http://www.reibang.com/p/a2a0dade97b8
我們注意到上面描述的機制意味著當 operator 向后端存儲快照時慎恒,會停止處理輸入的數(shù)據(jù)任内。這種同步操作會在每次快照創(chuàng)建時引入延遲。
我們完全可以在存儲快照時融柬,讓 operator 繼續(xù)處理數(shù)據(jù)死嗦,讓快照存儲在后臺異步運行。為了做到這一點粒氧,operator 必須能夠生成一個后續(xù)修改不影響之前狀態(tài)的狀態(tài)對象越除。例如 RocksDB 中使用的寫時復制( copy-on-write )類型的數(shù)據(jù)結構。
接收到輸入的 barrier 時外盯,operator異步快照復制出的狀態(tài)(注:checkpoint的同步部分廊敌,復制狀態(tài)可能會花費較多的時間,這也是為什么checkpoint同步部分時間很長的原因)门怪。然后立即發(fā)射 barrier 到輸出流骡澈,繼續(xù)正常的流處理。一旦后臺異步快照完成掷空,它就會向 checkpoint coordinator(JobManager)確認 checkpoint 完成±吲梗現(xiàn)在 checkpoint 完成的充分條件是:所有 sink 接收到了 barrier,所有有狀態(tài) operator 都確認完成了狀態(tài)備份(可能會比 sink 接收到 barrier 晚)坦弟。
RocksDBStateBackend 模式對于較大的 Key 進行更新操作時序列化和反序列化耗時很多护锤。可以考慮使用 FsStateBackend 模式替代酿傍。

Checkpoint UI信息

image.png

image.png
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末烙懦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子赤炒,更是在濱河造成了極大的恐慌氯析,老刑警劉巖亏较,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異掩缓,居然都是意外死亡雪情,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門你辣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來巡通,“玉大人,你說我怎么就攤上這事舍哄⊙缌梗” “怎么了?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵表悬,是天一觀的道長弥锄。 經(jīng)常有香客問我,道長签孔,這世上最難降的妖魔是什么叉讥? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮饥追,結果婚禮上图仓,老公的妹妹穿的比我還像新娘。我一直安慰自己但绕,他們只是感情好救崔,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著捏顺,像睡著了一般六孵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上幅骄,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天劫窒,我揣著相機與錄音,去河邊找鬼拆座。 笑死主巍,一個胖子當著我的面吹牛,可吹牛的內容都是我干的挪凑。 我是一名探鬼主播孕索,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼躏碳!你這毒婦竟也來了搞旭?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎肄渗,沒想到半個月后镇眷,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡恳啥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年偏灿,在試婚紗的時候發(fā)現(xiàn)自己被綠了丹诀。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片钝的。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖铆遭,靈堂內的尸體忽然破棺而出硝桩,到底是詐尸還是另有隱情,我是刑警寧澤枚荣,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布碗脊,位于F島的核電站,受9級特大地震影響橄妆,放射性物質發(fā)生泄漏衙伶。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一害碾、第九天 我趴在偏房一處隱蔽的房頂上張望矢劲。 院中可真熱鬧,春花似錦慌随、人聲如沸芬沉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽丸逸。三九已至,卻和暖如春剃袍,著一層夾襖步出監(jiān)牢的瞬間黄刚,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工民效, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留憔维,地道東北人。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓研铆,卻偏偏與公主長得像埋同,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子棵红,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內容

  • apache Flink是一個面向分布式數(shù)據(jù)流處理和批量數(shù)據(jù)處理的開源計算平臺凶赁,它能夠基于同一個Flink運行時(...
    生活的探路者閱讀 1,475評論 3 8
  • 背景知識 低延遲 vs 高吞吐 流處理系統(tǒng)與批處理系統(tǒng)最大不同在于節(jié)點間的數(shù)據(jù)傳輸方式。 對于一個流處理系統(tǒng),當一...
    云藤閱讀 2,304評論 0 9
  • Flink總結 Flink簡介 Apache Flink作為一款高吞吐量虱肄、低延遲的針對流數(shù)據(jù)和批數(shù)據(jù)的分布式實時處...
    bigdata_er閱讀 10,598評論 0 10
  • “離開書店的時候萝挤,我把傘留下,希望取它回家的那個人是你根欧×洌” ——張國榮《那么近那么遠》 我曾經(jīng)有段時間特別迷...
    蘭蘭蘭蘭蘭蘭蘭蘭閱讀 573評論 2 4
  • 最近幾年,我所在城市餐飲行業(yè)發(fā)展很快凤粗,廣式粵菜餐廳潮流般一家家的開了起來酥泛,我也在四年前一個臘月寒冬的夜里,第一次喝...
    蛋撻少女啊閱讀 313評論 0 1