Flink-Kafka
眾所周知脑慧,F(xiàn)link在很早的時候就通過Checkpointing提供了exactly-once的semantic,不過僅限于自身或者是從KafkaConsumer中消費數(shù)據(jù)嗦随。而在Flink 1.4版本的時候加入了赫赫有名的TwoPhaseCommitSinkFunction碍沐,提供了End-to-End的exatcly-once語言身隐,當然是在需要下游支持回滾的情況下,具體的概念和設計方式官網(wǎng)已經(jīng)寫的比較清楚锤灿,就不多加贅述第股。而對于KafkaProducer,Kafka在0.11版本之后支持transaction吼拥,也就意味著支持對寫入數(shù)據(jù)的commit和rollback倚聚,在通過Flink寫入到Kafka的應用程序中可以達到exactly-once的效果。
接下來展示一下如何在Flink應用程序中激活exactly-once語義凿可。對于SourceFunction大家隨意采用一種即可惑折,文件,kafka topic等皆可枯跑。而主要部分是在于對FlinkKafkaProducer的初始化惨驶。我使用的是Flink1.7版本使用的Producer類為FlinkKafkaProducer011,觀察它的構(gòu)造函數(shù)敛助,很容易發(fā)現(xiàn)有的構(gòu)造函數(shù)中需要你傳入一個枚舉變量semantic, 有三種可選值NONE, AT_LEAST_ONCE,EXACTLY_ONCE粗卜,而默認值為AT_LEAST_ONCE,很顯然我們在這里需要使用EXACTLY_ONCE纳击。不過在此之前续扔,我們需要仔細閱讀一下Flink官網(wǎng)Flink-kafka-connector的內(nèi)容攻臀,其中提到,Kafka broker的transaction.max.timeout.ms默認為15分鐘纱昧,而FlinkKafkaProducer011默認的transaction.timeout.ms為1個小時刨啸,遠遠超出了broker的最大超時時間,這種情況下如果你的服務掛了超過15分鐘识脆,就會造成數(shù)據(jù)丟失设联。所以如果需要你的producer支持的更長的事務時間就需要提高kafka broker transaction.max.timeout.ms的值。下面是一個簡單的實例去使用Exactly-once語義的FlinkKafkaProducer存璃。
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
topics,
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
properties,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
)
這么做的話Flink sink到Kafka中在大部分情況下就都能保證Exactly-once仑荐。值得注意的是,所有通過事務寫入的Kafka topic, 在消費他們的時候纵东,必須給消費者加上參數(shù)isolation.level=read_committed粘招,這是因為Kafka的事務支持是給寫入的數(shù)據(jù)分為committed和uncomitted,如果使用默認配置的consumer偎球,讀取的時候依然會讀取所有數(shù)據(jù)而不是根據(jù)事務隔離洒扎。
Flink-Hdfs
目前我們使用的cdh中hadoop版本為2.6,Hadoop在2.7版本后對Hdfs支持了truncate操作衰絮,會使得回滾機制感覺方便快捷袍冷。這里只談一下關于低版本Hdfs flink的容錯機制,以及我們自身對寫入消息進行g(shù)zip壓縮所遇到的一些坑猫牡。
flink-connector-filesystem的源碼中提供了BucketingSink來支持文件在文件系統(tǒng)上的滾動寫入胡诗。BucketingSink對象通過傳入自定義Writer來執(zhí)行寫入的方式,研究下面的一些已經(jīng)實現(xiàn)的Writer類可以發(fā)現(xiàn)淌友,BucketingSink通過hadoop API的FSDataOutputStream來創(chuàng)建文件流和寫入煌恢。而FSDataOutputStream中又包裹了一個PositionCache類來記錄文件流每次運行的狀態(tài)。
private static class PositionCache extends FilterOutputStream {
private FileSystem.Statistics statistics;
long position;
public PositionCache(OutputStream out,
FileSystem.Statistics stats,
long pos) throws IOException {
super(out);
statistics = stats;
position = pos;
}
public void write(int b) throws IOException {
out.write(b);
position++;
if (statistics != null) {
statistics.incrementBytesWritten(1);
}
}
public void write(byte b[], int off, int len) throws IOException {
out.write(b, off, len);
position += len; // update position
if (statistics != null) {
statistics.incrementBytesWritten(len);
}
}
public long getPos() throws IOException {
return position; // return cached position
}
public void close() throws IOException {
out.close();
}
}
從該類中可以看到震庭,文件流每次執(zhí)行write操作的時候瑰抵,PosistionCache都會刷新他本身的position變量。而BucketingSink的BucketState則會通過這個變量來更新currentFileValiedLength成員來記錄文件的有效長度器联。當Job因為某種原因down了之后二汛,checkpoint會記錄bucketState的信息,在任務恢復的時候拨拓,會在文件系統(tǒng)上生成一個valid-length文件來表明該文件的有效長度(單位:Byte)是多少(Hadoop2.7后的truncate()功能可以直接幫你truncate掉多余的內(nèi)容肴颊,但是低版本就需要自己處理了)。
在我們的業(yè)務環(huán)境中千元,需要對寫入的文本文件進行g(shù)zip壓縮苫昌,F(xiàn)link目前只提供了SequenceFile和Avro格式的Writer,并沒有提供普通的原生文本壓縮支持幸海。所以需要我們自己編寫Writer祟身。在這值得注意的是對于壓縮流庫的選擇,我們選擇了java.util.zip下的GzipOutpuStream而不是org.apache.hadoop.io下的CompressOutputStream物独,原因是后者不支持對壓縮數(shù)據(jù)流設置syncFlush袜硫,因此在調(diào)用flush()方法的時候只會flush outputStream而前者會先flush底層的compressor。后者在使用中會導致PositionCache的position不正常從而導致valid-length不可用而無法達到hdfs的exactly-once語義挡篓。
public class HdfsCompressStringWriter extends StreamWriterBase<JSONObject> {
private static final long serialVersionUID = 2L;
/**
* The {@code CompressFSDataOutputStream} for the current part file.
*/
private transient GZIPOutputStream compressionOutputStream;
public HdfsCompressStringWriter() {}
@Override
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);
this.setSyncOnFlush(true);
compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
}
public void close() throws IOException {
if (compressionOutputStream != null) {
compressionOutputStream.close();
compressionOutputStream = null;
}
/**
--此處對StreamWriterBase類進行了修改添加了resetStream方法來將內(nèi)部的FSDataOutputStream置空婉陷,
不然在close的時候如果已經(jīng)通過compressionOutputStream關閉流則FSDataOutputStream對象沒有置空
會導致再下一次open的時候報Stream already open的錯誤。
*/
resetStream();
}
@Override
public void write(JSONObject element) throws IOException {
if (element == null || !element.containsKey("body")) {
return;
}
String content = element.getString("body") + "\n";
compressionOutputStream.write(content.getBytes());
compressionOutputStream.flush();
}
@Override
public Writer<JSONObject> duplicate() {
return new HdfsCompressStringWriter();
}
}
通過自定義的GzipWriter官研,如果任務遇到異常秽澳,checkpoint會記錄valid-length來讓我們恢復成準確無重復的數(shù)據(jù)。但是由于我們是2.6版本的Hadoop戏羽,只能將壓縮文件從Hdfs上get下來處理担神。而由于gzip非文本文件,而且在文件尾部有一個4字節(jié)的滾動更新的CRC32編碼始花,和另外一個4字節(jié)的ISIZE代表原始非壓縮文件的長度對求模妄讯,簡單的truncate會導致gzip文件損壞而無法通過正常的解壓縮讀取。不過Gzip本身的壓縮文本是以chunk形式連續(xù)存在的酷宵,zcat命令可以在不壓縮的情況下讀取有效的內(nèi)容亥贸。所以如果我們需要修復原始的文件,則大致必須通過以下方式浇垦。
length=$(hdfs dfs -text /path/to/my/file.valid-length) # 有時候獲取的length還有一些奇怪的空字符和特殊字符要處理比如('\x0' or ^M^H等)
hdfs dfs -get /path/to/my/file.gz myfile.gz
truncate myfile.gz -s $length
zcat myfile.gz > myfixedfile
gzip myfixedfile
現(xiàn)在的問題是炕置,如果文件數(shù)較多且大小不小的時候,通過腳本逐步執(zhí)行這些效率會非常低男韧,所以目前也在尋找更完善的方式去達成這個問題朴摊。
Flink-Elasticsearch & Hbase
對于Es,我們用了整條數(shù)據(jù)的哈希作為uuid煌抒,對于Hbase每條數(shù)據(jù)也同樣有固定的rowkey仍劈,因此只需要AT_LEAST_ONCE語義就可以保證數(shù)據(jù)不缺失不重復。