譯:Flink---狀態(tài)查詢 Beta

flink 1.7

注意:可查詢狀態(tài)的客戶端API當(dāng)前處于不斷發(fā)展的狀態(tài)努咐,并且不保證所提供接口的穩(wěn)定性。在即將推出的Flink版本中,客戶端可能會發(fā)生重大的API更改。
簡而言之超营,此功能將Flink的托管鍵控(分區(qū))狀態(tài)(請參閱使用狀態(tài))暴露給外部世界,并允許用戶從Flink外部查詢作業(yè)的狀態(tài)阅虫。對于某些情況演闭,可查詢狀態(tài)消除了對外部系統(tǒng)(例如鍵值存儲)的分布式操作/事務(wù)的需要,這通常是實(shí)踐中的瓶頸颓帝。此外米碰,此功能對于調(diào)試目的可能特別有用
注意:查詢狀態(tài)對象時,無需任何同步或復(fù)制即可從并發(fā)線程訪問該對象购城。這是一種設(shè)計(jì)選擇吕座,因?yàn)樯鲜鋈魏我环N都會導(dǎo)致增加的作業(yè)延遲,我們希望避免這種情況工猜。由于任何狀態(tài)后端使用Java堆空間米诉,例如MemoryStateBackend或FsStateBackend在檢索值時不能與副本一起使用,而是直接引用存儲的值篷帅,讀取 - 修改 - 寫入模式是不安全的史侣,并且可能導(dǎo)致可查詢狀態(tài)服務(wù)器由于并發(fā)修改而失敗。 RocksDBStateBackend可以避免這些問題魏身。

架構(gòu)


在展示使用狀態(tài)查詢前惊橱,我們先說明狀態(tài)查詢的實(shí)體組成部分,這是時分必要的箭昵。狀態(tài)查詢由 3部分實(shí)體組成

  1. QueryableStateClient税朴,運(yùn)行在flink集群之外,提交用戶查詢
  2. QueryableStateClientProxy家制, 運(yùn)行在每個TaskManager中(即FLink集群中)正林,它負(fù)責(zé)接收client的查詢,代表client向TaskManager拉取狀態(tài)信息颤殴,并將結(jié)果返回client
  3. QueryableStateServer 觅廓, 它運(yùn)行在TaskManager,負(fù)責(zé)服務(wù)本地狀態(tài)存儲

激活狀態(tài)查詢


在Flink集群開啟狀態(tài)查詢涵但,你只需將flink-queryable-state-runtime_2.11-1.7.1.jar從Flink的opt文件夾復(fù)制到lib文件夾杈绸。否則帖蔓,狀態(tài)查詢時不可用的。
通過檢查task manager日志中是否有"Started the Queryable State Proxy Server @ ..."判斷狀態(tài)查詢是否開啟成功

使?fàn)顟B(tài)查詢可見


你已經(jīng)成功激活狀態(tài)查詢瞳脓,在使用之前塑娇,為使?fàn)顟B(tài)對外可見,需要明確一下兩點(diǎn):

  • QueryableStateStream劫侧, 一個便利對象埋酬,充當(dāng)接收器并將其傳入值作為可查詢狀態(tài)提供
  • stateDescriptor.setQueryable(String queryableStateName)方法,使得由狀態(tài)描述符表示的鍵控狀態(tài)是可查詢的

以下介紹如何使用這兩點(diǎn)

可查詢狀態(tài)流

在KeyedStream上調(diào)用.asQueryableState(stateName板辽,stateDescriptor)會返回一個QueryableStateStream奇瘦,它將其值提供為可查詢狀態(tài)。根據(jù)狀態(tài)的類型劲弦,asQueryableState()方法有以下變體:

// ValueState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ValueStateDescriptor stateDescriptor)

// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)

// FoldingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    FoldingStateDescriptor stateDescriptor)

// ReducingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ReducingStateDescriptor stateDescriptor)

沒有可查詢的ListState接收器耳标,因?yàn)樗鼤?dǎo)致不斷增長的列表,這些列表可能無法清理邑跪,因此最終會消耗太多內(nèi)存

返回的QueryableStateStream可以看作是接收器次坡,無法進(jìn)一步轉(zhuǎn)換。在內(nèi)部画畅,QueryableStateStream被轉(zhuǎn)換為運(yùn)算符砸琅,該運(yùn)算符使用所有傳入記錄來更新可查詢狀態(tài)實(shí)例。更新邏輯由asQueryableState調(diào)用中提供的StateDescriptor的類型暗示轴踱。在如下所示的程序中症脂,鍵控流的所有記錄將用于通過ValueState.update(value)更新狀態(tài)實(shí)例

stream.keyBy(0).asQueryableState("query-name")

管理監(jiān)控狀態(tài)

通過StateDescriptor.setQueryable(String queryableStateName)查詢適當(dāng)?shù)臓顟B(tài)描述符,可以使運(yùn)算符的托管鍵控狀態(tài)(請參閱使用托管鍵控狀態(tài))可查詢淫僻,如下例所示

ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
                "average", // the state name
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name"); // queryable state name

queryableStateName參數(shù)可以任意選擇诱篷,僅用于查詢。它不必與state自己的名字相同雳灵。

該變體對于哪種類型的狀態(tài)可以被查詢沒有限制棕所。這意味著它可以用于任何ValueState,ReduceState悯辙,ListState琳省,MapState,AggregatingState和當(dāng)前不推薦使用的FoldingState躲撰。

狀態(tài)查詢


現(xiàn)在针贬,你已經(jīng)設(shè)置集群為可查詢狀態(tài)并聲明了可查詢的狀態(tài)。是時間去了解如何查詢了拢蛋。
狀態(tài)查詢需要用到QueryableStateClient 幫助類坚踩,它位于flink-queryable-state-client jar包中,你需要顯示的在pom中聲明引用瓤狐,并且它與flink core是相互獨(dú)立的瞬铸,如下

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.7.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-queryable-state-client-java_2.11</artifactId>
  <version>1.7.1</version>
</dependency>

你可以閱讀配置Flink程序來了解更多的項(xiàng)目配置。
QueryableStateClient 向內(nèi)部代理發(fā)送查詢請求础锐,代理對象處理查詢請求并將結(jié)果返回嗓节。
client唯一需要初始化的是提供一個合法的的TaskManager 主機(jī)名和代理對象監(jiān)聽的端口號(記著呻粹,TaskManager運(yùn)行著狀態(tài)可查詢的代理)灭美。更多的代理配置及服務(wù)端口號請查看Configuration Section

QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);

在客戶端準(zhǔn)備好的情況下,要查詢與類型K的鍵關(guān)聯(lián)的類型V的狀態(tài)垮刹,您可以使用該方法

CompletableFuture<S> getKvState(
    JobID jobId,
    String queryableStateName,
    K key,
    TypeInformation<K> keyTypeInfo,
    StateDescriptor<S, V> stateDescriptor)

上面返回一個CompletableFuture信姓,最終保存由具有ID jobID的作業(yè)的queryableStateName標(biāo)識的可查詢狀態(tài)實(shí)例的狀態(tài)值鸵隧。key是你所感興趣的狀態(tài)值,keyTypeInfo 會告知Flink序列化它的方法意推。最后豆瘫,stateDescriptor包含有關(guān)請求狀態(tài)的必要信息,即其類型(Value菊值,Reduce等)以及有關(guān)如何序列化/反序列化它的必要信息外驱。
細(xì)心的讀者會 注意到返回的future包含一個S類值,即一個包含實(shí)際值的State對象腻窒。這可以是Flink支持的任何狀態(tài)類型:ValueState昵宇,ReduceState,ListState儿子,MapState瓦哎,AggregatingState和當(dāng)前不推薦使用的FoldingState。
這些狀態(tài)對象不允許修改包含的狀態(tài)柔逼。您可以使用它們來獲取狀態(tài)的實(shí)際值蒋譬,例如使用valueState.get(),或迭代所包含的<K卒落,V>條目羡铲,例如使用mapState.entries(),但您無法修改它們儡毕。例如也切,在返回的列表狀態(tài)上調(diào)用add()方法將拋出UnsupportedOperationException

客戶端是異步的,可以由多個線程共享腰湾。它需要在未使用時通過QueryableStateClient.shutdown()關(guān)閉以釋放資源雷恃。

例子

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
        sum.update(currentSum);

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
        descriptor.setQueryable("query-name");
        sum = getRuntimeContext().getState(descriptor);
    }
}

在作業(yè)中使用后,您可以檢索作業(yè)ID费坊,然后從該運(yùn)算符查詢?nèi)魏捂I的當(dāng)前狀態(tài)

// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
          "average",
          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
        client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);

// now handle the returned value
resultFuture.thenAccept(response -> {
        try {
            Tuple2<Long, Long> res = response.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
});

配置


QueryableStateOptions 定義了影響狀態(tài)查詢服務(wù)和客戶端行為的配置參數(shù)

服務(wù)狀態(tài)

  • query.server.ports: 可查詢狀態(tài)服務(wù)器的服務(wù)器端口范圍倒槐。如果有多個任務(wù)管理器在同一臺機(jī)器上運(yùn)行,這對于避免端口沖突很有用附井。指定的范圍可以是:端口:“9123”讨越,一系列端口:“50100-50200”两残,或范圍和/或點(diǎn)列表:“50100-50200,50300-50400,51234”。默認(rèn)端口為9067把跨。
  • query.server.network-threads: 接收狀態(tài)服務(wù)器傳入請求的網(wǎng)絡(luò)(事件循環(huán))線程數(shù)(0 => #slots)
  • query.server.query-threads: 處理/服務(wù)狀態(tài)服務(wù)器的傳入請求的線程數(shù)(0 => #slots)

代理

  • query.proxy.ports: 可查詢狀態(tài)代理的服務(wù)器端口范圍人弓。如果有多個任務(wù)管理器在同一臺機(jī)器上運(yùn)行,這對于避免端口沖突很有用着逐。指定的范圍可以是:端口:“9123”崔赌,一系列端口:“50100-50200”,或范圍和/或點(diǎn)列表:“50100-50200,50300-50400,51234”耸别。默認(rèn)端口為9069健芭。
  • query.proxy.network-threads: 接收客戶端代理的傳入請求的網(wǎng)絡(luò)(事件循環(huán))線程數(shù)(0 => #slots)
  • query.proxy.query-threads: 處理/服務(wù)客戶端代理的傳入請求的線程數(shù)(0 => #slots)

限制


  • 可查詢狀態(tài)生命周期與作業(yè)的生命周期綁定,例如秀姐,任務(wù)在啟動時注冊可查詢狀態(tài)慈迈,并在處理時取消注冊。在將來的版本中囊扳,需要將其解耦以便在任務(wù)完成后允許查詢吩翻,并通過狀態(tài)復(fù)制加速恢復(fù)
  • 關(guān)于可用KvState的通知是通過一個簡單的告訴發(fā)生的。在未來锥咸,應(yīng)該通過詢問和確認(rèn)來改進(jìn)這一點(diǎn)
  • 服務(wù)器和客戶端會跟蹤查詢的統(tǒng)計(jì)信息狭瞎。默認(rèn)情況下,這些功能目前處于禁用狀態(tài)搏予,因?yàn)樗麄儾荒茉偃魏蔚胤綄ν獗┞缎芏В灰懈玫闹С滞ㄟ^Metrics系統(tǒng)發(fā)布這些數(shù)字,我們就應(yīng)該啟用統(tǒng)計(jì)數(shù)據(jù)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末雪侥,一起剝皮案震驚了整個濱河市碗殷,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌速缨,老刑警劉巖锌妻,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異旬牲,居然都是意外死亡仿粹,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門原茅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吭历,“玉大人,你說我怎么就攤上這事擂橘∩吻” “怎么了?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長朗若。 經(jīng)常有香客問我恼五,道長,這世上最難降的妖魔是什么捡偏? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任唤冈,我火速辦了婚禮,結(jié)果婚禮上银伟,老公的妹妹穿的比我還像新娘。我一直安慰自己绘搞,他們只是感情好彤避,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著夯辖,像睡著了一般琉预。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蒿褂,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天圆米,我揣著相機(jī)與錄音,去河邊找鬼啄栓。 笑死娄帖,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的昙楚。 我是一名探鬼主播近速,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼堪旧!你這毒婦竟也來了削葱?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤淳梦,失蹤者是張志新(化名)和其女友劉穎析砸,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體爆袍,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡首繁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了螃宙。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蛮瞄。...
    茶點(diǎn)故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖谆扎,靈堂內(nèi)的尸體忽然破棺而出挂捅,到底是詐尸還是另有隱情,我是刑警寧澤堂湖,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布闲先,位于F島的核電站状土,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏伺糠。R本人自食惡果不足惜蒙谓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望训桶。 院中可真熱鬧累驮,春花似錦、人聲如沸舵揭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽午绳。三九已至置侍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間拦焚,已是汗流浹背蜡坊。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留赎败,地道東北人秕衙。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像螟够,于是被迫代替她去往敵國和親灾梦。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容