1. 處理函數
測試案例
第一個水位線為 -9223372036854775808(即 Long.MIN_VALUE,表示還沒有收到任何水位線)
這里 watermark 總是比上一個事件時間慢 1ms:forBoundedOutOfOrderness 周期性發出的水位線等于「已見到的最大時間戳 - 亂序延遲 - 1」,這里延遲為 Duration.ZERO
/**
 * Shows what a plain ProcessFunction can observe for every element: the current watermark, the
 * current processing time, the element's event timestamp and the index of the running subtask.
 */
public class processDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = localEnvWithWebUi();
        // 200 ms is also the default auto-watermark interval.
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(2);

        // Randomly generated events; event time comes from Event.timestamp with zero out-of-orderness.
        WatermarkStrategy<Event> strategy = WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);
        SingleOutputStreamOperator<Event> stamped = env.addSource(new Source())
                .assignTimestampsAndWatermarks(strategy);

        stamped.process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                long currentWatermark = ctx.timerService().currentWatermark();
                long wallClock = ctx.timerService().currentProcessingTime();
                Long eventTime = ctx.timestamp();
                int subtask = getRuntimeContext().getIndexOfThisSubtask();

                System.out.println(wallClock);
                System.out.println("--------");
                System.out.println(eventTime);

                // "<user> <event timestamp> <watermark> <subtask index>"
                out.collect(value.user + " " + value.timestamp + " " + currentWatermark + " " + subtask);
            }
        }).print();
        env.execute();
    }

    /** Builds a local environment that also serves the Flink web UI on port 8050. */
    private static StreamExecutionEnvironment localEnvWithWebUi() {
        Configuration conf = new Configuration();
        // NOTE(review): a boolean is stored under a key that looks like a log-path option; confirm intent.
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        conf.setInteger(RestOptions.PORT, 8050);
        return StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    }
}
定時器的簡單應用-事件的處理時間
/**
 * Processing-time timer demo: for every event, report the wall-clock time at which it was handled
 * and register a timer that fires 10 seconds of wall-clock time later.
 */
public class processProcessingTime {
    /** Delay between handling an element and the processing-time timer it registers. */
    private static final long TIMER_DELAY_MS = 10 * 1000L;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = localEnvWithWebUi();
        // 200 ms is also the default auto-watermark interval.
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(2);

        env.addSource(new Source())
                .keyBy(data -> data.user)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                        long now = ctx.timerService().currentProcessingTime();
                        out.collect(ctx.getCurrentKey() + "事件處理事件為" + new Timestamp(now));
                        ctx.timerService().registerProcessingTimeTimer(now + TIMER_DELAY_MS);
                    }

                    @Override
                    public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey() + "定時(shí)器觸發(fā)事件" + new Timestamp(timestamp));
                    }
                })
                .print();
        env.execute();
    }

    /** Builds a local environment that also serves the Flink web UI on port 8050. */
    private static StreamExecutionEnvironment localEnvWithWebUi() {
        Configuration conf = new Configuration();
        // NOTE(review): a boolean is stored under a key that looks like a log-path option; confirm intent.
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        conf.setInteger(RestOptions.PORT, 8050);
        return StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    }
}
定時器的簡單應用-事件的事件時間
/**
 * Event-time timer demo: every event registers a timer at its own timestamp + 10 s. The timer
 * only fires once the watermark has reached that time.
 */
public class processEventTime {
    /** Event-time distance between an element and the timer it registers. */
    private static final long TIMER_DELAY_MS = 10 * 1000L;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = localEnvWithWebUi();
        // 200 ms is also the default auto-watermark interval.
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);

        WatermarkStrategy<Event> strategy = WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);
        SingleOutputStreamOperator<Event> stamped = env.addSource(new Source()).assignTimestampsAndWatermarks(strategy);

        stamped.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {
            @Override
            public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                Long eventTime = ctx.timestamp();
                System.out.println(new Timestamp(value.timestamp));
                out.collect(ctx.getCurrentKey() + "時(shí)間戳" + new Timestamp(eventTime) + "水位線" + ctx.timerService().currentWatermark());
                ctx.timerService().registerEventTimeTimer(eventTime + TIMER_DELAY_MS);
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                out.collect(ctx.getCurrentKey() + "定時(shí)器觸發(fā)事件" + new Timestamp(timestamp) + "水位線" + ctx.timerService().currentWatermark());
            }
        }).print();
        env.execute();
    }

    /** Builds a local environment that also serves the Flink web UI on port 8050. */
    private static StreamExecutionEnvironment localEnvWithWebUi() {
        Configuration conf = new Configuration();
        // NOTE(review): a boolean is stored under a key that looks like a log-path option; confirm intent.
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        conf.setInteger(RestOptions.PORT, 8050);
        return StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    }
}
2022-05-23 23:54:50.305
大時(shí)間戳2022-05-23 23:54:50.305水位線-9223372036854775808
2022-05-23 23:54:51.309
大時(shí)間戳2022-05-23 23:54:51.309水位線1653321290304
2022-05-23 23:54:52.309
中時(shí)間戳2022-05-23 23:54:52.309水位線1653321291308
2022-05-23 23:54:53.309
中時(shí)間戳2022-05-23 23:54:53.309水位線1653321292308
2022-05-23 23:54:54.31
小時(shí)間戳2022-05-23 23:54:54.31水位線1653321293308
2022-05-23 23:54:55.31
中時(shí)間戳2022-05-23 23:54:55.31水位線1653321294309
2022-05-23 23:54:56.31
中時(shí)間戳2022-05-23 23:54:56.31水位線1653321295309
2022-05-23 23:54:57.311
小時(shí)間戳2022-05-23 23:54:57.311水位線1653321296309
2022-05-23 23:54:58.311
小時(shí)間戳2022-05-23 23:54:58.311水位線1653321297310
2022-05-23 23:54:59.311
中時(shí)間戳2022-05-23 23:54:59.311水位線1653321298310
2022-05-23 23:55:00.312
中時(shí)間戳2022-05-23 23:55:00.305水位線1653321299310
大定時(shí)器觸發(fā)事件2022-05-23 23:55:00.305水位線1653321300311
定時器的時間必須小于等于水位線才會觸發(從上面的輸出可以看到:23:54:50.305 的事件注冊的 10 秒定時器,直到水位線推進到 1653321300311 時才觸發)
ProcessWindowFunction
windowAll 實現 TOP_N
/**
 * Top-N URLs over a 10 s window sliding every 5 s, computed over all events (windowAll, so the
 * window operator runs with parallelism 1). An AggregateFunction counts visits per URL
 * incrementally. The ProcessAllWindowFunction then formats the N most visited URLs together with
 * the window bounds.
 *
 * <p>NOTE(review): the windows are processing-time windows, so the watermark strategy assigned
 * below does not influence when they fire. Use SlidingEventTimeWindows if event-time semantics
 * are intended.
 */
public class processTop_N {
    /** How many of the most visited URLs are reported per window. */
    private static final int TOP_N = 2;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // Fixed port for the local web UI.
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // 200 ms is also the default auto-watermark interval.
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> operator = env.addSource(new Source()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        operator.map(data -> data.url)
                // Sliding window: 10 s long, sliding every 5 s.
                .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new AggregateFunction<String, HashMap<String, Long>, ArrayList<Tuple2<String, Long>>>() {
                    @Override
                    public HashMap<String, Long> createAccumulator() {
                        return new HashMap<>();
                    }

                    @Override
                    public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
                        // The first visit of a URL counts as 1; every later visit adds 1.
                        accumulator.merge(value, 1L, Long::sum);
                        return accumulator;
                    }

                    @Override
                    public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
                        ArrayList<Tuple2<String, Long>> list = new ArrayList<>(accumulator.size());
                        for (Map.Entry<String, Long> entry : accumulator.entrySet()) {
                            list.add(Tuple2.of(entry.getKey(), entry.getValue()));
                        }
                        // Descending by count. Long.compare avoids the truncation/overflow of (int) (a - b).
                        list.sort((o1, o2) -> Long.compare(o2.f1, o1.f1));
                        return list;
                    }

                    @Override
                    public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
                        // Only called for merging window assigners; sum the per-URL counts instead of returning null.
                        b.forEach((url, count) -> a.merge(url, count, Long::sum));
                        return a;
                    }
                }, new ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>() {
                    @Override
                    public void process(ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>.Context context, Iterable<ArrayList<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        // With a pre-aggregation there is exactly one element: the sorted list from getResult.
                        ArrayList<Tuple2<String, Long>> list = elements.iterator().next();
                        StringBuilder builder = new StringBuilder();
                        // A window can hold fewer than TOP_N distinct URLs; do not index past the end.
                        int limit = Math.min(TOP_N, list.size());
                        for (int i = 0; i < limit; i++) {
                            builder.append("N").append(i + 1).append("事件類型").append(list.get(i).f0).append(" 訪問次數(shù) ").append(list.get(i).f1).append("\n");
                        }
                        out.collect("窗口開始事件" + new Timestamp(start) + " 窗口結(jié)束事件" + new Timestamp(end) + "\n" + builder);
                    }
                }).print();
        env.execute();
    }
}
要分清每個重寫方法的調用時機:createAccumulator 在窗口收到第一條數據時調用,add 對數據所屬的每個窗口每來一條數據執行一次;getResult 和 ProcessAllWindowFunction.process 則是在窗口觸發時才執行一次,這個要了解清楚
2. 多流轉換
側輸出流
/**
 * Side-output demo. Events of user "小明" and "小紅" are diverted to their own side outputs as
 * (user, url) pairs. Every other event stays on the main stream.
 */
public class outTagDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        // The anonymous subclasses ({}) keep the generic type so Flink can infer the side-output type.
        final OutputTag<Tuple2<String, String>> xiaoMingTag = new OutputTag<Tuple2<String, String>>("1") {};
        final OutputTag<Tuple2<String, String>> xiaoHongTag = new OutputTag<Tuple2<String, String>>("2") {};

        SingleOutputStreamOperator<Event> mainStream = env.addSource(new Source()).process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
                switch (value.user) {
                    case "小明":
                        ctx.output(xiaoMingTag, Tuple2.of(value.user, value.url));
                        break;
                    case "小紅":
                        ctx.output(xiaoHongTag, Tuple2.of(value.user, value.url));
                        break;
                    default:
                        out.collect(value);
                        break;
                }
            }
        });

        mainStream.getSideOutput(xiaoMingTag).print();
        mainStream.getSideOutput(xiaoHongTag).print();
        mainStream.print();
        env.execute();
    }
}
聯合(union)
連接(connect)
雙流join-窗口連接
/**
 * Window join demo. Elements of source1 and source2 are paired when they have the same f0 and fall
 * into the same 5 s tumbling event-time window. The JoinFunction runs once per matching pair.
 */
public class windowJoinDemo1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // Fixed port for the local web UI.
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env
                .fromElements(
                        Tuple2.of("1", 1000L),
                        Tuple2.of("2", 1001L),
                        Tuple2.of("1", 2000L),
                        Tuple2.of("2", 2001L),
                        Tuple2.of("1", 5001L))
                .assignTimestampsAndWatermarks(eventTimeFromF1());
        SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env
                .fromElements(
                        Tuple2.of("1", 4000L),
                        Tuple2.of("2", 4001L),
                        Tuple2.of("1", 5000L),
                        Tuple2.of("2", 6001L),
                        Tuple2.of("1", 7001L),
                        Tuple2.of("1", 4001L))
                .assignTimestampsAndWatermarks(eventTimeFromF1());

        source1.join(source2)
                .where(data -> data.f0)
                .equalTo(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                        return first + " ---> " + second;
                    }
                })
                .print();
        env.execute();
    }

    /** Zero out-of-orderness watermarks, with the event time read from field f1. */
    private static WatermarkStrategy<Tuple2<String, Long>> eventTimeFromF1() {
        return WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((element, recordTimestamp) -> element.f1);
    }
}
雙流join-間隔連接
/**
 * Interval join demo. Each source2 element (left side) is joined with the source1 elements (right
 * side) that have the same f0 and whose timestamp lies within [left - 2 s, left + 2 s].
 */
public class windowJoinDemo2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // Fixed port for the local web UI.
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env
                .fromElements(
                        Tuple2.of("1", 1000L),
                        Tuple2.of("2", 1001L),
                        Tuple2.of("1", 2000L),
                        Tuple2.of("2", 2001L),
                        Tuple2.of("1", 5001L))
                .assignTimestampsAndWatermarks(eventTimeFromF1());
        SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env
                .fromElements(
                        Tuple2.of("1", 4000L),
                        Tuple2.of("2", 4001L),
                        Tuple2.of("1", 5000L),
                        Tuple2.of("2", 6001L),
                        Tuple2.of("1", 7001L),
                        Tuple2.of("1", 4001L),
                        Tuple2.of("1", 8001L),
                        Tuple2.of("1", 9001L))
                .assignTimestampsAndWatermarks(eventTimeFromF1());

        source2.keyBy(data -> data.f0)
                .intervalJoin(source1.keyBy(data -> data.f0))
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        // ctx.getTimestamp() is the timestamp Flink assigns to the joined pair.
                        out.collect(left + "-->" + right + " " + new Timestamp(ctx.getTimestamp()) + "");
                    }
                })
                .print();
        env.execute();
    }

    /** Zero out-of-orderness watermarks, with the event time read from field f1. */
    private static WatermarkStrategy<Tuple2<String, Long>> eventTimeFromF1() {
        return WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((element, recordTimestamp) -> element.f1);
    }
}
雙流join-同組連結(coGroup)
// CoGroup variant of the window join (uses the source1/source2 streams of the window-join example).
// Unlike join(), coGroup() is called once per key and 5 s tumbling event-time window with ALL
// elements of each side, so a key present on only one side still produces output
// (see "[]-->[(2,6001)]" in the sample output).
source1.coGroup(source2)
.where(data -> data.f0)
.equalTo(data -> data.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new RichCoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Long>> first, Iterable<Tuple2<String, Long>> second, Collector<String> out) throws Exception {
// first/second are all elements of this key in this window from source1/source2 (either may be empty).
out.collect(first + "-->" + second );
}
}).print();
[(1,1000), (1,2000)]-->[(1,4000), (1,4001)]
[(2,1001), (2,2001)]-->[(2,4001)]
[(1,5001)]-->[(1,5000), (1,7001)]
[]-->[(2,6001)]
3. 狀態編程
flink中的狀態
- 狀態的訪問權限
- 容錯性
- 分布式應用的橫向擴展性
補充一下理解flink中的task:flink taskmanager&slots&并行度&任務鏈&task分配詳解
按KEY分區狀態
值狀態:每隔10S統計一次PV
/**
 * Keyed ValueState demo. Counts page views (PV) per user. Starting from each user's first event,
 * it emits the running count every 10 s of event time through a timer that re-arms itself.
 */
public class stateDemo1 {
    /** Event-time interval between two reports for the same key. */
    private static final long REPORT_INTERVAL_MS = 10 * 1000L;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // Fixed port for the local web UI.
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> clicks = env.addSource(new clickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((event, recordTimestamp) -> event.timestamp));

        clicks.keyBy(data -> data.user)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    // Running PV count of the current key.
                    ValueState<Long> pvCount;
                    // Fire time of the pending timer of the current key; null until the first one is registered.
                    ValueState<Long> pendingTimer;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        pvCount = getRuntimeContext().getState(new ValueStateDescriptor<Long>("valueState", Long.class));
                        pendingTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timeState", Long.class));
                    }

                    @Override
                    public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                        Long previous = pvCount.value();
                        pvCount.update(previous == null ? 1L : previous + 1);

                        // Only the key's first event arms a timer; from then on onTimer re-arms it,
                        // so a report is produced once per 10 s instead of once per event.
                        if (pendingTimer.value() == null) {
                            long fireAt = ctx.timestamp() + REPORT_INTERVAL_MS;
                            ctx.timerService().registerEventTimeTimer(fireAt);
                            pendingTimer.update(fireAt);
                        }
                        out.collect(ctx.getCurrentKey() + "PV " + pvCount.value() + "水位線 " + new Timestamp(ctx.timerService().currentWatermark()) + " " + new Timestamp(ctx.timestamp()));
                    }

                    @Override
                    public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("onTimer" + ctx.getCurrentKey() + "PV " + pvCount.value() + "水位線 " + new Timestamp(ctx.timerService().currentWatermark()) + " " + new Timestamp(ctx.timestamp()));
                        // pvCount is deliberately kept: clearing it here would turn this into a tumbling-window count.
                        // The next timer is armed right away. Otherwise it would wait for the key's next event,
                        // which would behave like a session window.
                        long nextFire = timestamp + REPORT_INTERVAL_MS;
                        pendingTimer.clear();
                        ctx.timerService().registerEventTimeTimer(nextFire);
                        pendingTimer.update(nextFire);
                    }
                })
                .print();
        env.execute();
    }
}
這里有個問題,我一直沒搞清楚:事件時間和水位線的關係.....可以把事件時間打印出來,并把延遲改成 Duration.ofSeconds(2) 來測試
list-state案例
/**
 * connect + CoProcessFunction demo. Each input stores every element it has seen in ListState. A new
 * element is paired with everything the other input has seen for the same key, which makes this a
 * join whose state is never cleared. The println calls trace the order in which the two inputs are
 * processed.
 */
public class stateDemo2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // Fixed port for the local web UI.
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> operator1 = env
                .fromElements(
                        Tuple2.of("a", 100L),
                        Tuple2.of("a", 100L),
                        Tuple2.of("b", 200L))
                .assignTimestampsAndWatermarks(eventTimeFromF1());
        SingleOutputStreamOperator<Tuple2<String, Long>> operator2 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 2000L),
                        Tuple2.of("c", 2000L))
                .assignTimestampsAndWatermarks(eventTimeFromF1());

        operator1.keyBy(data -> data.f0)
                .connect(operator2.keyBy(data -> data.f0))
                .process(new CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    // Everything received so far on input 1 / input 2 for the current key (never cleared).
                    ListState<Tuple2<String, Long>> seenOnInput1;
                    ListState<Tuple2<String, Long>> seenOnInput2;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        seenOnInput1 = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("left", Types.TUPLE(Types.STRING, Types.LONG)));
                        seenOnInput2 = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("right", Types.TUPLE(Types.STRING, Types.LONG)));
                    }

                    @Override
                    public void processElement1(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        for (Tuple2<String, Long> other : seenOnInput2.get()) {
                            out.collect(value + " " + other);
                            System.out.println("----");
                        }
                        System.out.println("11111");
                        seenOnInput1.add(Tuple2.of(value.f0, value.f1));
                    }

                    @Override
                    public void processElement2(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        for (Tuple2<String, Long> other : seenOnInput1.get()) {
                            out.collect(value + " " + other);
                        }
                        System.out.println("22222");
                        seenOnInput2.add(Tuple2.of(value.f0, value.f1));
                    }
                })
                .print();
        env.execute();
    }

    /** Zero out-of-orderness watermarks, with the event time read from field f1. */
    private static WatermarkStrategy<Tuple2<String, Long>> eventTimeFromF1() {
        return WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((element, recordTimestamp) -> element.f1);
    }
}
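測試了一下,為什么執行結果是這樣的,非常詭異...(其實是符合預期的:兩條輸入流被交替處理,而 out.collect 的結果會立刻被 print 輸出,所以會夾在 System.out.println 之間。按順序:(a,100) 到達時右邊還沒有數據,只打印 11111;(a,1000) 到達後與 (a,100) 匹配並輸出,再打印 22222;第二條 (a,100) 與 (a,1000) 匹配,打印 ---- 和 11111;b、c 同理)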
11111
(a,1000) (a,100)
22222
(a,100) (a,1000)
----
11111
22222
(b,200) (b,2000)
----
11111
22222
學(xué)習(xí)一下:Flink進(jìn)階(三):雙流connect的用法
Tuple2.of("a", 1000L),
Tuple2.of("a", 3000L),
Tuple2.of("b", 2000L),
Tuple2.of("c", 2000L)
改了一下測試案例,看起來確實不太對:3000L 匹配了兩次 100L,沒有進行清除....(其實這是符合預期的:左流中 ("a", 100L) 本來就出現了兩次,兩條都存進了 ListState,而且代碼從來不清空狀態,所以右流之後每來一條 key 為 a 的數據,都會和這兩條各匹配一次)
實現窗口
/**
 * Prints every click. It also prints the PV per url for 10 s tumbling event-time windows, computed
 * with keyed state and timers by keyProcess.
 */
public class stateDemo3 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // Fixed port for the local web UI.
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> clicks = env.addSource(new clickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((event, recordTimestamp) -> event.timestamp));
        clicks.print();

        // PV per url, 10 s windows.
        clicks.keyBy(data -> data.url)
                .process(new keyProcess(10 * 1000L))
                .print();
        env.execute();
    }
}
/**
 * Emulates a tumbling event-time window with keyed state. Per key, it counts elements per window
 * start in a MapState. When the watermark passes a window's last millisecond, it emits the window
 * bounds, the key and the count, then drops that window's entry.
 */
class keyProcess extends KeyedProcessFunction<String, Event, String> {
    /** Window size in milliseconds; always positive. */
    private final long tumbleTime;
    /** Window start -> number of elements of the current key in that window. */
    private transient MapState<Long, Long> mapState;

    /**
     * @param tumbleTime window size in milliseconds
     * @throws IllegalArgumentException if {@code tumbleTime} is null or not positive
     */
    public keyProcess(Long tumbleTime) {
        if (tumbleTime == null || tumbleTime <= 0) {
            throw new IllegalArgumentException("tumbleTime must be a positive number of milliseconds, got " + tumbleTime);
        }
        this.tumbleTime = tumbleTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("mapState", Long.class, Long.class));
    }

    @Override
    public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
        // floorMod keeps start <= timestamp even for negative timestamps. Truncating division
        // (timestamp / size * size) would round those toward zero, i.e. into the wrong window.
        long startTime = value.timestamp - Math.floorMod(value.timestamp, tumbleTime);
        long endTime = startTime + tumbleTime;
        Long count = mapState.get(startTime);
        mapState.put(startTime, count == null ? 1L : count + 1);
        // The window [start, end) excludes end, so its last timestamp is end - 1. Flink keeps at most
        // one timer per key and timestamp, so registering it for every element does not duplicate it.
        // NOTE(review): an element arriving after its window already fired re-registers a timer in the
        // past, which fires on the next watermark and emits a partial count.
        ctx.timerService().registerEventTimeTimer(endTime - 1L);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        // Undo the "- 1" from processElement to recover the window bounds.
        long endTime = timestamp + 1L;
        long startTime = endTime - tumbleTime;
        out.collect("窗口" + new Timestamp(startTime) + "--" + new Timestamp(endTime) + " " + ctx.getCurrentKey() + " PV: " + mapState.get(startTime));
        mapState.remove(startTime);
    }
}
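這里為什么要 +1 -1???有什么區別嗎?(窗口是左閉右開的 [start, end),屬于這個窗口的最大時間戳是 end - 1,所以定時器注冊在 end - 1;當水位線 >= end - 1 時,說明 end 之前的數據都到齊了。onTimer 里再 +1 把 end 還原出來,減去窗口大小得到 start。Flink 自帶的 TimeWindow.maxTimestamp() 也是 end - 1)
看起來 onTimer 一次會輸出多條數據,但是不是已經 keyBy 了嗎,也就是說一個 key 一個 onTimer...是這樣嗎,這樣做性能是不是有點低?(定時器是按「key + 時間戳」保存的:水位線推進一次,所有在這個時間上注冊過定時器的 key 都會各回調一次 onTimer,所以一次會看到多條輸出;同一個 key 同一個時間戳的定時器只會保存一份,不會重復觸發)
狀態生存時間TTL
算子狀態
算子狀態也支持不同的結構類型,主要有三種:ListState、UnionListState、BroadcastState
廣播狀態
狀態持久化
檢查點
狀態后端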
4. 容錯(cuò)機(jī)制
檢查點(diǎn)(checkpoint)
- 檢查點(diǎn)的保存
- 從檢查點(diǎn)恢復(fù)狀態(tài)
- 檢查點(diǎn)算法
- 檢查點(diǎn)配置
- 保存點(diǎn)(savepoint)
狀態(tài)一致性
什么是狀態(tài)一致性
- 對(duì)于流處理器內(nèi)部來說,所謂的狀態(tài)一致性,其實(shí)就是我們所說的計(jì)算結(jié)果要保證準(zhǔn)確
- 一條數(shù)據(jù)不應(yīng)該丟失,也不應(yīng)該重復(fù)計(jì)算
- 在遇到故障可以恢復(fù)狀態(tài),恢復(fù)以后的重新計(jì)算,結(jié)果應(yīng)該也是完全正確的
狀態(tài)一致性分類
- 最多一次:當(dāng)任務(wù)故障,最簡單的做法是什么都不干,既不恢復(fù)丟失的狀態(tài),也不重播丟失的數(shù)據(jù).at-most-once 語義的含義是最多處理一次事件
- 至少一次:在大多數(shù)的真實(shí)應(yīng)用場景,我們希望不丟失事件,這種類型的保障稱為at-least-once,意思是所有的事件都得到了處理,而一些事件還可能被處理多次
- 精確一次: 恰好處理一次是最嚴(yán)格的保證,也是最難實(shí)現(xiàn)的,恰好處理一次語義不僅僅意味著沒有事件丟失,還意味著針對(duì)每一個(gè)數(shù)據(jù),內(nèi)部狀態(tài)僅僅更新一次(邏輯上的)
端到端的精確一次(end-to-end exactly-once)
冪等寫入(idempotent writes)
所謂的冪等性操作,是說一個操作可以重復執行很多次,但是只會導致一次結果更改,也就是說,后面再重復執行就不起作用了
有一種場景:數據消費如下
10,20,30,40,50
我們在消費到10的時候做了一個檢查點,但是數據在消費到30的時候掛掉了,所以數據要從最近一個檢查點恢復,所以看到的數據走勢是 10,20,30,10,20,30.....雖然數據最終是一致的,但是中間過程中客戶看到的結果是不對的(出現了回退)...所以我們需要事務寫入這種類型的方式
事務寫入(transactional writes)
- 事務(transaction)
應用程序中一系列嚴密的操作,所有操作必須成功完成,否則在每個操作中所作的所有更改都會被撤銷
具有原子性:一個事務中的一系列操作要么全部成功,要么一個都不做
- 實現思想
構建的事務對應著 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入 sink 系統中
- 實現方式
預寫日志
兩階段提交
預寫日志(write-ahead-log)
把結果數據先當成狀態保存,然后在收到 checkpoint 完成的通知時,一次性寫入 sink 系統
簡單易于實現,由于數據提前在狀態后端中做了緩存,所以無論什么 sink 系統,都能用這種方式搞定
在大多數場景下是至少一次性語義的(一次性寫入 sink 的過程本身不是原子的,寫到一半失敗的話,恢復后會重新寫一遍)
兩階段提交(two-phase-commit)
參考一下:第九章:一致性與共識
對于每一個 checkpoint,sink 任務都會啟動一個事務,并將接下來所有接收的數據添加到事務中
然后將這些數據寫入到 sink 系統中,但不提交它們 -- 這時只是預提交(這里能查數據不?取決于消費端:以 Kafka 為例,isolation.level=read_committed 的消費者讀不到未提交的數據,默認的 read_uncommitted 則能讀到)
當收到 checkpoint 完成的通知,它才正式提交事務,實現結果的真正寫入
這種方式真正實現了 exactly-once,它需要一個提供事務支持的外部 sink 系統
flink 和 kafka 連接時的精準一次保障
/**
 * Reads strings from Kafka, drops records that start with "error" and writes the rest back to Kafka
 * with end-to-end exactly-once. Source offsets are stored in Flink checkpoints, and the sink writes
 * inside Kafka transactions that are committed when a checkpoint completes.
 */
public class KafkaToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 30 s in exactly-once mode; the transactional sink commits per checkpoint.
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
        // NOTE(review): FsStateBackend is deprecated in newer Flink versions (HashMapStateBackend + checkpoint storage).
        env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));

        String brokers = "linux03:9092,linux04:9092,linux05:9092";

        // ---- source ----
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", brokers);
        // No committed offset for the group: start from the beginning; otherwise continue from it.
        consumerProps.setProperty("auto.offset.reset", "earliest");
        consumerProps.setProperty("group.id", "g1");
        // Disable the Kafka client's own periodic offset commit; with checkpointing on, Flink keeps offsets in its checkpoints.
        consumerProps.setProperty("enable.auto.commit", "false");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "kafka2021",              // topic to read from
                new SimpleStringSchema(), // deserialize each record value as a String
                consumerProps);
        // Also do not write offsets to Kafka's internal offsets topic when a checkpoint completes.
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));

        // ---- sink ----
        String topic = "out2021";
        // The producer gets its own Properties for two reasons: consumer-only settings (group.id,
        // auto.offset.reset, enable.auto.commit) do not leak into it, and the Properties object the
        // consumer already holds is not mutated afterwards.
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", brokers);
        // 5 minutes. Must not exceed the broker's transaction.max.timeout.ms, or transactions cannot be initialized.
        producerProps.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 5));
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic,                                      // default target topic
                new KafkaStringSerializationSchema(topic),  // serializes each String as the record value
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);  // transactional, two-phase-commit sink
        filtered.addSink(kafkaProducer);
        env.execute();
    }
}
/**
 * Kafka serialization schema that writes each String element, without a key, as the record value
 * of a fixed topic. Values are encoded with a configurable charset, UTF-8 by default.
 */
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {
    private final String topic;
    /** Charset name; kept as a String because it is shipped with the (serialized) schema. */
    private final String charset;
    /** Resolved lazily on the task side; java.nio.charset.Charset is not Serializable. */
    private transient Charset encoding;

    /** Writes to {@code topic} using UTF-8. */
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }

    /**
     * @param topic target topic, must not be null
     * @param charset name of the charset used to encode each element
     * @throws NullPointerException if {@code topic} is null
     * @throws IllegalArgumentException if {@code charset} is null, illegal or unsupported by this JVM
     */
    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = java.util.Objects.requireNonNull(topic, "topic");
        // Fail fast when the job graph is built, instead of on the first record at runtime.
        Charset.forName(charset);
        this.charset = charset;
    }

    /** Encodes the element with the configured charset. The Flink timestamp is not forwarded to Kafka. */
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        if (encoding == null) {
            encoding = Charset.forName(charset);
        }
        return new ProducerRecord<>(topic, element.getBytes(encoding));
    }
}