談?wù)勅N海量數(shù)據(jù)實(shí)時(shí)去重方案(w/ Flink)

Prologue

數(shù)據(jù)去重(data deduplication)是我們大數(shù)據(jù)攻城獅司空見(jiàn)慣的問(wèn)題了欣簇。除了統(tǒng)計(jì)UV等傳統(tǒng)用法之外,去重的意義更在于消除不可靠數(shù)據(jù)源產(chǎn)生的臟數(shù)據(jù)——即重復(fù)上報(bào)數(shù)據(jù)或重復(fù)投遞數(shù)據(jù)的影響坯约,使流式計(jì)算產(chǎn)生的結(jié)果更加準(zhǔn)確熊咽。本文以Flink處理日均億級(jí)別及以上的日志數(shù)據(jù)為背景,討論除了樸素方法(HashSet)之外的三種實(shí)時(shí)去重方案闹丐,即:布隆過(guò)濾器横殴、RocksDB狀態(tài)后端、外部存儲(chǔ)卿拴。

布隆過(guò)濾器去重

布隆過(guò)濾器在筆者的博客里出鏡率是很高的衫仑,如果看官尚未了解,請(qǐng)務(wù)必先食用這篇文章巍棱。

以之前用過(guò)的子訂單日志模型為例惑畴,假設(shè)上游數(shù)據(jù)源產(chǎn)生的消息為<Integer, Long, String>三元組,三個(gè)元素分別代表站點(diǎn)ID航徙、子訂單ID和數(shù)據(jù)載荷如贷。由于數(shù)據(jù)源只能保證at least once語(yǔ)義(例如未開(kāi)啟correlation ID機(jī)制的RabbitMQ隊(duì)列),會(huì)重復(fù)投遞子訂單數(shù)據(jù),導(dǎo)致下游各統(tǒng)計(jì)結(jié)果偏高「芨ぃ現(xiàn)引入Guava的BloomFilter來(lái)去重尚猿,直接上代碼說(shuō)事。

  // dimensionedStream是個(gè)DataStream<Tuple3<Integer, Long, String>>
  DataStream<String> dedupStream = dimensionedStream
    .keyBy(0)
    .process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))
    .name("process_sub_order_dedup").uid("process_sub_order_dedup");

  // 去重用的ProcessFunction
  public static final class SubOrderDeduplicateProcessFunc
    extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);
    private static final int BF_CARDINAL_THRESHOLD = 1000000;
    private static final double BF_FALSE_POSITIVE_RATE = 0.01;

    private volatile BloomFilter<Long> subOrderFilter;

    @Override
    public void open(Configuration parameters) throws Exception {
      long s = System.currentTimeMillis();
      subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
      long e = System.currentTimeMillis();
      LOGGER.info("Created Guava BloomFilter, time cost: " + (e - s));
    }

    @Override
    public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {
      long subOrderId = value.f1;
      if (!subOrderFilter.mightContain(subOrderId)) {
        subOrderFilter.put(subOrderId);
        out.collect(value.f2);
      }
      ctx.timerService().registerProcessingTimeTimer(UnixTimeUtil.tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
      long s = System.currentTimeMillis();
      subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
      long e = System.currentTimeMillis();
      LOGGER.info("Timer triggered & resetted Guava BloomFilter, time cost: " + (e - s));
    }

    @Override
    public void close() throws Exception {
      subOrderFilter = null;
    }
  }

  // 根據(jù)當(dāng)前時(shí)間戳獲取第二天0時(shí)0分0秒的時(shí)間戳
  public static long tomorrowZeroTimestampMs(long now, int timeZone) {
    return now - (now + timeZone * 3600000) % 86400000 + 86400000;
  }

這里先按照站點(diǎn)ID為key分組楣富,然后在每個(gè)分組內(nèi)創(chuàng)建存儲(chǔ)子訂單ID的布隆過(guò)濾器凿掂。布隆過(guò)濾器的期望最大數(shù)據(jù)量應(yīng)該按每天產(chǎn)生子訂單最多的那個(gè)站點(diǎn)來(lái)設(shè)置,這里設(shè)為100萬(wàn)纹蝴,并且可容忍的誤判率為1%庄萎。根據(jù)上面科普文中的講解,單個(gè)布隆過(guò)濾器需要8個(gè)哈希函數(shù)塘安,其位圖占用內(nèi)存約114MB糠涛,壓力不大。

每當(dāng)一條數(shù)據(jù)進(jìn)入時(shí)兼犯,調(diào)用BloomFilter.mightContain()方法判斷對(duì)應(yīng)的子訂單ID是否已出現(xiàn)過(guò)忍捡。當(dāng)沒(méi)出現(xiàn)過(guò)時(shí),調(diào)用put()方法將其插入BloomFilter切黔,并交給Collector輸出砸脊。

另外,通過(guò)注冊(cè)第二天凌晨0時(shí)0分0秒的processing time計(jì)時(shí)器纬霞,就可以在onTimer()方法內(nèi)重置布隆過(guò)濾器凌埂,開(kāi)始新一天的去重磅废。

(吐槽一句,Guava的BloomFilter竟然沒(méi)有提供清零的方法业栅,有點(diǎn)詭異)

內(nèi)嵌RocksDB狀態(tài)后端去重

布隆過(guò)濾器雖然香岛都,但是它不能做到100%精確。在必須保證萬(wàn)無(wú)一失的場(chǎng)合卡者,我們可以選擇Flink自帶的RocksDB狀態(tài)后端,這樣不需要依賴(lài)其他的組件。之前已經(jīng)講過(guò)脐湾,RocksDB本身是一個(gè)類(lèi)似于HBase的嵌入式K-V數(shù)據(jù)庫(kù),并且它的本地性比較好叙淌,用它維護(hù)一個(gè)較大的狀態(tài)集合并不是什么難事秤掌。

首先我們要開(kāi)啟RocksDB狀態(tài)后端(平常在生產(chǎn)環(huán)境中,也建議總是使用它)鹰霍,并配置好相應(yīng)的參數(shù)闻鉴。這些參數(shù)同樣可以在flink-conf.yaml里寫(xiě)入。

RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(Consts.STATE_BACKEND_PATH, true);
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
rocksDBStateBackend.setNumberOfTransferingThreads(2);
rocksDBStateBackend.enableTtlCompactionFilter();

env.setStateBackend(rocksDBStateBackend);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(5 * 60 * 1000);

RocksDB的調(diào)優(yōu)是個(gè)很復(fù)雜的話(huà)題茂洒,詳情參見(jiàn)官方提供的tuning guide孟岛,以及Flink配置中與RocksDB相關(guān)的參數(shù),今后會(huì)挑時(shí)間重點(diǎn)分析一下RocksDB存儲(chǔ)大狀態(tài)時(shí)的調(diào)優(yōu)方法。好在Flink已經(jīng)為我們提供了一些預(yù)調(diào)優(yōu)的參數(shù)渠羞,即PredefinedOptions斤贰,請(qǐng)務(wù)必根據(jù)服務(wù)器的實(shí)際情況選擇。我們的Flink集群統(tǒng)一采用SSD做存儲(chǔ)次询,故選擇的是PredefinedOptions.FLASH_SSD_OPTIMIZED荧恍。

另外,由于狀態(tài)空間不小屯吊,打開(kāi)增量檢查點(diǎn)以及設(shè)定多線(xiàn)程讀寫(xiě)RocksDB送巡,可以提高checkpointing效率,檢查點(diǎn)周期也不能太短盒卸。還有授艰,為了避免狀態(tài)無(wú)限增長(zhǎng)下去,我們?nèi)匀坏枚ㄆ谇謇硭慈缤瞎?jié)中布隆過(guò)濾器的復(fù)位)世落。當(dāng)然淮腾,除了自己注冊(cè)定時(shí)器之外,我們也可以利用Flink提供的狀態(tài)TTL機(jī)制屉佳,并打開(kāi)RocksDB狀態(tài)后端的TTL compaction filter谷朝,讓它們?cè)赗ocksDB后臺(tái)執(zhí)行compaction操作時(shí)自動(dòng)刪除。特別注意武花,狀態(tài)TTL僅對(duì)時(shí)間特征為處理時(shí)間時(shí)生效圆凰,對(duì)事件時(shí)間是無(wú)效的。

接下來(lái)寫(xiě)具體的業(yè)務(wù)代碼体箕,以上節(jié)的<站點(diǎn)ID, 子訂單ID, 消息載荷>三元組為例专钉,有兩種可實(shí)現(xiàn)的思路:

  • 仍然按站點(diǎn)ID分組,用存儲(chǔ)子訂單ID的MapState(當(dāng)做Set來(lái)使用)保存狀態(tài)累铅;
  • 直接按子訂單ID分組跃须,用單值的ValueState保存狀態(tài)。

顯然娃兽,如果我們要用狀態(tài)TTL控制過(guò)期的話(huà)菇民,第二種思路更好,因?yàn)榱6雀?xì)投储。代碼如下第练。

  // dimensionedStream是個(gè)DataStream<Tuple3<Integer, Long, String>>
  DataStream<String> dedupStream = dimensionedStream
    .keyBy(1)
    .process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))
    .name("process_sub_order_dedup").uid("process_sub_order_dedup");

  // 去重用的ProcessFunction
  public static final class SubOrderDeduplicateProcessFunc
    extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);

    private ValueState<Boolean> existState;

    @Override
    public void open(Configuration parameters) throws Exception {
      StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1))
        .setStateVisibility(StateVisibility.NeverReturnExpired)
        .setUpdateType(UpdateType.OnCreateAndWrite)
        .cleanupInRocksdbCompactFilter(10000)
        .build();

      ValueStateDescriptor<Boolean> existStateDesc = new ValueStateDescriptor<>(
        "suborder-dedup-state",
        Boolean.class
      );
      existStateDesc.enableTimeToLive(stateTtlConfig);

      existState = this.getRuntimeContext().getState(existStateDesc);
    }

    @Override
    public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {
      if (existState.value() == null) {
        existState.update(true);
        out.collect(value.f2);
      }
    }
  }

上述代碼中設(shè)定了狀態(tài)TTL的相關(guān)參數(shù):

  • 過(guò)期時(shí)間設(shè)為1天;
  • 在狀態(tài)值被創(chuàng)建和被更新時(shí)重設(shè)TTL玛荞;
  • 已經(jīng)過(guò)期的數(shù)據(jù)不能再被訪(fǎng)問(wèn)到娇掏;
  • 在每處理10000條狀態(tài)記錄之后,更新檢測(cè)過(guò)期的時(shí)間戳勋眯。這個(gè)參數(shù)要小心設(shè)定婴梧,更新太頻繁會(huì)降低compaction的性能壁涎,更新過(guò)慢會(huì)使得compaction不及時(shí),狀態(tài)空間膨脹志秃。

在實(shí)際處理數(shù)據(jù)時(shí)怔球,如果數(shù)據(jù)的key(即子訂單ID)對(duì)應(yīng)的狀態(tài)不存在,說(shuō)明它沒(méi)有出現(xiàn)過(guò)浮还,可以更新?tīng)顟B(tài)并輸出竟坛。反之,說(shuō)明它已經(jīng)出現(xiàn)過(guò)了钧舌,直接丟棄担汤,so easy。

最后還需要注意一點(diǎn)洼冻,若數(shù)據(jù)的key占用的空間比較大(如長(zhǎng)度可能會(huì)很長(zhǎng)的字符串類(lèi)型)崭歧,也會(huì)造成狀態(tài)膨脹。我們可以將它hash成整型再存儲(chǔ)撞牢,這樣每個(gè)key就最多只占用8個(gè)字節(jié)了率碾。不過(guò)任何哈希算法都無(wú)法保證不產(chǎn)生沖突,所以還是得根據(jù)業(yè)務(wù)場(chǎng)景自行決定屋彪。

引入外部K-V存儲(chǔ)去重

如果既不想用布隆過(guò)濾器所宰,也不想在Flink作業(yè)內(nèi)維護(hù)巨大的狀態(tài),就只能用折衷方案了:利用外部K-V數(shù)據(jù)庫(kù)(Redis畜挥、HBase之類(lèi))存儲(chǔ)需要去重的鍵仔粥。由于外部存儲(chǔ)對(duì)內(nèi)存和磁盤(pán)占用同樣敏感,所以也得設(shè)定相應(yīng)的TTL蟹但,以及對(duì)大的鍵進(jìn)行壓縮躯泰。另外,外部K-V存儲(chǔ)畢竟是獨(dú)立于Flink框架之外的华糖,一旦作業(yè)出現(xiàn)問(wèn)題重啟麦向,外部存儲(chǔ)是不會(huì)與作業(yè)的checkpoint同步恢復(fù)到一致的狀態(tài)的,也就是說(shuō)結(jié)果仍然會(huì)出現(xiàn)偏差缅阳,需要注意磕蛇。

鑒于這種方案對(duì)第三方組件有強(qiáng)依賴(lài),要關(guān)心的東西太多十办,所以一般情況下是不用的,我們也沒(méi)有實(shí)操過(guò)超棺,所以抱歉沒(méi)有代碼了向族。

The End

如果有其他更高效的解決方法,歡迎批評(píng)指正哈棠绘。

民那晚安件相。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末再扭,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子夜矗,更是在濱河造成了極大的恐慌泛范,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件紊撕,死亡現(xiàn)場(chǎng)離奇詭異罢荡,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)对扶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén)区赵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人浪南,你說(shuō)我怎么就攤上這事笼才。” “怎么了络凿?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵骡送,是天一觀(guān)的道長(zhǎng)。 經(jīng)常有香客問(wèn)我絮记,道長(zhǎng)各谚,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任到千,我火速辦了婚禮昌渤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘憔四。我一直安慰自己膀息,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布了赵。 她就那樣靜靜地躺著潜支,像睡著了一般。 火紅的嫁衣襯著肌膚如雪柿汛。 梳的紋絲不亂的頭發(fā)上冗酿,一...
    開(kāi)封第一講書(shū)人閱讀 49,764評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音络断,去河邊找鬼裁替。 笑死,一個(gè)胖子當(dāng)著我的面吹牛貌笨,可吹牛的內(nèi)容都是我干的弱判。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼锥惋,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼昌腰!你這毒婦竟也來(lái)了开伏?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤遭商,失蹤者是張志新(化名)和其女友劉穎固灵,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體劫流,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡巫玻,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了困介。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片大审。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖座哩,靈堂內(nèi)的尸體忽然破棺而出徒扶,到底是詐尸還是另有隱情,我是刑警寧澤根穷,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布姜骡,位于F島的核電站,受9級(jí)特大地震影響屿良,放射性物質(zhì)發(fā)生泄漏圈澈。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一尘惧、第九天 我趴在偏房一處隱蔽的房頂上張望康栈。 院中可真熱鬧,春花似錦喷橙、人聲如沸啥么。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)悬荣。三九已至,卻和暖如春疙剑,著一層夾襖步出監(jiān)牢的瞬間氯迂,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工言缤, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留嚼蚀,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓轧简,卻偏偏與公主長(zhǎng)得像驰坊,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子哮独,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 最近看了看Flink中state方面的知識(shí)拳芙,F(xiàn)link中的state是啥?state的作用是啥皮璧?為什么Flink中...
    MrSocean閱讀 7,064評(píng)論 3 13
  • 本文是先介紹 Flink舟扎,再說(shuō) Flink的過(guò)去和現(xiàn)在 一、Flink介紹 Flink是一款分布式的計(jì)算引擎悴务,它可...
    生活的探路者閱讀 1,272評(píng)論 0 22
  • 前言 最近公司有個(gè)項(xiàng)目需要驗(yàn)證APP應(yīng)用在一段時(shí)間內(nèi)消耗的流量統(tǒng)計(jì)睹限,與后臺(tái)數(shù)據(jù)平臺(tái)以及APP自身打印的log日志進(jìn)...
    keitwo閱讀 4,826評(píng)論 0 0
  • 一本童話(huà)版的理財(cái)入門(mén)書(shū),講述一個(gè)小姑娘救助了一條狗之后展開(kāi)的理財(cái)入門(mén)之路讯檐。語(yǔ)言淺顯易懂羡疗,很適合用來(lái)向年輕人介紹如何...
    佐佐吧閱讀 538評(píng)論 1 1
  • 文/流紋千枚 送給烈日一團(tuán)心中的火讓它在燃燒之中更熾熱 這世界也許會(huì)重生也許會(huì)毀滅——大地孵化一條紅色火蛇像女人欲...
    流紋千枚閱讀 162評(píng)論 2 4