flink 1.7
注意:可查詢狀態(tài)的客戶端API當(dāng)前處于不斷發(fā)展的狀態(tài)努咐,并且不保證所提供接口的穩(wěn)定性。在即將推出的Flink版本中,客戶端可能會發(fā)生重大的API更改。
簡而言之超营,此功能將Flink的托管鍵控(分區(qū))狀態(tài)(請參閱使用狀態(tài))暴露給外部世界,并允許用戶從Flink外部查詢作業(yè)的狀態(tài)阅虫。對于某些情況演闭,可查詢狀態(tài)消除了對外部系統(tǒng)(例如鍵值存儲)的分布式操作/事務(wù)的需要,這通常是實(shí)踐中的瓶頸颓帝。此外米碰,此功能對于調(diào)試目的可能特別有用
注意:查詢狀態(tài)對象時,無需任何同步或復(fù)制即可從并發(fā)線程訪問該對象购城。這是一種設(shè)計(jì)選擇吕座,因?yàn)樯鲜鋈魏我环N都會導(dǎo)致增加的作業(yè)延遲,我們希望避免這種情況工猜。由于任何狀態(tài)后端使用Java堆空間米诉,例如MemoryStateBackend或FsStateBackend在檢索值時不能與副本一起使用,而是直接引用存儲的值篷帅,讀取 - 修改 - 寫入模式是不安全的史侣,并且可能導(dǎo)致可查詢狀態(tài)服務(wù)器由于并發(fā)修改而失敗。 RocksDBStateBackend可以避免這些問題魏身。
架構(gòu)
在展示使用狀態(tài)查詢前惊橱,我們先說明狀態(tài)查詢的實(shí)體組成部分,這是時分必要的箭昵。狀態(tài)查詢由 3部分實(shí)體組成
- QueryableStateClient税朴,運(yùn)行在flink集群之外,提交用戶查詢
- QueryableStateClientProxy家制, 運(yùn)行在每個TaskManager中(即FLink集群中)正林,它負(fù)責(zé)接收client的查詢,代表client向TaskManager拉取狀態(tài)信息颤殴,并將結(jié)果返回client
- QueryableStateServer 觅廓, 它運(yùn)行在TaskManager,負(fù)責(zé)服務(wù)本地狀態(tài)存儲
激活狀態(tài)查詢
在Flink集群開啟狀態(tài)查詢涵但,你只需將flink-queryable-state-runtime_2.11-1.7.1.jar從Flink的opt文件夾復(fù)制到lib文件夾杈绸。否則帖蔓,狀態(tài)查詢時不可用的。
通過檢查task manager日志中是否有"Started the Queryable State Proxy Server @ ..."
判斷狀態(tài)查詢是否開啟成功
使?fàn)顟B(tài)查詢可見
你已經(jīng)成功激活狀態(tài)查詢瞳脓,在使用之前塑娇,為使?fàn)顟B(tài)對外可見,需要明確一下兩點(diǎn):
- QueryableStateStream劫侧, 一個便利對象埋酬,充當(dāng)接收器并將其傳入值作為可查詢狀態(tài)提供
- stateDescriptor.setQueryable(String queryableStateName)方法,使得由狀態(tài)描述符表示的鍵控狀態(tài)是可查詢的
以下介紹如何使用這兩點(diǎn)
可查詢狀態(tài)流
在KeyedStream上調(diào)用.asQueryableState(stateName板辽,stateDescriptor)會返回一個QueryableStateStream奇瘦,它將其值提供為可查詢狀態(tài)。根據(jù)狀態(tài)的類型劲弦,asQueryableState()方法有以下變體:
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
沒有可查詢的ListState接收器耳标,因?yàn)樗鼤?dǎo)致不斷增長的列表,這些列表可能無法清理邑跪,因此最終會消耗太多內(nèi)存
返回的QueryableStateStream可以看作是接收器次坡,無法進(jìn)一步轉(zhuǎn)換。在內(nèi)部画畅,QueryableStateStream被轉(zhuǎn)換為運(yùn)算符砸琅,該運(yùn)算符使用所有傳入記錄來更新可查詢狀態(tài)實(shí)例。更新邏輯由asQueryableState調(diào)用中提供的StateDescriptor的類型暗示轴踱。在如下所示的程序中症脂,鍵控流的所有記錄將用于通過ValueState.update(value)更新狀態(tài)實(shí)例
stream.keyBy(0).asQueryableState("query-name")
管理監(jiān)控狀態(tài)
通過StateDescriptor.setQueryable(String queryableStateName)查詢適當(dāng)?shù)臓顟B(tài)描述符,可以使運(yùn)算符的托管鍵控狀態(tài)(請參閱使用托管鍵控狀態(tài))可查詢淫僻,如下例所示
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name"); // queryable state name
queryableStateName參數(shù)可以任意選擇诱篷,僅用于查詢。它不必與state自己的名字相同雳灵。
該變體對于哪種類型的狀態(tài)可以被查詢沒有限制棕所。這意味著它可以用于任何ValueState,ReduceState悯辙,ListState琳省,MapState,AggregatingState和當(dāng)前不推薦使用的FoldingState躲撰。
狀態(tài)查詢
現(xiàn)在针贬,你已經(jīng)設(shè)置集群為可查詢狀態(tài)并聲明了可查詢的狀態(tài)。是時間去了解如何查詢了拢蛋。
狀態(tài)查詢需要用到QueryableStateClient 幫助類坚踩,它位于flink-queryable-state-client jar包中,你需要顯示的在pom中聲明引用瓤狐,并且它與flink core是相互獨(dú)立的瞬铸,如下
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java_2.11</artifactId>
<version>1.7.1</version>
</dependency>
你可以閱讀配置Flink程序
來了解更多的項(xiàng)目配置。
QueryableStateClient 向內(nèi)部代理發(fā)送查詢請求础锐,代理對象處理查詢請求并將結(jié)果返回嗓节。
client唯一需要初始化的是提供一個合法的的TaskManager 主機(jī)名和代理對象監(jiān)聽的端口號(記著呻粹,TaskManager運(yùn)行著狀態(tài)可查詢的代理)灭美。更多的代理配置及服務(wù)端口號請查看Configuration Section
QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
在客戶端準(zhǔn)備好的情況下,要查詢與類型K的鍵關(guān)聯(lián)的類型V的狀態(tài)垮刹,您可以使用該方法
CompletableFuture<S> getKvState(
JobID jobId,
String queryableStateName,
K key,
TypeInformation<K> keyTypeInfo,
StateDescriptor<S, V> stateDescriptor)
上面返回一個CompletableFuture信姓,最終保存由具有ID jobID的作業(yè)的queryableStateName標(biāo)識的可查詢狀態(tài)實(shí)例的狀態(tài)值鸵隧。key是你所感興趣的狀態(tài)值,keyTypeInfo 會告知Flink序列化它的方法意推。最后豆瘫,stateDescriptor包含有關(guān)請求狀態(tài)的必要信息,即其類型(Value菊值,Reduce等)以及有關(guān)如何序列化/反序列化它的必要信息外驱。
細(xì)心的讀者會 注意到返回的future包含一個S類值,即一個包含實(shí)際值的State對象腻窒。這可以是Flink支持的任何狀態(tài)類型:ValueState昵宇,ReduceState,ListState儿子,MapState瓦哎,AggregatingState和當(dāng)前不推薦使用的FoldingState。
這些狀態(tài)對象不允許修改包含的狀態(tài)柔逼。您可以使用它們來獲取狀態(tài)的實(shí)際值蒋譬,例如使用valueState.get(),或迭代所包含的<K卒落,V>條目羡铲,例如使用mapState.entries(),但您無法修改它們儡毕。例如也切,在返回的列表狀態(tài)上調(diào)用add()方法將拋出UnsupportedOperationException
客戶端是異步的,可以由多個線程共享腰湾。它需要在未使用時通過QueryableStateClient.shutdown()關(guān)閉以釋放資源雷恃。
例子
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
currentSum.f0 += 1;
currentSum.f1 += input.f1;
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name");
sum = getRuntimeContext().getState(descriptor);
}
}
在作業(yè)中使用后,您可以檢索作業(yè)ID费坊,然后從該運(yùn)算符查詢?nèi)魏捂I的當(dāng)前狀態(tài)
// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
// now handle the returned value
resultFuture.thenAccept(response -> {
try {
Tuple2<Long, Long> res = response.get();
} catch (Exception e) {
e.printStackTrace();
}
});
配置
QueryableStateOptions 定義了影響狀態(tài)查詢服務(wù)和客戶端行為的配置參數(shù)
服務(wù)狀態(tài)
- query.server.ports: 可查詢狀態(tài)服務(wù)器的服務(wù)器端口范圍倒槐。如果有多個任務(wù)管理器在同一臺機(jī)器上運(yùn)行,這對于避免端口沖突很有用附井。指定的范圍可以是:端口:“9123”讨越,一系列端口:“50100-50200”两残,或范圍和/或點(diǎn)列表:“50100-50200,50300-50400,51234”。默認(rèn)端口為9067把跨。
- query.server.network-threads: 接收狀態(tài)服務(wù)器傳入請求的網(wǎng)絡(luò)(事件循環(huán))線程數(shù)(0 => #slots)
- query.server.query-threads: 處理/服務(wù)狀態(tài)服務(wù)器的傳入請求的線程數(shù)(0 => #slots)
代理
- query.proxy.ports: 可查詢狀態(tài)代理的服務(wù)器端口范圍人弓。如果有多個任務(wù)管理器在同一臺機(jī)器上運(yùn)行,這對于避免端口沖突很有用着逐。指定的范圍可以是:端口:“9123”崔赌,一系列端口:“50100-50200”,或范圍和/或點(diǎn)列表:“50100-50200,50300-50400,51234”耸别。默認(rèn)端口為9069健芭。
- query.proxy.network-threads: 接收客戶端代理的傳入請求的網(wǎng)絡(luò)(事件循環(huán))線程數(shù)(0 => #slots)
- query.proxy.query-threads: 處理/服務(wù)客戶端代理的傳入請求的線程數(shù)(0 => #slots)
限制
- 可查詢狀態(tài)生命周期與作業(yè)的生命周期綁定,例如秀姐,任務(wù)在啟動時注冊可查詢狀態(tài)慈迈,并在處理時取消注冊。在將來的版本中囊扳,需要將其解耦以便在任務(wù)完成后允許查詢吩翻,并通過狀態(tài)復(fù)制加速恢復(fù)
- 關(guān)于可用KvState的通知是通過一個簡單的告訴發(fā)生的。在未來锥咸,應(yīng)該通過詢問和確認(rèn)來改進(jìn)這一點(diǎn)
- 服務(wù)器和客戶端會跟蹤查詢的統(tǒng)計(jì)信息狭瞎。默認(rèn)情況下,這些功能目前處于禁用狀態(tài)搏予,因?yàn)樗麄儾荒茉偃魏蔚胤綄ν獗┞缎芏В灰懈玫闹С滞ㄟ^Metrics系統(tǒng)發(fā)布這些數(shù)字,我們就應(yīng)該啟用統(tǒng)計(jì)數(shù)據(jù)