場景描述:Flink在處理實(shí)時(shí)數(shù)據(jù)時(shí),假如其中一條數(shù)據(jù)時(shí)臟數(shù)據(jù)泻拦,例如格式錯(cuò)誤導(dǎo)致Json轉(zhuǎn)換異常毙芜,字段缺少等等,這個(gè)時(shí)候該怎么處理呢争拐?
解決辦法:
這種問題在Spark Sql或者Flink Sql中腋粥,最常見的辦法就是直接過濾掉晦雨。
在實(shí)際中,遇到的情況會(huì)非常多隘冲,則我們可以自定義一個(gè)UDF闹瞧,這個(gè)UDF的作用就是用來處理null或者空字符串或者其他各種異常情況的。
官方案例:
public class HashCode extends ScalarFunction {
private int factor = 12;
public HashCode(int factor) {
this.factor = factor;
}
public int eval(String s) {
return s.hashCode() * factor;
}
}
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
在實(shí)際工作中展辞,在利用env.addSource方法對(duì)接Kafka數(shù)據(jù)源后奥邮,會(huì)利用map方法將對(duì)應(yīng)json串轉(zhuǎn)成對(duì)象,所以會(huì)try catch罗珍,即
this.source = this.env
.addSource(KafkaUtil.text("tgs-topic"))
.uid(Constants.UID_SOURCE_TGS)
.name("Kafka tgs-topic Source")
.map(json -> {
try {
return JSON.parseObject(json, Tgs.class);
} catch (Exception e) {
logger.error("Illegal JSON message: {}", json);
throw e;
}
})
.uid(Constants.UID_MAP_JSON_PARSE)
.name("Parse JSON to Tgs object");
這樣在遇到臟數(shù)據(jù)時(shí)洽腺,也不會(huì)因?yàn)閖son轉(zhuǎn)換出錯(cuò)導(dǎo)致任務(wù)失敗。