import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class FlinkCheckpointTest {
/**
 * Entry point: wires up the demo pipeline and runs it.
 *
 * <p>Checkpointing is enabled with a 2000 ms interval. The topology is
 * {@code FSource} (parallelism 4) -> {@code FStart} (parallelism 1)
 * -> {@code FCombine} (parallelism 4) -> {@code FSubmit} (parallelism 1).
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    final long checkpointIntervalMs = 2 * 1000L;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(checkpointIntervalMs);

    env.addSource(new FSource())
            .setParallelism(4)
            .transform("開始事務(wù)", Types.STRING, new FStart())
            .setParallelism(1)
            .process(new FCombine())
            .name("事務(wù)預(yù)處理")
            .setParallelism(4)
            .addSink(new FSubmit())
            .name("提交事務(wù)")
            .setParallelism(1);

    env.execute("test");
}
/**
 * Demo source that emits one string roughly every second, formatted as
 * {@code <id of the thread running run()>-<sequence number>}, with the
 * sequence starting at 1.
 *
 * <p>The emit loop is governed by a flag that {@link #cancel()} clears, so
 * the source can stop cooperatively instead of spinning forever.
 */
static class FSource extends RichParallelSourceFunction<String>{
    /**
     * Loop guard for {@link #run}. Declared volatile following the pattern
     * recommended by Flink's SourceFunction documentation, where cancel()
     * may be called from a thread other than the one executing run().
     */
    private volatile boolean running = true;

    /**
     * Emits records until the source is cancelled.
     *
     * @param sourceContext context used to emit records
     * @throws Exception if the sleep between records is interrupted
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        int sequence = 0;
        while (running) {
            sequence = sequence + 1;
            sourceContext.collect(Thread.currentThread().getId() + "-" + sequence);
            Thread.sleep(1000);
        }
    }

    /** Requests that {@link #run} leave its loop after the current iteration. */
    @Override
    public void cancel() {
        running = false;
    }
}
/**
 * Operator that tags every incoming string with the most recently recorded
 * checkpoint id and forwards it.
 *
 * <p>The id is recorded in {@link #prepareSnapshotPreBarrier(long)}; until
 * that method has been called, the tag is {@code 0}.
 */
static class FStart extends AbstractStreamOperator<String> implements OneInputStreamOperator<String,String>{
    /** Checkpoint id last seen in prepareSnapshotPreBarrier; starts at 0. */
    volatile Long ckid = 0L;

    /**
     * Appends {@code "-" + ckid} to the record's value, logs it, and emits
     * the same record object carrying the new value.
     *
     * @param streamRecord the incoming record; its value is replaced in place
     */
    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        final String tagged = streamRecord.getValue() + "-" + ckid;
        log("收到數(shù)據(jù): " + tagged + "..ckid:" + ckid);
        // replace() swaps the value and hands back the record for emission.
        output.collect(streamRecord.replace(tagged));
    }

    /**
     * Logs the checkpoint id, stores it for tagging subsequent records, and
     * then delegates to the superclass.
     *
     * @param checkpointId id of the checkpoint about to be taken
     */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        log("開啟事務(wù): " + checkpointId);
        this.ckid = checkpointId;
        super.prepareSnapshotPreBarrier(checkpointId);
    }
}
/**
 * Buffers incoming non-blank strings and, on every state snapshot, emits the
 * buffered batch as a single string of the form
 * {@code [item1;item2;...;-<ckid>]} before clearing the buffer.
 *
 * <p>The batch is emitted through the {@link Collector} captured on the first
 * {@link #processElement} call.
 * NOTE(review): this relies on that collector staying usable outside of
 * processElement — confirm for the Flink version in use.
 */
static class FCombine extends ProcessFunction<String,String> implements CheckpointedFunction {
    /** Elements received since the last snapshot. */
    List<String> ls = new ArrayList<>();
    /** Collector captured from the first processElement call; null until then. */
    Collector<String> collector = null;
    /** Not assigned anywhere in this class, so it is always logged/emitted as 0. */
    volatile Long ckid = 0L;

    /**
     * Logs the current batch, waits 5 seconds, emits the batch (when a
     * collector is available) and clears the buffer.
     *
     * @param functionSnapshotContext supplies the checkpoint id used in log output
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        final long checkpointId = functionSnapshotContext.getCheckpointId();

        // Every element is followed by ';', including the last one.
        StringBuilder batch = new StringBuilder();
        for (String item : ls) {
            batch.append(item).append(";");
        }

        log("批處理 " + checkpointId + ": 時(shí)收到數(shù)據(jù):" + batch);
        Thread.sleep(5*1000);
        // A snapshot can be requested before any element has arrived, in which
        // case no collector has been captured yet and the buffer is empty.
        if (collector != null) {
            collector.collect("[" + batch + "-" + ckid + "]");
        }
        ls.clear();
        log("批處理 " + checkpointId + " 完成");
    }

    /** No state is restored; intentionally empty. */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { }

    /**
     * Buffers the element if it is not blank, logs the buffer size, and
     * remembers the collector for later use in {@link #snapshotState}.
     *
     * @param s       incoming element
     * @param context process-function context; not used
     * @param out     collector, captured on the first call only
     */
    @Override
    public void processElement(String s, Context context, Collector<String> out) throws Exception {
        if (StringUtils.isNotBlank(s)) {
            ls.add(s);
        }
        log("收到數(shù)據(jù) :" + s + "; 這批數(shù)據(jù)大小為:" + ls.size() + "..ckid:" + ckid);
        if (collector == null) {
            collector = out;
        }
    }
}
/**
 * Sink that buffers incoming non-blank strings and logs, then clears, the
 * buffer both when a state snapshot is taken and when a checkpoint is
 * reported complete.
 */
static class FSubmit extends RichSinkFunction<String> implements CheckpointedFunction, CheckpointListener {
    /** Values received since the buffer was last cleared. */
    List<String> ls = new ArrayList<>();
    /** Id of the last checkpoint reported complete; 0 until the first notification. */
    volatile Long ckid = 0L;

    /**
     * Records the completed checkpoint id, logs the buffered values, sleeps
     * for 100 seconds and then clears the buffer.
     *
     * <p>NOTE(review): the 100 s sleep looks like a deliberate delay for
     * observing checkpoint behaviour — confirm before changing it.
     *
     * @param l id of the completed checkpoint
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public void notifyCheckpointComplete(long l) throws Exception {
        ckid = l;
        log("submit notifyCheckpointComplete " + l + " over data:list size" + ls.size() + "; detail" + joinBuffered());
        Thread.sleep(100000);
        ls.clear();
    }

    /**
     * Buffers the value if it is not blank and logs the current buffer size.
     *
     * @param value   incoming record
     * @param context sink context; not used
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        if (StringUtils.isNotBlank(value)) {
            ls.add(value);
        }
        log("收到數(shù)據(jù) :" + value + " list zie:" + ls.size() + "..ckid:" + ckid);
    }

    /**
     * Logs the buffered values for the checkpoint being taken and clears the buffer.
     *
     * @param context supplies the checkpoint id used in log output
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        log("submit snapshotState " + context.getCheckpointId() + " over data:list size" + ls.size() + "; detail" + joinBuffered());
        ls.clear();
    }

    /** No state is restored; intentionally empty. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    /** Concatenates the buffered values, appending "||" after each one (including the last). */
    private String joinBuffered() {
        StringBuilder joined = new StringBuilder();
        for (String item : ls) {
            joined.append(item).append("||");
        }
        return joined.toString();
    }
}
/**
 * Prints a line to standard output in the form
 * {@code HH:mm:ss.SSS:<current thread name>:<message>}.
 *
 * <p>A new formatter is created on every call, so no formatter instance is
 * shared between calling threads.
 *
 * @param s the message to print
 */
public static void log(String s){
    final String timestamp = new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
    final String threadName = Thread.currentThread().getName();
    System.out.println(String.join(":", timestamp, threadName, s));
}
}