2022-05-28-Flink-47 (Part 6)

1. Process functions

Test case

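The first watermark is -9223372036854775808 (Long.MIN_VALUE).
Here the watermark is always 1 ms behind the previous event's timestamp, because forBoundedOutOfOrderness(Duration.ZERO) emits (largest timestamp seen so far) - 1.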
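The 1 ms gap can be checked with a little arithmetic. Below is a minimal plain-Java model of what forBoundedOutOfOrderness computes: the largest timestamp seen so far, minus the allowed out-of-orderness, minus 1, starting from Long.MIN_VALUE before any event arrives. It is not Flink's own class (the name BoundedWatermarkSketch is made up), and it assumes the watermark is read right after an event; in the real job watermarks are emitted periodically (every 200 ms here), so the value seen in processElement can lag further.

```java
// Plain-Java model of the bounded-out-of-orderness watermark arithmetic (not Flink code).
class BoundedWatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // chosen so that the watermark before any event is exactly Long.MIN_VALUE
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    long currentWatermark() {
        // largest timestamp seen - allowed out-of-orderness - 1
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch gen = new BoundedWatermarkSketch(0L); // Duration.ZERO
        System.out.println(gen.currentWatermark()); // -9223372036854775808
        gen.onEvent(1653321290305L);                // 23:54:50.305, the first event in the event-time log
        System.out.println(gen.currentWatermark()); // 1653321290304
    }
}
```

With a 2 s bound (Duration.ofSeconds(2)) the same formula gives a watermark 2001 ms behind the largest timestamp.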


public class processDemo1 {
    public static void main(String[] args) throws Exception {


        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 200 ms is the default interval
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(2);


        // 隨機(jī)生成事件數(shù)據(jù)
        DataStreamSource<Event> addSource = env.addSource(new Source());
        SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

        outputStreamOperator.process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                long watermark = ctx.timerService().currentWatermark();
                long processingTime = ctx.timerService().currentProcessingTime();
                Long timestamp = ctx.timestamp();
                int thisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(processingTime);
                System.out.println("--------");
                System.out.println(timestamp);

                out.collect(value.user + "  " + value.timestamp + "   " + watermark + "  " + thisSubtask);


            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
            }
        }).print();

        env.execute();




    }
}
A simple timer example: processing time

public class processProcessingTime {

    public static void main(String[] args) throws Exception{
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 200 ms is the default interval
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(2);

        DataStreamSource<Event> addSource = env.addSource(new Source());


        // register a processing-time timer per element and observe when it fires
        addSource.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {
            @Override
            public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                // current processing time (wall clock of this task)
                long processingTime = ctx.timerService().currentProcessingTime();
                out.collect(ctx.getCurrentKey() + " processed at " + new Timestamp(processingTime));
                // timer that fires 10 s later in processing time
                ctx.timerService().registerProcessingTimeTimer(processingTime + 10 * 1000L);
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                out.collect(ctx.getCurrentKey() + " timer fired at " + new Timestamp(timestamp));
            }
        }).print();

        env.execute();
    }
}

A simple timer example: event time

public class processEventTime {


    public static void main(String[] args) throws Exception{
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 200 ms is the default interval
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> operator = env.addSource(new Source()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));



        operator.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {
            @Override
            public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                Long timestamp = ctx.timestamp();
                System.out.println(new Timestamp(value.timestamp));
                out.collect(ctx.getCurrentKey() + " timestamp " + new Timestamp(timestamp) + " watermark " + ctx.timerService().currentWatermark());
                // event-time timer 10 s after this element's timestamp
                ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                out.collect(ctx.getCurrentKey() + " timer fired at " + new Timestamp(timestamp) + " watermark " + ctx.timerService().currentWatermark());
            }
        }).print();

        env.execute();
    }
}
2022-05-23 23:54:50.305
大 timestamp 2022-05-23 23:54:50.305 watermark -9223372036854775808
2022-05-23 23:54:51.309
大 timestamp 2022-05-23 23:54:51.309 watermark 1653321290304
2022-05-23 23:54:52.309
中 timestamp 2022-05-23 23:54:52.309 watermark 1653321291308
2022-05-23 23:54:53.309
中 timestamp 2022-05-23 23:54:53.309 watermark 1653321292308
2022-05-23 23:54:54.31
小 timestamp 2022-05-23 23:54:54.31 watermark 1653321293308
2022-05-23 23:54:55.31
中 timestamp 2022-05-23 23:54:55.31 watermark 1653321294309
2022-05-23 23:54:56.31
中 timestamp 2022-05-23 23:54:56.31 watermark 1653321295309
2022-05-23 23:54:57.311
小 timestamp 2022-05-23 23:54:57.311 watermark 1653321296309
2022-05-23 23:54:58.311
小 timestamp 2022-05-23 23:54:58.311 watermark 1653321297310
2022-05-23 23:54:59.311
中 timestamp 2022-05-23 23:54:59.311 watermark 1653321298310
2022-05-23 23:55:00.312
中 timestamp 2022-05-23 23:55:00.305 watermark 1653321299310
大 timer fired at 2022-05-23 23:55:00.305 watermark 1653321300311

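An event-time timer fires only when its timestamp is less than or equal to the current watermark. In the log above, the timer for 大 (23:54:50.305 + 10 s = 23:55:00.305) fires only once the watermark reaches 1653321300311, i.e. 23:55:00.311.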
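The firing rule can be modelled as a sorted set of timers that is drained whenever the watermark advances. This is a plain-Java sketch for a single key (class and method names are hypothetical, not Flink's timer service); a TreeSet is used so that registering the same timestamp twice keeps only one timer. The numbers are taken from the log above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of event-time timers for one key (not Flink's timer service).
class EventTimeTimerSketch {
    // a set: registering the same timestamp again keeps a single timer
    private final TreeSet<Long> timers = new TreeSet<>();

    void register(long timestamp) {
        timers.add(timestamp);
    }

    // advance the watermark and return, in order, every timer whose timestamp <= watermark
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch timers = new EventTimeTimerSketch();
        timers.register(1653321300305L); // 23:54:50.305 + 10 s
        System.out.println(timers.advanceWatermark(1653321299310L)); // [] (still 995 ms short)
        System.out.println(timers.advanceWatermark(1653321300311L)); // [1653321300305]
    }
}
```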

ProcessWindowFunction
Top-N with windowAll

public class processTop_N {


    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 200 ms is the default interval
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> operator = env.addSource(new Source()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

       operator.map(data -> data.url)
               // sliding processing-time window: 10 s long, sliding every 5 s
               .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
               .aggregate(new AggregateFunction<String, HashMap<String, Long>, ArrayList<Tuple2<String, Long>>>() {
                   @Override
                   public HashMap<String, Long> createAccumulator() {
                       return new HashMap<String, Long>();
                   }

                   @Override
                   public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
                       // for each element: if the url is already in the map, add 1; otherwise start at 1
                       if (accumulator.containsKey(value)) {
                           accumulator.put(value, accumulator.get(value) + 1);
                       } else {
                           accumulator.put(value, 1L);
                       }
                       return accumulator;
                   }

                   @Override
                   public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
                       ArrayList<Tuple2<String, Long>> list = new ArrayList<>();
                       for (Map.Entry<String, Long> entry : accumulator.entrySet()) {
                           list.add(Tuple2.of(entry.getKey(), entry.getValue()));
                       }
                       list.sort(new Comparator<Tuple2<String, Long>>() {
                           @Override
                           public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                               return Long.compare(o2.f1, o1.f1); // descending by count, without the overflow risk of (int) (o2.f1 - o1.f1)
                           }
                       });
                       return list;
                   }

                   @Override
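                   public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
                       // only used by merging windows (e.g. session windows); combine the counts instead of returning null
                       b.forEach((url, count) -> a.merge(url, count, Long::sum));
                       return a;
                   }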
               }, new ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>() {
                   @Override
                   public void process(ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>.Context context, Iterable<ArrayList<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {
                       long start = context.window().getStart();
                       long end = context.window().getEnd();
                       StringBuilder builder = new StringBuilder();
                       ArrayList<Tuple2<String, Long>> list = elements.iterator().next();
                       // guard against windows with fewer than 2 distinct urls
                       for (int i = 0; i < Math.min(2, list.size()); i++) {
                              builder.append("Top").append(i + 1).append(" url: ").append(list.get(i).f0).append("  visits: ").append(list.get(i).f1).append("\n");
                       }
                       out.collect("Window start: " + new Timestamp(start) + "  window end: " + new Timestamp(end) + "\n" + builder) ;
                   }
               }).print();

             env.execute();

    }
}

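Be clear about when each overridden method runs: the AggregateFunction's add() runs once per incoming element, while getResult() and the ProcessAllWindowFunction's process() run once per window, when that window fires.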
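To make the per-element versus per-window split concrete, here is a plain-Java model of the same logic (hypothetical names, no Flink classes): add() is called once per url, topN() once per window. It also sorts with Long.compare, which avoids the overflow that (int) (o2.f1 - o1.f1) can hit for large counts, and it tolerates windows with fewer than N urls. The sample urls in main are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the AggregateFunction + ProcessAllWindowFunction pair above.
class TopNSketch {
    // like AggregateFunction.add: called once per element
    static void add(Map<String, Long> acc, String url) {
        acc.merge(url, 1L, Long::sum);
    }

    // like getResult + process: called once per window; sort by count descending, keep at most n
    static List<Map.Entry<String, Long>> topN(Map<String, Long> acc, int n) {
        List<Map.Entry<String, Long>> list = new ArrayList<>(acc.entrySet());
        list.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return list.subList(0, Math.min(n, list.size()));
    }

    public static void main(String[] args) {
        Map<String, Long> acc = new HashMap<>(); // one accumulator per window
        for (String url : new String[] {"./home", "./cart", "./home", "./prod", "./home", "./cart"}) {
            add(acc, url);
        }
        System.out.println(topN(acc, 2)); // [./home=3, ./cart=2]
    }
}
```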

2. Multi-stream transformations

Side outputs

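Flink cannot infer an OutputTag's element type on its own (Java erases generic type arguments), so create each tag as an anonymous subclass with a trailing {} or pass its TypeInformation explicitly.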
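Why the trailing {} matters: at runtime a plain generic object cannot tell which type argument it was created with, but an anonymous subclass records its parameterized superclass in its class file, and that can be read back by reflection. The sketch below shows the mechanism with a made-up TypeTag class; Flink's OutputTag relies on the same idea when it extracts its type, but this is not Flink's code.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for OutputTag, used only to show the type-capture mechanism.
class TypeTag<T> {
    // returns the captured type argument, or null if it was erased
    Type capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return null;
    }

    public static void main(String[] args) {
        TypeTag<List<String>> plain = new TypeTag<>();
        TypeTag<List<String>> anonymous = new TypeTag<List<String>>() {};
        System.out.println(plain.capturedType());     // null: only the raw class TypeTag is known
        System.out.println(anonymous.capturedType()); // java.util.List<java.lang.String>
    }
}
```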


public class outTagDemo {


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
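        // the trailing {} makes anonymous subclasses so Flink can extract Tuple2<String, String>
        OutputTag<Tuple2<String, String>> tag_小明 = new OutputTag<Tuple2<String, String>>("1"){};
        OutputTag<Tuple2<String, String>> tag_小紅 = new OutputTag<Tuple2<String, String>>("2"){};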
        DataStreamSource<Event> source = env.addSource(new Source());
        SingleOutputStreamOperator<Event> operator = source.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("小明")) {
                    ctx.output(tag_小明, Tuple2.of(value.user, value.url));
                } else if (value.user.equals("小紅")) {
                    ctx.output(tag_小紅, Tuple2.of(value.user, value.url));
                } else {
                    out.collect(value);
                }
            }
        });

        operator.getSideOutput(tag_小明).print();
        operator.getSideOutput(tag_小紅).print();
        operator.print();


        env.execute();
    }
}

Union
Connect
Two-stream join: window join

public class windowJoinDemo1 {

    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env.fromElements(
                Tuple2.of("1", 1000L),
                Tuple2.of("2", 1001L),
                Tuple2.of("1", 2000L),
                Tuple2.of("2", 2001L),
                Tuple2.of("1", 5001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));
        SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env.fromElements(
                Tuple2.of("1", 4000L),
                Tuple2.of("2", 4001L),
                Tuple2.of("1", 5000L),
                Tuple2.of("2", 6001L),
                Tuple2.of("1", 7001L),
                Tuple2.of("1", 4001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));

        source1.join(source2)
                .where(data -> data.f0)
                .equalTo(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                        return first + "  --->  " + second;
                    }
                }).print();


        env.execute();
    }

}
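As a cross-check of what the window join should emit, here is a plain-Java model of an inner tumbling-window join on the same data (hypothetical class names; it ignores watermarks and late data, and Flink may emit the pairs in a different order). With 5 s windows, pairs form only between elements with the same key whose timestamps fall into the same [0, 5000) or [5000, 10000) window, which gives 8 pairs; this agrees with the coGroup output shown in the next example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of an inner tumbling-window join (not Flink code).
class WindowJoinSketch {
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")"; // same format as Tuple2.toString()
        }
    }

    static long windowStart(long ts, long size) {
        return ts - ts % size; // valid for non-negative timestamps and no window offset
    }

    // every left/right pair with equal keys in the same window
    static List<String> join(List<Rec> left, List<Rec> right, long size) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key.equals(r.key) && windowStart(l.ts, size) == windowStart(r.ts, size)) {
                    out.add(l + "  --->  " + r);
                }
            }
        }
        return out;
    }

    static List<Rec> sourceOne() {
        return Arrays.asList(new Rec("1", 1000L), new Rec("2", 1001L), new Rec("1", 2000L),
                new Rec("2", 2001L), new Rec("1", 5001L));
    }

    static List<Rec> sourceTwo() {
        return Arrays.asList(new Rec("1", 4000L), new Rec("2", 4001L), new Rec("1", 5000L),
                new Rec("2", 6001L), new Rec("1", 7001L), new Rec("1", 4001L));
    }

    public static void main(String[] args) {
        List<String> pairs = join(sourceOne(), sourceTwo(), 5000L);
        pairs.forEach(System.out::println);
        System.out.println(pairs.size() + " pairs");
    }
}
```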
Two-stream join: interval join

public class windowJoinDemo2 {
    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env.fromElements(
                Tuple2.of("1", 1000L),
                Tuple2.of("2", 1001L),
                Tuple2.of("1", 2000L),
                Tuple2.of("2", 2001L),
                Tuple2.of("1", 5001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));
        SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env.fromElements(
                Tuple2.of("1", 4000L),
                Tuple2.of("2", 4001L),
                Tuple2.of("1", 5000L),
                Tuple2.of("2", 6001L),
                Tuple2.of("1", 7001L),
                Tuple2.of("1", 4001L),
                Tuple2.of("1", 8001L),
                Tuple2.of("1", 9001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));

        source2.keyBy(data -> data.f0)
               .intervalJoin(source1.keyBy(data -> data.f0))
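               // a source2 element at time t joins source1 elements with the same key whose timestamps lie in [t - 2 s, t + 2 s]
               .between(Time.seconds(-2),Time.seconds(2))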
               .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                           @Override
                           public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                               out.collect(left + "-->" + right + "  " + new Timestamp(ctx.getTimestamp()) + "");
                           }
                       }).print();




        env.execute();
    }
}
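The bounds of between() can be checked the same way. This plain-Java model (hypothetical names; it ignores watermarks, so unlike Flink it never drops late elements) joins each source2 element at time t with source1 elements of the same key whose timestamps lie in [t - 2000, t + 2000], with both ends inclusive, which is between()'s default. On the demo data it yields 6 pairs; for example (1,4000) matches both (1,2000) and (1,5001), while (1,4001) only matches (1,5001).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of an interval join (not Flink code).
class IntervalJoinSketch {
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")";
        }
    }

    // left at time t matches right with the same key and t + lower <= right.ts <= t + upper
    static List<String> join(List<Rec> left, List<Rec> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key.equals(r.key) && r.ts >= l.ts + lower && r.ts <= l.ts + upper) {
                    out.add(l + "-->" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // left = source2, right = source1, as in windowJoinDemo2
        List<Rec> left = Arrays.asList(new Rec("1", 4000L), new Rec("2", 4001L), new Rec("1", 5000L),
                new Rec("2", 6001L), new Rec("1", 7001L), new Rec("1", 4001L),
                new Rec("1", 8001L), new Rec("1", 9001L));
        List<Rec> right = Arrays.asList(new Rec("1", 1000L), new Rec("2", 1001L), new Rec("1", 2000L),
                new Rec("2", 2001L), new Rec("1", 5001L));
        join(left, right, -2000L, 2000L).forEach(System.out::println);
    }
}
```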

Two-stream join: coGroup (same source1 and source2 as in the window-join example; its output follows the code)
        source1.coGroup(source2)
                .where(data -> data.f0)
                .equalTo(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new RichCoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> first, Iterable<Tuple2<String, Long>> second, Collector<String> out) throws Exception {
                            out.collect(first + "-->" + second );
                    }
                }).print();
[(1,1000), (1,2000)]-->[(1,4000), (1,4001)]
[(2,1001), (2,2001)]-->[(2,4001)]
[(1,5001)]-->[(1,5000), (1,7001)]
[]-->[(2,6001)]

3. State programming

State in Flink
  1. Access to state
  2. Fault tolerance
  3. Horizontal scalability of distributed applications

For background on tasks in Flink, see the article "flink taskmanager & slots & parallelism & operator chaining & task assignment explained".

Keyed state
ValueState: emit the PV count every 10 s

public class stateDemo1 {


    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> operator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));


        operator.keyBy(data -> data.user)
              .process(new KeyedProcessFunction<String, Event, String>() {
                  // two pieces of state: 1. the PV count  2. the timestamp of the registered timer
                  ValueState<Long> valueState ;
                  ValueState<Long> timeState;
                  @Override
                  public void open(Configuration parameters) throws Exception {
                      valueState  = getRuntimeContext().getState(new ValueStateDescriptor<Long>("valueState", Long.class));
                      timeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timeState", Long.class));
                  }

                  @Override
                  public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                      // increment this key's PV count
                      valueState.update(valueState.value() == null ? 1 : valueState.value() + 1);
                      // register a timer only if this key has none yet: the goal is one output every 10 s
                      if (timeState.value() == null){
                             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                             timeState.update(ctx.timestamp() + 10 * 1000L);

                      }
                        out.collect(ctx.getCurrentKey() + " PV " + valueState.value() + " watermark " + new Timestamp(ctx.timerService().currentWatermark()) + "  " + new Timestamp(ctx.timestamp())) ;

                  }

                  @Override
                  public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                          out.collect("onTimer " + ctx.getCurrentKey() + " PV " + valueState.value() + " watermark " + new Timestamp(ctx.timerService().currentWatermark()) + "  " + new Timestamp(ctx.timestamp()));
                           // clear the stored timer timestamp
                           timeState.clear();
                           // valueState.clear();  clearing the PV here would turn this into a tumbling window
                           // otherwise this key's next timer would only be registered when its next element arrives, which behaves like a session window rather than what we want, so register the next timer manually
                           ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
                           timeState.update(timestamp + 10 * 1000L);
                  }
              })
              .print();


        env.execute();
    }
}

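There is something here I still have not figured out: how event time relates to the watermark. To investigate, print each event's timestamp and rerun the test with Duration.ofSeconds(2).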

ListState example

public class stateDemo2 {

    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);


        SingleOutputStreamOperator<Tuple2<String, Long>> operator1 = env.fromElements(
                Tuple2.of("a", 100L),
                Tuple2.of("a", 100L),
                Tuple2.of("b", 200L)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));


        SingleOutputStreamOperator<Tuple2<String, Long>> operator2 = env.fromElements(
                Tuple2.of("a", 1000L),
                Tuple2.of("b", 2000L),
                Tuple2.of("c", 2000L)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));


        operator1.keyBy(data -> data.f0)
                .connect(operator2.keyBy(data -> data.f0))
                .process(new CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    ListState<Tuple2<String, Long>> left;
                    ListState<Tuple2<String, Long>> right;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        left = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("left", Types.TUPLE(Types.STRING, Types.LONG)));
                        right = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("right", Types.TUPLE(Types.STRING, Types.LONG)));

                    }

                    @Override
                    public void processElement1(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {


                        // match against every element of the other input already stored for this key
                        for (Tuple2<String, Long> r : right.get()) {
                            out.collect(value + " " + r);
                            System.out.println("----");

                        }
                        System.out.println("11111");
                        left.add(Tuple2.of(value.f0, value.f1));

                    }

                    @Override
                    public void processElement2(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        for (Tuple2<String, Long> l : left.get()) {
                            out.collect(value + " " + l);
                        }
                        System.out.println("22222");
                        right.add(Tuple2.of(value.f0, value.f1));
                    }
                }).print();


        env.execute();
    }
}

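Running this produces the output below, which looked strange to me at first. It follows from how the two inputs interleave: each element is matched only against elements of the other input that are already stored for the same key, so every pair is emitted by whichever element arrives second (the 11111 and 22222 lines show which input was just processed).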

11111
(a,1000) (a,100)
22222
(a,100) (a,1000)
----
11111
22222
(b,200) (b,2000)
----
11111
22222
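The log can be reproduced with a plain-Java model of the CoProcessFunction (hypothetical names; two HashMaps stand in for the keyed ListStates). The arrival order fed in below is read off the 11111/22222 markers in the log; it is an assumption about this particular run, not an order Flink guarantees.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of stateDemo2's CoProcessFunction (not Flink code).
class ConnectListStateSketch {
    private final Map<String, List<String>> left = new HashMap<>();  // stands in for ListState "left"
    private final Map<String, List<String>> right = new HashMap<>(); // stands in for ListState "right"
    final List<String> out = new ArrayList<>();

    void processElement1(String key, long value) {
        String v = "(" + key + "," + value + ")";
        for (String r : right.getOrDefault(key, Collections.emptyList())) {
            out.add(v + " " + r); // join with what input 2 has already stored
        }
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(v);
    }

    void processElement2(String key, long value) {
        String v = "(" + key + "," + value + ")";
        for (String l : left.getOrDefault(key, Collections.emptyList())) {
            out.add(v + " " + l); // join with what input 1 has already stored
        }
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(v);
    }

    // arrival order implied by the 11111 / 22222 markers in the log
    static List<String> replayLoggedOrder() {
        ConnectListStateSketch op = new ConnectListStateSketch();
        op.processElement1("a", 100L);
        op.processElement2("a", 1000L);
        op.processElement1("a", 100L);
        op.processElement2("b", 2000L);
        op.processElement1("b", 200L);
        op.processElement2("c", 2000L);
        return op.out;
    }

    public static void main(String[] args) {
        replayLoggedOrder().forEach(System.out::println);
    }
}
```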

Further reading: "Flink Advanced (3): using connect with two streams"

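I then changed operator2's test data to:
                Tuple2.of("a", 1000L),
                Tuple2.of("a", 3000L),
                Tuple2.of("b", 2000L),
                Tuple2.of("c", 2000L)
With this data (a,3000) was matched with (a,100) twice. Since operator1 contains (a,100) twice, two matches is what an inner join should produce; the real problem is that neither list state is ever cleared, so the state keeps growing.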
Implementing a window with MapState

public class stateDemo3 {

    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> operator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));
        operator.print();

        // count PV per url
        operator.keyBy(data -> data.url)
                        .process(new keyProcess(10 * 1000L))
                        .print();

        env.execute();
    }
}

class  keyProcess extends KeyedProcessFunction<String,Event,String>{
       private  Long  tumbleTime;
       MapState<Long, Long> mapState;


    public keyProcess(Long tumbleTime) {
        this.tumbleTime = tumbleTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("mapState", Long.class, Long.class));
    }

    @Override
    public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
        long startTime = value.timestamp / tumbleTime * tumbleTime;
        long endTime  = startTime + tumbleTime ;
        if (mapState.contains(startTime)){
               mapState.put(startTime,mapState.get(startTime) + 1);
        }
        else {
              mapState.put(startTime,1L);
        }
        // registering the same timer again is harmless: Flink keeps only one timer per key and timestamp
        ctx.timerService().registerEventTimeTimer(endTime - 1L);

    }
    
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        long endTime = timestamp + 1L;
        long startTime = endTime - tumbleTime;
        out.collect("窗口" + new Timestamp(startTime) + "--" + new Timestamp(endTime) + " " + ctx.getCurrentKey() + "  PV:   " + mapState.get(startTime));
        mapState.remove(startTime);
     }
}

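Why the +1 and -1? The window is [startTime, endTime), so endTime itself belongs to the next window. The timer is registered at endTime - 1, the last millisecond inside the window (the same value Flink's TimeWindow.maxTimestamp() uses), and onTimer adds the 1 back to recover endTime. With a zero-delay watermark (largest timestamp - 1), the timer therefore fires once the first element with a timestamp >= endTime has been seen and the next watermark is emitted.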
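The ±1 arithmetic in keyProcess can be checked in isolation. The helper names below are hypothetical; the formulas are the ones used in processElement and onTimer, plus the rule that an event-time timer fires once the watermark reaches it, with the zero-delay watermark taken as the largest timestamp minus 1.

```java
// Plain-Java model of keyProcess's window arithmetic (hypothetical helper names).
class TumbleBoundsSketch {
    static long windowStart(long ts, long size) {
        return ts / size * size; // same formula as keyProcess; valid for ts >= 0
    }

    static long timerFor(long ts, long size) {
        return windowStart(ts, size) + size - 1; // endTime - 1: last millisecond inside [start, end)
    }

    static long windowStartFromTimer(long timer, long size) {
        return timer + 1 - size; // onTimer: endTime = timer + 1, startTime = endTime - size
    }

    // an event-time timer fires once the watermark has reached it
    static boolean fires(long timer, long watermark) {
        return watermark >= timer;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long timer = timerFor(12_345L, size);
        System.out.println(windowStart(12_345L, size) + " " + timer); // 10000 19999
        System.out.println(fires(timer, 19_999L - 1)); // false: latest element 19999 is still inside the window
        System.out.println(fires(timer, 20_000L - 1)); // true: first element of the next window has arrived
        System.out.println(windowStartFromTimer(timer, size)); // 10000
    }
}
```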

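onTimer seems to emit several records at once, even though the stream is keyed. Is it one onTimer call per key? Yes: timers are scoped to the current key, so when the watermark passes endTime - 1, onTimer is called once for each url that registered that timer, and those calls show up together. Each call only reads and removes its own key's MapState entry, so the work grows with the number of active keys.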

State time-to-live (TTL)
Operator state

Operator state also supports several structures; the three main ones are ListState, UnionListState and BroadcastState.

Broadcast state
State persistence

Checkpoints
State backends

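4. Fault tolerance

Checkpoints
  1. Saving checkpoints
  2. Restoring state from a checkpoint
  3. The checkpoint algorithm
  4. Checkpoint configuration
  5. Savepoints
State consistency

What is state consistency?

  1. Inside the stream processor, state consistency simply means that the computed results must be correct.
  2. No record may be lost, and no record may be counted twice.
  3. After a failure the state can be restored, and the results recomputed after recovery must also be completely correct.

Levels of state consistency

  1. At-most-once: when a task fails, the simplest option is to do nothing: neither restore the lost state nor replay the lost data. At-most-once means each event is processed at most once.
  2. At-least-once: in most real applications we do not want to lose events. This guarantee is called at-least-once: every event is processed, but some events may be processed more than once.
  3. Exactly-once: processing each event exactly once is the strictest guarantee and the hardest to achieve. It means not only that no event is lost, but also that the internal state is updated exactly once (logically) for each record.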
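The three levels can be contrasted with a toy counting job. This is a deliberately simplified plain-Java model (hypothetical names), not how Flink implements any of these modes: 5 events, a checkpoint after 2 of them (count 2, source offset 2), and a crash after 4. At-most-once loses the count and replays nothing; at-least-once, in this toy, replays from the checkpointed offset without rolling the count back; exactly-once restores count and offset together.

```java
// Toy model of recovery under the three consistency levels (not Flink code).
class ConsistencyToyModel {
    enum Mode { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    static long finalCount(Mode mode) {
        final int totalEvents = 5;
        final long checkpointCount = 2; // count stored in the last checkpoint
        final int checkpointOffset = 2; // source position stored with it
        final int crashOffset = 4;      // events processed before the crash
        long countBeforeCrash = crashOffset;

        long count;
        int resumeFrom;
        switch (mode) {
            case AT_MOST_ONCE:
                count = 0;                  // in-memory count is lost
                resumeFrom = crashOffset;   // nothing is replayed
                break;
            case AT_LEAST_ONCE:
                count = countBeforeCrash;   // toy assumption: count is not rolled back
                resumeFrom = checkpointOffset;
                break;
            default:
                count = checkpointCount;    // count and offset restored together
                resumeFrom = checkpointOffset;
        }
        for (int i = resumeFrom; i < totalEvents; i++) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        for (Mode m : Mode.values()) {
            System.out.println(m + " -> " + finalCount(m) + " (correct answer: 5)");
        }
    }
}
```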
End-to-end exactly-once

Idempotent writes

An idempotent operation can be executed many times but changes the result only once; repeating it afterwards has no further effect.


(Figure: idempotence)

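Consider data consumed in this order:
10, 20, 30, 40, 50
A checkpoint was taken when consumption reached 10, but the job failed while consuming 30, so it recovers from the latest checkpoint and replays. The values a client observes are then 10, 20, 30, 10, 20, 30, ... Although the data is eventually consistent, the intermediate values the client sees along the way are wrong, which is why we need transactional writes.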
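A plain-Java toy model of this scenario shows both halves of the argument: the final value is correct, but the history a client observes goes backwards. The names are hypothetical, the sink simply overwrites one fixed key, and, as in the example, the restored checkpoint is assumed to sit before 10.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an idempotent (upsert) sink under replay (not Flink code).
class IdempotentReplaySketch {
    final Map<String, Integer> store = new HashMap<>(); // the external key-value store
    final List<Integer> observed = new ArrayList<>();   // values a client reading the key sees, in order

    void write(int value) {
        store.put("result", value); // upsert under a fixed, hypothetical key
        observed.add(value);
    }

    static IdempotentReplaySketch runWithFailure() {
        int[] input = {10, 20, 30, 40, 50};
        IdempotentReplaySketch sink = new IdempotentReplaySketch();
        for (int i = 0; i < 3; i++) {
            sink.write(input[i]); // first attempt: the job fails after writing 30
        }
        for (int v : input) {
            sink.write(v);        // recovery: replay everything after the restored checkpoint
        }
        return sink;
    }

    public static void main(String[] args) {
        IdempotentReplaySketch sink = runWithFailure();
        System.out.println(sink.observed);            // [10, 20, 30, 10, 20, 30, 40, 50]
        System.out.println(sink.store.get("result")); // 50, same as a run without failure
    }
}
```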

Transactional writes

  1. Transaction
    A tightly coupled series of operations in an application: all of them must complete successfully, otherwise every change they made is undone.
    Transactions are atomic: either all operations in a transaction succeed, or none of them take effect.
  2. Idea
    Build one transaction per checkpoint, and write all of its results to the sink only when that checkpoint has actually completed.
  3. Implementations
    Write-ahead log
    Two-phase commit

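Write-ahead log (WAL)
Store the result data as state first, then write it to the sink in one batch when the checkpoint-complete notification arrives.
It is simple to implement, and because the data is already buffered in the state backend, it works with any sink system.
In most cases it only gives at-least-once semantics: if the job fails partway through writing a batch, that batch is written again after recovery.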
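A toy model of the write-ahead-log idea: results are buffered per checkpoint and written to the external system only when that checkpoint completes. The class is hypothetical; its method names only echo Flink's checkpoint hooks, and it is not Flink's GenericWriteAheadSink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of a write-ahead-log sink (not Flink code).
class WriteAheadSinkSketch {
    private final List<String> buffer = new ArrayList<>();                 // results since the last checkpoint
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();   // buffered results per checkpoint id
    final List<String> externalSink = new ArrayList<>();                   // what the external system contains

    void invoke(String value) {
        buffer.add(value); // kept as state, not written yet
    }

    void snapshotState(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(buffer));
        buffer.clear();
    }

    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> confirmed = pending.headMap(checkpointId, true);
        for (List<String> batch : confirmed.values()) {
            externalSink.addAll(batch); // one batch write per confirmed checkpoint, oldest first
        }
        confirmed.clear(); // headMap is a view, so this also removes them from pending
    }

    public static void main(String[] args) {
        WriteAheadSinkSketch sink = new WriteAheadSinkSketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);
        sink.invoke("c");
        System.out.println(sink.externalSink); // [] (nothing visible before checkpoint 1 completes)
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.externalSink); // [a, b]
    }
}
```

In this model the batch write is a single addAll. In a real sink, a failure partway through writing a batch means the whole batch is written again after recovery, which is where the at-least-once caveat comes from.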

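Two-phase commit (2PC)
See also: "Chapter 9: Consistency and Consensus"
For each checkpoint, the sink task starts a transaction and adds every record it receives from then on to that transaction.
It writes these records to the sink system but does not commit them; this is only a pre-commit. (Are the records visible at this point? For Kafka, consumers using isolation.level=read_committed do not see them until the transaction commits.)
Only when the checkpoint-complete notification arrives does it commit the transaction, making the writes final.
This approach achieves true exactly-once, but it requires an external sink system that supports transactions.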
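A matching toy model of two-phase commit: records go into an open transaction, a checkpoint pre-commits it and opens the next one, the checkpoint-complete notification commits it, and a failure aborts whatever is still open. Only committed records are visible, as for a read_committed consumer. The class and method names are hypothetical; it follows the steps above but is not Flink's TwoPhaseCommitSinkFunction or Kafka's producer API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of a two-phase-commit sink (not Flink code).
class TwoPhaseCommitSketch {
    final List<String> committed = new ArrayList<>();                          // visible to read_committed readers
    private List<String> openTxn = new ArrayList<>();                          // transaction for the current checkpoint
    private final TreeMap<Long, List<String>> preCommitted = new TreeMap<>();  // pre-committed, awaiting commit

    void invoke(String value) {
        openTxn.add(value); // written into the open transaction, not yet visible
    }

    // checkpoint: pre-commit the open transaction and begin a new one
    void snapshotState(long checkpointId) {
        preCommitted.put(checkpointId, openTxn);
        openTxn = new ArrayList<>();
    }

    // checkpoint completed: commit every pre-committed transaction up to this id
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = preCommitted.headMap(checkpointId, true);
        for (List<String> txn : done.values()) {
            committed.addAll(txn);
        }
        done.clear();
    }

    // failure before the next checkpoint: discard the open transaction
    void abortOpenTransaction() {
        openTxn = new ArrayList<>();
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.invoke("a");
        sink.snapshotState(1L);
        sink.invoke("b");
        sink.notifyCheckpointComplete(1L);
        sink.abortOpenTransaction();       // crash: "b" never becomes visible
        sink.invoke("b");                  // replayed after recovery
        sink.snapshotState(2L);
        sink.notifyCheckpointComplete(2L);
        System.out.println(sink.committed); // [a, b], each record exactly once
    }
}
```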

Exactly-once guarantees when connecting Flink and Kafka

public class KafkaToKafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing every 30 s in EXACTLY_ONCE mode
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));

        // Kafka consumer properties
        Properties properties = new Properties();
        // Kafka broker addresses and ports
        properties.setProperty("bootstrap.servers", "linux03:9092,linux04:9092,linux05:9092");
        // offset reset policy: with no recorded offset, read from the beginning; otherwise resume from the recorded offset
        properties.setProperty("auto.offset.reset", "earliest");
        // consumer group ID
        properties.setProperty("group.id", "g1");
        // disable Kafka's periodic auto-commit; with checkpointing on, offsets are kept in Flink's checkpointed state
        properties.setProperty("enable.auto.commit", "false");
        // create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "kafka2021", // topic to read from
                new SimpleStringSchema(), // deserialization schema for each record
                properties // Kafka properties
        );
        // on checkpoints, do not also commit the offsets to Kafka's internal offsets topic
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        // add kafkaConsumer as the source
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));


        // target Kafka topic
        String topic = "out2021";
        // producer transaction timeout: 5 minutes; it must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        properties.setProperty("transaction.timeout.ms",1000 * 60 * 5 + "");

        // create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic, // target topic
                new KafkaStringSerializationSchema(topic), // serialization schema for writing to Kafka
                properties, // Kafka properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE // write to Kafka with EXACTLY_ONCE semantics
        );

        filtered.addSink(kafkaProducer);

        env.execute();

    }
}
/**
 * Custom Kafka serialization schema for String records
 */
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private String topic;
    private String charset;
    // constructor takes the target topic and charset; UTF-8 by default
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }
    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }
    // called for each record to serialize it
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // convert the string to bytes
        byte[] bytes = element.getBytes(Charset.forName(charset));
        // return a ProducerRecord for the target topic
        return new ProducerRecord<>(topic, bytes);
    }
}