Flink 原理與實現(xiàn):Aysnc I/O

背景

Async I/O 是阿里巴巴貢獻(xiàn)給社區(qū)的一個呼聲非常高的特性,于1.2版本引入壶愤。主要目的是為了解決與外部系統(tǒng)交互時網(wǎng)絡(luò)延遲成為了系統(tǒng)瓶頸的問題。

流計算系統(tǒng)中經(jīng)常需要與外部系統(tǒng)進(jìn)行交互卫玖,比如需要查詢外部數(shù)據(jù)庫以關(guān)聯(lián)上用戶的額外信息公你。通常,我們的實現(xiàn)方式是向數(shù)據(jù)庫發(fā)送用戶a的查詢請求假瞬,然后等待結(jié)果返回陕靠,在這之前,我們無法發(fā)送用戶b的查詢請求脱茉。這是一種同步訪問的模式剪芥,如下圖左邊所示。


圖中棕色的長條表示等待時間琴许,可以發(fā)現(xiàn)網(wǎng)絡(luò)等待時間極大地阻礙了吞吐和延遲税肪。為了解決同步訪問的問題,異步模式可以并發(fā)地處理多個請求和回復(fù)榜田。也就是說益兄,你可以連續(xù)地向數(shù)據(jù)庫發(fā)送用戶a、b箭券、c等的請求净捅,與此同時,哪個請求的回復(fù)先返回了就處理哪個回復(fù)辩块,從而連續(xù)的請求之間不需要阻塞等待蛔六,如上圖右邊所示荆永。這也正是 Async I/O 的實現(xiàn)原理。

Async I/O

使用 Async I/O 的前提是需要一個支持異步請求的客戶端国章。當(dāng)然具钥,沒有異步請求客戶端的話也可以將同步客戶端丟到線程池中執(zhí)行作為異步客戶端。Flink 提供了非常簡潔的API液兽,讓用戶只需要關(guān)注業(yè)務(wù)邏輯骂删,一些臟活累活比如消息順序性和一致性保證都由框架處理了,多么棒的事情抵碟!

使用方式如下方代碼片段所示(來自官網(wǎng)文檔):

/** 'AsyncFunction' 的一個實現(xiàn)桃漾,向數(shù)據(jù)庫發(fā)送異步請求并設(shè)置回調(diào) */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** 可以異步請求的特定數(shù)據(jù)庫的客戶端 */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** future 的回調(diào)的執(zhí)行上下文(當(dāng)前線程) */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {

        // 發(fā)起一個異步請求坏匪,返回結(jié)果的 future
        val resultFuture: Future[String] = client.query(str)

        // 設(shè)置請求完成時的回調(diào): 將結(jié)果傳遞給 collector
        resultFuture.onSuccess {
            case result: String => asyncCollector.collect(Iterable((str, result)));
        }
    }
}

// 創(chuàng)建一個原始的流
val stream: DataStream[String] = ...

// 添加一個 async I/O 的轉(zhuǎn)換
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.(un)orderedWait(
      stream, new AsyncDatabaseRequest(),
      1000, TimeUnit.MILLISECONDS, // 超時時間
      100)  // 進(jìn)行中的異步請求的最大數(shù)量

AsyncDataStream 有兩個靜態(tài)方法拟逮,orderedWaitunorderedWait,對應(yīng)了兩種輸出模式:有序和無序适滓。

  • 有序:消息的發(fā)送順序與接受到的順序相同(包括 watermark )敦迄,也就是先進(jìn)先出。
  • 無序:
    在 ProcessingTime 的情況下凭迹,完全無序罚屋,先返回的結(jié)果先發(fā)送。
    在 EventTime 的情況下嗅绸,watermark 不能超越消息脾猛,消息也不能超越 watermark,也就是說 watermark 定義的順序的邊界鱼鸠。在兩個 watermark 之間的消息的發(fā)送是無序的猛拴,但是在watermark之后的消息不能先于該watermark之前的消息發(fā)送。

原理實現(xiàn)

AsyncDataStream.(un)orderedWait 的主要工作就是創(chuàng)建了一個 AsyncWaitOperator蚀狰。AsyncWaitOperator 是支持異步 IO 訪問的算子實現(xiàn)愉昆,該算子會運行 AsyncFunction 并處理異步返回的結(jié)果,其內(nèi)部原理如下圖所示麻蹋。

如圖所示跛溉,AsyncWaitOperator 主要由兩部分組成:StreamElementQueueEmitter。StreamElementQueue 是一個 Promise 隊列扮授,所謂 Promise 是一種異步抽象表示將來會有一個值(參考 Scala Promise 了解更多)芳室,這個隊列是未完成的 Promise 隊列,也就是進(jìn)行中的請求隊列刹勃。Emitter 是一個單獨的線程堪侯,負(fù)責(zé)發(fā)送消息(收到的異步回復(fù))給下游。

圖中E5表示進(jìn)入該算子的第五個元素(”Element-5”)深夯,在執(zhí)行過程中首先會將其包裝成一個 “Promise” P5抖格,然后將P5放入隊列诺苹。最后調(diào)用 AsyncFunction 的 ayncInvoke 方法,該方法會向外部服務(wù)發(fā)起一個異步的請求,并注冊回調(diào)。該回調(diào)會在異步請求成功返回時調(diào)用 AsyncCollector.collect 方法將返回的結(jié)果交給框架處理花颗。實際上 AsyncCollector 是一個 Promise 废赞,也就是 P5,在調(diào)用 collect 的時候會標(biāo)記 Promise 為完成狀態(tài)蔬啡,并通知 Emitter 線程有完成的消息可以發(fā)送了。Emitter 就會從隊列中拉取完成的 Promise ,并從 Promise 中取出消息發(fā)送給下游翩肌。

消息的順序性

上文提到 Async I/O 提供了兩種輸出模式。其實細(xì)分有三種模式: 有序禁悠,ProcessingTime無序念祭,EventTime無序。Flink 使用隊列來實現(xiàn)不同的輸出模式碍侦,并抽象出一個隊列的接口(StreamElementQueue)粱坤,這種分層設(shè)計使得AsyncWaitOperatorEmitter不用關(guān)心消息的順序問題。StreamElementQueue有兩種具體實現(xiàn)瓷产,分別是OrderedStreamElementQueueUnorderedStreamElementQueue站玄。UnorderedStreamElementQueue 比較有意思,它使用了一套邏輯巧妙地實現(xiàn)完全無序和 EventTime 無序濒旦。

有序

有序比較簡單株旷,使用一個隊列就能實現(xiàn)。所有新進(jìn)入該算子的元素(包括 watermark)尔邓,都會包裝成 Promise 并按到達(dá)順序放入該隊列晾剖。如下圖所示,盡管P4的結(jié)果先返回铃拇,但并不會發(fā)送钞瀑,只有 P1 (隊首)的結(jié)果返回了才會觸發(fā) Emitter 拉取隊首元素進(jìn)行發(fā)送。


ProcessingTime 無序

ProcessingTime 無序也比較簡單慷荔,因為沒有 watermark雕什,不需要協(xié)調(diào) watermark 與消息的順序性,所以使用兩個隊列就能實現(xiàn)显晶,一個 uncompletedQueue 一個 completedQueue贷岸。所有新進(jìn)入該算子的元素,同樣的包裝成 Promise 并放入 uncompletedQueue 隊列磷雇,當(dāng)uncompletedQueue隊列中任意的Promise返回了數(shù)據(jù)偿警,則將該 Promise 移到 completedQueue 隊列中,并通知 Emitter 消費唯笙。如下圖所示:


EventTime 無序

EventTime 無序類似于有序與 ProcessingTime 無序的結(jié)合體螟蒸。因為有 watermark盒使,需要協(xié)調(diào) watermark 與消息之間的順序性,所以uncompletedQueue中存放的元素從原先的 Promise 變成了 Promise 集合七嫌。如果進(jìn)入算子的是消息元素少办,則會包裝成 Promise 放入隊尾的集合中。如果進(jìn)入算子的是 watermark诵原,也會包裝成 Promise 并放到一個獨立的集合中英妓,再將該集合加入到 uncompletedQueue 隊尾,最后再創(chuàng)建一個空集合加到 uncompletedQueue 隊尾绍赛。這樣蔓纠,watermark 就成了消息順序的邊界。只有處在隊首的集合中的 Promise 返回了數(shù)據(jù)吗蚌,才能將該 Promise 移到 completedQueue 隊列中腿倚,由 Emitter 消費發(fā)往下游。只有隊首集合空了褪测,才能處理第二個集合猴誊。這樣就保證了當(dāng)且僅當(dāng)某個 watermark 之前所有的消息都已經(jīng)被發(fā)送了,該 watermark 才能被發(fā)送侮措。過程如下圖所示:

快照與恢復(fù)

分布式快照機(jī)制是為了保證狀態(tài)的一致性。我們需要分析哪些狀態(tài)是需要快照的乖杠,哪些是不需要的分扎。首先,已經(jīng)完成回調(diào)并且已經(jīng)發(fā)往下游的元素是不需要快照的胧洒。否則畏吓,會導(dǎo)致重發(fā),那就不是 exactly-once 了卫漫。而已經(jīng)完成回調(diào)且未發(fā)往下游的元素菲饼,加上未完成回調(diào)的元素,就是上述隊列中的所有元素列赎。

所以快照的邏輯也非常簡單宏悦,(1)清空原有的狀態(tài)存儲,(2)遍歷隊列中的所有 Promise包吝,從中取出 StreamElement(消息或 watermark)并放入狀態(tài)存儲中饼煞,(3)執(zhí)行快照操作。

恢復(fù)的時候诗越,從快照中讀取所有的元素全部再處理一次砖瞧,當(dāng)然包括之前已完成回調(diào)的元素。所以在失敗恢復(fù)后嚷狞,會有元素重復(fù)請求外部服務(wù)块促,但是每個回調(diào)的結(jié)果只會被發(fā)往下游一次荣堰。

本文的原理和實現(xiàn)分析基于 Flink 1.3 版本。

轉(zhuǎn)載:
http://wuchong.me/blog/2017/05/17/flink-internals-async-io/

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末竭翠,一起剝皮案震驚了整個濱河市持隧,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌逃片,老刑警劉巖屡拨,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異褥实,居然都是意外死亡呀狼,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進(jìn)店門损离,熙熙樓的掌柜王于貴愁眉苦臉地迎上來哥艇,“玉大人,你說我怎么就攤上這事僻澎∶蔡ぃ” “怎么了?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵窟勃,是天一觀的道長祖乳。 經(jīng)常有香客問我,道長秉氧,這世上最難降的妖魔是什么眷昆? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮汁咏,結(jié)果婚禮上亚斋,老公的妹妹穿的比我還像新娘。我一直安慰自己攘滩,他們只是感情好帅刊,可當(dāng)我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著漂问,像睡著了一般赖瞒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上级解,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天冒黑,我揣著相機(jī)與錄音,去河邊找鬼勤哗。 笑死抡爹,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的芒划。 我是一名探鬼主播冬竟,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼欧穴,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了泵殴?” 一聲冷哼從身側(cè)響起涮帘,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎笑诅,沒想到半個月后调缨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡吆你,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年弦叶,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片妇多。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡伤哺,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出者祖,到底是詐尸還是另有隱情立莉,我是刑警寧澤,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布七问,位于F島的核電站蜓耻,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏烂瘫。R本人自食惡果不足惜媒熊,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望坟比。 院中可真熱鬧,春花似錦嚷往、人聲如沸葛账。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽籍琳。三九已至,卻和暖如春贷祈,著一層夾襖步出監(jiān)牢的瞬間趋急,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工势誊, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留呜达,地道東北人。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓粟耻,卻偏偏與公主長得像查近,于是被迫代替她去往敵國和親眉踱。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容