Flink處理數(shù)據(jù)時(shí)候,遇到比較耗時(shí)的操作時(shí)鸥诽,需要異步處理數(shù)據(jù)汰瘫。
例子如下:
DataStream<Order> asyncStream = AsyncDataStream.unorderedWait(orderStream, new RichAsyncFunction<Order, Order>() {
public transient ThreadPoolExecutor executor;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executor = new ThreadPoolExecutor(5, //
10, 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());//
}
@Override
public void close() throws Exception {
super.close();
executor.shutdownNow();
}
@Override
public void timeout(SkuOrder input, ResultFuture<SkuOrder> resultFuture) {
//超時(shí)后的處理
}
@Override
public void asyncInvoke(Order input, ResultFuture<Order> resultFuture) throws Exception {
CompletableFuture.runAsync( ()->{
int mills = new Random().nextInt(10);
System.out.println("異步處理數(shù)據(jù):" + Thread.currentThread().getId() + "|" + JSON.toJSONString(input));
try {
TimeUnit.SECONDS.sleep(mills);
} catch (InterruptedException e) {
e.printStackTrace();
}
resultFuture.complete(Collections.singleton(input));
},executor);
}
},1, TimeUnit.MINUTES, 1000).setParallelism(1);
說(shuō)明:
1、AsyncDataStream有2個(gè)方法,unorderedWait表示數(shù)據(jù)不需要關(guān)注順序诬辈,處理完立即發(fā)送酵使,orderedWait表示數(shù)據(jù)需要關(guān)注順序,為了實(shí)現(xiàn)該目標(biāo)焙糟,操作算子會(huì)在該結(jié)果記錄之前的記錄為發(fā)送之前緩存該記錄口渔。這往往會(huì)引入額外的延遲和一些Checkpoint負(fù)載,因?yàn)橄啾扔跓o(wú)序模式結(jié)果記錄會(huì)保存在Checkpoint狀態(tài)內(nèi)部較長(zhǎng)的時(shí)間穿撮。
2缺脉、Timeout配置,主要是為了處理死掉或者失敗的任務(wù)悦穿,防止資源被長(zhǎng)期阻塞占用攻礼。
3、最后一個(gè)參數(shù)Capacity表示同時(shí)最多有多少個(gè)異步請(qǐng)求在處理栗柒,異步IO的方式會(huì)導(dǎo)致更高的吞吐量礁扮,但是對(duì)于實(shí)時(shí)應(yīng)用來(lái)說(shuō)該操作也是一個(gè)瓶頸。限制并發(fā)請(qǐng)求數(shù)瞬沦,算子不會(huì)積壓過(guò)多的未處理請(qǐng)求太伊,但是一旦超過(guò)容量的顯示會(huì)觸發(fā)背壓。
該參數(shù)可以不配置蛙埂,但是默認(rèn)是100倦畅。