Flink exactly-once 實戰(zhàn)筆記

Flink-Kafka

眾所周知脑慧,F(xiàn)link在很早的時候就通過Checkpointing提供了exactly-once的semantic,不過僅限于自身或者是從KafkaConsumer中消費數(shù)據(jù)嗦随。而在Flink 1.4版本的時候加入了赫赫有名的TwoPhaseCommitSinkFunction碍沐,提供了End-to-End的exatcly-once語言身隐,當然是在需要下游支持回滾的情況下,具體的概念和設計方式官網(wǎng)已經(jīng)寫的比較清楚锤灿,就不多加贅述第股。而對于KafkaProducer,Kafka在0.11版本之后支持transaction吼拥,也就意味著支持對寫入數(shù)據(jù)的commit和rollback倚聚,在通過Flink寫入到Kafka的應用程序中可以達到exactly-once的效果。

接下來展示一下如何在Flink應用程序中激活exactly-once語義凿可。對于SourceFunction大家隨意采用一種即可惑折,文件,kafka topic等皆可枯跑。而主要部分是在于對FlinkKafkaProducer的初始化惨驶。我使用的是Flink1.7版本使用的Producer類為FlinkKafkaProducer011,觀察它的構(gòu)造函數(shù)敛助,很容易發(fā)現(xiàn)有的構(gòu)造函數(shù)中需要你傳入一個枚舉變量semantic, 有三種可選值NONE, AT_LEAST_ONCE,EXACTLY_ONCE粗卜,而默認值為AT_LEAST_ONCE,很顯然我們在這里需要使用EXACTLY_ONCE纳击。不過在此之前续扔,我們需要仔細閱讀一下Flink官網(wǎng)Flink-kafka-connector的內(nèi)容攻臀,其中提到,Kafka broker的transaction.max.timeout.ms默認為15分鐘纱昧,而FlinkKafkaProducer011默認的transaction.timeout.ms為1個小時刨啸,遠遠超出了broker的最大超時時間,這種情況下如果你的服務掛了超過15分鐘识脆,就會造成數(shù)據(jù)丟失设联。所以如果需要你的producer支持的更長的事務時間就需要提高kafka broker transaction.max.timeout.ms的值。下面是一個簡單的實例去使用Exactly-once語義的FlinkKafkaProducer存璃。

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
    topics,
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    properties,
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
)

這么做的話Flink sink到Kafka中在大部分情況下就都能保證Exactly-once仑荐。值得注意的是,所有通過事務寫入的Kafka topic, 在消費他們的時候纵东,必須給消費者加上參數(shù)isolation.level=read_committed粘招,這是因為Kafka的事務支持是給寫入的數(shù)據(jù)分為committed和uncomitted,如果使用默認配置的consumer偎球,讀取的時候依然會讀取所有數(shù)據(jù)而不是根據(jù)事務隔離洒扎。

Flink-Hdfs

目前我們使用的cdh中hadoop版本為2.6,Hadoop在2.7版本后對Hdfs支持了truncate操作衰絮,會使得回滾機制感覺方便快捷袍冷。這里只談一下關于低版本Hdfs flink的容錯機制,以及我們自身對寫入消息進行g(shù)zip壓縮所遇到的一些坑猫牡。

flink-connector-filesystem的源碼中提供了BucketingSink來支持文件在文件系統(tǒng)上的滾動寫入胡诗。BucketingSink對象通過傳入自定義Writer來執(zhí)行寫入的方式,研究下面的一些已經(jīng)實現(xiàn)的Writer類可以發(fā)現(xiàn)淌友,BucketingSink通過hadoop API的FSDataOutputStream來創(chuàng)建文件流和寫入煌恢。而FSDataOutputStream中又包裹了一個PositionCache類來記錄文件流每次運行的狀態(tài)。

private static class PositionCache extends FilterOutputStream {
    private FileSystem.Statistics statistics;
    long position;

    public PositionCache(OutputStream out, 
                         FileSystem.Statistics stats,
                         long pos) throws IOException {
      super(out);
      statistics = stats;
      position = pos;
    }

    public void write(int b) throws IOException {
      out.write(b);
      position++;
      if (statistics != null) {
        statistics.incrementBytesWritten(1);
      }
    }
    
    public void write(byte b[], int off, int len) throws IOException {
      out.write(b, off, len);
      position += len;                            // update position
      if (statistics != null) {
        statistics.incrementBytesWritten(len);
      }
    }
      
    public long getPos() throws IOException {
      return position;                            // return cached position
    }
    
    public void close() throws IOException {
      out.close();
    }
  }

從該類中可以看到震庭,文件流每次執(zhí)行write操作的時候瑰抵,PosistionCache都會刷新他本身的position變量。而BucketingSink的BucketState則會通過這個變量來更新currentFileValiedLength成員來記錄文件的有效長度器联。當Job因為某種原因down了之后二汛,checkpoint會記錄bucketState的信息,在任務恢復的時候拨拓,會在文件系統(tǒng)上生成一個valid-length文件來表明該文件的有效長度(單位:Byte)是多少(Hadoop2.7后的truncate()功能可以直接幫你truncate掉多余的內(nèi)容肴颊,但是低版本就需要自己處理了)。

在我們的業(yè)務環(huán)境中千元,需要對寫入的文本文件進行g(shù)zip壓縮苫昌,F(xiàn)link目前只提供了SequenceFile和Avro格式的Writer,并沒有提供普通的原生文本壓縮支持幸海。所以需要我們自己編寫Writer祟身。在這值得注意的是對于壓縮流庫的選擇,我們選擇了java.util.zip下的GzipOutpuStream而不是org.apache.hadoop.io下的CompressOutputStream物独,原因是后者不支持對壓縮數(shù)據(jù)流設置syncFlush袜硫,因此在調(diào)用flush()方法的時候只會flush outputStream而前者會先flush底層的compressor。后者在使用中會導致PositionCache的position不正常從而導致valid-length不可用而無法達到hdfs的exactly-once語義挡篓。

public class HdfsCompressStringWriter extends StreamWriterBase<JSONObject> {

    private static final long serialVersionUID = 2L;

    /**
     * The {@code CompressFSDataOutputStream} for the current part file.
     */
    private transient GZIPOutputStream compressionOutputStream;

    public HdfsCompressStringWriter() {}

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.setSyncOnFlush(true);
        compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
    }

    public void close() throws IOException {
        if (compressionOutputStream != null) {
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        /** 
        --此處對StreamWriterBase類進行了修改添加了resetStream方法來將內(nèi)部的FSDataOutputStream置空婉陷,
        不然在close的時候如果已經(jīng)通過compressionOutputStream關閉流則FSDataOutputStream對象沒有置空
        會導致再下一次open的時候報Stream already open的錯誤。
        */
        resetStream();
    }

    @Override
    public void write(JSONObject element) throws IOException {
        if (element == null || !element.containsKey("body")) {
            return;
        }
        String content = element.getString("body") + "\n";
        compressionOutputStream.write(content.getBytes());
        compressionOutputStream.flush();
    }

    @Override
    public Writer<JSONObject> duplicate() {
        return new HdfsCompressStringWriter();
    }

}

通過自定義的GzipWriter官研,如果任務遇到異常秽澳,checkpoint會記錄valid-length來讓我們恢復成準確無重復的數(shù)據(jù)。但是由于我們是2.6版本的Hadoop戏羽,只能將壓縮文件從Hdfs上get下來處理担神。而由于gzip非文本文件,而且在文件尾部有一個4字節(jié)的滾動更新的CRC32編碼始花,和另外一個4字節(jié)的ISIZE代表原始非壓縮文件的長度對2^32求模妄讯,簡單的truncate會導致gzip文件損壞而無法通過正常的解壓縮讀取。不過Gzip本身的壓縮文本是以chunk形式連續(xù)存在的酷宵,zcat命令可以在不壓縮的情況下讀取有效的內(nèi)容亥贸。所以如果我們需要修復原始的文件,則大致必須通過以下方式浇垦。

length=$(hdfs dfs -text /path/to/my/file.valid-length) # 有時候獲取的length還有一些奇怪的空字符和特殊字符要處理比如('\x0' or ^M^H等)
hdfs dfs -get /path/to/my/file.gz myfile.gz
truncate myfile.gz -s $length
zcat myfile.gz > myfixedfile
gzip myfixedfile

現(xiàn)在的問題是炕置,如果文件數(shù)較多且大小不小的時候,通過腳本逐步執(zhí)行這些效率會非常低男韧,所以目前也在尋找更完善的方式去達成這個問題朴摊。

Flink-Elasticsearch & Hbase

對于Es,我們用了整條數(shù)據(jù)的哈希作為uuid煌抒,對于Hbase每條數(shù)據(jù)也同樣有固定的rowkey仍劈,因此只需要AT_LEAST_ONCE語義就可以保證數(shù)據(jù)不缺失不重復。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末寡壮,一起剝皮案震驚了整個濱河市贩疙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌况既,老刑警劉巖这溅,帶你破解...
    沈念sama閱讀 212,686評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異棒仍,居然都是意外死亡悲靴,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,668評論 3 385
  • 文/潘曉璐 我一進店門莫其,熙熙樓的掌柜王于貴愁眉苦臉地迎上來癞尚,“玉大人耸三,你說我怎么就攤上這事〗娇” “怎么了仪壮?”我有些...
    開封第一講書人閱讀 158,160評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長胳徽。 經(jīng)常有香客問我积锅,道長,這世上最難降的妖魔是什么养盗? 我笑而不...
    開封第一講書人閱讀 56,736評論 1 284
  • 正文 為了忘掉前任缚陷,我火速辦了婚禮,結(jié)果婚禮上往核,老公的妹妹穿的比我還像新娘箫爷。我一直安慰自己,他們只是感情好铆铆,可當我...
    茶點故事閱讀 65,847評論 6 386
  • 文/花漫 我一把揭開白布蝶缀。 她就那樣靜靜地躺著,像睡著了一般薄货。 火紅的嫁衣襯著肌膚如雪翁都。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,043評論 1 291
  • 那天谅猾,我揣著相機與錄音柄慰,去河邊找鬼。 笑死税娜,一個胖子當著我的面吹牛坐搔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播敬矩,決...
    沈念sama閱讀 39,129評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼概行,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了弧岳?” 一聲冷哼從身側(cè)響起凳忙,我...
    開封第一講書人閱讀 37,872評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎禽炬,沒想到半個月后涧卵,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,318評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡腹尖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,645評論 2 327
  • 正文 我和宋清朗相戀三年柳恐,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,777評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡乐设,死狀恐怖讼庇,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情伤提,我是刑警寧澤巫俺,帶...
    沈念sama閱讀 34,470評論 4 333
  • 正文 年R本政府宣布认烁,位于F島的核電站肿男,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏却嗡。R本人自食惡果不足惜舶沛,卻給世界環(huán)境...
    茶點故事閱讀 40,126評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望窗价。 院中可真熱鬧如庭,春花似錦、人聲如沸撼港。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,861評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽帝牡。三九已至往毡,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間靶溜,已是汗流浹背开瞭。 一陣腳步聲響...
    開封第一講書人閱讀 32,095評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留罩息,地道東北人嗤详。 一個月前我還...
    沈念sama閱讀 46,589評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像瓷炮,于是被迫代替她去往敵國和親葱色。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,687評論 2 351

推薦閱讀更多精彩內(nèi)容