背景
Async I/O 是阿里巴巴貢獻(xiàn)給社區(qū)的一個呼聲非常高的特性,于1.2版本引入壶愤。主要目的是為了解決與外部系統(tǒng)交互時網(wǎng)絡(luò)延遲成為了系統(tǒng)瓶頸的問題。
流計算系統(tǒng)中經(jīng)常需要與外部系統(tǒng)進(jìn)行交互卫玖,比如需要查詢外部數(shù)據(jù)庫以關(guān)聯(lián)上用戶的額外信息公你。通常,我們的實現(xiàn)方式是向數(shù)據(jù)庫發(fā)送用戶a
的查詢請求假瞬,然后等待結(jié)果返回陕靠,在這之前,我們無法發(fā)送用戶b
的查詢請求脱茉。這是一種同步訪問的模式剪芥,如下圖左邊所示。
圖中棕色的長條表示等待時間琴许,可以發(fā)現(xiàn)網(wǎng)絡(luò)等待時間極大地阻礙了吞吐和延遲税肪。為了解決同步訪問的問題,異步模式可以并發(fā)地處理多個請求和回復(fù)榜田。也就是說益兄,你可以連續(xù)地向數(shù)據(jù)庫發(fā)送用戶a、b箭券、c等的請求净捅,與此同時,哪個請求的回復(fù)先返回了就處理哪個回復(fù)辩块,從而連續(xù)的請求之間不需要阻塞等待蛔六,如上圖右邊所示荆永。這也正是 Async I/O 的實現(xiàn)原理。
Async I/O
使用 Async I/O 的前提是需要一個支持異步請求的客戶端国章。當(dāng)然具钥,沒有異步請求客戶端的話也可以將同步客戶端丟到線程池中執(zhí)行作為異步客戶端。Flink 提供了非常簡潔的API液兽,讓用戶只需要關(guān)注業(yè)務(wù)邏輯骂删,一些臟活累活比如消息順序性和一致性保證都由框架處理了,多么棒的事情抵碟!
使用方式如下方代碼片段所示(來自官網(wǎng)文檔):
/** 'AsyncFunction' 的一個實現(xiàn)桃漾,向數(shù)據(jù)庫發(fā)送異步請求并設(shè)置回調(diào) */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
/** 可以異步請求的特定數(shù)據(jù)庫的客戶端 */
lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)
/** future 的回調(diào)的執(zhí)行上下文(當(dāng)前線程) */
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {
// 發(fā)起一個異步請求坏匪,返回結(jié)果的 future
val resultFuture: Future[String] = client.query(str)
// 設(shè)置請求完成時的回調(diào): 將結(jié)果傳遞給 collector
resultFuture.onSuccess {
case result: String => asyncCollector.collect(Iterable((str, result)));
}
}
}
// 創(chuàng)建一個原始的流
val stream: DataStream[String] = ...
// 添加一個 async I/O 的轉(zhuǎn)換
val resultStream: DataStream[(String, String)] =
AsyncDataStream.(un)orderedWait(
stream, new AsyncDatabaseRequest(),
1000, TimeUnit.MILLISECONDS, // 超時時間
100) // 進(jìn)行中的異步請求的最大數(shù)量
AsyncDataStream 有兩個靜態(tài)方法拟逮,orderedWait
和 unorderedWait
,對應(yīng)了兩種輸出模式:有序和無序适滓。
- 有序:消息的發(fā)送順序與接受到的順序相同(包括 watermark )敦迄,也就是先進(jìn)先出。
- 無序:
在 ProcessingTime 的情況下凭迹,完全無序罚屋,先返回的結(jié)果先發(fā)送。
在 EventTime 的情況下嗅绸,watermark 不能超越消息脾猛,消息也不能超越 watermark,也就是說 watermark 定義的順序的邊界鱼鸠。在兩個 watermark 之間的消息的發(fā)送是無序的猛拴,但是在watermark之后的消息不能先于該watermark之前的消息發(fā)送。
原理實現(xiàn)
AsyncDataStream.(un)orderedWait
的主要工作就是創(chuàng)建了一個 AsyncWaitOperator
蚀狰。AsyncWaitOperator
是支持異步 IO 訪問的算子實現(xiàn)愉昆,該算子會運行 AsyncFunction
并處理異步返回的結(jié)果,其內(nèi)部原理如下圖所示麻蹋。
如圖所示跛溉,AsyncWaitOperator
主要由兩部分組成:StreamElementQueue
和 Emitter
。StreamElementQueue 是一個 Promise 隊列扮授,所謂 Promise 是一種異步抽象表示將來會有一個值(參考 Scala Promise 了解更多)芳室,這個隊列是未完成的 Promise 隊列,也就是進(jìn)行中的請求隊列刹勃。Emitter 是一個單獨的線程堪侯,負(fù)責(zé)發(fā)送消息(收到的異步回復(fù))給下游。
圖中E5表示進(jìn)入該算子的第五個元素(”Element-5”)深夯,在執(zhí)行過程中首先會將其包裝成一個 “Promise” P5抖格,然后將P5放入隊列诺苹。最后調(diào)用 AsyncFunction 的 ayncInvoke 方法,該方法會向外部服務(wù)發(fā)起一個異步的請求,并注冊回調(diào)。該回調(diào)會在異步請求成功返回時調(diào)用 AsyncCollector.collect 方法將返回的結(jié)果交給框架處理花颗。實際上 AsyncCollector 是一個 Promise 废赞,也就是 P5,在調(diào)用 collect 的時候會標(biāo)記 Promise 為完成狀態(tài)蔬啡,并通知 Emitter 線程有完成的消息可以發(fā)送了。Emitter 就會從隊列中拉取完成的 Promise ,并從 Promise 中取出消息發(fā)送給下游翩肌。
消息的順序性
上文提到 Async I/O 提供了兩種輸出模式。其實細(xì)分有三種模式: 有序禁悠,ProcessingTime
無序念祭,EventTime
無序。Flink 使用隊列來實現(xiàn)不同的輸出模式碍侦,并抽象出一個隊列的接口(StreamElementQueue
)粱坤,這種分層設(shè)計使得AsyncWaitOperator
和Emitter
不用關(guān)心消息的順序問題。StreamElementQueue
有兩種具體實現(xiàn)瓷产,分別是OrderedStreamElementQueue
和 UnorderedStreamElementQueue
站玄。UnorderedStreamElementQueue
比較有意思,它使用了一套邏輯巧妙地實現(xiàn)完全無序和 EventTime 無序濒旦。
有序
有序比較簡單株旷,使用一個隊列就能實現(xiàn)。所有新進(jìn)入該算子的元素(包括 watermark)尔邓,都會包裝成 Promise 并按到達(dá)順序放入該隊列晾剖。如下圖所示,盡管P4的結(jié)果先返回铃拇,但并不會發(fā)送钞瀑,只有 P1 (隊首)的結(jié)果返回了才會觸發(fā) Emitter 拉取隊首元素進(jìn)行發(fā)送。
ProcessingTime 無序
ProcessingTime 無序也比較簡單慷荔,因為沒有 watermark雕什,不需要協(xié)調(diào) watermark 與消息的順序性,所以使用兩個隊列就能實現(xiàn)显晶,一個 uncompletedQueue 一個 completedQueue贷岸。所有新進(jìn)入該算子的元素,同樣的包裝成 Promise 并放入 uncompletedQueue 隊列磷雇,當(dāng)uncompletedQueue隊列中任意的Promise返回了數(shù)據(jù)偿警,則將該 Promise 移到 completedQueue 隊列中,并通知 Emitter 消費唯笙。如下圖所示:
EventTime 無序
EventTime 無序類似于有序與 ProcessingTime 無序的結(jié)合體螟蒸。因為有 watermark盒使,需要協(xié)調(diào) watermark 與消息之間的順序性,所以uncompletedQueue中存放的元素從原先的 Promise 變成了 Promise 集合七嫌。如果進(jìn)入算子的是消息元素少办,則會包裝成 Promise 放入隊尾的集合中。如果進(jìn)入算子的是 watermark诵原,也會包裝成 Promise 并放到一個獨立的集合中英妓,再將該集合加入到 uncompletedQueue 隊尾,最后再創(chuàng)建一個空集合加到 uncompletedQueue 隊尾绍赛。這樣蔓纠,watermark 就成了消息順序的邊界。只有處在隊首的集合中的 Promise 返回了數(shù)據(jù)吗蚌,才能將該 Promise 移到 completedQueue 隊列中腿倚,由 Emitter 消費發(fā)往下游。只有隊首集合空了褪测,才能處理第二個集合猴誊。這樣就保證了當(dāng)且僅當(dāng)某個 watermark 之前所有的消息都已經(jīng)被發(fā)送了,該 watermark 才能被發(fā)送侮措。過程如下圖所示:
快照與恢復(fù)
分布式快照機(jī)制是為了保證狀態(tài)的一致性。我們需要分析哪些狀態(tài)是需要快照的乖杠,哪些是不需要的分扎。首先,已經(jīng)完成回調(diào)并且已經(jīng)發(fā)往下游的元素是不需要快照的胧洒。否則畏吓,會導(dǎo)致重發(fā),那就不是 exactly-once 了卫漫。而已經(jīng)完成回調(diào)且未發(fā)往下游的元素菲饼,加上未完成回調(diào)的元素,就是上述隊列中的所有元素列赎。
所以快照的邏輯也非常簡單宏悦,(1)清空原有的狀態(tài)存儲,(2)遍歷隊列中的所有 Promise包吝,從中取出 StreamElement(消息或 watermark)并放入狀態(tài)存儲中饼煞,(3)執(zhí)行快照操作。
恢復(fù)的時候诗越,從快照中讀取所有的元素全部再處理一次砖瞧,當(dāng)然包括之前已完成回調(diào)的元素。所以在失敗恢復(fù)后嚷狞,會有元素重復(fù)請求外部服務(wù)块促,但是每個回調(diào)的結(jié)果只會被發(fā)往下游一次荣堰。
本文的原理和實現(xiàn)分析基于 Flink 1.3 版本。
轉(zhuǎn)載:
http://wuchong.me/blog/2017/05/17/flink-internals-async-io/