使用flink 中遇到的問題總結(jié)

問題一:如何保證數(shù)據(jù)按照事件時(shí)間準(zhǔn)確的落到同一個(gè)分區(qū)璧疗;

/**
 * @Author:wenwei
 * @Date : 2020/9/8 22:15
 * 自定義分桶的規(guī)則
 * 1:按照什么格式定義文件名歌憨,默認(rèn)為yyyy-MM-dd-HH
 */
@PublicEvolving
public class CustomBucketAssigner<IN> implements BucketAssigner<IN, String> {

    private static final long serialVersionUID = 1L;

    private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

    private final   String formatString;

    private final ZoneId zoneId;

    private transient DateTimeFormatter dateTimeFormatter;

    /**
     * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.
     */
    public CustomBucketAssigner() {
        this(DEFAULT_FORMAT_STRING);
    }

    /**
     * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.
     *
     * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
     *                     the bucket id.
     */
    public CustomBucketAssigner(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    /**
     * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
     *
     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
     */
    public CustomBucketAssigner(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    /**
     * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.
     *
     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
     *                     the bucket path.
     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
     */
    public CustomBucketAssigner(String formatString, ZoneId zoneId) {
        this.formatString = Preconditions.checkNotNull(formatString);
        this.zoneId = Preconditions.checkNotNull(zoneId);
    }
//將分桶的規(guī)則寫成按照事件時(shí)間衣陶;
    @Override
    public String getBucketId(IN element, BucketAssigner.Context context) {
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        //固定格式命名文件夾名稱
        return "p_data_day="+dateTimeFormatter.format(Instant.ofEpochMilli(context.currentWatermark()));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String toString() {
        return "DateTimeBucketAssigner{" +
                "formatString='" + formatString + '\'' +
                ", zoneId=" + zoneId +
                '}';
    }

}

問題二: flink 如何準(zhǔn)確的劃分窗口的解阅?

如何正確定義window的窗口時(shí)間落竹,保證數(shù)據(jù)都會準(zhǔn)確的按照事件分區(qū),不會將前一天的數(shù)據(jù)货抄,落入到下一個(gè)時(shí)間分區(qū)里面述召;可以參考windows 中的源碼,其中定義start時(shí)間蟹地,值得參考

/**
     * Method to get the window start for a timestamp.
     *
     * @param timestamp epoch millisecond to get the window start. 事件發(fā)生的時(shí)間 
     * @param offset The offset which window start would be shifted by.  定義TumblingEventTimeWindows 設(shè)置云訊的offset的值积暖,默認(rèn)都為零
     * @param windowSize The size of the generated windows.  窗口大小
     * @return window start
     對應(yīng)的數(shù)據(jù)應(yīng)windows
     
    例如 windows Size = 5s  ,offset = 0 ; 例如當(dāng)前的 timestamp = 2s ; 7s 
    2 - (2-0+5) % 5 = 0 ,
    7 - (7 - 0 + 5) % 5 = 5 , 
    例如 windows Size = 7s  ,offset = 0 ; 例如當(dāng)前的 timestamp = 2s 怪与; 7s 
    2 - (2-0+7) % 7 = 0;
    7 - (7-0+7)%7= 7
    
    例如 windows Size = 5s  ,offset = 1s ; 例如當(dāng)前的 timestamp = 2s 夺刑; 7s 
    2 - (2-1+5) % 5 = 1 ,
    7 - (7 - 0 + 5) % 5 = 6 , 
    例如 windows Size = 7s  ,offset = 0 ; 例如當(dāng)前的 timestamp = 2s ; 7s 
    2 - (2-1+7) % 7 = 1;
    7 - (7-1+7)%7= 8
    
     */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

問題三 : 由于數(shù)據(jù)量不斷增大琼梆,解析IP地址的時(shí)候性誉,導(dǎo)致文件句柄過多窿吩;

  • 將解析ip的類改造成單例類,有待優(yōu)化

public class Ip2regionSingleton {

    private static Logger logger = LoggerFactory.getLogger(Ip2regionSingleton.class);

    private static Ip2regionSingleton instance = new Ip2regionSingleton();

    private static DbConfig config;
    private static DbSearcher searcher;


    public DbSearcher getSearcher() {
        return searcher;
    }


    // 私有化構(gòu)造方法
    private Ip2regionSingleton() {

        String path = Ip2regionSingleton.class.getResource("/").getPath();
        String dbPath =  path + "plugins/ip2region.db";
        File file = new File(dbPath);

        logger.info("singleton count:{}","-------------------------------------------------------");

        if ( file.exists()  ) {

            try{
                config = new DbConfig();
                searcher = new DbSearcher(config, dbPath);

            }catch (Exception e){
                logger.error("Ip2regionSingleton:{}",e.getMessage());
                e.printStackTrace();
            }
        }
    }

    public static Ip2regionSingleton getInstance() {
        return instance;
    }

}

問題四: 如何解決flink pom文件中 茎杂,包依賴的問題;

  • maven helper 纫雁;找到相應(yīng)沖突的jar類煌往;
  • 通過exclude方式,去除掉沖突的jar類

問題五: 如何保證flink中的轧邪,端到端數(shù)據(jù)的一致性刽脖,順序性;

  • 保證kafka中數(shù)據(jù)的順序性忌愚;(做到全局的數(shù)據(jù)的順序性基本上不可能曲管,但是可以做到單分區(qū)的數(shù)據(jù)一致性)

  • kaka中,每個(gè)機(jī)器有個(gè)broker硕糊,broker里面有多個(gè)partition院水,partition之間通過主從方式復(fù)制;這樣保證數(shù)據(jù)的一致性简十;

  • flink中設(shè)置 exactly oncely的語義檬某;

   env.enableCheckpointing(parameter.getLong("checkpoint.cycle",300*1000L),CheckpointingMode.EXACTLY_ONCE);

問題六: 如何保證在無事件數(shù)據(jù)更新的時(shí)候,更新watermark的值螟蝙,然后觸發(fā)窗口的計(jì)算

  • 在處理某些數(shù)據(jù)的時(shí)候恢恼,數(shù)據(jù)流的時(shí)間更新時(shí)間間隔大于窗口的大小击孩,如果使用PunctuatedWatermarks 會導(dǎo)致watermark一直不更新颁独;改成AssignerWithPeriodicWatermarks周期性的更新的watermark即可

    private static class CustomWatermarks<T> implements AssignerWithPunctuatedWatermarks<PageActivityDO> {
            private static final long serialVersionUID = 1L;
            private Long currentTime = 0L;
            //允許2分鐘的延遲
            private Long allowDelayTime = 120L;
            @Override
            public Watermark checkAndGetNextWatermark(PageActivityDO topic, long l) {
                return new Watermark(currentTime - allowDelayTime);
            }
            @Override
            public long extractTimestamp(PageActivityDO topic, long l) {
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
                if(StringUtils.isNullOrWhitespaceOnly(topic.getPoint_time())){
                    return currentTime;
                }
                LocalDateTime localDateTime = LocalDateTime.parse(topic.getPoint_time(), formatter);
                currentTime = Math.max(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli(), currentTime);
    
                return currentTime;
            }
    
    
        }
    
  private static class CustomWatermarksPeriodc<T> implements AssignerWithPeriodicWatermarks<ActivityInfoDO> {
        private static final long serialVersionUID = 1L;
        //允許30s的延遲
        private Long allowDelayTime = 30000L;

        @Override
        public long extractTimestamp(ActivityInfoDO topic, long l) {
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

            if(StringUtils.isNullOrWhitespaceOnly(topic.getPush_time())){
                return System.currentTimeMillis();
            }
            LocalDateTime localDateTime = LocalDateTime.parse(topic.getPush_time(), formatter);
            logger.info("extractTimestamp,currentWatermark:{}",localDateTime );
            return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();


        }


        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            logger.info("getCurrentWatermark, currentWatermark:{}",System.currentTimeMillis() - allowDelayTime);
            return new Watermark(System.currentTimeMillis() - allowDelayTime);
        }
    }
  • 特別注意 選用 Periodic WatermarkGenerator 需要設(shè)置自動(dòng)watermark更新機(jī)制, setAutoWatermarkInterval(1000)

問題七:如何保證兩階段提交的實(shí)現(xiàn),保證數(shù)據(jù)能夠冪等性寫入和事務(wù)性的寫入

  • 保證數(shù)據(jù)源數(shù)據(jù)可重放
  • 數(shù)據(jù)sink支持事務(wù)處理(預(yù)提交搁拙,回滾,提交)
  • 或者通過sink的地方凳枝,支持唯一性去重

問題八:sink to mysql 的時(shí)候滞谢,經(jīng)常報(bào)錯(cuò)

  • 報(bào)錯(cuò)類型 : The last packet successfully received from the server was 1,203,500 milliseconds ago.
  • 有可能是jdbc版本出現(xiàn),同時(shí)最好采用mysql 連接池

正確的使用valueState

flink 對于不是大規(guī)模的中間態(tài)的管理锁保,可以選用 fsStateBackend 薯酝;StateBackend fsStateBackend = new FsStateBackend(parameter.get("flink.state.path"));

  • 其中包括狀態(tài)的保留時(shí)間;更新類型爽柒;是否可見
 StateTtlConfig   ttlConfig = StateTtlConfig
                .newBuilder(Time.days(ttlDays))
                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
//                .cleanupInRocksdbCompactFilter(1000L)
                .build();

參考鏈接:

1: Flink最難知識點(diǎn)再解析 | 時(shí)間/窗口/水印/遲到數(shù)據(jù)處理

2:Flink 中 timeWindow 滾動(dòng)窗口邊界和數(shù)據(jù)延遲問題調(diào)研

3: Kafka 概述:深入理解架構(gòu)

4: generate watermarks

5:兩階段提交(2PC)與其在Flink exactly once中的應(yīng)用

提醒吴菠,使用的是flink 1.9的版本

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市浩村,隨后出現(xiàn)的幾起案子做葵,更是在濱河造成了極大的恐慌,老刑警劉巖心墅,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件酿矢,死亡現(xiàn)場離奇詭異,居然都是意外死亡怎燥,警方通過查閱死者的電腦和手機(jī)瘫筐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來铐姚,“玉大人策肝,你說我怎么就攤上這事∫啵” “怎么了之众?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長依许。 經(jīng)常有香客問我棺禾,道長,這世上最難降的妖魔是什么峭跳? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任膘婶,我火速辦了婚禮,結(jié)果婚禮上坦康,老公的妹妹穿的比我還像新娘竣付。我一直安慰自己,他們只是感情好滞欠,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布古胆。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪逸绎。 梳的紋絲不亂的頭發(fā)上惹恃,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天,我揣著相機(jī)與錄音棺牧,去河邊找鬼巫糙。 笑死,一個(gè)胖子當(dāng)著我的面吹牛颊乘,可吹牛的內(nèi)容都是我干的参淹。 我是一名探鬼主播,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼乏悄,長吁一口氣:“原來是場噩夢啊……” “哼浙值!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起檩小,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤开呐,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后规求,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體筐付,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年阻肿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了瓦戚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,117評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡冕茅,死狀恐怖伤极,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情姨伤,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布庸疾,位于F島的核電站乍楚,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏届慈。R本人自食惡果不足惜徒溪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望金顿。 院中可真熱鬧臊泌,春花似錦、人聲如沸揍拆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至播揪,卻和暖如春贮喧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背猪狈。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工箱沦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人雇庙。 一個(gè)月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓谓形,卻偏偏與公主長得像,于是被迫代替她去往敵國和親疆前。 傳聞我的和親對象是個(gè)殘疾皇子套耕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評論 2 345