1.為什么需要異步IO
flink在做實時處理時,有時候需要和外部數(shù)據(jù)交互免钻,但是通常情況下這個交互過程是同步的彼水,這樣就會產(chǎn)生大量的等待時間;而異步操作可以在單個函數(shù)實例中同時處理多個請求伯襟,并且同時接收相應(yīng)猿涨。這樣等待時間就平均分?jǐn)偟搅硕鄠€請求上,大大減少了請求的等待時長姆怪,可以提高實時處理的吞吐量叛赚。
flink異步io.png
2.使用flink異步IO的先決條件
- 需要所連接的數(shù)據(jù)庫支持異步客戶端
- 在沒有異步客戶端的情況下澡绩,可以通過創(chuàng)建多個客戶端并使用線程池處理同步調(diào)用來嘗試將同步客戶端轉(zhuǎn)變?yōu)橛邢薜牟l(fā)客戶端
3. flink異步IO的使用步驟
- 實現(xiàn)AsyncFunction接口;
- 一個回調(diào)俺附,該函數(shù)取回操作的結(jié)果肥卡,然后將結(jié)果傳遞給ResultFuture;
- 在DataStream上應(yīng)用異步IO操作事镣。
4. 使用示例
import scala.concurrent._
import ExecutionContext.Implicits.global
/**
* 使用scala并發(fā)包的Future模擬一個異步客戶端
*/
class DatabaseClient {
def query: Future[Long] = Future {
System.currentTimeMillis() / 1000
}
}
/** 'AsyncFunction' 的一個實現(xiàn)步鉴,向數(shù)據(jù)庫發(fā)送異步請求并設(shè)置回調(diào)
* 改編自官網(wǎng)實例
*/
class AsyncDatabaseRequest extends AsyncFunction[Int, (Int, Long)] {
/** The database specific client that can issue concurrent requests with callbacks */
lazy val client: DatabaseClient = new DatabaseClient
/** The context used for the future callbacks */
implicit lazy val executor: ExecutionContext =
ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(str: Int,
resultFuture: ResultFuture[(Int, Long)]): Unit = {
// issue the asynchronous request, receive a future for the result
val resultFutureRequested: Future[Long] = client.query
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
resultFutureRequested.onSuccess {
case result: Long => resultFuture.complete(Iterable((str, result)))
}
}
}
object AsynchronousIOTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data: immutable.Seq[Int] = Range(1, 10)
// 創(chuàng)建數(shù)據(jù)流
val dataStream: DataStream[Int] = env.fromCollection(data)
// 使用異步IO
val asyn = AsyncDataStream.unorderedWait(
dataStream,//執(zhí)行異步操作的DataStream
new AsyncDatabaseRequest,//
1000, TimeUnit.MILLISECONDS, //超時時間
100 // 進行中的異步請求的最大數(shù)量
)
asyn.print()
env.execute("AsynchronousIOTest")
}
}
結(jié)果順序
AsyncDataStream 有兩個靜態(tài)方法,orderedWait 和 unorderedWait璃哟,對應(yīng)了兩種輸出模式:有序和無序氛琢。
- 有序:消息的發(fā)送順序與接受到的順序相同(包括 watermark ),也就是先進先出随闪。
- 無序:
在 ProcessingTime 的情況下阳似,完全無序,先返回的結(jié)果先發(fā)送铐伴。
在 EventTime 的情況下撮奏,watermark 不能超越消息,消息也不能超越 watermark当宴,也就是說 watermark 定義的順序的邊界畜吊。在兩個 watermark 之間的消息的發(fā)送是無序的,但是在watermark之后的消息不能先于該watermark之前的消息發(fā)送户矢。
超時處理
當(dāng)異步IO請求超時時玲献,默認(rèn)情況下會引發(fā)異常并重新啟動作業(yè)。如果要處理超時梯浪,可以重寫AsyncFunction#timeout方法青自。
override def timeout(input: Int,
resultFuture: ResultFuture[(Int, Long)]): Unit =
super.timeout(input, resultFuture)
容錯保證
異步IO運算符提供完全一次的容錯保證。它在檢查點中存儲正在進行的異步請求的記錄驱证,并在從故障中恢復(fù)時恢復(fù)/重新觸發(fā)請求。
其他資料
關(guān)于flink異步IO的更多信息可以參考flink官網(wǎng)或者Flink 原理與實現(xiàn):Aysnc I/O這篇文章恋腕。
Futures和事件驅(qū)動編程的知識可以參考《AKKA入門與實踐》這本書第二章的內(nèi)容:Actor與并發(fā)抹锄。
參考資料:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/asyncio.html
http://wuchong.me/blog/2017/05/17/flink-internals-async-io/
《AKKA入門與實踐》