Flink 異步I/O

1.為什么需要異步IO

flink在做實時處理時,有時候需要和外部數(shù)據(jù)交互免钻,但是通常情況下這個交互過程是同步的彼水,這樣就會產(chǎn)生大量的等待時間;而異步操作可以在單個函數(shù)實例中同時處理多個請求伯襟,并且同時接收相應(yīng)猿涨。這樣等待時間就平均分?jǐn)偟搅硕鄠€請求上,大大減少了請求的等待時長姆怪,可以提高實時處理的吞吐量叛赚。


flink異步io.png

2.使用flink異步IO的先決條件

  • 需要所連接的數(shù)據(jù)庫支持異步客戶端
  • 在沒有異步客戶端的情況下澡绩,可以通過創(chuàng)建多個客戶端并使用線程池處理同步調(diào)用來嘗試將同步客戶端轉(zhuǎn)變?yōu)橛邢薜牟l(fā)客戶端

3. flink異步IO的使用步驟

  • 實現(xiàn)AsyncFunction接口;
  • 一個回調(diào)俺附,該函數(shù)取回操作的結(jié)果肥卡,然后將結(jié)果傳遞給ResultFuture;
  • 在DataStream上應(yīng)用異步IO操作事镣。

4. 使用示例

import scala.concurrent._
import ExecutionContext.Implicits.global

/**
  * 使用scala并發(fā)包的Future模擬一個異步客戶端
  */
class DatabaseClient {
  def query: Future[Long] = Future {
    System.currentTimeMillis() / 1000
  }
}

/** 'AsyncFunction' 的一個實現(xiàn)步鉴,向數(shù)據(jù)庫發(fā)送異步請求并設(shè)置回調(diào) 
  * 改編自官網(wǎng)實例
  */

class AsyncDatabaseRequest extends AsyncFunction[Int, (Int, Long)] {

  /** The database specific client that can issue concurrent requests with callbacks */
  lazy val client: DatabaseClient = new DatabaseClient

  /** The context used for the future callbacks */
  implicit lazy val executor: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.directExecutor())

  override def asyncInvoke(str: Int,
                           resultFuture: ResultFuture[(Int, Long)]): Unit = {

    // issue the asynchronous request, receive a future for the result
    val resultFutureRequested: Future[Long] = client.query

    // set the callback to be executed once the request by the client is complete
    // the callback simply forwards the result to the result future
    resultFutureRequested.onSuccess {
      case result: Long => resultFuture.complete(Iterable((str, result)))
    }
  }
}

object AsynchronousIOTest {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val data: immutable.Seq[Int] = Range(1, 10)

    // 創(chuàng)建數(shù)據(jù)流
    val dataStream: DataStream[Int] = env.fromCollection(data)

     // 使用異步IO
     val asyn = AsyncDataStream.unorderedWait(
      dataStream,//執(zhí)行異步操作的DataStream
      new AsyncDatabaseRequest,//
      1000, TimeUnit.MILLISECONDS, //超時時間
      100 // 進行中的異步請求的最大數(shù)量
    )

    asyn.print()

    env.execute("AsynchronousIOTest")

  }

}
結(jié)果順序

AsyncDataStream 有兩個靜態(tài)方法,orderedWait 和 unorderedWait璃哟,對應(yīng)了兩種輸出模式:有序和無序氛琢。

  • 有序:消息的發(fā)送順序與接受到的順序相同(包括 watermark ),也就是先進先出随闪。
  • 無序:
    在 ProcessingTime 的情況下阳似,完全無序,先返回的結(jié)果先發(fā)送铐伴。
    在 EventTime 的情況下撮奏,watermark 不能超越消息,消息也不能超越 watermark当宴,也就是說 watermark 定義的順序的邊界畜吊。在兩個 watermark 之間的消息的發(fā)送是無序的,但是在watermark之后的消息不能先于該watermark之前的消息發(fā)送户矢。
超時處理

當(dāng)異步IO請求超時時玲献,默認(rèn)情況下會引發(fā)異常并重新啟動作業(yè)。如果要處理超時梯浪,可以重寫AsyncFunction#timeout方法青自。

  override def timeout(input: Int,
                       resultFuture: ResultFuture[(Int, Long)]): Unit =
    super.timeout(input, resultFuture)

容錯保證

異步IO運算符提供完全一次的容錯保證。它在檢查點中存儲正在進行的異步請求的記錄驱证,并在從故障中恢復(fù)時恢復(fù)/重新觸發(fā)請求。

其他資料

關(guān)于flink異步IO的更多信息可以參考flink官網(wǎng)或者Flink 原理與實現(xiàn):Aysnc I/O這篇文章恋腕。

Futures和事件驅(qū)動編程的知識可以參考《AKKA入門與實踐》這本書第二章的內(nèi)容:Actor與并發(fā)抹锄。


參考資料:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/asyncio.html
http://wuchong.me/blog/2017/05/17/flink-internals-async-io/
《AKKA入門與實踐》

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市荠藤,隨后出現(xiàn)的幾起案子伙单,更是在濱河造成了極大的恐慌,老刑警劉巖哈肖,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件吻育,死亡現(xiàn)場離奇詭異,居然都是意外死亡淤井,警方通過查閱死者的電腦和手機布疼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進店門摊趾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人游两,你說我怎么就攤上這事砾层。” “怎么了贱案?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵肛炮,是天一觀的道長。 經(jīng)常有香客問我宝踪,道長侨糟,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任瘩燥,我火速辦了婚禮秕重,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘颤芬。我一直安慰自己悲幅,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布站蝠。 她就那樣靜靜地躺著汰具,像睡著了一般。 火紅的嫁衣襯著肌膚如雪菱魔。 梳的紋絲不亂的頭發(fā)上留荔,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天,我揣著相機與錄音澜倦,去河邊找鬼聚蝶。 笑死,一個胖子當(dāng)著我的面吹牛藻治,可吹牛的內(nèi)容都是我干的碘勉。 我是一名探鬼主播,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼桩卵,長吁一口氣:“原來是場噩夢啊……” “哼验靡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起雏节,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤胜嗓,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后钩乍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體辞州,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年寥粹,在試婚紗的時候發(fā)現(xiàn)自己被綠了变过。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片埃元。...
    茶點故事閱讀 40,872評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖牵啦,靈堂內(nèi)的尸體忽然破棺而出亚情,到底是詐尸還是另有隱情,我是刑警寧澤哈雏,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布楞件,位于F島的核電站,受9級特大地震影響裳瘪,放射性物質(zhì)發(fā)生泄漏土浸。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一彭羹、第九天 我趴在偏房一處隱蔽的房頂上張望黄伊。 院中可真熱鬧,春花似錦派殷、人聲如沸还最。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拓轻。三九已至,卻和暖如春经伙,著一層夾襖步出監(jiān)牢的瞬間扶叉,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工帕膜, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留枣氧,地道東北人。 一個月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓垮刹,卻偏偏與公主長得像达吞,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子荒典,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,876評論 2 361

推薦閱讀更多精彩內(nèi)容

  • 1 概述 流計算系統(tǒng)中經(jīng)常需要與外部系統(tǒng)進行交互宗挥,我們通常的做法如向數(shù)據(jù)庫發(fā)送用戶a的查詢請求,然后等待結(jié)果返回种蝶,...
    薛定諤的貓Plus閱讀 4,724評論 0 5
  • 背景 Async I/O 是阿里巴巴貢獻給社區(qū)的一個呼聲非常高的特性,于1.2版本引入瞒大。主要目的是為了解決與外部系...
    尼小摩閱讀 1,067評論 0 3
  • Flink總結(jié) Flink簡介 Apache Flink作為一款高吞吐量螃征、低延遲的針對流數(shù)據(jù)和批數(shù)據(jù)的分布式實時處...
    bigdata_er閱讀 10,603評論 0 10
  • 8:30準(zhǔn)備工作 拿早點 開窗 開燈 煲水 杯子 點心桌 輕音樂 8:45接孩子 提醒上廁所、洗手透敌、擦手盯滚、拿杯子喝...
    小藕家閱讀 456評論 0 0
  • 看了成為作家這本書 踢械,里面呢是介紹要用什么樣的心態(tài)去面對寫作,還有一些如何應(yīng)對自己在寫作中遇到的問題魄藕。 想要成為一...
    選妃閱讀 254評論 2 1