Flink-[白話解析] Flink的Watermark機(jī)制

轉(zhuǎn)自:https://bbs.huaweicloud.com/blogs/146333

[白話解析] Flink的Watermark機(jī)制

0x00 摘要

對(duì)于Flink來(lái)說(shuō),Watermark是個(gè)很難繞過(guò)去的概念针余。本文將從整體的思路上來(lái)說(shuō),運(yùn)用感性直覺(jué)的思考來(lái)幫大家梳理Watermark概念锅棕。

0x01 問(wèn)題

關(guān)于Watermark酥夭,很容易產(chǎn)生幾個(gè)問(wèn)題

Flink 流處理應(yīng)用中,常見(jiàn)的處理需求/應(yīng)對(duì)方案是什么?

Watermark究竟應(yīng)該翻譯成水印還是水位線筏勒?

Watermark本質(zhì)是什么?

Watermark是如何解決問(wèn)題?

下面我們就來(lái)簡(jiǎn)要解答這些問(wèn)題以給大家一個(gè)大致概念遮精,在后文中居夹,會(huì)再深入描述。

問(wèn)題1. Flink 流處理應(yīng)用中常見(jiàn)的需求/方案是什么

聚合類(lèi)的處理?Flink可以每來(lái)一個(gè)消息就處理一次本冲,但是有時(shí)我們需要做一些聚合類(lèi)的處理准脂,例如:在過(guò)去的1分鐘內(nèi)有多少用戶點(diǎn)擊了我們的網(wǎng)頁(yè)。所以Flink引入了窗口概念檬洞。

窗口?窗口的作用為了周期性的獲取數(shù)據(jù)狸膏。就是把傳入的原始數(shù)據(jù)流切分成多個(gè)buckets,所有計(jì)算都在單一的buckets中進(jìn)行添怔。窗口(window)就是從 Streaming 到 Batch 的一個(gè)橋梁湾戳。

帶來(lái)的問(wèn)題:聚合類(lèi)處理帶來(lái)了新的問(wèn)題贤旷,比如亂序/延遲。其解決方案就是 Watermark / allowLateNess / sideOutPut 這一組合拳砾脑。

Watermark?的作用是防止 數(shù)據(jù)亂序 / 指定時(shí)間內(nèi)獲取不到全部數(shù)據(jù)幼驶。

allowLateNess?是將窗口關(guān)閉時(shí)間再延遲一段時(shí)間。

sideOutPut?是最后兜底操作韧衣,當(dāng)指定窗口已經(jīng)徹底關(guān)閉后盅藻,就會(huì)把所有過(guò)期延遲數(shù)據(jù)放到側(cè)輸出流,讓用戶決定如何處理畅铭。

總結(jié)起來(lái)就是說(shuō)

Windows----->Watermark----->allowLateNess----->sideOutPut? ? 用Windows把流數(shù)據(jù)分塊處理氏淑,用Watermark確定什么時(shí)候不再等待更早的數(shù)據(jù)/觸發(fā)窗口進(jìn)行計(jì)算,用allowLateNess 將窗口關(guān)閉時(shí)間再延遲一段時(shí)間硕噩。用sideOutPut 最后兜底把數(shù)據(jù)導(dǎo)出到其他地方假残。

問(wèn)題2. Watermark應(yīng)該翻譯成水位線

我最初看的一篇文章中把Watermark翻譯成“水印”。我當(dāng)時(shí)比較暈炉擅。因?yàn)榘凑f(shuō)名字一定能夠反應(yīng)事物本質(zhì)守问。但是我怎么也腦補(bǔ)不出這個(gè)”水印“的本質(zhì)。

繼續(xù)看文章內(nèi)容坑资,越來(lái)越覺(jué)得這個(gè)應(yīng)該翻譯成“水位線”。于是查了查穆端,確實(shí)英文有如下翻譯:high-water mark 高水位線(海水或洪水所達(dá)到的最高水位)袱贮。

后來(lái)逐漸看到其他文章中也有翻譯成水位線,我才放心下來(lái)体啰,終于不會(huì)出現(xiàn)第二個(gè)“套接字”這樣神奇的翻譯了攒巍。

問(wèn)題3. Watermark本質(zhì)是什么

Watermarks是基于已經(jīng)收集的消息來(lái)估算是否還有消息未到達(dá),本質(zhì)上是一個(gè)時(shí)間戳荒勇。時(shí)間戳反映的是事件發(fā)生的時(shí)間柒莉,而不是事件處理的時(shí)間。

這個(gè)從Flink的源碼就能看出來(lái)沽翔,唯一有意義的成員變量就是 timestamp兢孝。

publicfinalclassWatermarkextendsStreamElement{/*The watermark that signifies end-of-event-time. */publicstaticfinal WatermarkMAX_WATERMARK=newWatermark(Long.MAX_VALUE);/* The timestamp of the watermark in milliseconds. */privatefinal long timestamp;/* Creates a new watermark with the given timestamp in milliseconds.*/publicWatermarklong timestamp){this.timestamp=timestamp;}/*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/publiclonggetTimestamp(){returntimestamp;}}

問(wèn)題4. Watermark如何解決問(wèn)題

Watermark是一種告訴Flink一個(gè)消息延遲多少的方式。它定義了什么時(shí)候不再等待更早的數(shù)據(jù)仅偎。

可以把Watermarks理解為一個(gè)水位線跨蟹,這個(gè)Watermarks在不斷的變化。Watermark實(shí)際上作為數(shù)據(jù)流的一部分隨數(shù)據(jù)流流動(dòng)橘沥。

當(dāng)Flink中的運(yùn)算符接收到Watermarks時(shí)窗轩,它明白早于該時(shí)間的消息已經(jīng)完全抵達(dá)計(jì)算引擎,即假設(shè)不會(huì)再有時(shí)間小于水位線的事件到達(dá)座咆。

這個(gè)假設(shè)是觸發(fā)窗口計(jì)算的基礎(chǔ)痢艺,只有水位線越過(guò)窗口對(duì)應(yīng)的結(jié)束時(shí)間仓洼,窗口才會(huì)關(guān)閉和進(jìn)行計(jì)算。

0x02 背景概念

流處理

流處理堤舒,最本質(zhì)的是在處理數(shù)據(jù)的時(shí)候色建,接受一條處理一條數(shù)據(jù)。

批處理植酥,則是累積數(shù)據(jù)到一定程度在處理镀岛。這是他們本質(zhì)的區(qū)別。

在設(shè)計(jì)上Flink認(rèn)為數(shù)據(jù)是流式的友驮,批處理只是流處理的特例漂羊。同時(shí)對(duì)數(shù)據(jù)分為有界數(shù)據(jù)和無(wú)界數(shù)據(jù)。

有界數(shù)據(jù)對(duì)應(yīng)批處理卸留,API對(duì)應(yīng)Dateset走越。

無(wú)界數(shù)據(jù)對(duì)應(yīng)流處理,API對(duì)應(yīng)DataStream耻瑟。

亂序(out-of-order)

什么是亂序呢旨指?可以理解為數(shù)據(jù)到達(dá)的順序和其實(shí)際產(chǎn)生時(shí)間的排序不一致。導(dǎo)致這的原因有很多喳整,比如延遲谆构,消息積壓,重試等等框都。

我們知道搬素,流處理從事件產(chǎn)生,到流經(jīng)source魏保,再到operator熬尺,中間是有一個(gè)過(guò)程和時(shí)間的。雖然大部分情況下谓罗,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的粱哼,但是也不排除由于網(wǎng)絡(luò)、背壓等原因檩咱,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說(shuō)late element)揭措。

比如:

某數(shù)據(jù)源中的某些數(shù)據(jù)由于某種原因(如:網(wǎng)絡(luò)原因,外部存儲(chǔ)自身原因)會(huì)有5秒的延時(shí)刻蚯,也就是在實(shí)際時(shí)間的第1秒產(chǎn)生的數(shù)據(jù)有可能在第5秒中產(chǎn)生的數(shù)據(jù)之后到來(lái)(比如到Window處理節(jié)點(diǎn))蜂筹。有1~10個(gè)事件。亂序到達(dá)的序列是:2,3,4,5,1,6,3,8,9,10,7

0x03 Flink中的窗口概念

窗口

對(duì)于Flink芦倒,如果來(lái)一條消息計(jì)算一條艺挪,這樣是可以的,但是這樣計(jì)算是非常頻繁而且消耗資源,如果想做一些統(tǒng)計(jì)這是不可能的麻裳。所以對(duì)于Spark和Flink都產(chǎn)生了窗口計(jì)算口蝠。

比如 是因?yàn)槲覀兿肟吹竭^(guò)去一分鐘,過(guò)去半小時(shí)的訪問(wèn)數(shù)據(jù)津坑,這時(shí)候我們就需要窗口妙蔗。

Window:Window是處理無(wú)界流的關(guān)鍵,Windows將流拆分為一個(gè)個(gè)有限大小的buckets疆瑰,可以可以在每一個(gè)buckets中進(jìn)行計(jì)算眉反。

start_time,end_time:當(dāng)Window時(shí)時(shí)間窗口的時(shí)候,每個(gè)window都會(huì)有一個(gè)開(kāi)始時(shí)間和結(jié)束時(shí)間(前開(kāi)后閉)穆役,這個(gè)時(shí)間是系統(tǒng)時(shí)間寸五。

窗口生命周期

簡(jiǎn)而言之,只要屬于此窗口的第一個(gè)元素到達(dá)耿币,就會(huì)創(chuàng)建一個(gè)窗口梳杏,當(dāng)時(shí)間(事件或處理時(shí)間)超過(guò)其結(jié)束時(shí)間戳加上用戶指定的允許延遲時(shí),窗口將被完全刪除淹接。

例如:

使用基于事件時(shí)間的窗口策略十性,每5分鐘創(chuàng)建一個(gè)不重疊(或翻滾)的窗口并允許延遲1分鐘。? ? 假定目前是12:00塑悼。當(dāng)具有落入該間隔的時(shí)間戳的第一個(gè)元素到達(dá)時(shí)蟹肘,F(xiàn)link將為12:00到12:05之間的間隔創(chuàng)建一個(gè)新窗口澈魄,當(dāng)水位線(watermark)到12:06時(shí)間戳?xí)r將刪除它张吉。

窗口有如下組件:

Window Assigner:用來(lái)決定某個(gè)元素被分配到哪個(gè)/哪些窗口中去耐亏。

Trigger:觸發(fā)器。決定了一個(gè)窗口何時(shí)能夠被計(jì)算或清除郭怪。觸發(fā)策略可能類(lèi)似于“當(dāng)窗口中的元素?cái)?shù)量大于4”時(shí),或“當(dāng)水位線通過(guò)窗口結(jié)束時(shí)”刊橘。

Evictor:它可以在 觸發(fā)器觸發(fā)后 & 應(yīng)用函數(shù)之前和/或之后 從窗口中刪除元素鄙才。

窗口還擁有函數(shù),比如 ProcessWindowFunction促绵,ReduceFunction攒庵,AggregateFunction或FoldFunction。該函數(shù)將包含要應(yīng)用于窗口內(nèi)容的計(jì)算败晴,而觸發(fā)器指定窗口被認(rèn)為準(zhǔn)備好應(yīng)用該函數(shù)的條件浓冒。

Keyed vs Non-Keyed Windows

在定義窗口之前,要指定的第一件事是流是否需要Keyed,使用keyBy(...)將無(wú)界流分成邏輯的keyed stream尖坤。 如果未調(diào)用keyBy(...)稳懒,則表示流不是keyed stream。

對(duì)于Keyed流慢味,可以將傳入事件的任何屬性用作key场梆。 擁有Keyed stream將允許窗口計(jì)算由多個(gè)任務(wù)并行執(zhí)行墅冷,因?yàn)槊總€(gè)邏輯Keyed流可以獨(dú)立于其余任務(wù)進(jìn)行處理。 相同Key的所有元素將被發(fā)送到同一個(gè)任務(wù)或油。

在Non-Keyed流的情況下寞忿,原始流將不會(huì)被分成多個(gè)邏輯流,并且所有窗口邏輯將由單個(gè)任務(wù)執(zhí)行顶岸,即并行性為1腔彰。

窗口分類(lèi)

窗口分類(lèi)可以分成:翻滾窗口(Tumbling Window,無(wú)重疊)辖佣,滾動(dòng)窗口(Sliding Window霹抛,有重疊),和會(huì)話窗口凌简,(Session Window上炎,活動(dòng)間隙)

滾動(dòng)窗口

滾動(dòng)窗口分配器將每個(gè)元素分配給固定窗口大小的窗口。滾動(dòng)窗口大小固定的并且不重疊雏搂。例如藕施,如果指定大小為5分鐘的滾動(dòng)窗口,則將執(zhí)行當(dāng)前窗口凸郑,并且每五分鐘將啟動(dòng)一個(gè)新窗口裳食。

滑動(dòng)窗口

滑動(dòng)窗口與滾動(dòng)窗口的區(qū)別就是滑動(dòng)窗口有重復(fù)的計(jì)算部分。

滑動(dòng)窗口分配器將每個(gè)元素分配給固定窗口大小的窗口芙沥。類(lèi)似于滾動(dòng)窗口分配器诲祸,窗口的大小由窗口大小參數(shù)配置。另外一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口的啟動(dòng)頻率(how frequently a sliding window is started)而昨。因此救氯,如果滑動(dòng)大小小于窗口大小,滑動(dòng)窗可以重疊歌憨。在這種情況下着憨,元素被分配到多個(gè)窗口。

例如务嫡,你可以使用窗口大小為10分鐘的窗口甲抖,滑動(dòng)大小為5分鐘。這樣心铃,每5分鐘會(huì)生成一個(gè)窗口准谚,包含最后10分鐘內(nèi)到達(dá)的事件。

會(huì)話窗口

會(huì)話窗口分配器通過(guò)活動(dòng)會(huì)話分組元素去扣。與滾動(dòng)窗口和滑動(dòng)窗口相比柱衔,會(huì)話窗口不會(huì)重疊,也沒(méi)有固定的開(kāi)始和結(jié)束時(shí)間。相反秀存,當(dāng)會(huì)話窗口在一段時(shí)間內(nèi)沒(méi)有接收到元素時(shí)會(huì)關(guān)閉捶码。

例如,不活動(dòng)的間隙時(shí)或链。會(huì)話窗口分配器配置會(huì)話間隙惫恼,定義所需的不活動(dòng)時(shí)間長(zhǎng)度(defines how long is the required period of inactivity)。當(dāng)此時(shí)間段到期時(shí)澳盐,當(dāng)前會(huì)話關(guān)閉祈纯,后續(xù)元素被分配到新的會(huì)話窗口。

0x04 Flink中的時(shí)間概念

Flink在流處理程序支持不同的時(shí)間概念叼耙。分別為Event Time/Processing Time/Ingestion Time腕窥,也就是事件時(shí)間、處理時(shí)間筛婉、提取時(shí)間簇爆。

從時(shí)間序列角度來(lái)說(shuō),發(fā)生的先后順序是:

事件時(shí)間(Event Time)---->提取時(shí)間(Ingestion Time)---->處理時(shí)間(Processing Time)

Event Time 是事件在現(xiàn)實(shí)世界中發(fā)生的時(shí)間爽撒,它通常由事件中的時(shí)間戳描述入蛆。

Ingestion Time 是數(shù)據(jù)進(jìn)入Apache Flink流處理系統(tǒng)的時(shí)間,也就是Flink讀取數(shù)據(jù)源時(shí)間硕勿。

Processing Time 是數(shù)據(jù)流入到具體某個(gè)算子 (消息被計(jì)算處理) 時(shí)候相應(yīng)的系統(tǒng)時(shí)間哨毁。也就是Flink程序處理該事件時(shí)當(dāng)前系統(tǒng)時(shí)間。

但是我們講解時(shí)源武,會(huì)從后往前講解扼褪,把最重要的Event Time放在最后。

處理時(shí)間

是數(shù)據(jù)流入到具體某個(gè)算子時(shí)候相應(yīng)的系統(tǒng)時(shí)間粱栖。

這個(gè)系統(tǒng)時(shí)間指的是執(zhí)行相應(yīng)操作的機(jī)器的系統(tǒng)時(shí)間话浇。當(dāng)一個(gè)流程序通過(guò)處理時(shí)間來(lái)運(yùn)行時(shí),所有基于時(shí)間的操作(如: 時(shí)間窗口)將使用各自操作所在的物理機(jī)的系統(tǒng)時(shí)間闹究。

ProcessingTime 有最好的性能和最低的延遲幔崖。但在分布式計(jì)算環(huán)境或者異步環(huán)境中,ProcessingTime具有不確定性跋核,相同數(shù)據(jù)流多次運(yùn)行有可能產(chǎn)生不同的計(jì)算結(jié)果。因?yàn)樗菀资艿綇挠涗浀竭_(dá)系統(tǒng)的速度(例如從消息隊(duì)列)到記錄在系統(tǒng)內(nèi)的operator之間流動(dòng)的速度的影響(停電叛买,調(diào)度或其他)砂代。

提取時(shí)間

IngestionTime是數(shù)據(jù)進(jìn)入Apache Flink框架的時(shí)間,是在Source Operator中設(shè)置的率挣。每個(gè)記錄將源的當(dāng)前時(shí)間作為時(shí)間戳刻伊,并且后續(xù)基于時(shí)間的操作(如時(shí)間窗口)引用該時(shí)間戳。

提取時(shí)間在概念上位于事件時(shí)間和處理時(shí)間之間。與處理時(shí)間相比捶箱,它稍早一些智什。IngestionTime與ProcessingTime相比可以提供更可預(yù)測(cè)的結(jié)果,因?yàn)镮ngestionTime的時(shí)間戳比較穩(wěn)定(在源處只記錄一次)丁屎,所以同一數(shù)據(jù)在流經(jīng)不同窗口操作時(shí)將使用相同的時(shí)間戳荠锭,而對(duì)于ProcessingTime同一數(shù)據(jù)在流經(jīng)不同窗口算子會(huì)有不同的處理時(shí)間戳。

與事件時(shí)間相比晨川,提取時(shí)間程序無(wú)法處理任何無(wú)序事件或后期數(shù)據(jù)证九,但程序不必指定如何生成水位線。

在內(nèi)部共虑,提取時(shí)間與事件時(shí)間非常相似愧怜,但具有自動(dòng)時(shí)間戳分配和自動(dòng)水位線生成功能。

事件時(shí)間

事件時(shí)間就是事件在真實(shí)世界的發(fā)生時(shí)間妈拌,即每個(gè)事件在產(chǎn)生它的設(shè)備上發(fā)生的時(shí)間(當(dāng)?shù)貢r(shí)間)拥坛。比如一個(gè)點(diǎn)擊事件的時(shí)間發(fā)生時(shí)間,是用戶點(diǎn)擊操作所在的手機(jī)或電腦的時(shí)間尘分。

在進(jìn)入Apache Flink框架之前EventTime通常要嵌入到記錄中猜惋,并且EventTime也可以從記錄中提取出來(lái)。在實(shí)際的網(wǎng)上購(gòu)物訂單等業(yè)務(wù)場(chǎng)景中音诫,大多會(huì)使用EventTime來(lái)進(jìn)行數(shù)據(jù)計(jì)算惨奕。

基于事件時(shí)間處理的強(qiáng)大之處在于即使在亂序事件,延遲事件竭钝,歷史數(shù)據(jù)以及從備份或持久化日志中的重復(fù)數(shù)據(jù)也能獲得正確的結(jié)果梨撞。對(duì)于事件時(shí)間,時(shí)間的進(jìn)度取決于數(shù)據(jù)香罐,而不是任何時(shí)鐘卧波。

事件時(shí)間程序必須指定如何生成事件時(shí)間的Watermarks,這是表示事件時(shí)間進(jìn)度的機(jī)制庇茫。

現(xiàn)在假設(shè)我們正在創(chuàng)建一個(gè)排序的數(shù)據(jù)流港粱。這意味著應(yīng)用程序處理流中的亂序到達(dá)的事件,并生成同樣事件但按時(shí)間戳(事件時(shí)間)排序的新數(shù)據(jù)流旦签。

比如:

有1~10個(gè)事件查坪。亂序到達(dá)的序列是:1,2,4,5,6,3,8,9,10,7經(jīng)過(guò)按 事件時(shí)間 處理后的序列是:1,2,3,4,5,6,7,8,9,10

為了處理事件時(shí)間,F(xiàn)link需要知道事件的時(shí)間戳宁炫,這意味著流中的每條數(shù)據(jù)都需要分配其事件時(shí)間戳偿曙。這通常通過(guò)提取每條數(shù)據(jù)中的固定字段來(lái)完成時(shí)間戳的獲取。

設(shè)定時(shí)間特性

Flink DataStream 程序的第一部分通常是設(shè)置基本時(shí)間特性羔巢。 該設(shè)置定義了數(shù)據(jù)流源的行為方式(例如:它們是否將分配時(shí)間戳)望忆,以及像?KeyedStream.timeWindow(Time.seconds(30))?這樣的窗口操作應(yīng)該使用上面哪種時(shí)間概念罩阵。

比如:

final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

0x05 Watermark

前文講到了事件時(shí)間,這個(gè)真實(shí)發(fā)生的時(shí)間是我們業(yè)務(wù)在實(shí)時(shí)處理程序中非常關(guān)心的启摄。在一個(gè)理想的情況下稿壁,事件時(shí)間處理將產(chǎn)生完全一致和確定的結(jié)果,無(wú)論事件何時(shí)到達(dá)或其排序歉备。但是在現(xiàn)實(shí)中傅是,消息不在是按照順序發(fā)送,產(chǎn)生了亂序威创,這時(shí)候該怎么處理落午?

Watermark是Apache Flink為了處理EventTime 窗口計(jì)算提出的一種機(jī)制,本質(zhì)上也是一種時(shí)間戳。watermark是用于處理亂序事件或延遲數(shù)據(jù)的肚豺,這通常用watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)(Watermarks用來(lái)觸發(fā)window窗口計(jì)算)溃斋。

比如對(duì)于late element,我們不能無(wú)限期的等下去吸申,必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后梗劫,必須觸發(fā)window去進(jìn)行計(jì)算了。這個(gè)特別的機(jī)制截碴,就是watermark梳侨。 可以把Watermark看作是一種告訴Flink一個(gè)消息延遲多少的方式。定義了什么時(shí)候不再等待更早的數(shù)據(jù)日丹。

1. 窗口觸發(fā)條件

上面談到了對(duì)數(shù)據(jù)亂序問(wèn)題的處理機(jī)制是watermark+window走哺,那么window什么時(shí)候該被觸發(fā)呢?

基于Event Time的事件處理哲虾,F(xiàn)link默認(rèn)的事件觸發(fā)條件為:

對(duì)于out-of-order及正常的數(shù)據(jù)而言

watermark的時(shí)間戳 > = window endTime

在 [window_start_time,window_end_time] 中有數(shù)據(jù)存在丙躏。

對(duì)于late element太多的數(shù)據(jù)而言

Event Time > watermark的時(shí)間戳

WaterMark相當(dāng)于一個(gè)EndLine,一旦Watermarks大于了某個(gè)window的end_time束凑,就意味著windows_end_time時(shí)間和WaterMark時(shí)間相同的窗口開(kāi)始計(jì)算執(zhí)行了晒旅。

就是說(shuō),我們根據(jù)一定規(guī)則汪诉,計(jì)算出Watermarks废恋,并且設(shè)置一些延遲,給遲到的數(shù)據(jù)一些機(jī)會(huì)扒寄,也就是說(shuō)正常來(lái)講鱼鼓,對(duì)于遲到的數(shù)據(jù),我只等你一段時(shí)間该编,再不來(lái)就沒(méi)有機(jī)會(huì)了迄本。

WaterMark時(shí)間可以用Flink系統(tǒng)現(xiàn)實(shí)時(shí)間,也可以用處理數(shù)據(jù)所攜帶的Event time上渴。

使用Flink系統(tǒng)現(xiàn)實(shí)時(shí)間岸梨,在并行和多線程中需要注意的問(wèn)題較少,因?yàn)槎际且袁F(xiàn)實(shí)時(shí)間為標(biāo)準(zhǔn)稠氮。

如果使用處理數(shù)據(jù)所攜帶的Event time作為WaterMark時(shí)間曹阔,需要注意兩點(diǎn):

因?yàn)閿?shù)據(jù)到達(dá)并不是循序的,注意保存一個(gè)當(dāng)前最大時(shí)間戳作為WaterMark時(shí)間

并行同步問(wèn)題

2. WaterMark設(shè)定方法

標(biāo)點(diǎn)水位線(Punctuated Watermark)

標(biāo)點(diǎn)水位線(Punctuated Watermark)通過(guò)數(shù)據(jù)流中某些特殊標(biāo)記事件來(lái)觸發(fā)新水位線的生成隔披。這種方式下窗口的觸發(fā)與時(shí)間無(wú)關(guān)赃份,而是決定于何時(shí)收到標(biāo)記事件。

在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場(chǎng)景下會(huì)產(chǎn)生大量的Watermark在一定程度上對(duì)下游算子造成壓力奢米,所以只有在實(shí)時(shí)性要求非常高的場(chǎng)景才會(huì)選擇Punctuated的方式進(jìn)行Watermark的生成抓韩。

定期水位線(Periodic Watermark)

周期性的(允許一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個(gè)Watermark。水位線提升的時(shí)間間隔是由用戶設(shè)置的鬓长,在兩次水位線提升時(shí)隔內(nèi)會(huì)有一部分消息流入谒拴,用戶可以根據(jù)這部分?jǐn)?shù)據(jù)來(lái)計(jì)算出新的水位線。

在實(shí)際的生產(chǎn)中Periodic的方式必須結(jié)合時(shí)間和積累條數(shù)兩個(gè)維度繼續(xù)周期性產(chǎn)生Watermark涉波,否則在極端情況下會(huì)有很大的延時(shí)英上。

舉個(gè)例子,最簡(jiǎn)單的水位線算法就是取目前為止最大的事件時(shí)間啤覆,然而這種方式比較暴力苍日,對(duì)亂序事件的容忍程度比較低,容易出現(xiàn)大量遲到事件窗声。

3. 遲到事件

雖說(shuō)水位線表明著早于它的事件不應(yīng)該再出現(xiàn)相恃,但是上如上文所講,接收到水位線以前的的消息是不可避免的笨觅,這就是所謂的遲到事件拦耐。實(shí)際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計(jì)屋摇,導(dǎo)致窗口在它們到達(dá)之前已經(jīng)關(guān)閉揩魂。

遲到事件出現(xiàn)時(shí)窗口已經(jīng)關(guān)閉并產(chǎn)出了計(jì)算結(jié)果,因此處理的方法有3種:

重新激活已經(jīng)關(guān)閉的窗口并重新計(jì)算以修正結(jié)果炮温。

將遲到事件收集起來(lái)另外處理火脉。

將遲到事件視為錯(cuò)誤消息并丟棄。

Flink 默認(rèn)的處理方式是第3種直接丟棄柒啤,其他兩種方式分別使用Side Output和Allowed Lateness倦挂。

Side Output機(jī)制可以將遲到事件單獨(dú)放入一個(gè)數(shù)據(jù)流分支,這會(huì)作為 window 計(jì)算結(jié)果的副產(chǎn)品担巩,以便用戶獲取并對(duì)其進(jìn)行特殊處理方援。

Allowed Lateness機(jī)制允許用戶設(shè)置一個(gè)允許的最大遲到時(shí)長(zhǎng)。Flink 會(huì)在窗口關(guān)閉后一直保存窗口的狀態(tài)直至超過(guò)允許遲到時(shí)長(zhǎng)涛癌,這期間的遲到事件不會(huì)被丟棄犯戏,而是默認(rèn)會(huì)觸發(fā)窗口重新計(jì)算送火。因?yàn)楸4娲翱跔顟B(tài)需要額外內(nèi)存,并且如果窗口計(jì)算使用了?ProcessWindowFunction?API 還可能使得每個(gè)遲到事件觸發(fā)一次窗口的全量計(jì)算先匪,代價(jià)比較大种吸,所以允許遲到時(shí)長(zhǎng)不宜設(shè)得太長(zhǎng),遲到事件也不宜過(guò)多呀非,否則應(yīng)該考慮降低水位線提高的速度或者調(diào)整算法坚俗。

這里總結(jié)機(jī)制為:

窗口window 的作用是為了周期性的獲取數(shù)據(jù)。

watermark的作用是防止數(shù)據(jù)出現(xiàn)亂序(經(jīng)常)岸裙,事件時(shí)間內(nèi)獲取不到指定的全部數(shù)據(jù)猖败,而做的一種保險(xiǎn)方法。

allowLateNess是將窗口關(guān)閉時(shí)間再延遲一段時(shí)間降允。

sideOutPut是最后兜底操作恩闻,所有過(guò)期延遲數(shù)據(jù),指定窗口已經(jīng)徹底關(guān)閉了剧董,就會(huì)把數(shù)據(jù)放到側(cè)輸出流判呕。

4. 實(shí)例

采用系統(tǒng)時(shí)間做Watermark

我們將水位線設(shè)置為當(dāng)前系統(tǒng)時(shí)間間-5秒。

override defgetCurrentWatermark():Watermark={newWatermark(System.currentTimeMillis-5000)}

通常最好保持接收到的最大時(shí)間戳送滞,并創(chuàng)建具有最大預(yù)期延遲的水位線侠草,而不是從當(dāng)前系統(tǒng)時(shí)間減去。

采用Event Time做watermark

例如基于Event Time的數(shù)據(jù)犁嗅,自身都包含一個(gè)類(lèi)型為timestamp的字段边涕,假設(shè)叫做rowtime,例如1543903383(2018-12-04 14:03:03)褂微,定義一個(gè)基于rowtime列功蜓,策略為偏移3s的watermark,這條數(shù)據(jù)的水位線時(shí)間戳則是:

1543903383-3000=1543900383(2018-12-0414:03:00)

該條數(shù)據(jù)的水位線時(shí)間含義:timestamp小于1543900383(2018-12-04 14:03:00)的數(shù)據(jù)宠蚂,都已經(jīng)到達(dá)了式撼。

classBoundedOutOfOrdernessGeneratorextendsAssignerWithPeriodicWatermarks[MyEvent]{val maxOutOfOrderness=3000L;// 3 secondsvarcurrentMaxTimestamp:Long;override defextractTimestamp(element:MyEvent,previousElementTimestamp:Long):Long={val timestamp=element.getCreationTime()currentMaxTimestamp=max(timestamp,currentMaxTimestamp)timestamp;}override defgetCurrentWatermark():Watermark={// return the watermark as current highest timestamp minus the out-of-orderness boundnewWatermark(currentMaxTimestamp-maxOutOfOrderness);}}

看看如何觸發(fā)窗口

我們明白了窗口的觸發(fā)機(jī)制,這里我們添加了水位線求厕,到底是個(gè)怎么個(gè)情況著隆?我們來(lái)看下面

假如我們?cè)O(shè)置10s的時(shí)間窗口(window),那么0~10s呀癣,10~20s都是一個(gè)窗口美浦,以0~10s為例,0為start-time项栏,10為end-time浦辨。假如有4個(gè)數(shù)據(jù)的event-time分別是8(A),12.5(B),9(C),13.5(D),我們?cè)O(shè)置Watermarks為當(dāng)前所有到達(dá)數(shù)據(jù)event-time的最大值減去延遲值3.5秒

當(dāng)A到達(dá)的時(shí)候沼沈,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會(huì)觸發(fā)計(jì)算

當(dāng)B到達(dá)的時(shí)候流酬,Watermarks為max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不會(huì)觸發(fā)計(jì)算

當(dāng)C到達(dá)的時(shí)候币厕,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會(huì)觸發(fā)計(jì)算

當(dāng)D到達(dá)的時(shí)候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發(fā)計(jì)算

觸發(fā)計(jì)算的時(shí)候芽腾,會(huì)將A劈榨,C(因?yàn)樗麄兌夹∮?0)都計(jì)算進(jìn)去,其中C是遲到的晦嵌。

max這個(gè)很關(guān)鍵,就是當(dāng)前窗口內(nèi)拷姿,所有事件的最大事件惭载。

這里的延遲3.5s是我們假設(shè)一個(gè)數(shù)據(jù)到達(dá)的時(shí)候,比他早3.5s的數(shù)據(jù)肯定也都到達(dá)了响巢,這個(gè)是需要根據(jù)經(jīng)驗(yàn)推算描滔。假設(shè)加入D到達(dá)以后有到達(dá)了一個(gè)E,event-time=6,但是由于0~10的時(shí)間窗口已經(jīng)開(kāi)始計(jì)算了踪古,所以E就丟了含长。

從這里上面E的丟失說(shuō)明,水位線也不是萬(wàn)能的伏穆,但是如果根據(jù)我們自己的生產(chǎn)經(jīng)驗(yàn)+側(cè)道輸出等方案拘泞,可以做到數(shù)據(jù)不丟失。

0x06 Flink源碼

數(shù)據(jù)結(jié)構(gòu)定義

在Flink DataStream中流動(dòng)著不同的元素枕扫,統(tǒng)稱為StreamElement陪腌,StreamElement可以是StreamRecord、Watermark烟瞧、StreamStatus诗鸭、LatencyMarker中任何一種類(lèi)型。

StreamElement

StreamElement是一個(gè)抽象類(lèi)(是Flink 承載消息的基類(lèi))参滴,其他四種類(lèi)型繼承StreamElement强岸。

publicabstractclassStreamElement{//判斷是否是Watermarkpublicfinal booleanisWatermark(){returngetClass()==Watermark.class;}//判斷是否為StreamStatuspublicfinal booleanisStreamStatus(){returngetClass()==StreamStatus.class;}//判斷是否為StreamRecordpublicfinal booleanisRecord(){returngetClass()==StreamRecord.class;}//判斷是否為L(zhǎng)atencyMarkerpublicfinal booleanisLatencyMarker(){returngetClass()==LatencyMarker.class;}//轉(zhuǎn)換為StreamRecordpublicfinal? StreamRecordasRecord(){return(StreamRecord)this;}//轉(zhuǎn)換為Watermarkpublicfinal WatermarkasWatermark(){return(Watermark)this;}//轉(zhuǎn)換為StreamStatuspublicfinal StreamStatusasStreamStatus(){return(StreamStatus)this;}//轉(zhuǎn)換為L(zhǎng)atencyMarkerpublicfinal LatencyMarkerasLatencyMarker(){return(LatencyMarker)this;}}

Watermark

Watermark繼承了StreamElement。Watermark 是和事件一個(gè)級(jí)別的抽象砾赔,其內(nèi)部包含一個(gè)成員變量時(shí)間戳timestamp蝌箍,標(biāo)識(shí)當(dāng)前數(shù)據(jù)的時(shí)間進(jìn)度。Watermark實(shí)際上作為數(shù)據(jù)流的一部分隨數(shù)據(jù)流流動(dòng)暴心。

@PublicEvolvingpublicfinalclassWatermarkextendsStreamElement{/*The watermark that signifies end-of-event-time. */publicstaticfinal WatermarkMAX_WATERMARK=newWatermark(Long.MAX_VALUE);/* The timestamp of the watermark in milliseconds. */privatefinal long timestamp;/* Creates a new watermark with the given timestamp in milliseconds.*/publicWatermarklong timestamp){this.timestamp=timestamp;}/*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/publiclonggetTimestamp(){returntimestamp;}}

Flink如何生成&處理Watermark

在實(shí)際使用中大多數(shù)情況下會(huì)選擇周期性生成方式也就是AssignerWithPeriodicWatermarks方式.

//指定為evenTime時(shí)間語(yǔ)義env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//生成watermark的周期env.getConfig.setAutoWatermarkInterval(watermarkInterval)//指定方式dataStream.assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)){override defextractTimestamp(element:Element):Long=element.dT})

BoundedOutOfOrdernessTimestampExtractor 是Flink內(nèi)置提供的允許亂序最大延時(shí)的watermark生成方式十绑,只需要重寫(xiě)其extractTimestamp方法即可。

assignTimestampsAndWatermarks 可以理解為是一個(gè)算子轉(zhuǎn)換操作酷勺,等同于map/window一樣理解本橙,可以為其設(shè)置并行度、名稱脆诉,也是一個(gè)transformation/operator甚亭,

publicSingleOutputStreamOperatorassignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks timestampAndWatermarkAssigner){// match parallelism to input, otherwise dop=1 sources could lead to some strange// behaviour: the watermark will creep along very slowly because the elements// from the source go to each extraction operator round robin.final int inputParallelism=getTransformation().getParallelism();final AssignerWithPeriodicWatermarks cleanedAssigner=clean(timestampAndWatermarkAssigner);TimestampsAndPeriodicWatermarksOperator operator=newTimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);returntransform("Timestamps/Watermarks",getTransformation().getOutputType(),operator).setParallelism(inputParallelism);}

其使用的StreamOperator類(lèi)型TimestampsAndPeriodicWatermarksOperator贷币,繼承了AbstractUdfStreamOperator,實(shí)現(xiàn)了OneInputStreamOperator接口與ProcessingTimeCallback接口亏狰,

TimestampsAndPeriodicWatermarksOperator役纹。

/**

* A stream operator that extracts timestamps from stream elements and

* generates periodic watermarks.

*

* @param? The type of the input elements

*/publicclassTimestampsAndPeriodicWatermarksOperatorextendsAbstractUdfStreamOperator>implementsOneInputStreamOperator,ProcessingTimeCallback{privatestaticfinal long serialVersionUID=1L;privatetransient long watermarkInterval;privatetransient long currentWatermark;publicTimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks assigner){super(assigner);this.chainingStrategy=ChainingStrategy.ALWAYS;}@Overridepublicvoidopen()throws Exception{super.open();//初始化默認(rèn)當(dāng)前watermarkcurrentWatermark=Long.MIN_VALUE;//生成watermark周期時(shí)間配置watermarkInterval=getExecutionConfig().getAutoWatermarkInterval();//注冊(cè)定時(shí)其配置if(watermarkInterval>0){long now=getProcessingTimeService().getCurrentProcessingTime();//注冊(cè)一個(gè)watermarkInterval后觸發(fā)的定時(shí)器,傳入回調(diào)參數(shù)是this暇唾,也就是會(huì)調(diào)用當(dāng)前對(duì)象的onProcessingTime方法getProcessingTimeService().registerTimer(now+watermarkInterval,this);}}@OverridepublicvoidprocessElement(StreamRecord element)throws Exception{//提取當(dāng)前的事件時(shí)間final long newTimestamp=userFunction.extractTimestamp(element.getValue(),element.hasTimestamp()?element.getTimestamp():Long.MIN_VALUE);//保存當(dāng)前最大的事件時(shí)間促脉。output.collect(element.replace(element.getValue(),newTimestamp));}@OverridepublicvoidonProcessingTime(long timestamp)throws Exception{//此方法表示的就是定時(shí)回調(diào)的方法,將符合要求的watermark發(fā)送出去并且注冊(cè)下一個(gè)定時(shí)器策州。// register next timerWatermark newWatermark=userFunction.getCurrentWatermark();//當(dāng)新的watermark大于當(dāng)前的watermarkif(newWatermark!=null&&newWatermark.getTimestamp()>currentWatermark){currentWatermark=newWatermark.getTimestamp();//將符合要求的watermark發(fā)送出去// emit watermarkoutput.emitWatermark(newWatermark);}//注冊(cè)下一次觸發(fā)時(shí)間long now=getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now+watermarkInterval,this);}/**

? ? * Override the base implementation to completely ignore watermarks propagated from

? ? * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit

? ? * watermarks from here).

? ? */@OverridepublicvoidprocessWatermark(Watermark mark)throws Exception{//用來(lái)處理上游發(fā)送過(guò)來(lái)的watermark瘸味,可以認(rèn)為不做任何處理,下游的watermark只與其上游最近的生成方式相關(guān)够挂。// if we receive a Long.MAX_VALUE watermark we forward it since it is used// to signal the end of input and to not block watermark progress downstreamif(mark.getTimestamp()==Long.MAX_VALUE&&currentWatermark!=Long.MAX_VALUE){currentWatermark=Long.MAX_VALUE;output.emitWatermark(mark);}}@Overridepublicvoidclose()throws Exception{super.close();// emit a final watermarkWatermark newWatermark=userFunction.getCurrentWatermark();if(newWatermark!=null&&newWatermark.getTimestamp()>currentWatermark){currentWatermark=newWatermark.getTimestamp();// emit watermarkoutput.emitWatermark(newWatermark);}}}

Flink如何處理遲到數(shù)據(jù)

這里我們使用 Side Output機(jī)制來(lái)說(shuō)明旁仿。Side Output機(jī)制可以將遲到事件單獨(dú)放入一個(gè)數(shù)據(jù)流分支,這會(huì)作為 window 計(jì)算結(jié)果的副產(chǎn)品孽糖,以便用戶獲取并對(duì)其進(jìn)行特殊處理枯冈。

生成新的Watermark

Flink會(huì)替換StreamRecord 對(duì)象中的Timestamp,如果 根據(jù)當(dāng)前事件的Timestamp 生成的Watermark 大于上一次的Watermark办悟,就發(fā)出新的Watermark尘奏。

具體代碼在 TimestampsAndPunctuatedWatermarksOperator.processElement。

@OverridepublicvoidprocessElement(StreamRecord element)throws Exception{finalTvalue=element.getValue();// 調(diào)用 用戶實(shí)現(xiàn)的 extractTimestamp 獲取新的Timestampfinal long newTimestamp=userFunction.extractTimestamp(value,element.hasTimestamp()?element.getTimestamp():Long.MIN_VALUE);// 用新Timestamp 替換StreamRecord中的舊Timestampoutput.collect(element.replace(element.getValue(),newTimestamp));// 調(diào)用 用戶實(shí)現(xiàn)的 checkAndGetNextWatermark 方法獲取下一個(gè)Watermarkfinal Watermark nextWatermark=userFunction.checkAndGetNextWatermark(value,newTimestamp);// 如果下一個(gè)Watermark 大于當(dāng)前Watermark病蛉,就發(fā)出新的Watermarkif(nextWatermark!=null&&nextWatermark.getTimestamp()>currentWatermark){currentWatermark=nextWatermark.getTimestamp();output.emitWatermark(nextWatermark);}}

處理遲到數(shù)據(jù)

首先罪既,判斷是否是遲到數(shù)據(jù)。

@OverridepublicvoidprocessElement(StreamRecord element)throws Exception{for(Wwindow:elementWindows){// drop if the window is already late// 如果窗口已經(jīng)遲到了铡恕,則處理下一條數(shù)據(jù)if(isWindowLate(window)){continue;}}......}/**

Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness of the given window.

*/protectedbooleanisWindowLate(Wwindow){// 當(dāng)前機(jī)制是 事件時(shí)間 && 窗口元素的最大時(shí)間戳 + 允許遲到時(shí)間 <= 當(dāng)前水位線 的時(shí)候?yàn)閠rue(即當(dāng)前窗口元素遲到了)return(windowAssigner.isEventTime()&&(cleanupTime(window)<=internalTimerService.currentWatermark()));}/**

* Returns the cleanup time for a window, which is

* {@code window.maxTimestamp + allowedLateness}. In

* case this leads to a value greater than {@link Long#MAX_VALUE}

* then a cleanup time of {@link Long#MAX_VALUE} is

* returned.

*

* @param window the window whose cleanup time we are computing.

*/privatelongcleanupTime(Wwindow){if(windowAssigner.isEventTime()){long cleanupTime=window.maxTimestamp()+allowedLateness;//返回窗口的 cleanup 時(shí)間 : 窗口元素的最大時(shí)間戳 + 允許延遲的時(shí)間returncleanupTime>=window.maxTimestamp()?cleanupTime:Long.MAX_VALUE;}else{returnwindow.maxTimestamp();}}

其次琢感,處理遲到數(shù)據(jù)的具體代碼在WindowOperator.processElement 方法的最后一段。這里就是旁路輸出探熔。

@OverridepublicvoidprocessElement(StreamRecord element)throws Exception{......// 其他操作......// side output input event if element not handled by any window? late arriving tag has been set// 如果沒(méi)有window處理過(guò)這條數(shù)據(jù)驹针,isSkippedElement = true,如果上面判斷為遲到數(shù)據(jù)诀艰,isSkippedElement = false// windowAssigner is event time and current timestamp + allowed lateness no less than element timestampif(isSkippedElement&&isElementLate(element)){if(lateDataOutputTag!=null){//旁路輸出//這就是我們之前提到的柬甥,F(xiàn)link 的 Side Output 機(jī)制可以將遲到事件單獨(dú)放入一個(gè)數(shù)據(jù)流分支,這會(huì)作為 window 計(jì)算結(jié)果的副產(chǎn)品其垄,以便用戶獲取并對(duì)其進(jìn)行特殊處理苛蒲。sideOutput(element);}else{this.numLateRecordsDropped.inc();}}}/**

* Decide if a record is currently late, based on current watermark and allowed lateness.

* 當(dāng)前機(jī)制是 事件時(shí)間 && (元素時(shí)間戳 + 允許延遲的時(shí)間) <= 當(dāng)前水位線

* @param element The element to check

* @return The element for which should be considered when sideoutputs

*/protectedbooleanisElementLate(StreamRecord element){return(windowAssigner.isEventTime())&&(element.getTimestamp()+allowedLateness<=internalTimerService.currentWatermark());}/**

* Write skipped late arriving element to SideOutput.

* // 把數(shù)據(jù)輸出到旁路,供用戶決定如何處理绿满。

* @param element skipped late arriving element to side output

*/protectedvoidsideOutput(StreamRecord element){output.collect(lateDataOutputTag,element);}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末臂外,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌漏健,老刑警劉巖嚎货,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異蔫浆,居然都是意外死亡殖属,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)瓦盛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)洗显,“玉大人,你說(shuō)我怎么就攤上這事原环∧铀簦” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵扮念,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我碧库,道長(zhǎng)柜与,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任嵌灰,我火速辦了婚禮弄匕,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘沽瞭。我一直安慰自己迁匠,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布驹溃。 她就那樣靜靜地躺著城丧,像睡著了一般。 火紅的嫁衣襯著肌膚如雪豌鹤。 梳的紋絲不亂的頭發(fā)上亡哄,一...
    開(kāi)封第一講書(shū)人閱讀 51,190評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音布疙,去河邊找鬼蚊惯。 笑死,一個(gè)胖子當(dāng)著我的面吹牛灵临,可吹牛的內(nèi)容都是我干的截型。 我是一名探鬼主播,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼儒溉,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼宦焦!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤赶诊,失蹤者是張志新(化名)和其女友劉穎笼平,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體舔痪,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡寓调,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了锄码。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片夺英。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖滋捶,靈堂內(nèi)的尸體忽然破棺而出痛悯,到底是詐尸還是另有隱情,我是刑警寧澤重窟,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布载萌,位于F島的核電站,受9級(jí)特大地震影響巡扇,放射性物質(zhì)發(fā)生泄漏扭仁。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一厅翔、第九天 我趴在偏房一處隱蔽的房頂上張望乖坠。 院中可真熱鬧,春花似錦刀闷、人聲如沸熊泵。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)顽分。三九已至,卻和暖如春施蜜,著一層夾襖步出監(jiān)牢的瞬間怯邪,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工花墩, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留悬秉,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓冰蘑,卻偏偏與公主長(zhǎng)得像和泌,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子祠肥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容