Prologue
數(shù)據(jù)去重(data deduplication)是我們大數(shù)據(jù)攻城獅司空見(jiàn)慣的問(wèn)題了欣簇。除了統(tǒng)計(jì)UV等傳統(tǒng)用法之外,去重的意義更在于消除不可靠數(shù)據(jù)源產(chǎn)生的臟數(shù)據(jù)——即重復(fù)上報(bào)數(shù)據(jù)或重復(fù)投遞數(shù)據(jù)的影響坯约,使流式計(jì)算產(chǎn)生的結(jié)果更加準(zhǔn)確熊咽。本文以Flink處理日均億級(jí)別及以上的日志數(shù)據(jù)為背景,討論除了樸素方法(HashSet)之外的三種實(shí)時(shí)去重方案闹丐,即:布隆過(guò)濾器横殴、RocksDB狀態(tài)后端、外部存儲(chǔ)卿拴。
布隆過(guò)濾器去重
布隆過(guò)濾器在筆者的博客里出鏡率是很高的衫仑,如果看官尚未了解,請(qǐng)務(wù)必先食用這篇文章巍棱。
以之前用過(guò)的子訂單日志模型為例惑畴,假設(shè)上游數(shù)據(jù)源產(chǎn)生的消息為<Integer, Long, String>三元組,三個(gè)元素分別代表站點(diǎn)ID航徙、子訂單ID和數(shù)據(jù)載荷如贷。由于數(shù)據(jù)源只能保證at least once語(yǔ)義(例如未開(kāi)啟correlation ID機(jī)制的RabbitMQ隊(duì)列),會(huì)重復(fù)投遞子訂單數(shù)據(jù),導(dǎo)致下游各統(tǒng)計(jì)結(jié)果偏高「芨ぃ現(xiàn)引入Guava的BloomFilter來(lái)去重尚猿,直接上代碼說(shuō)事。
// dimensionedStream是個(gè)DataStream<Tuple3<Integer, Long, String>>
DataStream<String> dedupStream = dimensionedStream
.keyBy(0)
.process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))
.name("process_sub_order_dedup").uid("process_sub_order_dedup");
// 去重用的ProcessFunction
public static final class SubOrderDeduplicateProcessFunc
extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);
private static final int BF_CARDINAL_THRESHOLD = 1000000;
private static final double BF_FALSE_POSITIVE_RATE = 0.01;
private volatile BloomFilter<Long> subOrderFilter;
@Override
public void open(Configuration parameters) throws Exception {
long s = System.currentTimeMillis();
subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
long e = System.currentTimeMillis();
LOGGER.info("Created Guava BloomFilter, time cost: " + (e - s));
}
@Override
public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {
long subOrderId = value.f1;
if (!subOrderFilter.mightContain(subOrderId)) {
subOrderFilter.put(subOrderId);
out.collect(value.f2);
}
ctx.timerService().registerProcessingTimeTimer(UnixTimeUtil.tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
long s = System.currentTimeMillis();
subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
long e = System.currentTimeMillis();
LOGGER.info("Timer triggered & resetted Guava BloomFilter, time cost: " + (e - s));
}
@Override
public void close() throws Exception {
subOrderFilter = null;
}
}
// 根據(jù)當(dāng)前時(shí)間戳獲取第二天0時(shí)0分0秒的時(shí)間戳
public static long tomorrowZeroTimestampMs(long now, int timeZone) {
return now - (now + timeZone * 3600000) % 86400000 + 86400000;
}
這里先按照站點(diǎn)ID為key分組楣富,然后在每個(gè)分組內(nèi)創(chuàng)建存儲(chǔ)子訂單ID的布隆過(guò)濾器凿掂。布隆過(guò)濾器的期望最大數(shù)據(jù)量應(yīng)該按每天產(chǎn)生子訂單最多的那個(gè)站點(diǎn)來(lái)設(shè)置,這里設(shè)為100萬(wàn)纹蝴,并且可容忍的誤判率為1%庄萎。根據(jù)上面科普文中的講解,單個(gè)布隆過(guò)濾器需要8個(gè)哈希函數(shù)塘安,其位圖占用內(nèi)存約114MB糠涛,壓力不大。
每當(dāng)一條數(shù)據(jù)進(jìn)入時(shí)兼犯,調(diào)用BloomFilter.mightContain()方法判斷對(duì)應(yīng)的子訂單ID是否已出現(xiàn)過(guò)忍捡。當(dāng)沒(méi)出現(xiàn)過(guò)時(shí),調(diào)用put()方法將其插入BloomFilter切黔,并交給Collector輸出砸脊。
另外,通過(guò)注冊(cè)第二天凌晨0時(shí)0分0秒的processing time計(jì)時(shí)器纬霞,就可以在onTimer()方法內(nèi)重置布隆過(guò)濾器凌埂,開(kāi)始新一天的去重磅废。
(吐槽一句,Guava的BloomFilter竟然沒(méi)有提供清零的方法业栅,有點(diǎn)詭異)
內(nèi)嵌RocksDB狀態(tài)后端去重
布隆過(guò)濾器雖然香岛都,但是它不能做到100%精確。在必須保證萬(wàn)無(wú)一失的場(chǎng)合卡者,我們可以選擇Flink自帶的RocksDB狀態(tài)后端,這樣不需要依賴(lài)其他的組件。之前已經(jīng)講過(guò)脐湾,RocksDB本身是一個(gè)類(lèi)似于HBase的嵌入式K-V數(shù)據(jù)庫(kù),并且它的本地性比較好叙淌,用它維護(hù)一個(gè)較大的狀態(tài)集合并不是什么難事秤掌。
首先我們要開(kāi)啟RocksDB狀態(tài)后端(平常在生產(chǎn)環(huán)境中,也建議總是使用它)鹰霍,并配置好相應(yīng)的參數(shù)闻鉴。這些參數(shù)同樣可以在flink-conf.yaml里寫(xiě)入。
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(Consts.STATE_BACKEND_PATH, true);
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
rocksDBStateBackend.setNumberOfTransferingThreads(2);
rocksDBStateBackend.enableTtlCompactionFilter();
env.setStateBackend(rocksDBStateBackend);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(5 * 60 * 1000);
RocksDB的調(diào)優(yōu)是個(gè)很復(fù)雜的話(huà)題茂洒,詳情參見(jiàn)官方提供的tuning guide孟岛,以及Flink配置中與RocksDB相關(guān)的參數(shù),今后會(huì)挑時(shí)間重點(diǎn)分析一下RocksDB存儲(chǔ)大狀態(tài)時(shí)的調(diào)優(yōu)方法。好在Flink已經(jīng)為我們提供了一些預(yù)調(diào)優(yōu)的參數(shù)渠羞,即PredefinedOptions斤贰,請(qǐng)務(wù)必根據(jù)服務(wù)器的實(shí)際情況選擇。我們的Flink集群統(tǒng)一采用SSD做存儲(chǔ)次询,故選擇的是PredefinedOptions.FLASH_SSD_OPTIMIZED荧恍。
另外,由于狀態(tài)空間不小屯吊,打開(kāi)增量檢查點(diǎn)以及設(shè)定多線(xiàn)程讀寫(xiě)RocksDB送巡,可以提高checkpointing效率,檢查點(diǎn)周期也不能太短盒卸。還有授艰,為了避免狀態(tài)無(wú)限增長(zhǎng)下去,我們?nèi)匀坏枚ㄆ谇謇硭慈缤瞎?jié)中布隆過(guò)濾器的復(fù)位)世落。當(dāng)然淮腾,除了自己注冊(cè)定時(shí)器之外,我們也可以利用Flink提供的狀態(tài)TTL機(jī)制屉佳,并打開(kāi)RocksDB狀態(tài)后端的TTL compaction filter谷朝,讓它們?cè)赗ocksDB后臺(tái)執(zhí)行compaction操作時(shí)自動(dòng)刪除。特別注意武花,狀態(tài)TTL僅對(duì)時(shí)間特征為處理時(shí)間時(shí)生效圆凰,對(duì)事件時(shí)間是無(wú)效的。
接下來(lái)寫(xiě)具體的業(yè)務(wù)代碼体箕,以上節(jié)的<站點(diǎn)ID, 子訂單ID, 消息載荷>三元組為例专钉,有兩種可實(shí)現(xiàn)的思路:
- 仍然按站點(diǎn)ID分組,用存儲(chǔ)子訂單ID的MapState(當(dāng)做Set來(lái)使用)保存狀態(tài)累铅;
- 直接按子訂單ID分組跃须,用單值的ValueState保存狀態(tài)。
顯然娃兽,如果我們要用狀態(tài)TTL控制過(guò)期的話(huà)菇民,第二種思路更好,因?yàn)榱6雀?xì)投储。代碼如下第练。
// dimensionedStream是個(gè)DataStream<Tuple3<Integer, Long, String>>
DataStream<String> dedupStream = dimensionedStream
.keyBy(1)
.process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))
.name("process_sub_order_dedup").uid("process_sub_order_dedup");
// 去重用的ProcessFunction
public static final class SubOrderDeduplicateProcessFunc
extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);
private ValueState<Boolean> existState;
@Override
public void open(Configuration parameters) throws Exception {
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1))
.setStateVisibility(StateVisibility.NeverReturnExpired)
.setUpdateType(UpdateType.OnCreateAndWrite)
.cleanupInRocksdbCompactFilter(10000)
.build();
ValueStateDescriptor<Boolean> existStateDesc = new ValueStateDescriptor<>(
"suborder-dedup-state",
Boolean.class
);
existStateDesc.enableTimeToLive(stateTtlConfig);
existState = this.getRuntimeContext().getState(existStateDesc);
}
@Override
public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {
if (existState.value() == null) {
existState.update(true);
out.collect(value.f2);
}
}
}
上述代碼中設(shè)定了狀態(tài)TTL的相關(guān)參數(shù):
- 過(guò)期時(shí)間設(shè)為1天;
- 在狀態(tài)值被創(chuàng)建和被更新時(shí)重設(shè)TTL玛荞;
- 已經(jīng)過(guò)期的數(shù)據(jù)不能再被訪(fǎng)問(wèn)到娇掏;
- 在每處理10000條狀態(tài)記錄之后,更新檢測(cè)過(guò)期的時(shí)間戳勋眯。這個(gè)參數(shù)要小心設(shè)定婴梧,更新太頻繁會(huì)降低compaction的性能壁涎,更新過(guò)慢會(huì)使得compaction不及時(shí),狀態(tài)空間膨脹志秃。
在實(shí)際處理數(shù)據(jù)時(shí)怔球,如果數(shù)據(jù)的key(即子訂單ID)對(duì)應(yīng)的狀態(tài)不存在,說(shuō)明它沒(méi)有出現(xiàn)過(guò)浮还,可以更新?tīng)顟B(tài)并輸出竟坛。反之,說(shuō)明它已經(jīng)出現(xiàn)過(guò)了钧舌,直接丟棄担汤,so easy。
最后還需要注意一點(diǎn)洼冻,若數(shù)據(jù)的key占用的空間比較大(如長(zhǎng)度可能會(huì)很長(zhǎng)的字符串類(lèi)型)崭歧,也會(huì)造成狀態(tài)膨脹。我們可以將它hash成整型再存儲(chǔ)撞牢,這樣每個(gè)key就最多只占用8個(gè)字節(jié)了率碾。不過(guò)任何哈希算法都無(wú)法保證不產(chǎn)生沖突,所以還是得根據(jù)業(yè)務(wù)場(chǎng)景自行決定屋彪。
引入外部K-V存儲(chǔ)去重
如果既不想用布隆過(guò)濾器所宰,也不想在Flink作業(yè)內(nèi)維護(hù)巨大的狀態(tài),就只能用折衷方案了:利用外部K-V數(shù)據(jù)庫(kù)(Redis畜挥、HBase之類(lèi))存儲(chǔ)需要去重的鍵仔粥。由于外部存儲(chǔ)對(duì)內(nèi)存和磁盤(pán)占用同樣敏感,所以也得設(shè)定相應(yīng)的TTL蟹但,以及對(duì)大的鍵進(jìn)行壓縮躯泰。另外,外部K-V存儲(chǔ)畢竟是獨(dú)立于Flink框架之外的华糖,一旦作業(yè)出現(xiàn)問(wèn)題重啟麦向,外部存儲(chǔ)是不會(huì)與作業(yè)的checkpoint同步恢復(fù)到一致的狀態(tài)的,也就是說(shuō)結(jié)果仍然會(huì)出現(xiàn)偏差缅阳,需要注意磕蛇。
鑒于這種方案對(duì)第三方組件有強(qiáng)依賴(lài),要關(guān)心的東西太多十办,所以一般情況下是不用的,我們也沒(méi)有實(shí)操過(guò)超棺,所以抱歉沒(méi)有代碼了向族。
The End
如果有其他更高效的解決方法,歡迎批評(píng)指正哈棠绘。
民那晚安件相。