問題一:如何保證數(shù)據(jù)按照事件時(shí)間準(zhǔn)確的落到同一個(gè)分區(qū)璧疗;
- 使用watermark自定義分桶規(guī)則,參考鏈接:flink 落 hdfs 數(shù)據(jù)按照事件事件分區(qū)解決方案 https://developer.aliyun.com/article/719786
/**
* @Author:wenwei
* @Date : 2020/9/8 22:15
* 自定義分桶的規(guī)則
* 1:按照什么格式定義文件名歌憨,默認(rèn)為yyyy-MM-dd-HH
*/
@PublicEvolving
public class CustomBucketAssigner<IN> implements BucketAssigner<IN, String> {
private static final long serialVersionUID = 1L;
private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
private final String formatString;
private final ZoneId zoneId;
private transient DateTimeFormatter dateTimeFormatter;
/**
* Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.
*/
public CustomBucketAssigner() {
this(DEFAULT_FORMAT_STRING);
}
/**
* Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.
*
* @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
* the bucket id.
*/
public CustomBucketAssigner(String formatString) {
this(formatString, ZoneId.systemDefault());
}
/**
* Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
*
* @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
*/
public CustomBucketAssigner(ZoneId zoneId) {
this(DEFAULT_FORMAT_STRING, zoneId);
}
/**
* Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.
*
* @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
* the bucket path.
* @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
*/
public CustomBucketAssigner(String formatString, ZoneId zoneId) {
this.formatString = Preconditions.checkNotNull(formatString);
this.zoneId = Preconditions.checkNotNull(zoneId);
}
//將分桶的規(guī)則寫成按照事件時(shí)間衣陶;
@Override
public String getBucketId(IN element, BucketAssigner.Context context) {
if (dateTimeFormatter == null) {
dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
}
//固定格式命名文件夾名稱
return "p_data_day="+dateTimeFormatter.format(Instant.ofEpochMilli(context.currentWatermark()));
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
@Override
public String toString() {
return "DateTimeBucketAssigner{" +
"formatString='" + formatString + '\'' +
", zoneId=" + zoneId +
'}';
}
}
問題二: flink 如何準(zhǔn)確的劃分窗口的解阅?
如何正確定義window的窗口時(shí)間落竹,保證數(shù)據(jù)都會準(zhǔn)確的按照事件分區(qū),不會將前一天的數(shù)據(jù)货抄,落入到下一個(gè)時(shí)間分區(qū)里面述召;可以參考windows 中的源碼,其中定義start時(shí)間蟹地,值得參考
/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start. 事件發(fā)生的時(shí)間
* @param offset The offset which window start would be shifted by. 定義TumblingEventTimeWindows 設(shè)置云訊的offset的值积暖,默認(rèn)都為零
* @param windowSize The size of the generated windows. 窗口大小
* @return window start
對應(yīng)的數(shù)據(jù)應(yīng)windows
例如 windows Size = 5s ,offset = 0 ; 例如當(dāng)前的 timestamp = 2s ; 7s
2 - (2-0+5) % 5 = 0 ,
7 - (7 - 0 + 5) % 5 = 5 ,
例如 windows Size = 7s ,offset = 0 ; 例如當(dāng)前的 timestamp = 2s 怪与; 7s
2 - (2-0+7) % 7 = 0;
7 - (7-0+7)%7= 7
例如 windows Size = 5s ,offset = 1s ; 例如當(dāng)前的 timestamp = 2s 夺刑; 7s
2 - (2-1+5) % 5 = 1 ,
7 - (7 - 0 + 5) % 5 = 6 ,
例如 windows Size = 7s ,offset = 0 ; 例如當(dāng)前的 timestamp = 2s ; 7s
2 - (2-1+7) % 7 = 1;
7 - (7-1+7)%7= 8
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
問題三 : 由于數(shù)據(jù)量不斷增大琼梆,解析IP地址的時(shí)候性誉,導(dǎo)致文件句柄過多窿吩;
- 將解析ip的類改造成單例類,有待優(yōu)化
public class Ip2regionSingleton {
private static Logger logger = LoggerFactory.getLogger(Ip2regionSingleton.class);
private static Ip2regionSingleton instance = new Ip2regionSingleton();
private static DbConfig config;
private static DbSearcher searcher;
public DbSearcher getSearcher() {
return searcher;
}
// 私有化構(gòu)造方法
private Ip2regionSingleton() {
String path = Ip2regionSingleton.class.getResource("/").getPath();
String dbPath = path + "plugins/ip2region.db";
File file = new File(dbPath);
logger.info("singleton count:{}","-------------------------------------------------------");
if ( file.exists() ) {
try{
config = new DbConfig();
searcher = new DbSearcher(config, dbPath);
}catch (Exception e){
logger.error("Ip2regionSingleton:{}",e.getMessage());
e.printStackTrace();
}
}
}
public static Ip2regionSingleton getInstance() {
return instance;
}
}
問題四: 如何解決flink pom文件中 茎杂,包依賴的問題;
- maven helper 纫雁;找到相應(yīng)沖突的jar類煌往;
- 通過exclude方式,去除掉沖突的jar類
問題五: 如何保證flink中的轧邪,端到端數(shù)據(jù)的一致性刽脖,順序性;
保證kafka中數(shù)據(jù)的順序性忌愚;(做到全局的數(shù)據(jù)的順序性基本上不可能曲管,但是可以做到單分區(qū)的數(shù)據(jù)一致性)
kaka中,每個(gè)機(jī)器有個(gè)broker硕糊,broker里面有多個(gè)partition院水,partition之間通過主從方式復(fù)制;這樣保證數(shù)據(jù)的一致性简十;
flink中設(shè)置 exactly oncely的語義檬某;
env.enableCheckpointing(parameter.getLong("checkpoint.cycle",300*1000L),CheckpointingMode.EXACTLY_ONCE);
問題六: 如何保證在無事件數(shù)據(jù)更新的時(shí)候,更新watermark的值螟蝙,然后觸發(fā)窗口的計(jì)算
-
在處理某些數(shù)據(jù)的時(shí)候恢恼,數(shù)據(jù)流的時(shí)間更新時(shí)間間隔大于窗口的大小击孩,如果使用PunctuatedWatermarks 會導(dǎo)致watermark一直不更新颁独;改成AssignerWithPeriodicWatermarks周期性的更新的watermark即可
private static class CustomWatermarks<T> implements AssignerWithPunctuatedWatermarks<PageActivityDO> { private static final long serialVersionUID = 1L; private Long currentTime = 0L; //允許2分鐘的延遲 private Long allowDelayTime = 120L; @Override public Watermark checkAndGetNextWatermark(PageActivityDO topic, long l) { return new Watermark(currentTime - allowDelayTime); } @Override public long extractTimestamp(PageActivityDO topic, long l) { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); if(StringUtils.isNullOrWhitespaceOnly(topic.getPoint_time())){ return currentTime; } LocalDateTime localDateTime = LocalDateTime.parse(topic.getPoint_time(), formatter); currentTime = Math.max(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli(), currentTime); return currentTime; } }
private static class CustomWatermarksPeriodc<T> implements AssignerWithPeriodicWatermarks<ActivityInfoDO> {
private static final long serialVersionUID = 1L;
//允許30s的延遲
private Long allowDelayTime = 30000L;
@Override
public long extractTimestamp(ActivityInfoDO topic, long l) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
if(StringUtils.isNullOrWhitespaceOnly(topic.getPush_time())){
return System.currentTimeMillis();
}
LocalDateTime localDateTime = LocalDateTime.parse(topic.getPush_time(), formatter);
logger.info("extractTimestamp,currentWatermark:{}",localDateTime );
return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
logger.info("getCurrentWatermark, currentWatermark:{}",System.currentTimeMillis() - allowDelayTime);
return new Watermark(System.currentTimeMillis() - allowDelayTime);
}
}
- 特別注意 選用 Periodic WatermarkGenerator 需要設(shè)置自動(dòng)watermark更新機(jī)制, setAutoWatermarkInterval(1000)
問題七:如何保證兩階段提交的實(shí)現(xiàn),保證數(shù)據(jù)能夠冪等性寫入和事務(wù)性的寫入
- 保證數(shù)據(jù)源數(shù)據(jù)可重放
- 數(shù)據(jù)sink支持事務(wù)處理(預(yù)提交搁拙,回滾,提交)
- 或者通過sink的地方凳枝,支持唯一性去重
問題八:sink to mysql 的時(shí)候滞谢,經(jīng)常報(bào)錯(cuò)
- 報(bào)錯(cuò)類型 : The last packet successfully received from the server was 1,203,500 milliseconds ago.
- 有可能是jdbc版本出現(xiàn),同時(shí)最好采用mysql 連接池
正確的使用valueState
flink 對于不是大規(guī)模的中間態(tài)的管理锁保,可以選用 fsStateBackend 薯酝;StateBackend fsStateBackend = new FsStateBackend(parameter.get("flink.state.path"));
- 其中包括狀態(tài)的保留時(shí)間;更新類型爽柒;是否可見
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(ttlDays))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// .cleanupInRocksdbCompactFilter(1000L)
.build();
參考鏈接:
1: Flink最難知識點(diǎn)再解析 | 時(shí)間/窗口/水印/遲到數(shù)據(jù)處理
2:Flink 中 timeWindow 滾動(dòng)窗口邊界和數(shù)據(jù)延遲問題調(diào)研
5:兩階段提交(2PC)與其在Flink exactly once中的應(yīng)用
提醒吴菠,使用的是flink 1.9的版本